comparison 0.5.0/bin/create_fasta_and_lineages.py @ 0:3c767f9cfd88 draft default tip

planemo upload
author galaxytrakr
date Fri, 29 May 2026 13:37:56 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:3c767f9cfd88
1 #!/usr/bin/env python3
2
3 import argparse
4 import gzip
5 import inspect
6 import logging
7 import os
8 import pprint
9 import re
10 import shutil
11 import ssl
12 import tempfile
13 from html.parser import HTMLParser
14 from urllib.request import urlopen
15
16 from Bio import SeqIO
17 from Bio.Seq import Seq
18 from Bio.SeqRecord import SeqRecord
19
20
21 # Multiple inheritence for pretty printing of help text.
22 class MultiArgFormatClasses(
23 argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter
24 ):
25 pass
26
27
28 # HTMLParser override class to get fna.gz and gbff.gz
29 class NCBIHTMLParser(HTMLParser):
30 def __init__(self, *, convert_charrefs: bool = ...) -> None:
31 super().__init__(convert_charrefs=convert_charrefs)
32 self.reset()
33 self.href_data = list()
34
35 def handle_data(self, data):
36 self.href_data.append(data)
37
38
39 # Download organelle FASTA and GenBank file.
40 def dl_mito_seqs_and_flat_files(url: str, suffix: re, out: os.PathLike) -> os.PathLike:
41 """
42 Method to save .fna.gz and .gbff.gz files for the
43 RefSeq mitochondrion release.
44 """
45 contxt = ssl.create_default_context()
46 contxt.check_hostname = False
47 contxt.verify_mode = ssl.CERT_NONE
48
49 if url == None:
50 logging.error(
51 "Please provide the base URL where .fna.gz and .gbff.gz"
52 + "\nfiles for RefSeq mitochondrion can be found."
53 )
54 exit(1)
55
56 if os.path.exists(out):
57 for file in os.listdir(out):
58 file_path = os.path.join(out, file)
59
60 if suffix.match(file_path) and os.path.getsize(file_path) > 0:
61 logging.info(
62 f"The required mitochondrion file(s)\n[{os.path.basename(file_path)}]"
63 + " already exists.\nSkipping download from NCBI..."
64 + "\nPlease use -f to delete and overwrite."
65 )
66 return file_path
67 else:
68 os.makedirs(out)
69
70 html_parser = NCBIHTMLParser()
71 logging.info(f"Finding latest NCBI RefSeq mitochondrion release at:\n{url}")
72
73 with urlopen(url, context=contxt) as response:
74 with tempfile.NamedTemporaryFile(delete=False) as tmp_html_file:
75 shutil.copyfileobj(response, tmp_html_file)
76
77 with open(tmp_html_file.name, "r") as html:
78 html_parser.feed("".join(html.readlines()))
79
80 file = suffix.search("".join(html_parser.href_data)).group(0)
81 file_url = "/".join([url, file + ".gz"])
82 file_at = os.path.join(out, file)
83
84 logging.info(f"Found NCBI RefSeq mitochondrian file(s):\n{file_url}")
85
86 logging.info(f"Saving to:\n{file_at}")
87
88 with tempfile.NamedTemporaryFile(delete=False) as tmp_gz:
89 with urlopen(file_url, context=contxt) as response:
90 tmp_gz.write(response.read())
91
92 with open(file_at, "w") as fh:
93 with gzip.open(tmp_gz.name, "rb") as web_gz:
94 fh.write(web_gz.read().decode("utf-8"))
95
96 html.close()
97 tmp_gz.close()
98 tmp_html_file.close()
99 os.unlink(tmp_gz.name)
100 os.unlink(tmp_html_file.name)
101 fh.close()
102 web_gz.close()
103 response.close()
104
105 return file_at
106
107
108 def get_lineages(csv: os.PathLike, cols: list) -> list:
109 """
110 Parse the output from `ncbitax2lin` tool and
111 return a dict of lineages where the key is
112 genusspeciesstrain.
113 """
114 lineages = dict()
115 if csv == None or not (os.path.exists(csv) or os.path.getsize(csv) > 0):
116 logging.error(
117 f"The CSV file [{os.path.basename(csv)}] is empty or does not exist!"
118 )
119 exit(1)
120
121 logging.info(f"Indexing {os.path.basename(csv)}...")
122
123 with open(csv, "r") as csv_fh:
124 header_cols = csv_fh.readline().strip().split(",")
125 user_req_cols = [
126 tcol_i for tcol_i, tcol in enumerate(header_cols) if tcol in cols
127 ]
128 cols_not_found = [tcol for tcol in cols if tcol not in header_cols]
129 raw_recs = 0
130
131 if len(cols_not_found) > 0:
132 logging.error(
133 f"The following columns do not exist in the"
134 + f"\nCSV file [ {os.path.basename(csv)} ]:\n"
135 + "".join(cols_not_found)
136 )
137 exit(1)
138 elif len(user_req_cols) > 9:
139 logging.error(
140 f"Only a total of 9 columns are needed!"
141 + "\ntax_id,kindom,phylum,class,order,family,genus,species,strain"
142 )
143 exit(1)
144
145 for tax in csv_fh:
146 raw_recs += 1
147 lcols = tax.strip().split(",")
148
149 if bool(lcols[user_req_cols[8]]):
150 lineages[lcols[user_req_cols[8]]] = ",".join(
151 [re.sub(r"[\,\"]", "", lcols[l]) for l in user_req_cols[1:]]
152 )
153 elif bool(lcols[user_req_cols[7]]):
154 lineages[lcols[user_req_cols[7]]] = ",".join(
155 [re.sub(r"[\,\"]", "", lcols[l]) for l in user_req_cols[1:8]]
156 + [str()]
157 )
158
159 if lcols[7] == "Rondeletia bicolor Goode & Bean":
160 print(lineages[lcols[8]])
161 exit(0)
162
163 csv_fh.close()
164 return lineages, raw_recs
165
166
167 def from_genbank(gbk: os.PathLike, min_len: int) -> dict:
168 """
169 Method to parse GenBank file and return
170 organism to latest accession mapping.
171 """
172 accs2orgs = dict()
173 sanitize_pat = re.compile(r"[\,\"]")
174
175 if not (os.path.exists(gbk) or os.path.getsize(gbk) > 0):
176 logging.info(
177 f"The GenBank file [{os.path.basename(gbk)}] does not exist"
178 + "\nor is of size 0."
179 )
180 exit(1)
181
182 logging.info(f"Indexing {os.path.basename(gbk)}...")
183
184 # a = open("./_accs", "w")
185 try:
186 for record in SeqIO.parse(gbk, "genbank"):
187 if len(record.seq) < min_len:
188 continue
189 else:
190 # a.write(f"{record.id}\n")
191 accs2orgs[record.id] = sanitize_pat.sub(
192 "", record.annotations["organism"]
193 )
194 except Exception as e:
195 logging.error(f"Error occured around GenBank ID: {record.id}")
196 logging.error(f"Error: {e}")
197 exit(1)
198
199 return accs2orgs
200
201
202 def from_genbank_alt(gbk: os.PathLike) -> dict:
203 """
204 Method to parse GenBank file and return
205 organism to latest accession mapping without
206 using BioPython's GenBank Scanner
207 """
208 accs2orgs = dict()
209 accs = dict()
210 orgs = dict()
211 acc = False
212 acc_pat = re.compile(r"^VERSION\s+(.+)")
213 org_pat = re.compile(r"^\s+ORGANISM\s+(.+)")
214 sanitize_pat = re.compile(r"[\,\"]")
215
216 if not (os.path.exists(gbk) or os.path.getsize(gbk) > 0):
217 logging.info(
218 f"The GenBank file [{os.path.basename(gbk)}] does not exist"
219 + "\nor is of size 0."
220 )
221 exit(1)
222
223 logging.info(
224 f"Indexing {os.path.basename(gbk)} without using\nBioPython's GenBank Scanner..."
225 )
226
227 with open(gbk, "r") as gbk_fh:
228 for line in gbk_fh:
229 line = line.rstrip()
230 if line.startswith("VERSION") and acc_pat.match(line):
231 acc = acc_pat.match(line).group(1)
232 accs[acc] = 1
233 if org_pat.match(line):
234 if acc and acc not in orgs.keys():
235 orgs[acc] = sanitize_pat.sub("", org_pat.match(line).group(1))
236 elif acc and acc in orgs.keys():
237 logging.error(f"Duplicate VERSION line: {acc}")
238 exit(1)
239 if len(accs.keys()) != len(orgs.keys()):
240 logging.error(
241 f"Got unequal number of organisms ({len(orgs.keys())})\n"
242 + f"and accessions ({len(accs.keys())})"
243 )
244 exit(1)
245 else:
246 for acc in accs.keys():
247 if acc not in orgs.keys():
248 logging.error(f"ORAGANISM not found for accession: {acc}")
249 exit(1)
250 accs2orgs[acc] = orgs[acc]
251
252 gbk_fh.close()
253 return accs2orgs
254
255
256 def write_fasta(seq: str, id: str, basedir: os.PathLike, suffix: str) -> None:
257 """
258 Write sequence with no description to specified file.
259 """
260 SeqIO.write(
261 SeqRecord(Seq(seq), id=id, description=str()),
262 os.path.join(basedir, id + suffix),
263 "fasta",
264 )
265
266
267 # Main
268 def main() -> None:
269 """
270 This script takes:
271 1. Downloads the RefSeq Mitochrondrial GenBank and FASTA format files.
272 2. Takes as input and output .csv.gz or .csv file generated by `ncbitax2lin`.
273
274 and then generates a folder containing individual FASTA sequence files
275 per organelle, and a corresponding lineage file in CSV format.
276 """
277
278 # Set logging.
279 logging.basicConfig(
280 format="\n"
281 + "=" * 55
282 + "\n%(asctime)s - %(levelname)s\n"
283 + "=" * 55
284 + "\n%(message)s\n\n",
285 level=logging.DEBUG,
286 )
287
288 # Debug print.
289 ppp = pprint.PrettyPrinter(width=55)
290 prog_name = os.path.basename(inspect.stack()[0].filename)
291
292 parser = argparse.ArgumentParser(
293 prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
294 )
295
296 required = parser.add_argument_group("required arguments")
297
298 required.add_argument(
299 "-csv",
300 dest="csv",
301 default=False,
302 required=True,
303 help="Absolute UNIX path to .csv or .csv.gz file which is generated "
304 + "\nby the `ncbitax2lin` tool.",
305 )
306 parser.add_argument(
307 "-cols",
308 dest="lineage_cols",
309 default="tax_id,domain,phylum,class,order,family,genus,species,strain",
310 required=False,
311 help="Taxonomic lineage will be built using these columns from the output of"
312 + "\n`ncbitax2lin` tool.",
313 )
314 parser.add_argument(
315 "-url",
316 dest="url",
317 default="https://ftp.ncbi.nlm.nih.gov/refseq/release/mitochondrion",
318 required=False,
319 help="Base URL from where NCBI RefSeq mitochondrion files will be downloaded\nfrom.",
320 )
321 parser.add_argument(
322 "-out",
323 dest="out_folder",
324 default=os.path.join(os.getcwd(), "organelles"),
325 required=False,
326 help="By default, the output is written to this folder.",
327 )
328 parser.add_argument(
329 "-f",
330 dest="force_write_out",
331 default=False,
332 action="store_true",
333 required=False,
334 help="Force overwrite output directory contents.",
335 )
336 parser.add_argument(
337 "--fna-suffix",
338 dest="fna_suffix",
339 default=".fna",
340 required=False,
341 help="Suffix of the individual organelle FASTA files that will be saved.",
342 )
343 parser.add_argument(
344 "-ml",
345 dest="fa_min_len",
346 default=200,
347 required=False,
348 help="Minimum length of the FASTA sequence for it to be considered for"
349 + "\nfurther processing",
350 )
351 parser.add_argument(
352 "--gen-per-fa",
353 dest="gen_per_fa",
354 default=False,
355 required=False,
356 action="store_true",
357 help="Generate per sequence FASTA file.",
358 )
359 parser.add_argument(
360 "--alt-gb-parser",
361 dest="alt_gb_parser",
362 default=False,
363 required=False,
364 action="store_true",
365 help="Use alternate GenBank parser instead of BioPython's.",
366 )
367
368 # Parse defaults
369 args = parser.parse_args()
370 csv = args.csv
371 out = args.out_folder
372 overwrite = args.force_write_out
373 fna_suffix = args.fna_suffix
374 url = args.url
375 tax_cols = args.lineage_cols
376 gen_per_fa = args.gen_per_fa
377 alt_gb_parser = args.alt_gb_parser
378 min_len = int(args.fa_min_len)
379 tcols_pat = re.compile(r"^[\w\,]+?\w$")
380 mito_fna_suffix = re.compile(r".*?\.genomic\.fna")
381 mito_gbff_suffix = re.compile(r".*?\.genomic\.gbff")
382 final_lineages = os.path.join(out, "lineages.csv")
383 lineages_not_found = os.path.join(out, "lineages_not_found.csv")
384 base_fasta_dir = os.path.join(out, "fasta")
385
386 # Basic checks
387 if not overwrite and os.path.exists(out):
388 logging.warning(
389 f"Output destination [{os.path.basename(out)}] already exists!"
390 + "\nPlease use -f to delete and overwrite."
391 )
392 elif overwrite and os.path.exists(out):
393 logging.info(f"Overwrite requested. Deleting {os.path.basename(out)}...")
394 shutil.rmtree(out)
395
396 if not tcols_pat.match(tax_cols):
397 logging.error(
398 f"Supplied columns' names {tax_cols} should only have words (alphanumeric) separated by a comma."
399 )
400 exit(1)
401 else:
402 tax_cols = re.sub("\n", "", tax_cols).split(",")
403
404 # Get .fna and .gbk files
405 fna = dl_mito_seqs_and_flat_files(url, mito_fna_suffix, out)
406 gbk = dl_mito_seqs_and_flat_files(url, mito_gbff_suffix, out)
407
408 # Get taxonomy from ncbitax2lin
409 lineages, raw_recs = get_lineages(csv, tax_cols)
410
411 # Get parsed organisms and latest accession from GenBank file.
412 if alt_gb_parser:
413 accs2orgs = from_genbank_alt(gbk)
414 else:
415 accs2orgs = from_genbank(gbk, min_len)
416
417 # # Finally, read FASTA and create individual FASTA if lineage exists.
418 logging.info(f"Creating new sequences and lineages...")
419
420 l_fh = open(final_lineages, "w")
421 ln_fh = open(lineages_not_found, "w")
422 l_fh.write(
423 "identifiers,superkingdom,phylum,class,order,family,genus,species,strain\n"
424 )
425 ln_fh.write("fna_id,gbk_org\n")
426 passed_lookup = 0
427 failed_lookup = 0
428 gbk_recs_missing = 0
429 skipped_len_short = 0
430
431 if gen_per_fa and not os.path.exists(base_fasta_dir):
432 os.makedirs(base_fasta_dir)
433
434 for record in SeqIO.parse(fna, "fasta"):
435 if len(record.seq) < min_len:
436 skipped_len_short += 1
437 continue
438 elif record.id in accs2orgs.keys():
439 org_words = accs2orgs[record.id].split(" ")
440 else:
441 gbk_recs_missing += 1
442 continue
443
444 genus_species = (
445 " ".join(org_words[0:2]) if len(org_words) > 2 else " ".join(org_words[0:])
446 )
447
448 if gen_per_fa:
449 write_fasta(record.seq, record.id, base_fasta_dir, fna_suffix)
450
451 if record.id in accs2orgs.keys() and accs2orgs[record.id] in lineages.keys():
452 l_fh.write(",".join([record.id, lineages[accs2orgs[record.id]]]) + "\n")
453 passed_lookup += 1
454 elif record.id in accs2orgs.keys() and genus_species in lineages.keys():
455 if len(org_words) > 2:
456 l_fh.write(
457 ",".join(
458 [
459 record.id,
460 lineages[genus_species].rstrip(","),
461 accs2orgs[record.id],
462 ]
463 )
464 + "\n"
465 )
466 else:
467 l_fh.write(",".join([record.id, lineages[genus_species]]) + "\n")
468 passed_lookup += 1
469 else:
470 if len(org_words) > 2:
471 l_fh.write(
472 ",".join(
473 [
474 record.id,
475 "",
476 "",
477 "",
478 "",
479 "",
480 org_words[0],
481 org_words[0] + " " + org_words[1],
482 accs2orgs[record.id],
483 ]
484 )
485 + "\n"
486 )
487 else:
488 l_fh.write(
489 ",".join(
490 [
491 record.id,
492 "",
493 "",
494 "",
495 "",
496 "",
497 org_words[0],
498 accs2orgs[record.id],
499 "",
500 ]
501 )
502 + "\n"
503 )
504 ln_fh.write(",".join([record.id, accs2orgs[record.id]]) + "\n")
505 failed_lookup += 1
506
507 logging.info(
508 f"No. of raw records present in `ncbitax2lin` [{os.path.basename(csv)}]: {raw_recs}"
509 + f"\nNo. of valid records collected from `ncbitax2lin` [{os.path.basename(csv)}]: {len(lineages.keys())}"
510 + f"\nNo. of sequences skipped (Sequence length < {min_len}): {skipped_len_short}"
511 + f"\nNo. of records in FASTA [{os.path.basename(fna)}]: {passed_lookup + failed_lookup}"
512 + f"\nNo. of records in GenBank [{os.path.basename(gbk)}]: {len(accs2orgs.keys())}"
513 + f"\nNo. of FASTA records for which new lineages were created: {passed_lookup}"
514 + f"\nNo. of FASTA records for which only genus, species and/or strain information were created: {failed_lookup}"
515 + f"\nNo. of FASTA records for which no GenBank records exist: {gbk_recs_missing}"
516 )
517
518 if (passed_lookup + failed_lookup) != len(accs2orgs.keys()):
519 logging.error(
520 f"The number of FASTA records written [{len(accs2orgs.keys())}]"
521 + f"\nis not equal to number of lineages created [{passed_lookup + failed_lookup}]!"
522 )
523 exit(1)
524 else:
525 logging.info("Succesfully created lineages and FASTA records! Done!!")
526
527 l_fh.close()
528 ln_fh.close()
529
530
531 if __name__ == "__main__":
532 main()