Mercurial > repos > kkonganti > cfsan_cronology
diff 0.2.0/bin/rmlst_post.py @ 11:a5f31c44f8c9
planemo upload
author | kkonganti |
---|---|
date | Mon, 15 Jul 2024 16:11:44 -0400 |
parents | |
children |
line wrap: on
line diff
#!/usr/bin/env python3

# Kranti Konganti

import argparse
import base64
import gzip
import inspect
import json
import logging
import os
import pprint
import re
import sys
from collections import defaultdict

import requests

# PubMLST rMLST species identification endpoint.
RMLST_URI = "http://rest.pubmlst.org/db/pubmlst_rmlst_seqdef_kiosk/schemes/1/sequence"

# Keys copied from the JSON response into the output table.
FIELD_KEYS = ("rST", "other_designation")
TAX_PRED_KEYS = ("rank", "support", "taxon", "taxonomy")

# " > " separated values are rewritten as ";" separated values.
_LINEAGE_SEP = re.compile(r"\s*>\s*")


# Multiple inheritance for pretty printing of help text.
class MultiArgFormatClasses(
    argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter
):
    """Help formatter that keeps raw newlines and appends argument defaults."""


def _read_fasta(path: str) -> str:
    """Return the contents of `path` as text, whether gzipped or plain."""
    try:
        # Text mode is essential: reading bytes and calling str() on them
        # would post the bytes repr (b'...') instead of the sequences.
        with gzip.open(path, "rt") as fasta_fh:
            return fasta_fh.read()
    except gzip.BadGzipFile:
        with open(path, "r") as fasta_fh:
            return fasta_fh.read()


def _build_payload(seqs: str) -> str:
    """Return the JSON request body carrying `seqs` base64-encoded."""
    encoded = base64.b64encode(seqs.encode()).decode()
    return json.dumps({"base64": True, "details": True, "sequence": encoded})


def _extract_predictions(res: dict, tkey: str) -> list:
    """
    Return one dict per entry of res[tkey], restricted to TAX_PRED_KEYS.

    Raises KeyError or TypeError when the response lacks the expected shape.
    """
    return [
        {key: prediction[key] for key in TAX_PRED_KEYS} for prediction in res[tkey]
    ]


def _extract_fields(res: dict, fkey: str) -> dict:
    """Return FIELD_KEYS values from res[fkey], using "-" for missing ones."""
    fields = {}
    try:
        for key in FIELD_KEYS:
            fields[key] = res[fkey][key]
    except (KeyError, AttributeError, TypeError) as e:
        # Keep whatever was retrieved before the failure, pad the rest.
        for key in FIELD_KEYS:
            fields.setdefault(key, "-")
        logging.info(
            "Did not get rST or other_designation from JSON response. Will skip.\n"
            + f"KeyError or AttributeError or TypeError:\n{e}"
        )
    return fields


def _format_table(sample_name: str, fields: dict, predictions: list) -> str:
    """
    Return the TSV text: a header line followed by one row per prediction.

    Columns whose value is empty (in `fields`, or in the first prediction)
    are omitted. The column set is decided once so that every row lines up
    with the header. `predictions` must not be empty.
    """
    field_cols = [key for key, value in fields.items() if value]
    tax_cols = [key for key, value in predictions[0].items() if value]
    # Values may be numbers in the JSON response; join() needs strings.
    field_cells = [str(fields[key]) for key in field_cols]

    lines = ["\t".join(["Sample"] + field_cols + tax_cols)]
    for prediction in predictions:
        tax_cells = [_LINEAGE_SEP.sub(";", str(prediction[key])) for key in tax_cols]
        lines.append("\t".join([sample_name] + field_cells + tax_cells))
    return "\n".join(lines) + "\n"


# Main
# NOTE: the docstring of main() doubles as the --help description, so it is
# user-facing text.
def main() -> None:
    """
    This script takes as input an assembly .fasta format (gzipped or ungzipped)
    and posts to PubMLST to get the species taxonomy.
    """

    # Set logging.
    logging.basicConfig(
        format="\n"
        + "=" * 55
        + "\n%(asctime)s - %(levelname)s\n"
        + "=" * 55
        + "\n%(message)s\n\n",
        level=logging.DEBUG,
    )

    prog_name = os.path.basename(inspect.stack()[0].filename)

    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )

    required = parser.add_argument_group("required arguments")

    required.add_argument(
        "-fasta",
        dest="fasta",
        default=False,
        required=True,
        help="Absolute UNIX path to the assembly in FASTA format\n(gzipped or ungzipped).",
    )
    parser.add_argument(
        "-prefix",
        dest="prefix",
        default="response",
        required=False,
        help="The prefix of the file name that will be created in\nthe current working directory.",
    )
    parser.add_argument(
        "-fkey",
        dest="fkey",
        default="fields",
        required=False,
        help="The key name in the JSON response that contains ST results.",
    )
    parser.add_argument(
        "-tkey",
        dest="tkey",
        default="taxon_prediction",
        required=False,
        help="The key name in the JSON response that contains a list of\ntaxonomy predictions.",
    )

    # Define defaults

    args = parser.parse_args()
    fasta = args.fasta
    fkey = args.fkey
    tkey = args.tkey
    outfile = os.path.join(os.getcwd(), args.prefix + "_rmlstd.tsv")
    logfile = os.path.join(os.getcwd(), args.prefix + "_rmlst_req.log.json")
    sample_name = str(args.prefix)

    # Basic checks

    if not (os.path.exists(fasta) and os.path.getsize(fasta) > 0):
        logging.error(
            f"File\n{os.path.basename(fasta)}\ndoes not exist or the file is empty."
        )
        sys.exit(1)

    response = requests.post(RMLST_URI, data=_build_payload(_read_fasta(fasta)))

    if response.status_code != requests.codes.ok:
        # Report the failure instead of finishing silently. The exit status
        # is left as it was (success) so existing callers are unaffected.
        logging.error(
            f"Request to PubMLST failed with HTTP status {response.status_code}.\n"
            + f"{response.text}"
        )
        return

    res = response.json()
    with open(logfile, "w") as log_fh:
        json.dump(res, log_fh, indent=4, sort_keys=True)

    try:
        predictions = _extract_predictions(res, tkey)
    except (KeyError, AttributeError, TypeError) as e:
        logging.warning(
            "Did not get taxonomy prediction from JSON response. Probably no match?\n"
            + f"KeyError or AttributeError or TypeError:\n{e}"
        )
        sys.exit(0)

    if not predictions:
        # An empty prediction list is a "no match" too, not a write failure.
        logging.warning(
            "Did not get taxonomy prediction from JSON response. Probably no match?\n"
            + f"The list under '{tkey}' is empty."
        )
        sys.exit(0)

    fields = _extract_fields(res, fkey)

    try:
        # Build the table first so a failure does not leave an empty file.
        table = _format_table(sample_name, fields, predictions)
        with open(outfile, "w") as out_fh:
            out_fh.write(table)
    except (KeyError, AttributeError, TypeError) as e:
        logging.error(f"Unable to write final results.\nException: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()