annotate 0.7.0/bin/sourmash_sim_matrix.py @ 21:4ce0e079377d tip

planemo upload
author kkonganti
date Mon, 15 Jul 2024 12:01:00 -0400
parents 0e7a0053e4a6
children
rev   line source
kkonganti@17 1 #!/usr/bin/env python3
kkonganti@17 2
kkonganti@17 3 # Kranti Konganti
kkonganti@17 4
kkonganti@17 5 import os
kkonganti@17 6 import argparse
kkonganti@17 7 import inspect
kkonganti@17 8 import logging
kkonganti@17 9 import re
kkonganti@17 10 import pickle
kkonganti@17 11 import pprint
kkonganti@17 12 import json
kkonganti@17 13 from collections import defaultdict
kkonganti@17 14
# Configure root logging: every record is framed by two 55-character "=" rules,
# with the timestamp and level between them and the message underneath.
logging.basicConfig(
    level=logging.DEBUG,
    format="".join(
        (
            "\n",
            "=" * 55,
            "\n%(asctime)s - %(levelname)s\n",
            "=" * 55,
            "\n%(message)s\n\n",
        )
    ),
)

# Pretty printer kept at module level for ad-hoc debug output.
ppp = pprint.PrettyPrinter(indent=4, width=50)
kkonganti@17 23
class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
    """Combine two argparse help formatters through multiple inheritance.

    The class adds no behavior of its own; it only mixes
    `argparse.RawTextHelpFormatter` with `argparse.ArgumentDefaultsHelpFormatter`
    so that both apply when pretty printing the help text.
    """
