annotate 0.5.0/bin/dl_pdg_metadata.py @ 1:365849f031fd

"planemo upload"
author kkonganti
date Mon, 05 Jun 2023 18:48:51 -0400
#!/usr/bin/env python3

# Kranti Konganti

import os
import shutil
import tempfile
import argparse
import inspect
import logging
import re
from urllib.request import urlopen
from html.parser import HTMLParser

# Set logging format.
logging.basicConfig(
    format="\n" + "=" * 55 + "\n%(asctime)s - %(levelname)s\n" + "=" * 55 + "\n%(message)s\n",
    level=logging.DEBUG,
)

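# Each record is printed between banners of "=" characters, with a timestamp and
# level line followed by the message on its own line(s).
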
# Multiple inheritance for pretty printing of help text.
class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
    pass


# HTMLParser override class to get PDG release and latest Cluster .tsv file
class NCBIPathogensHTMLParser(HTMLParser):
    def __init__(self, *, convert_charrefs: bool = True) -> None:
        super().__init__(convert_charrefs=convert_charrefs)
        self.reset()
        self.href_data = list()

    def handle_data(self, data):
        self.href_data.append(data)


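# handle_data() collects the text of every node on the fetched listing page; the
# joined text is later regex-searched in dl_pdg() for the PDG file name.
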
def dl_pdg(**kwargs):
    """
    Save the PDG metadata file and return the path of the saved
    file along with its destination directory.
    """
    # Unpacking relies on the caller passing keyword arguments in this exact order.
    db_path, url, regex, suffix, overwrite, release = [kwargs[k] for k in kwargs.keys()]

    if db_path is None or url is None:
        logging.error("Please provide an absolute UNIX path\n" + "to store the result DB flat files.")
        exit(1)

    # Only substitute the release identifier if one was explicitly given with -rel.
    if release and re.match(r"^PDG\d+\.\d+$", release):
        url = re.sub("latest_snps", release.strip(), url)

    html_parser = NCBIPathogensHTMLParser()
    logging.info(f"Finding latest NCBI PDG release at:\n{url}")

    with urlopen(url) as response:
        with tempfile.NamedTemporaryFile(delete=False) as tmp_html_file:
            shutil.copyfileobj(response, tmp_html_file)

    with open(tmp_html_file.name, "r") as html:
        html_parser.feed("".join(html.readlines()))

    pdg_filename = re.search(regex, "".join(html_parser.href_data)).group(0)
    # Extract the release identifier (e.g. PDG000000002.2507) from the file name.
    pdg_release = re.search(r"PDG\d+\.\d+", pdg_filename).group(0)
    pdg_metadata_url = "/".join([url, pdg_filename])
    dest_dir = os.path.join(db_path, pdg_release)

    logging.info(f"Found NCBI PDG file:\n{pdg_metadata_url}")

    if (
        not overwrite
        and re.match(r".+?\.metadata\.tsv$", pdg_filename)
        and os.path.exists(dest_dir)
    ):
        logging.error(f"DB path\n{dest_dir}\nalready exists. Please use -f to overwrite.")
        exit(1)
    elif overwrite and not re.match(r".+?\.reference_target\.cluster_list\.tsv$", pdg_filename):
        if os.path.exists(dest_dir):
            shutil.rmtree(dest_dir, ignore_errors=True)
        os.makedirs(dest_dir)
    elif (
        not overwrite
        and re.match(r".+?\.metadata\.tsv$", pdg_filename)
        and not os.path.exists(dest_dir)
    ):
        os.makedirs(dest_dir)

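    # At this point dest_dir exists for the metadata download; the cluster_list
    # download reuses the release directory created during the metadata step.
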
    tsv_at = os.path.join(dest_dir, pdg_filename)
    logging.info(f"Saving to:\n{tsv_at}")

    with urlopen(pdg_metadata_url) as response:
        with open(tsv_at, "w") as tsv:
            tsv.write(response.read().decode("utf-8"))

    # The context managers above close the file handles; only the temporary
    # HTML listing still needs to be removed from disk.
    os.unlink(tmp_html_file.name)

    return tsv_at, dest_dir

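# Illustrative example (paths are hypothetical): with db_path="/data/db", the Metadata
# URL and release PDG000000002.2507, dl_pdg() returns
# ("/data/db/PDG000000002.2507/PDG000000002.2507.metadata.tsv", "/data/db/PDG000000002.2507").
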
def main() -> None:
    """
    This script is part of the `bettercallsal_db` Nextflow workflow and is only
    tested on POSIX systems.
    It:
        1. Downloads the latest NCBI Pathogens Release metadata file, which
           looks like PDGXXXXXXXXXX.2504.metadata.tsv, and also the SNP cluster
           information file, which looks like
           PDGXXXXXXXXXX.2504.reference_target.cluster_list.tsv
        2. Generates a new metadata file containing only the required information,
           such as computed_serotype and the isolate's GenBank or RefSeq
           downloadable genome FASTA URL.
    """

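    # Example invocation (the DB path is illustrative):
    #   python3 dl_pdg_metadata.py -db /path/to/db -org Salmonella -rel PDG000000002.2507
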
    prog_name = os.path.basename(inspect.stack()[0].filename)

    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )

    # required = parser.add_argument_group("required arguments")

    parser.add_argument(
        "-db",
        dest="db_path",
        default=os.getcwd(),
        required=False,
        help="Absolute UNIX path to a directory where all result files are\nstored.",
    )
    parser.add_argument(
        "-f",
        dest="overwrite_db",
        default=False,
        required=False,
        action="store_true",
        help="Force overwrite a PDG release directory at the DB path\nmentioned with -db.",
    )
    parser.add_argument(
        "-org",
        dest="organism",
        default="Salmonella",
        required=False,
        help="The organism to create the DB flat files\nfor.",
    )
    parser.add_argument(
        "-rel",
        dest="release",
        default=False,
        required=False,
        help="If you get a 404 error, try mentioning the actual release identifier.\n"
        + "Ex: For Salmonella, you can get the release identifier by going to:\n"
        + "    https://ftp.ncbi.nlm.nih.gov/pathogen/Results/Salmonella\n"
        + "Ex: If you want metadata belonging to release PDG000000002.2507, then you\n"
        + "    would use this command-line option as:\n    -rel PDG000000002.2507",
    )

    args = parser.parse_args()
    db_path = args.db_path
    org = args.organism
    overwrite = args.overwrite_db
    release = args.release
    ncbi_pathogens_loc = "/".join(
        ["https://ftp.ncbi.nlm.nih.gov/pathogen/Results", org, "latest_snps"]
    )
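    # For the default organism this resolves to
    # https://ftp.ncbi.nlm.nih.gov/pathogen/Results/Salmonella/latest_snps;
    # dl_pdg() appends "Metadata" or "Clusters" and, when -rel is given, swaps
    # "latest_snps" for the requested release identifier.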

    if not db_path:
        db_path = os.getcwd()

    # Save metadata
    file, dest_dir = dl_pdg(
        db_path=db_path,
        url="/".join([ncbi_pathogens_loc, "Metadata"]),
        regex=re.compile(r"PDG\d+\.\d+\.metadata\.tsv"),
        suffix=".metadata.tsv",
        overwrite=overwrite,
        release=release,
    )

    # Save cluster to target mapping
    dl_pdg(
        db_path=db_path,
        url="/".join([ncbi_pathogens_loc, "Clusters"]),
        regex=re.compile(r"PDG\d+\.\d+\.reference_target\.cluster_list\.tsv"),
        suffix=".reference_target.cluster_list.tsv",
        overwrite=overwrite,
        release=release,
    )

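    # Both TSVs end up under db_path/<PDG release>/ because the release
    # identifier parsed from each file name is the same.
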
    # Create accs_all.txt for dataformat to fetch required ACC fields
    accs_file = os.path.join(dest_dir, "accs_all.txt")
    with open(file, "r") as pdg_metadata_fh:
        with open(accs_file, "w") as accs_fh:
            for line in pdg_metadata_fh:
                if re.match(r"^#", line) or line in ["\n", "\n\r", "\r"]:
                    continue
                cols = line.strip().split("\t")
                # The assembly accession (asm_acc) is expected in the 10th
                # column (index 9) of the PDG metadata TSV.
                asm_acc = cols[9]
                if asm_acc != "NULL":
                    accs_fh.write(f"{asm_acc}\n")

    logging.info("Finished writing accessions for dataformat tool.")


if __name__ == "__main__":
    main()