jpayne@0: from qa_core import qa_test, new_output_file, open_input, readfq, qa_params jpayne@0: from collections import namedtuple, defaultdict, Counter jpayne@0: from Bio import SeqIO jpayne@0: from multiprocessing import Pool, Process, cpu_count jpayne@0: from functools import partial jpayne@0: from itertools import izip, count, izip_longest, islice jpayne@0: from tempfile import NamedTemporaryFile, mkdtemp jpayne@0: from subprocess import check_call, CalledProcessError jpayne@0: from numpy import genfromtxt jpayne@0: from os.path import abspath, join, split, splitext jpayne@0: from contextlib import contextmanager jpayne@0: from subprocess import call jpayne@0: from subprocess import check_output jpayne@0: from hashlib import md5 jpayne@0: jpayne@0: jpayne@0: import sys jpayne@0: import shutil jpayne@0: import os jpayne@0: import pandas jpayne@0: import datetime jpayne@0: import re jpayne@0: import glob, string jpayne@0: jpayne@0: #Test implementation classes and functions. The actual tests are down at the bottom. jpayne@0: jpayne@0: SuperResultTuple = namedtuple("SuperResultTuple", ("readCount", "numBases", "baseQScoreHisto", "nucleotideHisto", "qualityHisto")) jpayne@0: jpayne@0: # #Global Variables for kraken methods jpayne@0: # krakendatabase = "/scratch/biostat/kraken/miniDB/" jpayne@0: # we don't actually need this as a global jpayne@0: jpayne@0: class ResultTuple(SuperResultTuple): jpayne@0: def __add__(self, other): jpayne@0: r1, n1, h1, nuc1, qual1 = self jpayne@0: r2, n2, h2, nuc2, qual2 = other jpayne@0: return ResultTuple(r1 + r2, jpayne@0: n1 + n2, jpayne@0: [p + q for p, q in izip_longest(h1, h2, fillvalue=Counter())], jpayne@0: [p + q for p, q in izip_longest(nuc1, nuc2, fillvalue=Counter())], jpayne@0: qual1 + qual2) jpayne@0: jpayne@0: def __and__(self, other): jpayne@0: r1, n1, h1, nuc1, qual1 = self jpayne@0: r2, n2, h2, nuc2, qual2 = other jpayne@0: return ResultTuple(r1 + r2, jpayne@0: n1 + n2, jpayne@0: h1 + h2, jpayne@0: nuc1 + nuc2, jpayne@0: qual1 + qual2) jpayne@0: jpayne@0: @classmethod jpayne@0: def zero(cls): jpayne@0: return cls(0, 0, [], [], Counter()) jpayne@0: jpayne@0: def to_dataFrame(self): jpayne@0: try: jpayne@0: from pandas import DataFrame jpayne@0: #from_dict = partial(DataFrame.from_dict, DataFrame) jpayne@0: except: jpayne@0: #no pandas jpayne@0: def DataFrame(self, data): jpayne@0: return ResultTuple(**data) jpayne@0: jpayne@0: pivotNucHisto = defaultdict(list) jpayne@0: for cnt in self.nucleotideHisto: jpayne@0: for base in ('A', 'T', 'G', 'C', 'N'): jpayne@0: pivotNucHisto[base].append(cnt[base]) jpayne@0: pivotQBaseHisto = self.baseQScoreHisto jpayne@0: pivotQualHisto = {} jpayne@0: for q in range(max(self.qualityHisto.keys())): jpayne@0: pivotQualHisto[q] = self.qualityHisto[q] jpayne@0: df = {'readCount':self.readCount, jpayne@0: 'numBases':self.numBases, jpayne@0: 'baseQScoreHisto':pivotQBaseHisto, jpayne@0: 'nucleotideHisto':pivotNucHisto, jpayne@0: 'qualityHisto':pivotQualHisto} jpayne@0: return df jpayne@0: jpayne@0: class MergedResultTuple(): jpayne@0: "Merged multi-file results object. You can iterate on it for filename and ResultTuple pairs, or call its attributes for combined results" jpayne@0: def __init__(self, **kwargs): jpayne@0: self.per_file = {} jpayne@0: self.per_file.update(kwargs) jpayne@0: jpayne@0: def __getattr__(self, name): jpayne@0: return getattr(reduce(lambda p, q: p & q, [v for k, v in sorted(self.per_file.items())], ResultTuple.zero()), name) jpayne@0: jpayne@0: def __iter__(self): jpayne@0: return iter(self.per_file.items()) jpayne@0: jpayne@0: jpayne@0: def analyze_batch_reads(reads): jpayne@0: readCount = 0 jpayne@0: numBases = 0 jpayne@0: baseQScoreHisto = [] jpayne@0: nucleotideHisto = [] jpayne@0: qualityHisto = Counter() jpayne@0: for name, seq, quals in reads: jpayne@0: qualityHisto.update(quals) jpayne@0: def scan_a_read(read): jpayne@0: #for base, qual, index in zip(seq, quals, count(0)): jpayne@0: base, qual, index = read jpayne@0: try: jpayne@0: nucleotideHisto[index][base] += 1 jpayne@0: except IndexError: jpayne@0: nucleotideHisto.append(Counter(base)) jpayne@0: try: jpayne@0: baseQScoreHisto[index][qual] += 1 jpayne@0: except IndexError: jpayne@0: baseQScoreHisto.append(Counter({qual:1})) jpayne@0: return index jpayne@0: m = map(scan_a_read, zip(seq, quals, count(0))) #numBases is the last index of the read jpayne@0: #print(name, m[:3]) jpayne@0: numBases += m[-1] jpayne@0: readCount += 1 jpayne@0: r = ResultTuple(readCount, numBases, baseQScoreHisto, nucleotideHisto, qualityHisto) jpayne@0: return r jpayne@0: jpayne@0: jpayne@0: def cluster(slicable, n=2): jpayne@0: "return an n-tuple of iterators over an in-memory sequence" jpayne@0: #determine the offset size jpayne@0: offset = (len(slicable) / n) + 1 jpayne@0: #return tuple([islice(slicable, i, i+offset) for i in range(n)]) jpayne@0: return tuple([tuple(slicable[i*offset:(i+1)*offset-1]) for i in range(n)]) jpayne@0: jpayne@0: jpayne@0: def tupleize_parse(fileh, format): jpayne@0: for read in SeqIO.parse(fileh, format): jpayne@0: yield (read.id, str(read.seq), read.letter_annotations['phred_quality']) jpayne@0: jpayne@0: #@profile jpayne@0: def analyze_shortread(seqFile, map_f=map, multi=qa_params.get('parallelism', cpu_count())): jpayne@0: "analyze one file whose reads are typical short-read technology" jpayne@0: if 'sff' in seqFile: jpayne@0: format, quality, mode = 'sff', 'phred_quality', 'rb' jpayne@0: rds_f = partial(tupleize_parse, format=format) jpayne@0: else: jpayne@0: format, quality, mode = 'fastq', 'phred_quality', 'rU' jpayne@0: rds_f = readfq jpayne@0: with open_input(seqFile) as seqFileH: jpayne@0: rds = list(rds_f(seqFileH)) jpayne@0: rds = cluster(rds, multi) jpayne@0: result = sum(map_f(analyze_batch_reads, rds), ResultTuple.zero()) jpayne@0: return result jpayne@0: jpayne@0: jpayne@0: def analyze_longread(seqFile, map_f=map): jpayne@0: "analyze one file from pacbio or nanopore" jpayne@0: jpayne@0: return ResultTuple.zero() jpayne@0: #what to do here? if nanopore, plot 2d/1d length vs accuracy jpayne@0: jpayne@0: jpayne@0: def panic(*a, **k): jpayne@0: raise ValueError("No analysis method for this sequencing type") jpayne@0: jpayne@0: config = {'Roche 454':analyze_shortread, jpayne@0: 'Illumina MiSeq':analyze_shortread, jpayne@0: 'Illumina NextSeq':analyze_shortread, jpayne@0: 'Illumina HiSeq':analyze_shortread, jpayne@0: 'IonTorrent PGM':analyze_shortread, jpayne@0: 'PacBio RS':analyze_longread, jpayne@0: 'Oxford Nanopore MiniION':analyze_longread} jpayne@0: jpayne@0: jpayne@0: class QaMemo(object): jpayne@0: "lazy-evaluated QA operation" jpayne@0: jpayne@0: result = None #named tuple results object jpayne@0: jpayne@0: @classmethod jpayne@0: def analyze(cls, jpayne@0: sequenceFiles, jpayne@0: sequencingType, jpayne@0: map_f=qa_params.get('map implementation', map), jpayne@0: plots=qa_params.get('plots', True), jpayne@0: annotation=qa_params.get('annotation', '')): jpayne@0: if type(sequenceFiles) == str: #if this is a single file jpayne@0: sequenceFiles = [sequenceFiles] jpayne@0: analysis = partial(config.get(sequencingType, panic), map_f=map_f) jpayne@0: # result = reduce(lambda a, b: a & b, map(analysis, sequenceFiles), ResultTuple.zero()) jpayne@0: result = MergedResultTuple(**{fi:analysis(fi) for fi in sequenceFiles}) jpayne@0: if plots: jpayne@0: try: jpayne@0: from gims_qa_plot import qa_plots jpayne@0: result_df = result.to_dataFrame() jpayne@0: qa_plots(result_df, annotation) jpayne@0: except Exception: jpayne@0: import traceback jpayne@0: traceback.print_exc() jpayne@0: return result jpayne@0: jpayne@0: @classmethod jpayne@0: def of(cls, sequenceFiles, sequencingType): jpayne@0: try: jpayne@0: if not cls.result: jpayne@0: cls.result = QaMemo.analyze(sequenceFiles, sequencingType) jpayne@0: return cls.result jpayne@0: except TypeError as e: jpayne@0: raise ValueError(str(e)), None, sys.exc_info()[2] jpayne@0: jpayne@0: jpayne@0: TaxTuple = namedtuple("TaxTuple", ("taxon", "taxon_prevalence", "contaminant", "contaminant_prevalence", "checksum")) jpayne@0: jpayne@0: jpayne@0: class TaxMemo(object): jpayne@0: "lazy-evaluated Tax check operation" jpayne@0: jpayne@0: result = None # jpayne@0: jpayne@0: @classmethod jpayne@0: def analyze(cls, sequenceFiles, database): jpayne@0: from gims_qa_kraken import run_kraken jpayne@0: return run_kraken(sequenceFiles, database, dict(preload=None, jpayne@0: threads=qa_params.get('parallelism', 1))) jpayne@0: jpayne@0: # ## print cls jpayne@0: # ## print sequenceFiles jpayne@0: # from gims_qa_kraken import runKrakenSeqTest jpayne@0: jpayne@0: # #hash the kraken db jpayne@0: # #this isn't right yet, the db is a directory jpayne@0: # db_hash = md5(open(database, 'rb').read()).hexdigest() jpayne@0: jpayne@0: jpayne@0: # #Run Kraken on the current sequence and collect result and pass to TaxTuple to be written to xml jpayne@0: # KrakenResult = runKrakenSeqtest(sequenceFiles,krakendatabase) jpayne@0: jpayne@0: # #do this more readably lines with list comprehensions jpayne@0: # org_headers = ('OrganismTwo', 'OrganismThree', 'OrganismFour', 'OrganismFive') jpayne@0: # org_percent_headers = ('OrganismTwo%', 'OrganismThree%', 'OrganismFour%', 'OrganismFive%') jpayne@0: # Organisms = [KrakenResult[h].iloc[0] for h in org_headers] jpayne@0: # OrganismsPer = [KrakenResult[h].iloc[0] for h in org_percent_headers] jpayne@0: jpayne@0: jpayne@0: # return TaxTuple(KrakenResult['OrganismOne'].iloc[0], KrakenResult['OrganismOne%'].iloc[0], Organisms, OrganismsPer, db_hash) jpayne@0: jpayne@0: @classmethod jpayne@0: def of(cls, *args, **kwargs): jpayne@0: if not cls.result: jpayne@0: cls.result = TaxMemo.analyze(*args, **kwargs) jpayne@0: return cls.result jpayne@0: jpayne@0: failure_mode = "" jpayne@0: jpayne@0: def update_failure_mode(failure): jpayne@0: global failure_mode jpayne@0: if failure_mode: jpayne@0: failure_mode += "; {}".format(failure) jpayne@0: else: jpayne@0: failure_mode = failure jpayne@0: jpayne@0: jpayne@0: ########################################################################################## jpayne@0: # jpayne@0: # START HERE to design and configure the tests ---------- jpayne@0: # jpayne@0: ########################################################################################## jpayne@0: jpayne@0: jpayne@0: jpayne@0: jpayne@0: @qa_test(disable=True) jpayne@0: def parametersUsed(sequencingType='not given', jpayne@0: coverageThreshold='not given', jpayne@0: approximateGenomeSize=4500000): jpayne@0: "Simple function to save the test and evaluation parameters used" jpayne@0: import xml.etree.ElementTree as xml jpayne@0: el = xml.SubElement jpayne@0: root = xml.Element("parametersUsed") jpayne@0: for key, value in qa_params.items(): jpayne@0: el(root, "parameter", attrib=dict(key=str(key), value=str(value))) jpayne@0: return root jpayne@0: jpayne@0: @qa_test jpayne@0: def readCount(sequenceFileName, sequencingType): jpayne@0: return QaMemo.of(sequenceFileName, sequencingType).readCount jpayne@0: jpayne@0: @qa_test jpayne@0: def coverage(sequenceFileName, sequencingType, approximateGenomeSize=4500000): jpayne@0: return QaMemo.of(sequenceFileName, sequencingType).numBases / int(approximateGenomeSize) jpayne@0: jpayne@0: jpayne@0: # @qa_test(disable=False) jpayne@0: # def coverageQThirtyPlus(sequenceFileName, sequencingType, approximateGenomeSize=4500000, minimumQScore=qa_params.get('user Q threshold', 27.0)): jpayne@0: # numOverMinQ = 0 jpayne@0: # for score, num in QaMemo.of(sequenceFileName, sequencingType).baseQScoreHisto.items(): jpayne@0: # if score >= int(minimumQScore): jpayne@0: # numOverMinQ += num jpayne@0: # return numOverMinQ jpayne@0: jpayne@0: jpayne@0: def mean(counter): jpayne@0: return float(sum([v * c for v, c in counter.items()])) / float(sum(counter.values())) jpayne@0: jpayne@0: jpayne@0: @qa_test jpayne@0: def averageQPerFile(sequenceFileName, sequencingType): jpayne@0: return sorted({fi:mean(res.qualityHisto) for fi,res in QaMemo.of(sequenceFileName, sequencingType)}.items()) jpayne@0: jpayne@0: jpayne@0: def averages_exceed(qualityHisto, q): jpayne@0: return mean(qualityHisto) >= q jpayne@0: jpayne@0: jpayne@0: @qa_test jpayne@0: def qcStatus(sequenceFileName, sequencingType, jpayne@0: coverageThreshold=None, jpayne@0: approximateGenomeSize=4500000, jpayne@0: minimumQScore=qa_params.get('user Q threshold', 27.0), jpayne@0: bbThreshold=qa_params.get('permitted base balance variance', 0.2)): jpayne@0: global failure_mode jpayne@0: results = QaMemo.of(sequenceFileName, sequencingType) jpayne@0: if not all([averages_exceed(r.qualityHisto, minimumQScore) for f, r in results]): jpayne@0: update_failure_mode("qscore below {}".format(minimumQScore)) jpayne@0: return "Fail" jpayne@0: a, t, g, c = [sum([n[b] for n in QaMemo.of(sequenceFileName, sequencingType).nucleotideHisto]) for b in ('A', 'T', 'G', 'C')] jpayne@0: at = float(a) / float(t) jpayne@0: gc = float(g) / float(c) jpayne@0: if any((at > 1.0 + bbThreshold, at < 1.0 - bbThreshold, gc > 1.0 + bbThreshold, gc < 1.0 - bbThreshold)): jpayne@0: update_failure_mode("AT or GC base-balance outside of {} threshold".format(bbThreshold)) jpayne@0: return "Fail" jpayne@0: if coverageThreshold: jpayne@0: if not (results.numBases / int(approximateGenomeSize)) >= float(coverageThreshold): jpayne@0: update_failure_mode("coverage below {}".format(coverageThreshold)) jpayne@0: return "Fail" jpayne@0: return "Pass" jpayne@0: return "Undetermined" jpayne@0: jpayne@0: jpayne@0: jpayne@0: jpayne@0: def baseBalance(sequenceFileName, sequencingType): jpayne@0: results = QaMemo.of(sequenceFileName, sequencingType) jpayne@0: a, t, g, c = [sum([n[b] for n in results.nucleotideHisto]) for b in ('A', 'T', 'G', 'C')] jpayne@0: return (float(a) / float(t) if t else 1, float(g) / float(c) if c else 1) jpayne@0: jpayne@0: @qa_test jpayne@0: def at_gc_baseBalance(sequenceFileName, sequencingType): jpayne@0: a, t, g, c = [sum([n[b] for n in QaMemo.of(sequenceFileName, sequencingType).nucleotideHisto]) for b in ('A', 'T', 'G', 'C')] jpayne@0: at = float(a) / float(t) if t else 1 jpayne@0: gc = float(g) / float(c) if c else 1 jpayne@0: return "{:02}/{:02}".format(at, gc) jpayne@0: jpayne@0: @qa_test(disable=True) jpayne@0: def baseBalance(sequenceFileName, sequencingType, threshold=qa_params.get('permitted base balance variance', 0.2) ): jpayne@0: import xml.etree.ElementTree as xml jpayne@0: el = xml.SubElement jpayne@0: a, t, g, c = [sum([n[b] for n in QaMemo.of(sequenceFileName, sequencingType).nucleotideHisto]) for b in ('A', 'T', 'G', 'C')] jpayne@0: at = float(a) / float(t) if t else 1 jpayne@0: gc = float(g) / float(c) if c else 1 jpayne@0: status = "Pass" jpayne@0: if any((at > 1.0 + threshold, at < 1.0 - threshold, gc > 1.0 + threshold, gc < 1.0 - threshold)): jpayne@0: status = "Fail" jpayne@0: update_failure_mode("Base balance out of variance threshold ({:.2},{:.2} > {})".format(at, gc, threshold)) jpayne@0: root = xml.Element('baseBalanceResult') jpayne@0: el(root, 'AT').text = str(at) jpayne@0: el(root, 'GC').text = str(gc) jpayne@0: el(root, 'baseBalanceStatus').text = status jpayne@0: return root jpayne@0: jpayne@0: jpayne@0: jpayne@0: jpayne@0: jpayne@0: @qa_test(disable = ~os.path.exists(qa_params.get('path to kraken database', '/scratch/biostat/kraken/miniDB')), jpayne@0: profile=False) jpayne@0: def taxonomyQualityUsingKraken(sequenceFileName, jpayne@0: sequencingType, jpayne@0: genus=None, jpayne@0: species=None, jpayne@0: databasePath=qa_params.get('path to kraken database', '/scratch/biostat/kraken/miniDB') jpayne@0: ): jpayne@0: jpayne@0: #print sequenceFileName jpayne@0: #print sequencingType jpayne@0: #print sequenceFileName jpayne@0: #print sequencingType jpayne@0: #print genus jpayne@0: #print species jpayne@0: jpayne@0: jpayne@0: import xml.etree.ElementTree as xml jpayne@0: global failure_mode jpayne@0: el = xml.SubElement jpayne@0: res = TaxMemo.of(sequenceFileName, databasePath) jpayne@0: #make return element jpayne@0: root = xml.Element('taxonomyQualityResult') jpayne@0: if not genus: jpayne@0: el(root, 'taxStatus').text = "Undetermined" jpayne@0: else: jpayne@0: if genus in res.taxon.split() and species in res.taxon.split(): jpayne@0: el(root, 'taxStatus').text = "Pass" jpayne@0: else: jpayne@0: el(root, 'taxStatus').text = "Fail" jpayne@0: update_failure_mode('"{} {}" was not in agreement with calculated ID "{}"'.format(genus, species, res.taxon)) jpayne@0: el(root, 'mostPrevalentTaxon', dict(prevalence=str(res.taxon_prevalence))).text = res.taxon[-230:] #last 255 characters jpayne@0: jpayne@0: contams = el(root, 'contaminants') jpayne@0: for i in range(4): jpayne@0: el(contams, 'prevalentContaminant', dict(prevalence=str(res.contaminant_prevalence[i]), ordinal=str(i))).text = res.contaminant[i][-230:] jpayne@0: el(root, 'database', dict(checksum=str(res.checksum))).text = databasePath jpayne@0: jpayne@0: jpayne@0: return root jpayne@0: jpayne@0: jpayne@0: jpayne@0: @qa_test jpayne@0: def reasonForFailure(): jpayne@0: global failure_mode jpayne@0: return failure_mode[0:254] #255 character limit in destination DB field jpayne@0: jpayne@0: @qa_test(disable=True) jpayne@0: def run_serotype_usingseqsero( jpayne@0: sequenceFileName, jpayne@0: sequencingType): jpayne@0: from gims_qa_serotyping import run_serotype_usingseqsero jpayne@0: jpayne@0: seq1 = "" + sequenceFileName[0] jpayne@0: seq2 = "" + sequenceFileName[1] jpayne@0: jpayne@0: return run_serotype_usingseqsero( jpayne@0: seq1, seq2, jpayne@0: sequencingType, jpayne@0: Oantigen="Undetermined", jpayne@0: H1antigen="Undetermined", jpayne@0: H2antigen="Undetermined", jpayne@0: AntigenicProfile="Undetermined", jpayne@0: PredictedSerotype="Undetermined") jpayne@0: jpayne@0: return root