#!/usr/bin/env python3

# Kranti Konganti

import argparse
import base64
import gzip
import inspect
import json
import logging
import os
import pprint
import re
import sys
from collections import defaultdict

import requests

# Keys copied from the JSON response into the output table.
FIELD_KEYS = ["rST", "other_designation"]
TAX_PRED_KEYS = ["rank", "support", "taxon", "taxonomy"]

# rMLST sequence query endpoint.
RMLST_URI = "http://rest.pubmlst.org/db/pubmlst_rmlst_seqdef_kiosk/schemes/1/sequence"
# uri = "https://rest.pubmlst.org/db/pubmlst_cronobacter_isolates/loci/atpD/sequence"

# Matches the " > " separators (with any surrounding whitespace) so that they
# can be collapsed into ";" in the output table.
_LINEAGE_SEP = re.compile(r"\s*\>\s*")


# Multiple inheritance for pretty printing of help text.
class MultiArgFormatClasses(
    argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter
):
    pass


def _read_fasta(path: str) -> str:
    """Return the text of a FASTA file that may or may not be gzipped.

    The file is first opened as gzip; if it turns out not to be gzipped
    (``gzip.BadGzipFile`` on read) it is re-read as plain text. Both paths
    return ``str`` so the caller never sees a ``bytes`` object.
    """
    try:
        with gzip.open(path, "rt") as fasta_fh:
            return fasta_fh.read()
    except gzip.BadGzipFile:
        with open(path, "r") as fasta_fh:
            return fasta_fh.read()


def _build_payload(seqs: str) -> str:
    """Return the JSON request body carrying the base64-encoded sequences."""
    return json.dumps(
        {
            "base64": True,
            "details": True,
            "sequence": base64.b64encode(seqs.encode()).decode(),
        }
    )


def _write_results(
    outfile: str, sample_name: str, fields: dict, predictions: list
) -> None:
    """Write the tab-delimited results table to ``outfile``.

    One row is written per entry of ``predictions`` (which must be
    non-empty). Columns whose value is falsy are omitted. Every value is
    passed through ``str()`` so that non-string JSON values (e.g. numbers)
    cannot break the tab join.
    """
    field_cols = [key for key, value in fields.items() if value]
    # NOTE(review): the header columns are derived from the first prediction
    # only, while each row drops its own falsy values. This mirrors the
    # established output format; rows can have fewer columns than the header.
    header = (
        ["Sample"]
        + field_cols
        + [key for key, value in predictions[0].items() if value]
    )

    with open(outfile, "w") as out_fh:
        out_fh.write("\t".join(header))
        for prediction in predictions:
            row = (
                [sample_name]
                + [str(fields[key]) for key in field_cols]
                + [
                    _LINEAGE_SEP.sub(";", str(value))
                    for value in prediction.values()
                    if value
                ]
            )
            # Each row is followed by an extra newline, as in the
            # established output format.
            out_fh.write("\n" + "\t".join(row) + "\n")


# Main
#
# The docstring of main() doubles as the argparse description, so further
# notes live here instead:
#   - <prefix>_rmlst_req.log.json receives the full JSON response.
#   - <prefix>_rmlstd.tsv receives the results table.
#   - Exit status is 1 if the input file is missing/empty or the table cannot
#     be written, and 0 otherwise (including "no match" and HTTP failures).
def main() -> None:
    """
    This script takes as input an assembly .fasta format (gzipped or ungzipped)
    and posts to PubMLST to get the species taxonomy.
    """

    # Set logging.
    logging.basicConfig(
        format="\n"
        + "=" * 55
        + "\n%(asctime)s - %(levelname)s\n"
        + "=" * 55
        + "\n%(message)s\n\n",
        level=logging.DEBUG,
    )

    prog_name = os.path.basename(inspect.stack()[0].filename)

    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )

    required = parser.add_argument_group("required arguments")

    required.add_argument(
        "-fasta",
        dest="fasta",
        default=False,
        required=True,
        help="Absolute UNIX path to the assembly FASTA file\n(gzipped or ungzipped).",
    )
    parser.add_argument(
        "-prefix",
        dest="prefix",
        default="response",
        required=False,
        help="The prefix of the file name that will be created in\nthe current working directory.",
    )
    parser.add_argument(
        "-fkey",
        dest="fkey",
        default="fields",
        required=False,
        help="The key name in the JSON response that contains ST results.",
    )
    parser.add_argument(
        "-tkey",
        dest="tkey",
        default="taxon_prediction",
        required=False,
        help="The key name in the JSON response that contains a list of\ntaxonomy predictions.",
    )

    # Define defaults

    args = parser.parse_args()
    fasta = args.fasta
    fkey = args.fkey
    tkey = args.tkey
    outfile = os.path.join(os.getcwd(), args.prefix + "_rmlstd.tsv")
    logfile = os.path.join(os.getcwd(), args.prefix + "_rmlst_req.log.json")
    uri = RMLST_URI
    sample_name = str(args.prefix)

    # Basic checks

    if not (os.path.exists(fasta) and os.path.getsize(fasta) > 0):
        logging.error(
            f"File\n{os.path.basename(fasta)}\ndoes not exist or the file is empty."
        )
        sys.exit(1)

    response = requests.post(uri, data=_build_payload(_read_fasta(fasta)))

    if response.status_code != requests.codes.ok:
        # Report the failure instead of finishing silently. The exit status
        # stays 0 so that callers tolerating a failed lookup keep working.
        logging.error(
            f"POST request to\n{uri}\nfailed with HTTP status "
            + f"{response.status_code}.\n{response.text}"
        )
        return

    res = response.json()
    with open(logfile, "w") as log_fh:
        json.dump(res, log_fh, indent=4, sort_keys=True)

    # Taxonomy predictions: one dict per prediction, restricted to the
    # columns of interest. A missing/malformed entry means "no match".
    try:
        predictions = [
            {key: prediction[key] for key in TAX_PRED_KEYS}
            for prediction in res[tkey]
        ]
    except (KeyError, AttributeError, TypeError) as e:
        logging.warning(
            "Did not get taxonomy prediction from JSON response. Probably no match?\n"
            + f"KeyError or AttributeError or TypeError:\n{e}"
        )
        sys.exit(0)

    if not predictions:
        # An empty prediction list is also "no match"; stop before the
        # output file is created.
        logging.warning(
            "Did not get taxonomy prediction from JSON response. Probably no match?\n"
            + "The list of taxonomy predictions is empty."
        )
        sys.exit(0)

    # rST / other_designation are optional; fall back to "-" for whichever
    # of them could not be read.
    fields = {}
    try:
        for key in FIELD_KEYS:
            fields[key] = res[fkey][key]
    except (KeyError, AttributeError, TypeError) as e:
        for key in FIELD_KEYS:
            fields.setdefault(key, "-")
        logging.info(
            "Did not get rST or other_designation from JSON response. Will skip.\n"
            + f"KeyError or AttributeError or TypeError:\n{e}"
        )

    try:
        _write_results(outfile, sample_name, fields, predictions)
    except OSError as e:
        logging.error(f"Unable to write final results.\nException: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()