annotate 0.1.0/bin/dl_pdg_metadata.py @ 0:c8597e9e1a97

"planemo upload"
author kkonganti
date Mon, 27 Nov 2023 12:37:44 -0500
parents
children
rev   line source
kkonganti@0 1 #!/usr/bin/env python3
kkonganti@0 2
kkonganti@0 3 # Kranti Konganti
kkonganti@0 4
kkonganti@0 5 import argparse
kkonganti@0 6 import inspect
kkonganti@0 7 import logging
kkonganti@0 8 import os
kkonganti@0 9 import re
kkonganti@0 10 import shutil
kkonganti@0 11 import tempfile
kkonganti@0 12 from html.parser import HTMLParser
kkonganti@0 13 from urllib.request import urlopen
kkonganti@0 14
kkonganti@0 15 # Set logging format.
kkonganti@0 16 logging.basicConfig(
kkonganti@0 17 format="\n" + "=" * 55 + "\n%(asctime)s - %(levelname)s\n" + "=" * 55 + "\n%(message)s\n",
kkonganti@0 18 level=logging.DEBUG,
kkonganti@0 19 )
kkonganti@0 20
kkonganti@0 21
kkonganti@0 22 # Multiple inheritence for pretty printing of help text.
kkonganti@0 23 class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
kkonganti@0 24 pass
kkonganti@0 25
kkonganti@0 26
kkonganti@0 27 # HTMLParser override class to get PDG release and latest Cluster .tsv file
kkonganti@0 28 class NCBIPathogensHTMLParser(HTMLParser):
kkonganti@0 29 def __init__(self, *, convert_charrefs: bool = ...) -> None:
kkonganti@0 30 super().__init__(convert_charrefs=convert_charrefs)
kkonganti@0 31 self.reset()
kkonganti@0 32 self.href_data = list()
kkonganti@0 33
kkonganti@0 34 def handle_data(self, data):
kkonganti@0 35 self.href_data.append(data)
kkonganti@0 36
kkonganti@0 37
kkonganti@0 38 def dl_pdg(**kwargs) -> None:
kkonganti@0 39 """
kkonganti@0 40 Method to save the PDG metadata file and
kkonganti@0 41 return the latest PDG release.
kkonganti@0 42 """
kkonganti@0 43 db_path, url, regex, suffix, overwrite, release = [kwargs[k] for k in kwargs.keys()]
kkonganti@0 44
kkonganti@0 45 if (db_path or url) == None:
kkonganti@0 46 logging.error("Please provide absolute UNIX path\n" + "to store the result DB flat files.")
kkonganti@0 47 exit(1)
kkonganti@0 48
kkonganti@0 49 if re.match(r"^PDG\d+\.\d+$", release):
kkonganti@0 50 url = re.sub("latest_snps", release.strip(), url)
kkonganti@0 51
kkonganti@0 52 html_parser = NCBIPathogensHTMLParser()
kkonganti@0 53 logging.info(f"Finding latest NCBI PDG release at:\n{url}")
kkonganti@0 54
kkonganti@0 55 with urlopen(url) as response:
kkonganti@0 56 with tempfile.NamedTemporaryFile(delete=False) as tmp_html_file:
kkonganti@0 57 shutil.copyfileobj(response, tmp_html_file)
kkonganti@0 58
kkonganti@0 59 with open(tmp_html_file.name, "r") as html:
kkonganti@0 60 html_parser.feed("".join(html.readlines()))
kkonganti@0 61
kkonganti@0 62 pdg_filename = re.search(regex, "".join(html_parser.href_data)).group(0)
kkonganti@0 63 pdg_release = pdg_filename.rstrip(suffix)
kkonganti@0 64 pdg_metadata_url = "/".join([url, pdg_filename])
kkonganti@0 65 pdg_release = pdg_filename.rstrip(suffix)
kkonganti@0 66 dest_dir = os.path.join(db_path, pdg_release)
kkonganti@0 67
kkonganti@0 68 logging.info(f"Found NCBI PDG file:\n{pdg_metadata_url}")
kkonganti@0 69
kkonganti@0 70 if (
kkonganti@0 71 not overwrite
kkonganti@0 72 and re.match(r".+?\.metadata\.tsv$", pdg_filename)
kkonganti@0 73 and os.path.exists(dest_dir)
kkonganti@0 74 ):
kkonganti@0 75 logging.error(f"DB path\n{dest_dir}\nalready exists. Please use -f to overwrite.")
kkonganti@0 76 exit(1)
kkonganti@0 77 elif overwrite and not re.match(r".+?\.reference_target\.cluster_list\.tsv$", pdg_filename):
kkonganti@0 78 shutil.rmtree(dest_dir, ignore_errors=True) if os.path.exists(dest_dir) else None
kkonganti@0 79 os.makedirs(dest_dir)
kkonganti@0 80 elif (
kkonganti@0 81 not overwrite
kkonganti@0 82 and re.match(r".+?\.metadata\.tsv$", pdg_filename)
kkonganti@0 83 and not os.path.exists(dest_dir)
kkonganti@0 84 ):
kkonganti@0 85 os.makedirs(dest_dir)
kkonganti@0 86
kkonganti@0 87 tsv_at = os.path.join(dest_dir, pdg_filename)
kkonganti@0 88 logging.info(f"Saving to:\n{tsv_at}")
kkonganti@0 89
kkonganti@0 90 with urlopen(pdg_metadata_url) as response:
kkonganti@0 91 with open(tsv_at, "w") as tsv:
kkonganti@0 92 tsv.writelines(response.read().decode("utf-8"))
kkonganti@0 93
kkonganti@0 94 html.close()
kkonganti@0 95 tmp_html_file.close()
kkonganti@0 96 os.unlink(tmp_html_file.name)
kkonganti@0 97 tsv.close()
kkonganti@0 98 response.close()
kkonganti@0 99
kkonganti@0 100 return tsv_at, dest_dir
kkonganti@0 101
kkonganti@0 102
kkonganti@0 103 def main() -> None:
kkonganti@0 104 """
kkonganti@0 105 This script is part of the `cronology_db` Nextflow workflow and is only
kkonganti@0 106 tested on POSIX sytems.
kkonganti@0 107 It:
kkonganti@0 108 1. Downloads the latest NCBI Pathogens Release metadata file, which
kkonganti@0 109 looks like PDGXXXXXXXXXX.2504.metadata.csv and also the SNP cluster
kkonganti@0 110 information file which looks like PDGXXXXXXXXXX.2504.reference_target.cluster_list.tsv
kkonganti@0 111 2. Generates a new metadata file with only required information such as
kkonganti@0 112 computed_serotype, isolates GenBank or RefSeq downloadable genome FASTA
kkonganti@0 113 URL.
kkonganti@0 114 """
kkonganti@0 115
kkonganti@0 116 prog_name = os.path.basename(inspect.stack()[0].filename)
kkonganti@0 117
kkonganti@0 118 parser = argparse.ArgumentParser(
kkonganti@0 119 prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
kkonganti@0 120 )
kkonganti@0 121
kkonganti@0 122 required = parser.add_argument_group("required arguments")
kkonganti@0 123
kkonganti@0 124 parser.add_argument(
kkonganti@0 125 "-db",
kkonganti@0 126 dest="db_path",
kkonganti@0 127 default=os.getcwd(),
kkonganti@0 128 required=False,
kkonganti@0 129 help="Absolute UNIX path to a path where all results files are\nstored.",
kkonganti@0 130 )
kkonganti@0 131 parser.add_argument(
kkonganti@0 132 "-f",
kkonganti@0 133 dest="overwrite_db",
kkonganti@0 134 default=False,
kkonganti@0 135 required=False,
kkonganti@0 136 action="store_true",
kkonganti@0 137 help="Force overwrite a PDG release directory at DB path\nmentioned with -db.",
kkonganti@0 138 )
kkonganti@0 139 parser.add_argument(
kkonganti@0 140 "-org",
kkonganti@0 141 dest="organism",
kkonganti@0 142 default="Cronobacter",
kkonganti@0 143 required=False,
kkonganti@0 144 help="The organism to create the DB flat files\nfor.",
kkonganti@0 145 )
kkonganti@0 146 required.add_argument(
kkonganti@0 147 "-rel",
kkonganti@0 148 dest="release",
kkonganti@0 149 default=False,
kkonganti@0 150 required=False,
kkonganti@0 151 help="If you get a 404 error, try mentioning the actual release identifier.\n"
kkonganti@0 152 + "Ex: For Cronobacter, you can get the release identifier by going to:\n"
kkonganti@0 153 + " https://ftp.ncbi.nlm.nih.gov/pathogen/Results/Cronobacter\n"
kkonganti@0 154 + "Ex: If you want metadata beloginging to release PDG000000002.2507, then you\n"
kkonganti@0 155 + " would use this command-line option as:\n -rel PDG000000002.2507",
kkonganti@0 156 )
kkonganti@0 157
kkonganti@0 158 args = parser.parse_args()
kkonganti@0 159 db_path = args.db_path
kkonganti@0 160 org = args.organism
kkonganti@0 161 overwrite = args.overwrite_db
kkonganti@0 162 release = args.release
kkonganti@0 163 ncbi_pathogens_loc = "/".join(
kkonganti@0 164 ["https://ftp.ncbi.nlm.nih.gov/pathogen/Results", org, "latest_snps"]
kkonganti@0 165 )
kkonganti@0 166
kkonganti@0 167 if not db_path:
kkonganti@0 168 db_path = os.getcwd()
kkonganti@0 169
kkonganti@0 170 # Save metadata
kkonganti@0 171 file, dest_dir = dl_pdg(
kkonganti@0 172 db_path=db_path,
kkonganti@0 173 url="/".join([ncbi_pathogens_loc, "Metadata"]),
kkonganti@0 174 regex=re.compile(r"PDG\d+\.\d+\.metadata\.tsv"),
kkonganti@0 175 suffix=".metadata.tsv",
kkonganti@0 176 overwrite=overwrite,
kkonganti@0 177 release=release,
kkonganti@0 178 )
kkonganti@0 179
kkonganti@0 180 # Save cluster to target mapping
kkonganti@0 181 dl_pdg(
kkonganti@0 182 db_path=db_path,
kkonganti@0 183 url="/".join([ncbi_pathogens_loc, "Clusters"]),
kkonganti@0 184 regex=re.compile(r"PDG\d+\.\d+\.reference_target\.cluster_list\.tsv"),
kkonganti@0 185 suffix="reference_target\.cluster_list\.tsv",
kkonganti@0 186 overwrite=overwrite,
kkonganti@0 187 release=release,
kkonganti@0 188 )
kkonganti@0 189
kkonganti@0 190 # Create accs.txt for dataformat to fetch required ACC fields
kkonganti@0 191 accs_file = os.path.join(dest_dir, "accs_all.txt")
kkonganti@0 192 with open(file, "r") as pdg_metadata_fh:
kkonganti@0 193 with open(accs_file, "w") as accs_fh:
kkonganti@0 194 for line in pdg_metadata_fh.readlines():
kkonganti@0 195 if re.match(r"^#", line) or line in ["\n", "\n\r", "\r"]:
kkonganti@0 196 continue
kkonganti@0 197 cols = line.strip().split("\t")
kkonganti@0 198 asm_acc = cols[9]
kkonganti@0 199 accs_fh.write(f"{asm_acc}\n") if (asm_acc != "NULL") else None
kkonganti@0 200 accs_fh.close()
kkonganti@0 201 pdg_metadata_fh.close()
kkonganti@0 202
kkonganti@0 203 logging.info("Finished writing accessions for dataformat tool.")
kkonganti@0 204
kkonganti@0 205
kkonganti@0 206 if __name__ == "__main__":
kkonganti@0 207 main()