view qa_core/gims_qa_tests.py @ 2:021864b5a0a5 tip

planemo upload
author jpayne
date Fri, 11 May 2018 10:25:14 -0400
parents dafec28bd37e
children
line wrap: on
line source
from qa_core import qa_test, new_output_file, open_input, readfq, qa_params
from collections import namedtuple, defaultdict, Counter
from Bio import SeqIO
from multiprocessing import Pool, Process, cpu_count
from functools import partial
from itertools import izip, count, izip_longest, islice
from tempfile import NamedTemporaryFile, mkdtemp
from subprocess import check_call, CalledProcessError
from numpy import genfromtxt
from os.path import abspath, join, split, splitext
from contextlib import contextmanager
from subprocess import call
from subprocess import check_output
from hashlib import md5


import sys
import shutil
import os
import pandas
import datetime
import re
import glob, string

#Test implementation classes and functions. The actual tests are down at the bottom.

SuperResultTuple = namedtuple("SuperResultTuple", ("readCount", "numBases", "baseQScoreHisto", "nucleotideHisto", "qualityHisto"))

# #Global Variables for kraken methods
# krakendatabase = "/scratch/biostat/kraken/miniDB/"
# we don't actually need this as a global

class ResultTuple(SuperResultTuple):
	def __add__(self, other):
		r1, n1, h1, nuc1, qual1 = self
 		r2, n2, h2, nuc2, qual2 = other
 		return ResultTuple(r1 + r2, 
						   n1 + n2, 
						   [p + q for p, q in izip_longest(h1, h2, fillvalue=Counter())], 
						   [p + q for p, q in izip_longest(nuc1, nuc2, fillvalue=Counter())], 
						   qual1 + qual2)
 		
 	def __and__(self, other):
 		r1, n1, h1, nuc1, qual1 = self
 		r2, n2, h2, nuc2, qual2 = other
 		return ResultTuple(r1 + r2, 
						   n1 + n2,
						   h1 + h2,
						   nuc1 + nuc2,
						   qual1 + qual2)
 		
 	@classmethod
 	def zero(cls):
 		return cls(0, 0, [], [], Counter())
 		
 	def to_dataFrame(self):
		try:
 			from pandas import DataFrame
 			#from_dict = partial(DataFrame.from_dict, DataFrame)
 		except:
 			#no pandas
 			def DataFrame(self, data):
 				return ResultTuple(**data)
 		
		pivotNucHisto = defaultdict(list)
		for cnt in self.nucleotideHisto:
			for base in ('A', 'T', 'G', 'C', 'N'):
				pivotNucHisto[base].append(cnt[base])
		pivotQBaseHisto = self.baseQScoreHisto
		pivotQualHisto = {}
		for q in range(max(self.qualityHisto.keys())):
			pivotQualHisto[q] = self.qualityHisto[q]
		df = {'readCount':self.readCount,
			  'numBases':self.numBases,
			  'baseQScoreHisto':pivotQBaseHisto,
			  'nucleotideHisto':pivotNucHisto,
			  'qualityHisto':pivotQualHisto}
		return df
		
class MergedResultTuple():
	"Merged multi-file results object. You can iterate on it for filename and ResultTuple pairs, or call its attributes for combined results"
	def __init__(self, **kwargs):
		self.per_file = {}
		self.per_file.update(kwargs)
		
	def __getattr__(self, name):
		return getattr(reduce(lambda p, q: p & q, [v for k, v in sorted(self.per_file.items())], ResultTuple.zero()), name)
		
	def __iter__(self):
		return iter(self.per_file.items())


def analyze_batch_reads(reads):
	readCount = 0
	numBases = 0
	baseQScoreHisto = []
	nucleotideHisto = []
	qualityHisto = Counter()
	for name, seq, quals in reads:
		qualityHisto.update(quals)
		def scan_a_read(read):
		#for base, qual, index in zip(seq, quals, count(0)):
			base, qual, index = read
			try:
				nucleotideHisto[index][base] += 1
			except IndexError:
				nucleotideHisto.append(Counter(base))
			try:
				baseQScoreHisto[index][qual] += 1
			except IndexError:
				baseQScoreHisto.append(Counter({qual:1}))
			return index
		m = map(scan_a_read, zip(seq, quals, count(0))) #numBases is the last index of the read
		#print(name, m[:3])
		numBases += m[-1]
		readCount += 1
	r = ResultTuple(readCount, numBases, baseQScoreHisto, nucleotideHisto, qualityHisto)
	return r

	
