Mercurial > repos > jpayne > bioproject_to_srr_2
comparison urllib3/_collections.py @ 7:5eb2d5e3bf22
planemo upload for repository https://toolrepo.galaxytrakr.org/view/jpayne/bioproject_to_srr_2/556cac4fb538
author | jpayne |
---|---|
date | Sun, 05 May 2024 23:32:17 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
6:b2745907b1eb | 7:5eb2d5e3bf22 |
---|---|
1 from __future__ import annotations | |
2 | |
3 import typing | |
4 from collections import OrderedDict | |
5 from enum import Enum, auto | |
6 from threading import RLock | |
7 | |
if typing.TYPE_CHECKING:
    # Everything in this guard exists for static type checkers only; the body
    # is skipped at runtime, so neither import below has to be importable
    # when the module is actually executed.
    from typing import Protocol

    from typing_extensions import Self

    class HasGettableStringKeys(Protocol):
        """Structural type for objects exposing ``keys()`` and ``__getitem__``."""

        def keys(self) -> typing.Iterator[str]:
            """Return an iterator over the string keys."""
            ...

        def __getitem__(self, key: str) -> str:
            """Return the string value stored under ``key``."""
            ...
21 | |
22 | |
23 __all__ = ["RecentlyUsedContainer", "HTTPHeaderDict"] | |
24 | |
25 | |
# Type variables shared by the generic containers in this module:
# key type, value type and "default" type respectively.
_KT = typing.TypeVar("_KT")
_VT = typing.TypeVar("_VT")
_DT = typing.TypeVar("_DT")

# Every kind of object that can be used to populate an HTTPHeaderDict.
# The two string members are forward references to names defined elsewhere
# in this module.
ValidHTTPHeaderSource = typing.Union[
    "HTTPHeaderDict",
    typing.Mapping[str, str],
    typing.Iterable[typing.Tuple[str, str]],
    "HasGettableStringKeys",
]
39 | |
40 | |
class _Sentinel(Enum):
    """Marker enum whose single member stands for "argument was not passed"."""

    not_passed = auto()
43 | |
44 | |
def ensure_can_construct_http_header_dict(
    potential: object,
) -> ValidHTTPHeaderSource | None:
    """Narrow ``potential`` to a type an ``HTTPHeaderDict`` can be built from.

    The checks are deliberately shallow: only the outer shape of the object
    is inspected, never its contents, because validating every key and value
    at runtime would be expensive.

    :param potential:
        Any object.

    :return:
        ``potential`` itself (merely re-typed for the type checker) when it
        is an ``HTTPHeaderDict``, a ``Mapping``, an ``Iterable``, or has both
        a ``keys`` and a ``__getitem__`` attribute; ``None`` otherwise.
    """
    if isinstance(potential, HTTPHeaderDict):
        return potential

    if isinstance(potential, typing.Mapping):
        # Assumed (not verified) to map str -> str.
        return typing.cast(typing.Mapping[str, str], potential)

    if isinstance(potential, typing.Iterable):
        # Assumed (not verified) to yield (str, str) pairs.
        return typing.cast(typing.Iterable[typing.Tuple[str, str]], potential)

    # Last resort: duck-typed objects that look enough like a mapping.
    duck_typed = all(hasattr(potential, attr) for attr in ("keys", "__getitem__"))
    if duck_typed:
        return typing.cast("HasGettableStringKeys", potential)
    return None
