Mercurial > repos > kkonganti > cfsan_bettercallsal
Comparison view of 0.5.0/bin/dl_pdg_metadata.py at changeset 1:365849f031fd
Commit message: "planemo upload"
Author: kkonganti
Date: Mon, 05 Jun 2023 18:48:51 -0400
Parents: (none listed)
Children: (none listed)
Comparison: 0:a4b1ee4b68b1 vs 1:365849f031fd
1 #!/usr/bin/env python3 | |
2 | |
3 # Kranti Konganti | |
4 | |
5 import os | |
6 import shutil | |
7 import tempfile | |
8 import argparse | |
9 import inspect | |
10 import logging | |
11 import re | |
12 from urllib.request import urlopen | |
13 from html.parser import HTMLParser | |
14 | |
# Configure the root logger once at import: DEBUG verbosity, with each
# record framed by a 55-character "=" banner for readability in workflow logs.
_BANNER = "=" * 55
logging.basicConfig(
    format=f"\n{_BANNER}\n%(asctime)s - %(levelname)s\n{_BANNER}\n%(message)s\n",
    level=logging.DEBUG,
)
20 | |
# Combine two argparse formatters via multiple inheritance so --help output
# both keeps raw newlines and displays argument defaults.
class MultiArgFormatClasses(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
    """Help formatter: raw-text layout plus automatic default display."""
24 | |
25 | |
# HTMLParser override class to get PDG release and latest Cluster .tsv file
class NCBIPathogensHTMLParser(HTMLParser):
    """Collect every text node of a fetched NCBI Pathogens listing page.

    The accumulated text chunks (``href_data``) are later joined and
    searched with a regex for the PDG metadata / cluster-list filename.
    """

    def __init__(self, *, convert_charrefs: bool = True) -> None:
        # BUGFIX: the default used to be the Ellipsis literal (`...`), which
        # was forwarded to HTMLParser.__init__ and only worked because `...`
        # happens to be truthy. Use the documented default, True.
        super().__init__(convert_charrefs=convert_charrefs)
        # NOTE: HTMLParser.__init__ already calls reset(); the original's
        # extra self.reset() was redundant and has been dropped.
        self.href_data = list()

    def handle_data(self, data) -> None:
        # Accumulate raw text found between tags.
        self.href_data.append(data)
35 | |
36 | |
def dl_pdg(**kwargs) -> tuple:
    """
    Download the NCBI Pathogen Detection (PDG) file matching ``regex``
    from ``url`` and save it under ``db_path``.

    Expected keyword arguments (all required):
        db_path   -- absolute path under which the release directory is made
        url       -- listing URL of the NCBI Pathogens results folder
        regex     -- compiled pattern matching the target filename
        suffix    -- filename suffix (kept for interface compatibility;
                     the release id is now derived by regex instead)
        overwrite -- True to wipe and recreate an existing release directory
        release   -- explicit PDG release id (e.g. "PDG000000002.2507"),
                     or a falsy value to use the "latest_snps" snapshot

    Returns a tuple ``(saved_file_path, release_directory)``.
    (The original ``-> None`` annotation was wrong.)
    """
    # BUGFIX: extract by name instead of relying on dict insertion order
    # matching the caller's argument order.
    db_path = kwargs["db_path"]
    url = kwargs["url"]
    regex = kwargs["regex"]
    suffix = kwargs["suffix"]
    overwrite = kwargs["overwrite"]
    release = kwargs["release"]

    # BUGFIX: the original `(db_path or url) == None` only fired when BOTH
    # values were falsy; check each one explicitly.
    if db_path is None or url is None:
        logging.error("Please provide absolute UNIX path\n" + "to store the result DB flat files.")
        exit(1)

    # BUGFIX: `release` defaults to False upstream and re.match() on a
    # non-string raises TypeError, so guard for truthiness first.
    if release and re.match(r"^PDG\d+\.\d+$", str(release)):
        url = re.sub("latest_snps", str(release).strip(), url)

    html_parser = NCBIPathogensHTMLParser()
    logging.info(f"Finding latest NCBI PDG release at:\n{url}")

    # Fetch the directory listing into a temp file, then feed it to the parser.
    with urlopen(url) as response:
        with tempfile.NamedTemporaryFile(delete=False) as tmp_html_file:
            shutil.copyfileobj(response, tmp_html_file)

    with open(tmp_html_file.name, "r") as html:
        html_parser.feed(html.read())

    # Unlink the temp file as soon as it is consumed, so it is not leaked
    # on the error-exit paths below (the original unlinked it only at the end).
    os.unlink(tmp_html_file.name)

    pdg_filename = re.search(regex, "".join(html_parser.href_data)).group(0)
    # BUGFIX: str.rstrip(suffix) strips a character *set*, not a suffix, and
    # only produced the right answer by accident. Derive the release id
    # directly from the filename, which always starts with PDG<nums>.<nums>.
    pdg_release = re.match(r"PDG\d+\.\d+", pdg_filename).group(0)
    pdg_metadata_url = "/".join([url, pdg_filename])
    dest_dir = os.path.join(db_path, pdg_release)

    logging.info(f"Found NCBI PDG file:\n{pdg_metadata_url}")

    is_metadata = bool(re.match(r".+?\.metadata\.tsv$", pdg_filename))
    is_cluster = bool(re.match(r".+?\.reference_target\.cluster_list\.tsv$", pdg_filename))

    if not overwrite and is_metadata and os.path.exists(dest_dir):
        logging.error(f"DB path\n{dest_dir}\nalready exists. Please use -f to overwrite.")
        exit(1)
    elif overwrite and not is_cluster:
        # Recreate the release directory from scratch (cluster-list downloads
        # reuse the directory created by the preceding metadata download).
        if os.path.exists(dest_dir):
            shutil.rmtree(dest_dir, ignore_errors=True)
        os.makedirs(dest_dir)
    elif not overwrite and is_metadata and not os.path.exists(dest_dir):
        os.makedirs(dest_dir)

    tsv_at = os.path.join(dest_dir, pdg_filename)
    logging.info(f"Saving to:\n{tsv_at}")

    # Download the actual TSV payload. (writelines() on a str wrote it
    # character by character; write() is equivalent and direct.)
    with urlopen(pdg_metadata_url) as response:
        with open(tsv_at, "w") as tsv:
            tsv.write(response.read().decode("utf-8"))

    return tsv_at, dest_dir
100 | |
101 | |
def main() -> None:
    """
    This script is part of the `bettercallsal_db` Nextflow workflow and is only
    tested on POSIX systems.
    It:
        1. Downloads the latest NCBI Pathogens Release metadata file, which
            looks like PDGXXXXXXXXXX.2504.metadata.tsv and also the SNP cluster
            information file which looks like PDGXXXXXXXXXX.2504.reference_target.cluster_list.tsv
        2. Generates a new metadata file with only required information such as
            computed_serotype, isolates GenBank or RefSeq downloadable genome FASTA
            URL.
    """

    prog_name = os.path.basename(inspect.stack()[0].filename)

    parser = argparse.ArgumentParser(
        prog=prog_name, description=main.__doc__, formatter_class=MultiArgFormatClasses
    )

    parser.add_argument(
        "-db",
        dest="db_path",
        default=os.getcwd(),
        required=False,
        help="Absolute UNIX path to a path where all results files are\nstored.",
    )
    parser.add_argument(
        "-f",
        dest="overwrite_db",
        default=False,
        required=False,
        action="store_true",
        help="Force overwrite a PDG release directory at DB path\nmentioned with -db.",
    )
    parser.add_argument(
        "-org",
        dest="organism",
        default="Salmonella",
        required=False,
        help="The organism to create the DB flat files\nfor.",
    )
    parser.add_argument(
        "-rel",
        dest="release",
        default=False,
        required=False,
        help="If you get a 404 error, try mentioning the actual release identifier.\n"
        + "Ex: For Salmonella, you can get the release identifier by going to:\n"
        + " https://ftp.ncbi.nlm.nih.gov/pathogen/Results/Salmonella\n"
        + "Ex: If you want metadata belonging to release PDG000000002.2507, then you\n"
        + " would use this command-line option as:\n -rel PDG000000002.2507",
    )

    args = parser.parse_args()
    db_path = args.db_path
    org = args.organism
    overwrite = args.overwrite_db
    release = args.release
    ncbi_pathogens_loc = "/".join(
        ["https://ftp.ncbi.nlm.nih.gov/pathogen/Results", org, "latest_snps"]
    )

    # Defensive fallback in case an empty -db value was supplied.
    if not db_path:
        db_path = os.getcwd()

    # Save metadata. (Renamed from `file`, which shadowed the builtin.)
    metadata_file, dest_dir = dl_pdg(
        db_path=db_path,
        url="/".join([ncbi_pathogens_loc, "Metadata"]),
        regex=re.compile(r"PDG\d+\.\d+\.metadata\.tsv"),
        suffix=".metadata.tsv",
        overwrite=overwrite,
        release=release,
    )

    # Save cluster-to-target mapping.
    # BUGFIX: the suffix is a plain filename suffix, not a regex; the original
    # value "reference_target\.cluster_list\.tsv" carried literal backslashes
    # and lacked the leading dot.
    dl_pdg(
        db_path=db_path,
        url="/".join([ncbi_pathogens_loc, "Clusters"]),
        regex=re.compile(r"PDG\d+\.\d+\.reference_target\.cluster_list\.tsv"),
        suffix=".reference_target.cluster_list.tsv",
        overwrite=overwrite,
        release=release,
    )

    # Create accs_all.txt for the `dataformat` tool to fetch required ACC fields.
    accs_file = os.path.join(dest_dir, "accs_all.txt")
    with open(metadata_file, "r") as pdg_metadata_fh, open(accs_file, "w") as accs_fh:
        for line in pdg_metadata_fh:
            # Skip the commented header line and any blank lines.
            if re.match(r"^#", line) or line in ["\n", "\n\r", "\r"]:
                continue
            cols = line.strip().split("\t")
            # Column index 9 is assumed to hold the assembly accession (per
            # the variable name) — TODO confirm against the PDG metadata schema.
            asm_acc = cols[9]
            if asm_acc != "NULL":
                accs_fh.write(f"{asm_acc}\n")

    logging.info("Finished writing accessions for dataformat tool.")
203 | |
204 | |
# Entry point: run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()