kkonganti@11: #!/usr/bin/env python3 kkonganti@11: kkonganti@11: # Kranti Konganti kkonganti@11: kkonganti@11: import argparse kkonganti@11: import glob kkonganti@11: import gzip kkonganti@11: import inspect kkonganti@11: import logging kkonganti@11: import os kkonganti@11: import pprint kkonganti@11: import re kkonganti@11: kkonganti@11: # Set logging. kkonganti@11: logging.basicConfig( kkonganti@11: format="\n" + "=" * 55 + "\n%(asctime)s - %(levelname)s\n" + "=" * 55 + "\n%(message)s\n\n", kkonganti@11: level=logging.DEBUG, kkonganti@11: ) kkonganti@11: kkonganti@11: # Debug print. kkonganti@11: ppp = pprint.PrettyPrinter(width=50, indent=4) kkonganti@11: kkonganti@11: # Multiple inheritence for pretty printing of help text. kkonganti@11: class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter): kkonganti@11: pass kkonganti@11: kkonganti@11: kkonganti@11: def main() -> None: kkonganti@11: """ kkonganti@11: This script works only in the context of `bettercallsal` Nextflow workflow. kkonganti@11: It takes: kkonganti@11: 1. A text file containing accessions or FASTA IDs, one per line and kkonganti@11: then, kkonganti@11: 2. Searches for a genome FASTA file in gzipped format in specified kkonganti@11: search path, where the prefix of the filename is the accession or kkonganti@11: FASTA ID from 1. and then, kkonganti@11: creates a new concatenated gzipped genome FASTA file with all the genomes kkonganti@11: in the text file from 1. kkonganti@11: """ kkonganti@11: kkonganti@11: prog_name = os.path.basename(inspect.stack()[0].filename) kkonganti@11: kkonganti@11: parser = argparse.ArgumentParser( kkonganti@11: prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses kkonganti@11: ) kkonganti@11: kkonganti@11: required = parser.add_argument_group("required arguments") kkonganti@11: kkonganti@11: required.add_argument( kkonganti@11: "-txt", kkonganti@11: dest="accs_txt", kkonganti@11: default=False, kkonganti@11: required=True, kkonganti@11: help="Absolute UNIX path to .txt file containing accessions\n" + "FASTA IDs, one per line.", kkonganti@11: ) kkonganti@11: required.add_argument( kkonganti@11: "-gd", kkonganti@11: dest="genomes_dir", kkonganti@11: default=False, kkonganti@11: required=True, kkonganti@11: help="Absolute UNIX path to a directory containing\n" kkonganti@11: + "gzipped genome FASTA files.\n" kkonganti@11: + "Required if -m is on.", kkonganti@11: ) kkonganti@11: parser.add_argument( kkonganti@11: "-gds", kkonganti@11: dest="genomes_dir_suffix", kkonganti@11: default="_scaffolded_genomic.fna.gz", kkonganti@11: required=False, kkonganti@11: help="Genome FASTA file suffix to search for\nin the directory mentioned using\n-gd.", kkonganti@11: ) kkonganti@11: parser.add_argument( kkonganti@11: "-op", kkonganti@11: dest="out_prefix", kkonganti@11: default="CATTED_GENOMES", kkonganti@11: help="Set the output file prefix for .fna.gz and .txt\n" + "files.", kkonganti@11: ) kkonganti@11: parser.add_argument( kkonganti@11: "-txts", kkonganti@11: dest="accs_suffix", kkonganti@11: default="_template_hits.txt", kkonganti@11: required=False, kkonganti@11: help="The suffix of the file supplied with -txt option. It is assumed that the\n" kkonganti@11: + "sample name is present in the file supplied with -txt option and the suffix\n" kkonganti@11: + "will be stripped and stored in a file that logs samples which have no hits.", kkonganti@11: ) kkonganti@11: parser.add_argument( kkonganti@11: "-frag_delim", kkonganti@11: dest="frag_delim", kkonganti@11: default="\t", kkonganti@11: required=False, kkonganti@11: help="The delimitor by which the fields are separated in *_frag.gz file.", kkonganti@11: ) kkonganti@11: kkonganti@11: args = parser.parse_args() kkonganti@11: accs_txt = args.accs_txt kkonganti@11: genomes_dir = args.genomes_dir kkonganti@11: genomes_dir_suffix = args.genomes_dir_suffix kkonganti@11: out_prefix = args.out_prefix kkonganti@11: accs_suffix = args.accs_suffix kkonganti@11: frag_delim = args.frag_delim kkonganti@11: accs_seen = dict() kkonganti@11: cat_genomes_gz = os.path.join(os.getcwd(), out_prefix + "_" + genomes_dir_suffix) kkonganti@11: cat_genomes_gz = re.sub("__", "_", str(cat_genomes_gz)) kkonganti@11: frags_gz = os.path.join(os.getcwd(), out_prefix + ".frag.gz") kkonganti@11: cat_reads_gz = os.path.join(os.getcwd(), out_prefix + "_aln_reads.fna.gz") kkonganti@11: cat_reads_gz = re.sub("__", "_", cat_reads_gz) kkonganti@11: kkonganti@11: if accs_txt and os.path.exists(cat_genomes_gz) and os.path.getsize(cat_genomes_gz) > 0: kkonganti@11: logging.error( kkonganti@11: "A concatenated genome FASTA file,\n" kkonganti@11: + f"{os.path.basename(cat_genomes_gz)} already exists in:\n" kkonganti@11: + f"{os.getcwd()}\n" kkonganti@11: + "Please remove or move it as we will not " kkonganti@11: + "overwrite it." kkonganti@11: ) kkonganti@11: exit(1) kkonganti@11: kkonganti@11: if accs_txt and (not os.path.exists(accs_txt) or not os.path.getsize(accs_txt) > 0): kkonganti@11: logging.error("File,\n" + f"{accs_txt}\ndoes not exist " + "or is empty!") kkonganti@11: failed_sample_name = re.sub(accs_suffix, "", os.path.basename(accs_txt)) kkonganti@11: with open( kkonganti@11: os.path.join(os.getcwd(), "_".join([out_prefix, "FAILED.txt"])), "w" kkonganti@11: ) as failed_sample_fh: kkonganti@11: failed_sample_fh.write(f"{failed_sample_name}\n") kkonganti@11: failed_sample_fh.close() kkonganti@11: exit(0) kkonganti@11: kkonganti@11: if genomes_dir: kkonganti@11: if not os.path.isdir(genomes_dir): kkonganti@11: logging.error("UNIX path\n" + f"{genomes_dir}\n" + "does not exist!") kkonganti@11: exit(1) kkonganti@11: if len(glob.glob(os.path.join(genomes_dir, "*" + genomes_dir_suffix))) <= 0: kkonganti@11: logging.error( kkonganti@11: "Genomes directory" kkonganti@11: + f"{genomes_dir}" kkonganti@11: + "\ndoes not seem to have any\n" kkonganti@11: + f"files ending with suffix: {genomes_dir_suffix}" kkonganti@11: ) kkonganti@11: exit(1) kkonganti@11: kkonganti@11: # ppp.pprint(mash_hits) kkonganti@11: empty_lines = 0 kkonganti@11: empty_lines_msg = "" kkonganti@11: with open(cat_genomes_gz, "wb") as genomes_out_gz: kkonganti@11: with open(accs_txt, "r") as accs_txt_fh: kkonganti@11: for line in accs_txt_fh: kkonganti@11: if line in ["\n", "\n\r"]: kkonganti@11: empty_lines += 1 kkonganti@11: continue kkonganti@11: else: kkonganti@11: line = line.strip() kkonganti@11: kkonganti@11: if line in accs_seen.keys(): kkonganti@11: continue kkonganti@11: else: kkonganti@11: accs_seen[line] = 1 kkonganti@11: kkonganti@11: genome_file = os.path.join(genomes_dir, line + genomes_dir_suffix) kkonganti@11: kkonganti@11: if not os.path.exists(genome_file) or os.path.getsize(genome_file) <= 0: kkonganti@11: logging.error( kkonganti@11: f"Genome file {os.path.basename(genome_file)} does not\n" kkonganti@11: + "exits or is empty!" kkonganti@11: ) kkonganti@11: exit(1) kkonganti@11: else: kkonganti@11: with open(genome_file, "rb") as genome_file_h: kkonganti@11: genomes_out_gz.writelines(genome_file_h.readlines()) kkonganti@11: genome_file_h.close() kkonganti@11: accs_txt_fh.close() kkonganti@11: genomes_out_gz.close() kkonganti@11: kkonganti@11: if len(accs_seen.keys()) > 0 and os.path.exists(frags_gz) and os.path.getsize(frags_gz) > 0: kkonganti@11: with gzip.open(cat_reads_gz, "wt", encoding="utf-8") as cat_reads_gz_fh: kkonganti@11: with gzip.open(frags_gz, "rb") as fragz_gz_fh: kkonganti@11: for frag_line in fragz_gz_fh.readlines(): kkonganti@11: frag_lines = frag_line.decode("utf-8").strip().split(frag_delim) kkonganti@11: # Per KMA specification, 6=template, 7=query, 1=read kkonganti@11: cat_reads_gz_fh.write(f">{frag_lines[6]}\n{frag_lines[0]}\n") kkonganti@11: fragz_gz_fh.close() kkonganti@11: cat_reads_gz_fh.close() kkonganti@11: kkonganti@11: if empty_lines > 0: kkonganti@11: empty_lines_msg = f"Skipped {empty_lines} empty line(s).\n" kkonganti@11: kkonganti@11: logging.info( kkonganti@11: empty_lines_msg kkonganti@11: + f"File {os.path.basename(cat_genomes_gz)}\n" kkonganti@11: + f"written in:\n{os.getcwd()}\nDone! Bye!" kkonganti@11: ) kkonganti@11: exit(0) kkonganti@11: kkonganti@11: kkonganti@11: if __name__ == "__main__": kkonganti@11: main()