annotate urllib3/contrib/emscripten/fetch.py @ 14:18e1cb6018fd

planemo upload for repository https://toolrepo.galaxytrakr.org/view/jpayne/bioproject_to_srr_2/556cac4fb538
author jpayne
date Mon, 20 May 2024 02:25:23 -0400
parents 5eb2d5e3bf22
children
rev   line source
jpayne@7 1 """
jpayne@7 2 Support for streaming http requests in emscripten.
jpayne@7 3
jpayne@7 4 A few caveats -
jpayne@7 5
jpayne@7 6 Firstly, you can't do streaming http in the main UI thread, because atomics.wait isn't allowed.
jpayne@7 7 Streaming only works if you're running pyodide in a web worker.
jpayne@7 8
jpayne@7 9 Secondly, this uses an extra web worker and SharedArrayBuffer to do the asynchronous fetch
jpayne@7 10 operation, so it requires that you have crossOriginIsolation enabled, by serving over https
jpayne@7 11 (or from localhost) with the two headers below set:
jpayne@7 12
jpayne@7 13 Cross-Origin-Opener-Policy: same-origin
jpayne@7 14 Cross-Origin-Embedder-Policy: require-corp
jpayne@7 15
jpayne@7 16 You can tell if cross-origin isolation is successfully enabled by looking at the global crossOriginIsolated variable in
jpayne@7 17 the JavaScript console. If it isn't, streaming requests will fall back to XMLHttpRequest, i.e. reading the whole
jpayne@7 18 response into a buffer and then returning it. A warning is shown in the JavaScript console in this case.
jpayne@7 19
jpayne@7 20 Finally, the webworker which does the streaming fetch is created on initial import, but will only be started once
jpayne@7 21 control is returned to javascript. Call `await wait_for_streaming_ready()` to wait for streaming fetch.
jpayne@7 22
jpayne@7 23 NB: in this code, there are a lot of javascript objects. They are named js_*
jpayne@7 24 to make it clear what type of object they are.
jpayne@7 25 """
jpayne@7 26 from __future__ import annotations
jpayne@7 27
jpayne@7 28 import io
jpayne@7 29 import json
jpayne@7 30 from email.parser import Parser
jpayne@7 31 from importlib.resources import files
jpayne@7 32 from typing import TYPE_CHECKING, Any
jpayne@7 33
jpayne@7 34 import js # type: ignore[import-not-found]
jpayne@7 35 from pyodide.ffi import ( # type: ignore[import-not-found]
jpayne@7 36 JsArray,
jpayne@7 37 JsException,
jpayne@7 38 JsProxy,
jpayne@7 39 to_js,
jpayne@7 40 )
jpayne@7 41
jpayne@7 42 if TYPE_CHECKING:
jpayne@7 43 from typing_extensions import Buffer
jpayne@7 44
jpayne@7 45 from .request import EmscriptenRequest
jpayne@7 46 from .response import EmscriptenResponse
jpayne@7 47
# Request headers that would trigger an unintended CORS preflight request when
# sent from the browser; they are dropped before a request is dispatched.
# See also https://github.com/koenvo/pyodide-http/issues/22
HEADERS_TO_IGNORE = ("user-agent",)

# Status words exchanged with the fetch worker through slot 0 of the shared
# Int32Array. A positive value in that slot is a byte count; these negative
# values are signals.
SUCCESS_HEADER = -1
SUCCESS_EOF = -2
ERROR_TIMEOUT = -3
ERROR_EXCEPTION = -4

# JavaScript source of the streaming fetch worker, loaded once at import time
# from the package resource that sits next to this module.
_STREAMING_WORKER_CODE = files(__package__).joinpath(
    "emscripten_fetch_worker.js"
).read_text(encoding="utf-8")
jpayne@7 64
jpayne@7 65
jpayne@7 66 class _RequestError(Exception):
jpayne@7 67 def __init__(
jpayne@7 68 self,
jpayne@7 69 message: str | None = None,
jpayne@7 70 *,
jpayne@7 71 request: EmscriptenRequest | None = None,
jpayne@7 72 response: EmscriptenResponse | None = None,
jpayne@7 73 ):
jpayne@7 74 self.request = request
jpayne@7 75 self.response = response
jpayne@7 76 self.message = message
jpayne@7 77 super().__init__(self.message)
jpayne@7 78
jpayne@7 79
class _StreamingError(_RequestError):
    """Raised for failures on the streaming (web worker) fetch path."""
