# Copyright 2002 by Jeffrey Chang.
# All rights reserved.
#
# This file is part of the Biopython distribution and governed by your
# choice of the "Biopython License Agreement" or the "BSD 3-Clause License".
# Please see the LICENSE file that should have been included as part of this
# package.
"""A state-emitting MarkovModel.

Note that the terminology used here is similar to Manning and Schutze.


Functions:
train_bw        Train a markov model using the Baum-Welch algorithm.
train_visible   Train a visible markov model using MLE.
find_states     Find a state sequence that explains some observations.

load            Load a MarkovModel.
save            Save a MarkovModel.

Classes:
MarkovModel     Holds the description of a markov model
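
A minimal usage sketch (the state and alphabet names below are made up
purely for illustration):

    from Bio import MarkovModel

    states = ["rainy", "sunny"]
    alphabet = ["walk", "shop", "clean"]
    training_data = [
        (["walk", "shop", "clean"], ["sunny", "rainy", "rainy"]),
        (["clean", "walk"], ["rainy", "sunny"]),
    ]
    mm = MarkovModel.train_visible(states, alphabet, training_data)
    print(MarkovModel.find_states(mm, ["walk", "clean"]))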
"""

import warnings
from Bio import BiopythonDeprecationWarning

warnings.warn(
    "The 'Bio.MarkovModel' module is deprecated and will be removed in a "
    "future release of Biopython. Consider using the hmmlearn package instead.",
    BiopythonDeprecationWarning,
)


try:
    import numpy as np
except ImportError:
    from Bio import MissingPythonDependencyError

    raise MissingPythonDependencyError(
        "Please install NumPy if you want to use Bio.MarkovModel. "
        "See http://www.numpy.org/"
    ) from None

logaddexp = np.logaddexp


def itemindex(values):
    """Return a dictionary of values with their sequence offset as keys."""
    d = {}
    entries = enumerate(values[::-1])
    n = len(values) - 1
    for index, key in entries:
        d[key] = n - index
    return d
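
# For example (illustrative), itemindex(["a", "b", "c"]) returns
# {"a": 0, "b": 1, "c": 2}; a duplicated value keeps the offset of its
# first occurrence.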


np.random.seed()

VERY_SMALL_NUMBER = 1e-300
LOG0 = np.log(VERY_SMALL_NUMBER)


class MarkovModel:
    """Create a state-emitting MarkovModel object."""

    def __init__(
        self, states, alphabet, p_initial=None, p_transition=None, p_emission=None
    ):
        """Initialize the class."""
        self.states = states
        self.alphabet = alphabet
        self.p_initial = p_initial
        self.p_transition = p_transition
        self.p_emission = p_emission

    def __str__(self):
        """Create a string representation of the MarkovModel object."""
        from io import StringIO

        handle = StringIO()
        save(self, handle)
        handle.seek(0)
        return handle.read()
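

# A hand-built model, as a sketch of the expected array shapes (all numbers
# here are made up): p_initial has shape (n_states,), p_transition has shape
# (n_states, n_states) and p_emission has shape (n_states, len(alphabet)),
# with each row summing to 1.0.
#
#     import numpy as np
#     mm = MarkovModel(
#         states=["fair", "loaded"],
#         alphabet=["H", "T"],
#         p_initial=np.array([0.5, 0.5]),
#         p_transition=np.array([[0.9, 0.1], [0.2, 0.8]]),
#         p_emission=np.array([[0.5, 0.5], [0.8, 0.2]]),
#     )
#     print(mm)  # __str__ renders the model via save()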


def _readline_and_check_start(handle, start):
    """Read the first line and check that it begins with the correct start (PRIVATE)."""
    line = handle.readline()
    if not line.startswith(start):
        raise ValueError(f"I expected {start!r} but got {line!r}")
    return line


def load(handle):
    """Parse a file handle into a MarkovModel object."""
    # Load the states.
    line = _readline_and_check_start(handle, "STATES:")
    states = line.split()[1:]

    # Load the alphabet.
    line = _readline_and_check_start(handle, "ALPHABET:")
    alphabet = line.split()[1:]

    mm = MarkovModel(states, alphabet)
    N, M = len(states), len(alphabet)

    # Load the initial probabilities.
    mm.p_initial = np.zeros(N)
    line = _readline_and_check_start(handle, "INITIAL:")
    for i in range(len(states)):
        line = _readline_and_check_start(handle, f" {states[i]}:")
        mm.p_initial[i] = float(line.split()[-1])

    # Load the transition.
    mm.p_transition = np.zeros((N, N))
    line = _readline_and_check_start(handle, "TRANSITION:")
    for i in range(len(states)):
        line = _readline_and_check_start(handle, f" {states[i]}:")
        mm.p_transition[i, :] = [float(v) for v in line.split()[1:]]

    # Load the emission.
    mm.p_emission = np.zeros((N, M))
    line = _readline_and_check_start(handle, "EMISSION:")
    for i in range(len(states)):
        line = _readline_and_check_start(handle, f" {states[i]}:")
        mm.p_emission[i, :] = [float(v) for v in line.split()[1:]]

    return mm


def save(mm, handle):
    """Save MarkovModel object into handle."""
    # This will fail if there are spaces in the states or alphabet.
    w = handle.write
    w(f"STATES: {' '.join(mm.states)}\n")
    w(f"ALPHABET: {' '.join(mm.alphabet)}\n")
    w("INITIAL:\n")
    for i in range(len(mm.p_initial)):
        w(f" {mm.states[i]}: {mm.p_initial[i]:g}\n")
    w("TRANSITION:\n")
    for i in range(len(mm.p_transition)):
        w(f" {mm.states[i]}: {' '.join(str(x) for x in mm.p_transition[i])}\n")
    w("EMISSION:\n")
    for i in range(len(mm.p_emission)):
        w(f" {mm.states[i]}: {' '.join(str(x) for x in mm.p_emission[i])}\n")
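

# Round-trip sketch: save() writes the plain-text format (STATES / ALPHABET /
# INITIAL / TRANSITION / EMISSION sections) that load() parses back.  Assuming
# mm is a fully populated MarkovModel, as above:
#
#     from io import StringIO
#
#     handle = StringIO()
#     save(mm, handle)
#     handle.seek(0)
#     mm2 = load(handle)
#     assert mm2.states == mm.states and mm2.alphabet == mm.alphabet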


# XXX allow them to specify starting points
def train_bw(
    states,
    alphabet,
    training_data,
    pseudo_initial=None,
    pseudo_transition=None,
    pseudo_emission=None,
    update_fn=None,
):
    """Train a MarkovModel using the Baum-Welch algorithm.

    Train a MarkovModel using the Baum-Welch algorithm. states is a list
    of strings that describe the names of each state. alphabet is a
    list of objects that indicate the allowed outputs. training_data
    is a list of observations. Each observation is a list of objects
    from the alphabet.

    pseudo_initial, pseudo_transition, and pseudo_emission are
    optional parameters that you can use to assign pseudo-counts to
    different matrices. They should be matrices of the appropriate
    size that contain numbers to add to each parameter matrix, before
    normalization.

    update_fn is an optional callback that takes parameters
    (iteration, log_likelihood). It is called once per iteration.
    """
    N, M = len(states), len(alphabet)
    if not training_data:
        raise ValueError("No training data given.")
    if pseudo_initial is not None:
        pseudo_initial = np.asarray(pseudo_initial)
        if pseudo_initial.shape != (N,):
            raise ValueError("pseudo_initial not shape len(states)")
    if pseudo_transition is not None:
        pseudo_transition = np.asarray(pseudo_transition)
        if pseudo_transition.shape != (N, N):
            raise ValueError("pseudo_transition not shape len(states) X len(states)")
    if pseudo_emission is not None:
        pseudo_emission = np.asarray(pseudo_emission)
        if pseudo_emission.shape != (N, M):
            raise ValueError("pseudo_emission not shape len(states) X len(alphabet)")

    # Training data is given as a list of members of the alphabet.
    # Replace those with indexes into the alphabet list for easier
    # computation.
    training_outputs = []
    indexes = itemindex(alphabet)
    for outputs in training_data:
        training_outputs.append([indexes[x] for x in outputs])

    # Do some sanity checking on the outputs.
    lengths = [len(x) for x in training_outputs]
    if min(lengths) == 0:
        raise ValueError("I got training data with outputs of length 0")

    # Do the training with baum welch.
    x = _baum_welch(
        N,
        M,
        training_outputs,
        pseudo_initial=pseudo_initial,
        pseudo_transition=pseudo_transition,
        pseudo_emission=pseudo_emission,
        update_fn=update_fn,
    )
    p_initial, p_transition, p_emission = x
    return MarkovModel(states, alphabet, p_initial, p_transition, p_emission)
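

# Baum-Welch usage sketch (toy data, made up for illustration).  Only the
# observed emissions are needed; no state labels are required:
#
#     def report(iteration, log_likelihood):
#         print(iteration, log_likelihood)
#
#     observations = [["H", "H", "T", "H"], ["T", "T", "H"]]
#     mm = train_bw(["fair", "loaded"], ["H", "T"], observations,
#                   update_fn=report)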


MAX_ITERATIONS = 1000


def _baum_welch(
    N,
    M,
    training_outputs,
    p_initial=None,
    p_transition=None,
    p_emission=None,
    pseudo_initial=None,
    pseudo_transition=None,
    pseudo_emission=None,
    update_fn=None,
):
    """Implement the Baum-Welch algorithm to evaluate unknown parameters in the MarkovModel object (PRIVATE)."""
    if p_initial is None:
        p_initial = _random_norm(N)
    else:
        p_initial = _copy_and_check(p_initial, (N,))

    if p_transition is None:
        p_transition = _random_norm((N, N))
    else:
        p_transition = _copy_and_check(p_transition, (N, N))
    if p_emission is None:
        p_emission = _random_norm((N, M))
    else:
        p_emission = _copy_and_check(p_emission, (N, M))

    # Do all the calculations in log space to avoid underflows.
    lp_initial = np.log(p_initial)
    lp_transition = np.log(p_transition)
    lp_emission = np.log(p_emission)
    if pseudo_initial is not None:
        lpseudo_initial = np.log(pseudo_initial)
    else:
        lpseudo_initial = None
    if pseudo_transition is not None:
        lpseudo_transition = np.log(pseudo_transition)
    else:
        lpseudo_transition = None
    if pseudo_emission is not None:
        lpseudo_emission = np.log(pseudo_emission)
    else:
        lpseudo_emission = None

    # Iterate through each sequence of output, updating the parameters
    # of the HMM.  Stop when the log likelihoods of the sequences
    # stop varying.
    prev_llik = None
    for i in range(MAX_ITERATIONS):
        llik = LOG0
        for outputs in training_outputs:
            llik += _baum_welch_one(
                N,
                M,
                outputs,
                lp_initial,
                lp_transition,
                lp_emission,
                lpseudo_initial,
                lpseudo_transition,
                lpseudo_emission,
            )
        if update_fn is not None:
            update_fn(i, llik)
        if prev_llik is not None and np.fabs(prev_llik - llik) < 0.1:
            break
        prev_llik = llik
    else:
        raise RuntimeError("HMM did not converge in %d iterations" % MAX_ITERATIONS)

    # Return everything back in normal space.
    return [np.exp(_) for _ in (lp_initial, lp_transition, lp_emission)]


def _baum_welch_one(
    N,
    M,
    outputs,
    lp_initial,
    lp_transition,
    lp_emission,
    lpseudo_initial,
    lpseudo_transition,
    lpseudo_emission,
):
    """Execute one step of the Baum-Welch algorithm (PRIVATE).

    Do one iteration of Baum-Welch based on a sequence of output.
    Changes the values of lp_initial, lp_transition and lp_emission in place.
    """
    T = len(outputs)
    fmat = _forward(N, T, lp_initial, lp_transition, lp_emission, outputs)
    bmat = _backward(N, T, lp_transition, lp_emission, outputs)

    # Calculate the probability of traversing each arc for any given
    # transition.
    lp_arc = np.zeros((N, N, T))
    for t in range(T):
        k = outputs[t]
        lp_traverse = np.zeros((N, N))  # P going over one arc.
        for i in range(N):
            for j in range(N):
                # P(getting to this arc)
                # P(making this transition)
                # P(emitting this character)
                # P(going to the end)
                lp = (
                    fmat[i][t]
                    + lp_transition[i][j]
                    + lp_emission[i][k]
                    + bmat[j][t + 1]
                )
                lp_traverse[i][j] = lp
        # Normalize the probability for this time step.
        lp_arc[:, :, t] = lp_traverse - _logsum(lp_traverse)

    # Sum of all the transitions out of state i at time t.
    lp_arcout_t = np.zeros((N, T))
    for t in range(T):
        for i in range(N):
            lp_arcout_t[i][t] = _logsum(lp_arc[i, :, t])

    # Sum of all the transitions out of state i.
    lp_arcout = np.zeros(N)
    for i in range(N):
        lp_arcout[i] = _logsum(lp_arcout_t[i, :])

    # UPDATE P_INITIAL.  Use slice assignment so the caller's array is
    # actually updated in place, as documented above.
    lp_initial[:] = lp_arcout_t[:, 0]
    if lpseudo_initial is not None:
        lp_initial[:] = _logvecadd(lp_initial, lpseudo_initial)
        lp_initial[:] = lp_initial - _logsum(lp_initial)

    # UPDATE P_TRANSITION.  p_transition[i][j] is the sum of all the
    # transitions from i to j, normalized by the sum of the
    # transitions out of i.
    for i in range(N):
        for j in range(N):
            lp_transition[i][j] = _logsum(lp_arc[i, j, :]) - lp_arcout[i]
        if lpseudo_transition is not None:
            lp_transition[i] = _logvecadd(lp_transition[i], lpseudo_transition[i])
            lp_transition[i] = lp_transition[i] - _logsum(lp_transition[i])

    # UPDATE P_EMISSION.  lp_emission[i][k] is the sum of all the
    # transitions out of i when k is observed, divided by the sum of
    # the transitions out of i.
    for i in range(N):
        ksum = np.zeros(M) + LOG0  # ksum[k] is the sum of all i with k.
        for t in range(T):
            k = outputs[t]
            for j in range(N):
                ksum[k] = logaddexp(ksum[k], lp_arc[i, j, t])
        ksum = ksum - _logsum(ksum)  # Normalize
        if lpseudo_emission is not None:
            ksum = _logvecadd(ksum, lpseudo_emission[i])
            ksum = ksum - _logsum(ksum)  # Renormalize
        lp_emission[i, :] = ksum

    # Calculate the log likelihood of the output based on the forward
    # matrix.  Since the parameters of the HMM have changed, the log
    # likelihoods are going to be a step behind, and we might be doing
    # one extra iteration of training.  The alternative is to rerun
    # the _forward algorithm and calculate from the clean one, but
    # that may be more expensive than overshooting the training by one
    # step.
    return _logsum(fmat[:, T])


def _forward(N, T, lp_initial, lp_transition, lp_emission, outputs):
    """Implement the forward algorithm (PRIVATE).

    Calculate an N x (T+1) matrix, where the last column is the total
    probability of the output.
    """
    matrix = np.zeros((N, T + 1))

    # Initialize the first column to be the initial values.
    matrix[:, 0] = lp_initial
    for t in range(1, T + 1):
        k = outputs[t - 1]
        for j in range(N):
            # The probability of the state is the sum of the
            # transitions from all the states from time t-1.
            lprob = LOG0
            for i in range(N):
                lp = matrix[i][t - 1] + lp_transition[i][j] + lp_emission[i][k]
                lprob = logaddexp(lprob, lp)
            matrix[j][t] = lprob
    return matrix


def _backward(N, T, lp_transition, lp_emission, outputs):
    """Implement the backward algorithm (PRIVATE)."""
    matrix = np.zeros((N, T + 1))
    for t in range(T - 1, -1, -1):
        k = outputs[t]
        for i in range(N):
            # The probability of the state is the sum of the
            # transitions from all the states from time t+1.
            lprob = LOG0
            for j in range(N):
                lp = matrix[j][t + 1] + lp_transition[i][j] + lp_emission[i][k]
                lprob = logaddexp(lprob, lp)
            matrix[i][t] = lprob
    return matrix
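

# Note on conventions: in this module the forward/backward recursions attach
# the emission probability to the source state of each transition, i.e.
# (in normal space) alpha[j][t] = sum_i alpha[i][t-1] * A[i][j] * E[i][k],
# where k is the output observed at step t.  The loops above are the
# log-space form of that recursion.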


def train_visible(
    states,
    alphabet,
    training_data,
    pseudo_initial=None,
    pseudo_transition=None,
    pseudo_emission=None,
):
    """Train a visible MarkovModel using maximum likelihood estimates for each of the parameters.

    Train a visible MarkovModel using maximum likelihood estimates
    for each of the parameters. states is a list of strings that
    describe the names of each state. alphabet is a list of objects
    that indicate the allowed outputs. training_data is a list of
    (outputs, observed states) where outputs is a list of the emission
    from the alphabet, and observed states is a list of states from
    states.

    pseudo_initial, pseudo_transition, and pseudo_emission are
    optional parameters that you can use to assign pseudo-counts to
    different matrices. They should be matrices of the appropriate
    size that contain numbers to add to each parameter matrix.
    """
    N, M = len(states), len(alphabet)
    if pseudo_initial is not None:
        pseudo_initial = np.asarray(pseudo_initial)
        if pseudo_initial.shape != (N,):
            raise ValueError("pseudo_initial not shape len(states)")
    if pseudo_transition is not None:
        pseudo_transition = np.asarray(pseudo_transition)
        if pseudo_transition.shape != (N, N):
            raise ValueError("pseudo_transition not shape len(states) X len(states)")
    if pseudo_emission is not None:
        pseudo_emission = np.asarray(pseudo_emission)
        if pseudo_emission.shape != (N, M):
            raise ValueError("pseudo_emission not shape len(states) X len(alphabet)")

    # Training data is given as a list of members of the alphabet.
    # Replace those with indexes into the alphabet list for easier
    # computation.
    training_states, training_outputs = [], []
    states_indexes = itemindex(states)
    outputs_indexes = itemindex(alphabet)
    for toutputs, tstates in training_data:
        if len(tstates) != len(toutputs):
            raise ValueError("states and outputs not aligned")
        training_states.append([states_indexes[x] for x in tstates])
        training_outputs.append([outputs_indexes[x] for x in toutputs])

    x = _mle(
        N,
        M,
        training_outputs,
        training_states,
        pseudo_initial,
        pseudo_transition,
        pseudo_emission,
    )
    p_initial, p_transition, p_emission = x

    return MarkovModel(states, alphabet, p_initial, p_transition, p_emission)
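

# Pseudo-count sketch for train_visible (values made up): each pseudo matrix
# must match the shape of the parameter it smooths.  For two states and a
# three-symbol alphabet:
#
#     import numpy as np
#
#     mm = train_visible(
#         states, alphabet, training_data,
#         pseudo_initial=np.ones(2),
#         pseudo_transition=np.ones((2, 2)),
#         pseudo_emission=np.ones((2, 3)),
#     )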


def _mle(
    N,
    M,
    training_outputs,
    training_states,
    pseudo_initial,
    pseudo_transition,
    pseudo_emission,
):
    """Implement the maximum likelihood estimation algorithm (PRIVATE)."""
    # p_initial is the probability that a sequence of states starts
    # off with a particular one.
    p_initial = np.zeros(N)
    if pseudo_initial is not None:
        p_initial = p_initial + pseudo_initial
    for states in training_states:
        p_initial[states[0]] += 1
    p_initial = _normalize(p_initial)

    # p_transition is the probability that a state leads to the next
    # one.  C(i,j)/C(i) where i and j are states.
    p_transition = np.zeros((N, N))
    if pseudo_transition is not None:
        p_transition = p_transition + pseudo_transition
    for states in training_states:
        for n in range(len(states) - 1):
            i, j = states[n], states[n + 1]
            p_transition[i, j] += 1
    for i in range(len(p_transition)):
        p_transition[i, :] = p_transition[i, :] / sum(p_transition[i, :])

    # p_emission is the probability of an output given a state.
    # C(s,o)|C(s) where o is an output and s is a state.
    # Start from add-one (Laplace) counts, then add any pseudo-counts given.
    p_emission = np.ones((N, M))
    if pseudo_emission is not None:
        p_emission = p_emission + pseudo_emission
    for outputs, states in zip(training_outputs, training_states):
        for o, s in zip(outputs, states):
            p_emission[s, o] += 1
    for i in range(len(p_emission)):
        p_emission[i, :] = p_emission[i, :] / sum(p_emission[i, :])

    return p_initial, p_transition, p_emission


def _argmaxes(vector, allowance=None):
    """Return indices of the maximum values among the vector (PRIVATE)."""
    return [np.argmax(vector)]


def find_states(markov_model, output):
    """Find the states that best explain the given output.

    Returns a list of (states, score) tuples.
    """
    mm = markov_model
    N = len(mm.states)

    # _viterbi does calculations in log space.  Add a tiny bit to the
    # matrices so that the logs will not break.
    lp_initial = np.log(mm.p_initial + VERY_SMALL_NUMBER)
    lp_transition = np.log(mm.p_transition + VERY_SMALL_NUMBER)
    lp_emission = np.log(mm.p_emission + VERY_SMALL_NUMBER)
    # Change output into a list of indexes into the alphabet.
    indexes = itemindex(mm.alphabet)
    output = [indexes[x] for x in output]

    # Run the Viterbi algorithm.
    results = _viterbi(N, lp_initial, lp_transition, lp_emission, output)

    for i in range(len(results)):
        states, score = results[i]
        results[i] = [mm.states[x] for x in states], np.exp(score)
    return results
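

# find_states() returns a list of (state_path, score) tuples (typically a
# single best path).  A sketch, using the hand-built mm from earlier:
#
#     [(path, score)] = find_states(mm, ["H", "T", "T"])
#     print(path)   # e.g. ["fair", "fair", "fair"]
#     print(score)  # exponential of the Viterbi log score for that path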


def _viterbi(N, lp_initial, lp_transition, lp_emission, output):
    """Implement the Viterbi algorithm to find the most likely states for a given input (PRIVATE)."""
    T = len(output)
    # Store the backtrace in an N x T matrix.
    backtrace = []  # list of indexes of states in previous timestep.
    for i in range(N):
        backtrace.append([None] * T)

    # Store the best scores.
    scores = np.zeros((N, T))
    scores[:, 0] = lp_initial + lp_emission[:, output[0]]
    for t in range(1, T):
        k = output[t]
        for j in range(N):
            # Find the most likely place it came from.
            i_scores = scores[:, t - 1] + lp_transition[:, j] + lp_emission[j, k]
            indexes = _argmaxes(i_scores)
            scores[j, t] = i_scores[indexes[0]]
            backtrace[j][t] = indexes

    # Do the backtrace.  First, find a good place to start.  Then,
    # we'll follow the backtrace matrix to find the list of states.
    # In the event of ties, there may be multiple paths back through
    # the matrix, which implies a recursive solution.  We'll simulate
    # it by keeping our own stack.
    in_process = []  # list of (t, states, score)
    results = []  # return values.  list of (states, score)
    indexes = _argmaxes(scores[:, T - 1])  # pick the first place
    for i in indexes:
        in_process.append((T - 1, [i], scores[i][T - 1]))
    while in_process:
        t, states, score = in_process.pop()
        if t == 0:
            results.append((states, score))
        else:
            indexes = backtrace[states[0]][t]
            for i in indexes:
                in_process.append((t - 1, [i] + states, score))
    return results


def _normalize(matrix):
    """Normalize matrix object (PRIVATE)."""
    if len(matrix.shape) == 1:
        matrix = matrix / sum(matrix)
    elif len(matrix.shape) == 2:
        # Normalize by rows.
        for i in range(len(matrix)):
            matrix[i, :] = matrix[i, :] / sum(matrix[i, :])
    else:
        raise ValueError("I cannot handle matrices of that shape")
    return matrix


def _uniform_norm(shape):
    """Return a normalized uniform matrix (PRIVATE)."""
    matrix = np.ones(shape)
    return _normalize(matrix)


def _random_norm(shape):
    """Return a normalized random matrix (PRIVATE)."""
    matrix = np.random.random(shape)
    return _normalize(matrix)


def _copy_and_check(matrix, desired_shape):
    """Copy a matrix and check its dimension.  Normalize at the end (PRIVATE)."""
    # Copy the matrix.
    matrix = np.array(matrix, copy=True)
    # Check the dimensions.
    if matrix.shape != desired_shape:
        raise ValueError("Incorrect dimension")
    # Make sure it's normalized.
    if len(matrix.shape) == 1:
        if np.fabs(sum(matrix) - 1.0) > 0.01:
            raise ValueError("matrix not normalized to 1.0")
    elif len(matrix.shape) == 2:
        for i in range(len(matrix)):
            if np.fabs(sum(matrix[i]) - 1.0) > 0.01:
                raise ValueError("matrix %d not normalized to 1.0" % i)
    else:
        raise ValueError("I don't handle matrices > 2 dimensions")
    return matrix


def _logsum(matrix):
    """Implement logsum for a matrix object (PRIVATE)."""
    if len(matrix.shape) > 1:
        vec = np.reshape(matrix, (np.prod(matrix.shape),))
    else:
        vec = matrix
    sum = LOG0
    for num in vec:
        sum = logaddexp(sum, num)
    return sum
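

# For reference, _logsum(m) computes log(sum(exp(m))) over all entries of m
# (seeded with LOG0, which adds a negligible 1e-300 to the sum); it is
# equivalent to np.logaddexp.reduce(m, axis=None).
#
#     assert np.isclose(_logsum(np.log(np.array([0.25, 0.75]))), 0.0)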


def _logvecadd(logvec1, logvec2):
    """Implement a log sum for two vector objects (PRIVATE)."""
    assert len(logvec1) == len(logvec2), "vectors aren't the same length"
    sumvec = np.zeros(len(logvec1))
    for i in range(len(logvec1)):
        sumvec[i] = logaddexp(logvec1[i], logvec2[i])
    return sumvec


def _exp_logsum(numbers):
    """Return the exponential of a logsum (PRIVATE)."""
    sum = _logsum(numbers)
    return np.exp(sum)