kkonganti@1: #!/usr/bin/env python3 kkonganti@1: kkonganti@1: # Kranti Konganti kkonganti@1: kkonganti@1: import os kkonganti@1: import argparse kkonganti@1: import inspect kkonganti@1: import logging kkonganti@1: import re kkonganti@1: import pprint kkonganti@1: import glob kkonganti@1: kkonganti@1: # Set logging. kkonganti@1: logging.basicConfig( kkonganti@1: format="\n" + "=" * 55 + "\n%(asctime)s - %(levelname)s\n" + "=" * 55 + "\n%(message)s\n\n", kkonganti@1: level=logging.DEBUG, kkonganti@1: ) kkonganti@1: kkonganti@1: # Debug print. kkonganti@1: ppp = pprint.PrettyPrinter(width=50, indent=4) kkonganti@1: kkonganti@1: # Multiple inheritence for pretty printing of help text. kkonganti@1: class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter): kkonganti@1: pass kkonganti@1: kkonganti@1: kkonganti@1: def main() -> None: kkonganti@1: """ kkonganti@1: This script works only in the context of `bettercallsal` Nextflow workflow. kkonganti@1: It takes: kkonganti@1: 1. A text file containing accessions or FASTA IDs, one per line and kkonganti@1: then, kkonganti@1: 2. Searches for a genome FASTA file in gzipped format in specified kkonganti@1: search path, where the prefix of the filename is the accession or kkonganti@1: FASTA ID from 1. and then, kkonganti@1: creates a new concatenated gzipped genome FASTA file with all the genomes kkonganti@1: in the text file from 1. kkonganti@1: """ kkonganti@1: kkonganti@1: prog_name = os.path.basename(inspect.stack()[0].filename) kkonganti@1: kkonganti@1: parser = argparse.ArgumentParser( kkonganti@1: prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses kkonganti@1: ) kkonganti@1: kkonganti@1: required = parser.add_argument_group("required arguments") kkonganti@1: kkonganti@1: required.add_argument( kkonganti@1: "-txt", kkonganti@1: dest="accs_txt", kkonganti@1: default=False, kkonganti@1: required=True, kkonganti@1: help="Absolute UNIX path to .txt file containing accessions\n" + "FASTA IDs, one per line.", kkonganti@1: ) kkonganti@1: required.add_argument( kkonganti@1: "-gd", kkonganti@1: dest="genomes_dir", kkonganti@1: default=False, kkonganti@1: required=True, kkonganti@1: help="Absolute UNIX path to a directory containing\n" kkonganti@1: + "gzipped genome FASTA files.\n" kkonganti@1: + "Required if -m is on.", kkonganti@1: ) kkonganti@1: parser.add_argument( kkonganti@1: "-gds", kkonganti@1: dest="genomes_dir_suffix", kkonganti@1: default="_scaffolded_genomic.fna.gz", kkonganti@1: required=False, kkonganti@1: help="Genome FASTA file suffix to search for\nin the directory mentioned using\n-gd.", kkonganti@1: ) kkonganti@1: parser.add_argument( kkonganti@1: "-op", kkonganti@1: dest="out_prefix", kkonganti@1: default="CATTED_GENOMES", kkonganti@1: help="Set the output file prefix for .fna.gz and .txt\n" + "files.", kkonganti@1: ) kkonganti@1: parser.add_argument( kkonganti@1: "-txts", kkonganti@1: dest="accs_suffix", kkonganti@1: default="_template_hits.txt", kkonganti@1: required=False, kkonganti@1: help="The suffix of the file supplied with -txt option. It is assumed that the\n" kkonganti@1: + "sample name is present in the file supplied with -txt option and the suffix\n" kkonganti@1: + "will be stripped and stored in a file that logs samples which have no hits.", kkonganti@1: ) kkonganti@1: kkonganti@1: args = parser.parse_args() kkonganti@1: accs_txt = args.accs_txt kkonganti@1: genomes_dir = args.genomes_dir kkonganti@1: genomes_dir_suffix = args.genomes_dir_suffix kkonganti@1: out_prefix = args.out_prefix kkonganti@1: accs_suffix = args.accs_suffix kkonganti@1: accs_seen = dict() kkonganti@1: cat_genomes_gz = os.path.join(os.getcwd(), out_prefix + "_" + genomes_dir_suffix) kkonganti@1: cat_genomes_gz = re.sub("__", "_", str(cat_genomes_gz)) kkonganti@1: kkonganti@1: if accs_txt and os.path.exists(cat_genomes_gz) and os.path.getsize(cat_genomes_gz) > 0: kkonganti@1: logging.error( kkonganti@1: "A concatenated genome FASTA file,\n" kkonganti@1: + f"{os.path.basename(cat_genomes_gz)} already exists in:\n" kkonganti@1: + f"{os.getcwd()}\n" kkonganti@1: + "Please remove or move it as we will not " kkonganti@1: + "overwrite it." kkonganti@1: ) kkonganti@1: exit(1) kkonganti@1: kkonganti@1: if accs_txt and (not os.path.exists(accs_txt) or not os.path.getsize(accs_txt) > 0): kkonganti@1: logging.error("File,\n" + f"{accs_txt}\ndoes not exist " + "or is empty!") kkonganti@1: failed_sample_name = re.sub(accs_suffix, "", os.path.basename(accs_txt)) kkonganti@1: with open( kkonganti@1: os.path.join(os.getcwd(), "_".join([out_prefix, "FAILED.txt"])), "w" kkonganti@1: ) as failed_sample_fh: kkonganti@1: failed_sample_fh.write(f"{failed_sample_name}\n") kkonganti@1: failed_sample_fh.close() kkonganti@1: exit(0) kkonganti@1: kkonganti@1: if genomes_dir: kkonganti@1: if not os.path.isdir(genomes_dir): kkonganti@1: logging.error("UNIX path\n" + f"{genomes_dir}\n" + "does not exist!") kkonganti@1: exit(1) kkonganti@1: if len(glob.glob(os.path.join(genomes_dir, "*" + genomes_dir_suffix))) <= 0: kkonganti@1: logging.error( kkonganti@1: "Genomes directory" kkonganti@1: + f"{genomes_dir}" kkonganti@1: + "\ndoes not seem to have any\n" kkonganti@1: + f"files ending with suffix: {genomes_dir_suffix}" kkonganti@1: ) kkonganti@1: exit(1) kkonganti@1: kkonganti@1: # ppp.pprint(mash_hits) kkonganti@1: empty_lines = 0 kkonganti@1: empty_lines_msg = "" kkonganti@1: with open(cat_genomes_gz, "wb") as genomes_out_gz: kkonganti@1: with open(accs_txt, "r") as accs_txt_fh: kkonganti@1: for line in accs_txt_fh: kkonganti@1: if line in ["\n", "\n\r"]: kkonganti@1: empty_lines += 1 kkonganti@1: continue kkonganti@1: else: kkonganti@1: line = line.strip() kkonganti@1: kkonganti@1: if line in accs_seen.keys(): kkonganti@1: continue kkonganti@1: else: kkonganti@1: accs_seen[line] = 1 kkonganti@1: kkonganti@1: genome_file = os.path.join(genomes_dir, line + genomes_dir_suffix) kkonganti@1: kkonganti@1: if not os.path.exists(genome_file) or os.path.getsize(genome_file) <= 0: kkonganti@1: logging.error( kkonganti@1: f"Genome file {os.path.basename(genome_file)} does not\n" kkonganti@1: + "exits or is empty!" kkonganti@1: ) kkonganti@1: exit(1) kkonganti@1: else: kkonganti@1: with open(genome_file, "rb") as genome_file_h: kkonganti@1: genomes_out_gz.writelines(genome_file_h.readlines()) kkonganti@1: genome_file_h.close() kkonganti@1: accs_txt_fh.close() kkonganti@1: genomes_out_gz.close() kkonganti@1: kkonganti@1: if empty_lines > 0: kkonganti@1: empty_lines_msg = f"Skipped {empty_lines} empty line(s).\n" kkonganti@1: kkonganti@1: logging.info( kkonganti@1: empty_lines_msg kkonganti@1: + f"File {os.path.basename(cat_genomes_gz)}\n" kkonganti@1: + f"written in:\n{os.getcwd()}\nDone! Bye!" kkonganti@1: ) kkonganti@1: exit(0) kkonganti@1: kkonganti@1: kkonganti@1: if __name__ == "__main__": kkonganti@1: main()