Mercurial > repos > jpayne > bioproject_to_srr_2
comparison urllib3/util/ssl_.py @ 7:5eb2d5e3bf22
planemo upload for repository https://toolrepo.galaxytrakr.org/view/jpayne/bioproject_to_srr_2/556cac4fb538
author | jpayne |
---|---|
date | Sun, 05 May 2024 23:32:17 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
6:b2745907b1eb | 7:5eb2d5e3bf22 |
---|---|
1 from __future__ import annotations | |
2 | |
3 import hmac | |
4 import os | |
5 import socket | |
6 import sys | |
7 import typing | |
8 import warnings | |
9 from binascii import unhexlify | |
10 from hashlib import md5, sha1, sha256 | |
11 | |
12 from ..exceptions import ProxySchemeUnsupported, SSLError | |
13 from .url import _BRACELESS_IPV6_ADDRZ_RE, _IPV4_RE | |
14 | |
# Placeholders that keep these names importable when the 'ssl' module is
# missing; the guarded import later in this module rebinds them on success.
SSLContext = None
SSLTransport = None
HAS_NEVER_CHECK_COMMON_NAME = False
# NOTE(review): never reassigned in this module; presumably toggled by the
# pyOpenSSL integration elsewhere -- confirm.
IS_PYOPENSSL = False
# Protocol list handed to SSLContext.set_alpn_protocols() by ssl_wrap_socket().
ALPN_PROTOCOLS = ["http/1.1"]

# Same shape as sys.version_info: (major, minor, micro, releaselevel, serial).
_TYPE_VERSION_INFO = typing.Tuple[int, int, int, str, int]

# Maps the length of a hex-encoded digest (number of hex digits) to the hash
# function that produces a digest of that length.
HASHFUNC_MAP = {32: md5, 40: sha1, 64: sha256}
25 | |
26 | |
27 def _is_bpo_43522_fixed( | |
28 implementation_name: str, | |
29 version_info: _TYPE_VERSION_INFO, | |
30 pypy_version_info: _TYPE_VERSION_INFO | None, | |
31 ) -> bool: | |
32 """Return True for CPython 3.8.9+, 3.9.3+ or 3.10+ and PyPy 7.3.8+ where | |
33 setting SSLContext.hostname_checks_common_name to False works. | |
34 | |
35 Outside of CPython and PyPy we don't know which implementations work | |
36 or not so we conservatively use our hostname matching as we know that works | |
37 on all implementations. | |
38 | |
39 https://github.com/urllib3/urllib3/issues/2192#issuecomment-821832963 | |
40 https://foss.heptapod.net/pypy/pypy/-/issues/3539 | |
41 """ | |
42 if implementation_name == "pypy": | |
43 # https://foss.heptapod.net/pypy/pypy/-/issues/3129 | |
44 return pypy_version_info >= (7, 3, 8) # type: ignore[operator] | |
45 elif implementation_name == "cpython": | |
46 major_minor = version_info[:2] | |
47 micro = version_info[2] | |
48 return ( | |
49 (major_minor == (3, 8) and micro >= 9) | |
50 or (major_minor == (3, 9) and micro >= 3) | |
51 or major_minor >= (3, 10) | |
52 ) | |
53 else: # Defensive: | |
54 return False | |
55 | |
56 | |
57 def _is_has_never_check_common_name_reliable( | |
58 openssl_version: str, | |
59 openssl_version_number: int, | |
60 implementation_name: str, | |
61 version_info: _TYPE_VERSION_INFO, | |
62 pypy_version_info: _TYPE_VERSION_INFO | None, | |
63 ) -> bool: | |
64 # As of May 2023, all released versions of LibreSSL fail to reject certificates with | |
65 # only common names, see https://github.com/urllib3/urllib3/pull/3024 | |
66 is_openssl = openssl_version.startswith("OpenSSL ") | |
67 # Before fixing OpenSSL issue #14579, the SSL_new() API was not copying hostflags | |
68 # like X509_CHECK_FLAG_NEVER_CHECK_SUBJECT, which tripped up CPython. | |
69 # https://github.com/openssl/openssl/issues/14579 | |
70 # This was released in OpenSSL 1.1.1l+ (>=0x101010cf) | |
71 is_openssl_issue_14579_fixed = openssl_version_number >= 0x101010CF | |
72 | |
73 return is_openssl and ( | |
74 is_openssl_issue_14579_fixed | |
75 or _is_bpo_43522_fixed(implementation_name, version_info, pypy_version_info) | |
76 ) | |
77 | |
78 | |
# Everything in this block is needed only by annotations, so it is evaluated
# solely by static type checkers and never at runtime.
if typing.TYPE_CHECKING:
    from ssl import VerifyMode
    from typing import Literal, TypedDict

    # Aliased so it does not clash with the runtime 'SSLTransport' name, which
    # is None when the 'ssl' module is unavailable.
    from .ssltransport import SSLTransport as SSLTransportType

    # Typed dictionary describing certificate fields; 'total=False' makes every
    # key optional.
    # NOTE(review): presumably mirrors the dict returned by
    # SSLSocket.getpeercert() -- confirm against callers.
    class _TYPE_PEER_CERT_RET_DICT(TypedDict, total=False):
        subjectAltName: tuple[tuple[str, str], ...]
        subject: tuple[tuple[tuple[str, str], ...], ...]
        serialNumber: str
