Mercurial > repos > galaxytrakr > hfp_bettercallsal_konda
comparison 1.0.0/bin/sourmash_sim_matrix.py @ 0:0a8dda29956e draft default tip
planemo upload
| author | galaxytrakr |
|---|---|
| date | Thu, 28 May 2026 20:41:10 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:0a8dda29956e |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 | |
| 3 # Kranti Konganti | |
| 4 | |
| 5 import os | |
| 6 import argparse | |
| 7 import inspect | |
| 8 import logging | |
| 9 import re | |
| 10 import pickle | |
| 11 import pprint | |
| 12 import json | |
| 13 from collections import defaultdict | |
| 14 | |
| 15 # Set logging. | |
| 16 logging.basicConfig( | |
| 17 format="\n" + "=" * 55 + "\n%(asctime)s - %(levelname)s\n" + "=" * 55 + "\n%(message)s\n\n", | |
| 18 level=logging.DEBUG, | |
| 19 ) | |
| 20 | |
| 21 # Debug print. | |
| 22 ppp = pprint.PrettyPrinter(width=50, indent=4) | |
| 23 | |
| 24 # Multiple inheritence for pretty printing of help text. | |
| 25 class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter): | |
| 26 pass | |
| 27 | |
| 28 | |
| 29 def main() -> None: | |
| 30 """ | |
| 31 This script works only in the context of `bettercallsal` Nextflow workflow. | |
| 32 It takes: | |
| 33 1. A CSV file containing a similarity matrix or dissimilarity matrix where | |
| 34 the header row contains the names. | |
| 35 3. It takes indexed NCBI Pathogen metadata in pickle format and converts | |
| 36 accessions to serotype names in the final distance matrix output. | |
| 37 """ | |
| 38 | |
| 39 prog_name = os.path.basename(inspect.stack()[0].filename) | |
| 40 | |
| 41 parser = argparse.ArgumentParser( | |
| 42 prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses | |
| 43 ) | |
| 44 | |
| 45 required = parser.add_argument_group("required arguments") | |
| 46 | |
| 47 required.add_argument( | |
| 48 "-csv", | |
| 49 dest="mat", | |
| 50 default=False, | |
| 51 required=True, | |
| 52 help="Absolute UNIX path to .csv file containing similarity\n" | |
| 53 + "or dissimilarity matrix from `sourmash compare`.", | |
| 54 ) | |
| 55 required.add_argument( | |
| 56 "-pickle", | |
| 57 dest="acc2sero", | |
| 58 default=False, | |
| 59 required=True, | |
| 60 help="Absolute UNIX Path to the *ACC2SERO.pickle\n" | |
| 61 + "metadata file. On raven2, these are located at\n" | |
| 62 + "/hpc/db/bettercallsal/PDGXXXXXXXXXX.XXXXX/", | |
| 63 ) | |
| 64 required.add_argument( | |
| 65 "-labels", | |
| 66 dest="labels", | |
| 67 default=False, | |
| 68 required=True, | |
| 69 help="Absolute UNIX Path to the *.labels.txt\n" | |
| 70 + "file from `sourmash compare`. The accessions\n" | |
| 71 + "will be renanamed to serotype names.", | |
| 72 ) | |
| 73 | |
| 74 args = parser.parse_args() | |
| 75 csv = args.mat | |
| 76 labels = args.labels | |
| 77 pickled_sero = args.acc2sero | |
| 78 row_names = list() | |
| 79 distance_mat = defaultdict(defaultdict) | |
| 80 out_csv = os.path.join(os.getcwd(), "bcs_sourmash_matrix.tblsum.txt") | |
| 81 out_json = os.path.join(os.getcwd(), "bcs_sourmash_matrix_mqc.json") | |
| 82 | |
| 83 # Prepare dictionary to be dumped as JSON. | |
| 84 distance_mat["id"] = "BETTERCALLSAL_CONTAINMENT_INDEX" | |
| 85 distance_mat["section_name"] = "Containment index" | |
| 86 distance_mat["description"] = ( | |
| 87 "This section shows the containment index between a sample and the genomes" | |
| 88 + "by running <code>sourmash gather</code> " | |
| 89 + "using <code>--containment</code> option." | |
| 90 ) | |
| 91 distance_mat["plot_type"] = "heatmap" | |
| 92 distance_mat["pconfig"]["id"] = "bettercallsal_containment_index_heatmap" | |
| 93 distance_mat["pconfig"]["title"] = "Sourmash: containment index" | |
| 94 distance_mat["pconfig"]["xTitle"] = "Samples" | |
| 95 distance_mat["pconfig"]["yTitle"] = "Isolates (Genome assemblies)" | |
| 96 distance_mat["pconfig"]["ycats_samples"] = "False" | |
| 97 distance_mat["pconfig"]["xcats_samples"] = "False" | |
| 98 distance_mat["pconfig"]["square"] = "False" | |
| 99 distance_mat["pconfig"]["min"] = "0.0" | |
| 100 distance_mat["pconfig"]["max"] = "1.0" | |
| 101 distance_mat["data"]["data"] = list() | |
| 102 | |
| 103 if pickled_sero and (not os.path.exists(pickled_sero) or not os.path.getsize(pickled_sero)): | |
| 104 logging.error( | |
| 105 "The pickle file,\n" + f"{os.path.basename(pickled_sero)} does not exist or is empty!" | |
| 106 ) | |
| 107 exit(1) | |
| 108 else: | |
| 109 acc2sero = pickle.load(file=open(pickled_sero, "rb")) | |
| 110 | |
| 111 if csv and (not os.path.exists(csv) or not os.path.getsize(csv) > 0): | |
| 112 logging.error("File,\n" + f"{csv}\ndoes not exist " + "or is empty!") | |
| 113 exit(0) | |
| 114 | |
| 115 if labels and (not os.path.exists(labels) or not os.path.getsize(labels) > 0): | |
| 116 logging.error("File,\n" + f"{labels}\ndoes not exist " + "or is empty!") | |
| 117 exit(0) | |
| 118 | |
| 119 # with open(out_labels, "w") as out_labels_fh: | |
| 120 with open(labels, "r") as labels_fh: | |
| 121 for line in labels_fh: | |
| 122 line = line.strip() | |
| 123 if line not in acc2sero.keys(): | |
| 124 row_names.append(line) | |
| 125 | |
| 126 labels_fh.close() | |
| 127 | |
| 128 with open(out_csv, "w") as csv_out_fh: | |
| 129 with open(csv, "r") as csv_in_fh: | |
| 130 header = csv_in_fh.readline().strip().split(",") | |
| 131 acc_cols = [idx for idx, col in enumerate(header) if col in acc2sero.keys()] | |
| 132 sample_cols = [idx for idx, col in enumerate(header) if col not in acc2sero.keys()] | |
| 133 | |
| 134 col_names = [ | |
| 135 re.sub(r"serotype=|\,antigen_formula=.*?\|", "", s) | |
| 136 for s in [acc2sero[col] + f"| | {col}" for col in header if col in acc2sero.keys()] | |
| 137 ] | |
| 138 | |
| 139 distance_mat["xcats"] = col_names | |
| 140 csv_out_fh.write("\t".join(["Sample"] + col_names) + "\n") | |
| 141 line_num = 0 | |
| 142 | |
| 143 for line in csv_in_fh: | |
| 144 if line_num not in sample_cols: | |
| 145 continue | |
| 146 else: | |
| 147 | |
| 148 heatmap_rows = [ | |
| 149 str(round(float(line.strip().split(",")[col]), 5)) for col in acc_cols | |
| 150 ] | |
| 151 # distance_mat["data"]["hmdata"].append(heatmap_rows) | |
| 152 # distance_mat["data"][row_names[line_num]] = heatmap_rows | |
| 153 distance_mat["data"]["data"].append(heatmap_rows) | |
| 154 # distance_mat["data"][row_names[line_num]] = dict( | |
| 155 # [(col_names[idx], val) for idx, val in enumerate(heatmap_rows)] | |
| 156 # ) | |
| 157 csv_out_fh.write("\t".join([row_names[line_num]] + heatmap_rows) + "\n") | |
| 158 line_num += 1 | |
| 159 csv_in_fh.close() | |
| 160 csv_out_fh.close() | |
| 161 | |
| 162 distance_mat["ycats"] = row_names | |
| 163 json.dump(distance_mat, open(out_json, "w")) | |
| 164 | |
| 165 | |
| 166 if __name__ == "__main__": | |
| 167 main() |
