jpayne@7
|
1 from __future__ import annotations
|
jpayne@7
|
2
|
jpayne@7
|
3 import io
|
jpayne@7
|
4 import typing
|
jpayne@7
|
5 from base64 import b64encode
|
jpayne@7
|
6 from enum import Enum
|
jpayne@7
|
7
|
jpayne@7
|
8 from ..exceptions import UnrewindableBodyError
|
jpayne@7
|
9 from .util import to_bytes
|
jpayne@7
|
10
|
jpayne@7
|
11 if typing.TYPE_CHECKING:
|
jpayne@7
|
12 from typing import Final
|
jpayne@7
|
13
|
jpayne@7
|
# Sentinel header value: pass it as the value of a header within ``headers``
# to skip emitting that header when it would otherwise be added automatically.
# The only headers that support this are ``Accept-Encoding``, ``Host``,
# and ``User-Agent``.
SKIP_HEADER = "@@@SKIP_HEADER@@@"
# Lower-cased names of the headers for which ``SKIP_HEADER`` is supported.
SKIPPABLE_HEADERS = frozenset(["accept-encoding", "host", "user-agent"])
|
jpayne@7
|
20
|
jpayne@7
|
# Default 'Accept-Encoding' value. ",br" and ",zstd" are only appended when
# the corresponding optional package can be imported; the imported modules
# themselves are not used here.
ACCEPT_ENCODING = "gzip,deflate"
try:
    try:
        import brotlicffi as _unused_module_brotli  # type: ignore[import-not-found] # noqa: F401
    except ImportError:
        import brotli as _unused_module_brotli  # type: ignore[import-not-found] # noqa: F401
except ImportError:
    pass
else:
    ACCEPT_ENCODING += ",br"
try:
    import zstandard as _unused_module_zstd  # type: ignore[import-not-found] # noqa: F401
except ImportError:
    pass
else:
    ACCEPT_ENCODING += ",zstd"
|
jpayne@7
|
37
|
jpayne@7
|
38
|
jpayne@7
|
class _TYPE_FAILEDTELL(Enum):
    """Single-member enum whose member is used as the 'tell() failed' sentinel."""

    # The only member; bound at module level to ``_FAILEDTELL``.
    token = 0
|
jpayne@7
|
41
|
jpayne@7
|
42
|
jpayne@7
|
# Sentinel recorded instead of a position when calling ``tell()`` on a body
# fails; distinct from ``None``, which means no position was recorded.
_FAILEDTELL: Final[_TYPE_FAILEDTELL] = _TYPE_FAILEDTELL.token

