Mercurial > repos > kkonganti > hfp_nowayout
comparison 0.5.0/bin/remove_dup_fasta_ids.py @ 0:97cd2f532efe
planemo upload
author | kkonganti |
---|---|
date | Mon, 31 Mar 2025 14:50:40 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:97cd2f532efe |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 import argparse | |
4 import gzip | |
5 import inspect | |
6 import logging | |
7 import os | |
8 import pprint | |
9 import shutil | |
10 from typing import BinaryIO, TextIO, Union | |
11 | |
12 from Bio import SeqIO | |
13 from Bio.Seq import Seq | |
14 from Bio.SeqRecord import SeqRecord | |
15 from genericpath import isdir | |
16 | |
17 | |
18 # Multiple inheritence for pretty printing of help text. | |
19 class MultiArgFormatClasses( | |
20 argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter | |
21 ): | |
22 pass | |
23 | |
24 | |
25 def write_fasta(seq: str, id: str, fh: Union[TextIO, BinaryIO]) -> None: | |
26 """ | |
27 Write sequence with no description to specified file. | |
28 """ | |
29 SeqIO.write( | |
30 SeqRecord(Seq(seq), id=id, description=str()), | |
31 fh, | |
32 "fasta", | |
33 ) | |
34 | |
35 | |
36 # Main | |
37 def main() -> None: | |
38 """ | |
39 This script takes: | |
40 1. A FASTA file in gzip or non-gzip (ASCII TXT) format and | |
41 | |
42 and then generates a new FASTA file with duplicate FASTA IDs replaced | |
43 with a unique ID. | |
44 """ | |
45 | |
46 # Set logging. | |
47 logging.basicConfig( | |
48 format="\n" | |
49 + "=" * 55 | |
50 + "\n%(asctime)s - %(levelname)s\n" | |
51 + "=" * 55 | |
52 + "\n%(message)s\n\n", | |
53 level=logging.DEBUG, | |
54 ) | |
55 | |
56 # Debug print. | |
57 ppp = pprint.PrettyPrinter(width=55) | |
58 prog_name = os.path.basename(inspect.stack()[0].filename) | |
59 | |
60 parser = argparse.ArgumentParser( | |
61 prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses | |
62 ) | |
63 | |
64 required = parser.add_argument_group("required arguments") | |
65 | |
66 required.add_argument( | |
67 "-fna", | |
68 dest="fna", | |
69 default=False, | |
70 required=True, | |
71 help="Absolute UNIX path to .fna or .fna.gz file.", | |
72 ) | |
73 parser.add_argument( | |
74 "-lin", | |
75 dest="lineages", | |
76 default=False, | |
77 required=False, | |
78 help="Absolute UNIX path to lineages.csv file for which the" | |
79 + "\nthe duplicate IDs will be made unique corresponding to" | |
80 + "\nthe FASTA IDs", | |
81 ) | |
82 parser.add_argument( | |
83 "-outdir", | |
84 dest="out_folder", | |
85 default=os.getcwd(), | |
86 required=False, | |
87 help="By default, the output is written to this\nfolder.", | |
88 ) | |
89 parser.add_argument( | |
90 "-f", | |
91 dest="force_write_out", | |
92 default=False, | |
93 action="store_true", | |
94 required=False, | |
95 help="Force overwrite the output file.", | |
96 ) | |
97 parser.add_argument( | |
98 "--fna-suffix", | |
99 dest="fna_suffix", | |
100 default=".fna", | |
101 required=False, | |
102 help="Suffix of the output FASTA file.", | |
103 ) | |
104 | |
105 # Parse defaults | |
106 args = parser.parse_args() | |
107 fna = args.fna | |
108 lineages = args.lineages | |
109 outdir = args.out_folder | |
110 overwrite = args.force_write_out | |
111 fna_suffix = args.fna_suffix | |
112 new_fna = os.path.join( | |
113 outdir, os.path.basename(fna).split(".")[0] + "_dedup_ids" + fna_suffix | |
114 ) | |
115 lin_header = False | |
116 new_lin = False | |
117 seen_ids = dict() | |
118 seen_lineages = dict() | |
119 | |
120 # Basic checks | |
121 if not overwrite and os.path.exists(new_fna): | |
122 logging.warning( | |
123 f"Output destination [{os.path.basename(new_fna)}] already exists!" | |
124 + "\nPlease use -f to delete and overwrite." | |
125 ) | |
126 elif overwrite and os.path.exists(new_fna): | |
127 logging.info(f"Overwrite requested. Deleting {os.path.basename(new_fna)}...") | |
128 if os.path.isdir(new_fna): | |
129 shutil.rmtree(new_fna) | |
130 else: | |
131 os.remove(new_fna) | |
132 | |
133 # Prepare for writing | |
134 new_fna_fh = open(new_fna, "+at") | |
135 | |
136 # If lineages file is mentioned, index it. | |
137 if lineages and os.path.exists(lineages) and os.path.getsize(lineages) > 0: | |
138 new_lin = os.path.join(os.getcwd(), os.path.basename(lineages) + "_dedup.csv") | |
139 new_lin_fh = open(new_lin, "w") | |
140 with open(lineages, "r") as l_fh: | |
141 lin_header = l_fh.readline() | |
142 for line in l_fh: | |
143 cols = line.strip().split(",") | |
144 if len(cols) < 9: | |
145 logging.error( | |
146 f"The row in the lineages file {os.path.basename(lineages)}" | |
147 + f"\ndoes not have 9 required columns: {len(cols)}" | |
148 + f"\n\n{lin_header.strip()}\n{line.strip()}" | |
149 ) | |
150 exit(1) | |
151 elif len(cols) > 9: | |
152 logging.info( | |
153 f"The row in the lineages file {os.path.basename(lineages)}" | |
154 + f"\nhas more than 9 required columns: {len(cols)}" | |
155 + f"\nRetaining only 9 columns of the following 10 columns." | |
156 + f"\n\n{lin_header.strip()}\n{line.strip()}" | |
157 ) | |
158 | |
159 if cols[0] not in seen_lineages.keys(): | |
160 seen_lineages[cols[0]] = ",".join(cols[1:9]) | |
161 | |
162 new_lin_fh.write(lin_header) | |
163 l_fh.close() | |
164 | |
165 # Read FASTA and create unique FASTA IDs. | |
166 logging.info(f"Creating new FASTA with unique IDs.") | |
167 try: | |
168 fna_fh = gzip.open(fna, "rt") | |
169 _ = fna_fh.readline() | |
170 except gzip.BadGzipFile: | |
171 logging.info( | |
172 f"Input FASTA file {os.path.basename(fna)} is not in\nGZIP format." | |
173 + "\nAttempting text parsing." | |
174 ) | |
175 fna_fh = open(fna, "r") | |
176 | |
177 for record in SeqIO.parse(fna_fh, format="fasta"): | |
178 seq_id = record.id | |
179 | |
180 if record.id not in seen_ids.keys(): | |
181 seen_ids[record.id] = 1 | |
182 else: | |
183 seen_ids[record.id] += 1 | |
184 | |
185 if seen_ids[seq_id] > 1: | |
186 seq_id = str(record.id) + str(seen_ids[record.id]) | |
187 | |
188 if new_lin: | |
189 new_lin_fh.write(",".join([seq_id, seen_lineages[record.id]]) + "\n") | |
190 | |
191 write_fasta(record.seq, seq_id, new_fna_fh) | |
192 | |
193 if new_lin: | |
194 new_lin_fh.close() | |
195 | |
196 logging.info("Done!") | |
197 | |
198 | |
199 if __name__ == "__main__": | |
200 | |
201 main() |