"""
Module for using pyOpenSSL as a TLS backend. This module was relevant before
the standard library ``ssl`` module supported SNI, but now that we've dropped
support for Python 2.7 all relevant Python versions support SNI so
**this module is no longer recommended**.

This needs the following packages installed:

* `pyOpenSSL`_ (tested with 16.0.0)
* `cryptography`_ (minimum 1.3.4, from pyopenssl)
* `idna`_ (minimum 2.0)

However, pyOpenSSL depends on cryptography, so while we use all three directly here we
end up having relatively few packages required.

You can install them with the following command:

.. code-block:: bash

    $ python -m pip install pyopenssl cryptography idna

To activate certificate checking, call
:func:`~urllib3.contrib.pyopenssl.inject_into_urllib3` from your Python code
before you begin making HTTP requests. This can be done in a ``sitecustomize``
module, or at any other time before your application begins using ``urllib3``,
like this:

.. code-block:: python

    try:
        import urllib3.contrib.pyopenssl
        urllib3.contrib.pyopenssl.inject_into_urllib3()
    except ImportError:
        pass

.. _pyopenssl: https://www.pyopenssl.org
.. _cryptography: https://cryptography.io
.. _idna: https://github.com/kjd/idna
"""

from __future__ import annotations

import OpenSSL.SSL  # type: ignore[import-untyped]
from cryptography import x509

try:
    from cryptography.x509 import UnsupportedExtension  # type: ignore[attr-defined]
except ImportError:
    # UnsupportedExtension is gone in cryptography >= 2.1.0
    class UnsupportedExtension(Exception):  # type: ignore[no-redef]
        pass


import logging
import ssl
import typing
from io import BytesIO
from socket import socket as socket_cls
from socket import timeout

from .. import util

if typing.TYPE_CHECKING:
    from OpenSSL.crypto import X509  # type: ignore[import-untyped]


__all__ = ["inject_into_urllib3", "extract_from_urllib3"]

# Map from urllib3 to PyOpenSSL compatible parameter-values.
_openssl_versions: dict[int, int] = {
    util.ssl_.PROTOCOL_TLS: OpenSSL.SSL.SSLv23_METHOD,  # type: ignore[attr-defined]
    util.ssl_.PROTOCOL_TLS_CLIENT: OpenSSL.SSL.SSLv23_METHOD,  # type: ignore[attr-defined]
    ssl.PROTOCOL_TLSv1: OpenSSL.SSL.TLSv1_METHOD,
}

# The pinned TLS 1.1 / TLS 1.2 constants may be absent from either library,
# so an entry is only registered when both sides expose their constant.
_openssl_versions.update(
    {
        getattr(ssl, stdlib_name): getattr(OpenSSL.SSL, openssl_name)
        for stdlib_name, openssl_name in (
            ("PROTOCOL_TLSv1_1", "TLSv1_1_METHOD"),
            ("PROTOCOL_TLSv1_2", "TLSv1_2_METHOD"),
        )
        if hasattr(ssl, stdlib_name) and hasattr(OpenSSL.SSL, openssl_name)
    }
)


# Standard-library verify modes expressed as pyOpenSSL verify flags.
_stdlib_to_openssl_verify = {
    ssl.CERT_NONE: OpenSSL.SSL.VERIFY_NONE,
    ssl.CERT_OPTIONAL: OpenSSL.SSL.VERIFY_PEER,
    ssl.CERT_REQUIRED: (
        OpenSSL.SSL.VERIFY_PEER + OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT
    ),
}
# Reverse lookup, used to report the verify mode back in stdlib terms.
_openssl_to_stdlib_verify = {
    openssl_flags: stdlib_mode
    for stdlib_mode, openssl_flags in _stdlib_to_openssl_verify.items()
}

# The SSLvX values are the most likely to be missing in the future
# but we check them all just to be sure.
# Each mask falls back to 0 (a no-op when OR-ed in) if this pyOpenSSL build
# does not define the corresponding OP_NO_* constant.
_OP_NO_SSLv2_OR_SSLv3: int = (
    getattr(OpenSSL.SSL, "OP_NO_SSLv2", 0) | getattr(OpenSSL.SSL, "OP_NO_SSLv3", 0)
)
_OP_NO_TLSv1: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1", 0)
_OP_NO_TLSv1_1: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_1", 0)
_OP_NO_TLSv1_2: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_2", 0)
_OP_NO_TLSv1_3: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_3", 0)

