diff 0.5.0/bin/dl_pdg_metadata.py @ 1:365849f031fd

"planemo upload"
author kkonganti
date Mon, 05 Jun 2023 18:48:51 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/0.5.0/bin/dl_pdg_metadata.py	Mon Jun 05 18:48:51 2023 -0400
@@ -0,0 +1,206 @@
+#!/usr/bin/env python3
+
+# Kranti Konganti
+
+import os
+import shutil
+import tempfile
+import argparse
+import inspect
+import logging
+import re
+from urllib.request import urlopen
+from html.parser import HTMLParser
+
# Configure a banner-style log format (a line of '=' above and below each
# timestamp/level header) for every message at DEBUG and above.
_LOG_BANNER = "=" * 55
logging.basicConfig(
    format="\n{0}\n%(asctime)s - %(levelname)s\n{0}\n%(message)s\n".format(_LOG_BANNER),
    level=logging.DEBUG,
)
+
# Combine two argparse formatters via multiple inheritance so help text both
# keeps its raw newlines and shows each argument's default value.
class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
    """Help formatter that preserves raw text and appends argument defaults."""
+
+
# HTMLParser override class to get PDG release and latest Cluster .tsv file
class NCBIPathogensHTMLParser(HTMLParser):
    """
    Collects every text node encountered while parsing the NCBI Pathogens
    directory-listing HTML page. `dl_pdg()` later joins `href_data` and
    regex-searches it for the wanted .tsv filename.
    """

    def __init__(self, *, convert_charrefs: bool = True) -> None:
        """
        Args:
            convert_charrefs: Passed through to `HTMLParser`. Defaults to
                True. (The previous default of `...`/Ellipsis was a typing
                bug that only behaved correctly because Ellipsis is truthy.)
        """
        # NOTE: HTMLParser.__init__ already calls self.reset(), so the
        # previous explicit self.reset() call was redundant and is dropped.
        super().__init__(convert_charrefs=convert_charrefs)
        self.href_data: list = []

    def handle_data(self, data: str) -> None:
        # Accumulate raw text chunks from the listing page.
        self.href_data.append(data)
+
+
def dl_pdg(**kwargs) -> "tuple[str, str]":
    """
    Download one NCBI Pathogen Detection (PDG) flat file (metadata or SNP
    cluster list) into a per-release directory and return its location.

    Keyword Args:
        db_path (str): Absolute path under which result files are stored.
        url (str): Listing URL of the PDG release directory.
        regex (re.Pattern): Pattern matching the wanted .tsv filename.
        suffix (str): Filename suffix (retained for interface compatibility;
            the release identifier is now extracted with a regex instead of
            `str.rstrip`, which strips a character *set*, not a suffix).
        overwrite (bool): Overwrite an existing release directory.
        release (str): Explicit PDG release identifier (e.g.
            "PDG000000002.2507"), or a falsy value to use the latest.

    Returns:
        tuple[str, str]: (path to the saved .tsv file, destination directory).
    """
    # Unpack by name instead of relying on the caller's keyword order, which
    # the previous list-comprehension unpacking silently depended on.
    db_path = kwargs["db_path"]
    url = kwargs["url"]
    regex = kwargs["regex"]
    suffix = kwargs["suffix"]
    overwrite = kwargs["overwrite"]
    release = kwargs["release"]

    # Previous check `(db_path or url) == None` was only true when BOTH were
    # falsy; require each one individually.
    if db_path is None or url is None:
        logging.error("Please provide absolute UNIX path\n" + "to store the result DB flat files.")
        exit(1)

    # Guarding on truthiness first avoids re.match() raising TypeError when
    # the argparse default (False / empty) is passed through unchanged.
    if release and re.match(r"^PDG\d+\.\d+$", release):
        url = re.sub("latest_snps", release.strip(), url)

    html_parser = NCBIPathogensHTMLParser()
    logging.info(f"Finding latest NCBI PDG release at:\n{url}")

    # Save the directory-listing HTML to a temp file, then feed it to the
    # parser; the temp file is removed as soon as it has been consumed.
    with urlopen(url) as response:
        with tempfile.NamedTemporaryFile(delete=False) as tmp_html_file:
            shutil.copyfileobj(response, tmp_html_file)

    with open(tmp_html_file.name, "r") as html:
        html_parser.feed(html.read())

    os.unlink(tmp_html_file.name)

    filename_match = re.search(regex, "".join(html_parser.href_data))

    # Fail with a clear message instead of an AttributeError on .group(0).
    if filename_match is None:
        logging.error(f"Could not find a file matching\n{regex.pattern}\nat:\n{url}")
        exit(1)

    pdg_filename = filename_match.group(0)
    # Extract the release identifier (PDG<digits>.<digits>) directly. The old
    # `pdg_filename.rstrip(suffix)` treated the suffix as a set of characters
    # to strip and only produced the right answer by coincidence.
    pdg_release = re.match(r"PDG\d+\.\d+", pdg_filename).group(0)
    pdg_metadata_url = "/".join([url, pdg_filename])
    dest_dir = os.path.join(db_path, pdg_release)

    logging.info(f"Found NCBI PDG file:\n{pdg_metadata_url}")

    is_metadata_file = re.match(r".+?\.metadata\.tsv$", pdg_filename)
    is_cluster_file = re.match(r".+?\.reference_target\.cluster_list\.tsv$", pdg_filename)

    if not overwrite and is_metadata_file and os.path.exists(dest_dir):
        logging.error(f"DB path\n{dest_dir}\nalready exists. Please use -f to overwrite.")
        exit(1)
    elif overwrite and not is_cluster_file:
        # Forced refresh: clear any stale release directory before recreating.
        if os.path.exists(dest_dir):
            shutil.rmtree(dest_dir, ignore_errors=True)
        os.makedirs(dest_dir)
    elif not overwrite and is_metadata_file and not os.path.exists(dest_dir):
        os.makedirs(dest_dir)

    tsv_at = os.path.join(dest_dir, pdg_filename)
    logging.info(f"Saving to:\n{tsv_at}")

    # Download the .tsv and write it out as UTF-8 text; the `with` blocks
    # close both handles, so no explicit close() calls are needed.
    with urlopen(pdg_metadata_url) as response:
        with open(tsv_at, "w") as tsv:
            tsv.write(response.read().decode("utf-8"))

    return tsv_at, dest_dir
