Mercurial > repos > jpayne > cfsan_qaqc
diff qa_core/gims_core.py @ 0:dafec28bd37e
planemo upload
author | jpayne |
---|---|
date | Tue, 08 May 2018 18:54:20 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/qa_core/gims_core.py Tue May 08 18:54:20 2018 -0400 @@ -0,0 +1,300 @@ +#! /usr/bin/env python + +import argparse +import binascii +from collections import OrderedDict +from contextlib import contextmanager +import xml.dom.minidom as dom +from xml.etree import ElementTree as xml +from functools import partial, wraps +from multiprocessing.pool import ThreadPool as Pool +from inspect import getargspec +import os, os.path, sys, errno +import imp +import subprocess +import logging + +ver = "unknown" + + +usage = """ CFSAN Genomics QA Core +May 15, 2015 +Justin Payne +justin.payne@fda.hhs.gov + + usage: + {0} /path/to/sandbox (production mode) + {0} {{0}} + +""".format(os.path.basename(__file__)) + +PHRED_OFFSET = 33 + +input_subdir = 'Input' +output_subdir = 'Output' + +#Heng Li's readfq implementation +#https://github.com/lh3/readfq/blob/master/readfq.py +def readfq(fp): # this is a generator function + last = None # this is a buffer keeping the last unprocessed line + while True: # mimic closure; is it a bad idea? + if not last: # the first record or a record following a fastq + for l in fp: # search for the start of the next record + if l[0] in '>@': # fasta/q header line + last = l[:-1] # save this line + break + if not last: break + name, seqs, last = last[1:].partition(" ")[0], [], None + for l in fp: # read the sequence + if l[0] in '@+>': + last = l[:-1] + break + seqs.append(l[:-1]) + if not last or last[0] != '+': # this is a fasta record + yield name, ''.join(seqs), [] # yield a fasta record + if not last: break + else: # this is a fastq record + seq, leng, seqs = ''.join(seqs), 0, [] + for l in fp: # read the quality + seqs.append(l[:-1]) + leng += len(l) - 1 + if leng >= len(seq): # have read enough quality + last = None + yield name, seq, [ord(c) - PHRED_OFFSET for c in ''.join(seqs)]; # yield a fastq record + break + if last: # reach EOF before reading enough quality + yield name, seq, [] # yield a fasta record instead + break + +#from qa_core import qa_test, new_output_file, open_input, readfq, qa_params + +qa_tests = OrderedDict() + +qa_params = OrderedDict() + +qa_core = imp.new_module("qa_core") +sys.modules['qa_core'] = qa_core + +def qa_test(func=None, disable=False, profile=True): + "QA Test decorator" + def qa_test_decorator(qa_func): + #define a parameter-binding wrapper + @wraps(qa_func) + def wrap(namespace): + "Use partial application of functions to bind parameters declared in the QA function from a namespace object" + monad = qa_func + for argument in getargspec(qa_func).args: + if hasattr(namespace, argument) and getattr(namespace, argument): + monad = partial(monad, **{argument: getattr(namespace, argument)}) + + #and then call it + return monad() + + wrap.__profile_enabled__ = profile + if not disable: + # if not name: + # name = qa_func.__name__ + qa_tests[qa_func.__name__] = wrap + return wrap + if func: + return qa_test_decorator(func) + return qa_test_decorator + +qa_core.qa_test = qa_test +qa_core.readfq = readfq +qa_core.qa_params = qa_params + +@qa_test +def version(): + "Collect repository commit id from git" + p = os.path.abspath(os.path.dirname(__file__)) + try: + ref = subprocess.check_output("git -C {0} describe".format(p), shell=True).replace("\n", '') + except subprocess.CalledProcessError: + ref = "+".join(qa_tests.keys())[:255] + return ref + +@qa_test +def name(annotation=None): + if annotation: + return annotation + +@contextmanager +def open_file(name, path='.', mode='w'): + with open(os.path.join(path, name), mode) as f: + if 'r' in mode and binascii.hexlify(f.read(2)) == b'1f8b': + import gzip + with gzip.open(os.path.join(path, name), mode) as g: + yield g + else: + f.seek(0) + yield f + +def suite(namespace, log=logging.getLogger('qa.suite')): + for name, test in qa_tests.items(): + logg = log.getChild(name) + try: + result = test(namespace) + if result is not None: + logg.info(result) + yield name, result + except Exception as e: + import traceback + logg.error(traceback.format_exc()) + + +def main(namespace): + if namespace.annotation: + qa_params['annotation'] = namespace.annotation + parallelism = vars(namespace).get('parallelism', 1) + if parallelism > 1: + qa_params['map implementation'] = Pool(parallelism).map + if namespace.mode not in ('suite', 'profile'): + #it's a path to a sandbox job + logging.getLogger('qa.input').info("Load job description xml from {}...".format(namespace.mode)) + tree = xml.parse(os.path.join(namespace.mode, "Input/input.xml")) + root = tree.getroot() + #logging.getLogger('qa.input').debug(xml.tostring(root).strip()) + for element in root: + val = getattr(namespace, element.tag) + if isinstance(val, list): + val.append(element.text) + else: + setattr(namespace, element.tag, element.text) + #set up utility functions + path = os.path.join(namespace.mode, 'Output') + try: + os.makedirs(path) + except OSError as e: + if e.errno != errno.EEXIST: + raise + except Exception as e: + logging.getLogger('qa.setup').error(e) + qa_core.new_output_file = partial(open_file, path=path) + #qa_core.new_output_file = lambda f: open_file(f, path=path) + qa_core.open_input = partial(open_file, path=os.path.join(namespace.mode, "Input"), mode='r') + #import and run tests + logging.getLogger('qa.tests').info("Loading and running tests...") + import gims_qa_tests + root = xml.Element("qaQcCoverageOutput") + tree = xml.ElementTree(element=root) + [logging.getLogger('qa.params').info("{}:{}".format(k,v)) for k,v in vars(namespace).items()] + for i, (name, result) in enumerate(suite(namespace)): + if isinstance(result, xml.Element): + root.append(result) + else: + if result: + xml.SubElement(root, name).text = str(result) + + try: + with qa_core.new_output_file('output.xml') as out: + d = dom.parseString(xml.tostring(root)) + d.writexml(out, addindent=' ', newl='\n') + logging.getLogger('qa.results').info("Finished tests and wrote to {}".format(namespace.mode)) + except Exception as e: + logging.getLogger('qa.results').error(e) + elif namespace.mode == 'suite': + qa_core.open_input = partial(open_file, mode='rb') + qa_core.new_output_file = open_file + if not namespace.output_plots: + qa_params['plots'] = False + else: + #set up directory for plots + pass + import gims_qa_tests + def fix_value(result): + if isinstance(result, xml.Element): + return xml.tostring(result) + return result + results = {name:fix_value(result) for name, result in suite(namespace)} + import csv + wtr = csv.DictWriter(namespace.output, delimiter='\t', fieldnames=sorted(results.keys())) + wtr.writeheader() + wtr.writerow(results) + elif namespace.mode == 'profile': + return profile(namespace) + + +def profile(namespace): + log = logging.getLogger('qa.profile') + import cProfile + import StringIO + from Bio.Seq import Seq + from Bio.SeqRecord import SeqRecord + from Bio.Alphabet import IUPAC + import random + from random import choice as c + from tempfile import TemporaryFile as tmp + + qa_params['plots'] = False + + profile_fastq = StringIO.StringIO() + log.info("Setup...") + #create some fake FASTQ 'reads' + for r in range(10000): + #boring random distribution + #s = "".join([random.choice(('A', 'T', 'G', 'C')) for i in range(300)]) + #interesting diamond distribution + s = "".join([c(c(('A', 'C')) * (300-i) + c(('G','T'))*i) for i in range(150)]) + s += "".join([c(c(('A', 'C')) * 150 + c(('G','T'))*(150-i)) for i in range(150)]) + + q = [int(max(0, min((random.normalvariate(15, 10), 62)))) for i in range(300)] + rec = SeqRecord(Seq(s, IUPAC.unambiguous_dna), id="read_{}".format(r)) + rec.letter_annotations['phred_quality'] = q + profile_fastq.write(rec.format('fastq')) + @contextmanager + def input_stub(filename, mode=None): + if 'xml' in filename: + profile_xml.seek(0) + yield profile_xml + elif 'fastq' in filename: + profile_fastq.seek(0) + yield profile_fastq + else: + yield open('/dev/null', 'r') + + qa_core.open_input = input_stub + profiling_buffer = StringIO.StringIO() + + @contextmanager + def get_profiling_buffer(*a, **k): + yield profiling_buffer + profiling_buffer.seek(0) + + qa_core.new_output_file = get_profiling_buffer + import gims_qa_tests + + namespace.sequenceFileName = ('forward.fastq', 'reverse.fastq') + namespace.sequencingType = 'Illumina MiSeq' + namespace.approximateGenomeSize = 1000000 + + def run_tests(namespace, log): + log.getChild('run') + log.info(vars(namespace)) + for name, test in qa_tests.items(): + if test.__profile_enabled__: + test(namespace) + log.info("Profiling...") + return cProfile.runctx('run_tests(namespace, log)', None, locals(), sort='tottime') + +if __name__ == '__main__': + + parser = argparse.ArgumentParser() + parser.add_argument("mode") + parser.add_argument('sequenceFileName', nargs='*') + parser.add_argument('--genus','-g') + parser.add_argument('--species', '-s') + parser.add_argument('--sequencingType', '-t') + parser.add_argument('--approximateGenomeSize','--gs', type=int, default=4500000) + parser.add_argument('--coverageThreshold', '-c', type=int, default=20) + parser.add_argument('--minimumQScore', '-q', type=float, default=27.0) + parser.add_argument('--databasePath', '--db') + parser.add_argument('--log', '-l', type=argparse.FileType('w'), default=sys.stdout) + parser.add_argument('--output', '-o', type=argparse.FileType('w'), default=sys.stdout) + parser.add_argument('--output_plots') + parser.add_argument('--parallelism', '-p', type=int, default=1) + parser.add_argument('--annotate', '-a', type=str, default=None, dest='annotation') + params = parser.parse_args() + logging.basicConfig(format='%(name)s\t%(levelname)s:%(message)s', level=logging.DEBUG, stream = params.log) + logging.getLogger('qa.cmd').info(" ".join(sys.argv)) + main(params) \ No newline at end of file