comparison urllib3/contrib/emscripten/fetch.py @ 7:5eb2d5e3bf22

planemo upload for repository https://toolrepo.galaxytrakr.org/view/jpayne/bioproject_to_srr_2/556cac4fb538
author jpayne
date Sun, 05 May 2024 23:32:17 -0400
parents
children
comparison
equal deleted inserted replaced
6:b2745907b1eb 7:5eb2d5e3bf22
1 """
2 Support for streaming http requests in emscripten.
3
4 A few caveats -
5
6 Firstly, you can't do streaming http in the main UI thread, because atomics.wait isn't allowed.
7 Streaming only works if you're running pyodide in a web worker.
8
9 Secondly, this uses an extra web worker and SharedArrayBuffer to do the asynchronous fetch
10 operation, so it requires that you have crossOriginIsolation enabled, by serving over https
11 (or from localhost) with the two headers below set:
12
13 Cross-Origin-Opener-Policy: same-origin
14 Cross-Origin-Embedder-Policy: require-corp
15
16 You can tell if cross origin isolation is successfully enabled by looking at the global crossOriginIsolated variable in
17 the javascript console. If it isn't, streaming requests will fall back to XMLHttpRequest, i.e. getting the whole
18 response into a buffer and then returning it. A warning is shown in the javascript console in this case.
19
20 Finally, the webworker which does the streaming fetch is created on initial import, but will only be started once
21 control is returned to javascript. Call `await wait_for_streaming_ready()` to wait for streaming fetch.
22
23 NB: in this code, there are a lot of javascript objects. They are named js_*
24 to make it clear what type of object they are.
25 """
26 from __future__ import annotations
27
28 import io
29 import json
30 from email.parser import Parser
31 from importlib.resources import files
32 from typing import TYPE_CHECKING, Any
33
34 import js # type: ignore[import-not-found]
35 from pyodide.ffi import ( # type: ignore[import-not-found]
36 JsArray,
37 JsException,
38 JsProxy,
39 to_js,
40 )
41
42 if TYPE_CHECKING:
43 from typing_extensions import Buffer
44
45 from .request import EmscriptenRequest
46 from .response import EmscriptenResponse
47
48 """
49 There are some headers that trigger unintended CORS preflight requests.
50 See also https://github.com/koenvo/pyodide-http/issues/22
51 """
# Lower-case names of request headers that this module drops before handing
# a request to the browser (both the streaming and the XMLHttpRequest paths
# consult this tuple).
HEADERS_TO_IGNORE = ("user-agent",)

# Status codes exchanged through slot 0 of the shared Int32Array.
# Python stores ERROR_TIMEOUT there before waiting, so the slot still holds
# that value if the wait ends without the worker having written anything.
# SUCCESS_HEADER and ERROR_EXCEPTION are checked explicitly after a wait.
# NOTE(review): SUCCESS_EOF is not referenced by the Python code in this
# file; presumably it is written by the worker script - confirm there.
SUCCESS_HEADER = -1
SUCCESS_EOF = -2
ERROR_TIMEOUT = -3
ERROR_EXCEPTION = -4

# Text of the worker script, read once at import time from the
# "emscripten_fetch_worker.js" resource that ships inside this package.
_STREAMING_WORKER_CODE = (
    files(__package__)
    .joinpath("emscripten_fetch_worker.js")
    .read_text(encoding="utf-8")
)
64
65
66 class _RequestError(Exception):
67 def __init__(
68 self,
69 message: str | None = None,
70 *,
71 request: EmscriptenRequest | None = None,
72 response: EmscriptenResponse | None = None,
73 ):
74 self.request = request
75 self.response = response
76 self.message = message
77 super().__init__(self.message)
78
79
class _StreamingError(_RequestError):
    """Request error raised by the streaming (worker based) fetch code."""
82
83
class _TimeoutError(_RequestError):
    """Request error raised when a request or a read times out."""
86
87
def _obj_from_dict(dict_val: dict[str, Any]) -> JsProxy:
    """Convert a Python dict to a JavaScript value.

    Dictionaries are converted with ``Object.fromEntries``, so the result can
    be posted to a worker as a plain object.
    """
    js_object = to_js(dict_val, dict_converter=js.Object.fromEntries)
    return js_object
