Mercurial > repos > jpayne > cfsan_qaqc
view qa_core/gims_core.py @ 2:021864b5a0a5 tip
planemo upload
author | jpayne |
---|---|
date | Fri, 11 May 2018 10:25:14 -0400 |
parents | dafec28bd37e |
children |
line wrap: on
line source
#! /usr/bin/env python import argparse import binascii from collections import OrderedDict from contextlib import contextmanager import xml.dom.minidom as dom from xml.etree import ElementTree as xml from functools import partial, wraps from multiprocessing.pool import ThreadPool as Pool from inspect import getargspec import os, os.path, sys, errno import imp import subprocess import logging ver = "unknown" usage = """ CFSAN Genomics QA Core May 15, 2015 Justin Payne justin.payne@fda.hhs.gov usage: {0} /path/to/sandbox (production mode) {0} {{0}} """.format(os.path.basename(__file__)) PHRED_OFFSET = 33 input_subdir = 'Input' output_subdir = 'Output' #Heng Li's readfq implementation #https://github.com/lh3/readfq/blob/master/readfq.py def readfq(fp): # this is a generator function last = None # this is a buffer keeping the last unprocessed line while True: # mimic closure; is it a bad idea? if not last: # the first record or a record following a fastq for l in fp: # search for the start of the next record if l[0] in '>@': # fasta/q header line last = l[:-1] # save this line break if not last: break name, seqs, last = last[1:].partition(" ")[0], [], None for l in fp: # read the sequence if l[0] in '@+>': last = l[:-1] break seqs.append(l[:-1]) if not last or last[0] != '+': # this is a fasta record yield name, ''.join(seqs), [] # yield a fasta record if not last: break else: # this is a fastq record seq, leng, seqs = ''.join(seqs), 0, [] for l in fp: # read the quality seqs.append(l[:-1]) leng += len(l) - 1 if leng >= len(seq): # have read enough quality last = None yield name, seq, [ord(c) - PHRED_OFFSET for c in ''.join(seqs)]; # yield a fastq record break if last: # reach EOF before reading enough quality yield name, seq, [] # yield a fasta record instead break #from qa_core import qa_test, new_output_file, open_input, readfq, qa_params qa_tests = OrderedDict() qa_params = OrderedDict() qa_core = imp.new_module("qa_core") sys.modules['qa_core'] = qa_core def qa_test(func=None, disable=False, profile=True): "QA Test decorator" def qa_test_decorator(qa_func): #define a parameter-binding wrapper @wraps(qa_func) def wrap(namespace): "Use partial application of functions to bind parameters declared in the QA function from a namespace object" monad = qa_func for argument in getargspec(qa_func).args: if hasattr(namespace, argument) and getattr(namespace, argument): monad = partial(monad, **{argument: getattr(namespace, argument)}) #and then call it return monad() wrap.__profile_enabled__ = profile if not disable: # if not name: # name = qa_func.__name__ qa_tests[qa_func.__name__] = wrap return wrap if func: return qa_test_decorator(func) return qa_test_decorator qa_core.qa_test = qa_test qa_core.readfq = readfq qa_core.qa_params = qa_params @qa_test def version(): "Collect repository commit id from git" p = os.path.abspath(os.path.dirname(__file__)) try: ref = subprocess.check_output("git -C {0} describe".format(p), shell=True).replace("\n", '') except subprocess.CalledProcessError: ref = "+".join(qa_tests.keys())[:255] return ref @qa_test def name(annotation=None): if annotation: return annotation @contextmanager def open_file(name, path='.', mode='w'): with open(os.path.join(path, name), mode) as f: if 'r' in mode and binascii.hexlify(f.read(2)) == b'1f8b': import gzip with gzip.open(os.path.join(path, name), mode) as g: yield g else: f.seek(0) yield f def suite(namespace, log=logging.getLogger('qa.suite')): for name, test in qa_tests.items(): logg = log.getChild(name) try: result = test(namespace) if result is not None: logg.info(result) yield name, result except Exception as e: import traceback logg.error(traceback.format_exc()) def main(namespace): if namespace.annotation: qa_params['annotation'] = namespace.annotation parallelism = vars(namespace).get('parallelism', 1) if parallelism > 1: qa_params['map implementation'] = Pool(parallelism).map if namespace.mode not in ('suite', 'profile'): #it's a path to a sandbox job logging.getLogger('qa.input').info("Load job description xml from {}...".format(namespace.mode)) tree = xml.parse(os.path.join(namespace.mode, "Input/input.xml")) root = tree.getroot() #logging.getLogger('qa.input').debug(xml.tostring(root).strip()) for element in root: val = getattr(namespace, element.tag) if isinstance(val, list): val.append(element.text) else: setattr(namespace, element.tag, element.text) #set up utility functions path = os.path.join(namespace.mode, 'Output') try: os.makedirs(path) except OSError as e: if e.errno != errno.EEXIST: raise except Exception as e: logging.getLogger('qa.setup').error(e) qa_core.new_output_file = partial(open_file, path=path) #qa_core.new_output_file = lambda f: open_file(f, path=path) qa_core.open_input = partial(open_file, path=os.path.join(namespace.mode, "Input"), mode='r') #import and run tests logging.getLogger('qa.tests').info("Loading and running tests...") import gims_qa_tests root = xml.Element("qaQcCoverageOutput") tree = xml.ElementTree(element=root) [logging.getLogger('qa.params').info("{}:{}".format(k,v)) for k,v in vars(namespace).items()] for i, (name, result) in enumerate(suite(namespace)): if isinstance(result, xml.Element): root.append(result) else: if result: xml.SubElement(root, name).text = str(result) try: with qa_core.new_output_file('output.xml') as out: d = dom.parseString(xml.tostring(root)) d.writexml(out, addindent=' ', newl='\n') logging.getLogger('qa.results').info("Finished tests and wrote to {}".format(namespace.mode)) except Exception as e: logging.getLogger('qa.results').error(e) elif namespace.mode == 'suite': qa_core.open_input = partial(open_file, mode='rb') qa_core.new_output_file = open_file if not namespace.output_plots: qa_params['plots'] = False else: #set up directory for plots pass import gims_qa_tests def fix_value(result): if isinstance(result, xml.Element): return xml.tostring(result) return result results = {name:fix_value(result) for name, result in suite(namespace)} import csv wtr = csv.DictWriter(namespace.output, delimiter='\t', fieldnames=sorted(results.keys())) wtr.writeheader() wtr.writerow(results) elif namespace.mode == 'profile': return profile(namespace) def profile(namespace): log = logging.getLogger('qa.profile') import cProfile import StringIO from Bio.Seq import Seq from Bio.SeqRecord import SeqRecord from Bio.Alphabet import IUPAC import random from random import choice as c from tempfile import TemporaryFile as tmp qa_params['plots'] = False profile_fastq = StringIO.StringIO() log.info("Setup...") #create some fake FASTQ 'reads' for r in range(10000): #boring random distribution #s = "".join([random.choice(('A', 'T', 'G', 'C')) for i in range(300)]) #interesting diamond distribution s = "".join([c(c(('A', 'C')) * (300-i) + c(('G','T'))*i) for i in range(150)]) s += "".join([c(c(('A', 'C')) * 150 + c(('G','T'))*(150-i)) for i in range(150)]) q = [int(max(0, min((random.normalvariate(15, 10), 62)))) for i in range(300)] rec = SeqRecord(Seq(s, IUPAC.unambiguous_dna), id="read_{}".format(r)) rec.letter_annotations['phred_quality'] = q profile_fastq.write(rec.format('fastq')) @contextmanager def input_stub(filename, mode=None): if 'xml' in filename: profile_xml.seek(0) yield profile_xml elif 'fastq' in filename: profile_fastq.seek(0) yield profile_fastq else: yield open('/dev/null', 'r') qa_core.open_input = input_stub profiling_buffer = StringIO.StringIO() @contextmanager def get_profiling_buffer(*a, **k): yield profiling_buffer profiling_buffer.seek(0) qa_core.new_output_file = get_profiling_buffer import gims_qa_tests namespace.sequenceFileName = ('forward.fastq', 'reverse.fastq') namespace.sequencingType = 'Illumina MiSeq' namespace.approximateGenomeSize = 1000000 def run_tests(namespace, log): log.getChild('run') log.info(vars(namespace)) for name, test in qa_tests.items(): if test.__profile_enabled__: test(namespace) log.info("Profiling...") return cProfile.runctx('run_tests(namespace, log)', None, locals(), sort='tottime') if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument("mode") parser.add_argument('sequenceFileName', nargs='*') parser.add_argument('--genus','-g') parser.add_argument('--species', '-s') parser.add_argument('--sequencingType', '-t') parser.add_argument('--approximateGenomeSize','--gs', type=int, default=4500000) parser.add_argument('--coverageThreshold', '-c', type=int, default=20) parser.add_argument('--minimumQScore', '-q', type=float, default=27.0) parser.add_argument('--databasePath', '--db') parser.add_argument('--log', '-l', type=argparse.FileType('w'), default=sys.stdout) parser.add_argument('--output', '-o', type=argparse.FileType('w'), default=sys.stdout) parser.add_argument('--output_plots') parser.add_argument('--parallelism', '-p', type=int, default=1) parser.add_argument('--annotate', '-a', type=str, default=None, dest='annotation') params = parser.parse_args() logging.basicConfig(format='%(name)s\t%(levelname)s:%(message)s', level=logging.DEBUG, stream = params.log) logging.getLogger('qa.cmd').info(" ".join(sys.argv)) main(params)