comparison 0.5.0/bin/sourmash_sim_matrix.py @ 1:365849f031fd

"planemo upload"
author kkonganti
date Mon, 05 Jun 2023 18:48:51 -0400
parents
children
comparison
equal deleted inserted replaced
0:a4b1ee4b68b1 1:365849f031fd
1 #!/usr/bin/env python3
2
3 # Kranti Konganti
4
5 import os
6 import argparse
7 import inspect
8 import logging
9 import re
10 import pickle
11 import pprint
12 import json
13 from collections import defaultdict
14
15 # Set logging.
16 logging.basicConfig(
17 format="\n" + "=" * 55 + "\n%(asctime)s - %(levelname)s\n" + "=" * 55 + "\n%(message)s\n\n",
18 level=logging.DEBUG,
19 )
20
21 # Debug print.
22 ppp = pprint.PrettyPrinter(width=50, indent=4)
23
24 # Multiple inheritence for pretty printing of help text.
25 class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
26 pass
27
28
29 def main() -> None:
30 """
31 This script works only in the context of `bettercallsal` Nextflow workflow.
32 It takes:
33 1. A CSV file containing a similarity matrix or dissimilarity matrix where
34 the header row contains the names.
35 3. It takes indexed NCBI Pathogen metadata in pickle format and converts
36 accessions to serotype names in the final distance matrix output.
37 """
38
39 prog_name = os.path.basename(inspect.stack()[0].filename)
40
41 parser = argparse.ArgumentParser(
42 prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
43 )
44
45 required = parser.add_argument_group("required arguments")
46
47 required.add_argument(
48 "-csv",
49 dest="mat",
50 default=False,
51 required=True,
52 help="Absolute UNIX path to .csv file containing similarity\n"
53 + "or dissimilarity matrix from `sourmash compare`.",
54 )
55 required.add_argument(
56 "-pickle",
57 dest="acc2sero",
58 default=False,
59 required=True,
60 help="Absolute UNIX Path to the *ACC2SERO.pickle\n"
61 + "metadata file. On raven2, these are located at\n"
62 + "/hpc/db/bettercallsal/PDGXXXXXXXXXX.XXXXX/",
63 )
64 required.add_argument(
65 "-labels",
66 dest="labels",
67 default=False,
68 required=True,
69 help="Absolute UNIX Path to the *.labels.txt\n"
70 + "file from `sourmash compare`. The accessions\n"
71 + "will be renanamed to serotype names.",
72 )
73
74 args = parser.parse_args()
75 csv = args.mat
76 labels = args.labels
77 pickled_sero = args.acc2sero
78 row_names = list()
79 distance_mat = defaultdict(defaultdict)
80 out_csv = os.path.join(os.getcwd(), "bcs_sourmash_matrix.tblsum.txt")
81 out_json = os.path.join(os.getcwd(), "bcs_sourmash_matrix_mqc.json")
82
83 # Prepare dictionary to be dumped as JSON.
84 distance_mat["id"] = "BETTERCALLSAL_CONTAINMENT_INDEX"
85 distance_mat["section_name"] = "Containment index"
86 distance_mat["description"] = (
87 "This section shows the containment index between a sample and the genomes"
88 + "by running <code>sourmash gather</code> "
89 + "using <code>--containment</code> option."
90 )
91 distance_mat["plot_type"] = "heatmap"
92 distance_mat["pconfig"]["id"] = "bettercallsal_containment_index_heatmap"
93 distance_mat["pconfig"]["title"] = "Sourmash: containment index"
94 distance_mat["pconfig"]["xTitle"] = "Samples"
95 distance_mat["pconfig"]["yTitle"] = "Isolates (Genome assemblies)"
96 distance_mat["pconfig"]["ycats_samples"] = "False"
97 distance_mat["pconfig"]["xcats_samples"] = "False"
98 distance_mat["pconfig"]["square"] = "False"
99 distance_mat["pconfig"]["min"] = "0.0"
100 distance_mat["pconfig"]["max"] = "1.0"
101 distance_mat["data"]["data"] = list()
102
103 if pickled_sero and (not os.path.exists(pickled_sero) or not os.path.getsize(pickled_sero)):
104 logging.error(
105 "The pickle file,\n" + f"{os.path.basename(pickled_sero)} does not exist or is empty!"
106 )
107 exit(1)
108 else:
109 acc2sero = pickle.load(file=open(pickled_sero, "rb"))
110
111 if csv and (not os.path.exists(csv) or not os.path.getsize(csv) > 0):
112 logging.error("File,\n" + f"{csv}\ndoes not exist " + "or is empty!")
113 exit(0)
114
115 if labels and (not os.path.exists(labels) or not os.path.getsize(labels) > 0):
116 logging.error("File,\n" + f"{labels}\ndoes not exist " + "or is empty!")
117 exit(0)
118
119 # with open(out_labels, "w") as out_labels_fh:
120 with open(labels, "r") as labels_fh:
121 for line in labels_fh:
122 line = line.strip()
123 if line not in acc2sero.keys():
124 row_names.append(line)
125
126 labels_fh.close()
127
128 with open(out_csv, "w") as csv_out_fh:
129 with open(csv, "r") as csv_in_fh:
130 header = csv_in_fh.readline().strip().split(",")
131 acc_cols = [idx for idx, col in enumerate(header) if col in acc2sero.keys()]
132 sample_cols = [idx for idx, col in enumerate(header) if col not in acc2sero.keys()]
133
134 col_names = [
135 re.sub(r"serotype=|\,antigen_formula=.*?\|", "", s)
136 for s in [acc2sero[col] + f"| | {col}" for col in header if col in acc2sero.keys()]
137 ]
138
139 distance_mat["xcats"] = col_names
140 csv_out_fh.write("\t".join(["Sample"] + col_names) + "\n")
141 line_num = 0
142
143 for line in csv_in_fh:
144 if line_num not in sample_cols:
145 continue
146 else:
147
148 heatmap_rows = [
149 str(round(float(line.strip().split(",")[col]), 5)) for col in acc_cols
150 ]
151 # distance_mat["data"]["hmdata"].append(heatmap_rows)
152 # distance_mat["data"][row_names[line_num]] = heatmap_rows
153 distance_mat["data"]["data"].append(heatmap_rows)
154 # distance_mat["data"][row_names[line_num]] = dict(
155 # [(col_names[idx], val) for idx, val in enumerate(heatmap_rows)]
156 # )
157 csv_out_fh.write("\t".join([row_names[line_num]] + heatmap_rows) + "\n")
158 line_num += 1
159 csv_in_fh.close()
160 csv_out_fh.close()
161
162 distance_mat["ycats"] = row_names
163 json.dump(distance_mat, open(out_json, "w"))
164
165
166 if __name__ == "__main__":
167 main()