comparison 0.5.0/bin/remove_dup_fasta_ids.py @ 0:97cd2f532efe

planemo upload
author kkonganti
date Mon, 31 Mar 2025 14:50:40 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:97cd2f532efe
1 #!/usr/bin/env python3
2
3 import argparse
4 import gzip
5 import inspect
6 import logging
7 import os
8 import pprint
9 import shutil
10 from typing import BinaryIO, TextIO, Union
11
12 from Bio import SeqIO
13 from Bio.Seq import Seq
14 from Bio.SeqRecord import SeqRecord
15 from genericpath import isdir
16
17
# Multiple inheritance for pretty printing of help text.
class MultiArgFormatClasses(
    argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter
):
    """
    argparse help formatter combining two stock behaviors:
    raw-text layout (newlines in help strings are preserved) and
    automatic "(default: ...)" annotation of each argument.
    """

    pass
23
24
def write_fasta(seq: str, id: str, fh: Union[TextIO, BinaryIO]) -> None:
    """
    Write a single FASTA record with no description line to the given
    open file handle.

    Parameters
    ----------
    seq : str or Bio.Seq.Seq
        Sequence data. Coerced through ``str()`` so both plain strings
        and ``Seq`` objects (the caller passes ``record.seq``) work.
    id : str
        FASTA ID for the record. NOTE(review): shadows the ``id``
        builtin; the name is kept for backward compatibility.
    fh : TextIO or BinaryIO
        Writable, already-open destination handle.
    """
    SeqIO.write(
        # str(seq) fixes the original str-only assumption: the caller
        # hands in a Bio.Seq.Seq, not a plain string.
        SeqRecord(Seq(str(seq)), id=id, description=""),
        fh,
        "fasta",
    )
