Mercurial > repos > kkonganti > hfp_nowayout
view 0.5.0/bin/gen_per_species_fa_from_lin.py @ 0:97cd2f532efe
planemo upload
author | kkonganti |
---|---|
date | Mon, 31 Mar 2025 14:50:40 -0400 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python3 import argparse import gzip import inspect import logging import os import pprint import re import shutil from collections import defaultdict from typing import BinaryIO, TextIO, Union from Bio import SeqIO from Bio.Seq import Seq from Bio.SeqRecord import SeqRecord # Multiple inheritence for pretty printing of help text. class MultiArgFormatClasses( argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter ): pass def get_lineages(csv: os.PathLike) -> defaultdict: """ Parse the lineages.csv file and store a list of accessions. """ lineages = dict() if csv == None or not (os.path.exists(csv) or os.path.getsize(csv) > 0): logging.error( f"The CSV file [{os.path.basename(csv)}] is empty or does not exist!" ) exit(1) logging.info(f"Indexing {os.path.basename(csv)}...") with open(csv, "r") as csv_fh: _ = csv_fh.readline().strip().split(",") for line in csv_fh: cols = line.strip().split(",") if len(cols) < 9: logging.error( f"The CSV file {os.path.basename(csv)} should have a mandatory 9 columns." + "\n\nEx: identifiers,superkingdom,phylum,class,order,family,genus,species,strain" + "\nAB211151.1,Eukaryota,Arthropoda,Malacostraca,Decapoda,Majidae,Chionoecetes,Chionoecetes opilio," + f"\n\nGot:\n{line}" ) exit(1) lineages[cols[0]] = re.sub(r"\W+", "-", "_".join(cols[7].split(" "))) csv_fh.close() return lineages def write_fasta(recs: list, basedir: os.PathLike, name: str, suffix: str) -> None: """ Write sequence with no description to a specified file. """ SeqIO.write( recs, os.path.join(basedir, name + suffix), "fasta", ) def parse_fasta(fh: Union[TextIO, BinaryIO], sp2accs: dict) -> list: """ Parse the sequences and create per species FASTA record. """ records = defaultdict() logging.info("") for record in SeqIO.parse(fh, "fasta"): id = record.id seq = record.seq if id in sp2accs.keys(): records.setdefault(sp2accs[id], []).append( SeqRecord(Seq(seq), id=id, description=str()) ) else: print(f"Lineage row does not exist for accession: {id}") logging.info(f"Collected FASTA records for {len(records.keys())} species'.") fh.close() return records # Main def main() -> None: """ This script takes: 1. The FASTA file and, 2. Takes the corresponding lineages.csv file and, then generates a folder containing individual FASTA sequence files per species. """ # Set logging. logging.basicConfig( format="\n" + "=" * 55 + "\n%(asctime)s - %(levelname)s\n" + "=" * 55 + "\n%(message)s\r\r", level=logging.DEBUG, ) # Debug print. ppp = pprint.PrettyPrinter(width=55) prog_name = os.path.basename(inspect.stack()[0].filename) parser = argparse.ArgumentParser( prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses ) required = parser.add_argument_group("required arguments") required.add_argument( "-fa", dest="fna", default=False, required=True, help="Absolute UNIX path to the FASTA file that corresponds" + "\nto the lineages.csv file.", ) required.add_argument( "-csv", dest="csv", default=False, required=True, help="Absolute UNIX path to lineages.csv which has a guaranteed 9 " + "\ncolumns with the first being an accession.", ) parser.add_argument( "-out", dest="out_folder", default=os.path.join(os.getcwd(), "species"), required=False, help="By default, the output is written to this\nfolder.", ) parser.add_argument( "-f", dest="force_write_out", default=False, action="store_true", required=False, help="Force overwrite output directory contents.", ) parser.add_argument( "-suffix", dest="fna_suffix", default=".fna", required=False, help="Suffix of the individual species FASTA files\nthat will be saved.", ) # Parse defaults args = parser.parse_args() csv = args.csv fna = args.fna out = args.out_folder overwrite = args.force_write_out fna_suffix = args.fna_suffix # Basic checks if not overwrite and os.path.exists(out): logging.warning( f"Output destination [{os.path.basename(out)}] already exists!" + "\nPlease use -f to delete and overwrite." ) elif overwrite and os.path.exists(out): logging.info(f"Overwrite requested. Deleting {os.path.basename(out)}...") shutil.rmtree(out) # Get taxonomy from ncbitax2lin lineages = get_lineages(csv) logging.info(f"Creating new squences per species...") if not os.path.exists(out): os.makedirs(out) try: gz_fh = gzip.open(fna, "rt") fa_recs = parse_fasta(gz_fh, lineages) except gzip.BadGzipFile: logging.info( f"Input FASTA file {os.path.basename(csv)} is not in\nGZIP format." ) txt_fh = open(fna, "r") fa_recs = parse_fasta(txt_fh, lineages) finally: logging.info("Assigned FASTA records per species...") logging.info("Writing FASTA records per species...") for sp in fa_recs.keys(): write_fasta(fa_recs[sp], out, sp, fna_suffix) if __name__ == "__main__": main() # ~/apps/nowayout/bin/gen_per_species_fa_from_bold.py -tsv BOLD_Public.05-Feb-2024.tsv -csv ../tax.csv ─╯ # ======================================================= # 2024-02-08 21:37:28,541 - INFO # ======================================================= # Indexing tax.csv... # ======================================================= # 2024-02-08 21:38:06,567 - INFO # ======================================================= # Creating new squences per species... # ======================================================= # 2024-02-08 21:38:06,572 - INFO # ======================================================= # Input TSV file BOLD_Public.05-Feb-2024.tsv is not in # GZIP format. # ======================================================= # 2024-02-08 22:01:04,554 - INFO # ======================================================= # Collected FASTA records for 497421 species'. # ======================================================= # 2024-02-08 22:24:35,000 - INFO # ======================================================= # No. of raw records present in `ncbitax2lin` [tax.csv]: 2550767 # No. of valid records collected from `ncbitax2lin` [tax.csv]: 2134980 # No. of raw records in TSV [BOLD_Public.05-Feb-2024.tsv]: 9735210 # No. of valid records in TSV [BOLD_Public.05-Feb-2024.tsv]: 4988323 # No. of FASTA records for which new lineages were created: 4069202 # No. of FASTA records for which only genus, species and/or strain information were created: 919121 # ======================================================= # 2024-02-08 22:24:35,001 - INFO # ======================================================= # Succesfully created lineages and FASTA records! Done!!