+
+
def main() -> None:
    """
    This script is part of the `bettercallsal_db` Nextflow workflow and is only
    tested on POSIX systems.
    It:
        1. Downloads the latest NCBI Pathogens Release metadata file, which
            looks like PDGXXXXXXXXXX.2504.metadata.tsv and also the SNP cluster
            information file which looks like
            PDGXXXXXXXXXX.2504.reference_target.cluster_list.tsv
        2. Generates a new metadata file with only required information such as
            computed_serotype, isolates GenBank or RefSeq downloadable genome
            FASTA URL.
    """

    prog_name = os.path.basename(inspect.stack()[0].filename)

    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )

    parser.add_argument(
        "-db",
        dest="db_path",
        default=os.getcwd(),
        required=False,
        help="Absolute UNIX path to a path where all results files are\nstored.",
    )
    parser.add_argument(
        "-f",
        dest="overwrite_db",
        default=False,
        required=False,
        action="store_true",
        help="Force overwrite a PDG release directory at DB path\nmentioned with -db.",
    )
    parser.add_argument(
        "-org",
        dest="organism",
        default="Salmonella",
        required=False,
        help="The organism to create the DB flat files\nfor.",
    )
    parser.add_argument(
        "-rel",
        dest="release",
        # Empty string (not False) so downstream re.match() always receives a
        # str; both values are falsy, so the "use latest" behavior is kept.
        default="",
        required=False,
        help="If you get a 404 error, try mentioning the actual release identifier.\n"
        + "Ex: For Salmonella, you can get the release identifier by going to:\n"
        + "    https://ftp.ncbi.nlm.nih.gov/pathogen/Results/Salmonella\n"
        + "Ex: If you want metadata belonging to release PDG000000002.2507, then you\n"
        + "    would use this command-line option as:\n    -rel PDG000000002.2507",
    )

    args = parser.parse_args()
    db_path = args.db_path
    org = args.organism
    overwrite = args.overwrite_db
    release = args.release
    ncbi_pathogens_loc = "/".join(
        ["https://ftp.ncbi.nlm.nih.gov/pathogen/Results", org, "latest_snps"]
    )

    if not db_path:
        db_path = os.getcwd()

    # Save metadata.
    metadata_file, dest_dir = dl_pdg(
        db_path=db_path,
        url="/".join([ncbi_pathogens_loc, "Metadata"]),
        regex=re.compile(r"PDG\d+\.\d+\.metadata\.tsv"),
        suffix=".metadata.tsv",
        overwrite=overwrite,
        release=release,
    )

    # Save cluster to target mapping. The suffix is a plain filename string:
    # the old value kept regex escapes ("\.") and dropped the leading dot.
    dl_pdg(
        db_path=db_path,
        url="/".join([ncbi_pathogens_loc, "Clusters"]),
        regex=re.compile(r"PDG\d+\.\d+\.reference_target\.cluster_list\.tsv"),
        suffix=".reference_target.cluster_list.tsv",
        overwrite=overwrite,
        release=release,
    )

    # Create accs_all.txt for the `dataformat` tool to fetch required ACC fields.
    accs_file = os.path.join(dest_dir, "accs_all.txt")
    with open(metadata_file, "r") as pdg_metadata_fh, open(accs_file, "w") as accs_fh:
        for line in pdg_metadata_fh:
            # Skip header/comment lines and blank lines.
            if line.startswith("#") or line in ["\n", "\n\r", "\r"]:
                continue
            cols = line.strip().split("\t")
            # Column index 9 is assumed to hold the assembly accession —
            # TODO confirm against the PDG metadata .tsv column layout.
            # The length guard skips short/malformed rows instead of crashing.
            if len(cols) > 9 and cols[9] != "NULL":
                accs_fh.write(f"{cols[9]}\n")

    logging.info("Finished writing accessions for dataformat tool.")


if __name__ == "__main__":
    main()