kkonganti@0
|
1 #!/usr/bin/env python3
|
kkonganti@0
|
2
|
kkonganti@0
|
3 import argparse
|
kkonganti@0
|
4 import gzip
|
kkonganti@0
|
5 import inspect
|
kkonganti@0
|
6 import logging
|
kkonganti@0
|
7 import os
|
kkonganti@0
|
8 import pprint
|
kkonganti@0
|
9 import shutil
|
kkonganti@0
|
10 from typing import BinaryIO, TextIO, Union
|
kkonganti@0
|
11
|
kkonganti@0
|
12 from Bio import SeqIO
|
kkonganti@0
|
13 from Bio.Seq import Seq
|
kkonganti@0
|
14 from Bio.SeqRecord import SeqRecord
|
kkonganti@0
|
15 from genericpath import isdir
|
kkonganti@0
|
16
|
kkonganti@0
|
17
|
kkonganti@0
|
18 # Multiple inheritence for pretty printing of help text.
|
kkonganti@0
|
19 class MultiArgFormatClasses(
|
kkonganti@0
|
20 argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter
|
kkonganti@0
|
21 ):
|
kkonganti@0
|
22 pass
|
kkonganti@0
|
23
|
kkonganti@0
|
24
|
kkonganti@0
|
25 def write_fasta(seq: str, id: str, fh: Union[TextIO, BinaryIO]) -> None:
|
kkonganti@0
|
26 """
|
kkonganti@0
|
27 Write sequence with no description to specified file.
|
kkonganti@0
|
28 """
|
kkonganti@0
|
29 SeqIO.write(
|
kkonganti@0
|
30 SeqRecord(Seq(seq), id=id, description=str()),
|
kkonganti@0
|
31 fh,
|
kkonganti@0
|
32 "fasta",
|
kkonganti@0
|
33 )
|
kkonganti@0
|
34
|
kkonganti@0
|
35
|
kkonganti@0
|
36 # Main
|
kkonganti@0
|
37 def main() -> None:
|
kkonganti@0
|
38 """
|
kkonganti@0
|
39 This script takes:
|
kkonganti@0
|
40 1. A FASTA file in gzip or non-gzip (ASCII TXT) format and
|
kkonganti@0
|
41
|
kkonganti@0
|
42 and then generates a new FASTA file with duplicate FASTA IDs replaced
|
kkonganti@0
|
43 with a unique ID.
|
kkonganti@0
|
44 """
|
kkonganti@0
|
45
|
kkonganti@0
|
46 # Set logging.
|
kkonganti@0
|
47 logging.basicConfig(
|
kkonganti@0
|
48 format="\n"
|
kkonganti@0
|
49 + "=" * 55
|
kkonganti@0
|
50 + "\n%(asctime)s - %(levelname)s\n"
|
kkonganti@0
|
51 + "=" * 55
|
kkonganti@0
|
52 + "\n%(message)s\n\n",
|
kkonganti@0
|
53 level=logging.DEBUG,
|
kkonganti@0
|
54 )
|
kkonganti@0
|
55
|
kkonganti@0
|
56 # Debug print.
|
kkonganti@0
|
57 ppp = pprint.PrettyPrinter(width=55)
|
kkonganti@0
|
58 prog_name = os.path.basename(inspect.stack()[0].filename)
|
kkonganti@0
|
59
|
kkonganti@0
|
60 parser = argparse.ArgumentParser(
|
kkonganti@0
|
61 prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
|
kkonganti@0
|
62 )
|
kkonganti@0
|
63
|
kkonganti@0
|
64 required = parser.add_argument_group("required arguments")
|
kkonganti@0
|
65
|
kkonganti@0
|
66 required.add_argument(
|
kkonganti@0
|
67 "-fna",
|
kkonganti@0
|
68 dest="fna",
|
kkonganti@0
|
69 default=False,
|
kkonganti@0
|
70 required=True,
|
kkonganti@0
|
71 help="Absolute UNIX path to .fna or .fna.gz file.",
|
kkonganti@0
|
72 )
|
kkonganti@0
|
73 parser.add_argument(
|
kkonganti@0
|
74 "-lin",
|
kkonganti@0
|
75 dest="lineages",
|
kkonganti@0
|
76 default=False,
|
kkonganti@0
|
77 required=False,
|
kkonganti@0
|
78 help="Absolute UNIX path to lineages.csv file for which the"
|
kkonganti@0
|
79 + "\nthe duplicate IDs will be made unique corresponding to"
|
kkonganti@0
|
80 + "\nthe FASTA IDs",
|
kkonganti@0
|
81 )
|
kkonganti@0
|
82 parser.add_argument(
|
kkonganti@0
|
83 "-outdir",
|
kkonganti@0
|
84 dest="out_folder",
|
kkonganti@0
|
85 default=os.getcwd(),
|
kkonganti@0
|
86 required=False,
|
kkonganti@0
|
87 help="By default, the output is written to this\nfolder.",
|
kkonganti@0
|
88 )
|
kkonganti@0
|
89 parser.add_argument(
|
kkonganti@0
|
90 "-f",
|
kkonganti@0
|
91 dest="force_write_out",
|
kkonganti@0
|
92 default=False,
|
kkonganti@0
|
93 action="store_true",
|
kkonganti@0
|
94 required=False,
|
kkonganti@0
|
95 help="Force overwrite the output file.",
|
kkonganti@0
|
96 )
|
kkonganti@0
|
97 parser.add_argument(
|
kkonganti@0
|
98 "--fna-suffix",
|
kkonganti@0
|
99 dest="fna_suffix",
|
kkonganti@0
|
100 default=".fna",
|
kkonganti@0
|
101 required=False,
|
kkonganti@0
|
102 help="Suffix of the output FASTA file.",
|
kkonganti@0
|
103 )
|
kkonganti@0
|
104
|
kkonganti@0
|
105 # Parse defaults
|
kkonganti@0
|
106 args = parser.parse_args()
|
kkonganti@0
|
107 fna = args.fna
|
kkonganti@0
|
108 lineages = args.lineages
|
kkonganti@0
|
109 outdir = args.out_folder
|
kkonganti@0
|
110 overwrite = args.force_write_out
|
kkonganti@0
|
111 fna_suffix = args.fna_suffix
|
kkonganti@0
|
112 new_fna = os.path.join(
|
kkonganti@0
|
113 outdir, os.path.basename(fna).split(".")[0] + "_dedup_ids" + fna_suffix
|
kkonganti@0
|
114 )
|
kkonganti@0
|
115 lin_header = False
|
kkonganti@0
|
116 new_lin = False
|
kkonganti@0
|
117 seen_ids = dict()
|
kkonganti@0
|
118 seen_lineages = dict()
|
kkonganti@0
|
119
|
kkonganti@0
|
120 # Basic checks
|
kkonganti@0
|
121 if not overwrite and os.path.exists(new_fna):
|
kkonganti@0
|
122 logging.warning(
|
kkonganti@0
|
123 f"Output destination [{os.path.basename(new_fna)}] already exists!"
|
kkonganti@0
|
124 + "\nPlease use -f to delete and overwrite."
|
kkonganti@0
|
125 )
|
kkonganti@0
|
126 elif overwrite and os.path.exists(new_fna):
|
kkonganti@0
|
127 logging.info(f"Overwrite requested. Deleting {os.path.basename(new_fna)}...")
|
kkonganti@0
|
128 if os.path.isdir(new_fna):
|
kkonganti@0
|
129 shutil.rmtree(new_fna)
|
kkonganti@0
|
130 else:
|
kkonganti@0
|
131 os.remove(new_fna)
|
kkonganti@0
|
132
|
kkonganti@0
|
133 # Prepare for writing
|
kkonganti@0
|
134 new_fna_fh = open(new_fna, "+at")
|
kkonganti@0
|
135
|
kkonganti@0
|
136 # If lineages file is mentioned, index it.
|
kkonganti@0
|
137 if lineages and os.path.exists(lineages) and os.path.getsize(lineages) > 0:
|
kkonganti@0
|
138 new_lin = os.path.join(os.getcwd(), os.path.basename(lineages) + "_dedup.csv")
|
kkonganti@0
|
139 new_lin_fh = open(new_lin, "w")
|
kkonganti@0
|
140 with open(lineages, "r") as l_fh:
|
kkonganti@0
|
141 lin_header = l_fh.readline()
|
kkonganti@0
|
142 for line in l_fh:
|
kkonganti@0
|
143 cols = line.strip().split(",")
|
kkonganti@0
|
144 if len(cols) < 9:
|
kkonganti@0
|
145 logging.error(
|
kkonganti@0
|
146 f"The row in the lineages file {os.path.basename(lineages)}"
|
kkonganti@0
|
147 + f"\ndoes not have 9 required columns: {len(cols)}"
|
kkonganti@0
|
148 + f"\n\n{lin_header.strip()}\n{line.strip()}"
|
kkonganti@0
|
149 )
|
kkonganti@0
|
150 exit(1)
|
kkonganti@0
|
151 elif len(cols) > 9:
|
kkonganti@0
|
152 logging.info(
|
kkonganti@0
|
153 f"The row in the lineages file {os.path.basename(lineages)}"
|
kkonganti@0
|
154 + f"\nhas more than 9 required columns: {len(cols)}"
|
kkonganti@0
|
155 + f"\nRetaining only 9 columns of the following 10 columns."
|
kkonganti@0
|
156 + f"\n\n{lin_header.strip()}\n{line.strip()}"
|
kkonganti@0
|
157 )
|
kkonganti@0
|
158
|
kkonganti@0
|
159 if cols[0] not in seen_lineages.keys():
|
kkonganti@0
|
160 seen_lineages[cols[0]] = ",".join(cols[1:9])
|
kkonganti@0
|
161
|
kkonganti@0
|
162 new_lin_fh.write(lin_header)
|
kkonganti@0
|
163 l_fh.close()
|
kkonganti@0
|
164
|
kkonganti@0
|
165 # Read FASTA and create unique FASTA IDs.
|
kkonganti@0
|
166 logging.info(f"Creating new FASTA with unique IDs.")
|
kkonganti@0
|
167 try:
|
kkonganti@0
|
168 fna_fh = gzip.open(fna, "rt")
|
kkonganti@0
|
169 _ = fna_fh.readline()
|
kkonganti@0
|
170 except gzip.BadGzipFile:
|
kkonganti@0
|
171 logging.info(
|
kkonganti@0
|
172 f"Input FASTA file {os.path.basename(fna)} is not in\nGZIP format."
|
kkonganti@0
|
173 + "\nAttempting text parsing."
|
kkonganti@0
|
174 )
|
kkonganti@0
|
175 fna_fh = open(fna, "r")
|
kkonganti@0
|
176
|
kkonganti@0
|
177 for record in SeqIO.parse(fna_fh, format="fasta"):
|
kkonganti@0
|
178 seq_id = record.id
|
kkonganti@0
|
179
|
kkonganti@0
|
180 if record.id not in seen_ids.keys():
|
kkonganti@0
|
181 seen_ids[record.id] = 1
|
kkonganti@0
|
182 else:
|
kkonganti@0
|
183 seen_ids[record.id] += 1
|
kkonganti@0
|
184
|
kkonganti@0
|
185 if seen_ids[seq_id] > 1:
|
kkonganti@0
|
186 seq_id = str(record.id) + str(seen_ids[record.id])
|
kkonganti@0
|
187
|
kkonganti@0
|
188 if new_lin:
|
kkonganti@0
|
189 new_lin_fh.write(",".join([seq_id, seen_lineages[record.id]]) + "\n")
|
kkonganti@0
|
190
|
kkonganti@0
|
191 write_fasta(record.seq, seq_id, new_fna_fh)
|
kkonganti@0
|
192
|
kkonganti@0
|
193 if new_lin:
|
kkonganti@0
|
194 new_lin_fh.close()
|
kkonganti@0
|
195
|
kkonganti@0
|
196 logging.info("Done!")
|
kkonganti@0
|
197
|
kkonganti@0
|
198
|
kkonganti@0
|
199 if __name__ == "__main__":
|
kkonganti@0
|
200
|
kkonganti@0
|
201 main()
|