annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/Bio/MarkovModel.py @ 69:33d812a61356

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 17:55:14 -0400
parents
children
rev   line source
jpayne@69 1 # Copyright 2002 by Jeffrey Chang.
jpayne@69 2 # All rights reserved.
jpayne@69 3 #
jpayne@69 4 # This file is part of the Biopython distribution and governed by your
jpayne@69 5 # choice of the "Biopython License Agreement" or the "BSD 3-Clause License".
jpayne@69 6 # Please see the LICENSE file that should have been included as part of this
jpayne@69 7 # package.
jpayne@69 8 """A state-emitting MarkovModel.
jpayne@69 9
jpayne@69 10 Note terminology similar to Manning and Schutze is used.
jpayne@69 11
jpayne@69 12
jpayne@69 13 Functions:
jpayne@69 14 train_bw Train a markov model using the Baum-Welch algorithm.
jpayne@69 15 train_visible Train a visible markov model using MLE.
jpayne@69 16 find_states Find the a state sequence that explains some observations.
jpayne@69 17
jpayne@69 18 load Load a MarkovModel.
jpayne@69 19 save Save a MarkovModel.
jpayne@69 20
jpayne@69 21 Classes:
jpayne@69 22 MarkovModel Holds the description of a markov model
jpayne@69 23 """
jpayne@69 24
jpayne@69 25 import warnings
jpayne@69 26 from Bio import BiopythonDeprecationWarning
jpayne@69 27
jpayne@69 28 warnings.warn(
jpayne@69 29 "The 'Bio.MarkovModel' module is deprecated and will be removed in a "
jpayne@69 30 "future release of Biopython. Consider using the hmmlearn package instead.",
jpayne@69 31 BiopythonDeprecationWarning,
jpayne@69 32 )
jpayne@69 33
jpayne@69 34
jpayne@69 35 try:
jpayne@69 36 import numpy as np
jpayne@69 37 except ImportError:
jpayne@69 38 from Bio import MissingPythonDependencyError
jpayne@69 39
jpayne@69 40 raise MissingPythonDependencyError(
jpayne@69 41 "Please install NumPy if you want to use Bio.MarkovModel. "
jpayne@69 42 "See http://www.numpy.org/"
jpayne@69 43 ) from None
jpayne@69 44
jpayne@69 45 logaddexp = np.logaddexp
jpayne@69 46
jpayne@69 47
jpayne@69 48 def itemindex(values):
jpayne@69 49 """Return a dictionary of values with their sequence offset as keys."""
jpayne@69 50 d = {}
jpayne@69 51 entries = enumerate(values[::-1])
jpayne@69 52 n = len(values) - 1
jpayne@69 53 for index, key in entries:
jpayne@69 54 d[key] = n - index
jpayne@69 55 return d
jpayne@69 56
jpayne@69 57
jpayne@69 58 np.random.seed()
jpayne@69 59
jpayne@69 60 VERY_SMALL_NUMBER = 1e-300
jpayne@69 61 LOG0 = np.log(VERY_SMALL_NUMBER)
jpayne@69 62
jpayne@69 63
jpayne@69 64 class MarkovModel:
jpayne@69 65 """Create a state-emitting MarkovModel object."""
jpayne@69 66
jpayne@69 67 def __init__(
jpayne@69 68 self, states, alphabet, p_initial=None, p_transition=None, p_emission=None
jpayne@69 69 ):
jpayne@69 70 """Initialize the class."""
jpayne@69 71 self.states = states
jpayne@69 72 self.alphabet = alphabet
jpayne@69 73 self.p_initial = p_initial
jpayne@69 74 self.p_transition = p_transition
jpayne@69 75 self.p_emission = p_emission
jpayne@69 76
jpayne@69 77 def __str__(self):
jpayne@69 78 """Create a string representation of the MarkovModel object."""
jpayne@69 79 from io import StringIO
jpayne@69 80
jpayne@69 81 handle = StringIO()
jpayne@69 82 save(self, handle)
jpayne@69 83 handle.seek(0)
jpayne@69 84 return handle.read()
jpayne@69 85
jpayne@69 86
jpayne@69 87 def _readline_and_check_start(handle, start):
jpayne@69 88 """Read the first line and evaluate that begisn with the correct start (PRIVATE)."""
jpayne@69 89 line = handle.readline()
jpayne@69 90 if not line.startswith(start):
jpayne@69 91 raise ValueError(f"I expected {start!r} but got {line!r}")
jpayne@69 92 return line
jpayne@69 93
jpayne@69 94
jpayne@69 95 def load(handle):
jpayne@69 96 """Parse a file handle into a MarkovModel object."""
jpayne@69 97 # Load the states.
jpayne@69 98 line = _readline_and_check_start(handle, "STATES:")
jpayne@69 99 states = line.split()[1:]
jpayne@69 100
jpayne@69 101 # Load the alphabet.
jpayne@69 102 line = _readline_and_check_start(handle, "ALPHABET:")
jpayne@69 103 alphabet = line.split()[1:]
jpayne@69 104
jpayne@69 105 mm = MarkovModel(states, alphabet)
jpayne@69 106 N, M = len(states), len(alphabet)
jpayne@69 107
jpayne@69 108 # Load the initial probabilities.
jpayne@69 109 mm.p_initial = np.zeros(N)
jpayne@69 110 line = _readline_and_check_start(handle, "INITIAL:")
jpayne@69 111 for i in range(len(states)):
jpayne@69 112 line = _readline_and_check_start(handle, f" {states[i]}:")
jpayne@69 113 mm.p_initial[i] = float(line.split()[-1])
jpayne@69 114
jpayne@69 115 # Load the transition.
jpayne@69 116 mm.p_transition = np.zeros((N, N))
jpayne@69 117 line = _readline_and_check_start(handle, "TRANSITION:")
jpayne@69 118 for i in range(len(states)):
jpayne@69 119 line = _readline_and_check_start(handle, f" {states[i]}:")
jpayne@69 120 mm.p_transition[i, :] = [float(v) for v in line.split()[1:]]
jpayne@69 121
jpayne@69 122 # Load the emission.
jpayne@69 123 mm.p_emission = np.zeros((N, M))
jpayne@69 124 line = _readline_and_check_start(handle, "EMISSION:")
jpayne@69 125 for i in range(len(states)):
jpayne@69 126 line = _readline_and_check_start(handle, f" {states[i]}:")
jpayne@69 127 mm.p_emission[i, :] = [float(v) for v in line.split()[1:]]
jpayne@69 128
jpayne@69 129 return mm
jpayne@69 130
jpayne@69 131
jpayne@69 132 def save(mm, handle):
jpayne@69 133 """Save MarkovModel object into handle."""
jpayne@69 134 # This will fail if there are spaces in the states or alphabet.
jpayne@69 135 w = handle.write
jpayne@69 136 w(f"STATES: {' '.join(mm.states)}\n")
jpayne@69 137 w(f"ALPHABET: {' '.join(mm.alphabet)}\n")
jpayne@69 138 w("INITIAL:\n")
jpayne@69 139 for i in range(len(mm.p_initial)):
jpayne@69 140 w(f" {mm.states[i]}: {mm.p_initial[i]:g}\n")
jpayne@69 141 w("TRANSITION:\n")
jpayne@69 142 for i in range(len(mm.p_transition)):
jpayne@69 143 w(f" {mm.states[i]}: {' '.join(str(x) for x in mm.p_transition[i])}\n")
jpayne@69 144 w("EMISSION:\n")
jpayne@69 145 for i in range(len(mm.p_emission)):
jpayne@69 146 w(f" {mm.states[i]}: {' '.join(str(x) for x in mm.p_emission[i])}\n")
jpayne@69 147
jpayne@69 148
jpayne@69 149 # XXX allow them to specify starting points
jpayne@69 150 def train_bw(
jpayne@69 151 states,
jpayne@69 152 alphabet,
jpayne@69 153 training_data,
jpayne@69 154 pseudo_initial=None,
jpayne@69 155 pseudo_transition=None,
jpayne@69 156 pseudo_emission=None,
jpayne@69 157 update_fn=None,
jpayne@69 158 ):
jpayne@69 159 """Train a MarkovModel using the Baum-Welch algorithm.
jpayne@69 160
jpayne@69 161 Train a MarkovModel using the Baum-Welch algorithm. states is a list
jpayne@69 162 of strings that describe the names of each state. alphabet is a
jpayne@69 163 list of objects that indicate the allowed outputs. training_data
jpayne@69 164 is a list of observations. Each observation is a list of objects
jpayne@69 165 from the alphabet.
jpayne@69 166
jpayne@69 167 pseudo_initial, pseudo_transition, and pseudo_emission are
jpayne@69 168 optional parameters that you can use to assign pseudo-counts to
jpayne@69 169 different matrices. They should be matrices of the appropriate
jpayne@69 170 size that contain numbers to add to each parameter matrix, before
jpayne@69 171 normalization.
jpayne@69 172
jpayne@69 173 update_fn is an optional callback that takes parameters
jpayne@69 174 (iteration, log_likelihood). It is called once per iteration.
jpayne@69 175 """
jpayne@69 176 N, M = len(states), len(alphabet)
jpayne@69 177 if not training_data:
jpayne@69 178 raise ValueError("No training data given.")
jpayne@69 179 if pseudo_initial is not None:
jpayne@69 180 pseudo_initial = np.asarray(pseudo_initial)
jpayne@69 181 if pseudo_initial.shape != (N,):
jpayne@69 182 raise ValueError("pseudo_initial not shape len(states)")
jpayne@69 183 if pseudo_transition is not None:
jpayne@69 184 pseudo_transition = np.asarray(pseudo_transition)
jpayne@69 185 if pseudo_transition.shape != (N, N):
jpayne@69 186 raise ValueError("pseudo_transition not shape len(states) X len(states)")
jpayne@69 187 if pseudo_emission is not None:
jpayne@69 188 pseudo_emission = np.asarray(pseudo_emission)
jpayne@69 189 if pseudo_emission.shape != (N, M):
jpayne@69 190 raise ValueError("pseudo_emission not shape len(states) X len(alphabet)")
jpayne@69 191
jpayne@69 192 # Training data is given as a list of members of the alphabet.
jpayne@69 193 # Replace those with indexes into the alphabet list for easier
jpayne@69 194 # computation.
jpayne@69 195 training_outputs = []
jpayne@69 196 indexes = itemindex(alphabet)
jpayne@69 197 for outputs in training_data:
jpayne@69 198 training_outputs.append([indexes[x] for x in outputs])
jpayne@69 199
jpayne@69 200 # Do some sanity checking on the outputs.
jpayne@69 201 lengths = [len(x) for x in training_outputs]
jpayne@69 202 if min(lengths) == 0:
jpayne@69 203 raise ValueError("I got training data with outputs of length 0")
jpayne@69 204
jpayne@69 205 # Do the training with baum welch.
jpayne@69 206 x = _baum_welch(
jpayne@69 207 N,
jpayne@69 208 M,
jpayne@69 209 training_outputs,
jpayne@69 210 pseudo_initial=pseudo_initial,
jpayne@69 211 pseudo_transition=pseudo_transition,
jpayne@69 212 pseudo_emission=pseudo_emission,
jpayne@69 213 update_fn=update_fn,
jpayne@69 214 )
jpayne@69 215 p_initial, p_transition, p_emission = x
jpayne@69 216 return MarkovModel(states, alphabet, p_initial, p_transition, p_emission)
jpayne@69 217
jpayne@69 218
jpayne@69 219 MAX_ITERATIONS = 1000
jpayne@69 220
jpayne@69 221
jpayne@69 222 def _baum_welch(
jpayne@69 223 N,
jpayne@69 224 M,
jpayne@69 225 training_outputs,
jpayne@69 226 p_initial=None,
jpayne@69 227 p_transition=None,
jpayne@69 228 p_emission=None,
jpayne@69 229 pseudo_initial=None,
jpayne@69 230 pseudo_transition=None,
jpayne@69 231 pseudo_emission=None,
jpayne@69 232 update_fn=None,
jpayne@69 233 ):
jpayne@69 234 """Implement the Baum-Welch algorithm to evaluate unknown parameters in the MarkovModel object (PRIVATE)."""
jpayne@69 235 if p_initial is None:
jpayne@69 236 p_initial = _random_norm(N)
jpayne@69 237 else:
jpayne@69 238 p_initial = _copy_and_check(p_initial, (N,))
jpayne@69 239
jpayne@69 240 if p_transition is None:
jpayne@69 241 p_transition = _random_norm((N, N))
jpayne@69 242 else:
jpayne@69 243 p_transition = _copy_and_check(p_transition, (N, N))
jpayne@69 244 if p_emission is None:
jpayne@69 245 p_emission = _random_norm((N, M))
jpayne@69 246 else:
jpayne@69 247 p_emission = _copy_and_check(p_emission, (N, M))
jpayne@69 248
jpayne@69 249 # Do all the calculations in log space to avoid underflows.
jpayne@69 250 lp_initial = np.log(p_initial)
jpayne@69 251 lp_transition = np.log(p_transition)
jpayne@69 252 lp_emission = np.log(p_emission)
jpayne@69 253 if pseudo_initial is not None:
jpayne@69 254 lpseudo_initial = np.log(pseudo_initial)
jpayne@69 255 else:
jpayne@69 256 lpseudo_initial = None
jpayne@69 257 if pseudo_transition is not None:
jpayne@69 258 lpseudo_transition = np.log(pseudo_transition)
jpayne@69 259 else:
jpayne@69 260 lpseudo_transition = None
jpayne@69 261 if pseudo_emission is not None:
jpayne@69 262 lpseudo_emission = np.log(pseudo_emission)
jpayne@69 263 else:
jpayne@69 264 lpseudo_emission = None
jpayne@69 265
jpayne@69 266 # Iterate through each sequence of output, updating the parameters
jpayne@69 267 # to the HMM. Stop when the log likelihoods of the sequences
jpayne@69 268 # stops varying.
jpayne@69 269 prev_llik = None
jpayne@69 270 for i in range(MAX_ITERATIONS):
jpayne@69 271 llik = LOG0
jpayne@69 272 for outputs in training_outputs:
jpayne@69 273 llik += _baum_welch_one(
jpayne@69 274 N,
jpayne@69 275 M,
jpayne@69 276 outputs,
jpayne@69 277 lp_initial,
jpayne@69 278 lp_transition,
jpayne@69 279 lp_emission,
jpayne@69 280 lpseudo_initial,
jpayne@69 281 lpseudo_transition,
jpayne@69 282 lpseudo_emission,
jpayne@69 283 )
jpayne@69 284 if update_fn is not None:
jpayne@69 285 update_fn(i, llik)
jpayne@69 286 if prev_llik is not None and np.fabs(prev_llik - llik) < 0.1:
jpayne@69 287 break
jpayne@69 288 prev_llik = llik
jpayne@69 289 else:
jpayne@69 290 raise RuntimeError("HMM did not converge in %d iterations" % MAX_ITERATIONS)
jpayne@69 291
jpayne@69 292 # Return everything back in normal space.
jpayne@69 293 return [np.exp(_) for _ in (lp_initial, lp_transition, lp_emission)]
jpayne@69 294
jpayne@69 295
jpayne@69 296 def _baum_welch_one(
jpayne@69 297 N,
jpayne@69 298 M,
jpayne@69 299 outputs,
jpayne@69 300 lp_initial,
jpayne@69 301 lp_transition,
jpayne@69 302 lp_emission,
jpayne@69 303 lpseudo_initial,
jpayne@69 304 lpseudo_transition,
jpayne@69 305 lpseudo_emission,
jpayne@69 306 ):
jpayne@69 307 """Execute one step for Baum-Welch algorithm (PRIVATE).
jpayne@69 308
jpayne@69 309 Do one iteration of Baum-Welch based on a sequence of output.
jpayne@69 310 Changes the value for lp_initial, lp_transition and lp_emission in place.
jpayne@69 311 """
jpayne@69 312 T = len(outputs)
jpayne@69 313 fmat = _forward(N, T, lp_initial, lp_transition, lp_emission, outputs)
jpayne@69 314 bmat = _backward(N, T, lp_transition, lp_emission, outputs)
jpayne@69 315
jpayne@69 316 # Calculate the probability of traversing each arc for any given
jpayne@69 317 # transition.
jpayne@69 318 lp_arc = np.zeros((N, N, T))
jpayne@69 319 for t in range(T):
jpayne@69 320 k = outputs[t]
jpayne@69 321 lp_traverse = np.zeros((N, N)) # P going over one arc.
jpayne@69 322 for i in range(N):
jpayne@69 323 for j in range(N):
jpayne@69 324 # P(getting to this arc)
jpayne@69 325 # P(making this transition)
jpayne@69 326 # P(emitting this character)
jpayne@69 327 # P(going to the end)
jpayne@69 328 lp = (
jpayne@69 329 fmat[i][t]
jpayne@69 330 + lp_transition[i][j]
jpayne@69 331 + lp_emission[i][k]
jpayne@69 332 + bmat[j][t + 1]
jpayne@69 333 )
jpayne@69 334 lp_traverse[i][j] = lp
jpayne@69 335 # Normalize the probability for this time step.
jpayne@69 336 lp_arc[:, :, t] = lp_traverse - _logsum(lp_traverse)
jpayne@69 337
jpayne@69 338 # Sum of all the transitions out of state i at time t.
jpayne@69 339 lp_arcout_t = np.zeros((N, T))
jpayne@69 340 for t in range(T):
jpayne@69 341 for i in range(N):
jpayne@69 342 lp_arcout_t[i][t] = _logsum(lp_arc[i, :, t])
jpayne@69 343
jpayne@69 344 # Sum of all the transitions out of state i.
jpayne@69 345 lp_arcout = np.zeros(N)
jpayne@69 346 for i in range(N):
jpayne@69 347 lp_arcout[i] = _logsum(lp_arcout_t[i, :])
jpayne@69 348
jpayne@69 349 # UPDATE P_INITIAL.
jpayne@69 350 lp_initial = lp_arcout_t[:, 0]
jpayne@69 351 if lpseudo_initial is not None:
jpayne@69 352 lp_initial = _logvecadd(lp_initial, lpseudo_initial)
jpayne@69 353 lp_initial = lp_initial - _logsum(lp_initial)
jpayne@69 354
jpayne@69 355 # UPDATE P_TRANSITION. p_transition[i][j] is the sum of all the
jpayne@69 356 # transitions from i to j, normalized by the sum of the
jpayne@69 357 # transitions out of i.
jpayne@69 358 for i in range(N):
jpayne@69 359 for j in range(N):
jpayne@69 360 lp_transition[i][j] = _logsum(lp_arc[i, j, :]) - lp_arcout[i]
jpayne@69 361 if lpseudo_transition is not None:
jpayne@69 362 lp_transition[i] = _logvecadd(lp_transition[i], lpseudo_transition)
jpayne@69 363 lp_transition[i] = lp_transition[i] - _logsum(lp_transition[i])
jpayne@69 364
jpayne@69 365 # UPDATE P_EMISSION. lp_emission[i][k] is the sum of all the
jpayne@69 366 # transitions out of i when k is observed, divided by the sum of
jpayne@69 367 # the transitions out of i.
jpayne@69 368 for i in range(N):
jpayne@69 369 ksum = np.zeros(M) + LOG0 # ksum[k] is the sum of all i with k.
jpayne@69 370 for t in range(T):
jpayne@69 371 k = outputs[t]
jpayne@69 372 for j in range(N):
jpayne@69 373 ksum[k] = logaddexp(ksum[k], lp_arc[i, j, t])
jpayne@69 374 ksum = ksum - _logsum(ksum) # Normalize
jpayne@69 375 if lpseudo_emission is not None:
jpayne@69 376 ksum = _logvecadd(ksum, lpseudo_emission[i])
jpayne@69 377 ksum = ksum - _logsum(ksum) # Renormalize
jpayne@69 378 lp_emission[i, :] = ksum
jpayne@69 379
jpayne@69 380 # Calculate the log likelihood of the output based on the forward
jpayne@69 381 # matrix. Since the parameters of the HMM has changed, the log
jpayne@69 382 # likelihoods are going to be a step behind, and we might be doing
jpayne@69 383 # one extra iteration of training. The alternative is to rerun
jpayne@69 384 # the _forward algorithm and calculate from the clean one, but
jpayne@69 385 # that may be more expensive than overshooting the training by one
jpayne@69 386 # step.
jpayne@69 387 return _logsum(fmat[:, T])
jpayne@69 388
jpayne@69 389
jpayne@69 390 def _forward(N, T, lp_initial, lp_transition, lp_emission, outputs):
jpayne@69 391 """Implement forward algorithm (PRIVATE).
jpayne@69 392
jpayne@69 393 Calculate a Nx(T+1) matrix, where the last column is the total
jpayne@69 394 probability of the output.
jpayne@69 395 """
jpayne@69 396 matrix = np.zeros((N, T + 1))
jpayne@69 397
jpayne@69 398 # Initialize the first column to be the initial values.
jpayne@69 399 matrix[:, 0] = lp_initial
jpayne@69 400 for t in range(1, T + 1):
jpayne@69 401 k = outputs[t - 1]
jpayne@69 402 for j in range(N):
jpayne@69 403 # The probability of the state is the sum of the
jpayne@69 404 # transitions from all the states from time t-1.
jpayne@69 405 lprob = LOG0
jpayne@69 406 for i in range(N):
jpayne@69 407 lp = matrix[i][t - 1] + lp_transition[i][j] + lp_emission[i][k]
jpayne@69 408 lprob = logaddexp(lprob, lp)
jpayne@69 409 matrix[j][t] = lprob
jpayne@69 410 return matrix
jpayne@69 411
jpayne@69 412
jpayne@69 413 def _backward(N, T, lp_transition, lp_emission, outputs):
jpayne@69 414 """Implement backward algorithm (PRIVATE)."""
jpayne@69 415 matrix = np.zeros((N, T + 1))
jpayne@69 416 for t in range(T - 1, -1, -1):
jpayne@69 417 k = outputs[t]
jpayne@69 418 for i in range(N):
jpayne@69 419 # The probability of the state is the sum of the
jpayne@69 420 # transitions from all the states from time t+1.
jpayne@69 421 lprob = LOG0
jpayne@69 422 for j in range(N):
jpayne@69 423 lp = matrix[j][t + 1] + lp_transition[i][j] + lp_emission[i][k]
jpayne@69 424 lprob = logaddexp(lprob, lp)
jpayne@69 425 matrix[i][t] = lprob
jpayne@69 426 return matrix
jpayne@69 427
jpayne@69 428
jpayne@69 429 def train_visible(
jpayne@69 430 states,
jpayne@69 431 alphabet,
jpayne@69 432 training_data,
jpayne@69 433 pseudo_initial=None,
jpayne@69 434 pseudo_transition=None,
jpayne@69 435 pseudo_emission=None,
jpayne@69 436 ):
jpayne@69 437 """Train a visible MarkovModel using maximum likelihoood estimates for each of the parameters.
jpayne@69 438
jpayne@69 439 Train a visible MarkovModel using maximum likelihoood estimates
jpayne@69 440 for each of the parameters. states is a list of strings that
jpayne@69 441 describe the names of each state. alphabet is a list of objects
jpayne@69 442 that indicate the allowed outputs. training_data is a list of
jpayne@69 443 (outputs, observed states) where outputs is a list of the emission
jpayne@69 444 from the alphabet, and observed states is a list of states from
jpayne@69 445 states.
jpayne@69 446
jpayne@69 447 pseudo_initial, pseudo_transition, and pseudo_emission are
jpayne@69 448 optional parameters that you can use to assign pseudo-counts to
jpayne@69 449 different matrices. They should be matrices of the appropriate
jpayne@69 450 size that contain numbers to add to each parameter matrix.
jpayne@69 451 """
jpayne@69 452 N, M = len(states), len(alphabet)
jpayne@69 453 if pseudo_initial is not None:
jpayne@69 454 pseudo_initial = np.asarray(pseudo_initial)
jpayne@69 455 if pseudo_initial.shape != (N,):
jpayne@69 456 raise ValueError("pseudo_initial not shape len(states)")
jpayne@69 457 if pseudo_transition is not None:
jpayne@69 458 pseudo_transition = np.asarray(pseudo_transition)
jpayne@69 459 if pseudo_transition.shape != (N, N):
jpayne@69 460 raise ValueError("pseudo_transition not shape len(states) X len(states)")
jpayne@69 461 if pseudo_emission is not None:
jpayne@69 462 pseudo_emission = np.asarray(pseudo_emission)
jpayne@69 463 if pseudo_emission.shape != (N, M):
jpayne@69 464 raise ValueError("pseudo_emission not shape len(states) X len(alphabet)")
jpayne@69 465
jpayne@69 466 # Training data is given as a list of members of the alphabet.
jpayne@69 467 # Replace those with indexes into the alphabet list for easier
jpayne@69 468 # computation.
jpayne@69 469 training_states, training_outputs = [], []
jpayne@69 470 states_indexes = itemindex(states)
jpayne@69 471 outputs_indexes = itemindex(alphabet)
jpayne@69 472 for toutputs, tstates in training_data:
jpayne@69 473 if len(tstates) != len(toutputs):
jpayne@69 474 raise ValueError("states and outputs not aligned")
jpayne@69 475 training_states.append([states_indexes[x] for x in tstates])
jpayne@69 476 training_outputs.append([outputs_indexes[x] for x in toutputs])
jpayne@69 477
jpayne@69 478 x = _mle(
jpayne@69 479 N,
jpayne@69 480 M,
jpayne@69 481 training_outputs,
jpayne@69 482 training_states,
jpayne@69 483 pseudo_initial,
jpayne@69 484 pseudo_transition,
jpayne@69 485 pseudo_emission,
jpayne@69 486 )
jpayne@69 487 p_initial, p_transition, p_emission = x
jpayne@69 488
jpayne@69 489 return MarkovModel(states, alphabet, p_initial, p_transition, p_emission)
jpayne@69 490
jpayne@69 491
jpayne@69 492 def _mle(
jpayne@69 493 N,
jpayne@69 494 M,
jpayne@69 495 training_outputs,
jpayne@69 496 training_states,
jpayne@69 497 pseudo_initial,
jpayne@69 498 pseudo_transition,
jpayne@69 499 pseudo_emission,
jpayne@69 500 ):
jpayne@69 501 """Implement Maximum likelihood estimation algorithm (PRIVATE)."""
jpayne@69 502 # p_initial is the probability that a sequence of states starts
jpayne@69 503 # off with a particular one.
jpayne@69 504 p_initial = np.zeros(N)
jpayne@69 505 if pseudo_initial:
jpayne@69 506 p_initial = p_initial + pseudo_initial
jpayne@69 507 for states in training_states:
jpayne@69 508 p_initial[states[0]] += 1
jpayne@69 509 p_initial = _normalize(p_initial)
jpayne@69 510
jpayne@69 511 # p_transition is the probability that a state leads to the next
jpayne@69 512 # one. C(i,j)/C(i) where i and j are states.
jpayne@69 513 p_transition = np.zeros((N, N))
jpayne@69 514 if pseudo_transition:
jpayne@69 515 p_transition = p_transition + pseudo_transition
jpayne@69 516 for states in training_states:
jpayne@69 517 for n in range(len(states) - 1):
jpayne@69 518 i, j = states[n], states[n + 1]
jpayne@69 519 p_transition[i, j] += 1
jpayne@69 520 for i in range(len(p_transition)):
jpayne@69 521 p_transition[i, :] = p_transition[i, :] / sum(p_transition[i, :])
jpayne@69 522
jpayne@69 523 # p_emission is the probability of an output given a state.
jpayne@69 524 # C(s,o)|C(s) where o is an output and s is a state.
jpayne@69 525 p_emission = np.zeros((N, M))
jpayne@69 526 if pseudo_emission:
jpayne@69 527 p_emission = p_emission + pseudo_emission
jpayne@69 528 p_emission = np.ones((N, M))
jpayne@69 529 for outputs, states in zip(training_outputs, training_states):
jpayne@69 530 for o, s in zip(outputs, states):
jpayne@69 531 p_emission[s, o] += 1
jpayne@69 532 for i in range(len(p_emission)):
jpayne@69 533 p_emission[i, :] = p_emission[i, :] / sum(p_emission[i, :])
jpayne@69 534
jpayne@69 535 return p_initial, p_transition, p_emission
jpayne@69 536
jpayne@69 537
jpayne@69 538 def _argmaxes(vector, allowance=None):
jpayne@69 539 """Return indices of the maximum values aong the vector (PRIVATE)."""
jpayne@69 540 return [np.argmax(vector)]
jpayne@69 541
jpayne@69 542
jpayne@69 543 def find_states(markov_model, output):
jpayne@69 544 """Find states in the given Markov model output.
jpayne@69 545
jpayne@69 546 Returns a list of (states, score) tuples.
jpayne@69 547 """
jpayne@69 548 mm = markov_model
jpayne@69 549 N = len(mm.states)
jpayne@69 550
jpayne@69 551 # _viterbi does calculations in log space. Add a tiny bit to the
jpayne@69 552 # matrices so that the logs will not break.
jpayne@69 553 lp_initial = np.log(mm.p_initial + VERY_SMALL_NUMBER)
jpayne@69 554 lp_transition = np.log(mm.p_transition + VERY_SMALL_NUMBER)
jpayne@69 555 lp_emission = np.log(mm.p_emission + VERY_SMALL_NUMBER)
jpayne@69 556 # Change output into a list of indexes into the alphabet.
jpayne@69 557 indexes = itemindex(mm.alphabet)
jpayne@69 558 output = [indexes[x] for x in output]
jpayne@69 559
jpayne@69 560 # Run the viterbi algorithm.
jpayne@69 561 results = _viterbi(N, lp_initial, lp_transition, lp_emission, output)
jpayne@69 562
jpayne@69 563 for i in range(len(results)):
jpayne@69 564 states, score = results[i]
jpayne@69 565 results[i] = [mm.states[x] for x in states], np.exp(score)
jpayne@69 566 return results
jpayne@69 567
jpayne@69 568
jpayne@69 569 def _viterbi(N, lp_initial, lp_transition, lp_emission, output):
jpayne@69 570 """Implement Viterbi algorithm to find most likely states for a given input (PRIVATE)."""
jpayne@69 571 T = len(output)
jpayne@69 572 # Store the backtrace in a NxT matrix.
jpayne@69 573 backtrace = [] # list of indexes of states in previous timestep.
jpayne@69 574 for i in range(N):
jpayne@69 575 backtrace.append([None] * T)
jpayne@69 576
jpayne@69 577 # Store the best scores.
jpayne@69 578 scores = np.zeros((N, T))
jpayne@69 579 scores[:, 0] = lp_initial + lp_emission[:, output[0]]
jpayne@69 580 for t in range(1, T):
jpayne@69 581 k = output[t]
jpayne@69 582 for j in range(N):
jpayne@69 583 # Find the most likely place it came from.
jpayne@69 584 i_scores = scores[:, t - 1] + lp_transition[:, j] + lp_emission[j, k]
jpayne@69 585 indexes = _argmaxes(i_scores)
jpayne@69 586 scores[j, t] = i_scores[indexes[0]]
jpayne@69 587 backtrace[j][t] = indexes
jpayne@69 588
jpayne@69 589 # Do the backtrace. First, find a good place to start. Then,
jpayne@69 590 # we'll follow the backtrace matrix to find the list of states.
jpayne@69 591 # In the event of ties, there may be multiple paths back through
jpayne@69 592 # the matrix, which implies a recursive solution. We'll simulate
jpayne@69 593 # it by keeping our own stack.
jpayne@69 594 in_process = [] # list of (t, states, score)
jpayne@69 595 results = [] # return values. list of (states, score)
jpayne@69 596 indexes = _argmaxes(scores[:, T - 1]) # pick the first place
jpayne@69 597 for i in indexes:
jpayne@69 598 in_process.append((T - 1, [i], scores[i][T - 1]))
jpayne@69 599 while in_process:
jpayne@69 600 t, states, score = in_process.pop()
jpayne@69 601 if t == 0:
jpayne@69 602 results.append((states, score))
jpayne@69 603 else:
jpayne@69 604 indexes = backtrace[states[0]][t]
jpayne@69 605 for i in indexes:
jpayne@69 606 in_process.append((t - 1, [i] + states, score))
jpayne@69 607 return results
jpayne@69 608
jpayne@69 609
jpayne@69 610 def _normalize(matrix):
jpayne@69 611 """Normalize matrix object (PRIVATE)."""
jpayne@69 612 if len(matrix.shape) == 1:
jpayne@69 613 matrix = matrix / sum(matrix)
jpayne@69 614 elif len(matrix.shape) == 2:
jpayne@69 615 # Normalize by rows.
jpayne@69 616 for i in range(len(matrix)):
jpayne@69 617 matrix[i, :] = matrix[i, :] / sum(matrix[i, :])
jpayne@69 618 else:
jpayne@69 619 raise ValueError("I cannot handle matrixes of that shape")
jpayne@69 620 return matrix
jpayne@69 621
jpayne@69 622
jpayne@69 623 def _uniform_norm(shape):
jpayne@69 624 """Normalize a uniform matrix (PRIVATE)."""
jpayne@69 625 matrix = np.ones(shape)
jpayne@69 626 return _normalize(matrix)
jpayne@69 627
jpayne@69 628
jpayne@69 629 def _random_norm(shape):
jpayne@69 630 """Normalize a random matrix (PRIVATE)."""
jpayne@69 631 matrix = np.random.random(shape)
jpayne@69 632 return _normalize(matrix)
jpayne@69 633
jpayne@69 634
jpayne@69 635 def _copy_and_check(matrix, desired_shape):
jpayne@69 636 """Copy a matrix and check its dimension. Normalize at the end (PRIVATE)."""
jpayne@69 637 # Copy the matrix.
jpayne@69 638 matrix = np.array(matrix, copy=1)
jpayne@69 639 # Check the dimensions.
jpayne@69 640 if matrix.shape != desired_shape:
jpayne@69 641 raise ValueError("Incorrect dimension")
jpayne@69 642 # Make sure it's normalized.
jpayne@69 643 if len(matrix.shape) == 1:
jpayne@69 644 if np.fabs(sum(matrix) - 1.0) > 0.01:
jpayne@69 645 raise ValueError("matrix not normalized to 1.0")
jpayne@69 646 elif len(matrix.shape) == 2:
jpayne@69 647 for i in range(len(matrix)):
jpayne@69 648 if np.fabs(sum(matrix[i]) - 1.0) > 0.01:
jpayne@69 649 raise ValueError("matrix %d not normalized to 1.0" % i)
jpayne@69 650 else:
jpayne@69 651 raise ValueError("I don't handle matrices > 2 dimensions")
jpayne@69 652 return matrix
jpayne@69 653
jpayne@69 654
jpayne@69 655 def _logsum(matrix):
jpayne@69 656 """Implement logsum for a matrix object (PRIVATE)."""
jpayne@69 657 if len(matrix.shape) > 1:
jpayne@69 658 vec = np.reshape(matrix, (np.prod(matrix.shape),))
jpayne@69 659 else:
jpayne@69 660 vec = matrix
jpayne@69 661 sum = LOG0
jpayne@69 662 for num in vec:
jpayne@69 663 sum = logaddexp(sum, num)
jpayne@69 664 return sum
jpayne@69 665
jpayne@69 666
jpayne@69 667 def _logvecadd(logvec1, logvec2):
jpayne@69 668 """Implement a log sum for two vector objects (PRIVATE)."""
jpayne@69 669 assert len(logvec1) == len(logvec2), "vectors aren't the same length"
jpayne@69 670 sumvec = np.zeros(len(logvec1))
jpayne@69 671 for i in range(len(logvec1)):
jpayne@69 672 sumvec[i] = logaddexp(logvec1[i], logvec2[i])
jpayne@69 673 return sumvec
jpayne@69 674
jpayne@69 675
jpayne@69 676 def _exp_logsum(numbers):
jpayne@69 677 """Return the exponential of a logsum (PRIVATE)."""
jpayne@69 678 sum = _logsum(numbers)
jpayne@69 679 return np.exp(sum)