89 | |
90 | |
# Mapping from 'ssl.PROTOCOL_TLSX' to 'TLSVersion.X'. Left empty when the 'ssl'
# module cannot be imported.
_SSL_VERSION_TO_TLS_VERSION: dict[int, int] = {}

# Import everything this module needs from 'ssl' in one guarded step. If any of
# the imports below raises ImportError, the 'except' branch defines numeric
# fallbacks so the constant names still exist.
try:  # Do we have ssl at all?
    import ssl
    from ssl import (  # type: ignore[assignment]
        CERT_REQUIRED,
        HAS_NEVER_CHECK_COMMON_NAME,
        OP_NO_COMPRESSION,
        OP_NO_TICKET,
        OPENSSL_VERSION,
        OPENSSL_VERSION_NUMBER,
        PROTOCOL_TLS,
        PROTOCOL_TLS_CLIENT,
        OP_NO_SSLv2,
        OP_NO_SSLv3,
        SSLContext,
        TLSVersion,
    )

    # Legacy alias: PROTOCOL_SSLv23 is simply another name for PROTOCOL_TLS here.
    PROTOCOL_SSLv23 = PROTOCOL_TLS

    # Setting SSLContext.hostname_checks_common_name = False didn't work before CPython
    # 3.8.9, 3.9.3, and 3.10 (but OK on PyPy) or OpenSSL 1.1.1l+
    # When the flag is advertised but not reliable, downgrade it to False.
    if HAS_NEVER_CHECK_COMMON_NAME and not _is_has_never_check_common_name_reliable(
        OPENSSL_VERSION,
        OPENSSL_VERSION_NUMBER,
        sys.implementation.name,
        sys.version_info,
        sys.pypy_version_info if sys.implementation.name == "pypy" else None,  # type: ignore[attr-defined]
    ):
        HAS_NEVER_CHECK_COMMON_NAME = False

    # Need to be careful here in case old TLS versions get
    # removed in future 'ssl' module implementations.
    # Versions whose PROTOCOL_* constant or TLSVersion member is missing are
    # simply left out of the mapping.
    for attr in ("TLSv1", "TLSv1_1", "TLSv1_2"):
        try:
            _SSL_VERSION_TO_TLS_VERSION[getattr(ssl, f"PROTOCOL_{attr}")] = getattr(
                TLSVersion, attr
            )
        except AttributeError:  # Defensive:
            continue

    # Imported last, inside the guard, so a failure here is handled the same
    # way as a missing 'ssl' module.
    from .ssltransport import SSLTransport  # type: ignore[assignment]
except ImportError:
    # Hard-coded stand-ins for the 'ssl' constants. 'SSLContext' keeps its
    # module-level value in this case.
    OP_NO_COMPRESSION = 0x20000  # type: ignore[assignment]
    OP_NO_TICKET = 0x4000  # type: ignore[assignment]
    OP_NO_SSLv2 = 0x1000000  # type: ignore[assignment]
    OP_NO_SSLv3 = 0x2000000  # type: ignore[assignment]
    PROTOCOL_SSLv23 = PROTOCOL_TLS = 2  # type: ignore[assignment]
    PROTOCOL_TLS_CLIENT = 16  # type: ignore[assignment]
