comparison urllib3/contrib/emscripten/response.py @ 7:5eb2d5e3bf22

planemo upload for repository https://toolrepo.galaxytrakr.org/view/jpayne/bioproject_to_srr_2/556cac4fb538
author jpayne
date Sun, 05 May 2024 23:32:17 -0400
parents
children
comparison
equal deleted inserted replaced
6:b2745907b1eb 7:5eb2d5e3bf22
1 from __future__ import annotations
2
3 import json as _json
4 import logging
5 import typing
6 from contextlib import contextmanager
7 from dataclasses import dataclass
8 from http.client import HTTPException as HTTPException
9 from io import BytesIO, IOBase
10
11 from ...exceptions import InvalidHeader, TimeoutError
12 from ...response import BaseHTTPResponse
13 from ...util.retry import Retry
14 from .request import EmscriptenRequest
15
16 if typing.TYPE_CHECKING:
17 from ..._base_connection import BaseHTTPConnection, BaseHTTPSConnection
18
19 log = logging.getLogger(__name__)
20
21
@dataclass
class EmscriptenResponse:
    """Plain record describing the outcome of an Emscripten request."""

    # Numeric HTTP status of the response.
    status_code: int
    # Response headers as a header-name -> value mapping.
    headers: dict[str, str]
    # Payload: either already-loaded bytes or a readable stream object.
    body: IOBase | bytes
    # The request object this response belongs to.
    request: EmscriptenRequest
