Mercurial > repos > kkonganti > cfsan_bettercallsal
diff 0.5.0/bin/sourmash_sim_matrix.py @ 1:365849f031fd
"planemo upload"
author | kkonganti |
---|---|
date | Mon, 05 Jun 2023 18:48:51 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/0.5.0/bin/sourmash_sim_matrix.py Mon Jun 05 18:48:51 2023 -0400 @@ -0,0 +1,167 @@ +#!/usr/bin/env python3 + +# Kranti Konganti + +import os +import argparse +import inspect +import logging +import re +import pickle +import pprint +import json +from collections import defaultdict + +# Set logging. +logging.basicConfig( + format="\n" + "=" * 55 + "\n%(asctime)s - %(levelname)s\n" + "=" * 55 + "\n%(message)s\n\n", + level=logging.DEBUG, +) + +# Debug print. +ppp = pprint.PrettyPrinter(width=50, indent=4) + +# Multiple inheritence for pretty printing of help text. +class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter): + pass + + +def main() -> None: + """ + This script works only in the context of `bettercallsal` Nextflow workflow. + It takes: + 1. A CSV file containing a similarity matrix or dissimilarity matrix where + the header row contains the names. + 3. It takes indexed NCBI Pathogen metadata in pickle format and converts + accessions to serotype names in the final distance matrix output. + """ + + prog_name = os.path.basename(inspect.stack()[0].filename) + + parser = argparse.ArgumentParser( + prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses + ) + + required = parser.add_argument_group("required arguments") + + required.add_argument( + "-csv", + dest="mat", + default=False, + required=True, + help="Absolute UNIX path to .csv file containing similarity\n" + + "or dissimilarity matrix from `sourmash compare`.", + ) + required.add_argument( + "-pickle", + dest="acc2sero", + default=False, + required=True, + help="Absolute UNIX Path to the *ACC2SERO.pickle\n" + + "metadata file. On raven2, these are located at\n" + + "/hpc/db/bettercallsal/PDGXXXXXXXXXX.XXXXX/", + ) + required.add_argument( + "-labels", + dest="labels", + default=False, + required=True, + help="Absolute UNIX Path to the *.labels.txt\n" + + "file from `sourmash compare`. The accessions\n" + + "will be renanamed to serotype names.", + ) + + args = parser.parse_args() + csv = args.mat + labels = args.labels + pickled_sero = args.acc2sero + row_names = list() + distance_mat = defaultdict(defaultdict) + out_csv = os.path.join(os.getcwd(), "bcs_sourmash_matrix.tblsum.txt") + out_json = os.path.join(os.getcwd(), "bcs_sourmash_matrix_mqc.json") + + # Prepare dictionary to be dumped as JSON. + distance_mat["id"] = "BETTERCALLSAL_CONTAINMENT_INDEX" + distance_mat["section_name"] = "Containment index" + distance_mat["description"] = ( + "This section shows the containment index between a sample and the genomes" + + "by running <code>sourmash gather</code> " + + "using <code>--containment</code> option." + ) + distance_mat["plot_type"] = "heatmap" + distance_mat["pconfig"]["id"] = "bettercallsal_containment_index_heatmap" + distance_mat["pconfig"]["title"] = "Sourmash: containment index" + distance_mat["pconfig"]["xTitle"] = "Samples" + distance_mat["pconfig"]["yTitle"] = "Isolates (Genome assemblies)" + distance_mat["pconfig"]["ycats_samples"] = "False" + distance_mat["pconfig"]["xcats_samples"] = "False" + distance_mat["pconfig"]["square"] = "False" + distance_mat["pconfig"]["min"] = "0.0" + distance_mat["pconfig"]["max"] = "1.0" + distance_mat["data"]["data"] = list() + + if pickled_sero and (not os.path.exists(pickled_sero) or not os.path.getsize(pickled_sero)): + logging.error( + "The pickle file,\n" + f"{os.path.basename(pickled_sero)} does not exist or is empty!" + ) + exit(1) + else: + acc2sero = pickle.load(file=open(pickled_sero, "rb")) + + if csv and (not os.path.exists(csv) or not os.path.getsize(csv) > 0): + logging.error("File,\n" + f"{csv}\ndoes not exist " + "or is empty!") + exit(0) + + if labels and (not os.path.exists(labels) or not os.path.getsize(labels) > 0): + logging.error("File,\n" + f"{labels}\ndoes not exist " + "or is empty!") + exit(0) + + # with open(out_labels, "w") as out_labels_fh: + with open(labels, "r") as labels_fh: + for line in labels_fh: + line = line.strip() + if line not in acc2sero.keys(): + row_names.append(line) + + labels_fh.close() + + with open(out_csv, "w") as csv_out_fh: + with open(csv, "r") as csv_in_fh: + header = csv_in_fh.readline().strip().split(",") + acc_cols = [idx for idx, col in enumerate(header) if col in acc2sero.keys()] + sample_cols = [idx for idx, col in enumerate(header) if col not in acc2sero.keys()] + + col_names = [ + re.sub(r"serotype=|\,antigen_formula=.*?\|", "", s) + for s in [acc2sero[col] + f"| | {col}" for col in header if col in acc2sero.keys()] + ] + + distance_mat["xcats"] = col_names + csv_out_fh.write("\t".join(["Sample"] + col_names) + "\n") + line_num = 0 + + for line in csv_in_fh: + if line_num not in sample_cols: + continue + else: + + heatmap_rows = [ + str(round(float(line.strip().split(",")[col]), 5)) for col in acc_cols + ] + # distance_mat["data"]["hmdata"].append(heatmap_rows) + # distance_mat["data"][row_names[line_num]] = heatmap_rows + distance_mat["data"]["data"].append(heatmap_rows) + # distance_mat["data"][row_names[line_num]] = dict( + # [(col_names[idx], val) for idx, val in enumerate(heatmap_rows)] + # ) + csv_out_fh.write("\t".join([row_names[line_num]] + heatmap_rows) + "\n") + line_num += 1 + csv_in_fh.close() + csv_out_fh.close() + + distance_mat["ycats"] = row_names + json.dump(distance_mat, open(out_json, "w")) + + +if __name__ == "__main__": + main()