Mercurial > repos > galaxytrakr > hfp_nowayout_awsbatch
comparison 0.5.0/bin/create_fasta_and_lineages.py @ 0:3c767f9cfd88 draft default tip
planemo upload
| author | galaxytrakr |
|---|---|
| date | Fri, 29 May 2026 13:37:56 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:3c767f9cfd88 |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 | |
| 3 import argparse | |
| 4 import gzip | |
| 5 import inspect | |
| 6 import logging | |
| 7 import os | |
| 8 import pprint | |
| 9 import re | |
| 10 import shutil | |
| 11 import ssl | |
| 12 import tempfile | |
| 13 from html.parser import HTMLParser | |
| 14 from urllib.request import urlopen | |
| 15 | |
| 16 from Bio import SeqIO | |
| 17 from Bio.Seq import Seq | |
| 18 from Bio.SeqRecord import SeqRecord | |
| 19 | |
| 20 | |
| 21 # Multiple inheritence for pretty printing of help text. | |
| 22 class MultiArgFormatClasses( | |
| 23 argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter | |
| 24 ): | |
| 25 pass | |
| 26 | |
| 27 | |
| 28 # HTMLParser override class to get fna.gz and gbff.gz | |
| 29 class NCBIHTMLParser(HTMLParser): | |
| 30 def __init__(self, *, convert_charrefs: bool = ...) -> None: | |
| 31 super().__init__(convert_charrefs=convert_charrefs) | |
| 32 self.reset() | |
| 33 self.href_data = list() | |
| 34 | |
| 35 def handle_data(self, data): | |
| 36 self.href_data.append(data) | |
| 37 | |
| 38 | |
| 39 # Download organelle FASTA and GenBank file. | |
| 40 def dl_mito_seqs_and_flat_files(url: str, suffix: re, out: os.PathLike) -> os.PathLike: | |
| 41 """ | |
| 42 Method to save .fna.gz and .gbff.gz files for the | |
| 43 RefSeq mitochondrion release. | |
| 44 """ | |
| 45 contxt = ssl.create_default_context() | |
| 46 contxt.check_hostname = False | |
| 47 contxt.verify_mode = ssl.CERT_NONE | |
| 48 | |
| 49 if url == None: | |
| 50 logging.error( | |
| 51 "Please provide the base URL where .fna.gz and .gbff.gz" | |
| 52 + "\nfiles for RefSeq mitochondrion can be found." | |
| 53 ) | |
| 54 exit(1) | |
| 55 | |
| 56 if os.path.exists(out): | |
| 57 for file in os.listdir(out): | |
| 58 file_path = os.path.join(out, file) | |
| 59 | |
| 60 if suffix.match(file_path) and os.path.getsize(file_path) > 0: | |
| 61 logging.info( | |
| 62 f"The required mitochondrion file(s)\n[{os.path.basename(file_path)}]" | |
| 63 + " already exists.\nSkipping download from NCBI..." | |
| 64 + "\nPlease use -f to delete and overwrite." | |
| 65 ) | |
| 66 return file_path | |
| 67 else: | |
| 68 os.makedirs(out) | |
| 69 | |
| 70 html_parser = NCBIHTMLParser() | |
| 71 logging.info(f"Finding latest NCBI RefSeq mitochondrion release at:\n{url}") | |
| 72 | |
| 73 with urlopen(url, context=contxt) as response: | |
| 74 with tempfile.NamedTemporaryFile(delete=False) as tmp_html_file: | |
| 75 shutil.copyfileobj(response, tmp_html_file) | |
| 76 | |
| 77 with open(tmp_html_file.name, "r") as html: | |
| 78 html_parser.feed("".join(html.readlines())) | |
| 79 | |
| 80 file = suffix.search("".join(html_parser.href_data)).group(0) | |
| 81 file_url = "/".join([url, file + ".gz"]) | |
| 82 file_at = os.path.join(out, file) | |
| 83 | |
| 84 logging.info(f"Found NCBI RefSeq mitochondrian file(s):\n{file_url}") | |
| 85 | |
| 86 logging.info(f"Saving to:\n{file_at}") | |
| 87 | |
| 88 with tempfile.NamedTemporaryFile(delete=False) as tmp_gz: | |
| 89 with urlopen(file_url, context=contxt) as response: | |
| 90 tmp_gz.write(response.read()) | |
| 91 | |
| 92 with open(file_at, "w") as fh: | |
| 93 with gzip.open(tmp_gz.name, "rb") as web_gz: | |
| 94 fh.write(web_gz.read().decode("utf-8")) | |
| 95 | |
| 96 html.close() | |
| 97 tmp_gz.close() | |
| 98 tmp_html_file.close() | |
| 99 os.unlink(tmp_gz.name) | |
| 100 os.unlink(tmp_html_file.name) | |
| 101 fh.close() | |
| 102 web_gz.close() | |
| 103 response.close() | |
| 104 | |
| 105 return file_at | |
| 106 | |
| 107 | |
| 108 def get_lineages(csv: os.PathLike, cols: list) -> list: | |
| 109 """ | |
| 110 Parse the output from `ncbitax2lin` tool and | |
| 111 return a dict of lineages where the key is | |
| 112 genusspeciesstrain. | |
| 113 """ | |
| 114 lineages = dict() | |
| 115 if csv == None or not (os.path.exists(csv) or os.path.getsize(csv) > 0): | |
| 116 logging.error( | |
| 117 f"The CSV file [{os.path.basename(csv)}] is empty or does not exist!" | |
| 118 ) | |
| 119 exit(1) | |
| 120 | |
| 121 logging.info(f"Indexing {os.path.basename(csv)}...") | |
| 122 | |
| 123 with open(csv, "r") as csv_fh: | |
| 124 header_cols = csv_fh.readline().strip().split(",") | |
| 125 user_req_cols = [ | |
| 126 tcol_i for tcol_i, tcol in enumerate(header_cols) if tcol in cols | |
| 127 ] | |
| 128 cols_not_found = [tcol for tcol in cols if tcol not in header_cols] | |
| 129 raw_recs = 0 | |
| 130 | |
| 131 if len(cols_not_found) > 0: | |
| 132 logging.error( | |
| 133 f"The following columns do not exist in the" | |
| 134 + f"\nCSV file [ {os.path.basename(csv)} ]:\n" | |
| 135 + "".join(cols_not_found) | |
| 136 ) | |
| 137 exit(1) | |
| 138 elif len(user_req_cols) > 9: | |
| 139 logging.error( | |
| 140 f"Only a total of 9 columns are needed!" | |
| 141 + "\ntax_id,kindom,phylum,class,order,family,genus,species,strain" | |
| 142 ) | |
| 143 exit(1) | |
| 144 | |
| 145 for tax in csv_fh: | |
| 146 raw_recs += 1 | |
| 147 lcols = tax.strip().split(",") | |
| 148 | |
| 149 if bool(lcols[user_req_cols[8]]): | |
| 150 lineages[lcols[user_req_cols[8]]] = ",".join( | |
| 151 [re.sub(r"[\,\"]", "", lcols[l]) for l in user_req_cols[1:]] | |
| 152 ) | |
| 153 elif bool(lcols[user_req_cols[7]]): | |
| 154 lineages[lcols[user_req_cols[7]]] = ",".join( | |
| 155 [re.sub(r"[\,\"]", "", lcols[l]) for l in user_req_cols[1:8]] | |
| 156 + [str()] | |
| 157 ) | |
| 158 | |
| 159 if lcols[7] == "Rondeletia bicolor Goode & Bean": | |
| 160 print(lineages[lcols[8]]) | |
| 161 exit(0) | |
| 162 | |
| 163 csv_fh.close() | |
| 164 return lineages, raw_recs | |
| 165 | |
| 166 | |
| 167 def from_genbank(gbk: os.PathLike, min_len: int) -> dict: | |
| 168 """ | |
| 169 Method to parse GenBank file and return | |
| 170 organism to latest accession mapping. | |
| 171 """ | |
| 172 accs2orgs = dict() | |
| 173 sanitize_pat = re.compile(r"[\,\"]") | |
| 174 | |
| 175 if not (os.path.exists(gbk) or os.path.getsize(gbk) > 0): | |
| 176 logging.info( | |
| 177 f"The GenBank file [{os.path.basename(gbk)}] does not exist" | |
| 178 + "\nor is of size 0." | |
| 179 ) | |
| 180 exit(1) | |
| 181 | |
| 182 logging.info(f"Indexing {os.path.basename(gbk)}...") | |
| 183 | |
| 184 # a = open("./_accs", "w") | |
| 185 try: | |
| 186 for record in SeqIO.parse(gbk, "genbank"): | |
| 187 if len(record.seq) < min_len: | |
| 188 continue | |
| 189 else: | |
| 190 # a.write(f"{record.id}\n") | |
| 191 accs2orgs[record.id] = sanitize_pat.sub( | |
| 192 "", record.annotations["organism"] | |
| 193 ) | |
| 194 except Exception as e: | |
| 195 logging.error(f"Error occured around GenBank ID: {record.id}") | |
| 196 logging.error(f"Error: {e}") | |
| 197 exit(1) | |
| 198 | |
| 199 return accs2orgs | |
| 200 | |
| 201 | |
| 202 def from_genbank_alt(gbk: os.PathLike) -> dict: | |
| 203 """ | |
| 204 Method to parse GenBank file and return | |
| 205 organism to latest accession mapping without | |
| 206 using BioPython's GenBank Scanner | |
| 207 """ | |
| 208 accs2orgs = dict() | |
| 209 accs = dict() | |
| 210 orgs = dict() | |
| 211 acc = False | |
| 212 acc_pat = re.compile(r"^VERSION\s+(.+)") | |
| 213 org_pat = re.compile(r"^\s+ORGANISM\s+(.+)") | |
| 214 sanitize_pat = re.compile(r"[\,\"]") | |
| 215 | |
| 216 if not (os.path.exists(gbk) or os.path.getsize(gbk) > 0): | |
| 217 logging.info( | |
| 218 f"The GenBank file [{os.path.basename(gbk)}] does not exist" | |
| 219 + "\nor is of size 0." | |
| 220 ) | |
| 221 exit(1) | |
| 222 | |
| 223 logging.info( | |
| 224 f"Indexing {os.path.basename(gbk)} without using\nBioPython's GenBank Scanner..." | |
| 225 ) | |
| 226 | |
| 227 with open(gbk, "r") as gbk_fh: | |
| 228 for line in gbk_fh: | |
| 229 line = line.rstrip() | |
| 230 if line.startswith("VERSION") and acc_pat.match(line): | |
| 231 acc = acc_pat.match(line).group(1) | |
| 232 accs[acc] = 1 | |
| 233 if org_pat.match(line): | |
| 234 if acc and acc not in orgs.keys(): | |
| 235 orgs[acc] = sanitize_pat.sub("", org_pat.match(line).group(1)) | |
| 236 elif acc and acc in orgs.keys(): | |
| 237 logging.error(f"Duplicate VERSION line: {acc}") | |
| 238 exit(1) | |
| 239 if len(accs.keys()) != len(orgs.keys()): | |
| 240 logging.error( | |
| 241 f"Got unequal number of organisms ({len(orgs.keys())})\n" | |
| 242 + f"and accessions ({len(accs.keys())})" | |
| 243 ) | |
| 244 exit(1) | |
| 245 else: | |
| 246 for acc in accs.keys(): | |
| 247 if acc not in orgs.keys(): | |
| 248 logging.error(f"ORAGANISM not found for accession: {acc}") | |
| 249 exit(1) | |
| 250 accs2orgs[acc] = orgs[acc] | |
| 251 | |
| 252 gbk_fh.close() | |
| 253 return accs2orgs | |
| 254 | |
| 255 | |
| 256 def write_fasta(seq: str, id: str, basedir: os.PathLike, suffix: str) -> None: | |
| 257 """ | |
| 258 Write sequence with no description to specified file. | |
| 259 """ | |
| 260 SeqIO.write( | |
| 261 SeqRecord(Seq(seq), id=id, description=str()), | |
| 262 os.path.join(basedir, id + suffix), | |
| 263 "fasta", | |
| 264 ) | |
| 265 | |
| 266 | |
| 267 # Main | |
| 268 def main() -> None: | |
| 269 """ | |
| 270 This script takes: | |
| 271 1. Downloads the RefSeq Mitochrondrial GenBank and FASTA format files. | |
| 272 2. Takes as input and output .csv.gz or .csv file generated by `ncbitax2lin`. | |
| 273 | |
| 274 and then generates a folder containing individual FASTA sequence files | |
| 275 per organelle, and a corresponding lineage file in CSV format. | |
| 276 """ | |
| 277 | |
| 278 # Set logging. | |
| 279 logging.basicConfig( | |
| 280 format="\n" | |
| 281 + "=" * 55 | |
| 282 + "\n%(asctime)s - %(levelname)s\n" | |
| 283 + "=" * 55 | |
| 284 + "\n%(message)s\n\n", | |
| 285 level=logging.DEBUG, | |
| 286 ) | |
| 287 | |
| 288 # Debug print. | |
| 289 ppp = pprint.PrettyPrinter(width=55) | |
| 290 prog_name = os.path.basename(inspect.stack()[0].filename) | |
| 291 | |
| 292 parser = argparse.ArgumentParser( | |
| 293 prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses | |
| 294 ) | |
| 295 | |
| 296 required = parser.add_argument_group("required arguments") | |
| 297 | |
| 298 required.add_argument( | |
| 299 "-csv", | |
| 300 dest="csv", | |
| 301 default=False, | |
| 302 required=True, | |
| 303 help="Absolute UNIX path to .csv or .csv.gz file which is generated " | |
| 304 + "\nby the `ncbitax2lin` tool.", | |
| 305 ) | |
| 306 parser.add_argument( | |
| 307 "-cols", | |
| 308 dest="lineage_cols", | |
| 309 default="tax_id,domain,phylum,class,order,family,genus,species,strain", | |
| 310 required=False, | |
| 311 help="Taxonomic lineage will be built using these columns from the output of" | |
| 312 + "\n`ncbitax2lin` tool.", | |
| 313 ) | |
| 314 parser.add_argument( | |
| 315 "-url", | |
| 316 dest="url", | |
| 317 default="https://ftp.ncbi.nlm.nih.gov/refseq/release/mitochondrion", | |
| 318 required=False, | |
| 319 help="Base URL from where NCBI RefSeq mitochondrion files will be downloaded\nfrom.", | |
| 320 ) | |
| 321 parser.add_argument( | |
| 322 "-out", | |
| 323 dest="out_folder", | |
| 324 default=os.path.join(os.getcwd(), "organelles"), | |
| 325 required=False, | |
| 326 help="By default, the output is written to this folder.", | |
| 327 ) | |
| 328 parser.add_argument( | |
| 329 "-f", | |
| 330 dest="force_write_out", | |
| 331 default=False, | |
| 332 action="store_true", | |
| 333 required=False, | |
| 334 help="Force overwrite output directory contents.", | |
| 335 ) | |
| 336 parser.add_argument( | |
| 337 "--fna-suffix", | |
| 338 dest="fna_suffix", | |
| 339 default=".fna", | |
| 340 required=False, | |
| 341 help="Suffix of the individual organelle FASTA files that will be saved.", | |
| 342 ) | |
| 343 parser.add_argument( | |
| 344 "-ml", | |
| 345 dest="fa_min_len", | |
| 346 default=200, | |
| 347 required=False, | |
| 348 help="Minimum length of the FASTA sequence for it to be considered for" | |
| 349 + "\nfurther processing", | |
| 350 ) | |
| 351 parser.add_argument( | |
| 352 "--gen-per-fa", | |
| 353 dest="gen_per_fa", | |
| 354 default=False, | |
| 355 required=False, | |
| 356 action="store_true", | |
| 357 help="Generate per sequence FASTA file.", | |
| 358 ) | |
| 359 parser.add_argument( | |
| 360 "--alt-gb-parser", | |
| 361 dest="alt_gb_parser", | |
| 362 default=False, | |
| 363 required=False, | |
| 364 action="store_true", | |
| 365 help="Use alternate GenBank parser instead of BioPython's.", | |
| 366 ) | |
| 367 | |
| 368 # Parse defaults | |
| 369 args = parser.parse_args() | |
| 370 csv = args.csv | |
| 371 out = args.out_folder | |
| 372 overwrite = args.force_write_out | |
| 373 fna_suffix = args.fna_suffix | |
| 374 url = args.url | |
| 375 tax_cols = args.lineage_cols | |
| 376 gen_per_fa = args.gen_per_fa | |
| 377 alt_gb_parser = args.alt_gb_parser | |
| 378 min_len = int(args.fa_min_len) | |
| 379 tcols_pat = re.compile(r"^[\w\,]+?\w$") | |
| 380 mito_fna_suffix = re.compile(r".*?\.genomic\.fna") | |
| 381 mito_gbff_suffix = re.compile(r".*?\.genomic\.gbff") | |
| 382 final_lineages = os.path.join(out, "lineages.csv") | |
| 383 lineages_not_found = os.path.join(out, "lineages_not_found.csv") | |
| 384 base_fasta_dir = os.path.join(out, "fasta") | |
| 385 | |
| 386 # Basic checks | |
| 387 if not overwrite and os.path.exists(out): | |
| 388 logging.warning( | |
| 389 f"Output destination [{os.path.basename(out)}] already exists!" | |
| 390 + "\nPlease use -f to delete and overwrite." | |
| 391 ) | |
| 392 elif overwrite and os.path.exists(out): | |
| 393 logging.info(f"Overwrite requested. Deleting {os.path.basename(out)}...") | |
| 394 shutil.rmtree(out) | |
| 395 | |
| 396 if not tcols_pat.match(tax_cols): | |
| 397 logging.error( | |
| 398 f"Supplied columns' names {tax_cols} should only have words (alphanumeric) separated by a comma." | |
| 399 ) | |
| 400 exit(1) | |
| 401 else: | |
| 402 tax_cols = re.sub("\n", "", tax_cols).split(",") | |
| 403 | |
| 404 # Get .fna and .gbk files | |
| 405 fna = dl_mito_seqs_and_flat_files(url, mito_fna_suffix, out) | |
| 406 gbk = dl_mito_seqs_and_flat_files(url, mito_gbff_suffix, out) | |
| 407 | |
| 408 # Get taxonomy from ncbitax2lin | |
| 409 lineages, raw_recs = get_lineages(csv, tax_cols) | |
| 410 | |
| 411 # Get parsed organisms and latest accession from GenBank file. | |
| 412 if alt_gb_parser: | |
| 413 accs2orgs = from_genbank_alt(gbk) | |
| 414 else: | |
| 415 accs2orgs = from_genbank(gbk, min_len) | |
| 416 | |
| 417 # # Finally, read FASTA and create individual FASTA if lineage exists. | |
| 418 logging.info(f"Creating new sequences and lineages...") | |
| 419 | |
| 420 l_fh = open(final_lineages, "w") | |
| 421 ln_fh = open(lineages_not_found, "w") | |
| 422 l_fh.write( | |
| 423 "identifiers,superkingdom,phylum,class,order,family,genus,species,strain\n" | |
| 424 ) | |
| 425 ln_fh.write("fna_id,gbk_org\n") | |
| 426 passed_lookup = 0 | |
| 427 failed_lookup = 0 | |
| 428 gbk_recs_missing = 0 | |
| 429 skipped_len_short = 0 | |
| 430 | |
| 431 if gen_per_fa and not os.path.exists(base_fasta_dir): | |
| 432 os.makedirs(base_fasta_dir) | |
| 433 | |
| 434 for record in SeqIO.parse(fna, "fasta"): | |
| 435 if len(record.seq) < min_len: | |
| 436 skipped_len_short += 1 | |
| 437 continue | |
| 438 elif record.id in accs2orgs.keys(): | |
| 439 org_words = accs2orgs[record.id].split(" ") | |
| 440 else: | |
| 441 gbk_recs_missing += 1 | |
| 442 continue | |
| 443 | |
| 444 genus_species = ( | |
| 445 " ".join(org_words[0:2]) if len(org_words) > 2 else " ".join(org_words[0:]) | |
| 446 ) | |
| 447 | |
| 448 if gen_per_fa: | |
| 449 write_fasta(record.seq, record.id, base_fasta_dir, fna_suffix) | |
| 450 | |
| 451 if record.id in accs2orgs.keys() and accs2orgs[record.id] in lineages.keys(): | |
| 452 l_fh.write(",".join([record.id, lineages[accs2orgs[record.id]]]) + "\n") | |
| 453 passed_lookup += 1 | |
| 454 elif record.id in accs2orgs.keys() and genus_species in lineages.keys(): | |
| 455 if len(org_words) > 2: | |
| 456 l_fh.write( | |
| 457 ",".join( | |
| 458 [ | |
| 459 record.id, | |
| 460 lineages[genus_species].rstrip(","), | |
| 461 accs2orgs[record.id], | |
| 462 ] | |
| 463 ) | |
| 464 + "\n" | |
| 465 ) | |
| 466 else: | |
| 467 l_fh.write(",".join([record.id, lineages[genus_species]]) + "\n") | |
| 468 passed_lookup += 1 | |
| 469 else: | |
| 470 if len(org_words) > 2: | |
| 471 l_fh.write( | |
| 472 ",".join( | |
| 473 [ | |
| 474 record.id, | |
| 475 "", | |
| 476 "", | |
| 477 "", | |
| 478 "", | |
| 479 "", | |
| 480 org_words[0], | |
| 481 org_words[0] + " " + org_words[1], | |
| 482 accs2orgs[record.id], | |
| 483 ] | |
| 484 ) | |
| 485 + "\n" | |
| 486 ) | |
| 487 else: | |
| 488 l_fh.write( | |
| 489 ",".join( | |
| 490 [ | |
| 491 record.id, | |
| 492 "", | |
| 493 "", | |
| 494 "", | |
| 495 "", | |
| 496 "", | |
| 497 org_words[0], | |
| 498 accs2orgs[record.id], | |
| 499 "", | |
| 500 ] | |
| 501 ) | |
| 502 + "\n" | |
| 503 ) | |
| 504 ln_fh.write(",".join([record.id, accs2orgs[record.id]]) + "\n") | |
| 505 failed_lookup += 1 | |
| 506 | |
| 507 logging.info( | |
| 508 f"No. of raw records present in `ncbitax2lin` [{os.path.basename(csv)}]: {raw_recs}" | |
| 509 + f"\nNo. of valid records collected from `ncbitax2lin` [{os.path.basename(csv)}]: {len(lineages.keys())}" | |
| 510 + f"\nNo. of sequences skipped (Sequence length < {min_len}): {skipped_len_short}" | |
| 511 + f"\nNo. of records in FASTA [{os.path.basename(fna)}]: {passed_lookup + failed_lookup}" | |
| 512 + f"\nNo. of records in GenBank [{os.path.basename(gbk)}]: {len(accs2orgs.keys())}" | |
| 513 + f"\nNo. of FASTA records for which new lineages were created: {passed_lookup}" | |
| 514 + f"\nNo. of FASTA records for which only genus, species and/or strain information were created: {failed_lookup}" | |
| 515 + f"\nNo. of FASTA records for which no GenBank records exist: {gbk_recs_missing}" | |
| 516 ) | |
| 517 | |
| 518 if (passed_lookup + failed_lookup) != len(accs2orgs.keys()): | |
| 519 logging.error( | |
| 520 f"The number of FASTA records written [{len(accs2orgs.keys())}]" | |
| 521 + f"\nis not equal to number of lineages created [{passed_lookup + failed_lookup}]!" | |
| 522 ) | |
| 523 exit(1) | |
| 524 else: | |
| 525 logging.info("Succesfully created lineages and FASTA records! Done!!") | |
| 526 | |
| 527 l_fh.close() | |
| 528 ln_fh.close() | |
| 529 | |
| 530 | |
| 531 if __name__ == "__main__": | |
| 532 main() |
