comparison urllib3/connection.py @ 7:5eb2d5e3bf22

planemo upload for repository https://toolrepo.galaxytrakr.org/view/jpayne/bioproject_to_srr_2/556cac4fb538
author jpayne
date Sun, 05 May 2024 23:32:17 -0400
comparing 6:b2745907b1eb with 7:5eb2d5e3bf22

1 from __future__ import annotations
2
3 import datetime
4 import logging
5 import os
6 import re
7 import socket
8 import sys
9 import typing
10 import warnings
11 from http.client import HTTPConnection as _HTTPConnection
12 from http.client import HTTPException as HTTPException # noqa: F401
13 from http.client import ResponseNotReady
14 from socket import timeout as SocketTimeout
15
16 if typing.TYPE_CHECKING:
17 from typing import Literal
18
19 from .response import HTTPResponse
20 from .util.ssl_ import _TYPE_PEER_CERT_RET_DICT
21 from .util.ssltransport import SSLTransport
22
23 from ._collections import HTTPHeaderDict
24 from .util.response import assert_header_parsing
25 from .util.timeout import _DEFAULT_TIMEOUT, _TYPE_TIMEOUT, Timeout
26 from .util.util import to_str
27 from .util.wait import wait_for_read
28
29 try: # Compiled with SSL?
30 import ssl
31
32 BaseSSLError = ssl.SSLError
33 except (ImportError, AttributeError):
34 ssl = None # type: ignore[assignment]
35
36 class BaseSSLError(BaseException): # type: ignore[no-redef]
37 pass
38
39
40 from ._base_connection import _TYPE_BODY
41 from ._base_connection import ProxyConfig as ProxyConfig
42 from ._base_connection import _ResponseOptions as _ResponseOptions
43 from ._version import __version__
44 from .exceptions import (
45 ConnectTimeoutError,
46 HeaderParsingError,
47 NameResolutionError,
48 NewConnectionError,
49 ProxyError,
50 SystemTimeWarning,
51 )
52 from .util import SKIP_HEADER, SKIPPABLE_HEADERS, connection, ssl_
53 from .util.request import body_to_chunks
54 from .util.ssl_ import assert_fingerprint as _assert_fingerprint
55 from .util.ssl_ import (
56 create_urllib3_context,
57 is_ipaddress,
58 resolve_cert_reqs,
59 resolve_ssl_version,
60 ssl_wrap_socket,
61 )
62 from .util.ssl_match_hostname import CertificateError, match_hostname
63 from .util.url import Url
64
65 # Not a no-op, we're adding this to the namespace so it can be imported.
66 ConnectionError = ConnectionError
67 BrokenPipeError = BrokenPipeError
68
69
70 log = logging.getLogger(__name__)
71
72 port_by_scheme = {"http": 80, "https": 443}
73
74 # When it comes time to update this value as a part of regular maintenance
75 # (i.e. test_recent_date is failing) update it to ~6 months before the current date.
76 RECENT_DATE = datetime.date(2023, 6, 1)
77
78 _CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]")
79
80 _HAS_SYS_AUDIT = hasattr(sys, "audit")
81
82
83 class HTTPConnection(_HTTPConnection):
84 """
85 Based on :class:`http.client.HTTPConnection` but provides an extra constructor
86 backwards-compatibility layer between older and newer Pythons.
87
88 Additional keyword parameters are used to configure attributes of the connection.
89 Accepted parameters include:
90
91 - ``source_address``: Set the source address for the current connection.
92 - ``socket_options``: Set specific options on the underlying socket. If not specified, then
93 defaults are loaded from ``HTTPConnection.default_socket_options`` which includes disabling
94 Nagle's algorithm (sets TCP_NODELAY to 1) unless the connection is behind a proxy.
95
96 For example, if you wish to enable TCP Keep Alive in addition to the defaults,
97 you might pass:
98
99 .. code-block:: python
100
101 HTTPConnection.default_socket_options + [
102 (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
103 ]
104
105 Or you may want to disable the defaults by passing an empty list (e.g., ``[]``).
106 """
107
108 default_port: typing.ClassVar[int] = port_by_scheme["http"] # type: ignore[misc]
109
110 #: Disable Nagle's algorithm by default.
111 #: ``[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]``
112 default_socket_options: typing.ClassVar[connection._TYPE_SOCKET_OPTIONS] = [
113 (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
114 ]
115
116 #: Whether this connection verifies the host's certificate.
117 is_verified: bool = False
118
119 #: Whether this proxy connection verified the proxy host's certificate.
120 # If no proxy is currently connected to, the value will be ``None``.
121 proxy_is_verified: bool | None = None
122
123 blocksize: int
124 source_address: tuple[str, int] | None
125 socket_options: connection._TYPE_SOCKET_OPTIONS | None
126
127 _has_connected_to_proxy: bool
128 _response_options: _ResponseOptions | None
129 _tunnel_host: str | None
130 _tunnel_port: int | None
131 _tunnel_scheme: str | None
132
133 def __init__(
134 self,
135 host: str,
136 port: int | None = None,
137 *,
138 timeout: _TYPE_TIMEOUT = _DEFAULT_TIMEOUT,
139 source_address: tuple[str, int] | None = None,
140 blocksize: int = 16384,
141 socket_options: None
142 | (connection._TYPE_SOCKET_OPTIONS) = default_socket_options,
143 proxy: Url | None = None,
144 proxy_config: ProxyConfig | None = None,
145 ) -> None:
146 super().__init__(
147 host=host,
148 port=port,
149 timeout=Timeout.resolve_default_timeout(timeout),
150 source_address=source_address,
151 blocksize=blocksize,
152 )
153 self.socket_options = socket_options
154 self.proxy = proxy
155 self.proxy_config = proxy_config
156
157 self._has_connected_to_proxy = False
158 self._response_options = None
159 self._tunnel_host: str | None = None
160 self._tunnel_port: int | None = None
161 self._tunnel_scheme: str | None = None
162
163 @property
164 def host(self) -> str:
165 """
166 Getter method to remove any trailing dots that indicate the hostname is an FQDN.
167
168 In general, SSL certificates don't include the trailing dot indicating a
169 fully-qualified domain name, and thus, they don't validate properly when
170 checked against a domain name that includes the dot. In addition, some
171 servers may not expect to receive the trailing dot when provided.
172
173 However, the hostname with trailing dot is critical to DNS resolution; doing a
174 lookup with the trailing dot will properly only resolve the appropriate FQDN,
175 whereas a lookup without a trailing dot will search the system's search domain
176 list. Thus, it's important to keep the original host around for use only in
177 those cases where it's appropriate (i.e., when doing DNS lookup to establish the
178 actual TCP connection across which we're going to send HTTP requests).
179 """
180 return self._dns_host.rstrip(".")
181
182 @host.setter
183 def host(self, value: str) -> None:
184 """
185 Setter for the `host` property.
186
187 We assume that only urllib3 uses the _dns_host attribute; httplib itself
188 only uses `host`, and it seems reasonable that other libraries follow suit.
189 """
190 self._dns_host = value
191
192 def _new_conn(self) -> socket.socket:
193 """Establish a socket connection and set nodelay settings on it.
194
195 :return: New socket connection.
196 """
197 try:
198 sock = connection.create_connection(
199 (self._dns_host, self.port),
200 self.timeout,
201 source_address=self.source_address,
202 socket_options=self.socket_options,
203 )
204 except socket.gaierror as e:
205 raise NameResolutionError(self.host, self, e) from e
206 except SocketTimeout as e:
207 raise ConnectTimeoutError(
208 self,
209 f"Connection to {self.host} timed out. (connect timeout={self.timeout})",
210 ) from e
211
212 except OSError as e:
213 raise NewConnectionError(
214 self, f"Failed to establish a new connection: {e}"
215 ) from e
216
217 # Audit hooks are only available in Python 3.8+
218 if _HAS_SYS_AUDIT:
219 sys.audit("http.client.connect", self, self.host, self.port)
220
221 return sock
222
223 def set_tunnel(
224 self,
225 host: str,
226 port: int | None = None,
227 headers: typing.Mapping[str, str] | None = None,
228 scheme: str = "http",
229 ) -> None:
230 if scheme not in ("http", "https"):
231 raise ValueError(
232 f"Invalid proxy scheme for tunneling: {scheme!r}, must be either 'http' or 'https'"
233 )
234 super().set_tunnel(host, port=port, headers=headers)
235 self._tunnel_scheme = scheme
236
237 def connect(self) -> None:
238 self.sock = self._new_conn()
239 if self._tunnel_host:
240 # If we're tunneling it means we're connected to our proxy.
241 self._has_connected_to_proxy = True
242
243 # TODO: Fix tunnel so it doesn't depend on self.sock state.
244 self._tunnel() # type: ignore[attr-defined]
245
246 # If there's a proxy to be connected to we are fully connected.
247 # This is set twice (once above and here) due to forwarding proxies
248 # not using tunnelling.
249 self._has_connected_to_proxy = bool(self.proxy)
250
251 if self._has_connected_to_proxy:
252 self.proxy_is_verified = False
253
254 @property
255 def is_closed(self) -> bool:
256 return self.sock is None
257
258 @property
259 def is_connected(self) -> bool:
260 if self.sock is None:
261 return False
262 return not wait_for_read(self.sock, timeout=0.0)
263
264 @property
265 def has_connected_to_proxy(self) -> bool:
266 return self._has_connected_to_proxy
267
268 @property
269 def proxy_is_forwarding(self) -> bool:
270 """
271 Return True if a forwarding proxy is configured, else return False
272 """
273 return bool(self.proxy) and self._tunnel_host is None
274
275 def close(self) -> None:
276 try:
277 super().close()
278 finally:
279 # Reset all stateful properties so connection
280 # can be re-used without leaking prior configs.
281 self.sock = None
282 self.is_verified = False
283 self.proxy_is_verified = None
284 self._has_connected_to_proxy = False
285 self._response_options = None
286 self._tunnel_host = None
287 self._tunnel_port = None
288 self._tunnel_scheme = None
289
290 def putrequest(
291 self,
292 method: str,
293 url: str,
294 skip_host: bool = False,
295 skip_accept_encoding: bool = False,
296 ) -> None:
297 """"""
298 # Empty docstring because the indentation of CPython's implementation
299 # is broken but we don't want this method in our documentation.
300 match = _CONTAINS_CONTROL_CHAR_RE.search(method)
301 if match:
302 raise ValueError(
303 f"Method cannot contain non-token characters {method!r} (found at least {match.group()!r})"
304 )
305
306 return super().putrequest(
307 method, url, skip_host=skip_host, skip_accept_encoding=skip_accept_encoding
308 )
309
310 def putheader(self, header: str, *values: str) -> None: # type: ignore[override]
311 """"""
312 if not any(isinstance(v, str) and v == SKIP_HEADER for v in values):
313 super().putheader(header, *values)
314 elif to_str(header.lower()) not in SKIPPABLE_HEADERS:
315 skippable_headers = "', '".join(
316 [str.title(header) for header in sorted(SKIPPABLE_HEADERS)]
317 )
318 raise ValueError(
319 f"urllib3.util.SKIP_HEADER only supports '{skippable_headers}'"
320 )
321
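# Illustrative only: urllib3.util.SKIP_HEADER is a sentinel that suppresses a
# header urllib3 would otherwise emit automatically; per the check above it is
# only honored for the header names in SKIPPABLE_HEADERS, e.g.
#
#   conn.request("GET", "/", headers={"User-Agent": SKIP_HEADER})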
322 # `request` method's signature intentionally violates LSP.
323 # urllib3's API is different from `http.client.HTTPConnection` and the subclassing is only incidental.
324 def request( # type: ignore[override]
325 self,
326 method: str,
327 url: str,
328 body: _TYPE_BODY | None = None,
329 headers: typing.Mapping[str, str] | None = None,
330 *,
331 chunked: bool = False,
332 preload_content: bool = True,
333 decode_content: bool = True,
334 enforce_content_length: bool = True,
335 ) -> None:
336 # Update the inner socket's timeout value to send the request.
337 # This only triggers if the connection is re-used.
338 if self.sock is not None:
339 self.sock.settimeout(self.timeout)
340
341 # Store these values to be fed into the HTTPResponse
342 # object later. TODO: Remove this in favor of a real
343 # HTTP lifecycle mechanism.
344
345 # We have to store these before we call .request()
346 # because sometimes we can still salvage a response
347 # off the wire even if we aren't able to completely
348 # send the request body.
349 self._response_options = _ResponseOptions(
350 request_method=method,
351 request_url=url,
352 preload_content=preload_content,
353 decode_content=decode_content,
354 enforce_content_length=enforce_content_length,
355 )
356
357 if headers is None:
358 headers = {}
359 header_keys = frozenset(to_str(k.lower()) for k in headers)
360 skip_accept_encoding = "accept-encoding" in header_keys
361 skip_host = "host" in header_keys
362 self.putrequest(
363 method, url, skip_accept_encoding=skip_accept_encoding, skip_host=skip_host
364 )
365
366 # Transform the body into an iterable of sendall()-able chunks
367 # and detect if an explicit Content-Length is doable.
368 chunks_and_cl = body_to_chunks(body, method=method, blocksize=self.blocksize)
369 chunks = chunks_and_cl.chunks
370 content_length = chunks_and_cl.content_length
371
372 # When chunked is explicitly set to 'True' we respect that.
373 if chunked:
374 if "transfer-encoding" not in header_keys:
375 self.putheader("Transfer-Encoding", "chunked")
376 else:
377 # Detect whether a framing mechanism is already in use. If so
378 # we respect that value, otherwise we pick chunked vs content-length
379 # depending on the type of 'body'.
380 if "content-length" in header_keys:
381 chunked = False
382 elif "transfer-encoding" in header_keys:
383 chunked = True
384
385 # Otherwise we go off the recommendation of 'body_to_chunks()'.
386 else:
387 chunked = False
388 if content_length is None:
389 if chunks is not None:
390 chunked = True
391 self.putheader("Transfer-Encoding", "chunked")
392 else:
393 self.putheader("Content-Length", str(content_length))
394
395 # Now that framing headers are out of the way we send all the other headers.
396 if "user-agent" not in header_keys:
397 self.putheader("User-Agent", _get_default_user_agent())
398 for header, value in headers.items():
399 self.putheader(header, value)
400 self.endheaders()
401
402 # If we're given a body we start sending that in chunks.
403 if chunks is not None:
404 for chunk in chunks:
405 # Sending empty chunks isn't allowed for TE: chunked
406 # as it indicates the end of the body.
407 if not chunk:
408 continue
409 if isinstance(chunk, str):
410 chunk = chunk.encode("utf-8")
411 if chunked:
412 self.send(b"%x\r\n%b\r\n" % (len(chunk), chunk))
413 else:
414 self.send(chunk)
415
416 # Regardless of whether we have a body or not, if we're in
417 # chunked mode we want to send an explicit empty chunk.
418 if chunked:
419 self.send(b"0\r\n\r\n")
420
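# Illustrative only: per the framing logic in request() above, chunked
# encoding is used when chunked=True is passed or when the body has no
# determinable length (for example an iterable of byte chunks):
#
#   conn.request("POST", "/upload", body=(b"part-1", b"part-2"), chunked=True)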
421 def request_chunked(
422 self,
423 method: str,
424 url: str,
425 body: _TYPE_BODY | None = None,
426 headers: typing.Mapping[str, str] | None = None,
427 ) -> None:
428 """
429 Alternative to the common request method, which sends the
430 body with chunked encoding and not as one block
431 """
432 warnings.warn(
433 "HTTPConnection.request_chunked() is deprecated and will be removed "
434 "in urllib3 v2.1.0. Instead use HTTPConnection.request(..., chunked=True).",
435 category=DeprecationWarning,
436 stacklevel=2,
437 )
438 self.request(method, url, body=body, headers=headers, chunked=True)
439
440 def getresponse( # type: ignore[override]
441 self,
442 ) -> HTTPResponse:
443 """
444 Get the response from the server.
445
446 If the HTTPConnection is in the correct state, returns an instance of HTTPResponse or of whatever object is returned by the response_class variable.
447
448 If a request has not been sent or if a previous response has not been handled, ResponseNotReady is raised. If the HTTP response indicates that the connection should be closed, then it will be closed before the response is returned. When the connection is closed, the underlying socket is closed.
449 """
450 # Raise the same error as http.client.HTTPConnection
451 if self._response_options is None:
452 raise ResponseNotReady()
453
454 # Reset this attribute so the connection can be used for another request.
455 resp_options = self._response_options
456 self._response_options = None
457
458 # Since the connection's timeout value may have been updated
459 # we need to set the timeout on the socket.
460 self.sock.settimeout(self.timeout)
461
462 # This is needed here to avoid circular import errors
463 from .response import HTTPResponse
464
465 # Get the response from http.client.HTTPConnection
466 httplib_response = super().getresponse()
467
468 try:
469 assert_header_parsing(httplib_response.msg)
470 except (HeaderParsingError, TypeError) as hpe:
471 log.warning(
472 "Failed to parse headers (url=%s): %s",
473 _url_from_connection(self, resp_options.request_url),
474 hpe,
475 exc_info=True,
476 )
477
478 headers = HTTPHeaderDict(httplib_response.msg.items())
479
480 response = HTTPResponse(
481 body=httplib_response,
482 headers=headers,
483 status=httplib_response.status,
484 version=httplib_response.version,
485 reason=httplib_response.reason,
486 preload_content=resp_options.preload_content,
487 decode_content=resp_options.decode_content,
488 original_response=httplib_response,
489 enforce_content_length=resp_options.enforce_content_length,
490 request_method=resp_options.request_method,
491 request_url=resp_options.request_url,
492 )
493 return response
494
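# Illustrative only, not part of the upstream urllib3 source: a minimal
# round-trip with HTTPConnection, assuming "example.com" answers on port 80:
#
#   conn = HTTPConnection("example.com")
#   conn.request("GET", "/")
#   response = conn.getresponse()   # urllib3's HTTPResponse, see above
#   body = response.data
#   conn.close()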
495
496 class HTTPSConnection(HTTPConnection):
497 """
498 Many of the parameters to this constructor are passed to the underlying SSL
499 socket by means of :py:func:`urllib3.util.ssl_wrap_socket`.
500 """
501
502 default_port = port_by_scheme["https"] # type: ignore[misc]
503
504 cert_reqs: int | str | None = None
505 ca_certs: str | None = None
506 ca_cert_dir: str | None = None
507 ca_cert_data: None | str | bytes = None
508 ssl_version: int | str | None = None
509 ssl_minimum_version: int | None = None
510 ssl_maximum_version: int | None = None
511 assert_fingerprint: str | None = None
512
513 def __init__(
514 self,
515 host: str,
516 port: int | None = None,
517 *,
518 timeout: _TYPE_TIMEOUT = _DEFAULT_TIMEOUT,
519 source_address: tuple[str, int] | None = None,
520 blocksize: int = 16384,
521 socket_options: None
522 | (connection._TYPE_SOCKET_OPTIONS) = HTTPConnection.default_socket_options,
523 proxy: Url | None = None,
524 proxy_config: ProxyConfig | None = None,
525 cert_reqs: int | str | None = None,
526 assert_hostname: None | str | Literal[False] = None,
527 assert_fingerprint: str | None = None,
528 server_hostname: str | None = None,
529 ssl_context: ssl.SSLContext | None = None,
530 ca_certs: str | None = None,
531 ca_cert_dir: str | None = None,
532 ca_cert_data: None | str | bytes = None,
533 ssl_minimum_version: int | None = None,
534 ssl_maximum_version: int | None = None,
535 ssl_version: int | str | None = None, # Deprecated
536 cert_file: str | None = None,
537 key_file: str | None = None,
538 key_password: str | None = None,
539 ) -> None:
540 super().__init__(
541 host,
542 port=port,
543 timeout=timeout,
544 source_address=source_address,
545 blocksize=blocksize,
546 socket_options=socket_options,
547 proxy=proxy,
548 proxy_config=proxy_config,
549 )
550
551 self.key_file = key_file
552 self.cert_file = cert_file
553 self.key_password = key_password
554 self.ssl_context = ssl_context
555 self.server_hostname = server_hostname
556 self.assert_hostname = assert_hostname
557 self.assert_fingerprint = assert_fingerprint
558 self.ssl_version = ssl_version
559 self.ssl_minimum_version = ssl_minimum_version
560 self.ssl_maximum_version = ssl_maximum_version
561 self.ca_certs = ca_certs and os.path.expanduser(ca_certs)
562 self.ca_cert_dir = ca_cert_dir and os.path.expanduser(ca_cert_dir)
563 self.ca_cert_data = ca_cert_data
564
565 # cert_reqs depends on ssl_context so calculate last.
566 if cert_reqs is None:
567 if self.ssl_context is not None:
568 cert_reqs = self.ssl_context.verify_mode
569 else:
570 cert_reqs = resolve_cert_reqs(None)
571 self.cert_reqs = cert_reqs
572
573 def set_cert(
574 self,
575 key_file: str | None = None,
576 cert_file: str | None = None,
577 cert_reqs: int | str | None = None,
578 key_password: str | None = None,
579 ca_certs: str | None = None,
580 assert_hostname: None | str | Literal[False] = None,
581 assert_fingerprint: str | None = None,
582 ca_cert_dir: str | None = None,
583 ca_cert_data: None | str | bytes = None,
584 ) -> None:
585 """
586 This method should only be called once, before the connection is used.
587 """
588 warnings.warn(
589 "HTTPSConnection.set_cert() is deprecated and will be removed "
590 "in urllib3 v2.1.0. Instead provide the parameters to the "
591 "HTTPSConnection constructor.",
592 category=DeprecationWarning,
593 stacklevel=2,
594 )
595
596 # If cert_reqs is not provided we'll assume CERT_REQUIRED unless we also
597 # have an SSLContext object in which case we'll use its verify_mode.
598 if cert_reqs is None:
599 if self.ssl_context is not None:
600 cert_reqs = self.ssl_context.verify_mode
601 else:
602 cert_reqs = resolve_cert_reqs(None)
603
604 self.key_file = key_file
605 self.cert_file = cert_file
606 self.cert_reqs = cert_reqs
607 self.key_password = key_password
608 self.assert_hostname = assert_hostname
609 self.assert_fingerprint = assert_fingerprint
610 self.ca_certs = ca_certs and os.path.expanduser(ca_certs)
611 self.ca_cert_dir = ca_cert_dir and os.path.expanduser(ca_cert_dir)
612 self.ca_cert_data = ca_cert_data
613
614 def connect(self) -> None:
615 sock: socket.socket | ssl.SSLSocket
616 self.sock = sock = self._new_conn()
617 server_hostname: str = self.host
618 tls_in_tls = False
619
620 # Do we need to establish a tunnel?
621 if self._tunnel_host is not None:
622 # We're tunneling to an HTTPS origin so need to do TLS-in-TLS.
623 if self._tunnel_scheme == "https":
624 # _connect_tls_proxy will verify and assign proxy_is_verified
625 self.sock = sock = self._connect_tls_proxy(self.host, sock)
626 tls_in_tls = True
627 elif self._tunnel_scheme == "http":
628 self.proxy_is_verified = False
629
630 # If we're tunneling it means we're connected to our proxy.
631 self._has_connected_to_proxy = True
632
633 self._tunnel() # type: ignore[attr-defined]
634 # Override the host with the one we're requesting data from.
635 server_hostname = self._tunnel_host
636
637 if self.server_hostname is not None:
638 server_hostname = self.server_hostname
639
640 is_time_off = datetime.date.today() < RECENT_DATE
641 if is_time_off:
642 warnings.warn(
643 (
644 f"System time is way off (before {RECENT_DATE}). This will probably "
645 "lead to SSL verification errors"
646 ),
647 SystemTimeWarning,
648 )
649
650 # Remove trailing '.' from fqdn hostnames to allow certificate validation
651 server_hostname_rm_dot = server_hostname.rstrip(".")
652
653 sock_and_verified = _ssl_wrap_socket_and_match_hostname(
654 sock=sock,
655 cert_reqs=self.cert_reqs,
656 ssl_version=self.ssl_version,
657 ssl_minimum_version=self.ssl_minimum_version,
658 ssl_maximum_version=self.ssl_maximum_version,
659 ca_certs=self.ca_certs,
660 ca_cert_dir=self.ca_cert_dir,
661 ca_cert_data=self.ca_cert_data,
662 cert_file=self.cert_file,
663 key_file=self.key_file,
664 key_password=self.key_password,
665 server_hostname=server_hostname_rm_dot,
666 ssl_context=self.ssl_context,
667 tls_in_tls=tls_in_tls,
668 assert_hostname=self.assert_hostname,
669 assert_fingerprint=self.assert_fingerprint,
670 )
671 self.sock = sock_and_verified.socket
672
673 # Forwarding proxies can never have a verified target since
674 # the proxy is the one doing the verification. Should instead
675 # use a CONNECT tunnel in order to verify the target.
676 # See: https://github.com/urllib3/urllib3/issues/3267.
677 if self.proxy_is_forwarding:
678 self.is_verified = False
679 else:
680 self.is_verified = sock_and_verified.is_verified
681
682 # If there's a proxy to be connected to we are fully connected.
683 # This is set twice (once above and here) due to forwarding proxies
684 # not using tunnelling.
685 self._has_connected_to_proxy = bool(self.proxy)
686
687 # Set `self.proxy_is_verified` unless it's already set while
688 # establishing a tunnel.
689 if self._has_connected_to_proxy and self.proxy_is_verified is None:
690 self.proxy_is_verified = sock_and_verified.is_verified
691
692 def _connect_tls_proxy(self, hostname: str, sock: socket.socket) -> ssl.SSLSocket:
693 """
694 Establish a TLS connection to the proxy using the provided SSL context.
695 """
696 # `_connect_tls_proxy` is called when self._tunnel_host is truthy.
697 proxy_config = typing.cast(ProxyConfig, self.proxy_config)
698 ssl_context = proxy_config.ssl_context
699 sock_and_verified = _ssl_wrap_socket_and_match_hostname(
700 sock,
701 cert_reqs=self.cert_reqs,
702 ssl_version=self.ssl_version,
703 ssl_minimum_version=self.ssl_minimum_version,
704 ssl_maximum_version=self.ssl_maximum_version,
705 ca_certs=self.ca_certs,
706 ca_cert_dir=self.ca_cert_dir,
707 ca_cert_data=self.ca_cert_data,
708 server_hostname=hostname,
709 ssl_context=ssl_context,
710 assert_hostname=proxy_config.assert_hostname,
711 assert_fingerprint=proxy_config.assert_fingerprint,
712 # Features that aren't implemented for proxies yet:
713 cert_file=None,
714 key_file=None,
715 key_password=None,
716 tls_in_tls=False,
717 )
718 self.proxy_is_verified = sock_and_verified.is_verified
719 return sock_and_verified.socket # type: ignore[return-value]
720
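# Illustrative only: verifying an internal host against a private CA with
# HTTPSConnection might look like the following (host and bundle path are
# hypothetical):
#
#   conn = HTTPSConnection(
#       "internal.example.com",
#       443,
#       ca_certs="/etc/ssl/private-ca.pem",          # hypothetical CA bundle
#       ssl_minimum_version=ssl.TLSVersion.TLSv1_2,
#   )
#   conn.request("GET", "/health")
#   resp = conn.getresponse()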
721
722 class _WrappedAndVerifiedSocket(typing.NamedTuple):
723 """
724 Wrapped socket and whether the connection is
725 verified after the TLS handshake
726 """
727
728 socket: ssl.SSLSocket | SSLTransport
729 is_verified: bool
730
731
732 def _ssl_wrap_socket_and_match_hostname(
733 sock: socket.socket,
734 *,
735 cert_reqs: None | str | int,
736 ssl_version: None | str | int,
737 ssl_minimum_version: int | None,
738 ssl_maximum_version: int | None,
739 cert_file: str | None,
740 key_file: str | None,
741 key_password: str | None,
742 ca_certs: str | None,
743 ca_cert_dir: str | None,
744 ca_cert_data: None | str | bytes,
745 assert_hostname: None | str | Literal[False],
746 assert_fingerprint: str | None,
747 server_hostname: str | None,
748 ssl_context: ssl.SSLContext | None,
749 tls_in_tls: bool = False,
750 ) -> _WrappedAndVerifiedSocket:
751 """Logic for constructing an SSLContext from all TLS parameters, passing
752 that down into ssl_wrap_socket, and then doing certificate verification
753 either via hostname or fingerprint. This function exists to guarantee
754 that both proxies and targets have the same behavior when connecting via TLS.
755 """
756 default_ssl_context = False
757 if ssl_context is None:
758 default_ssl_context = True
759 context = create_urllib3_context(
760 ssl_version=resolve_ssl_version(ssl_version),
761 ssl_minimum_version=ssl_minimum_version,
762 ssl_maximum_version=ssl_maximum_version,
763 cert_reqs=resolve_cert_reqs(cert_reqs),
764 )
765 else:
766 context = ssl_context
767
768 context.verify_mode = resolve_cert_reqs(cert_reqs)
769
770 # In some cases, we want to verify hostnames ourselves
771 if (
772 # `ssl` can't verify fingerprints or alternate hostnames
773 assert_fingerprint
774 or assert_hostname
775 # assert_hostname can be set to False to disable hostname checking
776 or assert_hostname is False
777 # We still support OpenSSL 1.0.2, which prevents us from verifying
778 # hostnames easily: https://github.com/pyca/pyopenssl/pull/933
779 or ssl_.IS_PYOPENSSL
780 or not ssl_.HAS_NEVER_CHECK_COMMON_NAME
781 ):
782 context.check_hostname = False
783
784 # Try to load OS default certs if none are given. We need to do the hasattr() check
785 # for custom pyOpenSSL SSLContext objects because they don't support
786 # load_default_certs().
787 if (
788 not ca_certs
789 and not ca_cert_dir
790 and not ca_cert_data
791 and default_ssl_context
792 and hasattr(context, "load_default_certs")
793 ):
794 context.load_default_certs()
795
796 # Ensure that IPv6 addresses are in the proper format and don't have a
797 # scope ID. Python's SSL module fails to recognize scoped IPv6 addresses
798 # and interprets them as DNS hostnames.
799 if server_hostname is not None:
800 normalized = server_hostname.strip("[]")
801 if "%" in normalized:
802 normalized = normalized[: normalized.rfind("%")]
803 if is_ipaddress(normalized):
804 server_hostname = normalized
805
806 ssl_sock = ssl_wrap_socket(
807 sock=sock,
808 keyfile=key_file,
809 certfile=cert_file,
810 key_password=key_password,
811 ca_certs=ca_certs,
812 ca_cert_dir=ca_cert_dir,
813 ca_cert_data=ca_cert_data,
814 server_hostname=server_hostname,
815 ssl_context=context,
816 tls_in_tls=tls_in_tls,
817 )
818
819 try:
820 if assert_fingerprint:
821 _assert_fingerprint(
822 ssl_sock.getpeercert(binary_form=True), assert_fingerprint
823 )
824 elif (
825 context.verify_mode != ssl.CERT_NONE
826 and not context.check_hostname
827 and assert_hostname is not False
828 ):
829 cert: _TYPE_PEER_CERT_RET_DICT = ssl_sock.getpeercert() # type: ignore[assignment]
830
831 # Need to signal to our match_hostname whether to use 'commonName' or not.
832 # If we're using our own constructed SSLContext we explicitly set 'False'
833 # because PyPy hard-codes 'True' from SSLContext.hostname_checks_common_name.
834 if default_ssl_context:
835 hostname_checks_common_name = False
836 else:
837 hostname_checks_common_name = (
838 getattr(context, "hostname_checks_common_name", False) or False
839 )
840
841 _match_hostname(
842 cert,
843 assert_hostname or server_hostname, # type: ignore[arg-type]
844 hostname_checks_common_name,
845 )
846
847 return _WrappedAndVerifiedSocket(
848 socket=ssl_sock,
849 is_verified=context.verify_mode == ssl.CERT_REQUIRED
850 or bool(assert_fingerprint),
851 )
852 except BaseException:
853 ssl_sock.close()
854 raise
855
856
857 def _match_hostname(
858 cert: _TYPE_PEER_CERT_RET_DICT | None,
859 asserted_hostname: str,
860 hostname_checks_common_name: bool = False,
861 ) -> None:
862 # Our upstream implementation of ssl.match_hostname()
863 # only applies this normalization to IP addresses, so it doesn't
864 # match DNS SANs; we do the same thing!
865 stripped_hostname = asserted_hostname.strip("[]")
866 if is_ipaddress(stripped_hostname):
867 asserted_hostname = stripped_hostname
868
869 try:
870 match_hostname(cert, asserted_hostname, hostname_checks_common_name)
871 except CertificateError as e:
872 log.warning(
873 "Certificate did not match expected hostname: %s. Certificate: %s",
874 asserted_hostname,
875 cert,
876 )
877 # Add cert to exception and reraise so client code can inspect
878 # the cert when catching the exception, if they want to
879 e._peer_cert = cert # type: ignore[attr-defined]
880 raise
881
882
883 def _wrap_proxy_error(err: Exception, proxy_scheme: str | None) -> ProxyError:
884 # Look for the phrase 'wrong version number'; if found,
885 # then we should warn the user that we're very sure that
886 # this proxy is HTTP-only and they have a configuration issue.
887 error_normalized = " ".join(re.split("[^a-z]", str(err).lower()))
888 is_likely_http_proxy = (
889 "wrong version number" in error_normalized
890 or "unknown protocol" in error_normalized
891 or "record layer failure" in error_normalized
892 )
893 http_proxy_warning = (
894 ". Your proxy appears to only use HTTP and not HTTPS, "
895 "try changing your proxy URL to be HTTP. See: "
896 "https://urllib3.readthedocs.io/en/latest/advanced-usage.html"
897 "#https-proxy-error-http-proxy"
898 )
899 new_err = ProxyError(
900 f"Unable to connect to proxy"
901 f"{http_proxy_warning if is_likely_http_proxy and proxy_scheme == 'https' else ''}",
902 err,
903 )
904 new_err.__cause__ = err
905 return new_err
906
907
908 def _get_default_user_agent() -> str:
909 return f"python-urllib3/{__version__}"
910
911
912 class DummyConnection:
913 """Used to detect a failed ConnectionCls import."""
914
915
916 if not ssl:
917 HTTPSConnection = DummyConnection # type: ignore[misc, assignment] # noqa: F811
918
919
920 VerifiedHTTPSConnection = HTTPSConnection
921
922
923 def _url_from_connection(
924 conn: HTTPConnection | HTTPSConnection, path: str | None = None
925 ) -> str:
926 """Returns the URL from a given connection. This is mainly used for testing and logging."""
927
928 scheme = "https" if isinstance(conn, HTTPSConnection) else "http"
929
930 return Url(scheme=scheme, host=conn.host, port=conn.port, path=path).url
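# Illustrative only: _url_from_connection simply reassembles a URL from the
# connection's scheme, host, and port plus an optional path, so an
# HTTPSConnection to "example.com" port 8443 with path "/status" yields
# "https://example.com:8443/status".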