view qa_core/gims_core.py @ 2:021864b5a0a5 tip

planemo upload
author jpayne
date Fri, 11 May 2018 10:25:14 -0400
parents dafec28bd37e
children
line wrap: on
line source
#! /usr/bin/env python

import argparse
import binascii
from collections import OrderedDict
from contextlib import contextmanager
import xml.dom.minidom as dom
from xml.etree import ElementTree as xml
from functools import partial, wraps
from multiprocessing.pool import ThreadPool as Pool
from inspect import getargspec
import os, os.path, sys, errno
import imp
import subprocess
import logging

ver = "unknown"


usage = """ CFSAN Genomics QA Core
May 15, 2015
Justin Payne
justin.payne@fda.hhs.gov

	usage:
	{0} /path/to/sandbox (production mode)
	{0} {{0}}

""".format(os.path.basename(__file__))

PHRED_OFFSET = 33

input_subdir = 'Input'
output_subdir = 'Output'

#Heng Li's readfq implementation
#https://github.com/lh3/readfq/blob/master/readfq.py
def readfq(fp): # this is a generator function
    last = None # this is a buffer keeping the last unprocessed line
    while True: # mimic closure; is it a bad idea?
        if not last: # the first record or a record following a fastq
            for l in fp: # search for the start of the next record
                if l[0] in '>@': # fasta/q header line
                    last = l[:-1] # save this line
                    break
        if not last: break
        name, seqs, last = last[1:].partition(" ")[0], [], None
        for l in fp: # read the sequence
            if l[0] in '@+>':
                last = l[:-1]
                break
            seqs.append(l[:-1])
        if not last or last[0] != '+': # this is a fasta record
            yield name, ''.join(seqs), [] # yield a fasta record
            if not last: break
        else: # this is a fastq record
            seq, leng, seqs = ''.join(seqs), 0, []
            for l in fp: # read the quality
                seqs.append(l[:-1])
                leng += len(l) - 1
                if leng >= len(seq): # have read enough quality
                    last = None
                    yield name, seq, [ord(c) - PHRED_OFFSET for c in ''.join(seqs)]; # yield a fastq record
                    break
            if last: # reach EOF before reading enough quality
                yield name, seq, [] # yield a fasta record instead
                break

#from qa_core import qa_test, new_output_file, open_input, readfq, qa_params

qa_tests = OrderedDict()

qa_params = OrderedDict()

qa_core = imp.new_module("qa_core")
sys.modules['qa_core'] = qa_core

def qa_test(func=None, disable=False, profile=True):
	"QA Test decorator"
	def qa_test_decorator(qa_func):
		#define a parameter-binding wrapper
		@wraps(qa_func)
		def wrap(namespace):
			"Use partial application of functions to bind parameters declared in the QA function from a namespace object"
			monad = qa_func
			for argument in getargspec(qa_func).args:
				if hasattr(namespace, argument) and getattr(namespace, argument):
					monad = partial(monad, **{argument: getattr(namespace, argument)})

			#and then call it
			return monad()

		wrap.__profile_enabled__ = profile
		if not disable:
			# if not name:
			# 	name = qa_func.__name__
			qa_tests[qa_func.__name__] = wrap
		return wrap
	if func:
		return qa_test_decorator(func)
	return qa_test_decorator

qa_core.qa_test = qa_test
qa_core.readfq = readfq
qa_core.qa_params = qa_params

@qa_test
def version():
	"Collect repository commit id from git"
	p = os.path.abspath(os.path.dirname(__file__))
	try:
		ref = subprocess.check_output("git -C {0} describe".format(p), shell=True).replace("\n", '')
	except subprocess.CalledProcessError:
		ref = "+".join(qa_tests.keys())[:255]
	return ref

@qa_test
def name(annotation=None):
	if annotation:
		return annotation

@contextmanager
def open_file(name, path='.', mode='w'):
	with open(os.path.join(path, name), mode) as f:
		if 'r' in mode and binascii.hexlify(f.read(2)) == b'1f8b':
			import gzip
			with gzip.open(os.path.join(path, name), mode) as g:
				yield g
		else:
			f.seek(0)
			yield f

def suite(namespace, log=logging.getLogger('qa.suite')):
	for name, test in qa_tests.items():
			logg = log.getChild(name)
			try:
				result = test(namespace)
				if result is not None:
					logg.info(result)
				yield name, result
			except Exception as e:
				import traceback
				logg.error(traceback.format_exc())


