#!/usr/bin/env python3

import argparse
import gzip
import inspect
import logging
import os
import pprint
import re
import shutil
import ssl
import tempfile
from html.parser import HTMLParser
from urllib.request import urlopen

from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord


# Multiple inheritance for pretty printing of help text.
class MultiArgFormatClasses(
    argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter
):
    pass


# HTMLParser override class to get fna.gz and gbff.gz
class NCBIHTMLParser(HTMLParser):
    def __init__(self, *, convert_charrefs: bool = True) -> None:
        super().__init__(convert_charrefs=convert_charrefs)
        self.reset()
        self.href_data = list()

    def handle_data(self, data):
        self.href_data.append(data)
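
# Example (hypothetical) of how this parser collects the visible link text of
# a directory listing; the collected text is later searched for the
# *.genomic.fna / *.genomic.gbff file names:
#   p = NCBIHTMLParser()
#   p.feed('<a href="mitochondrion.1.1.genomic.fna.gz">mitochondrion.1.1.genomic.fna.gz</a>')
#   p.href_data  # -> ['mitochondrion.1.1.genomic.fna.gz']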


# Download organelle FASTA and GenBank file.
def dl_mito_seqs_and_flat_files(
    url: str, suffix: re.Pattern, out: os.PathLike
) -> os.PathLike:
    """
    Method to save .fna.gz and .gbff.gz files for the
    RefSeq mitochondrion release.
    """
    contxt = ssl.create_default_context()
    contxt.check_hostname = False
    contxt.verify_mode = ssl.CERT_NONE

    if url is None:
        logging.error(
            "Please provide the base URL where .fna.gz and .gbff.gz"
            + "\nfiles for RefSeq mitochondrion can be found."
        )
        exit(1)

    if os.path.exists(out):
        for file in os.listdir(out):
            file_path = os.path.join(out, file)

            if suffix.match(file_path) and os.path.getsize(file_path) > 0:
                logging.info(
                    f"The required mitochondrion file(s)\n[{os.path.basename(file_path)}]"
                    + " already exists.\nSkipping download from NCBI..."
                    + "\nPlease use -f to delete and overwrite."
                )
                return file_path
    else:
        os.makedirs(out)

    html_parser = NCBIHTMLParser()
    logging.info(f"Finding latest NCBI RefSeq mitochondrion release at:\n{url}")

    with urlopen(url, context=contxt) as response:
        with tempfile.NamedTemporaryFile(delete=False) as tmp_html_file:
            shutil.copyfileobj(response, tmp_html_file)

    with open(tmp_html_file.name, "r") as html:
        html_parser.feed("".join(html.readlines()))

    match = suffix.search("".join(html_parser.href_data))

    if match is None:
        logging.error(f"No file matching [{suffix.pattern}] was found at:\n{url}")
        exit(1)

    file = match.group(0)
    file_url = "/".join([url, file + ".gz"])
    file_at = os.path.join(out, file)

    logging.info(f"Found NCBI RefSeq mitochondrion file(s):\n{file_url}")
    logging.info(f"Saving to:\n{file_at}")

    with tempfile.NamedTemporaryFile(delete=False) as tmp_gz:
        with urlopen(file_url, context=contxt) as response:
            tmp_gz.write(response.read())

    with open(file_at, "w") as fh:
        with gzip.open(tmp_gz.name, "rb") as web_gz:
            fh.write(web_gz.read().decode("utf-8"))

    os.unlink(tmp_gz.name)
    os.unlink(tmp_html_file.name)

    return file_at
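
# Hypothetical example call; returns the path of the saved, decompressed
# file (e.g. "organelles/mitochondrion.1.1.genomic.fna"):
#   fna = dl_mito_seqs_and_flat_files(
#       "https://ftp.ncbi.nlm.nih.gov/refseq/release/mitochondrion",
#       re.compile(r".*?\.genomic\.fna"),
#       "organelles",
#   )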


def get_lineages(csv: os.PathLike, cols: list) -> tuple:
    """
    Parse the output from the `ncbitax2lin` tool and
    return a dict of lineages where the key is the
    organism name (genus species strain), along with
    the number of raw records parsed.
    """
    lineages = dict()

    if csv is None or not os.path.exists(csv) or os.path.getsize(csv) == 0:
        logging.error(f"The CSV file [{csv}] is empty or does not exist!")
        exit(1)

    logging.info(f"Indexing {os.path.basename(csv)}...")

    with open(csv, "r") as csv_fh:
        header_cols = csv_fh.readline().strip().split(",")
        user_req_cols = [
            tcol_i for tcol_i, tcol in enumerate(header_cols) if tcol in cols
        ]
        cols_not_found = [tcol for tcol in cols if tcol not in header_cols]
        raw_recs = 0

        if len(cols_not_found) > 0:
            logging.error(
                f"The following columns do not exist in the"
                + f"\nCSV file [ {os.path.basename(csv)} ]:\n"
                + ", ".join(cols_not_found)
            )
            exit(1)
        elif len(user_req_cols) > 9:
            logging.error(
                f"Only a total of 9 columns are needed!"
                + "\ntax_id,superkingdom,phylum,class,order,family,genus,species,strain"
            )
            exit(1)

        for tax in csv_fh:
            raw_recs += 1
            lcols = tax.strip().split(",")

            if bool(lcols[user_req_cols[8]]):
                lineages[lcols[user_req_cols[8]]] = ",".join(
                    [lcols[l] for l in user_req_cols[1:]]
                )
            elif bool(lcols[user_req_cols[7]]):
                lineages[lcols[user_req_cols[7]]] = ",".join(
                    [lcols[l] for l in user_req_cols[1:8]] + [str()]
                )

    return lineages, raw_recs


def from_genbank(gbk: os.PathLike, min_len: int) -> dict:
    """
    Method to parse a GenBank file and return an
    accession to organism mapping.
    """
    accs2orgs = dict()

    if not os.path.exists(gbk) or os.path.getsize(gbk) == 0:
        logging.error(
            f"The GenBank file [{os.path.basename(gbk)}] does not exist"
            + "\nor is of size 0."
        )
        exit(1)

    logging.info(f"Indexing {os.path.basename(gbk)}...")

    for record in SeqIO.parse(gbk, "genbank"):
        if len(record.seq) >= min_len:
            accs2orgs[record.id] = record.annotations["organism"]

    return accs2orgs
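
# Illustrative (hypothetical) shapes of the values returned by the two
# indexing helpers above:
#   get_lineages(...)  -> ({"Homo sapiens": "Eukaryota,Chordata,...,Homo sapiens,", ...},
#                          <no. of raw CSV records>)
#   from_genbank(...)  -> {"NC_012920.1": "Homo sapiens", ...}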


def from_genbank_alt(gbk: os.PathLike) -> dict:
    """
    Method to parse a GenBank file and return an
    accession to organism mapping without using
    BioPython's GenBank Scanner.
    """
    accs2orgs = dict()
    accs = dict()
    orgs = dict()
    acc = False
    acc_pat = re.compile(r"^VERSION\s+(.+)")
    org_pat = re.compile(r"^\s+ORGANISM\s+(.+)")

    if not os.path.exists(gbk) or os.path.getsize(gbk) == 0:
        logging.error(
            f"The GenBank file [{os.path.basename(gbk)}] does not exist"
            + "\nor is of size 0."
        )
        exit(1)

    logging.info(
        f"Indexing {os.path.basename(gbk)} without using\nBioPython's GenBank Scanner..."
    )

    with open(gbk, "r") as gbk_fh:
        for line in gbk_fh:
            line = line.rstrip()
            if line.startswith("VERSION") and acc_pat.match(line):
                acc = acc_pat.match(line).group(1)
                accs[acc] = 1
            if org_pat.match(line):
                if acc and acc not in orgs.keys():
                    orgs[acc] = org_pat.match(line).group(1)
                elif acc and acc in orgs.keys():
                    logging.error(f"Duplicate VERSION line: {acc}")
                    exit(1)

    if len(accs.keys()) != len(orgs.keys()):
        logging.error(
            f"Got unequal number of organisms ({len(orgs.keys())})\n"
            + f"and accessions ({len(accs.keys())})"
        )
        exit(1)

    for acc in accs.keys():
        if acc not in orgs.keys():
            logging.error(f"ORGANISM not found for accession: {acc}")
            exit(1)
        accs2orgs[acc] = orgs[acc]

    return accs2orgs


def write_fasta(seq: str, id: str, basedir: os.PathLike, suffix: str) -> None:
    """
    Write a sequence with no description to the specified file.
    """
    SeqIO.write(
        SeqRecord(Seq(seq), id=id, description=str()),
        os.path.join(basedir, id + suffix),
        "fasta",
    )
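
# Hypothetical example: writes <basedir>/NC_012920.1.fna containing a single
# description-free FASTA record.
#   write_fasta("ATGC" * 50, "NC_012920.1", "organelles/fasta", ".fna")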


# Main
def main() -> None:
    """
    This script:
    1. Downloads the RefSeq mitochondrion GenBank and FASTA format files.
    2. Takes as input the .csv or .csv.gz file generated by `ncbitax2lin`.

    It then generates a folder containing individual FASTA sequence files
    per organelle, and a corresponding lineage file in CSV format.
    """

    # Set logging.
    logging.basicConfig(
        format="\n"
        + "=" * 55
        + "\n%(asctime)s - %(levelname)s\n"
        + "=" * 55
        + "\n%(message)s\n\n",
        level=logging.DEBUG,
    )

    # Debug print.
    ppp = pprint.PrettyPrinter(width=55)
    prog_name = os.path.basename(inspect.stack()[0].filename)

    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )

    required = parser.add_argument_group("required arguments")

    required.add_argument(
        "-csv",
        dest="csv",
        default=False,
        required=True,
        help="Absolute UNIX path to the .csv or .csv.gz file generated"
        + "\nby the `ncbitax2lin` tool.",
    )
    parser.add_argument(
        "-cols",
        dest="lineage_cols",
        default="tax_id,superkingdom,phylum,class,order,family,genus,species,strain",
        required=False,
        help="Taxonomic lineage will be built using these columns from the output of"
        + "\nthe `ncbitax2lin` tool.",
    )
    parser.add_argument(
        "-url",
        dest="url",
        default="https://ftp.ncbi.nlm.nih.gov/refseq/release/mitochondrion",
        required=False,
        help="Base URL from where NCBI RefSeq mitochondrion files will be downloaded.",
    )
    parser.add_argument(
        "-out",
        dest="out_folder",
        default=os.path.join(os.getcwd(), "organelles"),
        required=False,
        help="By default, the output is written to this folder.",
    )
    parser.add_argument(
        "-f",
        dest="force_write_out",
        default=False,
        action="store_true",
        required=False,
        help="Force overwrite the output directory contents.",
    )
    parser.add_argument(
        "--fna-suffix",
        dest="fna_suffix",
        default=".fna",
        required=False,
        help="Suffix of the individual organelle FASTA files that will be saved.",
    )
    parser.add_argument(
        "-ml",
        dest="fa_min_len",
        default=200,
        required=False,
        help="Minimum length of a FASTA sequence for it to be considered for"
        + "\nfurther processing.",
    )
    parser.add_argument(
        "--skip-per-fa",
        dest="skip_per_fa",
        default=False,
        required=False,
        action="store_true",
        help="Do not generate per-sequence FASTA files.",
    )
    parser.add_argument(
        "--alt-gb-parser",
        dest="alt_gb_parser",
        default=False,
        required=False,
        action="store_true",
        help="Use the alternate GenBank parser instead of BioPython's.",
    )

    # Parse defaults
    args = parser.parse_args()
    csv = args.csv
    out = args.out_folder
    overwrite = args.force_write_out
    fna_suffix = args.fna_suffix
    url = args.url
    tax_cols = args.lineage_cols
    skip_per_fa = args.skip_per_fa
    alt_gb_parser = args.alt_gb_parser
    min_len = int(args.fa_min_len)
    tcols_pat = re.compile(r"^[\w\,]+?\w$")
    mito_fna_suffix = re.compile(r".*?\.genomic\.fna")
    mito_gbff_suffix = re.compile(r".*?\.genomic\.gbff")
    final_lineages = os.path.join(out, "lineages.csv")
    lineages_not_found = os.path.join(out, "lineages_not_found.csv")
    base_fasta_dir = os.path.join(out, "fasta")
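
    # Illustrative notes on the patterns above: tcols_pat accepts
    # "tax_id,genus,species" but rejects a trailing comma ("tax_id,genus,"),
    # and mito_fna_suffix matches release file names such as
    # "mitochondrion.1.1.genomic.fna".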

    # Basic checks
    if not overwrite and os.path.exists(out):
        logging.warning(
            f"Output destination [{os.path.basename(out)}] already exists!"
            + "\nPlease use -f to delete and overwrite."
        )
    elif overwrite and os.path.exists(out):
        logging.info(f"Overwrite requested. Deleting {os.path.basename(out)}...")
        shutil.rmtree(out)

    if not tcols_pat.match(tax_cols):
        logging.error(
            f"Supplied column names {tax_cols} should only have words (alphanumeric) separated by commas."
        )
        exit(1)
    else:
        tax_cols = re.sub("\n", "", tax_cols).split(",")

    # Get .fna and .gbk files
    fna = dl_mito_seqs_and_flat_files(url, mito_fna_suffix, out)
    gbk = dl_mito_seqs_and_flat_files(url, mito_gbff_suffix, out)

    # Get taxonomy from ncbitax2lin
    lineages, raw_recs = get_lineages(csv, tax_cols)

    # Get parsed organisms and latest accessions from the GenBank file.
    if alt_gb_parser:
        accs2orgs = from_genbank_alt(gbk)
    else:
        accs2orgs = from_genbank(gbk, min_len)

    # Finally, read the FASTA and create individual FASTA files if a lineage exists.
    logging.info("Creating new sequences and lineages...")

    l_fh = open(final_lineages, "w")
    ln_fh = open(lineages_not_found, "w")
    l_fh.write(
        "identifiers,superkingdom,phylum,class,order,family,genus,species,strain\n"
    )
    ln_fh.write("fna_id,gbk_org\n")
    passed_lookup = 0
    failed_lookup = 0
    gbk_recs_missing = 0
    skipped_len_short = 0

    if not os.path.exists(base_fasta_dir):
        os.makedirs(base_fasta_dir)

    for record in SeqIO.parse(fna, "fasta"):
        if len(record.seq) < min_len:
            skipped_len_short += 1
            continue
        elif record.id in accs2orgs.keys():
            org_words = accs2orgs[record.id].split(" ")
        else:
            gbk_recs_missing += 1
            continue

        genus_species = (
            " ".join(org_words[0:2]) if len(org_words) > 2 else " ".join(org_words[0:])
        )

        if not skip_per_fa:
            write_fasta(str(record.seq), record.id, base_fasta_dir, fna_suffix)
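
        # Lookup resolution order (descriptive comment): first try the full
        # GenBank organism name against the ncbitax2lin lineages; then try
        # just "Genus species"; otherwise synthesize a minimal lineage from
        # the organism words and log the record to lineages_not_found.csv.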
        if record.id in accs2orgs.keys() and accs2orgs[record.id] in lineages.keys():
            l_fh.write(",".join([record.id, lineages[accs2orgs[record.id]]]) + "\n")
            passed_lookup += 1
        elif record.id in accs2orgs.keys() and genus_species in lineages.keys():
            if len(org_words) > 2:
                l_fh.write(
                    ",".join(
                        [
                            record.id,
                            lineages[genus_species].rstrip(","),
                            accs2orgs[record.id],
                        ]
                    )
                    + "\n"
                )
            else:
                l_fh.write(",".join([record.id, lineages[genus_species]]) + "\n")
            passed_lookup += 1
        else:
            if len(org_words) > 2:
                l_fh.write(
                    ",".join(
                        [
                            record.id,
                            "",
                            "",
                            "",
                            "",
                            "",
                            org_words[0],
                            org_words[0] + " " + org_words[1],
                            accs2orgs[record.id],
                        ]
                    )
                    + "\n"
                )
            else:
                l_fh.write(
                    ",".join(
                        [
                            record.id,
                            "",
                            "",
                            "",
                            "",
                            "",
                            org_words[0],
                            accs2orgs[record.id],
                            "",
                        ]
                    )
                    + "\n"
                )
            ln_fh.write(",".join([record.id, accs2orgs[record.id]]) + "\n")
            failed_lookup += 1

    logging.info(
        f"No. of raw records present in `ncbitax2lin` [{os.path.basename(csv)}]: {raw_recs}"
        + f"\nNo. of valid records collected from `ncbitax2lin` [{os.path.basename(csv)}]: {len(lineages.keys())}"
        + f"\nNo. of sequences skipped (sequence length < {min_len}): {skipped_len_short}"
        + f"\nNo. of records in FASTA [{os.path.basename(fna)}]: {passed_lookup + failed_lookup}"
        + f"\nNo. of records in GenBank [{os.path.basename(gbk)}]: {len(accs2orgs.keys())}"
        + f"\nNo. of FASTA records for which new lineages were created: {passed_lookup}"
        + f"\nNo. of FASTA records for which only genus, species and/or strain information was created: {failed_lookup}"
        + f"\nNo. of FASTA records for which no GenBank records exist: {gbk_recs_missing}"
    )

    if (passed_lookup + failed_lookup) != len(accs2orgs.keys()):
        logging.error(
            f"The number of GenBank records [{len(accs2orgs.keys())}]"
            + f"\nis not equal to the number of lineages created [{passed_lookup + failed_lookup}]!"
        )
        exit(1)
    else:
        logging.info("Successfully created lineages and FASTA records! Done!!")

    l_fh.close()
    ln_fh.close()


if __name__ == "__main__":
    main()