#!/usr/bin/env python3

# Kranti Konganti

import argparse
import inspect
import logging
import os
import re
import shutil
import ssl
import sys
import tempfile
from html.parser import HTMLParser
from typing import Optional, Tuple, Union
from urllib.request import urlopen

# Set logging.
logging.basicConfig(
    format="\n" + "=" * 55 + "\n%(asctime)s - %(levelname)s\n" + "=" * 55 + "\n%(message)s\n",
    level=logging.DEBUG,
)

# 0-based column of the PDG metadata table from which the assembly accession
# is read when writing the accessions list.
ASM_ACC_COLUMN = 9


# Multiple inheritance for pretty printing of help text.
class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
    """Help formatter that keeps raw line breaks and shows argument defaults."""


# HTMLParser override class to get PDG release and latest Cluster .tsv file
class NCBIPathogensHTMLParser(HTMLParser):
    """
    HTML parser that collects every text node it encounters.

    The text chunks are appended, in document order, to ``href_data`` so the
    caller can join them and search the result for a file name.
    """

    def __init__(self, *, convert_charrefs: bool = True) -> None:
        # HTMLParser.__init__ already calls reset(), so no explicit reset
        # is needed before the accumulator is created.
        super().__init__(convert_charrefs=convert_charrefs)
        self.href_data = []

    def handle_data(self, data):
        """Store one chunk of text found between tags."""
        self.href_data.append(data)


def pdg_release_from_filename(filename: str, suffix: str) -> str:
    """
    Return `filename` with the exact trailing `suffix` removed.

    A trailing "." left behind (when `suffix` was given without its leading
    dot) is dropped as well. Unlike ``str.rstrip(suffix)`` this never removes
    characters that merely happen to occur in `suffix`.
    """
    if suffix and filename.endswith(suffix):
        filename = filename[: -len(suffix)]
    return filename.rstrip(".")


def find_pdg_filename(html_text: str, regex: Union[str, "re.Pattern"]) -> Optional[str]:
    """
    Return the first match of `regex` in the text content of `html_text`,
    or None when the page contains no matching text.
    """
    html_parser = NCBIPathogensHTMLParser()
    html_parser.feed(html_text)
    # close() flushes text that the parser may still be buffering.
    html_parser.close()

    found = re.search(regex, "".join(html_parser.href_data))
    return found.group(0) if found else None


def write_accessions(metadata_tsv: str, accs_file: str, acc_col: int = ASM_ACC_COLUMN) -> int:
    """
    Write one accession per line to `accs_file`, taken from column `acc_col`
    (0-based) of the tab-delimited `metadata_tsv`.

    Lines starting with "#" and blank lines are skipped, as are rows that are
    too short to contain the column and rows whose value is empty or "NULL".

    Returns the number of accessions written.
    """
    written = 0
    short_rows = 0

    with open(metadata_tsv, "r", encoding="utf-8") as pdg_metadata_fh, open(
        accs_file, "w", encoding="utf-8"
    ) as accs_fh:
        for line in pdg_metadata_fh:
            if line.startswith("#") or not line.strip():
                continue
            # Only strip the line terminator: stripping all whitespace would
            # drop an empty leading field and shift every column.
            cols = line.rstrip("\r\n").split("\t")
            if len(cols) <= acc_col:
                short_rows += 1
                continue
            asm_acc = cols[acc_col].strip()
            if asm_acc and asm_acc != "NULL":
                accs_fh.write(f"{asm_acc}\n")
                written += 1

    if short_rows:
        logging.warning(f"Skipped {short_rows} row(s) with fewer than {acc_col + 1} columns.")

    return written


