diff 0.1.0/bin/index_pdg_metadata.py @ 0:c8597e9e1a97
"planemo upload"
author | kkonganti
---|---
date | Mon, 27 Nov 2023 12:37:44 -0500
parents |
children |
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/0.1.0/bin/index_pdg_metadata.py	Mon Nov 27 12:37:44 2023 -0500
@@ -0,0 +1,341 @@
+#!/usr/bin/env python3
+
+# Kranti Konganti
+
+import argparse
+import inspect
+import logging
+import os
+import pickle
+import pprint
+import re
+from collections import defaultdict
+
+
+# Multiple inheritance for pretty printing of help text.
+class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
+    pass
+
+
+# Main
+def main() -> None:
+    """
+    This script works only in the context of the `cronology_db` Nextflow workflow.
+    It takes a UNIX path to a directory containing the following files:
+    1. PDG metadata file (Ex: `PDG000000043.204.metadata.tsv`)
+    2. PDG SNP Cluster metadata file (Ex: `PDG000000043.204.reference_target.cluster_list.tsv`)
+    3. A list of possibly downloadable assembly accessions (one per line) from the metadata file.
+    and then generates a pickled file with the relevant metadata columns mentioned with the -cols option.
+    """
+
+    # Set logging.
+    logging.basicConfig(
+        format="\n" + "=" * 55 + "\n%(asctime)s - %(levelname)s\n" + "=" * 55 + "\n%(message)s\n\n",
+        level=logging.DEBUG,
+    )
+
+    # Debug print.
+    ppp = pprint.PrettyPrinter(width=55)
+    prog_name = os.path.basename(inspect.stack()[0].filename)
+
+    parser = argparse.ArgumentParser(
+        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
+    )
+
+    required = parser.add_argument_group("required arguments")
+
+    required.add_argument(
+        "-pdg_dir",
+        dest="pdg_dir",
+        default=False,
+        required=True,
+        help="Absolute UNIX path to the directory containing the following files.\nEx:"
+        + "\n1. PDG000000043.204.metadata.tsv"
+        + "\n2. PDG000000043.204.reference_target.cluster_list.tsv"
+        + "\n3. A file of assembly accessions, one per line, parsed out from"
+        + "\n   the metadata file.",
+    )
+    parser.add_argument(
+        "-mlst",
+        dest="mlst_results",
+        required=False,
+        help="Absolute UNIX path to the MLST results file.\nIf MLST results exist for an accession, they"
+        + "\nwill be included in the index.",
+    )
+    parser.add_argument(
+        "-pdg_meta_pat",
+        dest="pdg_meta_pat",
+        default=r"PDG\d+\.\d+\.metadata\.tsv",
+        required=False,
+        help="The pattern to be used to search for the PDG metadata\nfile.",
+    )
+    parser.add_argument(
+        "-pdg_snp_meta_pat",
+        dest="pdg_snp_meta_pat",
+        default=r"PDG\d+\.\d+\.reference_target\.cluster_list\.tsv",
+        required=False,
+        help="The pattern to be used to search for the PDG SNP Cluster metadata\nfile.",
+    )
+    parser.add_argument(
+        "-pdg_accs_filename_pat",
+        dest="pdg_accs_fn_pat",
+        default="accs_all.txt",
+        required=False,
+        help="The filename to look for where all the parsed GC[AF] accessions are stored,\n"
+        + "one per line.",
+    )
+    parser.add_argument(
+        "-cols",
+        dest="metadata_cols",
+        default="epi_type,collected_by,collection_date,host,"
+        + "\nhost_disease,isolation_source,outbreak,sample_name,scientific_name,serovar,"
+        + "\nsource_type,strain,computed_types,target_acc",
+        required=False,
+        help="The data in these metadata columns will be indexed for each\nisolate.",
+    )
+    parser.add_argument(
+        "-fs",
+        dest="force_write_pick",
+        action="store_true",
+        required=False,
+        help="By default, the pickle file named *.IDXD_PDG_METAD.pickle"
+        + "\nis written to the CWD. If the file exists, the program will not overwrite"
+        + "\nit and will exit. Use the -fs option to force an overwrite.",
+    )
+    parser.add_argument(
+        "-op",
+        dest="out_prefix",
+        default="IDXD_PDG_METAD",
+        help="Set the output file prefix for the indexed PDG metadata.",
+    )
+    parser.add_argument(
+        "-pfs",
+        dest="pdg_meta_fs",
+        default="\t",
+        help="Change the field separator of the PDG metadata file.",
+    )
+
+    args = parser.parse_args()
+    pdg_dir = os.path.abspath(args.pdg_dir)
+    mcols = args.metadata_cols
+    f_write_pick = args.force_write_pick
+    out_prefix = args.out_prefix
+    pdg_meta_fs = args.pdg_meta_fs
+    mlst_res = args.mlst_results
+    acc_pat = re.compile(r"^GC[AF]_\d+\.?\d*")
+    # Validate the whole -cols value: word characters and commas, plus the
+    # newlines embedded in the default value (stripped before splitting).
+    mcols_pat = re.compile(r"^[\w,\n]+$")
+    pdg_meta_pat = re.compile(args.pdg_meta_pat)
+    pdg_snp_meta_pat = re.compile(args.pdg_snp_meta_pat)
+    pdg_accs_fn_pat = re.compile(args.pdg_accs_fn_pat)
+    target_acc_col = 41
+    acc_col = 9
+    num_accs_check = list()
+    mlst_sts = dict()
+    acceptable_num_mlst_cols = 10
+    mlst_st_col = 2
+    mlst_acc_col = 0
+
+    # Basic checks.
+    if os.path.exists(pdg_dir) and os.path.isdir(pdg_dir):
+        pdg_meta_file = [f for f in os.listdir(pdg_dir) if pdg_meta_pat.match(f)]
+        pdg_snp_meta_file = [f for f in os.listdir(pdg_dir) if pdg_snp_meta_pat.match(f)]
+        pdg_acc_all = [f for f in os.listdir(pdg_dir) if pdg_accs_fn_pat.match(f)]
+        req_files = [len(pdg_meta_file), len(pdg_snp_meta_file), len(pdg_acc_all)]
+        if any(x > 1 for x in req_files):
+            logging.error(
+                f"Directory {os.path.basename(pdg_dir)} contains"
+                + "\nmultiple files matching the search pattern."
+            )
+            exit(1)
+        elif any(x == 0 for x in req_files):
+            logging.error(
+                f"Directory {os.path.basename(pdg_dir)} does not contain"
+                + "\nany files matching the following file patterns:"
+                + f"\n{pdg_meta_pat.pattern}"
+                + f"\n{pdg_snp_meta_pat.pattern}"
+                + f"\n{pdg_accs_fn_pat.pattern}"
+            )
+            exit(1)
+        pdg_meta_file = os.path.join(pdg_dir, "".join(pdg_meta_file))
+        pdg_snp_meta_file = os.path.join(pdg_dir, "".join(pdg_snp_meta_file))
+        pdg_acc_all = os.path.join(pdg_dir, "".join(pdg_acc_all))
+    else:
+        logging.error(f"Directory path {pdg_dir} does not exist.")
+        exit(1)
+
+    # "and" (not "or") so a missing file short-circuits before os.path.getsize().
+    if mlst_res and not (os.path.exists(mlst_res) and os.path.getsize(mlst_res) > 0):
+        logging.error(
+            f"Requested to index MLST results, but the file {os.path.basename(mlst_res)}"
+            + "\ndoes not exist or the file is empty."
+        )
+        exit(1)
+    elif mlst_res:
+        with open(mlst_res, "r") as mlst_res_fh:
+            header = mlst_res_fh.readline()
+            mlst_res_has_10_cols = False
+
+            for line in mlst_res_fh:
+                cols = line.strip().split("\t")
+                acc = acc_pat.findall(cols[mlst_acc_col])
+                if len(acc) > 1:
+                    logging.error(f"Found more than 1 accession in column:\n{cols[mlst_acc_col]}\n")
+                    exit(1)
+                else:
+                    acc = "".join(acc)
+                    if len(cols) == acceptable_num_mlst_cols and re.match(
+                        r"\d+|\-", cols[mlst_st_col]
+                    ):
+                        mlst_res_has_10_cols = True
+                        if re.match(r"\-", cols[mlst_st_col]):
+                            mlst_sts[acc] = "NULL"
+                        else:
+                            mlst_sts[acc] = cols[mlst_st_col]
+
+            if not mlst_res_has_10_cols:
+                logging.error(
+                    "Requested to incorporate MLST STs, but the file"
+                    + f"\n{os.path.basename(mlst_res)}"
+                    + "\ndoes not have 10 columns in all rows."
+                )
+                exit(1)
+
+    with open(pdg_acc_all, "r") as pdg_acc_all_fh:
+        for a in pdg_acc_all_fh:
+            num_accs_check.append(a.strip())
+
+    if not mcols_pat.match(mcols):
+        logging.error(
+            "Supplied column names should only be"
+            + "\nalphanumeric (including _) separated by commas."
+        )
+        exit(1)
+    else:
+        mcols = re.sub("\n", "", mcols).split(",")
+
+    if (
+        pdg_snp_meta_file
+        and os.path.exists(pdg_snp_meta_file)
+        and os.path.getsize(pdg_snp_meta_file) > 0
+    ):
+        acc2snp = defaultdict()
+        acc2meta = defaultdict(defaultdict)
+        init_pickled_sero = os.path.join(os.getcwd(), out_prefix + ".pickle")
+
+        if (
+            os.path.exists(init_pickled_sero)
+            and os.path.getsize(init_pickled_sero)
+            and not f_write_pick
+        ):
+            logging.error(
+                f"File {os.path.basename(init_pickled_sero)} already exists in\n{os.getcwd()}\n"
+                + "Use -fs to force overwrite it."
+            )
+            exit(1)
+
+        with open(pdg_snp_meta_file, "r") as snp_meta:
+            header = snp_meta.readline()
+            skipped_acc2snp = set()
+            for line in snp_meta:
+                cols = line.strip().split(pdg_meta_fs)
+                if len(cols) != 4:
+                    logging.error(
+                        f"The metadata file {pdg_snp_meta_file} is malformed.\n"
+                        + f"Expected 4 columns. Got {len(cols)} columns.\n"
+                    )
+                    exit(1)
+
+                if re.match("NULL", cols[3]):
+                    skipped_acc2snp.add(f"Isolate {cols[1]} has no genome accession: {cols[3]}")
+                elif not acc_pat.match(cols[3]):
+                    logging.error(
+                        "Did not find a valid accession in column 4."
+                        + f"\nLine: {line}"
+                    )
+                    exit(1)
+                else:
+                    acc2snp[cols[3]] = cols[0]
+
+            if len(skipped_acc2snp) > 0:
+                logging.info(
+                    f"While indexing\n{os.path.basename(pdg_snp_meta_file)},"
+                    + "\nthese isolates were skipped:\n\n"
+                    + "\n".join(skipped_acc2snp)
+                )
+
+        with open(pdg_meta_file, "r") as pdg_meta:
+            header = pdg_meta.readline().strip().split(pdg_meta_fs)
+            user_req_cols = [mcol_i for mcol_i, mcol in enumerate(header) if mcol in mcols]
+            cols_not_found = [mcol for mcol in mcols if mcol not in header]
+            null_wgs_accs = set()
+            if len(cols_not_found) > 0:
+                logging.error(
+                    "The following columns do not exist in the"
+                    + f"\nmetadata file [ {os.path.basename(pdg_meta_file)} ]:\n"
+                    + "\n".join(cols_not_found)
+                )
+                exit(1)
+
+            for line in pdg_meta:
+                cols = line.strip().split(pdg_meta_fs)
+                pdg_assm_acc = cols[acc_col]
+                if not acc_pat.match(pdg_assm_acc):
+                    null_wgs_accs.add(
+                        f"Isolate {cols[target_acc_col]} has no genome accession: {pdg_assm_acc}"
+                    )
+                    continue
+
+                if pdg_assm_acc in mlst_sts:
+                    acc2meta[pdg_assm_acc].setdefault("mlst_sequence_type", []).append(
+                        str(mlst_sts[pdg_assm_acc])
+                    )
+
+                for col in user_req_cols:
+                    acc2meta[pdg_assm_acc].setdefault(header[col], []).append(str(cols[col]))
+
+            if len(null_wgs_accs) > 0:
+                logging.info(
+                    f"While indexing\n{os.path.basename(pdg_meta_file)},"
+                    + "\nthese isolates were skipped:\n\n"
+                    + "\n".join(null_wgs_accs)
+                )
+
+        with open(init_pickled_sero, "wb") as write_pickled_sero:
+            pickle.dump(file=write_pickled_sero, obj=acc2meta)
+
+        if len(num_accs_check) != len(acc2meta):
+            logging.error(
+                "Failed the accession count check."
+                + f"\nExpected {len(num_accs_check)} accessions but got {len(acc2meta)}."
+            )
+            exit(1)
+        else:
+            logging.info(
+                f"Number of valid accessions: {len(num_accs_check)}"
+                + f"\nNumber of accessions indexed: {len(acc2meta)}"
+                + f"\nNumber of accessions participating in any of the SNP Clusters: {len(acc2snp)}"
+                + f"\n\nCreated the pickle file for\n{os.path.basename(pdg_meta_file)}."
+                + "\nThis was the only requested function."
+            )
+
+        # All file handles opened above are closed by their `with` blocks.
+        exit(0)
+    elif pdg_meta_file and not (
+        os.path.exists(pdg_meta_file) and os.path.getsize(pdg_meta_file) > 0
+    ):
+        logging.error(
+            "Requested to create a pickle from metadata, but\n"
+            + f"the file {os.path.basename(pdg_meta_file)} is empty or\ndoes not exist!"
+        )
+        exit(1)
+
+
+if __name__ == "__main__":
+    main()
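For reference, below is a minimal sketch (not part of the commit) of how a downstream step of the `cronology_db` workflow might consume the *.IDXD_PDG_METAD.pickle index this script writes with its default -op prefix. The index file path and the accession `GCA_000000000.1` are hypothetical placeholders; the structure of `acc2meta` (accession -> column -> list of values) follows from the `setdefault(...).append(...)` calls above.

import pickle

# Load the index written by index_pdg_metadata.py (path is a hypothetical example).
with open("IDXD_PDG_METAD.pickle", "rb") as idx_fh:
    acc2meta = pickle.load(idx_fh)

# Keys are GC[AF] assembly accessions; values map each indexed metadata
# column (e.g. "serovar" or "mlst_sequence_type") to a list of values.
acc = "GCA_000000000.1"  # hypothetical accession
if acc in acc2meta:
    for col, vals in acc2meta[acc].items():
        print(f"{acc}\t{col}\t{';'.join(vals)}")
else:
    print(f"{acc} is not in the index.")

An invocation inside the workflow might look like `index_pdg_metadata.py -pdg_dir /path/to/pdg_release -mlst mlst.tsv -fs` (paths hypothetical), after which the pickle appears in the CWD.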