jpayne@7 82
jpayne@7 83
class _TimeoutError(_RequestError):
    """Raised when a request, or a read from a streamed body, times out."""
jpayne@7 86
jpayne@7 87
def _obj_from_dict(dict_val: dict[str, Any]) -> JsProxy:
    """Convert a Python dict to a JavaScript object built by ``Object.fromEntries``."""
    entries_to_object = js.Object.fromEntries
    return to_js(dict_val, dict_converter=entries_to_object)
jpayne@7 90
jpayne@7 91
jpayne@7 92 class _ReadStream(io.RawIOBase):
jpayne@7 93 def __init__(
jpayne@7 94 self,
jpayne@7 95 int_buffer: JsArray,
jpayne@7 96 byte_buffer: JsArray,
jpayne@7 97 timeout: float,
jpayne@7 98 worker: JsProxy,
jpayne@7 99 connection_id: int,
jpayne@7 100 request: EmscriptenRequest,
jpayne@7 101 ):
jpayne@7 102 self.int_buffer = int_buffer
jpayne@7 103 self.byte_buffer = byte_buffer
jpayne@7 104 self.read_pos = 0
jpayne@7 105 self.read_len = 0
jpayne@7 106 self.connection_id = connection_id
jpayne@7 107 self.worker = worker
jpayne@7 108 self.timeout = int(1000 * timeout) if timeout > 0 else None
jpayne@7 109 self.is_live = True
jpayne@7 110 self._is_closed = False
jpayne@7 111 self.request: EmscriptenRequest | None = request
jpayne@7 112
jpayne@7 113 def __del__(self) -> None:
jpayne@7 114 self.close()
jpayne@7 115
jpayne@7 116 # this is compatible with _base_connection
jpayne@7 117 def is_closed(self) -> bool:
jpayne@7 118 return self._is_closed
jpayne@7 119
jpayne@7 120 # for compatibility with RawIOBase
jpayne@7 121 @property
jpayne@7 122 def closed(self) -> bool:
jpayne@7 123 return self.is_closed()
jpayne@7 124
jpayne@7 125 def close(self) -> None:
jpayne@7 126 if not self.is_closed():
jpayne@7 127 self.read_len = 0
jpayne@7 128 self.read_pos = 0
jpayne@7 129 self.int_buffer = None
jpayne@7 130 self.byte_buffer = None
jpayne@7 131 self._is_closed = True
jpayne@7 132 self.request = None
jpayne@7 133 if self.is_live:
jpayne@7 134 self.worker.postMessage(_obj_from_dict({"close": self.connection_id}))
jpayne@7 135 self.is_live = False
jpayne@7 136 super().close()
jpayne@7 137
jpayne@7 138 def readable(self) -> bool:
jpayne@7 139 return True
jpayne@7 140
jpayne@7 141 def writable(self) -> bool:
jpayne@7 142 return False
jpayne@7 143
jpayne@7 144 def seekable(self) -> bool:
jpayne@7 145 return False
jpayne@7 146
jpayne@7 147 def readinto(self, byte_obj: Buffer) -> int:
jpayne@7 148 if not self.int_buffer:
jpayne@7 149 raise _StreamingError(
jpayne@7 150 "No buffer for stream in _ReadStream.readinto",
jpayne@7 151 request=self.request,
jpayne@7 152 response=None,
jpayne@7 153 )
jpayne@7 154 if self.read_len == 0:
jpayne@7 155 # wait for the worker to send something
jpayne@7 156 js.Atomics.store(self.int_buffer, 0, ERROR_TIMEOUT)
jpayne@7 157 self.worker.postMessage(_obj_from_dict({"getMore": self.connection_id}))
jpayne@7 158 if (
jpayne@7 159 js.Atomics.wait(self.int_buffer, 0, ERROR_TIMEOUT, self.timeout)
jpayne@7 160 == "timed-out"
jpayne@7 161 ):
jpayne@7 162 raise _TimeoutError
jpayne@7 163 data_len = self.int_buffer[0]
jpayne@7 164 if data_len > 0:
jpayne@7 165 self.read_len = data_len
jpayne@7 166 self.read_pos = 0
jpayne@7 167 elif data_len == ERROR_EXCEPTION:
jpayne@7 168 string_len = self.int_buffer[1]
jpayne@7 169 # decode the error string
jpayne@7 170 js_decoder = js.TextDecoder.new()
jpayne@7 171 json_str = js_decoder.decode(self.byte_buffer.slice(0, string_len))
jpayne@7 172 raise _StreamingError(
jpayne@7 173 f"Exception thrown in fetch: {json_str}",
jpayne@7 174 request=self.request,
jpayne@7 175 response=None,
jpayne@7 176 )
jpayne@7 177 else:
jpayne@7 178 # EOF, free the buffers and return zero
jpayne@7 179 # and free the request
jpayne@7 180 self.is_live = False
jpayne@7 181 self.close()
jpayne@7 182 return 0
jpayne@7 183 # copy from int32array to python bytes
jpayne@7 184 ret_length = min(self.read_len, len(memoryview(byte_obj)))
jpayne@7 185 subarray = self.byte_buffer.subarray(
jpayne@7 186 self.read_pos, self.read_pos + ret_length
jpayne@7 187 ).to_py()
jpayne@7 188 memoryview(byte_obj)[0:ret_length] = subarray
jpayne@7 189 self.read_len -= ret_length
jpayne@7 190 self.read_pos += ret_length
jpayne@7 191 return ret_length
jpayne@7 192
jpayne@7 193
class _StreamingFetcher:
    """Owns the fetch web worker and sends streaming requests through it.

    The worker is built from ``_STREAMING_WORKER_CODE`` when the instance is
    created. Its first message sets ``streaming_ready`` and resolves
    ``js_worker_ready_promise``. An ``onerror`` event rejects that promise.
    """

    def __init__(self) -> None:
        # make web-worker and data buffer on startup
        self.streaming_ready = False

        js_data_blob = js.Blob.new(
            [_STREAMING_WORKER_CODE], _obj_from_dict({"type": "application/javascript"})
        )

        def promise_resolver(js_resolve_fn: JsProxy, js_reject_fn: JsProxy) -> None:
            def onMsg(e: JsProxy) -> None:
                self.streaming_ready = True
                js_resolve_fn(e)

            def onErr(e: JsProxy) -> None:
                js_reject_fn(e)  # Defensive: never happens in ci

            self.js_worker.onmessage = onMsg
            self.js_worker.onerror = onErr

        js_data_url = js.URL.createObjectURL(js_data_blob)
        # promise_resolver reads self.js_worker, so the worker must exist first.
        self.js_worker = js.globalThis.Worker.new(js_data_url)
        self.js_worker_ready_promise = js.globalThis.Promise.new(promise_resolver)

    def send(self, request: EmscriptenRequest) -> EmscriptenResponse:
        """Start ``request`` in the worker and block until its headers arrive.

        Returns an EmscriptenResponse whose body is a _ReadStream. That stream
        pulls the rest of the body from the worker on demand.

        Raises:
            _TimeoutError: if the worker does not respond within request.timeout.
            _StreamingError: if the fetch threw in the worker, or the worker
                reported an unknown status.
        """
        # HTTP header names are case-insensitive and HEADERS_TO_IGNORE is
        # lower-case, so compare lower-cased names (as send_request does).
        headers = {
            k: v
            for k, v in request.headers.items()
            if k.lower() not in HEADERS_TO_IGNORE
        }

        body = request.body
        fetch_data = {"headers": headers, "body": to_js(body), "method": request.method}
        # start the request off in the worker
        timeout = (
            int(1000 * request.timeout)
            if request.timeout and request.timeout > 0
            else None
        )
        # 1 MiB shared buffer: two leading int32 words (status, length) followed
        # by the payload bytes starting at byte offset 8.
        js_shared_buffer = js.SharedArrayBuffer.new(1048576)
        js_int_buffer = js.Int32Array.new(js_shared_buffer)
        js_byte_buffer = js.Uint8Array.new(js_shared_buffer, 8)

        js.Atomics.store(js_int_buffer, 0, ERROR_TIMEOUT)
        js.Atomics.notify(js_int_buffer, 0)
        js_absolute_url = js.URL.new(request.url, js.location).href
        self.js_worker.postMessage(
            _obj_from_dict(
                {
                    "buffer": js_shared_buffer,
                    "url": js_absolute_url,
                    "fetchParams": fetch_data,
                }
            )
        )
        # wait for the worker to send something, then read its status once
        js.Atomics.wait(js_int_buffer, 0, ERROR_TIMEOUT, timeout)
        status = js_int_buffer[0]
        if status == ERROR_TIMEOUT:
            raise _TimeoutError(
                "Timeout connecting to streaming request",
                request=request,
                response=None,
            )
        if status == SUCCESS_HEADER:
            # got response
            # header length is in second int of intBuffer
            string_len = js_int_buffer[1]
            # decode the rest to a JSON string
            js_decoder = js.TextDecoder.new()
            # this does a copy (the slice) because decode can't work on shared array
            # for some silly reason
            json_str = js_decoder.decode(js_byte_buffer.slice(0, string_len))
            # get it as an object
            response_obj = json.loads(json_str)
            return EmscriptenResponse(
                request=request,
                status_code=response_obj["status"],
                headers=response_obj["headers"],
                body=_ReadStream(
                    js_int_buffer,
                    js_byte_buffer,
                    request.timeout,
                    self.js_worker,
                    response_obj["connectionID"],
                    request,
                ),
            )
        if status == ERROR_EXCEPTION:
            string_len = js_int_buffer[1]
            # decode the error string
            js_decoder = js.TextDecoder.new()
            json_str = js_decoder.decode(js_byte_buffer.slice(0, string_len))
            raise _StreamingError(
                f"Exception thrown in fetch: {json_str}", request=request, response=None
            )
        raise _StreamingError(
            f"Unknown status from worker in fetch: {status}",
            request=request,
            response=None,
        )
