comparison 0.5.0/bin/create_fasta_and_lineages.py @ 0:97cd2f532efe

planemo upload

author | kkonganti |
---|---|
date | Mon, 31 Mar 2025 14:50:40 -0400 |
parents | |
children | |
#!/usr/bin/env python3

import argparse
import gzip
import inspect
import logging
import os
import pprint
import re
import shutil
import ssl
import tempfile
from html.parser import HTMLParser
from urllib.request import urlopen

from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord

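# Biopython (Bio) is the only third-party dependency here; every other
# module above is from the Python standard library.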

# Multiple inheritance for pretty printing of help text.
class MultiArgFormatClasses(
    argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter
):
    pass


# HTMLParser override class to get fna.gz and gbff.gz
class NCBIHTMLParser(HTMLParser):
    def __init__(self, *, convert_charrefs: bool = True) -> None:
        super().__init__(convert_charrefs=convert_charrefs)
        self.reset()
        self.href_data = list()

    def handle_data(self, data):
        self.href_data.append(data)

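# The parser above records the text of every node on the NCBI release
# listing page; dl_mito_seqs_and_flat_files() later recovers the .fna/.gbff
# file name by running a suffix regex over the concatenated node text.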

# Download organelle FASTA and GenBank file.
def dl_mito_seqs_and_flat_files(
    url: str, suffix: re.Pattern, out: os.PathLike
) -> os.PathLike:
    """
    Method to save the .fna.gz and .gbff.gz files for the
    RefSeq mitochondrion release.
    """
    contxt = ssl.create_default_context()
    contxt.check_hostname = False
    contxt.verify_mode = ssl.CERT_NONE
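    # NOTE: hostname checking and certificate verification are disabled above,
    # presumably so the download also works behind TLS-intercepting proxies;
    # this trades transport security for portability.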

    if url is None:
        logging.error(
            "Please provide the base URL where .fna.gz and .gbff.gz"
            + "\nfiles for RefSeq mitochondrion can be found."
        )
        exit(1)

    if os.path.exists(out):
        for file in os.listdir(out):
            file_path = os.path.join(out, file)

            if suffix.match(file_path) and os.path.getsize(file_path) > 0:
                logging.info(
                    f"The required mitochondrion file(s)\n[{os.path.basename(file_path)}]"
                    + " already exists.\nSkipping download from NCBI..."
                    + "\nPlease use -f to delete and overwrite."
                )
                return file_path
    else:
        os.makedirs(out)

    html_parser = NCBIHTMLParser()
    logging.info(f"Finding latest NCBI RefSeq mitochondrion release at:\n{url}")

    with urlopen(url, context=contxt) as response:
        with tempfile.NamedTemporaryFile(delete=False) as tmp_html_file:
            shutil.copyfileobj(response, tmp_html_file)

    with open(tmp_html_file.name, "r") as html:
        html_parser.feed("".join(html.readlines()))

    file = suffix.search("".join(html_parser.href_data)).group(0)
    file_url = "/".join([url, file + ".gz"])
    file_at = os.path.join(out, file)

    logging.info(f"Found NCBI RefSeq mitochondrion file(s):\n{file_url}")

    logging.info(f"Saving to:\n{file_at}")

    with tempfile.NamedTemporaryFile(delete=False) as tmp_gz:
        with urlopen(file_url, context=contxt) as response:
            tmp_gz.write(response.read())

    with open(file_at, "w") as fh:
        with gzip.open(tmp_gz.name, "rb") as web_gz:
            fh.write(web_gz.read().decode("utf-8"))

    os.unlink(tmp_gz.name)
    os.unlink(tmp_html_file.name)

    return file_at

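# A minimal usage sketch for the downloader above (hypothetical output path;
# the URL and suffix pattern mirror the defaults used in main()):
#
#   fna = dl_mito_seqs_and_flat_files(
#       "https://ftp.ncbi.nlm.nih.gov/refseq/release/mitochondrion",
#       re.compile(r".*?\.genomic\.fna"),
#       "organelles",
#   )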

def get_lineages(csv: os.PathLike, cols: list) -> tuple:
    """
    Parse the output from the `ncbitax2lin` tool and
    return a dict of lineages keyed by the strain column
    (or by the species column when strain is empty),
    along with the raw record count.
    """
    lineages = dict()
    if csv is None or not os.path.exists(csv) or os.path.getsize(csv) == 0:
        logging.error(
            f"The CSV file [{os.path.basename(csv)}] is empty or does not exist!"
        )
        exit(1)

    logging.info(f"Indexing {os.path.basename(csv)}...")

    # The -csv input may be plain or gzip-compressed (.csv.gz), per the help text.
    open_func = gzip.open if str(csv).endswith(".gz") else open

    with open_func(csv, "rt") as csv_fh:
        header_cols = csv_fh.readline().strip().split(",")
        user_req_cols = [
            tcol_i for tcol_i, tcol in enumerate(header_cols) if tcol in cols
        ]
        cols_not_found = [tcol for tcol in cols if tcol not in header_cols]
        raw_recs = 0

        if len(cols_not_found) > 0:
            logging.error(
                "The following columns do not exist in the"
                + f"\nCSV file [ {os.path.basename(csv)} ]:\n"
                + ",".join(cols_not_found)
            )
            exit(1)
        elif len(user_req_cols) > 9:
            logging.error(
                "Only a total of 9 columns are needed!"
                + "\ntax_id,superkingdom,phylum,class,order,family,genus,species,strain"
            )
            exit(1)

        for tax in csv_fh:
            raw_recs += 1
            lcols = tax.strip().split(",")

            if bool(lcols[user_req_cols[8]]):
                lineages[lcols[user_req_cols[8]]] = ",".join(
                    [lcols[l] for l in user_req_cols[1:]]
                )
            elif bool(lcols[user_req_cols[7]]):
                lineages[lcols[user_req_cols[7]]] = ",".join(
                    [lcols[l] for l in user_req_cols[1:8]] + [str()]
                )

    return lineages, raw_recs

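# Example of what get_lineages() indexes, assuming a typical `ncbitax2lin`
# header (the column names are the ones passed via -cols):
#
#   tax_id,superkingdom,phylum,class,order,family,genus,species,strain
#   9606,Eukaryota,Chordata,Mammalia,Primates,Hominidae,Homo,Homo sapiens,
#
# A non-empty strain column keys the dict entry; otherwise the species
# column keys it and an empty strain field is appended.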

def from_genbank(gbk: os.PathLike, min_len: int) -> dict:
    """
    Method to parse a GenBank file and return an
    accession to organism mapping.
    """
    accs2orgs = dict()

    if not os.path.exists(gbk) or os.path.getsize(gbk) == 0:
        logging.error(
            f"The GenBank file [{os.path.basename(gbk)}] does not exist"
            + "\nor is of size 0."
        )
        exit(1)

    logging.info(f"Indexing {os.path.basename(gbk)}...")

    for record in SeqIO.parse(gbk, "genbank"):
        if len(record.seq) < min_len:
            continue
        accs2orgs[record.id] = record.annotations["organism"]

    return accs2orgs

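# The dict returned above maps accession -> organism, e.g. (illustrative):
#
#   {"NC_012920.1": "Homo sapiens"}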

def from_genbank_alt(gbk: os.PathLike) -> dict:
    """
    Method to parse a GenBank file and return an
    accession to organism mapping without using
    BioPython's GenBank Scanner.
    """
    accs2orgs = dict()
    accs = dict()
    orgs = dict()
    acc = False
    acc_pat = re.compile(r"^VERSION\s+(.+)")
    org_pat = re.compile(r"^\s+ORGANISM\s+(.+)")

    if not os.path.exists(gbk) or os.path.getsize(gbk) == 0:
        logging.error(
            f"The GenBank file [{os.path.basename(gbk)}] does not exist"
            + "\nor is of size 0."
        )
        exit(1)

    logging.info(
        f"Indexing {os.path.basename(gbk)} without using\nBioPython's GenBank Scanner..."
    )

    with open(gbk, "r") as gbk_fh:
        for line in gbk_fh:
            line = line.rstrip()
            if line.startswith("VERSION") and acc_pat.match(line):
                acc = acc_pat.match(line).group(1)
                accs[acc] = 1
            if org_pat.match(line):
                if acc and acc not in orgs.keys():
                    orgs[acc] = org_pat.match(line).group(1)
                elif acc and acc in orgs.keys():
                    logging.error(f"Duplicate VERSION line: {acc}")
                    exit(1)
        if len(accs.keys()) != len(orgs.keys()):
            logging.error(
                f"Got unequal number of organisms ({len(orgs.keys())})\n"
                + f"and accessions ({len(accs.keys())})"
            )
            exit(1)
        else:
            for acc in accs.keys():
                if acc not in orgs.keys():
                    logging.error(f"ORGANISM not found for accession: {acc}")
                    exit(1)
                accs2orgs[acc] = orgs[acc]

    return accs2orgs

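# The two regexes in from_genbank_alt() key off GenBank flat-file lines
# such as (illustrative record; real files pad the columns with spaces):
#
#   VERSION     NC_012920.1
#     ORGANISM  Homo sapiens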

def write_fasta(seq: str, seq_id: str, basedir: os.PathLike, suffix: str) -> None:
    """
    Write a sequence with no description to the specified file.
    """
    SeqIO.write(
        SeqRecord(Seq(seq), id=seq_id, description=str()),
        os.path.join(basedir, seq_id + suffix),
        "fasta",
    )


# Main
def main() -> None:
    """
    This script:
        1. Downloads the RefSeq mitochondrial GenBank and FASTA format files.
        2. Takes as input the .csv or .csv.gz file generated by `ncbitax2lin`.

    It then generates a folder containing individual FASTA sequence files
    per organelle, and a corresponding lineage file in CSV format.
    """

    # Set logging.
    logging.basicConfig(
        format="\n"
        + "=" * 55
        + "\n%(asctime)s - %(levelname)s\n"
        + "=" * 55
        + "\n%(message)s\n\n",
        level=logging.DEBUG,
    )

    # Debug print.
    ppp = pprint.PrettyPrinter(width=55)
    prog_name = os.path.basename(inspect.stack()[0].filename)

    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )
    required = parser.add_argument_group("required arguments")

    required.add_argument(
        "-csv",
        dest="csv",
        required=True,
        help="Absolute UNIX path to the .csv or .csv.gz file which is generated "
        + "\nby the `ncbitax2lin` tool.",
    )
    parser.add_argument(
        "-cols",
        dest="lineage_cols",
        default="tax_id,superkingdom,phylum,class,order,family,genus,species,strain",
        required=False,
        help="Taxonomic lineage will be built using these columns from the output of"
        + "\nthe `ncbitax2lin` tool.",
    )
    parser.add_argument(
        "-url",
        dest="url",
        default="https://ftp.ncbi.nlm.nih.gov/refseq/release/mitochondrion",
        required=False,
        help="Base URL from which NCBI RefSeq mitochondrion files will be\ndownloaded.",
    )
    parser.add_argument(
        "-out",
        dest="out_folder",
        default=os.path.join(os.getcwd(), "organelles"),
        required=False,
        help="By default, the output is written to this folder.",
    )
    parser.add_argument(
        "-f",
        dest="force_write_out",
        default=False,
        action="store_true",
        required=False,
        help="Force overwrite the output directory contents.",
    )
    parser.add_argument(
        "--fna-suffix",
        dest="fna_suffix",
        default=".fna",
        required=False,
        help="Suffix of the individual organelle FASTA files that will be saved.",
    )
    parser.add_argument(
        "-ml",
        dest="fa_min_len",
        default=200,
        required=False,
        help="Minimum length of a FASTA sequence for it to be considered for"
        + "\nfurther processing.",
    )
    parser.add_argument(
        "--skip-per-fa",
        dest="skip_per_fa",
        default=False,
        required=False,
        action="store_true",
        help="Do not generate per sequence FASTA files.",
    )
    parser.add_argument(
        "--alt-gb-parser",
        dest="alt_gb_parser",
        default=False,
        required=False,
        action="store_true",
        help="Use the alternate GenBank parser instead of BioPython's.",
    )

    # Parse defaults
    args = parser.parse_args()
    csv = args.csv
    out = args.out_folder
    overwrite = args.force_write_out
    fna_suffix = args.fna_suffix
    url = args.url
    tax_cols = args.lineage_cols
    skip_per_fa = args.skip_per_fa
    alt_gb_parser = args.alt_gb_parser
    min_len = int(args.fa_min_len)
    tcols_pat = re.compile(r"^[\w\,]+?\w$")
    mito_fna_suffix = re.compile(r".*?\.genomic\.fna")
    mito_gbff_suffix = re.compile(r".*?\.genomic\.gbff")
    final_lineages = os.path.join(out, "lineages.csv")
    lineages_not_found = os.path.join(out, "lineages_not_found.csv")
    base_fasta_dir = os.path.join(out, "fasta")

    # Basic checks
    if not overwrite and os.path.exists(out):
        logging.warning(
            f"Output destination [{os.path.basename(out)}] already exists!"
            + "\nPlease use -f to delete and overwrite."
        )
    elif overwrite and os.path.exists(out):
        logging.info(f"Overwrite requested. Deleting {os.path.basename(out)}...")
        shutil.rmtree(out)

    if not tcols_pat.match(tax_cols):
        logging.error(
            f"The supplied column names {tax_cols} should only contain words (alphanumeric) separated by commas."
        )
        exit(1)
    else:
        tax_cols = re.sub("\n", "", tax_cols).split(",")
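    # e.g. -cols "tax_id,superkingdom,phylum,class,order,family,genus,species,strain"
    # passes the tcols_pat check above; values containing spaces or a
    # trailing comma are rejected.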

    # Get .fna and .gbk files
    fna = dl_mito_seqs_and_flat_files(url, mito_fna_suffix, out)
    gbk = dl_mito_seqs_and_flat_files(url, mito_gbff_suffix, out)

    # Get taxonomy from ncbitax2lin
    lineages, raw_recs = get_lineages(csv, tax_cols)

    # Get the accession to organism mapping from the GenBank file.
    if alt_gb_parser:
        accs2orgs = from_genbank_alt(gbk)
    else:
        accs2orgs = from_genbank(gbk, min_len)

    # Finally, read FASTA and create individual FASTA if lineage exists.
    logging.info("Creating new sequences and lineages...")

    l_fh = open(final_lineages, "w")
    ln_fh = open(lineages_not_found, "w")
    l_fh.write(
        "identifiers,superkingdom,phylum,class,order,family,genus,species,strain\n"
    )
    ln_fh.write("fna_id,gbk_org\n")
    passed_lookup = 0
    failed_lookup = 0
    gbk_recs_missing = 0
    skipped_len_short = 0

    if not os.path.exists(base_fasta_dir):
        os.makedirs(base_fasta_dir)

    for record in SeqIO.parse(fna, "fasta"):
        if len(record.seq) < min_len:
            skipped_len_short += 1
            continue
        elif record.id in accs2orgs.keys():
            org_words = accs2orgs[record.id].split(" ")
        else:
            gbk_recs_missing += 1
            continue

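        # Organism names longer than two words (e.g. "Homo sapiens
        # neanderthalensis") are collapsed to the leading binomial so the
        # fallback lineage lookup below can still match on genus + species.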
        genus_species = (
            " ".join(org_words[0:2]) if len(org_words) > 2 else " ".join(org_words[0:])
        )

        if not skip_per_fa:
            write_fasta(record.seq, record.id, base_fasta_dir, fna_suffix)

        if record.id in accs2orgs.keys() and accs2orgs[record.id] in lineages.keys():
            l_fh.write(",".join([record.id, lineages[accs2orgs[record.id]]]) + "\n")
            passed_lookup += 1
        elif record.id in accs2orgs.keys() and genus_species in lineages.keys():
            if len(org_words) > 2:
                l_fh.write(
                    ",".join(
                        [
                            record.id,
                            lineages[genus_species].rstrip(","),
                            accs2orgs[record.id],
                        ]
                    )
                    + "\n"
                )
            else:
                l_fh.write(",".join([record.id, lineages[genus_species]]) + "\n")
            passed_lookup += 1
        else:
            if len(org_words) > 2:
                l_fh.write(
                    ",".join(
                        [
                            record.id,
                            "",
                            "",
                            "",
                            "",
                            "",
                            org_words[0],
                            org_words[0] + " " + org_words[1],
                            accs2orgs[record.id],
                        ]
                    )
                    + "\n"
                )
            else:
                l_fh.write(
                    ",".join(
                        [
                            record.id,
                            "",
                            "",
                            "",
                            "",
                            "",
                            org_words[0],
                            accs2orgs[record.id],
                            "",
                        ]
                    )
                    + "\n"
                )
            ln_fh.write(",".join([record.id, accs2orgs[record.id]]) + "\n")
            failed_lookup += 1

    logging.info(
        f"No. of raw records present in `ncbitax2lin` [{os.path.basename(csv)}]: {raw_recs}"
        + f"\nNo. of valid records collected from `ncbitax2lin` [{os.path.basename(csv)}]: {len(lineages.keys())}"
        + f"\nNo. of sequences skipped (sequence length < {min_len}): {skipped_len_short}"
        + f"\nNo. of records in FASTA [{os.path.basename(fna)}]: {passed_lookup + failed_lookup}"
        + f"\nNo. of records in GenBank [{os.path.basename(gbk)}]: {len(accs2orgs.keys())}"
        + f"\nNo. of FASTA records for which new lineages were created: {passed_lookup}"
        + f"\nNo. of FASTA records for which only genus, species and/or strain information was created: {failed_lookup}"
        + f"\nNo. of FASTA records for which no GenBank records exist: {gbk_recs_missing}"
    )

    if (passed_lookup + failed_lookup) != len(accs2orgs.keys()):
        logging.error(
            f"The number of GenBank accessions [{len(accs2orgs.keys())}]"
            + f"\nis not equal to the number of lineages created [{passed_lookup + failed_lookup}]!"
        )
        exit(1)
    else:
        logging.info("Successfully created lineages and FASTA records! Done!!")

    l_fh.close()
    ln_fh.close()


if __name__ == "__main__":
    main()
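
# Example invocation (hypothetical paths; everything except -csv has a default):
#   ./create_fasta_and_lineages.py \
#       -csv /path/to/ncbitax2lin/lineages.csv.gz \
#       -out ./organelles -ml 200 --alt-gb-parser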