comparison 0.5.0/bin/create_fasta_and_lineages.py @ 0:97cd2f532efe

planemo upload
author kkonganti
date Mon, 31 Mar 2025 14:50:40 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:97cd2f532efe
1 #!/usr/bin/env python3
2
3 import argparse
4 import gzip
5 import inspect
6 import logging
7 import os
8 import pprint
9 import re
10 import shutil
11 import ssl
12 import tempfile
13 from html.parser import HTMLParser
14 from urllib.request import urlopen
15
16 from Bio import SeqIO
17 from Bio.Seq import Seq
18 from Bio.SeqRecord import SeqRecord
19
20
# Multiple inheritance for pretty printing of help text.
class MultiArgFormatClasses(
    argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter
):
    """Argparse formatter that keeps raw help text AND shows default values."""

    pass
26
27
# HTMLParser override class to get fna.gz and gbff.gz
class NCBIHTMLParser(HTMLParser):
    """
    Collect the text content of an NCBI directory-listing HTML page.

    Every text node seen while feeding HTML is appended to
    ``self.href_data`` so the caller can regex-search the joined text
    for the desired release file names.
    """

    def __init__(self, *, convert_charrefs: bool = True) -> None:
        # Fix: the default used to be `...` (Ellipsis), which only worked
        # because it is truthy; HTMLParser expects a real bool here.
        super().__init__(convert_charrefs=convert_charrefs)
        # super().__init__() already resets the parser; the explicit call
        # is kept so instances can be re-fed after external resets.
        self.reset()
        self.href_data = list()

    def handle_data(self, data):
        """Accumulate every chunk of text encountered in the HTML stream."""
        self.href_data.append(data)
37
38
# Download organelle FASTA and GenBank file.
def dl_mito_seqs_and_flat_files(
    url: str, suffix: re.Pattern, out: os.PathLike
) -> os.PathLike:
    """
    Download and decompress the .fna.gz / .gbff.gz file for the
    RefSeq mitochondrion release.

    Args:
        url: Base URL of the NCBI RefSeq mitochondrion release directory.
        suffix: Compiled regex matching the desired file name
            (e.g. ``.*?\\.genomic\\.fna``).
        out: Output directory; created when it does not exist.

    Returns:
        Path to the decompressed file on disk. An existing non-empty
        matching file in ``out`` short-circuits the download.
    """
    # Certificate verification is disabled deliberately — TODO confirm this
    # is required for the clusters this runs on; it weakens HTTPS.
    contxt = ssl.create_default_context()
    contxt.check_hostname = False
    contxt.verify_mode = ssl.CERT_NONE

    if url is None:  # fix: was `== None`
        logging.error(
            "Please provide the base URL where .fna.gz and .gbff.gz"
            + "\nfiles for RefSeq mitochondrion can be found."
        )
        exit(1)

    # Reuse a previously downloaded, non-empty file if one is present.
    if os.path.exists(out):
        for file in os.listdir(out):
            file_path = os.path.join(out, file)

            if suffix.match(file_path) and os.path.getsize(file_path) > 0:
                logging.info(
                    f"The required mitochondrion file(s)\n[{os.path.basename(file_path)}]"
                    + " already exists.\nSkipping download from NCBI..."
                    + "\nPlease use -f to delete and overwrite."
                )
                return file_path
    else:
        os.makedirs(out)

    html_parser = NCBIHTMLParser()
    logging.info(f"Finding latest NCBI RefSeq mitochondrion release at:\n{url}")

    # Save the directory listing to a temp file, then feed it to the parser.
    with urlopen(url, context=contxt) as response:
        with tempfile.NamedTemporaryFile(delete=False) as tmp_html_file:
            shutil.copyfileobj(response, tmp_html_file)

    with open(tmp_html_file.name, "r") as html:
        html_parser.feed("".join(html.readlines()))

    # The first listing entry matching `suffix` names the release file.
    file = suffix.search("".join(html_parser.href_data)).group(0)
    file_url = "/".join([url, file + ".gz"])
    file_at = os.path.join(out, file)

    logging.info(f"Found NCBI RefSeq mitochondrion file(s):\n{file_url}")

    logging.info(f"Saving to:\n{file_at}")

    # Download the gzip to a temp file, then decompress it into `file_at`.
    with tempfile.NamedTemporaryFile(delete=False) as tmp_gz:
        with urlopen(file_url, context=contxt) as response:
            tmp_gz.write(response.read())

    with open(file_at, "w") as fh:
        with gzip.open(tmp_gz.name, "rb") as web_gz:
            fh.write(web_gz.read().decode("utf-8"))

    # All handles were closed by their `with` blocks (the original code
    # re-closed them manually, even after unlinking); only the temp files
    # need to be removed here.
    os.unlink(tmp_gz.name)
    os.unlink(tmp_html_file.name)

    return file_at