def dl_pdg(
    *,
    db_path: Optional[str],
    url: Optional[str],
    regex: Union[str, "re.Pattern"],
    suffix: str,
    overwrite: bool = False,
    release: Optional[str] = None,
) -> Tuple[str, str]:
    """
    Method to save the PDG metadata file and
    return the latest PDG release.

    Arguments (keyword only):
        db_path:   directory under which the release directory is created.
        url:       URL of the directory listing to search.
        regex:     pattern identifying the file name inside the listing.
        suffix:    file name suffix removed to obtain the release identifier.
        overwrite: replace an existing release directory instead of aborting.
        release:   explicit release identifier (e.g. PDG000000002.2507) that
                   replaces "latest_snps" in `url`; ignored when empty.

    Returns:
        A tuple of (path of the saved file, release directory).

    Exits with status 1 when `db_path` or `url` is missing, when no file name
    matching `regex` is found, or when the release directory already exists
    and `overwrite` is not set.
    """
    if db_path is None or url is None:
        logging.error("Please provide absolute UNIX path\n" + "to store the result DB flat files.")
        sys.exit(1)

    # NOTE(review): certificate and hostname verification are disabled here,
    # exactly as before. Confirm this is intentional for the deployment
    # environment; it leaves the downloads open to tampering in transit.
    contxt = ssl.create_default_context()
    contxt.check_hostname = False
    contxt.verify_mode = ssl.CERT_NONE

    # `release` may be None/False when the option was not given.
    release = release.strip() if isinstance(release, str) else ""
    if re.match(r"^PDG\d+\.\d+$", release):
        url = url.replace("latest_snps", release)
    elif release:
        logging.warning(f"Ignoring release identifier with unexpected format:\n{release}")

    logging.info(f"Finding latest NCBI PDG release at:\n{url}")

    with urlopen(url, context=contxt) as response:
        listing = response.read().decode("utf-8", errors="replace")

    pdg_filename = find_pdg_filename(listing, regex)
    if pdg_filename is None:
        logging.error(f"Could not find a matching NCBI PDG file at:\n{url}")
        sys.exit(1)

    pdg_release = pdg_release_from_filename(pdg_filename, suffix)
    pdg_metadata_url = "/".join([url, pdg_filename])
    dest_dir = os.path.join(db_path, pdg_release)

    logging.info(f"Found NCBI PDG file:\n{pdg_metadata_url}")

    is_metadata = re.match(r".+?\.metadata\.tsv$", pdg_filename) is not None
    is_cluster_list = (
        re.match(r".+?\.reference_target\.cluster_list\.tsv$", pdg_filename) is not None
    )

    if not overwrite and is_metadata and os.path.exists(dest_dir):
        logging.error(f"DB path\n{dest_dir}\nalready exists. Please use -f to overwrite.")
        sys.exit(1)

    # The cluster list is saved next to the metadata file, so the directory
    # must not be wiped when that file is being downloaded.
    if overwrite and not is_cluster_list:
        shutil.rmtree(dest_dir, ignore_errors=True)
    os.makedirs(dest_dir, exist_ok=True)

    tsv_at = os.path.join(dest_dir, pdg_filename)
    logging.info(f"Saving to:\n{tsv_at}")

    # Stream the download to disk instead of holding the whole file in memory.
    with urlopen(pdg_metadata_url, context=contxt) as response, open(tsv_at, "wb") as tsv:
        shutil.copyfileobj(response, tsv)

    return tsv_at, dest_dir


def main() -> None:
    """
    This script is part of the `bettercallsal_db` Nextflow workflow and is only
    tested on POSIX systems.
    It:
        1. Downloads the latest NCBI Pathogens Release metadata file, which
            looks like PDGXXXXXXXXXX.2504.metadata.tsv and also the SNP cluster
            information file which looks like PDGXXXXXXXXXX.2504.reference_target.cluster_list.tsv
        2. Generates a new metadata file with only required information such as
            computed_serotype, isolates GenBank or RefSeq downloadable genome FASTA
            URL.
    """

    prog_name = os.path.basename(inspect.stack()[0].filename)

    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )

    parser.add_argument(
        "-db",
        dest="db_path",
        default=os.getcwd(),
        required=False,
        help="Absolute UNIX path to a path where all results files are\nstored.",
    )
    parser.add_argument(
        "-f",
        dest="overwrite_db",
        default=False,
        required=False,
        action="store_true",
        help="Force overwrite a PDG release directory at DB path\nmentioned with -db.",
    )
    parser.add_argument(
        "-org",
        dest="organism",
        default="Salmonella",
        required=False,
        help="The organism to create the DB flat files\nfor.",
    )
    parser.add_argument(
        "-rel",
        dest="release",
        default=None,
        required=False,
        help="If you get a 404 error, try mentioning the actual release identifier.\n"
        + "Ex: For Salmonella, you can get the release identifier by going to:\n"
        + " https://ftp.ncbi.nlm.nih.gov/pathogen/Results/Salmonella\n"
        + "Ex: If you want metadata beloginging to release PDG000000002.2507, then you\n"
        + " would use this command-line option as:\n -rel PDG000000002.2507",
    )

    args = parser.parse_args()
    db_path = args.db_path
    org = args.organism
    overwrite = args.overwrite_db
    release = args.release
    ncbi_pathogens_loc = "/".join(
        ["https://ftp.ncbi.nlm.nih.gov/pathogen/Results", org, "latest_snps"]
    )

    if not db_path:
        db_path = os.getcwd()

    # Save metadata
    metadata_tsv, dest_dir = dl_pdg(
        db_path=db_path,
        url="/".join([ncbi_pathogens_loc, "Metadata"]),
        regex=re.compile(r"PDG\d+\.\d+\.metadata\.tsv"),
        suffix=".metadata.tsv",
        overwrite=overwrite,
        release=release,
    )

    # Save cluster to target mapping
    dl_pdg(
        db_path=db_path,
        url="/".join([ncbi_pathogens_loc, "Clusters"]),
        regex=re.compile(r"PDG\d+\.\d+\.reference_target\.cluster_list\.tsv"),
        suffix=".reference_target.cluster_list.tsv",
        overwrite=overwrite,
        release=release,
    )

    # Create accs.txt for dataformat to fetch required ACC fields
    accs_file = os.path.join(dest_dir, "accs_all.txt")
    write_accessions(metadata_tsv, accs_file)

    logging.info("Finished writing accessions for dataformat tool.")


if __name__ == "__main__":
    main()