annotate qa_core/gims_qa_tests.py @ 2:021864b5a0a5 tip

planemo upload
author jpayne
date Fri, 11 May 2018 10:25:14 -0400
parents dafec28bd37e
children
rev   line source
jpayne@0 1 from qa_core import qa_test, new_output_file, open_input, readfq, qa_params
jpayne@0 2 from collections import namedtuple, defaultdict, Counter
jpayne@0 3 from Bio import SeqIO
jpayne@0 4 from multiprocessing import Pool, Process, cpu_count
jpayne@0 5 from functools import partial
jpayne@0 6 from itertools import izip, count, izip_longest, islice
jpayne@0 7 from tempfile import NamedTemporaryFile, mkdtemp
jpayne@0 8 from subprocess import check_call, CalledProcessError
jpayne@0 9 from numpy import genfromtxt
jpayne@0 10 from os.path import abspath, join, split, splitext
jpayne@0 11 from contextlib import contextmanager
jpayne@0 12 from subprocess import call
jpayne@0 13 from subprocess import check_output
jpayne@0 14 from hashlib import md5
jpayne@0 15
jpayne@0 16
jpayne@0 17 import sys
jpayne@0 18 import shutil
jpayne@0 19 import os
jpayne@0 20 import pandas
jpayne@0 21 import datetime
jpayne@0 22 import re
jpayne@0 23 import glob, string
jpayne@0 24
jpayne@0 25 #Test implementation classes and functions. The actual tests are down at the bottom.
jpayne@0 26
jpayne@0 27 SuperResultTuple = namedtuple("SuperResultTuple", ("readCount", "numBases", "baseQScoreHisto", "nucleotideHisto", "qualityHisto"))
jpayne@0 28
jpayne@0 29 # #Global Variables for kraken methods
jpayne@0 30 # krakendatabase = "/scratch/biostat/kraken/miniDB/"
jpayne@0 31 # we don't actually need this as a global
jpayne@0 32
jpayne@0 33 class ResultTuple(SuperResultTuple):
jpayne@0 34 def __add__(self, other):
jpayne@0 35 r1, n1, h1, nuc1, qual1 = self
jpayne@0 36 r2, n2, h2, nuc2, qual2 = other
jpayne@0 37 return ResultTuple(r1 + r2,
jpayne@0 38 n1 + n2,
jpayne@0 39 [p + q for p, q in izip_longest(h1, h2, fillvalue=Counter())],
jpayne@0 40 [p + q for p, q in izip_longest(nuc1, nuc2, fillvalue=Counter())],
jpayne@0 41 qual1 + qual2)
jpayne@0 42
jpayne@0 43 def __and__(self, other):
jpayne@0 44 r1, n1, h1, nuc1, qual1 = self
jpayne@0 45 r2, n2, h2, nuc2, qual2 = other
jpayne@0 46 return ResultTuple(r1 + r2,
jpayne@0 47 n1 + n2,
jpayne@0 48 h1 + h2,
jpayne@0 49 nuc1 + nuc2,
jpayne@0 50 qual1 + qual2)
jpayne@0 51
jpayne@0 52 @classmethod
jpayne@0 53 def zero(cls):
jpayne@0 54 return cls(0, 0, [], [], Counter())
jpayne@0 55
jpayne@0 56 def to_dataFrame(self):
jpayne@0 57 try:
jpayne@0 58 from pandas import DataFrame
jpayne@0 59 #from_dict = partial(DataFrame.from_dict, DataFrame)
jpayne@0 60 except:
jpayne@0 61 #no pandas
jpayne@0 62 def DataFrame(self, data):
jpayne@0 63 return ResultTuple(**data)
jpayne@0 64
jpayne@0 65 pivotNucHisto = defaultdict(list)
jpayne@0 66 for cnt in self.nucleotideHisto:
jpayne@0 67 for base in ('A', 'T', 'G', 'C', 'N'):
jpayne@0 68 pivotNucHisto[base].append(cnt[base])
jpayne@0 69 pivotQBaseHisto = self.baseQScoreHisto
jpayne@0 70 pivotQualHisto = {}
jpayne@0 71 for q in range(max(self.qualityHisto.keys())):
jpayne@0 72 pivotQualHisto[q] = self.qualityHisto[q]
jpayne@0 73 df = {'readCount':self.readCount,
jpayne@0 74 'numBases':self.numBases,
jpayne@0 75 'baseQScoreHisto':pivotQBaseHisto,
jpayne@0 76 'nucleotideHisto':pivotNucHisto,
jpayne@0 77 'qualityHisto':pivotQualHisto}
jpayne@0 78 return df
jpayne@0 79
jpayne@0 80 class MergedResultTuple():
jpayne@0 81 "Merged multi-file results object. You can iterate on it for filename and ResultTuple pairs, or call its attributes for combined results"
jpayne@0 82 def __init__(self, **kwargs):
jpayne@0 83 self.per_file = {}
jpayne@0 84 self.per_file.update(kwargs)
jpayne@0 85
jpayne@0 86 def __getattr__(self, name):
jpayne@0 87 return getattr(reduce(lambda p, q: p & q, [v for k, v in sorted(self.per_file.items())], ResultTuple.zero()), name)
jpayne@0 88
jpayne@0 89 def __iter__(self):
jpayne@0 90 return iter(self.per_file.items())
jpayne@0 91
jpayne@0 92
jpayne@0 93 def analyze_batch_reads(reads):
jpayne@0 94 readCount = 0
jpayne@0 95 numBases = 0
jpayne@0 96 baseQScoreHisto = []
jpayne@0 97 nucleotideHisto = []
jpayne@0 98 qualityHisto = Counter()
jpayne@0 99 for name, seq, quals in reads:
jpayne@0 100 qualityHisto.update(quals)
jpayne@0 101 def scan_a_read(read):
jpayne@0 102 #for base, qual, index in zip(seq, quals, count(0)):
jpayne@0 103 base, qual, index = read
jpayne@0 104 try:
jpayne@0 105 nucleotideHisto[index][base] += 1
jpayne@0 106 except IndexError:
jpayne@0 107 nucleotideHisto.append(Counter(base))
jpayne@0 108 try:
jpayne@0 109 baseQScoreHisto[index][qual] += 1
jpayne@0 110 except IndexError:
jpayne@0 111 baseQScoreHisto.append(Counter({qual:1}))
jpayne@0 112 return index
jpayne@0 113 m = map(scan_a_read, zip(seq, quals, count(0))) #numBases is the last index of the read
jpayne@0 114 #print(name, m[:3])
jpayne@0 115 numBases += m[-1]
jpayne@0 116 readCount += 1
jpayne@0 117 r = ResultTuple(readCount, numBases, baseQScoreHisto, nucleotideHisto, qualityHisto)
jpayne@0 118 return r
jpayne@0 119
jpayne@0 120
jpayne@0 121 def cluster(slicable, n=2):
jpayne@0 122 "return an n-tuple of iterators over an in-memory sequence"
jpayne@0 123 #determine the offset size
jpayne@0 124 offset = (len(slicable) / n) + 1
jpayne@0 125 #return tuple([islice(slicable, i, i+offset) for i in range(n)])
jpayne@0 126 return tuple([tuple(slicable[i*offset:(i+1)*offset-1]) for i in range(n)])
jpayne@0 127
jpayne@0 128
jpayne@0 129 def tupleize_parse(fileh, format):
jpayne@0 130 for read in SeqIO.parse(fileh, format):
jpayne@0 131 yield (read.id, str(read.seq), read.letter_annotations['phred_quality'])
jpayne@0 132
jpayne@0 133 #@profile
jpayne@0 134 def analyze_shortread(seqFile, map_f=map, multi=qa_params.get('parallelism', cpu_count())):
jpayne@0 135 "analyze one file whose reads are typical short-read technology"
jpayne@0 136 if 'sff' in seqFile:
jpayne@0 137 format, quality, mode = 'sff', 'phred_quality', 'rb'
jpayne@0 138 rds_f = partial(tupleize_parse, format=format)
jpayne@0 139 else:
jpayne@0 140 format, quality, mode = 'fastq', 'phred_quality', 'rU'
jpayne@0 141 rds_f = readfq
jpayne@0 142 with open_input(seqFile) as seqFileH:
jpayne@0 143 rds = list(rds_f(seqFileH))
jpayne@0 144 rds = cluster(rds, multi)
jpayne@0 145 result = sum(map_f(analyze_batch_reads, rds), ResultTuple.zero())
jpayne@0 146 return result
jpayne@0 147
jpayne@0 148
jpayne@0 149 def analyze_longread(seqFile, map_f=map):
jpayne@0 150 "analyze one file from pacbio or nanopore"
jpayne@0 151
jpayne@0 152 return ResultTuple.zero()
jpayne@0 153 #what to do here? if nanopore, plot 2d/1d length vs accuracy
jpayne@0 154
jpayne@0 155
jpayne@0 156 def panic(*a, **k):
jpayne@0 157 raise ValueError("No analysis method for this sequencing type")
jpayne@0 158
jpayne@0 159 config = {'Roche 454':analyze_shortread,
jpayne@0 160 'Illumina MiSeq':analyze_shortread,
jpayne@0 161 'Illumina NextSeq':analyze_shortread,
jpayne@0 162 'Illumina HiSeq':analyze_shortread,
jpayne@0 163 'IonTorrent PGM':analyze_shortread,
jpayne@0 164 'PacBio RS':analyze_longread,
jpayne@0 165 'Oxford Nanopore MiniION':analyze_longread}
jpayne@0 166
jpayne@0 167
jpayne@0 168 class QaMemo(object):
jpayne@0 169 "lazy-evaluated QA operation"
jpayne@0 170
jpayne@0 171 result = None #named tuple results object
jpayne@0 172
jpayne@0 173 @classmethod
jpayne@0 174 def analyze(cls,
jpayne@0 175 sequenceFiles,
jpayne@0 176 sequencingType,
jpayne@0 177 map_f=qa_params.get('map implementation', map),
jpayne@0 178 plots=qa_params.get('plots', True),
jpayne@0 179 annotation=qa_params.get('annotation', '')):
jpayne@0 180 if type(sequenceFiles) == str: #if this is a single file
jpayne@0 181 sequenceFiles = [sequenceFiles]
jpayne@0 182 analysis = partial(config.get(sequencingType, panic), map_f=map_f)
jpayne@0 183 # result = reduce(lambda a, b: a & b, map(analysis, sequenceFiles), ResultTuple.zero())
jpayne@0 184 result = MergedResultTuple(**{fi:analysis(fi) for fi in sequenceFiles})
jpayne@0 185 if plots:
jpayne@0 186 try:
jpayne@0 187 from gims_qa_plot import qa_plots
jpayne@0 188 result_df = result.to_dataFrame()
jpayne@0 189 qa_plots(result_df, annotation)
jpayne@0 190 except Exception:
jpayne@0 191 import traceback
jpayne@0 192 traceback.print_exc()
jpayne@0 193 return result
jpayne@0 194
jpayne@0 195 @classmethod
jpayne@0 196 def of(cls, sequenceFiles, sequencingType):
jpayne@0 197 try:
jpayne@0 198 if not cls.result:
jpayne@0 199 cls.result = QaMemo.analyze(sequenceFiles, sequencingType)
jpayne@0 200 return cls.result
jpayne@0 201 except TypeError as e:
jpayne@0 202 raise ValueError(str(e)), None, sys.exc_info()[2]
jpayne@0 203
jpayne@0 204
jpayne@0 205 TaxTuple = namedtuple("TaxTuple", ("taxon", "taxon_prevalence", "contaminant", "contaminant_prevalence", "checksum"))
jpayne@0 206
jpayne@0 207
jpayne@0 208 class TaxMemo(object):
jpayne@0 209 "lazy-evaluated Tax check operation"
jpayne@0 210
jpayne@0 211 result = None #
jpayne@0 212
jpayne@0 213 @classmethod
jpayne@0 214 def analyze(cls, sequenceFiles, database):
jpayne@0 215 from gims_qa_kraken import run_kraken
jpayne@0 216 return run_kraken(sequenceFiles, database, dict(preload=None,
jpayne@0 217 threads=qa_params.get('parallelism', 1)))
jpayne@0 218
jpayne@0 219 # ## print cls
jpayne@0 220 # ## print sequenceFiles
jpayne@0 221 # from gims_qa_kraken import runKrakenSeqTest
jpayne@0 222
jpayne@0 223 # #hash the kraken db
jpayne@0 224 # #this isn't right yet, the db is a directory
jpayne@0 225 # db_hash = md5(open(database, 'rb').read()).hexdigest()
jpayne@0 226
jpayne@0 227
jpayne@0 228 # #Run Kraken on the current sequence and collect result and pass to TaxTuple to be written to xml
jpayne@0 229 # KrakenResult = runKrakenSeqtest(sequenceFiles,krakendatabase)
jpayne@0 230
jpayne@0 231 # #do this more readably lines with list comprehensions
jpayne@0 232 # org_headers = ('OrganismTwo', 'OrganismThree', 'OrganismFour', 'OrganismFive')
jpayne@0 233 # org_percent_headers = ('OrganismTwo%', 'OrganismThree%', 'OrganismFour%', 'OrganismFive%')
jpayne@0 234 # Organisms = [KrakenResult[h].iloc[0] for h in org_headers]
jpayne@0 235 # OrganismsPer = [KrakenResult[h].iloc[0] for h in org_percent_headers]
jpayne@0 236
jpayne@0 237
jpayne@0 238 # return TaxTuple(KrakenResult['OrganismOne'].iloc[0], KrakenResult['OrganismOne%'].iloc[0], Organisms, OrganismsPer, db_hash)
jpayne@0 239
jpayne@0 240 @classmethod
jpayne@0 241 def of(cls, *args, **kwargs):
jpayne@0 242 if not cls.result:
jpayne@0 243 cls.result = TaxMemo.analyze(*args, **kwargs)
jpayne@0 244 return cls.result
jpayne@0 245
jpayne@0 246 failure_mode = ""
jpayne@0 247
jpayne@0 248 def update_failure_mode(failure):
jpayne@0 249 global failure_mode
jpayne@0 250 if failure_mode:
jpayne@0 251 failure_mode += "; {}".format(failure)
jpayne@0 252 else:
jpayne@0 253 failure_mode = failure
jpayne@0 254
jpayne@0 255
jpayne@0 256 ##########################################################################################
jpayne@0 257 #
jpayne@0 258 # START HERE to design and configure the tests ----------
jpayne@0 259 #
jpayne@0 260 ##########################################################################################
jpayne@0 261
jpayne@0 262
jpayne@0 263
jpayne@0 264
jpayne@0 265 @qa_test(disable=True)
jpayne@0 266 def parametersUsed(sequencingType='not given',
jpayne@0 267 coverageThreshold='not given',
jpayne@0 268 approximateGenomeSize=4500000):
jpayne@0 269 "Simple function to save the test and evaluation parameters used"
jpayne@0 270 import xml.etree.ElementTree as xml
jpayne@0 271 el = xml.SubElement
jpayne@0 272 root = xml.Element("parametersUsed")
jpayne@0 273 for key, value in qa_params.items():
jpayne@0 274 el(root, "parameter", attrib=dict(key=str(key), value=str(value)))
jpayne@0 275 return root
jpayne@0 276
jpayne@0 277 @qa_test
jpayne@0 278 def readCount(sequenceFileName, sequencingType):
jpayne@0 279 return QaMemo.of(sequenceFileName, sequencingType).readCount
jpayne@0 280
jpayne@0 281 @qa_test
jpayne@0 282 def coverage(sequenceFileName, sequencingType, approximateGenomeSize=4500000):
jpayne@0 283 return QaMemo.of(sequenceFileName, sequencingType).numBases / int(approximateGenomeSize)
jpayne@0 284
jpayne@0 285
jpayne@0 286 # @qa_test(disable=False)
jpayne@0 287 # def coverageQThirtyPlus(sequenceFileName, sequencingType, approximateGenomeSize=4500000, minimumQScore=qa_params.get('user Q threshold', 27.0)):
jpayne@0 288 # numOverMinQ = 0
jpayne@0 289 # for score, num in QaMemo.of(sequenceFileName, sequencingType).baseQScoreHisto.items():
jpayne@0 290 # if score >= int(minimumQScore):
jpayne@0 291 # numOverMinQ += num
jpayne@0 292 # return numOverMinQ
jpayne@0 293
jpayne@0 294
jpayne@0 295 def mean(counter):
jpayne@0 296 return float(sum([v * c for v, c in counter.items()])) / float(sum(counter.values()))
jpayne@0 297
jpayne@0 298
jpayne@0 299 @qa_test
jpayne@0 300 def averageQPerFile(sequenceFileName, sequencingType):
jpayne@0 301 return sorted({fi:mean(res.qualityHisto) for fi,res in QaMemo.of(sequenceFileName, sequencingType)}.items())
jpayne@0 302
jpayne@0 303
jpayne@0 304 def averages_exceed(qualityHisto, q):
jpayne@0 305 return mean(qualityHisto) >= q
jpayne@0 306
jpayne@0 307
jpayne@0 308 @qa_test
jpayne@0 309 def qcStatus(sequenceFileName, sequencingType,
jpayne@0 310 coverageThreshold=None,
jpayne@0 311 approximateGenomeSize=4500000,
jpayne@0 312 minimumQScore=qa_params.get('user Q threshold', 27.0),
jpayne@0 313 bbThreshold=qa_params.get('permitted base balance variance', 0.2)):
jpayne@0 314 global failure_mode
jpayne@0 315 results = QaMemo.of(sequenceFileName, sequencingType)
jpayne@0 316 if not all([averages_exceed(r.qualityHisto, minimumQScore) for f, r in results]):
jpayne@0 317 update_failure_mode("qscore below {}".format(minimumQScore))
jpayne@0 318 return "Fail"
jpayne@0 319 a, t, g, c = [sum([n[b] for n in QaMemo.of(sequenceFileName, sequencingType).nucleotideHisto]) for b in ('A', 'T', 'G', 'C')]
jpayne@0 320 at = float(a) / float(t)
jpayne@0 321 gc = float(g) / float(c)
jpayne@0 322 if any((at > 1.0 + bbThreshold, at < 1.0 - bbThreshold, gc > 1.0 + bbThreshold, gc < 1.0 - bbThreshold)):
jpayne@0 323 update_failure_mode("AT or GC base-balance outside of {} threshold".format(bbThreshold))
jpayne@0 324 return "Fail"
jpayne@0 325 if coverageThreshold:
jpayne@0 326 if not (results.numBases / int(approximateGenomeSize)) >= float(coverageThreshold):
jpayne@0 327 update_failure_mode("coverage below {}".format(coverageThreshold))
jpayne@0 328 return "Fail"
jpayne@0 329 return "Pass"
jpayne@0 330 return "Undetermined"
jpayne@0 331
jpayne@0 332
jpayne@0 333
jpayne@0 334
jpayne@0 335 def baseBalance(sequenceFileName, sequencingType):
jpayne@0 336 results = QaMemo.of(sequenceFileName, sequencingType)
jpayne@0 337 a, t, g, c = [sum([n[b] for n in results.nucleotideHisto]) for b in ('A', 'T', 'G', 'C')]
jpayne@0 338 return (float(a) / float(t) if t else 1, float(g) / float(c) if c else 1)
jpayne@0 339
jpayne@0 340 @qa_test
jpayne@0 341 def at_gc_baseBalance(sequenceFileName, sequencingType):
jpayne@0 342 a, t, g, c = [sum([n[b] for n in QaMemo.of(sequenceFileName, sequencingType).nucleotideHisto]) for b in ('A', 'T', 'G', 'C')]
jpayne@0 343 at = float(a) / float(t) if t else 1
jpayne@0 344 gc = float(g) / float(c) if c else 1
jpayne@0 345 return "{:02}/{:02}".format(at, gc)
jpayne@0 346
jpayne@0 347 @qa_test(disable=True)
jpayne@0 348 def baseBalance(sequenceFileName, sequencingType, threshold=qa_params.get('permitted base balance variance', 0.2) ):
jpayne@0 349 import xml.etree.ElementTree as xml
jpayne@0 350 el = xml.SubElement
jpayne@0 351 a, t, g, c = [sum([n[b] for n in QaMemo.of(sequenceFileName, sequencingType).nucleotideHisto]) for b in ('A', 'T', 'G', 'C')]
jpayne@0 352 at = float(a) / float(t) if t else 1
jpayne@0 353 gc = float(g) / float(c) if c else 1
jpayne@0 354 status = "Pass"
jpayne@0 355 if any((at > 1.0 + threshold, at < 1.0 - threshold, gc > 1.0 + threshold, gc < 1.0 - threshold)):
jpayne@0 356 status = "Fail"
jpayne@0 357 update_failure_mode("Base balance out of variance threshold ({:.2},{:.2} > {})".format(at, gc, threshold))
jpayne@0 358 root = xml.Element('baseBalanceResult')
jpayne@0 359 el(root, 'AT').text = str(at)
jpayne@0 360 el(root, 'GC').text = str(gc)
jpayne@0 361 el(root, 'baseBalanceStatus').text = status
jpayne@0 362 return root
jpayne@0 363
jpayne@0 364
jpayne@0 365
jpayne@0 366
jpayne@0 367
jpayne@0 368 @qa_test(disable = ~os.path.exists(qa_params.get('path to kraken database', '/scratch/biostat/kraken/miniDB')),
jpayne@0 369 profile=False)
jpayne@0 370 def taxonomyQualityUsingKraken(sequenceFileName,
jpayne@0 371 sequencingType,
jpayne@0 372 genus=None,
jpayne@0 373 species=None,
jpayne@0 374 databasePath=qa_params.get('path to kraken database', '/scratch/biostat/kraken/miniDB')
jpayne@0 375 ):
jpayne@0 376
jpayne@0 377 #print sequenceFileName
jpayne@0 378 #print sequencingType
jpayne@0 379 #print sequenceFileName
jpayne@0 380 #print sequencingType
jpayne@0 381 #print genus
jpayne@0 382 #print species
jpayne@0 383
jpayne@0 384
jpayne@0 385 import xml.etree.ElementTree as xml
jpayne@0 386 global failure_mode
jpayne@0 387 el = xml.SubElement
jpayne@0 388 res = TaxMemo.of(sequenceFileName, databasePath)
jpayne@0 389 #make return element
jpayne@0 390 root = xml.Element('taxonomyQualityResult')
jpayne@0 391 if not genus:
jpayne@0 392 el(root, 'taxStatus').text = "Undetermined"
jpayne@0 393 else:
jpayne@0 394 if genus in res.taxon.split() and species in res.taxon.split():
jpayne@0 395 el(root, 'taxStatus').text = "Pass"
jpayne@0 396 else:
jpayne@0 397 el(root, 'taxStatus').text = "Fail"
jpayne@0 398 update_failure_mode('"{} {}" was not in agreement with calculated ID "{}"'.format(genus, species, res.taxon))
jpayne@0 399 el(root, 'mostPrevalentTaxon', dict(prevalence=str(res.taxon_prevalence))).text = res.taxon[-230:] #last 255 characters
jpayne@0 400
jpayne@0 401 contams = el(root, 'contaminants')
jpayne@0 402 for i in range(4):
jpayne@0 403 el(contams, 'prevalentContaminant', dict(prevalence=str(res.contaminant_prevalence[i]), ordinal=str(i))).text = res.contaminant[i][-230:]
jpayne@0 404 el(root, 'database', dict(checksum=str(res.checksum))).text = databasePath
jpayne@0 405
jpayne@0 406
jpayne@0 407 return root
jpayne@0 408
jpayne@0 409
jpayne@0 410
jpayne@0 411 @qa_test
jpayne@0 412 def reasonForFailure():
jpayne@0 413 global failure_mode
jpayne@0 414 return failure_mode[0:254] #255 character limit in destination DB field
jpayne@0 415
jpayne@0 416 @qa_test(disable=True)
jpayne@0 417 def run_serotype_usingseqsero(
jpayne@0 418 sequenceFileName,
jpayne@0 419 sequencingType):
jpayne@0 420 from gims_qa_serotyping import run_serotype_usingseqsero
jpayne@0 421
jpayne@0 422 seq1 = "" + sequenceFileName[0]
jpayne@0 423 seq2 = "" + sequenceFileName[1]
jpayne@0 424
jpayne@0 425 return run_serotype_usingseqsero(
jpayne@0 426 seq1, seq2,
jpayne@0 427 sequencingType,
jpayne@0 428 Oantigen="Undetermined",
jpayne@0 429 H1antigen="Undetermined",
jpayne@0 430 H2antigen="Undetermined",
jpayne@0 431 AntigenicProfile="Undetermined",
jpayne@0 432 PredictedSerotype="Undetermined")
jpayne@0 433
jpayne@0 434 return root