90
91
92 class _ReadStream(io.RawIOBase):
93 def __init__(
94 self,
95 int_buffer: JsArray,
96 byte_buffer: JsArray,
97 timeout: float,
98 worker: JsProxy,
99 connection_id: int,
100 request: EmscriptenRequest,
101 ):
102 self.int_buffer = int_buffer
103 self.byte_buffer = byte_buffer
104 self.read_pos = 0
105 self.read_len = 0
106 self.connection_id = connection_id
107 self.worker = worker
108 self.timeout = int(1000 * timeout) if timeout > 0 else None
109 self.is_live = True
110 self._is_closed = False
111 self.request: EmscriptenRequest | None = request
112
113 def __del__(self) -> None:
114 self.close()
115
116 # this is compatible with _base_connection
117 def is_closed(self) -> bool:
118 return self._is_closed
119
120 # for compatibility with RawIOBase
121 @property
122 def closed(self) -> bool:
123 return self.is_closed()
124
125 def close(self) -> None:
126 if not self.is_closed():
127 self.read_len = 0
128 self.read_pos = 0
129 self.int_buffer = None
130 self.byte_buffer = None
131 self._is_closed = True
132 self.request = None
133 if self.is_live:
134 self.worker.postMessage(_obj_from_dict({"close": self.connection_id}))
135 self.is_live = False
136 super().close()
137
138 def readable(self) -> bool:
139 return True
140
141 def writable(self) -> bool:
142 return False
143
144 def seekable(self) -> bool:
145 return False
146
147 def readinto(self, byte_obj: Buffer) -> int:
148 if not self.int_buffer:
149 raise _StreamingError(
150 "No buffer for stream in _ReadStream.readinto",
151 request=self.request,
152 response=None,
153 )
154 if self.read_len == 0:
155 # wait for the worker to send something
156 js.Atomics.store(self.int_buffer, 0, ERROR_TIMEOUT)
157 self.worker.postMessage(_obj_from_dict({"getMore": self.connection_id}))
158 if (
159 js.Atomics.wait(self.int_buffer, 0, ERROR_TIMEOUT, self.timeout)
160 == "timed-out"
161 ):
162 raise _TimeoutError
163 data_len = self.int_buffer[0]
164 if data_len > 0:
165 self.read_len = data_len
166 self.read_pos = 0
167 elif data_len == ERROR_EXCEPTION:
168 string_len = self.int_buffer[1]
169 # decode the error string
170 js_decoder = js.TextDecoder.new()
171 json_str = js_decoder.decode(self.byte_buffer.slice(0, string_len))
172 raise _StreamingError(
173 f"Exception thrown in fetch: {json_str}",
174 request=self.request,
175 response=None,
176 )
177 else:
178 # EOF, free the buffers and return zero
179 # and free the request
180 self.is_live = False
181 self.close()
182 return 0
183 # copy from int32array to python bytes
184 ret_length = min(self.read_len, len(memoryview(byte_obj)))
185 subarray = self.byte_buffer.subarray(
186 self.read_pos, self.read_pos + ret_length
187 ).to_py()
188 memoryview(byte_obj)[0:ret_length] = subarray
189 self.read_len -= ret_length
190 self.read_pos += ret_length
191 return ret_length
192
193
class _StreamingFetcher:
    """Owns the fetch web worker and sends streaming requests through it."""

    def __init__(self) -> None:
        # make web-worker and data buffer on startup
        self.streaming_ready = False

        js_data_blob = js.Blob.new(
            [_STREAMING_WORKER_CODE], _obj_from_dict({"type": "application/javascript"})
        )

        def promise_resolver(js_resolve_fn: JsProxy, js_reject_fn: JsProxy) -> None:
            # The first message from the worker marks it as ready and
            # resolves the promise; a worker error rejects it.
            def onMsg(e: JsProxy) -> None:
                self.streaming_ready = True
                js_resolve_fn(e)

            def onErr(e: JsProxy) -> None:
                js_reject_fn(e)  # Defensive: never happens in ci

            self.js_worker.onmessage = onMsg
            self.js_worker.onerror = onErr

        js_data_url = js.URL.createObjectURL(js_data_blob)
        self.js_worker = js.globalThis.Worker.new(js_data_url)
        self.js_worker_ready_promise = js.globalThis.Promise.new(promise_resolver)

    @staticmethod
    def _decode_text(js_int_buffer: JsArray, js_byte_buffer: JsArray) -> str:
        """Decode the text the worker left in the shared buffer.

        The length in bytes is read from slot 1 of ``js_int_buffer``.
        """
        string_len = js_int_buffer[1]
        js_decoder = js.TextDecoder.new()
        # this does a copy (the slice) because decode can't work on shared array
        return js_decoder.decode(js_byte_buffer.slice(0, string_len))

    def send(self, request: EmscriptenRequest) -> EmscriptenResponse:
        """Start ``request`` in the worker and block until its headers arrive.

        Returns an ``EmscriptenResponse`` whose body is a ``_ReadStream``.

        Raises:
            _TimeoutError: no answer from the worker within the timeout.
            _StreamingError: the worker reported an exception or an
                unknown status.
        """
        # Compare case-insensitively, as send_request does; otherwise a
        # header spelled e.g. "User-Agent" would slip through the filter.
        headers = {
            name: value
            for name, value in request.headers.items()
            if name.lower() not in HEADERS_TO_IGNORE
        }

        fetch_data = {
            "headers": headers,
            "body": to_js(request.body),
            "method": request.method,
        }
        # start the request off in the worker
        timeout = int(1000 * request.timeout) if request.timeout > 0 else None
        js_shared_buffer = js.SharedArrayBuffer.new(1048576)
        js_int_buffer = js.Int32Array.new(js_shared_buffer)
        # Payload bytes start at offset 8, after the two int32 slots.
        js_byte_buffer = js.Uint8Array.new(js_shared_buffer, 8)

        js.Atomics.store(js_int_buffer, 0, ERROR_TIMEOUT)
        js.Atomics.notify(js_int_buffer, 0)
        js_absolute_url = js.URL.new(request.url, js.location).href
        self.js_worker.postMessage(
            _obj_from_dict(
                {
                    "buffer": js_shared_buffer,
                    "url": js_absolute_url,
                    "fetchParams": fetch_data,
                }
            )
        )
        # wait for the worker to send something
        js.Atomics.wait(js_int_buffer, 0, ERROR_TIMEOUT, timeout)
        # Read the status once so every branch below judges the same value.
        status = js_int_buffer[0]
        if status == ERROR_TIMEOUT:
            # The sentinel stored above is still there: nothing arrived.
            raise _TimeoutError(
                "Timeout connecting to streaming request",
                request=request,
                response=None,
            )
        if status == SUCCESS_HEADER:
            # got response: the buffer holds the header data as JSON text
            json_str = self._decode_text(js_int_buffer, js_byte_buffer)
            response_obj = json.loads(json_str)
            return EmscriptenResponse(
                request=request,
                status_code=response_obj["status"],
                headers=response_obj["headers"],
                body=_ReadStream(
                    js_int_buffer,
                    js_byte_buffer,
                    request.timeout,
                    self.js_worker,
                    response_obj["connectionID"],
                    request,
                ),
            )
        if status == ERROR_EXCEPTION:
            json_str = self._decode_text(js_int_buffer, js_byte_buffer)
            raise _StreamingError(
                f"Exception thrown in fetch: {json_str}", request=request, response=None
            )
        raise _StreamingError(
            f"Unknown status from worker in fetch: {status}",
            request=request,
            response=None,
        )
