diff 0.5.0/bin/create_fasta_and_lineages.py @ 0:97cd2f532efe

planemo upload
author kkonganti
date Mon, 31 Mar 2025 14:50:40 -0400
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/0.5.0/bin/create_fasta_and_lineages.py	Mon Mar 31 14:50:40 2025 -0400
@@ -0,0 +1,518 @@
+#!/usr/bin/env python3
+
+import argparse
+import gzip
+import inspect
+import logging
+import os
+import pprint
+import re
+import shutil
+import ssl
+import tempfile
+from html.parser import HTMLParser
+from urllib.request import urlopen
+
+from Bio import SeqIO
+from Bio.Seq import Seq
+from Bio.SeqRecord import SeqRecord
+
+
+# Multiple inheritance for pretty printing of help text.
+class MultiArgFormatClasses(
+    argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter
+):
+    pass
+
+
+# HTMLParser override class to get fna.gz and gbff.gz
+class NCBIHTMLParser(HTMLParser):
+    def __init__(self, *, convert_charrefs: bool = True) -> None:
+        super().__init__(convert_charrefs=convert_charrefs)
+        self.reset()
+        self.href_data = list()
+
+    def handle_data(self, data):
+        self.href_data.append(data)
+
+
+# Download organelle FASTA and GenBank file.
+def dl_mito_seqs_and_flat_files(url: str, suffix: re.Pattern, out: os.PathLike) -> os.PathLike:
+    """
+    Method to save .fna.gz and .gbff.gz files for the
+    RefSeq mitochondrion release.
+    """
+    contxt = ssl.create_default_context()
+    contxt.check_hostname = False
+    contxt.verify_mode = ssl.CERT_NONE
+
+    if url is None:
+        logging.error(
+            "Please provide the base URL where .fna.gz and .gbff.gz"
+            + "\nfiles for RefSeq mitochondrion can be found."
+        )
+        exit(1)
+
+    if os.path.exists(out):
+        for file in os.listdir(out):
+            file_path = os.path.join(out, file)
+
+            if suffix.match(file_path) and os.path.getsize(file_path) > 0:
+                logging.info(
+                    f"The required mitochondrion file(s)\n[{os.path.basename(file_path)}]"
+                    + " already exists.\nSkipping download from NCBI..."
+                    + "\nPlease use -f to delete and overwrite."
+                )
+                return file_path
+    else:
+        os.makedirs(out)
+
+    html_parser = NCBIHTMLParser()
+    logging.info(f"Finding latest NCBI RefSeq mitochondrion release at:\n{url}")
+
+    with urlopen(url, context=contxt) as response:
+        with tempfile.NamedTemporaryFile(delete=False) as tmp_html_file:
+            shutil.copyfileobj(response, tmp_html_file)
+
+    with open(tmp_html_file.name, "r") as html:
+        html_parser.feed("".join(html.readlines()))
+
+    file = suffix.search("".join(html_parser.href_data)).group(0)
+    file_url = "/".join([url, file + ".gz"])
+    file_at = os.path.join(out, file)
+
+    logging.info(f"Found NCBI RefSeq mitochondrian file(s):\n{file_url}")
+
+    logging.info(f"Saving to:\n{file_at}")
+
+    with tempfile.NamedTemporaryFile(delete=False) as tmp_gz:
+        with urlopen(file_url, context=contxt) as response:
+            tmp_gz.write(response.read())
+
+    with open(file_at, "w") as fh:
+        with gzip.open(tmp_gz.name, "rb") as web_gz:
+            fh.write(web_gz.read().decode("utf-8"))
+
+    os.unlink(tmp_gz.name)
+    os.unlink(tmp_html_file.name)
+
+    return file_at
+
+
+def get_lineages(csv: os.PathLike, cols: list) -> tuple:
+    """
+    Parse the output from the `ncbitax2lin` tool and return a dict of
+    lineages keyed by strain (or species) name, along with the number
+    of raw records read.
+    """
+    lineages = dict()
+    if csv is None or not os.path.exists(csv) or os.path.getsize(csv) == 0:
+        logging.error(f"The CSV file [{csv}] is empty or does not exist!")
+        exit(1)
+
+    logging.info(f"Indexing {os.path.basename(csv)}...")
+
+    with open(csv, "r") as csv_fh:
+        header_cols = csv_fh.readline().strip().split(",")
+        user_req_cols = [
+            tcol_i for tcol_i, tcol in enumerate(header_cols) if tcol in cols
+        ]
+        cols_not_found = [tcol for tcol in cols if tcol not in header_cols]
+        raw_recs = 0
+
+        if len(cols_not_found) > 0:
+            logging.error(
+                "The following columns do not exist in the"
+                + f"\nCSV file [ {os.path.basename(csv)} ]:\n"
+                + ", ".join(cols_not_found)
+            )
+            exit(1)
+        elif len(user_req_cols) > 9:
+            logging.error(
+                "Only a total of 9 columns are needed!"
+                + "\ntax_id,superkingdom,phylum,class,order,family,genus,species,strain"
+            )
+            exit(1)
+
+        for tax in csv_fh:
+            raw_recs += 1
+            lcols = tax.strip().split(",")
+
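+            # The -cols order is assumed here: index 8 of the requested
+            # columns is the strain name and index 7 the species name. Key
+            # the lineage by strain when present, otherwise by species with
+            # an empty strain field appended.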
+            if bool(lcols[user_req_cols[8]]):
+                lineages[lcols[user_req_cols[8]]] = ",".join(
+                    [lcols[l] for l in user_req_cols[1:]]
+                )
+            elif bool(lcols[user_req_cols[7]]):
+                lineages[lcols[user_req_cols[7]]] = ",".join(
+                    [lcols[l] for l in user_req_cols[1:8]] + [str()]
+                )
+
+    return lineages, raw_recs
+
+
+def from_genbank(gbk: os.PathLike, min_len: int) -> dict:
+    """
+    Parse the GenBank file and return a mapping of
+    accession to organism name.
+    """
+    accs2orgs = dict()
+
+    if not (os.path.exists(gbk) and os.path.getsize(gbk) > 0):
+        logging.error(
+            f"The GenBank file [{os.path.basename(gbk)}] does not exist"
+            + "\nor is of size 0."
+        )
+        exit(1)
+
+    logging.info(f"Indexing {os.path.basename(gbk)}...")
+
+    for record in SeqIO.parse(gbk, "genbank"):
+        if len(record.seq) < min_len:
+            continue
+        accs2orgs[record.id] = record.annotations["organism"]
+
+    return accs2orgs
+
+
+def from_genbank_alt(gbk: os.PathLike) -> dict:
+    """
+    Parse the GenBank file and return a mapping of accession to
+    organism name without using Biopython's GenBank Scanner.
+    """
+    accs2orgs = dict()
+    accs = dict()
+    orgs = dict()
+    acc = False
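+    # In a GenBank flat file, the VERSION line carries the versioned
+    # accession and the indented ORGANISM line carries the organism name.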
+    acc_pat = re.compile(r"^VERSION\s+(.+)")
+    org_pat = re.compile(r"^\s+ORGANISM\s+(.+)")
+
+    if not (os.path.exists(gbk) and os.path.getsize(gbk) > 0):
+        logging.error(
+            f"The GenBank file [{os.path.basename(gbk)}] does not exist"
+            + "\nor is of size 0."
+        )
+        exit(1)
+
+    logging.info(
+        f"Indexing {os.path.basename(gbk)} without using\nBiopython's GenBank Scanner..."
+    )
+
+    with open(gbk, "r") as gbk_fh:
+        for line in gbk_fh:
+            line = line.rstrip()
+            if line.startswith("VERSION") and acc_pat.match(line):
+                acc = acc_pat.match(line).group(1)
+                accs[acc] = 1
+            if org_pat.match(line):
+                if acc and acc not in orgs.keys():
+                    orgs[acc] = org_pat.match(line).group(1)
+                elif acc and acc in orgs.keys():
+                    logging.error(f"Duplicate ORGANISM line for accession: {acc}")
+                    exit(1)
+        if len(accs.keys()) != len(orgs.keys()):
+            logging.error(
+                f"Got unequal number of organisms ({len(orgs.keys())})\n"
+                + f"and accessions ({len(accs.keys())})"
+            )
+            exit(1)
+        else:
+            for acc in accs.keys():
+                if acc not in orgs.keys():
+                    logging.error(f"ORGANISM not found for accession: {acc}")
+                    exit(1)
+                accs2orgs[acc] = orgs[acc]
+
+    return accs2orgs
+
+
+def write_fasta(seq: str, id: str, basedir: os.PathLike, suffix: str) -> None:
+    """
+    Write sequence with no description to specified file.
+    """
+    SeqIO.write(
+        SeqRecord(Seq(seq), id=id, description=str()),
+        os.path.join(basedir, id + suffix),
+        "fasta",
+    )
+
+
+# Main
+def main() -> None:
+    """
+    This script takes:
+        1. Downloads the RefSeq Mitochrondrial GenBank and FASTA format files.
+        2. Takes as input and output .csv.gz or .csv file generated by `ncbitax2lin`.
+
+    and then generates a folder containing individual FASTA sequence files
+    per organelle, and a corresponding lineage file in CSV format.
+    """
+
+    # Set logging.
+    logging.basicConfig(
+        format="\n"
+        + "=" * 55
+        + "\n%(asctime)s - %(levelname)s\n"
+        + "=" * 55
+        + "\n%(message)s\n\n",
+        level=logging.DEBUG,
+    )
+
+    # Debug print.
+    ppp = pprint.PrettyPrinter(width=55)
+    prog_name = os.path.basename(inspect.stack()[0].filename)
+
+    parser = argparse.ArgumentParser(
+        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
+    )
+
+    required = parser.add_argument_group("required arguments")
+
+    required.add_argument(
+        "-csv",
+        dest="csv",
+        default=False,
+        required=True,
+        help="Absolute UNIX path to .csv or .csv.gz file which is generated "
+        + "\nby the `ncbitax2lin` tool.",
+    )
+    parser.add_argument(
+        "-cols",
+        dest="lineage_cols",
+        default="tax_id,superkingdom,phylum,class,order,family,genus,species,strain",
+        required=False,
+        help="Taxonomic lineage will be built using these columns from the output of"
+        + "\n`ncbitax2lin` tool.",
+    )
+    parser.add_argument(
+        "-url",
+        dest="url",
+        default="https://ftp.ncbi.nlm.nih.gov/refseq/release/mitochondrion",
+        required=False,
+        help="Base URL from where NCBI RefSeq mitochondrion files will be downloaded\nfrom.",
+    )
+    parser.add_argument(
+        "-out",
+        dest="out_folder",
+        default=os.path.join(os.getcwd(), "organelles"),
+        required=False,
+        help="By default, the output is written to this folder.",
+    )
+    parser.add_argument(
+        "-f",
+        dest="force_write_out",
+        default=False,
+        action="store_true",
+        required=False,
+        help="Force overwrite output directory contents.",
+    )
+    parser.add_argument(
+        "--fna-suffix",
+        dest="fna_suffix",
+        default=".fna",
+        required=False,
+        help="Suffix of the individual organelle FASTA files that will be saved.",
+    )
+    parser.add_argument(
+        "-ml",
+        dest="fa_min_len",
+        default=200,
+        required=False,
+        help="Minimum length of the FASTA sequence for it to be considered for"
+        + "\nfurther processing",
+    )
+    parser.add_argument(
+        "--skip-per-fa",
+        dest="skip_per_fa",
+        default=False,
+        required=False,
+        action="store_true",
+        help="Do not generate per sequence FASTA file.",
+    )
+    parser.add_argument(
+        "--alt-gb-parser",
+        dest="alt_gb_parser",
+        default=False,
+        required=False,
+        action="store_true",
+        help="Use alternate GenBank parser instead of BioPython's.",
+    )
+
+    # Parse defaults
+    args = parser.parse_args()
+    csv = args.csv
+    out = args.out_folder
+    overwrite = args.force_write_out
+    fna_suffix = args.fna_suffix
+    url = args.url
+    tax_cols = args.lineage_cols
+    skip_per_fa = args.skip_per_fa
+    alt_gb_parser = args.alt_gb_parser
+    min_len = int(args.fa_min_len)
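+    # tcols_pat checks that -cols is a plain comma-separated list of column
+    # names; the mito_* patterns match the *.genomic.fna and *.genomic.gbff
+    # files of the RefSeq mitochondrion release.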
+    tcols_pat = re.compile(r"^[\w\,]+?\w$")
+    mito_fna_suffix = re.compile(r".*?\.genomic\.fna")
+    mito_gbff_suffix = re.compile(r".*?\.genomic\.gbff")
+    final_lineages = os.path.join(out, "lineages.csv")
+    lineages_not_found = os.path.join(out, "lineages_not_found.csv")
+    base_fasta_dir = os.path.join(out, "fasta")
+
+    # Basic checks
+    if not overwrite and os.path.exists(out):
+        logging.warning(
+            f"Output destination [{os.path.basename(out)}] already exists!"
+            + "\nPlease use -f to delete and overwrite."
+        )
+    elif overwrite and os.path.exists(out):
+        logging.info(f"Overwrite requested. Deleting {os.path.basename(out)}...")
+        shutil.rmtree(out)
+
+    if not tcols_pat.match(tax_cols):
+        logging.error(
+            f"Supplied columns' names {tax_cols} should only have words (alphanumeric) separated by a comma."
+        )
+        exit(1)
+    else:
+        tax_cols = re.sub("\n", "", tax_cols).split(",")
+
+    # Get .fna and .gbk files
+    fna = dl_mito_seqs_and_flat_files(url, mito_fna_suffix, out)
+    gbk = dl_mito_seqs_and_flat_files(url, mito_gbff_suffix, out)
+
+    # Get taxonomy from ncbitax2lin.
+    lineages, raw_recs = get_lineages(csv, tax_cols)
+
+    # Get parsed organisms and latest accession from GenBank file.
+    if alt_gb_parser:
+        accs2orgs = from_genbank_alt(gbk)
+    else:
+        accs2orgs = from_genbank(gbk, min_len)
+
+    # Finally, read FASTA and create individual FASTA if lineage exists.
+    logging.info("Creating new sequences and lineages...")
+
+    l_fh = open(final_lineages, "w")
+    ln_fh = open(lineages_not_found, "w")
+    l_fh.write(
+        "identifiers,superkingdom,phylum,class,order,family,genus,species,strain\n"
+    )
+    ln_fh.write("fna_id,gbk_org\n")
+    passed_lookup = 0
+    failed_lookup = 0
+    gbk_recs_missing = 0
+    skipped_len_short = 0
+
+    if not os.path.exists(base_fasta_dir):
+        os.makedirs(base_fasta_dir)
+
+    for record in SeqIO.parse(fna, "fasta"):
+        if len(record.seq) < min_len:
+            skipped_len_short += 1
+            continue
+        elif record.id in accs2orgs.keys():
+            org_words = accs2orgs[record.id].split(" ")
+        else:
+            gbk_recs_missing += 1
+            continue
+
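+        # The first two words of the organism name form the "Genus species"
+        # lookup key; any remaining words are treated as strain information.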
+        genus_species = (
+            " ".join(org_words[0:2]) if len(org_words) > 2 else " ".join(org_words[0:])
+        )
+
+        if not skip_per_fa:
+            write_fasta(record.seq, record.id, base_fasta_dir, fna_suffix)
+
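+        # Three cases: the full organism name has a lineage (write it as-is),
+        # only the "Genus species" key has a lineage (write it, appending the
+        # full organism name as the strain when extra words are present), or
+        # no lineage was found (write a minimal genus/species row and record
+        # the miss in lineages_not_found.csv).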
+        if record.id in accs2orgs.keys() and accs2orgs[record.id] in lineages.keys():
+            l_fh.write(",".join([record.id, lineages[accs2orgs[record.id]]]) + "\n")
+            passed_lookup += 1
+        elif record.id in accs2orgs.keys() and genus_species in lineages.keys():
+            if len(org_words) > 2:
+                l_fh.write(
+                    ",".join(
+                        [
+                            record.id,
+                            lineages[genus_species].rstrip(","),
+                            accs2orgs[record.id],
+                        ]
+                    )
+                    + "\n"
+                )
+            else:
+                l_fh.write(",".join([record.id, lineages[genus_species]]) + "\n")
+            passed_lookup += 1
+        else:
+            if len(org_words) > 2:
+                l_fh.write(
+                    ",".join(
+                        [
+                            record.id,
+                            "",
+                            "",
+                            "",
+                            "",
+                            "",
+                            org_words[0],
+                            org_words[0] + " " + org_words[1],
+                            accs2orgs[record.id],
+                        ]
+                    )
+                    + "\n"
+                )
+            else:
+                l_fh.write(
+                    ",".join(
+                        [
+                            record.id,
+                            "",
+                            "",
+                            "",
+                            "",
+                            "",
+                            org_words[0],
+                            accs2orgs[record.id],
+                            "",
+                        ]
+                    )
+                    + "\n"
+                )
+            ln_fh.write(",".join([record.id, accs2orgs[record.id]]) + "\n")
+            failed_lookup += 1
+
+    logging.info(
+        f"No. of raw records present in `ncbitax2lin` [{os.path.basename(csv)}]: {raw_recs}"
+        + f"\nNo. of valid records collected from `ncbitax2lin` [{os.path.basename(csv)}]: {len(lineages.keys())}"
+        + f"\nNo. of sequences skipped (Sequence length < {min_len}): {skipped_len_short}"
+        + f"\nNo. of records in FASTA [{os.path.basename(fna)}]: {passed_lookup + failed_lookup}"
+        + f"\nNo. of records in GenBank [{os.path.basename(gbk)}]: {len(accs2orgs.keys())}"
+        + f"\nNo. of FASTA records for which new lineages were created: {passed_lookup}"
+        + f"\nNo. of FASTA records for which only genus, species and/or strain information were created: {failed_lookup}"
+        + f"\nNo. of FASTA records for which no GenBank records exist: {gbk_recs_missing}"
+    )
+
+    if (passed_lookup + failed_lookup) != len(accs2orgs.keys()):
+        logging.error(
+            f"The number of FASTA records written [{passed_lookup + failed_lookup}]"
+            + f"\nis not equal to the number of GenBank records [{len(accs2orgs.keys())}]!"
+        )
+        exit(1)
+    else:
+        logging.info("Succesfully created lineages and FASTA records! Done!!")
+
+    l_fh.close()
+    ln_fh.close()
+
+
+if __name__ == "__main__":
+    main()