diff CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/Bio/MarkovModel.py @ 68:5028fdace37b
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
| author | jpayne |
| --- | --- |
| date | Tue, 18 Mar 2025 16:23:26 -0400 |
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/Bio/MarkovModel.py	Tue Mar 18 16:23:26 2025 -0400
@@ -0,0 +1,679 @@
+# Copyright 2002 by Jeffrey Chang.
+# All rights reserved.
+#
+# This file is part of the Biopython distribution and governed by your
+# choice of the "Biopython License Agreement" or the "BSD 3-Clause License".
+# Please see the LICENSE file that should have been included as part of this
+# package.
+"""A state-emitting MarkovModel.
+
+Note terminology similar to Manning and Schutze is used.
+
+
+Functions:
+train_bw        Train a markov model using the Baum-Welch algorithm.
+train_visible   Train a visible markov model using MLE.
+find_states     Find a state sequence that explains some observations.
+
+load            Load a MarkovModel.
+save            Save a MarkovModel.
+
+Classes:
+MarkovModel     Holds the description of a markov model
+"""
+
+import warnings
+from Bio import BiopythonDeprecationWarning
+
+warnings.warn(
+    "The 'Bio.MarkovModel' module is deprecated and will be removed in a "
+    "future release of Biopython. Consider using the hmmlearn package instead.",
+    BiopythonDeprecationWarning,
+)
+
+
+try:
+    import numpy as np
+except ImportError:
+    from Bio import MissingPythonDependencyError
+
+    raise MissingPythonDependencyError(
+        "Please install NumPy if you want to use Bio.MarkovModel. "
+        "See http://www.numpy.org/"
+    ) from None
+
+logaddexp = np.logaddexp
+
+
+def itemindex(values):
+    """Return a dictionary mapping each value to its first offset in the sequence."""
+    d = {}
+    entries = enumerate(values[::-1])
+    n = len(values) - 1
+    for index, key in entries:
+        d[key] = n - index
+    return d
+
+
+np.random.seed()
+
+VERY_SMALL_NUMBER = 1e-300
+LOG0 = np.log(VERY_SMALL_NUMBER)
+
+
+class MarkovModel:
+    """Create a state-emitting MarkovModel object."""
+
+    def __init__(
+        self, states, alphabet, p_initial=None, p_transition=None, p_emission=None
+    ):
+        """Initialize the class."""
+        self.states = states
+        self.alphabet = alphabet
+        self.p_initial = p_initial
+        self.p_transition = p_transition
+        self.p_emission = p_emission
+
+    def __str__(self):
+        """Create a string representation of the MarkovModel object."""
+        from io import StringIO
+
+        handle = StringIO()
+        save(self, handle)
+        handle.seek(0)
+        return handle.read()
+
+
+def _readline_and_check_start(handle, start):
+    """Read the next line and check that it begins with the expected start (PRIVATE)."""
+    line = handle.readline()
+    if not line.startswith(start):
+        raise ValueError(f"I expected {start!r} but got {line!r}")
+    return line
+
+
+def load(handle):
+    """Parse a file handle into a MarkovModel object."""
+    # Load the states.
+    line = _readline_and_check_start(handle, "STATES:")
+    states = line.split()[1:]
+
+    # Load the alphabet.
+    line = _readline_and_check_start(handle, "ALPHABET:")
+    alphabet = line.split()[1:]
+
+    mm = MarkovModel(states, alphabet)
+    N, M = len(states), len(alphabet)
+
+    # Load the initial probabilities.
+    mm.p_initial = np.zeros(N)
+    line = _readline_and_check_start(handle, "INITIAL:")
+    for i in range(len(states)):
+        line = _readline_and_check_start(handle, f" {states[i]}:")
+        mm.p_initial[i] = float(line.split()[-1])
+
+    # Load the transition.
+    mm.p_transition = np.zeros((N, N))
+    line = _readline_and_check_start(handle, "TRANSITION:")
+    for i in range(len(states)):
+        line = _readline_and_check_start(handle, f" {states[i]}:")
+        mm.p_transition[i, :] = [float(v) for v in line.split()[1:]]
+
+    # Load the emission.
+    mm.p_emission = np.zeros((N, M))
+    line = _readline_and_check_start(handle, "EMISSION:")
+    for i in range(len(states)):
+        line = _readline_and_check_start(handle, f" {states[i]}:")
+        mm.p_emission[i, :] = [float(v) for v in line.split()[1:]]
+
+    return mm
+
+
+def save(mm, handle):
+    """Save MarkovModel object into handle."""
+    # This will fail if there are spaces in the states or alphabet.
+    w = handle.write
+    w(f"STATES: {' '.join(mm.states)}\n")
+    w(f"ALPHABET: {' '.join(mm.alphabet)}\n")
+    w("INITIAL:\n")
+    for i in range(len(mm.p_initial)):
+        w(f" {mm.states[i]}: {mm.p_initial[i]:g}\n")
+    w("TRANSITION:\n")
+    for i in range(len(mm.p_transition)):
+        w(f" {mm.states[i]}: {' '.join(str(x) for x in mm.p_transition[i])}\n")
+    w("EMISSION:\n")
+    for i in range(len(mm.p_emission)):
+        w(f" {mm.states[i]}: {' '.join(str(x) for x in mm.p_emission[i])}\n")
+
+
+# XXX allow them to specify starting points
+def train_bw(
+    states,
+    alphabet,
+    training_data,
+    pseudo_initial=None,
+    pseudo_transition=None,
+    pseudo_emission=None,
+    update_fn=None,
+):
+    """Train a MarkovModel using the Baum-Welch algorithm.
+
+    Train a MarkovModel using the Baum-Welch algorithm.  states is a list
+    of strings that describe the names of each state.  alphabet is a
+    list of objects that indicate the allowed outputs.  training_data
+    is a list of observations.  Each observation is a list of objects
+    from the alphabet.
+
+    pseudo_initial, pseudo_transition, and pseudo_emission are
+    optional parameters that you can use to assign pseudo-counts to
+    different matrices.  They should be matrices of the appropriate
+    size that contain numbers to add to each parameter matrix, before
+    normalization.
+
+    update_fn is an optional callback that takes parameters
+    (iteration, log_likelihood).  It is called once per iteration.
+    """
+    N, M = len(states), len(alphabet)
+    if not training_data:
+        raise ValueError("No training data given.")
+    if pseudo_initial is not None:
+        pseudo_initial = np.asarray(pseudo_initial)
+        if pseudo_initial.shape != (N,):
+            raise ValueError("pseudo_initial not shape len(states)")
+    if pseudo_transition is not None:
+        pseudo_transition = np.asarray(pseudo_transition)
+        if pseudo_transition.shape != (N, N):
+            raise ValueError("pseudo_transition not shape len(states) X len(states)")
+    if pseudo_emission is not None:
+        pseudo_emission = np.asarray(pseudo_emission)
+        if pseudo_emission.shape != (N, M):
+            raise ValueError("pseudo_emission not shape len(states) X len(alphabet)")
+
+    # Training data is given as a list of members of the alphabet.
+    # Replace those with indexes into the alphabet list for easier
+    # computation.
+    training_outputs = []
+    indexes = itemindex(alphabet)
+    for outputs in training_data:
+        training_outputs.append([indexes[x] for x in outputs])
+
+    # Do some sanity checking on the outputs.
+    lengths = [len(x) for x in training_outputs]
+    if min(lengths) == 0:
+        raise ValueError("I got training data with outputs of length 0")
+
+    # Do the training with baum welch.
+    x = _baum_welch(
+        N,
+        M,
+        training_outputs,
+        pseudo_initial=pseudo_initial,
+        pseudo_transition=pseudo_transition,
+        pseudo_emission=pseudo_emission,
+        update_fn=update_fn,
+    )
+    p_initial, p_transition, p_emission = x
+    return MarkovModel(states, alphabet, p_initial, p_transition, p_emission)
+
+
+MAX_ITERATIONS = 1000
+
+
+def _baum_welch(
+    N,
+    M,
+    training_outputs,
+    p_initial=None,
+    p_transition=None,
+    p_emission=None,
+    pseudo_initial=None,
+    pseudo_transition=None,
+    pseudo_emission=None,
+    update_fn=None,
+):
+    """Implement the Baum-Welch algorithm to estimate the unknown parameters of the MarkovModel object (PRIVATE)."""
+    if p_initial is None:
+        p_initial = _random_norm(N)
+    else:
+        p_initial = _copy_and_check(p_initial, (N,))
+
+    if p_transition is None:
+        p_transition = _random_norm((N, N))
+    else:
+        p_transition = _copy_and_check(p_transition, (N, N))
+    if p_emission is None:
+        p_emission = _random_norm((N, M))
+    else:
+        p_emission = _copy_and_check(p_emission, (N, M))
+
+    # Do all the calculations in log space to avoid underflows.
+    lp_initial = np.log(p_initial)
+    lp_transition = np.log(p_transition)
+    lp_emission = np.log(p_emission)
+    if pseudo_initial is not None:
+        lpseudo_initial = np.log(pseudo_initial)
+    else:
+        lpseudo_initial = None
+    if pseudo_transition is not None:
+        lpseudo_transition = np.log(pseudo_transition)
+    else:
+        lpseudo_transition = None
+    if pseudo_emission is not None:
+        lpseudo_emission = np.log(pseudo_emission)
+    else:
+        lpseudo_emission = None
+
+    # Iterate through each sequence of output, updating the parameters
+    # to the HMM.  Stop when the log likelihoods of the sequences
+    # stop varying.
+    prev_llik = None
+    for i in range(MAX_ITERATIONS):
+        llik = LOG0
+        for outputs in training_outputs:
+            llik += _baum_welch_one(
+                N,
+                M,
+                outputs,
+                lp_initial,
+                lp_transition,
+                lp_emission,
+                lpseudo_initial,
+                lpseudo_transition,
+                lpseudo_emission,
+            )
+        if update_fn is not None:
+            update_fn(i, llik)
+        if prev_llik is not None and np.fabs(prev_llik - llik) < 0.1:
+            break
+        prev_llik = llik
+    else:
+        raise RuntimeError("HMM did not converge in %d iterations" % MAX_ITERATIONS)
+
+    # Return everything back in normal space.
+    return [np.exp(_) for _ in (lp_initial, lp_transition, lp_emission)]
+
+
+def _baum_welch_one(
+    N,
+    M,
+    outputs,
+    lp_initial,
+    lp_transition,
+    lp_emission,
+    lpseudo_initial,
+    lpseudo_transition,
+    lpseudo_emission,
+):
+    """Execute one step of the Baum-Welch algorithm (PRIVATE).
+
+    Do one iteration of Baum-Welch based on a sequence of output.
+    Changes the values of lp_initial, lp_transition and lp_emission in place.
+    """
+    T = len(outputs)
+    fmat = _forward(N, T, lp_initial, lp_transition, lp_emission, outputs)
+    bmat = _backward(N, T, lp_transition, lp_emission, outputs)
+
+    # Calculate the probability of traversing each arc for any given
+    # transition.
+    lp_arc = np.zeros((N, N, T))
+    for t in range(T):
+        k = outputs[t]
+        lp_traverse = np.zeros((N, N))  # P going over one arc.
+        for i in range(N):
+            for j in range(N):
+                # P(getting to this arc)
+                # P(making this transition)
+                # P(emitting this character)
+                # P(going to the end)
+                lp = (
+                    fmat[i][t]
+                    + lp_transition[i][j]
+                    + lp_emission[i][k]
+                    + bmat[j][t + 1]
+                )
+                lp_traverse[i][j] = lp
+        # Normalize the probability for this time step.
+        lp_arc[:, :, t] = lp_traverse - _logsum(lp_traverse)
+
+    # Sum of all the transitions out of state i at time t.
+    lp_arcout_t = np.zeros((N, T))
+    for t in range(T):
+        for i in range(N):
+            lp_arcout_t[i][t] = _logsum(lp_arc[i, :, t])
+
+    # Sum of all the transitions out of state i.
+    lp_arcout = np.zeros(N)
+    for i in range(N):
+        lp_arcout[i] = _logsum(lp_arcout_t[i, :])
+
+    # UPDATE P_INITIAL.
+    # Assign in place so the caller's array actually sees the update.
+    lp_initial[:] = lp_arcout_t[:, 0]
+    if lpseudo_initial is not None:
+        lp_initial[:] = _logvecadd(lp_initial, lpseudo_initial)
+        lp_initial[:] = lp_initial - _logsum(lp_initial)
+
+    # UPDATE P_TRANSITION.  p_transition[i][j] is the sum of all the
+    # transitions from i to j, normalized by the sum of the
+    # transitions out of i.
+    for i in range(N):
+        for j in range(N):
+            lp_transition[i][j] = _logsum(lp_arc[i, j, :]) - lp_arcout[i]
+        if lpseudo_transition is not None:
+            lp_transition[i] = _logvecadd(lp_transition[i], lpseudo_transition)
+            lp_transition[i] = lp_transition[i] - _logsum(lp_transition[i])
+
+    # UPDATE P_EMISSION.  lp_emission[i][k] is the sum of all the
+    # transitions out of i when k is observed, divided by the sum of
+    # the transitions out of i.
+    for i in range(N):
+        ksum = np.zeros(M) + LOG0  # ksum[k] is the sum of all i with k.
+        for t in range(T):
+            k = outputs[t]
+            for j in range(N):
+                ksum[k] = logaddexp(ksum[k], lp_arc[i, j, t])
+        ksum = ksum - _logsum(ksum)  # Normalize.
+        if lpseudo_emission is not None:
+            ksum = _logvecadd(ksum, lpseudo_emission[i])
+            ksum = ksum - _logsum(ksum)  # Renormalize.
+        lp_emission[i, :] = ksum
+
+    # Calculate the log likelihood of the output based on the forward
+    # matrix.  Since the parameters of the HMM have changed, the log
+    # likelihoods are going to be a step behind, and we might be doing
+    # one extra iteration of training.  The alternative is to rerun
+    # the _forward algorithm and calculate from the clean one, but
+    # that may be more expensive than overshooting the training by one
+    # step.
+    return _logsum(fmat[:, T])
+
+
+def _forward(N, T, lp_initial, lp_transition, lp_emission, outputs):
+    """Implement the forward algorithm (PRIVATE).
+
+    Calculate an Nx(T+1) matrix, where the last column is the total
+    probability of the output.
+    """
+    matrix = np.zeros((N, T + 1))
+
+    # Initialize the first column to be the initial values.
+    matrix[:, 0] = lp_initial
+    for t in range(1, T + 1):
+        k = outputs[t - 1]
+        for j in range(N):
+            # The probability of the state is the sum of the
+            # transitions from all the states from time t-1.
+            lprob = LOG0
+            for i in range(N):
+                lp = matrix[i][t - 1] + lp_transition[i][j] + lp_emission[i][k]
+                lprob = logaddexp(lprob, lp)
+            matrix[j][t] = lprob
+    return matrix
+
+
+def _backward(N, T, lp_transition, lp_emission, outputs):
+    """Implement the backward algorithm (PRIVATE)."""
+    matrix = np.zeros((N, T + 1))
+    for t in range(T - 1, -1, -1):
+        k = outputs[t]
+        for i in range(N):
+            # The probability of the state is the sum of the
+            # transitions from all the states from time t+1.
+            lprob = LOG0
+            for j in range(N):
+                lp = matrix[j][t + 1] + lp_transition[i][j] + lp_emission[i][k]
+                lprob = logaddexp(lprob, lp)
+            matrix[i][t] = lprob
+    return matrix
+
+
+def train_visible(
+    states,
+    alphabet,
+    training_data,
+    pseudo_initial=None,
+    pseudo_transition=None,
+    pseudo_emission=None,
+):
+    """Train a visible MarkovModel using maximum likelihood estimates for each of the parameters.
+
+    Train a visible MarkovModel using maximum likelihood estimates
+    for each of the parameters.  states is a list of strings that
+    describe the names of each state.  alphabet is a list of objects
+    that indicate the allowed outputs.  training_data is a list of
+    (outputs, observed states) where outputs is a list of the emissions
+    from the alphabet, and observed states is a list of states from
+    states.
+
+    pseudo_initial, pseudo_transition, and pseudo_emission are
+    optional parameters that you can use to assign pseudo-counts to
+    different matrices.  They should be matrices of the appropriate
+    size that contain numbers to add to each parameter matrix.
+    """
+    N, M = len(states), len(alphabet)
+    if pseudo_initial is not None:
+        pseudo_initial = np.asarray(pseudo_initial)
+        if pseudo_initial.shape != (N,):
+            raise ValueError("pseudo_initial not shape len(states)")
+    if pseudo_transition is not None:
+        pseudo_transition = np.asarray(pseudo_transition)
+        if pseudo_transition.shape != (N, N):
+            raise ValueError("pseudo_transition not shape len(states) X len(states)")
+    if pseudo_emission is not None:
+        pseudo_emission = np.asarray(pseudo_emission)
+        if pseudo_emission.shape != (N, M):
+            raise ValueError("pseudo_emission not shape len(states) X len(alphabet)")
+
+    # Training data is given as a list of members of the alphabet.
+    # Replace those with indexes into the alphabet list for easier
+    # computation.
+    training_states, training_outputs = [], []
+    states_indexes = itemindex(states)
+    outputs_indexes = itemindex(alphabet)
+    for toutputs, tstates in training_data:
+        if len(tstates) != len(toutputs):
+            raise ValueError("states and outputs not aligned")
+        training_states.append([states_indexes[x] for x in tstates])
+        training_outputs.append([outputs_indexes[x] for x in toutputs])
+
+    x = _mle(
+        N,
+        M,
+        training_outputs,
+        training_states,
+        pseudo_initial,
+        pseudo_transition,
+        pseudo_emission,
+    )
+    p_initial, p_transition, p_emission = x
+
+    return MarkovModel(states, alphabet, p_initial, p_transition, p_emission)
+
+
+def _mle(
+    N,
+    M,
+    training_outputs,
+    training_states,
+    pseudo_initial,
+    pseudo_transition,
+    pseudo_emission,
+):
+    """Implement the maximum likelihood estimation algorithm (PRIVATE)."""
+    # p_initial is the probability that a sequence of states starts
+    # off with a particular one.
+    # Test against None; truth-testing a NumPy array raises ValueError.
+    p_initial = np.zeros(N)
+    if pseudo_initial is not None:
+        p_initial = p_initial + pseudo_initial
+    for states in training_states:
+        p_initial[states[0]] += 1
+    p_initial = _normalize(p_initial)
+
+    # p_transition is the probability that a state leads to the next
+    # one.  C(i,j)/C(i) where i and j are states.
+    p_transition = np.zeros((N, N))
+    if pseudo_transition is not None:
+        p_transition = p_transition + pseudo_transition
+    for states in training_states:
+        for n in range(len(states) - 1):
+            i, j = states[n], states[n + 1]
+            p_transition[i, j] += 1
+    for i in range(len(p_transition)):
+        p_transition[i, :] = p_transition[i, :] / sum(p_transition[i, :])
+
+    # p_emission is the probability of an output given a state.
+    # C(s,o)/C(s) where o is an output and s is a state.
+    # Start from a count of 1 in each cell (Laplace smoothing) unless
+    # explicit pseudo-counts are supplied.
+    if pseudo_emission is not None:
+        p_emission = np.zeros((N, M)) + pseudo_emission
+    else:
+        p_emission = np.ones((N, M))
+    for outputs, states in zip(training_outputs, training_states):
+        for o, s in zip(outputs, states):
+            p_emission[s, o] += 1
+    for i in range(len(p_emission)):
+        p_emission[i, :] = p_emission[i, :] / sum(p_emission[i, :])
+
+    return p_initial, p_transition, p_emission
+
+
+def _argmaxes(vector, allowance=None):
+    """Return the indices of the maximum values among the vector (PRIVATE)."""
+    return [np.argmax(vector)]
+
+
+def find_states(markov_model, output):
+    """Find the state sequences that best explain the given output.
+
+    Returns a list of (states, score) tuples.
+ """ + mm = markov_model + N = len(mm.states) + + # _viterbi does calculations in log space. Add a tiny bit to the + # matrices so that the logs will not break. + lp_initial = np.log(mm.p_initial + VERY_SMALL_NUMBER) + lp_transition = np.log(mm.p_transition + VERY_SMALL_NUMBER) + lp_emission = np.log(mm.p_emission + VERY_SMALL_NUMBER) + # Change output into a list of indexes into the alphabet. + indexes = itemindex(mm.alphabet) + output = [indexes[x] for x in output] + + # Run the viterbi algorithm. + results = _viterbi(N, lp_initial, lp_transition, lp_emission, output) + + for i in range(len(results)): + states, score = results[i] + results[i] = [mm.states[x] for x in states], np.exp(score) + return results + + +def _viterbi(N, lp_initial, lp_transition, lp_emission, output): + """Implement Viterbi algorithm to find most likely states for a given input (PRIVATE).""" + T = len(output) + # Store the backtrace in a NxT matrix. + backtrace = [] # list of indexes of states in previous timestep. + for i in range(N): + backtrace.append([None] * T) + + # Store the best scores. + scores = np.zeros((N, T)) + scores[:, 0] = lp_initial + lp_emission[:, output[0]] + for t in range(1, T): + k = output[t] + for j in range(N): + # Find the most likely place it came from. + i_scores = scores[:, t - 1] + lp_transition[:, j] + lp_emission[j, k] + indexes = _argmaxes(i_scores) + scores[j, t] = i_scores[indexes[0]] + backtrace[j][t] = indexes + + # Do the backtrace. First, find a good place to start. Then, + # we'll follow the backtrace matrix to find the list of states. + # In the event of ties, there may be multiple paths back through + # the matrix, which implies a recursive solution. We'll simulate + # it by keeping our own stack. + in_process = [] # list of (t, states, score) + results = [] # return values. list of (states, score) + indexes = _argmaxes(scores[:, T - 1]) # pick the first place + for i in indexes: + in_process.append((T - 1, [i], scores[i][T - 1])) + while in_process: + t, states, score = in_process.pop() + if t == 0: + results.append((states, score)) + else: + indexes = backtrace[states[0]][t] + for i in indexes: + in_process.append((t - 1, [i] + states, score)) + return results + + +def _normalize(matrix): + """Normalize matrix object (PRIVATE).""" + if len(matrix.shape) == 1: + matrix = matrix / sum(matrix) + elif len(matrix.shape) == 2: + # Normalize by rows. + for i in range(len(matrix)): + matrix[i, :] = matrix[i, :] / sum(matrix[i, :]) + else: + raise ValueError("I cannot handle matrixes of that shape") + return matrix + + +def _uniform_norm(shape): + """Normalize a uniform matrix (PRIVATE).""" + matrix = np.ones(shape) + return _normalize(matrix) + + +def _random_norm(shape): + """Normalize a random matrix (PRIVATE).""" + matrix = np.random.random(shape) + return _normalize(matrix) + + +def _copy_and_check(matrix, desired_shape): + """Copy a matrix and check its dimension. Normalize at the end (PRIVATE).""" + # Copy the matrix. + matrix = np.array(matrix, copy=1) + # Check the dimensions. + if matrix.shape != desired_shape: + raise ValueError("Incorrect dimension") + # Make sure it's normalized. 
+    if len(matrix.shape) == 1:
+        if np.fabs(sum(matrix) - 1.0) > 0.01:
+            raise ValueError("matrix not normalized to 1.0")
+    elif len(matrix.shape) == 2:
+        for i in range(len(matrix)):
+            if np.fabs(sum(matrix[i]) - 1.0) > 0.01:
+                raise ValueError("matrix %d not normalized to 1.0" % i)
+    else:
+        raise ValueError("I don't handle matrices > 2 dimensions")
+    return matrix
+
+
+def _logsum(matrix):
+    """Implement logsum for a matrix object (PRIVATE)."""
+    if len(matrix.shape) > 1:
+        vec = np.reshape(matrix, (np.prod(matrix.shape),))
+    else:
+        vec = matrix
+    # Accumulate in log space; "total" avoids shadowing the built-in sum().
+    total = LOG0
+    for num in vec:
+        total = logaddexp(total, num)
+    return total
+
+
+def _logvecadd(logvec1, logvec2):
+    """Implement a log sum for two vector objects (PRIVATE)."""
+    assert len(logvec1) == len(logvec2), "vectors aren't the same length"
+    sumvec = np.zeros(len(logvec1))
+    for i in range(len(logvec1)):
+        sumvec[i] = logaddexp(logvec1[i], logvec2[i])
+    return sumvec
+
+
+def _exp_logsum(numbers):
+    """Return the exponential of a logsum (PRIVATE)."""
+    total = _logsum(numbers)
+    return np.exp(total)
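For reference, `save()` above writes, and `load()` parses, a plain-text serialization with one row per state in each section. A hypothetical two-state model over a three-symbol alphabet would look like this (the names and probabilities are illustrative, not from the source):

```
STATES: Sunny Rainy
ALPHABET: walk shop clean
INITIAL:
 Sunny: 0.6
 Rainy: 0.4
TRANSITION:
 Sunny: 0.7 0.3
 Rainy: 0.4 0.6
EMISSION:
 Sunny: 0.5 0.4 0.1
 Rainy: 0.1 0.3 0.6
```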
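A model built by hand round-trips through that format via any file-like handle; a minimal sketch using `StringIO` (the numbers are again invented for illustration):

```python
from io import StringIO

import numpy as np
from Bio import MarkovModel

mm = MarkovModel.MarkovModel(
    ["S1", "S2"],
    ["a", "b"],
    p_initial=np.array([0.5, 0.5]),
    p_transition=np.array([[0.9, 0.1], [0.2, 0.8]]),
    p_emission=np.array([[0.75, 0.25], [0.4, 0.6]]),
)

handle = StringIO()
MarkovModel.save(mm, handle)   # serialize in the text format above
handle.seek(0)
mm2 = MarkovModel.load(handle)  # parse it back
print(mm2.p_transition)
```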
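A minimal usage sketch for the visible (fully labeled) trainer and the Viterbi decoder defined above; the state and symbol names are invented, and importing the module emits the `BiopythonDeprecationWarning` shown in the header:

```python
from Bio import MarkovModel

# Each training example pairs an output sequence with its observed
# state sequence of the same length.
training_data = [
    (["walk", "shop", "clean"], ["Sunny", "Sunny", "Rainy"]),
    (["clean", "walk"], ["Rainy", "Sunny"]),
]

mm = MarkovModel.train_visible(
    states=["Sunny", "Rainy"],
    alphabet=["walk", "shop", "clean"],
    training_data=training_data,
)

# Decode the most likely state path for a new observation sequence.
for states, score in MarkovModel.find_states(mm, ["walk", "clean"]):
    print(states, score)
```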
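And a sketch of the Baum-Welch trainer on unlabeled sequences, passing pseudo-counts to keep every parameter strictly positive and using the optional per-iteration callback. The initial parameters are randomized (`_random_norm`), so the fitted model varies from run to run; names here are again illustrative:

```python
import numpy as np
from Bio import MarkovModel

def report(iteration, log_likelihood):
    # train_bw calls this once per Baum-Welch iteration.
    print(f"iteration {iteration}: log L = {log_likelihood:.2f}")

# Unlabeled observation sequences over a two-symbol alphabet.
observations = [
    ["H", "H", "T", "H", "T"],
    ["T", "T", "H", "T", "T"],
]

mm = MarkovModel.train_bw(
    states=["hot", "cold"],
    alphabet=["H", "T"],
    training_data=observations,
    pseudo_transition=np.ones((2, 2)),  # shape (N, N)
    pseudo_emission=np.ones((2, 2)),    # shape (N, M)
    update_fn=report,
)
print(mm)  # __str__ serializes the model via save()
```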
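All the heavy lifting above happens in log space (`LOG0`, `logaddexp`, `_logsum`) because chained probability products underflow double precision long before sequences get interesting. A quick standalone illustration in plain NumPy:

```python
import numpy as np

# Multiplying many small probabilities underflows to exactly 0.0 ...
p = np.full(400, 1e-3)
print(p.prod())  # 0.0

# ... while the equivalent sum of logs stays representable.
print(np.log(p).sum())  # about -2763.1

# _logsum accumulates with logaddexp one element at a time;
# np.logaddexp.reduce is a vectorized equivalent.
logs = np.log(np.array([0.1, 0.2, 0.3]))
print(np.exp(np.logaddexp.reduce(logs)))  # ~0.6
```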