comparison 0.1.0/bin/dl_pdg_metadata.py @ 0:c8597e9e1a97

"planemo upload"
author kkonganti
date Mon, 27 Nov 2023 12:37:44 -0500
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:c8597e9e1a97
1 #!/usr/bin/env python3
2
3 # Kranti Konganti
4
5 import argparse
6 import inspect
7 import logging
8 import os
9 import re
10 import shutil
11 import tempfile
12 from html.parser import HTMLParser
13 from urllib.request import urlopen
14
# Configure the root logger: every record is printed as a banner made of
# two 55-character "=" rules around the timestamp/level, then the message.
logging.basicConfig(
    level=logging.DEBUG,
    format="\n{rule}\n%(asctime)s - %(levelname)s\n{rule}\n%(message)s\n".format(rule="=" * 55),
)
20
21
# Multiple inheritance for pretty printing of help text.
class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
    """Help formatter that keeps raw line breaks and also shows argument defaults."""
25
26
# HTMLParser override class to get PDG release and latest Cluster .tsv file
class NCBIPathogensHTMLParser(HTMLParser):
    """
    Collect the text content of an HTML page.

    Every text node seen while feeding the parser is appended, in document
    order, to `href_data`. Despite the attribute name, this is the text
    between tags (e.g. the visible link labels), not `href` attribute values.
    """

    def __init__(self, *, convert_charrefs: bool = True) -> None:
        # The previous default was the `...` placeholder copied from a type
        # stub; it only behaved like True because Ellipsis is truthy.
        # HTMLParser.__init__ already calls reset(), so no explicit reset here.
        super().__init__(convert_charrefs=convert_charrefs)
        self.href_data = []

    def handle_data(self, data):
        """Store one chunk of text found between tags."""
        self.href_data.append(data)
36
37
def _release_from_filename(pdg_filename, suffix):
    """Return the PDG release identifier obtained by dropping `suffix` from `pdg_filename`."""
    if suffix and pdg_filename.endswith(suffix):
        # Exact suffix removal; also drop a separator dot left behind when
        # the caller's suffix does not start with ".".
        return pdg_filename[: -len(suffix)].rstrip(".")
    # Legacy fall-back: str.rstrip() treats `suffix` as a *set of characters*.
    # Kept so that suffixes which are not a literal tail of the file name
    # still yield the same result as before.
    return pdg_filename.rstrip(suffix)


def dl_pdg(**kwargs) -> tuple:
    """
    Method to save a PDG file (metadata or cluster list) and
    return where it was saved.

    Keyword arguments (all required, looked up by name):
        db_path:   directory under which the release directory is created.
        url:       URL of the NCBI directory listing to scan.
        regex:     pattern (string or compiled) matching the wanted file
                   name within the text of the listing page.
        suffix:    trailing part of the file name that is removed to get
                   the release identifier (used as the directory name).
        overwrite: if true, an existing release directory is replaced when
                   the file is not the cluster list.
        release:   release identifier such as PDG000000002.2507; when it has
                   that form it replaces "latest_snps" in `url`. Any falsy
                   value leaves `url` unchanged.

    Returns:
        Tuple `(tsv_at, dest_dir)`: path of the saved file and the release
        directory that contains it.

    Raises:
        KeyError: if one of the keyword arguments is missing.
        SystemExit: if `db_path`/`url` is None, if no file name matching
            `regex` is found, or if the release directory exists and
            `overwrite` is false.
    """
    # Look the arguments up by name so that the order in which the caller
    # spells the keywords does not matter.
    db_path = kwargs["db_path"]
    url = kwargs["url"]
    regex = kwargs["regex"]
    suffix = kwargs["suffix"]
    overwrite = kwargs["overwrite"]
    release = kwargs["release"]

    if db_path is None or url is None:
        logging.error("Please provide absolute UNIX path\n" + "to store the result DB flat files.")
        raise SystemExit(1)

    # `release` is falsy (argparse default) when -rel was not given;
    # re.match() would raise TypeError on a non-string.
    if release and re.match(r"^PDG\d+\.\d+$", release):
        url = url.replace("latest_snps", release.strip())

    html_parser = NCBIPathogensHTMLParser()
    logging.info(f"Finding latest NCBI PDG release at:\n{url}")

    # Parse the listing straight from the response; no temporary file that
    # could be left behind on an early exit.
    with urlopen(url) as response:
        html_parser.feed(response.read().decode("utf-8", errors="replace"))
    html_parser.close()  # flush any text still buffered by the parser

    found = re.search(regex, "".join(html_parser.href_data))
    if found is None:
        wanted = getattr(regex, "pattern", regex)
        logging.error(f"Could not find a file matching\n{wanted}\nat:\n{url}")
        raise SystemExit(1)

    pdg_filename = found.group(0)
    pdg_release = _release_from_filename(pdg_filename, suffix)
    pdg_metadata_url = "/".join([url.rstrip("/"), pdg_filename])
    dest_dir = os.path.join(db_path, pdg_release)

    logging.info(f"Found NCBI PDG file:\n{pdg_metadata_url}")

    is_metadata = re.match(r".+?\.metadata\.tsv$", pdg_filename)
    is_cluster_list = re.match(r".+?\.reference_target\.cluster_list\.tsv$", pdg_filename)

    if not overwrite and is_metadata and os.path.exists(dest_dir):
        logging.error(f"DB path\n{dest_dir}\nalready exists. Please use -f to overwrite.")
        raise SystemExit(1)
    elif overwrite and not is_cluster_list:
        # Start from an empty release directory. The cluster list is
        # excluded so that it does not wipe a directory that was just
        # populated with the metadata file.
        shutil.rmtree(dest_dir, ignore_errors=True)
        os.makedirs(dest_dir)
    elif not overwrite and is_metadata:
        # Directory does not exist yet (the first branch handled "exists").
        os.makedirs(dest_dir)

    tsv_at = os.path.join(dest_dir, pdg_filename)
    logging.info(f"Saving to:\n{tsv_at}")

    with urlopen(pdg_metadata_url) as response, open(tsv_at, "w") as tsv:
        # One write() call; writelines() on a str would write it
        # character by character.
        tsv.write(response.read().decode("utf-8"))

    return tsv_at, dest_dir
