# Copyright 2001 by Jeffrey Chang. All rights reserved.
#
# This file is part of the Biopython distribution and governed by your
# choice of the "Biopython License Agreement" or the "BSD 3-Clause License".
# Please see the LICENSE file that should have been included as part of this
# package.
"""Maximum Entropy code.

Uses Improved Iterative Scaling.
"""
# TODO Define terminology

from functools import reduce
import warnings

try:
    import numpy as np
except ImportError:
    from Bio import MissingPythonDependencyError

    raise MissingPythonDependencyError(
        "Please install NumPy if you want to use Bio.MaxEntropy. "
        "See http://www.numpy.org/"
    ) from None


from Bio import BiopythonDeprecationWarning

warnings.warn(
    "The 'Bio.MaxEntropy' module is deprecated and will be removed in a future "
    "release of Biopython. Consider using scikit-learn instead.",
    BiopythonDeprecationWarning,
)


class MaxEntropy:
    """Hold information for a Maximum Entropy classifier.

    Members:
    classes      List of the possible classes of data.
    alphas       List of the weights for each feature.
    feature_fns  List of the feature functions.

    Car data from the Naive Bayes classifier example by Eric Meisner, November 22, 2003:
    http://www.inf.u-szeged.hu/~ormandi/teaching

    >>> from Bio.MaxEntropy import train, classify
    >>> xcar = [
    ...     ['Red', 'Sports', 'Domestic'],
    ...     ['Red', 'Sports', 'Domestic'],
    ...     ['Red', 'Sports', 'Domestic'],
    ...     ['Yellow', 'Sports', 'Domestic'],
    ...     ['Yellow', 'Sports', 'Imported'],
    ...     ['Yellow', 'SUV', 'Imported'],
    ...     ['Yellow', 'SUV', 'Imported'],
    ...     ['Yellow', 'SUV', 'Domestic'],
    ...     ['Red', 'SUV', 'Imported'],
    ...     ['Red', 'Sports', 'Imported']]
    >>> ycar = ['Yes', 'No', 'Yes', 'No', 'Yes', 'No', 'Yes', 'No', 'No', 'Yes']

    Requires some rules or features:

    >>> def udf1(ts, cl):
    ...     return ts[0] != 'Red'
    ...
    >>> def udf2(ts, cl):
    ...     return ts[1] != 'Sports'
    ...
    >>> def udf3(ts, cl):
    ...     return ts[2] != 'Domestic'
    ...
    >>> user_functions = [udf1, udf2, udf3]  # must be an iterable type
    >>> xe = train(xcar, ycar, user_functions)
    >>> for xv, yv in zip(xcar, ycar):
    ...     xc = classify(xe, xv)
    ...     print('Pred: %s gives %s y is %s' % (xv, xc, yv))
    ...
    Pred: ['Red', 'Sports', 'Domestic'] gives No y is Yes
    Pred: ['Red', 'Sports', 'Domestic'] gives No y is No
    Pred: ['Red', 'Sports', 'Domestic'] gives No y is Yes
    Pred: ['Yellow', 'Sports', 'Domestic'] gives No y is No
    Pred: ['Yellow', 'Sports', 'Imported'] gives No y is Yes
    Pred: ['Yellow', 'SUV', 'Imported'] gives No y is No
    Pred: ['Yellow', 'SUV', 'Imported'] gives No y is Yes
    Pred: ['Yellow', 'SUV', 'Domestic'] gives No y is No
    Pred: ['Red', 'SUV', 'Imported'] gives No y is No
    Pred: ['Red', 'Sports', 'Imported'] gives No y is Yes
    """

    def __init__(self):
        """Initialize the class."""
        self.classes = []
        self.alphas = []
        self.feature_fns = []


def calculate(me, observation):
    """Calculate the log of the probability for each class.

    me is a MaxEntropy object that has been trained. observation is a vector
    representing the observed data. The return value is a list of
    unnormalized log probabilities for each class.
    """
    scores = []
    assert len(me.feature_fns) == len(me.alphas)
    for klass in me.classes:
        lprob = 0.0
        for fn, alpha in zip(me.feature_fns, me.alphas):
            lprob += fn(observation, klass) * alpha
        scores.append(lprob)
    return scores


def classify(me, observation):
    """Classify an observation into a class."""
    scores = calculate(me, observation)
    max_score, klass = scores[0], me.classes[0]
    for i in range(1, len(scores)):
        if scores[i] > max_score:
            max_score, klass = scores[i], me.classes[i]
    return klass


def _eval_feature_fn(fn, xs, classes):
    """Evaluate a feature function on every instance of the training set and class (PRIVATE).

    fn is a callback function that takes two parameters: a
    training instance and a class. Return a dictionary of (training
    set index, class index) -> non-zero value. Values of 0 are not
    stored in the dictionary.
    """
    values = {}
    for i in range(len(xs)):
        for j in range(len(classes)):
            f = fn(xs[i], classes[j])
            if f != 0:
                values[(i, j)] = f
    return values


def _calc_empirical_expects(xs, ys, classes, features):
    """Calculate the expectation of each feature from the data (PRIVATE).

    This is the constraint for the maximum entropy distribution. Return a
    list of expectations, parallel to the list of features.
    """
    # E_empirical[f_i] = SUM_x,y P(x, y) f_i(x, y)
    #                  = 1/N SUM_n f_i(x_n, y_n)   (average over the N training examples)
    class2index = {}
    for index, key in enumerate(classes):
        class2index[key] = index
    ys_i = [class2index[y] for y in ys]

    expect = []
    N = len(xs)
    for feature in features:
        s = 0
        for i in range(N):
            s += feature.get((i, ys_i[i]), 0)
        expect.append(s / N)
    return expect


def _calc_model_expects(xs, classes, features, alphas):
    """Calculate the expectation of each feature from the model (PRIVATE).

    This is not used in maximum entropy training, but provides a good function
    for debugging.
    """
    # E_model[f_i] = SUM_x P(x) SUM_y P(y|x) f_i(x, y)
    #              = 1/N SUM_x SUM_y P(y|x) f_i(x, y)
    p_yx = _calc_p_class_given_x(xs, classes, features, alphas)

    expects = []
    for feature in features:
        total = 0.0
        for (i, j), f in feature.items():
            total += p_yx[i][j] * f
        expects.append(total / len(xs))
    return expects


def _calc_p_class_given_x(xs, classes, features, alphas):
    """Calculate the conditional probability P(y|x) (PRIVATE).

    y is the class and x is an instance from the training set.
    Return a matrix of probabilities with one row per training instance
    and one column per class.
    """
    prob_yx = np.zeros((len(xs), len(classes)))

    # Calculate log P(y, x).
    assert len(features) == len(alphas)
    for feature, alpha in zip(features, alphas):
        for (x, y), f in feature.items():
            prob_yx[x][y] += alpha * f
    # Take the exponent to get P(y, x).
    prob_yx = np.exp(prob_yx)
    # Divide out the probability over each class, so we get P(y|x).
    for i in range(len(xs)):
        z = sum(prob_yx[i])
        prob_yx[i] = prob_yx[i] / z
    return prob_yx


def _calc_f_sharp(N, nclasses, features):
    """Calculate a matrix of f sharp values (PRIVATE)."""
    # f#(x, y) = SUM_i f_i(x, y)
    f_sharp = np.zeros((N, nclasses))
    for feature in features:
        for (i, j), f in feature.items():
            f_sharp[i][j] += f
    return f_sharp


def _iis_solve_delta(
    N, feature, f_sharp, empirical, prob_yx, max_newton_iterations, newton_converge
):
    """Solve for delta using Newton's method (PRIVATE)."""
    # Find the delta_i that satisfies
    #   E_empirical[f_i] - 1/N SUM_x SUM_y P(y|x) f_i(x, y) e^(delta_i * f#(x, y)) = 0
    delta = 0.0
    iters = 0
    while iters < max_newton_iterations:  # iterate for Newton's method
        f_newton = df_newton = 0.0  # evaluate the function and derivative
        for (i, j), f in feature.items():
            prod = prob_yx[i][j] * f * np.exp(delta * f_sharp[i][j])
            f_newton += prod
            df_newton += prod * f_sharp[i][j]
        f_newton, df_newton = empirical - f_newton / N, -df_newton / N

        ratio = f_newton / df_newton
        delta -= ratio
        if np.fabs(ratio) < newton_converge:  # converged
            break
        iters += 1
    else:
        raise RuntimeError("Newton's method did not converge")
    return delta


def _train_iis(
    xs,
    classes,
    features,
    f_sharp,
    alphas,
    e_empirical,
    max_newton_iterations,
    newton_converge,
):
    """Do one iteration of hill climbing to find better alphas (PRIVATE)."""
    # This is a good function to parallelize.
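    #
    # Improved Iterative Scaling updates each alpha_i by a delta_i chosen so
    # that the model's expectation of feature i matches the empirical
    # expectation E_empirical[f_i]. Each delta_i solves, via Newton's method
    # in _iis_solve_delta:
    #   1/N SUM_x SUM_y P(y|x) f_i(x, y) e^(delta_i * f#(x, y)) = E_empirical[f_i]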

    # Pre-calculate P(y|x)
    p_yx = _calc_p_class_given_x(xs, classes, features, alphas)

    N = len(xs)
    newalphas = alphas[:]
    for i in range(len(alphas)):
        delta = _iis_solve_delta(
            N,
            features[i],
            f_sharp,
            e_empirical[i],
            p_yx,
            max_newton_iterations,
            newton_converge,
        )
        newalphas[i] += delta
    return newalphas


def train(
    training_set,
    results,
    feature_fns,
    update_fn=None,
    max_iis_iterations=10000,
    iis_converge=1.0e-5,
    max_newton_iterations=100,
    newton_converge=1.0e-10,
):
    """Train a maximum entropy classifier, returning a MaxEntropy object.

    Train a maximum entropy classifier on a training set.
    training_set is a list of observations. results is a list of the
    class assignments for each observation. feature_fns is a list of
    the features. These are callback functions that take an
    observation and a class and return a 1 or 0. update_fn is a
    callback function that is called at each training iteration. It is
    passed a MaxEntropy object that encapsulates the current state of
    the training.

    The maximum number of iterations and the convergence criterion for IIS
    are given by max_iis_iterations and iis_converge, respectively, while
    max_newton_iterations and newton_converge are the maximum number
    of iterations and the convergence criterion for Newton's method.
    """
    if not training_set:
        raise ValueError("No data in the training set.")
    if len(training_set) != len(results):
        raise ValueError("training_set and results should be parallel lists.")

    # Rename variables for convenience.
    xs, ys = training_set, results

    # Get a list of all the classes that need to be trained.
    classes = sorted(set(results))

    # Cache values for all features.
    features = [_eval_feature_fn(fn, training_set, classes) for fn in feature_fns]
    # Cache values for f#.
    f_sharp = _calc_f_sharp(len(training_set), len(classes), features)

    # Pre-calculate the empirical expectations of the features.
    e_empirical = _calc_empirical_expects(xs, ys, classes, features)

    # Now train the alpha parameters to weigh each feature.
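    # Start with every alpha at zero and repeat IIS updates until the total
    # absolute change in the alphas drops below iis_converge; if that does
    # not happen within max_iis_iterations iterations, raise RuntimeError.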
    alphas = [0.0] * len(features)
    iters = 0
    while iters < max_iis_iterations:
        nalphas = _train_iis(
            xs,
            classes,
            features,
            f_sharp,
            alphas,
            e_empirical,
            max_newton_iterations,
            newton_converge,
        )
        diff = [np.fabs(x - y) for x, y in zip(alphas, nalphas)]
        diff = reduce(np.add, diff, 0)
        alphas = nalphas

        me = MaxEntropy()
        me.alphas, me.classes, me.feature_fns = alphas, classes, feature_fns
        if update_fn is not None:
            update_fn(me)

        if diff < iis_converge:  # converged
            break
        iters += 1
    else:
        raise RuntimeError("IIS did not converge")

    return me


if __name__ == "__main__":
    from Bio._utils import run_doctest

    run_doctest(verbose=0)
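

# Migration note (a sketch, not part of the original module): the deprecation
# warning above suggests scikit-learn. There, this maximum entropy classifier
# roughly corresponds to multinomial logistic regression on one-hot encoded
# categorical features, e.g. with the xcar/ycar data from the doctest above
# (hypothetical, untested sketch):
#
#     from sklearn.linear_model import LogisticRegression
#     from sklearn.preprocessing import OneHotEncoder
#
#     X = OneHotEncoder().fit_transform(xcar)
#     model = LogisticRegression().fit(X, ycar)
#     predictions = model.predict(X)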