142 | |
143 | |
# Union of the dict form, raw bytes, or None. The dict type is referenced by
# name (a string) because it only exists under typing.TYPE_CHECKING.
# NOTE(review): presumably the return type of SSLSocket.getpeercert() -- confirm.
_TYPE_PEER_CERT_RET = typing.Union["_TYPE_PEER_CERT_RET_DICT", bytes, None]
145 | |
146 | |
def assert_fingerprint(cert: bytes | None, fingerprint: str) -> None:
    """Verify that ``cert`` hashes to the expected ``fingerprint``.

    The hash function is chosen from ``HASHFUNC_MAP`` by the number of hex
    digits in the fingerprint once colons have been stripped.

    :param cert:
        Certificate as a bytes object, or ``None`` when the peer sent none.
    :param fingerprint:
        Expected digest as a string of hex digits, optionally separated by
        colons. Matching is case-insensitive.
    :raises SSLError:
        If ``cert`` is ``None``, the fingerprint length matches no known hash
        function, or the computed digest differs from the expected one.
    """
    if cert is None:
        raise SSLError("No certificate for the peer.")

    # Normalise "AA:BB:..." into a bare lowercase hex string.
    fingerprint = fingerprint.replace(":", "").lower()

    hashfunc = HASHFUNC_MAP.get(len(fingerprint))
    if not hashfunc:
        raise SSLError(f"Fingerprint of invalid length: {fingerprint}")

    expected_digest = unhexlify(fingerprint.encode())
    cert_digest = hashfunc(cert).digest()

    # Constant-time comparison of the two digests.
    if hmac.compare_digest(cert_digest, expected_digest):
        return

    raise SSLError(
        f'Fingerprints did not match. Expected "{fingerprint}", got "{cert_digest.hex()}"'
    )
175 | |
176 | |
def resolve_cert_reqs(candidate: None | int | str) -> VerifyMode:
    """Turn ``candidate`` into a certificate-requirement constant.

    * ``None`` resolves to :data:`ssl.CERT_REQUIRED`.
    * A string is looked up as an attribute of the :mod:`ssl` module, first
      verbatim and then with a ``CERT_`` prefix, so ``"REQUIRED"`` works as an
      abbreviation of ``"CERT_REQUIRED"``.
    * Anything else is assumed to already be the numeric constant and is
      returned unchanged.

    :raises AttributeError: If a string matches no :mod:`ssl` attribute under
        either spelling.
    """
    if candidate is None:
        return CERT_REQUIRED

    if not isinstance(candidate, str):
        return candidate  # type: ignore[return-value]

    resolved = getattr(ssl, candidate, None)
    if resolved is not None:
        return resolved  # type: ignore[no-any-return]
    # Fall back to the abbreviated spelling.
    return getattr(ssl, "CERT_" + candidate)  # type: ignore[no-any-return]
198 | |
199 | |
def resolve_ssl_version(candidate: None | int | str) -> int:
    """Turn ``candidate`` into an ``ssl`` protocol constant.

    Works like :func:`resolve_cert_reqs`: ``None`` resolves to
    ``PROTOCOL_TLS``, a string is looked up on the :mod:`ssl` module (verbatim
    first, then with a ``PROTOCOL_`` prefix), and any other value is returned
    unchanged.

    :raises AttributeError: If a string matches no :mod:`ssl` attribute under
        either spelling.
    """
    if candidate is None:
        return PROTOCOL_TLS

    if not isinstance(candidate, str):
        return candidate

    resolved = getattr(ssl, candidate, None)
    if resolved is None:
        # Fall back to the abbreviated spelling, e.g. "TLS_CLIENT".
        resolved = getattr(ssl, "PROTOCOL_" + candidate)
    return typing.cast(int, resolved)
