#!/usr/bin/env python3

# Kranti Konganti

import argparse
import inspect
import json
import logging
import os
import shutil
import tempfile
from urllib.parse import urlparse
from urllib.request import urlopen

# Set logging format.
logging.basicConfig(
    format="\n" + "=" * 55 + "\n%(asctime)s - %(levelname)s\n" + "=" * 55 + "\n%(message)s\n",
    level=logging.DEBUG,
)


# Multiple inheritance for pretty printing of help text.
class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
    pass


def dl_pubmlst(path=None, url=None, suffix="", parent=False, filename=False, expectjson=False):
    """
    Download raw data from a URL. Returns the parsed JSON response when
    `expectjson` is True; otherwise saves the payload to a file under `path`.
    """
    if path is None or url is None:
        logging.error(
            "Please provide absolute UNIX path\n" + "to store the result DB flat files."
        )
        exit(1)

    logging.info(f"Downloading... Please wait...\n{url}")

    # Fetch the payload once into a temporary file.
    with urlopen(url) as response:
        with tempfile.NamedTemporaryFile(delete=False) as tmp_html_file:
            shutil.copyfileobj(response, tmp_html_file)

    if expectjson:
        try:
            with open(tmp_html_file.name, "r") as tmp_fh:
                jsonresponse = json.load(tmp_fh)
        except json.JSONDecodeError:
            logging.error(f"The response from\n{url}\nwas not valid JSON!")
            exit(1)
        finally:
            os.unlink(tmp_html_file.name)

        logging.info(f"Got a valid JSON response from:\n{url}")
        return jsonresponse

    if not parent:
        if not filename:
            save_to = os.path.join(path, os.path.basename(urlparse(url).path) + suffix)
        else:
            save_to = os.path.join(path, filename + suffix)

        logging.info(f"Saving to:\n{os.path.basename(save_to)}")

        # The payload is already in the temporary file, so move it into
        # place instead of downloading it a second time.
        shutil.move(tmp_html_file.name, save_to)
    else:
        os.unlink(tmp_html_file.name)

    return None


def main() -> None:
    """
    This script is part of the `cronology_db` Nextflow workflow and is only
    tested on POSIX systems.
    It:
        1. Downloads the MLST scheme in JSON format from PubMLST, and then
        2. Downloads the alleles' FASTA and profile table
           suitable to run MLST analysis.
    """

    prog_name = os.path.basename(inspect.stack()[0].filename)

    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )

    required = parser.add_argument_group("required arguments")

    required.add_argument(
        "-org",
        dest="organism",
        required=True,
        help="The organism name to download the MLST alleles'\nFASTA and profile CSV for."
        + "\nEx: -org cronobacter",
    )
    parser.add_argument(
        "-f",
        dest="overwrite",
        default=False,
        required=False,
        action="store_true",
        help="Force overwrite the results directory\nmentioned with -out.",
    )
    parser.add_argument(
        "-out",
        dest="outdir",
        default=os.getcwd(),
        required=False,
        help="The absolute UNIX path to store the MLST alleles'\nFASTA and profile CSV.\n",
    )
    parser.add_argument(
        "-mlsts",
        dest="schemes",
        default="schemes/1",
        required=False,
        help="The MLST scheme ID to download.",
    )
    parser.add_argument(
        "-profile",
        dest="profile",
        default="profiles_csv",
        required=False,
        help="The MLST profile name in the scheme.",
    )
    parser.add_argument(
        "-loci",
        dest="loci",
        default="loci",
        required=False,
        help="The key name in the JSON response which lists the\nallele URLs to download.",
    )
    parser.add_argument(
        "-suffix",
        dest="asuffix",
        default=".tfa",
        required=False,
        help="What should be the suffix of the downloaded allele\nFASTA.",
    )
    parser.add_argument(
        "-akey",
        dest="allele_fa_key",
        default="alleles_fasta",
        required=False,
        help="What is the key in the JSON response that contains\nthe URL for allele FASTA.",
    )
    parser.add_argument(
        "-id",
        dest="id_key",
        default="id",
        required=False,
        help="What is the key in the JSON response that contains\nthe name of the allele FASTA.",
    )

    args = parser.parse_args()
    org = args.organism
    outdir = os.path.join(args.outdir, org)
    overwrite = args.overwrite
    pubmlst_loc = "_".join(["https://rest.pubmlst.org/db/pubmlst", org, "seqdef"])
    schemes = args.schemes
    profile = args.profile
    loci = args.loci
    suffix = args.asuffix
    allele_fa_key = args.allele_fa_key
    id_key = args.id_key

    if not overwrite and os.path.exists(outdir):
        logging.error(
            f"Output directory\n{os.path.basename(outdir)}\nexists. Please use -f to overwrite."
        )
        exit(1)
    elif overwrite and os.path.exists(outdir):
        shutil.rmtree(outdir, ignore_errors=True)

    # Create required output directory.
    os.makedirs(outdir)

    # Query MLST scheme for an organism.
    pubmlst_json = dl_pubmlst(
        path=outdir,
        url="/".join([pubmlst_loc, schemes]),
        suffix=suffix,
        parent=True,
        filename=False,
        expectjson=True,
    )
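
    # The scheme record fetched above is expected to look roughly like the
    # abridged example below (field names per the PubMLST RESTful API; the
    # locus URL shown is only illustrative):
    #
    #   {
    #     "id": 1,
    #     "description": "MLST",
    #     "profiles_csv": "https://rest.pubmlst.org/db/pubmlst_cronobacter_seqdef/schemes/1/profiles_csv",
    #     "loci": [
    #       "https://rest.pubmlst.org/db/pubmlst_cronobacter_seqdef/loci/atpD",
    #       ...
    #     ]
    #   }
    #
    # The blocks below rely on the "profiles_csv" and "loci" keys (or whatever
    # key names were passed via -profile and -loci).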

    # Save profiles_csv as organism.txt.
    if profile in pubmlst_json.keys():
        dl_pubmlst(
            path=outdir,
            url=pubmlst_json[profile],
            suffix=".txt",
            parent=False,
            filename=org,
            expectjson=False,
        )

    # Save the MLST alleles' FASTA.
    if loci in pubmlst_json.keys():
        for allele in pubmlst_json[loci]:
            # First fetch the locus record (JSON) to learn the allele FASTA URL...
            allele_fa_json = dl_pubmlst(
                path=outdir,
                url=allele,
                suffix=suffix,
                parent=True,
                filename=False,
                expectjson=True,
            )

            # ...then download the allele FASTA itself.
            dl_pubmlst(
                path=outdir,
                url=allele_fa_json[allele_fa_key],
                suffix=suffix,
                parent=False,
                filename=allele_fa_json[id_key],
                expectjson=False,
            )

    logging.info(f"Finished downloading MLST scheme and profile for {org}.")


if __name__ == "__main__":
    main()
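
# Example invocation (a sketch; the script file name below is a placeholder,
# and the exact files produced depend on the live PubMLST responses):
#
#   python3 dl_pubmlst_profiles_fa.py -org cronobacter -out /path/to/mlst_db
#
# This should create /path/to/mlst_db/cronobacter/ containing cronobacter.txt
# (the ST profile table) and one <locus>.tfa FASTA file per locus listed under
# the scheme's "loci" key.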