#!/usr/bin/env python3

# Kranti Konganti

import argparse
import inspect
import logging
import os
import re
import shutil
import tempfile  # kept for backward compatibility; no longer used by dl_pdg
from html.parser import HTMLParser
from urllib.request import urlopen

# Set logging format.
logging.basicConfig(
    format="\n" + "=" * 55 + "\n%(asctime)s - %(levelname)s\n" + "=" * 55 + "\n%(message)s\n",
    level=logging.DEBUG,
)


# Multiple inheritance for pretty printing of help text.
class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
    pass


# HTMLParser override class to get PDG release and latest Cluster .tsv file
class NCBIPathogensHTMLParser(HTMLParser):
    """
    HTML parser that appends every text node it encounters to
    `href_data`, in document order.
    """

    def __init__(self, *, convert_charrefs: bool = True) -> None:
        # HTMLParser.__init__ already calls reset(), so it is not repeated here.
        super().__init__(convert_charrefs=convert_charrefs)
        self.href_data = list()

    def handle_data(self, data):
        self.href_data.append(data)


def _strip_suffix(filename, suffix):
    """
    Return `filename` without the literal trailing `suffix` and without
    any dot left dangling at the end afterwards.
    """
    if suffix and filename.endswith(suffix):
        filename = filename[: -len(suffix)]
    return filename.rstrip(".")


def dl_pdg(**kwargs) -> tuple:
    """
    Method to save a PDG file (metadata or cluster list) and
    return where it was saved.

    Keyword arguments (looked up by name):
        db_path:   directory under which the release directory is created.
        url:       URL of the listing page; the file is fetched from
                   `url + "/" + <matched file name>`.
        regex:     pattern (string or compiled) identifying the file name
                   within the text of the listing page.
        suffix:    literal file name suffix removed to obtain the release
                   identifier.
        overwrite: if true, an existing release directory is replaced
                   (except when saving the cluster list file).
        release:   optional release identifier such as PDG000000002.2507,
                   substituted for "latest_snps" in `url`.

    Returns:
        Tuple of (path to the saved file, release directory).

    Raises:
        SystemExit: with code 1 if `db_path` or `url` is missing, if no file
            name matches `regex`, or if the release directory already exists
            and `overwrite` is false.
    """
    db_path = kwargs.get("db_path")
    url = kwargs.get("url")

    if db_path is None or url is None:
        logging.error("Please provide absolute UNIX path\n" + "to store the result DB flat files.")
        raise SystemExit(1)

    regex = kwargs["regex"]
    suffix = kwargs.get("suffix", "")
    overwrite = kwargs.get("overwrite", False)
    release = kwargs.get("release", False)

    # The CLI default for release is False, which must not reach re.match().
    if release:
        release = str(release).strip()
        if re.match(r"^PDG\d+\.\d+$", release):
            url = url.replace("latest_snps", release)
        else:
            logging.warning(
                f"Ignoring release identifier:\n{release}\nIt does not look like PDGXXXXXXXXXX.XXXX"
            )

    html_parser = NCBIPathogensHTMLParser()
    logging.info(f"Finding latest NCBI PDG release at:\n{url}")

    # Parse the listing straight from the response; no temporary file to leak.
    with urlopen(url) as response:
        html_parser.feed(response.read().decode("utf-8", errors="replace"))

    found = re.search(regex, "".join(html_parser.href_data))
    if found is None:
        logging.error(f"Could not find a matching PDG file at:\n{url}")
        raise SystemExit(1)

    pdg_filename = found.group(0)
    # str.rstrip() removes a set of characters, not a suffix, so strip literally.
    pdg_release = _strip_suffix(pdg_filename, suffix)
    pdg_metadata_url = "/".join([url, pdg_filename])
    dest_dir = os.path.join(db_path, pdg_release)

    logging.info(f"Found NCBI PDG file:\n{pdg_metadata_url}")

    is_metadata = re.match(r".+?\.metadata\.tsv$", pdg_filename) is not None
    is_cluster_list = (
        re.match(r".+?\.reference_target\.cluster_list\.tsv$", pdg_filename) is not None
    )

    if overwrite:
        # The cluster list is saved next to the metadata file, so it must
        # not wipe the release directory.
        if not is_cluster_list:
            shutil.rmtree(dest_dir, ignore_errors=True)
            os.makedirs(dest_dir, exist_ok=True)
    elif is_metadata:
        if os.path.exists(dest_dir):
            logging.error(f"DB path\n{dest_dir}\nalready exists. Please use -f to overwrite.")
            raise SystemExit(1)
        os.makedirs(dest_dir)

    tsv_at = os.path.join(dest_dir, pdg_filename)
    logging.info(f"Saving to:\n{tsv_at}")

    # Stream the download to disk instead of holding it all in memory.
    with urlopen(pdg_metadata_url) as response, open(tsv_at, "wb") as tsv:
        shutil.copyfileobj(response, tsv)

    return tsv_at, dest_dir