# Option bits that disable every protocol *below* the requested minimum.
_openssl_to_ssl_minimum_version: dict[int, int] = {
    ssl.TLSVersion.MINIMUM_SUPPORTED: _OP_NO_SSLv2_OR_SSLv3,
    ssl.TLSVersion.TLSv1: _OP_NO_SSLv2_OR_SSLv3,
    ssl.TLSVersion.TLSv1_1: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1,
    ssl.TLSVersion.TLSv1_2: (
        _OP_NO_SSLv2_OR_SSLv3
        | _OP_NO_TLSv1
        | _OP_NO_TLSv1_1
    ),
    ssl.TLSVersion.TLSv1_3: (
        _OP_NO_SSLv2_OR_SSLv3
        | _OP_NO_TLSv1
        | _OP_NO_TLSv1_1
        | _OP_NO_TLSv1_2
    ),
    ssl.TLSVersion.MAXIMUM_SUPPORTED: (
        _OP_NO_SSLv2_OR_SSLv3
        | _OP_NO_TLSv1
        | _OP_NO_TLSv1_1
        | _OP_NO_TLSv1_2
    ),
}

# Option bits that disable every protocol *above* the requested maximum.
_openssl_to_ssl_maximum_version: dict[int, int] = {
    ssl.TLSVersion.MINIMUM_SUPPORTED: (
        _OP_NO_SSLv2_OR_SSLv3
        | _OP_NO_TLSv1
        | _OP_NO_TLSv1_1
        | _OP_NO_TLSv1_2
        | _OP_NO_TLSv1_3
    ),
    ssl.TLSVersion.TLSv1: (
        _OP_NO_SSLv2_OR_SSLv3
        | _OP_NO_TLSv1_1
        | _OP_NO_TLSv1_2
        | _OP_NO_TLSv1_3
    ),
    ssl.TLSVersion.TLSv1_1: (
        _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_2 | _OP_NO_TLSv1_3
    ),
    ssl.TLSVersion.TLSv1_2: (_OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_3),
    ssl.TLSVersion.TLSv1_3: _OP_NO_SSLv2_OR_SSLv3,
    ssl.TLSVersion.MAXIMUM_SUPPORTED: _OP_NO_SSLv2_OR_SSLv3,
}

# OpenSSL will only write 16K at a time
SSL_WRITE_BLOCKSIZE = 16 * 1024

# Remembered so extract_from_urllib3() can restore the original context class.
orig_util_SSLContext = util.ssl_.SSLContext


log = logging.getLogger(__name__)


def inject_into_urllib3() -> None:
    """Monkey-patch urllib3 with PyOpenSSL-backed SSL-support.

    Raises ``ImportError`` (via :func:`_validate_dependencies_met`) when the
    installed ``cryptography`` / ``pyOpenSSL`` lack required functionality.
    """

    _validate_dependencies_met()

    util.SSLContext = PyOpenSSLContext  # type: ignore[assignment]
    util.ssl_.SSLContext = PyOpenSSLContext  # type: ignore[assignment]
    util.IS_PYOPENSSL = True
    util.ssl_.IS_PYOPENSSL = True


def extract_from_urllib3() -> None:
    """Undo monkey-patching by :func:`inject_into_urllib3`."""

    util.SSLContext = orig_util_SSLContext
    util.ssl_.SSLContext = orig_util_SSLContext
    util.IS_PYOPENSSL = False
    util.ssl_.IS_PYOPENSSL = False


def _validate_dependencies_met() -> None:
    """
    Verifies that PyOpenSSL's package-level dependencies have been met.
    Throws `ImportError` if they are not met.
    """
    # Method added in `cryptography==1.1`; not available in older versions
    from cryptography.x509.extensions import Extensions

    if getattr(Extensions, "get_extension_for_class", None) is None:
        raise ImportError(
            "'cryptography' module missing required functionality. "
            "Try upgrading to v1.3.4 or newer."
        )

    # pyOpenSSL 0.14 and above use cryptography for OpenSSL bindings. The _x509
    # attribute is only present on those versions.
    from OpenSSL.crypto import X509

    # Named ``probe_cert`` so the module-level ``x509`` import is not shadowed.
    probe_cert = X509()
    if getattr(probe_cert, "_x509", None) is None:
        raise ImportError(
            "'pyOpenSSL' module missing required functionality. "
            "Try upgrading to v0.14 or newer."
        )


