Mercurial > repos > galaxytrakr > hfp_nowayout_awsbatch
comparison 0.5.0/bin/remove_dup_fasta_ids.py @ 0:3c767f9cfd88 draft default tip
planemo upload
| author | galaxytrakr |
|---|---|
| date | Fri, 29 May 2026 13:37:56 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:3c767f9cfd88 |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 | |
| 3 import argparse | |
| 4 import gzip | |
| 5 import inspect | |
| 6 import logging | |
| 7 import os | |
| 8 import pprint | |
| 9 import shutil | |
| 10 from typing import BinaryIO, TextIO, Union | |
| 11 | |
| 12 from Bio import SeqIO | |
| 13 from Bio.Seq import Seq | |
| 14 from Bio.SeqRecord import SeqRecord | |
| 15 from genericpath import isdir | |
| 16 | |
| 17 | |
| 18 # Multiple inheritence for pretty printing of help text. | |
| 19 class MultiArgFormatClasses( | |
| 20 argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter | |
| 21 ): | |
| 22 pass | |
| 23 | |
| 24 | |
| 25 def write_fasta(seq: str, id: str, fh: Union[TextIO, BinaryIO]) -> None: | |
| 26 """ | |
| 27 Write sequence with no description to specified file. | |
| 28 """ | |
| 29 SeqIO.write( | |
| 30 SeqRecord(Seq(seq), id=id, description=str()), | |
| 31 fh, | |
| 32 "fasta", | |
| 33 ) | |
| 34 | |
| 35 | |
| 36 # Main | |
| 37 def main() -> None: | |
| 38 """ | |
| 39 This script takes: | |
| 40 1. A FASTA file in gzip or non-gzip (ASCII TXT) format and | |
| 41 | |
| 42 and then generates a new FASTA file with duplicate FASTA IDs replaced | |
| 43 with a unique ID. | |
| 44 """ | |
| 45 | |
| 46 # Set logging. | |
| 47 logging.basicConfig( | |
| 48 format="\n" | |
| 49 + "=" * 55 | |
| 50 + "\n%(asctime)s - %(levelname)s\n" | |
| 51 + "=" * 55 | |
| 52 + "\n%(message)s\n\n", | |
| 53 level=logging.DEBUG, | |
| 54 ) | |
| 55 | |
| 56 # Debug print. | |
| 57 ppp = pprint.PrettyPrinter(width=55) | |
| 58 prog_name = os.path.basename(inspect.stack()[0].filename) | |
| 59 | |
| 60 parser = argparse.ArgumentParser( | |
| 61 prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses | |
| 62 ) | |
| 63 | |
| 64 required = parser.add_argument_group("required arguments") | |
| 65 | |
| 66 required.add_argument( | |
| 67 "-fna", | |
| 68 dest="fna", | |
| 69 default=False, | |
| 70 required=True, | |
| 71 help="Absolute UNIX path to .fna or .fna.gz file.", | |
| 72 ) | |
| 73 parser.add_argument( | |
| 74 "-lin", | |
| 75 dest="lineages", | |
| 76 default=False, | |
| 77 required=False, | |
| 78 help="Absolute UNIX path to lineages.csv file for which the" | |
| 79 + "\nthe duplicate IDs will be made unique corresponding to" | |
| 80 + "\nthe FASTA IDs", | |
| 81 ) | |
| 82 parser.add_argument( | |
| 83 "-outdir", | |
| 84 dest="out_folder", | |
| 85 default=os.getcwd(), | |
| 86 required=False, | |
| 87 help="By default, the output is written to this\nfolder.", | |
| 88 ) | |
| 89 parser.add_argument( | |
| 90 "-f", | |
| 91 dest="force_write_out", | |
| 92 default=False, | |
| 93 action="store_true", | |
| 94 required=False, | |
| 95 help="Force overwrite the output file.", | |
| 96 ) | |
| 97 parser.add_argument( | |
| 98 "--fna-suffix", | |
| 99 dest="fna_suffix", | |
| 100 default=".fna", | |
| 101 required=False, | |
| 102 help="Suffix of the output FASTA file.", | |
| 103 ) | |
| 104 | |
| 105 # Parse defaults | |
| 106 args = parser.parse_args() | |
| 107 fna = args.fna | |
| 108 lineages = args.lineages | |
| 109 outdir = args.out_folder | |
| 110 overwrite = args.force_write_out | |
| 111 fna_suffix = args.fna_suffix | |
| 112 new_fna = os.path.join( | |
| 113 outdir, os.path.basename(fna).split(".")[0] + "_dedup_ids" + fna_suffix | |
| 114 ) | |
| 115 lin_header = False | |
| 116 new_lin = False | |
| 117 seen_ids = dict() | |
| 118 seen_lineages = dict() | |
| 119 | |
| 120 # Basic checks | |
| 121 if not overwrite and os.path.exists(new_fna): | |
| 122 logging.warning( | |
| 123 f"Output destination [{os.path.basename(new_fna)}] already exists!" | |
| 124 + "\nPlease use -f to delete and overwrite." | |
| 125 ) | |
| 126 elif overwrite and os.path.exists(new_fna): | |
| 127 logging.info(f"Overwrite requested. Deleting {os.path.basename(new_fna)}...") | |
| 128 if os.path.isdir(new_fna): | |
| 129 shutil.rmtree(new_fna) | |
| 130 else: | |
| 131 os.remove(new_fna) | |
| 132 | |
| 133 # Prepare for writing | |
| 134 new_fna_fh = open(new_fna, "+at") | |
| 135 | |
| 136 # If lineages file is mentioned, index it. | |
| 137 if lineages and os.path.exists(lineages) and os.path.getsize(lineages) > 0: | |
| 138 new_lin = os.path.join(os.getcwd(), os.path.basename(lineages) + "_dedup.csv") | |
| 139 new_lin_fh = open(new_lin, "w") | |
| 140 with open(lineages, "r") as l_fh: | |
| 141 lin_header = l_fh.readline() | |
| 142 for line in l_fh: | |
| 143 cols = line.strip().split(",") | |
| 144 if len(cols) < 9: | |
| 145 logging.error( | |
| 146 f"The row in the lineages file {os.path.basename(lineages)}" | |
| 147 + f"\ndoes not have 9 required columns: {len(cols)}" | |
| 148 + f"\n\n{lin_header.strip()}\n{line.strip()}" | |
| 149 ) | |
| 150 exit(1) | |
| 151 elif len(cols) > 9: | |
| 152 logging.info( | |
| 153 f"The row in the lineages file {os.path.basename(lineages)}" | |
| 154 + f"\nhas more than 9 required columns: {len(cols)}" | |
| 155 + f"\nRetaining only 9 columns of the following 10 columns." | |
| 156 + f"\n\n{lin_header.strip()}\n{line.strip()}" | |
| 157 ) | |
| 158 | |
| 159 if cols[0] not in seen_lineages.keys(): | |
| 160 seen_lineages[cols[0]] = ",".join(cols[1:9]) | |
| 161 | |
| 162 new_lin_fh.write(lin_header) | |
| 163 l_fh.close() | |
| 164 | |
| 165 # Read FASTA and create unique FASTA IDs. | |
| 166 logging.info(f"Creating new FASTA with unique IDs.") | |
| 167 try: | |
| 168 fna_fh = gzip.open(fna, "rt") | |
| 169 _ = fna_fh.readline() | |
| 170 except gzip.BadGzipFile: | |
| 171 logging.info( | |
| 172 f"Input FASTA file {os.path.basename(fna)} is not in\nGZIP format." | |
| 173 + "\nAttempting text parsing." | |
| 174 ) | |
| 175 fna_fh = open(fna, "r") | |
| 176 | |
| 177 for record in SeqIO.parse(fna_fh, format="fasta"): | |
| 178 seq_id = record.id | |
| 179 | |
| 180 if record.id not in seen_ids.keys(): | |
| 181 seen_ids[record.id] = 1 | |
| 182 else: | |
| 183 seen_ids[record.id] += 1 | |
| 184 | |
| 185 if seen_ids[seq_id] > 1: | |
| 186 seq_id = str(record.id) + str(seen_ids[record.id]) | |
| 187 | |
| 188 if new_lin: | |
| 189 new_lin_fh.write(",".join([seq_id, seen_lineages[record.id]]) + "\n") | |
| 190 | |
| 191 write_fasta(record.seq, seq_id, new_fna_fh) | |
| 192 | |
| 193 if new_lin: | |
| 194 new_lin_fh.close() | |
| 195 | |
| 196 logging.info("Done!") | |
| 197 | |
| 198 | |
| 199 if __name__ == "__main__": | |
| 200 | |
| 201 main() |