214 | |
215 | |
def create_urllib3_context(
    ssl_version: int | None = None,
    cert_reqs: int | None = None,
    options: int | None = None,
    ciphers: str | None = None,
    ssl_minimum_version: int | None = None,
    ssl_maximum_version: int | None = None,
) -> ssl.SSLContext:
    """Creates and configures an :class:`ssl.SSLContext` instance for use with urllib3.

    The context is always constructed with ``PROTOCOL_TLS_CLIENT``.

    :param ssl_version:
        The desired protocol version to use. ``None``, ``PROTOCOL_TLS`` and
        ``PROTOCOL_TLS_CLIENT`` mean "no exact version". Any other value is
        treated as an exact version: it is translated into a minimum and
        maximum TLS version and a :class:`DeprecationWarning` is emitted.

        This parameter is deprecated instead use 'ssl_minimum_version'.
    :param ssl_minimum_version:
        The minimum version of TLS to be used. Use the 'ssl.TLSVersion' enum for specifying the value.
        When not given (and not derived from 'ssl_version'), TLS 1.2 is used.
    :param ssl_maximum_version:
        The maximum version of TLS to be used. Use the 'ssl.TLSVersion' enum for specifying the value.
        Not recommended to set to anything other than 'ssl.TLSVersion.MAXIMUM_SUPPORTED' which is the
        default value. When not given, the context's own maximum is left untouched.
    :param cert_reqs:
        Whether to require the certificate verification. This defaults to
        ``ssl.CERT_REQUIRED``.
    :param options:
        Specific OpenSSL options. These default to ``ssl.OP_NO_SSLv2``,
        ``ssl.OP_NO_SSLv3``, ``ssl.OP_NO_COMPRESSION``, and ``ssl.OP_NO_TICKET``.
        The options are OR-ed into the options the new context already has.
    :param ciphers:
        Which cipher suites to allow the server to select. When omitted (or
        empty) no cipher list is set here, so the context keeps whatever
        ciphers it was created with.
    :returns:
        Constructed SSLContext object with specified options
    :rtype: SSLContext
    :raises TypeError:
        If no ``SSLContext`` class is available (the 'ssl' import failed).
    :raises ValueError:
        If an exact 'ssl_version' is combined with 'ssl_minimum_version' or
        'ssl_maximum_version'.
    """
    if SSLContext is None:
        raise TypeError("Can't create an SSLContext object without an ssl module")

    # This means 'ssl_version' was specified as an exact value.
    if ssl_version not in (None, PROTOCOL_TLS, PROTOCOL_TLS_CLIENT):
        # Disallow setting 'ssl_version' and 'ssl_minimum|maximum_version'
        # to avoid conflicts.
        if ssl_minimum_version is not None or ssl_maximum_version is not None:
            raise ValueError(
                "Can't specify both 'ssl_version' and either "
                "'ssl_minimum_version' or 'ssl_maximum_version'"
            )

        # 'ssl_version' is deprecated and will be removed in the future.
        else:
            # Use 'ssl_minimum_version' and 'ssl_maximum_version' instead.
            # A protocol constant with no entry in the mapping falls back to
            # the widest range the 'ssl' module supports.
            ssl_minimum_version = _SSL_VERSION_TO_TLS_VERSION.get(
                ssl_version, TLSVersion.MINIMUM_SUPPORTED
            )
            ssl_maximum_version = _SSL_VERSION_TO_TLS_VERSION.get(
                ssl_version, TLSVersion.MAXIMUM_SUPPORTED
            )

        # This warning message is pushing users to use 'ssl_minimum_version'
        # instead of both min/max. Best practice is to only set the minimum version and
        # keep the maximum version to be its default value: 'TLSVersion.MAXIMUM_SUPPORTED'
        warnings.warn(
            "'ssl_version' option is deprecated and will be "
            "removed in urllib3 v2.1.0. Instead use 'ssl_minimum_version'",
            category=DeprecationWarning,
            stacklevel=2,
        )

    # PROTOCOL_TLS is deprecated in Python 3.10 so we always use PROTOCOL_TLS_CLIENT
    context = SSLContext(PROTOCOL_TLS_CLIENT)

    if ssl_minimum_version is not None:
        context.minimum_version = ssl_minimum_version
    else:  # Python <3.10 defaults to 'MINIMUM_SUPPORTED' so explicitly set TLSv1.2 here
        context.minimum_version = TLSVersion.TLSv1_2

    if ssl_maximum_version is not None:
        context.maximum_version = ssl_maximum_version

    # Only override the cipher list when the caller supplied one; otherwise
    # the context keeps the ciphers it was created with.
    if ciphers:
        context.set_ciphers(ciphers)

    # Setting the default here, as we may have no ssl module on import
    cert_reqs = ssl.CERT_REQUIRED if cert_reqs is None else cert_reqs

    if options is None:
        options = 0
        # SSLv2 is easily broken and is considered harmful and dangerous
        options |= OP_NO_SSLv2
        # SSLv3 has several problems and is now dangerous
        options |= OP_NO_SSLv3
        # Disable compression to prevent CRIME attacks for OpenSSL 1.0+
        # (issue #309)
        options |= OP_NO_COMPRESSION
        # TLSv1.2 only. Unless set explicitly, do not request tickets.
        # This may save some bandwidth on wire, and although the ticket is encrypted,
        # there is a risk associated with it being on wire,
        # if the server is not rotating its ticketing keys properly.
        options |= OP_NO_TICKET

    # OR-ed in, so options already set on the fresh context are preserved.
    context.options |= options

    # Enable post-handshake authentication for TLS 1.3, see GH #1634. PHA is
    # necessary for conditional client cert authentication with TLS 1.3.
    # The attribute is None for OpenSSL <= 1.1.0 or does not exist when using
    # an SSLContext created by pyOpenSSL.
    if getattr(context, "post_handshake_auth", None) is not None:
        context.post_handshake_auth = True

    # The order of the below lines setting verify_mode and check_hostname
    # matter due to safe-guards SSLContext has to prevent an SSLContext with
    # check_hostname=True, verify_mode=NONE/OPTIONAL.
    # We always set 'check_hostname=False' for pyOpenSSL so we rely on our own
    # 'ssl.match_hostname()' implementation.
    if cert_reqs == ssl.CERT_REQUIRED and not IS_PYOPENSSL:
        context.verify_mode = cert_reqs
        context.check_hostname = True
    else:
        context.check_hostname = False
        context.verify_mode = cert_reqs

    # Best effort: contexts that lack this attribute are left as they are.
    try:
        context.hostname_checks_common_name = False
    except AttributeError:  # Defensive: for CPython < 3.8.9 and 3.9.3; for PyPy < 7.3.8
        pass

    # Enable logging of TLS session keys via defacto standard environment variable
    # 'SSLKEYLOGFILE', if the feature is available (Python 3.8+). Skip empty values.
    if hasattr(context, "keylog_filename"):
        sslkeylogfile = os.environ.get("SSLKEYLOGFILE")
        if sslkeylogfile:
            context.keylog_filename = sslkeylogfile

    return context