jpayne@7 289
jpayne@7 290
# check if we are in a worker or not
def is_in_browser_main_thread() -> bool:
    """Return True when the ``self`` and ``window`` JS globals are the same object.

    If either global is missing, the result is False; that is presumably the
    case in a web worker or in Node.
    """
    if not (hasattr(js, "window") and hasattr(js, "self")):
        return False
    return js.self == js.window
jpayne@7 294
jpayne@7 295
def is_cross_origin_isolated() -> bool:
    """Return the ``crossOriginIsolated`` JS global, or False if it is absent."""
    if not hasattr(js, "crossOriginIsolated"):
        return False
    return js.crossOriginIsolated
jpayne@7 298
jpayne@7 299
def is_in_node() -> bool:
    """Return True when ``process.release.name`` is ``"node"``."""
    if not hasattr(js, "process"):
        return False
    process = js.process
    if not hasattr(process, "release"):
        return False
    release = process.release
    if not hasattr(release, "name"):
        return False
    return release.name == "node"
jpayne@7 307
jpayne@7 308
def is_worker_available() -> bool:
    """Return True if both the ``Worker`` and ``Blob`` JS globals exist."""
    return all(hasattr(js, global_name) for global_name in ("Worker", "Blob"))
jpayne@7 311
jpayne@7 312
# Module-wide streaming fetcher. It is only created when a worker can be spawned,
# the page is cross-origin isolated, and we are neither on the browser main
# thread nor in Node; otherwise streaming is unavailable and it stays None.
_fetcher: _StreamingFetcher | None = None

