view 0.2.0/bin/rmlst_post.py @ 18:a72c172df773 tip

planemo upload
author kkonganti
date Mon, 15 Jul 2024 21:40:42 -0400
parents a5f31c44f8c9
children
line wrap: on
line source
#!/usr/bin/env python3

# Kranti Konganti

import argparse
import base64
import gzip
import inspect
import json
import logging
import os
import pprint
import re
from collections import defaultdict

import requests


# Multiple inheritence for pretty printing of help text.
class MultiArgFormatClasses(
    argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter
):
    pass


# Main
def main() -> None:
    """
    This script takes as input an assembly .fasta format (gzipped or ungzipped)
    and posts to PubMLST to get the species taxonomy.
    """

    # Set logging.
    logging.basicConfig(
        format="\n"
        + "=" * 55
        + "\n%(asctime)s - %(levelname)s\n"
        + "=" * 55
        + "\n%(message)s\n\n",
        level=logging.DEBUG,
    )

    # Debug print.
    ppp = pprint.PrettyPrinter(width=55)
    prog_name = os.path.basename(inspect.stack()[0].filename)

    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )

    required = parser.add_argument_group("required arguments")

    required.add_argument(
        "-fasta",
        dest="fasta",
        default=False,
        required=True,
        help="Absolute UNIX path to file no. 1 containing\nnon white space lines.",
    )
    parser.add_argument(
        "-prefix",
        dest="prefix",
        default="response",
        required=False,
        help="The prefix of the file name that will be created in\nthe current working directory.",
    )
    parser.add_argument(
        "-fkey",
        dest="fkey",
        default="fields",
        required=False,
        help="The key name in the JSON response that contains ST results.",
    )
    parser.add_argument(
        "-tkey",
        dest="tkey",
        default="taxon_prediction",
        required=False,
        help="The key name in the JSON response that contains a list of\ntaxonomy predictions.",
    )

    # Define defaults

    args = parser.parse_args()
    fasta = args.fasta
    fkey = args.fkey
    tkey = args.tkey
    outfile = os.path.join(os.getcwd(), args.prefix + "_rmlstd.tsv")
    logfile = os.path.join(os.getcwd(), args.prefix + "_rmlst_req.log.json")
    field_keys = ["rST", "other_designation"]
    tax_pred_keys = ["rank", "support", "taxon", "taxonomy"]
    uri = "http://rest.pubmlst.org/db/pubmlst_rmlst_seqdef_kiosk/schemes/1/sequence"
    # uri = "https://rest.pubmlst.org/db/pubmlst_cronobacter_isolates/loci/atpD/sequence"
    payload = '{"base64":true, "details":true, "sequence":"'
    sample_name = str(args.prefix)
    out = defaultdict(defaultdict)

    # Basic checks

    if not (os.path.exists(fasta) and os.path.getsize(fasta) > 0):
        logging.error(
            f"File\n{os.path.basename(fasta)}\ndoes not exist or the file is empty."
        )
        exit(1)

    try:
        with gzip.open(fasta, "rb") as fasta_fh:
            seqs = fasta_fh.read()
    except gzip.BadGzipFile:
        with open(fasta, "r") as fasta_fh:
            seqs = fasta_fh.read()
    payload += base64.b64encode(str(seqs).encode()).decode() + '"}'
    response = requests.post(uri, data=payload)

    if response.status_code == requests.codes.ok:
        res = response.json()
        json.dump(res, open(logfile, "w"), indent=4, sort_keys=True)

        try:
            for count, prediction in enumerate(res[tkey]):
                out.setdefault(tkey, {}).setdefault(count, {})
                for key in tax_pred_keys:
                    out[tkey][count].setdefault(key, prediction[key])
        except (KeyError, AttributeError, TypeError) as e:
            logging.warning(
                "Did not get taxonomy prediction from JSON response. Probably no match?\n"
                + f"KeyError or AttributeError or TypeError:\n{e}"
            )
            exit(0)

        try:
            for key in field_keys:
                out.setdefault(key, res[fkey][key])
        except (KeyError, AttributeError, TypeError) as e:
            for key in field_keys:
                out.setdefault(key, "-")
            logging.info(
                "Did not get rST or other_designation from JSON response. Will skip.\n"
                + f"KeyError or AttributeError or TypeError:\n{e}"
            )

        try:
            with open(outfile, "w") as out_fh:
                # Header
                out_fh.writelines(
                    "\t".join(
                        ["Sample"]
                        + [k for k, _ in out.items() if out[k] and k != tkey]
                        + [k for k in out[tkey][0].keys() if out[tkey][0][k]]
                    )
                )
                for count in out[tkey].keys():
                    out_fh.writelines(
                        "\n"
                        + "\t".join(
                            [sample_name]
                            + [v for k, v in out.items() if out[k] and k != tkey]
                            + [
                                str(re.sub(r"\s*\>\s*", ";", str(v)))
                                for k, v in out[tkey][count].items()
                                if out[tkey][count][k]
                            ],
                        )
                        + "\n"
                    )
            out_fh.close()
        except (KeyError, AttributeError, TypeError) as e:
            logging.error(f"Unable to write final results.\nException: {e}")
            exit(1)


if __name__ == "__main__":
    main()