289
290
def is_in_browser_main_thread() -> bool:
    """Check whether we are on the browser main thread rather than in a worker.

    True only when both ``window`` and ``self`` exist and compare equal.
    """
    if not (hasattr(js, "window") and hasattr(js, "self")):
        return False
    return js.self == js.window
294
295
def is_cross_origin_isolated() -> bool:
    """Return the global ``crossOriginIsolated`` flag, or False if it is absent."""
    if not hasattr(js, "crossOriginIsolated"):
        return False
    return js.crossOriginIsolated
298
299
def is_in_node() -> bool:
    """Return True when ``process.release.name`` exists and equals "node"."""
    if not hasattr(js, "process"):
        return False
    js_process = js.process
    if not hasattr(js_process, "release"):
        return False
    js_release = js_process.release
    if not hasattr(js_release, "name"):
        return False
    return js_release.name == "node"
307
308
def is_worker_available() -> bool:
    """Return True when both the ``Worker`` and ``Blob`` globals exist."""
    return all(hasattr(js, js_class) for js_class in ("Worker", "Blob"))
311
312
# Module-wide streaming fetcher; stays None when streaming is unavailable.
_fetcher: _StreamingFetcher | None = None

# The worker is created at import time, and only when workers exist, the
# context is cross-origin isolated, we are off the browser main thread and
# we are not running under node.
if (
    is_worker_available()
    and is_cross_origin_isolated()
    and not is_in_browser_main_thread()
    and not is_in_node()
):
    _fetcher = _StreamingFetcher()
322
323
def send_streaming_request(request: EmscriptenRequest) -> EmscriptenResponse | None:
    """Send ``request`` through the streaming fetcher when it is usable.

    Returns None, after showing a one-time console warning, when there is
    no fetcher or its worker is not ready yet.
    """
    if not (_fetcher and streaming_ready()):
        _show_streaming_warning()
        return None
    return _fetcher.send(request)
