"""
Support for streaming http requests in emscripten.

A few caveats -

Firstly, you can't do streaming http in the main UI thread, because atomics.wait isn't allowed.
Streaming only works if you're running pyodide in a web worker.

Secondly, this uses an extra web worker and SharedArrayBuffer to do the asynchronous fetch
operation, so it requires that you have crossOriginIsolation enabled, by serving over https
(or from localhost) with the two headers below set:

    Cross-Origin-Opener-Policy: same-origin
    Cross-Origin-Embedder-Policy: require-corp

You can tell if cross origin isolation is successfully enabled by looking at the global crossOriginIsolated variable in
javascript console. If it isn't, streaming requests will fallback to XMLHttpRequest, i.e. getting the whole
request into a buffer and then returning it. it shows a warning in the javascript console in this case.

Finally, the webworker which does the streaming fetch is created on initial import, but will only be started once
control is returned to javascript. Call `await wait_for_streaming_ready()` to wait for streaming fetch.

NB: in this code, there are a lot of javascript objects. They are named js_*
to make it clear what type of object they are.
"""

from __future__ import annotations

import io
import json
from email.parser import Parser
from importlib.resources import files
from typing import TYPE_CHECKING, Any

import js  # type: ignore[import-not-found]
from pyodide.ffi import (  # type: ignore[import-not-found]
    JsArray,
    JsException,
    JsProxy,
    to_js,
)

if TYPE_CHECKING:
    from typing_extensions import Buffer

# These two are needed at runtime (EmscriptenResponse is instantiated in
# _StreamingFetcher.send), so they must stay outside the TYPE_CHECKING guard.
from .request import EmscriptenRequest
from .response import EmscriptenResponse

# There are some headers that trigger unintended CORS preflight requests.
# See also https://github.com/koenvo/pyodide-http/issues/22
# Entries are lower-case; compare against ``name.lower()``.
HEADERS_TO_IGNORE = ("user-agent",)

# Status values read from slot 0 of the shared Int32Array. Any positive value
# in that slot is instead interpreted as a count of available body bytes.
SUCCESS_HEADER = -1
SUCCESS_EOF = -2
ERROR_TIMEOUT = -3
ERROR_EXCEPTION = -4

# Javascript source of the fetch worker, shipped alongside this module.
_STREAMING_WORKER_CODE = (
    files(__package__)
    .joinpath("emscripten_fetch_worker.js")
    .read_text(encoding="utf-8")
)


class _RequestError(Exception):
    """Base error for failed emscripten requests.

    Carries the originating request and (if any) the response alongside the
    message so that callers can attach context when translating the error.
    """

    def __init__(
        self,
        message: str | None = None,
        *,
        request: EmscriptenRequest | None = None,
        response: EmscriptenResponse | None = None,
    ):
        self.request = request
        self.response = response
        self.message = message
        super().__init__(self.message)


class _StreamingError(_RequestError):
    """Raised when the streaming fetch worker reports a failure."""


class _TimeoutError(_RequestError):
    """Raised when a request or a streamed read exceeds its timeout."""


def _obj_from_dict(dict_val: dict[str, Any]) -> JsProxy:
    """Convert a Python dict into a plain javascript object (not a Map)."""
    return to_js(dict_val, dict_converter=js.Object.fromEntries)


