from __future__ import annotations

import json as _json
import logging
import typing
from contextlib import contextmanager
from dataclasses import dataclass
from http.client import HTTPException as HTTPException
from io import BytesIO, IOBase

from ...exceptions import InvalidHeader, TimeoutError
from ...response import BaseHTTPResponse
from ...util.retry import Retry
from .request import EmscriptenRequest

if typing.TYPE_CHECKING:
    from ..._base_connection import BaseHTTPConnection, BaseHTTPSConnection

log = logging.getLogger(__name__)


@dataclass
class EmscriptenResponse:
    """Plain record of a response: status, headers, body and originating request."""

    # Numeric HTTP status code.
    status_code: int
    # Response headers as a plain string-to-string mapping.
    headers: dict[str, str]
    # Either a fully preloaded payload (``bytes``) or a readable stream.
    body: IOBase | bytes
    # The request this response answers; its ``method`` is read by the wrapper.
    request: EmscriptenRequest


class EmscriptenHttpResponseWrapper(BaseHTTPResponse):
    """
    Adapt an :class:`EmscriptenResponse` to the ``BaseHTTPResponse`` interface.

    The wrapped body may be preloaded ``bytes`` or an ``IOBase`` stream;
    preloaded bytes are wrapped in a ``BytesIO`` on the first read so that
    both cases are consumed through the same code path.
    """

    def __init__(
        self,
        internal_response: EmscriptenResponse,
        url: str | None = None,
        connection: BaseHTTPConnection | BaseHTTPSConnection | None = None,
    ):
        """
        :param internal_response:
            The low-level response to wrap.

        :param url:
            URL reported by :attr:`url` and passed to the base class as
            ``request_url``.

        :param connection:
            Connection this response was received on, if any. It is closed
            by :meth:`close` and handed back to the pool by
            :meth:`release_conn`.
        """
        self._pool = None  # set by pool class
        self._body = None
        self._response = internal_response
        self._url = url
        self._connection = connection
        self._closed = False
        super().__init__(
            headers=internal_response.headers,
            status=internal_response.status_code,
            request_url=url,
            version=0,
            reason="",
            decode_content=True,
        )
        self.length_remaining = self._init_length(self._response.request.method)
        # The header-derived length is only a hint; it becomes certain once a
        # preloaded body has been measured in read().
        self.length_is_certain = False

    @property
    def url(self) -> str | None:
        """URL associated with this response, or ``None`` if unknown."""
        return self._url

    @url.setter
    def url(self, url: str | None) -> None:
        self._url = url

    @property
    def connection(self) -> BaseHTTPConnection | BaseHTTPSConnection | None:
        """Connection backing this response, or ``None`` once released/closed."""
        return self._connection

    @property
    def retries(self) -> Retry | None:
        """Retry state attached to this response, if any."""
        return self._retries

    @retries.setter
    def retries(self, retries: Retry | None) -> None:
        # Only stores the value; unlike what the name of the hook may suggest,
        # the request URL is not rewritten here.
        self._retries = retries

    def stream(
        self, amt: int | None = 2**16, decode_content: bool | None = None
    ) -> typing.Generator[bytes, None, None]:
        """
        A generator wrapper for the read() method. A call will block until
        ``amt`` bytes have been read from the connection or until the
        connection is closed.

        :param amt:
            How much of the content to read. The generator will return up to
            this much data per iteration, but may return less. This is
            particularly likely when using compressed data. However, the
            empty string will never be returned.

        :param decode_content:
            If True, will attempt to decode the body based on the
            'content-encoding' header.
        """
        while True:
            data = self.read(amt=amt, decode_content=decode_content)

            if data:
                yield data
            else:
                break

    def _init_length(self, request_method: str | None) -> int | None:
        """
        Derive the expected body length from the ``content-length`` header.

        :param request_method:
            Method of the originating request; ``"HEAD"`` forces a length of 0.

        :returns:
            The declared length, ``0`` for responses that carry no body, or
            ``None`` when the header is absent, unparsable or negative.

        :raises InvalidHeader:
            When the header lists several values that are not all equal.
        """
        length: int | None
        content_length: str | None = self.headers.get("content-length")

        if content_length is not None:
            try:
                # RFC 7230 section 3.3.2 specifies multiple content lengths can
                # be sent in a single Content-Length header
                # (e.g. Content-Length: 42, 42). This line ensures the values
                # are all valid ints and that as long as the `set` length is 1,
                # all values are the same. Otherwise, the header is invalid.
                lengths = {int(val) for val in content_length.split(",")}
                if len(lengths) > 1:
                    raise InvalidHeader(
                        "Content-Length contained multiple "
                        "unmatching values (%s)" % content_length
                    )
                length = lengths.pop()
            except ValueError:
                length = None
            else:
                if length < 0:
                    length = None

        else:  # if content_length is None
            length = None

        # Check for responses that shouldn't include a body
        if (
            self.status in (204, 304)
            or 100 <= self.status < 200
            or request_method == "HEAD"
        ):
            length = 0

        return length

    def read(
        self,
        amt: int | None = None,
        decode_content: bool | None = None,  # ignored because browser decodes always
        cache_content: bool = False,
    ) -> bytes:
        """
        Read up to ``amt`` bytes of the body, or everything when ``amt`` is
        ``None`` or negative.

        :param amt:
            Maximum number of bytes to return. ``None`` or a negative value
            reads all remaining data.

        :param decode_content:
            Accepted for interface compatibility and ignored.

        :param cache_content:
            When reading everything, keep the result so :attr:`data` can
            return it later. Partial reads are never cached.

        :returns:
            The bytes read; ``b""`` once the response or its body is closed.
        """
        if (
            self._closed
            or self._response is None
            or (isinstance(self._response.body, IOBase) and self._response.body.closed)
        ):
            return b""

        # A negative size means "read everything" for io streams, so route it
        # through the read-all branch; otherwise the end-of-body detection
        # below (len(data) < amt) could never fire and the body would neither
        # be cached nor closed.
        if amt is not None and amt < 0:
            amt = None

        with self._error_catcher():
            # body has been preloaded as a string by XmlHttpRequest
            if not isinstance(self._response.body, IOBase):
                self.length_remaining = len(self._response.body)
                self.length_is_certain = True
                # wrap body in IOStream
                self._response.body = BytesIO(self._response.body)
            if amt is not None:
                # don't cache partial content
                cache_content = False
                data = self._response.body.read(amt)
                if self.length_remaining is not None:
                    self.length_remaining = max(self.length_remaining - len(data), 0)
                if (self.length_is_certain and self.length_remaining == 0) or len(
                    data
                ) < amt:
                    # definitely finished reading, close response stream
                    self._response.body.close()
                return typing.cast(bytes, data)
            else:  # read all we can (and cache it)
                data = self._response.body.read()
                if cache_content:
                    self._body = data
                if self.length_remaining is not None:
                    self.length_remaining = max(self.length_remaining - len(data), 0)
                if len(data) == 0 or (
                    self.length_is_certain and self.length_remaining == 0
                ):
                    # definitely finished reading, close response stream
                    self._response.body.close()
                return typing.cast(bytes, data)

    def read_chunked(
        self,
        amt: int | None = None,
        decode_content: bool | None = None,
    ) -> typing.Generator[bytes, None, None]:
        """
        Yield the body piece by piece via :meth:`read` until it is exhausted.

        :param amt:
            Maximum size of each piece, forwarded to :meth:`read`.

        :param decode_content:
            Forwarded to :meth:`read`, which ignores it.
        """
        # chunked is handled by browser
        while True:
            chunk = self.read(amt, decode_content)
            if not chunk:
                break
            yield chunk

    def release_conn(self) -> None:
        """Hand the connection back to the pool; no-op without a pool or connection."""
        if not self._pool or not self._connection:
            return None

        self._pool._put_conn(self._connection)
        self._connection = None

    def drain_conn(self) -> None:
        """Discard any unread data by closing the response."""
        self.close()

    @property
    def data(self) -> bytes:
        """Cached body if one is available, otherwise read and cache the whole body."""
        if self._body:
            return self._body
        else:
            return self.read(cache_content=True)

    def json(self) -> typing.Any:
        """
        Parses the body of the HTTP response as JSON.

        To use a custom JSON decoder pass the result of :attr:`HTTPResponse.data` to the decoder.

        This method can raise either `UnicodeDecodeError` or `json.JSONDecodeError`.

        Read more :ref:`here <json_content>`.
        """
        data = self.data.decode("utf-8")
        return _json.loads(data)

    def close(self) -> None:
        """Close the body stream and the connection; safe to call repeatedly."""
        if not self._closed:
            if isinstance(self._response.body, IOBase):
                self._response.body.close()
            if self._connection:
                self._connection.close()
                self._connection = None
            self._closed = True

    @contextmanager
    def _error_catcher(self) -> typing.Generator[None, None, None]:
        """
        Catch Emscripten specific exceptions thrown by fetch.py,
        instead re-raising urllib3 variants, so that low-level exceptions
        are not leaked in the high-level api.

        On exit, release the connection back to the pool.
        """
        from .fetch import _RequestError, _TimeoutError  # avoid circular import

        clean_exit = False

        try:
            yield
            # If no exception is thrown, we should avoid cleaning up
            # unnecessarily.
            clean_exit = True
        except _TimeoutError as e:
            # Chain explicitly so the low-level cause stays attached.
            raise TimeoutError(str(e)) from e
        except _RequestError as e:
            raise HTTPException(str(e)) from e
        finally:
            # If we didn't terminate cleanly, we need to throw away our
            # connection.
            if not clean_exit:
                # The response may not be closed but we're not going to use it
                # anymore so close it now
                if (
                    isinstance(self._response.body, IOBase)
                    and not self._response.body.closed
                ):
                    self._response.body.close()
                # release the connection back to the pool
                self.release_conn()
            else:
                # If we have read everything from the response stream,
                # return the connection back to the pool.
                if (
                    isinstance(self._response.body, IOBase)
                    and self._response.body.closed
                ):
                    self.release_conn()