#!/usr/bin/env python3

# Kranti Konganti

import argparse
import base64
import gzip
import inspect
import json
import logging
import os
import pprint
import re
import sys
from collections import defaultdict

import requests

# PubMLST rMLST species-identification endpoint that the assembly is posted to.
RMLST_URI = "http://rest.pubmlst.org/db/pubmlst_rmlst_seqdef_kiosk/schemes/1/sequence"
# uri = "https://rest.pubmlst.org/db/pubmlst_cronobacter_isolates/loci/atpD/sequence"

# Keys copied from the "fields" section of the JSON response.
FIELD_KEYS = ("rST", "other_designation")
# Keys copied from every entry of the taxonomy prediction list.
TAX_PRED_KEYS = ("rank", "support", "taxon", "taxonomy")


# Multiple inheritance for pretty printing of help text.
class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
    pass


def _read_fasta(fasta: str) -> str:
    """Return the contents of *fasta* as text, whether gzipped or plain."""
    try:
        # Text mode matters: reading bytes and calling str() on them would
        # yield the "b'...'" repr instead of the sequence itself.
        with gzip.open(fasta, "rt") as fasta_fh:
            return fasta_fh.read()
    except gzip.BadGzipFile:
        with open(fasta, "r") as fasta_fh:
            return fasta_fh.read()


def _build_payload(seqs: str) -> str:
    """Return the JSON request body carrying *seqs* base64-encoded."""
    encoded = base64.b64encode(seqs.encode()).decode()
    return json.dumps({"base64": True, "details": True, "sequence": encoded})


def _collect_predictions(res: dict, tkey: str) -> list:
    """Return one dict of TAX_PRED_KEYS per prediction in ``res[tkey]``.

    Logs an error and exits with status 1 when the predictions are missing,
    malformed or empty.
    """
    try:
        predictions = [
            {key: prediction[key] for key in TAX_PRED_KEYS} for prediction in res[tkey]
        ]
    except (KeyError, AttributeError, TypeError) as e:
        logging.error(
            "Did not get taxonomy prediction from JSON response. Highly unusual?\n"
            + f"KeyError or AttributeError or TypeError:\n{e}"
        )
        sys.exit(1)

    if not predictions:
        logging.error(
            "Did not get taxonomy prediction from JSON response. Highly unusual?\n"
            + f"The list under '{tkey}' is empty."
        )
        sys.exit(1)

    return predictions


def _collect_fields(res: dict, fkey: str) -> dict:
    """Return FIELD_KEYS from ``res[fkey]``, using "-" for what is missing."""
    fields = {}
    try:
        for key in FIELD_KEYS:
            fields[key] = res[fkey][key]
    except (KeyError, AttributeError, TypeError) as e:
        # Values fetched before the failure are kept; the rest become "-".
        for key in FIELD_KEYS:
            fields.setdefault(key, "-")
        logging.info(
            "Did not get rST or other_designation from JSON response. Will skip.\n"
            + f"KeyError or AttributeError or TypeError:\n{e}"
        )
    return fields


def _format_tax_cell(value) -> str:
    """Return *value* as a TSV cell with ' > ' lineage separators turned into ';'."""
    if value is None or value == "":
        return "-"
    return re.sub(r"\s*\>\s*", ";", str(value))


def _write_tsv(outfile: str, sample_name: str, fields: dict, predictions: list) -> None:
    """Write a header line plus one tab-separated line per prediction."""
    # Columns with an empty value are dropped from both header and rows.
    field_cols = [key for key, value in fields.items() if value]
    # The first prediction decides the taxonomy columns so that every row
    # lines up with the header.
    tax_cols = [key for key, value in predictions[0].items() if value]
    # Field values may be numeric, so stringify them before joining.
    field_cells = [str(fields[key]) for key in field_cols]

    with open(outfile, "w") as out_fh:
        out_fh.write("\t".join(["Sample"] + field_cols + tax_cols) + "\n")
        for prediction in predictions:
            tax_cells = [_format_tax_cell(prediction[key]) for key in tax_cols]
            out_fh.write("\t".join([sample_name] + field_cells + tax_cells) + "\n")


# Main
def main() -> None:
    """
    This script takes as input an assembly .fasta format (gzipped or ungzipped)
    and posts to PubMLST to get the species taxonomy.
    """
    # NOTE: the docstring above is also the --help description (main.__doc__),
    # so implementation notes are kept in comments instead.
    #
    # Side effects: writes <prefix>_rmlst_req.log.json (the raw JSON response)
    # and <prefix>_rmlstd.tsv (the result table) in the current working
    # directory. Exits with status 1 on any failure.

    # Set logging.
    logging.basicConfig(
        format="\n" + "=" * 55 + "\n%(asctime)s - %(levelname)s\n" + "=" * 55 + "\n%(message)s\n\n",
        level=logging.DEBUG,
    )

    prog_name = os.path.basename(inspect.stack()[0].filename)

    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )

    required = parser.add_argument_group("required arguments")

    required.add_argument(
        "-fasta",
        dest="fasta",
        default=False,
        required=True,
        help="Absolute UNIX path to the assembly FASTA file\n(gzipped or ungzipped).",
    )
    parser.add_argument(
        "-prefix",
        dest="prefix",
        default="response",
        required=False,
        help="The prefix of the file name that will be created in\nthe current working directory.",
    )
    parser.add_argument(
        "-fkey",
        dest="fkey",
        default="fields",
        required=False,
        help="The key name in the JSON response that contains ST results.",
    )
    parser.add_argument(
        "-tkey",
        dest="tkey",
        default="taxon_prediction",
        required=False,
        help="The key name in the JSON response that contains a list of\ntaxonomy predictions.",
    )

    # Define defaults

    args = parser.parse_args()
    fasta = args.fasta
    fkey = args.fkey
    tkey = args.tkey
    outfile = os.path.join(os.getcwd(), args.prefix + "_rmlstd.tsv")
    logfile = os.path.join(os.getcwd(), args.prefix + "_rmlst_req.log.json")
    sample_name = str(args.prefix)

    # Basic checks

    if not (os.path.exists(fasta) and os.path.getsize(fasta) > 0):
        logging.error(f"File\n{os.path.basename(fasta)}\ndoes not exist or the file is empty.")
        sys.exit(1)

    payload = _build_payload(_read_fasta(fasta))

    # NOTE(review): no timeout is set, so a stalled connection blocks forever.
    response = requests.post(RMLST_URI, data=payload)

    if response.status_code != requests.codes.ok:
        logging.error(
            "Did not get an OK response from PubMLST.\n"
            + f"Status code: {response.status_code}\n{response.text}"
        )
        sys.exit(1)

    try:
        res = response.json()
    except ValueError as e:
        logging.error(f"Response from PubMLST is not valid JSON.\nException: {e}")
        sys.exit(1)

    # Keep the raw response next to the results for troubleshooting.
    with open(logfile, "w") as log_fh:
        json.dump(res, log_fh, indent=4, sort_keys=True)

    predictions = _collect_predictions(res, tkey)
    fields = _collect_fields(res, fkey)

    try:
        _write_tsv(outfile, sample_name, fields, predictions)
    except (KeyError, AttributeError, TypeError, OSError) as e:
        logging.error(f"Unable to write final results.\nException: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()