#!/usr/bin/env python3

# Kranti Konganti

import argparse
import inspect
import json
import logging
import os
import shutil
import tempfile
from urllib.parse import urlparse
from urllib.request import urlopen

# Set logging format.
logging.basicConfig(
    format="\n" + "=" * 55 + "\n%(asctime)s - %(levelname)s\n" + "=" * 55 + "\n%(message)s\n",
    level=logging.DEBUG,
)


# Multiple inheritance for pretty printing of help text.
class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
    pass


def dl_pubmlst(outdir, url, suffix, parent, filename, expectjson):
    """
    Download raw data from a URL. Returns the parsed JSON response when
    `expectjson` is True; otherwise saves the response body under `outdir`.
    """
    if outdir is None or url is None:
        logging.error("Please provide an absolute UNIX path\nto store the result DB flat files.")
        exit(1)

    logging.info(f"Downloading... Please wait...\n{url}")

    # Fetch the URL into a temporary file.
    with urlopen(url) as response:
        with tempfile.NamedTemporaryFile(delete=False) as tmp_html_file:
            shutil.copyfileobj(response, tmp_html_file)

    if expectjson:
        try:
            with open(tmp_html_file.name, "r") as tmp_fh:
                jsonresponse = json.load(tmp_fh)
        except json.JSONDecodeError:
            logging.error(f"The response from\n{url}\nwas not valid JSON!")
            exit(1)

        logging.info(f"Got a valid JSON response from:\n{url}")
        return jsonresponse

    if not parent:
        if not filename:
            save_to = os.path.join(outdir, os.path.basename(urlparse(url).path) + suffix)
        else:
            save_to = os.path.join(outdir, filename + suffix)

        logging.info(f"Saving to:\n{os.path.basename(save_to)}")

        # Fetch the URL again and write the decoded text to disk.
        with urlopen(url) as url_response:
            with open(save_to, "w") as fout:
                fout.write(url_response.read().decode("utf-8"))
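

# Illustrative call sketch (comments only): with expectjson=True, dl_pubmlst()
# returns the parsed JSON response; with parent=False and expectjson=False it
# saves the response body under outdir. The scheme query issued from main() for
# `-org cronobacter` with the default `-mlsts schemes/1`, for example, amounts to:
#
#   scheme_json = dl_pubmlst(
#       outdir="/path/to/outdir/cronobacter",  # placeholder path
#       url="https://rest.pubmlst.org/db/pubmlst_cronobacter_seqdef/schemes/1",
#       suffix=".tfa",
#       parent=True,
#       filename=False,
#       expectjson=True,
#   )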


def main() -> None:
    """
    This script is part of the `cronology_db` Nextflow workflow and is only
    tested on POSIX systems.
    It:
        1. Downloads the MLST scheme in JSON format from PubMLST, and then,
        2. Downloads the alleles' FASTA and profile table
           suitable to run MLST analysis.
    """

    prog_name = os.path.basename(inspect.stack()[0].filename)

    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )

    required = parser.add_argument_group("required arguments")

    required.add_argument(
        "-org",
        dest="organism",
        required=True,
        help="The organism name to download the MLST alleles'\nFASTA and profile CSV for."
        + "\nEx: -org cronobacter",
    )
    parser.add_argument(
        "-f",
        dest="overwrite",
        default=False,
        required=False,
        action="store_true",
        help="Force overwrite the results directory\nmentioned with -out.",
    )
    parser.add_argument(
        "-out",
        dest="outdir",
        default=os.getcwd(),
        required=False,
        help="The absolute UNIX path to store the MLST alleles'\nFASTA and profile CSV.\n",
    )
    parser.add_argument(
        "-mlsts",
        dest="schemes",
        default="schemes/1",
        required=False,
        help="The MLST scheme ID to download.",
    )
    parser.add_argument(
        "-profile",
        dest="profile",
        default="profiles_csv",
        required=False,
        help="The MLST profile name in the scheme.",
    )
    parser.add_argument(
        "-loci",
        dest="loci",
        default="loci",
        required=False,
        help="The key name in the JSON response which lists the\nallele URLs to download.",
    )
    parser.add_argument(
        "-suffix",
        dest="asuffix",
        default=".tfa",
        required=False,
        help="What should be the suffix of the downloaded allele\nFASTA.",
    )
    parser.add_argument(
        "-akey",
        dest="allele_fa_key",
        default="alleles_fasta",
        required=False,
        help="What is the key in the JSON response that contains\nthe URL for allele FASTA.",
    )
    parser.add_argument(
        "-id",
        dest="id_key",
        default="id",
        required=False,
        help="What is the key in the JSON response that contains\nthe name of the allele FASTA.",
    )

    args = parser.parse_args()
    org = args.organism
    outdir = os.path.join(args.outdir, org)
    overwrite = args.overwrite
    pubmlst_loc = "_".join(["https://rest.pubmlst.org/db/pubmlst", org, "seqdef"])
    schemes = args.schemes
    profile = args.profile
    loci = args.loci
    suffix = args.asuffix
    allele_fa_key = args.allele_fa_key
    id_key = args.id_key
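
    # Illustrative note: with the defaults above and `-org cronobacter`, pubmlst_loc
    # resolves to https://rest.pubmlst.org/db/pubmlst_cronobacter_seqdef, so the
    # scheme query further below targets .../schemes/1; its JSON response is expected
    # to expose the 'profiles_csv' and 'loci' keys used in the download steps.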
    if not overwrite and os.path.exists(outdir):
        logging.error(
            f"Output directory\n{os.path.basename(outdir)}\nexists. Please use -f to overwrite."
        )
        exit(1)
    elif overwrite and os.path.exists(outdir):
        shutil.rmtree(outdir, ignore_errors=True)

    # Create the required output directory.
    os.makedirs(outdir)

    # Query the MLST scheme for an organism.
    pubmlst_json = dl_pubmlst(
        outdir=outdir,
        url="/".join([pubmlst_loc, schemes]),
        suffix=suffix,
        parent=True,
        filename=False,
        expectjson=True,
    )

    # Save profiles_csv as organism.txt.
    if profile in pubmlst_json.keys():
        dl_pubmlst(
            outdir=outdir,
            url=pubmlst_json[profile],
            suffix=".txt",
            parent=False,
            filename=org,
            expectjson=False,
        )

    # Save the MLST alleles' FASTA.
    if loci in pubmlst_json.keys():
        for allele in pubmlst_json[loci]:
            allele_fa_json = dl_pubmlst(
                outdir=outdir,
                url=allele,
                suffix=suffix,
                parent=True,
                filename=False,
                expectjson=True,
            )

            dl_pubmlst(
                outdir=outdir,
                url=allele_fa_json[allele_fa_key],
                suffix=suffix,
                parent=False,
                filename=allele_fa_json[id_key],
                expectjson=False,
            )

    logging.info(f"Finished downloading MLST scheme and profile for {org}.")


if __name__ == "__main__":
    main()
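
# Example invocation (the script filename here is assumed for illustration only):
#
#   python3 dl_pubmlst_profiles_fa.py -org cronobacter -out /path/to/mlst_db -f
#
# With the defaults, this saves the profile table as cronobacter.txt and one
# .tfa FASTA per locus under /path/to/mlst_db/cronobacter.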