view 0.5.0/bin/gen_otf_genome.py @ 10:74ac6f6a9526 tip

planemo upload
author kkonganti
date Tue, 01 Apr 2025 11:08:01 -0400
parents 97cd2f532efe
children
line wrap: on
line source
#!/usr/bin/env python3

# Kranti Konganti

import argparse
import glob
import gzip
import inspect
import logging
import os
import pprint
import re

# Set logging.
logging.basicConfig(
    format="\n"
    + "=" * 55
    + "\n%(asctime)s - %(levelname)s\n"
    + "=" * 55
    + "\n%(message)s\n\n",
    level=logging.DEBUG,
)

# Debug print.
ppp = pprint.PrettyPrinter(width=50, indent=4)


# Multiple inheritence for pretty printing of help text.
class MultiArgFormatClasses(
    argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter
):
    pass


def main() -> None:
    """
    This script works only in the context of a Nextflow workflow.
    It takes:
        1. A text file containing accessions or FASTA IDs, one per line and
            then,
        2. Optionally, searches for a genome FASTA file in gzipped format in specified
            search path, where the prefix of the filename is the accession or
            FASTA ID from 1. and then, creates a new concatenated gzipped genome FASTA
            file with all the genomes in the text file from 1.
        3. Creates a new FASTQ file with reads aligned to the accessions in the text
            file from 1.
    """

    prog_name = os.path.basename(inspect.stack()[0].filename)

    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )

    required = parser.add_argument_group("required arguments")

    required.add_argument(
        "-txt",
        dest="accs_txt",
        default=False,
        required=True,
        help="Absolute UNIX path to .txt file containing accessions\n"
        + "FASTA IDs, one per line.",
    )
    required.add_argument(
        "-op",
        dest="out_prefix",
        default="CATTED_GENOMES",
        required=True,
        help="Set the output file prefix for .fna.gz and .txt\n" + "files.",
    )
    parser.add_argument(
        "-gd",
        dest="genomes_dir",
        default=False,
        required=False,
        help="Absolute UNIX path to a directory containing\n"
        + "gzipped genome FASTA files or a file.\n",
    )
    parser.add_argument(
        "-gds",
        dest="genomes_dir_suffix",
        default="_scaffolded_genomic.fna.gz",
        required=False,
        help="Genome FASTA file suffix to search for\nin the directory mentioned using\n-gd.",
    )
    parser.add_argument(
        "-query",
        dest="id_is_query",
        default=False,
        action="store_true",
        required=False,
        help="In the produced FASTQ file, should the FASTA ID should be of KMA query ID\n"
        + "or template ID.",
    )
    parser.add_argument(
        "-txts",
        dest="accs_suffix",
        default="_template_hits.txt",
        required=False,
        help="The suffix of the file supplied with -txt option. It is assumed that the\n"
        + "sample name is present in the file supplied with -txt option and the suffix\n"
        + "will be stripped and stored in a file that logs samples which have no hits.",
    )
    parser.add_argument(
        "-frag_delim",
        dest="frag_delim",
        default="\t",
        required=False,
        help="The delimitor by which the fields are separated in *_frag.gz file.",
    )

    args = parser.parse_args()
    accs_txt = args.accs_txt
    genomes_dir = args.genomes_dir
    genomes_dir_suffix = args.genomes_dir_suffix
    id_is_query = args.id_is_query
    out_prefix = args.out_prefix
    accs_suffix = args.accs_suffix
    frag_delim = args.frag_delim
    accs_seen = dict()
    cat_genomes_gz = os.path.join(os.getcwd(), out_prefix + "_" + genomes_dir_suffix)
    cat_genomes_gz = re.sub("__", "_", str(cat_genomes_gz))
    frags_gz = os.path.join(os.getcwd(), out_prefix + ".frag.gz")
    cat_reads_gz = os.path.join(os.getcwd(), out_prefix + "_aln_reads.fna.gz")
    cat_reads_gz = re.sub("__", "_", cat_reads_gz)

    if (
        accs_txt
        and os.path.exists(cat_genomes_gz)
        and os.path.getsize(cat_genomes_gz) > 0
    ):
        logging.error(
            "A concatenated genome FASTA file,\n"
            + f"{os.path.basename(cat_genomes_gz)} already exists in:\n"
            + f"{os.getcwd()}\n"
            + "Please remove or move it as we will not "
            + "overwrite it."
        )
        exit(1)

    if accs_txt and (not os.path.exists(accs_txt) or not os.path.getsize(accs_txt) > 0):
        logging.error("File,\n" + f"{accs_txt}\ndoes not exist " + "or is empty!")
        failed_sample_name = re.sub(accs_suffix, "", os.path.basename(accs_txt))
        with open(
            os.path.join(os.getcwd(), "_".join([out_prefix, "FAILED.txt"])), "w"
        ) as failed_sample_fh:
            failed_sample_fh.write(f"{failed_sample_name}\n")
        failed_sample_fh.close()
        exit(0)

    # ppp.pprint(mash_hits)
    empty_lines = 0
    empty_lines_msg = ""

    with open(accs_txt, "r") as accs_txt_fh:
        for line in accs_txt_fh:
            if line in ["\n", "\n\r"]:
                empty_lines += 1
                continue
            else:
                line = line.strip()

            if line in accs_seen.keys():
                continue
            else:
                accs_seen[line] = 1
    accs_txt_fh.close()

    if genomes_dir:
        if not os.path.isdir(genomes_dir):
            logging.error("UNIX path\n" + f"{genomes_dir}\n" + "does not exist!")
            exit(1)
        if len(glob.glob(os.path.join(genomes_dir, "*" + genomes_dir_suffix))) <= 0:
            logging.error(
                "Genomes directory"
                + f"{genomes_dir}"
                + "\ndoes not seem to have any\n"
                + f"files ending with suffix: {genomes_dir_suffix}"
            )
            exit(1)

        with open(cat_genomes_gz, "wb") as genomes_out_gz:
            for line in accs_seen.keys():
                genome_file = os.path.join(genomes_dir, line + genomes_dir_suffix)

                if not os.path.exists(genome_file) or os.path.getsize(genome_file) <= 0:
                    logging.error(
                        f"Genome file {os.path.basename(genome_file)} does not\n"
                        + "exits or is empty!"
                    )
                    exit(1)
                else:
                    with open(genome_file, "rb") as genome_file_h:
                        genomes_out_gz.writelines(genome_file_h.readlines())
                    genome_file_h.close()
        genomes_out_gz.close()

    if (
        len(accs_seen.keys()) > 0
        and os.path.exists(frags_gz)
        and os.path.getsize(frags_gz) > 0
    ):
        with gzip.open(
            cat_reads_gz, "wt", encoding="utf-8", compresslevel=6
        ) as cat_reads_gz_fh:
            with gzip.open(frags_gz, "rb", compresslevel=6) as fragz_gz_fh:
                fasta_id = 7 if id_is_query else 6
                for frag_line in fragz_gz_fh:
                    frag_lines = frag_line.decode("utf-8").strip().split(frag_delim)
                    # Per KMA specification, 6=template, 7=query, 1=read
                    cat_reads_gz_fh.write(f">{frag_lines[fasta_id]}\n{frag_lines[0]}\n")
            fragz_gz_fh.close()
        cat_reads_gz_fh.close()

        if empty_lines > 0:
            empty_lines_msg = f"Skipped {empty_lines} empty line(s).\n"

        logging.info(
            empty_lines_msg
            + f"File {os.path.basename(cat_genomes_gz)}\n"
            + f"written in:\n{os.getcwd()}\nDone! Bye!"
        )


if __name__ == "__main__":
    main()