28
29
class EmscriptenHttpResponseWrapper(BaseHTTPResponse):
    """
    Expose an :class:`EmscriptenResponse` through the ``BaseHTTPResponse`` API.

    The wrapped response body is either a ``bytes`` object that was loaded up
    front or an ``IOBase`` stream. A preloaded body is converted to a
    ``BytesIO`` on the first :meth:`read` so both cases share one code path.
    """

    def __init__(
        self,
        internal_response: EmscriptenResponse,
        url: str | None = None,
        connection: BaseHTTPConnection | BaseHTTPSConnection | None = None,
    ):
        """
        :param internal_response:
            The raw response (status, headers, body, originating request).

        :param url:
            URL reported by :attr:`url`; also passed on as ``request_url``.

        :param connection:
            Connection that produced the response, if any. It is handed back
            to ``self._pool`` by :meth:`release_conn`.
        """
        self._pool = None  # set by pool class
        self._body = None
        self._response = internal_response
        self._url = url
        self._connection = connection
        self._closed = False
        super().__init__(
            headers=internal_response.headers,
            status=internal_response.status_code,
            request_url=url,
            version=0,
            reason="",
            decode_content=True,
        )
        self.length_remaining = self._init_length(self._response.request.method)
        # The header-derived length is only a hint; it becomes certain once a
        # preloaded body has been measured in read().
        self.length_is_certain = False

    @property
    def url(self) -> str | None:
        """URL associated with this response, or ``None`` if unknown."""
        return self._url

    @url.setter
    def url(self, url: str | None) -> None:
        self._url = url

    @property
    def connection(self) -> BaseHTTPConnection | BaseHTTPSConnection | None:
        """Connection still attached to this response, if any."""
        return self._connection

    @property
    def retries(self) -> Retry | None:
        """Retry state attached to this response."""
        return self._retries

    @retries.setter
    def retries(self, retries: Retry | None) -> None:
        # Only stores the value; the request URL is not rewritten here.
        self._retries = retries

    def stream(
        self, amt: int | None = 2**16, decode_content: bool | None = None
    ) -> typing.Generator[bytes, None, None]:
        """
        A generator wrapper for the read() method. A call will block until
        ``amt`` bytes have been read from the connection or until the
        connection is closed.

        :param amt:
            How much of the content to read. The generator will return up to
            that much data per iteration, but may return less. This is
            particularly likely when using compressed data. However, the
            empty string will never be returned.

        :param decode_content:
            If True, will attempt to decode the body based on the
            'content-encoding' header.
        """
        while True:
            data = self.read(amt=amt, decode_content=decode_content)

            if data:
                yield data
            else:
                break

    def _init_length(self, request_method: str | None) -> int | None:
        """
        Work out the expected body length from the ``content-length`` header.

        :param request_method:
            Method of the originating request; ``"HEAD"`` forces a length of 0.

        :returns:
            ``0`` for responses that must not carry a body (204, 304, 1xx,
            or a HEAD request); otherwise the parsed header value, or ``None``
            when the header is absent, not an integer, or negative.

        :raises InvalidHeader:
            If the header lists several values that are not all equal.
        """
        length: int | None
        content_length: str | None = self.headers.get("content-length")

        if content_length is not None:
            try:
                # RFC 7230 section 3.3.2 specifies multiple content lengths can
                # be sent in a single Content-Length header
                # (e.g. Content-Length: 42, 42). This line ensures the values
                # are all valid ints and that as long as the `set` length is 1,
                # all values are the same. Otherwise, the header is invalid.
                lengths = {int(val) for val in content_length.split(",")}
                if len(lengths) > 1:
                    raise InvalidHeader(
                        "Content-Length contained multiple "
                        "unmatching values (%s)" % content_length
                    )
                length = lengths.pop()
            except ValueError:
                length = None
            else:
                if length < 0:
                    length = None

        else:  # if content_length is None
            length = None

        # Check for responses that shouldn't include a body
        if (
            self.status in (204, 304)
            or 100 <= self.status < 200
            or request_method == "HEAD"
        ):
            length = 0

        return length

    def read(
        self,
        amt: int | None = None,
        decode_content: bool | None = None,  # ignored because browser decodes always
        cache_content: bool = False,
    ) -> bytes:
        """
        Read up to ``amt`` bytes of the body, or everything that is left.

        :param amt:
            Maximum number of bytes to return. ``None`` or a negative value
            reads the whole remaining body.

        :param decode_content:
            Accepted for interface compatibility and ignored.

        :param cache_content:
            When reading the whole body, keep the result so :attr:`data` can
            return it later. Partial reads are never cached.

        :returns:
            The bytes read; ``b""`` once the response or its body is closed.
        """
        if (
            self._closed
            or self._response is None
            or (isinstance(self._response.body, IOBase) and self._response.body.closed)
        ):
            return b""

        with self._error_catcher():
            # body has been preloaded as a string by XmlHttpRequest
            if not isinstance(self._response.body, IOBase):
                self.length_remaining = len(self._response.body)
                self.length_is_certain = True
                # wrap body in IOStream
                self._response.body = BytesIO(self._response.body)
            # A negative amt means "read everything" for stream objects, so
            # route it to the read-all branch; otherwise the short-read test
            # below (len(data) < amt) could never detect end of stream.
            if amt is not None and amt >= 0:
                # don't cache partial content
                cache_content = False
                data = self._response.body.read(amt)
                if self.length_remaining is not None:
                    self.length_remaining = max(self.length_remaining - len(data), 0)
                if (self.length_is_certain and self.length_remaining == 0) or len(
                    data
                ) < amt:
                    # definitely finished reading, close response stream
                    self._response.body.close()
                return typing.cast(bytes, data)
            else:  # read all we can (and cache it)
                data = self._response.body.read()
                if cache_content:
                    self._body = data
                if self.length_remaining is not None:
                    self.length_remaining = max(self.length_remaining - len(data), 0)
                if len(data) == 0 or (
                    self.length_is_certain and self.length_remaining == 0
                ):
                    # definitely finished reading, close response stream
                    self._response.body.close()
                return typing.cast(bytes, data)

    def read_chunked(
        self,
        amt: int | None = None,
        decode_content: bool | None = None,
    ) -> typing.Generator[bytes, None, None]:
        """Yield successive :meth:`read` results until an empty one is returned."""
        # chunked is handled by browser
        while True:
            chunk = self.read(amt, decode_content)
            if not chunk:
                break
            yield chunk

    def release_conn(self) -> None:
        """Return the connection to the pool; no-op without a pool or connection."""
        if not self._pool or not self._connection:
            return None

        self._pool._put_conn(self._connection)
        self._connection = None

    def drain_conn(self) -> None:
        """Discard any unread body by closing the response."""
        self.close()

    @property
    def data(self) -> bytes:
        """Whole response body; read and cached on first access."""
        # Test against None so that a cached empty body is returned directly
        # instead of triggering another read().
        if self._body is not None:
            return self._body
        else:
            return self.read(cache_content=True)

    def json(self) -> typing.Any:
        """
        Parses the body of the HTTP response as JSON.

        To use a custom JSON decoder pass the result of :attr:`HTTPResponse.data` to the decoder.

        This method can raise either `UnicodeDecodeError` or `json.JSONDecodeError`.

        Read more :ref:`here <json>`.
        """
        data = self.data.decode("utf-8")
        return _json.loads(data)

    def close(self) -> None:
        """Close the body stream and the connection; later calls do nothing."""
        if not self._closed:
            if isinstance(self._response.body, IOBase):
                self._response.body.close()
            if self._connection:
                self._connection.close()
                self._connection = None
            self._closed = True

    @contextmanager
    def _error_catcher(self) -> typing.Generator[None, None, None]:
        """
        Catch Emscripten specific exceptions thrown by fetch.py,
        instead re-raising urllib3 variants, so that low-level exceptions
        are not leaked in the high-level api.

        On exit, release the connection back to the pool.
        """
        from .fetch import _RequestError, _TimeoutError  # avoid circular import

        clean_exit = False

        try:
            yield
            # If no exception is thrown, we should avoid cleaning up
            # unnecessarily.
            clean_exit = True
        except _TimeoutError as e:
            # Chain explicitly so the original error stays attached as the cause.
            raise TimeoutError(str(e)) from e
        except _RequestError as e:
            raise HTTPException(str(e)) from e
        finally:
            # If we didn't terminate cleanly, we need to throw away our
            # connection.
            if not clean_exit:
                # The response may not be closed but we're not going to use it
                # anymore so close it now
                if (
                    isinstance(self._response.body, IOBase)
                    and not self._response.body.closed
                ):
                    self._response.body.close()
                # release the connection back to the pool
                self.release_conn()
            else:
                # If we have read everything from the response stream,
                # return the connection back to the pool.
                if (
                    isinstance(self._response.body, IOBase)
                    and self._response.body.closed
                ):
                    self.release_conn()