annotate 0.2.0/bin/rmlst_post.py @ 17:b571995ddb51

planemo upload
author kkonganti
date Mon, 15 Jul 2024 19:01:29 -0400
parents a5f31c44f8c9
children
rev   line source
kkonganti@11 1 #!/usr/bin/env python3
kkonganti@11 2
kkonganti@11 3 # Kranti Konganti
kkonganti@11 4
kkonganti@11 5 import argparse
kkonganti@11 6 import base64
kkonganti@11 7 import gzip
kkonganti@11 8 import inspect
kkonganti@11 9 import json
kkonganti@11 10 import logging
kkonganti@11 11 import os
kkonganti@11 12 import pprint
kkonganti@11 13 import re
kkonganti@11 14 from collections import defaultdict
kkonganti@11 15
kkonganti@11 16 import requests
kkonganti@11 17
kkonganti@11 18
# Multiple inheritance for pretty printing of help text.
class MultiArgFormatClasses(
    argparse.RawTextHelpFormatter,
    argparse.ArgumentDefaultsHelpFormatter,
):
    """Help formatter combining raw-text layout with default-value display.

    ``RawTextHelpFormatter`` keeps the explicit newlines written in the
    help strings, and ``ArgumentDefaultsHelpFormatter`` appends each
    argument's default value to its help entry.
    """
kkonganti@11 24
kkonganti@11 25
kkonganti@11 26 # Main
def _read_fasta_text(path: str) -> str:
    """Return the contents of a FASTA file as text, gzipped or not.

    The file is first opened as gzip in text mode; if it turns out not to be
    gzip-compressed, it is re-read as a plain text file.
    """
    try:
        # Text mode ("rt") yields str. Reading bytes and calling str() on them
        # would produce the "b'...'" repr instead of the sequence itself.
        with gzip.open(path, "rt") as fasta_fh:
            return fasta_fh.read()
    except gzip.BadGzipFile:
        with open(path, "r") as fasta_fh:
            return fasta_fh.read()


def _format_tsv(sample_name: str, fields: dict, predictions: list) -> str:
    """Render the header and one row per taxonomy prediction as TSV text.

    ``fields`` holds the per-sample columns repeated on every row and
    ``predictions`` is a non-empty list of per-prediction dicts. Falsy
    prediction values are left out, both from the header (judged on the first
    prediction) and from each row (judged on that row).
    """
    # NOTE(review): because falsy values are dropped per row, a row can end up
    # with fewer cells than the header. Kept as-is to preserve the output.
    taxonomy_sep = re.compile(r"\s*\>\s*")
    header = (
        ["Sample"]
        + list(fields)
        + [key for key, value in predictions[0].items() if value]
    )
    chunks = ["\t".join(header)]
    for prediction in predictions:
        row = (
            [sample_name]
            # str() so that non-string values (e.g. a numeric rST) can be joined.
            + [str(value) for value in fields.values()]
            + [
                taxonomy_sep.sub(";", str(value))
                for value in prediction.values()
                if value
            ]
        )
        # NOTE(review): each row is wrapped in newlines, which leaves a blank
        # line between consecutive rows. Kept byte-compatible with the
        # existing output; confirm whether downstream parsers rely on it.
        chunks.append("\n" + "\t".join(row) + "\n")
    return "".join(chunks)