352 | |
353 | |
# Typing-only overloads for ssl_wrap_socket(): they let a type checker narrow
# the return type from the 'tls_in_tls' argument. With 'tls_in_tls' typed as
# Literal[False] the result is always an ssl.SSLSocket.
@typing.overload
def ssl_wrap_socket(
    sock: socket.socket,
    keyfile: str | None = ...,
    certfile: str | None = ...,
    cert_reqs: int | None = ...,
    ca_certs: str | None = ...,
    server_hostname: str | None = ...,
    ssl_version: int | None = ...,
    ciphers: str | None = ...,
    ssl_context: ssl.SSLContext | None = ...,
    ca_cert_dir: str | None = ...,
    key_password: str | None = ...,
    ca_cert_data: None | str | bytes = ...,
    tls_in_tls: Literal[False] = ...,
) -> ssl.SSLSocket:
    ...


# General case: with an arbitrary bool the result may also be an SSLTransport.
@typing.overload
def ssl_wrap_socket(
    sock: socket.socket,
    keyfile: str | None = ...,
    certfile: str | None = ...,
    cert_reqs: int | None = ...,
    ca_certs: str | None = ...,
    server_hostname: str | None = ...,
    ssl_version: int | None = ...,
    ciphers: str | None = ...,
    ssl_context: ssl.SSLContext | None = ...,
    ca_cert_dir: str | None = ...,
    key_password: str | None = ...,
    ca_cert_data: None | str | bytes = ...,
    tls_in_tls: bool = ...,
) -> ssl.SSLSocket | SSLTransportType:
    ...
