annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/urllib3/connection.py @ 69:33d812a61356

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 17:55:14 -0400
parents
children
rev   line source
jpayne@69 1 from __future__ import annotations
jpayne@69 2
jpayne@69 3 import datetime
jpayne@69 4 import http.client
jpayne@69 5 import logging
jpayne@69 6 import os
jpayne@69 7 import re
jpayne@69 8 import socket
jpayne@69 9 import sys
jpayne@69 10 import threading
jpayne@69 11 import typing
jpayne@69 12 import warnings
jpayne@69 13 from http.client import HTTPConnection as _HTTPConnection
jpayne@69 14 from http.client import HTTPException as HTTPException # noqa: F401
jpayne@69 15 from http.client import ResponseNotReady
jpayne@69 16 from socket import timeout as SocketTimeout
jpayne@69 17
jpayne@69 18 if typing.TYPE_CHECKING:
jpayne@69 19 from .response import HTTPResponse
jpayne@69 20 from .util.ssl_ import _TYPE_PEER_CERT_RET_DICT
jpayne@69 21 from .util.ssltransport import SSLTransport
jpayne@69 22
jpayne@69 23 from ._collections import HTTPHeaderDict
jpayne@69 24 from .http2 import probe as http2_probe
jpayne@69 25 from .util.response import assert_header_parsing
jpayne@69 26 from .util.timeout import _DEFAULT_TIMEOUT, _TYPE_TIMEOUT, Timeout
jpayne@69 27 from .util.util import to_str
jpayne@69 28 from .util.wait import wait_for_read
jpayne@69 29
jpayne@69 30 try: # Compiled with SSL?
jpayne@69 31 import ssl
jpayne@69 32
jpayne@69 33 BaseSSLError = ssl.SSLError
jpayne@69 34 except (ImportError, AttributeError):
jpayne@69 35 ssl = None # type: ignore[assignment]
jpayne@69 36
jpayne@69 37 class BaseSSLError(BaseException): # type: ignore[no-redef]
jpayne@69 38 pass
jpayne@69 39
jpayne@69 40
jpayne@69 41 from ._base_connection import _TYPE_BODY
jpayne@69 42 from ._base_connection import ProxyConfig as ProxyConfig
jpayne@69 43 from ._base_connection import _ResponseOptions as _ResponseOptions
jpayne@69 44 from ._version import __version__
jpayne@69 45 from .exceptions import (
jpayne@69 46 ConnectTimeoutError,
jpayne@69 47 HeaderParsingError,
jpayne@69 48 NameResolutionError,
jpayne@69 49 NewConnectionError,
jpayne@69 50 ProxyError,
jpayne@69 51 SystemTimeWarning,
jpayne@69 52 )
jpayne@69 53 from .util import SKIP_HEADER, SKIPPABLE_HEADERS, connection, ssl_
jpayne@69 54 from .util.request import body_to_chunks
jpayne@69 55 from .util.ssl_ import assert_fingerprint as _assert_fingerprint
jpayne@69 56 from .util.ssl_ import (
jpayne@69 57 create_urllib3_context,
jpayne@69 58 is_ipaddress,
jpayne@69 59 resolve_cert_reqs,
jpayne@69 60 resolve_ssl_version,
jpayne@69 61 ssl_wrap_socket,
jpayne@69 62 )
jpayne@69 63 from .util.ssl_match_hostname import CertificateError, match_hostname
jpayne@69 64 from .util.url import Url
jpayne@69 65
# Not a no-op: re-binding the builtins puts these names into this module's
# namespace so they can be imported from here.
ConnectionError = ConnectionError
BrokenPipeError = BrokenPipeError


# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Port number keyed by URL scheme; supplies the ``default_port`` of the
# connection classes defined below.
port_by_scheme = {"http": 80, "https": 443}

# When it comes time to update this value as a part of regular maintenance
# (i.e. test_recent_date is failing) update it to ~6 months before the current date.
RECENT_DATE = datetime.date(2023, 6, 1)

# Matches a single character that is NOT one of ``-!#$%&'*+.^_`|~0-9a-zA-Z``.
# Despite the name it flags every such character, not only control characters;
# HTTPConnection.putrequest() uses it to reject invalid request methods.
_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]")

