annotate urllib3/contrib/emscripten/response.py @ 13:f550715358f1

planemo upload for repository https://toolrepo.galaxytrakr.org/view/jpayne/bioproject_to_srr_2/556cac4fb538
author jpayne
date Mon, 20 May 2024 00:56:52 -0400
parents 5eb2d5e3bf22
children
rev   line source
jpayne@7 1 from __future__ import annotations
jpayne@7 2
jpayne@7 3 import json as _json
jpayne@7 4 import logging
jpayne@7 5 import typing
jpayne@7 6 from contextlib import contextmanager
jpayne@7 7 from dataclasses import dataclass
jpayne@7 8 from http.client import HTTPException as HTTPException
jpayne@7 9 from io import BytesIO, IOBase
jpayne@7 10
jpayne@7 11 from ...exceptions import InvalidHeader, TimeoutError
jpayne@7 12 from ...response import BaseHTTPResponse
jpayne@7 13 from ...util.retry import Retry
jpayne@7 14 from .request import EmscriptenRequest
jpayne@7 15
jpayne@7 16 if typing.TYPE_CHECKING:
jpayne@7 17 from ..._base_connection import BaseHTTPConnection, BaseHTTPSConnection
jpayne@7 18
jpayne@7 19 log = logging.getLogger(__name__)
jpayne@7 20
jpayne@7 21
@dataclass
class EmscriptenResponse:
    """Plain record describing an HTTP response produced by the Emscripten backend."""

    # Numeric HTTP status code of the response.
    status_code: int
    # Response headers as a name -> value mapping.
    headers: dict[str, str]
    # Either a readable stream or the complete body already loaded as bytes.
    body: IOBase | bytes
    # The request that this response answers.
    request: EmscriptenRequest
