kkonganti@11: #!/usr/bin/env python3 kkonganti@11: kkonganti@11: # Kranti Konganti kkonganti@11: kkonganti@11: import os kkonganti@11: import shutil kkonganti@11: import tempfile kkonganti@11: import argparse kkonganti@11: import inspect kkonganti@11: import logging kkonganti@11: import re kkonganti@11: from urllib.request import urlopen kkonganti@11: from html.parser import HTMLParser kkonganti@11: kkonganti@11: # Set logging.f kkonganti@11: logging.basicConfig( kkonganti@11: format="\n" + "=" * 55 + "\n%(asctime)s - %(levelname)s\n" + "=" * 55 + "\n%(message)s\n", kkonganti@11: level=logging.DEBUG, kkonganti@11: ) kkonganti@11: kkonganti@11: # Multiple inheritence for pretty printing of help text. kkonganti@11: class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter): kkonganti@11: pass kkonganti@11: kkonganti@11: kkonganti@11: # HTMLParser override class to get PDG release and latest Cluster .tsv file kkonganti@11: class NCBIPathogensHTMLParser(HTMLParser): kkonganti@11: def __init__(self, *, convert_charrefs: bool = ...) -> None: kkonganti@11: super().__init__(convert_charrefs=convert_charrefs) kkonganti@11: self.reset() kkonganti@11: self.href_data = list() kkonganti@11: kkonganti@11: def handle_data(self, data): kkonganti@11: self.href_data.append(data) kkonganti@11: kkonganti@11: kkonganti@11: def dl_pdg(**kwargs) -> None: kkonganti@11: """ kkonganti@11: Method to save the PDG metadata file and kkonganti@11: return the latest PDG release. kkonganti@11: """ kkonganti@11: db_path, url, regex, suffix, overwrite, release = [kwargs[k] for k in kwargs.keys()] kkonganti@11: kkonganti@11: if (db_path or url) == None: kkonganti@11: logging.error("Please provide absolute UNIX path\n" + "to store the result DB flat files.") kkonganti@11: exit(1) kkonganti@11: kkonganti@11: if re.match(r"^PDG\d+\.\d+$", release): kkonganti@11: url = re.sub("latest_snps", release.strip(), url) kkonganti@11: kkonganti@11: html_parser = NCBIPathogensHTMLParser() kkonganti@11: logging.info(f"Finding latest NCBI PDG release at:\n{url}") kkonganti@11: kkonganti@11: with urlopen(url) as response: kkonganti@11: with tempfile.NamedTemporaryFile(delete=False) as tmp_html_file: kkonganti@11: shutil.copyfileobj(response, tmp_html_file) kkonganti@11: kkonganti@11: with open(tmp_html_file.name, "r") as html: kkonganti@11: html_parser.feed("".join(html.readlines())) kkonganti@11: kkonganti@11: pdg_filename = re.search(regex, "".join(html_parser.href_data)).group(0) kkonganti@11: pdg_release = pdg_filename.rstrip(suffix) kkonganti@11: pdg_metadata_url = "/".join([url, pdg_filename]) kkonganti@11: pdg_release = pdg_filename.rstrip(suffix) kkonganti@11: dest_dir = os.path.join(db_path, pdg_release) kkonganti@11: kkonganti@11: logging.info(f"Found NCBI PDG file:\n{pdg_metadata_url}") kkonganti@11: kkonganti@11: if ( kkonganti@11: not overwrite kkonganti@11: and re.match(r".+?\.metadata\.tsv$", pdg_filename) kkonganti@11: and os.path.exists(dest_dir) kkonganti@11: ): kkonganti@11: logging.error(f"DB path\n{dest_dir}\nalready exists. Please use -f to overwrite.") kkonganti@11: exit(1) kkonganti@11: elif overwrite and not re.match(r".+?\.reference_target\.cluster_list\.tsv$", pdg_filename): kkonganti@11: shutil.rmtree(dest_dir, ignore_errors=True) if os.path.exists(dest_dir) else None kkonganti@11: os.makedirs(dest_dir) kkonganti@11: elif ( kkonganti@11: not overwrite kkonganti@11: and re.match(r".+?\.metadata\.tsv$", pdg_filename) kkonganti@11: and not os.path.exists(dest_dir) kkonganti@11: ): kkonganti@11: os.makedirs(dest_dir) kkonganti@11: kkonganti@11: tsv_at = os.path.join(dest_dir, pdg_filename) kkonganti@11: logging.info(f"Saving to:\n{tsv_at}") kkonganti@11: kkonganti@11: with urlopen(pdg_metadata_url) as response: kkonganti@11: with open(tsv_at, "w") as tsv: kkonganti@11: tsv.writelines(response.read().decode("utf-8")) kkonganti@11: kkonganti@11: html.close() kkonganti@11: tmp_html_file.close() kkonganti@11: os.unlink(tmp_html_file.name) kkonganti@11: tsv.close() kkonganti@11: response.close() kkonganti@11: kkonganti@11: return tsv_at, dest_dir kkonganti@11: kkonganti@11: kkonganti@11: def main() -> None: kkonganti@11: """ kkonganti@11: This script is part of the `bettercallsal_db` Nextflow workflow and is only kkonganti@11: tested on POSIX sytems. kkonganti@11: It: kkonganti@11: 1. Downloads the latest NCBI Pathogens Release metadata file, which kkonganti@11: looks like PDGXXXXXXXXXX.2504.metadata.csv and also the SNP cluster kkonganti@11: information file which looks like PDGXXXXXXXXXX.2504.reference_target.cluster_list.tsv kkonganti@11: 2. Generates a new metadata file with only required information such as kkonganti@11: computed_serotype, isolates GenBank or RefSeq downloadable genome FASTA kkonganti@11: URL. kkonganti@11: """ kkonganti@11: kkonganti@11: prog_name = os.path.basename(inspect.stack()[0].filename) kkonganti@11: kkonganti@11: parser = argparse.ArgumentParser( kkonganti@11: prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses kkonganti@11: ) kkonganti@11: kkonganti@11: # required = parser.add_argument_group("required arguments") kkonganti@11: kkonganti@11: parser.add_argument( kkonganti@11: "-db", kkonganti@11: dest="db_path", kkonganti@11: default=os.getcwd(), kkonganti@11: required=False, kkonganti@11: help="Absolute UNIX path to a path where all results files are\nstored.", kkonganti@11: ) kkonganti@11: parser.add_argument( kkonganti@11: "-f", kkonganti@11: dest="overwrite_db", kkonganti@11: default=False, kkonganti@11: required=False, kkonganti@11: action="store_true", kkonganti@11: help="Force overwrite a PDG release directory at DB path\nmentioned with -db.", kkonganti@11: ) kkonganti@11: parser.add_argument( kkonganti@11: "-org", kkonganti@11: dest="organism", kkonganti@11: default="Salmonella", kkonganti@11: required=False, kkonganti@11: help="The organism to create the DB flat files\nfor.", kkonganti@11: ) kkonganti@11: parser.add_argument( kkonganti@11: "-rel", kkonganti@11: dest="release", kkonganti@11: default=False, kkonganti@11: required=False, kkonganti@11: help="If you get a 404 error, try mentioning the actual release identifier.\n" kkonganti@11: + "Ex: For Salmonella, you can get the release identifier by going to:\n" kkonganti@11: + " https://ftp.ncbi.nlm.nih.gov/pathogen/Results/Salmonella\n" kkonganti@11: + "Ex: If you want metadata beloginging to release PDG000000002.2507, then you\n" kkonganti@11: + " would use this command-line option as:\n -rel PDG000000002.2507", kkonganti@11: ) kkonganti@11: kkonganti@11: args = parser.parse_args() kkonganti@11: db_path = args.db_path kkonganti@11: org = args.organism kkonganti@11: overwrite = args.overwrite_db kkonganti@11: release = args.release kkonganti@11: ncbi_pathogens_loc = "/".join( kkonganti@11: ["https://ftp.ncbi.nlm.nih.gov/pathogen/Results", org, "latest_snps"] kkonganti@11: ) kkonganti@11: kkonganti@11: if not db_path: kkonganti@11: db_path = os.getcwd() kkonganti@11: kkonganti@11: # Save metadata kkonganti@11: file, dest_dir = dl_pdg( kkonganti@11: db_path=db_path, kkonganti@11: url="/".join([ncbi_pathogens_loc, "Metadata"]), kkonganti@11: regex=re.compile(r"PDG\d+\.\d+\.metadata\.tsv"), kkonganti@11: suffix=".metadata.tsv", kkonganti@11: overwrite=overwrite, kkonganti@11: release=release, kkonganti@11: ) kkonganti@11: kkonganti@11: # Save cluster to target mapping kkonganti@11: dl_pdg( kkonganti@11: db_path=db_path, kkonganti@11: url="/".join([ncbi_pathogens_loc, "Clusters"]), kkonganti@11: regex=re.compile(r"PDG\d+\.\d+\.reference_target\.cluster_list\.tsv"), kkonganti@11: suffix="reference_target\.cluster_list\.tsv", kkonganti@11: overwrite=overwrite, kkonganti@11: release=release, kkonganti@11: ) kkonganti@11: kkonganti@11: # Create accs.txt for dataformat to fetch required ACC fields kkonganti@11: accs_file = os.path.join(dest_dir, "accs_all.txt") kkonganti@11: with open(file, "r") as pdg_metadata_fh: kkonganti@11: with open(accs_file, "w") as accs_fh: kkonganti@11: for line in pdg_metadata_fh.readlines(): kkonganti@11: if re.match(r"^#", line) or line in ["\n", "\n\r", "\r"]: kkonganti@11: continue kkonganti@11: cols = line.strip().split("\t") kkonganti@11: asm_acc = cols[9] kkonganti@11: accs_fh.write(f"{asm_acc}\n") if (asm_acc != "NULL") else None kkonganti@11: accs_fh.close() kkonganti@11: pdg_metadata_fh.close() kkonganti@11: kkonganti@11: logging.info("Finished writing accessions for dataformat tool.") kkonganti@11: kkonganti@11: kkonganti@11: if __name__ == "__main__": kkonganti@11: main()