comparison urllib3/http2.py @ 7:5eb2d5e3bf22

planemo upload for repository https://toolrepo.galaxytrakr.org/view/jpayne/bioproject_to_srr_2/556cac4fb538
author jpayne
date Sun, 05 May 2024 23:32:17 -0400
parents
children
comparison
equal deleted inserted replaced
6:b2745907b1eb 7:5eb2d5e3bf22
1 from __future__ import annotations
2
3 import threading
4 import types
5 import typing
6
7 import h2.config # type: ignore[import-untyped]
8 import h2.connection # type: ignore[import-untyped]
9 import h2.events # type: ignore[import-untyped]
10
11 import urllib3.connection
12 import urllib3.util.ssl_
13 from urllib3.response import BaseHTTPResponse
14
15 from ._collections import HTTPHeaderDict
16 from .connection import HTTPSConnection
17 from .connectionpool import HTTPSConnectionPool
18
19 orig_HTTPSConnection = HTTPSConnection
20
21 T = typing.TypeVar("T")
22
23
24 class _LockedObject(typing.Generic[T]):
25 """
26 A wrapper class that hides a specific object behind a lock.
27
28 The goal here is to provide a simple way to protect access to an object
29 that cannot safely be simultaneously accessed from multiple threads. The
30 intended use of this class is simple: take hold of it with a context
31 manager, which returns the protected object.
32 """
33
34 def __init__(self, obj: T):
35 self.lock = threading.RLock()
36 self._obj = obj
37
38 def __enter__(self) -> T:
39 self.lock.acquire()
40 return self._obj
41
42 def __exit__(
43 self,
44 exc_type: type[BaseException] | None,
45 exc_val: BaseException | None,
46 exc_tb: types.TracebackType | None,
47 ) -> None:
48 self.lock.release()
49
50
51 class HTTP2Connection(HTTPSConnection):
52 def __init__(
53 self, host: str, port: int | None = None, **kwargs: typing.Any
54 ) -> None:
55 self._h2_conn = self._new_h2_conn()
56 self._h2_stream: int | None = None
57 self._h2_headers: list[tuple[bytes, bytes]] = []
58
59 if "proxy" in kwargs or "proxy_config" in kwargs: # Defensive:
60 raise NotImplementedError("Proxies aren't supported with HTTP/2")
61
62 super().__init__(host, port, **kwargs)
63
64 def _new_h2_conn(self) -> _LockedObject[h2.connection.H2Connection]:
65 config = h2.config.H2Configuration(client_side=True)
66 return _LockedObject(h2.connection.H2Connection(config=config))
67
68 def connect(self) -> None:
69 super().connect()
70
71 with self._h2_conn as h2_conn:
72 h2_conn.initiate_connection()
73 self.sock.sendall(h2_conn.data_to_send())
74
75 def putrequest(
76 self,
77 method: str,
78 url: str,
79 skip_host: bool = False,
80 skip_accept_encoding: bool = False,
81 ) -> None:
82 with self._h2_conn as h2_conn:
83 self._request_url = url
84 self._h2_stream = h2_conn.get_next_available_stream_id()
85
86 if ":" in self.host:
87 authority = f"[{self.host}]:{self.port or 443}"
88 else:
89 authority = f"{self.host}:{self.port or 443}"
90
91 self._h2_headers.extend(
92 (
93 (b":scheme", b"https"),
94 (b":method", method.encode()),
95 (b":authority", authority.encode()),
96 (b":path", url.encode()),
97 )
98 )
99
100 def putheader(self, header: str, *values: str) -> None: # type: ignore[override]
101 for value in values:
102 self._h2_headers.append(
103 (header.encode("utf-8").lower(), value.encode("utf-8"))
104 )
105
106 def endheaders(self) -> None: # type: ignore[override]
107 with self._h2_conn as h2_conn:
108 h2_conn.send_headers(
109 stream_id=self._h2_stream,
110 headers=self._h2_headers,
111 end_stream=True,
112 )
113 if data_to_send := h2_conn.data_to_send():
114 self.sock.sendall(data_to_send)
115
116 def send(self, data: bytes) -> None: # type: ignore[override] # Defensive:
117 if not data:
118 return
119 raise NotImplementedError("Sending data isn't supported yet")
120
121 def getresponse( # type: ignore[override]
122 self,
123 ) -> HTTP2Response:
124 status = None
125 data = bytearray()
126 with self._h2_conn as h2_conn:
127 end_stream = False
128 while not end_stream:
129 # TODO: Arbitrary read value.
130 if received_data := self.sock.recv(65535):
131 events = h2_conn.receive_data(received_data)
132 for event in events:
133 if isinstance(event, h2.events.ResponseReceived):
134 headers = HTTPHeaderDict()
135 for header, value in event.headers:
136 if header == b":status":
137 status = int(value.decode())
138 else:
139 headers.add(
140 header.decode("ascii"), value.decode("ascii")
141 )
142
143 elif isinstance(event, h2.events.DataReceived):
144 data += event.data
145 h2_conn.acknowledge_received_data(
146 event.flow_controlled_length, event.stream_id
147 )
148
149 elif isinstance(event, h2.events.StreamEnded):
150 end_stream = True
151
152 if data_to_send := h2_conn.data_to_send():
153 self.sock.sendall(data_to_send)
154
155 # We always close to not have to handle connection management.
156 self.close()
157
158 assert status is not None
159 return HTTP2Response(
160 status=status,
161 headers=headers,
162 request_url=self._request_url,
163 data=bytes(data),
164 )
165
166 def close(self) -> None:
167 with self._h2_conn as h2_conn:
168 try:
169 h2_conn.close_connection()
170 if data := h2_conn.data_to_send():
171 self.sock.sendall(data)
172 except Exception:
173 pass
174
175 # Reset all our HTTP/2 connection state.
176 self._h2_conn = self._new_h2_conn()
177 self._h2_stream = None
178 self._h2_headers = []
179
180 super().close()
181
182
183 class HTTP2Response(BaseHTTPResponse):
184 # TODO: This is a woefully incomplete response object, but works for non-streaming.
185 def __init__(
186 self,
187 status: int,
188 headers: HTTPHeaderDict,
189 request_url: str,
190 data: bytes,
191 decode_content: bool = False, # TODO: support decoding
192 ) -> None:
193 super().__init__(
194 status=status,
195 headers=headers,
196 # Following CPython, we map HTTP versions to major * 10 + minor integers
197 version=20,
198 # No reason phrase in HTTP/2
199 reason=None,
200 decode_content=decode_content,
201 request_url=request_url,
202 )
203 self._data = data
204 self.length_remaining = 0
205
206 @property
207 def data(self) -> bytes:
208 return self._data
209
210 def get_redirect_location(self) -> None:
211 return None
212
213 def close(self) -> None:
214 pass
215
216
217 def inject_into_urllib3() -> None:
218 HTTPSConnectionPool.ConnectionCls = HTTP2Connection
219 urllib3.connection.HTTPSConnection = HTTP2Connection # type: ignore[misc]
220
221 # TODO: Offer 'http/1.1' as well, but for testing purposes this is handy.
222 urllib3.util.ssl_.ALPN_PROTOCOLS = ["h2"]
223
224
225 def extract_from_urllib3() -> None:
226 HTTPSConnectionPool.ConnectionCls = orig_HTTPSConnection
227 urllib3.connection.HTTPSConnection = orig_HTTPSConnection # type: ignore[misc]
228
229 urllib3.util.ssl_.ALPN_PROTOCOLS = ["http/1.1"]