def cluster(slicable, n=2):
	"return an n-tuple of iterators over an in-memory sequence"
	#determine the offset size
	offset = (len(slicable) / n) + 1
	#return tuple([islice(slicable, i, i+offset) for i in range(n)])
	return tuple([tuple(slicable[i*offset:(i+1)*offset-1]) for i in range(n)])
	
	
def tupleize_parse(fileh, format):
	for read in SeqIO.parse(fileh, format):
		yield (read.id, str(read.seq), read.letter_annotations['phred_quality'])

#@profile
def analyze_shortread(seqFile, map_f=map, multi=qa_params.get('parallelism', cpu_count())):
	"analyze one file whose reads are typical short-read technology"
	if 'sff' in seqFile:
		format, quality, mode = 'sff', 'phred_quality', 'rb'
		rds_f = partial(tupleize_parse, format=format)
	else:
		format, quality, mode = 'fastq', 'phred_quality', 'rU'
		rds_f = readfq
	with open_input(seqFile) as seqFileH:
		rds = list(rds_f(seqFileH))
		rds = cluster(rds, multi)
		result = sum(map_f(analyze_batch_reads, rds), ResultTuple.zero())
	return result


def analyze_longread(seqFile, map_f=map):
	"analyze one file from pacbio or nanopore"

	return ResultTuple.zero()
	#what to do here? if nanopore, plot 2d/1d length vs accuracy


def panic(*a, **k):
	raise ValueError("No analysis method for this sequencing type")

config = {'Roche 454':analyze_shortread,
		  'Illumina MiSeq':analyze_shortread,
		  'Illumina NextSeq':analyze_shortread,
		  'Illumina HiSeq':analyze_shortread,
		  'IonTorrent PGM':analyze_shortread,
		  'PacBio RS':analyze_longread,
		  'Oxford Nanopore MiniION':analyze_longread}


class QaMemo(object):
	"lazy-evaluated QA operation"
	
	result = None #named tuple results object
	
	@classmethod
	def analyze(cls, 
				sequenceFiles, 
				sequencingType, 
				map_f=qa_params.get('map implementation', map), 
				plots=qa_params.get('plots', True),
				annotation=qa_params.get('annotation', '')):
		if type(sequenceFiles) == str: #if this is a single file
			sequenceFiles = [sequenceFiles]
		analysis = partial(config.get(sequencingType, panic), map_f=map_f)
#		result = reduce(lambda a, b: a & b, map(analysis, sequenceFiles), ResultTuple.zero())
		result = MergedResultTuple(**{fi:analysis(fi) for fi in sequenceFiles})
		if plots:
			try:
				from gims_qa_plot import qa_plots
				result_df = result.to_dataFrame()
				qa_plots(result_df, annotation)
			except Exception:
				import traceback
				traceback.print_exc()
		return result
	
	@classmethod
	def of(cls, sequenceFiles, sequencingType):
		try:
			if not cls.result:
				cls.result = QaMemo.analyze(sequenceFiles, sequencingType)
			return cls.result
		except TypeError as e:
			raise ValueError(str(e)), None, sys.exc_info()[2]
			
			
