view 0.5.0/bin/gen_per_species_fa_from_bold.py @ 0:97cd2f532efe

planemo upload
author kkonganti
date Mon, 31 Mar 2025 14:50:40 -0400
parents
children
line wrap: on
line source
#!/usr/bin/env python3

import argparse
import gzip
import inspect
import logging
import os
import pprint
import re
import shutil
from collections import defaultdict
from typing import BinaryIO, TextIO, Union

from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord


# Multiple inheritence for pretty printing of help text.
class MultiArgFormatClasses(
    argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter
):
    pass


def get_lineages(csv: os.PathLike, cols: list) -> list:
    """
    Parse the output from `ncbitax2lin` tool and
    return a dict of lineages where the key is
    genusspeciesstrain.
    """
    lineages = dict()
    if csv == None or not (os.path.exists(csv) or os.path.getsize(csv) > 0):
        logging.error(
            f"The CSV file [{os.path.basename(csv)}] is empty or does not exist!"
        )
        exit(1)

    logging.info(f"Indexing {os.path.basename(csv)}...")

    with open(csv, "r") as csv_fh:
        header_cols = csv_fh.readline().strip().split(",")
        user_req_cols = [
            tcol_i for tcol_i, tcol in enumerate(header_cols) if tcol in cols
        ]
        cols_not_found = [tcol for tcol in cols if tcol not in header_cols]
        raw_recs = 0

        if len(cols_not_found) > 0:
            logging.error(
                f"The following columns do not exist in the"
                + f"\nCSV file [ {os.path.basename(csv)} ]:\n"
                + "".join(cols_not_found)
            )
            exit(1)
        elif len(user_req_cols) > 9:
            logging.error(
                f"Only a total of 9 columns are needed!"
                + "\ntax_id,kindom,phylum,class,order,family,genus,species,strain"
            )
            exit(1)

        for tax in csv_fh:
            raw_recs += 1
            lcols = tax.strip().split(",")

            if bool(lcols[user_req_cols[8]]):
                lineages[lcols[user_req_cols[8]]] = ",".join(
                    [lcols[l] for l in user_req_cols[1:]]
                )
            elif bool(lcols[user_req_cols[7]]):
                lineages[lcols[user_req_cols[7]]] = ",".join(
                    [lcols[l] for l in user_req_cols[1:8]] + [str()]
                )

    csv_fh.close()
    return lineages, raw_recs


def write_fasta(recs: list, basedir: os.PathLike, name: str, suffix: str) -> None:
    """
    Write sequence with no description to a specified file.
    """
    SeqIO.write(
        recs,
        os.path.join(basedir, name + suffix),
        "fasta",
    )


def check_and_get_cols(pat: re, cols: str, delim: str) -> list:
    """
    Check if header column matches the pattern and return
    columns.
    """
    if not pat.match(cols):
        logging.error(
            f"Supplied columns' names {cols} should only have words"
            f"\n(alphanumeric) separated by: {delim}."
        )
        exit(1)
    else:
        cols = re.sub("\n", "", cols).split(delim)

    return cols


def parse_tsv(fh: Union[TextIO, BinaryIO], tcols: list, delim: str) -> list:
    """
    Parse the TSV file and produce the required per
    species FASTA's.
    """
    records, sp2accs = (defaultdict(list), defaultdict(list))
    header = fh.readline().strip().split(delim)
    raw_recs = 0

    if not all(col in header for col in tcols):
        logging.error(
            "The following columns were not found in the"
            + f"\nheader row of file {os.path.basename(fh.name)}\n"
            + "\n".join([ele for ele in tcols if ele not in header])
        )

    id_i, genus_i, species_i, strain_i, seq_i = [
        i for i, ele in enumerate(header) if ele in tcols
    ]

    for record in fh:
        raw_recs += 1

        id = record.strip().split(delim)[id_i]
        genus = record.strip().split(delim)[genus_i]
        species = re.sub(r"[\/\\]+", "-", record.strip().split(delim)[species_i])
        strain = record.strip().split(delim)[strain_i]
        seq = re.sub(r"[^ATGC]+", "", record.strip().split(delim)[seq_i], re.IGNORECASE)

        if re.match(r"None|Null", species, re.IGNORECASE):
            continue

        # print(id)
        # print(genus)
        # print(species)
        # print(strain)
        # print(seq)

        records.setdefault(species, []).append(
            SeqRecord(Seq(seq), id=id, description=str())
        )
        sp2accs.setdefault(species, []).append(id)

    logging.info(f"Collected FASTA records for {len(records.keys())} species'.")
    fh.close()
    return records, sp2accs, raw_recs


