#!/usr/bin/env python3

import argparse
import gzip
import inspect
import logging
import os
import pprint
import re
import shutil
from collections import defaultdict
from typing import BinaryIO, TextIO, Union

from Bio import SeqIO
from Bio.SeqRecord import SeqRecord


# Multiple inheritance for pretty printing of help text.
class MultiArgFormatClasses(
    argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter
):
    pass


def get_lineages(csv: os.PathLike) -> dict:
    """
    Parse the lineages.csv file and return a mapping of
    accession to a filename-safe species name.
    """
    lineages = dict()
    if csv is None or not os.path.exists(csv) or os.path.getsize(csv) == 0:
        logging.error(
            f"The CSV file [{os.path.basename(str(csv))}] is empty or does not exist!"
        )
        exit(1)

    logging.info(f"Indexing {os.path.basename(csv)}...")

    with open(csv, "r") as csv_fh:
        # Skip the header row.
        _ = csv_fh.readline().strip().split(",")
        for line in csv_fh:
            cols = line.strip().split(",")

            if len(cols) < 9:
                logging.error(
                    f"The CSV file {os.path.basename(csv)} must have a minimum of 9 columns."
                    + "\n\nEx: identifiers,superkingdom,phylum,class,order,family,genus,species,strain"
                    + "\nAB211151.1,Eukaryota,Arthropoda,Malacostraca,Decapoda,Majidae,Chionoecetes,Chionoecetes opilio,"
                    + f"\n\nGot:\n{line}"
                )
                exit(1)

            # Map the accession (column 1) to a filename-safe species name
            # (column 8): spaces become "_" and any remaining non-word
            # characters collapse to "-".
            lineages[cols[0]] = re.sub(r"\W+", "-", "_".join(cols[7].split(" ")))

    return lineages


def write_fasta(recs: list, basedir: os.PathLike, name: str, suffix: str) -> None:
    """
    Write sequence records with no description to a specified file.
    """
    SeqIO.write(
        recs,
        os.path.join(basedir, name + suffix),
        "fasta",
    )


def parse_fasta(fh: Union[TextIO, BinaryIO], acc2sp: dict) -> defaultdict:
    """
    Parse the sequences and group the FASTA records per species.
    """
    records = defaultdict(list)
    logging.info("Parsing FASTA records...")

    for record in SeqIO.parse(fh, "fasta"):
        acc = record.id
        seq = record.seq

        if acc in acc2sp:
            records[acc2sp[acc]].append(SeqRecord(seq, id=acc, description=""))
        else:
            logging.warning(f"Lineage row does not exist for accession: {acc}")

    logging.info(f"Collected FASTA records for {len(records)} species.")
    fh.close()
    return records


# Main
def main() -> None:
    """
    This script takes:
        1. The FASTA file and,
        2. The corresponding lineages.csv file,

    and then generates a folder containing individual FASTA sequence files
    per species.
    """

    # Set logging.
    logging.basicConfig(
        format="\n"
        + "=" * 55
        + "\n%(asctime)s - %(levelname)s\n"
        + "=" * 55
        + "\n%(message)s\r\r",
        level=logging.DEBUG,
    )

    # Debug print.
    ppp = pprint.PrettyPrinter(width=55)
    prog_name = os.path.basename(inspect.stack()[0].filename)

    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )

    required = parser.add_argument_group("required arguments")

    required.add_argument(
        "-fa",
        dest="fna",
        default=False,
        required=True,
        help="Absolute UNIX path to the FASTA file that corresponds"
        + "\nto the lineages.csv file.",
    )
    required.add_argument(
        "-csv",
        dest="csv",
        default=False,
        required=True,
        help="Absolute UNIX path to lineages.csv which has a guaranteed 9"
        + "\ncolumns with the first being an accession.",
    )
    parser.add_argument(
        "-out",
        dest="out_folder",
        default=os.path.join(os.getcwd(), "species"),
        required=False,
        help="By default, the output is written to this\nfolder.",
    )
    parser.add_argument(
        "-f",
        dest="force_write_out",
        default=False,
        action="store_true",
        required=False,
        help="Force overwrite output directory contents.",
    )
    parser.add_argument(
        "-suffix",
        dest="fna_suffix",
        default=".fna",
        required=False,
        help="Suffix of the individual species FASTA files\nthat will be saved.",
    )

    # Parse defaults
    args = parser.parse_args()
    csv = args.csv
    fna = args.fna
    out = args.out_folder
    overwrite = args.force_write_out
    fna_suffix = args.fna_suffix

    # Basic checks
    if not overwrite and os.path.exists(out):
        logging.warning(
            f"Output destination [{os.path.basename(out)}] already exists!"
            + "\nPlease use -f to delete and overwrite."
        )
    elif overwrite and os.path.exists(out):
        logging.info(f"Overwrite requested. Deleting {os.path.basename(out)}...")
        shutil.rmtree(out)

    # Get taxonomy from ncbitax2lin
    lineages = get_lineages(csv)

    logging.info("Creating new sequences per species...")

    if not os.path.exists(out):
        os.makedirs(out)

    # Try to read the FASTA file as GZIP first and fall back to plain text
    # if it is not GZIP-compressed.
    try:
        gz_fh = gzip.open(fna, "rt")
        fa_recs = parse_fasta(gz_fh, lineages)
    except gzip.BadGzipFile:
        logging.info(
            f"Input FASTA file {os.path.basename(fna)} is not in\nGZIP format."
        )
        txt_fh = open(fna, "r")
        fa_recs = parse_fasta(txt_fh, lineages)
    finally:
        logging.info("Assigned FASTA records per species...")

    logging.info("Writing FASTA records per species...")

    for sp in fa_recs.keys():
        write_fasta(fa_recs[sp], out, sp, fna_suffix)


if __name__ == "__main__":
    main()

# ~/apps/nowayout/bin/gen_per_species_fa_from_bold.py -tsv BOLD_Public.05-Feb-2024.tsv -csv ../tax.csv

# =======================================================
# 2024-02-08 21:37:28,541 - INFO
# =======================================================
# Indexing tax.csv...

# =======================================================
# 2024-02-08 21:38:06,567 - INFO
# =======================================================
# Creating new squences per species...

# =======================================================
# 2024-02-08 21:38:06,572 - INFO
# =======================================================
# Input TSV file BOLD_Public.05-Feb-2024.tsv is not in
# GZIP format.

# =======================================================
# 2024-02-08 22:01:04,554 - INFO
# =======================================================
# Collected FASTA records for 497421 species'.

# =======================================================
# 2024-02-08 22:24:35,000 - INFO
# =======================================================
# No. of raw records present in `ncbitax2lin` [tax.csv]: 2550767
# No. of valid records collected from `ncbitax2lin` [tax.csv]: 2134980
# No. of raw records in TSV [BOLD_Public.05-Feb-2024.tsv]: 9735210
# No. of valid records in TSV [BOLD_Public.05-Feb-2024.tsv]: 4988323
# No. of FASTA records for which new lineages were created: 4069202
# No. of FASTA records for which only genus, species and/or strain information were created: 919121

# =======================================================
# 2024-02-08 22:24:35,001 - INFO
# =======================================================
# Succesfully created lineages and FASTA records! Done!!
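
# For reference, a hedged sketch of how this particular script (not the BOLD
# variant whose run is logged above) might be invoked; the script and file
# names below are placeholders, only -fa and -csv are required, and the
# remaining flags simply restate their defaults:
#
#   ./gen_per_species_fa.py \
#       -fa /path/to/sequences.fna.gz \
#       -csv /path/to/lineages.csv \
#       -out /path/to/species \
#       -suffix .fna
#
# Given a lineages.csv row like the example in get_lineages()'s error message
# (species "Chionoecetes opilio"), the matching FASTA records would be written
# to /path/to/species/Chionoecetes_opilio.fna.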