class _ReadStream(io.RawIOBase):
    """Raw, read-only stream over the body of a streaming fetch response.

    Data is exchanged with the fetch worker through two views of one shared
    buffer: ``int_buffer`` (slot 0 holds a status code or byte count, slot 1
    holds the length of an error string) and ``byte_buffer`` (the payload).
    """

    def __init__(
        self,
        int_buffer: JsArray,
        byte_buffer: JsArray,
        timeout: float,
        worker: JsProxy,
        connection_id: int,
        request: EmscriptenRequest,
    ):
        self.int_buffer = int_buffer
        self.byte_buffer = byte_buffer
        # read_pos / read_len describe the unread window inside byte_buffer
        self.read_pos = 0
        self.read_len = 0
        self.connection_id = connection_id
        self.worker = worker
        # Atomics.wait takes milliseconds; None means "no timeout"
        self.timeout = int(1000 * timeout) if timeout > 0 else None
        # is_live: the worker still holds this connection and must be told to close it
        self.is_live = True
        self._is_closed = False
        self.request: EmscriptenRequest | None = request

    def __del__(self) -> None:
        self.close()

    # this is compatible with _base_connection
    def is_closed(self) -> bool:
        return self._is_closed

    # for compatibility with RawIOBase
    @property
    def closed(self) -> bool:
        return self.is_closed()

    def close(self) -> None:
        """Release the buffers and, if still live, tell the worker to close.

        Calling this more than once is a no-op after the first call.
        """
        if self.is_closed():
            return
        self.read_len = 0
        self.read_pos = 0
        self.int_buffer = None
        self.byte_buffer = None
        self._is_closed = True
        self.request = None
        if self.is_live:
            self.worker.postMessage(_obj_from_dict({"close": self.connection_id}))
            self.is_live = False
        super().close()

    def readable(self) -> bool:
        return True

    def writable(self) -> bool:
        return False

    def seekable(self) -> bool:
        return False

    def readinto(self, byte_obj: Buffer) -> int:
        """Copy up to ``len(byte_obj)`` bytes of body data into ``byte_obj``.

        When no buffered data is left, asks the worker for more and blocks on
        ``Atomics.wait`` until it answers or the timeout expires.

        Returns the number of bytes written, or 0 at end of stream (in which
        case the stream is also closed).

        Raises:
            _StreamingError: if the stream has no buffer (already closed) or
                the worker reported an exception.
            _TimeoutError: if the worker did not answer within the timeout.
        """
        if not self.int_buffer:
            raise _StreamingError(
                "No buffer for stream in _ReadStream.readinto",
                request=self.request,
                response=None,
            )
        if self.read_len == 0:
            # wait for the worker to send something
            js.Atomics.store(self.int_buffer, 0, ERROR_TIMEOUT)
            self.worker.postMessage(_obj_from_dict({"getMore": self.connection_id}))
            if (
                js.Atomics.wait(self.int_buffer, 0, ERROR_TIMEOUT, self.timeout)
                == "timed-out"
            ):
                # carry a message and the request like every other error
                # raised from this module, so callers get usable context
                raise _TimeoutError(
                    "Timeout while reading from streaming request",
                    request=self.request,
                    response=None,
                )
            data_len = self.int_buffer[0]
            if data_len > 0:
                self.read_len = data_len
                self.read_pos = 0
            elif data_len == ERROR_EXCEPTION:
                string_len = self.int_buffer[1]
                # decode the error string
                js_decoder = js.TextDecoder.new()
                json_str = js_decoder.decode(self.byte_buffer.slice(0, string_len))
                raise _StreamingError(
                    f"Exception thrown in fetch: {json_str}",
                    request=self.request,
                    response=None,
                )
            else:
                # EOF, free the buffers and return zero
                # and free the request
                # (is_live is cleared first so close() does not message the worker)
                self.is_live = False
                self.close()
                return 0
        # copy from the shared byte buffer to python bytes
        target = memoryview(byte_obj)
        ret_length = min(self.read_len, len(target))
        subarray = self.byte_buffer.subarray(
            self.read_pos, self.read_pos + ret_length
        ).to_py()
        target[0:ret_length] = subarray
        self.read_len -= ret_length
        self.read_pos += ret_length
        return ret_length