106
107
def get_lineages(csv: os.PathLike, cols: list) -> tuple:
    """
    Parse the output of the `ncbitax2lin` tool.

    Args:
        csv: Path to the (uncompressed) CSV produced by `ncbitax2lin`.
        cols: The 9 column names to extract, ordered
            tax_id,kingdom,...,genus,species,strain.

    Returns:
        A ``(lineages, raw_recs)`` tuple (the original annotation claimed
        ``list``): ``lineages`` maps the most specific taxon name (strain
        if present, else species) to its comma-joined lineage string, and
        ``raw_recs`` is the number of data rows read.
    """
    lineages = dict()

    # Fix: the original `not (exists or size > 0)` never flagged an
    # existing-but-empty file; require existing AND non-empty.
    if csv is None or not os.path.exists(csv) or os.path.getsize(csv) == 0:
        logging.error(
            f"The CSV file [{os.path.basename(str(csv))}] is empty or does not exist!"
        )
        exit(1)

    logging.info(f"Indexing {os.path.basename(csv)}...")

    with open(csv, "r") as csv_fh:
        header_cols = csv_fh.readline().strip().split(",")
        # Indices (in header order) of the columns the user asked for.
        user_req_cols = [
            tcol_i for tcol_i, tcol in enumerate(header_cols) if tcol in cols
        ]
        cols_not_found = [tcol for tcol in cols if tcol not in header_cols]
        raw_recs = 0

        if len(cols_not_found) > 0:
            logging.error(
                "The following columns do not exist in the"
                + f"\nCSV file [ {os.path.basename(csv)} ]:\n"
                # Fix: `"".join(...)` ran the missing names together.
                + ", ".join(cols_not_found)
            )
            exit(1)
        elif len(user_req_cols) > 9:
            logging.error(
                "Only a total of 9 columns are needed!"
                + "\ntax_id,kingdom,phylum,class,order,family,genus,species,strain"
            )
            exit(1)

        for tax in csv_fh:
            raw_recs += 1
            lcols = tax.strip().split(",")

            # Prefer the strain-level name (index 8); fall back to species
            # (index 7) with an empty strain slot appended so every value
            # has the same number of fields.
            if bool(lcols[user_req_cols[8]]):
                lineages[lcols[user_req_cols[8]]] = ",".join(
                    [lcols[l] for l in user_req_cols[1:]]
                )
            elif bool(lcols[user_req_cols[7]]):
                lineages[lcols[user_req_cols[7]]] = ",".join(
                    [lcols[l] for l in user_req_cols[1:8]] + [str()]
                )

    # The `with` block closes the handle; no manual close needed.
    return lineages, raw_recs
160
161
def from_genbank(gbk: os.PathLike, min_len: int) -> dict:
    """
    Parse a GenBank flat file with Biopython and return a dict mapping
    accession (``record.id``) to organism name, skipping any record whose
    sequence is shorter than ``min_len``.
    """
    accs2orgs = dict()

    # Fix: the original `not (exists or size > 0)` never flagged an
    # existing-but-empty file, and logged at INFO despite exiting.
    if not os.path.exists(gbk) or os.path.getsize(gbk) == 0:
        logging.error(
            f"The GenBank file [{os.path.basename(gbk)}] does not exist"
            + "\nor is of size 0."
        )
        exit(1)

    logging.info(f"Indexing {os.path.basename(gbk)}...")

    for record in SeqIO.parse(gbk, "genbank"):
        # Keep only sequences long enough to be useful downstream.
        if len(record.seq) >= min_len:
            accs2orgs[record.id] = record.annotations["organism"]

    return accs2orgs
