Mercurial > repos > jpayne > bioproject_to_srr_2
comparison urllib3/http2.py @ 7:5eb2d5e3bf22
planemo upload for repository https://toolrepo.galaxytrakr.org/view/jpayne/bioproject_to_srr_2/556cac4fb538
author | jpayne |
---|---|
date | Sun, 05 May 2024 23:32:17 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
6:b2745907b1eb | 7:5eb2d5e3bf22 |
---|---|
1 from __future__ import annotations | |
2 | |
3 import threading | |
4 import types | |
5 import typing | |
6 | |
7 import h2.config # type: ignore[import-untyped] | |
8 import h2.connection # type: ignore[import-untyped] | |
9 import h2.events # type: ignore[import-untyped] | |
10 | |
11 import urllib3.connection | |
12 import urllib3.util.ssl_ | |
13 from urllib3.response import BaseHTTPResponse | |
14 | |
15 from ._collections import HTTPHeaderDict | |
16 from .connection import HTTPSConnection | |
17 from .connectionpool import HTTPSConnectionPool | |
18 | |
# Remember the stock HTTPSConnection class so that extract_from_urllib3()
# can put it back after inject_into_urllib3() has swapped in HTTP2Connection.
orig_HTTPSConnection = HTTPSConnection
20 | |
21 T = typing.TypeVar("T") | |
22 | |
23 | |
24 class _LockedObject(typing.Generic[T]): | |
25 """ | |
26 A wrapper class that hides a specific object behind a lock. | |
27 | |
28 The goal here is to provide a simple way to protect access to an object | |
29 that cannot safely be simultaneously accessed from multiple threads. The | |
30 intended use of this class is simple: take hold of it with a context | |
31 manager, which returns the protected object. | |
32 """ | |
33 | |
34 def __init__(self, obj: T): | |
35 self.lock = threading.RLock() | |
36 self._obj = obj | |
37 | |
38 def __enter__(self) -> T: | |
39 self.lock.acquire() | |
40 return self._obj | |
41 | |
42 def __exit__( | |
43 self, | |
44 exc_type: type[BaseException] | None, | |
45 exc_val: BaseException | None, | |
46 exc_tb: types.TracebackType | None, | |
47 ) -> None: | |
48 self.lock.release() | |
49 | |
50 | |
class HTTP2Connection(HTTPSConnection):
    """
    HTTPS connection that talks HTTP/2 by driving an ``h2`` state machine.

    Only requests without a body are supported: :meth:`send` rejects any
    payload and :meth:`endheaders` always ends the stream. The response is
    buffered completely in memory by :meth:`getresponse`, which also closes
    the connection afterwards.
    """

    def __init__(
        self, host: str, port: int | None = None, **kwargs: typing.Any
    ) -> None:
        """
        Create the HTTP/2 state and initialise the underlying connection.

        :param host: Host to connect to.
        :param port: Port to connect to; forwarded unchanged to the parent.
        :param kwargs: Extra keyword arguments forwarded to the parent class.
        :raises NotImplementedError: If ``proxy`` or ``proxy_config`` is given.
        """
        self._h2_conn = self._new_h2_conn()
        self._h2_stream: int | None = None
        self._h2_headers: list[tuple[bytes, bytes]] = []

        if "proxy" in kwargs or "proxy_config" in kwargs:  # Defensive:
            raise NotImplementedError("Proxies aren't supported with HTTP/2")

        super().__init__(host, port, **kwargs)

    def _new_h2_conn(self) -> _LockedObject[h2.connection.H2Connection]:
        """Return a fresh client-side ``H2Connection`` wrapped in a lock."""
        config = h2.config.H2Configuration(client_side=True)
        return _LockedObject(h2.connection.H2Connection(config=config))

    def connect(self) -> None:
        """Open the socket via the parent class, then start the HTTP/2 session."""
        super().connect()

        with self._h2_conn as h2_conn:
            h2_conn.initiate_connection()
            # Flush whatever h2 queued while initiating the connection.
            self.sock.sendall(h2_conn.data_to_send())

    def putrequest(
        self,
        method: str,
        url: str,
        skip_host: bool = False,
        skip_accept_encoding: bool = False,
    ) -> None:
        """
        Reserve a stream and queue the HTTP/2 pseudo-headers for a request.

        Nothing is written to the socket here; the headers are sent by
        :meth:`endheaders`.

        :param method: HTTP method, used for ``:method``.
        :param url: Request target, used for ``:path`` and remembered so that
            the response can report it as its request URL.
        :param skip_host: Accepted for signature compatibility; not used.
        :param skip_accept_encoding: Accepted for signature compatibility;
            not used.
        """
        with self._h2_conn as h2_conn:
            self._request_url = url
            self._h2_stream = h2_conn.get_next_available_stream_id()

            # A host containing ":" is wrapped in brackets; the port falls
            # back to 443 when none was set.
            if ":" in self.host:
                authority = f"[{self.host}]:{self.port or 443}"
            else:
                authority = f"{self.host}:{self.port or 443}"

            self._h2_headers.extend(
                (
                    (b":scheme", b"https"),
                    (b":method", method.encode()),
                    (b":authority", authority.encode()),
                    (b":path", url.encode()),
                )
            )

    def putheader(self, header: str, *values: str) -> None:  # type: ignore[override]
        """
        Queue one header entry per value.

        The header name is lower-cased; name and value are UTF-8 encoded.
        """
        name = header.encode("utf-8").lower()
        self._h2_headers.extend((name, value.encode("utf-8")) for value in values)

    def endheaders(self) -> None:  # type: ignore[override]
        """Send the queued headers and end the stream (no body follows)."""
        with self._h2_conn as h2_conn:
            h2_conn.send_headers(
                stream_id=self._h2_stream,
                headers=self._h2_headers,
                end_stream=True,
            )
            if data_to_send := h2_conn.data_to_send():
                self.sock.sendall(data_to_send)

    def send(self, data: bytes) -> None:  # type: ignore[override] # Defensive:
        """
        Accept empty payloads only.

        :raises NotImplementedError: If *data* is non-empty.
        """
        if not data:
            return
        raise NotImplementedError("Sending data isn't supported yet")

    def getresponse(  # type: ignore[override]
        self,
    ) -> HTTP2Response:
        """
        Read from the socket until the stream ends and return the response.

        The connection is closed before returning or raising.

        :returns: The buffered response (status, headers and complete body).
        :raises ConnectionError: If the peer closes the socket before the
            stream has ended, or the stream ends without response headers.
        """
        status: int | None = None
        headers: HTTPHeaderDict | None = None
        data = bytearray()
        with self._h2_conn as h2_conn:
            end_stream = False
            while not end_stream:
                # TODO: Arbitrary read value.
                received_data = self.sock.recv(65535)
                if not received_data:
                    # An empty read means the peer closed the socket. Reading
                    # again would keep returning b"" and spin forever, so
                    # stop and report the truncated response below.
                    break

                for event in h2_conn.receive_data(received_data):
                    if isinstance(event, h2.events.ResponseReceived):
                        headers = HTTPHeaderDict()
                        for header, value in event.headers:
                            if header == b":status":
                                status = int(value.decode())
                            else:
                                headers.add(
                                    header.decode("ascii"), value.decode("ascii")
                                )

                    elif isinstance(event, h2.events.DataReceived):
                        data += event.data
                        # Tell h2 the received bytes were consumed so it can
                        # update flow control for this stream.
                        h2_conn.acknowledge_received_data(
                            event.flow_controlled_length, event.stream_id
                        )

                    elif isinstance(event, h2.events.StreamEnded):
                        end_stream = True

                # Send anything h2 queued while processing the events.
                if data_to_send := h2_conn.data_to_send():
                    self.sock.sendall(data_to_send)

        # We always close to not have to handle connection management.
        self.close()

        if not end_stream:
            raise ConnectionError(
                "Connection closed before the HTTP/2 response was complete"
            )
        # Explicit check instead of ``assert`` so it survives ``python -O``.
        if status is None or headers is None:
            raise ConnectionError("HTTP/2 stream ended without response headers")

        return HTTP2Response(
            status=status,
            headers=headers,
            request_url=self._request_url,
            data=bytes(data),
        )

    def close(self) -> None:
        """Try to shut the HTTP/2 session down, reset state, close the socket."""
        with self._h2_conn as h2_conn:
            try:
                h2_conn.close_connection()
                if data := h2_conn.data_to_send():
                    self.sock.sendall(data)
            except Exception:
                # Deliberately best-effort: a failure to say goodbye must not
                # prevent the state reset and socket close below.
                pass

        # Reset all our HTTP/2 connection state.
        self._h2_conn = self._new_h2_conn()
        self._h2_stream = None
        self._h2_headers = []

        super().close()