# Main
def main() -> None:
    """
    This script takes:
        1. The TSV file from BOLD systems,
        2. Takes as input a .csv file generated by `ncbitax2lin`.

    and then generates a folder containing individual FASTA sequence files
    per species. This is only possible if the full taxonomy of the barcode
    sequence is present in the FASTA header.
    """

    # Set logging.
    logging.basicConfig(
        format="\n"
        + "=" * 55
        + "\n%(asctime)s - %(levelname)s\n"
        + "=" * 55
        + "\n%(message)s\r\r",
        level=logging.DEBUG,
    )

    # Debug print.
    ppp = pprint.PrettyPrinter(width=55)
    prog_name = os.path.basename(inspect.stack()[0].filename)

    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )

    required = parser.add_argument_group("required arguments")

    required.add_argument(
        "-tsv",
        dest="tsv",
        default=False,
        required=True,
        help="Absolute UNIX path to the TSV file from BOLD systems"
        + "\nin uncompressed TXT format.",
    )
    required.add_argument(
        "-csv",
        dest="csv",
        default=False,
        required=True,
        help="Absolute UNIX path to .csv or .csv.gz file which is generated "
        + "\nby the `ncbitax2lin` tool.",
    )
    parser.add_argument(
        "-out",
        dest="out_folder",
        default=os.path.join(os.getcwd(), "species"),
        required=False,
        help="By default, the output is written to this\nfolder.",
    )
    parser.add_argument(
        "-f",
        dest="force_write_out",
        default=False,
        action="store_true",
        required=False,
        help="Force overwrite output directory contents.",
    )
    parser.add_argument(
        "-suffix",
        dest="fna_suffix",
        default=".fna",
        required=False,
        help="Suffix of the individual species FASTA files\nthat will be saved.",
    )
    parser.add_argument(
        "-ccols",
        dest="csv_cols",
        default="tax_id,superkingdom,phylum,class,order,family,genus,species,strain",
        required=False,
        help="Taxonomic lineage will be built using these columns from the output of"
        + "\n`ncbitax2lin`\ntool.",
    )
    parser.add_argument(
        "-ccols-sep",
        dest="csv_delim",
        default=",",
        required=False,
        help="The delimitor of the fields in the CSV file.",
    )
    parser.add_argument(
        "-tcols",
        dest="tsv_cols",
        default="processid\tgenus\tspecies\tsubspecies\tnucraw",
        required=False,
        help="For each species, the nucletide sequences will be\naggregated.",
    )
    parser.add_argument(
        "-tcols-sep",
        dest="tsv_delim",
        default="\t",
        required=False,
        help="The delimitor of the fields in the TSV file.",
    )

    # Parse defaults
    args = parser.parse_args()
    tsv = args.tsv
    csv = args.csv
    csep = args.csv_delim
    tsep = args.tsv_delim
    csv_cols = args.csv_cols
    tsv_cols = args.tsv_cols
    out = args.out_folder
    overwrite = args.force_write_out
    fna_suffix = args.fna_suffix
    ccols_pat = re.compile(f"^[\w\{csep}]+?\w$")
    tcols_pat = re.compile(f"^[\w\{tsep}]+?\w$")
    final_lineages = os.path.join(out, "lineages.csv")
    lineages_not_found = os.path.join(out, "lineages_not_found.csv")
    base_fasta_dir = os.path.join(out, "fasta")

    # Basic checks
    if not overwrite and os.path.exists(out):
        logging.warning(
            f"Output destination [{os.path.basename(out)}] already exists!"
            + "\nPlease use -f to delete and overwrite."
        )
    elif overwrite and os.path.exists(out):
        logging.info(f"Overwrite requested. Deleting {os.path.basename(out)}...")
        shutil.rmtree(out)

    # Validate user requested columns
    passed_ccols = check_and_get_cols(ccols_pat, csv_cols, csep)
    passed_tcols = check_and_get_cols(tcols_pat, tsv_cols, tsep)

    # Get  taxonomy from ncbitax2lin
    lineages, raw_recs = get_lineages(csv, passed_ccols)

    # Finally, read BOLD tsv if lineage exists.
    logging.info(f"Creating new squences per species...")

    if not os.path.exists(out):
        os.makedirs(out)

    try:
        gz_fh = gzip.open(tsv, "rt")
        records, sp2accs, traw_recs = parse_tsv(gz_fh, passed_tcols, tsep)
    except gzip.BadGzipFile:
        logging.info(f"Input TSV file {os.path.basename(tsv)} is not in\nGZIP format.")
        txt_fh = open(tsv, "r")
        records, sp2accs, traw_recs = parse_tsv(txt_fh, passed_tcols, tsep)

    passed_tax_check = 0
    failed_tax_check = 0
    fasta_recs_written = 0
    l_fh = open(final_lineages, "w")
    ln_fh = open(lineages_not_found, "w")
    l_fh.write(
        "identifiers,superkingdom,phylum,class,order,family,genus,species,strain\n"
    )
    ln_fh.write("fna_id,parsed_org\n")

    if not os.path.exists(base_fasta_dir):
        os.makedirs(base_fasta_dir)

    for genus_species in records.keys():
        fasta_recs_written += len(records[genus_species])
        write_fasta(
            records[genus_species],
            base_fasta_dir,
            "_".join(genus_species.split(" ")),
            fna_suffix,
        )
        org_words = genus_species.split(" ")

        for id in sp2accs[genus_species]:
            if genus_species in lineages.keys():
                this_line = ",".join([id, lineages[genus_species]]) + "\n"

                if len(org_words) > 2:
                    this_line = (
                        ",".join(
                            [id, lineages[genus_species].rstrip(","), genus_species]
                        )
                        + "\n"
                    )

                l_fh.write(this_line)
                passed_tax_check += 1
            else:
                this_line = (
                    ",".join(
                        [
                            id,
                            "",
                            "",
                            "",
                            "",
                            "",
                            org_words[0],
                            genus_species,
                            "",
                        ]
                    )
                    + "\n"
                )
                if len(org_words) > 2:
                    this_line = (
                        ",".join(
                            [
                                id,
                                "",
                                "",
                                "",
                                "",
                                "",
                                org_words[0],
                                org_words[0] + " " + org_words[1],
                                genus_species,
                            ]
                        )
                        + "\n"
                    )
                l_fh.write(this_line)
                ln_fh.write(",".join([id, genus_species]) + "\n")
                failed_tax_check += 1

    logging.info(
        f"No. of raw records present in `ncbitax2lin` [{os.path.basename(csv)}]: {raw_recs}"
        + f"\nNo. of valid records collected from `ncbitax2lin` [{os.path.basename(csv)}]: {len(lineages.keys())}"
        + f"\nNo. of raw records in TSV [{os.path.basename(tsv)}]: {traw_recs}"
        + f"\nNo. of valid records in TSV [{os.path.basename(tsv)}]: {passed_tax_check + failed_tax_check}"
        + f"\nNo. of FASTA records for which new lineages were created: {passed_tax_check}"
        + f"\nNo. of FASTA records for which only genus, species and/or strain information were created: {failed_tax_check}"
    )

    if (passed_tax_check + failed_tax_check) != fasta_recs_written:
        logging.error(
            f"The number of input FASTA records [{fasta_recs_written}]"
            + f"\nis not equal to number of lineages created [{passed_tax_check + failed_tax_check}]!"
        )
        exit(1)
    else:
        logging.info("Succesfully created lineages and FASTA records! Done!!")


