annotate 0.1.0/bin/rmlst_post.py @ 5:6e5ceea33843

"planemo upload"
author kkonganti
date Mon, 27 Nov 2023 14:50:43 -0500
parents c8597e9e1a97
children
rev   line source
kkonganti@0 1 #!/usr/bin/env python3
kkonganti@0 2
kkonganti@0 3 # Kranti Konganti
kkonganti@0 4
kkonganti@0 5 import argparse
kkonganti@0 6 import base64
kkonganti@0 7 import gzip
kkonganti@0 8 import inspect
kkonganti@0 9 import json
kkonganti@0 10 import logging
kkonganti@0 11 import os
kkonganti@0 12 import pprint
kkonganti@0 13 import re
kkonganti@0 14 from collections import defaultdict
kkonganti@0 15
kkonganti@0 16 import requests
kkonganti@0 17
kkonganti@0 18
# Multiple inheritance for pretty printing of help text.
class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
    """
    Help formatter combining two argparse formatters: line breaks written
    in help strings are kept as-is (RawTextHelpFormatter) and the default
    value of each argument is appended to its help text
    (ArgumentDefaultsHelpFormatter).
    """
kkonganti@0 22
kkonganti@0 23
kkonganti@0 24 # Main
kkonganti@0 25 def main() -> None:
kkonganti@0 26 """
kkonganti@0 27 This script takes as input an assembly .fasta format (gzipped or ungzipped)
kkonganti@0 28 and posts to PubMLST to get the species taxonomy.
kkonganti@0 29 """
kkonganti@0 30
kkonganti@0 31 # Set logging.
kkonganti@0 32 logging.basicConfig(
kkonganti@0 33 format="\n" + "=" * 55 + "\n%(asctime)s - %(levelname)s\n" + "=" * 55 + "\n%(message)s\n\n",
kkonganti@0 34 level=logging.DEBUG,
kkonganti@0 35 )
kkonganti@0 36
kkonganti@0 37 # Debug print.
kkonganti@0 38 ppp = pprint.PrettyPrinter(width=55)
kkonganti@0 39 prog_name = os.path.basename(inspect.stack()[0].filename)
kkonganti@0 40
kkonganti@0 41 parser = argparse.ArgumentParser(
kkonganti@0 42 prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
kkonganti@0 43 )
kkonganti@0 44
kkonganti@0 45 required = parser.add_argument_group("required arguments")
kkonganti@0 46
kkonganti@0 47 required.add_argument(
kkonganti@0 48 "-fasta",
kkonganti@0 49 dest="fasta",
kkonganti@0 50 default=False,
kkonganti@0 51 required=True,
kkonganti@0 52 help="Absolute UNIX path to file no. 1 containing\nnon white space lines.",
kkonganti@0 53 )
kkonganti@0 54 parser.add_argument(
kkonganti@0 55 "-prefix",
kkonganti@0 56 dest="prefix",
kkonganti@0 57 default="response",
kkonganti@0 58 required=False,
kkonganti@0 59 help="The prefix of the file name that will be created in\nthe current working directory.",
kkonganti@0 60 )
kkonganti@0 61 parser.add_argument(
kkonganti@0 62 "-fkey",
kkonganti@0 63 dest="fkey",
kkonganti@0 64 default="fields",
kkonganti@0 65 required=False,
kkonganti@0 66 help="The key name in the JSON response that contains ST results.",
kkonganti@0 67 )
kkonganti@0 68 parser.add_argument(
kkonganti@0 69 "-tkey",
kkonganti@0 70 dest="tkey",
kkonganti@0 71 default="taxon_prediction",
kkonganti@0 72 required=False,
kkonganti@0 73 help="The key name in the JSON response that contains a list of\ntaxonomy predictions.",
kkonganti@0 74 )
kkonganti@0 75
kkonganti@0 76 # Define defaults
kkonganti@0 77
kkonganti@0 78 args = parser.parse_args()
kkonganti@0 79 fasta = args.fasta
kkonganti@0 80 fkey = args.fkey
kkonganti@0 81 tkey = args.tkey
kkonganti@0 82 outfile = os.path.join(os.getcwd(), args.prefix + "_rmlstd.tsv")
kkonganti@0 83 logfile = os.path.join(os.getcwd(), args.prefix + "_rmlst_req.log.json")
kkonganti@0 84 field_keys = ["rST", "other_designation"]
kkonganti@0 85 tax_pred_keys = ["rank", "support", "taxon", "taxonomy"]
kkonganti@0 86 uri = "http://rest.pubmlst.org/db/pubmlst_rmlst_seqdef_kiosk/schemes/1/sequence"
kkonganti@0 87 # uri = "https://rest.pubmlst.org/db/pubmlst_cronobacter_isolates/loci/atpD/sequence"
kkonganti@0 88 payload = '{"base64":true, "details":true, "sequence":"'
kkonganti@0 89 sample_name = str(args.prefix)
kkonganti@0 90 out = defaultdict(defaultdict)
kkonganti@0 91
kkonganti@0 92 # Basic checks
kkonganti@0 93
kkonganti@0 94 if not (os.path.exists(fasta) and os.path.getsize(fasta) > 0):
kkonganti@0 95 logging.error(f"File\n{os.path.basename(fasta)}\ndoes not exist or the file is empty.")
kkonganti@0 96 exit(1)
kkonganti@0 97
kkonganti@0 98 try:
kkonganti@0 99 with gzip.open(fasta, "rb") as fasta_fh:
kkonganti@0 100 seqs = fasta_fh.read()
kkonganti@0 101 except gzip.BadGzipFile:
kkonganti@0 102 with open(fasta, "r") as fasta_fh:
kkonganti@0 103 seqs = fasta_fh.read()
kkonganti@0 104 payload += base64.b64encode(str(seqs).encode()).decode() + '"}'
kkonganti@0 105 response = requests.post(uri, data=payload)
kkonganti@0 106
kkonganti@0 107 if response.status_code == requests.codes.ok:
kkonganti@0 108 res = response.json()
kkonganti@0 109 json.dump(res, open(logfile, "w"), indent=4, sort_keys=True)
kkonganti@0 110
kkonganti@0 111 try:
kkonganti@0 112 for count, prediction in enumerate(res[tkey]):
kkonganti@0 113 out.setdefault(tkey, {}).setdefault(count, {})
kkonganti@0 114 for key in tax_pred_keys:
kkonganti@0 115 out[tkey][count].setdefault(key, prediction[key])
kkonganti@0 116 except (KeyError, AttributeError, TypeError) as e:
kkonganti@0 117 logging.error(
kkonganti@0 118 "Did not get taxonomy prediction from JSON response. Highly unusual?\n"
kkonganti@0 119 + f"KeyError or AttributeError or TypeError:\n{e}"
kkonganti@0 120 )
kkonganti@0 121 exit(1)
kkonganti@0 122
kkonganti@0 123 try:
kkonganti@0 124 for key in field_keys:
kkonganti@0 125 out.setdefault(key, res[fkey][key])
kkonganti@0 126 except (KeyError, AttributeError, TypeError) as e:
kkonganti@0 127 for key in field_keys:
kkonganti@0 128 out.setdefault(key, "-")
kkonganti@0 129 logging.info(
kkonganti@0 130 "Did not get rST or other_designation from JSON response. Will skip.\n"
kkonganti@0 131 + f"KeyError or AttributeError or TypeError:\n{e}"
kkonganti@0 132 )
kkonganti@0 133
kkonganti@0 134 try:
kkonganti@0 135 with open(outfile, "w") as out_fh:
kkonganti@0 136 # Header
kkonganti@0 137 out_fh.writelines(
kkonganti@0 138 "\t".join(
kkonganti@0 139 ["Sample"]
kkonganti@0 140 + [k for k, _ in out.items() if out[k] and k != tkey]
kkonganti@0 141 + [k for k in out[tkey][0].keys() if out[tkey][0][k]]
kkonganti@0 142 )
kkonganti@0 143 )
kkonganti@0 144 for count in out[tkey].keys():
kkonganti@0 145 out_fh.writelines(
kkonganti@0 146 "\n"
kkonganti@0 147 + "\t".join(
kkonganti@0 148 [sample_name]
kkonganti@0 149 + [v for k, v in out.items() if out[k] and k != tkey]
kkonganti@0 150 + [
kkonganti@0 151 str(re.sub(r"\s*\>\s*", ";", str(v)))
kkonganti@0 152 for k, v in out[tkey][count].items()
kkonganti@0 153 if out[tkey][count][k]
kkonganti@0 154 ],
kkonganti@0 155 )
kkonganti@0 156 + "\n"
kkonganti@0 157 )
kkonganti@0 158 out_fh.close()
kkonganti@0 159 except (KeyError, AttributeError, TypeError) as e:
kkonganti@0 160 logging.error(f"Unable to write final results.\nException: {e}")
kkonganti@0 161 exit(1)
kkonganti@0 162
kkonganti@0 163
# Run the command line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()