kkonganti@17 27
kkonganti@17 28
def main() -> None:
    """
    This script works only in the context of `bettercallsal` Nextflow workflow.
    It takes:
        1. A CSV file containing a similarity matrix or dissimilarity matrix where
            the header row contains the names.
        2. The labels text file that accompanies the CSV matrix.
        3. It takes indexed NCBI Pathogen metadata in pickle format and converts
            accessions to serotype names in the final distance matrix output.

    It writes bcs_sourmash_matrix.tblsum.txt (tab-delimited table) and
    bcs_sourmash_matrix_mqc.json to the current working directory.
    """

    prog_name = os.path.basename(inspect.stack()[0].filename)

    # The docstring above doubles as the command-line description.
    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )

    required = parser.add_argument_group("required arguments")

    required.add_argument(
        "-csv",
        dest="mat",
        default=False,
        required=True,
        help="Absolute UNIX path to .csv file containing similarity\n"
        + "or dissimilarity matrix from `sourmash compare`.",
    )
    required.add_argument(
        "-pickle",
        dest="acc2sero",
        default=False,
        required=True,
        help="Absolute UNIX Path to the *ACC2SERO.pickle\n"
        + "metadata file. On raven2, these are located at\n"
        + "/hpc/db/bettercallsal/PDGXXXXXXXXXX.XXXXX/",
    )
    required.add_argument(
        "-labels",
        dest="labels",
        default=False,
        required=True,
        help="Absolute UNIX Path to the *.labels.txt\n"
        + "file from `sourmash compare`. The accessions\n"
        + "will be renamed to serotype names.",
    )

    args = parser.parse_args()
    csv_path = args.mat
    labels_path = args.labels
    pickle_path = args.acc2sero
    out_csv = os.path.join(os.getcwd(), "bcs_sourmash_matrix.tblsum.txt")
    out_json = os.path.join(os.getcwd(), "bcs_sourmash_matrix_mqc.json")

    # Prepare dictionary to be dumped as JSON. "xcats" and "ycats" are appended
    # further below so that the key order of the emitted JSON stays the same.
    distance_mat = {
        "id": "BETTERCALLSAL_CONTAINMENT_INDEX",
        "section_name": "Containment index",
        "description": (
            "This section shows the containment index between a sample and the genomes "
            + "by running <code>sourmash gather</code> "
            + "using <code>--containment</code> option."
        ),
        "plot_type": "heatmap",
        "pconfig": {
            "id": "bettercallsal_containment_index_heatmap",
            "title": "Sourmash: containment index",
            "xTitle": "Samples",
            "yTitle": "Isolates (Genome assemblies)",
            "ycats_samples": "False",
            "xcats_samples": "False",
            "square": "False",
            "min": "0.0",
            "max": "1.0",
        },
        "data": {"data": []},
    }

    if not pickle_path or not os.path.exists(pickle_path) or not os.path.getsize(pickle_path):
        logging.error(
            "The pickle file,\n" + f"{os.path.basename(pickle_path)} does not exist or is empty!"
        )
        raise SystemExit(1)

    # NOTE(security): unpickling can execute arbitrary code. Only point -pickle
    # at metadata files from a trusted source.
    with open(pickle_path, "rb") as pickle_fh:
        acc2sero = pickle.load(pickle_fh)

    # NOTE(review): a missing or empty matrix / labels file is logged as an error
    # but the exit status is 0, unlike the pickle check above. This is kept as is
    # because it is presumably deliberate (so the calling workflow does not
    # abort when there is nothing to plot) -- confirm.
    for in_path in (csv_path, labels_path):
        if not in_path or not os.path.exists(in_path) or not os.path.getsize(in_path) > 0:
            logging.error("File,\n" + f"{in_path}\ndoes not exist " + "or is empty!")
            raise SystemExit(0)

    # Labels that are not known accessions are the sample names (matrix rows to
    # report). Blank lines are ignored so that they never become row names.
    with open(labels_path, "r") as labels_fh:
        row_names = [
            name
            for name in (line.strip() for line in labels_fh)
            if name and name not in acc2sero
        ]

    with open(csv_path, "r") as csv_in_fh, open(out_csv, "w") as csv_out_fh:
        header = csv_in_fh.readline().strip().split(",")
        # Header positions of accessions (columns to report) and of samples. The
        # sample positions are also used to pick the data rows to report, i.e.
        # data row N is taken to describe the same entry as header column N.
        acc_cols = [idx for idx, col in enumerate(header) if col in acc2sero]
        sample_rows = {idx for idx, col in enumerate(header) if col not in acc2sero}

        # Turn the metadata string plus accession into the column label by
        # stripping the "serotype=" and ",antigen_formula=...|" parts.
        col_names = [
            re.sub(r"serotype=|\,antigen_formula=.*?\|", "", acc2sero[header[idx]] + f"| | {header[idx]}")
            for idx in acc_cols
        ]

        distance_mat["xcats"] = col_names
        csv_out_fh.write("\t".join(["Sample"] + col_names) + "\n")

        # The matrix row position and the position within `row_names` are
        # tracked separately. Sharing one counter that only advanced on sample
        # rows silently dropped every row once a non-sample row was met.
        row_idx = 0
        sample_idx = 0

        for line in csv_in_fh:
            line = line.strip()
            if not line:
                # Ignore blank lines (e.g. a trailing newline at end of file).
                continue

            is_sample_row = row_idx in sample_rows
            row_idx += 1
            if not is_sample_row:
                continue

            if sample_idx >= len(row_names):
                logging.error(
                    "The labels file,\n"
                    + f"{labels_path}\nlists fewer sample names than there are sample rows in\n"
                    + f"{csv_path}"
                )
                raise SystemExit(1)

            fields = line.split(",")
            heatmap_rows = [str(round(float(fields[col]), 5)) for col in acc_cols]
            distance_mat["data"]["data"].append(heatmap_rows)
            csv_out_fh.write("\t".join([row_names[sample_idx]] + heatmap_rows) + "\n")
            sample_idx += 1

    distance_mat["ycats"] = row_names
    with open(out_json, "w") as json_out_fh:
        json.dump(distance_mat, json_out_fh)
kkonganti@17 164
kkonganti@17 165
# Run the command-line entry point only when this file is executed as a script.
if __name__ == "__main__":
    main()