TaxTuple = namedtuple("TaxTuple", ("taxon", "taxon_prevalence", "contaminant", "contaminant_prevalence", "checksum"))
			
			
class TaxMemo(object):
	"lazy-evaluated Tax check operation"
	
	result = None #
	
	@classmethod
	def analyze(cls, sequenceFiles, database):
		from gims_qa_kraken import run_kraken
		return run_kraken(sequenceFiles, database, dict(preload=None,
														threads=qa_params.get('parallelism', 1)))
	
	# ##	print cls
	# ##	print sequenceFiles		
	# 	from gims_qa_kraken import runKrakenSeqTest

	# 	#hash the kraken db
	# 	#this isn't right yet, the db is a directory
	# 	db_hash = md5(open(database, 'rb').read()).hexdigest()

	
	# 	#Run Kraken on the current sequence and collect result and pass to TaxTuple to be written to xml
    #     KrakenResult = runKrakenSeqtest(sequenceFiles,krakendatabase)

	# 	#do this more readably lines with list comprehensions
	# 	org_headers = ('OrganismTwo', 'OrganismThree', 'OrganismFour', 'OrganismFive')
	# 	org_percent_headers = ('OrganismTwo%', 'OrganismThree%', 'OrganismFour%', 'OrganismFive%')
  	# 	Organisms  = [KrakenResult[h].iloc[0] for h in org_headers]
    #     OrganismsPer  = [KrakenResult[h].iloc[0] for h in org_percent_headers]


	# 	return TaxTuple(KrakenResult['OrganismOne'].iloc[0], KrakenResult['OrganismOne%'].iloc[0], Organisms, OrganismsPer, db_hash) 
		
	@classmethod
	def of(cls, *args, **kwargs):
		if not cls.result:
			cls.result = TaxMemo.analyze(*args, **kwargs)
		return cls.result

failure_mode = ""

def update_failure_mode(failure):
	global failure_mode
	if failure_mode:
		failure_mode += "; {}".format(failure)
	else:
		failure_mode = failure
			
			
##########################################################################################
#
#	START HERE to design and configure the tests ----------
#
##########################################################################################




@qa_test(disable=True)
def parametersUsed(sequencingType='not given', 
				   coverageThreshold='not given', 
				   approximateGenomeSize=4500000):
	"Simple function to save the test and evaluation parameters used"
	import xml.etree.ElementTree as xml
	el = xml.SubElement
	root = xml.Element("parametersUsed")
	for key, value in qa_params.items():
		el(root, "parameter", attrib=dict(key=str(key), value=str(value)))
	return root

@qa_test
def readCount(sequenceFileName, sequencingType):
	return QaMemo.of(sequenceFileName, sequencingType).readCount
	
@qa_test
def coverage(sequenceFileName, sequencingType, approximateGenomeSize=4500000):
	return QaMemo.of(sequenceFileName, sequencingType).numBases / int(approximateGenomeSize)

	
# @qa_test(disable=False)
# def coverageQThirtyPlus(sequenceFileName, sequencingType, approximateGenomeSize=4500000, minimumQScore=qa_params.get('user Q threshold', 27.0)):
# 	numOverMinQ = 0
# 	for score, num in QaMemo.of(sequenceFileName, sequencingType).baseQScoreHisto.items():
# 		if score >= int(minimumQScore):
# 			numOverMinQ += num
# 	return numOverMinQ
	
	
def mean(counter):
	return float(sum([v * c for v, c in counter.items()])) / float(sum(counter.values()))
		

@qa_test
def averageQPerFile(sequenceFileName, sequencingType):
	return sorted({fi:mean(res.qualityHisto) for fi,res in QaMemo.of(sequenceFileName, sequencingType)}.items())
	
		
def averages_exceed(qualityHisto, q):
	return mean(qualityHisto) >= q


@qa_test	
def qcStatus(sequenceFileName, sequencingType, 
			 coverageThreshold=None, 
			 approximateGenomeSize=4500000, 
			 minimumQScore=qa_params.get('user Q threshold', 27.0),
			 bbThreshold=qa_params.get('permitted base balance variance', 0.2)):
	global failure_mode
	results = QaMemo.of(sequenceFileName, sequencingType)
	if not all([averages_exceed(r.qualityHisto, minimumQScore) for f, r in results]):
		update_failure_mode("qscore below {}".format(minimumQScore))
		return "Fail"
	a, t, g, c = [sum([n[b] for n in QaMemo.of(sequenceFileName, sequencingType).nucleotideHisto]) for b in ('A', 'T', 'G', 'C')]
	at = float(a) / float(t)
	gc = float(g) / float(c)
	if any((at > 1.0 + bbThreshold, at < 1.0 - bbThreshold, gc > 1.0 + bbThreshold, gc < 1.0 - bbThreshold)):
		update_failure_mode("AT or GC base-balance outside of {} threshold".format(bbThreshold))
		return "Fail"
	if coverageThreshold:
		if not (results.numBases / int(approximateGenomeSize)) >= float(coverageThreshold):
			update_failure_mode("coverage below {}".format(coverageThreshold))
			return "Fail"
		return "Pass"
	return "Undetermined"
	