181 | |
182 | |
class HTTP2Response(BaseHTTPResponse):
    """
    Response whose body has already been read completely into memory.
    """

    # TODO: This is a woefully incomplete response object, but works for non-streaming.
    def __init__(
        self,
        status: int,
        headers: HTTPHeaderDict,
        request_url: str,
        data: bytes,
        decode_content: bool = False,  # TODO: support decoding
    ) -> None:
        """
        Store the buffered body and initialise the base response.

        :param status: Numeric HTTP status.
        :param headers: Response headers.
        :param request_url: URL the request was made for.
        :param data: Complete response body.
        :param decode_content: Forwarded to the base class.
        """
        super().__init__(
            status=status,
            # Following CPython, we map HTTP versions to major * 10 + minor integers
            version=20,
            # No reason phrase in HTTP/2
            reason=None,
            headers=headers,
            decode_content=decode_content,
            request_url=request_url,
        )
        self._data = data
        # The body is already fully buffered, so nothing is left to read.
        self.length_remaining = 0

    @property
    def data(self) -> bytes:
        """The complete response body."""
        return self._data

    def get_redirect_location(self) -> None:
        """Always return ``None``."""
        return None

    def close(self) -> None:
        """Do nothing."""
215 | |
216 | |
def inject_into_urllib3() -> None:
    """Make urllib3 use HTTP2Connection for HTTPS and offer only ``h2`` via ALPN."""
    # TODO: Offer 'http/1.1' as well, but for testing purposes this is handy.
    urllib3.util.ssl_.ALPN_PROTOCOLS = ["h2"]

    urllib3.connection.HTTPSConnection = HTTP2Connection  # type: ignore[misc]
    HTTPSConnectionPool.ConnectionCls = HTTP2Connection
223 | |
224 | |
def extract_from_urllib3() -> None:
    """Put the saved HTTPSConnection class back and offer only ``http/1.1`` via ALPN."""
    urllib3.util.ssl_.ALPN_PROTOCOLS = ["http/1.1"]

    urllib3.connection.HTTPSConnection = orig_HTTPSConnection  # type: ignore[misc]
    HTTPSConnectionPool.ConnectionCls = orig_HTTPSConnection