jpayne@7: from __future__ import annotations jpayne@7: jpayne@7: import errno jpayne@7: import logging jpayne@7: import queue jpayne@7: import sys jpayne@7: import typing jpayne@7: import warnings jpayne@7: import weakref jpayne@7: from socket import timeout as SocketTimeout jpayne@7: from types import TracebackType jpayne@7: jpayne@7: from ._base_connection import _TYPE_BODY jpayne@7: from ._collections import HTTPHeaderDict jpayne@7: from ._request_methods import RequestMethods jpayne@7: from .connection import ( jpayne@7: BaseSSLError, jpayne@7: BrokenPipeError, jpayne@7: DummyConnection, jpayne@7: HTTPConnection, jpayne@7: HTTPException, jpayne@7: HTTPSConnection, jpayne@7: ProxyConfig, jpayne@7: _wrap_proxy_error, jpayne@7: ) jpayne@7: from .connection import port_by_scheme as port_by_scheme jpayne@7: from .exceptions import ( jpayne@7: ClosedPoolError, jpayne@7: EmptyPoolError, jpayne@7: FullPoolError, jpayne@7: HostChangedError, jpayne@7: InsecureRequestWarning, jpayne@7: LocationValueError, jpayne@7: MaxRetryError, jpayne@7: NewConnectionError, jpayne@7: ProtocolError, jpayne@7: ProxyError, jpayne@7: ReadTimeoutError, jpayne@7: SSLError, jpayne@7: TimeoutError, jpayne@7: ) jpayne@7: from .response import BaseHTTPResponse jpayne@7: from .util.connection import is_connection_dropped jpayne@7: from .util.proxy import connection_requires_http_tunnel jpayne@7: from .util.request import _TYPE_BODY_POSITION, set_file_position jpayne@7: from .util.retry import Retry jpayne@7: from .util.ssl_match_hostname import CertificateError jpayne@7: from .util.timeout import _DEFAULT_TIMEOUT, _TYPE_DEFAULT, Timeout jpayne@7: from .util.url import Url, _encode_target jpayne@7: from .util.url import _normalize_host as normalize_host jpayne@7: from .util.url import parse_url jpayne@7: from .util.util import to_str jpayne@7: jpayne@7: if typing.TYPE_CHECKING: jpayne@7: import ssl jpayne@7: from typing import Literal jpayne@7: jpayne@7: from ._base_connection import BaseHTTPConnection, BaseHTTPSConnection jpayne@7: jpayne@7: log = logging.getLogger(__name__) jpayne@7: jpayne@7: _TYPE_TIMEOUT = typing.Union[Timeout, float, _TYPE_DEFAULT, None] jpayne@7: jpayne@7: _SelfT = typing.TypeVar("_SelfT") jpayne@7: jpayne@7: jpayne@7: # Pool objects jpayne@7: class ConnectionPool: jpayne@7: """ jpayne@7: Base class for all connection pools, such as jpayne@7: :class:`.HTTPConnectionPool` and :class:`.HTTPSConnectionPool`. jpayne@7: jpayne@7: .. note:: jpayne@7: ConnectionPool.urlopen() does not normalize or percent-encode target URIs jpayne@7: which is useful if your target server doesn't support percent-encoded jpayne@7: target URIs. jpayne@7: """ jpayne@7: jpayne@7: scheme: str | None = None jpayne@7: QueueCls = queue.LifoQueue jpayne@7: jpayne@7: def __init__(self, host: str, port: int | None = None) -> None: jpayne@7: if not host: jpayne@7: raise LocationValueError("No host specified.") jpayne@7: jpayne@7: self.host = _normalize_host(host, scheme=self.scheme) jpayne@7: self.port = port jpayne@7: jpayne@7: # This property uses 'normalize_host()' (not '_normalize_host()') jpayne@7: # to avoid removing square braces around IPv6 addresses. jpayne@7: # This value is sent to `HTTPConnection.set_tunnel()` if called jpayne@7: # because square braces are required for HTTP CONNECT tunneling. jpayne@7: self._tunnel_host = normalize_host(host, scheme=self.scheme).lower() jpayne@7: jpayne@7: def __str__(self) -> str: jpayne@7: return f"{type(self).__name__}(host={self.host!r}, port={self.port!r})" jpayne@7: jpayne@7: def __enter__(self: _SelfT) -> _SelfT: jpayne@7: return self jpayne@7: jpayne@7: def __exit__( jpayne@7: self, jpayne@7: exc_type: type[BaseException] | None, jpayne@7: exc_val: BaseException | None, jpayne@7: exc_tb: TracebackType | None, jpayne@7: ) -> Literal[False]: jpayne@7: self.close() jpayne@7: # Return False to re-raise any potential exceptions jpayne@7: return False jpayne@7: jpayne@7: def close(self) -> None: jpayne@7: """ jpayne@7: Close all pooled connections and disable the pool. jpayne@7: """ jpayne@7: jpayne@7: jpayne@7: # This is taken from http://hg.python.org/cpython/file/7aaba721ebc0/Lib/socket.py#l252 jpayne@7: _blocking_errnos = {errno.EAGAIN, errno.EWOULDBLOCK} jpayne@7: jpayne@7: jpayne@7: class HTTPConnectionPool(ConnectionPool, RequestMethods): jpayne@7: """ jpayne@7: Thread-safe connection pool for one host. jpayne@7: jpayne@7: :param host: jpayne@7: Host used for this HTTP Connection (e.g. "localhost"), passed into jpayne@7: :class:`http.client.HTTPConnection`. jpayne@7: jpayne@7: :param port: jpayne@7: Port used for this HTTP Connection (None is equivalent to 80), passed jpayne@7: into :class:`http.client.HTTPConnection`. jpayne@7: jpayne@7: :param timeout: jpayne@7: Socket timeout in seconds for each individual connection. This can jpayne@7: be a float or integer, which sets the timeout for the HTTP request, jpayne@7: or an instance of :class:`urllib3.util.Timeout` which gives you more jpayne@7: fine-grained control over request timeouts. After the constructor has jpayne@7: been parsed, this is always a `urllib3.util.Timeout` object. jpayne@7: jpayne@7: :param maxsize: jpayne@7: Number of connections to save that can be reused. More than 1 is useful jpayne@7: in multithreaded situations. If ``block`` is set to False, more jpayne@7: connections will be created but they will not be saved once they've jpayne@7: been used. jpayne@7: jpayne@7: :param block: jpayne@7: If set to True, no more than ``maxsize`` connections will be used at jpayne@7: a time. When no free connections are available, the call will block jpayne@7: until a connection has been released. This is a useful side effect for jpayne@7: particular multithreaded situations where one does not want to use more jpayne@7: than maxsize connections per host to prevent flooding. jpayne@7: jpayne@7: :param headers: jpayne@7: Headers to include with all requests, unless other headers are given jpayne@7: explicitly. jpayne@7: jpayne@7: :param retries: jpayne@7: Retry configuration to use by default with requests in this pool. jpayne@7: jpayne@7: :param _proxy: jpayne@7: Parsed proxy URL, should not be used directly, instead, see jpayne@7: :class:`urllib3.ProxyManager` jpayne@7: jpayne@7: :param _proxy_headers: jpayne@7: A dictionary with proxy headers, should not be used directly, jpayne@7: instead, see :class:`urllib3.ProxyManager` jpayne@7: jpayne@7: :param \\**conn_kw: jpayne@7: Additional parameters are used to create fresh :class:`urllib3.connection.HTTPConnection`, jpayne@7: :class:`urllib3.connection.HTTPSConnection` instances. jpayne@7: """ jpayne@7: jpayne@7: scheme = "http" jpayne@7: ConnectionCls: ( jpayne@7: type[BaseHTTPConnection] | type[BaseHTTPSConnection] jpayne@7: ) = HTTPConnection jpayne@7: jpayne@7: def __init__( jpayne@7: self, jpayne@7: host: str, jpayne@7: port: int | None = None, jpayne@7: timeout: _TYPE_TIMEOUT | None = _DEFAULT_TIMEOUT, jpayne@7: maxsize: int = 1, jpayne@7: block: bool = False, jpayne@7: headers: typing.Mapping[str, str] | None = None, jpayne@7: retries: Retry | bool | int | None = None, jpayne@7: _proxy: Url | None = None, jpayne@7: _proxy_headers: typing.Mapping[str, str] | None = None, jpayne@7: _proxy_config: ProxyConfig | None = None, jpayne@7: **conn_kw: typing.Any, jpayne@7: ): jpayne@7: ConnectionPool.__init__(self, host, port) jpayne@7: RequestMethods.__init__(self, headers) jpayne@7: jpayne@7: if not isinstance(timeout, Timeout): jpayne@7: timeout = Timeout.from_float(timeout) jpayne@7: jpayne@7: if retries is None: jpayne@7: retries = Retry.DEFAULT jpayne@7: jpayne@7: self.timeout = timeout jpayne@7: self.retries = retries jpayne@7: jpayne@7: self.pool: queue.LifoQueue[typing.Any] | None = self.QueueCls(maxsize) jpayne@7: self.block = block jpayne@7: jpayne@7: self.proxy = _proxy jpayne@7: self.proxy_headers = _proxy_headers or {} jpayne@7: self.proxy_config = _proxy_config jpayne@7: jpayne@7: # Fill the queue up so that doing get() on it will block properly jpayne@7: for _ in range(maxsize): jpayne@7: self.pool.put(None) jpayne@7: jpayne@7: # These are mostly for testing and debugging purposes. jpayne@7: self.num_connections = 0 jpayne@7: self.num_requests = 0 jpayne@7: self.conn_kw = conn_kw jpayne@7: jpayne@7: if self.proxy: jpayne@7: # Enable Nagle's algorithm for proxies, to avoid packet fragmentation. jpayne@7: # We cannot know if the user has added default socket options, so we cannot replace the jpayne@7: # list. jpayne@7: self.conn_kw.setdefault("socket_options", []) jpayne@7: jpayne@7: self.conn_kw["proxy"] = self.proxy jpayne@7: self.conn_kw["proxy_config"] = self.proxy_config jpayne@7: jpayne@7: # Do not pass 'self' as callback to 'finalize'. jpayne@7: # Then the 'finalize' would keep an endless living (leak) to self. jpayne@7: # By just passing a reference to the pool allows the garbage collector jpayne@7: # to free self if nobody else has a reference to it. jpayne@7: pool = self.pool jpayne@7: jpayne@7: # Close all the HTTPConnections in the pool before the jpayne@7: # HTTPConnectionPool object is garbage collected. jpayne@7: weakref.finalize(self, _close_pool_connections, pool) jpayne@7: jpayne@7: def _new_conn(self) -> BaseHTTPConnection: jpayne@7: """ jpayne@7: Return a fresh :class:`HTTPConnection`. jpayne@7: """ jpayne@7: self.num_connections += 1 jpayne@7: log.debug( jpayne@7: "Starting new HTTP connection (%d): %s:%s", jpayne@7: self.num_connections, jpayne@7: self.host, jpayne@7: self.port or "80", jpayne@7: ) jpayne@7: jpayne@7: conn = self.ConnectionCls( jpayne@7: host=self.host, jpayne@7: port=self.port, jpayne@7: timeout=self.timeout.connect_timeout, jpayne@7: **self.conn_kw, jpayne@7: ) jpayne@7: return conn jpayne@7: jpayne@7: def _get_conn(self, timeout: float | None = None) -> BaseHTTPConnection: jpayne@7: """ jpayne@7: Get a connection. Will return a pooled connection if one is available. jpayne@7: jpayne@7: If no connections are available and :prop:`.block` is ``False``, then a jpayne@7: fresh connection is returned. jpayne@7: jpayne@7: :param timeout: jpayne@7: Seconds to wait before giving up and raising jpayne@7: :class:`urllib3.exceptions.EmptyPoolError` if the pool is empty and jpayne@7: :prop:`.block` is ``True``. jpayne@7: """ jpayne@7: conn = None jpayne@7: jpayne@7: if self.pool is None: jpayne@7: raise ClosedPoolError(self, "Pool is closed.") jpayne@7: jpayne@7: try: jpayne@7: conn = self.pool.get(block=self.block, timeout=timeout) jpayne@7: jpayne@7: except AttributeError: # self.pool is None jpayne@7: raise ClosedPoolError(self, "Pool is closed.") from None # Defensive: jpayne@7: jpayne@7: except queue.Empty: jpayne@7: if self.block: jpayne@7: raise EmptyPoolError( jpayne@7: self, jpayne@7: "Pool is empty and a new connection can't be opened due to blocking mode.", jpayne@7: ) from None jpayne@7: pass # Oh well, we'll create a new connection then jpayne@7: jpayne@7: # If this is a persistent connection, check if it got disconnected jpayne@7: if conn and is_connection_dropped(conn): jpayne@7: log.debug("Resetting dropped connection: %s", self.host) jpayne@7: conn.close() jpayne@7: jpayne@7: return conn or self._new_conn() jpayne@7: jpayne@7: def _put_conn(self, conn: BaseHTTPConnection | None) -> None: jpayne@7: """ jpayne@7: Put a connection back into the pool. jpayne@7: jpayne@7: :param conn: jpayne@7: Connection object for the current host and port as returned by jpayne@7: :meth:`._new_conn` or :meth:`._get_conn`. jpayne@7: jpayne@7: If the pool is already full, the connection is closed and discarded jpayne@7: because we exceeded maxsize. If connections are discarded frequently, jpayne@7: then maxsize should be increased. jpayne@7: jpayne@7: If the pool is closed, then the connection will be closed and discarded. jpayne@7: """ jpayne@7: if self.pool is not None: jpayne@7: try: jpayne@7: self.pool.put(conn, block=False) jpayne@7: return # Everything is dandy, done. jpayne@7: except AttributeError: jpayne@7: # self.pool is None. jpayne@7: pass jpayne@7: except queue.Full: jpayne@7: # Connection never got put back into the pool, close it. jpayne@7: if conn: jpayne@7: conn.close() jpayne@7: jpayne@7: if self.block: jpayne@7: # This should never happen if you got the conn from self._get_conn jpayne@7: raise FullPoolError( jpayne@7: self, jpayne@7: "Pool reached maximum size and no more connections are allowed.", jpayne@7: ) from None jpayne@7: jpayne@7: log.warning( jpayne@7: "Connection pool is full, discarding connection: %s. Connection pool size: %s", jpayne@7: self.host, jpayne@7: self.pool.qsize(), jpayne@7: ) jpayne@7: jpayne@7: # Connection never got put back into the pool, close it. jpayne@7: if conn: jpayne@7: conn.close() jpayne@7: jpayne@7: def _validate_conn(self, conn: BaseHTTPConnection) -> None: jpayne@7: """ jpayne@7: Called right before a request is made, after the socket is created. jpayne@7: """ jpayne@7: jpayne@7: def _prepare_proxy(self, conn: BaseHTTPConnection) -> None: jpayne@7: # Nothing to do for HTTP connections. jpayne@7: pass jpayne@7: jpayne@7: def _get_timeout(self, timeout: _TYPE_TIMEOUT) -> Timeout: jpayne@7: """Helper that always returns a :class:`urllib3.util.Timeout`""" jpayne@7: if timeout is _DEFAULT_TIMEOUT: jpayne@7: return self.timeout.clone() jpayne@7: jpayne@7: if isinstance(timeout, Timeout): jpayne@7: return timeout.clone() jpayne@7: else: jpayne@7: # User passed us an int/float. This is for backwards compatibility, jpayne@7: # can be removed later jpayne@7: return Timeout.from_float(timeout) jpayne@7: jpayne@7: def _raise_timeout( jpayne@7: self, jpayne@7: err: BaseSSLError | OSError | SocketTimeout, jpayne@7: url: str, jpayne@7: timeout_value: _TYPE_TIMEOUT | None, jpayne@7: ) -> None: jpayne@7: """Is the error actually a timeout? Will raise a ReadTimeout or pass""" jpayne@7: jpayne@7: if isinstance(err, SocketTimeout): jpayne@7: raise ReadTimeoutError( jpayne@7: self, url, f"Read timed out. (read timeout={timeout_value})" jpayne@7: ) from err jpayne@7: jpayne@7: # See the above comment about EAGAIN in Python 3. jpayne@7: if hasattr(err, "errno") and err.errno in _blocking_errnos: jpayne@7: raise ReadTimeoutError( jpayne@7: self, url, f"Read timed out. (read timeout={timeout_value})" jpayne@7: ) from err jpayne@7: jpayne@7: def _make_request( jpayne@7: self, jpayne@7: conn: BaseHTTPConnection, jpayne@7: method: str, jpayne@7: url: str, jpayne@7: body: _TYPE_BODY | None = None, jpayne@7: headers: typing.Mapping[str, str] | None = None, jpayne@7: retries: Retry | None = None, jpayne@7: timeout: _TYPE_TIMEOUT = _DEFAULT_TIMEOUT, jpayne@7: chunked: bool = False, jpayne@7: response_conn: BaseHTTPConnection | None = None, jpayne@7: preload_content: bool = True, jpayne@7: decode_content: bool = True, jpayne@7: enforce_content_length: bool = True, jpayne@7: ) -> BaseHTTPResponse: jpayne@7: """ jpayne@7: Perform a request on a given urllib connection object taken from our jpayne@7: pool. jpayne@7: jpayne@7: :param conn: jpayne@7: a connection from one of our connection pools jpayne@7: jpayne@7: :param method: jpayne@7: HTTP request method (such as GET, POST, PUT, etc.) jpayne@7: jpayne@7: :param url: jpayne@7: The URL to perform the request on. jpayne@7: jpayne@7: :param body: jpayne@7: Data to send in the request body, either :class:`str`, :class:`bytes`, jpayne@7: an iterable of :class:`str`/:class:`bytes`, or a file-like object. jpayne@7: jpayne@7: :param headers: jpayne@7: Dictionary of custom headers to send, such as User-Agent, jpayne@7: If-None-Match, etc. If None, pool headers are used. If provided, jpayne@7: these headers completely replace any pool-specific headers. jpayne@7: jpayne@7: :param retries: jpayne@7: Configure the number of retries to allow before raising a jpayne@7: :class:`~urllib3.exceptions.MaxRetryError` exception. jpayne@7: jpayne@7: Pass ``None`` to retry until you receive a response. Pass a jpayne@7: :class:`~urllib3.util.retry.Retry` object for fine-grained control jpayne@7: over different types of retries. jpayne@7: Pass an integer number to retry connection errors that many times, jpayne@7: but no other types of errors. Pass zero to never retry. jpayne@7: jpayne@7: If ``False``, then retries are disabled and any exception is raised jpayne@7: immediately. Also, instead of raising a MaxRetryError on redirects, jpayne@7: the redirect response will be returned. jpayne@7: jpayne@7: :type retries: :class:`~urllib3.util.retry.Retry`, False, or an int. jpayne@7: jpayne@7: :param timeout: jpayne@7: If specified, overrides the default timeout for this one jpayne@7: request. It may be a float (in seconds) or an instance of jpayne@7: :class:`urllib3.util.Timeout`. jpayne@7: jpayne@7: :param chunked: jpayne@7: If True, urllib3 will send the body using chunked transfer jpayne@7: encoding. Otherwise, urllib3 will send the body using the standard jpayne@7: content-length form. Defaults to False. jpayne@7: jpayne@7: :param response_conn: jpayne@7: Set this to ``None`` if you will handle releasing the connection or jpayne@7: set the connection to have the response release it. jpayne@7: jpayne@7: :param preload_content: jpayne@7: If True, the response's body will be preloaded during construction. jpayne@7: jpayne@7: :param decode_content: jpayne@7: If True, will attempt to decode the body based on the jpayne@7: 'content-encoding' header. jpayne@7: jpayne@7: :param enforce_content_length: jpayne@7: Enforce content length checking. Body returned by server must match jpayne@7: value of Content-Length header, if present. Otherwise, raise error. jpayne@7: """ jpayne@7: self.num_requests += 1 jpayne@7: jpayne@7: timeout_obj = self._get_timeout(timeout) jpayne@7: timeout_obj.start_connect() jpayne@7: conn.timeout = Timeout.resolve_default_timeout(timeout_obj.connect_timeout) jpayne@7: jpayne@7: try: jpayne@7: # Trigger any extra validation we need to do. jpayne@7: try: jpayne@7: self._validate_conn(conn) jpayne@7: except (SocketTimeout, BaseSSLError) as e: jpayne@7: self._raise_timeout(err=e, url=url, timeout_value=conn.timeout) jpayne@7: raise jpayne@7: jpayne@7: # _validate_conn() starts the connection to an HTTPS proxy jpayne@7: # so we need to wrap errors with 'ProxyError' here too. jpayne@7: except ( jpayne@7: OSError, jpayne@7: NewConnectionError, jpayne@7: TimeoutError, jpayne@7: BaseSSLError, jpayne@7: CertificateError, jpayne@7: SSLError, jpayne@7: ) as e: jpayne@7: new_e: Exception = e jpayne@7: if isinstance(e, (BaseSSLError, CertificateError)): jpayne@7: new_e = SSLError(e) jpayne@7: # If the connection didn't successfully connect to it's proxy jpayne@7: # then there jpayne@7: if isinstance( jpayne@7: new_e, (OSError, NewConnectionError, TimeoutError, SSLError) jpayne@7: ) and (conn and conn.proxy and not conn.has_connected_to_proxy): jpayne@7: new_e = _wrap_proxy_error(new_e, conn.proxy.scheme) jpayne@7: raise new_e jpayne@7: jpayne@7: # conn.request() calls http.client.*.request, not the method in jpayne@7: # urllib3.request. It also calls makefile (recv) on the socket. jpayne@7: try: jpayne@7: conn.request( jpayne@7: method, jpayne@7: url, jpayne@7: body=body, jpayne@7: headers=headers, jpayne@7: chunked=chunked, jpayne@7: preload_content=preload_content, jpayne@7: decode_content=decode_content, jpayne@7: enforce_content_length=enforce_content_length, jpayne@7: ) jpayne@7: jpayne@7: # We are swallowing BrokenPipeError (errno.EPIPE) since the server is jpayne@7: # legitimately able to close the connection after sending a valid response. jpayne@7: # With this behaviour, the received response is still readable. jpayne@7: except BrokenPipeError: jpayne@7: pass jpayne@7: except OSError as e: jpayne@7: # MacOS/Linux jpayne@7: # EPROTOTYPE and ECONNRESET are needed on macOS jpayne@7: # https://erickt.github.io/blog/2014/11/19/adventures-in-debugging-a-potential-osx-kernel-bug/ jpayne@7: # Condition changed later to emit ECONNRESET instead of only EPROTOTYPE. jpayne@7: if e.errno != errno.EPROTOTYPE and e.errno != errno.ECONNRESET: jpayne@7: raise jpayne@7: jpayne@7: # Reset the timeout for the recv() on the socket jpayne@7: read_timeout = timeout_obj.read_timeout jpayne@7: jpayne@7: if not conn.is_closed: jpayne@7: # In Python 3 socket.py will catch EAGAIN and return None when you jpayne@7: # try and read into the file pointer created by http.client, which jpayne@7: # instead raises a BadStatusLine exception. Instead of catching jpayne@7: # the exception and assuming all BadStatusLine exceptions are read jpayne@7: # timeouts, check for a zero timeout before making the request. jpayne@7: if read_timeout == 0: jpayne@7: raise ReadTimeoutError( jpayne@7: self, url, f"Read timed out. (read timeout={read_timeout})" jpayne@7: ) jpayne@7: conn.timeout = read_timeout jpayne@7: jpayne@7: # Receive the response from the server jpayne@7: try: jpayne@7: response = conn.getresponse() jpayne@7: except (BaseSSLError, OSError) as e: jpayne@7: self._raise_timeout(err=e, url=url, timeout_value=read_timeout) jpayne@7: raise jpayne@7: jpayne@7: # Set properties that are used by the pooling layer. jpayne@7: response.retries = retries jpayne@7: response._connection = response_conn # type: ignore[attr-defined] jpayne@7: response._pool = self # type: ignore[attr-defined] jpayne@7: jpayne@7: # emscripten connection doesn't have _http_vsn_str jpayne@7: http_version = getattr(conn, "_http_vsn_str", "HTTP/?") jpayne@7: log.debug( jpayne@7: '%s://%s:%s "%s %s %s" %s %s', jpayne@7: self.scheme, jpayne@7: self.host, jpayne@7: self.port, jpayne@7: method, jpayne@7: url, jpayne@7: # HTTP version jpayne@7: http_version, jpayne@7: response.status, jpayne@7: response.length_remaining, jpayne@7: ) jpayne@7: jpayne@7: return response jpayne@7: jpayne@7: def close(self) -> None: jpayne@7: """ jpayne@7: Close all pooled connections and disable the pool. jpayne@7: """ jpayne@7: if self.pool is None: jpayne@7: return jpayne@7: # Disable access to the pool jpayne@7: old_pool, self.pool = self.pool, None jpayne@7: jpayne@7: # Close all the HTTPConnections in the pool. jpayne@7: _close_pool_connections(old_pool) jpayne@7: jpayne@7: def is_same_host(self, url: str) -> bool: jpayne@7: """ jpayne@7: Check if the given ``url`` is a member of the same host as this jpayne@7: connection pool. jpayne@7: """ jpayne@7: if url.startswith("/"): jpayne@7: return True jpayne@7: jpayne@7: # TODO: Add optional support for socket.gethostbyname checking. jpayne@7: scheme, _, host, port, *_ = parse_url(url) jpayne@7: scheme = scheme or "http" jpayne@7: if host is not None: jpayne@7: host = _normalize_host(host, scheme=scheme) jpayne@7: jpayne@7: # Use explicit default port for comparison when none is given jpayne@7: if self.port and not port: jpayne@7: port = port_by_scheme.get(scheme) jpayne@7: elif not self.port and port == port_by_scheme.get(scheme): jpayne@7: port = None jpayne@7: jpayne@7: return (scheme, host, port) == (self.scheme, self.host, self.port) jpayne@7: jpayne@7: def urlopen( # type: ignore[override] jpayne@7: self, jpayne@7: method: str, jpayne@7: url: str, jpayne@7: body: _TYPE_BODY | None = None, jpayne@7: headers: typing.Mapping[str, str] | None = None, jpayne@7: retries: Retry | bool | int | None = None, jpayne@7: redirect: bool = True, jpayne@7: assert_same_host: bool = True, jpayne@7: timeout: _TYPE_TIMEOUT = _DEFAULT_TIMEOUT, jpayne@7: pool_timeout: int | None = None, jpayne@7: release_conn: bool | None = None, jpayne@7: chunked: bool = False, jpayne@7: body_pos: _TYPE_BODY_POSITION | None = None, jpayne@7: preload_content: bool = True, jpayne@7: decode_content: bool = True, jpayne@7: **response_kw: typing.Any, jpayne@7: ) -> BaseHTTPResponse: jpayne@7: """ jpayne@7: Get a connection from the pool and perform an HTTP request. This is the jpayne@7: lowest level call for making a request, so you'll need to specify all jpayne@7: the raw details. jpayne@7: jpayne@7: .. note:: jpayne@7: jpayne@7: More commonly, it's appropriate to use a convenience method jpayne@7: such as :meth:`request`. jpayne@7: jpayne@7: .. note:: jpayne@7: jpayne@7: `release_conn` will only behave as expected if jpayne@7: `preload_content=False` because we want to make jpayne@7: `preload_content=False` the default behaviour someday soon without jpayne@7: breaking backwards compatibility. jpayne@7: jpayne@7: :param method: jpayne@7: HTTP request method (such as GET, POST, PUT, etc.) jpayne@7: jpayne@7: :param url: jpayne@7: The URL to perform the request on. jpayne@7: jpayne@7: :param body: jpayne@7: Data to send in the request body, either :class:`str`, :class:`bytes`, jpayne@7: an iterable of :class:`str`/:class:`bytes`, or a file-like object. jpayne@7: jpayne@7: :param headers: jpayne@7: Dictionary of custom headers to send, such as User-Agent, jpayne@7: If-None-Match, etc. If None, pool headers are used. If provided, jpayne@7: these headers completely replace any pool-specific headers. jpayne@7: jpayne@7: :param retries: jpayne@7: Configure the number of retries to allow before raising a jpayne@7: :class:`~urllib3.exceptions.MaxRetryError` exception. jpayne@7: jpayne@7: If ``None`` (default) will retry 3 times, see ``Retry.DEFAULT``. Pass a jpayne@7: :class:`~urllib3.util.retry.Retry` object for fine-grained control jpayne@7: over different types of retries. jpayne@7: Pass an integer number to retry connection errors that many times, jpayne@7: but no other types of errors. Pass zero to never retry. jpayne@7: jpayne@7: If ``False``, then retries are disabled and any exception is raised jpayne@7: immediately. Also, instead of raising a MaxRetryError on redirects, jpayne@7: the redirect response will be returned. jpayne@7: jpayne@7: :type retries: :class:`~urllib3.util.retry.Retry`, False, or an int. jpayne@7: jpayne@7: :param redirect: jpayne@7: If True, automatically handle redirects (status codes 301, 302, jpayne@7: 303, 307, 308). Each redirect counts as a retry. Disabling retries jpayne@7: will disable redirect, too. jpayne@7: jpayne@7: :param assert_same_host: jpayne@7: If ``True``, will make sure that the host of the pool requests is jpayne@7: consistent else will raise HostChangedError. When ``False``, you can jpayne@7: use the pool on an HTTP proxy and request foreign hosts. jpayne@7: jpayne@7: :param timeout: jpayne@7: If specified, overrides the default timeout for this one jpayne@7: request. It may be a float (in seconds) or an instance of jpayne@7: :class:`urllib3.util.Timeout`. jpayne@7: jpayne@7: :param pool_timeout: jpayne@7: If set and the pool is set to block=True, then this method will jpayne@7: block for ``pool_timeout`` seconds and raise EmptyPoolError if no jpayne@7: connection is available within the time period. jpayne@7: jpayne@7: :param bool preload_content: jpayne@7: If True, the response's body will be preloaded into memory. jpayne@7: jpayne@7: :param bool decode_content: jpayne@7: If True, will attempt to decode the body based on the jpayne@7: 'content-encoding' header. jpayne@7: jpayne@7: :param release_conn: jpayne@7: If False, then the urlopen call will not release the connection jpayne@7: back into the pool once a response is received (but will release if jpayne@7: you read the entire contents of the response such as when jpayne@7: `preload_content=True`). This is useful if you're not preloading jpayne@7: the response's content immediately. You will need to call jpayne@7: ``r.release_conn()`` on the response ``r`` to return the connection jpayne@7: back into the pool. If None, it takes the value of ``preload_content`` jpayne@7: which defaults to ``True``. jpayne@7: jpayne@7: :param bool chunked: jpayne@7: If True, urllib3 will send the body using chunked transfer jpayne@7: encoding. Otherwise, urllib3 will send the body using the standard jpayne@7: content-length form. Defaults to False. jpayne@7: jpayne@7: :param int body_pos: jpayne@7: Position to seek to in file-like body in the event of a retry or jpayne@7: redirect. Typically this won't need to be set because urllib3 will jpayne@7: auto-populate the value when needed. jpayne@7: """ jpayne@7: parsed_url = parse_url(url) jpayne@7: destination_scheme = parsed_url.scheme jpayne@7: jpayne@7: if headers is None: jpayne@7: headers = self.headers jpayne@7: jpayne@7: if not isinstance(retries, Retry): jpayne@7: retries = Retry.from_int(retries, redirect=redirect, default=self.retries) jpayne@7: jpayne@7: if release_conn is None: jpayne@7: release_conn = preload_content jpayne@7: jpayne@7: # Check host jpayne@7: if assert_same_host and not self.is_same_host(url): jpayne@7: raise HostChangedError(self, url, retries) jpayne@7: jpayne@7: # Ensure that the URL we're connecting to is properly encoded jpayne@7: if url.startswith("/"): jpayne@7: url = to_str(_encode_target(url)) jpayne@7: else: jpayne@7: url = to_str(parsed_url.url) jpayne@7: jpayne@7: conn = None jpayne@7: jpayne@7: # Track whether `conn` needs to be released before jpayne@7: # returning/raising/recursing. Update this variable if necessary, and jpayne@7: # leave `release_conn` constant throughout the function. That way, if jpayne@7: # the function recurses, the original value of `release_conn` will be jpayne@7: # passed down into the recursive call, and its value will be respected. jpayne@7: # jpayne@7: # See issue #651 [1] for details. jpayne@7: # jpayne@7: # [1] jpayne@7: release_this_conn = release_conn jpayne@7: jpayne@7: http_tunnel_required = connection_requires_http_tunnel( jpayne@7: self.proxy, self.proxy_config, destination_scheme jpayne@7: ) jpayne@7: jpayne@7: # Merge the proxy headers. Only done when not using HTTP CONNECT. We jpayne@7: # have to copy the headers dict so we can safely change it without those jpayne@7: # changes being reflected in anyone else's copy. jpayne@7: if not http_tunnel_required: jpayne@7: headers = headers.copy() # type: ignore[attr-defined] jpayne@7: headers.update(self.proxy_headers) # type: ignore[union-attr] jpayne@7: jpayne@7: # Must keep the exception bound to a separate variable or else Python 3 jpayne@7: # complains about UnboundLocalError. jpayne@7: err = None jpayne@7: jpayne@7: # Keep track of whether we cleanly exited the except block. This jpayne@7: # ensures we do proper cleanup in finally. jpayne@7: clean_exit = False jpayne@7: jpayne@7: # Rewind body position, if needed. Record current position jpayne@7: # for future rewinds in the event of a redirect/retry. jpayne@7: body_pos = set_file_position(body, body_pos) jpayne@7: jpayne@7: try: jpayne@7: # Request a connection from the queue. jpayne@7: timeout_obj = self._get_timeout(timeout) jpayne@7: conn = self._get_conn(timeout=pool_timeout) jpayne@7: jpayne@7: conn.timeout = timeout_obj.connect_timeout # type: ignore[assignment] jpayne@7: jpayne@7: # Is this a closed/new connection that requires CONNECT tunnelling? jpayne@7: if self.proxy is not None and http_tunnel_required and conn.is_closed: jpayne@7: try: jpayne@7: self._prepare_proxy(conn) jpayne@7: except (BaseSSLError, OSError, SocketTimeout) as e: jpayne@7: self._raise_timeout( jpayne@7: err=e, url=self.proxy.url, timeout_value=conn.timeout jpayne@7: ) jpayne@7: raise jpayne@7: jpayne@7: # If we're going to release the connection in ``finally:``, then jpayne@7: # the response doesn't need to know about the connection. Otherwise jpayne@7: # it will also try to release it and we'll have a double-release jpayne@7: # mess. jpayne@7: response_conn = conn if not release_conn else None jpayne@7: jpayne@7: # Make the request on the HTTPConnection object jpayne@7: response = self._make_request( jpayne@7: conn, jpayne@7: method, jpayne@7: url, jpayne@7: timeout=timeout_obj, jpayne@7: body=body, jpayne@7: headers=headers, jpayne@7: chunked=chunked, jpayne@7: retries=retries, jpayne@7: response_conn=response_conn, jpayne@7: preload_content=preload_content, jpayne@7: decode_content=decode_content, jpayne@7: **response_kw, jpayne@7: ) jpayne@7: jpayne@7: # Everything went great! jpayne@7: clean_exit = True jpayne@7: jpayne@7: except EmptyPoolError: jpayne@7: # Didn't get a connection from the pool, no need to clean up jpayne@7: clean_exit = True jpayne@7: release_this_conn = False jpayne@7: raise jpayne@7: jpayne@7: except ( jpayne@7: TimeoutError, jpayne@7: HTTPException, jpayne@7: OSError, jpayne@7: ProtocolError, jpayne@7: BaseSSLError, jpayne@7: SSLError, jpayne@7: CertificateError, jpayne@7: ProxyError, jpayne@7: ) as e: jpayne@7: # Discard the connection for these exceptions. It will be jpayne@7: # replaced during the next _get_conn() call. jpayne@7: clean_exit = False jpayne@7: new_e: Exception = e jpayne@7: if isinstance(e, (BaseSSLError, CertificateError)): jpayne@7: new_e = SSLError(e) jpayne@7: if isinstance( jpayne@7: new_e, jpayne@7: ( jpayne@7: OSError, jpayne@7: NewConnectionError, jpayne@7: TimeoutError, jpayne@7: SSLError, jpayne@7: HTTPException, jpayne@7: ), jpayne@7: ) and (conn and conn.proxy and not conn.has_connected_to_proxy): jpayne@7: new_e = _wrap_proxy_error(new_e, conn.proxy.scheme) jpayne@7: elif isinstance(new_e, (OSError, HTTPException)): jpayne@7: new_e = ProtocolError("Connection aborted.", new_e) jpayne@7: jpayne@7: retries = retries.increment( jpayne@7: method, url, error=new_e, _pool=self, _stacktrace=sys.exc_info()[2] jpayne@7: ) jpayne@7: retries.sleep() jpayne@7: jpayne@7: # Keep track of the error for the retry warning. jpayne@7: err = e jpayne@7: jpayne@7: finally: jpayne@7: if not clean_exit: jpayne@7: # We hit some kind of exception, handled or otherwise. We need jpayne@7: # to throw the connection away unless explicitly told not to. jpayne@7: # Close the connection, set the variable to None, and make sure jpayne@7: # we put the None back in the pool to avoid leaking it. jpayne@7: if conn: jpayne@7: conn.close() jpayne@7: conn = None jpayne@7: release_this_conn = True jpayne@7: jpayne@7: if release_this_conn: jpayne@7: # Put the connection back to be reused. If the connection is jpayne@7: # expired then it will be None, which will get replaced with a jpayne@7: # fresh connection during _get_conn. jpayne@7: self._put_conn(conn) jpayne@7: jpayne@7: if not conn: jpayne@7: # Try again jpayne@7: log.warning( jpayne@7: "Retrying (%r) after connection broken by '%r': %s", retries, err, url jpayne@7: ) jpayne@7: return self.urlopen( jpayne@7: method, jpayne@7: url, jpayne@7: body, jpayne@7: headers, jpayne@7: retries, jpayne@7: redirect, jpayne@7: assert_same_host, jpayne@7: timeout=timeout, jpayne@7: pool_timeout=pool_timeout, jpayne@7: release_conn=release_conn, jpayne@7: chunked=chunked, jpayne@7: body_pos=body_pos, jpayne@7: preload_content=preload_content, jpayne@7: decode_content=decode_content, jpayne@7: **response_kw, jpayne@7: ) jpayne@7: jpayne@7: # Handle redirect? jpayne@7: redirect_location = redirect and response.get_redirect_location() jpayne@7: if redirect_location: jpayne@7: if response.status == 303: jpayne@7: # Change the method according to RFC 9110, Section 15.4.4. jpayne@7: method = "GET" jpayne@7: # And lose the body not to transfer anything sensitive. jpayne@7: body = None jpayne@7: headers = HTTPHeaderDict(headers)._prepare_for_method_change() jpayne@7: jpayne@7: try: jpayne@7: retries = retries.increment(method, url, response=response, _pool=self) jpayne@7: except MaxRetryError: jpayne@7: if retries.raise_on_redirect: jpayne@7: response.drain_conn() jpayne@7: raise jpayne@7: return response jpayne@7: jpayne@7: response.drain_conn() jpayne@7: retries.sleep_for_retry(response) jpayne@7: log.debug("Redirecting %s -> %s", url, redirect_location) jpayne@7: return self.urlopen( jpayne@7: method, jpayne@7: redirect_location, jpayne@7: body, jpayne@7: headers, jpayne@7: retries=retries, jpayne@7: redirect=redirect, jpayne@7: assert_same_host=assert_same_host, jpayne@7: timeout=timeout, jpayne@7: pool_timeout=pool_timeout, jpayne@7: release_conn=release_conn, jpayne@7: chunked=chunked, jpayne@7: body_pos=body_pos, jpayne@7: preload_content=preload_content, jpayne@7: decode_content=decode_content, jpayne@7: **response_kw, jpayne@7: ) jpayne@7: jpayne@7: # Check if we should retry the HTTP response. jpayne@7: has_retry_after = bool(response.headers.get("Retry-After")) jpayne@7: if retries.is_retry(method, response.status, has_retry_after): jpayne@7: try: jpayne@7: retries = retries.increment(method, url, response=response, _pool=self) jpayne@7: except MaxRetryError: jpayne@7: if retries.raise_on_status: jpayne@7: response.drain_conn() jpayne@7: raise jpayne@7: return response jpayne@7: jpayne@7: response.drain_conn() jpayne@7: retries.sleep(response) jpayne@7: log.debug("Retry: %s", url) jpayne@7: return self.urlopen( jpayne@7: method, jpayne@7: url, jpayne@7: body, jpayne@7: headers, jpayne@7: retries=retries, jpayne@7: redirect=redirect, jpayne@7: assert_same_host=assert_same_host, jpayne@7: timeout=timeout, jpayne@7: pool_timeout=pool_timeout, jpayne@7: release_conn=release_conn, jpayne@7: chunked=chunked, jpayne@7: body_pos=body_pos, jpayne@7: preload_content=preload_content, jpayne@7: decode_content=decode_content, jpayne@7: **response_kw, jpayne@7: ) jpayne@7: jpayne@7: return response jpayne@7: jpayne@7: jpayne@7: class HTTPSConnectionPool(HTTPConnectionPool): jpayne@7: """ jpayne@7: Same as :class:`.HTTPConnectionPool`, but HTTPS. jpayne@7: jpayne@7: :class:`.HTTPSConnection` uses one of ``assert_fingerprint``, jpayne@7: ``assert_hostname`` and ``host`` in this order to verify connections. jpayne@7: If ``assert_hostname`` is False, no verification is done. jpayne@7: jpayne@7: The ``key_file``, ``cert_file``, ``cert_reqs``, ``ca_certs``, jpayne@7: ``ca_cert_dir``, ``ssl_version``, ``key_password`` are only used if :mod:`ssl` jpayne@7: is available and are fed into :meth:`urllib3.util.ssl_wrap_socket` to upgrade jpayne@7: the connection socket into an SSL socket. jpayne@7: """ jpayne@7: jpayne@7: scheme = "https" jpayne@7: ConnectionCls: type[BaseHTTPSConnection] = HTTPSConnection jpayne@7: jpayne@7: def __init__( jpayne@7: self, jpayne@7: host: str, jpayne@7: port: int | None = None, jpayne@7: timeout: _TYPE_TIMEOUT | None = _DEFAULT_TIMEOUT, jpayne@7: maxsize: int = 1, jpayne@7: block: bool = False, jpayne@7: headers: typing.Mapping[str, str] | None = None, jpayne@7: retries: Retry | bool | int | None = None, jpayne@7: _proxy: Url | None = None, jpayne@7: _proxy_headers: typing.Mapping[str, str] | None = None, jpayne@7: key_file: str | None = None, jpayne@7: cert_file: str | None = None, jpayne@7: cert_reqs: int | str | None = None, jpayne@7: key_password: str | None = None, jpayne@7: ca_certs: str | None = None, jpayne@7: ssl_version: int | str | None = None, jpayne@7: ssl_minimum_version: ssl.TLSVersion | None = None, jpayne@7: ssl_maximum_version: ssl.TLSVersion | None = None, jpayne@7: assert_hostname: str | Literal[False] | None = None, jpayne@7: assert_fingerprint: str | None = None, jpayne@7: ca_cert_dir: str | None = None, jpayne@7: **conn_kw: typing.Any, jpayne@7: ) -> None: jpayne@7: super().__init__( jpayne@7: host, jpayne@7: port, jpayne@7: timeout, jpayne@7: maxsize, jpayne@7: block, jpayne@7: headers, jpayne@7: retries, jpayne@7: _proxy, jpayne@7: _proxy_headers, jpayne@7: **conn_kw, jpayne@7: ) jpayne@7: jpayne@7: self.key_file = key_file jpayne@7: self.cert_file = cert_file jpayne@7: self.cert_reqs = cert_reqs jpayne@7: self.key_password = key_password jpayne@7: self.ca_certs = ca_certs jpayne@7: self.ca_cert_dir = ca_cert_dir jpayne@7: self.ssl_version = ssl_version jpayne@7: self.ssl_minimum_version = ssl_minimum_version jpayne@7: self.ssl_maximum_version = ssl_maximum_version jpayne@7: self.assert_hostname = assert_hostname jpayne@7: self.assert_fingerprint = assert_fingerprint jpayne@7: jpayne@7: def _prepare_proxy(self, conn: HTTPSConnection) -> None: # type: ignore[override] jpayne@7: """Establishes a tunnel connection through HTTP CONNECT.""" jpayne@7: if self.proxy and self.proxy.scheme == "https": jpayne@7: tunnel_scheme = "https" jpayne@7: else: jpayne@7: tunnel_scheme = "http" jpayne@7: jpayne@7: conn.set_tunnel( jpayne@7: scheme=tunnel_scheme, jpayne@7: host=self._tunnel_host, jpayne@7: port=self.port, jpayne@7: headers=self.proxy_headers, jpayne@7: ) jpayne@7: conn.connect() jpayne@7: jpayne@7: def _new_conn(self) -> BaseHTTPSConnection: jpayne@7: """ jpayne@7: Return a fresh :class:`urllib3.connection.HTTPConnection`. jpayne@7: """ jpayne@7: self.num_connections += 1 jpayne@7: log.debug( jpayne@7: "Starting new HTTPS connection (%d): %s:%s", jpayne@7: self.num_connections, jpayne@7: self.host, jpayne@7: self.port or "443", jpayne@7: ) jpayne@7: jpayne@7: if not self.ConnectionCls or self.ConnectionCls is DummyConnection: # type: ignore[comparison-overlap] jpayne@7: raise ImportError( jpayne@7: "Can't connect to HTTPS URL because the SSL module is not available." jpayne@7: ) jpayne@7: jpayne@7: actual_host: str = self.host jpayne@7: actual_port = self.port jpayne@7: if self.proxy is not None and self.proxy.host is not None: jpayne@7: actual_host = self.proxy.host jpayne@7: actual_port = self.proxy.port jpayne@7: jpayne@7: return self.ConnectionCls( jpayne@7: host=actual_host, jpayne@7: port=actual_port, jpayne@7: timeout=self.timeout.connect_timeout, jpayne@7: cert_file=self.cert_file, jpayne@7: key_file=self.key_file, jpayne@7: key_password=self.key_password, jpayne@7: cert_reqs=self.cert_reqs, jpayne@7: ca_certs=self.ca_certs, jpayne@7: ca_cert_dir=self.ca_cert_dir, jpayne@7: assert_hostname=self.assert_hostname, jpayne@7: assert_fingerprint=self.assert_fingerprint, jpayne@7: ssl_version=self.ssl_version, jpayne@7: ssl_minimum_version=self.ssl_minimum_version, jpayne@7: ssl_maximum_version=self.ssl_maximum_version, jpayne@7: **self.conn_kw, jpayne@7: ) jpayne@7: jpayne@7: def _validate_conn(self, conn: BaseHTTPConnection) -> None: jpayne@7: """ jpayne@7: Called right before a request is made, after the socket is created. jpayne@7: """ jpayne@7: super()._validate_conn(conn) jpayne@7: jpayne@7: # Force connect early to allow us to validate the connection. jpayne@7: if conn.is_closed: jpayne@7: conn.connect() jpayne@7: jpayne@7: # TODO revise this, see https://github.com/urllib3/urllib3/issues/2791 jpayne@7: if not conn.is_verified and not conn.proxy_is_verified: jpayne@7: warnings.warn( jpayne@7: ( jpayne@7: f"Unverified HTTPS request is being made to host '{conn.host}'. " jpayne@7: "Adding certificate verification is strongly advised. See: " jpayne@7: "https://urllib3.readthedocs.io/en/latest/advanced-usage.html" jpayne@7: "#tls-warnings" jpayne@7: ), jpayne@7: InsecureRequestWarning, jpayne@7: ) jpayne@7: jpayne@7: jpayne@7: def connection_from_url(url: str, **kw: typing.Any) -> HTTPConnectionPool: jpayne@7: """ jpayne@7: Given a url, return an :class:`.ConnectionPool` instance of its host. jpayne@7: jpayne@7: This is a shortcut for not having to parse out the scheme, host, and port jpayne@7: of the url before creating an :class:`.ConnectionPool` instance. jpayne@7: jpayne@7: :param url: jpayne@7: Absolute URL string that must include the scheme. Port is optional. jpayne@7: jpayne@7: :param \\**kw: jpayne@7: Passes additional parameters to the constructor of the appropriate jpayne@7: :class:`.ConnectionPool`. Useful for specifying things like jpayne@7: timeout, maxsize, headers, etc. jpayne@7: jpayne@7: Example:: jpayne@7: jpayne@7: >>> conn = connection_from_url('http://google.com/') jpayne@7: >>> r = conn.request('GET', '/') jpayne@7: """ jpayne@7: scheme, _, host, port, *_ = parse_url(url) jpayne@7: scheme = scheme or "http" jpayne@7: port = port or port_by_scheme.get(scheme, 80) jpayne@7: if scheme == "https": jpayne@7: return HTTPSConnectionPool(host, port=port, **kw) # type: ignore[arg-type] jpayne@7: else: jpayne@7: return HTTPConnectionPool(host, port=port, **kw) # type: ignore[arg-type] jpayne@7: jpayne@7: jpayne@7: @typing.overload jpayne@7: def _normalize_host(host: None, scheme: str | None) -> None: jpayne@7: ... jpayne@7: jpayne@7: jpayne@7: @typing.overload jpayne@7: def _normalize_host(host: str, scheme: str | None) -> str: jpayne@7: ... jpayne@7: jpayne@7: jpayne@7: def _normalize_host(host: str | None, scheme: str | None) -> str | None: jpayne@7: """ jpayne@7: Normalize hosts for comparisons and use with sockets. jpayne@7: """ jpayne@7: jpayne@7: host = normalize_host(host, scheme) jpayne@7: jpayne@7: # httplib doesn't like it when we include brackets in IPv6 addresses jpayne@7: # Specifically, if we include brackets but also pass the port then jpayne@7: # httplib crazily doubles up the square brackets on the Host header. jpayne@7: # Instead, we need to make sure we never pass ``None`` as the port. jpayne@7: # However, for backward compatibility reasons we can't actually jpayne@7: # *assert* that. See http://bugs.python.org/issue28539 jpayne@7: if host and host.startswith("[") and host.endswith("]"): jpayne@7: host = host[1:-1] jpayne@7: return host jpayne@7: jpayne@7: jpayne@7: def _url_from_pool( jpayne@7: pool: HTTPConnectionPool | HTTPSConnectionPool, path: str | None = None jpayne@7: ) -> str: jpayne@7: """Returns the URL from a given connection pool. This is mainly used for testing and logging.""" jpayne@7: return Url(scheme=pool.scheme, host=pool.host, port=pool.port, path=path).url jpayne@7: jpayne@7: jpayne@7: def _close_pool_connections(pool: queue.LifoQueue[typing.Any]) -> None: jpayne@7: """Drains a queue of connections and closes each one.""" jpayne@7: try: jpayne@7: while True: jpayne@7: conn = pool.get(block=False) jpayne@7: if conn: jpayne@7: conn.close() jpayne@7: except queue.Empty: jpayne@7: pass # Done.