Mercurial > repos > jpayne > bioproject_to_srr_2
comparison urllib3/util/request.py @ 7:5eb2d5e3bf22
planemo upload for repository https://toolrepo.galaxytrakr.org/view/jpayne/bioproject_to_srr_2/556cac4fb538
author | jpayne |
---|---|
date | Sun, 05 May 2024 23:32:17 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
6:b2745907b1eb | 7:5eb2d5e3bf22 |
---|---|
1 from __future__ import annotations | |
2 | |
3 import io | |
4 import typing | |
5 from base64 import b64encode | |
6 from enum import Enum | |
7 | |
8 from ..exceptions import UnrewindableBodyError | |
9 from .util import to_bytes | |
10 | |
11 if typing.TYPE_CHECKING: | |
12 from typing import Final | |
13 | |
# Pass as a value within ``headers`` to skip
# emitting some HTTP headers that are added automatically.
# The only headers that are supported are ``Accept-Encoding``,
# ``Host``, and ``User-Agent``.
SKIP_HEADER = "@@@SKIP_HEADER@@@"
# Lowercase names of the headers listed above as supporting ``SKIP_HEADER``.
SKIPPABLE_HEADERS = frozenset(["accept-encoding", "host", "user-agent"])

# Default 'Accept-Encoding' value. It starts as 'gzip,deflate' and is
# extended below for each optional compression package that imports cleanly.
ACCEPT_ENCODING = "gzip,deflate"
try:
    # ``brotlicffi`` is tried first and ``brotli`` is the fallback; the outer
    # ``except`` is only reached when both imports fail.
    try:
        import brotlicffi as _unused_module_brotli  # type: ignore[import-not-found] # noqa: F401
    except ImportError:
        import brotli as _unused_module_brotli  # type: ignore[import-not-found] # noqa: F401
except ImportError:
    pass
else:
    # One of the two brotli packages imported successfully.
    ACCEPT_ENCODING += ",br"
try:
    import zstandard as _unused_module_zstd  # type: ignore[import-not-found] # noqa: F401
except ImportError:
    pass
else:
    # ``zstandard`` imported successfully.
    ACCEPT_ENCODING += ",zstd"
37 | |
38 | |
class _TYPE_FAILEDTELL(Enum):
    """Single-member enum whose only member backs the ``_FAILEDTELL`` sentinel."""

    # The sole member; the module-level ``_FAILEDTELL`` constant is bound to it.
    token = 0
41 | |
42 | |
# Sentinel stored as the body position when ``tell()`` raises ``OSError``.
# It is distinct from ``None`` so that a later rewind attempt can detect the
# failed ``tell()`` and report it.
_FAILEDTELL: Final[_TYPE_FAILEDTELL] = _TYPE_FAILEDTELL.token

# A recorded body position: an integer offset or the ``_FAILEDTELL`` sentinel.
_TYPE_BODY_POSITION = typing.Union[int, _TYPE_FAILEDTELL]

