comparison urllib3/contrib/emscripten/fetch.py @ 7:5eb2d5e3bf22
planemo upload for repository https://toolrepo.galaxytrakr.org/view/jpayne/bioproject_to_srr_2/556cac4fb538
author: jpayne
date: Sun, 05 May 2024 23:32:17 -0400
comparing 6:b2745907b1eb with 7:5eb2d5e3bf22
1 """ | |
2 Support for streaming http requests in emscripten. | |
3 | |
4 A few caveats - | |
5 | |
6 Firstly, you can't do streaming http in the main UI thread, because atomics.wait isn't allowed. | |
7 Streaming only works if you're running pyodide in a web worker. | |
8 | |
9 Secondly, this uses an extra web worker and SharedArrayBuffer to do the asynchronous fetch | |
10 operation, so it requires that you have crossOriginIsolation enabled, by serving over https | |
11 (or from localhost) with the two headers below set: | |
12 | |
13 Cross-Origin-Opener-Policy: same-origin | |
14 Cross-Origin-Embedder-Policy: require-corp | |
15 | |
16 You can tell if cross origin isolation is successfully enabled by looking at the global crossOriginIsolated variable in | |
17 javascript console. If it isn't, streaming requests will fallback to XMLHttpRequest, i.e. getting the whole | |
18 request into a buffer and then returning it. it shows a warning in the javascript console in this case. | |
19 | |
20 Finally, the webworker which does the streaming fetch is created on initial import, but will only be started once | |
21 control is returned to javascript. Call `await wait_for_streaming_ready()` to wait for streaming fetch. | |
22 | |
23 NB: in this code, there are a lot of javascript objects. They are named js_* | |
24 to make it clear what type of object they are. | |
25 """ | |
from __future__ import annotations

import io
import json
from email.parser import Parser
from importlib.resources import files
from typing import TYPE_CHECKING, Any

import js  # type: ignore[import-not-found]
from pyodide.ffi import (  # type: ignore[import-not-found]
    JsArray,
    JsException,
    JsProxy,
    to_js,
)

if TYPE_CHECKING:
    from typing_extensions import Buffer

    from .request import EmscriptenRequest
    from .response import EmscriptenResponse

"""
There are some headers that trigger unintended CORS preflight requests.
See also https://github.com/koenvo/pyodide-http/issues/22
"""
HEADERS_TO_IGNORE = ("user-agent",)

SUCCESS_HEADER = -1
SUCCESS_EOF = -2
ERROR_TIMEOUT = -3
ERROR_EXCEPTION = -4

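# How these status codes are used (inferred from the reading code below): the Python
# side writes ERROR_TIMEOUT into int32 slot 0 of the shared buffer and blocks on
# Atomics.wait; the worker then overwrites slot 0 with a positive byte count or one of
# the codes above. For SUCCESS_HEADER and ERROR_EXCEPTION, int32 slot 1 holds the
# length of a UTF-8 string stored in the byte view that starts at byte offset 8.
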
_STREAMING_WORKER_CODE = (
    files(__package__)
    .joinpath("emscripten_fetch_worker.js")
    .read_text(encoding="utf-8")
)


class _RequestError(Exception):
    def __init__(
        self,
        message: str | None = None,
        *,
        request: EmscriptenRequest | None = None,
        response: EmscriptenResponse | None = None,
    ):
        self.request = request
        self.response = response
        self.message = message
        super().__init__(self.message)


class _StreamingError(_RequestError):
    pass


class _TimeoutError(_RequestError):
    pass


def _obj_from_dict(dict_val: dict[str, Any]) -> JsProxy:
    return to_js(dict_val, dict_converter=js.Object.fromEntries)


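# _ReadStream is a blocking, file-like reader over the shared buffer: each readinto()
# call posts a "getMore" message to the fetch worker and waits on Atomics.wait until
# the worker reports a byte count, an error, or EOF.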
class _ReadStream(io.RawIOBase):
    def __init__(
        self,
        int_buffer: JsArray,
        byte_buffer: JsArray,
        timeout: float,
        worker: JsProxy,
        connection_id: int,
        request: EmscriptenRequest,
    ):
        self.int_buffer = int_buffer
        self.byte_buffer = byte_buffer
        self.read_pos = 0
        self.read_len = 0
        self.connection_id = connection_id
        self.worker = worker
        self.timeout = int(1000 * timeout) if timeout > 0 else None
        self.is_live = True
        self._is_closed = False
        self.request: EmscriptenRequest | None = request

    def __del__(self) -> None:
        self.close()

    # this is compatible with _base_connection
    def is_closed(self) -> bool:
        return self._is_closed

    # for compatibility with RawIOBase
    @property
    def closed(self) -> bool:
        return self.is_closed()

    def close(self) -> None:
        if not self.is_closed():
            self.read_len = 0
            self.read_pos = 0
            self.int_buffer = None
            self.byte_buffer = None
            self._is_closed = True
            self.request = None
            if self.is_live:
                self.worker.postMessage(_obj_from_dict({"close": self.connection_id}))
                self.is_live = False
            super().close()

    def readable(self) -> bool:
        return True

    def writable(self) -> bool:
        return False

    def seekable(self) -> bool:
        return False

    def readinto(self, byte_obj: Buffer) -> int:
        if not self.int_buffer:
            raise _StreamingError(
                "No buffer for stream in _ReadStream.readinto",
                request=self.request,
                response=None,
            )
        if self.read_len == 0:
            # wait for the worker to send something
            js.Atomics.store(self.int_buffer, 0, ERROR_TIMEOUT)
            self.worker.postMessage(_obj_from_dict({"getMore": self.connection_id}))
            if (
                js.Atomics.wait(self.int_buffer, 0, ERROR_TIMEOUT, self.timeout)
                == "timed-out"
            ):
                raise _TimeoutError
            data_len = self.int_buffer[0]
            if data_len > 0:
                self.read_len = data_len
                self.read_pos = 0
            elif data_len == ERROR_EXCEPTION:
                string_len = self.int_buffer[1]
                # decode the error string
                js_decoder = js.TextDecoder.new()
                json_str = js_decoder.decode(self.byte_buffer.slice(0, string_len))
                raise _StreamingError(
                    f"Exception thrown in fetch: {json_str}",
                    request=self.request,
                    response=None,
                )
            else:
                # EOF, free the buffers and return zero
                # and free the request
                self.is_live = False
                self.close()
                return 0
        # copy from the shared byte buffer into the caller's buffer
        ret_length = min(self.read_len, len(memoryview(byte_obj)))
        subarray = self.byte_buffer.subarray(
            self.read_pos, self.read_pos + ret_length
        ).to_py()
        memoryview(byte_obj)[0:ret_length] = subarray
        self.read_len -= ret_length
        self.read_pos += ret_length
        return ret_length


class _StreamingFetcher:
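    # Owns the dedicated fetch worker plus a promise that resolves once the worker has
    # posted its first message, i.e. once streaming is actually usable.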
    def __init__(self) -> None:
        # make web-worker and data buffer on startup
        self.streaming_ready = False

        js_data_blob = js.Blob.new(
            [_STREAMING_WORKER_CODE], _obj_from_dict({"type": "application/javascript"})
        )

        def promise_resolver(js_resolve_fn: JsProxy, js_reject_fn: JsProxy) -> None:
            def onMsg(e: JsProxy) -> None:
                self.streaming_ready = True
                js_resolve_fn(e)

            def onErr(e: JsProxy) -> None:
                js_reject_fn(e)  # Defensive: never happens in ci

            self.js_worker.onmessage = onMsg
            self.js_worker.onerror = onErr

        js_data_url = js.URL.createObjectURL(js_data_blob)
        self.js_worker = js.globalThis.Worker.new(js_data_url)
        self.js_worker_ready_promise = js.globalThis.Promise.new(promise_resolver)

    def send(self, request: EmscriptenRequest) -> EmscriptenResponse:
        headers = {
            k: v for k, v in request.headers.items() if k not in HEADERS_TO_IGNORE
        }

        body = request.body
        fetch_data = {"headers": headers, "body": to_js(body), "method": request.method}
        # start the request off in the worker
        timeout = int(1000 * request.timeout) if request.timeout > 0 else None
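        # Shared buffer layout, as used below: 1 MiB total; int32 slots 0 and 1 carry
        # status codes and payload lengths, and the byte view for response data starts
        # at byte offset 8 (just past those two slots).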
        js_shared_buffer = js.SharedArrayBuffer.new(1048576)
        js_int_buffer = js.Int32Array.new(js_shared_buffer)
        js_byte_buffer = js.Uint8Array.new(js_shared_buffer, 8)

        js.Atomics.store(js_int_buffer, 0, ERROR_TIMEOUT)
        js.Atomics.notify(js_int_buffer, 0)
        js_absolute_url = js.URL.new(request.url, js.location).href
        self.js_worker.postMessage(
            _obj_from_dict(
                {
                    "buffer": js_shared_buffer,
                    "url": js_absolute_url,
                    "fetchParams": fetch_data,
                }
            )
        )
        # wait for the worker to send something
        js.Atomics.wait(js_int_buffer, 0, ERROR_TIMEOUT, timeout)
        if js_int_buffer[0] == ERROR_TIMEOUT:
            raise _TimeoutError(
                "Timeout connecting to streaming request",
                request=request,
                response=None,
            )
        elif js_int_buffer[0] == SUCCESS_HEADER:
            # got response
            # header length is in second int of intBuffer
            string_len = js_int_buffer[1]
            # decode the rest to a JSON string
            js_decoder = js.TextDecoder.new()
            # this does a copy (the slice) because decode can't work on shared array
            # for some silly reason
            json_str = js_decoder.decode(js_byte_buffer.slice(0, string_len))
            # get it as an object
            response_obj = json.loads(json_str)
            return EmscriptenResponse(
                request=request,
                status_code=response_obj["status"],
                headers=response_obj["headers"],
                body=_ReadStream(
                    js_int_buffer,
                    js_byte_buffer,
                    request.timeout,
                    self.js_worker,
                    response_obj["connectionID"],
                    request,
                ),
            )
        elif js_int_buffer[0] == ERROR_EXCEPTION:
            string_len = js_int_buffer[1]
            # decode the error string
            js_decoder = js.TextDecoder.new()
            json_str = js_decoder.decode(js_byte_buffer.slice(0, string_len))
            raise _StreamingError(
                f"Exception thrown in fetch: {json_str}", request=request, response=None
            )
        else:
            raise _StreamingError(
                f"Unknown status from worker in fetch: {js_int_buffer[0]}",
                request=request,
                response=None,
            )


# check if we are in a worker or not
def is_in_browser_main_thread() -> bool:
    return hasattr(js, "window") and hasattr(js, "self") and js.self == js.window


def is_cross_origin_isolated() -> bool:
    return hasattr(js, "crossOriginIsolated") and js.crossOriginIsolated


def is_in_node() -> bool:
    return (
        hasattr(js, "process")
        and hasattr(js.process, "release")
        and hasattr(js.process.release, "name")
        and js.process.release.name == "node"
    )


def is_worker_available() -> bool:
    return hasattr(js, "Worker") and hasattr(js, "Blob")


_fetcher: _StreamingFetcher | None = None

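# Streaming is only enabled when every import-time check passes: a Worker can be
# created, the page is cross-origin isolated, Python is not on the main browser
# thread, and we are not running under Node.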
if is_worker_available() and (
    (is_cross_origin_isolated() and not is_in_browser_main_thread())
    and (not is_in_node())
):
    _fetcher = _StreamingFetcher()
else:
    _fetcher = None


def send_streaming_request(request: EmscriptenRequest) -> EmscriptenResponse | None:
    if _fetcher and streaming_ready():
        return _fetcher.send(request)
    else:
        _show_streaming_warning()
        return None


_SHOWN_TIMEOUT_WARNING = False


def _show_timeout_warning() -> None:
    global _SHOWN_TIMEOUT_WARNING
    if not _SHOWN_TIMEOUT_WARNING:
        _SHOWN_TIMEOUT_WARNING = True
        message = "Warning: Timeout is not available on main browser thread"
        js.console.warn(message)


_SHOWN_STREAMING_WARNING = False


def _show_streaming_warning() -> None:
    global _SHOWN_STREAMING_WARNING
    if not _SHOWN_STREAMING_WARNING:
        _SHOWN_STREAMING_WARNING = True
        message = "Can't stream HTTP requests because: \n"
        if not is_cross_origin_isolated():
            message += " Page is not cross-origin isolated\n"
        if is_in_browser_main_thread():
            message += " Python is running in main browser thread\n"
        if not is_worker_available():
            message += " Worker or Blob classes are not available in this environment."  # Defensive: this is always False in browsers that we test in
        if streaming_ready() is False:
            message += """ Streaming fetch worker isn't ready. If you want to be sure that streaming fetch
is working, you need to call: `await urllib3.contrib.emscripten.fetch.wait_for_streaming_ready()`"""
        from js import console

        console.warn(message)


def send_request(request: EmscriptenRequest) -> EmscriptenResponse:
    try:
        js_xhr = js.XMLHttpRequest.new()

        if not is_in_browser_main_thread():
            js_xhr.responseType = "arraybuffer"
            if request.timeout:
                js_xhr.timeout = int(request.timeout * 1000)
        else:
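            # Synchronous XHR on the main browser thread can't use
            # responseType="arraybuffer", so ask for text decoded as ISO-8859-15: every
            # byte value maps to a distinct character, which lets the body be
            # re-encoded back to bytes losslessly further down.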
            js_xhr.overrideMimeType("text/plain; charset=ISO-8859-15")
            if request.timeout:
                # timeout isn't available on the main thread - show a warning in console
                # if it is set
                _show_timeout_warning()

        js_xhr.open(request.method, request.url, False)
        for name, value in request.headers.items():
            if name.lower() not in HEADERS_TO_IGNORE:
                js_xhr.setRequestHeader(name, value)

        js_xhr.send(to_js(request.body))

        headers = dict(Parser().parsestr(js_xhr.getAllResponseHeaders()))

        if not is_in_browser_main_thread():
            body = js_xhr.response.to_py().tobytes()
        else:
            body = js_xhr.response.encode("ISO-8859-15")
        return EmscriptenResponse(
            status_code=js_xhr.status, headers=headers, body=body, request=request
        )
    except JsException as err:
        if err.name == "TimeoutError":
            raise _TimeoutError(err.message, request=request)
        elif err.name == "NetworkError":
            raise _RequestError(err.message, request=request)
        else:
            # general http error
            raise _RequestError(err.message, request=request)


def streaming_ready() -> bool | None:
    if _fetcher:
        return _fetcher.streaming_ready
    else:
        return None  # no fetcher, return None to signify that


async def wait_for_streaming_ready() -> bool:
    if _fetcher:
        await _fetcher.js_worker_ready_promise
        return True
    else:
        return False