Mercurial > repos > kkonganti > cfsan_bettercallsal
view 0.5.0/bin/get_top_unique_mash_hit_genomes.py @ 1:365849f031fd
"planemo upload"
author | kkonganti |
---|---|
date | Mon, 05 Jun 2023 18:48:51 -0400 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Kranti Konganti import os import glob import pickle import argparse import inspect import logging import re import pprint from collections import defaultdict # Multiple inheritence for pretty printing of help text. class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter): pass # Main def main() -> None: """ This script works only in the context of `bettercallsal` Nextflow workflow. It takes: 1. A pickle file containing a dictionary object where genome accession is the key and the computed serotype is the value. 2. A file with `mash screen` results run against the Salmonella SNP Cluster genomes' sketch. 3. A directory containing genomes' FASTA in gzipped format where the FASTA file contains 2 lines: one FASTA header followed by genome Sequence. and then generates a concatenated FASTA file of top N unique `mash screen` genome hits as requested. """ # Set logging. logging.basicConfig( format="\n" + "=" * 55 + "\n%(asctime)s - %(levelname)s\n" + "=" * 55 + "\n%(message)s\n\n", level=logging.DEBUG, ) # Debug print. ppp = pprint.PrettyPrinter(width=55) prog_name = os.path.basename(inspect.stack()[0].filename) parser = argparse.ArgumentParser( prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses ) parser.add_argument( "-s", dest="sero_snp_metadata", default=False, required=False, help="Absolute UNIX path to metadata text file with the field separator, | " + "\nand 5 fields: serotype|asm_lvl|asm_url|snp_cluster_id" + "\nEx: serotype=Derby,antigen_formula=4:f,g:-|Scaffold|402440|ftp://...\n|PDS000096654.2\n" + "Mentioning this option will create a pickle file for the\nprovided metadata and exits.", ) parser.add_argument( "-fs", dest="force_write_pick", action="store_true", required=False, help="By default, when -s flag is on, the pickle file named *.ACC2SERO.pickle\n" + "is written to CWD. If the file exists, the program will not overwrite\n" + "and exit. Use -fs option to overwrite.", ) parser.add_argument( "-m", dest="mash_screen_res", default=False, required=False, help="Absolute UNIX path to `mash screen` results file.", ) parser.add_argument( "-ms", dest="mash_screen_res_suffix", default=".screened", required=False, help="Suffix of the `mash screen` result file.", ) parser.add_argument( "-ps", dest="pickled_sero", default=False, required=False, help="Absolute UNIX Path to serialized metadata object in a pickle file.\n" + "You can create the pickle file of the metadata using -s option.\n" + "Required if -m is on.", ) parser.add_argument( "-gd", dest="genomes_dir", default=False, required=False, help="Absolute UNIX path to a directory containing\n" + "gzipped genome FASTA files.\n" + "Required if -m is on.", ) parser.add_argument( "-gds", dest="genomes_dir_suffix", default="_scaffolded_genomic.fna.gz", required=False, help="Genome FASTA file suffix to search for\nin the directory mentioned using\n-gd.", ) parser.add_argument( "-n", dest="num_uniq_hits", default=10, help="This many number of serotype genomes' accessions are " + "\nreturned.", ) parser.add_argument( "-op", dest="out_prefix", default="MASH_SCREEN", help="Set the output file prefix for .fna.gz and .txt files.", ) # required = parser.add_argument_group('required arguments') args = parser.parse_args() num_uniq_hits = int(args.num_uniq_hits) mash_screen_res = args.mash_screen_res mash_screen_res_suffix = args.mash_screen_res_suffix pickle_sero = args.sero_snp_metadata pickled_sero = args.pickled_sero f_write_pick = args.force_write_pick genomes_dir = args.genomes_dir genomes_dir_suffix = args.genomes_dir_suffix out_prefix = args.out_prefix mash_genomes_gz = os.path.join( os.getcwd(), out_prefix + "_TOP_" + str(num_uniq_hits) + "_UNIQUE_HITS.fna.gz" ) mash_uniq_hits_txt = os.path.join( os.getcwd(), re.sub(".fna.gz", ".txt", os.path.basename(mash_genomes_gz)) ) if mash_screen_res and os.path.exists(mash_genomes_gz): logging.error( "A concatenated genome FASTA file,\n" + f"{os.path.basename(mash_genomes_gz)} already exists in:\n" + f"{os.getcwd()}\n" + "Please remove or move it as we will not " + "overwrite it." ) exit(1) if os.path.exists(mash_uniq_hits_txt) and os.path.getsize(mash_uniq_hits_txt) > 0: os.remove(mash_uniq_hits_txt) if mash_screen_res and (not genomes_dir or not pickled_sero): logging.error("When -m is on, -ps and -gd are also required.") exit(1) if genomes_dir: if not os.path.isdir(genomes_dir): logging.error("UNIX path\n" + f"{genomes_dir}\n" + "does not exist!") exit(1) if len(glob.glob(os.path.join(genomes_dir, "*" + genomes_dir_suffix))) <= 0: logging.error( "Genomes directory" + f"{genomes_dir}" + "\ndoes not seem to have any\n" + f"files ending with suffix: {genomes_dir_suffix}" ) exit(1) if pickle_sero and os.path.exists(pickle_sero) and os.path.getsize(pickle_sero) > 0: acc2serotype = defaultdict() init_pickled_sero = os.path.join(os.getcwd(), out_prefix + ".ACC2SERO.pickle") if ( os.path.exists(init_pickled_sero) and os.path.getsize(init_pickled_sero) and not f_write_pick ): logging.error( f"File {os.path.basename(init_pickled_sero)} already exists in\n{os.getcwd()}\n" + "Use -fs to force overwrite it." ) exit(1) with open(pickle_sero, "r") as sero_snp_meta: for line in sero_snp_meta: cols = line.strip().split("|") url_cols = cols[3].split("/") if not 4 <= len(cols) <= 5: logging.error( f"The metadata file {pickle_sero} is malformed.\n" + f"Expected 4-5 columns. Got {len(cols)} columns.\n" ) exit(1) if not len(url_cols) > 5: acc = url_cols[3] else: acc = url_cols[9] if not re.match(r"^GC[AF]\_\d+\.\d+$", acc): logging.error( f"Did not find accession in either field number 4\n" + "or field number 10 of column 4." ) exit(1) acc2serotype[acc] = cols[0] with open(init_pickled_sero, "wb") as write_pickled_sero: pickle.dump(file=write_pickled_sero, obj=acc2serotype) logging.info( f"Created the pickle file for\n{os.path.basename(pickle_sero)}.\n" + "This was the only requested function." ) sero_snp_meta.close() write_pickled_sero.close() exit(0) elif pickle_sero and not (os.path.exists(pickle_sero) and os.path.getsize(pickle_sero) > 0): logging.error( "Requested to create pickle from metadata, but\n" + f"the file, {os.path.basename(pickle_sero)} is empty or\ndoes not exist!" ) exit(1) if mash_screen_res and os.path.exists(mash_screen_res): if os.path.getsize(mash_screen_res) > 0: seen_uniq_hits = 0 unpickled_acc2serotype = pickle.load(file=open(pickled_sero, "rb")) with open(mash_screen_res, "r") as msh_res: mash_hits = defaultdict() seen_mash_sero = defaultdict() for line in msh_res: cols = line.strip().split("\t") if len(cols) < 5: logging.error( f"The file {os.path.basename(mash_screen_res)} seems to\n" + "be malformed. It contains less than required 5-6 columns." ) exit(1) mash_hit_acc = re.sub( genomes_dir_suffix, "", str((re.search(r"GC[AF].*?" + genomes_dir_suffix, cols[4])).group()), ) if mash_hit_acc: mash_hits.setdefault(cols[0], []).append(mash_hit_acc) else: logging.error( "Did not find an assembly accession in column\n" + f"number 5. Found {cols[4]} instead. Cannot proceed!" ) exit(1) msh_res.close() elif os.path.getsize(mash_screen_res) == 0: failed_sample_name = os.path.basename(mash_screen_res).rstrip(mash_screen_res_suffix) with open( os.path.join(os.getcwd(), "_".join([out_prefix, "FAILED.txt"])), "w" ) as failed_sample_fh: failed_sample_fh.write(f"{failed_sample_name}\n") failed_sample_fh.close() exit(0) # ppp.pprint(mash_hits) msh_out_txt = open(mash_uniq_hits_txt, "w") with open(mash_genomes_gz, "wb") as msh_out_gz: for _, (ident, acc_list) in enumerate(sorted(mash_hits.items(), reverse=True)): for acc in acc_list: if seen_uniq_hits >= num_uniq_hits: break if unpickled_acc2serotype[acc] not in seen_mash_sero.keys(): seen_mash_sero[unpickled_acc2serotype[acc]] = 1 seen_uniq_hits += 1 # print(acc.strip() + '\t' + ident + '\t' + unpickled_acc2serotype[acc], file=sys.stdout) msh_out_txt.write( f"{acc.strip()}\t{unpickled_acc2serotype[acc]}\t{ident}\n" ) with open( os.path.join(genomes_dir, acc + genomes_dir_suffix), "rb" ) as msh_in_gz: msh_out_gz.writelines(msh_in_gz.readlines()) msh_in_gz.close() msh_out_gz.close() msh_out_txt.close() logging.info( f"File {os.path.basename(mash_genomes_gz)}\n" + f"written in:\n{os.getcwd()}\nDone! Bye!" ) exit(0) if __name__ == "__main__": main()