annotate urllib3/util/request.py @ 14:18e1cb6018fd

planemo upload for repository https://toolrepo.galaxytrakr.org/view/jpayne/bioproject_to_srr_2/556cac4fb538
author jpayne
date Mon, 20 May 2024 02:25:23 -0400
parents 5eb2d5e3bf22
children
rev   line source
jpayne@7 1 from __future__ import annotations
jpayne@7 2
jpayne@7 3 import io
jpayne@7 4 import typing
jpayne@7 5 from base64 import b64encode
jpayne@7 6 from enum import Enum
jpayne@7 7
jpayne@7 8 from ..exceptions import UnrewindableBodyError
jpayne@7 9 from .util import to_bytes
jpayne@7 10
jpayne@7 11 if typing.TYPE_CHECKING:
jpayne@7 12 from typing import Final
jpayne@7 13
# Sentinel: use it as a value inside ``headers`` to suppress an HTTP
# header that would otherwise be emitted automatically.
# Only ``Accept-Encoding``, ``Host``, and ``User-Agent`` can be
# skipped this way.
SKIP_HEADER = "@@@SKIP_HEADER@@@"
SKIPPABLE_HEADERS = frozenset(("accept-encoding", "host", "user-agent"))
jpayne@7 20
# Baseline value; tokens for optional codecs are appended below when the
# corresponding package can be imported.
ACCEPT_ENCODING = "gzip,deflate"

# Brotli: try ``brotlicffi`` first, then fall back to ``brotli``.
try:
    try:
        import brotlicffi as _unused_module_brotli  # type: ignore[import-not-found] # noqa: F401
    except ImportError:
        import brotli as _unused_module_brotli  # type: ignore[import-not-found] # noqa: F401
except ImportError:
    pass
else:
    ACCEPT_ENCODING = ACCEPT_ENCODING + ",br"

# Zstandard.
try:
    import zstandard as _unused_module_zstd  # type: ignore[import-not-found] # noqa: F401
except ImportError:
    pass
else:
    ACCEPT_ENCODING = ACCEPT_ENCODING + ",zstd"
jpayne@7 37
jpayne@7 38
class _TYPE_FAILEDTELL(Enum):
    """Single-member enum whose only member marks a failed ``tell()``."""

    token = 0


# Stored as the body position when ``tell()`` raised, so that the failure
# can be told apart from ``None`` (no position recorded at all).
_FAILEDTELL: Final[_TYPE_FAILEDTELL] = _TYPE_FAILEDTELL.token

# A body position is either an integer offset or the failed-tell marker.
_TYPE_BODY_POSITION = typing.Union[int, _TYPE_FAILEDTELL]
jpayne@7 46
# Requests sent with these methods are not expected to carry a body, so
# there is no need to set an explicit 'Content-Length: 0' for them.
# The set lists the methods *without* a body (rather than the ones that
# 'should' have one) because an unknown method should be treated as if
# it were 'POST', which *does* expect a body.
_METHODS_NOT_EXPECTING_BODY = {
    "GET",
    "HEAD",
    "DELETE",
    "TRACE",
    "OPTIONS",
    "CONNECT",
}
jpayne@7 53
jpayne@7 54
def make_headers(
    keep_alive: bool | None = None,
    accept_encoding: bool | list[str] | tuple[str, ...] | str | None = None,
    user_agent: str | None = None,
    basic_auth: str | None = None,
    proxy_basic_auth: str | None = None,
    disable_cache: bool | None = None,
) -> dict[str, str]:
    """
    Shortcuts for generating request headers.

    Every argument is optional; a falsy argument contributes no header.

    :param keep_alive:
        If ``True``, adds 'connection: keep-alive' header.

    :param accept_encoding:
        Can be a boolean, list, tuple, or string.
        ``True`` translates to the module-level ``ACCEPT_ENCODING`` value:
        'gzip,deflate', followed by 'br' if either the ``brotli`` or
        ``brotlicffi`` package is installed, followed by 'zstd' if the
        ``zstandard`` package is installed.
        List or tuple will get joined by comma.
        String will be used as provided.

    :param user_agent:
        String representing the user-agent you want, such as
        "python-urllib3/0.6"

    :param basic_auth:
        Colon-separated username:password string for 'authorization: basic ...'
        auth header.

    :param proxy_basic_auth:
        Colon-separated username:password string for 'proxy-authorization: basic ...'
        auth header.

    :param disable_cache:
        If ``True``, adds 'cache-control: no-cache' header.

    :returns:
        A new ``dict`` mapping lower-case header names to their values.

    Example:

    .. code-block:: python

        import urllib3

        print(urllib3.util.make_headers(keep_alive=True, user_agent="Batman/1.0"))
        # {'connection': 'keep-alive', 'user-agent': 'Batman/1.0'}
        print(urllib3.util.make_headers(accept_encoding=True))
        # {'accept-encoding': 'gzip,deflate'}
    """
    headers: dict[str, str] = {}

    if accept_encoding:
        if isinstance(accept_encoding, str):
            # Caller-supplied string is used verbatim.
            pass
        elif isinstance(accept_encoding, (list, tuple)):
            # Tuples are joined exactly like lists rather than being
            # silently replaced by the default value.
            accept_encoding = ",".join(accept_encoding)
        else:
            # Any other truthy value (typically ``True``) selects the default.
            accept_encoding = ACCEPT_ENCODING
        headers["accept-encoding"] = accept_encoding

    if user_agent:
        headers["user-agent"] = user_agent

    if keep_alive:
        headers["connection"] = "keep-alive"

    if basic_auth:
        # Credentials are encoded as latin-1 before base64.
        credentials = b64encode(basic_auth.encode("latin-1")).decode()
        headers["authorization"] = f"Basic {credentials}"

    if proxy_basic_auth:
        proxy_credentials = b64encode(proxy_basic_auth.encode("latin-1")).decode()
        headers["proxy-authorization"] = f"Basic {proxy_credentials}"

    if disable_cache:
        headers["cache-control"] = "no-cache"

    return headers