# True when the running interpreter exposes ``sys.audit``.
_HAS_SYS_AUDIT = hasattr(sys, "audit")
jpayne@69 82
jpayne@69 83
class HTTPConnection(_HTTPConnection):
    """
    A :class:`http.client.HTTPConnection` subclass that adds an extra constructor
    backwards-compatibility layer between older and newer Pythons.

    Extra keyword parameters configure attributes of the connection.
    Accepted parameters include:

    - ``source_address``: Set the source address for the current connection.
    - ``socket_options``: Set specific options on the underlying socket. If not specified, then
      defaults are loaded from ``HTTPConnection.default_socket_options`` which includes disabling
      Nagle's algorithm (sets TCP_NODELAY to 1) unless the connection is behind a proxy.

      For example, if you wish to enable TCP Keep Alive in addition to the defaults,
      you might pass:

      .. code-block:: python

         HTTPConnection.default_socket_options + [
             (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
         ]

      Or you may want to disable the defaults by passing an empty list (e.g., ``[]``).
    """

    default_port: typing.ClassVar[int] = port_by_scheme["http"]  # type: ignore[misc]

    #: Disable Nagle's algorithm by default.
    #: ``[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]``
    default_socket_options: typing.ClassVar[connection._TYPE_SOCKET_OPTIONS] = [
        (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    ]

    #: Whether this connection verifies the host's certificate.
    is_verified: bool = False

    #: Whether this proxy connection verified the proxy host's certificate.
    # If no proxy is currently connected to the value will be ``None``.
    proxy_is_verified: bool | None = None

    blocksize: int
    source_address: tuple[str, int] | None
    socket_options: connection._TYPE_SOCKET_OPTIONS | None

    _has_connected_to_proxy: bool
    _response_options: _ResponseOptions | None
    _tunnel_host: str | None
    _tunnel_port: int | None
    _tunnel_scheme: str | None

    def __init__(
        self,
        host: str,
        port: int | None = None,
        *,
        timeout: _TYPE_TIMEOUT = _DEFAULT_TIMEOUT,
        source_address: tuple[str, int] | None = None,
        blocksize: int = 16384,
        socket_options: None
        | (connection._TYPE_SOCKET_OPTIONS) = default_socket_options,
        proxy: Url | None = None,
        proxy_config: ProxyConfig | None = None,
    ) -> None:
        """Initialise the base connection and urllib3's extra per-connection state.

        The timeout is passed through ``Timeout.resolve_default_timeout`` before
        being handed to :class:`http.client.HTTPConnection`.
        """
        resolved_timeout = Timeout.resolve_default_timeout(timeout)
        super().__init__(
            host=host,
            port=port,
            timeout=resolved_timeout,
            source_address=source_address,
            blocksize=blocksize,
        )

        # Proxy and socket configuration supplied by the caller.
        self.proxy = proxy
        self.proxy_config = proxy_config
        self.socket_options = socket_options

        # Per-connection state; close() resets these same attributes.
        self._response_options = None
        self._has_connected_to_proxy = False
        self._tunnel_host = None
        self._tunnel_port = None
        self._tunnel_scheme = None

    @property
    def host(self) -> str:
        """
        Return the host name with any trailing dots (the FQDN marker) removed.

        SSL certificates generally don't include the trailing dot of a
        fully-qualified domain name, so validation fails when checked against a
        name that has it. Some servers also don't expect to receive the dot.

        The trailing dot still matters for DNS resolution: a lookup with the dot
        resolves only the exact FQDN, whereas a lookup without it searches the
        system's search domain list. The original value is therefore kept in
        ``_dns_host`` and used only where that is appropriate (i.e., for the DNS
        lookup made while establishing the TCP connection that carries the HTTP
        requests).
        """
        dns_name = self._dns_host
        return dns_name.rstrip(".")

    @host.setter
    def host(self, value: str) -> None:
        """
        Store the value, unmodified, as ``_dns_host``.

        We assume that only urllib3 uses the _dns_host attribute; httplib itself
        only uses `host`, and it seems reasonable that other libraries follow suit.
        """
        self._dns_host = value

    def _new_conn(self) -> socket.socket:
        """Open a socket to ``(self._dns_host, self.port)`` using the configured
        timeout, source address and socket options.

        :return: New socket connection.
        :raises NameResolutionError: if the lookup raises ``socket.gaierror``.
        :raises ConnectTimeoutError: if connecting times out.
        :raises NewConnectionError: for any other ``OSError``.
        """
        address = (self._dns_host, self.port)
        try:
            new_sock = connection.create_connection(
                address,
                self.timeout,
                source_address=self.source_address,
                socket_options=self.socket_options,
            )
        # Handler order matters: the first two exceptions are OSError subclasses.
        except socket.gaierror as exc:
            raise NameResolutionError(self.host, self, exc) from exc
        except SocketTimeout as exc:
            raise ConnectTimeoutError(
                self,
                f"Connection to {self.host} timed out. (connect timeout={self.timeout})",
            ) from exc
        except OSError as exc:
            raise NewConnectionError(
                self, f"Failed to establish a new connection: {exc}"
            ) from exc

        # Audit hooks are only available in Python 3.8+
        if _HAS_SYS_AUDIT:
            sys.audit("http.client.connect", self, self.host, self.port)

        return new_sock

    def set_tunnel(
        self,
        host: str,
        port: int | None = None,
        headers: typing.Mapping[str, str] | None = None,
        scheme: str = "http",
    ) -> None:
        """Configure tunnelling via the base class and remember the proxy scheme.

        :raises ValueError: if ``scheme`` is neither ``"http"`` nor ``"https"``.
        """
        allowed_schemes = ("http", "https")
        if scheme not in allowed_schemes:
            raise ValueError(
                f"Invalid proxy scheme for tunneling: {scheme!r}, must be either 'http' or 'https'"
            )
        super().set_tunnel(host, port=port, headers=headers)
        self._tunnel_scheme = scheme

    if sys.version_info < (3, 11, 4):

        def _tunnel(self) -> None:
            """Send a CONNECT request on the current socket and consume the reply headers."""
            max_line = http.client._MAXLINE  # type: ignore[attr-defined]
            request_line = b"CONNECT %s:%d HTTP/1.0\r\n" % (  # type: ignore[str-format]
                self._tunnel_host.encode("ascii"),  # type: ignore[union-attr]
                self._tunnel_port,
            )
            header_lines = [
                f"{name}: {value}\r\n".encode("latin-1")
                for name, value in self._tunnel_headers.items()  # type: ignore[attr-defined]
            ]
            # Making a single send() call instead of one per line encourages
            # the host OS to use a more optimal packet size instead of
            # potentially emitting a series of small packets.
            self.send(b"".join([request_line, *header_lines, b"\r\n"]))

            response = self.response_class(self.sock, method=self._method)  # type: ignore[attr-defined]
            try:
                _version, status, reason = response._read_status()  # type: ignore[attr-defined]

                if status != http.HTTPStatus.OK:
                    self.close()
                    raise OSError(f"Tunnel connection failed: {status} {reason.strip()}")

                while True:
                    line = response.fp.readline(max_line + 1)
                    if len(line) > max_line:
                        raise http.client.LineTooLong("header line")
                    # Stop on EOF (for sites which EOF without sending a
                    # trailer) or on the blank line that ends the headers.
                    if not line or line in (b"\r\n", b"\n"):
                        break

                    if self.debuglevel > 0:
                        print("header:", line.decode())
            finally:
                response.close()

    def connect(self) -> None:
        """Open the socket and, when a tunnel host is set, perform the tunnel handshake."""
        self.sock = self._new_conn()
        if self._tunnel_host:
            # If we're tunneling it means we're connected to our proxy.
            self._has_connected_to_proxy = True

            # TODO: Fix tunnel so it doesn't depend on self.sock state.
            self._tunnel()

        # If there's a proxy to be connected to we are fully connected.
        # This is set twice (once above and here) due to forwarding proxies
        # not using tunnelling.
        connected_via_proxy = bool(self.proxy)
        self._has_connected_to_proxy = connected_via_proxy

        if connected_via_proxy:
            self.proxy_is_verified = False

    @property
    def is_closed(self) -> bool:
        """True when the connection currently has no socket."""
        return self.sock is None

    @property
    def is_connected(self) -> bool:
        """True when a socket exists and ``wait_for_read`` (zero timeout) reports it as not readable."""
        return self.sock is not None and not wait_for_read(self.sock, timeout=0.0)

    @property
    def has_connected_to_proxy(self) -> bool:
        """Read-only view of the ``_has_connected_to_proxy`` flag."""
        return self._has_connected_to_proxy

    @property
    def proxy_is_forwarding(self) -> bool:
        """
        Return True if a forwarding proxy is configured, else return False
        """
        if self._tunnel_host is not None:
            return False
        return bool(self.proxy)

    def close(self) -> None:
        """Close the connection and clear all per-connection state, even if closing raises."""
        try:
            super().close()
        finally:
            # Reset all stateful properties so connection
            # can be re-used without leaking prior configs.
            self.sock = None
            self._response_options = None
            self.is_verified = False
            self.proxy_is_verified = None
            self._has_connected_to_proxy = False
            self._tunnel_host = None
            self._tunnel_port = None
            self._tunnel_scheme = None

    def putrequest(
        self,
        method: str,
        url: str,
        skip_host: bool = False,
        skip_accept_encoding: bool = False,
    ) -> None:
        """"""
        # Empty docstring because the indentation of CPython's implementation
        # is broken but we don't want this method in our documentation.
        offending = _CONTAINS_CONTROL_CHAR_RE.search(method)
        if offending is not None:
            raise ValueError(
                f"Method cannot contain non-token characters {method!r} (found at least {offending.group()!r})"
            )

        return super().putrequest(
            method, url, skip_host=skip_host, skip_accept_encoding=skip_accept_encoding
        )

    def putheader(self, header: str, *values: str) -> None:  # type: ignore[override]
        """"""
        skip_requested = any(
            isinstance(value, str) and value == SKIP_HEADER for value in values
        )
        if not skip_requested:
            super().putheader(header, *values)
            return

        # SKIP_HEADER was given: silently omit the header if it is skippable.
        if to_str(header.lower()) in SKIPPABLE_HEADERS:
            return

        skippable_headers = "', '".join(
            [str.title(name) for name in sorted(SKIPPABLE_HEADERS)]
        )
        raise ValueError(
            f"urllib3.util.SKIP_HEADER only supports '{skippable_headers}'"
        )

    # `request` method's signature intentionally violates LSP.
    # urllib3's API is different from `http.client.HTTPConnection` and the subclassing is only incidental.
    def request(  # type: ignore[override]
        self,
        method: str,
        url: str,
        body: _TYPE_BODY | None = None,
        headers: typing.Mapping[str, str] | None = None,
        *,
        chunked: bool = False,
        preload_content: bool = True,
        decode_content: bool = True,
        enforce_content_length: bool = True,
    ) -> None:
        """Send the request line, headers and body for one HTTP request.

        A framing header (``Transfer-Encoding: chunked`` or ``Content-Length``)
        is added when the caller did not supply one, and a default
        ``User-Agent`` is added when none is given. The response-related
        keyword arguments are stored and used later by :meth:`getresponse`.
        """
        # Update the inner socket's timeout value to send the request.
        # This only triggers if the connection is re-used.
        if self.sock is not None:
            self.sock.settimeout(self.timeout)

        # Store these values to be fed into the HTTPResponse
        # object later. TODO: Remove this in favor of a real
        # HTTP lifecycle mechanism.

        # This must happen before anything is sent because sometimes we can
        # still salvage a response off the wire even if we aren't able to
        # completely send the request body.
        self._response_options = _ResponseOptions(
            request_method=method,
            request_url=url,
            preload_content=preload_content,
            decode_content=decode_content,
            enforce_content_length=enforce_content_length,
        )

        request_headers = {} if headers is None else headers
        header_keys = frozenset(to_str(key.lower()) for key in request_headers)
        self.putrequest(
            method,
            url,
            skip_accept_encoding="accept-encoding" in header_keys,
            skip_host="host" in header_keys,
        )

        # Transform the body into an iterable of sendall()-able chunks
        # and detect if an explicit Content-Length is doable.
        chunks_and_cl = body_to_chunks(body, method=method, blocksize=self.blocksize)
        chunks = chunks_and_cl.chunks
        content_length = chunks_and_cl.content_length

        if chunked:
            # When chunked is explicitly set to 'True' we respect that.
            if "transfer-encoding" not in header_keys:
                self.putheader("Transfer-Encoding", "chunked")
        elif "content-length" in header_keys:
            # A framing mechanism is already in use; respect it.
            chunked = False
        elif "transfer-encoding" in header_keys:
            chunked = True
        elif content_length is not None:
            # Otherwise we go off the recommendation of 'body_to_chunks()'.
            self.putheader("Content-Length", str(content_length))
        elif chunks is not None:
            chunked = True
            self.putheader("Transfer-Encoding", "chunked")

        # Now that framing headers are out of the way we send all the other headers.
        if "user-agent" not in header_keys:
            self.putheader("User-Agent", _get_default_user_agent())
        for name, value in request_headers.items():
            self.putheader(name, value)
        self.endheaders()

        # If we're given a body we start sending that in chunks.
        if chunks is not None:
            for piece in chunks:
                # Sending empty chunks isn't allowed for TE: chunked
                # as it indicates the end of the body.
                if not piece:
                    continue
                payload = piece.encode("utf-8") if isinstance(piece, str) else piece
                if chunked:
                    payload = b"%x\r\n%b\r\n" % (len(payload), payload)
                self.send(payload)

        # Regardless of whether we have a body or not, if we're in
        # chunked mode we want to send an explicit empty chunk.
        if chunked:
            self.send(b"0\r\n\r\n")

    def request_chunked(
        self,
        method: str,
        url: str,
        body: _TYPE_BODY | None = None,
        headers: typing.Mapping[str, str] | None = None,
    ) -> None:
        """
        Deprecated alternative to :meth:`request` that sends the body with
        chunked encoding rather than as one block. Emits a
        ``DeprecationWarning`` and delegates to ``request(..., chunked=True)``.
        """
        warnings.warn(
            "HTTPConnection.request_chunked() is deprecated and will be removed "
            "in urllib3 v2.1.0. Instead use HTTPConnection.request(..., chunked=True).",
            category=DeprecationWarning,
            stacklevel=2,
        )
        self.request(method, url, body=body, headers=headers, chunked=True)

    def getresponse(  # type: ignore[override]
        self,
    ) -> HTTPResponse:
        """
        Get the response from the server.

        If the HTTPConnection is in the correct state, returns an instance of
        HTTPResponse or of whatever object is returned by the response_class
        variable.

        If a request has not been sent or if a previous response has not been
        handled, ResponseNotReady is raised. If the HTTP response indicates that
        the connection should be closed, then it will be closed before the
        response is returned. When the connection is closed, the underlying
        socket is closed.
        """
        # Raise the same error as http.client.HTTPConnection
        pending_options = self._response_options
        if pending_options is None:
            raise ResponseNotReady()

        # Reset this attribute for being used again.
        self._response_options = None

        # Since the connection's timeout value may have been updated
        # we need to set the timeout on the socket.
        self.sock.settimeout(self.timeout)

        # This is needed here to avoid circular import errors
        from .response import HTTPResponse

        # Get the response from http.client.HTTPConnection
        raw_response = super().getresponse()

        # Header parsing problems are logged, not raised.
        try:
            assert_header_parsing(raw_response.msg)
        except (HeaderParsingError, TypeError) as parse_error:
            log.warning(
                "Failed to parse headers (url=%s): %s",
                _url_from_connection(self, pending_options.request_url),
                parse_error,
                exc_info=True,
            )

        return HTTPResponse(
            body=raw_response,
            headers=HTTPHeaderDict(raw_response.msg.items()),
            status=raw_response.status,
            version=raw_response.version,
            version_string=getattr(self, "_http_vsn_str", "HTTP/?"),
            reason=raw_response.reason,
            preload_content=pending_options.preload_content,
            decode_content=pending_options.decode_content,
            original_response=raw_response,
            enforce_content_length=pending_options.enforce_content_length,
            request_method=pending_options.request_method,
            request_url=pending_options.request_url,
        )
jpayne@69 536
jpayne@69 537
jpayne@69 538 class HTTPSConnection(HTTPConnection):
jpayne@69 539 """
jpayne@69 540 Many of the parameters to this constructor are passed to the underlying SSL
jpayne@69 541 socket by means of :py:func:`urllib3.util.ssl_wrap_socket`.
jpayne@69 542 """
jpayne@69 543
jpayne@69 544 default_port = port_by_scheme["https"] # type: ignore[misc]
jpayne@69 545
jpayne@69 546 cert_reqs: int | str | None = None
jpayne@69 547 ca_certs: str | None = None
jpayne@69 548 ca_cert_dir: str | None = None
jpayne@69 549 ca_cert_data: None | str | bytes = None
jpayne@69 550 ssl_version: int | str | None = None
jpayne@69 551 ssl_minimum_version: int | None = None
jpayne@69 552 ssl_maximum_version: int | None = None
jpayne@69 553 assert_fingerprint: str | None = None
jpayne@69 554 _connect_callback: typing.Callable[..., None] | None = None
jpayne@69 555
def __init__(
    self,
    host: str,
    port: int | None = None,
    *,
    timeout: _TYPE_TIMEOUT = _DEFAULT_TIMEOUT,
    source_address: tuple[str, int] | None = None,
    blocksize: int = 16384,
    socket_options: None
    | (connection._TYPE_SOCKET_OPTIONS) = HTTPConnection.default_socket_options,
    proxy: Url | None = None,
    proxy_config: ProxyConfig | None = None,
    cert_reqs: int | str | None = None,
    assert_hostname: None | str | typing.Literal[False] = None,
    assert_fingerprint: str | None = None,
    server_hostname: str | None = None,
    ssl_context: ssl.SSLContext | None = None,
    ca_certs: str | None = None,
    ca_cert_dir: str | None = None,
    ca_cert_data: None | str | bytes = None,
    ssl_minimum_version: int | None = None,
    ssl_maximum_version: int | None = None,
    ssl_version: int | str | None = None,  # Deprecated
    cert_file: str | None = None,
    key_file: str | None = None,
    key_password: str | None = None,
) -> None:
    """Initialise the plain connection, then store the TLS settings on the instance.

    ``ca_certs`` and ``ca_cert_dir`` have ``~`` expanded when they are set.
    When ``cert_reqs`` is ``None`` it defaults to the ``verify_mode`` of
    ``ssl_context`` if one was given, otherwise to ``resolve_cert_reqs(None)``.
    """
    super().__init__(
        host,
        port=port,
        timeout=timeout,
        source_address=source_address,
        blocksize=blocksize,
        socket_options=socket_options,
        proxy=proxy,
        proxy_config=proxy_config,
    )

    # Client certificate material.
    self.cert_file = cert_file
    self.key_file = key_file
    self.key_password = key_password

    # TLS context and protocol version limits.
    self.ssl_context = ssl_context
    self.ssl_version = ssl_version
    self.ssl_minimum_version = ssl_minimum_version
    self.ssl_maximum_version = ssl_maximum_version

    # Peer identity checks.
    self.server_hostname = server_hostname
    self.assert_hostname = assert_hostname
    self.assert_fingerprint = assert_fingerprint

    # Trust store locations; a falsy value is stored unchanged.
    self.ca_certs = os.path.expanduser(ca_certs) if ca_certs else ca_certs
    self.ca_cert_dir = os.path.expanduser(ca_cert_dir) if ca_cert_dir else ca_cert_dir
    self.ca_cert_data = ca_cert_data

    # cert_reqs depends on ssl_context so calculate last.
    if cert_reqs is not None:
        self.cert_reqs = cert_reqs
    elif self.ssl_context is not None:
        self.cert_reqs = self.ssl_context.verify_mode
    else:
        self.cert_reqs = resolve_cert_reqs(None)

    self._connect_callback = None
jpayne@69 616
def set_cert(
    self,
    key_file: str | None = None,
    cert_file: str | None = None,
    cert_reqs: int | str | None = None,
    key_password: str | None = None,
    ca_certs: str | None = None,
    assert_hostname: None | str | typing.Literal[False] = None,
    assert_fingerprint: str | None = None,
    ca_cert_dir: str | None = None,
    ca_cert_data: None | str | bytes = None,
) -> None:
    """
    Deprecated: overwrite the certificate-related attributes of the connection.

    This method should only be called once, before the connection is used.
    It emits a ``DeprecationWarning`` on every call.
    """
    warnings.warn(
        "HTTPSConnection.set_cert() is deprecated and will be removed "
        "in urllib3 v2.1.0. Instead provide the parameters to the "
        "HTTPSConnection constructor.",
        category=DeprecationWarning,
        stacklevel=2,
    )

    # If cert_reqs is not provided we'll assume CERT_REQUIRED unless we also
    # have an SSLContext object in which case we'll use its verify_mode.
    if cert_reqs is not None:
        effective_cert_reqs = cert_reqs
    elif self.ssl_context is not None:
        effective_cert_reqs = self.ssl_context.verify_mode
    else:
        effective_cert_reqs = resolve_cert_reqs(None)

    # Client certificate material.
    self.key_file = key_file
    self.cert_file = cert_file
    self.cert_reqs = effective_cert_reqs
    self.key_password = key_password

    # Peer identity checks.
    self.assert_hostname = assert_hostname
    self.assert_fingerprint = assert_fingerprint

    # Trust store locations; a falsy value is stored unchanged.
    self.ca_certs = os.path.expanduser(ca_certs) if ca_certs else ca_certs
    self.ca_cert_dir = os.path.expanduser(ca_cert_dir) if ca_cert_dir else ca_cert_dir
    self.ca_cert_data = ca_cert_data
jpayne@69 657
jpayne@69 658 def connect(self) -> None:
jpayne@69 659 # Today we don't need to be doing this step before the /actual/ socket
jpayne@69 660 # connection, however in the future we'll need to decide whether to
jpayne@69 661 # create a new socket or re-use an existing "shared" socket as a part
jpayne@69 662 # of the HTTP/2 handshake dance.
jpayne@69 663 if self._tunnel_host is not None and self._tunnel_port is not None:
jpayne@69 664 probe_http2_host = self._tunnel_host
jpayne@69 665 probe_http2_port = self._tunnel_port
jpayne@69 666 else:
jpayne@69 667 probe_http2_host = self.host
jpayne@69 668 probe_http2_port = self.port
jpayne@69 669
jpayne@69 670 # Check if the target origin supports HTTP/2.
jpayne@69 671 # If the value comes back as 'None' it means that the current thread
jpayne@69 672 # is probing for HTTP/2 support. Otherwise, we're waiting for another
jpayne@69 673 # probe to complete, or we get a value right away.
jpayne@69 674 target_supports_http2: bool | None
jpayne@69 675 if "h2" in ssl_.ALPN_PROTOCOLS:
jpayne@69 676 target_supports_http2 = http2_probe.acquire_and_get(
jpayne@69 677 host=probe_http2_host, port=probe_http2_port
jpayne@69 678 )
jpayne@69 679 else:
jpayne@69 680 # If HTTP/2 isn't going to be offered it doesn't matter if
jpayne@69 681 # the target supports HTTP/2. Don't want to make a probe.
jpayne@69 682 target_supports_http2 = False
jpayne@69 683
jpayne@69 684 if self._connect_callback is not None:
jpayne@69 685 self._connect_callback(
jpayne@69 686 "before connect",
jpayne@69 687 thread_id=threading.get_ident(),
jpayne@69 688 target_supports_http2=target_supports_http2,
jpayne@69 689 )
jpayne@69 690
jpayne@69 691 try:
jpayne@69 692 sock: socket.socket | ssl.SSLSocket
jpayne@69 693 self.sock = sock = self._new_conn()
jpayne@69 694 server_hostname: str = self.host
jpayne@69 695 tls_in_tls = False
jpayne@69 696
jpayne@69 697 # Do we need to establish a tunnel?
jpayne@69 698 if self._tunnel_host is not None:
jpayne@69 699 # We're tunneling to an HTTPS origin so need to do TLS-in-TLS.
jpayne@69 700 if self._tunnel_scheme == "https":
jpayne@69 701 # _connect_tls_proxy will verify and assign proxy_is_verified
jpayne@69 702 self.sock = sock = self._connect_tls_proxy(self.host, sock)
jpayne@69 703 tls_in_tls = True
jpayne@69 704 elif self._tunnel_scheme == "http":
jpayne@69 705 self.proxy_is_verified = False
jpayne@69 706
jpayne@69 707 # If we're tunneling it means we're connected to our proxy.
jpayne@69 708 self._has_connected_to_proxy = True
jpayne@69 709
jpayne@69 710 self._tunnel()
jpayne@69 711 # Override the host with the one we're requesting data from.
jpayne@69 712 server_hostname = self._tunnel_host
jpayne@69 713
jpayne@69 714 if self.server_hostname is not None:
jpayne@69 715 server_hostname = self.server_hostname
jpayne@69 716
jpayne@69 717 is_time_off = datetime.date.today() < RECENT_DATE
jpayne@69 718 if is_time_off:
jpayne@69 719 warnings.warn(
jpayne@69 720 (
jpayne@69 721 f"System time is way off (before {RECENT_DATE}). This will probably "
jpayne@69 722 "lead to SSL verification errors"
jpayne@69 723 ),
jpayne@69 724 SystemTimeWarning,
jpayne@69 725 )
jpayne@69 726
jpayne@69 727 # Remove trailing '.' from fqdn hostnames to allow certificate validation
jpayne@69 728 server_hostname_rm_dot = server_hostname.rstrip(".")
jpayne@69 729
jpayne@69 730 sock_and_verified = _ssl_wrap_socket_and_match_hostname(
jpayne@69 731 sock=sock,
jpayne@69 732 cert_reqs=self.cert_reqs,
jpayne@69 733 ssl_version=self.ssl_version,
jpayne@69 734 ssl_minimum_version=self.ssl_minimum_version,
jpayne@69 735 ssl_maximum_version=self.ssl_maximum_version,
jpayne@69 736 ca_certs=self.ca_certs,
jpayne@69 737 ca_cert_dir=self.ca_cert_dir,
jpayne@69 738 ca_cert_data=self.ca_cert_data,
jpayne@69 739 cert_file=self.cert_file,
jpayne@69 740 key_file=self.key_file,
jpayne@69 741 key_password=self.key_password,
jpayne@69 742 server_hostname=server_hostname_rm_dot,
jpayne@69 743 ssl_context=self.ssl_context,
jpayne@69 744 tls_in_tls=tls_in_tls,
jpayne@69 745 assert_hostname=self.assert_hostname,
jpayne@69 746 assert_fingerprint=self.assert_fingerprint,
jpayne@69 747 )
jpayne@69 748 self.sock = sock_and_verified.socket
jpayne@69 749
jpayne@69 750 # If an error occurs during connection/handshake we may need to release
jpayne@69 751 # our lock so another connection can probe the origin.
jpayne@69 752 except BaseException:
jpayne@69 753 if self._connect_callback is not None:
jpayne@69 754 self._connect_callback(
jpayne@69 755 "after connect failure",
jpayne@69 756 thread_id=threading.get_ident(),
jpayne@69 757 target_supports_http2=target_supports_http2,
jpayne@69 758 )
jpayne@69 759
jpayne@69 760 if target_supports_http2 is None:
jpayne@69 761 http2_probe.set_and_release(
jpayne@69 762 host=probe_http2_host, port=probe_http2_port, supports_http2=None
jpayne@69 763 )
jpayne@69 764 raise
jpayne@69 765
jpayne@69 766 # If this connection doesn't know if the origin supports HTTP/2
jpayne@69 767 # we report back to the HTTP/2 probe our result.
jpayne@69 768 if target_supports_http2 is None:
jpayne@69 769 supports_http2 = sock_and_verified.socket.selected_alpn_protocol() == "h2"
jpayne@69 770 http2_probe.set_and_release(
jpayne@69 771 host=probe_http2_host,
jpayne@69 772 port=probe_http2_port,
jpayne@69 773 supports_http2=supports_http2,
jpayne@69 774 )
jpayne@69 775
jpayne@69 776 # Forwarding proxies can never have a verified target since
jpayne@69 777 # the proxy is the one doing the verification. Should instead
jpayne@69 778 # use a CONNECT tunnel in order to verify the target.
jpayne@69 779 # See: https://github.com/urllib3/urllib3/issues/3267.
jpayne@69 780 if self.proxy_is_forwarding:
jpayne@69 781 self.is_verified = False
jpayne@69 782 else:
jpayne@69 783 self.is_verified = sock_and_verified.is_verified
jpayne@69 784
jpayne@69 785 # If there's a proxy to be connected to we are fully connected.
jpayne@69 786 # This is set twice (once above and here) due to forwarding proxies
jpayne@69 787 # not using tunnelling.
jpayne@69 788 self._has_connected_to_proxy = bool(self.proxy)
jpayne@69 789
jpayne@69 790 # Set `self.proxy_is_verified` unless it's already set while
jpayne@69 791 # establishing a tunnel.
jpayne@69 792 if self._has_connected_to_proxy and self.proxy_is_verified is None:
jpayne@69 793 self.proxy_is_verified = sock_and_verified.is_verified
jpayne@69 794
def _connect_tls_proxy(self, hostname: str, sock: socket.socket) -> ssl.SSLSocket:
    """
    Wrap ``sock`` in TLS for the connection to the proxy itself.

    The SSL context and the hostname/fingerprint assertions are taken from
    the proxy configuration, while the CA and TLS-version settings come from
    this connection. ``hostname`` is passed on as the TLS server hostname.
    The verification result is stored in ``self.proxy_is_verified`` and the
    wrapped socket is returned.
    """
    # `_connect_tls_proxy` is called when self._tunnel_host is truthy, so a
    # proxy configuration is expected to be present here.
    proxy_config = typing.cast(ProxyConfig, self.proxy_config)

    wrapped = _ssl_wrap_socket_and_match_hostname(
        sock,
        server_hostname=hostname,
        # Settings that belong to the proxy.
        ssl_context=proxy_config.ssl_context,
        assert_hostname=proxy_config.assert_hostname,
        assert_fingerprint=proxy_config.assert_fingerprint,
        # Settings shared with the connection.
        cert_reqs=self.cert_reqs,
        ssl_version=self.ssl_version,
        ssl_minimum_version=self.ssl_minimum_version,
        ssl_maximum_version=self.ssl_maximum_version,
        ca_certs=self.ca_certs,
        ca_cert_dir=self.ca_cert_dir,
        ca_cert_data=self.ca_cert_data,
        # Features that aren't implemented for proxies yet:
        cert_file=None,
        key_file=None,
        key_password=None,
        tls_in_tls=False,
    )

    self.proxy_is_verified = wrapped.is_verified
    return wrapped.socket  # type: ignore[return-value]
jpayne@69 823
jpayne@69 824
class _WrappedAndVerifiedSocket(typing.NamedTuple):
    """
    Wrapped socket and whether the connection is
    verified after the TLS handshake.

    This is the return value of ``_ssl_wrap_socket_and_match_hostname``.
    """

    # The TLS-wrapped socket produced by the handshake.
    socket: ssl.SSLSocket | SSLTransport
    # Set by ``_ssl_wrap_socket_and_match_hostname`` to true when the context's
    # verify mode is ``CERT_REQUIRED`` or a fingerprint was asserted.
    is_verified: bool
jpayne@69 833
jpayne@69 834
def _ssl_wrap_socket_and_match_hostname(
    sock: socket.socket,
    *,
    cert_reqs: None | str | int,
    ssl_version: None | str | int,
    ssl_minimum_version: int | None,
    ssl_maximum_version: int | None,
    cert_file: str | None,
    key_file: str | None,
    key_password: str | None,
    ca_certs: str | None,
    ca_cert_dir: str | None,
    ca_cert_data: None | str | bytes,
    assert_hostname: None | str | typing.Literal[False],
    assert_fingerprint: str | None,
    server_hostname: str | None,
    ssl_context: ssl.SSLContext | None,
    tls_in_tls: bool = False,
) -> _WrappedAndVerifiedSocket:
    """Wrap ``sock`` in TLS and verify the peer by fingerprint or hostname.

    Shared by the proxy and the target code paths so that both behave the
    same when connecting via TLS. An ``SSLContext`` is built from the
    version / ``cert_reqs`` arguments unless ``ssl_context`` is supplied, the
    socket is wrapped with ``ssl_wrap_socket``, and the peer certificate is
    then checked against ``assert_fingerprint`` or, when the context is not
    checking hostnames itself, against ``assert_hostname`` or
    ``server_hostname``.

    :return: The wrapped socket plus an ``is_verified`` flag that is true
        when the context's verify mode is ``CERT_REQUIRED`` or a fingerprint
        was asserted.
    :raises BaseException: Anything raised by the post-handshake checks is
        re-raised after the wrapped socket has been closed.
    """
    if ssl_context is not None:
        context = ssl_context
        default_ssl_context = False
    else:
        context = create_urllib3_context(
            ssl_version=resolve_ssl_version(ssl_version),
            ssl_minimum_version=ssl_minimum_version,
            ssl_maximum_version=ssl_maximum_version,
            cert_reqs=resolve_cert_reqs(cert_reqs),
        )
        default_ssl_context = True

    # Applied to caller-supplied contexts as well as to freshly built ones.
    context.verify_mode = resolve_cert_reqs(cert_reqs)

    # In some cases, we want to verify hostnames ourselves:
    # - `ssl` can't verify fingerprints or alternate hostnames, and
    #   assert_hostname can be set to False to disable hostname checking.
    caller_controls_hostname_check = (
        bool(assert_fingerprint or assert_hostname) or assert_hostname is False
    )
    # - We still support OpenSSL 1.0.2, which prevents us from verifying
    #   hostnames easily: https://github.com/pyca/pyopenssl/pull/933
    backend_cannot_check_hostname = (
        ssl_.IS_PYOPENSSL or not ssl_.HAS_NEVER_CHECK_COMMON_NAME
    )
    if caller_controls_hostname_check or backend_cannot_check_hostname:
        context.check_hostname = False

    # Try to load OS default certs if none are given. The hasattr() check is
    # needed for custom pyOpenSSL SSLContext objects because they don't
    # support load_default_certs().
    explicit_ca_given = bool(ca_certs or ca_cert_dir or ca_cert_data)
    if (
        default_ssl_context
        and not explicit_ca_given
        and hasattr(context, "load_default_certs")
    ):
        context.load_default_certs()

    # Ensure that IPv6 addresses are in the proper format and don't have a
    # scope ID. Python's SSL module fails to recognize scoped IPv6 addresses
    # and interprets them as DNS hostnames.
    if server_hostname is not None:
        unbracketed = server_hostname.strip("[]")
        before_scope, percent, _scope_id = unbracketed.rpartition("%")
        if percent:
            unbracketed = before_scope
        if is_ipaddress(unbracketed):
            server_hostname = unbracketed

    ssl_sock = ssl_wrap_socket(
        sock=sock,
        keyfile=key_file,
        certfile=cert_file,
        key_password=key_password,
        ca_certs=ca_certs,
        ca_cert_dir=ca_cert_dir,
        ca_cert_data=ca_cert_data,
        server_hostname=server_hostname,
        ssl_context=context,
        tls_in_tls=tls_in_tls,
    )

    try:
        if assert_fingerprint:
            peer_cert_der = ssl_sock.getpeercert(binary_form=True)
            _assert_fingerprint(peer_cert_der, assert_fingerprint)
        elif (
            assert_hostname is not False
            and not context.check_hostname
            and context.verify_mode != ssl.CERT_NONE
        ):
            cert: _TYPE_PEER_CERT_RET_DICT = ssl_sock.getpeercert()  # type: ignore[assignment]

            # Need to signal to our match_hostname whether to use 'commonName' or not.
            # If we're using our own constructed SSLContext we explicitly set 'False'
            # because PyPy hard-codes 'True' from SSLContext.hostname_checks_common_name.
            hostname_checks_common_name = (
                False
                if default_ssl_context
                else (getattr(context, "hostname_checks_common_name", False) or False)
            )

            _match_hostname(
                cert,
                assert_hostname or server_hostname,  # type: ignore[arg-type]
                hostname_checks_common_name,
            )

        verified = context.verify_mode == ssl.CERT_REQUIRED or bool(
            assert_fingerprint
        )
        return _WrappedAndVerifiedSocket(socket=ssl_sock, is_verified=verified)
    except BaseException:
        # Don't leak the wrapped socket when verification fails.
        ssl_sock.close()
        raise
jpayne@69 958
jpayne@69 959
def _match_hostname(
    cert: _TYPE_PEER_CERT_RET_DICT | None,
    asserted_hostname: str,
    hostname_checks_common_name: bool = False,
) -> None:
    """Check ``cert`` against ``asserted_hostname`` via ``match_hostname``.

    On a mismatch the failure is logged, the certificate is attached to the
    ``CertificateError`` as ``_peer_cert`` and the error is re-raised.
    """
    # Our upstream implementation of ssl.match_hostname() only applies the
    # bracket-stripping normalization to IP addresses so it doesn't match
    # DNS SANs, so we do the same thing!
    without_brackets = asserted_hostname.strip("[]")
    hostname_to_check = (
        without_brackets if is_ipaddress(without_brackets) else asserted_hostname
    )

    try:
        match_hostname(cert, hostname_to_check, hostname_checks_common_name)
    except CertificateError as mismatch:
        log.warning(
            "Certificate did not match expected hostname: %s. Certificate: %s",
            hostname_to_check,
            cert,
        )
        # Attach the cert so client code can inspect it when catching the
        # exception, if they want to.
        mismatch._peer_cert = cert  # type: ignore[attr-defined]
        raise
jpayne@69 984
jpayne@69 985
def _wrap_proxy_error(err: Exception, proxy_scheme: str | None) -> ProxyError:
    """Wrap ``err`` in a ``ProxyError``, with ``err`` set as its cause.

    When the error text contains one of the known TLS failure phrases and
    ``proxy_scheme`` is ``"https"``, the message also tells the user that the
    proxy appears to be HTTP-only and that the proxy URL should be changed.
    """
    # Lower-case the message and turn every non-letter into a space so the
    # phrases below match regardless of punctuation or underscores.
    error_normalized = " ".join(re.split("[^a-z]", str(err).lower()))

    # Any of these phrases means we're very sure that this proxy is
    # HTTP-only and the user has a configuration issue.
    http_only_phrases = (
        "wrong version number",
        "unknown protocol",
        "record layer failure",
    )
    is_likely_http_proxy = any(
        phrase in error_normalized for phrase in http_only_phrases
    )

    if is_likely_http_proxy and proxy_scheme == "https":
        hint = (
            ". Your proxy appears to only use HTTP and not HTTPS, "
            "try changing your proxy URL to be HTTP. See: "
            "https://urllib3.readthedocs.io/en/latest/advanced-usage.html"
            "#https-proxy-error-http-proxy"
        )
    else:
        hint = ""

    wrapped = ProxyError(f"Unable to connect to proxy{hint}", err)
    wrapped.__cause__ = err
    return wrapped
jpayne@69 1009
jpayne@69 1010
def _get_default_user_agent() -> str:
    """Return ``python-urllib3/`` followed by the package ``__version__``."""
    user_agent = f"python-urllib3/{__version__}"
    return user_agent
jpayne@69 1013
jpayne@69 1014
class DummyConnection:
    """Used to detect a failed ConnectionCls import.

    This module rebinds ``HTTPSConnection`` to this class when ``ssl`` is
    falsy, i.e. when the guarded ``import ssl`` at the top of the module
    failed and ``ssl`` was set to ``None``.
    """
jpayne@69 1017
jpayne@69 1018
# When ``ssl`` is falsy (the guarded import at the top of the module sets it
# to ``None`` on failure), ``HTTPSConnection`` is rebound to the placeholder
# class ``DummyConnection``.
if not ssl:
    HTTPSConnection = DummyConnection  # type: ignore[misc, assignment] # noqa: F811


# Second name for ``HTTPSConnection``. It is bound after the fallback above,
# so it refers to whichever class ``HTTPSConnection`` ended up naming.
# NOTE(review): looks like a legacy alias kept for backwards compatibility
# -- confirm.
VerifiedHTTPSConnection = HTTPSConnection
jpayne@69 1024
jpayne@69 1025
def _url_from_connection(
    conn: HTTPConnection | HTTPSConnection, path: str | None = None
) -> str:
    """Build the URL string for ``conn``; mainly used for testing and logging.

    The scheme is ``https`` for ``HTTPSConnection`` instances and ``http``
    otherwise; host and port are read from the connection.
    """
    if isinstance(conn, HTTPSConnection):
        scheme = "https"
    else:
        scheme = "http"

    url = Url(scheme=scheme, host=conn.host, port=conn.port, path=path)
    return url.url