jpayne@0
|
1 #! /usr/bin/env python
|
jpayne@0
|
2
|
jpayne@0
|
3 import argparse
|
jpayne@0
|
4 import binascii
|
jpayne@0
|
5 from collections import OrderedDict
|
jpayne@0
|
6 from contextlib import contextmanager
|
jpayne@0
|
7 import xml.dom.minidom as dom
|
jpayne@0
|
8 from xml.etree import ElementTree as xml
|
jpayne@0
|
9 from functools import partial, wraps
|
jpayne@0
|
10 from multiprocessing.pool import ThreadPool as Pool
|
jpayne@0
|
11 from inspect import getargspec
|
jpayne@0
|
12 import os, os.path, sys, errno
|
jpayne@0
|
13 import imp
|
jpayne@0
|
14 import subprocess
|
jpayne@0
|
15 import logging
|
jpayne@0
|
16
|
jpayne@0
|
17 ver = "unknown"
|
jpayne@0
|
18
|
jpayne@0
|
19
|
jpayne@0
|
20 usage = """ CFSAN Genomics QA Core
|
jpayne@0
|
21 May 15, 2015
|
jpayne@0
|
22 Justin Payne
|
jpayne@0
|
23 justin.payne@fda.hhs.gov
|
jpayne@0
|
24
|
jpayne@0
|
25 usage:
|
jpayne@0
|
26 {0} /path/to/sandbox (production mode)
|
jpayne@0
|
27 {0} {{0}}
|
jpayne@0
|
28
|
jpayne@0
|
29 """.format(os.path.basename(__file__))
|
jpayne@0
|
30
|
jpayne@0
|
31 PHRED_OFFSET = 33
|
jpayne@0
|
32
|
jpayne@0
|
33 input_subdir = 'Input'
|
jpayne@0
|
34 output_subdir = 'Output'
|
jpayne@0
|
35
|
jpayne@0
|
36 #Heng Li's readfq implementation
|
jpayne@0
|
37 #https://github.com/lh3/readfq/blob/master/readfq.py
|
jpayne@0
|
38 def readfq(fp): # this is a generator function
|
jpayne@0
|
39 last = None # this is a buffer keeping the last unprocessed line
|
jpayne@0
|
40 while True: # mimic closure; is it a bad idea?
|
jpayne@0
|
41 if not last: # the first record or a record following a fastq
|
jpayne@0
|
42 for l in fp: # search for the start of the next record
|
jpayne@0
|
43 if l[0] in '>@': # fasta/q header line
|
jpayne@0
|
44 last = l[:-1] # save this line
|
jpayne@0
|
45 break
|
jpayne@0
|
46 if not last: break
|
jpayne@0
|
47 name, seqs, last = last[1:].partition(" ")[0], [], None
|
jpayne@0
|
48 for l in fp: # read the sequence
|
jpayne@0
|
49 if l[0] in '@+>':
|
jpayne@0
|
50 last = l[:-1]
|
jpayne@0
|
51 break
|
jpayne@0
|
52 seqs.append(l[:-1])
|
jpayne@0
|
53 if not last or last[0] != '+': # this is a fasta record
|
jpayne@0
|
54 yield name, ''.join(seqs), [] # yield a fasta record
|
jpayne@0
|
55 if not last: break
|
jpayne@0
|
56 else: # this is a fastq record
|
jpayne@0
|
57 seq, leng, seqs = ''.join(seqs), 0, []
|
jpayne@0
|
58 for l in fp: # read the quality
|
jpayne@0
|
59 seqs.append(l[:-1])
|
jpayne@0
|
60 leng += len(l) - 1
|
jpayne@0
|
61 if leng >= len(seq): # have read enough quality
|
jpayne@0
|
62 last = None
|
jpayne@0
|
63 yield name, seq, [ord(c) - PHRED_OFFSET for c in ''.join(seqs)]; # yield a fastq record
|
jpayne@0
|
64 break
|
jpayne@0
|
65 if last: # reach EOF before reading enough quality
|
jpayne@0
|
66 yield name, seq, [] # yield a fasta record instead
|
jpayne@0
|
67 break
|
jpayne@0
|
68
|
jpayne@0
|
69 #from qa_core import qa_test, new_output_file, open_input, readfq, qa_params
|
jpayne@0
|
70
|
jpayne@0
|
71 qa_tests = OrderedDict()
|
jpayne@0
|
72
|
jpayne@0
|
73 qa_params = OrderedDict()
|
jpayne@0
|
74
|
jpayne@0
|
75 qa_core = imp.new_module("qa_core")
|
jpayne@0
|
76 sys.modules['qa_core'] = qa_core
|
jpayne@0
|
77
|
jpayne@0
|
78 def qa_test(func=None, disable=False, profile=True):
|
jpayne@0
|
79 "QA Test decorator"
|
jpayne@0
|
80 def qa_test_decorator(qa_func):
|
jpayne@0
|
81 #define a parameter-binding wrapper
|
jpayne@0
|
82 @wraps(qa_func)
|
jpayne@0
|
83 def wrap(namespace):
|
jpayne@0
|
84 "Use partial application of functions to bind parameters declared in the QA function from a namespace object"
|
jpayne@0
|
85 monad = qa_func
|
jpayne@0
|
86 for argument in getargspec(qa_func).args:
|
jpayne@0
|
87 if hasattr(namespace, argument) and getattr(namespace, argument):
|
jpayne@0
|
88 monad = partial(monad, **{argument: getattr(namespace, argument)})
|
jpayne@0
|
89
|
jpayne@0
|
90 #and then call it
|
jpayne@0
|
91 return monad()
|
jpayne@0
|
92
|
jpayne@0
|
93 wrap.__profile_enabled__ = profile
|
jpayne@0
|
94 if not disable:
|
jpayne@0
|
95 # if not name:
|
jpayne@0
|
96 # name = qa_func.__name__
|
jpayne@0
|
97 qa_tests[qa_func.__name__] = wrap
|
jpayne@0
|
98 return wrap
|
jpayne@0
|
99 if func:
|
jpayne@0
|
100 return qa_test_decorator(func)
|
jpayne@0
|
101 return qa_test_decorator
|
jpayne@0
|
102
|
jpayne@0
|
103 qa_core.qa_test = qa_test
|
jpayne@0
|
104 qa_core.readfq = readfq
|
jpayne@0
|
105 qa_core.qa_params = qa_params
|
jpayne@0
|
106
|
jpayne@0
|
107 @qa_test
|
jpayne@0
|
108 def version():
|
jpayne@0
|
109 "Collect repository commit id from git"
|
jpayne@0
|
110 p = os.path.abspath(os.path.dirname(__file__))
|
jpayne@0
|
111 try:
|
jpayne@0
|
112 ref = subprocess.check_output("git -C {0} describe".format(p), shell=True).replace("\n", '')
|
jpayne@0
|
113 except subprocess.CalledProcessError:
|
jpayne@0
|
114 ref = "+".join(qa_tests.keys())[:255]
|
jpayne@0
|
115 return ref
|
jpayne@0
|
116
|
jpayne@0
|
117 @qa_test
|
jpayne@0
|
118 def name(annotation=None):
|
jpayne@0
|
119 if annotation:
|
jpayne@0
|
120 return annotation
|
jpayne@0
|
121
|
jpayne@0
|
122 @contextmanager
|
jpayne@0
|
123 def open_file(name, path='.', mode='w'):
|
jpayne@0
|
124 with open(os.path.join(path, name), mode) as f:
|
jpayne@0
|
125 if 'r' in mode and binascii.hexlify(f.read(2)) == b'1f8b':
|
jpayne@0
|
126 import gzip
|
jpayne@0
|
127 with gzip.open(os.path.join(path, name), mode) as g:
|
jpayne@0
|
128 yield g
|
jpayne@0
|
129 else:
|
jpayne@0
|
130 f.seek(0)
|
jpayne@0
|
131 yield f
|
jpayne@0
|
132
|
jpayne@0
|
133 def suite(namespace, log=logging.getLogger('qa.suite')):
|
jpayne@0
|
134 for name, test in qa_tests.items():
|
jpayne@0
|
135 logg = log.getChild(name)
|
jpayne@0
|
136 try:
|
jpayne@0
|
137 result = test(namespace)
|
jpayne@0
|
138 if result is not None:
|
jpayne@0
|
139 logg.info(result)
|
jpayne@0
|
140 yield name, result
|
jpayne@0
|
141 except Exception as e:
|
jpayne@0
|
142 import traceback
|
jpayne@0
|
143 logg.error(traceback.format_exc())
|
jpayne@0
|
144
|
jpayne@0
|
145
|
jpayne@0
|
146 def main(namespace):
|
jpayne@0
|
147 if namespace.annotation:
|
jpayne@0
|
148 qa_params['annotation'] = namespace.annotation
|
jpayne@0
|
149 parallelism = vars(namespace).get('parallelism', 1)
|
jpayne@0
|
150 if parallelism > 1:
|
jpayne@0
|
151 qa_params['map implementation'] = Pool(parallelism).map
|
jpayne@0
|
152 if namespace.mode not in ('suite', 'profile'):
|
jpayne@0
|
153 #it's a path to a sandbox job
|
jpayne@0
|
154 logging.getLogger('qa.input').info("Load job description xml from {}...".format(namespace.mode))
|
jpayne@0
|
155 tree = xml.parse(os.path.join(namespace.mode, "Input/input.xml"))
|
jpayne@0
|
156 root = tree.getroot()
|
jpayne@0
|
157 #logging.getLogger('qa.input').debug(xml.tostring(root).strip())
|
jpayne@0
|
158 for element in root:
|
jpayne@0
|
159 val = getattr(namespace, element.tag)
|
jpayne@0
|
160 if isinstance(val, list):
|
jpayne@0
|
161 val.append(element.text)
|
jpayne@0
|
162 else:
|
jpayne@0
|
163 setattr(namespace, element.tag, element.text)
|
jpayne@0
|
164 #set up utility functions
|
jpayne@0
|
165 path = os.path.join(namespace.mode, 'Output')
|
jpayne@0
|
166 try:
|
jpayne@0
|
167 os.makedirs(path)
|
jpayne@0
|
168 except OSError as e:
|
jpayne@0
|
169 if e.errno != errno.EEXIST:
|
jpayne@0
|
170 raise
|
jpayne@0
|
171 except Exception as e:
|
jpayne@0
|
172 logging.getLogger('qa.setup').error(e)
|
jpayne@0
|
173 qa_core.new_output_file = partial(open_file, path=path)
|
jpayne@0
|
174 #qa_core.new_output_file = lambda f: open_file(f, path=path)
|
jpayne@0
|
175 qa_core.open_input = partial(open_file, path=os.path.join(namespace.mode, "Input"), mode='r')
|
jpayne@0
|
176 #import and run tests
|
jpayne@0
|
177 logging.getLogger('qa.tests').info("Loading and running tests...")
|
jpayne@0
|
178 import gims_qa_tests
|
jpayne@0
|
179 root = xml.Element("qaQcCoverageOutput")
|
jpayne@0
|
180 tree = xml.ElementTree(element=root)
|
jpayne@0
|
181 [logging.getLogger('qa.params').info("{}:{}".format(k,v)) for k,v in vars(namespace).items()]
|
jpayne@0
|
182 for i, (name, result) in enumerate(suite(namespace)):
|
jpayne@0
|
183 if isinstance(result, xml.Element):
|
jpayne@0
|
184 root.append(result)
|
jpayne@0
|
185 else:
|
jpayne@0
|
186 if result:
|
jpayne@0
|
187 xml.SubElement(root, name).text = str(result)
|
jpayne@0
|
188
|
jpayne@0
|
189 try:
|
jpayne@0
|
190 with qa_core.new_output_file('output.xml') as out:
|
jpayne@0
|
191 d = dom.parseString(xml.tostring(root))
|
jpayne@0
|
192 d.writexml(out, addindent=' ', newl='\n')
|
jpayne@0
|
193 logging.getLogger('qa.results').info("Finished tests and wrote to {}".format(namespace.mode))
|
jpayne@0
|
194 except Exception as e:
|
jpayne@0
|
195 logging.getLogger('qa.results').error(e)
|
jpayne@0
|
196 elif namespace.mode == 'suite':
|
jpayne@0
|
197 qa_core.open_input = partial(open_file, mode='rb')
|
jpayne@0
|
198 qa_core.new_output_file = open_file
|
jpayne@0
|
199 if not namespace.output_plots:
|
jpayne@0
|
200 qa_params['plots'] = False
|
jpayne@0
|
201 else:
|
jpayne@0
|
202 #set up directory for plots
|
jpayne@0
|
203 pass
|
jpayne@0
|
204 import gims_qa_tests
|
jpayne@0
|
205 def fix_value(result):
|
jpayne@0
|
206 if isinstance(result, xml.Element):
|
jpayne@0
|
207 return xml.tostring(result)
|
jpayne@0
|
208 return result
|
jpayne@0
|
209 results = {name:fix_value(result) for name, result in suite(namespace)}
|
jpayne@0
|
210 import csv
|
jpayne@0
|
211 wtr = csv.DictWriter(namespace.output, delimiter='\t', fieldnames=sorted(results.keys()))
|
jpayne@0
|
212 wtr.writeheader()
|
jpayne@0
|
213 wtr.writerow(results)
|
jpayne@0
|
214 elif namespace.mode == 'profile':
|
jpayne@0
|
215 return profile(namespace)
|
jpayne@0
|
216
|
jpayne@0
|
217
|
jpayne@0
|
218 def profile(namespace):
|
jpayne@0
|
219 log = logging.getLogger('qa.profile')
|
jpayne@0
|
220 import cProfile
|
jpayne@0
|
221 import StringIO
|
jpayne@0
|
222 from Bio.Seq import Seq
|
jpayne@0
|
223 from Bio.SeqRecord import SeqRecord
|
jpayne@0
|
224 from Bio.Alphabet import IUPAC
|
jpayne@0
|
225 import random
|
jpayne@0
|
226 from random import choice as c
|
jpayne@0
|
227 from tempfile import TemporaryFile as tmp
|
jpayne@0
|
228
|
jpayne@0
|
229 qa_params['plots'] = False
|
jpayne@0
|
230
|
jpayne@0
|
231 profile_fastq = StringIO.StringIO()
|
jpayne@0
|
232 log.info("Setup...")
|
jpayne@0
|
233 #create some fake FASTQ 'reads'
|
jpayne@0
|
234 for r in range(10000):
|
jpayne@0
|
235 #boring random distribution
|
jpayne@0
|
236 #s = "".join([random.choice(('A', 'T', 'G', 'C')) for i in range(300)])
|
jpayne@0
|
237 #interesting diamond distribution
|
jpayne@0
|
238 s = "".join([c(c(('A', 'C')) * (300-i) + c(('G','T'))*i) for i in range(150)])
|
jpayne@0
|
239 s += "".join([c(c(('A', 'C')) * 150 + c(('G','T'))*(150-i)) for i in range(150)])
|
jpayne@0
|
240
|
jpayne@0
|
241 q = [int(max(0, min((random.normalvariate(15, 10), 62)))) for i in range(300)]
|
jpayne@0
|
242 rec = SeqRecord(Seq(s, IUPAC.unambiguous_dna), id="read_{}".format(r))
|
jpayne@0
|
243 rec.letter_annotations['phred_quality'] = q
|
jpayne@0
|
244 profile_fastq.write(rec.format('fastq'))
|
jpayne@0
|
245 @contextmanager
|
jpayne@0
|
246 def input_stub(filename, mode=None):
|
jpayne@0
|
247 if 'xml' in filename:
|
jpayne@0
|
248 profile_xml.seek(0)
|
jpayne@0
|
249 yield profile_xml
|
jpayne@0
|
250 elif 'fastq' in filename:
|
jpayne@0
|
251 profile_fastq.seek(0)
|
jpayne@0
|
252 yield profile_fastq
|
jpayne@0
|
253 else:
|
jpayne@0
|
254 yield open('/dev/null', 'r')
|
jpayne@0
|
255
|
jpayne@0
|
256 qa_core.open_input = input_stub
|
jpayne@0
|
257 profiling_buffer = StringIO.StringIO()
|
jpayne@0
|
258
|
jpayne@0
|
259 @contextmanager
|
jpayne@0
|
260 def get_profiling_buffer(*a, **k):
|
jpayne@0
|
261 yield profiling_buffer
|
jpayne@0
|
262 profiling_buffer.seek(0)
|
jpayne@0
|
263
|
jpayne@0
|
264 qa_core.new_output_file = get_profiling_buffer
|
jpayne@0
|
265 import gims_qa_tests
|
jpayne@0
|
266
|
jpayne@0
|
267 namespace.sequenceFileName = ('forward.fastq', 'reverse.fastq')
|
jpayne@0
|
268 namespace.sequencingType = 'Illumina MiSeq'
|
jpayne@0
|
269 namespace.approximateGenomeSize = 1000000
|
jpayne@0
|
270
|
jpayne@0
|
271 def run_tests(namespace, log):
|
jpayne@0
|
272 log.getChild('run')
|
jpayne@0
|
273 log.info(vars(namespace))
|
jpayne@0
|
274 for name, test in qa_tests.items():
|
jpayne@0
|
275 if test.__profile_enabled__:
|
jpayne@0
|
276 test(namespace)
|
jpayne@0
|
277 log.info("Profiling...")
|
jpayne@0
|
278 return cProfile.runctx('run_tests(namespace, log)', None, locals(), sort='tottime')
|
jpayne@0
|
279
|
jpayne@0
|
280 if __name__ == '__main__':
|
jpayne@0
|
281
|
jpayne@0
|
282 parser = argparse.ArgumentParser()
|
jpayne@0
|
283 parser.add_argument("mode")
|
jpayne@0
|
284 parser.add_argument('sequenceFileName', nargs='*')
|
jpayne@0
|
285 parser.add_argument('--genus','-g')
|
jpayne@0
|
286 parser.add_argument('--species', '-s')
|
jpayne@0
|
287 parser.add_argument('--sequencingType', '-t')
|
jpayne@0
|
288 parser.add_argument('--approximateGenomeSize','--gs', type=int, default=4500000)
|
jpayne@0
|
289 parser.add_argument('--coverageThreshold', '-c', type=int, default=20)
|
jpayne@0
|
290 parser.add_argument('--minimumQScore', '-q', type=float, default=27.0)
|
jpayne@0
|
291 parser.add_argument('--databasePath', '--db')
|
jpayne@0
|
292 parser.add_argument('--log', '-l', type=argparse.FileType('w'), default=sys.stdout)
|
jpayne@0
|
293 parser.add_argument('--output', '-o', type=argparse.FileType('w'), default=sys.stdout)
|
jpayne@0
|
294 parser.add_argument('--output_plots')
|
jpayne@0
|
295 parser.add_argument('--parallelism', '-p', type=int, default=1)
|
jpayne@0
|
296 parser.add_argument('--annotate', '-a', type=str, default=None, dest='annotation')
|
jpayne@0
|
297 params = parser.parse_args()
|
jpayne@0
|
298 logging.basicConfig(format='%(name)s\t%(levelname)s:%(message)s', level=logging.DEBUG, stream = params.log)
|
jpayne@0
|
299 logging.getLogger('qa.cmd').info(" ".join(sys.argv))
|
jpayne@0
|
300 main(params) |