jpayne@7 28
jpayne@7 29
class EmscriptenHttpResponseWrapper(BaseHTTPResponse):
    """
    Expose an :class:`EmscriptenResponse` through the ``BaseHTTPResponse`` API.

    The wrapped response body is either a readable stream or a ``bytes``
    object that was loaded up front; in the latter case it is wrapped in a
    ``BytesIO`` on first read so both cases share one code path.
    """

    def __init__(
        self,
        internal_response: EmscriptenResponse,
        url: str | None = None,
        connection: BaseHTTPConnection | BaseHTTPSConnection | None = None,
    ):
        """
        :param internal_response:
            The low-level response to wrap.
        :param url:
            URL reported by :attr:`url` and passed on as ``request_url``.
        :param connection:
            Connection that produced the response, if any. It is handed back
            to ``self._pool`` by :meth:`release_conn`.
        """
        self._pool = None  # set by pool class
        self._body = None
        self._response = internal_response
        self._url = url
        self._connection = connection
        self._closed = False
        super().__init__(
            headers=internal_response.headers,
            status=internal_response.status_code,
            request_url=url,
            # The internal response carries no HTTP version or reason phrase.
            version=0,
            reason="",
            decode_content=True,
        )
        self.length_remaining = self._init_length(self._response.request.method)
        # Only becomes True once the full body is known to be in memory.
        self.length_is_certain = False

    @property
    def url(self) -> str | None:
        """URL associated with this response, or ``None`` if unknown."""
        return self._url

    @url.setter
    def url(self, url: str | None) -> None:
        self._url = url

    @property
    def connection(self) -> BaseHTTPConnection | BaseHTTPSConnection | None:
        """Connection backing this response; ``None`` once released or closed."""
        return self._connection

    @property
    def retries(self) -> Retry | None:
        """Retry state attached to this response, if any."""
        return self._retries

    @retries.setter
    def retries(self, retries: Retry | None) -> None:
        # Only stores the value; the request URL is not rewritten here.
        self._retries = retries

    def stream(
        self, amt: int | None = 2**16, decode_content: bool | None = None
    ) -> typing.Generator[bytes, None, None]:
        """
        A generator wrapper for the read() method. Each iteration calls
        :meth:`read` with ``amt`` and yields the result until an empty
        read signals the end of the body.

        :param amt:
            How much of the content to read. The generator will return up to
            that much data per iteration, but may return less. The empty
            string will never be returned.

        :param decode_content:
            Passed through to :meth:`read`, which ignores it.
        """
        while True:
            data = self.read(amt=amt, decode_content=decode_content)

            if data:
                yield data
            else:
                break

    def _init_length(self, request_method: str | None) -> int | None:
        """
        Work out the expected body length from the ``content-length`` header.

        :param request_method:
            HTTP method of the originating request; ``"HEAD"`` forces a
            length of zero.
        :return:
            The declared length, ``0`` for responses that must not carry a
            body, or ``None`` when the length is absent, negative or not a
            valid integer.
        :raises InvalidHeader:
            If the header lists several lengths that do not all agree.
        """
        length: int | None
        content_length: str | None = self.headers.get("content-length")

        if content_length is not None:
            try:
                # RFC 7230 section 3.3.2 specifies multiple content lengths can
                # be sent in a single Content-Length header
                # (e.g. Content-Length: 42, 42). This line ensures the values
                # are all valid ints and that as long as the `set` length is 1,
                # all values are the same. Otherwise, the header is invalid.
                lengths = {int(val) for val in content_length.split(",")}
                if len(lengths) > 1:
                    raise InvalidHeader(
                        "Content-Length contained multiple "
                        f"unmatching values ({content_length})"
                    )
                length = lengths.pop()
            except ValueError:
                length = None
            else:
                if length < 0:
                    length = None

        else:  # if content_length is None
            length = None

        # Check for responses that shouldn't include a body
        if (
            self.status in (204, 304)
            or 100 <= self.status < 200
            or request_method == "HEAD"
        ):
            length = 0

        return length

    def read(
        self,
        amt: int | None = None,
        decode_content: bool | None = None,  # ignored because browser decodes always
        cache_content: bool = False,
    ) -> bytes:
        """
        Read and return body bytes.

        :param amt:
            Maximum number of bytes to read. ``None`` or a negative value
            reads everything that is left.
        :param decode_content:
            Accepted for interface compatibility and ignored.
        :param cache_content:
            When reading everything, keep the result so that :attr:`data`
            can return it later. Partial reads are never cached.
        :return:
            The bytes read; ``b""`` if the response is closed or exhausted.
        """
        if (
            self._closed
            or self._response is None
            or (isinstance(self._response.body, IOBase) and self._response.body.closed)
        ):
            return b""

        with self._error_catcher():
            # body has been preloaded as a string by XmlHttpRequest
            if not isinstance(self._response.body, IOBase):
                self.length_remaining = len(self._response.body)
                self.length_is_certain = True
                # wrap body in IOStream
                self._response.body = BytesIO(self._response.body)

            body = self._response.body
            if amt is not None and amt >= 0:
                # Partial read: never cached.
                data = body.read(amt)
            else:
                # read all we can (and cache it if asked to)
                data = body.read()
                if cache_content:
                    self._body = data

            if self.length_remaining is not None:
                self.length_remaining = max(self.length_remaining - len(data), 0)

            # A short read does not prove end-of-stream (raw streams may hand
            # back fewer bytes than requested), so only an empty result for a
            # non-zero request, or an exhausted known length, counts as EOF.
            hit_eof = len(data) == 0 and amt != 0
            if hit_eof or (self.length_is_certain and self.length_remaining == 0):
                # definitely finished reading, close response stream
                body.close()
            return typing.cast(bytes, data)

    def read_chunked(
        self,
        amt: int | None = None,
        decode_content: bool | None = None,
    ) -> typing.Generator[bytes, None, None]:
        """
        Yield the body in pieces obtained from :meth:`read`.

        No chunk parsing happens here (chunked is handled by the browser);
        iteration stops at the first empty read.
        """
        while True:
            chunk = self.read(amt, decode_content)
            if not chunk:
                break
            yield chunk

    def release_conn(self) -> None:
        """Return the connection to the pool, if both a pool and a connection are set."""
        if not self._pool or not self._connection:
            return None

        self._pool._put_conn(self._connection)
        self._connection = None

    def drain_conn(self) -> None:
        """Discard the rest of the response by closing it."""
        self.close()

    @property
    def data(self) -> bytes:
        """The cached body if one was stored, otherwise the result of a caching full read."""
        # Compare against None so that a cached empty body is also honoured.
        if self._body is not None:
            return self._body
        return self.read(cache_content=True)

    def json(self) -> typing.Any:
        """
        Parses the body of the HTTP response as JSON.

        To use a custom JSON decoder pass the result of :attr:`HTTPResponse.data` to the decoder.

        This method can raise either `UnicodeDecodeError` or `json.JSONDecodeError`.

        Read more :ref:`here <json>`.
        """
        data = self.data.decode("utf-8")
        return _json.loads(data)

    def close(self) -> None:
        """Close the body stream and the connection; later calls do nothing."""
        if not self._closed:
            if isinstance(self._response.body, IOBase):
                self._response.body.close()
            if self._connection:
                self._connection.close()
                self._connection = None
            self._closed = True

    @contextmanager
    def _error_catcher(self) -> typing.Generator[None, None, None]:
        """
        Catch Emscripten specific exceptions thrown by fetch.py,
        instead re-raising urllib3 variants, so that low-level exceptions
        are not leaked in the high-level api.

        On exit, the connection is released back to the pool if an error
        occurred or if the body stream has been closed.

        :raises TimeoutError: in place of the backend timeout error.
        :raises HTTPException: in place of the backend request error.
        """
        from .fetch import _RequestError, _TimeoutError  # avoid circular import

        clean_exit = False

        try:
            yield
            # If no exception is thrown, we should avoid cleaning up
            # unnecessarily.
            clean_exit = True
        except _TimeoutError as e:
            # Chain the original error so the root cause stays in the traceback.
            raise TimeoutError(str(e)) from e
        except _RequestError as e:
            raise HTTPException(str(e)) from e
        finally:
            # Look the body up here: the guarded code may have replaced it.
            body = self._response.body
            body_is_stream = isinstance(body, IOBase)
            if not clean_exit:
                # If we didn't terminate cleanly, we need to throw away our
                # connection. The response may not be closed but we're not
                # going to use it anymore so close it now.
                if body_is_stream and not body.closed:
                    body.close()
                # release the connection back to the pool
                self.release_conn()
            elif body_is_stream and body.closed:
                # If we have read everything from the response stream,
                # return the connection back to the pool.
                self.release_conn()