34
35
# Main
def main() -> None:
    """
    This script takes:
    1. A FASTA file in gzip or non-gzip (ASCII TXT) format and

    and then generates a new FASTA file with duplicate FASTA IDs replaced
    with a unique ID.
    """

    # Set logging.
    logging.basicConfig(
        format="\n"
        + "=" * 55
        + "\n%(asctime)s - %(levelname)s\n"
        + "=" * 55
        + "\n%(message)s\n\n",
        level=logging.DEBUG,
    )

    prog_name = os.path.basename(inspect.stack()[0].filename)

    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )

    required = parser.add_argument_group("required arguments")

    required.add_argument(
        "-fna",
        dest="fna",
        default=False,
        required=True,
        help="Absolute UNIX path to .fna or .fna.gz file.",
    )
    parser.add_argument(
        "-lin",
        dest="lineages",
        default=False,
        required=False,
        help="Absolute UNIX path to lineages.csv file for which the"
        + "\nduplicate IDs will be made unique corresponding to"
        + "\nthe FASTA IDs",
    )
    parser.add_argument(
        "-outdir",
        dest="out_folder",
        default=os.getcwd(),
        required=False,
        help="By default, the output is written to this\nfolder.",
    )
    parser.add_argument(
        "-f",
        dest="force_write_out",
        default=False,
        action="store_true",
        required=False,
        help="Force overwrite the output file.",
    )
    parser.add_argument(
        "--fna-suffix",
        dest="fna_suffix",
        default=".fna",
        required=False,
        help="Suffix of the output FASTA file.",
    )

    # Parse defaults
    args = parser.parse_args()
    fna = args.fna
    lineages = args.lineages
    outdir = args.out_folder
    overwrite = args.force_write_out
    fna_suffix = args.fna_suffix
    # Output name: basename up to the first "." + "_dedup_ids" + suffix.
    new_fna = os.path.join(
        outdir, os.path.basename(fna).split(".")[0] + "_dedup_ids" + fna_suffix
    )
    lin_header = False
    new_lin = False
    seen_ids = dict()  # FASTA ID -> number of times seen so far.
    seen_lineages = dict()  # FASTA ID -> remaining 8 lineage columns (CSV).

    # Basic checks
    if not overwrite and os.path.exists(new_fna):
        logging.warning(
            f"Output destination [{os.path.basename(new_fna)}] already exists!"
            + "\nPlease use -f to delete and overwrite."
        )
        # BUGFIX: previously execution fell through and APPENDED records
        # to the pre-existing output file; stop here as the warning
        # message promises.
        exit(1)
    elif overwrite and os.path.exists(new_fna):
        logging.info(f"Overwrite requested. Deleting {os.path.basename(new_fna)}...")
        if os.path.isdir(new_fna):
            shutil.rmtree(new_fna)
        else:
            os.remove(new_fna)

    # Prepare for writing. The pre-existing file (if any) was either
    # removed above or caused an early exit, so "w" is safe here.
    new_fna_fh = open(new_fna, "w")

    # If lineages file is mentioned, index it.
    if lineages and os.path.exists(lineages) and os.path.getsize(lineages) > 0:
        new_lin = os.path.join(os.getcwd(), os.path.basename(lineages) + "_dedup.csv")
        new_lin_fh = open(new_lin, "w")
        with open(lineages, "r") as l_fh:
            lin_header = l_fh.readline()
            for line in l_fh:
                cols = line.strip().split(",")
                if len(cols) < 9:
                    logging.error(
                        f"The row in the lineages file {os.path.basename(lineages)}"
                        + f"\ndoes not have 9 required columns: {len(cols)}"
                        + f"\n\n{lin_header.strip()}\n{line.strip()}"
                    )
                    exit(1)
                elif len(cols) > 9:
                    logging.info(
                        f"The row in the lineages file {os.path.basename(lineages)}"
                        + f"\nhas more than 9 required columns: {len(cols)}"
                        + f"\nRetaining only 9 columns of the following 10 columns."
                        + f"\n\n{lin_header.strip()}\n{line.strip()}"
                    )

                # First occurrence of an ID wins; later duplicates keep
                # the lineage of the first row with that ID.
                if cols[0] not in seen_lineages.keys():
                    seen_lineages[cols[0]] = ",".join(cols[1:9])

            new_lin_fh.write(lin_header)

    # Read FASTA and create unique FASTA IDs.
    logging.info(f"Creating new FASTA with unique IDs.")
    try:
        fna_fh = gzip.open(fna, "rt")
        _ = fna_fh.readline()
        # BUGFIX: the probe readline() above consumed the first header
        # line of a valid gzip file, so SeqIO.parse() started mid-file
        # and mangled the first record. Rewind before parsing.
        fna_fh.seek(0)
    except gzip.BadGzipFile:
        logging.info(
            f"Input FASTA file {os.path.basename(fna)} is not in\nGZIP format."
            + "\nAttempting text parsing."
        )
        fna_fh = open(fna, "r")

    for record in SeqIO.parse(fna_fh, format="fasta"):
        seq_id = record.id

        # Count occurrences of this ID.
        if record.id not in seen_ids.keys():
            seen_ids[record.id] = 1
        else:
            seen_ids[record.id] += 1

        # Duplicates get the occurrence count appended (id, id2, id3...).
        if seen_ids[seq_id] > 1:
            seq_id = str(record.id) + str(seen_ids[record.id])

        if new_lin:
            # NOTE(review): raises KeyError if a FASTA ID is absent from
            # the lineages file — assumed to always be present; confirm
            # against upstream pipeline.
            new_lin_fh.write(",".join([seq_id, seen_lineages[record.id]]) + "\n")

        write_fasta(record.seq, seq_id, new_fna_fh)

    # Release all handles.
    fna_fh.close()
    new_fna_fh.close()
    if new_lin:
        new_lin_fh.close()

    logging.info("Done!")
197
198
# Script entry point: run only when executed directly, not on import.
if __name__ == "__main__":
    main()