# Copyright 2002 by Jeffrey Chang.
# All rights reserved.
#
# This file is part of the Biopython distribution and governed by your
# choice of the "Biopython License Agreement" or the "BSD 3-Clause License".
# Please see the LICENSE file that should have been included as part of this
# package.
"""A state-emitting MarkovModel.

Note that terminology similar to Manning and Schutze is used.


Functions:
train_bw        Train a Markov model using the Baum-Welch algorithm.
train_visible   Train a visible Markov model using MLE.
find_states     Find a state sequence that explains some observations.

load            Load a MarkovModel.
save            Save a MarkovModel.

Classes:
MarkovModel     Holds the description of a Markov model.
"""

import warnings

from Bio import BiopythonDeprecationWarning

warnings.warn(
    "The 'Bio.MarkovModel' module is deprecated and will be removed in a "
    "future release of Biopython. Consider using the hmmlearn package instead.",
    BiopythonDeprecationWarning,
)


try:
    import numpy as np
except ImportError:
    from Bio import MissingPythonDependencyError

    raise MissingPythonDependencyError(
        "Please install NumPy if you want to use Bio.MarkovModel. "
        "See http://www.numpy.org/"
    ) from None

logaddexp = np.logaddexp


def itemindex(values):
    """Return a dictionary mapping each value to its offset in the sequence."""
    d = {}
    entries = enumerate(values[::-1])
    n = len(values) - 1
    for index, key in entries:
        d[key] = n - index
    return d


np.random.seed()

VERY_SMALL_NUMBER = 1e-300
LOG0 = np.log(VERY_SMALL_NUMBER)


class MarkovModel:
    """Create a state-emitting MarkovModel object."""

    def __init__(
        self, states, alphabet, p_initial=None, p_transition=None, p_emission=None
    ):
        """Initialize the class."""
        self.states = states
        self.alphabet = alphabet
        self.p_initial = p_initial
        self.p_transition = p_transition
        self.p_emission = p_emission

    def __str__(self):
        """Create a string representation of the MarkovModel object."""
        from io import StringIO

        handle = StringIO()
        save(self, handle)
        handle.seek(0)
        return handle.read()


def _readline_and_check_start(handle, start):
    """Read the next line and check that it begins with the expected start (PRIVATE)."""
    line = handle.readline()
    if not line.startswith(start):
        raise ValueError(f"I expected {start!r} but got {line!r}")
    return line


def load(handle):
    """Parse a file handle into a MarkovModel object."""
    # Load the states.
    line = _readline_and_check_start(handle, "STATES:")
    states = line.split()[1:]

    # Load the alphabet.
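    # From here on, the handle is expected to follow the layout written by
    # save() below; for example (state names and symbols here are made up):
    #
    #   ALPHABET: a b
    #   INITIAL:
    #    S1: 0.5
    #    S2: 0.5
    #   TRANSITION:
    #    S1: 0.9 0.1
    #    S2: 0.2 0.8
    #   EMISSION:
    #    S1: 0.7 0.3
    #    S2: 0.4 0.6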
    line = _readline_and_check_start(handle, "ALPHABET:")
    alphabet = line.split()[1:]

    mm = MarkovModel(states, alphabet)
    N, M = len(states), len(alphabet)

    # Load the initial probabilities.
    mm.p_initial = np.zeros(N)
    line = _readline_and_check_start(handle, "INITIAL:")
    for i in range(len(states)):
        line = _readline_and_check_start(handle, f" {states[i]}:")
        mm.p_initial[i] = float(line.split()[-1])

    # Load the transition probabilities.
    mm.p_transition = np.zeros((N, N))
    line = _readline_and_check_start(handle, "TRANSITION:")
    for i in range(len(states)):
        line = _readline_and_check_start(handle, f" {states[i]}:")
        mm.p_transition[i, :] = [float(v) for v in line.split()[1:]]

    # Load the emission probabilities.
    mm.p_emission = np.zeros((N, M))
    line = _readline_and_check_start(handle, "EMISSION:")
    for i in range(len(states)):
        line = _readline_and_check_start(handle, f" {states[i]}:")
        mm.p_emission[i, :] = [float(v) for v in line.split()[1:]]

    return mm


def save(mm, handle):
    """Save a MarkovModel object to a file handle."""
    # This will fail if there are spaces in the states or alphabet.
    w = handle.write
    w(f"STATES: {' '.join(mm.states)}\n")
    w(f"ALPHABET: {' '.join(mm.alphabet)}\n")
    w("INITIAL:\n")
    for i in range(len(mm.p_initial)):
        w(f" {mm.states[i]}: {mm.p_initial[i]:g}\n")
    w("TRANSITION:\n")
    for i in range(len(mm.p_transition)):
        w(f" {mm.states[i]}: {' '.join(str(x) for x in mm.p_transition[i])}\n")
    w("EMISSION:\n")
    for i in range(len(mm.p_emission)):
        w(f" {mm.states[i]}: {' '.join(str(x) for x in mm.p_emission[i])}\n")


# XXX allow them to specify starting points
def train_bw(
    states,
    alphabet,
    training_data,
    pseudo_initial=None,
    pseudo_transition=None,
    pseudo_emission=None,
    update_fn=None,
):
    """Train a MarkovModel using the Baum-Welch algorithm.

    Train a MarkovModel using the Baum-Welch algorithm. states is a list
    of strings that describe the names of each state. alphabet is a
    list of objects that indicate the allowed outputs. training_data
    is a list of observations. Each observation is a list of objects
    from the alphabet.

    pseudo_initial, pseudo_transition, and pseudo_emission are
    optional parameters that you can use to assign pseudo-counts to
    different matrices. They should be matrices of the appropriate
    size that contain numbers to add to each parameter matrix, before
    normalization.

    update_fn is an optional callback that takes parameters
    (iteration, log_likelihood). It is called once per iteration.
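
    A minimal usage sketch (the state names, symbols, and observation
    sequences below are made up for illustration):

        states = ["S1", "S2"]
        alphabet = ["a", "b"]
        training_data = [["a", "b", "b", "a"], ["b", "b", "a"]]
        mm = train_bw(states, alphabet, training_data)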
jpayne@68: """ jpayne@68: N, M = len(states), len(alphabet) jpayne@68: if not training_data: jpayne@68: raise ValueError("No training data given.") jpayne@68: if pseudo_initial is not None: jpayne@68: pseudo_initial = np.asarray(pseudo_initial) jpayne@68: if pseudo_initial.shape != (N,): jpayne@68: raise ValueError("pseudo_initial not shape len(states)") jpayne@68: if pseudo_transition is not None: jpayne@68: pseudo_transition = np.asarray(pseudo_transition) jpayne@68: if pseudo_transition.shape != (N, N): jpayne@68: raise ValueError("pseudo_transition not shape len(states) X len(states)") jpayne@68: if pseudo_emission is not None: jpayne@68: pseudo_emission = np.asarray(pseudo_emission) jpayne@68: if pseudo_emission.shape != (N, M): jpayne@68: raise ValueError("pseudo_emission not shape len(states) X len(alphabet)") jpayne@68: jpayne@68: # Training data is given as a list of members of the alphabet. jpayne@68: # Replace those with indexes into the alphabet list for easier jpayne@68: # computation. jpayne@68: training_outputs = [] jpayne@68: indexes = itemindex(alphabet) jpayne@68: for outputs in training_data: jpayne@68: training_outputs.append([indexes[x] for x in outputs]) jpayne@68: jpayne@68: # Do some sanity checking on the outputs. jpayne@68: lengths = [len(x) for x in training_outputs] jpayne@68: if min(lengths) == 0: jpayne@68: raise ValueError("I got training data with outputs of length 0") jpayne@68: jpayne@68: # Do the training with baum welch. jpayne@68: x = _baum_welch( jpayne@68: N, jpayne@68: M, jpayne@68: training_outputs, jpayne@68: pseudo_initial=pseudo_initial, jpayne@68: pseudo_transition=pseudo_transition, jpayne@68: pseudo_emission=pseudo_emission, jpayne@68: update_fn=update_fn, jpayne@68: ) jpayne@68: p_initial, p_transition, p_emission = x jpayne@68: return MarkovModel(states, alphabet, p_initial, p_transition, p_emission) jpayne@68: jpayne@68: jpayne@68: MAX_ITERATIONS = 1000 jpayne@68: jpayne@68: jpayne@68: def _baum_welch( jpayne@68: N, jpayne@68: M, jpayne@68: training_outputs, jpayne@68: p_initial=None, jpayne@68: p_transition=None, jpayne@68: p_emission=None, jpayne@68: pseudo_initial=None, jpayne@68: pseudo_transition=None, jpayne@68: pseudo_emission=None, jpayne@68: update_fn=None, jpayne@68: ): jpayne@68: """Implement the Baum-Welch algorithm to evaluate unknown parameters in the MarkovModel object (PRIVATE).""" jpayne@68: if p_initial is None: jpayne@68: p_initial = _random_norm(N) jpayne@68: else: jpayne@68: p_initial = _copy_and_check(p_initial, (N,)) jpayne@68: jpayne@68: if p_transition is None: jpayne@68: p_transition = _random_norm((N, N)) jpayne@68: else: jpayne@68: p_transition = _copy_and_check(p_transition, (N, N)) jpayne@68: if p_emission is None: jpayne@68: p_emission = _random_norm((N, M)) jpayne@68: else: jpayne@68: p_emission = _copy_and_check(p_emission, (N, M)) jpayne@68: jpayne@68: # Do all the calculations in log space to avoid underflows. 
    lp_initial = np.log(p_initial)
    lp_transition = np.log(p_transition)
    lp_emission = np.log(p_emission)
    if pseudo_initial is not None:
        lpseudo_initial = np.log(pseudo_initial)
    else:
        lpseudo_initial = None
    if pseudo_transition is not None:
        lpseudo_transition = np.log(pseudo_transition)
    else:
        lpseudo_transition = None
    if pseudo_emission is not None:
        lpseudo_emission = np.log(pseudo_emission)
    else:
        lpseudo_emission = None

    # Iterate through each sequence of outputs, updating the parameters
    # of the HMM.  Stop when the log likelihoods of the sequences
    # stop varying.
    prev_llik = None
    for i in range(MAX_ITERATIONS):
        llik = LOG0
        for outputs in training_outputs:
            llik += _baum_welch_one(
                N,
                M,
                outputs,
                lp_initial,
                lp_transition,
                lp_emission,
                lpseudo_initial,
                lpseudo_transition,
                lpseudo_emission,
            )
        if update_fn is not None:
            update_fn(i, llik)
        if prev_llik is not None and np.fabs(prev_llik - llik) < 0.1:
            break
        prev_llik = llik
    else:
        raise RuntimeError("HMM did not converge in %d iterations" % MAX_ITERATIONS)

    # Return everything back in normal space.
    return [np.exp(_) for _ in (lp_initial, lp_transition, lp_emission)]


def _baum_welch_one(
    N,
    M,
    outputs,
    lp_initial,
    lp_transition,
    lp_emission,
    lpseudo_initial,
    lpseudo_transition,
    lpseudo_emission,
):
    """Execute one step of the Baum-Welch algorithm (PRIVATE).

    Do one iteration of Baum-Welch based on one sequence of outputs.
    Changes the values of lp_initial, lp_transition and lp_emission in place.
    """
    T = len(outputs)
    fmat = _forward(N, T, lp_initial, lp_transition, lp_emission, outputs)
    bmat = _backward(N, T, lp_transition, lp_emission, outputs)

    # Calculate the probability of traversing each arc for any given
    # transition.
    lp_arc = np.zeros((N, N, T))
    for t in range(T):
        k = outputs[t]
        lp_traverse = np.zeros((N, N))  # P going over one arc.
        for i in range(N):
            for j in range(N):
                # P(getting to this arc)
                # P(making this transition)
                # P(emitting this character)
                # P(going to the end)
                lp = (
                    fmat[i][t]
                    + lp_transition[i][j]
                    + lp_emission[i][k]
                    + bmat[j][t + 1]
                )
                lp_traverse[i][j] = lp
        # Normalize the probability for this time step.
        lp_arc[:, :, t] = lp_traverse - _logsum(lp_traverse)

    # Sum of all the transitions out of state i at time t.
    lp_arcout_t = np.zeros((N, T))
    for t in range(T):
        for i in range(N):
            lp_arcout_t[i][t] = _logsum(lp_arc[i, :, t])

    # Sum of all the transitions out of state i.
    lp_arcout = np.zeros(N)
    for i in range(N):
        lp_arcout[i] = _logsum(lp_arcout_t[i, :])

    # UPDATE P_INITIAL.
    lp_initial = lp_arcout_t[:, 0]
    if lpseudo_initial is not None:
        lp_initial = _logvecadd(lp_initial, lpseudo_initial)
        lp_initial = lp_initial - _logsum(lp_initial)

    # UPDATE P_TRANSITION.  p_transition[i][j] is the sum of all the
    # transitions from i to j, normalized by the sum of the
    # transitions out of i.
    for i in range(N):
        for j in range(N):
            lp_transition[i][j] = _logsum(lp_arc[i, j, :]) - lp_arcout[i]
        if lpseudo_transition is not None:
            lp_transition[i] = _logvecadd(lp_transition[i], lpseudo_transition[i])
            lp_transition[i] = lp_transition[i] - _logsum(lp_transition[i])

    # UPDATE P_EMISSION.  lp_emission[i][k] is the sum of all the
    # transitions out of i when k is observed, divided by the sum of
    # the transitions out of i.
    for i in range(N):
        ksum = np.zeros(M) + LOG0  # ksum[k] is the sum of all i with k.
        for t in range(T):
            k = outputs[t]
            for j in range(N):
                ksum[k] = logaddexp(ksum[k], lp_arc[i, j, t])
        ksum = ksum - _logsum(ksum)  # Normalize
        if lpseudo_emission is not None:
            ksum = _logvecadd(ksum, lpseudo_emission[i])
            ksum = ksum - _logsum(ksum)  # Renormalize
        lp_emission[i, :] = ksum

    # Calculate the log likelihood of the output based on the forward
    # matrix.  Since the parameters of the HMM have changed, the log
    # likelihoods are going to be a step behind, and we might be doing
    # one extra iteration of training.  The alternative is to rerun
    # the _forward algorithm and calculate from the clean one, but
    # that may be more expensive than overshooting the training by one
    # step.
    return _logsum(fmat[:, T])


def _forward(N, T, lp_initial, lp_transition, lp_emission, outputs):
    """Implement the forward algorithm (PRIVATE).

    Calculate an N x (T+1) matrix, where the logsum of the last column
    is the total probability of the output.
    """
    matrix = np.zeros((N, T + 1))

    # Initialize the first column to be the initial values.
    matrix[:, 0] = lp_initial
    for t in range(1, T + 1):
        k = outputs[t - 1]
        for j in range(N):
            # The probability of the state is the sum of the
            # transitions from all the states from time t-1.
            lprob = LOG0
            for i in range(N):
                lp = matrix[i][t - 1] + lp_transition[i][j] + lp_emission[i][k]
                lprob = logaddexp(lprob, lp)
            matrix[j][t] = lprob
    return matrix


def _backward(N, T, lp_transition, lp_emission, outputs):
    """Implement the backward algorithm (PRIVATE)."""
    matrix = np.zeros((N, T + 1))
    for t in range(T - 1, -1, -1):
        k = outputs[t]
        for i in range(N):
            # The probability of the state is the sum of the
            # transitions from all the states from time t+1.
            lprob = LOG0
            for j in range(N):
                lp = matrix[j][t + 1] + lp_transition[i][j] + lp_emission[i][k]
                lprob = logaddexp(lprob, lp)
            matrix[i][t] = lprob
    return matrix


def train_visible(
    states,
    alphabet,
    training_data,
    pseudo_initial=None,
    pseudo_transition=None,
    pseudo_emission=None,
):
    """Train a visible MarkovModel using maximum likelihood estimates for each of the parameters.

    Train a visible MarkovModel using maximum likelihood estimates
    for each of the parameters.  states is a list of strings that
    describe the names of each state.  alphabet is a list of objects
    that indicate the allowed outputs.  training_data is a list of
    (outputs, observed states) where outputs is a list of emissions
    from the alphabet, and observed states is a list of states from
    states.

    pseudo_initial, pseudo_transition, and pseudo_emission are
    optional parameters that you can use to assign pseudo-counts to
    different matrices.  They should be matrices of the appropriate
    size that contain numbers to add to each parameter matrix.
    """
    N, M = len(states), len(alphabet)
    if pseudo_initial is not None:
        pseudo_initial = np.asarray(pseudo_initial)
        if pseudo_initial.shape != (N,):
            raise ValueError("pseudo_initial not shape len(states)")
    if pseudo_transition is not None:
        pseudo_transition = np.asarray(pseudo_transition)
        if pseudo_transition.shape != (N, N):
            raise ValueError("pseudo_transition not shape len(states) X len(states)")
    if pseudo_emission is not None:
        pseudo_emission = np.asarray(pseudo_emission)
        if pseudo_emission.shape != (N, M):
            raise ValueError("pseudo_emission not shape len(states) X len(alphabet)")

    # Training data is given as a list of members of the alphabet.
    # Replace those with indexes into the alphabet list for easier
    # computation.
    training_states, training_outputs = [], []
    states_indexes = itemindex(states)
    outputs_indexes = itemindex(alphabet)
    for toutputs, tstates in training_data:
        if len(tstates) != len(toutputs):
            raise ValueError("states and outputs not aligned")
        training_states.append([states_indexes[x] for x in tstates])
        training_outputs.append([outputs_indexes[x] for x in toutputs])

    x = _mle(
        N,
        M,
        training_outputs,
        training_states,
        pseudo_initial,
        pseudo_transition,
        pseudo_emission,
    )
    p_initial, p_transition, p_emission = x

    return MarkovModel(states, alphabet, p_initial, p_transition, p_emission)


def _mle(
    N,
    M,
    training_outputs,
    training_states,
    pseudo_initial,
    pseudo_transition,
    pseudo_emission,
):
    """Implement the maximum likelihood estimation algorithm (PRIVATE)."""
    # p_initial is the probability that a sequence of states starts
    # off with a particular one.
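    # That is, C(sequences starting in state i) plus any pseudo-counts,
    # normalized over all states (mirroring the C(i,j)/C(i) estimate used
    # for the transition matrix below).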
    p_initial = np.zeros(N)
    if pseudo_initial is not None:
        p_initial = p_initial + pseudo_initial
    for states in training_states:
        p_initial[states[0]] += 1
    p_initial = _normalize(p_initial)

    # p_transition is the probability that a state leads to the next
    # one.  C(i,j)/C(i) where i and j are states.
    p_transition = np.zeros((N, N))
    if pseudo_transition is not None:
        p_transition = p_transition + pseudo_transition
    for states in training_states:
        for n in range(len(states) - 1):
            i, j = states[n], states[n + 1]
            p_transition[i, j] += 1
    for i in range(len(p_transition)):
        p_transition[i, :] = p_transition[i, :] / sum(p_transition[i, :])

    # p_emission is the probability of an output given a state.
    # C(s,o)/C(s) where o is an output and s is a state.
    # Start from a matrix of ones so that every (state, output) pair gets a
    # baseline count, then add any pseudo-counts on top.
    p_emission = np.ones((N, M))
    if pseudo_emission is not None:
        p_emission = p_emission + pseudo_emission
    for outputs, states in zip(training_outputs, training_states):
        for o, s in zip(outputs, states):
            p_emission[s, o] += 1
    for i in range(len(p_emission)):
        p_emission[i, :] = p_emission[i, :] / sum(p_emission[i, :])

    return p_initial, p_transition, p_emission


def _argmaxes(vector, allowance=None):
    """Return indices of the maximum values along the vector (PRIVATE)."""
    return [np.argmax(vector)]


def find_states(markov_model, output):
    """Find the most likely sequence of states for the given output.

    Returns a list of (states, score) tuples.
    """
    mm = markov_model
    N = len(mm.states)

    # _viterbi does calculations in log space.  Add a tiny bit to the
    # matrices so that the logs will not break.
    lp_initial = np.log(mm.p_initial + VERY_SMALL_NUMBER)
    lp_transition = np.log(mm.p_transition + VERY_SMALL_NUMBER)
    lp_emission = np.log(mm.p_emission + VERY_SMALL_NUMBER)
    # Change output into a list of indexes into the alphabet.
    indexes = itemindex(mm.alphabet)
    output = [indexes[x] for x in output]

    # Run the Viterbi algorithm.
    results = _viterbi(N, lp_initial, lp_transition, lp_emission, output)

    for i in range(len(results)):
        states, score = results[i]
        results[i] = [mm.states[x] for x in states], np.exp(score)
    return results


def _viterbi(N, lp_initial, lp_transition, lp_emission, output):
    """Implement the Viterbi algorithm to find the most likely states for a given input (PRIVATE)."""
    T = len(output)
    # Store the backtrace in an N x T matrix.
    backtrace = []  # list of indexes of states in previous timestep.
    for i in range(N):
        backtrace.append([None] * T)

    # Store the best scores.
    scores = np.zeros((N, T))
    scores[:, 0] = lp_initial + lp_emission[:, output[0]]
    for t in range(1, T):
        k = output[t]
        for j in range(N):
            # Find the most likely place it came from.
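            # i.e. scores[j, t] = max_i(scores[i, t-1] + lp_transition[i, j])
            #                     + lp_emission[j, k],
            # with the maximizing i recorded in backtrace for path recovery.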
            i_scores = scores[:, t - 1] + lp_transition[:, j] + lp_emission[j, k]
            indexes = _argmaxes(i_scores)
            scores[j, t] = i_scores[indexes[0]]
            backtrace[j][t] = indexes

    # Do the backtrace.  First, find a good place to start.  Then,
    # we'll follow the backtrace matrix to find the list of states.
    # In the event of ties, there may be multiple paths back through
    # the matrix, which implies a recursive solution.  We'll simulate
    # it by keeping our own stack.
    in_process = []  # list of (t, states, score)
    results = []  # return values.  list of (states, score)
    indexes = _argmaxes(scores[:, T - 1])  # pick the first place
    for i in indexes:
        in_process.append((T - 1, [i], scores[i][T - 1]))
    while in_process:
        t, states, score = in_process.pop()
        if t == 0:
            results.append((states, score))
        else:
            indexes = backtrace[states[0]][t]
            for i in indexes:
                in_process.append((t - 1, [i] + states, score))
    return results


def _normalize(matrix):
    """Normalize a matrix object (PRIVATE)."""
    if len(matrix.shape) == 1:
        matrix = matrix / sum(matrix)
    elif len(matrix.shape) == 2:
        # Normalize by rows.
        for i in range(len(matrix)):
            matrix[i, :] = matrix[i, :] / sum(matrix[i, :])
    else:
        raise ValueError("I cannot handle matrices of that shape")
    return matrix


def _uniform_norm(shape):
    """Return a normalized uniform matrix (PRIVATE)."""
    matrix = np.ones(shape)
    return _normalize(matrix)


def _random_norm(shape):
    """Return a normalized random matrix (PRIVATE)."""
    matrix = np.random.random(shape)
    return _normalize(matrix)


def _copy_and_check(matrix, desired_shape):
    """Copy a matrix, check its dimensions, and verify that it is normalized (PRIVATE)."""
    # Copy the matrix.
    matrix = np.array(matrix, copy=True)
    # Check the dimensions.
    if matrix.shape != desired_shape:
        raise ValueError("Incorrect dimension")
    # Make sure it's normalized.
    if len(matrix.shape) == 1:
        if np.fabs(sum(matrix) - 1.0) > 0.01:
            raise ValueError("matrix not normalized to 1.0")
    elif len(matrix.shape) == 2:
        for i in range(len(matrix)):
            if np.fabs(sum(matrix[i]) - 1.0) > 0.01:
                raise ValueError("matrix %d not normalized to 1.0" % i)
    else:
        raise ValueError("I don't handle matrices > 2 dimensions")
    return matrix


def _logsum(matrix):
    """Implement logsum for a matrix object (PRIVATE)."""
    if len(matrix.shape) > 1:
        vec = np.reshape(matrix, (np.prod(matrix.shape),))
    else:
        vec = matrix
    sum = LOG0
    for num in vec:
        sum = logaddexp(sum, num)
    return sum


def _logvecadd(logvec1, logvec2):
    """Implement a log sum for two vector objects (PRIVATE)."""
    assert len(logvec1) == len(logvec2), "vectors aren't the same length"
    sumvec = np.zeros(len(logvec1))
    for i in range(len(logvec1)):
        sumvec[i] = logaddexp(logvec1[i], logvec2[i])
    return sumvec


def _exp_logsum(numbers):
    """Return the exponential of a logsum (PRIVATE)."""
    sum = _logsum(numbers)
    return np.exp(sum)
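

# A minimal end-to-end usage sketch (the state names, symbols, and training
# data below are made up for illustration):
#
#     from Bio.MarkovModel import train_visible, find_states
#
#     states = ["rainy", "sunny"]
#     alphabet = ["walk", "shop"]
#     training_data = [
#         (["walk", "shop", "walk"], ["sunny", "rainy", "sunny"]),
#         (["shop", "shop", "walk"], ["rainy", "rainy", "sunny"]),
#     ]
#     mm = train_visible(states, alphabet, training_data)
#     for state_path, score in find_states(mm, ["walk", "walk", "shop"]):
#         print(state_path, score)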