if (
    is_worker_available()
    and is_cross_origin_isolated()
    and not is_in_browser_main_thread()
    and not is_in_node()
):
    _fetcher = _StreamingFetcher()
jpayne@7 322
jpayne@7 323
def send_streaming_request(request: EmscriptenRequest) -> EmscriptenResponse | None:
    """Send ``request`` through the streaming worker, if it can be used.

    Returns None when there is no fetcher or its worker is not ready yet. In that
    case a one-time console warning explains why; presumably the caller then
    falls back to a non-streaming request.
    """
    if not (_fetcher and streaming_ready()):
        _show_streaming_warning()
        return None
    return _fetcher.send(request)
jpayne@7 330
jpayne@7 331
# Set once the main-thread timeout warning has been logged.
_SHOWN_TIMEOUT_WARNING = False


def _show_timeout_warning() -> None:
    """Log to the JS console, at most once, that timeouts are unsupported here."""
    global _SHOWN_TIMEOUT_WARNING
    if _SHOWN_TIMEOUT_WARNING:
        return
    _SHOWN_TIMEOUT_WARNING = True
    js.console.warn("Warning: Timeout is not available on main browser thread")
jpayne@7 341
jpayne@7 342
# Set once the "can't stream" warning has been logged.
_SHOWN_STREAMING_WARNING = False


