comparison 0.1.0/bin/rmlst_post.py @ 0:c8597e9e1a97

"planemo upload"
author kkonganti
date Mon, 27 Nov 2023 12:37:44 -0500
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:c8597e9e1a97
1 #!/usr/bin/env python3
2
3 # Kranti Konganti
4
5 import argparse
6 import base64
7 import gzip
8 import inspect
9 import json
10 import logging
11 import os
12 import pprint
13 import re
14 from collections import defaultdict
15
16 import requests
17
18
# Multiple inheritance so that help text keeps its explicit line breaks
# and also shows each option's default value.
class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
    """Help formatter combining raw-text layout with default-value display."""
22
23
24 # Main
25 def main() -> None:
26 """
27 This script takes as input an assembly .fasta format (gzipped or ungzipped)
28 and posts to PubMLST to get the species taxonomy.
29 """
30
31 # Set logging.
32 logging.basicConfig(
33 format="\n" + "=" * 55 + "\n%(asctime)s - %(levelname)s\n" + "=" * 55 + "\n%(message)s\n\n",
34 level=logging.DEBUG,
35 )
36
37 # Debug print.
38 ppp = pprint.PrettyPrinter(width=55)
39 prog_name = os.path.basename(inspect.stack()[0].filename)
40
41 parser = argparse.ArgumentParser(
42 prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
43 )
44
45 required = parser.add_argument_group("required arguments")
46
47 required.add_argument(
48 "-fasta",
49 dest="fasta",
50 default=False,
51 required=True,
52 help="Absolute UNIX path to file no. 1 containing\nnon white space lines.",
53 )
54 parser.add_argument(
55 "-prefix",
56 dest="prefix",
57 default="response",
58 required=False,
59 help="The prefix of the file name that will be created in\nthe current working directory.",
60 )
61 parser.add_argument(
62 "-fkey",
63 dest="fkey",
64 default="fields",
65 required=False,
66 help="The key name in the JSON response that contains ST results.",
67 )
68 parser.add_argument(
69 "-tkey",
70 dest="tkey",
71 default="taxon_prediction",
72 required=False,
73 help="The key name in the JSON response that contains a list of\ntaxonomy predictions.",
74 )
75
76 # Define defaults
77
78 args = parser.parse_args()
79 fasta = args.fasta
80 fkey = args.fkey
81 tkey = args.tkey
82 outfile = os.path.join(os.getcwd(), args.prefix + "_rmlstd.tsv")
83 logfile = os.path.join(os.getcwd(), args.prefix + "_rmlst_req.log.json")
84 field_keys = ["rST", "other_designation"]
85 tax_pred_keys = ["rank", "support", "taxon", "taxonomy"]
86 uri = "http://rest.pubmlst.org/db/pubmlst_rmlst_seqdef_kiosk/schemes/1/sequence"
87 # uri = "https://rest.pubmlst.org/db/pubmlst_cronobacter_isolates/loci/atpD/sequence"
88 payload = '{"base64":true, "details":true, "sequence":"'
89 sample_name = str(args.prefix)
90 out = defaultdict(defaultdict)
91
92 # Basic checks
93
94 if not (os.path.exists(fasta) and os.path.getsize(fasta) > 0):
95 logging.error(f"File\n{os.path.basename(fasta)}\ndoes not exist or the file is empty.")
96 exit(1)
97
98 try:
99 with gzip.open(fasta, "rb") as fasta_fh:
100 seqs = fasta_fh.read()
101 except gzip.BadGzipFile:
102 with open(fasta, "r") as fasta_fh:
103 seqs = fasta_fh.read()
104 payload += base64.b64encode(str(seqs).encode()).decode() + '"}'
105 response = requests.post(uri, data=payload)
106
107 if response.status_code == requests.codes.ok:
108 res = response.json()
109 json.dump(res, open(logfile, "w"), indent=4, sort_keys=True)
110
111 try:
112 for count, prediction in enumerate(res[tkey]):
113 out.setdefault(tkey, {}).setdefault(count, {})
114 for key in tax_pred_keys:
115 out[tkey][count].setdefault(key, prediction[key])
116 except (KeyError, AttributeError, TypeError) as e:
117 logging.error(
118 "Did not get taxonomy prediction from JSON response. Highly unusual?\n"
119 + f"KeyError or AttributeError or TypeError:\n{e}"
120 )
121 exit(1)
122
123 try:
124 for key in field_keys:
125 out.setdefault(key, res[fkey][key])
126 except (KeyError, AttributeError, TypeError) as e:
127 for key in field_keys:
128 out.setdefault(key, "-")
129 logging.info(
130 "Did not get rST or other_designation from JSON response. Will skip.\n"
131 + f"KeyError or AttributeError or TypeError:\n{e}"
132 )
133
134 try:
135 with open(outfile, "w") as out_fh:
136 # Header
137 out_fh.writelines(
138 "\t".join(
139 ["Sample"]
140 + [k for k, _ in out.items() if out[k] and k != tkey]
141 + [k for k in out[tkey][0].keys() if out[tkey][0][k]]
142 )
143 )
144 for count in out[tkey].keys():
145 out_fh.writelines(
146 "\n"
147 + "\t".join(
148 [sample_name]
149 + [v for k, v in out.items() if out[k] and k != tkey]
150 + [
151 str(re.sub(r"\s*\>\s*", ";", str(v)))
152 for k, v in out[tkey][count].items()
153 if out[tkey][count][k]
154 ],
155 )
156 + "\n"
157 )
158 out_fh.close()
159 except (KeyError, AttributeError, TypeError) as e:
160 logging.error(f"Unable to write final results.\nException: {e}")
161 exit(1)
162
163
# Run the workflow only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()