comparison 0.5.0/bin/gen_sim_abn_table.py @ 0:97cd2f532efe

planemo upload
author kkonganti
date Mon, 31 Mar 2025 14:50:40 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:97cd2f532efe
1 #!/usr/bin/env python3
2
3 # Kranti Konganti
4
5 import argparse
6 import glob
7 import inspect
8 import logging
9 import os
10 import pprint
11 import re
12 from collections import defaultdict
13
14
15 # Multiple inheritence for pretty printing of help text.
16 class MultiArgFormatClasses(
17 argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter
18 ):
19 pass
20
21
22 # Main
23 def main() -> None:
24 """
25 This script will take the final taxonomic classification files and create a
26 global relative abundance type file in the current working directory. The
27 relative abundance type files should be in CSV or TSV format and should have
28 the lineage or taxonomy in first column and samples in the subsequent columns.
29 """
30 # Set logging.
31 logging.basicConfig(
32 format="\n"
33 + "=" * 55
34 + "\n%(asctime)s - %(levelname)s\n"
35 + "=" * 55
36 + "\n%(message)s\n\n",
37 level=logging.DEBUG,
38 )
39
40 # Debug print.
41 ppp = pprint.PrettyPrinter(width=55)
42 prog_name = inspect.stack()[0].filename
43
44 parser = argparse.ArgumentParser(
45 prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
46 )
47
48 required = parser.add_argument_group("required arguments")
49
50 required.add_argument(
51 "-abn",
52 dest="rel_abn_dir",
53 default=False,
54 required=True,
55 help="Absolute UNIX path to the parent directory that contains the\n"
56 + "abundance type files.",
57 )
58 parser.add_argument(
59 "-op",
60 dest="out_prefix",
61 default="nowayout.tblsum",
62 required=False,
63 help="Set the output file(s) prefix for output(s) generated\nby this program.",
64 )
65 parser.add_argument(
66 "-header",
67 dest="header",
68 action="store_true",
69 default=True,
70 required=False,
71 help="Do the relative abundance files have a header.",
72 )
73 parser.add_argument(
74 "-filepat",
75 dest="file_pat",
76 default="*.lineage_summary.tsv",
77 required=False,
78 help="Files will be searched by this suffix for merged output generation\nby this program.",
79 )
80 parser.add_argument(
81 "-failedfilepat",
82 dest="failed_file_pat",
83 default="*FAILED.txt",
84 required=False,
85 help="Files will be searched by this suffix for merged output generation\nby this program.",
86 )
87 parser.add_argument(
88 "-delim",
89 dest="delim",
90 default="\t",
91 required=False,
92 help="The delimitor by which the fields are separated in the file.",
93 )
94
95 args = parser.parse_args()
96 rel_abn_dir = args.rel_abn_dir
97 is_header = args.header
98 out_prefix = args.out_prefix
99 file_pat = args.file_pat
100 failed_file_pat = args.failed_file_pat
101 delim = args.delim
102 suffix = re.sub(r"^\*", "", file_pat)
103 rel_abn_comb = os.path.join(os.getcwd(), out_prefix + ".txt")
104 rel_abn_files = glob.glob(os.path.join(rel_abn_dir, file_pat))
105 failed_rel_abn_files = glob.glob(os.path.join(rel_abn_dir, failed_file_pat))
106 empty_results = "Relative abundance results did not pass thresholds"
107 sample2lineage, seen_lineage = (defaultdict(defaultdict), defaultdict(int))
108
109 if len(rel_abn_files) == 0:
110 logging.info(
111 "Unable to find any files with .tsv extentsion.\nNow trying .csv extension."
112 )
113 rel_abn_files = glob.glob(os.path.join(rel_abn_dir, "*.csv"))
114 delim = ","
115
116 if len(failed_rel_abn_files) == 0:
117 logging.info(
118 f"Unable to find any files with patttern {failed_file_pat}.\n"
119 + "The failed samples will not appear in the final aggregate file."
120 )
121
122 if rel_abn_dir:
123 if not os.path.isdir(rel_abn_dir):
124 logging.error("UNIX path\n" + f"{rel_abn_dir}\n" + "does not exist!")
125 exit(1)
126 if len(rel_abn_files) <= 0:
127 with open(rel_abn_comb, "w") as rel_abn_comb_fh:
128 rel_abn_comb_fh.write(f"Sample\n{empty_results} in any samples\n")
129 rel_abn_comb_fh.close()
130 exit(0)
131
132 for failed_rel_abn in failed_rel_abn_files:
133 with open(failed_rel_abn, "r") as failed_fh:
134 sample2lineage[failed_fh.readline().strip()].setdefault(
135 "unclassified", []
136 ).append(float("1.0"))
137 failed_fh.close()
138
139 for rel_abn_file in rel_abn_files:
140 sample_name = re.match(r"(^.+?)\..*$", os.path.basename(rel_abn_file))[1]
141
142 with open(rel_abn_file, "r") as rel_abn_fh:
143 if is_header:
144 sample_names = rel_abn_fh.readline().strip().split(delim)[1:]
145 if len(sample_names) > 2:
146 logging.error(
147 "The individual relative abundance file has more "
148 + "\nthan 1 sample. This is rare in the context of running the "
149 + "\n nowayout Nextflow workflow."
150 )
151 exit(1)
152 elif len(sample_names) < 2:
153 sample_name = re.sub(suffix, "", os.path.basename(rel_abn_file))
154 logging.info(
155 "Seems like there is no sample name in the lineage summary file."
156 + f"\nTherefore, sample name has been extracted from file name: {sample_name}."
157 )
158 else:
159 sample_name = sample_names[0]
160
161 for line in rel_abn_fh.readlines():
162 cols = line.strip().split(delim)
163 lineage = cols[0]
164 abn = cols[1]
165 sample2lineage[sample_name].setdefault(lineage, []).append(
166 float(abn)
167 )
168 seen_lineage[lineage] = 1
169
170 with open(rel_abn_comb, "w") as rel_abn_comb_fh:
171 samples = sorted(sample2lineage.keys())
172 rel_abn_comb_fh.write(f"Lineage{delim}" + delim.join(samples) + "\n")
173
174 for lineage in sorted(seen_lineage.keys()):
175 rel_abn_comb_fh.write(lineage)
176 for sample in samples:
177 if lineage in sample2lineage[sample].keys():
178 rel_abn_comb_fh.write(
179 delim
180 + "".join(
181 [str(abn) for abn in sample2lineage[sample][lineage]]
182 )
183 )
184 else:
185 rel_abn_comb_fh.write(f"{delim}0.0")
186 rel_abn_comb_fh.write("\n")
187 rel_abn_comb_fh.close()
188
189
190 if __name__ == "__main__":
191 main()