kkonganti@1: #!/usr/bin/env python3 kkonganti@1: kkonganti@1: # Kranti Konganti kkonganti@1: kkonganti@1: import os kkonganti@1: import shutil kkonganti@1: import tempfile kkonganti@1: import argparse kkonganti@1: import inspect kkonganti@1: import logging kkonganti@1: import re kkonganti@1: from urllib.request import urlopen kkonganti@1: from html.parser import HTMLParser kkonganti@1: kkonganti@1: # Set logging.f kkonganti@1: logging.basicConfig( kkonganti@1: format="\n" + "=" * 55 + "\n%(asctime)s - %(levelname)s\n" + "=" * 55 + "\n%(message)s\n", kkonganti@1: level=logging.DEBUG, kkonganti@1: ) kkonganti@1: kkonganti@1: # Multiple inheritence for pretty printing of help text. kkonganti@1: class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter): kkonganti@1: pass kkonganti@1: kkonganti@1: kkonganti@1: # HTMLParser override class to get PDG release and latest Cluster .tsv file kkonganti@1: class NCBIPathogensHTMLParser(HTMLParser): kkonganti@1: def __init__(self, *, convert_charrefs: bool = ...) -> None: kkonganti@1: super().__init__(convert_charrefs=convert_charrefs) kkonganti@1: self.reset() kkonganti@1: self.href_data = list() kkonganti@1: kkonganti@1: def handle_data(self, data): kkonganti@1: self.href_data.append(data) kkonganti@1: kkonganti@1: kkonganti@1: def dl_pdg(**kwargs) -> None: kkonganti@1: """ kkonganti@1: Method to save the PDG metadata file and kkonganti@1: return the latest PDG release. kkonganti@1: """ kkonganti@1: db_path, url, regex, suffix, overwrite, release = [kwargs[k] for k in kwargs.keys()] kkonganti@1: kkonganti@1: if (db_path or url) == None: kkonganti@1: logging.error("Please provide absolute UNIX path\n" + "to store the result DB flat files.") kkonganti@1: exit(1) kkonganti@1: kkonganti@1: if re.match(r"^PDG\d+\.\d+$", release): kkonganti@1: url = re.sub("latest_snps", release.strip(), url) kkonganti@1: kkonganti@1: html_parser = NCBIPathogensHTMLParser() kkonganti@1: logging.info(f"Finding latest NCBI PDG release at:\n{url}") kkonganti@1: kkonganti@1: with urlopen(url) as response: kkonganti@1: with tempfile.NamedTemporaryFile(delete=False) as tmp_html_file: kkonganti@1: shutil.copyfileobj(response, tmp_html_file) kkonganti@1: kkonganti@1: with open(tmp_html_file.name, "r") as html: kkonganti@1: html_parser.feed("".join(html.readlines())) kkonganti@1: kkonganti@1: pdg_filename = re.search(regex, "".join(html_parser.href_data)).group(0) kkonganti@1: pdg_release = pdg_filename.rstrip(suffix) kkonganti@1: pdg_metadata_url = "/".join([url, pdg_filename]) kkonganti@1: pdg_release = pdg_filename.rstrip(suffix) kkonganti@1: dest_dir = os.path.join(db_path, pdg_release) kkonganti@1: kkonganti@1: logging.info(f"Found NCBI PDG file:\n{pdg_metadata_url}") kkonganti@1: kkonganti@1: if ( kkonganti@1: not overwrite kkonganti@1: and re.match(r".+?\.metadata\.tsv$", pdg_filename) kkonganti@1: and os.path.exists(dest_dir) kkonganti@1: ): kkonganti@1: logging.error(f"DB path\n{dest_dir}\nalready exists. Please use -f to overwrite.") kkonganti@1: exit(1) kkonganti@1: elif overwrite and not re.match(r".+?\.reference_target\.cluster_list\.tsv$", pdg_filename): kkonganti@1: shutil.rmtree(dest_dir, ignore_errors=True) if os.path.exists(dest_dir) else None kkonganti@1: os.makedirs(dest_dir) kkonganti@1: elif ( kkonganti@1: not overwrite kkonganti@1: and re.match(r".+?\.metadata\.tsv$", pdg_filename) kkonganti@1: and not os.path.exists(dest_dir) kkonganti@1: ): kkonganti@1: os.makedirs(dest_dir) kkonganti@1: kkonganti@1: tsv_at = os.path.join(dest_dir, pdg_filename) kkonganti@1: logging.info(f"Saving to:\n{tsv_at}") kkonganti@1: kkonganti@1: with urlopen(pdg_metadata_url) as response: kkonganti@1: with open(tsv_at, "w") as tsv: kkonganti@1: tsv.writelines(response.read().decode("utf-8")) kkonganti@1: kkonganti@1: html.close() kkonganti@1: tmp_html_file.close() kkonganti@1: os.unlink(tmp_html_file.name) kkonganti@1: tsv.close() kkonganti@1: response.close() kkonganti@1: kkonganti@1: return tsv_at, dest_dir kkonganti@1: kkonganti@1: kkonganti@1: def main() -> None: kkonganti@1: """ kkonganti@1: This script is part of the `bettercallsal_db` Nextflow workflow and is only kkonganti@1: tested on POSIX sytems. kkonganti@1: It: kkonganti@1: 1. Downloads the latest NCBI Pathogens Release metadata file, which kkonganti@1: looks like PDGXXXXXXXXXX.2504.metadata.csv and also the SNP cluster kkonganti@1: information file which looks like PDGXXXXXXXXXX.2504.reference_target.cluster_list.tsv kkonganti@1: 2. Generates a new metadata file with only required information such as kkonganti@1: computed_serotype, isolates GenBank or RefSeq downloadable genome FASTA kkonganti@1: URL. kkonganti@1: """ kkonganti@1: kkonganti@1: prog_name = os.path.basename(inspect.stack()[0].filename) kkonganti@1: kkonganti@1: parser = argparse.ArgumentParser( kkonganti@1: prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses kkonganti@1: ) kkonganti@1: kkonganti@1: # required = parser.add_argument_group("required arguments") kkonganti@1: kkonganti@1: parser.add_argument( kkonganti@1: "-db", kkonganti@1: dest="db_path", kkonganti@1: default=os.getcwd(), kkonganti@1: required=False, kkonganti@1: help="Absolute UNIX path to a path where all results files are\nstored.", kkonganti@1: ) kkonganti@1: parser.add_argument( kkonganti@1: "-f", kkonganti@1: dest="overwrite_db", kkonganti@1: default=False, kkonganti@1: required=False, kkonganti@1: action="store_true", kkonganti@1: help="Force overwrite a PDG release directory at DB path\nmentioned with -db.", kkonganti@1: ) kkonganti@1: parser.add_argument( kkonganti@1: "-org", kkonganti@1: dest="organism", kkonganti@1: default="Salmonella", kkonganti@1: required=False, kkonganti@1: help="The organism to create the DB flat files\nfor.", kkonganti@1: ) kkonganti@1: parser.add_argument( kkonganti@1: "-rel", kkonganti@1: dest="release", kkonganti@1: default=False, kkonganti@1: required=False, kkonganti@1: help="If you get a 404 error, try mentioning the actual release identifier.\n" kkonganti@1: + "Ex: For Salmonella, you can get the release identifier by going to:\n" kkonganti@1: + " https://ftp.ncbi.nlm.nih.gov/pathogen/Results/Salmonella\n" kkonganti@1: + "Ex: If you want metadata beloginging to release PDG000000002.2507, then you\n" kkonganti@1: + " would use this command-line option as:\n -rel PDG000000002.2507", kkonganti@1: ) kkonganti@1: kkonganti@1: args = parser.parse_args() kkonganti@1: db_path = args.db_path kkonganti@1: org = args.organism kkonganti@1: overwrite = args.overwrite_db kkonganti@1: release = args.release kkonganti@1: ncbi_pathogens_loc = "/".join( kkonganti@1: ["https://ftp.ncbi.nlm.nih.gov/pathogen/Results", org, "latest_snps"] kkonganti@1: ) kkonganti@1: kkonganti@1: if not db_path: kkonganti@1: db_path = os.getcwd() kkonganti@1: kkonganti@1: # Save metadata kkonganti@1: file, dest_dir = dl_pdg( kkonganti@1: db_path=db_path, kkonganti@1: url="/".join([ncbi_pathogens_loc, "Metadata"]), kkonganti@1: regex=re.compile(r"PDG\d+\.\d+\.metadata\.tsv"), kkonganti@1: suffix=".metadata.tsv", kkonganti@1: overwrite=overwrite, kkonganti@1: release=release, kkonganti@1: ) kkonganti@1: kkonganti@1: # Save cluster to target mapping kkonganti@1: dl_pdg( kkonganti@1: db_path=db_path, kkonganti@1: url="/".join([ncbi_pathogens_loc, "Clusters"]), kkonganti@1: regex=re.compile(r"PDG\d+\.\d+\.reference_target\.cluster_list\.tsv"), kkonganti@1: suffix="reference_target\.cluster_list\.tsv", kkonganti@1: overwrite=overwrite, kkonganti@1: release=release, kkonganti@1: ) kkonganti@1: kkonganti@1: # Create accs.txt for dataformat to fetch required ACC fields kkonganti@1: accs_file = os.path.join(dest_dir, "accs_all.txt") kkonganti@1: with open(file, "r") as pdg_metadata_fh: kkonganti@1: with open(accs_file, "w") as accs_fh: kkonganti@1: for line in pdg_metadata_fh.readlines(): kkonganti@1: if re.match(r"^#", line) or line in ["\n", "\n\r", "\r"]: kkonganti@1: continue kkonganti@1: cols = line.strip().split("\t") kkonganti@1: asm_acc = cols[9] kkonganti@1: accs_fh.write(f"{asm_acc}\n") if (asm_acc != "NULL") else None kkonganti@1: accs_fh.close() kkonganti@1: pdg_metadata_fh.close() kkonganti@1: kkonganti@1: logging.info("Finished writing accessions for dataformat tool.") kkonganti@1: kkonganti@1: kkonganti@1: if __name__ == "__main__": kkonganti@1: main()