Mercurial > repos > kkonganti > cfsan_cronology
comparison 0.1.0/bin/rmlst_post.py @ 0:c8597e9e1a97
"planemo upload"
author | kkonganti |
---|---|
date | Mon, 27 Nov 2023 12:37:44 -0500 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:c8597e9e1a97 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Kranti Konganti | |
4 | |
5 import argparse | |
6 import base64 | |
7 import gzip | |
8 import inspect | |
9 import json | |
10 import logging | |
11 import os | |
12 import pprint | |
13 import re | |
14 from collections import defaultdict | |
15 | |
16 import requests | |
17 | |
18 | |
19 # Multiple inheritance for pretty printing of help text. | |
class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
    """Help formatter combining two stock argparse formatters.

    Keeps explicit line breaks in help strings (``RawTextHelpFormatter``)
    and appends each argument's default value to its help text
    (``ArgumentDefaultsHelpFormatter``). Adds no behaviour of its own.
    """
22 | |
23 | |
24 # Main | |
def main() -> None:
    """
    This script takes as input an assembly .fasta format (gzipped or ungzipped)
    and posts to PubMLST to get the species taxonomy.
    """
    # NOTE: the docstring above is also the argparse description
    # (``main.__doc__``), i.e. user-facing text, so it is kept verbatim.
    #
    # Flow: parse CLI arguments -> read the FASTA -> POST it (base64 encoded)
    # to the rMLST endpoint -> dump the raw JSON response to
    # <prefix>_rmlst_req.log.json -> write <prefix>_rmlstd.tsv.
    # Exits with status 1 when the input is missing/empty, the HTTP status is
    # not OK, no taxonomy prediction can be extracted, or the TSV cannot be
    # assembled.

    # Set logging.
    logging.basicConfig(
        format="\n" + "=" * 55 + "\n%(asctime)s - %(levelname)s\n" + "=" * 55 + "\n%(message)s\n\n",
        level=logging.DEBUG,
    )

    prog_name = os.path.basename(inspect.stack()[0].filename)

    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )

    required = parser.add_argument_group("required arguments")

    required.add_argument(
        "-fasta",
        dest="fasta",
        default=False,
        required=True,
        help="Absolute UNIX path to the assembly FASTA file\n(gzipped or ungzipped).",
    )
    parser.add_argument(
        "-prefix",
        dest="prefix",
        default="response",
        required=False,
        help="The prefix of the file name that will be created in\nthe current working directory.",
    )
    parser.add_argument(
        "-fkey",
        dest="fkey",
        default="fields",
        required=False,
        help="The key name in the JSON response that contains ST results.",
    )
    parser.add_argument(
        "-tkey",
        dest="tkey",
        default="taxon_prediction",
        required=False,
        help="The key name in the JSON response that contains a list of\ntaxonomy predictions.",
    )

    # Define defaults

    args = parser.parse_args()
    fasta = args.fasta
    fkey = args.fkey
    tkey = args.tkey
    outfile = os.path.join(os.getcwd(), args.prefix + "_rmlstd.tsv")
    logfile = os.path.join(os.getcwd(), args.prefix + "_rmlst_req.log.json")
    field_keys = ["rST", "other_designation"]
    tax_pred_keys = ["rank", "support", "taxon", "taxonomy"]
    uri = "http://rest.pubmlst.org/db/pubmlst_rmlst_seqdef_kiosk/schemes/1/sequence"
    sample_name = str(args.prefix)
    # Insertion order matters: it determines the column order of the TSV.
    out = {}

    # Basic checks

    if not (os.path.exists(fasta) and os.path.getsize(fasta) > 0):
        logging.error(f"File\n{os.path.basename(fasta)}\ndoes not exist or the file is empty.")
        raise SystemExit(1)

    # Encode the raw sequence bytes directly. Passing bytes through str()
    # would base64-encode their "b'...'" repr instead of the FASTA content.
    seqs = _read_fasta_bytes(fasta)
    payload = json.dumps(
        {"base64": True, "details": True, "sequence": base64.b64encode(seqs).decode()}
    )
    response = requests.post(uri, data=payload)

    # Fail with a readable message instead of an unbound-variable traceback.
    if response.status_code != requests.codes.ok:
        logging.error(
            f"Request to\n{uri}\nfailed with HTTP status {response.status_code}.\n{response.text}"
        )
        raise SystemExit(1)

    res = response.json()
    with open(logfile, "w") as log_fh:
        json.dump(res, log_fh, indent=4, sort_keys=True)

    # Taxonomy predictions are mandatory: out[tkey] maps a running index to
    # the selected keys of each prediction.
    try:
        for count, prediction in enumerate(res[tkey]):
            out.setdefault(tkey, {})[count] = {key: prediction[key] for key in tax_pred_keys}
    except (KeyError, AttributeError, TypeError) as e:
        logging.error(
            "Did not get taxonomy prediction from JSON response. Highly unusual?\n"
            + f"KeyError or AttributeError or TypeError:\n{e}"
        )
        raise SystemExit(1)

    # rST / other_designation are optional: fall back to "-" for whichever
    # of them could not be read.
    try:
        for key in field_keys:
            out.setdefault(key, res[fkey][key])
    except (KeyError, AttributeError, TypeError) as e:
        for key in field_keys:
            out.setdefault(key, "-")
        logging.info(
            "Did not get rST or other_designation from JSON response. Will skip.\n"
            + f"KeyError or AttributeError or TypeError:\n{e}"
        )

    try:
        _write_results(outfile, sample_name, out, tkey)
    except (KeyError, AttributeError, TypeError) as e:
        logging.error(f"Unable to write final results.\nException: {e}")
        raise SystemExit(1)


def _read_fasta_bytes(fasta: str) -> bytes:
    """Return the content of ``fasta`` as bytes, gunzipping it when it is gzip-compressed."""
    try:
        with gzip.open(fasta, "rb") as fasta_fh:
            return fasta_fh.read()
    except gzip.BadGzipFile:
        # Not a gzip stream: read the file as it is.
        with open(fasta, "rb") as fasta_fh:
            return fasta_fh.read()


def _write_results(outfile: str, sample_name: str, out: dict, tkey: str) -> None:
    """Write the header and one row per taxonomy prediction to ``outfile`` as TSV.

    ``out[tkey]`` must map prediction indices (starting at 0) to dicts; every
    other truthy entry of ``out`` becomes a per-sample column repeated on each
    row. Raises ``KeyError`` when there is no prediction at all.
    """
    with open(outfile, "w") as out_fh:
        predictions = out[tkey]
        # Falsy values are dropped from both the header and the rows.
        field_items = [(k, v) for k, v in out.items() if v and k != tkey]

        # Header: prediction columns are taken from the first prediction.
        header = (
            ["Sample"]
            + [k for k, _ in field_items]
            + [k for k, v in predictions[0].items() if v]
        )
        out_fh.write("\t".join(header))

        for prediction in predictions.values():
            row = (
                [sample_name]
                # JSON scalars may be numbers; join() only accepts strings.
                + [str(v) for _, v in field_items]
                # Lineage separators " > " are turned into ";".
                + [re.sub(r"\s*\>\s*", ";", str(v)) for v in prediction.values() if v]
            )
            # Existing layout kept as-is: each row is preceded and followed
            # by a newline, so rows are separated by a blank line.
            out_fh.write("\n" + "\t".join(row) + "\n")
163 | |
# Script entry point: run the command-line interface only when this file is
# executed directly, not when it is imported as a module.
if __name__ == "__main__":
    main()