def _dnsname_to_stdlib(name: str) -> str | None:
    """
    Converts a dNSName SubjectAlternativeName field to the form used by the
    standard library on the given Python version.

    Cryptography produces a dNSName as a unicode string that was idna-decoded
    from ASCII bytes. We need to idna-encode that string to get it back, and
    then on Python 3 we also need to convert to unicode via UTF-8 (the stdlib
    uses PyUnicode_FromStringAndSize on it, which decodes via UTF-8).

    If the name cannot be idna-encoded then we return None signalling that
    the name given should be skipped.
    """

    def idna_encode(name: str) -> bytes | None:
        """
        Borrowed wholesale from the Python Cryptography Project. It turns out
        that we can't just safely call `idna.encode`: it can explode for
        wildcard names. This avoids that problem.
        """
        import idna

        try:
            # Strip a leading wildcard / dot, encode the remainder, then put
            # the prefix back in front of the encoded bytes.
            for prefix in ["*.", "."]:
                if name.startswith(prefix):
                    name = name[len(prefix) :]
                    return prefix.encode("ascii") + idna.encode(name)
            return idna.encode(name)
        except idna.core.IDNAError:
            return None

    # Don't send IPv6 addresses through the IDNA encoder.
    if ":" in name:
        return name

    encoded_name = idna_encode(name)
    if encoded_name is None:
        return None
    return encoded_name.decode("utf-8")


def get_subj_alt_name(peer_cert: X509) -> list[tuple[str, str]]:
    """
    Given an PyOpenSSL certificate, provides all the subject alternative names.

    Returns a list of ``("DNS", name)`` and ``("IP Address", address)`` tuples.
    An empty list is returned when the certificate has no SAN extension or the
    extension could not be parsed (the latter case is logged as a warning).
    """
    cert = peer_cert.to_cryptography()

    # We want to find the SAN extension. Ask Cryptography to locate it (it's
    # faster than looping in Python)
    try:
        ext = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName).value
    except x509.ExtensionNotFound:
        # No such extension, return the empty list.
        return []
    except (
        x509.DuplicateExtension,
        UnsupportedExtension,
        x509.UnsupportedGeneralNameType,
        UnicodeError,
    ) as e:
        # A problem has been found with the quality of the certificate. Assume
        # no SAN field is present.
        log.warning(
            "A problem was encountered with the certificate that prevented "
            "urllib3 from finding the SubjectAlternativeName field. This can "
            "affect certificate validation. The error was %s",
            e,
        )
        return []

    # We want to return dNSName and iPAddress fields. We need to cast the IPs
    # back to strings because the match_hostname function wants them as
    # strings.
    # Sadly the DNS names need to be idna encoded and then, on Python 3, UTF-8
    # decoded. This is pretty frustrating, but that's what the standard library
    # does with certificates, and so we need to attempt to do the same.
    # We also want to skip over names which cannot be idna encoded.
    names = [
        ("DNS", name)
        for name in map(_dnsname_to_stdlib, ext.get_values_for_type(x509.DNSName))
        if name is not None
    ]
    names.extend(
        ("IP Address", str(name)) for name in ext.get_values_for_type(x509.IPAddress)
    )

    return names


