annotate 0.5.0/bin/remove_dup_fasta_ids.py @ 0:97cd2f532efe

planemo upload
author kkonganti
date Mon, 31 Mar 2025 14:50:40 -0400
parents
children
rev   line source
kkonganti@0 1 #!/usr/bin/env python3
kkonganti@0 2
kkonganti@0 3 import argparse
kkonganti@0 4 import gzip
kkonganti@0 5 import inspect
kkonganti@0 6 import logging
kkonganti@0 7 import os
kkonganti@0 8 import pprint
kkonganti@0 9 import shutil
kkonganti@0 10 from typing import BinaryIO, TextIO, Union
kkonganti@0 11
kkonganti@0 12 from Bio import SeqIO
kkonganti@0 13 from Bio.Seq import Seq
kkonganti@0 14 from Bio.SeqRecord import SeqRecord
kkonganti@0 15 from genericpath import isdir
kkonganti@0 16
kkonganti@0 17
class MultiArgFormatClasses(
    argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter
):
    """Argparse help formatter combining raw-text layout with default values.

    Multiple inheritance lets ``-h`` output keep manual newlines (RawText)
    while still appending each option's default (ArgumentDefaults).
    """

    pass
kkonganti@0 23
kkonganti@0 24
def write_fasta(seq: str, id: str, fh: Union[TextIO, BinaryIO]) -> None:
    """Append one FASTA record to an already-open file handle.

    The record carries only the given ID — the description field is left
    empty so no extra text appears on the header line.
    """
    record = SeqRecord(Seq(seq), id=id, description="")
    SeqIO.write(record, fh, "fasta")
kkonganti@0 34
kkonganti@0 35
# Main
def main() -> None:
    """
    This script takes:
    1. A FASTA file in gzip or non-gzip (ASCII TXT) format and

    and then generates a new FASTA file with duplicate FASTA IDs replaced
    with a unique ID.
    """

    # Set logging.
    logging.basicConfig(
        format="\n"
        + "=" * 55
        + "\n%(asctime)s - %(levelname)s\n"
        + "=" * 55
        + "\n%(message)s\n\n",
        level=logging.DEBUG,
    )

    prog_name = os.path.basename(inspect.stack()[0].filename)

    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )

    required = parser.add_argument_group("required arguments")

    required.add_argument(
        "-fna",
        dest="fna",
        default=False,
        required=True,
        help="Absolute UNIX path to .fna or .fna.gz file.",
    )
    parser.add_argument(
        "-lin",
        dest="lineages",
        default=False,
        required=False,
        help="Absolute UNIX path to lineages.csv file for which the"
        + "\nthe duplicate IDs will be made unique corresponding to"
        + "\nthe FASTA IDs",
    )
    parser.add_argument(
        "-outdir",
        dest="out_folder",
        default=os.getcwd(),
        required=False,
        help="By default, the output is written to this\nfolder.",
    )
    parser.add_argument(
        "-f",
        dest="force_write_out",
        default=False,
        action="store_true",
        required=False,
        help="Force overwrite the output file.",
    )
    parser.add_argument(
        "--fna-suffix",
        dest="fna_suffix",
        default=".fna",
        required=False,
        help="Suffix of the output FASTA file.",
    )

    # Parse defaults
    args = parser.parse_args()
    fna = args.fna
    lineages = args.lineages
    outdir = args.out_folder
    overwrite = args.force_write_out
    fna_suffix = args.fna_suffix
    # Output name: input basename up to the first "." + "_dedup_ids" + suffix.
    new_fna = os.path.join(
        outdir, os.path.basename(fna).split(".")[0] + "_dedup_ids" + fna_suffix
    )
    lin_header = False
    new_lin = False
    seen_ids = dict()  # FASTA ID -> number of times seen so far
    seen_lineages = dict()  # lineage row ID -> remaining 8 lineage columns

    # Basic checks
    if not overwrite and os.path.exists(new_fna):
        logging.warning(
            f"Output destination [{os.path.basename(new_fna)}] already exists!"
            + "\nPlease use -f to delete and overwrite."
        )
        # BUGFIX: previously execution fell through after this warning and
        # appended records to the pre-existing output file. Bail out instead.
        exit(1)
    elif overwrite and os.path.exists(new_fna):
        logging.info(f"Overwrite requested. Deleting {os.path.basename(new_fna)}...")
        if os.path.isdir(new_fna):
            shutil.rmtree(new_fna)
        else:
            os.remove(new_fna)

    # Prepare for writing
    new_fna_fh = open(new_fna, "+at")

    # If lineages file is mentioned, index it.
    if lineages and os.path.exists(lineages) and os.path.getsize(lineages) > 0:
        new_lin = os.path.join(os.getcwd(), os.path.basename(lineages) + "_dedup.csv")
        new_lin_fh = open(new_lin, "w")
        with open(lineages, "r") as l_fh:
            lin_header = l_fh.readline()
            for line in l_fh:
                cols = line.strip().split(",")
                if len(cols) < 9:
                    logging.error(
                        f"The row in the lineages file {os.path.basename(lineages)}"
                        + f"\ndoes not have 9 required columns: {len(cols)}"
                        + f"\n\n{lin_header.strip()}\n{line.strip()}"
                    )
                    exit(1)
                elif len(cols) > 9:
                    logging.info(
                        f"The row in the lineages file {os.path.basename(lineages)}"
                        + f"\nhas more than 9 required columns: {len(cols)}"
                        + f"\nRetaining only 9 columns of the following 10 columns."
                        + f"\n\n{lin_header.strip()}\n{line.strip()}"
                    )

                # First occurrence wins; columns 2-9 are kept for this row ID.
                if cols[0] not in seen_lineages.keys():
                    seen_lineages[cols[0]] = ",".join(cols[1:9])

        new_lin_fh.write(lin_header)

    # Read FASTA and create unique FASTA IDs.
    logging.info(f"Creating new FASTA with unique IDs.")
    try:
        fna_fh = gzip.open(fna, "rt")
        _ = fna_fh.readline()
        # BUGFIX: the readline() above (used only to trigger BadGzipFile on
        # non-gzip input) consumed the first FASTA header, which made
        # SeqIO.parse silently drop the first record. Rewind before parsing.
        fna_fh.seek(0)
    except gzip.BadGzipFile:
        logging.info(
            f"Input FASTA file {os.path.basename(fna)} is not in\nGZIP format."
            + "\nAttempting text parsing."
        )
        fna_fh = open(fna, "r")

    for record in SeqIO.parse(fna_fh, format="fasta"):
        seq_id = record.id

        # Count occurrences of this ID across the whole file.
        if record.id not in seen_ids.keys():
            seen_ids[record.id] = 1
        else:
            seen_ids[record.id] += 1

        # Second and later duplicates get the occurrence count appended
        # (e.g. "abc" -> "abc2"); the first occurrence keeps its ID.
        if seen_ids[seq_id] > 1:
            seq_id = str(record.id) + str(seen_ids[record.id])

        # Mirror the (possibly renamed) ID into the deduplicated lineages CSV.
        # NOTE(review): this raises KeyError if a FASTA ID is absent from the
        # lineages file — presumably inputs are guaranteed to match; verify.
        if new_lin:
            new_lin_fh.write(",".join([seq_id, seen_lineages[record.id]]) + "\n")

        write_fasta(record.seq, seq_id, new_fna_fh)

    # Close handles so all buffered output reaches disk.
    fna_fh.close()
    new_fna_fh.close()

    if new_lin:
        new_lin_fh.close()

    logging.info("Done!")
kkonganti@0 197
kkonganti@0 198
# Entry point: run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()