330
331
# Set once the timeout warning has been printed, so it is shown only once.
_SHOWN_TIMEOUT_WARNING = False


def _show_timeout_warning() -> None:
    """Warn on the JavaScript console, at most once, that timeouts are unavailable."""
    global _SHOWN_TIMEOUT_WARNING
    if _SHOWN_TIMEOUT_WARNING:
        return
    _SHOWN_TIMEOUT_WARNING = True
    js.console.warn("Warning: Timeout is not available on main browser thread")
341
342
# Set once the streaming warning has been printed, so it is shown only once.
_SHOWN_STREAMING_WARNING = False


def _show_streaming_warning() -> None:
    """Explain on the JavaScript console, at most once, why streaming is unavailable.

    Every condition that currently prevents streaming is listed on its own
    line of the warning.
    """
    global _SHOWN_STREAMING_WARNING
    if _SHOWN_STREAMING_WARNING:
        return
    _SHOWN_STREAMING_WARNING = True

    reasons = ["Can't stream HTTP requests because: \n"]
    if not is_cross_origin_isolated():
        reasons.append(" Page is not cross-origin isolated\n")
    if is_in_browser_main_thread():
        reasons.append(" Python is running in main browser thread\n")
    if not is_worker_available():
        # Defensive: this is always False in browsers that we test in.
        # Ends with a newline so a following reason starts on its own line.
        reasons.append(
            " Worker or Blob classes are not available in this environment.\n"
        )
    # streaming_ready() returns None when there is no fetcher at all; only an
    # explicit False means the worker exists but has not reported ready.
    if streaming_ready() is False:
        reasons.append(
            " Streaming fetch worker isn't ready. If you want to be sure that streaming fetch\n"
            "is working, you need to call: "
            "'await urllib3.contrib.emscripten.fetch.wait_for_streaming_ready()'"
        )
    # Use the module-level js import, like _show_timeout_warning does.
    js.console.warn("".join(reasons))
363
364
def send_request(request: EmscriptenRequest) -> EmscriptenResponse:
    """Perform ``request`` with a synchronous XMLHttpRequest.

    The whole response body is read into memory before returning.

    Raises:
        _TimeoutError: the JavaScript side raised a ``TimeoutError``.
        _RequestError: any other JavaScript exception, network errors
            included.
    """
    try:
        js_xhr = js.XMLHttpRequest.new()

        # Evaluated once and reused when the body is decoded further down.
        on_main_thread = is_in_browser_main_thread()
        if on_main_thread:
            # The main-thread path receives the body as text; this charset
            # is the one the body is encoded back to bytes with below.
            js_xhr.overrideMimeType("text/plain; charset=ISO-8859-15")
            if request.timeout:
                # timeout isn't available on the main thread - show a warning in console
                # if it is set
                _show_timeout_warning()
        else:
            js_xhr.responseType = "arraybuffer"
            if request.timeout:
                js_xhr.timeout = int(request.timeout * 1000)

        js_xhr.open(request.method, request.url, False)
        for name, value in request.headers.items():
            if name.lower() not in HEADERS_TO_IGNORE:
                js_xhr.setRequestHeader(name, value)

        js_xhr.send(to_js(request.body))

        headers = dict(Parser().parsestr(js_xhr.getAllResponseHeaders()))

        if on_main_thread:
            body = js_xhr.response.encode("ISO-8859-15")
        else:
            body = js_xhr.response.to_py().tobytes()
        return EmscriptenResponse(
            status_code=js_xhr.status, headers=headers, body=body, request=request
        )
    except JsException as err:
        if err.name == "TimeoutError":
            raise _TimeoutError(err.message, request=request) from err
        # "NetworkError" and every other JavaScript exception are reported as
        # a general request error; chain the cause for easier debugging.
        raise _RequestError(err.message, request=request) from err
404
405
def streaming_ready() -> bool | None:
    """Report whether the streaming worker is ready.

    Returns None when there is no fetcher at all, otherwise the fetcher's
    ``streaming_ready`` flag.
    """
    return _fetcher.streaming_ready if _fetcher else None
411
412
async def wait_for_streaming_ready() -> bool:
    """Wait until the streaming worker has reported that it is ready.

    Returns False immediately when there is no fetcher; otherwise awaits the
    worker-ready promise and returns True.
    """
    if not _fetcher:
        return False
    await _fetcher.js_worker_ready_promise
    return True