# Main
def main() -> None:
    """
    This script takes as input an assembly .fasta format (gzipped or ungzipped)
    and posts to PubMLST to get the species taxonomy.
    """
    # NOTE: the docstring above is passed to argparse as the program
    # description, so it is user-facing text and is kept verbatim.
    #
    # Flow: parse arguments -> read the FASTA -> POST it base64-encoded to the
    # rMLST endpoint -> save the raw JSON response to
    # <prefix>_rmlst_req.log.json -> write <prefix>_rmlstd.tsv.
    #
    # Exit status: 1 when the input file is missing/empty or when there are no
    # predictions to write; 0 when the response lacks the taxonomy key.

    # Set logging.
    logging.basicConfig(
        format="\n"
        + "=" * 55
        + "\n%(asctime)s - %(levelname)s\n"
        + "=" * 55
        + "\n%(message)s\n\n",
        level=logging.DEBUG,
    )

    prog_name = os.path.basename(inspect.stack()[0].filename)

    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )

    required = parser.add_argument_group("required arguments")

    required.add_argument(
        "-fasta",
        dest="fasta",
        default=False,
        required=True,
        help="Absolute UNIX path to the assembly FASTA file\n(gzipped or ungzipped).",
    )
    parser.add_argument(
        "-prefix",
        dest="prefix",
        default="response",
        required=False,
        help="The prefix of the file name that will be created in\nthe current working directory.",
    )
    parser.add_argument(
        "-fkey",
        dest="fkey",
        default="fields",
        required=False,
        help="The key name in the JSON response that contains ST results.",
    )
    parser.add_argument(
        "-tkey",
        dest="tkey",
        default="taxon_prediction",
        required=False,
        help="The key name in the JSON response that contains a list of\ntaxonomy predictions.",
    )

    # Define defaults

    args = parser.parse_args()
    fasta = args.fasta
    fkey = args.fkey
    tkey = args.tkey
    outfile = os.path.join(os.getcwd(), args.prefix + "_rmlstd.tsv")
    logfile = os.path.join(os.getcwd(), args.prefix + "_rmlst_req.log.json")
    field_keys = ["rST", "other_designation"]
    tax_pred_keys = ["rank", "support", "taxon", "taxonomy"]
    uri = "http://rest.pubmlst.org/db/pubmlst_rmlst_seqdef_kiosk/schemes/1/sequence"
    sample_name = str(args.prefix)

    # Basic checks

    if not (os.path.exists(fasta) and os.path.getsize(fasta) > 0):
        logging.error(
            f"File\n{os.path.basename(fasta)}\ndoes not exist or the file is empty."
        )
        raise SystemExit(1)

    # The request body is a JSON document carrying the base64-encoded FASTA.
    encoded_seqs = base64.b64encode(_read_fasta_text(fasta).encode()).decode()
    payload = json.dumps({"base64": True, "details": True, "sequence": encoded_seqs})
    response = requests.post(uri, data=payload)

    if response.status_code != requests.codes.ok:
        # Previously a failed request was ignored without any message. The
        # exit status is intentionally left unchanged; only the log is new.
        logging.error(
            f"Request to\n{uri}\nfailed with HTTP status {response.status_code}.\n"
            + f"{response.text}"
        )
        return

    res = response.json()
    # Keep the raw response next to the results for troubleshooting.
    with open(logfile, "w") as log_fh:
        json.dump(res, log_fh, indent=4, sort_keys=True)

    try:
        predictions = [
            {key: prediction[key] for key in tax_pred_keys}
            for prediction in res[tkey]
        ]
    except (KeyError, AttributeError, TypeError) as e:
        logging.warning(
            "Did not get taxonomy prediction from JSON response. Probably no match?\n"
            + f"KeyError or AttributeError or TypeError:\n{e}"
        )
        raise SystemExit(0)

    fields = {}
    try:
        for key in field_keys:
            fields[key] = res[fkey][key]
    except (KeyError, AttributeError, TypeError) as e:
        # Fill whatever could not be read with a placeholder.
        for key in field_keys:
            fields.setdefault(key, "-")
        logging.info(
            "Did not get rST or other_designation from JSON response. Will skip.\n"
            + f"KeyError or AttributeError or TypeError:\n{e}"
        )

    if not predictions:
        # Fail before touching the output file so no empty TSV is left behind.
        logging.error(
            "Unable to write final results.\nException: empty taxonomy prediction list"
        )
        raise SystemExit(1)

    # Per-sample columns with an empty value are omitted from header and rows.
    kept_fields = {
        key: value for key, value in fields.items() if value and key != tkey
    }
    with open(outfile, "w") as out_fh:
        out_fh.write(_format_tsv(sample_name, kept_fields, predictions))
kkonganti@11 170
kkonganti@11 171
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()