|
0
|
1 #!/usr/bin/env python3
|
|
|
2
|
|
|
3 import argparse
|
|
|
4 import gzip
|
|
|
5 import inspect
|
|
|
6 import logging
|
|
|
7 import os
|
|
|
8 import pprint
|
|
|
9 import shutil
|
|
|
10 from typing import BinaryIO, TextIO, Union
|
|
|
11
|
|
|
12 from Bio import SeqIO
|
|
|
13 from Bio.Seq import Seq
|
|
|
14 from Bio.SeqRecord import SeqRecord
|
|
|
15 from genericpath import isdir
|
|
|
16
|
|
|
17
|
|
|
18 # Multiple inheritence for pretty printing of help text.
|
|
|
19 class MultiArgFormatClasses(
|
|
|
20 argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter
|
|
|
21 ):
|
|
|
22 pass
|
|
|
23
|
|
|
24
|
|
|
25 def write_fasta(seq: str, id: str, fh: Union[TextIO, BinaryIO]) -> None:
|
|
|
26 """
|
|
|
27 Write sequence with no description to specified file.
|
|
|
28 """
|
|
|
29 SeqIO.write(
|
|
|
30 SeqRecord(Seq(seq), id=id, description=str()),
|
|
|
31 fh,
|
|
|
32 "fasta",
|
|
|
33 )
|
|
|
34
|
|
|
35
|
|
|
36 # Main
|
|
|
37 def main() -> None:
|
|
|
38 """
|
|
|
39 This script takes:
|
|
|
40 1. A FASTA file in gzip or non-gzip (ASCII TXT) format and
|
|
|
41
|
|
|
42 and then generates a new FASTA file with duplicate FASTA IDs replaced
|
|
|
43 with a unique ID.
|
|
|
44 """
|
|
|
45
|
|
|
46 # Set logging.
|
|
|
47 logging.basicConfig(
|
|
|
48 format="\n"
|
|
|
49 + "=" * 55
|
|
|
50 + "\n%(asctime)s - %(levelname)s\n"
|
|
|
51 + "=" * 55
|
|
|
52 + "\n%(message)s\n\n",
|
|
|
53 level=logging.DEBUG,
|
|
|
54 )
|
|
|
55
|
|
|
56 # Debug print.
|
|
|
57 ppp = pprint.PrettyPrinter(width=55)
|
|
|
58 prog_name = os.path.basename(inspect.stack()[0].filename)
|
|
|
59
|
|
|
60 parser = argparse.ArgumentParser(
|
|
|
61 prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
|
|
|
62 )
|
|
|
63
|
|
|
64 required = parser.add_argument_group("required arguments")
|
|
|
65
|
|
|
66 required.add_argument(
|
|
|
67 "-fna",
|
|
|
68 dest="fna",
|
|
|
69 default=False,
|
|
|
70 required=True,
|
|
|
71 help="Absolute UNIX path to .fna or .fna.gz file.",
|
|
|
72 )
|
|
|
73 parser.add_argument(
|
|
|
74 "-lin",
|
|
|
75 dest="lineages",
|
|
|
76 default=False,
|
|
|
77 required=False,
|
|
|
78 help="Absolute UNIX path to lineages.csv file for which the"
|
|
|
79 + "\nthe duplicate IDs will be made unique corresponding to"
|
|
|
80 + "\nthe FASTA IDs",
|
|
|
81 )
|
|
|
82 parser.add_argument(
|
|
|
83 "-outdir",
|
|
|
84 dest="out_folder",
|
|
|
85 default=os.getcwd(),
|
|
|
86 required=False,
|
|
|
87 help="By default, the output is written to this\nfolder.",
|
|
|
88 )
|
|
|
89 parser.add_argument(
|
|
|
90 "-f",
|
|
|
91 dest="force_write_out",
|
|
|
92 default=False,
|
|
|
93 action="store_true",
|
|
|
94 required=False,
|
|
|
95 help="Force overwrite the output file.",
|
|
|
96 )
|
|
|
97 parser.add_argument(
|
|
|
98 "--fna-suffix",
|
|
|
99 dest="fna_suffix",
|
|
|
100 default=".fna",
|
|
|
101 required=False,
|
|
|
102 help="Suffix of the output FASTA file.",
|
|
|
103 )
|
|
|
104
|
|
|
105 # Parse defaults
|
|
|
106 args = parser.parse_args()
|
|
|
107 fna = args.fna
|
|
|
108 lineages = args.lineages
|
|
|
109 outdir = args.out_folder
|
|
|
110 overwrite = args.force_write_out
|
|
|
111 fna_suffix = args.fna_suffix
|
|
|
112 new_fna = os.path.join(
|
|
|
113 outdir, os.path.basename(fna).split(".")[0] + "_dedup_ids" + fna_suffix
|
|
|
114 )
|
|
|
115 lin_header = False
|
|
|
116 new_lin = False
|
|
|
117 seen_ids = dict()
|
|
|
118 seen_lineages = dict()
|
|
|
119
|
|
|
120 # Basic checks
|
|
|
121 if not overwrite and os.path.exists(new_fna):
|
|
|
122 logging.warning(
|
|
|
123 f"Output destination [{os.path.basename(new_fna)}] already exists!"
|
|
|
124 + "\nPlease use -f to delete and overwrite."
|
|
|
125 )
|
|
|
126 elif overwrite and os.path.exists(new_fna):
|
|
|
127 logging.info(f"Overwrite requested. Deleting {os.path.basename(new_fna)}...")
|
|
|
128 if os.path.isdir(new_fna):
|
|
|
129 shutil.rmtree(new_fna)
|
|
|
130 else:
|
|
|
131 os.remove(new_fna)
|
|
|
132
|
|
|
133 # Prepare for writing
|
|
|
134 new_fna_fh = open(new_fna, "+at")
|
|
|
135
|
|
|
136 # If lineages file is mentioned, index it.
|
|
|
137 if lineages and os.path.exists(lineages) and os.path.getsize(lineages) > 0:
|
|
|
138 new_lin = os.path.join(os.getcwd(), os.path.basename(lineages) + "_dedup.csv")
|
|
|
139 new_lin_fh = open(new_lin, "w")
|
|
|
140 with open(lineages, "r") as l_fh:
|
|
|
141 lin_header = l_fh.readline()
|
|
|
142 for line in l_fh:
|
|
|
143 cols = line.strip().split(",")
|
|
|
144 if len(cols) < 9:
|
|
|
145 logging.error(
|
|
|
146 f"The row in the lineages file {os.path.basename(lineages)}"
|
|
|
147 + f"\ndoes not have 9 required columns: {len(cols)}"
|
|
|
148 + f"\n\n{lin_header.strip()}\n{line.strip()}"
|
|
|
149 )
|
|
|
150 exit(1)
|
|
|
151 elif len(cols) > 9:
|
|
|
152 logging.info(
|
|
|
153 f"The row in the lineages file {os.path.basename(lineages)}"
|
|
|
154 + f"\nhas more than 9 required columns: {len(cols)}"
|
|
|
155 + f"\nRetaining only 9 columns of the following 10 columns."
|
|
|
156 + f"\n\n{lin_header.strip()}\n{line.strip()}"
|
|
|
157 )
|
|
|
158
|
|
|
159 if cols[0] not in seen_lineages.keys():
|
|
|
160 seen_lineages[cols[0]] = ",".join(cols[1:9])
|
|
|
161
|
|
|
162 new_lin_fh.write(lin_header)
|
|
|
163 l_fh.close()
|
|
|
164
|
|
|
165 # Read FASTA and create unique FASTA IDs.
|
|
|
166 logging.info(f"Creating new FASTA with unique IDs.")
|
|
|
167 try:
|
|
|
168 fna_fh = gzip.open(fna, "rt")
|
|
|
169 _ = fna_fh.readline()
|
|
|
170 except gzip.BadGzipFile:
|
|
|
171 logging.info(
|
|
|
172 f"Input FASTA file {os.path.basename(fna)} is not in\nGZIP format."
|
|
|
173 + "\nAttempting text parsing."
|
|
|
174 )
|
|
|
175 fna_fh = open(fna, "r")
|
|
|
176
|
|
|
177 for record in SeqIO.parse(fna_fh, format="fasta"):
|
|
|
178 seq_id = record.id
|
|
|
179
|
|
|
180 if record.id not in seen_ids.keys():
|
|
|
181 seen_ids[record.id] = 1
|
|
|
182 else:
|
|
|
183 seen_ids[record.id] += 1
|
|
|
184
|
|
|
185 if seen_ids[seq_id] > 1:
|
|
|
186 seq_id = str(record.id) + str(seen_ids[record.id])
|
|
|
187
|
|
|
188 if new_lin:
|
|
|
189 new_lin_fh.write(",".join([seq_id, seen_lineages[record.id]]) + "\n")
|
|
|
190
|
|
|
191 write_fasta(record.seq, seq_id, new_fna_fh)
|
|
|
192
|
|
|
193 if new_lin:
|
|
|
194 new_lin_fh.close()
|
|
|
195
|
|
|
196 logging.info("Done!")
|
|
|
197
|
|
|
198
|
|
|
199 if __name__ == "__main__":
|
|
|
200
|
|
|
201 main()
|