jpayne@7 132
jpayne@7 133
def set_file_position(
    body: typing.Any, pos: _TYPE_BODY_POSITION | None
) -> _TYPE_BODY_POSITION | None:
    """
    Rewind ``body`` to ``pos`` when one is given; otherwise try to record
    the body's current position for future use.

    :param body:
        Request body; only consulted for ``tell`` here (and handed to
        ``rewind_body`` when ``pos`` is given).

    :param pos:
        Previously recorded position, or ``None`` if none was recorded yet.

    :returns:
        ``pos`` unchanged when it was given; otherwise the result of
        ``body.tell()``, the failed-tell marker if ``tell()`` raised
        ``OSError``, or ``None`` if ``body`` has no usable ``tell``.
    """
    if pos is not None:
        rewind_body(body, pos)
        return pos

    if getattr(body, "tell", None) is None:
        # Nothing to record a position with.
        return None

    try:
        return body.tell()
    except OSError:
        # Distinct from None, so that a failed `tell()` can be detected
        # later when an attempt is made to rewind the body.
        return _FAILEDTELL
jpayne@7 152
jpayne@7 153
def rewind_body(body: typing.IO[typing.AnyStr], body_pos: _TYPE_BODY_POSITION) -> None:
    """
    Try to move ``body`` back to a previously recorded position.
    Primarily used for request redirects and retries.

    :param body:
        File-like object that supports seek.

    :param body_pos:
        Position to seek to in file, or the failed-tell marker.

    :raises UnrewindableBodyError:
        If seeking raises ``OSError``, or if ``body_pos`` is the
        failed-tell marker.

    :raises ValueError:
        If ``body`` has no ``seek`` or ``body_pos`` is not an integer
        (and is not the failed-tell marker).
    """
    seek = getattr(body, "seek", None)

    if seek is None or not isinstance(body_pos, int):
        # Seeking is not possible; choose the error to report.
        if body_pos is _FAILEDTELL:
            raise UnrewindableBodyError(
                "Unable to record file position for rewinding "
                "request body during a redirect/retry."
            )
        raise ValueError(
            f"body_pos must be of type integer, instead it was {type(body_pos)}."
        )

    try:
        seek(body_pos)
    except OSError as e:
        raise UnrewindableBodyError(
            "An error occurred when rewinding request body for redirect/retry."
        ) from e
jpayne@7 182
jpayne@7 183
class ChunksAndContentLength(typing.NamedTuple):
    """Pair of body chunks and the matching 'Content-Length' value."""

    # Iterable of byte chunks making up the body, or None for no body.
    chunks: typing.Iterable[bytes] | None
    # Value for 'Content-Length', or None when no length is provided.
    content_length: int | None
jpayne@7 187
jpayne@7 188
def body_to_chunks(
    body: typing.Any | None, method: str, blocksize: int
) -> ChunksAndContentLength:
    """Turn the HTTP request method, body, and blocksize into an
    iterable of chunks to pass to socket.sendall() together with an
    optional 'Content-Length' value.

    A 'Content-Length' of 'None' indicates the length of the body
    can't be determined so 'Transfer-Encoding: chunked' should be
    used for framing instead.

    :raises TypeError:
        If ``body`` is not ``None``, a string, bytes, an object with a
        ``read`` attribute, a buffer-API object, or an iterable.
    """
    # No body: recommend a 'Content-Length' depending on whether the
    # request method is expected to have a body or not.
    if body is None:
        expects_body = method.upper() not in _METHODS_NOT_EXPECTING_BODY
        return ChunksAndContentLength(
            chunks=None, content_length=0 if expects_body else None
        )

    # Bytes or strings become a single bytes chunk.
    if isinstance(body, (str, bytes)):
        payload = to_bytes(body)
        return ChunksAndContentLength(chunks=(payload,), content_length=len(payload))

    # File-like object, TODO: use seek() and tell() for length?
    if hasattr(body, "read"):

        def chunk_readable() -> typing.Iterable[bytes]:
            # Text streams yield str, which is encoded before sending.
            is_text = isinstance(body, io.TextIOBase)
            while datablock := body.read(blocksize):
                yield datablock.encode("iso-8859-1") if is_text else datablock

        return ChunksAndContentLength(chunks=chunk_readable(), content_length=None)

    # Anything else is checked via duck-typing.
    try:
        # Does the body implement the buffer API?
        view = memoryview(body)
    except TypeError:
        try:
            # Is the body an iterable?
            chunk_iter = iter(body)
        except TypeError:
            raise TypeError(
                f"'body' must be a bytes-like object, file-like "
                f"object, or iterable. Instead was {body!r}"
            ) from None
        return ChunksAndContentLength(chunks=chunk_iter, content_length=None)

    # A buffer-API object can be passed directly to socket.sendall().
    return ChunksAndContentLength(chunks=(body,), content_length=view.nbytes)