jpayne@7: from __future__ import annotations jpayne@7: jpayne@7: import hmac jpayne@7: import os jpayne@7: import socket jpayne@7: import sys jpayne@7: import typing jpayne@7: import warnings jpayne@7: from binascii import unhexlify jpayne@7: from hashlib import md5, sha1, sha256 jpayne@7: jpayne@7: from ..exceptions import ProxySchemeUnsupported, SSLError jpayne@7: from .url import _BRACELESS_IPV6_ADDRZ_RE, _IPV4_RE jpayne@7: jpayne@7: SSLContext = None jpayne@7: SSLTransport = None jpayne@7: HAS_NEVER_CHECK_COMMON_NAME = False jpayne@7: IS_PYOPENSSL = False jpayne@7: ALPN_PROTOCOLS = ["http/1.1"] jpayne@7: jpayne@7: _TYPE_VERSION_INFO = typing.Tuple[int, int, int, str, int] jpayne@7: jpayne@7: # Maps the length of a digest to a possible hash function producing this digest jpayne@7: HASHFUNC_MAP = {32: md5, 40: sha1, 64: sha256} jpayne@7: jpayne@7: jpayne@7: def _is_bpo_43522_fixed( jpayne@7: implementation_name: str, jpayne@7: version_info: _TYPE_VERSION_INFO, jpayne@7: pypy_version_info: _TYPE_VERSION_INFO | None, jpayne@7: ) -> bool: jpayne@7: """Return True for CPython 3.8.9+, 3.9.3+ or 3.10+ and PyPy 7.3.8+ where jpayne@7: setting SSLContext.hostname_checks_common_name to False works. jpayne@7: jpayne@7: Outside of CPython and PyPy we don't know which implementations work jpayne@7: or not so we conservatively use our hostname matching as we know that works jpayne@7: on all implementations. jpayne@7: jpayne@7: https://github.com/urllib3/urllib3/issues/2192#issuecomment-821832963 jpayne@7: https://foss.heptapod.net/pypy/pypy/-/issues/3539 jpayne@7: """ jpayne@7: if implementation_name == "pypy": jpayne@7: # https://foss.heptapod.net/pypy/pypy/-/issues/3129 jpayne@7: return pypy_version_info >= (7, 3, 8) # type: ignore[operator] jpayne@7: elif implementation_name == "cpython": jpayne@7: major_minor = version_info[:2] jpayne@7: micro = version_info[2] jpayne@7: return ( jpayne@7: (major_minor == (3, 8) and micro >= 9) jpayne@7: or (major_minor == (3, 9) and micro >= 3) jpayne@7: or major_minor >= (3, 10) jpayne@7: ) jpayne@7: else: # Defensive: jpayne@7: return False jpayne@7: jpayne@7: jpayne@7: def _is_has_never_check_common_name_reliable( jpayne@7: openssl_version: str, jpayne@7: openssl_version_number: int, jpayne@7: implementation_name: str, jpayne@7: version_info: _TYPE_VERSION_INFO, jpayne@7: pypy_version_info: _TYPE_VERSION_INFO | None, jpayne@7: ) -> bool: jpayne@7: # As of May 2023, all released versions of LibreSSL fail to reject certificates with jpayne@7: # only common names, see https://github.com/urllib3/urllib3/pull/3024 jpayne@7: is_openssl = openssl_version.startswith("OpenSSL ") jpayne@7: # Before fixing OpenSSL issue #14579, the SSL_new() API was not copying hostflags jpayne@7: # like X509_CHECK_FLAG_NEVER_CHECK_SUBJECT, which tripped up CPython. jpayne@7: # https://github.com/openssl/openssl/issues/14579 jpayne@7: # This was released in OpenSSL 1.1.1l+ (>=0x101010cf) jpayne@7: is_openssl_issue_14579_fixed = openssl_version_number >= 0x101010CF jpayne@7: jpayne@7: return is_openssl and ( jpayne@7: is_openssl_issue_14579_fixed jpayne@7: or _is_bpo_43522_fixed(implementation_name, version_info, pypy_version_info) jpayne@7: ) jpayne@7: jpayne@7: jpayne@7: if typing.TYPE_CHECKING: jpayne@7: from ssl import VerifyMode jpayne@7: from typing import Literal, TypedDict jpayne@7: jpayne@7: from .ssltransport import SSLTransport as SSLTransportType jpayne@7: jpayne@7: class _TYPE_PEER_CERT_RET_DICT(TypedDict, total=False): jpayne@7: subjectAltName: tuple[tuple[str, str], ...] jpayne@7: subject: tuple[tuple[tuple[str, str], ...], ...] jpayne@7: serialNumber: str jpayne@7: jpayne@7: jpayne@7: # Mapping from 'ssl.PROTOCOL_TLSX' to 'TLSVersion.X' jpayne@7: _SSL_VERSION_TO_TLS_VERSION: dict[int, int] = {} jpayne@7: jpayne@7: try: # Do we have ssl at all? jpayne@7: import ssl jpayne@7: from ssl import ( # type: ignore[assignment] jpayne@7: CERT_REQUIRED, jpayne@7: HAS_NEVER_CHECK_COMMON_NAME, jpayne@7: OP_NO_COMPRESSION, jpayne@7: OP_NO_TICKET, jpayne@7: OPENSSL_VERSION, jpayne@7: OPENSSL_VERSION_NUMBER, jpayne@7: PROTOCOL_TLS, jpayne@7: PROTOCOL_TLS_CLIENT, jpayne@7: OP_NO_SSLv2, jpayne@7: OP_NO_SSLv3, jpayne@7: SSLContext, jpayne@7: TLSVersion, jpayne@7: ) jpayne@7: jpayne@7: PROTOCOL_SSLv23 = PROTOCOL_TLS jpayne@7: jpayne@7: # Setting SSLContext.hostname_checks_common_name = False didn't work before CPython jpayne@7: # 3.8.9, 3.9.3, and 3.10 (but OK on PyPy) or OpenSSL 1.1.1l+ jpayne@7: if HAS_NEVER_CHECK_COMMON_NAME and not _is_has_never_check_common_name_reliable( jpayne@7: OPENSSL_VERSION, jpayne@7: OPENSSL_VERSION_NUMBER, jpayne@7: sys.implementation.name, jpayne@7: sys.version_info, jpayne@7: sys.pypy_version_info if sys.implementation.name == "pypy" else None, # type: ignore[attr-defined] jpayne@7: ): jpayne@7: HAS_NEVER_CHECK_COMMON_NAME = False jpayne@7: jpayne@7: # Need to be careful here in case old TLS versions get jpayne@7: # removed in future 'ssl' module implementations. jpayne@7: for attr in ("TLSv1", "TLSv1_1", "TLSv1_2"): jpayne@7: try: jpayne@7: _SSL_VERSION_TO_TLS_VERSION[getattr(ssl, f"PROTOCOL_{attr}")] = getattr( jpayne@7: TLSVersion, attr jpayne@7: ) jpayne@7: except AttributeError: # Defensive: jpayne@7: continue jpayne@7: jpayne@7: from .ssltransport import SSLTransport # type: ignore[assignment] jpayne@7: except ImportError: jpayne@7: OP_NO_COMPRESSION = 0x20000 # type: ignore[assignment] jpayne@7: OP_NO_TICKET = 0x4000 # type: ignore[assignment] jpayne@7: OP_NO_SSLv2 = 0x1000000 # type: ignore[assignment] jpayne@7: OP_NO_SSLv3 = 0x2000000 # type: ignore[assignment] jpayne@7: PROTOCOL_SSLv23 = PROTOCOL_TLS = 2 # type: ignore[assignment] jpayne@7: PROTOCOL_TLS_CLIENT = 16 # type: ignore[assignment] jpayne@7: jpayne@7: jpayne@7: _TYPE_PEER_CERT_RET = typing.Union["_TYPE_PEER_CERT_RET_DICT", bytes, None] jpayne@7: jpayne@7: jpayne@7: def assert_fingerprint(cert: bytes | None, fingerprint: str) -> None: jpayne@7: """ jpayne@7: Checks if given fingerprint matches the supplied certificate. jpayne@7: jpayne@7: :param cert: jpayne@7: Certificate as bytes object. jpayne@7: :param fingerprint: jpayne@7: Fingerprint as string of hexdigits, can be interspersed by colons. jpayne@7: """ jpayne@7: jpayne@7: if cert is None: jpayne@7: raise SSLError("No certificate for the peer.") jpayne@7: jpayne@7: fingerprint = fingerprint.replace(":", "").lower() jpayne@7: digest_length = len(fingerprint) jpayne@7: hashfunc = HASHFUNC_MAP.get(digest_length) jpayne@7: if not hashfunc: jpayne@7: raise SSLError(f"Fingerprint of invalid length: {fingerprint}") jpayne@7: jpayne@7: # We need encode() here for py32; works on py2 and p33. jpayne@7: fingerprint_bytes = unhexlify(fingerprint.encode()) jpayne@7: jpayne@7: cert_digest = hashfunc(cert).digest() jpayne@7: jpayne@7: if not hmac.compare_digest(cert_digest, fingerprint_bytes): jpayne@7: raise SSLError( jpayne@7: f'Fingerprints did not match. Expected "{fingerprint}", got "{cert_digest.hex()}"' jpayne@7: ) jpayne@7: jpayne@7: jpayne@7: def resolve_cert_reqs(candidate: None | int | str) -> VerifyMode: jpayne@7: """ jpayne@7: Resolves the argument to a numeric constant, which can be passed to jpayne@7: the wrap_socket function/method from the ssl module. jpayne@7: Defaults to :data:`ssl.CERT_REQUIRED`. jpayne@7: If given a string it is assumed to be the name of the constant in the jpayne@7: :mod:`ssl` module or its abbreviation. jpayne@7: (So you can specify `REQUIRED` instead of `CERT_REQUIRED`. jpayne@7: If it's neither `None` nor a string we assume it is already the numeric jpayne@7: constant which can directly be passed to wrap_socket. jpayne@7: """ jpayne@7: if candidate is None: jpayne@7: return CERT_REQUIRED jpayne@7: jpayne@7: if isinstance(candidate, str): jpayne@7: res = getattr(ssl, candidate, None) jpayne@7: if res is None: jpayne@7: res = getattr(ssl, "CERT_" + candidate) jpayne@7: return res # type: ignore[no-any-return] jpayne@7: jpayne@7: return candidate # type: ignore[return-value] jpayne@7: jpayne@7: jpayne@7: def resolve_ssl_version(candidate: None | int | str) -> int: jpayne@7: """ jpayne@7: like resolve_cert_reqs jpayne@7: """ jpayne@7: if candidate is None: jpayne@7: return PROTOCOL_TLS jpayne@7: jpayne@7: if isinstance(candidate, str): jpayne@7: res = getattr(ssl, candidate, None) jpayne@7: if res is None: jpayne@7: res = getattr(ssl, "PROTOCOL_" + candidate) jpayne@7: return typing.cast(int, res) jpayne@7: jpayne@7: return candidate jpayne@7: jpayne@7: jpayne@7: def create_urllib3_context( jpayne@7: ssl_version: int | None = None, jpayne@7: cert_reqs: int | None = None, jpayne@7: options: int | None = None, jpayne@7: ciphers: str | None = None, jpayne@7: ssl_minimum_version: int | None = None, jpayne@7: ssl_maximum_version: int | None = None, jpayne@7: ) -> ssl.SSLContext: jpayne@7: """Creates and configures an :class:`ssl.SSLContext` instance for use with urllib3. jpayne@7: jpayne@7: :param ssl_version: jpayne@7: The desired protocol version to use. This will default to jpayne@7: PROTOCOL_SSLv23 which will negotiate the highest protocol that both jpayne@7: the server and your installation of OpenSSL support. jpayne@7: jpayne@7: This parameter is deprecated instead use 'ssl_minimum_version'. jpayne@7: :param ssl_minimum_version: jpayne@7: The minimum version of TLS to be used. Use the 'ssl.TLSVersion' enum for specifying the value. jpayne@7: :param ssl_maximum_version: jpayne@7: The maximum version of TLS to be used. Use the 'ssl.TLSVersion' enum for specifying the value. jpayne@7: Not recommended to set to anything other than 'ssl.TLSVersion.MAXIMUM_SUPPORTED' which is the jpayne@7: default value. jpayne@7: :param cert_reqs: jpayne@7: Whether to require the certificate verification. This defaults to jpayne@7: ``ssl.CERT_REQUIRED``. jpayne@7: :param options: jpayne@7: Specific OpenSSL options. These default to ``ssl.OP_NO_SSLv2``, jpayne@7: ``ssl.OP_NO_SSLv3``, ``ssl.OP_NO_COMPRESSION``, and ``ssl.OP_NO_TICKET``. jpayne@7: :param ciphers: jpayne@7: Which cipher suites to allow the server to select. Defaults to either system configured jpayne@7: ciphers if OpenSSL 1.1.1+, otherwise uses a secure default set of ciphers. jpayne@7: :returns: jpayne@7: Constructed SSLContext object with specified options jpayne@7: :rtype: SSLContext jpayne@7: """ jpayne@7: if SSLContext is None: jpayne@7: raise TypeError("Can't create an SSLContext object without an ssl module") jpayne@7: jpayne@7: # This means 'ssl_version' was specified as an exact value. jpayne@7: if ssl_version not in (None, PROTOCOL_TLS, PROTOCOL_TLS_CLIENT): jpayne@7: # Disallow setting 'ssl_version' and 'ssl_minimum|maximum_version' jpayne@7: # to avoid conflicts. jpayne@7: if ssl_minimum_version is not None or ssl_maximum_version is not None: jpayne@7: raise ValueError( jpayne@7: "Can't specify both 'ssl_version' and either " jpayne@7: "'ssl_minimum_version' or 'ssl_maximum_version'" jpayne@7: ) jpayne@7: jpayne@7: # 'ssl_version' is deprecated and will be removed in the future. jpayne@7: else: jpayne@7: # Use 'ssl_minimum_version' and 'ssl_maximum_version' instead. jpayne@7: ssl_minimum_version = _SSL_VERSION_TO_TLS_VERSION.get( jpayne@7: ssl_version, TLSVersion.MINIMUM_SUPPORTED jpayne@7: ) jpayne@7: ssl_maximum_version = _SSL_VERSION_TO_TLS_VERSION.get( jpayne@7: ssl_version, TLSVersion.MAXIMUM_SUPPORTED jpayne@7: ) jpayne@7: jpayne@7: # This warning message is pushing users to use 'ssl_minimum_version' jpayne@7: # instead of both min/max. Best practice is to only set the minimum version and jpayne@7: # keep the maximum version to be it's default value: 'TLSVersion.MAXIMUM_SUPPORTED' jpayne@7: warnings.warn( jpayne@7: "'ssl_version' option is deprecated and will be " jpayne@7: "removed in urllib3 v2.1.0. Instead use 'ssl_minimum_version'", jpayne@7: category=DeprecationWarning, jpayne@7: stacklevel=2, jpayne@7: ) jpayne@7: jpayne@7: # PROTOCOL_TLS is deprecated in Python 3.10 so we always use PROTOCOL_TLS_CLIENT jpayne@7: context = SSLContext(PROTOCOL_TLS_CLIENT) jpayne@7: jpayne@7: if ssl_minimum_version is not None: jpayne@7: context.minimum_version = ssl_minimum_version jpayne@7: else: # Python <3.10 defaults to 'MINIMUM_SUPPORTED' so explicitly set TLSv1.2 here jpayne@7: context.minimum_version = TLSVersion.TLSv1_2 jpayne@7: jpayne@7: if ssl_maximum_version is not None: jpayne@7: context.maximum_version = ssl_maximum_version jpayne@7: jpayne@7: # Unless we're given ciphers defer to either system ciphers in jpayne@7: # the case of OpenSSL 1.1.1+ or use our own secure default ciphers. jpayne@7: if ciphers: jpayne@7: context.set_ciphers(ciphers) jpayne@7: jpayne@7: # Setting the default here, as we may have no ssl module on import jpayne@7: cert_reqs = ssl.CERT_REQUIRED if cert_reqs is None else cert_reqs jpayne@7: jpayne@7: if options is None: jpayne@7: options = 0 jpayne@7: # SSLv2 is easily broken and is considered harmful and dangerous jpayne@7: options |= OP_NO_SSLv2 jpayne@7: # SSLv3 has several problems and is now dangerous jpayne@7: options |= OP_NO_SSLv3 jpayne@7: # Disable compression to prevent CRIME attacks for OpenSSL 1.0+ jpayne@7: # (issue #309) jpayne@7: options |= OP_NO_COMPRESSION jpayne@7: # TLSv1.2 only. Unless set explicitly, do not request tickets. jpayne@7: # This may save some bandwidth on wire, and although the ticket is encrypted, jpayne@7: # there is a risk associated with it being on wire, jpayne@7: # if the server is not rotating its ticketing keys properly. jpayne@7: options |= OP_NO_TICKET jpayne@7: jpayne@7: context.options |= options jpayne@7: jpayne@7: # Enable post-handshake authentication for TLS 1.3, see GH #1634. PHA is jpayne@7: # necessary for conditional client cert authentication with TLS 1.3. jpayne@7: # The attribute is None for OpenSSL <= 1.1.0 or does not exist when using jpayne@7: # an SSLContext created by pyOpenSSL. jpayne@7: if getattr(context, "post_handshake_auth", None) is not None: jpayne@7: context.post_handshake_auth = True jpayne@7: jpayne@7: # The order of the below lines setting verify_mode and check_hostname jpayne@7: # matter due to safe-guards SSLContext has to prevent an SSLContext with jpayne@7: # check_hostname=True, verify_mode=NONE/OPTIONAL. jpayne@7: # We always set 'check_hostname=False' for pyOpenSSL so we rely on our own jpayne@7: # 'ssl.match_hostname()' implementation. jpayne@7: if cert_reqs == ssl.CERT_REQUIRED and not IS_PYOPENSSL: jpayne@7: context.verify_mode = cert_reqs jpayne@7: context.check_hostname = True jpayne@7: else: jpayne@7: context.check_hostname = False jpayne@7: context.verify_mode = cert_reqs jpayne@7: jpayne@7: try: jpayne@7: context.hostname_checks_common_name = False jpayne@7: except AttributeError: # Defensive: for CPython < 3.8.9 and 3.9.3; for PyPy < 7.3.8 jpayne@7: pass jpayne@7: jpayne@7: # Enable logging of TLS session keys via defacto standard environment variable jpayne@7: # 'SSLKEYLOGFILE', if the feature is available (Python 3.8+). Skip empty values. jpayne@7: if hasattr(context, "keylog_filename"): jpayne@7: sslkeylogfile = os.environ.get("SSLKEYLOGFILE") jpayne@7: if sslkeylogfile: jpayne@7: context.keylog_filename = sslkeylogfile jpayne@7: jpayne@7: return context jpayne@7: jpayne@7: jpayne@7: @typing.overload jpayne@7: def ssl_wrap_socket( jpayne@7: sock: socket.socket, jpayne@7: keyfile: str | None = ..., jpayne@7: certfile: str | None = ..., jpayne@7: cert_reqs: int | None = ..., jpayne@7: ca_certs: str | None = ..., jpayne@7: server_hostname: str | None = ..., jpayne@7: ssl_version: int | None = ..., jpayne@7: ciphers: str | None = ..., jpayne@7: ssl_context: ssl.SSLContext | None = ..., jpayne@7: ca_cert_dir: str | None = ..., jpayne@7: key_password: str | None = ..., jpayne@7: ca_cert_data: None | str | bytes = ..., jpayne@7: tls_in_tls: Literal[False] = ..., jpayne@7: ) -> ssl.SSLSocket: jpayne@7: ... jpayne@7: jpayne@7: jpayne@7: @typing.overload jpayne@7: def ssl_wrap_socket( jpayne@7: sock: socket.socket, jpayne@7: keyfile: str | None = ..., jpayne@7: certfile: str | None = ..., jpayne@7: cert_reqs: int | None = ..., jpayne@7: ca_certs: str | None = ..., jpayne@7: server_hostname: str | None = ..., jpayne@7: ssl_version: int | None = ..., jpayne@7: ciphers: str | None = ..., jpayne@7: ssl_context: ssl.SSLContext | None = ..., jpayne@7: ca_cert_dir: str | None = ..., jpayne@7: key_password: str | None = ..., jpayne@7: ca_cert_data: None | str | bytes = ..., jpayne@7: tls_in_tls: bool = ..., jpayne@7: ) -> ssl.SSLSocket | SSLTransportType: jpayne@7: ... jpayne@7: jpayne@7: jpayne@7: def ssl_wrap_socket( jpayne@7: sock: socket.socket, jpayne@7: keyfile: str | None = None, jpayne@7: certfile: str | None = None, jpayne@7: cert_reqs: int | None = None, jpayne@7: ca_certs: str | None = None, jpayne@7: server_hostname: str | None = None, jpayne@7: ssl_version: int | None = None, jpayne@7: ciphers: str | None = None, jpayne@7: ssl_context: ssl.SSLContext | None = None, jpayne@7: ca_cert_dir: str | None = None, jpayne@7: key_password: str | None = None, jpayne@7: ca_cert_data: None | str | bytes = None, jpayne@7: tls_in_tls: bool = False, jpayne@7: ) -> ssl.SSLSocket | SSLTransportType: jpayne@7: """ jpayne@7: All arguments except for server_hostname, ssl_context, tls_in_tls, ca_cert_data and jpayne@7: ca_cert_dir have the same meaning as they do when using jpayne@7: :func:`ssl.create_default_context`, :meth:`ssl.SSLContext.load_cert_chain`, jpayne@7: :meth:`ssl.SSLContext.set_ciphers` and :meth:`ssl.SSLContext.wrap_socket`. jpayne@7: jpayne@7: :param server_hostname: jpayne@7: When SNI is supported, the expected hostname of the certificate jpayne@7: :param ssl_context: jpayne@7: A pre-made :class:`SSLContext` object. If none is provided, one will jpayne@7: be created using :func:`create_urllib3_context`. jpayne@7: :param ciphers: jpayne@7: A string of ciphers we wish the client to support. jpayne@7: :param ca_cert_dir: jpayne@7: A directory containing CA certificates in multiple separate files, as jpayne@7: supported by OpenSSL's -CApath flag or the capath argument to jpayne@7: SSLContext.load_verify_locations(). jpayne@7: :param key_password: jpayne@7: Optional password if the keyfile is encrypted. jpayne@7: :param ca_cert_data: jpayne@7: Optional string containing CA certificates in PEM format suitable for jpayne@7: passing as the cadata parameter to SSLContext.load_verify_locations() jpayne@7: :param tls_in_tls: jpayne@7: Use SSLTransport to wrap the existing socket. jpayne@7: """ jpayne@7: context = ssl_context jpayne@7: if context is None: jpayne@7: # Note: This branch of code and all the variables in it are only used in tests. jpayne@7: # We should consider deprecating and removing this code. jpayne@7: context = create_urllib3_context(ssl_version, cert_reqs, ciphers=ciphers) jpayne@7: jpayne@7: if ca_certs or ca_cert_dir or ca_cert_data: jpayne@7: try: jpayne@7: context.load_verify_locations(ca_certs, ca_cert_dir, ca_cert_data) jpayne@7: except OSError as e: jpayne@7: raise SSLError(e) from e jpayne@7: jpayne@7: elif ssl_context is None and hasattr(context, "load_default_certs"): jpayne@7: # try to load OS default certs; works well on Windows. jpayne@7: context.load_default_certs() jpayne@7: jpayne@7: # Attempt to detect if we get the goofy behavior of the jpayne@7: # keyfile being encrypted and OpenSSL asking for the jpayne@7: # passphrase via the terminal and instead error out. jpayne@7: if keyfile and key_password is None and _is_key_file_encrypted(keyfile): jpayne@7: raise SSLError("Client private key is encrypted, password is required") jpayne@7: jpayne@7: if certfile: jpayne@7: if key_password is None: jpayne@7: context.load_cert_chain(certfile, keyfile) jpayne@7: else: jpayne@7: context.load_cert_chain(certfile, keyfile, key_password) jpayne@7: jpayne@7: try: jpayne@7: context.set_alpn_protocols(ALPN_PROTOCOLS) jpayne@7: except NotImplementedError: # Defensive: in CI, we always have set_alpn_protocols jpayne@7: pass jpayne@7: jpayne@7: ssl_sock = _ssl_wrap_socket_impl(sock, context, tls_in_tls, server_hostname) jpayne@7: return ssl_sock jpayne@7: jpayne@7: jpayne@7: def is_ipaddress(hostname: str | bytes) -> bool: jpayne@7: """Detects whether the hostname given is an IPv4 or IPv6 address. jpayne@7: Also detects IPv6 addresses with Zone IDs. jpayne@7: jpayne@7: :param str hostname: Hostname to examine. jpayne@7: :return: True if the hostname is an IP address, False otherwise. jpayne@7: """ jpayne@7: if isinstance(hostname, bytes): jpayne@7: # IDN A-label bytes are ASCII compatible. jpayne@7: hostname = hostname.decode("ascii") jpayne@7: return bool(_IPV4_RE.match(hostname) or _BRACELESS_IPV6_ADDRZ_RE.match(hostname)) jpayne@7: jpayne@7: jpayne@7: def _is_key_file_encrypted(key_file: str) -> bool: jpayne@7: """Detects if a key file is encrypted or not.""" jpayne@7: with open(key_file) as f: jpayne@7: for line in f: jpayne@7: # Look for Proc-Type: 4,ENCRYPTED jpayne@7: if "ENCRYPTED" in line: jpayne@7: return True jpayne@7: jpayne@7: return False jpayne@7: jpayne@7: jpayne@7: def _ssl_wrap_socket_impl( jpayne@7: sock: socket.socket, jpayne@7: ssl_context: ssl.SSLContext, jpayne@7: tls_in_tls: bool, jpayne@7: server_hostname: str | None = None, jpayne@7: ) -> ssl.SSLSocket | SSLTransportType: jpayne@7: if tls_in_tls: jpayne@7: if not SSLTransport: jpayne@7: # Import error, ssl is not available. jpayne@7: raise ProxySchemeUnsupported( jpayne@7: "TLS in TLS requires support for the 'ssl' module" jpayne@7: ) jpayne@7: jpayne@7: SSLTransport._validate_ssl_context_for_tls_in_tls(ssl_context) jpayne@7: return SSLTransport(sock, ssl_context, server_hostname) jpayne@7: jpayne@7: return ssl_context.wrap_socket(sock, server_hostname=server_hostname)