diff qa_core/gims_core.py @ 0:dafec28bd37e

planemo upload
author jpayne
date Tue, 08 May 2018 18:54:20 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/qa_core/gims_core.py	Tue May 08 18:54:20 2018 -0400
@@ -0,0 +1,300 @@
+#! /usr/bin/env python
+
+import argparse
+import binascii
+from collections import OrderedDict
+from contextlib import contextmanager
+import xml.dom.minidom as dom
+from xml.etree import ElementTree as xml
+from functools import partial, wraps
+from multiprocessing.pool import ThreadPool as Pool
+from inspect import getargspec
+import os, os.path, sys, errno
+import imp
+import subprocess
+import logging
+
+ver = "unknown"
+
+
+usage = """ CFSAN Genomics QA Core
+May 15, 2015
+Justin Payne
+justin.payne@fda.hhs.gov
+
+	usage:
+	{0} /path/to/sandbox (production mode)
+	{0} {{0}}
+
+""".format(os.path.basename(__file__))
+
+PHRED_OFFSET = 33
+
+input_subdir = 'Input'
+output_subdir = 'Output'
+
+#Heng Li's readfq implementation
+#https://github.com/lh3/readfq/blob/master/readfq.py
+def readfq(fp): # this is a generator function
+    last = None # this is a buffer keeping the last unprocessed line
+    while True: # mimic closure; is it a bad idea?
+        if not last: # the first record or a record following a fastq
+            for l in fp: # search for the start of the next record
+                if l[0] in '>@': # fasta/q header line
+                    last = l[:-1] # save this line
+                    break
+        if not last: break
+        name, seqs, last = last[1:].partition(" ")[0], [], None
+        for l in fp: # read the sequence
+            if l[0] in '@+>':
+                last = l[:-1]
+                break
+            seqs.append(l[:-1])
+        if not last or last[0] != '+': # this is a fasta record
+            yield name, ''.join(seqs), [] # yield a fasta record
+            if not last: break
+        else: # this is a fastq record
+            seq, leng, seqs = ''.join(seqs), 0, []
+            for l in fp: # read the quality
+                seqs.append(l[:-1])
+                leng += len(l) - 1
+                if leng >= len(seq): # have read enough quality
+                    last = None
+                    yield name, seq, [ord(c) - PHRED_OFFSET for c in ''.join(seqs)]; # yield a fastq record
+                    break
+            if last: # reach EOF before reading enough quality
+                yield name, seq, [] # yield a fasta record instead
+                break
+
+#from qa_core import qa_test, new_output_file, open_input, readfq, qa_params
+
+qa_tests = OrderedDict()
+
+qa_params = OrderedDict()
+
+qa_core = imp.new_module("qa_core")
+sys.modules['qa_core'] = qa_core
+
+def qa_test(func=None, disable=False, profile=True):
+	"QA Test decorator"
+	def qa_test_decorator(qa_func):
+		#define a parameter-binding wrapper
+		@wraps(qa_func)
+		def wrap(namespace):
+			"Use partial application of functions to bind parameters declared in the QA function from a namespace object"
+			monad = qa_func
+			for argument in getargspec(qa_func).args:
+				if hasattr(namespace, argument) and getattr(namespace, argument):
+					monad = partial(monad, **{argument: getattr(namespace, argument)})
+
+			#and then call it
+			return monad()
+
+		wrap.__profile_enabled__ = profile
+		if not disable:
+			# if not name:
+			# 	name = qa_func.__name__
+			qa_tests[qa_func.__name__] = wrap
+		return wrap
+	if func:
+		return qa_test_decorator(func)
+	return qa_test_decorator
+
+qa_core.qa_test = qa_test
+qa_core.readfq = readfq
+qa_core.qa_params = qa_params
+
+@qa_test
+def version():
+	"Collect repository commit id from git"
+	p = os.path.abspath(os.path.dirname(__file__))
+	try:
+		ref = subprocess.check_output("git -C {0} describe".format(p), shell=True).replace("\n", '')
+	except subprocess.CalledProcessError:
+		ref = "+".join(qa_tests.keys())[:255]
+	return ref
+
+@qa_test
+def name(annotation=None):
+	if annotation:
+		return annotation
+
+@contextmanager
+def open_file(name, path='.', mode='w'):
+	with open(os.path.join(path, name), mode) as f:
+		if 'r' in mode and binascii.hexlify(f.read(2)) == b'1f8b':
+			import gzip
+			with gzip.open(os.path.join(path, name), mode) as g:
+				yield g
+		else:
+			f.seek(0)
+			yield f
+
+def suite(namespace, log=logging.getLogger('qa.suite')):
+	for name, test in qa_tests.items():
+			logg = log.getChild(name)
+			try:
+				result = test(namespace)
+				if result is not None:
+					logg.info(result)
+				yield name, result
+			except Exception as e:
+				import traceback
+				logg.error(traceback.format_exc())
+
+
+def main(namespace):
+	if namespace.annotation:
+		qa_params['annotation'] = namespace.annotation
+	parallelism = vars(namespace).get('parallelism', 1)
+	if parallelism > 1:
+		qa_params['map implementation'] = Pool(parallelism).map
+	if namespace.mode not in ('suite', 'profile'):
+		#it's a path to a sandbox job
+		logging.getLogger('qa.input').info("Load job description xml from {}...".format(namespace.mode))
+		tree = xml.parse(os.path.join(namespace.mode, "Input/input.xml"))
+		root = tree.getroot()
+		#logging.getLogger('qa.input').debug(xml.tostring(root).strip())
+		for element in root:
+			val = getattr(namespace, element.tag)
+			if isinstance(val, list):
+				val.append(element.text)
+			else:
+				setattr(namespace, element.tag, element.text)
+		#set up utility functions
+		path = os.path.join(namespace.mode, 'Output')
+		try:
+			os.makedirs(path)
+		except OSError as e:
+			if e.errno != errno.EEXIST:
+				raise
+		except Exception as e:
+			logging.getLogger('qa.setup').error(e)
+		qa_core.new_output_file = partial(open_file, path=path)
+		#qa_core.new_output_file = lambda f: open_file(f, path=path)
+		qa_core.open_input = partial(open_file, path=os.path.join(namespace.mode, "Input"), mode='r')
+		#import and run tests
+		logging.getLogger('qa.tests').info("Loading and running tests...")
+		import gims_qa_tests
+		root = xml.Element("qaQcCoverageOutput")
+		tree = xml.ElementTree(element=root)
+		[logging.getLogger('qa.params').info("{}:{}".format(k,v)) for k,v in vars(namespace).items()]
+		for i, (name, result) in enumerate(suite(namespace)):
+			if isinstance(result, xml.Element):
+				root.append(result)
+			else:
+				if result:
+					xml.SubElement(root, name).text = str(result)
+
+		try:
+			with qa_core.new_output_file('output.xml') as out:
+				d = dom.parseString(xml.tostring(root))
+				d.writexml(out, addindent='    ', newl='\n')
+			logging.getLogger('qa.results').info("Finished tests and wrote to {}".format(namespace.mode))
+		except Exception as e:
+			logging.getLogger('qa.results').error(e)
+	elif namespace.mode == 'suite':
+		qa_core.open_input = partial(open_file, mode='rb')
+		qa_core.new_output_file = open_file
+		if not namespace.output_plots:
+			qa_params['plots'] = False
+		else:
+			#set up directory for plots
+			pass
+		import gims_qa_tests
+		def fix_value(result):
+			if isinstance(result, xml.Element):
+				return xml.tostring(result)
+			return result
+		results = {name:fix_value(result) for name, result in suite(namespace)}
+		import csv
+		wtr = csv.DictWriter(namespace.output, delimiter='\t', fieldnames=sorted(results.keys()))
+		wtr.writeheader()
+		wtr.writerow(results)
+	elif namespace.mode == 'profile':
+		return profile(namespace)
+	
+
+def profile(namespace):
+	log = logging.getLogger('qa.profile')
+	import cProfile
+	import StringIO
+	from Bio.Seq import Seq
+	from Bio.SeqRecord import SeqRecord
+	from Bio.Alphabet import IUPAC
+	import random
+	from random import choice as c
+	from tempfile import TemporaryFile as tmp
+
+	qa_params['plots'] = False
+
+	profile_fastq = StringIO.StringIO()
+	log.info("Setup...")
+	#create some fake FASTQ 'reads'
+	for r in range(10000):
+		#boring random distribution
+		#s = "".join([random.choice(('A', 'T', 'G', 'C')) for i in range(300)])
+		#interesting diamond distribution
+		s = "".join([c(c(('A', 'C')) * (300-i) + c(('G','T'))*i) for i in range(150)])
+		s += "".join([c(c(('A', 'C')) * 150 + c(('G','T'))*(150-i)) for i in range(150)])
+
+		q = [int(max(0, min((random.normalvariate(15, 10), 62)))) for i in range(300)]
+		rec = SeqRecord(Seq(s, IUPAC.unambiguous_dna), id="read_{}".format(r))
+		rec.letter_annotations['phred_quality'] = q
+		profile_fastq.write(rec.format('fastq'))
+	@contextmanager
+	def input_stub(filename, mode=None):
+		if 'xml' in filename:
+			profile_xml.seek(0)
+			yield profile_xml
+		elif 'fastq' in filename:
+			profile_fastq.seek(0)
+			yield profile_fastq
+		else:
+			yield open('/dev/null', 'r')
+			
+	qa_core.open_input = input_stub
+	profiling_buffer = StringIO.StringIO()
+
+	@contextmanager
+	def get_profiling_buffer(*a, **k):
+		yield profiling_buffer
+		profiling_buffer.seek(0)
+		
+	qa_core.new_output_file = get_profiling_buffer
+	import gims_qa_tests
+
+	namespace.sequenceFileName = ('forward.fastq', 'reverse.fastq')
+	namespace.sequencingType = 'Illumina MiSeq'
+	namespace.approximateGenomeSize = 1000000
+
+	def run_tests(namespace, log):
+		log.getChild('run')
+		log.info(vars(namespace))
+		for name, test in qa_tests.items():
+			if test.__profile_enabled__:
+				test(namespace)
+	log.info("Profiling...")
+	return cProfile.runctx('run_tests(namespace, log)', None, locals(), sort='tottime')
+
+if __name__ == '__main__':
+	
+	parser = argparse.ArgumentParser()
+	parser.add_argument("mode")
+	parser.add_argument('sequenceFileName', nargs='*')
+	parser.add_argument('--genus','-g')
+	parser.add_argument('--species', '-s')
+	parser.add_argument('--sequencingType', '-t')
+	parser.add_argument('--approximateGenomeSize','--gs', type=int, default=4500000)
+	parser.add_argument('--coverageThreshold', '-c', type=int, default=20)
+	parser.add_argument('--minimumQScore', '-q', type=float, default=27.0)
+	parser.add_argument('--databasePath', '--db')
+	parser.add_argument('--log', '-l', type=argparse.FileType('w'), default=sys.stdout)
+	parser.add_argument('--output', '-o', type=argparse.FileType('w'), default=sys.stdout)
+	parser.add_argument('--output_plots')
+	parser.add_argument('--parallelism', '-p', type=int, default=1)
+	parser.add_argument('--annotate', '-a', type=str, default=None, dest='annotation')
+	params = parser.parse_args()
+	logging.basicConfig(format='%(name)s\t%(levelname)s:%(message)s', level=logging.DEBUG, stream = params.log)
+	logging.getLogger('qa.cmd').info(" ".join(sys.argv))
+	main(params)
\ No newline at end of file