diff 0.5.0/bin/gen_per_species_fa_from_lin.py @ 0:97cd2f532efe

planemo upload
author kkonganti
date Mon, 31 Mar 2025 14:50:40 -0400
parents
children
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/0.5.0/bin/gen_per_species_fa_from_lin.py	Mon Mar 31 14:50:40 2025 -0400
@@ -0,0 +1,248 @@
+#!/usr/bin/env python3
+
+import argparse
+import gzip
+import inspect
+import logging
+import os
+import pprint
+import re
+import shutil
+from collections import defaultdict
+from typing import BinaryIO, TextIO, Union
+
+from Bio import SeqIO
+from Bio.Seq import Seq
+from Bio.SeqRecord import SeqRecord
+
+
+# Multiple inheritance for pretty printing of help text.
+class MultiArgFormatClasses(
+    argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter
+):
+    pass
+
+
+def get_lineages(csv: os.PathLike) -> dict:
+    """
+    Parse the lineages.csv file and return a mapping of
+    accession to a filesystem-safe species name.
+    """
+    lineages = dict()
+    if csv is None or not os.path.exists(csv) or os.path.getsize(csv) == 0:
+        logging.error(
+            f"The CSV file [{os.path.basename(csv)}] is empty or does not exist!"
+        )
+        exit(1)
+
+    logging.info(f"Indexing {os.path.basename(csv)}...")
+
+    with open(csv, "r") as csv_fh:
+        _ = csv_fh.readline().strip().split(",")
+        for line in csv_fh:
+            cols = line.strip().split(",")
+
+            if len(cols) < 9:
+                logging.error(
+                    f"The CSV file {os.path.basename(csv)} should have a mandatory 9 columns."
+                    + "\n\nEx: identifiers,superkingdom,phylum,class,order,family,genus,species,strain"
+                    + "\nAB211151.1,Eukaryota,Arthropoda,Malacostraca,Decapoda,Majidae,Chionoecetes,Chionoecetes opilio,"
+                    + f"\n\nGot:\n{line}"
+                )
+                exit(1)
+
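+            # Map accession -> species name: spaces become "_" and any remaining
+            # non-word characters become "-", e.g. the example row above yields
+            # {"AB211151.1": "Chionoecetes_opilio"}.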
+            lineages[cols[0]] = re.sub(r"\W+", "-", "_".join(cols[7].split(" ")))
+
+    return lineages
+
+
+def write_fasta(recs: list, basedir: os.PathLike, name: str, suffix: str) -> None:
+    """
+    Write the given sequence records to a per-species FASTA file.
+    """
+    SeqIO.write(
+        recs,
+        os.path.join(basedir, name + suffix),
+        "fasta",
+    )
+
+
+def parse_fasta(fh: Union[TextIO, BinaryIO], sp2accs: dict) -> dict:
+    """
+    Parse the sequences and group the FASTA records per species.
+    """
+    records = defaultdict(list)
+    logging.info("Parsing FASTA records...")
+
+    for record in SeqIO.parse(fh, "fasta"):
+        acc = record.id
+
+        if acc in sp2accs:
+            # Keep only the accession in the FASTA header by dropping the description.
+            records[sp2accs[acc]].append(
+                SeqRecord(record.seq, id=acc, description=str())
+            )
+        else:
+            logging.warning(f"Lineage row does not exist for accession: {acc}")
+
+    logging.info(f"Collected FASTA records for {len(records)} species.")
+    fh.close()
+    return records
+
+
+# Main
+def main() -> None:
+    """
+    This script takes:
+        1. A FASTA file, and
+        2. The corresponding lineages.csv file,
+
+    and generates a folder containing individual FASTA sequence files
+    per species.
+    """
+
+    # Set logging.
+    logging.basicConfig(
+        format="\n"
+        + "=" * 55
+        + "\n%(asctime)s - %(levelname)s\n"
+        + "=" * 55
+        + "\n%(message)s\r\r",
+        level=logging.DEBUG,
+    )
+
+    # Debug print.
+    ppp = pprint.PrettyPrinter(width=55)
+    prog_name = os.path.basename(inspect.stack()[0].filename)
+
+    parser = argparse.ArgumentParser(
+        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
+    )
+
+    required = parser.add_argument_group("required arguments")
+
+    required.add_argument(
+        "-fa",
+        dest="fna",
+        default=False,
+        required=True,
+        help="Absolute UNIX path to the FASTA file that corresponds"
+        + "\nto the lineages.csv file.",
+    )
+    required.add_argument(
+        "-csv",
+        dest="csv",
+        default=False,
+        required=True,
+        help="Absolute UNIX path to lineages.csv which has a guaranteed 9 "
+        + "\ncolumns with the first being an accession.",
+    )
+    parser.add_argument(
+        "-out",
+        dest="out_folder",
+        default=os.path.join(os.getcwd(), "species"),
+        required=False,
+        help="By default, the output is written to this\nfolder.",
+    )
+    parser.add_argument(
+        "-f",
+        dest="force_write_out",
+        default=False,
+        action="store_true",
+        required=False,
+        help="Force overwrite output directory contents.",
+    )
+    parser.add_argument(
+        "-suffix",
+        dest="fna_suffix",
+        default=".fna",
+        required=False,
+        help="Suffix of the individual species FASTA files\nthat will be saved.",
+    )
+
+    # Parse defaults
+    args = parser.parse_args()
+    csv = args.csv
+    fna = args.fna
+    out = args.out_folder
+    overwrite = args.force_write_out
+    fna_suffix = args.fna_suffix
+
+    # Basic checks
+    if not overwrite and os.path.exists(out):
+        logging.error(
+            f"Output destination [{os.path.basename(out)}] already exists!"
+            + "\nPlease use -f to delete and overwrite."
+        )
+        exit(1)
+    elif overwrite and os.path.exists(out):
+        logging.info(f"Overwrite requested. Deleting {os.path.basename(out)}...")
+        shutil.rmtree(out)
+
+    # Get taxonomy from ncbitax2lin.
+    lineages = get_lineages(csv)
+
+    logging.info(f"Creating new squences per species...")
+
+    if not os.path.exists(out):
+        os.makedirs(out)
+
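+    # Try the FASTA as GZIP first; gzip raises BadGzipFile on the first read
+    # (inside parse_fasta) when the file is plain text, in which case fall
+    # back to a regular text handle.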
+    try:
+        gz_fh = gzip.open(fna, "rt")
+        fa_recs = parse_fasta(gz_fh, lineages)
+    except gzip.BadGzipFile:
+        logging.info(
+            f"Input FASTA file {os.path.basename(fna)} is not in\nGZIP format."
+        )
+        txt_fh = open(fna, "r")
+        fa_recs = parse_fasta(txt_fh, lineages)
+    finally:
+        logging.info("Assigned FASTA records per species...")
+
+    logging.info("Writing FASTA records per species...")
+
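+    # One file per species is written to the output folder, named
+    # "<sanitized species name><suffix>" (default suffix: .fna).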
+    for sp in fa_recs.keys():
+        write_fasta(fa_recs[sp], out, sp, fna_suffix)
+
+
+if __name__ == "__main__":
+    main()
+
+# ~/apps/nowayout/bin/gen_per_species_fa_from_bold.py -tsv BOLD_Public.05-Feb-2024.tsv -csv ../tax.csv
+
+# =======================================================
+# 2024-02-08 21:37:28,541 - INFO
+# =======================================================
+# Indexing tax.csv...
+
+# =======================================================
+# 2024-02-08 21:38:06,567 - INFO
+# =======================================================
+# Creating new sequences per species...
+
+# =======================================================
+# 2024-02-08 21:38:06,572 - INFO
+# =======================================================
+# Input TSV file BOLD_Public.05-Feb-2024.tsv is not in
+# GZIP format.
+
+# =======================================================
+# 2024-02-08 22:01:04,554 - INFO
+# =======================================================
+# Collected FASTA records for 497421 species.
+
+# =======================================================
+# 2024-02-08 22:24:35,000 - INFO
+# =======================================================
+# No. of raw records present in `ncbitax2lin` [tax.csv]: 2550767
+# No. of valid records collected from `ncbitax2lin` [tax.csv]: 2134980
+# No. of raw records in TSV [BOLD_Public.05-Feb-2024.tsv]: 9735210
+# No. of valid records in TSV [BOLD_Public.05-Feb-2024.tsv]: 4988323
+# No. of FASTA records for which new lineages were created: 4069202
+# No. of FASTA records for which only genus, species and/or strain information were created: 919121
+
+# =======================================================
+# 2024-02-08 22:24:35,001 - INFO
+# =======================================================
+# Successfully created lineages and FASTA records! Done!!