Mercurial > repos > jpayne > cfsan_qaqc
view qa_core/gims_qa_tests.py @ 2:021864b5a0a5 tip
planemo upload
author | jpayne |
---|---|
date | Fri, 11 May 2018 10:25:14 -0400 |
parents | dafec28bd37e |
children |
line wrap: on
line source
from qa_core import qa_test, new_output_file, open_input, readfq, qa_params from collections import namedtuple, defaultdict, Counter from Bio import SeqIO from multiprocessing import Pool, Process, cpu_count from functools import partial from itertools import izip, count, izip_longest, islice from tempfile import NamedTemporaryFile, mkdtemp from subprocess import check_call, CalledProcessError from numpy import genfromtxt from os.path import abspath, join, split, splitext from contextlib import contextmanager from subprocess import call from subprocess import check_output from hashlib import md5 import sys import shutil import os import pandas import datetime import re import glob, string #Test implementation classes and functions. The actual tests are down at the bottom. SuperResultTuple = namedtuple("SuperResultTuple", ("readCount", "numBases", "baseQScoreHisto", "nucleotideHisto", "qualityHisto")) # #Global Variables for kraken methods # krakendatabase = "/scratch/biostat/kraken/miniDB/" # we don't actually need this as a global class ResultTuple(SuperResultTuple): def __add__(self, other): r1, n1, h1, nuc1, qual1 = self r2, n2, h2, nuc2, qual2 = other return ResultTuple(r1 + r2, n1 + n2, [p + q for p, q in izip_longest(h1, h2, fillvalue=Counter())], [p + q for p, q in izip_longest(nuc1, nuc2, fillvalue=Counter())], qual1 + qual2) def __and__(self, other): r1, n1, h1, nuc1, qual1 = self r2, n2, h2, nuc2, qual2 = other return ResultTuple(r1 + r2, n1 + n2, h1 + h2, nuc1 + nuc2, qual1 + qual2) @classmethod def zero(cls): return cls(0, 0, [], [], Counter()) def to_dataFrame(self): try: from pandas import DataFrame #from_dict = partial(DataFrame.from_dict, DataFrame) except: #no pandas def DataFrame(self, data): return ResultTuple(**data) pivotNucHisto = defaultdict(list) for cnt in self.nucleotideHisto: for base in ('A', 'T', 'G', 'C', 'N'): pivotNucHisto[base].append(cnt[base]) pivotQBaseHisto = self.baseQScoreHisto pivotQualHisto = {} for q in range(max(self.qualityHisto.keys())): pivotQualHisto[q] = self.qualityHisto[q] df = {'readCount':self.readCount, 'numBases':self.numBases, 'baseQScoreHisto':pivotQBaseHisto, 'nucleotideHisto':pivotNucHisto, 'qualityHisto':pivotQualHisto} return df class MergedResultTuple(): "Merged multi-file results object. You can iterate on it for filename and ResultTuple pairs, or call its attributes for combined results" def __init__(self, **kwargs): self.per_file = {} self.per_file.update(kwargs) def __getattr__(self, name): return getattr(reduce(lambda p, q: p & q, [v for k, v in sorted(self.per_file.items())], ResultTuple.zero()), name) def __iter__(self): return iter(self.per_file.items()) def analyze_batch_reads(reads): readCount = 0 numBases = 0 baseQScoreHisto = [] nucleotideHisto = [] qualityHisto = Counter() for name, seq, quals in reads: qualityHisto.update(quals) def scan_a_read(read): #for base, qual, index in zip(seq, quals, count(0)): base, qual, index = read try: nucleotideHisto[index][base] += 1 except IndexError: nucleotideHisto.append(Counter(base)) try: baseQScoreHisto[index][qual] += 1 except IndexError: baseQScoreHisto.append(Counter({qual:1})) return index m = map(scan_a_read, zip(seq, quals, count(0))) #numBases is the last index of the read #print(name, m[:3]) numBases += m[-1] readCount += 1 r = ResultTuple(readCount, numBases, baseQScoreHisto, nucleotideHisto, qualityHisto) return r def cluster(slicable, n=2): "return an n-tuple of iterators over an in-memory sequence" #determine the offset size offset = (len(slicable) / n) + 1 #return tuple([islice(slicable, i, i+offset) for i in range(n)]) return tuple([tuple(slicable[i*offset:(i+1)*offset-1]) for i in range(n)]) def tupleize_parse(fileh, format): for read in SeqIO.parse(fileh, format): yield (read.id, str(read.seq), read.letter_annotations['phred_quality']) #@profile def analyze_shortread(seqFile, map_f=map, multi=qa_params.get('parallelism', cpu_count())): "analyze one file whose reads are typical short-read technology" if 'sff' in seqFile: format, quality, mode = 'sff', 'phred_quality', 'rb' rds_f = partial(tupleize_parse, format=format) else: format, quality, mode = 'fastq', 'phred_quality', 'rU' rds_f = readfq with open_input(seqFile) as seqFileH: rds = list(rds_f(seqFileH)) rds = cluster(rds, multi) result = sum(map_f(analyze_batch_reads, rds), ResultTuple.zero()) return result def analyze_longread(seqFile, map_f=map): "analyze one file from pacbio or nanopore" return ResultTuple.zero() #what to do here? if nanopore, plot 2d/1d length vs accuracy def panic(*a, **k): raise ValueError("No analysis method for this sequencing type") config = {'Roche 454':analyze_shortread, 'Illumina MiSeq':analyze_shortread, 'Illumina NextSeq':analyze_shortread, 'Illumina HiSeq':analyze_shortread, 'IonTorrent PGM':analyze_shortread, 'PacBio RS':analyze_longread, 'Oxford Nanopore MiniION':analyze_longread} class QaMemo(object): "lazy-evaluated QA operation" result = None #named tuple results object @classmethod def analyze(cls, sequenceFiles, sequencingType, map_f=qa_params.get('map implementation', map), plots=qa_params.get('plots', True), annotation=qa_params.get('annotation', '')): if type(sequenceFiles) == str: #if this is a single file sequenceFiles = [sequenceFiles] analysis = partial(config.get(sequencingType, panic), map_f=map_f) # result = reduce(lambda a, b: a & b, map(analysis, sequenceFiles), ResultTuple.zero()) result = MergedResultTuple(**{fi:analysis(fi) for fi in sequenceFiles}) if plots: try: from gims_qa_plot import qa_plots result_df = result.to_dataFrame() qa_plots(result_df, annotation) except Exception: import traceback traceback.print_exc() return result @classmethod def of(cls, sequenceFiles, sequencingType): try: if not cls.result: cls.result = QaMemo.analyze(sequenceFiles, sequencingType) return cls.result except TypeError as e: raise ValueError(str(e)), None, sys.exc_info()[2] TaxTuple = namedtuple("TaxTuple", ("taxon", "taxon_prevalence", "contaminant", "contaminant_prevalence", "checksum")) class TaxMemo(object): "lazy-evaluated Tax check operation" result = None # @classmethod def analyze(cls, sequenceFiles, database): from gims_qa_kraken import run_kraken return run_kraken(sequenceFiles, database, dict(preload=None, threads=qa_params.get('parallelism', 1))) # ## print cls # ## print sequenceFiles # from gims_qa_kraken import runKrakenSeqTest # #hash the kraken db # #this isn't right yet, the db is a directory # db_hash = md5(open(database, 'rb').read()).hexdigest() # #Run Kraken on the current sequence and collect result and pass to TaxTuple to be written to xml # KrakenResult = runKrakenSeqtest(sequenceFiles,krakendatabase) # #do this more readably lines with list comprehensions # org_headers = ('OrganismTwo', 'OrganismThree', 'OrganismFour', 'OrganismFive') # org_percent_headers = ('OrganismTwo%', 'OrganismThree%', 'OrganismFour%', 'OrganismFive%') # Organisms = [KrakenResult[h].iloc[0] for h in org_headers] # OrganismsPer = [KrakenResult[h].iloc[0] for h in org_percent_headers] # return TaxTuple(KrakenResult['OrganismOne'].iloc[0], KrakenResult['OrganismOne%'].iloc[0], Organisms, OrganismsPer, db_hash) @classmethod def of(cls, *args, **kwargs): if not cls.result: cls.result = TaxMemo.analyze(*args, **kwargs) return cls.result failure_mode = "" def update_failure_mode(failure): global failure_mode if failure_mode: failure_mode += "; {}".format(failure) else: failure_mode = failure ########################################################################################## # # START HERE to design and configure the tests ---------- # ########################################################################################## @qa_test(disable=True) def parametersUsed(sequencingType='not given', coverageThreshold='not given', approximateGenomeSize=4500000): "Simple function to save the test and evaluation parameters used" import xml.etree.ElementTree as xml el = xml.SubElement root = xml.Element("parametersUsed") for key, value in qa_params.items(): el(root, "parameter", attrib=dict(key=str(key), value=str(value))) return root @qa_test def readCount(sequenceFileName, sequencingType): return QaMemo.of(sequenceFileName, sequencingType).readCount @qa_test def coverage(sequenceFileName, sequencingType, approximateGenomeSize=4500000): return QaMemo.of(sequenceFileName, sequencingType).numBases / int(approximateGenomeSize) # @qa_test(disable=False) # def coverageQThirtyPlus(sequenceFileName, sequencingType, approximateGenomeSize=4500000, minimumQScore=qa_params.get('user Q threshold', 27.0)): # numOverMinQ = 0 # for score, num in QaMemo.of(sequenceFileName, sequencingType).baseQScoreHisto.items(): # if score >= int(minimumQScore): # numOverMinQ += num # return numOverMinQ def mean(counter): return float(sum([v * c for v, c in counter.items()])) / float(sum(counter.values())) @qa_test def averageQPerFile(sequenceFileName, sequencingType): return sorted({fi:mean(res.qualityHisto) for fi,res in QaMemo.of(sequenceFileName, sequencingType)}.items()) def averages_exceed(qualityHisto, q): return mean(qualityHisto) >= q @qa_test def qcStatus(sequenceFileName, sequencingType, coverageThreshold=None, approximateGenomeSize=4500000, minimumQScore=qa_params.get('user Q threshold', 27.0), bbThreshold=qa_params.get('permitted base balance variance', 0.2)): global failure_mode results = QaMemo.of(sequenceFileName, sequencingType) if not all([averages_exceed(r.qualityHisto, minimumQScore) for f, r in results]): update_failure_mode("qscore below {}".format(minimumQScore)) return "Fail" a, t, g, c = [sum([n[b] for n in QaMemo.of(sequenceFileName, sequencingType).nucleotideHisto]) for b in ('A', 'T', 'G', 'C')] at = float(a) / float(t) gc = float(g) / float(c) if any((at > 1.0 + bbThreshold, at < 1.0 - bbThreshold, gc > 1.0 + bbThreshold, gc < 1.0 - bbThreshold)): update_failure_mode("AT or GC base-balance outside of {} threshold".format(bbThreshold)) return "Fail" if coverageThreshold: if not (results.numBases / int(approximateGenomeSize)) >= float(coverageThreshold): update_failure_mode("coverage below {}".format(coverageThreshold)) return "Fail" return "Pass" return "Undetermined" def baseBalance(sequenceFileName, sequencingType): results = QaMemo.of(sequenceFileName, sequencingType) a, t, g, c = [sum([n[b] for n in results.nucleotideHisto]) for b in ('A', 'T', 'G', 'C')] return (float(a) / float(t) if t else 1, float(g) / float(c) if c else 1) @qa_test def at_gc_baseBalance(sequenceFileName, sequencingType): a, t, g, c = [sum([n[b] for n in QaMemo.of(sequenceFileName, sequencingType).nucleotideHisto]) for b in ('A', 'T', 'G', 'C')] at = float(a) / float(t) if t else 1 gc = float(g) / float(c) if c else 1 return "{:02}/{:02}".format(at, gc) @qa_test(disable=True) def baseBalance(sequenceFileName, sequencingType, threshold=qa_params.get('permitted base balance variance', 0.2) ): import xml.etree.ElementTree as xml el = xml.SubElement a, t, g, c = [sum([n[b] for n in QaMemo.of(sequenceFileName, sequencingType).nucleotideHisto]) for b in ('A', 'T', 'G', 'C')] at = float(a) / float(t) if t else 1 gc = float(g) / float(c) if c else 1 status = "Pass" if any((at > 1.0 + threshold, at < 1.0 - threshold, gc > 1.0 + threshold, gc < 1.0 - threshold)): status = "Fail" update_failure_mode("Base balance out of variance threshold ({:.2},{:.2} > {})".format(at, gc, threshold)) root = xml.Element('baseBalanceResult') el(root, 'AT').text = str(at) el(root, 'GC').text = str(gc) el(root, 'baseBalanceStatus').text = status return root @qa_test(disable = ~os.path.exists(qa_params.get('path to kraken database', '/scratch/biostat/kraken/miniDB')), profile=False) def taxonomyQualityUsingKraken(sequenceFileName, sequencingType, genus=None, species=None, databasePath=qa_params.get('path to kraken database', '/scratch/biostat/kraken/miniDB') ): #print sequenceFileName #print sequencingType #print sequenceFileName #print sequencingType #print genus #print species import xml.etree.ElementTree as xml global failure_mode el = xml.SubElement res = TaxMemo.of(sequenceFileName, databasePath) #make return element root = xml.Element('taxonomyQualityResult') if not genus: el(root, 'taxStatus').text = "Undetermined" else: if genus in res.taxon.split() and species in res.taxon.split(): el(root, 'taxStatus').text = "Pass" else: el(root, 'taxStatus').text = "Fail" update_failure_mode('"{} {}" was not in agreement with calculated ID "{}"'.format(genus, species, res.taxon)) el(root, 'mostPrevalentTaxon', dict(prevalence=str(res.taxon_prevalence))).text = res.taxon[-230:] #last 255 characters contams = el(root, 'contaminants') for i in range(4): el(contams, 'prevalentContaminant', dict(prevalence=str(res.contaminant_prevalence[i]), ordinal=str(i))).text = res.contaminant[i][-230:] el(root, 'database', dict(checksum=str(res.checksum))).text = databasePath return root @qa_test def reasonForFailure(): global failure_mode return failure_mode[0:254] #255 character limit in destination DB field @qa_test(disable=True) def run_serotype_usingseqsero( sequenceFileName, sequencingType): from gims_qa_serotyping import run_serotype_usingseqsero seq1 = "" + sequenceFileName[0] seq2 = "" + sequenceFileName[1] return run_serotype_usingseqsero( seq1, seq2, sequencingType, Oantigen="Undetermined", H1antigen="Undetermined", H2antigen="Undetermined", AntigenicProfile="Undetermined", PredictedSerotype="Undetermined") return root