annotate urllib3/connection.py @ 16:dc2c003078e9 tip

planemo upload for repository https://toolrepo.galaxytrakr.org/view/jpayne/bioproject_to_srr_2/556cac4fb538
author jpayne
date Tue, 21 May 2024 01:09:25 -0400 (8 months ago)
parents 5eb2d5e3bf22
children
rev   line source
jpayne@7 1 from __future__ import annotations
jpayne@7 2
jpayne@7 3 import datetime
jpayne@7 4 import logging
jpayne@7 5 import os
jpayne@7 6 import re
jpayne@7 7 import socket
jpayne@7 8 import sys
jpayne@7 9 import typing
jpayne@7 10 import warnings
jpayne@7 11 from http.client import HTTPConnection as _HTTPConnection
jpayne@7 12 from http.client import HTTPException as HTTPException # noqa: F401
jpayne@7 13 from http.client import ResponseNotReady
jpayne@7 14 from socket import timeout as SocketTimeout
jpayne@7 15
jpayne@7 16 if typing.TYPE_CHECKING:
jpayne@7 17 from typing import Literal
jpayne@7 18
jpayne@7 19 from .response import HTTPResponse
jpayne@7 20 from .util.ssl_ import _TYPE_PEER_CERT_RET_DICT
jpayne@7 21 from .util.ssltransport import SSLTransport
jpayne@7 22
jpayne@7 23 from ._collections import HTTPHeaderDict
jpayne@7 24 from .util.response import assert_header_parsing
jpayne@7 25 from .util.timeout import _DEFAULT_TIMEOUT, _TYPE_TIMEOUT, Timeout
jpayne@7 26 from .util.util import to_str
jpayne@7 27 from .util.wait import wait_for_read
jpayne@7 28
try:  # Compiled with SSL?
    import ssl

    # Expose the real SSL error type under a name that always exists.
    BaseSSLError = ssl.SSLError
except (ImportError, AttributeError):
    # No usable ``ssl`` module: keep both names defined so the rest of the
    # module can still reference ``ssl`` and ``BaseSSLError``.
    ssl = None  # type: ignore[assignment]

    class BaseSSLError(BaseException):  # type: ignore[no-redef]
        """Stand-in for ``ssl.SSLError`` when the ``ssl`` module is unavailable."""
jpayne@7 38
jpayne@7 39
jpayne@7 40 from ._base_connection import _TYPE_BODY
jpayne@7 41 from ._base_connection import ProxyConfig as ProxyConfig
jpayne@7 42 from ._base_connection import _ResponseOptions as _ResponseOptions
jpayne@7 43 from ._version import __version__
jpayne@7 44 from .exceptions import (
jpayne@7 45 ConnectTimeoutError,
jpayne@7 46 HeaderParsingError,
jpayne@7 47 NameResolutionError,
jpayne@7 48 NewConnectionError,
jpayne@7 49 ProxyError,
jpayne@7 50 SystemTimeWarning,
jpayne@7 51 )
jpayne@7 52 from .util import SKIP_HEADER, SKIPPABLE_HEADERS, connection, ssl_
jpayne@7 53 from .util.request import body_to_chunks
jpayne@7 54 from .util.ssl_ import assert_fingerprint as _assert_fingerprint
jpayne@7 55 from .util.ssl_ import (
jpayne@7 56 create_urllib3_context,
jpayne@7 57 is_ipaddress,
jpayne@7 58 resolve_cert_reqs,
jpayne@7 59 resolve_ssl_version,
jpayne@7 60 ssl_wrap_socket,
jpayne@7 61 )
jpayne@7 62 from .util.ssl_match_hostname import CertificateError, match_hostname
jpayne@7 63 from .util.url import Url
jpayne@7 64
# These self-assignments are deliberate: they bind the builtins as module
# attributes so that they can be imported from this module.
ConnectionError = ConnectionError
BrokenPipeError = BrokenPipeError


log = logging.getLogger(__name__)

# Default port for each supported URL scheme.
port_by_scheme = dict(http=80, https=443)

# Maintenance note: when test_recent_date starts failing, move this value
# forward to roughly six months before the current date.
RECENT_DATE = datetime.date(year=2023, month=6, day=1)

# Matches any single character that is not permitted in an HTTP method token.
_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]")

