#!/usr/bin/env python3

# Kranti Konganti

import argparse
import inspect
import json
import logging
import os
import pickle
import pprint
import re
from collections import defaultdict

# Set logging.
logging.basicConfig(
    format="\n" + "=" * 55 + "\n%(asctime)s - %(levelname)s\n" + "=" * 55 + "\n%(message)s\n\n",
    level=logging.DEBUG,
)

# Debug print.
ppp = pprint.PrettyPrinter(width=50, indent=4)

# Multiple inheritance for pretty printing of help text.
class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
    pass


def main() -> None:
    """
    This script works only in the context of `bettercallsal` Nextflow workflow.
    It takes:
        1. A CSV file containing a similarity matrix or dissimilarity matrix where
            the header row contains the names.
        2. Indexed NCBI Pathogen metadata in pickle format and converts
            accessions to serotype names in the final distance matrix output.
    """

    prog_name = os.path.basename(inspect.stack()[0].filename)

    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )

    required = parser.add_argument_group("required arguments")

    required.add_argument(
        "-csv",
        dest="mat",
        default=False,
        required=True,
        help="Absolute UNIX path to .csv file containing similarity\n"
        + "or dissimilarity matrix from `sourmash compare`.",
    )
    required.add_argument(
        "-pickle",
        dest="acc2sero",
        default=False,
        required=True,
        help="Absolute UNIX Path to the *ACC2SERO.pickle\n"
        + "metadata file. On raven2, these are located at\n"
        + "/hpc/db/bettercallsal/PDGXXXXXXXXXX.XXXXX/",
    )
    required.add_argument(
        "-labels",
        dest="labels",
        default=False,
        required=True,
        help="Absolute UNIX Path to the *.labels.txt\n"
        + "file from `sourmash compare`. The accessions\n"
        + "will be renamed to serotype names.",
    )

    args = parser.parse_args()
    csv = args.mat
    labels = args.labels
    pickled_sero = args.acc2sero
    row_names = list()
    distance_mat = defaultdict(defaultdict)
    out_csv = os.path.join(os.getcwd(), "bcs_sourmash_matrix.tblsum.txt")
    out_json = os.path.join(os.getcwd(), "bcs_sourmash_matrix_mqc.json")

    # Prepare dictionary to be dumped as JSON (MultiQC custom-content heatmap).
    distance_mat["id"] = "BETTERCALLSAL_CONTAINMENT_INDEX"
    distance_mat["section_name"] = "Containment index"
    # Trailing space after "genomes" is required: the literals are concatenated
    # and the original produced "genomesby running".
    distance_mat["description"] = (
        "This section shows the containment index between a sample and the genomes "
        + "by running sourmash gather "
        + "using --containment option."
    )
    distance_mat["plot_type"] = "heatmap"
    distance_mat["pconfig"]["id"] = "bettercallsal_containment_index_heatmap"
    distance_mat["pconfig"]["title"] = "Sourmash: containment index"
    distance_mat["pconfig"]["xTitle"] = "Samples"
    distance_mat["pconfig"]["yTitle"] = "Isolates (Genome assemblies)"
    distance_mat["pconfig"]["ycats_samples"] = "False"
    distance_mat["pconfig"]["xcats_samples"] = "False"
    distance_mat["pconfig"]["square"] = "False"
    distance_mat["pconfig"]["min"] = "0.0"
    distance_mat["pconfig"]["max"] = "1.0"
    distance_mat["data"]["data"] = list()

    if pickled_sero and (not os.path.exists(pickled_sero) or not os.path.getsize(pickled_sero)):
        logging.error(
            "The pickle file,\n" + f"{os.path.basename(pickled_sero)} does not exist or is empty!"
        )
        exit(1)
    else:
        # NOTE: pickle.load is unsafe on untrusted data; this file is generated
        # inside the workflow's own database directory.
        with open(pickled_sero, "rb") as pickle_fh:
            acc2sero = pickle.load(pickle_fh)

    # NOTE(review): exit(0) on a missing/empty matrix looks like a deliberate
    # graceful no-op for the Nextflow pipeline — confirm before changing to a
    # non-zero status.
    if csv and (not os.path.exists(csv) or not os.path.getsize(csv) > 0):
        logging.error("File,\n" + f"{csv}\ndoes not exist " + "or is empty!")
        exit(0)

    if labels and (not os.path.exists(labels) or not os.path.getsize(labels) > 0):
        logging.error("File,\n" + f"{labels}\ndoes not exist " + "or is empty!")
        exit(0)

    # Row names are the labels that are NOT genome accessions, i.e. the samples.
    with open(labels, "r") as labels_fh:
        for line in labels_fh:
            line = line.strip()
            if line not in acc2sero.keys():
                row_names.append(line)

    with open(out_csv, "w") as csv_out_fh, open(csv, "r") as csv_in_fh:
        header = csv_in_fh.readline().strip().split(",")
        # Column indices of genome accessions vs. sample columns in the matrix.
        acc_cols = [idx for idx, col in enumerate(header) if col in acc2sero.keys()]
        sample_cols = [idx for idx, col in enumerate(header) if col not in acc2sero.keys()]

        # Rename accession columns to "<serotype>| | <accession>", stripping the
        # "serotype=" prefix and the ",antigen_formula=...|" middle section.
        col_names = [
            re.sub(r"serotype=|\,antigen_formula=.*?\|", "", s)
            for s in [acc2sero[col] + f"| | {col}" for col in header if col in acc2sero.keys()]
        ]

        distance_mat["xcats"] = col_names
        csv_out_fh.write("\t".join(["Sample"] + col_names) + "\n")

        # Each matrix row corresponds to the header entry of the same index
        # (square matrix from `sourmash compare`); enumerate keeps the row
        # counter advancing on every line, including skipped accession rows.
        # Keep only sample rows, and within them only the accession columns.
        # NOTE(review): indexing row_names[line_num] assumes sample entries
        # precede accession entries in the matrix order — confirm upstream.
        for line_num, line in enumerate(csv_in_fh):
            if line_num not in sample_cols:
                continue
            row_vals = line.strip().split(",")
            heatmap_rows = [str(round(float(row_vals[col]), 5)) for col in acc_cols]
            distance_mat["data"]["data"].append(heatmap_rows)
            csv_out_fh.write("\t".join([row_names[line_num]] + heatmap_rows) + "\n")

    distance_mat["ycats"] = row_names
    with open(out_json, "w") as json_fh:
        json.dump(distance_mat, json_fh)


if __name__ == "__main__":
    main()