63 | |
64 | |
class RecentlyUsedContainer(typing.Generic[_KT, _VT], typing.MutableMapping[_KT, _VT]):
    """
    A dict-like container that retains at most ``maxsize`` keys and throws
    away the least-recently-used ones beyond that. Every access to the
    underlying storage happens while holding a re-entrant lock.

    :param maxsize:
        Maximum number of recent elements to retain.

    :param dispose_func:
        Optional callback. Whenever a value leaves the container (evicted,
        overwritten, deleted or cleared), ``dispose_func(value)`` is called
        with it after the lock has been released.
    """

    _container: typing.OrderedDict[_KT, _VT]
    _maxsize: int
    dispose_func: typing.Callable[[_VT], None] | None
    lock: RLock

    def __init__(
        self,
        maxsize: int = 10,
        dispose_func: typing.Callable[[_VT], None] | None = None,
    ) -> None:
        super().__init__()
        self.lock = RLock()
        self._container = OrderedDict()
        self._maxsize = maxsize
        self.dispose_func = dispose_func

    def __getitem__(self, key: _KT) -> _VT:
        """Return the value for ``key`` and mark it as most recently used.

        Raises ``KeyError`` when ``key`` is not present.
        """
        with self.lock:
            found = self._container[key]
            # Push the entry to the back of the eviction line.
            self._container.move_to_end(key)
            return found

    def __setitem__(self, key: _KT, value: _VT) -> None:
        """Store ``value`` under ``key`` as the most recently used entry.

        Overwriting an existing key displaces its previous value; inserting a
        new key beyond ``maxsize`` displaces the least recently used entry.
        The displaced value, if any, is handed to ``dispose_func``.
        """
        # Holds at most one value: whatever this assignment pushed out.
        displaced: list[_VT] = []
        with self.lock:
            if key in self._container:
                # Same number of keys afterwards, so nothing else has to go.
                # Popping first makes the re-insert land at the end of the
                # eviction line.
                displaced.append(self._container.pop(key))
                self._container[key] = value
            else:
                # Insert before checking the size so that eviction also works
                # when self._maxsize is 0.
                self._container[key] = value
                if len(self._container) > self._maxsize:
                    _, oldest_value = self._container.popitem(last=False)
                    displaced.append(oldest_value)

        # Dispose only after the lock has been released.
        if displaced and self.dispose_func:
            self.dispose_func(displaced[0])

    def __delitem__(self, key: _KT) -> None:
        """Remove ``key`` and pass its value to ``dispose_func`` (if set).

        Raises ``KeyError`` when ``key`` is not present.
        """
        with self.lock:
            removed = self._container.pop(key)

        if self.dispose_func:
            self.dispose_func(removed)

    def __len__(self) -> int:
        """Return the number of keys currently held."""
        with self.lock:
            return len(self._container)

    def __iter__(self) -> typing.NoReturn:
        """Iteration is deliberately unsupported; always raises."""
        raise NotImplementedError(
            "Iteration over this class is unlikely to be threadsafe."
        )

    def clear(self) -> None:
        """Empty the container and dispose of every value it held."""
        with self.lock:
            # Snapshot the values first, then wipe the mapping.
            leftovers = [*self._container.values()]
            self._container.clear()

        if self.dispose_func:
            for leftover in leftovers:
                self.dispose_func(leftover)

    def keys(self) -> set[_KT]:  # type: ignore[override]
        """Return a snapshot of the current keys as a new ``set``."""
        with self.lock:
            return set(self._container)
156 | |
157 | |
class HTTPHeaderDictItemView(typing.Set[typing.Tuple[str, str]]):
    """
    Items view over an HTTPHeaderDict.

    HTTPHeaderDict is unusual for a Mapping[str, str] because a header can be
    addressed in two ways. Looking a name up directly returns one string with
    all of its values joined together:

        >>> d['X-Header-Name']
        'Value1, Value2, Value3'

    Iterating over the items instead yields one pair per stored value, so a
    name can show up several times. Values added with ``combine=True`` were
    already merged into an existing value when they were stored:

        >>> d = HTTPHeaderDict({"A": "1", "B": "foo"})
        >>> d.add("A", "2", combine=True)
        >>> d.add("B", "bar")
        >>> list(d.items())
        [('A', '1, 2'), ('B', 'foo'), ('B', 'bar')]

    This view delegates length, iteration and membership to the wrapped
    header dict, which gives items with duplicate keys, ordered by the time
    each key was first inserted.
    """

    _headers: HTTPHeaderDict

    def __init__(self, headers: HTTPHeaderDict) -> None:
        self._headers = headers

    def __len__(self) -> int:
        """Return the number of (name, value) pairs, duplicates included."""
        return sum(1 for _ in self._headers.iteritems())

    def __iter__(self) -> typing.Iterator[tuple[str, str]]:
        """Iterate over every (name, value) pair of the wrapped headers."""
        return self._headers.iteritems()

    def __contains__(self, item: object) -> bool:
        """Return True only for a ``(str, str)`` pair the headers hold."""
        if not isinstance(item, tuple) or len(item) != 2:
            return False
        name, value = item
        if not (isinstance(name, str) and isinstance(value, str)):
            return False
        return self._headers._has_value_for_header(name, value)