class WrappedSocket:
    """API-compatibility wrapper for Python OpenSSL's Connection-class.

    Translates pyOpenSSL exceptions raised by the wrapped connection into the
    ``OSError`` / ``ssl.SSLError`` / ``socket.timeout`` exceptions raised by
    the methods below, and waits on the underlying socket when pyOpenSSL
    reports that it needs more I/O.
    """

    def __init__(
        self,
        connection: OpenSSL.SSL.Connection,
        socket: socket_cls,
        suppress_ragged_eofs: bool = True,
    ) -> None:
        self.connection = connection
        self.socket = socket
        # When true, an "Unexpected EOF" syscall error is reported as a clean
        # end-of-stream (empty read) instead of an OSError.
        self.suppress_ragged_eofs = suppress_ragged_eofs
        # Bookkeeping for file objects created through ``makefile``: the real
        # close is deferred until the last one has been released.
        self._io_refs = 0
        self._closed = False

    def fileno(self) -> int:
        """Return the file descriptor of the underlying socket."""
        return self.socket.fileno()

    # Copy-pasted from Python 3.5 source code
    def _decref_socketios(self) -> None:
        """Drop one ``makefile`` reference; finish a pending close if any."""
        if self._io_refs > 0:
            self._io_refs -= 1
        if self._closed:
            self.close()

    def recv(self, *args: typing.Any, **kwargs: typing.Any) -> bytes:
        """Receive data from the TLS connection.

        Arguments are forwarded unchanged to ``connection.recv``. Returns
        ``b""`` on a clean TLS shutdown, or on a ragged EOF when
        ``suppress_ragged_eofs`` is set.

        Raises ``OSError`` for other syscall errors, ``socket.timeout`` if the
        socket does not become readable in time, and ``ssl.SSLError`` for any
        other pyOpenSSL error.
        """
        # Retry in a loop (not by recursion) so that a long run of
        # WantReadError results cannot exhaust the interpreter stack.
        while True:
            try:
                data = self.connection.recv(*args, **kwargs)
            except OpenSSL.SSL.SysCallError as e:
                if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"):
                    return b""
                raise OSError(e.args[0], str(e)) from e
            except OpenSSL.SSL.ZeroReturnError:
                if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN:
                    return b""
                raise
            except OpenSSL.SSL.WantReadError as e:
                if not util.wait_for_read(self.socket, self.socket.gettimeout()):
                    raise timeout("The read operation timed out") from e
                # Socket is readable again: try the read once more.
                continue

            # TLS 1.3 post-handshake authentication
            except OpenSSL.SSL.Error as e:
                raise ssl.SSLError(f"read error: {e!r}") from e
            else:
                return data  # type: ignore[no-any-return]

    def recv_into(self, *args: typing.Any, **kwargs: typing.Any) -> int:
        """Receive data from the TLS connection into a caller-supplied buffer.

        Arguments are forwarded unchanged to ``connection.recv_into``. Returns
        the value reported by the connection, or ``0`` on a clean TLS shutdown
        or a suppressed ragged EOF. Raises the same exceptions as :meth:`recv`.
        """
        # Iterative retry, for the same reason as in ``recv``.
        while True:
            try:
                return self.connection.recv_into(*args, **kwargs)  # type: ignore[no-any-return]
            except OpenSSL.SSL.SysCallError as e:
                if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"):
                    return 0
                raise OSError(e.args[0], str(e)) from e
            except OpenSSL.SSL.ZeroReturnError:
                if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN:
                    return 0
                raise
            except OpenSSL.SSL.WantReadError as e:
                if not util.wait_for_read(self.socket, self.socket.gettimeout()):
                    raise timeout("The read operation timed out") from e
                # Socket is readable again: try the read once more.
                continue

            # TLS 1.3 post-handshake authentication
            except OpenSSL.SSL.Error as e:
                raise ssl.SSLError(f"read error: {e!r}") from e

    def settimeout(self, timeout: float) -> None:
        """Set the timeout on the underlying socket."""
        return self.socket.settimeout(timeout)

    def _send_until_done(self, data: bytes) -> int:
        """Send ``data`` once, waiting for writability as often as needed.

        Returns the number of bytes the connection reports as sent. Raises
        ``socket.timeout`` if the socket does not become writable in time and
        ``OSError`` on a syscall error.
        """
        while True:
            try:
                return self.connection.send(data)  # type: ignore[no-any-return]
            except OpenSSL.SSL.WantWriteError as e:
                if not util.wait_for_write(self.socket, self.socket.gettimeout()):
                    raise timeout() from e
                continue
            except OpenSSL.SSL.SysCallError as e:
                raise OSError(e.args[0], str(e)) from e

    def sendall(self, data: bytes) -> None:
        """Send all of ``data``, at most ``SSL_WRITE_BLOCKSIZE`` bytes per write."""
        total_sent = 0
        while total_sent < len(data):
            sent = self._send_until_done(
                data[total_sent : total_sent + SSL_WRITE_BLOCKSIZE]
            )
            total_sent += sent

    def shutdown(self) -> None:
        """Shut down the TLS connection (pyOpenSSL errors propagate as-is)."""
        # FIXME rethrow compatible exceptions should we ever use this
        self.connection.shutdown()

    def close(self) -> None:
        """Mark the socket closed; really close once no file objects remain."""
        self._closed = True
        if self._io_refs <= 0:
            self._real_close()

    def _real_close(self) -> None:
        """Close the connection, deliberately ignoring pyOpenSSL errors."""
        try:
            return self.connection.close()  # type: ignore[no-any-return]
        except OpenSSL.SSL.Error:
            return

    def getpeercert(
        self, binary_form: bool = False
    ) -> dict[str, list[typing.Any]] | None:
        """Return the peer certificate.

        Returns the falsy value reported by the connection when there is no
        peer certificate, the DER (ASN.1) encoded bytes when ``binary_form`` is
        true, and otherwise a dict with ``"subject"`` (commonName only) and
        ``"subjectAltName"`` entries.
        """
        # Named ``peer_cert`` so the module-level ``x509`` import is not shadowed.
        peer_cert = self.connection.get_peer_certificate()

        if not peer_cert:
            return peer_cert  # type: ignore[no-any-return]

        if binary_form:
            return OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, peer_cert)  # type: ignore[no-any-return]

        return {
            "subject": ((("commonName", peer_cert.get_subject().CN),),),  # type: ignore[dict-item]
            "subjectAltName": get_subj_alt_name(peer_cert),
        }

    def version(self) -> str:
        """Return the negotiated protocol version name from the connection."""
        return self.connection.get_protocol_version_name()  # type: ignore[no-any-return]


