#!/usr/bin/env python3

import argparse
import gzip
import inspect
import logging
import os
import shutil
import sys
from typing import BinaryIO, TextIO, Union

from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord


# Multiple inheritance for pretty printing of help text.
class MultiArgFormatClasses(
    argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter
):
    pass


def write_fasta(seq: str, id: str, fh: Union[TextIO, BinaryIO]) -> None:
    """
    Write a single FASTA record with the given ID and no description
    to the already-open file handle.
    """
    SeqIO.write(
        SeqRecord(Seq(seq), id=id, description=str()),
        fh,
        "fasta",
    )


# Main
def main() -> None:
    """
    This script takes:
    1. A FASTA file in gzip or non-gzip (ASCII TXT) format and

    and then generates a new FASTA file with duplicate FASTA IDs replaced
    with a unique ID.
    """

    # Set logging.
    logging.basicConfig(
        format="\n"
        + "=" * 55
        + "\n%(asctime)s - %(levelname)s\n"
        + "=" * 55
        + "\n%(message)s\n\n",
        level=logging.DEBUG,
    )

    prog_name = os.path.basename(inspect.stack()[0].filename)

    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )

    required = parser.add_argument_group("required arguments")

    required.add_argument(
        "-fna",
        dest="fna",
        default=False,
        required=True,
        help="Absolute UNIX path to .fna or .fna.gz file.",
    )
    parser.add_argument(
        "-lin",
        dest="lineages",
        default=False,
        required=False,
        help="Absolute UNIX path to lineages.csv file for which"
        + "\nthe duplicate IDs will be made unique corresponding to"
        + "\nthe FASTA IDs",
    )
    parser.add_argument(
        "-outdir",
        dest="out_folder",
        default=os.getcwd(),
        required=False,
        help="By default, the output is written to this\nfolder.",
    )
    parser.add_argument(
        "-f",
        dest="force_write_out",
        default=False,
        action="store_true",
        required=False,
        help="Force overwrite the output file.",
    )
    parser.add_argument(
        "--fna-suffix",
        dest="fna_suffix",
        default=".fna",
        required=False,
        help="Suffix of the output FASTA file.",
    )

    # Parse defaults
    args = parser.parse_args()
    fna = args.fna
    lineages = args.lineages
    outdir = args.out_folder
    overwrite = args.force_write_out
    fna_suffix = args.fna_suffix
    # Output name: input basename up to the first "." + "_dedup_ids" + suffix.
    new_fna = os.path.join(
        outdir, os.path.basename(fna).split(".")[0] + "_dedup_ids" + fna_suffix
    )
    lin_header = False
    new_lin = False
    new_lin_fh = None
    seen_ids = dict()  # FASTA ID -> number of times seen so far.
    seen_lineages = dict()  # lineages.csv column 0 -> remaining 8 columns (CSV).

    # Basic checks
    if not overwrite and os.path.exists(new_fna):
        logging.warning(
            f"Output destination [{os.path.basename(new_fna)}] already exists!"
            + "\nPlease use -f to delete and overwrite."
        )
        # Bug fix: previously the script continued after this warning and
        # appended to the stale output file. Stop instead.
        sys.exit(1)
    elif overwrite and os.path.exists(new_fna):
        logging.info(f"Overwrite requested. Deleting {os.path.basename(new_fna)}...")
        if os.path.isdir(new_fna):
            shutil.rmtree(new_fna)
        else:
            os.remove(new_fna)

    # Prepare for writing. "w" is sufficient: any pre-existing file was
    # either removed above or caused an early exit.
    new_fna_fh = open(new_fna, "w")

    # If lineages file is mentioned, index it.
    if lineages and os.path.exists(lineages) and os.path.getsize(lineages) > 0:
        new_lin = os.path.join(os.getcwd(), os.path.basename(lineages) + "_dedup.csv")
        new_lin_fh = open(new_lin, "w")
        with open(lineages, "r") as l_fh:
            lin_header = l_fh.readline()
            for line in l_fh:
                cols = line.strip().split(",")
                if len(cols) < 9:
                    logging.error(
                        f"The row in the lineages file {os.path.basename(lineages)}"
                        + f"\ndoes not have 9 required columns: {len(cols)}"
                        + f"\n\n{lin_header.strip()}\n{line.strip()}"
                    )
                    sys.exit(1)
                elif len(cols) > 9:
                    logging.info(
                        f"The row in the lineages file {os.path.basename(lineages)}"
                        + f"\nhas more than 9 required columns: {len(cols)}"
                        + f"\nRetaining only 9 columns of the following {len(cols)} columns."
                        + f"\n\n{lin_header.strip()}\n{line.strip()}"
                    )

                # First occurrence wins; later duplicates of the same key
                # are ignored.
                if cols[0] not in seen_lineages:
                    seen_lineages[cols[0]] = ",".join(cols[1:9])

        new_lin_fh.write(lin_header)

    # Read FASTA and create unique FASTA IDs.
    logging.info(f"Creating new FASTA with unique IDs.")
    try:
        fna_fh = gzip.open(fna, "rt")
        _ = fna_fh.readline()
        # Bug fix: the GZIP sniff above consumed the first header line,
        # which silently dropped the first FASTA record. Rewind so the
        # parser sees the whole file.
        fna_fh.seek(0)
    except gzip.BadGzipFile:
        logging.info(
            f"Input FASTA file {os.path.basename(fna)} is not in\nGZIP format."
            + "\nAttempting text parsing."
        )
        fna_fh = open(fna, "r")

    for record in SeqIO.parse(fna_fh, format="fasta"):
        seq_id = record.id
        seen_ids[record.id] = seen_ids.get(record.id, 0) + 1

        # Duplicate IDs get the occurrence count appended: id, id2, id3, ...
        if seen_ids[seq_id] > 1:
            seq_id = str(record.id) + str(seen_ids[record.id])

        if new_lin_fh is not None:
            if record.id not in seen_lineages:
                # Bug fix: this used to raise a bare KeyError.
                logging.error(
                    f"FASTA ID {record.id} has no matching row in"
                    + f"\n{os.path.basename(lineages)}."
                )
                sys.exit(1)
            new_lin_fh.write(",".join([seq_id, seen_lineages[record.id]]) + "\n")

        write_fasta(record.seq, seq_id, new_fna_fh)

    fna_fh.close()
    new_fna_fh.close()

    if new_lin_fh is not None:
        new_lin_fh.close()

    logging.info("Done!")


if __name__ == "__main__":

    main()