#!/usr/bin/env python3

# Kranti Konganti

import os
import argparse
import inspect
import logging
import re
import pickle
import pprint
import json
from collections import defaultdict

# Set logging.
logging.basicConfig(
    format="\n" + "=" * 55 + "\n%(asctime)s - %(levelname)s\n" + "=" * 55 + "\n%(message)s\n\n",
    level=logging.DEBUG,
)

# Debug print.
ppp = pprint.PrettyPrinter(width=50, indent=4)

# Multiple inheritance for pretty printing of help text.
class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
    pass


def main() -> None:
    """
    This script works only in the context of `bettercallsal` Nextflow workflow.
    It takes:
        1. A CSV file containing a similarity matrix or dissimilarity matrix where
            the header row contains the names.
        3. It takes indexed NCBI Pathogen metadata in pickle format and converts
            accessions to serotype names in the final distance matrix output.
    """

    prog_name = os.path.basename(inspect.stack()[0].filename)

    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )

    required = parser.add_argument_group("required arguments")

    required.add_argument(
        "-csv",
        dest="mat",
        default=False,
        required=True,
        help="Absolute UNIX path to .csv file containing similarity\n"
        + "or dissimilarity matrix from `sourmash compare`.",
    )
    required.add_argument(
        "-pickle",
        dest="acc2sero",
        default=False,
        required=True,
        help="Absolute UNIX Path to the *ACC2SERO.pickle\n"
        + "metadata file. On raven2, these are located at\n"
        + "/hpc/db/bettercallsal/PDGXXXXXXXXXX.XXXXX/",
    )
    required.add_argument(
        "-labels",
        dest="labels",
        default=False,
        required=True,
        help="Absolute UNIX Path to the *.labels.txt\n"
        + "file from `sourmash compare`. The accessions\n"
        + "will be renanamed to serotype names.",
    )

    args = parser.parse_args()
    csv = args.mat
    labels = args.labels
    pickled_sero = args.acc2sero
    row_names = list()
    distance_mat = defaultdict(defaultdict)
    out_csv = os.path.join(os.getcwd(), "bcs_sourmash_matrix.tblsum.txt")
    out_json = os.path.join(os.getcwd(), "bcs_sourmash_matrix_mqc.json")
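    # NOTE: the "*_mqc.json" suffix follows the MultiQC custom-content naming
    # convention so the heatmap is picked up automatically by MultiQC; the
    # ".tblsum.txt" file is the same filtered matrix as a tab-separated table.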

    # Prepare dictionary to be dumped as JSON.
    distance_mat["id"] = "BETTERCALLSAL_CONTAINMENT_INDEX"
    distance_mat["section_name"] = "Containment index"
    distance_mat["description"] = (
        "This section shows the containment index between a sample and the genomes"
        + "by running <code>sourmash gather</code> "
        + "using <code>--containment</code> option."
    )
    distance_mat["plot_type"] = "heatmap"
    distance_mat["pconfig"]["id"] = "bettercallsal_containment_index_heatmap"
    distance_mat["pconfig"]["title"] = "Sourmash: containment index"
    distance_mat["pconfig"]["xTitle"] = "Samples"
    distance_mat["pconfig"]["yTitle"] = "Isolates (Genome assemblies)"
    distance_mat["pconfig"]["ycats_samples"] = "False"
    distance_mat["pconfig"]["xcats_samples"] = "False"
    distance_mat["pconfig"]["square"] = "False"
    distance_mat["pconfig"]["min"] = "0.0"
    distance_mat["pconfig"]["max"] = "1.0"
    distance_mat["data"]["data"] = list()

    if pickled_sero and (not os.path.exists(pickled_sero) or not os.path.getsize(pickled_sero)):
        logging.error(
            "The pickle file,\n" + f"{os.path.basename(pickled_sero)} does not exist or is empty!"
        )
        exit(1)
    else:
        with open(pickled_sero, "rb") as pickle_fh:
            acc2sero = pickle.load(pickle_fh)

    if csv and (not os.path.exists(csv) or not os.path.getsize(csv) > 0):
        logging.error("File,\n" + f"{csv}\ndoes not exist " + "or is empty!")
        exit(0)

    if labels and (not os.path.exists(labels) or not os.path.getsize(labels) > 0):
        logging.error("File,\n" + f"{labels}\ndoes not exist " + "or is empty!")
        exit(0)

    # Collect row (i.e. sample) names: labels that are not reference accessions.
    with open(labels, "r") as labels_fh:
        for line in labels_fh:
            line = line.strip()
            if line not in acc2sero:
                row_names.append(line)

    with open(out_csv, "w") as csv_out_fh:
        with open(csv, "r") as csv_in_fh:
            header = csv_in_fh.readline().strip().split(",")
            acc_cols = [idx for idx, col in enumerate(header) if col in acc2sero]
            sample_cols = [idx for idx, col in enumerate(header) if col not in acc2sero]

            col_names = [
                re.sub(r"serotype=|,antigen_formula=.*?\|", "", s)
                for s in [acc2sero[col] + f"| | {col}" for col in header if col in acc2sero]
            ]
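
            # Column names are built from the (assumed) serotype metadata strings in the
            # pickle: the "serotype=" prefix and the ",antigen_formula=...|" portion are
            # stripped so that a readable serotype label plus the accession remains.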

            distance_mat["xcats"] = col_names
            csv_out_fh.write("\t".join(["Sample"] + col_names) + "\n")
            line_num = 0    # row index within the full matrix
            sample_num = 0  # index into row_names (sample rows only)

            for line in csv_in_fh:
                # Keep only rows that correspond to samples and, within those rows,
                # only the columns that correspond to reference accessions.
                if line_num in sample_cols:
                    heatmap_row = [
                        str(round(float(line.strip().split(",")[col]), 5)) for col in acc_cols
                    ]
                    distance_mat["data"]["data"].append(heatmap_row)
                    csv_out_fh.write("\t".join([row_names[sample_num]] + heatmap_row) + "\n")
                    sample_num += 1
                line_num += 1

    distance_mat["ycats"] = row_names
    with open(out_json, "w") as json_fh:
        json.dump(distance_mat, json_fh)


if __name__ == "__main__":
    main()