jpayne@69: # Copyright 2002 by Jeffrey Chang. jpayne@69: # All rights reserved. jpayne@69: # jpayne@69: # This file is part of the Biopython distribution and governed by your jpayne@69: # choice of the "Biopython License Agreement" or the "BSD 3-Clause License". jpayne@69: # Please see the LICENSE file that should have been included as part of this jpayne@69: # package. jpayne@69: """A state-emitting MarkovModel. jpayne@69: jpayne@69: Note terminology similar to Manning and Schutze is used. jpayne@69: jpayne@69: jpayne@69: Functions: jpayne@69: train_bw Train a markov model using the Baum-Welch algorithm. jpayne@69: train_visible Train a visible markov model using MLE. jpayne@69: find_states Find the a state sequence that explains some observations. jpayne@69: jpayne@69: load Load a MarkovModel. jpayne@69: save Save a MarkovModel. jpayne@69: jpayne@69: Classes: jpayne@69: MarkovModel Holds the description of a markov model jpayne@69: """ jpayne@69: jpayne@69: import warnings jpayne@69: from Bio import BiopythonDeprecationWarning jpayne@69: jpayne@69: warnings.warn( jpayne@69: "The 'Bio.MarkovModel' module is deprecated and will be removed in a " jpayne@69: "future release of Biopython. Consider using the hmmlearn package instead.", jpayne@69: BiopythonDeprecationWarning, jpayne@69: ) jpayne@69: jpayne@69: jpayne@69: try: jpayne@69: import numpy as np jpayne@69: except ImportError: jpayne@69: from Bio import MissingPythonDependencyError jpayne@69: jpayne@69: raise MissingPythonDependencyError( jpayne@69: "Please install NumPy if you want to use Bio.MarkovModel. " jpayne@69: "See http://www.numpy.org/" jpayne@69: ) from None jpayne@69: jpayne@69: logaddexp = np.logaddexp jpayne@69: jpayne@69: jpayne@69: def itemindex(values): jpayne@69: """Return a dictionary of values with their sequence offset as keys.""" jpayne@69: d = {} jpayne@69: entries = enumerate(values[::-1]) jpayne@69: n = len(values) - 1 jpayne@69: for index, key in entries: jpayne@69: d[key] = n - index jpayne@69: return d jpayne@69: jpayne@69: jpayne@69: np.random.seed() jpayne@69: jpayne@69: VERY_SMALL_NUMBER = 1e-300 jpayne@69: LOG0 = np.log(VERY_SMALL_NUMBER) jpayne@69: jpayne@69: jpayne@69: class MarkovModel: jpayne@69: """Create a state-emitting MarkovModel object.""" jpayne@69: jpayne@69: def __init__( jpayne@69: self, states, alphabet, p_initial=None, p_transition=None, p_emission=None jpayne@69: ): jpayne@69: """Initialize the class.""" jpayne@69: self.states = states jpayne@69: self.alphabet = alphabet jpayne@69: self.p_initial = p_initial jpayne@69: self.p_transition = p_transition jpayne@69: self.p_emission = p_emission jpayne@69: jpayne@69: def __str__(self): jpayne@69: """Create a string representation of the MarkovModel object.""" jpayne@69: from io import StringIO jpayne@69: jpayne@69: handle = StringIO() jpayne@69: save(self, handle) jpayne@69: handle.seek(0) jpayne@69: return handle.read() jpayne@69: jpayne@69: jpayne@69: def _readline_and_check_start(handle, start): jpayne@69: """Read the first line and evaluate that begisn with the correct start (PRIVATE).""" jpayne@69: line = handle.readline() jpayne@69: if not line.startswith(start): jpayne@69: raise ValueError(f"I expected {start!r} but got {line!r}") jpayne@69: return line jpayne@69: jpayne@69: jpayne@69: def load(handle): jpayne@69: """Parse a file handle into a MarkovModel object.""" jpayne@69: # Load the states. jpayne@69: line = _readline_and_check_start(handle, "STATES:") jpayne@69: states = line.split()[1:] jpayne@69: jpayne@69: # Load the alphabet. jpayne@69: line = _readline_and_check_start(handle, "ALPHABET:") jpayne@69: alphabet = line.split()[1:] jpayne@69: jpayne@69: mm = MarkovModel(states, alphabet) jpayne@69: N, M = len(states), len(alphabet) jpayne@69: jpayne@69: # Load the initial probabilities. jpayne@69: mm.p_initial = np.zeros(N) jpayne@69: line = _readline_and_check_start(handle, "INITIAL:") jpayne@69: for i in range(len(states)): jpayne@69: line = _readline_and_check_start(handle, f" {states[i]}:") jpayne@69: mm.p_initial[i] = float(line.split()[-1]) jpayne@69: jpayne@69: # Load the transition. jpayne@69: mm.p_transition = np.zeros((N, N)) jpayne@69: line = _readline_and_check_start(handle, "TRANSITION:") jpayne@69: for i in range(len(states)): jpayne@69: line = _readline_and_check_start(handle, f" {states[i]}:") jpayne@69: mm.p_transition[i, :] = [float(v) for v in line.split()[1:]] jpayne@69: jpayne@69: # Load the emission. jpayne@69: mm.p_emission = np.zeros((N, M)) jpayne@69: line = _readline_and_check_start(handle, "EMISSION:") jpayne@69: for i in range(len(states)): jpayne@69: line = _readline_and_check_start(handle, f" {states[i]}:") jpayne@69: mm.p_emission[i, :] = [float(v) for v in line.split()[1:]] jpayne@69: jpayne@69: return mm jpayne@69: jpayne@69: jpayne@69: def save(mm, handle): jpayne@69: """Save MarkovModel object into handle.""" jpayne@69: # This will fail if there are spaces in the states or alphabet. jpayne@69: w = handle.write jpayne@69: w(f"STATES: {' '.join(mm.states)}\n") jpayne@69: w(f"ALPHABET: {' '.join(mm.alphabet)}\n") jpayne@69: w("INITIAL:\n") jpayne@69: for i in range(len(mm.p_initial)): jpayne@69: w(f" {mm.states[i]}: {mm.p_initial[i]:g}\n") jpayne@69: w("TRANSITION:\n") jpayne@69: for i in range(len(mm.p_transition)): jpayne@69: w(f" {mm.states[i]}: {' '.join(str(x) for x in mm.p_transition[i])}\n") jpayne@69: w("EMISSION:\n") jpayne@69: for i in range(len(mm.p_emission)): jpayne@69: w(f" {mm.states[i]}: {' '.join(str(x) for x in mm.p_emission[i])}\n") jpayne@69: jpayne@69: jpayne@69: # XXX allow them to specify starting points jpayne@69: def train_bw( jpayne@69: states, jpayne@69: alphabet, jpayne@69: training_data, jpayne@69: pseudo_initial=None, jpayne@69: pseudo_transition=None, jpayne@69: pseudo_emission=None, jpayne@69: update_fn=None, jpayne@69: ): jpayne@69: """Train a MarkovModel using the Baum-Welch algorithm. jpayne@69: jpayne@69: Train a MarkovModel using the Baum-Welch algorithm. states is a list jpayne@69: of strings that describe the names of each state. alphabet is a jpayne@69: list of objects that indicate the allowed outputs. training_data jpayne@69: is a list of observations. Each observation is a list of objects jpayne@69: from the alphabet. jpayne@69: jpayne@69: pseudo_initial, pseudo_transition, and pseudo_emission are jpayne@69: optional parameters that you can use to assign pseudo-counts to jpayne@69: different matrices. They should be matrices of the appropriate jpayne@69: size that contain numbers to add to each parameter matrix, before jpayne@69: normalization. jpayne@69: jpayne@69: update_fn is an optional callback that takes parameters jpayne@69: (iteration, log_likelihood). It is called once per iteration. jpayne@69: """ jpayne@69: N, M = len(states), len(alphabet) jpayne@69: if not training_data: jpayne@69: raise ValueError("No training data given.") jpayne@69: if pseudo_initial is not None: jpayne@69: pseudo_initial = np.asarray(pseudo_initial) jpayne@69: if pseudo_initial.shape != (N,): jpayne@69: raise ValueError("pseudo_initial not shape len(states)") jpayne@69: if pseudo_transition is not None: jpayne@69: pseudo_transition = np.asarray(pseudo_transition) jpayne@69: if pseudo_transition.shape != (N, N): jpayne@69: raise ValueError("pseudo_transition not shape len(states) X len(states)") jpayne@69: if pseudo_emission is not None: jpayne@69: pseudo_emission = np.asarray(pseudo_emission) jpayne@69: if pseudo_emission.shape != (N, M): jpayne@69: raise ValueError("pseudo_emission not shape len(states) X len(alphabet)") jpayne@69: jpayne@69: # Training data is given as a list of members of the alphabet. jpayne@69: # Replace those with indexes into the alphabet list for easier jpayne@69: # computation. jpayne@69: training_outputs = [] jpayne@69: indexes = itemindex(alphabet) jpayne@69: for outputs in training_data: jpayne@69: training_outputs.append([indexes[x] for x in outputs]) jpayne@69: jpayne@69: # Do some sanity checking on the outputs. jpayne@69: lengths = [len(x) for x in training_outputs] jpayne@69: if min(lengths) == 0: jpayne@69: raise ValueError("I got training data with outputs of length 0") jpayne@69: jpayne@69: # Do the training with baum welch. jpayne@69: x = _baum_welch( jpayne@69: N, jpayne@69: M, jpayne@69: training_outputs, jpayne@69: pseudo_initial=pseudo_initial, jpayne@69: pseudo_transition=pseudo_transition, jpayne@69: pseudo_emission=pseudo_emission, jpayne@69: update_fn=update_fn, jpayne@69: ) jpayne@69: p_initial, p_transition, p_emission = x jpayne@69: return MarkovModel(states, alphabet, p_initial, p_transition, p_emission) jpayne@69: jpayne@69: jpayne@69: MAX_ITERATIONS = 1000 jpayne@69: jpayne@69: jpayne@69: def _baum_welch( jpayne@69: N, jpayne@69: M, jpayne@69: training_outputs, jpayne@69: p_initial=None, jpayne@69: p_transition=None, jpayne@69: p_emission=None, jpayne@69: pseudo_initial=None, jpayne@69: pseudo_transition=None, jpayne@69: pseudo_emission=None, jpayne@69: update_fn=None, jpayne@69: ): jpayne@69: """Implement the Baum-Welch algorithm to evaluate unknown parameters in the MarkovModel object (PRIVATE).""" jpayne@69: if p_initial is None: jpayne@69: p_initial = _random_norm(N) jpayne@69: else: jpayne@69: p_initial = _copy_and_check(p_initial, (N,)) jpayne@69: jpayne@69: if p_transition is None: jpayne@69: p_transition = _random_norm((N, N)) jpayne@69: else: jpayne@69: p_transition = _copy_and_check(p_transition, (N, N)) jpayne@69: if p_emission is None: jpayne@69: p_emission = _random_norm((N, M)) jpayne@69: else: jpayne@69: p_emission = _copy_and_check(p_emission, (N, M)) jpayne@69: jpayne@69: # Do all the calculations in log space to avoid underflows. jpayne@69: lp_initial = np.log(p_initial) jpayne@69: lp_transition = np.log(p_transition) jpayne@69: lp_emission = np.log(p_emission) jpayne@69: if pseudo_initial is not None: jpayne@69: lpseudo_initial = np.log(pseudo_initial) jpayne@69: else: jpayne@69: lpseudo_initial = None jpayne@69: if pseudo_transition is not None: jpayne@69: lpseudo_transition = np.log(pseudo_transition) jpayne@69: else: jpayne@69: lpseudo_transition = None jpayne@69: if pseudo_emission is not None: jpayne@69: lpseudo_emission = np.log(pseudo_emission) jpayne@69: else: jpayne@69: lpseudo_emission = None jpayne@69: jpayne@69: # Iterate through each sequence of output, updating the parameters jpayne@69: # to the HMM. Stop when the log likelihoods of the sequences jpayne@69: # stops varying. jpayne@69: prev_llik = None jpayne@69: for i in range(MAX_ITERATIONS): jpayne@69: llik = LOG0 jpayne@69: for outputs in training_outputs: jpayne@69: llik += _baum_welch_one( jpayne@69: N, jpayne@69: M, jpayne@69: outputs, jpayne@69: lp_initial, jpayne@69: lp_transition, jpayne@69: lp_emission, jpayne@69: lpseudo_initial, jpayne@69: lpseudo_transition, jpayne@69: lpseudo_emission, jpayne@69: ) jpayne@69: if update_fn is not None: jpayne@69: update_fn(i, llik) jpayne@69: if prev_llik is not None and np.fabs(prev_llik - llik) < 0.1: jpayne@69: break jpayne@69: prev_llik = llik jpayne@69: else: jpayne@69: raise RuntimeError("HMM did not converge in %d iterations" % MAX_ITERATIONS) jpayne@69: jpayne@69: # Return everything back in normal space. jpayne@69: return [np.exp(_) for _ in (lp_initial, lp_transition, lp_emission)] jpayne@69: jpayne@69: jpayne@69: def _baum_welch_one( jpayne@69: N, jpayne@69: M, jpayne@69: outputs, jpayne@69: lp_initial, jpayne@69: lp_transition, jpayne@69: lp_emission, jpayne@69: lpseudo_initial, jpayne@69: lpseudo_transition, jpayne@69: lpseudo_emission, jpayne@69: ): jpayne@69: """Execute one step for Baum-Welch algorithm (PRIVATE). jpayne@69: jpayne@69: Do one iteration of Baum-Welch based on a sequence of output. jpayne@69: Changes the value for lp_initial, lp_transition and lp_emission in place. jpayne@69: """ jpayne@69: T = len(outputs) jpayne@69: fmat = _forward(N, T, lp_initial, lp_transition, lp_emission, outputs) jpayne@69: bmat = _backward(N, T, lp_transition, lp_emission, outputs) jpayne@69: jpayne@69: # Calculate the probability of traversing each arc for any given jpayne@69: # transition. jpayne@69: lp_arc = np.zeros((N, N, T)) jpayne@69: for t in range(T): jpayne@69: k = outputs[t] jpayne@69: lp_traverse = np.zeros((N, N)) # P going over one arc. jpayne@69: for i in range(N): jpayne@69: for j in range(N): jpayne@69: # P(getting to this arc) jpayne@69: # P(making this transition) jpayne@69: # P(emitting this character) jpayne@69: # P(going to the end) jpayne@69: lp = ( jpayne@69: fmat[i][t] jpayne@69: + lp_transition[i][j] jpayne@69: + lp_emission[i][k] jpayne@69: + bmat[j][t + 1] jpayne@69: ) jpayne@69: lp_traverse[i][j] = lp jpayne@69: # Normalize the probability for this time step. jpayne@69: lp_arc[:, :, t] = lp_traverse - _logsum(lp_traverse) jpayne@69: jpayne@69: # Sum of all the transitions out of state i at time t. jpayne@69: lp_arcout_t = np.zeros((N, T)) jpayne@69: for t in range(T): jpayne@69: for i in range(N): jpayne@69: lp_arcout_t[i][t] = _logsum(lp_arc[i, :, t]) jpayne@69: jpayne@69: # Sum of all the transitions out of state i. jpayne@69: lp_arcout = np.zeros(N) jpayne@69: for i in range(N): jpayne@69: lp_arcout[i] = _logsum(lp_arcout_t[i, :]) jpayne@69: jpayne@69: # UPDATE P_INITIAL. jpayne@69: lp_initial = lp_arcout_t[:, 0] jpayne@69: if lpseudo_initial is not None: jpayne@69: lp_initial = _logvecadd(lp_initial, lpseudo_initial) jpayne@69: lp_initial = lp_initial - _logsum(lp_initial) jpayne@69: jpayne@69: # UPDATE P_TRANSITION. p_transition[i][j] is the sum of all the jpayne@69: # transitions from i to j, normalized by the sum of the jpayne@69: # transitions out of i. jpayne@69: for i in range(N): jpayne@69: for j in range(N): jpayne@69: lp_transition[i][j] = _logsum(lp_arc[i, j, :]) - lp_arcout[i] jpayne@69: if lpseudo_transition is not None: jpayne@69: lp_transition[i] = _logvecadd(lp_transition[i], lpseudo_transition) jpayne@69: lp_transition[i] = lp_transition[i] - _logsum(lp_transition[i]) jpayne@69: jpayne@69: # UPDATE P_EMISSION. lp_emission[i][k] is the sum of all the jpayne@69: # transitions out of i when k is observed, divided by the sum of jpayne@69: # the transitions out of i. jpayne@69: for i in range(N): jpayne@69: ksum = np.zeros(M) + LOG0 # ksum[k] is the sum of all i with k. jpayne@69: for t in range(T): jpayne@69: k = outputs[t] jpayne@69: for j in range(N): jpayne@69: ksum[k] = logaddexp(ksum[k], lp_arc[i, j, t]) jpayne@69: ksum = ksum - _logsum(ksum) # Normalize jpayne@69: if lpseudo_emission is not None: jpayne@69: ksum = _logvecadd(ksum, lpseudo_emission[i]) jpayne@69: ksum = ksum - _logsum(ksum) # Renormalize jpayne@69: lp_emission[i, :] = ksum jpayne@69: jpayne@69: # Calculate the log likelihood of the output based on the forward jpayne@69: # matrix. Since the parameters of the HMM has changed, the log jpayne@69: # likelihoods are going to be a step behind, and we might be doing jpayne@69: # one extra iteration of training. The alternative is to rerun jpayne@69: # the _forward algorithm and calculate from the clean one, but jpayne@69: # that may be more expensive than overshooting the training by one jpayne@69: # step. jpayne@69: return _logsum(fmat[:, T]) jpayne@69: jpayne@69: jpayne@69: def _forward(N, T, lp_initial, lp_transition, lp_emission, outputs): jpayne@69: """Implement forward algorithm (PRIVATE). jpayne@69: jpayne@69: Calculate a Nx(T+1) matrix, where the last column is the total jpayne@69: probability of the output. jpayne@69: """ jpayne@69: matrix = np.zeros((N, T + 1)) jpayne@69: jpayne@69: # Initialize the first column to be the initial values. jpayne@69: matrix[:, 0] = lp_initial jpayne@69: for t in range(1, T + 1): jpayne@69: k = outputs[t - 1] jpayne@69: for j in range(N): jpayne@69: # The probability of the state is the sum of the jpayne@69: # transitions from all the states from time t-1. jpayne@69: lprob = LOG0 jpayne@69: for i in range(N): jpayne@69: lp = matrix[i][t - 1] + lp_transition[i][j] + lp_emission[i][k] jpayne@69: lprob = logaddexp(lprob, lp) jpayne@69: matrix[j][t] = lprob jpayne@69: return matrix jpayne@69: jpayne@69: jpayne@69: def _backward(N, T, lp_transition, lp_emission, outputs): jpayne@69: """Implement backward algorithm (PRIVATE).""" jpayne@69: matrix = np.zeros((N, T + 1)) jpayne@69: for t in range(T - 1, -1, -1): jpayne@69: k = outputs[t] jpayne@69: for i in range(N): jpayne@69: # The probability of the state is the sum of the jpayne@69: # transitions from all the states from time t+1. jpayne@69: lprob = LOG0 jpayne@69: for j in range(N): jpayne@69: lp = matrix[j][t + 1] + lp_transition[i][j] + lp_emission[i][k] jpayne@69: lprob = logaddexp(lprob, lp) jpayne@69: matrix[i][t] = lprob jpayne@69: return matrix jpayne@69: jpayne@69: jpayne@69: def train_visible( jpayne@69: states, jpayne@69: alphabet, jpayne@69: training_data, jpayne@69: pseudo_initial=None, jpayne@69: pseudo_transition=None, jpayne@69: pseudo_emission=None, jpayne@69: ): jpayne@69: """Train a visible MarkovModel using maximum likelihoood estimates for each of the parameters. jpayne@69: jpayne@69: Train a visible MarkovModel using maximum likelihoood estimates jpayne@69: for each of the parameters. states is a list of strings that jpayne@69: describe the names of each state. alphabet is a list of objects jpayne@69: that indicate the allowed outputs. training_data is a list of jpayne@69: (outputs, observed states) where outputs is a list of the emission jpayne@69: from the alphabet, and observed states is a list of states from jpayne@69: states. jpayne@69: jpayne@69: pseudo_initial, pseudo_transition, and pseudo_emission are jpayne@69: optional parameters that you can use to assign pseudo-counts to jpayne@69: different matrices. They should be matrices of the appropriate jpayne@69: size that contain numbers to add to each parameter matrix. jpayne@69: """ jpayne@69: N, M = len(states), len(alphabet) jpayne@69: if pseudo_initial is not None: jpayne@69: pseudo_initial = np.asarray(pseudo_initial) jpayne@69: if pseudo_initial.shape != (N,): jpayne@69: raise ValueError("pseudo_initial not shape len(states)") jpayne@69: if pseudo_transition is not None: jpayne@69: pseudo_transition = np.asarray(pseudo_transition) jpayne@69: if pseudo_transition.shape != (N, N): jpayne@69: raise ValueError("pseudo_transition not shape len(states) X len(states)") jpayne@69: if pseudo_emission is not None: jpayne@69: pseudo_emission = np.asarray(pseudo_emission) jpayne@69: if pseudo_emission.shape != (N, M): jpayne@69: raise ValueError("pseudo_emission not shape len(states) X len(alphabet)") jpayne@69: jpayne@69: # Training data is given as a list of members of the alphabet. jpayne@69: # Replace those with indexes into the alphabet list for easier jpayne@69: # computation. jpayne@69: training_states, training_outputs = [], [] jpayne@69: states_indexes = itemindex(states) jpayne@69: outputs_indexes = itemindex(alphabet) jpayne@69: for toutputs, tstates in training_data: jpayne@69: if len(tstates) != len(toutputs): jpayne@69: raise ValueError("states and outputs not aligned") jpayne@69: training_states.append([states_indexes[x] for x in tstates]) jpayne@69: training_outputs.append([outputs_indexes[x] for x in toutputs]) jpayne@69: jpayne@69: x = _mle( jpayne@69: N, jpayne@69: M, jpayne@69: training_outputs, jpayne@69: training_states, jpayne@69: pseudo_initial, jpayne@69: pseudo_transition, jpayne@69: pseudo_emission, jpayne@69: ) jpayne@69: p_initial, p_transition, p_emission = x jpayne@69: jpayne@69: return MarkovModel(states, alphabet, p_initial, p_transition, p_emission) jpayne@69: jpayne@69: jpayne@69: def _mle( jpayne@69: N, jpayne@69: M, jpayne@69: training_outputs, jpayne@69: training_states, jpayne@69: pseudo_initial, jpayne@69: pseudo_transition, jpayne@69: pseudo_emission, jpayne@69: ): jpayne@69: """Implement Maximum likelihood estimation algorithm (PRIVATE).""" jpayne@69: # p_initial is the probability that a sequence of states starts jpayne@69: # off with a particular one. jpayne@69: p_initial = np.zeros(N) jpayne@69: if pseudo_initial: jpayne@69: p_initial = p_initial + pseudo_initial jpayne@69: for states in training_states: jpayne@69: p_initial[states[0]] += 1 jpayne@69: p_initial = _normalize(p_initial) jpayne@69: jpayne@69: # p_transition is the probability that a state leads to the next jpayne@69: # one. C(i,j)/C(i) where i and j are states. jpayne@69: p_transition = np.zeros((N, N)) jpayne@69: if pseudo_transition: jpayne@69: p_transition = p_transition + pseudo_transition jpayne@69: for states in training_states: jpayne@69: for n in range(len(states) - 1): jpayne@69: i, j = states[n], states[n + 1] jpayne@69: p_transition[i, j] += 1 jpayne@69: for i in range(len(p_transition)): jpayne@69: p_transition[i, :] = p_transition[i, :] / sum(p_transition[i, :]) jpayne@69: jpayne@69: # p_emission is the probability of an output given a state. jpayne@69: # C(s,o)|C(s) where o is an output and s is a state. jpayne@69: p_emission = np.zeros((N, M)) jpayne@69: if pseudo_emission: jpayne@69: p_emission = p_emission + pseudo_emission jpayne@69: p_emission = np.ones((N, M)) jpayne@69: for outputs, states in zip(training_outputs, training_states): jpayne@69: for o, s in zip(outputs, states): jpayne@69: p_emission[s, o] += 1 jpayne@69: for i in range(len(p_emission)): jpayne@69: p_emission[i, :] = p_emission[i, :] / sum(p_emission[i, :]) jpayne@69: jpayne@69: return p_initial, p_transition, p_emission jpayne@69: jpayne@69: jpayne@69: def _argmaxes(vector, allowance=None): jpayne@69: """Return indices of the maximum values aong the vector (PRIVATE).""" jpayne@69: return [np.argmax(vector)] jpayne@69: jpayne@69: jpayne@69: def find_states(markov_model, output): jpayne@69: """Find states in the given Markov model output. jpayne@69: jpayne@69: Returns a list of (states, score) tuples. jpayne@69: """ jpayne@69: mm = markov_model jpayne@69: N = len(mm.states) jpayne@69: jpayne@69: # _viterbi does calculations in log space. Add a tiny bit to the jpayne@69: # matrices so that the logs will not break. jpayne@69: lp_initial = np.log(mm.p_initial + VERY_SMALL_NUMBER) jpayne@69: lp_transition = np.log(mm.p_transition + VERY_SMALL_NUMBER) jpayne@69: lp_emission = np.log(mm.p_emission + VERY_SMALL_NUMBER) jpayne@69: # Change output into a list of indexes into the alphabet. jpayne@69: indexes = itemindex(mm.alphabet) jpayne@69: output = [indexes[x] for x in output] jpayne@69: jpayne@69: # Run the viterbi algorithm. jpayne@69: results = _viterbi(N, lp_initial, lp_transition, lp_emission, output) jpayne@69: jpayne@69: for i in range(len(results)): jpayne@69: states, score = results[i] jpayne@69: results[i] = [mm.states[x] for x in states], np.exp(score) jpayne@69: return results jpayne@69: jpayne@69: jpayne@69: def _viterbi(N, lp_initial, lp_transition, lp_emission, output): jpayne@69: """Implement Viterbi algorithm to find most likely states for a given input (PRIVATE).""" jpayne@69: T = len(output) jpayne@69: # Store the backtrace in a NxT matrix. jpayne@69: backtrace = [] # list of indexes of states in previous timestep. jpayne@69: for i in range(N): jpayne@69: backtrace.append([None] * T) jpayne@69: jpayne@69: # Store the best scores. jpayne@69: scores = np.zeros((N, T)) jpayne@69: scores[:, 0] = lp_initial + lp_emission[:, output[0]] jpayne@69: for t in range(1, T): jpayne@69: k = output[t] jpayne@69: for j in range(N): jpayne@69: # Find the most likely place it came from. jpayne@69: i_scores = scores[:, t - 1] + lp_transition[:, j] + lp_emission[j, k] jpayne@69: indexes = _argmaxes(i_scores) jpayne@69: scores[j, t] = i_scores[indexes[0]] jpayne@69: backtrace[j][t] = indexes jpayne@69: jpayne@69: # Do the backtrace. First, find a good place to start. Then, jpayne@69: # we'll follow the backtrace matrix to find the list of states. jpayne@69: # In the event of ties, there may be multiple paths back through jpayne@69: # the matrix, which implies a recursive solution. We'll simulate jpayne@69: # it by keeping our own stack. jpayne@69: in_process = [] # list of (t, states, score) jpayne@69: results = [] # return values. list of (states, score) jpayne@69: indexes = _argmaxes(scores[:, T - 1]) # pick the first place jpayne@69: for i in indexes: jpayne@69: in_process.append((T - 1, [i], scores[i][T - 1])) jpayne@69: while in_process: jpayne@69: t, states, score = in_process.pop() jpayne@69: if t == 0: jpayne@69: results.append((states, score)) jpayne@69: else: jpayne@69: indexes = backtrace[states[0]][t] jpayne@69: for i in indexes: jpayne@69: in_process.append((t - 1, [i] + states, score)) jpayne@69: return results jpayne@69: jpayne@69: jpayne@69: def _normalize(matrix): jpayne@69: """Normalize matrix object (PRIVATE).""" jpayne@69: if len(matrix.shape) == 1: jpayne@69: matrix = matrix / sum(matrix) jpayne@69: elif len(matrix.shape) == 2: jpayne@69: # Normalize by rows. jpayne@69: for i in range(len(matrix)): jpayne@69: matrix[i, :] = matrix[i, :] / sum(matrix[i, :]) jpayne@69: else: jpayne@69: raise ValueError("I cannot handle matrixes of that shape") jpayne@69: return matrix jpayne@69: jpayne@69: jpayne@69: def _uniform_norm(shape): jpayne@69: """Normalize a uniform matrix (PRIVATE).""" jpayne@69: matrix = np.ones(shape) jpayne@69: return _normalize(matrix) jpayne@69: jpayne@69: jpayne@69: def _random_norm(shape): jpayne@69: """Normalize a random matrix (PRIVATE).""" jpayne@69: matrix = np.random.random(shape) jpayne@69: return _normalize(matrix) jpayne@69: jpayne@69: jpayne@69: def _copy_and_check(matrix, desired_shape): jpayne@69: """Copy a matrix and check its dimension. Normalize at the end (PRIVATE).""" jpayne@69: # Copy the matrix. jpayne@69: matrix = np.array(matrix, copy=1) jpayne@69: # Check the dimensions. jpayne@69: if matrix.shape != desired_shape: jpayne@69: raise ValueError("Incorrect dimension") jpayne@69: # Make sure it's normalized. jpayne@69: if len(matrix.shape) == 1: jpayne@69: if np.fabs(sum(matrix) - 1.0) > 0.01: jpayne@69: raise ValueError("matrix not normalized to 1.0") jpayne@69: elif len(matrix.shape) == 2: jpayne@69: for i in range(len(matrix)): jpayne@69: if np.fabs(sum(matrix[i]) - 1.0) > 0.01: jpayne@69: raise ValueError("matrix %d not normalized to 1.0" % i) jpayne@69: else: jpayne@69: raise ValueError("I don't handle matrices > 2 dimensions") jpayne@69: return matrix jpayne@69: jpayne@69: jpayne@69: def _logsum(matrix): jpayne@69: """Implement logsum for a matrix object (PRIVATE).""" jpayne@69: if len(matrix.shape) > 1: jpayne@69: vec = np.reshape(matrix, (np.prod(matrix.shape),)) jpayne@69: else: jpayne@69: vec = matrix jpayne@69: sum = LOG0 jpayne@69: for num in vec: jpayne@69: sum = logaddexp(sum, num) jpayne@69: return sum jpayne@69: jpayne@69: jpayne@69: def _logvecadd(logvec1, logvec2): jpayne@69: """Implement a log sum for two vector objects (PRIVATE).""" jpayne@69: assert len(logvec1) == len(logvec2), "vectors aren't the same length" jpayne@69: sumvec = np.zeros(len(logvec1)) jpayne@69: for i in range(len(logvec1)): jpayne@69: sumvec[i] = logaddexp(logvec1[i], logvec2[i]) jpayne@69: return sumvec jpayne@69: jpayne@69: jpayne@69: def _exp_logsum(numbers): jpayne@69: """Return the exponential of a logsum (PRIVATE).""" jpayne@69: sum = _logsum(numbers) jpayne@69: return np.exp(sum)