101
102
def _write_accessions(metadata_tsv, accs_file, acc_col=9):
    """Write the assembly accession column of `metadata_tsv` to `accs_file`, one per line."""
    skipped = 0
    with open(metadata_tsv, "r") as pdg_metadata_fh, open(accs_file, "w") as accs_fh:
        for line in pdg_metadata_fh:
            # Skip comment/header lines and blank lines.
            if line.startswith("#") or not line.strip():
                continue
            # Strip only the line ending: stripping all whitespace would drop
            # a leading empty field and shift every column by one.
            cols = line.rstrip("\r\n").split("\t")
            if len(cols) <= acc_col:
                skipped += 1
                continue
            asm_acc = cols[acc_col].strip()
            if asm_acc and asm_acc != "NULL":
                accs_fh.write(f"{asm_acc}\n")
    if skipped:
        logging.warning(f"Skipped {skipped} line(s) with fewer than {acc_col + 1} columns in:\n{metadata_tsv}")


def main() -> None:
    """
    This script is part of the `cronology_db` Nextflow workflow and is only
    tested on POSIX systems.
    It:
        1. Downloads the latest NCBI Pathogens Release metadata file, which
            looks like PDGXXXXXXXXXX.2504.metadata.tsv and also the SNP cluster
            information file which looks like PDGXXXXXXXXXX.2504.reference_target.cluster_list.tsv
        2. Writes accs_all.txt next to them, listing the assembly accession
            (10th metadata column) of each isolate for the dataformat tool.
    """
    # NOTE: the docstring above doubles as the --help description.

    prog_name = os.path.basename(inspect.stack()[0].filename)

    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )

    # -rel is listed under this group in --help although it is optional.
    required = parser.add_argument_group("required arguments")

    parser.add_argument(
        "-db",
        dest="db_path",
        default=os.getcwd(),
        required=False,
        help="Absolute UNIX path to a path where all results files are\nstored.",
    )
    parser.add_argument(
        "-f",
        dest="overwrite_db",
        default=False,
        required=False,
        action="store_true",
        help="Force overwrite a PDG release directory at DB path\nmentioned with -db.",
    )
    parser.add_argument(
        "-org",
        dest="organism",
        default="Cronobacter",
        required=False,
        help="The organism to create the DB flat files\nfor.",
    )
    required.add_argument(
        "-rel",
        dest="release",
        default=False,
        required=False,
        help="If you get a 404 error, try mentioning the actual release identifier.\n"
        + "Ex: For Cronobacter, you can get the release identifier by going to:\n"
        + "    https://ftp.ncbi.nlm.nih.gov/pathogen/Results/Cronobacter\n"
        + "Ex: If you want metadata beloginging to release PDG000000002.2507, then you\n"
        + "    would use this command-line option as:\n    -rel PDG000000002.2507",
    )

    args = parser.parse_args()
    db_path = args.db_path or os.getcwd()
    org = args.organism
    overwrite = args.overwrite_db
    # Always hand a string to dl_pdg: the argparse default (False) is not a
    # valid subject for the release-identifier regex match done there.
    release = args.release or ""
    ncbi_pathogens_loc = "/".join(
        ["https://ftp.ncbi.nlm.nih.gov/pathogen/Results", org, "latest_snps"]
    )

    # Save metadata
    metadata_tsv, dest_dir = dl_pdg(
        db_path=db_path,
        url="/".join([ncbi_pathogens_loc, "Metadata"]),
        regex=re.compile(r"PDG\d+\.\d+\.metadata\.tsv"),
        suffix=".metadata.tsv",
        overwrite=overwrite,
        release=release,
    )

    # Save cluster to target mapping
    dl_pdg(
        db_path=db_path,
        url="/".join([ncbi_pathogens_loc, "Clusters"]),
        regex=re.compile(r"PDG\d+\.\d+\.reference_target\.cluster_list\.tsv"),
        # Literal file-name tail (not a regex): no backslash escapes.
        suffix=".reference_target.cluster_list.tsv",
        overwrite=overwrite,
        release=release,
    )

    # Create accs_all.txt for dataformat to fetch required ACC fields
    accs_file = os.path.join(dest_dir, "accs_all.txt")
    _write_accessions(metadata_tsv, accs_file)

    logging.info("Finished writing accessions for dataformat tool.")
204
205
# Script entry point: run the download only when executed directly,
# not when the module is imported.
if __name__ == "__main__":
    main()