diff 0.5.0/bin/create_fasta_and_lineages.py @ 0:97cd2f532efe
planemo upload
author: kkonganti
date:   Mon, 31 Mar 2025 14:50:40 -0400
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/0.5.0/bin/create_fasta_and_lineages.py	Mon Mar 31 14:50:40 2025 -0400
@@ -0,0 +1,518 @@
#!/usr/bin/env python3

import argparse
import gzip
import inspect
import logging
import os
import pprint
import re
import shutil
import ssl
import tempfile
from html.parser import HTMLParser
from urllib.request import urlopen

from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord


# Multiple inheritance for pretty printing of help text.
class MultiArgFormatClasses(
    argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter
):
    pass


# HTMLParser override class to get fna.gz and gbff.gz
class NCBIHTMLParser(HTMLParser):
    def __init__(self, *, convert_charrefs: bool = True) -> None:
        super().__init__(convert_charrefs=convert_charrefs)
        self.reset()
        self.href_data = list()

    def handle_data(self, data):
        self.href_data.append(data)


# Download organelle FASTA and GenBank file.
def dl_mito_seqs_and_flat_files(
    url: str, suffix: re.Pattern, out: os.PathLike
) -> os.PathLike:
    """
    Save the .fna.gz and .gbff.gz files of the
    RefSeq mitochondrion release.
    """
    contxt = ssl.create_default_context()
    contxt.check_hostname = False
    contxt.verify_mode = ssl.CERT_NONE

    if url is None:
        logging.error(
            "Please provide the base URL where the .fna.gz and .gbff.gz"
            + "\nfiles for the RefSeq mitochondrion release can be found."
        )
        exit(1)

    if os.path.exists(out):
        for file in os.listdir(out):
            file_path = os.path.join(out, file)

            if suffix.match(file_path) and os.path.getsize(file_path) > 0:
                logging.info(
                    f"The required mitochondrion file(s)\n[{os.path.basename(file_path)}]"
                    + " already exist.\nSkipping download from NCBI..."
                    + "\nPlease use -f to delete and overwrite."
                )
                return file_path
    else:
        os.makedirs(out)

    html_parser = NCBIHTMLParser()
    logging.info(f"Finding latest NCBI RefSeq mitochondrion release at:\n{url}")

    # Save the directory listing to a temporary file and feed it to the parser.
    with urlopen(url, context=contxt) as response:
        with tempfile.NamedTemporaryFile(delete=False) as tmp_html_file:
            shutil.copyfileobj(response, tmp_html_file)

    with open(tmp_html_file.name, "r") as html:
        html_parser.feed("".join(html.readlines()))

    file = suffix.search("".join(html_parser.href_data)).group(0)
    file_url = "/".join([url, file + ".gz"])
    file_at = os.path.join(out, file)

    logging.info(f"Found NCBI RefSeq mitochondrion file(s):\n{file_url}")
    logging.info(f"Saving to:\n{file_at}")

    # Download the .gz file and decompress it into the output folder.
    with tempfile.NamedTemporaryFile(delete=False) as tmp_gz:
        with urlopen(file_url, context=contxt) as response:
            tmp_gz.write(response.read())

    with open(file_at, "w") as fh:
        with gzip.open(tmp_gz.name, "rb") as web_gz:
            fh.write(web_gz.read().decode("utf-8"))

    os.unlink(tmp_gz.name)
    os.unlink(tmp_html_file.name)

    return file_at
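
# --- Illustrative sketch (not part of the original script; the listing
# content below is made up). handle_data() collects the text of every
# node in the NCBI directory listing, and suffix.search() then pulls the
# release file name out of the concatenated text:
#
#   parser = NCBIHTMLParser()
#   parser.feed('<a href="mitochondrion.1.genomic.fna.gz">mitochondrion.1.genomic.fna.gz</a>')
#   pat = re.compile(r".*?\.genomic\.fna")
#   pat.search("".join(parser.href_data)).group(0)  # -> 'mitochondrion.1.genomic.fna'
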
def get_lineages(csv: os.PathLike, cols: list) -> tuple:
    """
    Parse the output from the `ncbitax2lin` tool and return
    a dict of lineages keyed by organism name (genus species strain),
    along with the number of raw records parsed.
    """
    lineages = dict()
    if csv is None or not os.path.exists(csv) or os.path.getsize(csv) == 0:
        logging.error(f"The CSV file [{csv}] is empty or does not exist!")
        exit(1)

    logging.info(f"Indexing {os.path.basename(csv)}...")

    with open(csv, "r") as csv_fh:
        header_cols = csv_fh.readline().strip().split(",")
        user_req_cols = [
            tcol_i for tcol_i, tcol in enumerate(header_cols) if tcol in cols
        ]
        cols_not_found = [tcol for tcol in cols if tcol not in header_cols]
        raw_recs = 0

        if len(cols_not_found) > 0:
            logging.error(
                "The following columns do not exist in the"
                + f"\nCSV file [ {os.path.basename(csv)} ]:\n"
                + "".join(cols_not_found)
            )
            exit(1)
        elif len(user_req_cols) > 9:
            logging.error(
                "Only a total of 9 columns are needed!"
                + "\ntax_id,superkingdom,phylum,class,order,family,genus,species,strain"
            )
            exit(1)

        for tax in csv_fh:
            raw_recs += 1
            lcols = tax.strip().split(",")

            # Key by strain if present, otherwise by species (with an
            # empty strain field appended to keep the column count).
            if bool(lcols[user_req_cols[8]]):
                lineages[lcols[user_req_cols[8]]] = ",".join(
                    [lcols[l] for l in user_req_cols[1:]]
                )
            elif bool(lcols[user_req_cols[7]]):
                lineages[lcols[user_req_cols[7]]] = ",".join(
                    [lcols[l] for l in user_req_cols[1:8]] + [str()]
                )

    return lineages, raw_recs


def from_genbank(gbk: os.PathLike, min_len: int) -> dict:
    """
    Parse the GenBank file and return an
    accession to organism mapping.
    """
    accs2orgs = dict()

    if not (os.path.exists(gbk) and os.path.getsize(gbk) > 0):
        logging.error(
            f"The GenBank file [{os.path.basename(gbk)}] does not exist"
            + "\nor is of size 0."
        )
        exit(1)

    logging.info(f"Indexing {os.path.basename(gbk)}...")

    for record in SeqIO.parse(gbk, "genbank"):
        if len(record.seq) < min_len:
            continue
        else:
            accs2orgs[record.id] = record.annotations["organism"]

    return accs2orgs


def from_genbank_alt(gbk: os.PathLike) -> dict:
    """
    Parse the GenBank file and return an accession
    to organism mapping without using BioPython's
    GenBank Scanner.
    """
    accs2orgs = dict()
    accs = dict()
    orgs = dict()
    acc = False
    acc_pat = re.compile(r"^VERSION\s+(.+)")
    org_pat = re.compile(r"^\s+ORGANISM\s+(.+)")

    if not (os.path.exists(gbk) and os.path.getsize(gbk) > 0):
        logging.error(
            f"The GenBank file [{os.path.basename(gbk)}] does not exist"
            + "\nor is of size 0."
        )
        exit(1)

    logging.info(
        f"Indexing {os.path.basename(gbk)} without using\nBioPython's GenBank Scanner..."
    )

    with open(gbk, "r") as gbk_fh:
        for line in gbk_fh:
            line = line.rstrip()
            if line.startswith("VERSION") and acc_pat.match(line):
                acc = acc_pat.match(line).group(1)
                accs[acc] = 1
            if org_pat.match(line):
                if acc and acc not in orgs.keys():
                    orgs[acc] = org_pat.match(line).group(1)
                elif acc and acc in orgs.keys():
                    logging.error(f"Duplicate ORGANISM line for accession: {acc}")
                    exit(1)
        if len(accs.keys()) != len(orgs.keys()):
            logging.error(
                f"Got an unequal number of organisms ({len(orgs.keys())})\n"
                + f"and accessions ({len(accs.keys())})"
            )
            exit(1)
        else:
            for acc in accs.keys():
                if acc not in orgs.keys():
                    logging.error(f"ORGANISM not found for accession: {acc}")
                    exit(1)
                accs2orgs[acc] = orgs[acc]

    return accs2orgs


def write_fasta(seq: str, id: str, basedir: os.PathLike, suffix: str) -> None:
    """
    Write a sequence with no description to the specified file.
    """
    SeqIO.write(
        SeqRecord(Seq(seq), id=id, description=str()),
        os.path.join(basedir, id + suffix),
        "fasta",
    )
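
# --- Illustrative sketch of the assumed `ncbitax2lin` input (values made
# up). One row per tax_id with the lineage ranks as columns; a row with an
# empty strain field is keyed by its species name:
#
#   tax_id,superkingdom,phylum,class,order,family,genus,species,strain
#   9606,Eukaryota,Chordata,Mammalia,Primates,Hominidae,Homo,Homo sapiens,
#
# get_lineages() would then hold:
#   lineages["Homo sapiens"] ==
#       "Eukaryota,Chordata,Mammalia,Primates,Hominidae,Homo,Homo sapiens,"
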
# Main
def main() -> None:
    """
    This script:
    1. Downloads the RefSeq mitochondrial GenBank and FASTA format files.
    2. Takes as input the .csv or .csv.gz file generated by `ncbitax2lin`.

    It then generates a folder containing individual FASTA sequence files
    per organelle, and a corresponding lineage file in CSV format.
    """

    # Set logging.
    logging.basicConfig(
        format="\n"
        + "=" * 55
        + "\n%(asctime)s - %(levelname)s\n"
        + "=" * 55
        + "\n%(message)s\n\n",
        level=logging.DEBUG,
    )

    # Debug print.
    ppp = pprint.PrettyPrinter(width=55)
    prog_name = os.path.basename(inspect.stack()[0].filename)

    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )

    required = parser.add_argument_group("required arguments")

    required.add_argument(
        "-csv",
        dest="csv",
        default=False,
        required=True,
        help="Absolute UNIX path to the .csv or .csv.gz file generated "
        + "\nby the `ncbitax2lin` tool.",
    )
    parser.add_argument(
        "-cols",
        dest="lineage_cols",
        default="tax_id,superkingdom,phylum,class,order,family,genus,species,strain",
        required=False,
        help="Taxonomic lineage will be built using these columns from the output of"
        + "\nthe `ncbitax2lin` tool.",
    )
    parser.add_argument(
        "-url",
        dest="url",
        default="https://ftp.ncbi.nlm.nih.gov/refseq/release/mitochondrion",
        required=False,
        help="Base URL from where NCBI RefSeq mitochondrion files will be"
        + "\ndownloaded.",
    )
    parser.add_argument(
        "-out",
        dest="out_folder",
        default=os.path.join(os.getcwd(), "organelles"),
        required=False,
        help="By default, the output is written to this folder.",
    )
    parser.add_argument(
        "-f",
        dest="force_write_out",
        default=False,
        action="store_true",
        required=False,
        help="Force overwrite output directory contents.",
    )
    parser.add_argument(
        "--fna-suffix",
        dest="fna_suffix",
        default=".fna",
        required=False,
        help="Suffix of the individual organelle FASTA files that will be saved.",
    )
    parser.add_argument(
        "-ml",
        dest="fa_min_len",
        default=200,
        required=False,
        help="Minimum length of a FASTA sequence for it to be considered for"
        + "\nfurther processing.",
    )
    parser.add_argument(
        "--skip-per-fa",
        dest="skip_per_fa",
        default=False,
        required=False,
        action="store_true",
        help="Do not generate per-sequence FASTA files.",
    )
    parser.add_argument(
        "--alt-gb-parser",
        dest="alt_gb_parser",
        default=False,
        required=False,
        action="store_true",
        help="Use the alternate GenBank parser instead of BioPython's.",
    )

    # Parse defaults
    args = parser.parse_args()
    csv = args.csv
    out = args.out_folder
    overwrite = args.force_write_out
    fna_suffix = args.fna_suffix
    url = args.url
    tax_cols = args.lineage_cols
    skip_per_fa = args.skip_per_fa
    alt_gb_parser = args.alt_gb_parser
    min_len = int(args.fa_min_len)
    tcols_pat = re.compile(r"^[\w\,]+?\w$")
    mito_fna_suffix = re.compile(r".*?\.genomic\.fna")
    mito_gbff_suffix = re.compile(r".*?\.genomic\.gbff")
    final_lineages = os.path.join(out, "lineages.csv")
    lineages_not_found = os.path.join(out, "lineages_not_found.csv")
    base_fasta_dir = os.path.join(out, "fasta")
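
    # --- Illustrative note (file names assumed, not taken from the script):
    # the two suffix patterns above are meant to match entries of the NCBI
    # RefSeq mitochondrion release listing, e.g.:
    #   mitochondrion.1.1.genomic.fna.gz  -> matched by mito_fna_suffix
    #   mitochondrion.1.genomic.gbff.gz   -> matched by mito_gbff_suffix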

    # Basic checks
    if not overwrite and os.path.exists(out):
        logging.warning(
            f"Output destination [{os.path.basename(out)}] already exists!"
            + "\nPlease use -f to delete and overwrite."
        )
    elif overwrite and os.path.exists(out):
        logging.info(f"Overwrite requested. Deleting {os.path.basename(out)}...")
        shutil.rmtree(out)

    if not tcols_pat.match(tax_cols):
        logging.error(
            f"The supplied column names {tax_cols} should only contain words"
            + "\n(alphanumeric) separated by commas."
        )
        exit(1)
    else:
        tax_cols = re.sub("\n", "", tax_cols).split(",")

    # Get .fna and .gbk files
    fna = dl_mito_seqs_and_flat_files(url, mito_fna_suffix, out)
    gbk = dl_mito_seqs_and_flat_files(url, mito_gbff_suffix, out)

    # Get taxonomy from ncbitax2lin
    lineages, raw_recs = get_lineages(csv, tax_cols)

    # Get parsed organisms and latest accessions from the GenBank file.
    if alt_gb_parser:
        accs2orgs = from_genbank_alt(gbk)
    else:
        accs2orgs = from_genbank(gbk, min_len)

    # Finally, read the FASTA and create individual FASTA files if a lineage exists.
    logging.info("Creating new sequences and lineages...")

    l_fh = open(final_lineages, "w")
    ln_fh = open(lineages_not_found, "w")
    l_fh.write(
        "identifiers,superkingdom,phylum,class,order,family,genus,species,strain\n"
    )
    ln_fh.write("fna_id,gbk_org\n")
    passed_lookup = 0
    failed_lookup = 0
    gbk_recs_missing = 0
    skipped_len_short = 0

    if not os.path.exists(base_fasta_dir):
        os.makedirs(base_fasta_dir)

    for record in SeqIO.parse(fna, "fasta"):
        if len(record.seq) < min_len:
            skipped_len_short += 1
            continue
        elif record.id in accs2orgs.keys():
            org_words = accs2orgs[record.id].split(" ")
        else:
            gbk_recs_missing += 1
            continue

        genus_species = (
            " ".join(org_words[0:2]) if len(org_words) > 2 else " ".join(org_words[0:])
        )

        if not skip_per_fa:
            write_fasta(record.seq, record.id, base_fasta_dir, fna_suffix)

        # Look up the lineage by full organism name first, then by genus plus
        # species, and finally fall back to a minimal lineage row.
        if record.id in accs2orgs.keys() and accs2orgs[record.id] in lineages.keys():
            l_fh.write(",".join([record.id, lineages[accs2orgs[record.id]]]) + "\n")
            passed_lookup += 1
        elif record.id in accs2orgs.keys() and genus_species in lineages.keys():
            if len(org_words) > 2:
                l_fh.write(
                    ",".join(
                        [
                            record.id,
                            lineages[genus_species].rstrip(","),
                            accs2orgs[record.id],
                        ]
                    )
                    + "\n"
                )
            else:
                l_fh.write(",".join([record.id, lineages[genus_species]]) + "\n")
            passed_lookup += 1
        else:
            if len(org_words) > 2:
                l_fh.write(
                    ",".join(
                        [
                            record.id,
                            "",
                            "",
                            "",
                            "",
                            "",
                            org_words[0],
                            org_words[0] + " " + org_words[1],
                            accs2orgs[record.id],
                        ]
                    )
                    + "\n"
                )
            else:
                l_fh.write(
                    ",".join(
                        [
                            record.id,
                            "",
                            "",
                            "",
                            "",
                            "",
                            org_words[0],
                            accs2orgs[record.id],
                            "",
                        ]
                    )
                    + "\n"
                )
            ln_fh.write(",".join([record.id, accs2orgs[record.id]]) + "\n")
            failed_lookup += 1
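
    # --- Illustrative sketch of one lineages.csv row (lineage values made up;
    # NC_012920.1 is the human mitochondrion accession). A successful strain-
    # or species-level lookup writes:
    #   NC_012920.1,Eukaryota,Chordata,Mammalia,Primates,Hominidae,Homo,Homo sapiens,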

    logging.info(
        f"No. of raw records present in `ncbitax2lin` [{os.path.basename(csv)}]: {raw_recs}"
        + f"\nNo. of valid records collected from `ncbitax2lin` [{os.path.basename(csv)}]: {len(lineages.keys())}"
        + f"\nNo. of sequences skipped (sequence length < {min_len}): {skipped_len_short}"
        + f"\nNo. of records in FASTA [{os.path.basename(fna)}]: {passed_lookup + failed_lookup}"
        + f"\nNo. of records in GenBank [{os.path.basename(gbk)}]: {len(accs2orgs.keys())}"
        + f"\nNo. of FASTA records for which new lineages were created: {passed_lookup}"
        + f"\nNo. of FASTA records for which only genus, species and/or strain information was created: {failed_lookup}"
        + f"\nNo. of FASTA records for which no GenBank records exist: {gbk_recs_missing}"
    )

    if (passed_lookup + failed_lookup) != len(accs2orgs.keys()):
        logging.error(
            f"The number of lineage rows written [{passed_lookup + failed_lookup}]"
            + f"\nis not equal to the number of GenBank records [{len(accs2orgs.keys())}]!"
        )
        exit(1)
    else:
        logging.info("Successfully created lineages and FASTA records! Done!!")

    l_fh.close()
    ln_fh.close()


if __name__ == "__main__":
    main()
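
# --- Example invocation (paths assumed, not part of the original script):
#   python3 create_fasta_and_lineages.py \
#       -csv /path/to/ncbi_lineages.csv \
#       -out /path/to/organelles \
#       -f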