# When sending a request with these methods we aren't expecting
# a body so don't need to set an explicit 'Content-Length: 0'
# The reason we do this in the negative instead of tracking methods
# which 'should' have a body is because unknown methods should be
# treated as if they were 'POST' which *does* expect a body.
_METHODS_NOT_EXPECTING_BODY = {"GET", "HEAD", "DELETE", "TRACE", "OPTIONS", "CONNECT"}
53 | |
54 | |
def make_headers(
    keep_alive: bool | None = None,
    accept_encoding: bool | list[str] | str | None = None,
    user_agent: str | None = None,
    basic_auth: str | None = None,
    proxy_basic_auth: str | None = None,
    disable_cache: bool | None = None,
) -> dict[str, str]:
    """
    Build a dictionary of common request headers from simple options.

    Only options that are truthy produce a header; header names are
    lowercase.

    :param keep_alive:
        If ``True``, adds 'connection: keep-alive' header.

    :param accept_encoding:
        Can be a boolean, list, or string.
        A string is used as provided.
        A list is joined with commas.
        Any other truthy value (such as ``True``) selects the module default
        ``ACCEPT_ENCODING``: 'gzip,deflate', extended with ',br' when the
        ``brotli`` or ``brotlicffi`` package is importable and with ',zstd'
        when the ``zstandard`` package is importable.

    :param user_agent:
        String representing the user-agent you want, such as
        "python-urllib3/0.6"

    :param basic_auth:
        Colon-separated username:password string for 'authorization: basic ...'
        auth header.

    :param proxy_basic_auth:
        Colon-separated username:password string for 'proxy-authorization: basic ...'
        auth header.

    :param disable_cache:
        If ``True``, adds 'cache-control: no-cache' header.

    :returns:
        A new ``dict`` mapping header names to header values.

    Example:

    .. code-block:: python

        import urllib3

        print(urllib3.util.make_headers(keep_alive=True, user_agent="Batman/1.0"))
        # {'user-agent': 'Batman/1.0', 'connection': 'keep-alive'}
        print(urllib3.util.make_headers(accept_encoding=True))
        # {'accept-encoding': 'gzip,deflate'}
    """

    def _basic_credentials(userpass: str) -> str:
        # Credentials are latin-1 encoded before being base64 encoded.
        encoded = b64encode(userpass.encode('latin-1')).decode()
        return f"Basic {encoded}"

    built: dict[str, str] = {}

    if accept_encoding:
        if isinstance(accept_encoding, list):
            encodings = ",".join(accept_encoding)
        elif isinstance(accept_encoding, str):
            encodings = accept_encoding
        else:
            # Any other truthy value (e.g. ``True``) means "use the default".
            encodings = ACCEPT_ENCODING
        built["accept-encoding"] = encodings

    if user_agent:
        built["user-agent"] = user_agent

    if keep_alive:
        built["connection"] = "keep-alive"

    if basic_auth:
        built["authorization"] = _basic_credentials(basic_auth)

    if proxy_basic_auth:
        built["proxy-authorization"] = _basic_credentials(proxy_basic_auth)

    if disable_cache:
        built["cache-control"] = "no-cache"

    return built
132 | |
133 | |
def set_file_position(
    body: typing.Any, pos: _TYPE_BODY_POSITION | None
) -> _TYPE_BODY_POSITION | None:
    """
    Rewind ``body`` to ``pos`` when one is given, otherwise try to record
    the current position of ``body`` for future use.

    :param body:
        The request body; it is only asked for its position when it has a
        ``tell`` attribute that is not ``None``.

    :param pos:
        A previously recorded position, or ``None`` if none was recorded yet.

    :returns:
        ``pos`` unchanged when it was provided; otherwise the result of
        ``body.tell()``, the ``_FAILEDTELL`` sentinel when ``tell()`` raised
        ``OSError``, or ``None`` when ``body`` has no usable ``tell``.
    """
    # A known position means we are re-sending: move the body back to it.
    if pos is not None:
        rewind_body(body, pos)
        return pos

    # Nothing to record for bodies that do not expose ``tell``.
    if getattr(body, "tell", None) is None:
        return None

    try:
        return body.tell()
    except OSError:
        # This differentiates from None, allowing us to catch
        # a failed `tell()` later when trying to rewind the body.
        return _FAILEDTELL