390 | |
391 | |
def ssl_wrap_socket(
    sock: socket.socket,
    keyfile: str | None = None,
    certfile: str | None = None,
    cert_reqs: int | None = None,
    ca_certs: str | None = None,
    server_hostname: str | None = None,
    ssl_version: int | None = None,
    ciphers: str | None = None,
    ssl_context: ssl.SSLContext | None = None,
    ca_cert_dir: str | None = None,
    key_password: str | None = None,
    ca_cert_data: None | str | bytes = None,
    tls_in_tls: bool = False,
) -> ssl.SSLSocket | SSLTransportType:
    """Wrap ``sock`` in TLS using ``ssl_context`` or a freshly built context.

    :param sock:
        The socket to wrap.
    :param keyfile:
        Private key file passed to ``load_cert_chain`` together with
        ``certfile``.
    :param certfile:
        Client certificate chain file. No client certificate is loaded when
        this is empty.
    :param cert_reqs:
        Forwarded to :func:`create_urllib3_context` when no ``ssl_context``
        is given.
    :param ca_certs:
        CA bundle file passed to ``load_verify_locations``.
    :param server_hostname:
        When SNI is supported, the expected hostname of the certificate
    :param ssl_version:
        Forwarded to :func:`create_urllib3_context` when no ``ssl_context``
        is given.
    :param ciphers:
        A string of ciphers we wish the client to support. Only used when no
        ``ssl_context`` is given.
    :param ssl_context:
        A pre-made :class:`SSLContext` object. If none is provided, one will
        be created using :func:`create_urllib3_context`.
    :param ca_cert_dir:
        A directory containing CA certificates in multiple separate files, as
        supported by OpenSSL's -CApath flag or the capath argument to
        SSLContext.load_verify_locations().
    :param key_password:
        Optional password if the keyfile is encrypted.
    :param ca_cert_data:
        Optional string containing CA certificates in PEM format suitable for
        passing as the cadata parameter to SSLContext.load_verify_locations()
    :param tls_in_tls:
        Use SSLTransport to wrap the existing socket.
    :raises SSLError:
        If loading the CA locations fails with :class:`OSError`, or if the
        key file looks encrypted and no ``key_password`` was supplied.
    """
    context_created_here = ssl_context is None
    # Note: Building our own context (and every argument feeding it) is only
    # exercised in tests. We should consider deprecating and removing this code.
    tls_context = (
        create_urllib3_context(ssl_version, cert_reqs, ciphers=ciphers)
        if context_created_here
        else ssl_context
    )

    if ca_certs or ca_cert_dir or ca_cert_data:
        try:
            tls_context.load_verify_locations(ca_certs, ca_cert_dir, ca_cert_data)
        except OSError as exc:
            raise SSLError(exc) from exc
    elif context_created_here and hasattr(tls_context, "load_default_certs"):
        # try to load OS default certs; works well on Windows.
        tls_context.load_default_certs()

    # OpenSSL would otherwise prompt for the passphrase on the terminal when
    # handed an encrypted key without a password; fail loudly instead.
    if keyfile and key_password is None and _is_key_file_encrypted(keyfile):
        raise SSLError("Client private key is encrypted, password is required")

    if certfile:
        # Only pass the password positionally when one was actually supplied.
        chain_args = (
            (certfile, keyfile)
            if key_password is None
            else (certfile, keyfile, key_password)
        )
        tls_context.load_cert_chain(*chain_args)

    try:
        tls_context.set_alpn_protocols(ALPN_PROTOCOLS)
    except NotImplementedError:  # Defensive: in CI, we always have set_alpn_protocols
        pass

    return _ssl_wrap_socket_impl(sock, tls_context, tls_in_tls, server_hostname)
467 | |
468 | |
def is_ipaddress(hostname: str | bytes) -> bool:
    """Report whether ``hostname`` is an IP address rather than a DNS name.

    The value is matched against the module's IPv4 pattern and its
    brace-less IPv6 pattern, which also accepts a Zone ID.

    :param hostname: Hostname to examine; bytes are decoded as ASCII first.
    :return: True if the hostname is an IP address, False otherwise.
    """
    # IDN A-label bytes are ASCII compatible.
    text = hostname.decode("ascii") if isinstance(hostname, bytes) else hostname

    if _IPV4_RE.match(text):
        return True
    return bool(_BRACELESS_IPV6_ADDRZ_RE.match(text))
480 | |
481 | |
482 def _is_key_file_encrypted(key_file: str) -> bool: | |
483 """Detects if a key file is encrypted or not.""" | |
484 with open(key_file) as f: | |
485 for line in f: | |
486 # Look for Proc-Type: 4,ENCRYPTED | |
487 if "ENCRYPTED" in line: | |
488 return True | |
489 | |
490 return False | |
491 | |
492 | |
493 def _ssl_wrap_socket_impl( | |
494 sock: socket.socket, | |
495 ssl_context: ssl.SSLContext, | |
496 tls_in_tls: bool, | |
497 server_hostname: str | None = None, | |
498 ) -> ssl.SSLSocket | SSLTransportType: | |
499 if tls_in_tls: | |
500 if not SSLTransport: | |
501 # Import error, ssl is not available. | |
502 raise ProxySchemeUnsupported( | |
503 "TLS in TLS requires support for the 'ssl' module" | |
504 ) | |
505 | |
506 SSLTransport._validate_ssl_context_for_tls_in_tls(ssl_context) | |
507 return SSLTransport(sock, ssl_context, server_hostname) | |
508 | |
509 return ssl_context.wrap_socket(sock, server_hostname=server_hostname) |