Mercurial > repos > jpayne > bioproject_to_srr_2
comparison requests/cookies.py @ 7:5eb2d5e3bf22
planemo upload for repository https://toolrepo.galaxytrakr.org/view/jpayne/bioproject_to_srr_2/556cac4fb538
author | jpayne |
---|---|
date | Sun, 05 May 2024 23:32:17 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
6:b2745907b1eb | 7:5eb2d5e3bf22 |
---|---|
1 """ | |
2 requests.cookies | |
3 ~~~~~~~~~~~~~~~~ | |
4 | |
5 Compatibility code to be able to use `cookielib.CookieJar` with requests. | |
6 | |
7 requests.utils imports from here, so be careful with imports. | |
8 """ | |
9 | |
10 import calendar | |
11 import copy | |
12 import time | |
13 | |
14 from ._internal_utils import to_native_string | |
15 from .compat import Morsel, MutableMapping, cookielib, urlparse, urlunparse | |
16 | |
17 try: | |
18 import threading | |
19 except ImportError: | |
20 import dummy_threading as threading | |
21 | |
22 | |
23 class MockRequest: | |
24 """Wraps a `requests.Request` to mimic a `urllib2.Request`. | |
25 | |
26 The code in `cookielib.CookieJar` expects this interface in order to correctly | |
27 manage cookie policies, i.e., determine whether a cookie can be set, given the | |
28 domains of the request and the cookie. | |
29 | |
30 The original request object is read-only. The client is responsible for collecting | |
31 the new headers via `get_new_headers()` and interpreting them appropriately. You | |
32 probably want `get_cookie_header`, defined below. | |
33 """ | |
34 | |
35 def __init__(self, request): | |
36 self._r = request | |
37 self._new_headers = {} | |
38 self.type = urlparse(self._r.url).scheme | |
39 | |
40 def get_type(self): | |
41 return self.type | |
42 | |
43 def get_host(self): | |
44 return urlparse(self._r.url).netloc | |
45 | |
46 def get_origin_req_host(self): | |
47 return self.get_host() | |
48 | |
49 def get_full_url(self): | |
50 # Only return the response's URL if the user hadn't set the Host | |
51 # header | |
52 if not self._r.headers.get("Host"): | |
53 return self._r.url | |
54 # If they did set it, retrieve it and reconstruct the expected domain | |
55 host = to_native_string(self._r.headers["Host"], encoding="utf-8") | |
56 parsed = urlparse(self._r.url) | |
57 # Reconstruct the URL as we expect it | |
58 return urlunparse( | |
59 [ | |
60 parsed.scheme, | |
61 host, | |
62 parsed.path, | |
63 parsed.params, | |
64 parsed.query, | |
65 parsed.fragment, | |
66 ] | |
67 ) | |
68 | |
69 def is_unverifiable(self): | |
70 return True | |
71 | |
72 def has_header(self, name): | |
73 return name in self._r.headers or name in self._new_headers | |
74 | |
75 def get_header(self, name, default=None): | |
76 return self._r.headers.get(name, self._new_headers.get(name, default)) | |
77 | |
78 def add_header(self, key, val): | |
79 """cookielib has no legitimate use for this method; add it back if you find one.""" | |
80 raise NotImplementedError( | |
81 "Cookie headers should be added with add_unredirected_header()" | |
82 ) | |
83 | |
84 def add_unredirected_header(self, name, value): | |
85 self._new_headers[name] = value | |
86 | |
87 def get_new_headers(self): | |
88 return self._new_headers | |
89 | |
90 @property | |
91 def unverifiable(self): | |
92 return self.is_unverifiable() | |
93 | |
94 @property | |
95 def origin_req_host(self): | |
96 return self.get_origin_req_host() | |
97 | |
98 @property | |
99 def host(self): | |
100 return self.get_host() | |
101 | |
102 | |
103 class MockResponse: | |
104 """Wraps a `httplib.HTTPMessage` to mimic a `urllib.addinfourl`. | |
105 | |
106 ...what? Basically, expose the parsed HTTP headers from the server response | |
107 the way `cookielib` expects to see them. | |
108 """ | |
109 | |
110 def __init__(self, headers): | |
111 """Make a MockResponse for `cookielib` to read. | |
112 | |
113 :param headers: a httplib.HTTPMessage or analogous carrying the headers | |
114 """ | |
115 self._headers = headers | |
116 | |
117 def info(self): | |
118 return self._headers | |
119 | |
120 def getheaders(self, name): | |
121 self._headers.getheaders(name) | |
122 | |
123 | |
124 def extract_cookies_to_jar(jar, request, response): | |
125 """Extract the cookies from the response into a CookieJar. | |
126 | |
127 :param jar: cookielib.CookieJar (not necessarily a RequestsCookieJar) | |
128 :param request: our own requests.Request object | |
129 :param response: urllib3.HTTPResponse object | |
130 """ | |
131 if not (hasattr(response, "_original_response") and response._original_response): | |
132 return | |
133 # the _original_response field is the wrapped httplib.HTTPResponse object, | |
134 req = MockRequest(request) | |
135 # pull out the HTTPMessage with the headers and put it in the mock: | |
136 res = MockResponse(response._original_response.msg) | |
137 jar.extract_cookies(res, req) | |
138 | |
139 | |
140 def get_cookie_header(jar, request): | |
141 """ | |
142 Produce an appropriate Cookie header string to be sent with `request`, or None. | |
143 | |
144 :rtype: str | |
145 """ | |
146 r = MockRequest(request) | |
147 jar.add_cookie_header(r) | |
148 return r.get_new_headers().get("Cookie") | |
149 | |
150 | |
151 def remove_cookie_by_name(cookiejar, name, domain=None, path=None): | |
152 """Unsets a cookie by name, by default over all domains and paths. | |
153 | |
154 Wraps CookieJar.clear(), is O(n). | |
155 """ | |
156 clearables = [] | |
157 for cookie in cookiejar: | |
158 if cookie.name != name: | |
159 continue | |
160 if domain is not None and domain != cookie.domain: | |
161 continue | |
162 if path is not None and path != cookie.path: | |
163 continue | |
164 clearables.append((cookie.domain, cookie.path, cookie.name)) | |
165 | |
166 for domain, path, name in clearables: | |
167 cookiejar.clear(domain, path, name) | |
168 | |
169 | |
170 class CookieConflictError(RuntimeError): | |
171 """There are two cookies that meet the criteria specified in the cookie jar. | |
172 Use .get and .set and include domain and path args in order to be more specific. | |
173 """ | |
174 | |
175 | |
176 class RequestsCookieJar(cookielib.CookieJar, MutableMapping): | |
177 """Compatibility class; is a cookielib.CookieJar, but exposes a dict | |
178 interface. | |
179 | |
180 This is the CookieJar we create by default for requests and sessions that | |
181 don't specify one, since some clients may expect response.cookies and | |
182 session.cookies to support dict operations. | |
183 | |
184 Requests does not use the dict interface internally; it's just for | |
185 compatibility with external client code. All requests code should work | |
186 out of the box with externally provided instances of ``CookieJar``, e.g. | |
187 ``LWPCookieJar`` and ``FileCookieJar``. | |
188 | |
189 Unlike a regular CookieJar, this class is pickleable. | |
190 | |
191 .. warning:: dictionary operations that are normally O(1) may be O(n). | |
192 """ | |
193 | |
194 def get(self, name, default=None, domain=None, path=None): | |
195 """Dict-like get() that also supports optional domain and path args in | |
196 order to resolve naming collisions from using one cookie jar over | |
197 multiple domains. | |
198 | |
199 .. warning:: operation is O(n), not O(1). | |
200 """ | |
201 try: | |
202 return self._find_no_duplicates(name, domain, path) | |
203 except KeyError: | |
204 return default | |
205 | |
206 def set(self, name, value, **kwargs): | |
207 """Dict-like set() that also supports optional domain and path args in | |
208 order to resolve naming collisions from using one cookie jar over | |
209 multiple domains. | |
210 """ | |
211 # support client code that unsets cookies by assignment of a None value: | |
212 if value is None: | |
213 remove_cookie_by_name( | |
214 self, name, domain=kwargs.get("domain"), path=kwargs.get("path") | |
215 ) | |
216 return | |
217 | |
218 if isinstance(value, Morsel): | |
219 c = morsel_to_cookie(value) | |
220 else: | |
221 c = create_cookie(name, value, **kwargs) | |
222 self.set_cookie(c) | |
223 return c | |
224 | |
225 def iterkeys(self): | |
226 """Dict-like iterkeys() that returns an iterator of names of cookies | |
227 from the jar. | |
228 | |
229 .. seealso:: itervalues() and iteritems(). | |
230 """ | |
231 for cookie in iter(self): | |
232 yield cookie.name | |
233 | |
234 def keys(self): | |
235 """Dict-like keys() that returns a list of names of cookies from the | |
236 jar. | |
237 | |
238 .. seealso:: values() and items(). | |
239 """ | |
240 return list(self.iterkeys()) | |
241 | |
242 def itervalues(self): | |
243 """Dict-like itervalues() that returns an iterator of values of cookies | |
244 from the jar. | |
245 | |
246 .. seealso:: iterkeys() and iteritems(). | |
247 """ | |
248 for cookie in iter(self): | |
249 yield cookie.value | |
250 | |
251 def values(self): | |
252 """Dict-like values() that returns a list of values of cookies from the | |
253 jar. | |
254 | |
255 .. seealso:: keys() and items(). | |
256 """ | |
257 return list(self.itervalues()) | |
258 | |
259 def iteritems(self): | |
260 """Dict-like iteritems() that returns an iterator of name-value tuples | |
261 from the jar. | |
262 | |
263 .. seealso:: iterkeys() and itervalues(). | |
264 """ | |
265 for cookie in iter(self): | |
266 yield cookie.name, cookie.value | |
267 | |
268 def items(self): | |
269 """Dict-like items() that returns a list of name-value tuples from the | |
270 jar. Allows client-code to call ``dict(RequestsCookieJar)`` and get a | |
271 vanilla python dict of key value pairs. | |
272 | |
273 .. seealso:: keys() and values(). | |
274 """ | |
275 return list(self.iteritems()) | |
276 | |
277 def list_domains(self): | |
278 """Utility method to list all the domains in the jar.""" | |
279 domains = [] | |
280 for cookie in iter(self): | |
281 if cookie.domain not in domains: | |
282 domains.append(cookie.domain) | |
283 return domains | |
284 | |
285 def list_paths(self): | |
286 """Utility method to list all the paths in the jar.""" | |
287 paths = [] | |
288 for cookie in iter(self): | |
289 if cookie.path not in paths: | |
290 paths.append(cookie.path) | |
291 return paths | |
292 | |
293 def multiple_domains(self): | |
294 """Returns True if there are multiple domains in the jar. | |
295 Returns False otherwise. | |
296 | |
297 :rtype: bool | |
298 """ | |
299 domains = [] | |
300 for cookie in iter(self): | |
301 if cookie.domain is not None and cookie.domain in domains: | |
302 return True | |
303 domains.append(cookie.domain) | |
304 return False # there is only one domain in jar | |
305 | |
306 def get_dict(self, domain=None, path=None): | |
307 """Takes as an argument an optional domain and path and returns a plain | |
308 old Python dict of name-value pairs of cookies that meet the | |
309 requirements. | |
310 | |
311 :rtype: dict | |
312 """ | |
313 dictionary = {} | |
314 for cookie in iter(self): | |
315 if (domain is None or cookie.domain == domain) and ( | |
316 path is None or cookie.path == path | |
317 ): | |
318 dictionary[cookie.name] = cookie.value | |
319 return dictionary | |
320 | |
321 def __contains__(self, name): | |
322 try: | |
323 return super().__contains__(name) | |
324 except CookieConflictError: | |
325 return True | |
326 | |
327 def __getitem__(self, name): | |
328 """Dict-like __getitem__() for compatibility with client code. Throws | |
329 exception if there are more than one cookie with name. In that case, | |
330 use the more explicit get() method instead. | |
331 | |
332 .. warning:: operation is O(n), not O(1). | |
333 """ | |
334 return self._find_no_duplicates(name) | |
335 | |
336 def __setitem__(self, name, value): | |
337 """Dict-like __setitem__ for compatibility with client code. Throws | |
338 exception if there is already a cookie of that name in the jar. In that | |
339 case, use the more explicit set() method instead. | |
340 """ | |
341 self.set(name, value) | |
342 | |
343 def __delitem__(self, name): | |
344 """Deletes a cookie given a name. Wraps ``cookielib.CookieJar``'s | |
345 ``remove_cookie_by_name()``. | |
346 """ | |
347 remove_cookie_by_name(self, name) | |
348 | |
349 def set_cookie(self, cookie, *args, **kwargs): | |
350 if ( | |
351 hasattr(cookie.value, "startswith") | |
352 and cookie.value.startswith('"') | |
353 and cookie.value.endswith('"') | |
354 ): | |
355 cookie.value = cookie.value.replace('\\"', "") | |
356 return super().set_cookie(cookie, *args, **kwargs) | |
357 | |
358 def update(self, other): | |
359 """Updates this jar with cookies from another CookieJar or dict-like""" | |
360 if isinstance(other, cookielib.CookieJar): | |
361 for cookie in other: | |
362 self.set_cookie(copy.copy(cookie)) | |
363 else: | |
364 super().update(other) | |
365 | |
366 def _find(self, name, domain=None, path=None): | |
367 """Requests uses this method internally to get cookie values. | |
368 | |
369 If there are conflicting cookies, _find arbitrarily chooses one. | |
370 See _find_no_duplicates if you want an exception thrown if there are | |
371 conflicting cookies. | |
372 | |
373 :param name: a string containing name of cookie | |
374 :param domain: (optional) string containing domain of cookie | |
375 :param path: (optional) string containing path of cookie | |
376 :return: cookie.value | |
377 """ | |
378 for cookie in iter(self): | |
379 if cookie.name == name: | |
380 if domain is None or cookie.domain == domain: | |
381 if path is None or cookie.path == path: | |
382 return cookie.value | |
383 | |
384 raise KeyError(f"name={name!r}, domain={domain!r}, path={path!r}") | |
385 | |
386 def _find_no_duplicates(self, name, domain=None, path=None): | |
387 """Both ``__get_item__`` and ``get`` call this function: it's never | |
388 used elsewhere in Requests. | |
389 | |
390 :param name: a string containing name of cookie | |
391 :param domain: (optional) string containing domain of cookie | |
392 :param path: (optional) string containing path of cookie | |
393 :raises KeyError: if cookie is not found | |
394 :raises CookieConflictError: if there are multiple cookies | |
395 that match name and optionally domain and path | |
396 :return: cookie.value | |
397 """ | |
398 toReturn = None | |
399 for cookie in iter(self): | |
400 if cookie.name == name: | |
401 if domain is None or cookie.domain == domain: | |
402 if path is None or cookie.path == path: | |
403 if toReturn is not None: | |
404 # if there are multiple cookies that meet passed in criteria | |
405 raise CookieConflictError( | |
406 f"There are multiple cookies with name, {name!r}" | |
407 ) | |
408 # we will eventually return this as long as no cookie conflict | |
409 toReturn = cookie.value | |
410 | |
411 if toReturn: | |
412 return toReturn | |
413 raise KeyError(f"name={name!r}, domain={domain!r}, path={path!r}") | |
414 | |
415 def __getstate__(self): | |
416 """Unlike a normal CookieJar, this class is pickleable.""" | |
417 state = self.__dict__.copy() | |
418 # remove the unpickleable RLock object | |
419 state.pop("_cookies_lock") | |
420 return state | |
421 | |
422 def __setstate__(self, state): | |
423 """Unlike a normal CookieJar, this class is pickleable.""" | |
424 self.__dict__.update(state) | |
425 if "_cookies_lock" not in self.__dict__: | |
426 self._cookies_lock = threading.RLock() | |
427 | |
428 def copy(self): | |
429 """Return a copy of this RequestsCookieJar.""" | |
430 new_cj = RequestsCookieJar() | |
431 new_cj.set_policy(self.get_policy()) | |
432 new_cj.update(self) | |
433 return new_cj | |
434 | |
435 def get_policy(self): | |
436 """Return the CookiePolicy instance used.""" | |
437 return self._policy | |
438 | |
439 | |
440 def _copy_cookie_jar(jar): | |
441 if jar is None: | |
442 return None | |
443 | |
444 if hasattr(jar, "copy"): | |
445 # We're dealing with an instance of RequestsCookieJar | |
446 return jar.copy() | |
447 # We're dealing with a generic CookieJar instance | |
448 new_jar = copy.copy(jar) | |
449 new_jar.clear() | |
450 for cookie in jar: | |
451 new_jar.set_cookie(copy.copy(cookie)) | |
452 return new_jar | |
453 | |
454 | |
455 def create_cookie(name, value, **kwargs): | |
456 """Make a cookie from underspecified parameters. | |
457 | |
458 By default, the pair of `name` and `value` will be set for the domain '' | |
459 and sent on every request (this is sometimes called a "supercookie"). | |
460 """ | |
461 result = { | |
462 "version": 0, | |
463 "name": name, | |
464 "value": value, | |
465 "port": None, | |
466 "domain": "", | |
467 "path": "/", | |
468 "secure": False, | |
469 "expires": None, | |
470 "discard": True, | |
471 "comment": None, | |
472 "comment_url": None, | |
473 "rest": {"HttpOnly": None}, | |
474 "rfc2109": False, | |
475 } | |
476 | |
477 badargs = set(kwargs) - set(result) | |
478 if badargs: | |
479 raise TypeError( | |
480 f"create_cookie() got unexpected keyword arguments: {list(badargs)}" | |
481 ) | |
482 | |
483 result.update(kwargs) | |
484 result["port_specified"] = bool(result["port"]) | |
485 result["domain_specified"] = bool(result["domain"]) | |
486 result["domain_initial_dot"] = result["domain"].startswith(".") | |
487 result["path_specified"] = bool(result["path"]) | |
488 | |
489 return cookielib.Cookie(**result) | |
490 | |
491 | |
492 def morsel_to_cookie(morsel): | |
493 """Convert a Morsel object into a Cookie containing the one k/v pair.""" | |
494 | |
495 expires = None | |
496 if morsel["max-age"]: | |
497 try: | |
498 expires = int(time.time() + int(morsel["max-age"])) | |
499 except ValueError: | |
500 raise TypeError(f"max-age: {morsel['max-age']} must be integer") | |
501 elif morsel["expires"]: | |
502 time_template = "%a, %d-%b-%Y %H:%M:%S GMT" | |
503 expires = calendar.timegm(time.strptime(morsel["expires"], time_template)) | |
504 return create_cookie( | |
505 comment=morsel["comment"], | |
506 comment_url=bool(morsel["comment"]), | |
507 discard=False, | |
508 domain=morsel["domain"], | |
509 expires=expires, | |
510 name=morsel.key, | |
511 path=morsel["path"], | |
512 port=None, | |
513 rest={"HttpOnly": morsel["httponly"]}, | |
514 rfc2109=False, | |
515 secure=bool(morsel["secure"]), | |
516 value=morsel.value, | |
517 version=morsel["version"] or 0, | |
518 ) | |
519 | |
520 | |
521 def cookiejar_from_dict(cookie_dict, cookiejar=None, overwrite=True): | |
522 """Returns a CookieJar from a key/value dictionary. | |
523 | |
524 :param cookie_dict: Dict of key/values to insert into CookieJar. | |
525 :param cookiejar: (optional) A cookiejar to add the cookies to. | |
526 :param overwrite: (optional) If False, will not replace cookies | |
527 already in the jar with new ones. | |
528 :rtype: CookieJar | |
529 """ | |
530 if cookiejar is None: | |
531 cookiejar = RequestsCookieJar() | |
532 | |
533 if cookie_dict is not None: | |
534 names_from_jar = [cookie.name for cookie in cookiejar] | |
535 for name in cookie_dict: | |
536 if overwrite or (name not in names_from_jar): | |
537 cookiejar.set_cookie(create_cookie(name, cookie_dict[name])) | |
538 | |
539 return cookiejar | |
540 | |
541 | |
542 def merge_cookies(cookiejar, cookies): | |
543 """Add cookies to cookiejar and returns a merged CookieJar. | |
544 | |
545 :param cookiejar: CookieJar object to add the cookies to. | |
546 :param cookies: Dictionary or CookieJar object to be added. | |
547 :rtype: CookieJar | |
548 """ | |
549 if not isinstance(cookiejar, cookielib.CookieJar): | |
550 raise ValueError("You can only merge into CookieJar") | |
551 | |
552 if isinstance(cookies, dict): | |
553 cookiejar = cookiejar_from_dict(cookies, cookiejar=cookiejar, overwrite=False) | |
554 elif isinstance(cookies, cookielib.CookieJar): | |
555 try: | |
556 cookiejar.update(cookies) | |
557 except AttributeError: | |
558 for cookie_in_jar in cookies: | |
559 cookiejar.set_cookie(cookie_in_jar) | |
560 | |
561 return cookiejar |