Mercurial > repos > kkonganti > hfp_nowayout
diff 0.5.0/bin/gen_sim_abn_table.py @ 0:97cd2f532efe
planemo upload
author | kkonganti |
---|---|
date | Mon, 31 Mar 2025 14:50:40 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/0.5.0/bin/gen_sim_abn_table.py Mon Mar 31 14:50:40 2025 -0400 @@ -0,0 +1,191 @@ +#!/usr/bin/env python3 + +# Kranti Konganti + +import argparse +import glob +import inspect +import logging +import os +import pprint +import re +from collections import defaultdict + + +# Multiple inheritence for pretty printing of help text. +class MultiArgFormatClasses( + argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter +): + pass + + +# Main +def main() -> None: + """ + This script will take the final taxonomic classification files and create a + global relative abundance type file in the current working directory. The + relative abundance type files should be in CSV or TSV format and should have + the lineage or taxonomy in first column and samples in the subsequent columns. + """ + # Set logging. + logging.basicConfig( + format="\n" + + "=" * 55 + + "\n%(asctime)s - %(levelname)s\n" + + "=" * 55 + + "\n%(message)s\n\n", + level=logging.DEBUG, + ) + + # Debug print. + ppp = pprint.PrettyPrinter(width=55) + prog_name = inspect.stack()[0].filename + + parser = argparse.ArgumentParser( + prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses + ) + + required = parser.add_argument_group("required arguments") + + required.add_argument( + "-abn", + dest="rel_abn_dir", + default=False, + required=True, + help="Absolute UNIX path to the parent directory that contains the\n" + + "abundance type files.", + ) + parser.add_argument( + "-op", + dest="out_prefix", + default="nowayout.tblsum", + required=False, + help="Set the output file(s) prefix for output(s) generated\nby this program.", + ) + parser.add_argument( + "-header", + dest="header", + action="store_true", + default=True, + required=False, + help="Do the relative abundance files have a header.", + ) + parser.add_argument( + "-filepat", + dest="file_pat", + default="*.lineage_summary.tsv", + required=False, + help="Files will be searched by this suffix for merged output generation\nby this program.", + ) + parser.add_argument( + "-failedfilepat", + dest="failed_file_pat", + default="*FAILED.txt", + required=False, + help="Files will be searched by this suffix for merged output generation\nby this program.", + ) + parser.add_argument( + "-delim", + dest="delim", + default="\t", + required=False, + help="The delimitor by which the fields are separated in the file.", + ) + + args = parser.parse_args() + rel_abn_dir = args.rel_abn_dir + is_header = args.header + out_prefix = args.out_prefix + file_pat = args.file_pat + failed_file_pat = args.failed_file_pat + delim = args.delim + suffix = re.sub(r"^\*", "", file_pat) + rel_abn_comb = os.path.join(os.getcwd(), out_prefix + ".txt") + rel_abn_files = glob.glob(os.path.join(rel_abn_dir, file_pat)) + failed_rel_abn_files = glob.glob(os.path.join(rel_abn_dir, failed_file_pat)) + empty_results = "Relative abundance results did not pass thresholds" + sample2lineage, seen_lineage = (defaultdict(defaultdict), defaultdict(int)) + + if len(rel_abn_files) == 0: + logging.info( + "Unable to find any files with .tsv extentsion.\nNow trying .csv extension." + ) + rel_abn_files = glob.glob(os.path.join(rel_abn_dir, "*.csv")) + delim = "," + + if len(failed_rel_abn_files) == 0: + logging.info( + f"Unable to find any files with patttern {failed_file_pat}.\n" + + "The failed samples will not appear in the final aggregate file." + ) + + if rel_abn_dir: + if not os.path.isdir(rel_abn_dir): + logging.error("UNIX path\n" + f"{rel_abn_dir}\n" + "does not exist!") + exit(1) + if len(rel_abn_files) <= 0: + with open(rel_abn_comb, "w") as rel_abn_comb_fh: + rel_abn_comb_fh.write(f"Sample\n{empty_results} in any samples\n") + rel_abn_comb_fh.close() + exit(0) + + for failed_rel_abn in failed_rel_abn_files: + with open(failed_rel_abn, "r") as failed_fh: + sample2lineage[failed_fh.readline().strip()].setdefault( + "unclassified", [] + ).append(float("1.0")) + failed_fh.close() + + for rel_abn_file in rel_abn_files: + sample_name = re.match(r"(^.+?)\..*$", os.path.basename(rel_abn_file))[1] + + with open(rel_abn_file, "r") as rel_abn_fh: + if is_header: + sample_names = rel_abn_fh.readline().strip().split(delim)[1:] + if len(sample_names) > 2: + logging.error( + "The individual relative abundance file has more " + + "\nthan 1 sample. This is rare in the context of running the " + + "\n nowayout Nextflow workflow." + ) + exit(1) + elif len(sample_names) < 2: + sample_name = re.sub(suffix, "", os.path.basename(rel_abn_file)) + logging.info( + "Seems like there is no sample name in the lineage summary file." + + f"\nTherefore, sample name has been extracted from file name: {sample_name}." + ) + else: + sample_name = sample_names[0] + + for line in rel_abn_fh.readlines(): + cols = line.strip().split(delim) + lineage = cols[0] + abn = cols[1] + sample2lineage[sample_name].setdefault(lineage, []).append( + float(abn) + ) + seen_lineage[lineage] = 1 + + with open(rel_abn_comb, "w") as rel_abn_comb_fh: + samples = sorted(sample2lineage.keys()) + rel_abn_comb_fh.write(f"Lineage{delim}" + delim.join(samples) + "\n") + + for lineage in sorted(seen_lineage.keys()): + rel_abn_comb_fh.write(lineage) + for sample in samples: + if lineage in sample2lineage[sample].keys(): + rel_abn_comb_fh.write( + delim + + "".join( + [str(abn) for abn in sample2lineage[sample][lineage]] + ) + ) + else: + rel_abn_comb_fh.write(f"{delim}0.0") + rel_abn_comb_fh.write("\n") + rel_abn_comb_fh.close() + + +if __name__ == "__main__": + main()