# A recorded body position: an integer offset or the failed-tell sentinel.
_TYPE_BODY_POSITION = typing.Union[int, _TYPE_FAILEDTELL]
|
jpayne@7
|
46
|
jpayne@7
|
# When sending a request with these methods we aren't expecting
# a body so don't need to set an explicit 'Content-Length: 0'.
# The reason we do this in the negative instead of tracking methods
# which 'should' have a body is because unknown methods should be
# treated as if they were 'POST' which *does* expect a body.
# Immutable, like SKIPPABLE_HEADERS, since it is only used for lookups.
_METHODS_NOT_EXPECTING_BODY = frozenset(
    {"GET", "HEAD", "DELETE", "TRACE", "OPTIONS", "CONNECT"}
)
|
jpayne@7
|
53
|
jpayne@7
|
54
|
jpayne@7
|
def make_headers(
    keep_alive: bool | None = None,
    accept_encoding: bool | list[str] | tuple[str, ...] | str | None = None,
    user_agent: str | None = None,
    basic_auth: str | None = None,
    proxy_basic_auth: str | None = None,
    disable_cache: bool | None = None,
) -> dict[str, str]:
    """
    Shortcuts for generating request headers.

    :param keep_alive:
        If ``True``, adds 'connection: keep-alive' header.

    :param accept_encoding:
        Can be a boolean, list, tuple, or string.
        ``True`` translates to 'gzip,deflate'. ',br' is appended if either
        the ``brotli`` or ``brotlicffi`` package is installed, and ',zstd'
        is appended if the ``zstandard`` package is installed.
        List or tuple will get joined by comma.
        String will be used as provided.

    :param user_agent:
        String representing the user-agent you want, such as
        "python-urllib3/0.6"

    :param basic_auth:
        Colon-separated username:password string for 'authorization: basic ...'
        auth header.

    :param proxy_basic_auth:
        Colon-separated username:password string for 'proxy-authorization: basic ...'
        auth header.

    :param disable_cache:
        If ``True``, adds 'cache-control: no-cache' header.

    :return:
        A new dict mapping lower-cased header names to values. Only the
        headers whose argument was truthy are present.

    Example:

    .. code-block:: python

        import urllib3

        print(urllib3.util.make_headers(keep_alive=True, user_agent="Batman/1.0"))
        # {'connection': 'keep-alive', 'user-agent': 'Batman/1.0'}
        print(urllib3.util.make_headers(accept_encoding=True))
        # {'accept-encoding': 'gzip,deflate'}
    """
    headers: dict[str, str] = {}
    if accept_encoding:
        if isinstance(accept_encoding, str):
            pass
        elif isinstance(accept_encoding, (list, tuple)):
            # Tuples are joined like lists instead of silently falling
            # through to the default value.
            accept_encoding = ",".join(accept_encoding)
        else:
            accept_encoding = ACCEPT_ENCODING
        headers["accept-encoding"] = accept_encoding

    if user_agent:
        headers["user-agent"] = user_agent

    if keep_alive:
        headers["connection"] = "keep-alive"

    if basic_auth:
        credentials = b64encode(basic_auth.encode("latin-1")).decode()
        headers["authorization"] = f"Basic {credentials}"

    if proxy_basic_auth:
        proxy_credentials = b64encode(proxy_basic_auth.encode("latin-1")).decode()
        headers["proxy-authorization"] = f"Basic {proxy_credentials}"

    if disable_cache:
        headers["cache-control"] = "no-cache"

    return headers
|
jpayne@7
|
132
|
jpayne@7
|
133
|
jpayne@7
|
def set_file_position(
    body: typing.Any, pos: _TYPE_BODY_POSITION | None
) -> _TYPE_BODY_POSITION | None:
    """
    If a position is provided, move file to that point.
    Otherwise, we'll attempt to record a position for future use.

    :param body:
        The request body. Only objects exposing a ``tell`` attribute can
        have a position recorded.

    :param pos:
        A previously recorded position to rewind to, or ``None`` to record
        the current one.

    :return:
        ``pos`` unchanged when it was provided; otherwise the value of
        ``body.tell()``, ``_FAILEDTELL`` if that call raised ``OSError``,
        or ``None`` if ``body`` has no ``tell`` attribute.
    """
    if pos is not None:
        rewind_body(body, pos)
    elif getattr(body, "tell", None) is not None:
        try:
            pos = body.tell()
        except OSError:
            # This differentiates from None, allowing us to catch
            # a failed `tell()` later when trying to rewind the body.
            pos = _FAILEDTELL

    return pos
|
jpayne@7
|
152
|
jpayne@7
|
153
|
jpayne@7
|
def rewind_body(body: typing.IO[typing.AnyStr], body_pos: _TYPE_BODY_POSITION) -> None:
    """
    Attempt to rewind body to a certain position.
    Primarily used for request redirects and retries.

    :param body:
        File-like object that supports seek.

    :param body_pos:
        Position to seek to in file, or ``_FAILEDTELL`` if recording the
        position failed earlier.

    :raises UnrewindableBodyError:
        If seeking raises ``OSError`` or ``body_pos`` is ``_FAILEDTELL``.

    :raises ValueError:
        If ``body_pos`` is an integer but ``body`` has no ``seek``
        attribute, or if ``body_pos`` is neither an integer nor
        ``_FAILEDTELL``.
    """
    body_seek = getattr(body, "seek", None)
    if isinstance(body_pos, int):
        if body_seek is None:
            # Previously this case fell through to the "must be of type
            # integer" message even though body_pos *was* an integer.
            raise ValueError(
                "body must support seek() in order to be rewound, "
                f"instead it was {type(body)}."
            )
        try:
            body_seek(body_pos)
        except OSError as e:
            raise UnrewindableBodyError(
                "An error occurred when rewinding request body for redirect/retry."
            ) from e
    elif body_pos is _FAILEDTELL:
        raise UnrewindableBodyError(
            "Unable to record file position for rewinding "
            "request body during a redirect/retry."
        )
    else:
        raise ValueError(
            f"body_pos must be of type integer, instead it was {type(body_pos)}."
        )