# True when the running interpreter provides ``sys.audit``.
_HAS_SYS_AUDIT = hasattr(sys, "audit")
jpayne@7 81
jpayne@7 82
class HTTPConnection(_HTTPConnection):
    """
    A :class:`http.client.HTTPConnection` subclass that adds an extra constructor
    backwards-compatibility layer between older and newer Pythons.

    Extra keyword parameters configure attributes of the connection.
    Accepted parameters include:

    - ``source_address``: Set the source address for the current connection.
    - ``socket_options``: Set specific options on the underlying socket. When not
      given, the defaults come from ``HTTPConnection.default_socket_options``, which
      includes disabling Nagle's algorithm (sets TCP_NODELAY to 1) unless the
      connection is behind a proxy.

      To enable TCP Keep Alive in addition to the defaults, for example,
      you might pass:

      .. code-block:: python

         HTTPConnection.default_socket_options + [
             (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
         ]

      Passing an empty list (e.g., ``[]``) disables the defaults.
    """

    default_port: typing.ClassVar[int] = port_by_scheme["http"]  # type: ignore[misc]

    #: Disable Nagle's algorithm by default.
    #: ``[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]``
    default_socket_options: typing.ClassVar[connection._TYPE_SOCKET_OPTIONS] = [
        (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    ]

    #: Whether this connection verifies the host's certificate.
    is_verified: bool = False

    #: Whether this proxy connection verified the proxy host's certificate.
    # The value is ``None`` while no proxy is connected.
    proxy_is_verified: bool | None = None

    blocksize: int
    source_address: tuple[str, int] | None
    socket_options: connection._TYPE_SOCKET_OPTIONS | None

    _has_connected_to_proxy: bool
    _response_options: _ResponseOptions | None
    _tunnel_host: str | None
    _tunnel_port: int | None
    _tunnel_scheme: str | None

    def __init__(
        self,
        host: str,
        port: int | None = None,
        *,
        timeout: _TYPE_TIMEOUT = _DEFAULT_TIMEOUT,
        source_address: tuple[str, int] | None = None,
        blocksize: int = 16384,
        socket_options: None
        | (connection._TYPE_SOCKET_OPTIONS) = default_socket_options,
        proxy: Url | None = None,
        proxy_config: ProxyConfig | None = None,
    ) -> None:
        """
        Initialise the underlying ``http.client`` connection and urllib3's own state.

        :param host: Host to connect to; stored through the ``host`` property.
        :param port: Port to connect to, or ``None``.
        :param timeout: Timeout, passed through ``Timeout.resolve_default_timeout``
            before being handed to the base class.
        :param source_address: Source address forwarded to the base class.
        :param blocksize: Block size forwarded to the base class.
        :param socket_options: Options applied when the socket is created.
        :param proxy: Proxy URL, if any.
        :param proxy_config: Proxy configuration, if any.
        """
        effective_timeout = Timeout.resolve_default_timeout(timeout)
        super().__init__(
            host=host,
            port=port,
            timeout=effective_timeout,
            source_address=source_address,
            blocksize=blocksize,
        )

        # Settings supplied by the caller.
        self.socket_options = socket_options
        self.proxy = proxy
        self.proxy_config = proxy_config

        # Per-connection state; `close()` resets all of it.
        self._has_connected_to_proxy = False
        self._response_options = None
        self._tunnel_host = None
        self._tunnel_port = None
        self._tunnel_scheme = None

    @property
    def host(self) -> str:
        """
        The hostname with any trailing dots (the FQDN marker) stripped.

        SSL certificates generally omit the trailing dot of a fully-qualified
        domain name, so validating against a name that keeps the dot fails, and
        some servers may not expect to receive the trailing dot either.

        The trailing dot does matter for DNS resolution, though: a lookup with
        it resolves only the exact FQDN, whereas a lookup without it consults the
        system's search domain list. The original value is therefore kept in
        ``_dns_host`` and used only where that matters (i.e., for the DNS lookup
        made when establishing the TCP connection that carries the HTTP requests).
        """
        dns_name = self._dns_host
        return dns_name.rstrip(".")

    @host.setter
    def host(self, value: str) -> None:
        """
        Store the host exactly as given, in ``_dns_host``.

        The assumption is that only urllib3 touches ``_dns_host``; httplib itself
        only uses `host`, and other libraries are expected to do the same.
        """
        self._dns_host = value

    def _new_conn(self) -> socket.socket:
        """Establish a socket connection and set nodelay settings on it.

        :return: New socket connection.
        """
        # Connect using the un-stripped DNS name; errors report the stripped host.
        address = (self._dns_host, self.port)
        try:
            sock = connection.create_connection(
                address,
                self.timeout,
                source_address=self.source_address,
                socket_options=self.socket_options,
            )
        except socket.gaierror as e:
            raise NameResolutionError(self.host, self, e) from e
        except SocketTimeout as e:
            raise ConnectTimeoutError(
                self,
                f"Connection to {self.host} timed out. (connect timeout={self.timeout})",
            ) from e
        except OSError as e:
            raise NewConnectionError(
                self, f"Failed to establish a new connection: {e}"
            ) from e

        # Audit hooks are only available in Python 3.8+
        if _HAS_SYS_AUDIT:
            sys.audit("http.client.connect", self, self.host, self.port)

        return sock

    def set_tunnel(
        self,
        host: str,
        port: int | None = None,
        headers: typing.Mapping[str, str] | None = None,
        scheme: str = "http",
    ) -> None:
        """
        Configure tunnelling through a proxy and record the proxy scheme.

        :raises ValueError: If ``scheme`` is neither ``"http"`` nor ``"https"``.
        """
        allowed_schemes = ("http", "https")
        if scheme not in allowed_schemes:
            raise ValueError(
                f"Invalid proxy scheme for tunneling: {scheme!r}, must be either 'http' or 'https'"
            )
        super().set_tunnel(host, port=port, headers=headers)
        self._tunnel_scheme = scheme

    def connect(self) -> None:
        """Open the socket, establish the tunnel if one is configured, and update proxy state."""
        self.sock = self._new_conn()

        if self._tunnel_host:
            # A tunnel host means the socket we just opened goes to the proxy.
            self._has_connected_to_proxy = True

            # TODO: Fix tunnel so it doesn't depend on self.sock state.
            self._tunnel()  # type: ignore[attr-defined]

        # If there's a proxy to be connected to we are fully connected.
        # The flag is assigned a second time here because forwarding proxies
        # do not go through the tunnelling branch above.
        via_proxy = bool(self.proxy)
        self._has_connected_to_proxy = via_proxy
        if via_proxy:
            self.proxy_is_verified = False

    @property
    def is_closed(self) -> bool:
        """Whether the connection currently has no socket."""
        return self.sock is None

    @property
    def is_connected(self) -> bool:
        """Whether a socket exists and ``wait_for_read`` with a zero timeout reports False for it."""
        return self.sock is not None and not wait_for_read(self.sock, timeout=0.0)

    @property
    def has_connected_to_proxy(self) -> bool:
        """Whether this connection has connected to its proxy."""
        return self._has_connected_to_proxy

    @property
    def proxy_is_forwarding(self) -> bool:
        """
        Return True if a forwarding proxy is configured, else return False
        """
        if not self.proxy:
            return False
        return self._tunnel_host is None

    def close(self) -> None:
        """Close the connection and clear every piece of per-connection state."""
        try:
            super().close()
        finally:
            # Reset all stateful properties, even if the base close failed, so
            # the connection can be re-used without leaking prior configs.
            self.sock = None
            self.is_verified = self._has_connected_to_proxy = False
            self.proxy_is_verified = self._response_options = None
            self._tunnel_host = self._tunnel_port = self._tunnel_scheme = None

    def putrequest(
        self,
        method: str,
        url: str,
        skip_host: bool = False,
        skip_accept_encoding: bool = False,
    ) -> None:
        """"""
        # Empty docstring because the indentation of CPython's implementation
        # is broken but we don't want this method in our documentation.
        # Reject methods containing any character outside the allowed token set.
        match = _CONTAINS_CONTROL_CHAR_RE.search(method)
        if match is not None:
            raise ValueError(
                f"Method cannot contain non-token characters {method!r} (found at least {match.group()!r})"
            )

        return super().putrequest(
            method, url, skip_host=skip_host, skip_accept_encoding=skip_accept_encoding
        )

    def putheader(self, header: str, *values: str) -> None:  # type: ignore[override]
        """"""
        skip_requested = any(
            isinstance(v, str) and v == SKIP_HEADER for v in values
        )
        if not skip_requested:
            super().putheader(header, *values)
            return

        # SKIP_HEADER was requested: the header is not sent, and that is only
        # allowed for the headers listed in SKIPPABLE_HEADERS.
        if to_str(header.lower()) in SKIPPABLE_HEADERS:
            return

        skippable_headers = "', '".join(map(str.title, sorted(SKIPPABLE_HEADERS)))
        raise ValueError(
            f"urllib3.util.SKIP_HEADER only supports '{skippable_headers}'"
        )

    # `request` method's signature intentionally violates LSP.
    # urllib3's API is different from `http.client.HTTPConnection` and the subclassing is only incidental.
    def request(  # type: ignore[override]
        self,
        method: str,
        url: str,
        body: _TYPE_BODY | None = None,
        headers: typing.Mapping[str, str] | None = None,
        *,
        chunked: bool = False,
        preload_content: bool = True,
        decode_content: bool = True,
        enforce_content_length: bool = True,
    ) -> None:
        """
        Send a request: request line, framing headers, remaining headers, then body.

        :param method: HTTP method.
        :param url: Request target.
        :param body: Optional body, converted to chunks by ``body_to_chunks``.
        :param headers: Optional headers; an empty mapping is used when ``None``.
        :param chunked: When true, the body is sent with chunked transfer encoding.
        :param preload_content: Stored and passed to the ``HTTPResponse`` later.
        :param decode_content: Stored and passed to the ``HTTPResponse`` later.
        :param enforce_content_length: Stored and passed to the ``HTTPResponse`` later.
        """
        # A re-used connection already has a socket; refresh its timeout
        # before sending. A fresh connection has no socket yet.
        if self.sock is not None:
            self.sock.settimeout(self.timeout)

        # Store these values to be fed into the HTTPResponse
        # object later. TODO: Remove this in favor of a real
        # HTTP lifecycle mechanism.
        #
        # This has to happen before anything is sent, because a response
        # can sometimes still be salvaged off the wire even when the
        # request body could not be sent completely.
        self._response_options = _ResponseOptions(
            request_method=method,
            request_url=url,
            preload_content=preload_content,
            decode_content=decode_content,
            enforce_content_length=enforce_content_length,
        )

        headers = {} if headers is None else headers
        header_names = frozenset(to_str(k.lower()) for k in headers)
        self.putrequest(
            method,
            url,
            skip_accept_encoding="accept-encoding" in header_names,
            skip_host="host" in header_names,
        )

        # Transform the body into an iterable of sendall()-able chunks
        # and detect if an explicit Content-Length is doable.
        framing = body_to_chunks(body, method=method, blocksize=self.blocksize)
        chunks = framing.chunks
        content_length = framing.content_length

        if chunked:
            # An explicit chunked=True is respected as-is.
            if "transfer-encoding" not in header_names:
                self.putheader("Transfer-Encoding", "chunked")
        elif "content-length" in header_names:
            # The caller already chose a framing mechanism; respect it.
            chunked = False
        elif "transfer-encoding" in header_names:
            chunked = True
        elif content_length is not None:
            # Otherwise follow the recommendation of 'body_to_chunks()'.
            chunked = False
            self.putheader("Content-Length", str(content_length))
        elif chunks is not None:
            chunked = True
            self.putheader("Transfer-Encoding", "chunked")
        else:
            chunked = False

        # Now that framing headers are out of the way we send all the other headers.
        if "user-agent" not in header_names:
            self.putheader("User-Agent", _get_default_user_agent())
        for name, value in headers.items():
            self.putheader(name, value)
        self.endheaders()

        # If we're given a body we start sending that in chunks.
        if chunks is not None:
            for chunk in chunks:
                # Sending empty chunks isn't allowed for TE: chunked
                # as it indicates the end of the body.
                if not chunk:
                    continue
                payload = chunk.encode("utf-8") if isinstance(chunk, str) else chunk
                if chunked:
                    payload = b"%x\r\n%b\r\n" % (len(payload), payload)
                self.send(payload)

        # Regardless of whether we have a body or not, if we're in
        # chunked mode we want to send an explicit empty chunk.
        if chunked:
            self.send(b"0\r\n\r\n")

    def request_chunked(
        self,
        method: str,
        url: str,
        body: _TYPE_BODY | None = None,
        headers: typing.Mapping[str, str] | None = None,
    ) -> None:
        """
        Deprecated alternative to :meth:`request` that sends the body with
        chunked encoding rather than as one block.
        """
        warnings.warn(
            "HTTPConnection.request_chunked() is deprecated and will be removed "
            "in urllib3 v2.1.0. Instead use HTTPConnection.request(..., chunked=True).",
            category=DeprecationWarning,
            stacklevel=2,
        )
        self.request(method, url, body=body, headers=headers, chunked=True)

    def getresponse(  # type: ignore[override]
        self,
    ) -> HTTPResponse:
        """
        Get the response from the server.

        If the HTTPConnection is in the correct state, returns an instance of
        HTTPResponse built from the response read by the base class and the
        options recorded by :meth:`request`.

        If a request has not been sent or if a previous response has not been
        handled, ResponseNotReady is raised. If the HTTP response indicates that
        the connection should be closed, then it will be closed before the
        response is returned. When the connection is closed, the underlying
        socket is closed.
        """
        # Raise the same error as http.client.HTTPConnection
        resp_options = self._response_options
        if resp_options is None:
            raise ResponseNotReady()

        # Clear the stored options so the attribute is ready for the next request.
        self._response_options = None

        # Since the connection's timeout value may have been updated
        # we need to set the timeout on the socket.
        self.sock.settimeout(self.timeout)

        # This is needed here to avoid circular import errors
        from .response import HTTPResponse

        # Get the response from http.client.HTTPConnection
        httplib_response = super().getresponse()

        # Header parsing problems are logged, not raised.
        try:
            assert_header_parsing(httplib_response.msg)
        except (HeaderParsingError, TypeError) as hpe:
            log.warning(
                "Failed to parse headers (url=%s): %s",
                _url_from_connection(self, resp_options.request_url),
                hpe,
                exc_info=True,
            )

        return HTTPResponse(
            body=httplib_response,
            headers=HTTPHeaderDict(httplib_response.msg.items()),
            status=httplib_response.status,
            version=httplib_response.version,
            reason=httplib_response.reason,
            preload_content=resp_options.preload_content,
            decode_content=resp_options.decode_content,
            original_response=httplib_response,
            enforce_content_length=resp_options.enforce_content_length,
            request_method=resp_options.request_method,
            request_url=resp_options.request_url,
        )
jpayne@7 494
jpayne@7 495
class HTTPSConnection(HTTPConnection):
    """
    HTTPS flavour of :class:`HTTPConnection`. Many of the constructor parameters
    are handed to the underlying SSL socket by means of
    :py:func:`urllib3.util.ssl_wrap_socket`.
    """

    default_port = port_by_scheme["https"]  # type: ignore[misc]

    cert_reqs: int | str | None = None
    ca_certs: str | None = None
    ca_cert_dir: str | None = None
    ca_cert_data: None | str | bytes = None
    ssl_version: int | str | None = None
    ssl_minimum_version: int | None = None
    ssl_maximum_version: int | None = None
    assert_fingerprint: str | None = None

    def __init__(
        self,
        host: str,
        port: int | None = None,
        *,
        timeout: _TYPE_TIMEOUT = _DEFAULT_TIMEOUT,
        source_address: tuple[str, int] | None = None,
        blocksize: int = 16384,
        socket_options: None
        | (connection._TYPE_SOCKET_OPTIONS) = HTTPConnection.default_socket_options,
        proxy: Url | None = None,
        proxy_config: ProxyConfig | None = None,
        cert_reqs: int | str | None = None,
        assert_hostname: None | str | Literal[False] = None,
        assert_fingerprint: str | None = None,
        server_hostname: str | None = None,
        ssl_context: ssl.SSLContext | None = None,
        ca_certs: str | None = None,
        ca_cert_dir: str | None = None,
        ca_cert_data: None | str | bytes = None,
        ssl_minimum_version: int | None = None,
        ssl_maximum_version: int | None = None,
        ssl_version: int | str | None = None,  # Deprecated
        cert_file: str | None = None,
        key_file: str | None = None,
        key_password: str | None = None,
    ) -> None:
        """
        Set up the plain connection state, then store the TLS settings.

        The connection-level parameters are forwarded unchanged to
        :class:`HTTPConnection`; the TLS parameters are stored on the instance
        for use by :meth:`connect`. ``ca_certs`` and ``ca_cert_dir`` are passed
        through ``os.path.expanduser`` when truthy.
        """
        super().__init__(
            host,
            port=port,
            timeout=timeout,
            source_address=source_address,
            blocksize=blocksize,
            socket_options=socket_options,
            proxy=proxy,
            proxy_config=proxy_config,
        )

        # Client certificate material.
        self.key_file = key_file
        self.cert_file = cert_file
        self.key_password = key_password

        # Context and verification targets.
        self.ssl_context = ssl_context
        self.server_hostname = server_hostname
        self.assert_hostname = assert_hostname
        self.assert_fingerprint = assert_fingerprint

        # Protocol version bounds.
        self.ssl_version = ssl_version
        self.ssl_minimum_version = ssl_minimum_version
        self.ssl_maximum_version = ssl_maximum_version

        # CA sources; falsy values are stored untouched.
        self.ca_certs = os.path.expanduser(ca_certs) if ca_certs else ca_certs
        self.ca_cert_dir = (
            os.path.expanduser(ca_cert_dir) if ca_cert_dir else ca_cert_dir
        )
        self.ca_cert_data = ca_cert_data

        # cert_reqs depends on ssl_context so calculate last.
        if cert_reqs is None:
            cert_reqs = (
                self.ssl_context.verify_mode
                if self.ssl_context is not None
                else resolve_cert_reqs(None)
            )
        self.cert_reqs = cert_reqs

    def set_cert(
        self,
        key_file: str | None = None,
        cert_file: str | None = None,
        cert_reqs: int | str | None = None,
        key_password: str | None = None,
        ca_certs: str | None = None,
        assert_hostname: None | str | Literal[False] = None,
        assert_fingerprint: str | None = None,
        ca_cert_dir: str | None = None,
        ca_cert_data: None | str | bytes = None,
    ) -> None:
        """
        Deprecated way of supplying the certificate settings.

        This method should only be called once, before the connection is used.
        """
        warnings.warn(
            "HTTPSConnection.set_cert() is deprecated and will be removed "
            "in urllib3 v2.1.0. Instead provide the parameters to the "
            "HTTPSConnection constructor.",
            category=DeprecationWarning,
            stacklevel=2,
        )

        # If cert_reqs is not provided we'll assume CERT_REQUIRED unless we also
        # have an SSLContext object in which case we'll use its verify_mode.
        if cert_reqs is None:
            cert_reqs = (
                self.ssl_context.verify_mode
                if self.ssl_context is not None
                else resolve_cert_reqs(None)
            )

        self.key_file = key_file
        self.cert_file = cert_file
        self.key_password = key_password
        self.cert_reqs = cert_reqs
        self.assert_hostname = assert_hostname
        self.assert_fingerprint = assert_fingerprint
        self.ca_certs = os.path.expanduser(ca_certs) if ca_certs else ca_certs
        self.ca_cert_dir = (
            os.path.expanduser(ca_cert_dir) if ca_cert_dir else ca_cert_dir
        )
        self.ca_cert_data = ca_cert_data

    def connect(self) -> None:
        """
        Open the socket, tunnel through the proxy if one is configured, wrap the
        socket in TLS and record the verification results.
        """
        sock: socket.socket | ssl.SSLSocket
        sock = self._new_conn()
        self.sock = sock
        server_hostname: str = self.host
        tls_in_tls = False

        # Do we need to establish a tunnel?
        if self._tunnel_host is not None:
            tunnel_scheme = self._tunnel_scheme
            if tunnel_scheme == "https":
                # The proxy itself is reached over TLS, so the TLS session set
                # up below is layered on top of it (TLS-in-TLS).
                # _connect_tls_proxy will verify and assign proxy_is_verified
                sock = self._connect_tls_proxy(self.host, sock)
                self.sock = sock
                tls_in_tls = True
            elif tunnel_scheme == "http":
                self.proxy_is_verified = False

            # If we're tunneling it means we're connected to our proxy.
            self._has_connected_to_proxy = True

            self._tunnel()  # type: ignore[attr-defined]
            # Override the host with the one we're requesting data from.
            server_hostname = self._tunnel_host

        # An explicitly configured server_hostname takes precedence.
        if self.server_hostname is not None:
            server_hostname = self.server_hostname

        # Warn when the system date is earlier than RECENT_DATE.
        if datetime.date.today() < RECENT_DATE:
            warnings.warn(
                (
                    f"System time is way off (before {RECENT_DATE}). This will probably "
                    "lead to SSL verification errors"
                ),
                SystemTimeWarning,
            )

        sock_and_verified = _ssl_wrap_socket_and_match_hostname(
            sock=sock,
            cert_reqs=self.cert_reqs,
            ssl_version=self.ssl_version,
            ssl_minimum_version=self.ssl_minimum_version,
            ssl_maximum_version=self.ssl_maximum_version,
            ca_certs=self.ca_certs,
            ca_cert_dir=self.ca_cert_dir,
            ca_cert_data=self.ca_cert_data,
            cert_file=self.cert_file,
            key_file=self.key_file,
            key_password=self.key_password,
            # Remove trailing '.' from fqdn hostnames to allow certificate validation
            server_hostname=server_hostname.rstrip("."),
            ssl_context=self.ssl_context,
            tls_in_tls=tls_in_tls,
            assert_hostname=self.assert_hostname,
            assert_fingerprint=self.assert_fingerprint,
        )
        self.sock = sock_and_verified.socket

        # Forwarding proxies can never have a verified target since
        # the proxy is the one doing the verification. Should instead
        # use a CONNECT tunnel in order to verify the target.
        # See: https://github.com/urllib3/urllib3/issues/3267.
        self.is_verified = (
            False if self.proxy_is_forwarding else sock_and_verified.is_verified
        )

        # If there's a proxy to be connected to we are fully connected.
        # The flag is assigned a second time here because forwarding proxies
        # do not go through the tunnelling branch above.
        self._has_connected_to_proxy = bool(self.proxy)

        # Set `self.proxy_is_verified` unless it's already set while
        # establishing a tunnel.
        if self._has_connected_to_proxy and self.proxy_is_verified is None:
            self.proxy_is_verified = sock_and_verified.is_verified

    def _connect_tls_proxy(self, hostname: str, sock: socket.socket) -> ssl.SSLSocket:
        """
        Establish a TLS connection to the proxy using the SSL context from the
        proxy configuration, record whether the proxy was verified, and return
        the wrapped socket.
        """
        # `_connect_tls_proxy` is called when self._tunnel_host is truthy.
        proxy_config = typing.cast(ProxyConfig, self.proxy_config)
        wrapped = _ssl_wrap_socket_and_match_hostname(
            sock,
            cert_reqs=self.cert_reqs,
            ssl_version=self.ssl_version,
            ssl_minimum_version=self.ssl_minimum_version,
            ssl_maximum_version=self.ssl_maximum_version,
            ca_certs=self.ca_certs,
            ca_cert_dir=self.ca_cert_dir,
            ca_cert_data=self.ca_cert_data,
            server_hostname=hostname,
            ssl_context=proxy_config.ssl_context,
            assert_hostname=proxy_config.assert_hostname,
            assert_fingerprint=proxy_config.assert_fingerprint,
            # Features that aren't implemented for proxies yet:
            cert_file=None,
            key_file=None,
            key_password=None,
            tls_in_tls=False,
        )
        self.proxy_is_verified = wrapped.is_verified
        return wrapped.socket  # type: ignore[return-value]
jpayne@7 720
jpayne@7 721
class _WrappedAndVerifiedSocket(typing.NamedTuple):
    """
    Pairs a TLS-wrapped socket with a flag saying whether the connection
    is verified after the TLS handshake.
    """

    # The wrapped socket.
    socket: ssl.SSLSocket | SSLTransport
    # Whether the connection is verified.
    is_verified: bool
jpayne@7 730
jpayne@7 731
def _ssl_wrap_socket_and_match_hostname(
    sock: socket.socket,
    *,
    cert_reqs: None | str | int,
    ssl_version: None | str | int,
    ssl_minimum_version: int | None,
    ssl_maximum_version: int | None,
    cert_file: str | None,
    key_file: str | None,
    key_password: str | None,
    ca_certs: str | None,
    ca_cert_dir: str | None,
    ca_cert_data: None | str | bytes,
    assert_hostname: None | str | Literal[False],
    assert_fingerprint: str | None,
    server_hostname: str | None,
    ssl_context: ssl.SSLContext | None,
    tls_in_tls: bool = False,
) -> _WrappedAndVerifiedSocket:
    """Build (or reuse) an SSLContext from the TLS parameters, wrap ``sock``
    with it through ssl_wrap_socket, then verify the peer certificate either
    by fingerprint or by hostname. Proxies and targets both go through this
    single function so that they behave the same when connecting via TLS.

    Returns the wrapped socket together with a flag that is true when the
    context's verify mode is ``CERT_REQUIRED`` or a fingerprint was asserted.
    If anything raises after the socket has been wrapped, the wrapped socket
    is closed and the exception is re-raised.
    """
    # Use the caller's context when one is supplied, otherwise construct one.
    if ssl_context is not None:
        default_ssl_context = False
        context = ssl_context
    else:
        default_ssl_context = True
        context = create_urllib3_context(
            ssl_version=resolve_ssl_version(ssl_version),
            ssl_minimum_version=ssl_minimum_version,
            ssl_maximum_version=ssl_maximum_version,
            cert_reqs=resolve_cert_reqs(cert_reqs),
        )

    context.verify_mode = resolve_cert_reqs(cert_reqs)

    # In some cases, we want to verify hostnames ourselves:
    #  * `ssl` can't verify fingerprints or alternate hostnames;
    #  * assert_hostname can be set to False to disable hostname checking;
    #  * we still support OpenSSL 1.0.2, which prevents us from verifying
    #    hostnames easily: https://github.com/pyca/pyopenssl/pull/933
    verify_hostname_ourselves = (
        assert_fingerprint
        or assert_hostname
        or assert_hostname is False
        or ssl_.IS_PYOPENSSL
        or not ssl_.HAS_NEVER_CHECK_COMMON_NAME
    )
    if verify_hostname_ourselves:
        context.check_hostname = False

    # Try to load OS default certs if none are given. The hasattr() check is
    # needed for custom pyOpenSSL SSLContext objects because they don't
    # support load_default_certs().
    ca_material_given = ca_certs or ca_cert_dir or ca_cert_data
    if (
        not ca_material_given
        and default_ssl_context
        and hasattr(context, "load_default_certs")
    ):
        context.load_default_certs()

    # Ensure that IPv6 addresses are in the proper format and don't have a
    # scope ID. Python's SSL module fails to recognize scoped IPv6 addresses
    # and interprets them as DNS hostnames.
    if server_hostname is not None:
        candidate = server_hostname.strip("[]")
        before_scope, percent, _ = candidate.rpartition("%")
        if percent:
            # Drop everything from the last "%" onwards.
            candidate = before_scope
        if is_ipaddress(candidate):
            server_hostname = candidate

    ssl_sock = ssl_wrap_socket(
        sock=sock,
        keyfile=key_file,
        certfile=cert_file,
        key_password=key_password,
        ca_certs=ca_certs,
        ca_cert_dir=ca_cert_dir,
        ca_cert_data=ca_cert_data,
        server_hostname=server_hostname,
        ssl_context=context,
        tls_in_tls=tls_in_tls,
    )

    try:
        if assert_fingerprint:
            peer_cert_der = ssl_sock.getpeercert(binary_form=True)
            _assert_fingerprint(peer_cert_der, assert_fingerprint)
        elif (
            context.verify_mode != ssl.CERT_NONE
            and not context.check_hostname
            and assert_hostname is not False
        ):
            cert: _TYPE_PEER_CERT_RET_DICT = ssl_sock.getpeercert()  # type: ignore[assignment]

            # Need to signal to our match_hostname whether to use 'commonName' or not.
            # If we're using our own constructed SSLContext we explicitly set 'False'
            # because PyPy hard-codes 'True' from SSLContext.hostname_checks_common_name.
            hostname_checks_common_name = (not default_ssl_context) and (
                getattr(context, "hostname_checks_common_name", False) or False
            )

            _match_hostname(
                cert,
                assert_hostname or server_hostname,  # type: ignore[arg-type]
                hostname_checks_common_name,
            )

        verified = context.verify_mode == ssl.CERT_REQUIRED or bool(
            assert_fingerprint
        )
        return _WrappedAndVerifiedSocket(socket=ssl_sock, is_verified=verified)
    except BaseException:
        # Don't leave the wrapped socket open when verification fails.
        ssl_sock.close()
        raise
jpayne@7 855
jpayne@7 856
def _match_hostname(
    cert: _TYPE_PEER_CERT_RET_DICT | None,
    asserted_hostname: str,
    hostname_checks_common_name: bool = False,
) -> None:
    """Check ``cert`` against ``asserted_hostname`` via ``match_hostname``.

    Square brackets around the hostname are stripped, but only when the
    stripped value is an IP address. On a ``CertificateError`` a warning is
    logged, the certificate is attached to the exception as ``_peer_cert``
    and the exception is re-raised.
    """
    # Our upstream implementation of ssl.match_hostname()
    # only applies this normalization to IP addresses so it doesn't
    # match DNS SANs so we do the same thing!
    unbracketed = asserted_hostname.strip("[]")
    hostname = unbracketed if is_ipaddress(unbracketed) else asserted_hostname

    try:
        match_hostname(cert, hostname, hostname_checks_common_name)
    except CertificateError as exc:
        log.warning(
            "Certificate did not match expected hostname: %s. Certificate: %s",
            hostname,
            cert,
        )
        # Add cert to exception and reraise so client code can inspect
        # the cert when catching the exception, if they want to
        exc._peer_cert = cert  # type: ignore[attr-defined]
        raise
jpayne@7 881
jpayne@7 882
def _wrap_proxy_error(err: Exception, proxy_scheme: str | None) -> ProxyError:
    """Wrap ``err`` in a ``ProxyError`` whose ``__cause__`` is ``err``.

    When the error text contains one of a few known phrases and
    ``proxy_scheme`` is ``'https'``, the message additionally tells the user
    that the proxy appears to be HTTP-only.
    """
    # Look for the phrase 'wrong version number' (or a similar one); if found
    # then we should warn the user that we're very sure that this proxy is
    # HTTP-only and they have a configuration issue.
    # Every character outside a-z becomes a space before searching.
    error_normalized = " ".join(re.split("[^a-z]", str(err).lower()))
    http_only_markers = (
        "wrong version number",
        "unknown protocol",
        "record layer failure",
    )
    is_likely_http_proxy = any(
        marker in error_normalized for marker in http_only_markers
    )

    message = "Unable to connect to proxy"
    if is_likely_http_proxy and proxy_scheme == "https":
        message += (
            ". Your proxy appears to only use HTTP and not HTTPS, "
            "try changing your proxy URL to be HTTP. See: "
            "https://urllib3.readthedocs.io/en/latest/advanced-usage.html"
            "#https-proxy-error-http-proxy"
        )

    new_err = ProxyError(message, err)
    new_err.__cause__ = err
    return new_err
jpayne@7 906
jpayne@7 907
def _get_default_user_agent() -> str:
    """Return the user-agent string: ``python-urllib3/`` followed by the
    package ``__version__``."""
    user_agent = f"python-urllib3/{__version__}"
    return user_agent
jpayne@7 910
jpayne@7 911
class DummyConnection:
    """Used to detect a failed ConnectionCls import.

    The class deliberately has no members of its own; it only serves as a
    recognizable placeholder object.
    """
jpayne@7 914
jpayne@7 915
# When the ``ssl`` module could not be imported (``ssl`` is falsy), rebind
# the HTTPSConnection name to the DummyConnection placeholder.
if not ssl:
    HTTPSConnection = DummyConnection  # type: ignore[misc, assignment] # noqa: F811


# Second module-level name bound to whatever HTTPSConnection currently is.
VerifiedHTTPSConnection = HTTPSConnection
jpayne@7 921
jpayne@7 922
def _url_from_connection(
    conn: HTTPConnection | HTTPSConnection, path: str | None = None
) -> str:
    """Returns the URL from a given connection. This is mainly used for testing and logging.

    The scheme is ``https`` when ``conn`` is an ``HTTPSConnection`` instance
    and ``http`` otherwise; host and port are read from ``conn``.
    """
    if isinstance(conn, HTTPSConnection):
        scheme = "https"
    else:
        scheme = "http"

    connection_url = Url(scheme=scheme, host=conn.host, port=conn.port, path=path)
    return connection_url.url