class _StreamingFetcher:
    """Owns the fetch web worker and issues streaming requests through it."""

    def __init__(self) -> None:
        # make web-worker and data buffer on startup
        self.streaming_ready = False

        js_data_blob = js.Blob.new(
            [_STREAMING_WORKER_CODE], _obj_from_dict({"type": "application/javascript"})
        )

        def promise_resolver(js_resolve_fn: JsProxy, js_reject_fn: JsProxy) -> None:
            def onMsg(e: JsProxy) -> None:
                self.streaming_ready = True
                js_resolve_fn(e)

            def onErr(e: JsProxy) -> None:
                js_reject_fn(e)  # Defensive: never happens in ci

            self.js_worker.onmessage = onMsg
            self.js_worker.onerror = onErr

        js_data_url = js.URL.createObjectURL(js_data_blob)
        # the worker must exist before the promise is built, because
        # promise_resolver attaches its handlers to self.js_worker
        self.js_worker = js.globalThis.Worker.new(js_data_url)
        self.js_worker_ready_promise = js.globalThis.Promise.new(promise_resolver)

    def send(self, request: EmscriptenRequest) -> EmscriptenResponse:
        """Start ``request`` in the worker and block until headers arrive.

        Returns an ``EmscriptenResponse`` whose body is a ``_ReadStream``.

        Raises:
            _TimeoutError: if the worker has not answered within
                ``request.timeout`` seconds.
            _StreamingError: if the worker reports an exception or an
                unrecognised status.
        """
        # Header names are compared case-insensitively, matching send_request;
        # otherwise e.g. "User-Agent" would not be filtered out.
        headers = {
            k: v
            for k, v in request.headers.items()
            if k.lower() not in HEADERS_TO_IGNORE
        }

        body = request.body
        fetch_data = {"headers": headers, "body": to_js(body), "method": request.method}
        # start the request off in the worker
        timeout = int(1000 * request.timeout) if request.timeout > 0 else None
        js_shared_buffer = js.SharedArrayBuffer.new(1048576)
        js_int_buffer = js.Int32Array.new(js_shared_buffer)
        # payload view starts at byte 8, i.e. after the two int32 control slots
        js_byte_buffer = js.Uint8Array.new(js_shared_buffer, 8)

        js.Atomics.store(js_int_buffer, 0, ERROR_TIMEOUT)
        js.Atomics.notify(js_int_buffer, 0)
        js_absolute_url = js.URL.new(request.url, js.location).href
        self.js_worker.postMessage(
            _obj_from_dict(
                {
                    "buffer": js_shared_buffer,
                    "url": js_absolute_url,
                    "fetchParams": fetch_data,
                }
            )
        )
        # wait for the worker to send something
        js.Atomics.wait(js_int_buffer, 0, ERROR_TIMEOUT, timeout)
        status = js_int_buffer[0]
        if status == ERROR_TIMEOUT:
            # slot 0 still holds the sentinel stored above: nothing arrived
            raise _TimeoutError(
                "Timeout connecting to streaming request",
                request=request,
                response=None,
            )
        elif status == SUCCESS_HEADER:
            # got response
            # header length is in second int of intBuffer
            string_len = js_int_buffer[1]
            # decode the rest to a JSON string
            js_decoder = js.TextDecoder.new()
            # this does a copy (the slice) because decode can't work on shared array
            # for some silly reason
            json_str = js_decoder.decode(js_byte_buffer.slice(0, string_len))
            # get it as an object
            response_obj = json.loads(json_str)
            return EmscriptenResponse(
                request=request,
                status_code=response_obj["status"],
                headers=response_obj["headers"],
                body=_ReadStream(
                    js_int_buffer,
                    js_byte_buffer,
                    request.timeout,
                    self.js_worker,
                    response_obj["connectionID"],
                    request,
                ),
            )
        elif status == ERROR_EXCEPTION:
            string_len = js_int_buffer[1]
            # decode the error string
            js_decoder = js.TextDecoder.new()
            json_str = js_decoder.decode(js_byte_buffer.slice(0, string_len))
            raise _StreamingError(
                f"Exception thrown in fetch: {json_str}", request=request, response=None
            )
        else:
            raise _StreamingError(
                f"Unknown status from worker in fetch: {status}",
                request=request,
                response=None,
            )


# check if we are in a worker or not
def is_in_browser_main_thread() -> bool:
    """Return True when ``js.self`` is the ``window`` object."""
    return hasattr(js, "window") and hasattr(js, "self") and js.self == js.window


def is_cross_origin_isolated() -> bool:
    """Return the value of the global ``crossOriginIsolated`` flag, if present."""
    return hasattr(js, "crossOriginIsolated") and js.crossOriginIsolated


def is_in_node() -> bool:
    """Return True when ``process.release.name`` exists and equals ``"node"``."""
    return (
        hasattr(js, "process")
        and hasattr(js.process, "release")
        and hasattr(js.process.release, "name")
        and js.process.release.name == "node"
    )


def is_worker_available() -> bool:
    """Return True when both the ``Worker`` and ``Blob`` globals exist."""
    return hasattr(js, "Worker") and hasattr(js, "Blob")
# The streaming fetcher is only created when every precondition holds:
# Worker/Blob exist, the page is cross-origin isolated, we are not on the
# browser main thread, and we are not running under node.
_fetcher: _StreamingFetcher | None = None