def main(namespace):
	if namespace.annotation:
		qa_params['annotation'] = namespace.annotation
	parallelism = vars(namespace).get('parallelism', 1)
	if parallelism > 1:
		qa_params['map implementation'] = Pool(parallelism).map
	if namespace.mode not in ('suite', 'profile'):
		#it's a path to a sandbox job
		logging.getLogger('qa.input').info("Load job description xml from {}...".format(namespace.mode))
		tree = xml.parse(os.path.join(namespace.mode, "Input/input.xml"))
		root = tree.getroot()
		#logging.getLogger('qa.input').debug(xml.tostring(root).strip())
		for element in root:
			val = getattr(namespace, element.tag)
			if isinstance(val, list):
				val.append(element.text)
			else:
				setattr(namespace, element.tag, element.text)
		#set up utility functions
		path = os.path.join(namespace.mode, 'Output')
		try:
			os.makedirs(path)
		except OSError as e:
			if e.errno != errno.EEXIST:
				raise
		except Exception as e:
			logging.getLogger('qa.setup').error(e)
		qa_core.new_output_file = partial(open_file, path=path)
		#qa_core.new_output_file = lambda f: open_file(f, path=path)
		qa_core.open_input = partial(open_file, path=os.path.join(namespace.mode, "Input"), mode='r')
		#import and run tests
		logging.getLogger('qa.tests').info("Loading and running tests...")
		import gims_qa_tests
		root = xml.Element("qaQcCoverageOutput")
		tree = xml.ElementTree(element=root)
		[logging.getLogger('qa.params').info("{}:{}".format(k,v)) for k,v in vars(namespace).items()]
		for i, (name, result) in enumerate(suite(namespace)):
			if isinstance(result, xml.Element):
				root.append(result)
			else:
				if result:
					xml.SubElement(root, name).text = str(result)

		try:
			with qa_core.new_output_file('output.xml') as out:
				d = dom.parseString(xml.tostring(root))
				d.writexml(out, addindent='    ', newl='\n')
			logging.getLogger('qa.results').info("Finished tests and wrote to {}".format(namespace.mode))
		except Exception as e:
			logging.getLogger('qa.results').error(e)
	elif namespace.mode == 'suite':
		qa_core.open_input = partial(open_file, mode='rb')
		qa_core.new_output_file = open_file
		if not namespace.output_plots:
			qa_params['plots'] = False
		else:
			#set up directory for plots
			pass
		import gims_qa_tests
		def fix_value(result):
			if isinstance(result, xml.Element):
				return xml.tostring(result)
			return result
		results = {name:fix_value(result) for name, result in suite(namespace)}
		import csv
		wtr = csv.DictWriter(namespace.output, delimiter='\t', fieldnames=sorted(results.keys()))
		wtr.writeheader()
		wtr.writerow(results)
	elif namespace.mode == 'profile':
		return profile(namespace)
	

def profile(namespace):
	log = logging.getLogger('qa.profile')
	import cProfile
	import StringIO
	from Bio.Seq import Seq
	from Bio.SeqRecord import SeqRecord
	from Bio.Alphabet import IUPAC
	import random
	from random import choice as c
	from tempfile import TemporaryFile as tmp

	qa_params['plots'] = False

	profile_fastq = StringIO.StringIO()
	log.info("Setup...")
	#create some fake FASTQ 'reads'
	for r in range(10000):
		#boring random distribution
		#s = "".join([random.choice(('A', 'T', 'G', 'C')) for i in range(300)])
		#interesting diamond distribution
		s = "".join([c(c(('A', 'C')) * (300-i) + c(('G','T'))*i) for i in range(150)])
		s += "".join([c(c(('A', 'C')) * 150 + c(('G','T'))*(150-i)) for i in range(150)])

		q = [int(max(0, min((random.normalvariate(15, 10), 62)))) for i in range(300)]
		rec = SeqRecord(Seq(s, IUPAC.unambiguous_dna), id="read_{}".format(r))
		rec.letter_annotations['phred_quality'] = q
		profile_fastq.write(rec.format('fastq'))
	@contextmanager
	def input_stub(filename, mode=None):
		if 'xml' in filename:
			profile_xml.seek(0)
			yield profile_xml
		elif 'fastq' in filename:
			profile_fastq.seek(0)
			yield profile_fastq
		else:
			yield open('/dev/null', 'r')
			
	qa_core.open_input = input_stub
	profiling_buffer = StringIO.StringIO()

	@contextmanager
	def get_profiling_buffer(*a, **k):
		yield profiling_buffer
		profiling_buffer.seek(0)
		
	qa_core.new_output_file = get_profiling_buffer
	import gims_qa_tests

	namespace.sequenceFileName = ('forward.fastq', 'reverse.fastq')
	namespace.sequencingType = 'Illumina MiSeq'
	namespace.approximateGenomeSize = 1000000

	def run_tests(namespace, log):
		log.getChild('run')
		log.info(vars(namespace))
		for name, test in qa_tests.items():
			if test.__profile_enabled__:
				test(namespace)
	log.info("Profiling...")
	return cProfile.runctx('run_tests(namespace, log)', None, locals(), sort='tottime')

if __name__ == '__main__':
	
	parser = argparse.ArgumentParser()
	parser.add_argument("mode")
	parser.add_argument('sequenceFileName', nargs='*')
	parser.add_argument('--genus','-g')
	parser.add_argument('--species', '-s')
	parser.add_argument('--sequencingType', '-t')
	parser.add_argument('--approximateGenomeSize','--gs', type=int, default=4500000)
	parser.add_argument('--coverageThreshold', '-c', type=int, default=20)
	parser.add_argument('--minimumQScore', '-q', type=float, default=27.0)
	parser.add_argument('--databasePath', '--db')
	parser.add_argument('--log', '-l', type=argparse.FileType('w'), default=sys.stdout)
	parser.add_argument('--output', '-o', type=argparse.FileType('w'), default=sys.stdout)
	parser.add_argument('--output_plots')
	parser.add_argument('--parallelism', '-p', type=int, default=1)
	parser.add_argument('--annotate', '-a', type=str, default=None, dest='annotation')
	params = parser.parse_args()
	logging.basicConfig(format='%(name)s\t%(levelname)s:%(message)s', level=logging.DEBUG, stream = params.log)
	logging.getLogger('qa.cmd').info(" ".join(sys.argv))
	main(params)