187
188
def from_genbank_alt(gbk: os.PathLike) -> dict:
    """
    Parse a GenBank flat file line-by-line (without BioPython's GenBank
    Scanner) and return a dict mapping accession to organism name.

    Note: unlike ``from_genbank``, no minimum-length filter is applied.
    """
    accs2orgs = dict()
    accs = dict()
    orgs = dict()
    acc = False  # last VERSION accession seen; False until the first one
    acc_pat = re.compile(r"^VERSION\s+(.+)")
    org_pat = re.compile(r"^\s+ORGANISM\s+(.+)")

    # Fix: the original `not (exists or size > 0)` never flagged an
    # existing-but-empty file, and logged at INFO despite exiting.
    if not os.path.exists(gbk) or os.path.getsize(gbk) == 0:
        logging.error(
            f"The GenBank file [{os.path.basename(gbk)}] does not exist"
            + "\nor is of size 0."
        )
        exit(1)

    logging.info(
        f"Indexing {os.path.basename(gbk)} without using\nBioPython's GenBank Scanner..."
    )

    with open(gbk, "r") as gbk_fh:
        for line in gbk_fh:
            line = line.rstrip()
            if line.startswith("VERSION") and acc_pat.match(line):
                acc = acc_pat.match(line).group(1)
                accs[acc] = 1
            if org_pat.match(line):
                if acc and acc not in orgs.keys():
                    orgs[acc] = org_pat.match(line).group(1)
                elif acc and acc in orgs.keys():
                    # A second ORGANISM for the same accession means the
                    # VERSION line repeated — refuse to guess.
                    logging.error(f"Duplicate VERSION line: {acc}")
                    exit(1)
        # Every accession must have exactly one organism, and vice versa.
        if len(accs.keys()) != len(orgs.keys()):
            logging.error(
                f"Got unequal number of organisms ({len(orgs.keys())})\n"
                + f"and accessions ({len(accs.keys())})"
            )
            exit(1)
        else:
            for acc in accs.keys():
                if acc not in orgs.keys():
                    # Fix: error message said "ORAGANISM".
                    logging.error(f"ORGANISM not found for accession: {acc}")
                    exit(1)
                accs2orgs[acc] = orgs[acc]

    # The `with` block closes the handle; no manual close needed.
    return accs2orgs
240
241
def write_fasta(seq: str, id: str, basedir: os.PathLike, suffix: str) -> None:
    """
    Save a single sequence, with an empty description line, as a FASTA
    file named ``<id><suffix>`` inside ``basedir``.
    """
    out_path = os.path.join(basedir, id + suffix)
    record = SeqRecord(Seq(seq), id=id, description=str())
    SeqIO.write(record, out_path, "fasta")
