kkonganti@0: #!/usr/bin/env python3 kkonganti@0: kkonganti@0: # Kranti Konganti kkonganti@0: kkonganti@0: import argparse kkonganti@0: import inspect kkonganti@0: import logging kkonganti@0: import os kkonganti@0: import pickle kkonganti@0: import pprint kkonganti@0: import re kkonganti@0: from collections import defaultdict kkonganti@0: kkonganti@0: kkonganti@0: # Multiple inheritence for pretty printing of help text. kkonganti@0: class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter): kkonganti@0: pass kkonganti@0: kkonganti@0: kkonganti@0: # Main kkonganti@0: def main() -> None: kkonganti@0: """ kkonganti@0: This script works only in the context of `cronology_db` Nextflow workflow. kkonganti@0: It takes an UNIX path to directory containing the following files: kkonganti@0: 1. PDG metadata file (Ex: `PDG000000043.204.metadata.tsv`) kkonganti@0: 2. PDG SNP Cluster metadata file (Ex: `PDG000000043.204.reference_target.cluster_list.tsv`) kkonganti@0: 3. A list of possibly downloadable assembly accessions (one per line) from the metadata file. kkonganti@0: and then generates a pickled file with relevant metadata columns mentioned with the -cols option. kkonganti@0: """ kkonganti@0: kkonganti@0: # Set logging. kkonganti@0: logging.basicConfig( kkonganti@0: format="\n" + "=" * 55 + "\n%(asctime)s - %(levelname)s\n" + "=" * 55 + "\n%(message)s\n\n", kkonganti@0: level=logging.DEBUG, kkonganti@0: ) kkonganti@0: kkonganti@0: # Debug print. kkonganti@0: ppp = pprint.PrettyPrinter(width=55) kkonganti@0: prog_name = os.path.basename(inspect.stack()[0].filename) kkonganti@0: kkonganti@0: parser = argparse.ArgumentParser( kkonganti@0: prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses kkonganti@0: ) kkonganti@0: kkonganti@0: required = parser.add_argument_group("required arguments") kkonganti@0: kkonganti@0: required.add_argument( kkonganti@0: "-pdg_dir", kkonganti@0: dest="pdg_dir", kkonganti@0: default=False, kkonganti@0: required=True, kkonganti@0: help="Absolute UNIX path to directory containing the following files.\nEx:" kkonganti@0: + "\n1. PDG000000043.204.metadata.tsv" kkonganti@0: + "\n2. PDG000000043.204.reference_target.cluster_list.tsv" kkonganti@0: + "\n3. A file of assembly accessions, one per line parsed out from" kkonganti@0: + "\n the metadata file.", kkonganti@0: ) kkonganti@0: parser.add_argument( kkonganti@0: "-mlst", kkonganti@0: dest="mlst_results", kkonganti@0: required=False, kkonganti@0: help="Absolute UNIX path to MLST results file\nIf MLST results exists for a accession, they" kkonganti@0: + "\nwill be included in the index.", kkonganti@0: ) kkonganti@0: parser.add_argument( kkonganti@0: "-pdg_meta_pat", kkonganti@0: dest="pdg_meta_pat", kkonganti@0: default="PDG\d+\.\d+\.metadata\.tsv", kkonganti@0: required=False, kkonganti@0: help="The pattern to be used to search for PDG metadata\nfile.", kkonganti@0: ) kkonganti@0: parser.add_argument( kkonganti@0: "-pdg_snp_meta_pat", kkonganti@0: dest="pdg_snp_meta_pat", kkonganti@0: default="PDG\d+\.\d+\.reference\_target\.cluster\_list\.tsv", kkonganti@0: required=False, kkonganti@0: help="The pattern to be used to search for PDG SNP Cluster metadata\nfile.", kkonganti@0: ) kkonganti@0: parser.add_argument( kkonganti@0: "-pdg_accs_filename_pat", kkonganti@0: dest="pdg_accs_fn_pat", kkonganti@0: default="accs_all.txt", kkonganti@0: required=False, kkonganti@0: help="The filename to look for where all the parsed GC[AF] accessions are stored,\n" kkonganti@0: + "one per line.", kkonganti@0: ) kkonganti@0: parser.add_argument( kkonganti@0: "-cols", kkonganti@0: dest="metadata_cols", kkonganti@0: default="epi_type,collected_by,collection_date,host," kkonganti@0: + "\nhost_disease,isolation_source,outbreak,sample_name,scientific_name,serovar," kkonganti@0: + "\nsource_type,strain,computed_types,target_acc", kkonganti@0: required=False, kkonganti@0: help="The data in these metadata columns will be indexed for each\nisolate.", kkonganti@0: ) kkonganti@0: parser.add_argument( kkonganti@0: "-fs", kkonganti@0: dest="force_write_pick", kkonganti@0: action="store_true", kkonganti@0: required=False, kkonganti@0: help="By default, when -s flag is on, the pickle file named *.IDXD_PDG_METAD.pickle" kkonganti@0: + "\nis written to CWD. If the file exists, the program will not overwrite" kkonganti@0: + "\nand exit. Use -fs option to overwrite.", kkonganti@0: ) kkonganti@0: parser.add_argument( kkonganti@0: "-op", kkonganti@0: dest="out_prefix", kkonganti@0: default="IDXD_PDG_METAD", kkonganti@0: help="Set the output file prefix for indexed PDG metadata.", kkonganti@0: ) kkonganti@0: parser.add_argument( kkonganti@0: "-pfs", kkonganti@0: dest="pdg_meta_fs", kkonganti@0: default="\t", kkonganti@0: help="Change the field separator of the PDG metadata file.", kkonganti@0: ) kkonganti@0: kkonganti@0: args = parser.parse_args() kkonganti@0: pdg_dir = os.path.abspath(args.pdg_dir) kkonganti@0: mcols = args.metadata_cols kkonganti@0: f_write_pick = args.force_write_pick kkonganti@0: out_prefix = args.out_prefix kkonganti@0: pdg_meta_fs = args.pdg_meta_fs kkonganti@0: mlst_res = args.mlst_results kkonganti@0: acc_pat = re.compile(r"^GC[AF]\_\d+\.?\d*") kkonganti@0: mcols_pat = re.compile(r"[\w+\,]") kkonganti@0: pdg_meta_pat = re.compile(f"{args.pdg_meta_pat}") kkonganti@0: pdg_snp_meta_pat = re.compile(f"{args.pdg_snp_meta_pat}") kkonganti@0: pdg_accs_fn_pat = re.compile(f"{args.pdg_accs_fn_pat}") kkonganti@0: target_acc_col = 41 kkonganti@0: acc_col = 9 kkonganti@0: num_accs_check = list() kkonganti@0: mlst_sts = dict() kkonganti@0: acceptable_num_mlst_cols = 10 kkonganti@0: mlst_st_col = 2 kkonganti@0: mlst_acc_col = 0 kkonganti@0: kkonganti@0: # Basic checks kkonganti@0: kkonganti@0: if os.path.exists(pdg_dir) and os.path.isdir(pdg_dir): kkonganti@0: pdg_meta_file = [f for f in os.listdir(pdg_dir) if pdg_meta_pat.match(f)] kkonganti@0: pdg_snp_meta_file = [f for f in os.listdir(pdg_dir) if pdg_snp_meta_pat.match(f)] kkonganti@0: pdg_acc_all = [f for f in os.listdir(pdg_dir) if pdg_accs_fn_pat.match(f)] kkonganti@0: req_files = [len(pdg_meta_file), len(pdg_snp_meta_file), len(pdg_acc_all)] kkonganti@0: if any(x > 1 for x in req_files): kkonganti@0: logging.error( kkonganti@0: f"Directory {os.path.basename(pdg_dir)} contains" kkonganti@0: + "\ncontains mulitple files matching the search pattern." kkonganti@0: ) kkonganti@0: exit(1) kkonganti@0: elif any(x == 0 for x in req_files): kkonganti@0: logging.error( kkonganti@0: f"Directory {os.path.basename(pdg_dir)} does not contain" kkonganti@0: + "\nany files matching the following file patterns:" kkonganti@0: + f"\n{pdg_meta_pat.pattern}" kkonganti@0: + f"\n{pdg_snp_meta_pat.pattern}" kkonganti@0: + f"\n{pdg_accs_fn_pat.pattern}" kkonganti@0: ) kkonganti@0: exit(1) kkonganti@0: pdg_meta_file = os.path.join(pdg_dir, "".join(pdg_meta_file)) kkonganti@0: pdg_snp_meta_file = os.path.join(pdg_dir, "".join(pdg_snp_meta_file)) kkonganti@0: pdg_acc_all = os.path.join(pdg_dir, "".join(pdg_acc_all)) kkonganti@0: else: kkonganti@0: logging.error(f"Directory path {pdg_dir} does not exist.") kkonganti@0: exit(1) kkonganti@0: kkonganti@0: if mlst_res and not (os.path.exists(mlst_res) or os.path.getsize(mlst_res) > 0): kkonganti@0: logging.error( kkonganti@0: f"Requested to index MLST results. but the file {os.path.basename(mlst_res)}" kkonganti@0: + "does not exist or the file is empty." kkonganti@0: ) kkonganti@0: exit(1) kkonganti@0: elif mlst_res: kkonganti@0: with open(mlst_res, "r") as mlst_res_fh: kkonganti@0: header = mlst_res_fh.readline() kkonganti@0: mlst_res_has_10_cols = False kkonganti@0: kkonganti@0: for line in mlst_res_fh: kkonganti@0: cols = line.strip().split("\t") kkonganti@0: acc = acc_pat.findall(cols[mlst_acc_col]) kkonganti@0: if len(acc) > 1: kkonganti@0: logging.error(f"Found more than 1 accession in column:\ncols[mlst_acc_col]\n") kkonganti@0: exit(1) kkonganti@0: else: kkonganti@0: acc = "".join(acc) kkonganti@0: if len(cols) == acceptable_num_mlst_cols and re.match(r"\d+|\-", cols[mlst_st_col]): kkonganti@0: mlst_res_has_10_cols = True kkonganti@0: if re.match(r"\-", cols[mlst_st_col]): kkonganti@0: mlst_sts[acc] = "NULL" kkonganti@0: else: kkonganti@0: mlst_sts[acc] = cols[mlst_st_col] kkonganti@0: kkonganti@0: if not mlst_res_has_10_cols: kkonganti@0: logging.error( kkonganti@0: "Requested to incorporate MLST ST's but file" kkonganti@0: + f"\n{os.path.basename(mlst_res)}" kkonganti@0: + "\ndoes not have 10 columns in all rows." kkonganti@0: ) kkonganti@0: exit(1) kkonganti@0: kkonganti@0: mlst_res_fh.close() kkonganti@0: kkonganti@0: with open(pdg_acc_all, "r") as pdg_acc_all_fh: kkonganti@0: for a in pdg_acc_all_fh.readlines(): kkonganti@0: num_accs_check.append(a.strip()) kkonganti@0: kkonganti@0: if not mcols_pat.match(mcols): kkonganti@0: logging.error( kkonganti@0: f"Supplied columns' names should only be" kkonganti@0: + "\nalphanumeric (including _) separated by a comma." kkonganti@0: ) kkonganti@0: exit(1) kkonganti@0: else: kkonganti@0: mcols = re.sub("\n", "", mcols).split(",") kkonganti@0: kkonganti@0: if ( kkonganti@0: pdg_snp_meta_file kkonganti@0: and os.path.exists(pdg_snp_meta_file) kkonganti@0: and os.path.getsize(pdg_snp_meta_file) > 0 kkonganti@0: ): kkonganti@0: acc2snp = defaultdict() kkonganti@0: acc2meta = defaultdict(defaultdict) kkonganti@0: init_pickled_sero = os.path.join(os.getcwd(), out_prefix + ".pickle") kkonganti@0: kkonganti@0: if ( kkonganti@0: os.path.exists(init_pickled_sero) kkonganti@0: and os.path.getsize(init_pickled_sero) kkonganti@0: and not f_write_pick kkonganti@0: ): kkonganti@0: logging.error( kkonganti@0: f"File {os.path.basename(init_pickled_sero)} already exists in\n{os.getcwd()}\n" kkonganti@0: + "Use -fs to force overwrite it." kkonganti@0: ) kkonganti@0: exit(1) kkonganti@0: kkonganti@0: with open(pdg_snp_meta_file, "r") as snp_meta: kkonganti@0: header = snp_meta.readline() kkonganti@0: skipped_acc2snp = set() kkonganti@0: for line in snp_meta: kkonganti@0: cols = line.strip().split(pdg_meta_fs) kkonganti@0: if not 4 <= len(cols) < 5: kkonganti@0: logging.error( kkonganti@0: f"The metadata file {pdg_snp_meta_file} is malformed.\n" kkonganti@0: + f"Expected 4 columns. Got {len(cols)} columns.\n" kkonganti@0: ) kkonganti@0: exit(1) kkonganti@0: kkonganti@0: if re.match("NULL", cols[3]): kkonganti@0: skipped_acc2snp.add(f"Isolate {cols[1]} has no genome accession: {cols[3]}") kkonganti@0: elif not acc_pat.match(cols[3]): kkonganti@0: logging.error( kkonganti@0: f"Did not find accession in either field number 4\n" kkonganti@0: + "or field number 10 of column 4." kkonganti@0: + f"\nLine: {line}" kkonganti@0: ) kkonganti@0: exit(1) kkonganti@0: elif not re.match("NULL", cols[3]): kkonganti@0: acc2snp[cols[3]] = cols[0] kkonganti@0: kkonganti@0: if len(skipped_acc2snp) > 0: kkonganti@0: logging.info( kkonganti@0: f"While indexing\n{os.path.basename(pdg_snp_meta_file)}," kkonganti@0: + "\nthese isolates were skipped:\n\n" kkonganti@0: + "\n".join(skipped_acc2snp) kkonganti@0: ) kkonganti@0: kkonganti@0: with open(pdg_meta_file, "r") as pdg_meta: kkonganti@0: header = pdg_meta.readline().strip().split(pdg_meta_fs) kkonganti@0: user_req_cols = [mcol_i for mcol_i, mcol in enumerate(header) if mcol in mcols] kkonganti@0: cols_not_found = [mcol for mcol in mcols if mcol not in header] kkonganti@0: null_wgs_accs = set() kkonganti@0: if len(cols_not_found) > 0: kkonganti@0: logging.error( kkonganti@0: f"The following columns do not exist in the" kkonganti@0: + f"\nmetadata file [ {os.path.basename(pdg_meta_file)} ]:\n" kkonganti@0: + "".join(cols_not_found) kkonganti@0: ) kkonganti@0: exit(1) kkonganti@0: kkonganti@0: for line in pdg_meta.readlines(): kkonganti@0: cols = line.strip().split(pdg_meta_fs) kkonganti@0: pdg_assm_acc = cols[acc_col] kkonganti@0: if not acc_pat.match(pdg_assm_acc): kkonganti@0: null_wgs_accs.add( kkonganti@0: f"Isolate {cols[target_acc_col]} has no genome accession: {pdg_assm_acc}" kkonganti@0: ) kkonganti@0: continue kkonganti@0: kkonganti@0: if pdg_assm_acc in mlst_sts.keys(): kkonganti@0: acc2meta[pdg_assm_acc].setdefault("mlst_sequence_type", []).append( kkonganti@0: str(mlst_sts[pdg_assm_acc]) kkonganti@0: ) kkonganti@0: kkonganti@0: for col in user_req_cols: kkonganti@0: acc2meta[pdg_assm_acc].setdefault(header[col], []).append(str(cols[col])) kkonganti@0: kkonganti@0: if len(null_wgs_accs) > 0: kkonganti@0: logging.info( kkonganti@0: f"While indexing\n{os.path.basename(pdg_meta_file)}," kkonganti@0: + "\nthese isolates were skipped:\n\n" kkonganti@0: + "\n".join(null_wgs_accs) kkonganti@0: ) kkonganti@0: kkonganti@0: with open(init_pickled_sero, "wb") as write_pickled_sero: kkonganti@0: pickle.dump(file=write_pickled_sero, obj=acc2meta) kkonganti@0: kkonganti@0: if len(num_accs_check) != len(acc2meta.keys()): kkonganti@0: logging.error( kkonganti@0: "Failed the accession count check." kkonganti@0: + f"\nExpected {len(num_accs_check)} accessions but got {len(acc2meta.keys())}." kkonganti@0: ) kkonganti@0: exit(1) kkonganti@0: else: kkonganti@0: logging.info( kkonganti@0: f"Number of valid accessions: {len(num_accs_check)}" kkonganti@0: + f"\nNumber of accessions indexed: {len(acc2meta.keys())}" kkonganti@0: + f"\nNumber of accessions participating in any of the SNP Clusters: {len(acc2snp.keys())}" kkonganti@0: + f"\n\nCreated the pickle file for\n{os.path.basename(pdg_meta_file)}." kkonganti@0: + "\nThis was the only requested function." kkonganti@0: ) kkonganti@0: kkonganti@0: snp_meta.close() kkonganti@0: write_pickled_sero.close() kkonganti@0: exit(0) kkonganti@0: elif pdg_meta_file and not ( kkonganti@0: os.path.exists(pdg_meta_file) and os.path.getsize(pdg_meta_file) > 0 kkonganti@0: ): kkonganti@0: logging.error( kkonganti@0: "Requested to create pickle from metadata, but\n" kkonganti@0: + f"the file, {os.path.basename(pdg_meta_file)} is empty or\ndoes not exist!" kkonganti@0: ) kkonganti@0: exit(1) kkonganti@0: kkonganti@0: pdg_acc_all_fh.close() kkonganti@0: snp_meta.close() kkonganti@0: pdg_meta.close() kkonganti@0: write_pickled_sero.close() kkonganti@0: kkonganti@0: kkonganti@0: if __name__ == "__main__": kkonganti@0: main()