diff 0.5.0/bin/remove_dup_fasta_ids.py @ 0:97cd2f532efe

planemo upload
author kkonganti
date Mon, 31 Mar 2025 14:50:40 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/0.5.0/bin/remove_dup_fasta_ids.py	Mon Mar 31 14:50:40 2025 -0400
@@ -0,0 +1,201 @@
+#!/usr/bin/env python3
+
+import argparse
+import gzip
+import inspect
+import logging
+import os
+import pprint
+import shutil
+from typing import BinaryIO, TextIO, Union
+
+from Bio import SeqIO
+from Bio.Seq import Seq
+from Bio.SeqRecord import SeqRecord
+from genericpath import isdir
+
+
+# Multiple inheritence for pretty printing of help text.
+class MultiArgFormatClasses(
+    argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter
+):
+    pass
+
+
+def write_fasta(seq: str, id: str, fh: Union[TextIO, BinaryIO]) -> None:
+    """
+    Write sequence with no description to specified file.
+    """
+    SeqIO.write(
+        SeqRecord(Seq(seq), id=id, description=str()),
+        fh,
+        "fasta",
+    )
+
+
+# Main
+def main() -> None:
+    """
+    This script takes:
+        1. A FASTA file in gzip or non-gzip (ASCII TXT) format and
+
+    and then generates a new FASTA file with duplicate FASTA IDs replaced
+    with a unique ID.
+    """
+
+    # Set logging.
+    logging.basicConfig(
+        format="\n"
+        + "=" * 55
+        + "\n%(asctime)s - %(levelname)s\n"
+        + "=" * 55
+        + "\n%(message)s\n\n",
+        level=logging.DEBUG,
+    )
+
+    # Debug print.
+    ppp = pprint.PrettyPrinter(width=55)
+    prog_name = os.path.basename(inspect.stack()[0].filename)
+
+    parser = argparse.ArgumentParser(
+        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
+    )
+
+    required = parser.add_argument_group("required arguments")
+
+    required.add_argument(
+        "-fna",
+        dest="fna",
+        default=False,
+        required=True,
+        help="Absolute UNIX path to .fna or .fna.gz file.",
+    )
+    parser.add_argument(
+        "-lin",
+        dest="lineages",
+        default=False,
+        required=False,
+        help="Absolute UNIX path to lineages.csv file for which the"
+        + "\nthe duplicate IDs will be made unique corresponding to"
+        + "\nthe FASTA IDs",
+    )
+    parser.add_argument(
+        "-outdir",
+        dest="out_folder",
+        default=os.getcwd(),
+        required=False,
+        help="By default, the output is written to this\nfolder.",
+    )
+    parser.add_argument(
+        "-f",
+        dest="force_write_out",
+        default=False,
+        action="store_true",
+        required=False,
+        help="Force overwrite the output file.",
+    )
+    parser.add_argument(
+        "--fna-suffix",
+        dest="fna_suffix",
+        default=".fna",
+        required=False,
+        help="Suffix of the output FASTA file.",
+    )
+
+    # Parse defaults
+    args = parser.parse_args()
+    fna = args.fna
+    lineages = args.lineages
+    outdir = args.out_folder
+    overwrite = args.force_write_out
+    fna_suffix = args.fna_suffix
+    new_fna = os.path.join(
+        outdir, os.path.basename(fna).split(".")[0] + "_dedup_ids" + fna_suffix
+    )
+    lin_header = False
+    new_lin = False
+    seen_ids = dict()
+    seen_lineages = dict()
+
+    # Basic checks
+    if not overwrite and os.path.exists(new_fna):
+        logging.warning(
+            f"Output destination [{os.path.basename(new_fna)}] already exists!"
+            + "\nPlease use -f to delete and overwrite."
+        )
+    elif overwrite and os.path.exists(new_fna):
+        logging.info(f"Overwrite requested. Deleting {os.path.basename(new_fna)}...")
+        if os.path.isdir(new_fna):
+            shutil.rmtree(new_fna)
+        else:
+            os.remove(new_fna)
+
+    # Prepare for writing
+    new_fna_fh = open(new_fna, "+at")
+
+    # If lineages file is mentioned, index it.
+    if lineages and os.path.exists(lineages) and os.path.getsize(lineages) > 0:
+        new_lin = os.path.join(os.getcwd(), os.path.basename(lineages) + "_dedup.csv")
+        new_lin_fh = open(new_lin, "w")
+        with open(lineages, "r") as l_fh:
+            lin_header = l_fh.readline()
+            for line in l_fh:
+                cols = line.strip().split(",")
+                if len(cols) < 9:
+                    logging.error(
+                        f"The row in the lineages file {os.path.basename(lineages)}"
+                        + f"\ndoes not have 9 required columns: {len(cols)}"
+                        + f"\n\n{lin_header.strip()}\n{line.strip()}"
+                    )
+                    exit(1)
+                elif len(cols) > 9:
+                    logging.info(
+                        f"The row in the lineages file {os.path.basename(lineages)}"
+                        + f"\nhas more than 9 required columns: {len(cols)}"
+                        + f"\nRetaining only 9 columns of the following 10 columns."
+                        + f"\n\n{lin_header.strip()}\n{line.strip()}"
+                    )
+
+                if cols[0] not in seen_lineages.keys():
+                    seen_lineages[cols[0]] = ",".join(cols[1:9])
+
+        new_lin_fh.write(lin_header)
+        l_fh.close()
+
+    # Read FASTA and create unique FASTA IDs.
+    logging.info(f"Creating new FASTA with unique IDs.")
+    try:
+        fna_fh = gzip.open(fna, "rt")
+        _ = fna_fh.readline()
+    except gzip.BadGzipFile:
+        logging.info(
+            f"Input FASTA file {os.path.basename(fna)} is not in\nGZIP format."
+            + "\nAttempting text parsing."
+        )
+        fna_fh = open(fna, "r")
+
+    for record in SeqIO.parse(fna_fh, format="fasta"):
+        seq_id = record.id
+
+        if record.id not in seen_ids.keys():
+            seen_ids[record.id] = 1
+        else:
+            seen_ids[record.id] += 1
+
+        if seen_ids[seq_id] > 1:
+            seq_id = str(record.id) + str(seen_ids[record.id])
+
+        if new_lin:
+            new_lin_fh.write(",".join([seq_id, seen_lineages[record.id]]) + "\n")
+
+        write_fasta(record.seq, seq_id, new_fna_fh)
+
+    if new_lin:
+        new_lin_fh.close()
+
+    logging.info("Done!")
+
+
+if __name__ == "__main__":
+
+    main()