152 | |
153 | |
def rewind_body(body: typing.IO[typing.AnyStr], body_pos: _TYPE_BODY_POSITION) -> None:
    """
    Attempt to rewind body to a certain position.
    Primarily used for request redirects and retries.

    :param body:
        File-like object that supports seek.

    :param body_pos:
        Integer position to seek to in file, or the ``_FAILEDTELL`` sentinel
        recorded when an earlier ``tell()`` failed.

    :raises UnrewindableBodyError:
        If seeking raises ``OSError``, or if ``body_pos`` is ``_FAILEDTELL``.

    :raises ValueError:
        If ``body_pos`` is neither an integer nor ``_FAILEDTELL``, or if
        ``body_pos`` is an integer but ``body`` has no ``seek`` method.
    """
    if isinstance(body_pos, int):
        body_seek = getattr(body, "seek", None)
        if body_seek is None:
            # Previously this case fell through to the "must be of type
            # integer" error even though body_pos *was* an integer. The
            # exception type is kept; only the message is made accurate.
            raise ValueError(
                "Unable to rewind request body: body has no 'seek' method "
                f"(body was of type {type(body)})."
            )
        try:
            body_seek(body_pos)
        except OSError as e:
            raise UnrewindableBodyError(
                "An error occurred when rewinding request body for redirect/retry."
            ) from e
    elif body_pos is _FAILEDTELL:
        raise UnrewindableBodyError(
            "Unable to record file position for rewinding "
            "request body during a redirect/retry."
        )
    else:
        raise ValueError(
            f"body_pos must be of type integer, instead it was {type(body_pos)}."
        )
182 | |
183 | |
class ChunksAndContentLength(typing.NamedTuple):
    """Result of ``body_to_chunks``: the body chunks and a suggested 'Content-Length'."""

    # Iterable of body chunks to send, or ``None`` when there is no body.
    chunks: typing.Iterable[bytes] | None
    # Body length in bytes, or ``None`` when no length is recommended or known.
    content_length: int | None
187 | |
188 | |
def body_to_chunks(
    body: typing.Any | None, method: str, blocksize: int
) -> ChunksAndContentLength:
    """Turn a request body into an iterable of chunks plus an optional
    'Content-Length' value.

    The chunks are intended to be passed to socket.sendall(). A
    'Content-Length' of 'None' indicates the length of the body
    can't be determined so should use 'Transfer-Encoding: chunked'
    for framing instead.

    :param body:
        ``None``, a ``str``/``bytes`` value, an object with a ``read``
        attribute, an object implementing the buffer API, or an iterable.

    :param method:
        HTTP request method; only consulted when ``body`` is ``None``.

    :param blocksize:
        Size passed to each ``body.read()`` call for file-like bodies.

    :raises TypeError:
        If ``body`` matches none of the supported kinds.
    """
    # No body, we need to make a recommendation on 'Content-Length'
    # based on whether that request method is expected to have
    # a body or not.
    if body is None:
        body_expected = method.upper() not in _METHODS_NOT_EXPECTING_BODY
        return ChunksAndContentLength(
            chunks=None, content_length=0 if body_expected else None
        )

    # Bytes or strings become a single bytes chunk of known length.
    if isinstance(body, (str, bytes)):
        payload = to_bytes(body)
        return ChunksAndContentLength(chunks=(payload,), content_length=len(payload))

    # File-like object: read lazily in blocks, length unknown up front.
    if hasattr(body, "read"):

        def chunk_readable() -> typing.Iterable[bytes]:
            # Text streams yield ``str`` blocks which must be encoded first.
            is_text = isinstance(body, io.TextIOBase)
            datablock = body.read(blocksize)
            while datablock:
                yield datablock.encode("iso-8859-1") if is_text else datablock
                datablock = body.read(blocksize)

        return ChunksAndContentLength(chunks=chunk_readable(), content_length=None)

    # Otherwise we need to start checking via duck-typing.
    try:
        # Check if the body implements the buffer API.
        view = memoryview(body)
    except TypeError:
        try:
            # Check if the body is an iterable
            iterator = iter(body)
        except TypeError:
            raise TypeError(
                f"'body' must be a bytes-like object, file-like "
                f"object, or iterable. Instead was {body!r}"
            ) from None
        return ChunksAndContentLength(chunks=iterator, content_length=None)

    # Since it implements the buffer API can be passed directly to socket.sendall()
    return ChunksAndContentLength(chunks=(body,), content_length=view.nbytes)