jpayne@0: #! /usr/bin/env python jpayne@0: jpayne@0: import argparse jpayne@0: import binascii jpayne@0: from collections import OrderedDict jpayne@0: from contextlib import contextmanager jpayne@0: import xml.dom.minidom as dom jpayne@0: from xml.etree import ElementTree as xml jpayne@0: from functools import partial, wraps jpayne@0: from multiprocessing.pool import ThreadPool as Pool jpayne@0: from inspect import getargspec jpayne@0: import os, os.path, sys, errno jpayne@0: import imp jpayne@0: import subprocess jpayne@0: import logging jpayne@0: jpayne@0: ver = "unknown" jpayne@0: jpayne@0: jpayne@0: usage = """ CFSAN Genomics QA Core jpayne@0: May 15, 2015 jpayne@0: Justin Payne jpayne@0: justin.payne@fda.hhs.gov jpayne@0: jpayne@0: usage: jpayne@0: {0} /path/to/sandbox (production mode) jpayne@0: {0} {{0}} jpayne@0: jpayne@0: """.format(os.path.basename(__file__)) jpayne@0: jpayne@0: PHRED_OFFSET = 33 jpayne@0: jpayne@0: input_subdir = 'Input' jpayne@0: output_subdir = 'Output' jpayne@0: jpayne@0: #Heng Li's readfq implementation jpayne@0: #https://github.com/lh3/readfq/blob/master/readfq.py jpayne@0: def readfq(fp): # this is a generator function jpayne@0: last = None # this is a buffer keeping the last unprocessed line jpayne@0: while True: # mimic closure; is it a bad idea? jpayne@0: if not last: # the first record or a record following a fastq jpayne@0: for l in fp: # search for the start of the next record jpayne@0: if l[0] in '>@': # fasta/q header line jpayne@0: last = l[:-1] # save this line jpayne@0: break jpayne@0: if not last: break jpayne@0: name, seqs, last = last[1:].partition(" ")[0], [], None jpayne@0: for l in fp: # read the sequence jpayne@0: if l[0] in '@+>': jpayne@0: last = l[:-1] jpayne@0: break jpayne@0: seqs.append(l[:-1]) jpayne@0: if not last or last[0] != '+': # this is a fasta record jpayne@0: yield name, ''.join(seqs), [] # yield a fasta record jpayne@0: if not last: break jpayne@0: else: # this is a fastq record jpayne@0: seq, leng, seqs = ''.join(seqs), 0, [] jpayne@0: for l in fp: # read the quality jpayne@0: seqs.append(l[:-1]) jpayne@0: leng += len(l) - 1 jpayne@0: if leng >= len(seq): # have read enough quality jpayne@0: last = None jpayne@0: yield name, seq, [ord(c) - PHRED_OFFSET for c in ''.join(seqs)]; # yield a fastq record jpayne@0: break jpayne@0: if last: # reach EOF before reading enough quality jpayne@0: yield name, seq, [] # yield a fasta record instead jpayne@0: break jpayne@0: jpayne@0: #from qa_core import qa_test, new_output_file, open_input, readfq, qa_params jpayne@0: jpayne@0: qa_tests = OrderedDict() jpayne@0: jpayne@0: qa_params = OrderedDict() jpayne@0: jpayne@0: qa_core = imp.new_module("qa_core") jpayne@0: sys.modules['qa_core'] = qa_core jpayne@0: jpayne@0: def qa_test(func=None, disable=False, profile=True): jpayne@0: "QA Test decorator" jpayne@0: def qa_test_decorator(qa_func): jpayne@0: #define a parameter-binding wrapper jpayne@0: @wraps(qa_func) jpayne@0: def wrap(namespace): jpayne@0: "Use partial application of functions to bind parameters declared in the QA function from a namespace object" jpayne@0: monad = qa_func jpayne@0: for argument in getargspec(qa_func).args: jpayne@0: if hasattr(namespace, argument) and getattr(namespace, argument): jpayne@0: monad = partial(monad, **{argument: getattr(namespace, argument)}) jpayne@0: jpayne@0: #and then call it jpayne@0: return monad() jpayne@0: jpayne@0: wrap.__profile_enabled__ = profile jpayne@0: if not disable: jpayne@0: # if not name: jpayne@0: # name = qa_func.__name__ jpayne@0: qa_tests[qa_func.__name__] = wrap jpayne@0: return wrap jpayne@0: if func: jpayne@0: return qa_test_decorator(func) jpayne@0: return qa_test_decorator jpayne@0: jpayne@0: qa_core.qa_test = qa_test jpayne@0: qa_core.readfq = readfq jpayne@0: qa_core.qa_params = qa_params jpayne@0: jpayne@0: @qa_test jpayne@0: def version(): jpayne@0: "Collect repository commit id from git" jpayne@0: p = os.path.abspath(os.path.dirname(__file__)) jpayne@0: try: jpayne@0: ref = subprocess.check_output("git -C {0} describe".format(p), shell=True).replace("\n", '') jpayne@0: except subprocess.CalledProcessError: jpayne@0: ref = "+".join(qa_tests.keys())[:255] jpayne@0: return ref jpayne@0: jpayne@0: @qa_test jpayne@0: def name(annotation=None): jpayne@0: if annotation: jpayne@0: return annotation jpayne@0: jpayne@0: @contextmanager jpayne@0: def open_file(name, path='.', mode='w'): jpayne@0: with open(os.path.join(path, name), mode) as f: jpayne@0: if 'r' in mode and binascii.hexlify(f.read(2)) == b'1f8b': jpayne@0: import gzip jpayne@0: with gzip.open(os.path.join(path, name), mode) as g: jpayne@0: yield g jpayne@0: else: jpayne@0: f.seek(0) jpayne@0: yield f jpayne@0: jpayne@0: def suite(namespace, log=logging.getLogger('qa.suite')): jpayne@0: for name, test in qa_tests.items(): jpayne@0: logg = log.getChild(name) jpayne@0: try: jpayne@0: result = test(namespace) jpayne@0: if result is not None: jpayne@0: logg.info(result) jpayne@0: yield name, result jpayne@0: except Exception as e: jpayne@0: import traceback jpayne@0: logg.error(traceback.format_exc()) jpayne@0: jpayne@0: jpayne@0: def main(namespace): jpayne@0: if namespace.annotation: jpayne@0: qa_params['annotation'] = namespace.annotation jpayne@0: parallelism = vars(namespace).get('parallelism', 1) jpayne@0: if parallelism > 1: jpayne@0: qa_params['map implementation'] = Pool(parallelism).map jpayne@0: if namespace.mode not in ('suite', 'profile'): jpayne@0: #it's a path to a sandbox job jpayne@0: logging.getLogger('qa.input').info("Load job description xml from {}...".format(namespace.mode)) jpayne@0: tree = xml.parse(os.path.join(namespace.mode, "Input/input.xml")) jpayne@0: root = tree.getroot() jpayne@0: #logging.getLogger('qa.input').debug(xml.tostring(root).strip()) jpayne@0: for element in root: jpayne@0: val = getattr(namespace, element.tag) jpayne@0: if isinstance(val, list): jpayne@0: val.append(element.text) jpayne@0: else: jpayne@0: setattr(namespace, element.tag, element.text) jpayne@0: #set up utility functions jpayne@0: path = os.path.join(namespace.mode, 'Output') jpayne@0: try: jpayne@0: os.makedirs(path) jpayne@0: except OSError as e: jpayne@0: if e.errno != errno.EEXIST: jpayne@0: raise jpayne@0: except Exception as e: jpayne@0: logging.getLogger('qa.setup').error(e) jpayne@0: qa_core.new_output_file = partial(open_file, path=path) jpayne@0: #qa_core.new_output_file = lambda f: open_file(f, path=path) jpayne@0: qa_core.open_input = partial(open_file, path=os.path.join(namespace.mode, "Input"), mode='r') jpayne@0: #import and run tests jpayne@0: logging.getLogger('qa.tests').info("Loading and running tests...") jpayne@0: import gims_qa_tests jpayne@0: root = xml.Element("qaQcCoverageOutput") jpayne@0: tree = xml.ElementTree(element=root) jpayne@0: [logging.getLogger('qa.params').info("{}:{}".format(k,v)) for k,v in vars(namespace).items()] jpayne@0: for i, (name, result) in enumerate(suite(namespace)): jpayne@0: if isinstance(result, xml.Element): jpayne@0: root.append(result) jpayne@0: else: jpayne@0: if result: jpayne@0: xml.SubElement(root, name).text = str(result) jpayne@0: jpayne@0: try: jpayne@0: with qa_core.new_output_file('output.xml') as out: jpayne@0: d = dom.parseString(xml.tostring(root)) jpayne@0: d.writexml(out, addindent=' ', newl='\n') jpayne@0: logging.getLogger('qa.results').info("Finished tests and wrote to {}".format(namespace.mode)) jpayne@0: except Exception as e: jpayne@0: logging.getLogger('qa.results').error(e) jpayne@0: elif namespace.mode == 'suite': jpayne@0: qa_core.open_input = partial(open_file, mode='rb') jpayne@0: qa_core.new_output_file = open_file jpayne@0: if not namespace.output_plots: jpayne@0: qa_params['plots'] = False jpayne@0: else: jpayne@0: #set up directory for plots jpayne@0: pass jpayne@0: import gims_qa_tests jpayne@0: def fix_value(result): jpayne@0: if isinstance(result, xml.Element): jpayne@0: return xml.tostring(result) jpayne@0: return result jpayne@0: results = {name:fix_value(result) for name, result in suite(namespace)} jpayne@0: import csv jpayne@0: wtr = csv.DictWriter(namespace.output, delimiter='\t', fieldnames=sorted(results.keys())) jpayne@0: wtr.writeheader() jpayne@0: wtr.writerow(results) jpayne@0: elif namespace.mode == 'profile': jpayne@0: return profile(namespace) jpayne@0: jpayne@0: jpayne@0: def profile(namespace): jpayne@0: log = logging.getLogger('qa.profile') jpayne@0: import cProfile jpayne@0: import StringIO jpayne@0: from Bio.Seq import Seq jpayne@0: from Bio.SeqRecord import SeqRecord jpayne@0: from Bio.Alphabet import IUPAC jpayne@0: import random jpayne@0: from random import choice as c jpayne@0: from tempfile import TemporaryFile as tmp jpayne@0: jpayne@0: qa_params['plots'] = False jpayne@0: jpayne@0: profile_fastq = StringIO.StringIO() jpayne@0: log.info("Setup...") jpayne@0: #create some fake FASTQ 'reads' jpayne@0: for r in range(10000): jpayne@0: #boring random distribution jpayne@0: #s = "".join([random.choice(('A', 'T', 'G', 'C')) for i in range(300)]) jpayne@0: #interesting diamond distribution jpayne@0: s = "".join([c(c(('A', 'C')) * (300-i) + c(('G','T'))*i) for i in range(150)]) jpayne@0: s += "".join([c(c(('A', 'C')) * 150 + c(('G','T'))*(150-i)) for i in range(150)]) jpayne@0: jpayne@0: q = [int(max(0, min((random.normalvariate(15, 10), 62)))) for i in range(300)] jpayne@0: rec = SeqRecord(Seq(s, IUPAC.unambiguous_dna), id="read_{}".format(r)) jpayne@0: rec.letter_annotations['phred_quality'] = q jpayne@0: profile_fastq.write(rec.format('fastq')) jpayne@0: @contextmanager jpayne@0: def input_stub(filename, mode=None): jpayne@0: if 'xml' in filename: jpayne@0: profile_xml.seek(0) jpayne@0: yield profile_xml jpayne@0: elif 'fastq' in filename: jpayne@0: profile_fastq.seek(0) jpayne@0: yield profile_fastq jpayne@0: else: jpayne@0: yield open('/dev/null', 'r') jpayne@0: jpayne@0: qa_core.open_input = input_stub jpayne@0: profiling_buffer = StringIO.StringIO() jpayne@0: jpayne@0: @contextmanager jpayne@0: def get_profiling_buffer(*a, **k): jpayne@0: yield profiling_buffer jpayne@0: profiling_buffer.seek(0) jpayne@0: jpayne@0: qa_core.new_output_file = get_profiling_buffer jpayne@0: import gims_qa_tests jpayne@0: jpayne@0: namespace.sequenceFileName = ('forward.fastq', 'reverse.fastq') jpayne@0: namespace.sequencingType = 'Illumina MiSeq' jpayne@0: namespace.approximateGenomeSize = 1000000 jpayne@0: jpayne@0: def run_tests(namespace, log): jpayne@0: log.getChild('run') jpayne@0: log.info(vars(namespace)) jpayne@0: for name, test in qa_tests.items(): jpayne@0: if test.__profile_enabled__: jpayne@0: test(namespace) jpayne@0: log.info("Profiling...") jpayne@0: return cProfile.runctx('run_tests(namespace, log)', None, locals(), sort='tottime') jpayne@0: jpayne@0: if __name__ == '__main__': jpayne@0: jpayne@0: parser = argparse.ArgumentParser() jpayne@0: parser.add_argument("mode") jpayne@0: parser.add_argument('sequenceFileName', nargs='*') jpayne@0: parser.add_argument('--genus','-g') jpayne@0: parser.add_argument('--species', '-s') jpayne@0: parser.add_argument('--sequencingType', '-t') jpayne@0: parser.add_argument('--approximateGenomeSize','--gs', type=int, default=4500000) jpayne@0: parser.add_argument('--coverageThreshold', '-c', type=int, default=20) jpayne@0: parser.add_argument('--minimumQScore', '-q', type=float, default=27.0) jpayne@0: parser.add_argument('--databasePath', '--db') jpayne@0: parser.add_argument('--log', '-l', type=argparse.FileType('w'), default=sys.stdout) jpayne@0: parser.add_argument('--output', '-o', type=argparse.FileType('w'), default=sys.stdout) jpayne@0: parser.add_argument('--output_plots') jpayne@0: parser.add_argument('--parallelism', '-p', type=int, default=1) jpayne@0: parser.add_argument('--annotate', '-a', type=str, default=None, dest='annotation') jpayne@0: params = parser.parse_args() jpayne@0: logging.basicConfig(format='%(name)s\t%(levelname)s:%(message)s', level=logging.DEBUG, stream = params.log) jpayne@0: logging.getLogger('qa.cmd').info(" ".join(sys.argv)) jpayne@0: main(params)