def baseBalance(sequenceFileName, sequencingType):
	results = QaMemo.of(sequenceFileName, sequencingType)
	a, t, g, c = [sum([n[b] for n in results.nucleotideHisto]) for b in ('A', 'T', 'G', 'C')]
	return (float(a) / float(t) if t else 1, float(g) / float(c) if c else 1)

@qa_test
def at_gc_baseBalance(sequenceFileName, sequencingType):
	a, t, g, c = [sum([n[b] for n in QaMemo.of(sequenceFileName, sequencingType).nucleotideHisto]) for b in ('A', 'T', 'G', 'C')]
	at = float(a) / float(t) if t else 1
	gc = float(g) / float(c) if c else 1
	return "{:02}/{:02}".format(at, gc)

@qa_test(disable=True)
def baseBalance(sequenceFileName, sequencingType, threshold=qa_params.get('permitted base balance variance', 0.2) ):
	import xml.etree.ElementTree as xml
	el = xml.SubElement
	a, t, g, c = [sum([n[b] for n in QaMemo.of(sequenceFileName, sequencingType).nucleotideHisto]) for b in ('A', 'T', 'G', 'C')]
	at = float(a) / float(t) if t else 1
	gc = float(g) / float(c) if c else 1
	status = "Pass"
	if any((at > 1.0 + threshold, at < 1.0 - threshold, gc > 1.0 + threshold, gc < 1.0 - threshold)):
		status = "Fail"
		update_failure_mode("Base balance out of variance threshold ({:.2},{:.2} > {})".format(at, gc, threshold))
	root = xml.Element('baseBalanceResult')
	el(root, 'AT').text = str(at)
	el(root, 'GC').text = str(gc)
	el(root, 'baseBalanceStatus').text = status
	return root





@qa_test(disable = ~os.path.exists(qa_params.get('path to kraken database', '/scratch/biostat/kraken/miniDB')), 
		 profile=False)
def taxonomyQualityUsingKraken(sequenceFileName, 
							   sequencingType,
							   genus=None,
							   species=None, 
							   databasePath=qa_params.get('path to kraken database', '/scratch/biostat/kraken/miniDB')
							   ):

	#print sequenceFileName
        #print sequencingType
	#print sequenceFileName
        #print sequencingType
	#print genus
	#print species


	import xml.etree.ElementTree as xml
	global failure_mode
	el = xml.SubElement
	res = TaxMemo.of(sequenceFileName, databasePath)
	#make return element
	root = xml.Element('taxonomyQualityResult')
	if not genus:
 		el(root, 'taxStatus').text = "Undetermined"
 	else:
 		if genus in res.taxon.split() and species in res.taxon.split():
 			el(root, 'taxStatus').text = "Pass"
 		else:
 			el(root, 'taxStatus').text = "Fail"
 			update_failure_mode('"{} {}" was not in agreement with calculated ID "{}"'.format(genus, species, res.taxon))
 	el(root, 'mostPrevalentTaxon', dict(prevalence=str(res.taxon_prevalence))).text = res.taxon[-230:] #last 255 characters
 	
	contams = el(root, 'contaminants')
	for i in range(4):
		el(contams, 'prevalentContaminant', dict(prevalence=str(res.contaminant_prevalence[i]), ordinal=str(i))).text = res.contaminant[i][-230:]
	el(root, 'database', dict(checksum=str(res.checksum))).text = databasePath
	
	
	return root



@qa_test
def reasonForFailure():
	global failure_mode
	return failure_mode[0:254] #255 character limit in destination DB field

@qa_test(disable=True)
def run_serotype_usingseqsero(
	sequenceFileName, 
	sequencingType):
	from gims_qa_serotyping import run_serotype_usingseqsero
	
	seq1 = "" + sequenceFileName[0]
	seq2 = "" + sequenceFileName[1]

	return run_serotype_usingseqsero(
	seq1, seq2,
							   sequencingType,
							   Oantigen="Undetermined",
							   H1antigen="Undetermined", 
							   H2antigen="Undetermined",
							   AntigenicProfile="Undetermined",
							   PredictedSerotype="Undetermined")

	return root