jpayne@7: from __future__ import annotations jpayne@7: jpayne@7: import io jpayne@7: import typing jpayne@7: from base64 import b64encode jpayne@7: from enum import Enum jpayne@7: jpayne@7: from ..exceptions import UnrewindableBodyError jpayne@7: from .util import to_bytes jpayne@7: jpayne@7: if typing.TYPE_CHECKING: jpayne@7: from typing import Final jpayne@7: jpayne@7: # Pass as a value within ``headers`` to skip jpayne@7: # emitting some HTTP headers that are added automatically. jpayne@7: # The only headers that are supported are ``Accept-Encoding``, jpayne@7: # ``Host``, and ``User-Agent``. jpayne@7: SKIP_HEADER = "@@@SKIP_HEADER@@@" jpayne@7: SKIPPABLE_HEADERS = frozenset(["accept-encoding", "host", "user-agent"]) jpayne@7: jpayne@7: ACCEPT_ENCODING = "gzip,deflate" jpayne@7: try: jpayne@7: try: jpayne@7: import brotlicffi as _unused_module_brotli # type: ignore[import-not-found] # noqa: F401 jpayne@7: except ImportError: jpayne@7: import brotli as _unused_module_brotli # type: ignore[import-not-found] # noqa: F401 jpayne@7: except ImportError: jpayne@7: pass jpayne@7: else: jpayne@7: ACCEPT_ENCODING += ",br" jpayne@7: try: jpayne@7: import zstandard as _unused_module_zstd # type: ignore[import-not-found] # noqa: F401 jpayne@7: except ImportError: jpayne@7: pass jpayne@7: else: jpayne@7: ACCEPT_ENCODING += ",zstd" jpayne@7: jpayne@7: jpayne@7: class _TYPE_FAILEDTELL(Enum): jpayne@7: token = 0 jpayne@7: jpayne@7: jpayne@7: _FAILEDTELL: Final[_TYPE_FAILEDTELL] = _TYPE_FAILEDTELL.token jpayne@7: jpayne@7: _TYPE_BODY_POSITION = typing.Union[int, _TYPE_FAILEDTELL] jpayne@7: jpayne@7: # When sending a request with these methods we aren't expecting jpayne@7: # a body so don't need to set an explicit 'Content-Length: 0' jpayne@7: # The reason we do this in the negative instead of tracking methods jpayne@7: # which 'should' have a body is because unknown methods should be jpayne@7: # treated as if they were 'POST' which *does* expect a body. jpayne@7: _METHODS_NOT_EXPECTING_BODY = {"GET", "HEAD", "DELETE", "TRACE", "OPTIONS", "CONNECT"} jpayne@7: jpayne@7: jpayne@7: def make_headers( jpayne@7: keep_alive: bool | None = None, jpayne@7: accept_encoding: bool | list[str] | str | None = None, jpayne@7: user_agent: str | None = None, jpayne@7: basic_auth: str | None = None, jpayne@7: proxy_basic_auth: str | None = None, jpayne@7: disable_cache: bool | None = None, jpayne@7: ) -> dict[str, str]: jpayne@7: """ jpayne@7: Shortcuts for generating request headers. jpayne@7: jpayne@7: :param keep_alive: jpayne@7: If ``True``, adds 'connection: keep-alive' header. jpayne@7: jpayne@7: :param accept_encoding: jpayne@7: Can be a boolean, list, or string. jpayne@7: ``True`` translates to 'gzip,deflate'. If either the ``brotli`` or jpayne@7: ``brotlicffi`` package is installed 'gzip,deflate,br' is used instead. jpayne@7: List will get joined by comma. jpayne@7: String will be used as provided. jpayne@7: jpayne@7: :param user_agent: jpayne@7: String representing the user-agent you want, such as jpayne@7: "python-urllib3/0.6" jpayne@7: jpayne@7: :param basic_auth: jpayne@7: Colon-separated username:password string for 'authorization: basic ...' jpayne@7: auth header. jpayne@7: jpayne@7: :param proxy_basic_auth: jpayne@7: Colon-separated username:password string for 'proxy-authorization: basic ...' jpayne@7: auth header. jpayne@7: jpayne@7: :param disable_cache: jpayne@7: If ``True``, adds 'cache-control: no-cache' header. jpayne@7: jpayne@7: Example: jpayne@7: jpayne@7: .. code-block:: python jpayne@7: jpayne@7: import urllib3 jpayne@7: jpayne@7: print(urllib3.util.make_headers(keep_alive=True, user_agent="Batman/1.0")) jpayne@7: # {'connection': 'keep-alive', 'user-agent': 'Batman/1.0'} jpayne@7: print(urllib3.util.make_headers(accept_encoding=True)) jpayne@7: # {'accept-encoding': 'gzip,deflate'} jpayne@7: """ jpayne@7: headers: dict[str, str] = {} jpayne@7: if accept_encoding: jpayne@7: if isinstance(accept_encoding, str): jpayne@7: pass jpayne@7: elif isinstance(accept_encoding, list): jpayne@7: accept_encoding = ",".join(accept_encoding) jpayne@7: else: jpayne@7: accept_encoding = ACCEPT_ENCODING jpayne@7: headers["accept-encoding"] = accept_encoding jpayne@7: jpayne@7: if user_agent: jpayne@7: headers["user-agent"] = user_agent jpayne@7: jpayne@7: if keep_alive: jpayne@7: headers["connection"] = "keep-alive" jpayne@7: jpayne@7: if basic_auth: jpayne@7: headers[ jpayne@7: "authorization" jpayne@7: ] = f"Basic {b64encode(basic_auth.encode('latin-1')).decode()}" jpayne@7: jpayne@7: if proxy_basic_auth: jpayne@7: headers[ jpayne@7: "proxy-authorization" jpayne@7: ] = f"Basic {b64encode(proxy_basic_auth.encode('latin-1')).decode()}" jpayne@7: jpayne@7: if disable_cache: jpayne@7: headers["cache-control"] = "no-cache" jpayne@7: jpayne@7: return headers jpayne@7: jpayne@7: jpayne@7: def set_file_position( jpayne@7: body: typing.Any, pos: _TYPE_BODY_POSITION | None jpayne@7: ) -> _TYPE_BODY_POSITION | None: jpayne@7: """ jpayne@7: If a position is provided, move file to that point. jpayne@7: Otherwise, we'll attempt to record a position for future use. jpayne@7: """ jpayne@7: if pos is not None: jpayne@7: rewind_body(body, pos) jpayne@7: elif getattr(body, "tell", None) is not None: jpayne@7: try: jpayne@7: pos = body.tell() jpayne@7: except OSError: jpayne@7: # This differentiates from None, allowing us to catch jpayne@7: # a failed `tell()` later when trying to rewind the body. jpayne@7: pos = _FAILEDTELL jpayne@7: jpayne@7: return pos jpayne@7: jpayne@7: jpayne@7: def rewind_body(body: typing.IO[typing.AnyStr], body_pos: _TYPE_BODY_POSITION) -> None: jpayne@7: """ jpayne@7: Attempt to rewind body to a certain position. jpayne@7: Primarily used for request redirects and retries. jpayne@7: jpayne@7: :param body: jpayne@7: File-like object that supports seek. jpayne@7: jpayne@7: :param int pos: jpayne@7: Position to seek to in file. jpayne@7: """ jpayne@7: body_seek = getattr(body, "seek", None) jpayne@7: if body_seek is not None and isinstance(body_pos, int): jpayne@7: try: jpayne@7: body_seek(body_pos) jpayne@7: except OSError as e: jpayne@7: raise UnrewindableBodyError( jpayne@7: "An error occurred when rewinding request body for redirect/retry." jpayne@7: ) from e jpayne@7: elif body_pos is _FAILEDTELL: jpayne@7: raise UnrewindableBodyError( jpayne@7: "Unable to record file position for rewinding " jpayne@7: "request body during a redirect/retry." jpayne@7: ) jpayne@7: else: jpayne@7: raise ValueError( jpayne@7: f"body_pos must be of type integer, instead it was {type(body_pos)}." jpayne@7: ) jpayne@7: jpayne@7: jpayne@7: class ChunksAndContentLength(typing.NamedTuple): jpayne@7: chunks: typing.Iterable[bytes] | None jpayne@7: content_length: int | None jpayne@7: jpayne@7: jpayne@7: def body_to_chunks( jpayne@7: body: typing.Any | None, method: str, blocksize: int jpayne@7: ) -> ChunksAndContentLength: jpayne@7: """Takes the HTTP request method, body, and blocksize and jpayne@7: transforms them into an iterable of chunks to pass to jpayne@7: socket.sendall() and an optional 'Content-Length' header. jpayne@7: jpayne@7: A 'Content-Length' of 'None' indicates the length of the body jpayne@7: can't be determined so should use 'Transfer-Encoding: chunked' jpayne@7: for framing instead. jpayne@7: """ jpayne@7: jpayne@7: chunks: typing.Iterable[bytes] | None jpayne@7: content_length: int | None jpayne@7: jpayne@7: # No body, we need to make a recommendation on 'Content-Length' jpayne@7: # based on whether that request method is expected to have jpayne@7: # a body or not. jpayne@7: if body is None: jpayne@7: chunks = None jpayne@7: if method.upper() not in _METHODS_NOT_EXPECTING_BODY: jpayne@7: content_length = 0 jpayne@7: else: jpayne@7: content_length = None jpayne@7: jpayne@7: # Bytes or strings become bytes jpayne@7: elif isinstance(body, (str, bytes)): jpayne@7: chunks = (to_bytes(body),) jpayne@7: content_length = len(chunks[0]) jpayne@7: jpayne@7: # File-like object, TODO: use seek() and tell() for length? jpayne@7: elif hasattr(body, "read"): jpayne@7: jpayne@7: def chunk_readable() -> typing.Iterable[bytes]: jpayne@7: nonlocal body, blocksize jpayne@7: encode = isinstance(body, io.TextIOBase) jpayne@7: while True: jpayne@7: datablock = body.read(blocksize) jpayne@7: if not datablock: jpayne@7: break jpayne@7: if encode: jpayne@7: datablock = datablock.encode("iso-8859-1") jpayne@7: yield datablock jpayne@7: jpayne@7: chunks = chunk_readable() jpayne@7: content_length = None jpayne@7: jpayne@7: # Otherwise we need to start checking via duck-typing. jpayne@7: else: jpayne@7: try: jpayne@7: # Check if the body implements the buffer API. jpayne@7: mv = memoryview(body) jpayne@7: except TypeError: jpayne@7: try: jpayne@7: # Check if the body is an iterable jpayne@7: chunks = iter(body) jpayne@7: content_length = None jpayne@7: except TypeError: jpayne@7: raise TypeError( jpayne@7: f"'body' must be a bytes-like object, file-like " jpayne@7: f"object, or iterable. Instead was {body!r}" jpayne@7: ) from None jpayne@7: else: jpayne@7: # Since it implements the buffer API can be passed directly to socket.sendall() jpayne@7: chunks = (body,) jpayne@7: content_length = mv.nbytes jpayne@7: jpayne@7: return ChunksAndContentLength(chunks=chunks, content_length=content_length)