#!/usr/bin/env python3

# Kranti Konganti

import argparse
import inspect
import logging
import os
import pickle
import re
import sys
from collections import defaultdict


# Multiple inheritence for pretty printing of help text: RawTextHelpFormatter
# keeps the embedded newlines in help strings, ArgumentDefaultsHelpFormatter
# appends "(default: ...)" to each option.
class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
    pass


# Main
def main() -> None:
    """
    This script works only in the context of `cronology_db` Nextflow workflow.
    It takes an UNIX path to directory containing the following files:
    1. PDG metadata file (Ex: `PDG000000043.204.metadata.tsv`)
    2. PDG SNP Cluster metadata file (Ex: `PDG000000043.204.reference_target.cluster_list.tsv`)
    3. A list of possibly downloadable assembly accessions (one per line) from the metadata file.
    and then generates a pickled file with relevant metadata columns mentioned with the -cols option.
    """

    # Set logging.
    logging.basicConfig(
        format="\n" + "=" * 55 + "\n%(asctime)s - %(levelname)s\n" + "=" * 55 + "\n%(message)s\n\n",
        level=logging.DEBUG,
    )

    prog_name = os.path.basename(inspect.stack()[0].filename)

    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )

    required = parser.add_argument_group("required arguments")

    required.add_argument(
        "-pdg_dir",
        dest="pdg_dir",
        required=True,
        help="Absolute UNIX path to directory containing the following files.\nEx:"
        + "\n1. PDG000000043.204.metadata.tsv"
        + "\n2. PDG000000043.204.reference_target.cluster_list.tsv"
        + "\n3. A file of assembly accessions, one per line parsed out from"
        + "\n the metadata file.",
    )
    parser.add_argument(
        "-mlst",
        dest="mlst_results",
        required=False,
        help="Absolute UNIX path to MLST results file\nIf MLST results exists for a accession, they"
        + "\nwill be included in the index.",
    )
    parser.add_argument(
        "-pdg_meta_pat",
        dest="pdg_meta_pat",
        default=r"PDG\d+\.\d+\.metadata\.tsv",
        required=False,
        help="The pattern to be used to search for PDG metadata\nfile.",
    )
    parser.add_argument(
        "-pdg_snp_meta_pat",
        dest="pdg_snp_meta_pat",
        default=r"PDG\d+\.\d+\.reference\_target\.cluster\_list\.tsv",
        required=False,
        help="The pattern to be used to search for PDG SNP Cluster metadata\nfile.",
    )
    parser.add_argument(
        "-pdg_accs_filename_pat",
        dest="pdg_accs_fn_pat",
        default="accs_all.txt",
        required=False,
        help="The filename to look for where all the parsed GC[AF] accessions are stored,\n"
        + "one per line.",
    )
    parser.add_argument(
        "-cols",
        dest="metadata_cols",
        default="epi_type,collected_by,collection_date,host,"
        + "\nhost_disease,isolation_source,outbreak,sample_name,scientific_name,serovar,"
        + "\nsource_type,strain,computed_types,target_acc",
        required=False,
        help="The data in these metadata columns will be indexed for each\nisolate.",
    )
    parser.add_argument(
        "-fs",
        dest="force_write_pick",
        action="store_true",
        required=False,
        help="By default, when -s flag is on, the pickle file named *.IDXD_PDG_METAD.pickle"
        + "\nis written to CWD. If the file exists, the program will not overwrite"
        + "\nand exit. Use -fs option to overwrite.",
    )
    parser.add_argument(
        "-op",
        dest="out_prefix",
        default="IDXD_PDG_METAD",
        help="Set the output file prefix for indexed PDG metadata.",
    )
    parser.add_argument(
        "-pfs",
        dest="pdg_meta_fs",
        default="\t",
        help="Change the field separator of the PDG metadata file.",
    )

    args = parser.parse_args()
    pdg_dir = os.path.abspath(args.pdg_dir)
    mcols = args.metadata_cols
    f_write_pick = args.force_write_pick
    out_prefix = args.out_prefix
    pdg_meta_fs = args.pdg_meta_fs
    mlst_res = args.mlst_results
    # Matches a GenBank/RefSeq assembly accession, e.g. GCA_000012345.1
    acc_pat = re.compile(r"^GC[AF]_\d+\.?\d*")
    # Whole-string validation for -cols: word chars and commas only (the
    # default value embeds newlines, which are stripped before splitting).
    mcols_pat = re.compile(r"^[\w,\n]+$")
    pdg_meta_pat = re.compile(args.pdg_meta_pat)
    pdg_snp_meta_pat = re.compile(args.pdg_snp_meta_pat)
    pdg_accs_fn_pat = re.compile(args.pdg_accs_fn_pat)
    target_acc_col = 41  # 0-based index of target_acc in the PDG metadata file.
    acc_col = 9  # 0-based index of the assembly accession column.
    num_accs_check = []
    mlst_sts = {}  # accession -> MLST sequence type (or "NULL").
    acceptable_num_mlst_cols = 10
    mlst_st_col = 2
    mlst_acc_col = 0

    # Basic checks: the PDG directory must exist and contain exactly one file
    # per required pattern (metadata, SNP cluster list, accession list).

    if os.path.exists(pdg_dir) and os.path.isdir(pdg_dir):
        pdg_meta_file = [f for f in os.listdir(pdg_dir) if pdg_meta_pat.match(f)]
        pdg_snp_meta_file = [f for f in os.listdir(pdg_dir) if pdg_snp_meta_pat.match(f)]
        pdg_acc_all = [f for f in os.listdir(pdg_dir) if pdg_accs_fn_pat.match(f)]
        req_files = [len(pdg_meta_file), len(pdg_snp_meta_file), len(pdg_acc_all)]
        if any(x > 1 for x in req_files):
            logging.error(
                f"Directory {os.path.basename(pdg_dir)} contains"
                + "\nmultiple files matching the search pattern."
            )
            sys.exit(1)
        elif any(x == 0 for x in req_files):
            logging.error(
                f"Directory {os.path.basename(pdg_dir)} does not contain"
                + "\nany files matching the following file patterns:"
                + f"\n{pdg_meta_pat.pattern}"
                + f"\n{pdg_snp_meta_pat.pattern}"
                + f"\n{pdg_accs_fn_pat.pattern}"
            )
            sys.exit(1)
        # Exactly one match each at this point; collapse the 1-element lists.
        pdg_meta_file = os.path.join(pdg_dir, "".join(pdg_meta_file))
        pdg_snp_meta_file = os.path.join(pdg_dir, "".join(pdg_snp_meta_file))
        pdg_acc_all = os.path.join(pdg_dir, "".join(pdg_acc_all))
    else:
        logging.error(f"Directory path {pdg_dir} does not exist.")
        sys.exit(1)

    # Bug fix: the original used `or`, which let an empty existing file pass
    # and raised FileNotFoundError inside os.path.getsize for a missing one.
    if mlst_res and not (os.path.exists(mlst_res) and os.path.getsize(mlst_res) > 0):
        logging.error(
            f"Requested to index MLST results, but the file {os.path.basename(mlst_res)}"
            + "\ndoes not exist or the file is empty."
        )
        sys.exit(1)
    elif mlst_res:
        # Index MLST sequence types by assembly accession.
        with open(mlst_res, "r") as mlst_res_fh:
            _ = mlst_res_fh.readline()  # Skip the header row.
            mlst_res_has_10_cols = False

            for line in mlst_res_fh:
                cols = line.strip().split("\t")
                acc = acc_pat.findall(cols[mlst_acc_col])
                if len(acc) > 1:
                    # Bug fix: the offending column value was not interpolated before.
                    logging.error(f"Found more than 1 accession in column:\n{cols[mlst_acc_col]}\n")
                    sys.exit(1)
                acc = "".join(acc)
                if len(cols) == acceptable_num_mlst_cols and re.match(r"\d+|\-", cols[mlst_st_col]):
                    mlst_res_has_10_cols = True
                    if re.match(r"\-", cols[mlst_st_col]):
                        mlst_sts[acc] = "NULL"  # "-" means no ST was called.
                    else:
                        mlst_sts[acc] = cols[mlst_st_col]

        # NOTE(review): this flag goes True if ANY row has 10 columns, although
        # the message implies all rows must -- original behavior kept as-is.
        if not mlst_res_has_10_cols:
            logging.error(
                "Requested to incorporate MLST ST's but file"
                + f"\n{os.path.basename(mlst_res)}"
                + "\ndoes not have 10 columns in all rows."
            )
            sys.exit(1)

    # Accessions expected to be indexed; used for the final count check.
    with open(pdg_acc_all, "r") as pdg_acc_all_fh:
        num_accs_check = [a.strip() for a in pdg_acc_all_fh]

    if not mcols_pat.match(mcols):
        logging.error(
            "Supplied columns' names should only be"
            + "\nalphanumeric (including _) separated by a comma."
        )
        sys.exit(1)
    mcols = re.sub("\n", "", mcols).split(",")

    if (
        pdg_snp_meta_file
        and os.path.exists(pdg_snp_meta_file)
        and os.path.getsize(pdg_snp_meta_file) > 0
    ):
        acc2snp = {}  # accession -> SNP cluster identifier.
        acc2meta = defaultdict(defaultdict)  # accession -> {column -> [values]}.
        init_pickled_sero = os.path.join(os.getcwd(), out_prefix + ".pickle")

        # Refuse to clobber an existing pickle unless -fs was given.
        if (
            os.path.exists(init_pickled_sero)
            and os.path.getsize(init_pickled_sero)
            and not f_write_pick
        ):
            logging.error(
                f"File {os.path.basename(init_pickled_sero)} already exists in\n{os.getcwd()}\n"
                + "Use -fs to force overwrite it."
            )
            sys.exit(1)

        # Index SNP cluster membership. Expected columns (0-based):
        # 0 = SNP cluster, 1 = isolate identifier, 3 = genome accession.
        with open(pdg_snp_meta_file, "r") as snp_meta:
            _ = snp_meta.readline()  # Skip the header row.
            skipped_acc2snp = set()
            for line in snp_meta:
                cols = line.strip().split(pdg_meta_fs)
                if len(cols) != 4:
                    logging.error(
                        f"The metadata file {pdg_snp_meta_file} is malformed.\n"
                        + f"Expected 4 columns. Got {len(cols)} columns.\n"
                    )
                    sys.exit(1)

                if re.match("NULL", cols[3]):
                    skipped_acc2snp.add(f"Isolate {cols[1]} has no genome accession: {cols[3]}")
                elif not acc_pat.match(cols[3]):
                    logging.error(
                        "Did not find accession in either field number 4\n"
                        + "or field number 10 of column 4."
                        + f"\nLine: {line}"
                    )
                    sys.exit(1)
                else:
                    acc2snp[cols[3]] = cols[0]

            if len(skipped_acc2snp) > 0:
                logging.info(
                    f"While indexing\n{os.path.basename(pdg_snp_meta_file)},"
                    + "\nthese isolates were skipped:\n\n"
                    + "\n".join(skipped_acc2snp)
                )

        # Index the user-requested metadata columns per accession.
        with open(pdg_meta_file, "r") as pdg_meta:
            header = pdg_meta.readline().strip().split(pdg_meta_fs)
            user_req_cols = [mcol_i for mcol_i, mcol in enumerate(header) if mcol in mcols]
            cols_not_found = [mcol for mcol in mcols if mcol not in header]
            null_wgs_accs = set()
            if len(cols_not_found) > 0:
                logging.error(
                    "The following columns do not exist in the"
                    + f"\nmetadata file [ {os.path.basename(pdg_meta_file)} ]:\n"
                    + "".join(cols_not_found)
                )
                sys.exit(1)

            for line in pdg_meta:
                cols = line.strip().split(pdg_meta_fs)
                pdg_assm_acc = cols[acc_col]
                if not acc_pat.match(pdg_assm_acc):
                    null_wgs_accs.add(
                        f"Isolate {cols[target_acc_col]} has no genome accession: {pdg_assm_acc}"
                    )
                    continue

                if pdg_assm_acc in mlst_sts:
                    acc2meta[pdg_assm_acc].setdefault("mlst_sequence_type", []).append(
                        str(mlst_sts[pdg_assm_acc])
                    )

                for col in user_req_cols:
                    acc2meta[pdg_assm_acc].setdefault(header[col], []).append(str(cols[col]))

            if len(null_wgs_accs) > 0:
                logging.info(
                    f"While indexing\n{os.path.basename(pdg_meta_file)},"
                    + "\nthese isolates were skipped:\n\n"
                    + "\n".join(null_wgs_accs)
                )

        with open(init_pickled_sero, "wb") as write_pickled_sero:
            pickle.dump(file=write_pickled_sero, obj=acc2meta)

        if len(num_accs_check) != len(acc2meta.keys()):
            logging.error(
                "Failed the accession count check."
                + f"\nExpected {len(num_accs_check)} accessions but got {len(acc2meta.keys())}."
            )
            sys.exit(1)
        else:
            logging.info(
                f"Number of valid accessions: {len(num_accs_check)}"
                + f"\nNumber of accessions indexed: {len(acc2meta.keys())}"
                + f"\nNumber of accessions participating in any of the SNP Clusters: {len(acc2snp.keys())}"
                + f"\n\nCreated the pickle file for\n{os.path.basename(pdg_meta_file)}."
                + "\nThis was the only requested function."
            )

        sys.exit(0)
    elif pdg_meta_file and not (
        os.path.exists(pdg_meta_file) and os.path.getsize(pdg_meta_file) > 0
    ):
        logging.error(
            "Requested to create pickle from metadata, but\n"
            + f"the file, {os.path.basename(pdg_meta_file)} is empty or\ndoes not exist!"
        )
        sys.exit(1)


if __name__ == "__main__":
    main()