def main() -> None:
    """
    This script is part of the `cronology_db` Nextflow workflow and is only
    tested on POSIX systems.
    It:
        1. Downloads the latest NCBI Pathogens Release metadata file, which
            looks like PDGXXXXXXXXXX.2504.metadata.tsv and also the SNP cluster
            information file which looks like PDGXXXXXXXXXX.2504.reference_target.cluster_list.tsv
        2. Generates a new metadata file with only required information such as
            computed_serotype, isolates GenBank or RefSeq downloadable genome FASTA
            URL.
    """

    prog_name = os.path.basename(inspect.stack()[0].filename)

    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )

    required = parser.add_argument_group("required arguments")

    parser.add_argument(
        "-db",
        dest="db_path",
        default=os.getcwd(),
        required=False,
        help="Absolute UNIX path to a path where all results files are\nstored.",
    )
    parser.add_argument(
        "-f",
        dest="overwrite_db",
        default=False,
        required=False,
        action="store_true",
        help="Force overwrite a PDG release directory at DB path\nmentioned with -db.",
    )
    parser.add_argument(
        "-org",
        dest="organism",
        default="Cronobacter",
        required=False,
        help="The organism to create the DB flat files\nfor.",
    )
    required.add_argument(
        "-rel",
        dest="release",
        default=False,
        required=False,
        help="If you get a 404 error, try mentioning the actual release identifier.\n"
        + "Ex: For Cronobacter, you can get the release identifier by going to:\n"
        + "    https://ftp.ncbi.nlm.nih.gov/pathogen/Results/Cronobacter\n"
        + "Ex: If you want metadata beloginging to release PDG000000002.2507, then you\n"
        + "    would use this command-line option as:\n    -rel PDG000000002.2507",
    )

    args = parser.parse_args()
    db_path = args.db_path
    org = args.organism
    overwrite = args.overwrite_db
    release = args.release
    ncbi_pathogens_loc = "/".join(
        ["https://ftp.ncbi.nlm.nih.gov/pathogen/Results", org, "latest_snps"]
    )

    if not db_path:
        db_path = os.getcwd()

    # Save metadata
    metadata_tsv, dest_dir = dl_pdg(
        db_path=db_path,
        url="/".join([ncbi_pathogens_loc, "Metadata"]),
        regex=re.compile(r"PDG\d+\.\d+\.metadata\.tsv"),
        suffix=".metadata.tsv",
        overwrite=overwrite,
        release=release,
    )

    # Save cluster to target mapping
    dl_pdg(
        db_path=db_path,
        url="/".join([ncbi_pathogens_loc, "Clusters"]),
        regex=re.compile(r"PDG\d+\.\d+\.reference_target\.cluster_list\.tsv"),
        # A literal suffix, not a regular expression.
        suffix=".reference_target.cluster_list.tsv",
        overwrite=overwrite,
        release=release,
    )

    # Create accs.txt for dataformat to fetch required ACC fields
    asm_acc_col = 9  # zero-based index of the field read as the assembly accession
    short_rows = 0
    accs_file = os.path.join(dest_dir, "accs_all.txt")
    with open(metadata_tsv, "r") as pdg_metadata_fh, open(accs_file, "w") as accs_fh:
        for line in pdg_metadata_fh:
            if line.startswith("#") or not line.strip():
                continue
            # Only drop the line ending: stripping tabs would discard empty
            # leading/trailing fields and shift the column indices.
            cols = line.rstrip("\r\n").split("\t")
            if len(cols) <= asm_acc_col:
                short_rows += 1
                continue
            asm_acc = cols[asm_acc_col]
            if asm_acc != "NULL":
                accs_fh.write(f"{asm_acc}\n")

    if short_rows:
        logging.warning(f"Skipped {short_rows} row(s) with fewer than {asm_acc_col + 1} columns.")

    logging.info("Finished writing accessions for dataformat tool.")


if __name__ == "__main__":
    main()