Mercurial > repos > kkonganti > hfp_nowayout
comparison 0.5.0/bin/gen_sim_abn_table.py @ 0:97cd2f532efe
planemo upload
author | kkonganti |
---|---|
date | Mon, 31 Mar 2025 14:50:40 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:97cd2f532efe |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Kranti Konganti | |
4 | |
5 import argparse | |
6 import glob | |
7 import inspect | |
8 import logging | |
9 import os | |
10 import pprint | |
11 import re | |
12 from collections import defaultdict | |
13 | |
14 | |
15 # Multiple inheritence for pretty printing of help text. | |
16 class MultiArgFormatClasses( | |
17 argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter | |
18 ): | |
19 pass | |
20 | |
21 | |
22 # Main | |
23 def main() -> None: | |
24 """ | |
25 This script will take the final taxonomic classification files and create a | |
26 global relative abundance type file in the current working directory. The | |
27 relative abundance type files should be in CSV or TSV format and should have | |
28 the lineage or taxonomy in first column and samples in the subsequent columns. | |
29 """ | |
30 # Set logging. | |
31 logging.basicConfig( | |
32 format="\n" | |
33 + "=" * 55 | |
34 + "\n%(asctime)s - %(levelname)s\n" | |
35 + "=" * 55 | |
36 + "\n%(message)s\n\n", | |
37 level=logging.DEBUG, | |
38 ) | |
39 | |
40 # Debug print. | |
41 ppp = pprint.PrettyPrinter(width=55) | |
42 prog_name = inspect.stack()[0].filename | |
43 | |
44 parser = argparse.ArgumentParser( | |
45 prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses | |
46 ) | |
47 | |
48 required = parser.add_argument_group("required arguments") | |
49 | |
50 required.add_argument( | |
51 "-abn", | |
52 dest="rel_abn_dir", | |
53 default=False, | |
54 required=True, | |
55 help="Absolute UNIX path to the parent directory that contains the\n" | |
56 + "abundance type files.", | |
57 ) | |
58 parser.add_argument( | |
59 "-op", | |
60 dest="out_prefix", | |
61 default="nowayout.tblsum", | |
62 required=False, | |
63 help="Set the output file(s) prefix for output(s) generated\nby this program.", | |
64 ) | |
65 parser.add_argument( | |
66 "-header", | |
67 dest="header", | |
68 action="store_true", | |
69 default=True, | |
70 required=False, | |
71 help="Do the relative abundance files have a header.", | |
72 ) | |
73 parser.add_argument( | |
74 "-filepat", | |
75 dest="file_pat", | |
76 default="*.lineage_summary.tsv", | |
77 required=False, | |
78 help="Files will be searched by this suffix for merged output generation\nby this program.", | |
79 ) | |
80 parser.add_argument( | |
81 "-failedfilepat", | |
82 dest="failed_file_pat", | |
83 default="*FAILED.txt", | |
84 required=False, | |
85 help="Files will be searched by this suffix for merged output generation\nby this program.", | |
86 ) | |
87 parser.add_argument( | |
88 "-delim", | |
89 dest="delim", | |
90 default="\t", | |
91 required=False, | |
92 help="The delimitor by which the fields are separated in the file.", | |
93 ) | |
94 | |
95 args = parser.parse_args() | |
96 rel_abn_dir = args.rel_abn_dir | |
97 is_header = args.header | |
98 out_prefix = args.out_prefix | |
99 file_pat = args.file_pat | |
100 failed_file_pat = args.failed_file_pat | |
101 delim = args.delim | |
102 suffix = re.sub(r"^\*", "", file_pat) | |
103 rel_abn_comb = os.path.join(os.getcwd(), out_prefix + ".txt") | |
104 rel_abn_files = glob.glob(os.path.join(rel_abn_dir, file_pat)) | |
105 failed_rel_abn_files = glob.glob(os.path.join(rel_abn_dir, failed_file_pat)) | |
106 empty_results = "Relative abundance results did not pass thresholds" | |
107 sample2lineage, seen_lineage = (defaultdict(defaultdict), defaultdict(int)) | |
108 | |
109 if len(rel_abn_files) == 0: | |
110 logging.info( | |
111 "Unable to find any files with .tsv extentsion.\nNow trying .csv extension." | |
112 ) | |
113 rel_abn_files = glob.glob(os.path.join(rel_abn_dir, "*.csv")) | |
114 delim = "," | |
115 | |
116 if len(failed_rel_abn_files) == 0: | |
117 logging.info( | |
118 f"Unable to find any files with patttern {failed_file_pat}.\n" | |
119 + "The failed samples will not appear in the final aggregate file." | |
120 ) | |
121 | |
122 if rel_abn_dir: | |
123 if not os.path.isdir(rel_abn_dir): | |
124 logging.error("UNIX path\n" + f"{rel_abn_dir}\n" + "does not exist!") | |
125 exit(1) | |
126 if len(rel_abn_files) <= 0: | |
127 with open(rel_abn_comb, "w") as rel_abn_comb_fh: | |
128 rel_abn_comb_fh.write(f"Sample\n{empty_results} in any samples\n") | |
129 rel_abn_comb_fh.close() | |
130 exit(0) | |
131 | |
132 for failed_rel_abn in failed_rel_abn_files: | |
133 with open(failed_rel_abn, "r") as failed_fh: | |
134 sample2lineage[failed_fh.readline().strip()].setdefault( | |
135 "unclassified", [] | |
136 ).append(float("1.0")) | |
137 failed_fh.close() | |
138 | |
139 for rel_abn_file in rel_abn_files: | |
140 sample_name = re.match(r"(^.+?)\..*$", os.path.basename(rel_abn_file))[1] | |
141 | |
142 with open(rel_abn_file, "r") as rel_abn_fh: | |
143 if is_header: | |
144 sample_names = rel_abn_fh.readline().strip().split(delim)[1:] | |
145 if len(sample_names) > 2: | |
146 logging.error( | |
147 "The individual relative abundance file has more " | |
148 + "\nthan 1 sample. This is rare in the context of running the " | |
149 + "\n nowayout Nextflow workflow." | |
150 ) | |
151 exit(1) | |
152 elif len(sample_names) < 2: | |
153 sample_name = re.sub(suffix, "", os.path.basename(rel_abn_file)) | |
154 logging.info( | |
155 "Seems like there is no sample name in the lineage summary file." | |
156 + f"\nTherefore, sample name has been extracted from file name: {sample_name}." | |
157 ) | |
158 else: | |
159 sample_name = sample_names[0] | |
160 | |
161 for line in rel_abn_fh.readlines(): | |
162 cols = line.strip().split(delim) | |
163 lineage = cols[0] | |
164 abn = cols[1] | |
165 sample2lineage[sample_name].setdefault(lineage, []).append( | |
166 float(abn) | |
167 ) | |
168 seen_lineage[lineage] = 1 | |
169 | |
170 with open(rel_abn_comb, "w") as rel_abn_comb_fh: | |
171 samples = sorted(sample2lineage.keys()) | |
172 rel_abn_comb_fh.write(f"Lineage{delim}" + delim.join(samples) + "\n") | |
173 | |
174 for lineage in sorted(seen_lineage.keys()): | |
175 rel_abn_comb_fh.write(lineage) | |
176 for sample in samples: | |
177 if lineage in sample2lineage[sample].keys(): | |
178 rel_abn_comb_fh.write( | |
179 delim | |
180 + "".join( | |
181 [str(abn) for abn in sample2lineage[sample][lineage]] | |
182 ) | |
183 ) | |
184 else: | |
185 rel_abn_comb_fh.write(f"{delim}0.0") | |
186 rel_abn_comb_fh.write("\n") | |
187 rel_abn_comb_fh.close() | |
188 | |
189 | |
190 if __name__ == "__main__": | |
191 main() |