if __name__ == "__main__":
    main()

# ~/apps/nowayout/bin/gen_per_species_fa_from_bold.py -tsv BOLD_Public.05-Feb-2024.tsv -csv ../tax.csv                                  ─╯

# =======================================================
# 2024-02-08 21:37:28,541 - INFO
# =======================================================
# Indexing tax.csv...

# =======================================================
# 2024-02-08 21:38:06,567 - INFO
# =======================================================
# Creating new squences per species...

# =======================================================
# 2024-02-08 21:38:06,572 - INFO
# =======================================================
# Input TSV file BOLD_Public.05-Feb-2024.tsv is not in
# GZIP format.

# =======================================================
# 2024-02-08 22:01:04,554 - INFO
# =======================================================
# Collected FASTA records for 497421 species'.

# =======================================================
# 2024-02-08 22:24:35,000 - INFO
# =======================================================
# No. of raw records present in `ncbitax2lin` [tax.csv]: 2550767
# No. of valid records collected from `ncbitax2lin` [tax.csv]: 2134980
# No. of raw records in TSV [BOLD_Public.05-Feb-2024.tsv]: 9735210
# No. of valid records in TSV [BOLD_Public.05-Feb-2024.tsv]: 4988323
# No. of FASTA records for which new lineages were created: 4069202
# No. of FASTA records for which only genus, species and/or strain information were created: 919121

# =======================================================
# 2024-02-08 22:24:35,001 - INFO
# =======================================================
# Succesfully created lineages and FASTA records! Done!!