Mercurial > repos > galaxytrakr > hfp_cronology_awsbatch
comparison 0.2.0/bin/rmlst_post.py @ 0:9e8b1c747a6a draft default tip
planemo upload
| author | galaxytrakr |
|---|---|
| date | Fri, 29 May 2026 13:32:17 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:9e8b1c747a6a |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 | |
| 3 # Kranti Konganti | |
| 4 | |
| 5 import argparse | |
| 6 import base64 | |
| 7 import gzip | |
| 8 import inspect | |
| 9 import json | |
| 10 import logging | |
| 11 import os | |
| 12 import pprint | |
| 13 import re | |
| 14 from collections import defaultdict | |
| 15 | |
| 16 import requests | |
| 17 | |
| 18 | |
# Multiple inheritance: combine two argparse help formatters so that help
# text keeps its literal line breaks and each option also shows its default.
class MultiArgFormatClasses(
    argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter
):
    """Help formatter mixing raw-text layout with default-value display."""
| 24 | |
| 25 | |
| 26 # Main | |
def main() -> None:
    """
    This script takes as input an assembly .fasta format (gzipped or ungzipped)
    and posts to PubMLST to get the species taxonomy.
    """
    # NOTE: the docstring above doubles as the argparse description
    # (``description=main.__doc__``), so it is user-facing text.
    #
    # Outputs, both written to the current working directory:
    #   <prefix>_rmlst_req.log.json  - the full JSON response from the service.
    #   <prefix>_rmlstd.tsv          - one tab-separated row per prediction.
    #
    # Exit codes: 0 when the response has no usable taxonomy prediction,
    # 1 on a missing/empty input, a non-OK HTTP status or an empty result.

    # Set logging.
    logging.basicConfig(
        format="\n"
        + "=" * 55
        + "\n%(asctime)s - %(levelname)s\n"
        + "=" * 55
        + "\n%(message)s\n\n",
        level=logging.DEBUG,
    )

    prog_name = os.path.basename(inspect.stack()[0].filename)

    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )

    required = parser.add_argument_group("required arguments")

    required.add_argument(
        "-fasta",
        dest="fasta",
        default=False,
        required=True,
        help="Absolute UNIX path to FASTA file.",
    )
    parser.add_argument(
        "-prefix",
        dest="prefix",
        default="response",
        required=False,
        help="The prefix of the file name that will be created in\nthe current working directory.",
    )
    parser.add_argument(
        "-fkey",
        dest="fkey",
        default="fields",
        required=False,
        help="The key name in the JSON response that contains ST results.",
    )
    parser.add_argument(
        "-tkey",
        dest="tkey",
        default="taxon_prediction",
        required=False,
        help="The key name in the JSON response that contains a list of\ntaxonomy predictions.",
    )

    # Define defaults

    args = parser.parse_args()
    fasta = args.fasta
    fkey = args.fkey
    tkey = args.tkey
    outfile = os.path.join(os.getcwd(), args.prefix + "_rmlstd.tsv")
    logfile = os.path.join(os.getcwd(), args.prefix + "_rmlst_req.log.json")
    field_keys = ["rST", "other_designation"]
    tax_pred_keys = ["rank", "support", "taxon", "taxonomy"]
    uri = "https://rest.pubmlst.org/db/pubmlst_rmlst_seqdef_kiosk/schemes/1/sequence"
    sample_name = str(args.prefix)

    # Basic checks

    if not (os.path.exists(fasta) and os.path.getsize(fasta) > 0):
        logging.error(
            f"File\n{os.path.basename(fasta)}\ndoes not exist or the file is empty."
        )
        raise SystemExit(1)

    # Build the request body with json.dumps rather than string concatenation
    # so the document is always well-formed JSON.
    seqs = _read_fasta(fasta)
    payload = json.dumps(
        {
            "base64": True,
            "details": True,
            "sequence": base64.b64encode(seqs.encode()).decode(),
        }
    )
    response = requests.post(uri, data=payload)

    # Fail loudly on a bad HTTP status instead of falling through to the
    # writer and reporting an unrelated error.
    if response.status_code != requests.codes.ok:
        logging.error(
            f"Request to\n{uri}\nfailed with HTTP status {response.status_code}.\n"
            + f"{response.text}"
        )
        raise SystemExit(1)

    res = response.json()
    # Keep the raw response for troubleshooting; `with` closes the handle.
    with open(logfile, "w") as log_fh:
        json.dump(res, log_fh, indent=4, sort_keys=True)

    # One dict per prediction, restricted to the columns that get reported.
    try:
        predictions = [
            {key: prediction[key] for key in tax_pred_keys}
            for prediction in res[tkey]
        ]
    except (KeyError, AttributeError, TypeError) as e:
        logging.warning(
            "Did not get taxonomy prediction from JSON response. Probably no match?\n"
            + f"KeyError or AttributeError or TypeError:\n{e}"
        )
        raise SystemExit(0)

    # rST / other_designation are optional; anything missing becomes "-".
    fields = {}
    try:
        for key in field_keys:
            fields.setdefault(key, res[fkey][key])
    except (KeyError, AttributeError, TypeError) as e:
        for key in field_keys:
            fields.setdefault(key, "-")
        logging.info(
            "Did not get rST or other_designation from JSON response. Will skip.\n"
            + f"KeyError or AttributeError or TypeError:\n{e}"
        )

    if not predictions:
        logging.error(
            "Unable to write final results.\n"
            + "Exception: the list of taxonomy predictions is empty."
        )
        raise SystemExit(1)

    # Only columns holding a truthy value are reported.
    # NOTE(review): prediction columns in the header come from the first
    # prediction only, while each row filters on its own values, so a later
    # prediction with an empty value would shift columns - confirm whether
    # that can happen in practice.
    shown_fields = {k: v for k, v in fields.items() if v}
    header = ["Sample"] + list(shown_fields) + [k for k, v in predictions[0].items() if v]
    lineage_sep = re.compile(r"\s*\>\s*")

    with open(outfile, "w") as out_fh:
        # Header
        out_fh.write("\t".join(header))
        for prediction in predictions:
            row = (
                [sample_name]
                # str() because JSON values (e.g. a numeric rST) are not
                # necessarily strings and str.join rejects non-strings.
                + [str(v) for v in shown_fields.values()]
                # Lineage levels separated by ">" are re-joined with ";".
                + [lineage_sep.sub(";", str(v)) for v in prediction.values() if v]
            )
            out_fh.write("\n" + "\t".join(row) + "\n")


def _read_fasta(fasta: str) -> str:
    """Return the contents of a gzipped or plain-text FASTA file as text."""
    try:
        # Text mode ("rt") so a gzipped input yields str; reading bytes and
        # calling str() on them would embed the b'...' repr in the payload.
        with gzip.open(fasta, "rt") as fasta_fh:
            return fasta_fh.read()
    except gzip.BadGzipFile:
        # Not gzip-compressed: read it as an ordinary text file.
        with open(fasta, "r") as fasta_fh:
            return fasta_fh.read()


if __name__ == "__main__":
    main()
