Mercurial > repos > kkonganti > cfsan_bettercallsal
comparison 0.5.0/bin/sourmash_sim_matrix.py @ 1:365849f031fd
"planemo upload"
author | kkonganti |
---|---|
date | Mon, 05 Jun 2023 18:48:51 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:a4b1ee4b68b1 | 1:365849f031fd |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Kranti Konganti | |
4 | |
5 import os | |
6 import argparse | |
7 import inspect | |
8 import logging | |
9 import re | |
10 import pickle | |
11 import pprint | |
12 import json | |
13 from collections import defaultdict | |
14 | |
15 # Set logging. | |
16 logging.basicConfig( | |
17 format="\n" + "=" * 55 + "\n%(asctime)s - %(levelname)s\n" + "=" * 55 + "\n%(message)s\n\n", | |
18 level=logging.DEBUG, | |
19 ) | |
20 | |
21 # Debug print. | |
22 ppp = pprint.PrettyPrinter(width=50, indent=4) | |
23 | |
24 # Multiple inheritence for pretty printing of help text. | |
25 class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter): | |
26 pass | |
27 | |
28 | |
29 def main() -> None: | |
30 """ | |
31 This script works only in the context of `bettercallsal` Nextflow workflow. | |
32 It takes: | |
33 1. A CSV file containing a similarity matrix or dissimilarity matrix where | |
34 the header row contains the names. | |
35 3. It takes indexed NCBI Pathogen metadata in pickle format and converts | |
36 accessions to serotype names in the final distance matrix output. | |
37 """ | |
38 | |
39 prog_name = os.path.basename(inspect.stack()[0].filename) | |
40 | |
41 parser = argparse.ArgumentParser( | |
42 prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses | |
43 ) | |
44 | |
45 required = parser.add_argument_group("required arguments") | |
46 | |
47 required.add_argument( | |
48 "-csv", | |
49 dest="mat", | |
50 default=False, | |
51 required=True, | |
52 help="Absolute UNIX path to .csv file containing similarity\n" | |
53 + "or dissimilarity matrix from `sourmash compare`.", | |
54 ) | |
55 required.add_argument( | |
56 "-pickle", | |
57 dest="acc2sero", | |
58 default=False, | |
59 required=True, | |
60 help="Absolute UNIX Path to the *ACC2SERO.pickle\n" | |
61 + "metadata file. On raven2, these are located at\n" | |
62 + "/hpc/db/bettercallsal/PDGXXXXXXXXXX.XXXXX/", | |
63 ) | |
64 required.add_argument( | |
65 "-labels", | |
66 dest="labels", | |
67 default=False, | |
68 required=True, | |
69 help="Absolute UNIX Path to the *.labels.txt\n" | |
70 + "file from `sourmash compare`. The accessions\n" | |
71 + "will be renanamed to serotype names.", | |
72 ) | |
73 | |
74 args = parser.parse_args() | |
75 csv = args.mat | |
76 labels = args.labels | |
77 pickled_sero = args.acc2sero | |
78 row_names = list() | |
79 distance_mat = defaultdict(defaultdict) | |
80 out_csv = os.path.join(os.getcwd(), "bcs_sourmash_matrix.tblsum.txt") | |
81 out_json = os.path.join(os.getcwd(), "bcs_sourmash_matrix_mqc.json") | |
82 | |
83 # Prepare dictionary to be dumped as JSON. | |
84 distance_mat["id"] = "BETTERCALLSAL_CONTAINMENT_INDEX" | |
85 distance_mat["section_name"] = "Containment index" | |
86 distance_mat["description"] = ( | |
87 "This section shows the containment index between a sample and the genomes" | |
88 + "by running <code>sourmash gather</code> " | |
89 + "using <code>--containment</code> option." | |
90 ) | |
91 distance_mat["plot_type"] = "heatmap" | |
92 distance_mat["pconfig"]["id"] = "bettercallsal_containment_index_heatmap" | |
93 distance_mat["pconfig"]["title"] = "Sourmash: containment index" | |
94 distance_mat["pconfig"]["xTitle"] = "Samples" | |
95 distance_mat["pconfig"]["yTitle"] = "Isolates (Genome assemblies)" | |
96 distance_mat["pconfig"]["ycats_samples"] = "False" | |
97 distance_mat["pconfig"]["xcats_samples"] = "False" | |
98 distance_mat["pconfig"]["square"] = "False" | |
99 distance_mat["pconfig"]["min"] = "0.0" | |
100 distance_mat["pconfig"]["max"] = "1.0" | |
101 distance_mat["data"]["data"] = list() | |
102 | |
103 if pickled_sero and (not os.path.exists(pickled_sero) or not os.path.getsize(pickled_sero)): | |
104 logging.error( | |
105 "The pickle file,\n" + f"{os.path.basename(pickled_sero)} does not exist or is empty!" | |
106 ) | |
107 exit(1) | |
108 else: | |
109 acc2sero = pickle.load(file=open(pickled_sero, "rb")) | |
110 | |
111 if csv and (not os.path.exists(csv) or not os.path.getsize(csv) > 0): | |
112 logging.error("File,\n" + f"{csv}\ndoes not exist " + "or is empty!") | |
113 exit(0) | |
114 | |
115 if labels and (not os.path.exists(labels) or not os.path.getsize(labels) > 0): | |
116 logging.error("File,\n" + f"{labels}\ndoes not exist " + "or is empty!") | |
117 exit(0) | |
118 | |
119 # with open(out_labels, "w") as out_labels_fh: | |
120 with open(labels, "r") as labels_fh: | |
121 for line in labels_fh: | |
122 line = line.strip() | |
123 if line not in acc2sero.keys(): | |
124 row_names.append(line) | |
125 | |
126 labels_fh.close() | |
127 | |
128 with open(out_csv, "w") as csv_out_fh: | |
129 with open(csv, "r") as csv_in_fh: | |
130 header = csv_in_fh.readline().strip().split(",") | |
131 acc_cols = [idx for idx, col in enumerate(header) if col in acc2sero.keys()] | |
132 sample_cols = [idx for idx, col in enumerate(header) if col not in acc2sero.keys()] | |
133 | |
134 col_names = [ | |
135 re.sub(r"serotype=|\,antigen_formula=.*?\|", "", s) | |
136 for s in [acc2sero[col] + f"| | {col}" for col in header if col in acc2sero.keys()] | |
137 ] | |
138 | |
139 distance_mat["xcats"] = col_names | |
140 csv_out_fh.write("\t".join(["Sample"] + col_names) + "\n") | |
141 line_num = 0 | |
142 | |
143 for line in csv_in_fh: | |
144 if line_num not in sample_cols: | |
145 continue | |
146 else: | |
147 | |
148 heatmap_rows = [ | |
149 str(round(float(line.strip().split(",")[col]), 5)) for col in acc_cols | |
150 ] | |
151 # distance_mat["data"]["hmdata"].append(heatmap_rows) | |
152 # distance_mat["data"][row_names[line_num]] = heatmap_rows | |
153 distance_mat["data"]["data"].append(heatmap_rows) | |
154 # distance_mat["data"][row_names[line_num]] = dict( | |
155 # [(col_names[idx], val) for idx, val in enumerate(heatmap_rows)] | |
156 # ) | |
157 csv_out_fh.write("\t".join([row_names[line_num]] + heatmap_rows) + "\n") | |
158 line_num += 1 | |
159 csv_in_fh.close() | |
160 csv_out_fh.close() | |
161 | |
162 distance_mat["ycats"] = row_names | |
163 json.dump(distance_mat, open(out_json, "w")) | |
164 | |
165 | |
166 if __name__ == "__main__": | |
167 main() |