|
jpayne@7
|
182
|
jpayne@7
|
183
|
jpayne@7
|
class ChunksAndContentLength(typing.NamedTuple):
    """Pair returned by ``body_to_chunks``: the body chunks and their length."""

    # Iterable of chunks to send, or ``None`` when there is no body.
    chunks: typing.Iterable[bytes] | None
    # Suggested 'Content-Length' value, or ``None`` when no length is given.
    content_length: int | None
|
jpayne@7
|
187
|
jpayne@7
|
188
|
jpayne@7
|
def body_to_chunks(
    body: typing.Any | None, method: str, blocksize: int
) -> ChunksAndContentLength:
    """Takes the HTTP request method, body, and blocksize and
    transforms them into an iterable of chunks to pass to
    socket.sendall() and an optional 'Content-Length' header.

    A 'Content-Length' of 'None' indicates the length of the body
    can't be determined so should use 'Transfer-Encoding: chunked'
    for framing instead.

    :param body:
        ``None``, a ``str``/``bytes`` value, an object with a ``read``
        attribute, an object implementing the buffer API, or an iterable.

    :param method:
        HTTP request method; compared case-insensitively. Only consulted
        when ``body`` is ``None``.

    :param blocksize:
        Size passed to each ``body.read()`` call for file-like bodies.

    :return:
        A ``ChunksAndContentLength`` of the chunks and the content length.

    :raises TypeError:
        If ``body`` is none of the supported kinds.
    """

    chunks: typing.Iterable[bytes] | None
    content_length: int | None

    # No body, we need to make a recommendation on 'Content-Length'
    # based on whether that request method is expected to have
    # a body or not.
    if body is None:
        chunks = None
        if method.upper() not in _METHODS_NOT_EXPECTING_BODY:
            content_length = 0
        else:
            content_length = None

    # Bytes or strings become bytes
    elif isinstance(body, (str, bytes)):
        chunks = (to_bytes(body),)
        content_length = len(chunks[0])

    # File-like object, TODO: use seek() and tell() for length?
    elif hasattr(body, "read"):

        def chunk_readable() -> typing.Iterable[bytes]:
            # ``body`` and ``blocksize`` are only read from the enclosing
            # scope, so no ``nonlocal`` declaration is needed.
            encode = isinstance(body, io.TextIOBase)
            while True:
                datablock = body.read(blocksize)
                if not datablock:
                    break
                if encode:
                    # Text-mode streams yield str; encode before sending.
                    datablock = datablock.encode("iso-8859-1")
                yield datablock

        chunks = chunk_readable()
        content_length = None

    # Otherwise we need to start checking via duck-typing.
    else:
        try:
            # Check if the body implements the buffer API.
            mv = memoryview(body)
        except TypeError:
            try:
                # Check if the body is an iterable
                chunks = iter(body)
                content_length = None
            except TypeError:
                raise TypeError(
                    f"'body' must be a bytes-like object, file-like "
                    f"object, or iterable. Instead was {body!r}"
                ) from None
        else:
            # Only the size is needed; release the view right away so it
            # does not keep a buffer export on ``body`` alive.
            with mv:
                content_length = mv.nbytes
            # Since it implements the buffer API can be passed directly to socket.sendall()
            chunks = (body,)

    return ChunksAndContentLength(chunks=chunks, content_length=content_length)
|