Mercurial > repos > kkonganti > cfsan_bettercallsal
diff 0.5.0/bin/gen_otf_genome.py @ 1:365849f031fd
"planemo upload"
author | kkonganti |
---|---|
date | Mon, 05 Jun 2023 18:48:51 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/0.5.0/bin/gen_otf_genome.py Mon Jun 05 18:48:51 2023 -0400 @@ -0,0 +1,174 @@ +#!/usr/bin/env python3 + +# Kranti Konganti + +import os +import argparse +import inspect +import logging +import re +import pprint +import glob + +# Set logging. +logging.basicConfig( + format="\n" + "=" * 55 + "\n%(asctime)s - %(levelname)s\n" + "=" * 55 + "\n%(message)s\n\n", + level=logging.DEBUG, +) + +# Debug print. +ppp = pprint.PrettyPrinter(width=50, indent=4) + +# Multiple inheritence for pretty printing of help text. +class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter): + pass + + +def main() -> None: + """ + This script works only in the context of `bettercallsal` Nextflow workflow. + It takes: + 1. A text file containing accessions or FASTA IDs, one per line and + then, + 2. Searches for a genome FASTA file in gzipped format in specified + search path, where the prefix of the filename is the accession or + FASTA ID from 1. and then, + creates a new concatenated gzipped genome FASTA file with all the genomes + in the text file from 1. + """ + + prog_name = os.path.basename(inspect.stack()[0].filename) + + parser = argparse.ArgumentParser( + prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses + ) + + required = parser.add_argument_group("required arguments") + + required.add_argument( + "-txt", + dest="accs_txt", + default=False, + required=True, + help="Absolute UNIX path to .txt file containing accessions\n" + "FASTA IDs, one per line.", + ) + required.add_argument( + "-gd", + dest="genomes_dir", + default=False, + required=True, + help="Absolute UNIX path to a directory containing\n" + + "gzipped genome FASTA files.\n" + + "Required if -m is on.", + ) + parser.add_argument( + "-gds", + dest="genomes_dir_suffix", + default="_scaffolded_genomic.fna.gz", + required=False, + help="Genome FASTA file suffix to search for\nin the directory mentioned using\n-gd.", + ) + parser.add_argument( + "-op", + dest="out_prefix", + default="CATTED_GENOMES", + help="Set the output file prefix for .fna.gz and .txt\n" + "files.", + ) + parser.add_argument( + "-txts", + dest="accs_suffix", + default="_template_hits.txt", + required=False, + help="The suffix of the file supplied with -txt option. It is assumed that the\n" + + "sample name is present in the file supplied with -txt option and the suffix\n" + + "will be stripped and stored in a file that logs samples which have no hits.", + ) + + args = parser.parse_args() + accs_txt = args.accs_txt + genomes_dir = args.genomes_dir + genomes_dir_suffix = args.genomes_dir_suffix + out_prefix = args.out_prefix + accs_suffix = args.accs_suffix + accs_seen = dict() + cat_genomes_gz = os.path.join(os.getcwd(), out_prefix + "_" + genomes_dir_suffix) + cat_genomes_gz = re.sub("__", "_", str(cat_genomes_gz)) + + if accs_txt and os.path.exists(cat_genomes_gz) and os.path.getsize(cat_genomes_gz) > 0: + logging.error( + "A concatenated genome FASTA file,\n" + + f"{os.path.basename(cat_genomes_gz)} already exists in:\n" + + f"{os.getcwd()}\n" + + "Please remove or move it as we will not " + + "overwrite it." + ) + exit(1) + + if accs_txt and (not os.path.exists(accs_txt) or not os.path.getsize(accs_txt) > 0): + logging.error("File,\n" + f"{accs_txt}\ndoes not exist " + "or is empty!") + failed_sample_name = re.sub(accs_suffix, "", os.path.basename(accs_txt)) + with open( + os.path.join(os.getcwd(), "_".join([out_prefix, "FAILED.txt"])), "w" + ) as failed_sample_fh: + failed_sample_fh.write(f"{failed_sample_name}\n") + failed_sample_fh.close() + exit(0) + + if genomes_dir: + if not os.path.isdir(genomes_dir): + logging.error("UNIX path\n" + f"{genomes_dir}\n" + "does not exist!") + exit(1) + if len(glob.glob(os.path.join(genomes_dir, "*" + genomes_dir_suffix))) <= 0: + logging.error( + "Genomes directory" + + f"{genomes_dir}" + + "\ndoes not seem to have any\n" + + f"files ending with suffix: {genomes_dir_suffix}" + ) + exit(1) + + # ppp.pprint(mash_hits) + empty_lines = 0 + empty_lines_msg = "" + with open(cat_genomes_gz, "wb") as genomes_out_gz: + with open(accs_txt, "r") as accs_txt_fh: + for line in accs_txt_fh: + if line in ["\n", "\n\r"]: + empty_lines += 1 + continue + else: + line = line.strip() + + if line in accs_seen.keys(): + continue + else: + accs_seen[line] = 1 + + genome_file = os.path.join(genomes_dir, line + genomes_dir_suffix) + + if not os.path.exists(genome_file) or os.path.getsize(genome_file) <= 0: + logging.error( + f"Genome file {os.path.basename(genome_file)} does not\n" + + "exits or is empty!" + ) + exit(1) + else: + with open(genome_file, "rb") as genome_file_h: + genomes_out_gz.writelines(genome_file_h.readlines()) + genome_file_h.close() + accs_txt_fh.close() + genomes_out_gz.close() + + if empty_lines > 0: + empty_lines_msg = f"Skipped {empty_lines} empty line(s).\n" + + logging.info( + empty_lines_msg + + f"File {os.path.basename(cat_genomes_gz)}\n" + + f"written in:\n{os.getcwd()}\nDone! Bye!" + ) + exit(0) + + +if __name__ == "__main__": + main()