annotate qa_core/gims_core.py @ 2:021864b5a0a5 tip

planemo upload
author jpayne
date Fri, 11 May 2018 10:25:14 -0400
parents dafec28bd37e
children
rev   line source
jpayne@0 1 #! /usr/bin/env python
jpayne@0 2
jpayne@0 3 import argparse
jpayne@0 4 import binascii
jpayne@0 5 from collections import OrderedDict
jpayne@0 6 from contextlib import contextmanager
jpayne@0 7 import xml.dom.minidom as dom
jpayne@0 8 from xml.etree import ElementTree as xml
jpayne@0 9 from functools import partial, wraps
jpayne@0 10 from multiprocessing.pool import ThreadPool as Pool
jpayne@0 11 from inspect import getargspec
jpayne@0 12 import os, os.path, sys, errno
jpayne@0 13 import imp
jpayne@0 14 import subprocess
jpayne@0 15 import logging
jpayne@0 16
jpayne@0 17 ver = "unknown"
jpayne@0 18
jpayne@0 19
jpayne@0 20 usage = """ CFSAN Genomics QA Core
jpayne@0 21 May 15, 2015
jpayne@0 22 Justin Payne
jpayne@0 23 justin.payne@fda.hhs.gov
jpayne@0 24
jpayne@0 25 usage:
jpayne@0 26 {0} /path/to/sandbox (production mode)
jpayne@0 27 {0} {{0}}
jpayne@0 28
jpayne@0 29 """.format(os.path.basename(__file__))
jpayne@0 30
jpayne@0 31 PHRED_OFFSET = 33
jpayne@0 32
jpayne@0 33 input_subdir = 'Input'
jpayne@0 34 output_subdir = 'Output'
jpayne@0 35
jpayne@0 36 #Heng Li's readfq implementation
jpayne@0 37 #https://github.com/lh3/readfq/blob/master/readfq.py
jpayne@0 38 def readfq(fp): # this is a generator function
jpayne@0 39 last = None # this is a buffer keeping the last unprocessed line
jpayne@0 40 while True: # mimic closure; is it a bad idea?
jpayne@0 41 if not last: # the first record or a record following a fastq
jpayne@0 42 for l in fp: # search for the start of the next record
jpayne@0 43 if l[0] in '>@': # fasta/q header line
jpayne@0 44 last = l[:-1] # save this line
jpayne@0 45 break
jpayne@0 46 if not last: break
jpayne@0 47 name, seqs, last = last[1:].partition(" ")[0], [], None
jpayne@0 48 for l in fp: # read the sequence
jpayne@0 49 if l[0] in '@+>':
jpayne@0 50 last = l[:-1]
jpayne@0 51 break
jpayne@0 52 seqs.append(l[:-1])
jpayne@0 53 if not last or last[0] != '+': # this is a fasta record
jpayne@0 54 yield name, ''.join(seqs), [] # yield a fasta record
jpayne@0 55 if not last: break
jpayne@0 56 else: # this is a fastq record
jpayne@0 57 seq, leng, seqs = ''.join(seqs), 0, []
jpayne@0 58 for l in fp: # read the quality
jpayne@0 59 seqs.append(l[:-1])
jpayne@0 60 leng += len(l) - 1
jpayne@0 61 if leng >= len(seq): # have read enough quality
jpayne@0 62 last = None
jpayne@0 63 yield name, seq, [ord(c) - PHRED_OFFSET for c in ''.join(seqs)]; # yield a fastq record
jpayne@0 64 break
jpayne@0 65 if last: # reach EOF before reading enough quality
jpayne@0 66 yield name, seq, [] # yield a fasta record instead
jpayne@0 67 break
jpayne@0 68
jpayne@0 69 #from qa_core import qa_test, new_output_file, open_input, readfq, qa_params
jpayne@0 70
jpayne@0 71 qa_tests = OrderedDict()
jpayne@0 72
jpayne@0 73 qa_params = OrderedDict()
jpayne@0 74
jpayne@0 75 qa_core = imp.new_module("qa_core")
jpayne@0 76 sys.modules['qa_core'] = qa_core
jpayne@0 77
jpayne@0 78 def qa_test(func=None, disable=False, profile=True):
jpayne@0 79 "QA Test decorator"
jpayne@0 80 def qa_test_decorator(qa_func):
jpayne@0 81 #define a parameter-binding wrapper
jpayne@0 82 @wraps(qa_func)
jpayne@0 83 def wrap(namespace):
jpayne@0 84 "Use partial application of functions to bind parameters declared in the QA function from a namespace object"
jpayne@0 85 monad = qa_func
jpayne@0 86 for argument in getargspec(qa_func).args:
jpayne@0 87 if hasattr(namespace, argument) and getattr(namespace, argument):
jpayne@0 88 monad = partial(monad, **{argument: getattr(namespace, argument)})
jpayne@0 89
jpayne@0 90 #and then call it
jpayne@0 91 return monad()
jpayne@0 92
jpayne@0 93 wrap.__profile_enabled__ = profile
jpayne@0 94 if not disable:
jpayne@0 95 # if not name:
jpayne@0 96 # name = qa_func.__name__
jpayne@0 97 qa_tests[qa_func.__name__] = wrap
jpayne@0 98 return wrap
jpayne@0 99 if func:
jpayne@0 100 return qa_test_decorator(func)
jpayne@0 101 return qa_test_decorator
jpayne@0 102
jpayne@0 103 qa_core.qa_test = qa_test
jpayne@0 104 qa_core.readfq = readfq
jpayne@0 105 qa_core.qa_params = qa_params
jpayne@0 106
jpayne@0 107 @qa_test
jpayne@0 108 def version():
jpayne@0 109 "Collect repository commit id from git"
jpayne@0 110 p = os.path.abspath(os.path.dirname(__file__))
jpayne@0 111 try:
jpayne@0 112 ref = subprocess.check_output("git -C {0} describe".format(p), shell=True).replace("\n", '')
jpayne@0 113 except subprocess.CalledProcessError:
jpayne@0 114 ref = "+".join(qa_tests.keys())[:255]
jpayne@0 115 return ref
jpayne@0 116
jpayne@0 117 @qa_test
jpayne@0 118 def name(annotation=None):
jpayne@0 119 if annotation:
jpayne@0 120 return annotation
jpayne@0 121
jpayne@0 122 @contextmanager
jpayne@0 123 def open_file(name, path='.', mode='w'):
jpayne@0 124 with open(os.path.join(path, name), mode) as f:
jpayne@0 125 if 'r' in mode and binascii.hexlify(f.read(2)) == b'1f8b':
jpayne@0 126 import gzip
jpayne@0 127 with gzip.open(os.path.join(path, name), mode) as g:
jpayne@0 128 yield g
jpayne@0 129 else:
jpayne@0 130 f.seek(0)
jpayne@0 131 yield f
jpayne@0 132
jpayne@0 133 def suite(namespace, log=logging.getLogger('qa.suite')):
jpayne@0 134 for name, test in qa_tests.items():
jpayne@0 135 logg = log.getChild(name)
jpayne@0 136 try:
jpayne@0 137 result = test(namespace)
jpayne@0 138 if result is not None:
jpayne@0 139 logg.info(result)
jpayne@0 140 yield name, result
jpayne@0 141 except Exception as e:
jpayne@0 142 import traceback
jpayne@0 143 logg.error(traceback.format_exc())
jpayne@0 144
jpayne@0 145
jpayne@0 146 def main(namespace):
jpayne@0 147 if namespace.annotation:
jpayne@0 148 qa_params['annotation'] = namespace.annotation
jpayne@0 149 parallelism = vars(namespace).get('parallelism', 1)
jpayne@0 150 if parallelism > 1:
jpayne@0 151 qa_params['map implementation'] = Pool(parallelism).map
jpayne@0 152 if namespace.mode not in ('suite', 'profile'):
jpayne@0 153 #it's a path to a sandbox job
jpayne@0 154 logging.getLogger('qa.input').info("Load job description xml from {}...".format(namespace.mode))
jpayne@0 155 tree = xml.parse(os.path.join(namespace.mode, "Input/input.xml"))
jpayne@0 156 root = tree.getroot()
jpayne@0 157 #logging.getLogger('qa.input').debug(xml.tostring(root).strip())
jpayne@0 158 for element in root:
jpayne@0 159 val = getattr(namespace, element.tag)
jpayne@0 160 if isinstance(val, list):
jpayne@0 161 val.append(element.text)
jpayne@0 162 else:
jpayne@0 163 setattr(namespace, element.tag, element.text)
jpayne@0 164 #set up utility functions
jpayne@0 165 path = os.path.join(namespace.mode, 'Output')
jpayne@0 166 try:
jpayne@0 167 os.makedirs(path)
jpayne@0 168 except OSError as e:
jpayne@0 169 if e.errno != errno.EEXIST:
jpayne@0 170 raise
jpayne@0 171 except Exception as e:
jpayne@0 172 logging.getLogger('qa.setup').error(e)
jpayne@0 173 qa_core.new_output_file = partial(open_file, path=path)
jpayne@0 174 #qa_core.new_output_file = lambda f: open_file(f, path=path)
jpayne@0 175 qa_core.open_input = partial(open_file, path=os.path.join(namespace.mode, "Input"), mode='r')
jpayne@0 176 #import and run tests
jpayne@0 177 logging.getLogger('qa.tests').info("Loading and running tests...")
jpayne@0 178 import gims_qa_tests
jpayne@0 179 root = xml.Element("qaQcCoverageOutput")
jpayne@0 180 tree = xml.ElementTree(element=root)
jpayne@0 181 [logging.getLogger('qa.params').info("{}:{}".format(k,v)) for k,v in vars(namespace).items()]
jpayne@0 182 for i, (name, result) in enumerate(suite(namespace)):
jpayne@0 183 if isinstance(result, xml.Element):
jpayne@0 184 root.append(result)
jpayne@0 185 else:
jpayne@0 186 if result:
jpayne@0 187 xml.SubElement(root, name).text = str(result)
jpayne@0 188
jpayne@0 189 try:
jpayne@0 190 with qa_core.new_output_file('output.xml') as out:
jpayne@0 191 d = dom.parseString(xml.tostring(root))
jpayne@0 192 d.writexml(out, addindent=' ', newl='\n')
jpayne@0 193 logging.getLogger('qa.results').info("Finished tests and wrote to {}".format(namespace.mode))
jpayne@0 194 except Exception as e:
jpayne@0 195 logging.getLogger('qa.results').error(e)
jpayne@0 196 elif namespace.mode == 'suite':
jpayne@0 197 qa_core.open_input = partial(open_file, mode='rb')
jpayne@0 198 qa_core.new_output_file = open_file
jpayne@0 199 if not namespace.output_plots:
jpayne@0 200 qa_params['plots'] = False
jpayne@0 201 else:
jpayne@0 202 #set up directory for plots
jpayne@0 203 pass
jpayne@0 204 import gims_qa_tests
jpayne@0 205 def fix_value(result):
jpayne@0 206 if isinstance(result, xml.Element):
jpayne@0 207 return xml.tostring(result)
jpayne@0 208 return result
jpayne@0 209 results = {name:fix_value(result) for name, result in suite(namespace)}
jpayne@0 210 import csv
jpayne@0 211 wtr = csv.DictWriter(namespace.output, delimiter='\t', fieldnames=sorted(results.keys()))
jpayne@0 212 wtr.writeheader()
jpayne@0 213 wtr.writerow(results)
jpayne@0 214 elif namespace.mode == 'profile':
jpayne@0 215 return profile(namespace)
jpayne@0 216
jpayne@0 217
jpayne@0 218 def profile(namespace):
jpayne@0 219 log = logging.getLogger('qa.profile')
jpayne@0 220 import cProfile
jpayne@0 221 import StringIO
jpayne@0 222 from Bio.Seq import Seq
jpayne@0 223 from Bio.SeqRecord import SeqRecord
jpayne@0 224 from Bio.Alphabet import IUPAC
jpayne@0 225 import random
jpayne@0 226 from random import choice as c
jpayne@0 227 from tempfile import TemporaryFile as tmp
jpayne@0 228
jpayne@0 229 qa_params['plots'] = False
jpayne@0 230
jpayne@0 231 profile_fastq = StringIO.StringIO()
jpayne@0 232 log.info("Setup...")
jpayne@0 233 #create some fake FASTQ 'reads'
jpayne@0 234 for r in range(10000):
jpayne@0 235 #boring random distribution
jpayne@0 236 #s = "".join([random.choice(('A', 'T', 'G', 'C')) for i in range(300)])
jpayne@0 237 #interesting diamond distribution
jpayne@0 238 s = "".join([c(c(('A', 'C')) * (300-i) + c(('G','T'))*i) for i in range(150)])
jpayne@0 239 s += "".join([c(c(('A', 'C')) * 150 + c(('G','T'))*(150-i)) for i in range(150)])
jpayne@0 240
jpayne@0 241 q = [int(max(0, min((random.normalvariate(15, 10), 62)))) for i in range(300)]
jpayne@0 242 rec = SeqRecord(Seq(s, IUPAC.unambiguous_dna), id="read_{}".format(r))
jpayne@0 243 rec.letter_annotations['phred_quality'] = q
jpayne@0 244 profile_fastq.write(rec.format('fastq'))
jpayne@0 245 @contextmanager
jpayne@0 246 def input_stub(filename, mode=None):
jpayne@0 247 if 'xml' in filename:
jpayne@0 248 profile_xml.seek(0)
jpayne@0 249 yield profile_xml
jpayne@0 250 elif 'fastq' in filename:
jpayne@0 251 profile_fastq.seek(0)
jpayne@0 252 yield profile_fastq
jpayne@0 253 else:
jpayne@0 254 yield open('/dev/null', 'r')
jpayne@0 255
jpayne@0 256 qa_core.open_input = input_stub
jpayne@0 257 profiling_buffer = StringIO.StringIO()
jpayne@0 258
jpayne@0 259 @contextmanager
jpayne@0 260 def get_profiling_buffer(*a, **k):
jpayne@0 261 yield profiling_buffer
jpayne@0 262 profiling_buffer.seek(0)
jpayne@0 263
jpayne@0 264 qa_core.new_output_file = get_profiling_buffer
jpayne@0 265 import gims_qa_tests
jpayne@0 266
jpayne@0 267 namespace.sequenceFileName = ('forward.fastq', 'reverse.fastq')
jpayne@0 268 namespace.sequencingType = 'Illumina MiSeq'
jpayne@0 269 namespace.approximateGenomeSize = 1000000
jpayne@0 270
jpayne@0 271 def run_tests(namespace, log):
jpayne@0 272 log.getChild('run')
jpayne@0 273 log.info(vars(namespace))
jpayne@0 274 for name, test in qa_tests.items():
jpayne@0 275 if test.__profile_enabled__:
jpayne@0 276 test(namespace)
jpayne@0 277 log.info("Profiling...")
jpayne@0 278 return cProfile.runctx('run_tests(namespace, log)', None, locals(), sort='tottime')
jpayne@0 279
jpayne@0 280 if __name__ == '__main__':
jpayne@0 281
jpayne@0 282 parser = argparse.ArgumentParser()
jpayne@0 283 parser.add_argument("mode")
jpayne@0 284 parser.add_argument('sequenceFileName', nargs='*')
jpayne@0 285 parser.add_argument('--genus','-g')
jpayne@0 286 parser.add_argument('--species', '-s')
jpayne@0 287 parser.add_argument('--sequencingType', '-t')
jpayne@0 288 parser.add_argument('--approximateGenomeSize','--gs', type=int, default=4500000)
jpayne@0 289 parser.add_argument('--coverageThreshold', '-c', type=int, default=20)
jpayne@0 290 parser.add_argument('--minimumQScore', '-q', type=float, default=27.0)
jpayne@0 291 parser.add_argument('--databasePath', '--db')
jpayne@0 292 parser.add_argument('--log', '-l', type=argparse.FileType('w'), default=sys.stdout)
jpayne@0 293 parser.add_argument('--output', '-o', type=argparse.FileType('w'), default=sys.stdout)
jpayne@0 294 parser.add_argument('--output_plots')
jpayne@0 295 parser.add_argument('--parallelism', '-p', type=int, default=1)
jpayne@0 296 parser.add_argument('--annotate', '-a', type=str, default=None, dest='annotation')
jpayne@0 297 params = parser.parse_args()
jpayne@0 298 logging.basicConfig(format='%(name)s\t%(levelname)s:%(message)s', level=logging.DEBUG, stream = params.log)
jpayne@0 299 logging.getLogger('qa.cmd').info(" ".join(sys.argv))
jpayne@0 300 main(params)