251
252
# Main
def main() -> None:
    """
    This script takes:
    1. Downloads the RefSeq Mitochondrial GenBank and FASTA format files.
    2. Takes as input and output .csv.gz or .csv file generated by `ncbitax2lin`.

    and then generates a folder containing individual FASTA sequence files
    per organelle, and a corresponding lineage file in CSV format.
    """

    # Set logging.
    logging.basicConfig(
        format="\n"
        + "=" * 55
        + "\n%(asctime)s - %(levelname)s\n"
        + "=" * 55
        + "\n%(message)s\n\n",
        level=logging.DEBUG,
    )

    # Debug print.
    # NOTE(review): `ppp` is never used below — apparently kept as a
    # debugging aid.
    ppp = pprint.PrettyPrinter(width=55)
    prog_name = os.path.basename(inspect.stack()[0].filename)

    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )

    required = parser.add_argument_group("required arguments")

    required.add_argument(
        "-csv",
        dest="csv",
        default=False,
        required=True,
        help="Absolute UNIX path to .csv or .csv.gz file which is generated "
        + "\nby the `ncbitax2lin` tool.",
    )
    parser.add_argument(
        "-cols",
        dest="lineage_cols",
        default="tax_id,superkingdom,phylum,class,order,family,genus,species,strain",
        required=False,
        help="Taxonomic lineage will be built using these columns from the output of"
        + "\n`ncbitax2lin` tool.",
    )
    parser.add_argument(
        "-url",
        dest="url",
        default="https://ftp.ncbi.nlm.nih.gov/refseq/release/mitochondrion",
        required=False,
        help="Base URL from where NCBI RefSeq mitochondrion files will be downloaded\nfrom.",
    )
    parser.add_argument(
        "-out",
        dest="out_folder",
        default=os.path.join(os.getcwd(), "organelles"),
        required=False,
        help="By default, the output is written to this folder.",
    )
    parser.add_argument(
        "-f",
        dest="force_write_out",
        default=False,
        action="store_true",
        required=False,
        help="Force overwrite output directory contents.",
    )
    parser.add_argument(
        "--fna-suffix",
        dest="fna_suffix",
        default=".fna",
        required=False,
        help="Suffix of the individual organelle FASTA files that will be saved.",
    )
    parser.add_argument(
        "-ml",
        dest="fa_min_len",
        default=200,
        required=False,
        help="Minimum length of the FASTA sequence for it to be considered for"
        + "\nfurther processing",
    )
    parser.add_argument(
        "--skip-per-fa",
        dest="skip_per_fa",
        default=False,
        required=False,
        action="store_true",
        help="Do not generate per sequence FASTA file.",
    )
    parser.add_argument(
        "--alt-gb-parser",
        dest="alt_gb_parser",
        default=False,
        required=False,
        action="store_true",
        help="Use alternate GenBank parser instead of BioPython's.",
    )

    # Parse defaults
    args = parser.parse_args()
    csv = args.csv
    out = args.out_folder
    overwrite = args.force_write_out
    fna_suffix = args.fna_suffix
    url = args.url
    tax_cols = args.lineage_cols
    skip_per_fa = args.skip_per_fa
    alt_gb_parser = args.alt_gb_parser
    min_len = int(args.fa_min_len)
    # Comma-separated words only (validates the -cols value).
    tcols_pat = re.compile(r"^[\w\,]+?\w$")
    # Patterns locating the RefSeq release FASTA / GenBank files.
    mito_fna_suffix = re.compile(r".*?\.genomic\.fna")
    mito_gbff_suffix = re.compile(r".*?\.genomic\.gbff")
    final_lineages = os.path.join(out, "lineages.csv")
    lineages_not_found = os.path.join(out, "lineages_not_found.csv")
    base_fasta_dir = os.path.join(out, "fasta")

    # Basic checks
    # Warn (but proceed) when the output folder already exists without -f;
    # delete and recreate it when -f was given.
    if not overwrite and os.path.exists(out):
        logging.warning(
            f"Output destination [{os.path.basename(out)}] already exists!"
            + "\nPlease use -f to delete and overwrite."
        )
    elif overwrite and os.path.exists(out):
        logging.info(f"Overwrite requested. Deleting {os.path.basename(out)}...")
        shutil.rmtree(out)

    if not tcols_pat.match(tax_cols):
        logging.error(
            f"Supplied columns' names {tax_cols} should only have words (alphanumeric) separated by a comma."
        )
        exit(1)
    else:
        tax_cols = re.sub("\n", "", tax_cols).split(",")

    # Get .fna and .gbk files
    fna = dl_mito_seqs_and_flat_files(url, mito_fna_suffix, out)
    gbk = dl_mito_seqs_and_flat_files(url, mito_gbff_suffix, out)

    # Get taxonomy from ncbitax2lin
    lineages, raw_recs = get_lineages(csv, tax_cols)

    # Get parsed organisms and latest accession from GenBank file.
    # NOTE(review): only the BioPython path applies the min_len filter, so
    # the FASTA/GenBank counts below can diverge with --alt-gb-parser.
    if alt_gb_parser:
        accs2orgs = from_genbank_alt(gbk)
    else:
        accs2orgs = from_genbank(gbk, min_len)

    # # Finally, read FASTA and create individual FASTA if lineage exists.
    logging.info(f"Creating new sequences and lineages...")

    l_fh = open(final_lineages, "w")
    ln_fh = open(lineages_not_found, "w")
    l_fh.write(
        "identifiers,superkingdom,phylum,class,order,family,genus,species,strain\n"
    )
    ln_fh.write("fna_id,gbk_org\n")
    passed_lookup = 0
    failed_lookup = 0
    gbk_recs_missing = 0
    skipped_len_short = 0

    if not os.path.exists(base_fasta_dir):
        os.makedirs(base_fasta_dir)

    for record in SeqIO.parse(fna, "fasta"):
        # Skip short sequences and sequences without a GenBank record;
        # otherwise split the organism name into words for fallbacks.
        if len(record.seq) < min_len:
            skipped_len_short += 1
            continue
        elif record.id in accs2orgs.keys():
            org_words = accs2orgs[record.id].split(" ")
        else:
            gbk_recs_missing += 1
            continue

        # First two words ("Genus species") when a strain part follows.
        genus_species = (
            " ".join(org_words[0:2]) if len(org_words) > 2 else " ".join(org_words[0:])
        )

        if not skip_per_fa:
            write_fasta(record.seq, record.id, base_fasta_dir, fna_suffix)

        # Lineage lookup: exact organism name first, then genus+species,
        # finally fabricate a minimal lineage row and log the miss.
        if record.id in accs2orgs.keys() and accs2orgs[record.id] in lineages.keys():
            l_fh.write(",".join([record.id, lineages[accs2orgs[record.id]]]) + "\n")
            passed_lookup += 1
        elif record.id in accs2orgs.keys() and genus_species in lineages.keys():
            if len(org_words) > 2:
                # Replace the (empty) strain slot with the full organism name.
                l_fh.write(
                    ",".join(
                        [
                            record.id,
                            lineages[genus_species].rstrip(","),
                            accs2orgs[record.id],
                        ]
                    )
                    + "\n"
                )
            else:
                l_fh.write(",".join([record.id, lineages[genus_species]]) + "\n")
            passed_lookup += 1
        else:
            if len(org_words) > 2:
                # Unknown lineage: keep only genus, "genus species" and the
                # full organism name (as strain); higher ranks left blank.
                l_fh.write(
                    ",".join(
                        [
                            record.id,
                            "",
                            "",
                            "",
                            "",
                            "",
                            org_words[0],
                            org_words[0] + " " + org_words[1],
                            accs2orgs[record.id],
                        ]
                    )
                    + "\n"
                )
            else:
                l_fh.write(
                    ",".join(
                        [
                            record.id,
                            "",
                            "",
                            "",
                            "",
                            "",
                            org_words[0],
                            accs2orgs[record.id],
                            "",
                        ]
                    )
                    + "\n"
                )
            ln_fh.write(",".join([record.id, accs2orgs[record.id]]) + "\n")
            failed_lookup += 1

    logging.info(
        f"No. of raw records present in `ncbitax2lin` [{os.path.basename(csv)}]: {raw_recs}"
        + f"\nNo. of valid records collected from `ncbitax2lin` [{os.path.basename(csv)}]: {len(lineages.keys())}"
        + f"\nNo. of sequences skipped (Sequence length < {min_len}): {skipped_len_short}"
        + f"\nNo. of records in FASTA [{os.path.basename(fna)}]: {passed_lookup + failed_lookup}"
        + f"\nNo. of records in GenBank [{os.path.basename(gbk)}]: {len(accs2orgs.keys())}"
        + f"\nNo. of FASTA records for which new lineages were created: {passed_lookup}"
        + f"\nNo. of FASTA records for which only genus, species and/or strain information were created: {failed_lookup}"
        + f"\nNo. of FASTA records for which no GenBank records exist: {gbk_recs_missing}"
    )

    # Sanity check: every GenBank record should have produced a lineage row.
    # NOTE(review): the two counts in this message appear swapped relative
    # to their labels — confirm intent before relying on the wording.
    if (passed_lookup + failed_lookup) != len(accs2orgs.keys()):
        logging.error(
            f"The number of FASTA records written [{len(accs2orgs.keys())}]"
            + f"\nis not equal to number of lineages created [{passed_lookup + failed_lookup}]!"
        )
        exit(1)
    else:
        logging.info("Succesfully created lineages and FASTA records! Done!!")

    l_fh.close()
    ln_fh.close()
515
516
# Run the CLI entry point only when executed as a script (not on import).
if __name__ == "__main__":
    main()