kkonganti@0: #!/usr/bin/env python3 kkonganti@0: kkonganti@0: # Kranti Konganti kkonganti@0: kkonganti@0: import argparse kkonganti@0: import glob kkonganti@0: import inspect kkonganti@0: import logging kkonganti@0: import os kkonganti@0: import pprint kkonganti@0: import re kkonganti@0: from collections import defaultdict kkonganti@0: kkonganti@0: kkonganti@0: # Multiple inheritence for pretty printing of help text. kkonganti@0: class MultiArgFormatClasses( kkonganti@0: argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter kkonganti@0: ): kkonganti@0: pass kkonganti@0: kkonganti@0: kkonganti@0: # Main kkonganti@0: def main() -> None: kkonganti@0: """ kkonganti@0: This script will take the final taxonomic classification files and create a kkonganti@0: global relative abundance type file in the current working directory. The kkonganti@0: relative abundance type files should be in CSV or TSV format and should have kkonganti@0: the lineage or taxonomy in first column and samples in the subsequent columns. kkonganti@0: """ kkonganti@0: # Set logging. kkonganti@0: logging.basicConfig( kkonganti@0: format="\n" kkonganti@0: + "=" * 55 kkonganti@0: + "\n%(asctime)s - %(levelname)s\n" kkonganti@0: + "=" * 55 kkonganti@0: + "\n%(message)s\n\n", kkonganti@0: level=logging.DEBUG, kkonganti@0: ) kkonganti@0: kkonganti@0: # Debug print. kkonganti@0: ppp = pprint.PrettyPrinter(width=55) kkonganti@0: prog_name = inspect.stack()[0].filename kkonganti@0: kkonganti@0: parser = argparse.ArgumentParser( kkonganti@0: prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses kkonganti@0: ) kkonganti@0: kkonganti@0: required = parser.add_argument_group("required arguments") kkonganti@0: kkonganti@0: required.add_argument( kkonganti@0: "-abn", kkonganti@0: dest="rel_abn_dir", kkonganti@0: default=False, kkonganti@0: required=True, kkonganti@0: help="Absolute UNIX path to the parent directory that contains the\n" kkonganti@0: + "abundance type files.", kkonganti@0: ) kkonganti@0: parser.add_argument( kkonganti@0: "-op", kkonganti@0: dest="out_prefix", kkonganti@0: default="nowayout.tblsum", kkonganti@0: required=False, kkonganti@0: help="Set the output file(s) prefix for output(s) generated\nby this program.", kkonganti@0: ) kkonganti@0: parser.add_argument( kkonganti@0: "-header", kkonganti@0: dest="header", kkonganti@0: action="store_true", kkonganti@0: default=True, kkonganti@0: required=False, kkonganti@0: help="Do the relative abundance files have a header.", kkonganti@0: ) kkonganti@0: parser.add_argument( kkonganti@0: "-filepat", kkonganti@0: dest="file_pat", kkonganti@0: default="*.lineage_summary.tsv", kkonganti@0: required=False, kkonganti@0: help="Files will be searched by this suffix for merged output generation\nby this program.", kkonganti@0: ) kkonganti@0: parser.add_argument( kkonganti@0: "-failedfilepat", kkonganti@0: dest="failed_file_pat", kkonganti@0: default="*FAILED.txt", kkonganti@0: required=False, kkonganti@0: help="Files will be searched by this suffix for merged output generation\nby this program.", kkonganti@0: ) kkonganti@0: parser.add_argument( kkonganti@0: "-delim", kkonganti@0: dest="delim", kkonganti@0: default="\t", kkonganti@0: required=False, kkonganti@0: help="The delimitor by which the fields are separated in the file.", kkonganti@0: ) kkonganti@0: kkonganti@0: args = parser.parse_args() kkonganti@0: rel_abn_dir = args.rel_abn_dir kkonganti@0: is_header = args.header kkonganti@0: out_prefix = args.out_prefix kkonganti@0: file_pat = args.file_pat kkonganti@0: failed_file_pat = args.failed_file_pat kkonganti@0: delim = args.delim kkonganti@0: suffix = re.sub(r"^\*", "", file_pat) kkonganti@0: rel_abn_comb = os.path.join(os.getcwd(), out_prefix + ".txt") kkonganti@0: rel_abn_files = glob.glob(os.path.join(rel_abn_dir, file_pat)) kkonganti@0: failed_rel_abn_files = glob.glob(os.path.join(rel_abn_dir, failed_file_pat)) kkonganti@0: empty_results = "Relative abundance results did not pass thresholds" kkonganti@0: sample2lineage, seen_lineage = (defaultdict(defaultdict), defaultdict(int)) kkonganti@0: kkonganti@0: if len(rel_abn_files) == 0: kkonganti@0: logging.info( kkonganti@0: "Unable to find any files with .tsv extentsion.\nNow trying .csv extension." kkonganti@0: ) kkonganti@0: rel_abn_files = glob.glob(os.path.join(rel_abn_dir, "*.csv")) kkonganti@0: delim = "," kkonganti@0: kkonganti@0: if len(failed_rel_abn_files) == 0: kkonganti@0: logging.info( kkonganti@0: f"Unable to find any files with patttern {failed_file_pat}.\n" kkonganti@0: + "The failed samples will not appear in the final aggregate file." kkonganti@0: ) kkonganti@0: kkonganti@0: if rel_abn_dir: kkonganti@0: if not os.path.isdir(rel_abn_dir): kkonganti@0: logging.error("UNIX path\n" + f"{rel_abn_dir}\n" + "does not exist!") kkonganti@0: exit(1) kkonganti@0: if len(rel_abn_files) <= 0: kkonganti@0: with open(rel_abn_comb, "w") as rel_abn_comb_fh: kkonganti@0: rel_abn_comb_fh.write(f"Sample\n{empty_results} in any samples\n") kkonganti@0: rel_abn_comb_fh.close() kkonganti@0: exit(0) kkonganti@0: kkonganti@0: for failed_rel_abn in failed_rel_abn_files: kkonganti@0: with open(failed_rel_abn, "r") as failed_fh: kkonganti@0: sample2lineage[failed_fh.readline().strip()].setdefault( kkonganti@0: "unclassified", [] kkonganti@0: ).append(float("1.0")) kkonganti@0: failed_fh.close() kkonganti@0: kkonganti@0: for rel_abn_file in rel_abn_files: kkonganti@0: sample_name = re.match(r"(^.+?)\..*$", os.path.basename(rel_abn_file))[1] kkonganti@0: kkonganti@0: with open(rel_abn_file, "r") as rel_abn_fh: kkonganti@0: if is_header: kkonganti@0: sample_names = rel_abn_fh.readline().strip().split(delim)[1:] kkonganti@0: if len(sample_names) > 2: kkonganti@0: logging.error( kkonganti@0: "The individual relative abundance file has more " kkonganti@0: + "\nthan 1 sample. This is rare in the context of running the " kkonganti@0: + "\n nowayout Nextflow workflow." kkonganti@0: ) kkonganti@0: exit(1) kkonganti@0: elif len(sample_names) < 2: kkonganti@0: sample_name = re.sub(suffix, "", os.path.basename(rel_abn_file)) kkonganti@0: logging.info( kkonganti@0: "Seems like there is no sample name in the lineage summary file." kkonganti@0: + f"\nTherefore, sample name has been extracted from file name: {sample_name}." kkonganti@0: ) kkonganti@0: else: kkonganti@0: sample_name = sample_names[0] kkonganti@0: kkonganti@0: for line in rel_abn_fh.readlines(): kkonganti@0: cols = line.strip().split(delim) kkonganti@0: lineage = cols[0] kkonganti@0: abn = cols[1] kkonganti@0: sample2lineage[sample_name].setdefault(lineage, []).append( kkonganti@0: float(abn) kkonganti@0: ) kkonganti@0: seen_lineage[lineage] = 1 kkonganti@0: kkonganti@0: with open(rel_abn_comb, "w") as rel_abn_comb_fh: kkonganti@0: samples = sorted(sample2lineage.keys()) kkonganti@0: rel_abn_comb_fh.write(f"Lineage{delim}" + delim.join(samples) + "\n") kkonganti@0: kkonganti@0: for lineage in sorted(seen_lineage.keys()): kkonganti@0: rel_abn_comb_fh.write(lineage) kkonganti@0: for sample in samples: kkonganti@0: if lineage in sample2lineage[sample].keys(): kkonganti@0: rel_abn_comb_fh.write( kkonganti@0: delim kkonganti@0: + "".join( kkonganti@0: [str(abn) for abn in sample2lineage[sample][lineage]] kkonganti@0: ) kkonganti@0: ) kkonganti@0: else: kkonganti@0: rel_abn_comb_fh.write(f"{delim}0.0") kkonganti@0: rel_abn_comb_fh.write("\n") kkonganti@0: rel_abn_comb_fh.close() kkonganti@0: kkonganti@0: kkonganti@0: if __name__ == "__main__": kkonganti@0: main()