# Reuse the standard library's ``socket.makefile`` implementation for the wrapper.
WrappedSocket.makefile = socket_cls.makefile  # type: ignore[attr-defined]


class PyOpenSSLContext:
    """
    I am a wrapper class for the PyOpenSSL ``Context`` object. I am responsible
    for translating the interface of the standard library ``SSLContext`` object
    to calls into PyOpenSSL.
    """

    def __init__(self, protocol: int) -> None:
        # ``protocol`` is a stdlib/urllib3 protocol constant; an unsupported
        # value raises ``KeyError`` from the lookup table.
        self.protocol = _openssl_versions[protocol]
        self._ctx = OpenSSL.SSL.Context(self.protocol)
        self._options = 0
        self.check_hostname = False
        self._minimum_version: int = ssl.TLSVersion.MINIMUM_SUPPORTED
        self._maximum_version: int = ssl.TLSVersion.MAXIMUM_SUPPORTED

    @property
    def options(self) -> int:
        """Option bits requested by the caller (version masks not included)."""
        return self._options

    @options.setter
    def options(self, value: int) -> None:
        self._options = value
        self._set_ctx_options()

    @property
    def verify_mode(self) -> int:
        """The context's verify mode, expressed as a stdlib ``ssl.CERT_*`` value."""
        return _openssl_to_stdlib_verify[self._ctx.get_verify_mode()]

    @verify_mode.setter
    def verify_mode(self, value: ssl.VerifyMode) -> None:
        self._ctx.set_verify(_stdlib_to_openssl_verify[value], _verify_callback)

    def set_default_verify_paths(self) -> None:
        """Delegate to the pyOpenSSL context's ``set_default_verify_paths``."""
        self._ctx.set_default_verify_paths()

    def set_ciphers(self, ciphers: bytes | str) -> None:
        """Set the cipher list; ``str`` input is UTF-8 encoded first."""
        if isinstance(ciphers, str):
            ciphers = ciphers.encode("utf-8")
        self._ctx.set_cipher_list(ciphers)

    def load_verify_locations(
        self,
        cafile: str | None = None,
        capath: str | None = None,
        cadata: bytes | None = None,
    ) -> None:
        """Load trusted CA certificates into the context.

        ``cafile`` / ``capath`` are UTF-8 encoded before being handed to
        pyOpenSSL. Raises ``ssl.SSLError`` if pyOpenSSL rejects the input.
        """
        if cafile is not None:
            cafile = cafile.encode("utf-8")  # type: ignore[assignment]
        if capath is not None:
            capath = capath.encode("utf-8")  # type: ignore[assignment]
        try:
            self._ctx.load_verify_locations(cafile, capath)
            if cadata is not None:
                self._ctx.load_verify_locations(BytesIO(cadata))
        except OpenSSL.SSL.Error as e:
            raise ssl.SSLError(f"unable to load trusted certificates: {e!r}") from e

    def load_cert_chain(
        self,
        certfile: str,
        keyfile: str | None = None,
        password: str | None = None,
    ) -> None:
        """Load a client certificate chain and its private key.

        The key is read from ``keyfile``, or from ``certfile`` when no key file
        is given. A non-bytes ``password`` is UTF-8 encoded. Raises
        ``ssl.SSLError`` if pyOpenSSL fails to load either file.
        """
        try:
            self._ctx.use_certificate_chain_file(certfile)
            if password is not None:
                if not isinstance(password, bytes):
                    password = password.encode("utf-8")  # type: ignore[assignment]
                self._ctx.set_passwd_cb(lambda *_: password)
            self._ctx.use_privatekey_file(keyfile or certfile)
        except OpenSSL.SSL.Error as e:
            raise ssl.SSLError(f"Unable to load certificate chain: {e!r}") from e

    def set_alpn_protocols(self, protocols: list[bytes | str]) -> None:
        """Set the ALPN protocols, converting each entry to ASCII bytes."""
        protocols = [util.util.to_bytes(p, "ascii") for p in protocols]
        return self._ctx.set_alpn_protos(protocols)  # type: ignore[no-any-return]

    def wrap_socket(
        self,
        sock: socket_cls,
        server_side: bool = False,
        do_handshake_on_connect: bool = True,
        suppress_ragged_eofs: bool = True,
        server_hostname: bytes | str | None = None,
    ) -> WrappedSocket:
        """Perform a client-side TLS handshake on ``sock`` and wrap the result.

        ``server_side`` and ``do_handshake_on_connect`` are accepted for
        signature compatibility but not used: the connection is always put in
        connect (client) state and the handshake is always performed here.
        ``suppress_ragged_eofs`` is forwarded to the returned
        :class:`WrappedSocket`.

        Raises ``socket.timeout`` if the socket does not become readable in
        time during the handshake and ``ssl.SSLError`` on any other pyOpenSSL
        handshake error.
        """
        cnx = OpenSSL.SSL.Connection(self._ctx, sock)

        # If server_hostname is an IP, don't use it for SNI, per RFC6066 Section 3
        if server_hostname and not util.ssl_.is_ipaddress(server_hostname):
            if isinstance(server_hostname, str):
                server_hostname = server_hostname.encode("utf-8")
            cnx.set_tlsext_host_name(server_hostname)

        cnx.set_connect_state()

        while True:
            try:
                cnx.do_handshake()
            except OpenSSL.SSL.WantReadError as e:
                if not util.wait_for_read(sock, sock.gettimeout()):
                    raise timeout("select timed out") from e
                continue
            except OpenSSL.SSL.Error as e:
                raise ssl.SSLError(f"bad handshake: {e!r}") from e
            break

        # Honour the caller's ragged-EOF preference instead of silently
        # falling back to the WrappedSocket default.
        return WrappedSocket(cnx, sock, suppress_ragged_eofs)

    def _set_ctx_options(self) -> None:
        """Push caller options plus the min/max version masks to the context."""
        self._ctx.set_options(
            self._options
            | _openssl_to_ssl_minimum_version[self._minimum_version]
            | _openssl_to_ssl_maximum_version[self._maximum_version]
        )

    @property
    def minimum_version(self) -> int:
        """Lowest TLS version allowed, as an ``ssl.TLSVersion`` value."""
        return self._minimum_version

    @minimum_version.setter
    def minimum_version(self, minimum_version: int) -> None:
        self._minimum_version = minimum_version
        self._set_ctx_options()

    @property
    def maximum_version(self) -> int:
        """Highest TLS version allowed, as an ``ssl.TLSVersion`` value."""
        return self._maximum_version

    @maximum_version.setter
    def maximum_version(self, maximum_version: int) -> None:
        self._maximum_version = maximum_version
        self._set_ctx_options()


def _verify_callback(
    cnx: OpenSSL.SSL.Connection,
    x509: X509,
    err_no: int,
    err_depth: int,
    return_code: int,
) -> bool:
    """Verification callback registered via ``Context.set_verify``.

    Accepts the certificate only when ``err_no`` is zero; the other arguments
    are ignored.
    """
    return err_no == 0