def _show_streaming_warning() -> None:
    """Log to the JS console, at most once, every reason streaming is unavailable.

    Each failing precondition contributes one newline-terminated line, so the
    reasons no longer run together.
    """
    global _SHOWN_STREAMING_WARNING
    if _SHOWN_STREAMING_WARNING:
        return
    _SHOWN_STREAMING_WARNING = True
    message = "Can't stream HTTP requests because: \n"
    if not is_cross_origin_isolated():
        message += " Page is not cross-origin isolated\n"
    if is_in_browser_main_thread():
        message += " Python is running in main browser thread\n"
    if not is_worker_available():
        # Defensive: this is always False in browsers that we test in
        message += " Worker or Blob classes are not available in this environment.\n"
    if streaming_ready() is False:
        message += (
            " Streaming fetch worker isn't ready. If you want to be sure that "
            "streaming fetch\nis working, you need to call: "
            "'await urllib3.contrib.emscripten.fetch.wait_for_streaming_ready()'"
        )
    # Use the module-level js import, as _show_timeout_warning does.
    js.console.warn(message)
jpayne@7 363
jpayne@7 364
def send_request(request: EmscriptenRequest) -> EmscriptenResponse:
    """Perform ``request`` with a synchronous XMLHttpRequest, buffering the body.

    Off the browser main thread, the body is requested as an ``arraybuffer`` and
    the XHR ``timeout`` is set from ``request.timeout``.

    On the main thread, the response is read as text decoded with ISO-8859-15 and
    re-encoded with the same charset to recover bytes. (That charset is presumably
    chosen because it is single-byte and round-trips.) The timeout is not applied
    there; a one-time console warning is shown instead. Headers listed in
    HEADERS_TO_IGNORE are skipped, compared case-insensitively.

    Raises:
        _TimeoutError: if the XHR raised a JS error named ``TimeoutError``.
        _RequestError: for any other JS exception (network or otherwise).
    """
    try:
        js_xhr = js.XMLHttpRequest.new()

        if not is_in_browser_main_thread():
            js_xhr.responseType = "arraybuffer"
            if request.timeout:
                js_xhr.timeout = int(request.timeout * 1000)
        else:
            js_xhr.overrideMimeType("text/plain; charset=ISO-8859-15")
            if request.timeout:
                # timeout isn't available on the main thread - show a warning in console
                # if it is set
                _show_timeout_warning()

        js_xhr.open(request.method, request.url, False)
        for name, value in request.headers.items():
            if name.lower() not in HEADERS_TO_IGNORE:
                js_xhr.setRequestHeader(name, value)

        js_xhr.send(to_js(request.body))

        headers = dict(Parser().parsestr(js_xhr.getAllResponseHeaders()))

        if not is_in_browser_main_thread():
            body = js_xhr.response.to_py().tobytes()
        else:
            body = js_xhr.response.encode("ISO-8859-15")
        return EmscriptenResponse(
            status_code=js_xhr.status, headers=headers, body=body, request=request
        )
    except JsException as err:
        if err.name == "TimeoutError":
            raise _TimeoutError(err.message, request=request) from err
        # NetworkError and every other JS error surface as a general request error.
        raise _RequestError(err.message, request=request) from err
jpayne@7 404
jpayne@7 405
def streaming_ready() -> bool | None:
    """Report whether the streaming fetch worker has signalled readiness.

    Returns None when no streaming fetcher exists in this environment; otherwise
    returns the fetcher's ``streaming_ready`` flag.
    """
    return _fetcher.streaming_ready if _fetcher else None
jpayne@7 411
jpayne@7 412
async def wait_for_streaming_ready() -> bool:
    """Await the streaming worker's readiness promise.

    Returns True once the worker is ready. Returns False immediately when no
    streaming fetcher was created in this environment.
    """
    if not _fetcher:
        return False
    await _fetcher.js_worker_ready_promise
    return True