204 | |
205 | |
class HTTPHeaderDict(typing.MutableMapping[str, str]):
    """
    A ``dict`` like container for storing HTTP Headers.

    :param headers:
        Optional initial content: another ``HTTPHeaderDict``, a mapping, an
        iterable of field-value pairs, or an object with ``keys()`` and
        ``__getitem__``.

    :param kwargs:
        Additional field-value pairs, added after ``headers``.

    Field names are stored and compared case-insensitively in compliance with
    RFC 7230. Iteration provides the first case-sensitive key seen for each
    case-insensitive pair.

    Using ``__setitem__`` syntax (and therefore the inherited ``.update``)
    overwrites fields that compare equal case-insensitively in order to
    maintain ``dict``'s api. To keep several values for one field, use
    ``.add`` or ``.extend``; the constructor feeds its arguments through
    ``.extend`` as well.

    >>> headers = HTTPHeaderDict()
    >>> headers.add('Set-Cookie', 'foo=bar')
    >>> headers.add('set-cookie', 'baz=quxx')
    >>> headers['content-length'] = '7'
    >>> headers['SET-cookie']
    'foo=bar, baz=quxx'
    >>> headers['Content-Length']
    '7'
    """

    # Maps the lower-cased field name to ``[original_name, value, ...]``.
    _container: typing.MutableMapping[str, list[str]]

    def __init__(self, headers: ValidHTTPHeaderSource | None = None, **kwargs: str):
        super().__init__()
        self._container = {}  # 'dict' is insert-ordered
        if isinstance(headers, HTTPHeaderDict):
            self._copy_from(headers)
        elif headers is not None:
            self.extend(headers)
        if kwargs:
            self.extend(kwargs)

    def __setitem__(self, key: str, val: str) -> None:
        """Replace every value stored for ``key`` with the single ``val``."""
        # avoid a bytes/str comparison by decoding before httplib
        name = key.decode("latin-1") if isinstance(key, bytes) else key
        self._container[name.lower()] = [name, val]

    def __getitem__(self, key: str) -> str:
        """Return all values of ``key`` joined with ``", "``.

        Raises ``KeyError`` when the field is absent.
        """
        entry = self._container[key.lower()]
        return ", ".join(entry[1:])

    def __delitem__(self, key: str) -> None:
        """Remove the field ``key``; raises ``KeyError`` when absent."""
        del self._container[key.lower()]

    def __contains__(self, key: object) -> bool:
        """Case-insensitive membership test; non-``str`` keys are never in."""
        return isinstance(key, str) and key.lower() in self._container

    def setdefault(self, key: str, default: str = "") -> str:
        """Same as ``MutableMapping.setdefault`` but defaulting to ``""``."""
        return super().setdefault(key, default)

    def __eq__(self, other: object) -> bool:
        """Compare merged values field by field, ignoring field-name case.

        ``other`` is first converted to an instance of this class; objects
        that cannot be used to construct one compare unequal.
        """
        candidate = ensure_can_construct_http_header_dict(other)
        if candidate is None:
            return False
        theirs = type(self)(candidate)

        own_merged = {name.lower(): merged for name, merged in self.itermerged()}
        their_merged = {name.lower(): merged for name, merged in theirs.itermerged()}
        return own_merged == their_merged

    def __ne__(self, other: object) -> bool:
        equal = self.__eq__(other)
        return not equal

    def __len__(self) -> int:
        """Return the number of distinct (case-insensitive) field names."""
        return len(self._container)

    def __iter__(self) -> typing.Iterator[str]:
        # Only provide the originally cased names
        for entry in self._container.values():
            yield entry[0]

    def discard(self, key: str) -> None:
        """Remove the field ``key`` if present; do nothing otherwise."""
        try:
            del self[key]
        except KeyError:
            pass

    def add(self, key: str, val: str, *, combine: bool = False) -> None:
        """Adds a (name, value) pair, doesn't overwrite the value if it already
        exists.

        If the field already has values and ``combine`` is false, ``val`` is
        stored as an additional, distinct value. With ``combine=True`` it is
        instead appended, after a comma, to the most recently stored value of
        that field, so no new item appears during iteration. If the field
        does not exist yet, the value is simply stored and ``combine`` is
        ignored.

        >>> headers = HTTPHeaderDict(foo='bar')
        >>> headers.add('Foo', 'baz')
        >>> headers['foo']
        'bar, baz'
        >>> list(headers.items())
        [('foo', 'bar'), ('foo', 'baz')]
        >>> headers.add('foo', 'quz', combine=True)
        >>> list(headers.items())
        [('foo', 'bar'), ('foo', 'baz, quz')]
        """
        # avoid a bytes/str comparison by decoding before httplib
        if isinstance(key, bytes):
            key = key.decode("latin-1")
        fresh_entry = [key, val]
        # Keep the common case aka no item present as fast as possible
        entry = self._container.setdefault(key.lower(), fresh_entry)
        if entry is fresh_entry:
            return

        # An existing entry always holds the name plus at least one value.
        assert len(entry) >= 2
        if combine:
            entry[-1] = entry[-1] + ", " + val
        else:
            entry.append(val)

    def extend(self, *args: ValidHTTPHeaderSource, **kwargs: str) -> None:
        """Generic import function for any type of header-like object.
        Adapted version of MutableMapping.update in order to insert items
        with self.add instead of self.__setitem__

        Accepts at most one positional source, followed by keyword pairs;
        raises ``TypeError`` when more than one positional argument is given.
        """
        if len(args) > 1:
            raise TypeError(
                f"extend() takes at most 1 positional arguments ({len(args)} given)"
            )
        other = args[0] if args else ()

        # Reduce every supported source to one stream of (name, value) pairs.
        pairs: typing.Iterable[typing.Tuple[str, str]]
        if isinstance(other, HTTPHeaderDict):
            pairs = other.iteritems()
        elif isinstance(other, typing.Mapping):
            pairs = other.items()
        elif isinstance(other, typing.Iterable):
            pairs = typing.cast(typing.Iterable[typing.Tuple[str, str]], other)
        elif hasattr(other, "keys") and hasattr(other, "__getitem__"):
            # THIS IS NOT A TYPESAFE BRANCH
            # The object has a `keys` attr but is neither a Mapping nor any
            # other type named in the signature. It is treated as if it
            # partially implemented the Mapping interface, without any
            # safety checks.
            pairs = ((name, other[name]) for name in other.keys())
        else:
            pairs = ()

        for name, value in pairs:
            self.add(name, value)

        for name, value in kwargs.items():
            self.add(name, value)

    @typing.overload
    def getlist(self, key: str) -> list[str]:
        ...

    @typing.overload
    def getlist(self, key: str, default: _DT) -> list[str] | _DT:
        ...

    def getlist(
        self, key: str, default: _Sentinel | _DT = _Sentinel.not_passed
    ) -> list[str] | _DT:
        """Returns a list of all the values for the named field.

        If the field doesn't exist, returns ``default`` when one was passed
        and an empty list otherwise."""
        entry = self._container.get(key.lower())
        if entry is not None:
            # _DT may or may not be bound; entry[1:] is a list[str] either way.
            return entry[1:]
        if default is _Sentinel.not_passed:
            # _DT is unbound; an empty list satisfies list[str]
            return []
        # _DT is bound; default is instance of _DT
        return default

    def _prepare_for_method_change(self) -> Self:
        """
        Remove content-specific header fields before changing the request
        method to GET or HEAD according to RFC 9110, Section 15.4.

        Returns ``self`` so calls can be chained.
        """
        content_specific_headers = (
            "Content-Encoding",
            "Content-Language",
            "Content-Location",
            "Content-Type",
            "Content-Length",
            "Digest",
            "Last-Modified",
        )
        for header in content_specific_headers:
            self.discard(header)
        return self

    # Backwards compatibility for httplib
    getheaders = getlist
    getallmatchingheaders = getlist
    iget = getlist

    # Backwards compatibility for http.cookiejar
    get_all = getlist

    def __repr__(self) -> str:
        return f"{type(self).__name__}({dict(self.itermerged())})"

    def _copy_from(self, other: HTTPHeaderDict) -> None:
        """Copy every field of ``other`` into this dict, replacing same-named
        fields already stored here."""
        for name in other:
            values = other.getlist(name)
            self._container[name.lower()] = [name, *values]

    def copy(self) -> HTTPHeaderDict:
        """Return a new instance of the same class with the same fields."""
        clone = type(self)()
        clone._copy_from(self)
        return clone

    def iteritems(self) -> typing.Iterator[tuple[str, str]]:
        """Iterate over all header lines, including duplicate ones."""
        for name in self:
            entry = self._container[name.lower()]
            yield from ((entry[0], value) for value in entry[1:])

    def itermerged(self) -> typing.Iterator[tuple[str, str]]:
        """Iterate over all headers, merging duplicate ones together."""
        for name in self:
            entry = self._container[name.lower()]
            yield entry[0], ", ".join(entry[1:])

    def items(self) -> HTTPHeaderDictItemView:  # type: ignore[override]
        """Return a view yielding one (name, value) pair per stored value."""
        return HTTPHeaderDictItemView(self)

    def _has_value_for_header(self, header_name: str, potential_value: str) -> bool:
        """Return True when ``potential_value`` is one of the values stored
        for ``header_name``."""
        return (
            header_name in self
            and potential_value in self._container[header_name.lower()][1:]
        )

    def __ior__(self, other: object) -> HTTPHeaderDict:
        # Supports extending a header dict in-place using operator |=
        # combining items with add instead of __setitem__
        source = ensure_can_construct_http_header_dict(other)
        if source is None:
            return NotImplemented
        self.extend(source)
        return self

    def __or__(self, other: object) -> HTTPHeaderDict:
        # Supports merging header dicts using operator |
        # combining items with add instead of __setitem__
        source = ensure_can_construct_http_header_dict(other)
        if source is None:
            return NotImplemented
        merged = self.copy()
        merged.extend(source)
        return merged

    def __ror__(self, other: object) -> HTTPHeaderDict:
        # Supports merging header dicts using operator | when other is on left side
        # combining items with add instead of __setitem__
        source = ensure_can_construct_http_header_dict(other)
        if source is None:
            return NotImplemented
        merged = type(self)(source)
        merged.extend(self)
        return merged