if (
    is_worker_available()
    and is_cross_origin_isolated()
    and not is_in_browser_main_thread()
    and not is_in_node()
):
    _fetcher = _StreamingFetcher()


def send_streaming_request(request: EmscriptenRequest) -> EmscriptenResponse | None:
    """Send ``request`` through the streaming fetcher if it is usable.

    Returns the streaming response, or None (after logging a one-time console
    warning) when there is no fetcher or its worker is not ready yet.
    """
    if _fetcher and streaming_ready():
        return _fetcher.send(request)
    else:
        _show_streaming_warning()
        return None


_SHOWN_TIMEOUT_WARNING = False


def _show_timeout_warning() -> None:
    """Warn once in the javascript console that timeouts are unavailable."""
    global _SHOWN_TIMEOUT_WARNING
    if not _SHOWN_TIMEOUT_WARNING:
        _SHOWN_TIMEOUT_WARNING = True
        message = "Warning: Timeout is not available on main browser thread"
        js.console.warn(message)


_SHOWN_STREAMING_WARNING = False


def _show_streaming_warning() -> None:
    """Warn once in the javascript console, listing why streaming is unavailable."""
    global _SHOWN_STREAMING_WARNING
    if not _SHOWN_STREAMING_WARNING:
        _SHOWN_STREAMING_WARNING = True
        message = "Can't stream HTTP requests because: \n"
        if not is_cross_origin_isolated():
            message += " Page is not cross-origin isolated\n"
        if is_in_browser_main_thread():
            message += " Python is running in main browser thread\n"
        if not is_worker_available():
            message += " Worker or Blob classes are not available in this environment."  # Defensive: this is always False in browsers that we test in
        # ``is False`` on purpose: None means "no fetcher at all", which the
        # checks above already explain
        if streaming_ready() is False:
            message += """ Streaming fetch worker isn't ready. If you want to be sure that streaming fetch
is working, you need to call: 'await urllib3.contrib.emscripten.fetch.wait_for_streaming_ready()`"""

        js.console.warn(message)


def send_request(request: EmscriptenRequest) -> EmscriptenResponse:
    """Perform ``request`` with a synchronous XMLHttpRequest.

    The whole response body is read into memory before returning.

    Raises:
        _TimeoutError: if the javascript exception is named "TimeoutError".
        _RequestError: for any other javascript exception.
    """
    try:
        js_xhr = js.XMLHttpRequest.new()

        if not is_in_browser_main_thread():
            js_xhr.responseType = "arraybuffer"
            if request.timeout:
                js_xhr.timeout = int(request.timeout * 1000)
        else:
            # no arraybuffer here; the body is fetched as text with a
            # single-byte charset and re-encoded to bytes further down
            js_xhr.overrideMimeType("text/plain; charset=ISO-8859-15")
            if request.timeout:
                # timeout isn't available on the main thread - show a warning in console
                # if it is set
                _show_timeout_warning()

        js_xhr.open(request.method, request.url, False)
        for name, value in request.headers.items():
            if name.lower() not in HEADERS_TO_IGNORE:
                js_xhr.setRequestHeader(name, value)

        js_xhr.send(to_js(request.body))

        headers = dict(Parser().parsestr(js_xhr.getAllResponseHeaders()))

        if not is_in_browser_main_thread():
            body = js_xhr.response.to_py().tobytes()
        else:
            body = js_xhr.response.encode("ISO-8859-15")
        return EmscriptenResponse(
            status_code=js_xhr.status, headers=headers, body=body, request=request
        )
    except JsException as err:
        # chain explicitly so the javascript exception is kept as __cause__
        if err.name == "TimeoutError":
            raise _TimeoutError(err.message, request=request) from err
        # "NetworkError" and every other (general http) error map to the
        # same exception type
        raise _RequestError(err.message, request=request) from err


def streaming_ready() -> bool | None:
    """Report whether the streaming worker is ready.

    Returns True/False when a fetcher exists, or None when there is no
    fetcher at all.
    """
    if _fetcher:
        return _fetcher.streaming_ready
    else:
        return None  # no fetcher, return None to signify that


async def wait_for_streaming_ready() -> bool:
    """Await the worker-ready promise.

    Returns True once the promise resolves, or False immediately when there
    is no fetcher.
    """
    if _fetcher:
        await _fetcher.js_worker_ready_promise
        return True
    else:
        return False