Mercurial > repos > jpayne > bioproject_to_srr_2
comparison requests/utils.py @ 7:5eb2d5e3bf22
planemo upload for repository https://toolrepo.galaxytrakr.org/view/jpayne/bioproject_to_srr_2/556cac4fb538
author | jpayne |
---|---|
date | Sun, 05 May 2024 23:32:17 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
6:b2745907b1eb | 7:5eb2d5e3bf22 |
---|---|
1 """ | |
2 requests.utils | |
3 ~~~~~~~~~~~~~~ | |
4 | |
5 This module provides utility functions that are used within Requests | |
6 that are also useful for external consumption. | |
7 """ | |
8 | |
9 import codecs | |
10 import contextlib | |
11 import io | |
12 import os | |
13 import re | |
14 import socket | |
15 import struct | |
16 import sys | |
17 import tempfile | |
18 import warnings | |
19 import zipfile | |
20 from collections import OrderedDict | |
21 | |
22 from urllib3.util import make_headers, parse_url | |
23 | |
24 from . import certs | |
25 from .__version__ import __version__ | |
26 | |
27 # to_native_string is unused here, but imported here for backwards compatibility | |
28 from ._internal_utils import ( # noqa: F401 | |
29 _HEADER_VALIDATORS_BYTE, | |
30 _HEADER_VALIDATORS_STR, | |
31 HEADER_VALIDATORS, | |
32 to_native_string, | |
33 ) | |
34 from .compat import ( | |
35 Mapping, | |
36 basestring, | |
37 bytes, | |
38 getproxies, | |
39 getproxies_environment, | |
40 integer_types, | |
41 ) | |
42 from .compat import parse_http_list as _parse_list_header | |
43 from .compat import ( | |
44 proxy_bypass, | |
45 proxy_bypass_environment, | |
46 quote, | |
47 str, | |
48 unquote, | |
49 urlparse, | |
50 urlunparse, | |
51 ) | |
52 from .cookies import cookiejar_from_dict | |
53 from .exceptions import ( | |
54 FileModeWarning, | |
55 InvalidHeader, | |
56 InvalidURL, | |
57 UnrewindableBodyError, | |
58 ) | |
59 from .structures import CaseInsensitiveDict | |
60 | |
# Candidate netrc file names; get_netrc_auth() looks for them under "~/".
NETRC_FILES = (".netrc", "_netrc")

# CA bundle location as reported by the package's ``certs`` module.
DEFAULT_CA_BUNDLE_PATH = certs.where()

# Port number associated with each URL scheme listed here.
DEFAULT_PORTS = {"http": 80, "https": 443}

# Ensure that ', ' is used to preserve previous delimiter behavior.
# The value comes from urllib3's make_headers() and is re-joined with ", ".
DEFAULT_ACCEPT_ENCODING = ", ".join(
    re.split(r",\s*", make_headers(accept_encoding=True)["accept-encoding"])
)
71 | |
72 | |
if sys.platform == "win32":
    # provide a proxy_bypass version on Windows without DNS lookups

    def proxy_bypass_registry(host):
        """Return True if ``host`` matches the registry's ProxyOverride list.

        Reads ``ProxyEnable`` and ``ProxyOverride`` from the current user's
        "Internet Settings" registry key. Returns False when ``winreg`` is
        unavailable, the values cannot be read, the proxy is disabled, or no
        override entry matches.

        :param host: host name to test against the override entries.
        :rtype: bool
        """
        try:
            import winreg
        except ImportError:
            return False

        try:
            internetSettings = winreg.OpenKey(
                winreg.HKEY_CURRENT_USER,
                r"Software\Microsoft\Windows\CurrentVersion\Internet Settings",
            )
            # ProxyEnable could be REG_SZ or REG_DWORD, normalizing it
            proxyEnable = int(winreg.QueryValueEx(internetSettings, "ProxyEnable")[0])
            # ProxyOverride is almost always a string
            proxyOverride = winreg.QueryValueEx(internetSettings, "ProxyOverride")[0]
        except (OSError, ValueError):
            return False
        if not proxyEnable or not proxyOverride:
            return False

        # The override value is a ';'-separated list of glob-like entries.
        for test in proxyOverride.split(";"):
            test = test.strip()
            if not test:
                # An empty entry (e.g. from a trailing ';') would turn into an
                # empty regex, which matches every host and would bypass the
                # proxy unconditionally.
                continue
            if test == "<local>":
                # '<local>' stands for host names without a dot.
                if "." not in host:
                    return True
            test = test.replace(".", r"\.")  # mask dots
            test = test.replace("*", r".*")  # change glob sequence
            test = test.replace("?", r".")  # change glob char
            # NOTE(review): re.match only anchors at the start, so an entry
            # also matches hosts that merely begin with it -- confirm whether
            # a full match is intended.
            if re.match(test, host, re.I):
                return True
        return False

    def proxy_bypass(host):  # noqa
        """Return True, if the host should be bypassed.

        Checks proxy settings gathered from the environment, if specified,
        or the registry.
        """
        if getproxies_environment():
            return proxy_bypass_environment(host)
        else:
            return proxy_bypass_registry(host)
122 | |
123 | |
def dict_to_sequence(d):
    """Return ``d.items()`` when ``d`` exposes ``items``, otherwise ``d`` itself."""
    return d.items() if hasattr(d, "items") else d
131 | |
132 | |
def super_len(o):
    """Return how many bytes/items remain to be read from ``o``.

    The total size is taken from ``len(o)``, then ``o.len``, then
    ``os.fstat`` on ``o.fileno()``; seekable objects without any of these are
    measured by seeking to the end. The current ``tell()`` position is
    subtracted, and the result is never negative. Unknown sizes yield ``0``.
    """
    size = None
    offset = 0

    if hasattr(o, "__len__"):
        size = len(o)
    elif hasattr(o, "len"):
        size = o.len
    elif hasattr(o, "fileno"):
        try:
            descriptor = o.fileno()
        except (io.UnsupportedOperation, AttributeError):
            # AttributeError can surface even though hasattr() succeeded,
            # e.g. for objects returned by `Tarfile.extractfile()` (issue 5229).
            pass
        else:
            size = os.fstat(descriptor).st_size

            # The size came from fstat, so make sure the file was opened
            # in binary mode; otherwise the content-length may be off.
            if "b" not in o.mode:
                warnings.warn(
                    (
                        "Requests has determined the content-length for this "
                        "request using the binary size of the file: however, the "
                        "file has been opened in text mode (i.e. without the 'b' "
                        "flag in the mode). This may lead to an incorrect "
                        "content-length. In Requests 3.0, support will be removed "
                        "for files in text mode."
                    ),
                    FileModeWarning,
                )

    if hasattr(o, "tell"):
        try:
            offset = o.tell()
        except OSError:
            # Special descriptors such as stdin cannot report a position.
            # Treat the object as fully consumed so the result is zero and
            # requests chunks the body instead.
            if size is not None:
                offset = size
        else:
            if hasattr(o, "seek") and size is None:
                # StringIO and BytesIO have seek but no usable fileno
                try:
                    # Measure by jumping to the end ...
                    o.seek(0, 2)
                    size = o.tell()

                    # ... then restore the position so partially read
                    # file-like objects keep working.
                    o.seek(offset or 0)
                except OSError:
                    size = 0

    if size is None:
        size = 0

    return max(0, size - offset)
197 | |
198 | |
def get_netrc_auth(url, raise_errors=False):
    """Returns the Requests tuple auth for a given url from netrc.

    The netrc file named by the ``NETRC`` environment variable is used when
    that variable is set; otherwise the first existing file among the
    ``NETRC_FILES`` names under ``~/`` is used.

    :param url: URL whose host is looked up in the netrc file.
    :param raise_errors: when true, re-raise netrc parse errors and
        ``OSError`` instead of silently skipping netrc authentication.
    :return: ``(login, password)`` tuple, or ``None`` when no netrc file or
        no matching entry is found.
    """

    netrc_file = os.environ.get("NETRC")
    if netrc_file is not None:
        netrc_locations = (netrc_file,)
    else:
        netrc_locations = (f"~/{f}" for f in NETRC_FILES)

    try:
        from netrc import NetrcParseError, netrc

        netrc_path = None

        for f in netrc_locations:
            try:
                loc = os.path.expanduser(f)
            except KeyError:
                # os.path.expanduser can fail when $HOME is undefined and
                # getpwuid fails. See https://bugs.python.org/issue20164 &
                # https://github.com/psf/requests/issues/1846
                return

            if os.path.exists(loc):
                netrc_path = loc
                break

        # Abort early if there isn't one.
        if netrc_path is None:
            return

        # Use the parsed hostname instead of splitting netloc on ":".
        # Splitting netloc mistakes the userinfo part for the host in URLs
        # such as "http://example.com:@evil.com/", which would hand the
        # credentials stored for "example.com" to "evil.com".
        host = urlparse(url).hostname

        try:
            _netrc = netrc(netrc_path).authenticators(host)
            if _netrc:
                # Return with login / password; fall back to the account
                # field when the login field is empty.
                login_i = 0 if _netrc[0] else 1
                return (_netrc[login_i], _netrc[2])
        except (NetrcParseError, OSError):
            # If there was a parsing error or a permissions issue reading the file,
            # we'll just skip netrc auth unless explicitly asked to raise errors.
            if raise_errors:
                raise

    # App Engine hackiness.
    except (ImportError, AttributeError):
        pass
254 | |
255 | |
def guess_filename(obj):
    """Return the base name of ``obj.name``, or ``None`` if it is unusable.

    A name is unusable when it is missing, empty, not a string type, starts
    with ``<`` or ends with ``>``.
    """
    candidate = getattr(obj, "name", None)
    if not candidate or not isinstance(candidate, basestring):
        return None
    if candidate[0] == "<" or candidate[-1] == ">":
        return None
    return os.path.basename(candidate)
261 | |
262 | |
def extract_zipped_paths(path):
    """Replace nonexistent paths that look like they refer to a member of a zip
    archive with the location of an extracted copy of the target, or else
    just return the provided path unchanged.

    :param path: filesystem path, possibly pointing inside a zip archive.
    :return: ``path`` itself, or the path of the extracted copy located in
        the directory returned by ``tempfile.gettempdir()``.
    """
    if os.path.exists(path):
        # this is already a valid path, no need to do anything further
        return path

    # find the first valid part of the provided path and treat that as a zip archive
    # assume the rest of the path is the name of a member in the archive
    archive, member = os.path.split(path)
    while archive and not os.path.exists(archive):
        archive, prefix = os.path.split(archive)
        if not prefix:
            # If we don't check for an empty prefix after the split (in other words, archive remains unchanged after the split),
            # we _can_ end up in an infinite loop on a rare corner case affecting a small number of users
            break
        member = "/".join([prefix, member])

    if not zipfile.is_zipfile(archive):
        return path

    # Open the archive in a ``with`` block so its file handle is closed on
    # every exit path instead of being left for the garbage collector.
    with zipfile.ZipFile(archive) as zip_file:
        if member not in zip_file.namelist():
            return path

        # we have a valid zip archive and a valid member of that archive
        tmp = tempfile.gettempdir()
        extracted_path = os.path.join(tmp, member.split("/")[-1])
        if not os.path.exists(extracted_path):
            # use read + write to avoid the creating nested folders, we only want the file, avoids mkdir racing condition
            with atomic_open(extracted_path) as file_handler:
                file_handler.write(zip_file.read(member))
    return extracted_path
298 | |
299 | |
@contextlib.contextmanager
def atomic_open(filename):
    """Context manager yielding a binary handle whose content replaces ``filename``.

    Data is written to a temporary file created next to ``filename``; on a
    clean exit the temporary file is moved over ``filename`` with
    ``os.replace``. If anything is raised, the temporary file is removed and
    the exception propagates.
    """
    target_dir = os.path.dirname(filename)
    scratch_fd, scratch_path = tempfile.mkstemp(dir=target_dir)
    try:
        with os.fdopen(scratch_fd, "wb") as scratch_handle:
            yield scratch_handle
        os.replace(scratch_path, filename)
    except BaseException:
        os.remove(scratch_path)
        raise
311 | |
312 | |
def from_key_val_list(value):
    """Build an OrderedDict from anything that can be represented as one.

    ``None`` is passed through unchanged; strings, bytes, booleans and
    integers are rejected, e.g.,

    ::

        >>> from_key_val_list([('key', 'val')])
        OrderedDict([('key', 'val')])
        >>> from_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples
        >>> from_key_val_list({'key': 'val'})
        OrderedDict([('key', 'val')])

    :rtype: OrderedDict
    """
    if value is None:
        return None

    rejected_types = (str, bytes, bool, int)
    if not isinstance(value, rejected_types):
        return OrderedDict(value)

    raise ValueError("cannot encode objects that are not 2-tuples")
338 | |
339 | |
def to_key_val_list(value):
    """Turn a mapping or an iterable of pairs into a list of tuples.

    ``None`` is passed through unchanged; strings, bytes, booleans and
    integers are rejected, e.g.,

    ::

        >>> to_key_val_list([('key', 'val')])
        [('key', 'val')]
        >>> to_key_val_list({'key': 'val'})
        [('key', 'val')]
        >>> to_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples

    :rtype: list
    """
    if value is None:
        return None

    rejected_types = (str, bytes, bool, int)
    if isinstance(value, rejected_types):
        raise ValueError("cannot encode objects that are not 2-tuples")

    pairs = value.items() if isinstance(value, Mapping) else value
    return list(pairs)
367 | |
368 | |
# From mitsuhiko/werkzeug (used with permission).
def parse_list_header(value):
    """Parse lists as described by RFC 2068 Section 2.

    The header is split on commas, where elements may be quoted-strings that
    themselves contain commas. An element wrapped in double quotes has the
    quotes removed and is unquoted; other elements are kept as they are.
    Duplicates and case are preserved.

    The return value is a standard :class:`list`:

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    To create a header from the :class:`list` again, use the
    :func:`dump_header` function.

    :param value: a string with a list header.
    :return: :class:`list`
    :rtype: list
    """
    elements = []
    for element in _parse_list_header(value):
        wrapped_in_quotes = element[:1] == element[-1:] == '"'
        elements.append(
            unquote_header_value(element[1:-1]) if wrapped_in_quotes else element
        )
    return elements
399 | |
400 | |
# From mitsuhiko/werkzeug (used with permission).
def parse_dict_header(value):
    """Parse lists of key, value pairs as described by RFC 2068 Section 2 and
    convert them into a python dict:

    >>> d = parse_dict_header('foo="is a fish", bar="as well"')
    >>> type(d) is dict
    True
    >>> sorted(d.items())
    [('bar', 'as well'), ('foo', 'is a fish')]

    If there is no value for a key it will be `None`:

    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    To create a header from the :class:`dict` again, use the
    :func:`dump_header` function.

    :param value: a string with a dict header.
    :return: :class:`dict`
    :rtype: dict
    """
    parsed = {}
    for item in _parse_list_header(value):
        # Only the first "=" separates the name from its value.
        name, separator, item_value = item.partition("=")
        if not separator:
            parsed[item] = None
            continue
        if item_value[:1] == item_value[-1:] == '"':
            item_value = unquote_header_value(item_value[1:-1])
        parsed[name] = item_value
    return parsed
434 | |
435 | |
# From mitsuhiko/werkzeug (used with permission).
def unquote_header_value(value, is_filename=False):
    r"""Unquotes a header value. (Reversal of :func:`quote_header_value`).
    This does not use the real unquoting but what browsers are actually
    using for quoting.

    :param value: the header value to unquote.
    :param is_filename: when true, a value that looks like a UNC path is
        returned with only its surrounding quotes removed.
    :rtype: str
    """
    wrapped_in_quotes = bool(value) and value[0] == value[-1] == '"'
    if not wrapped_in_quotes:
        return value

    # this is not the real unquoting, but fixing this so that the
    # RFC is met will result in bugs with internet explorer and
    # probably some other browsers as well. IE for example is
    # uploading files with "C:\foo\bar.txt" as filename
    inner = value[1:-1]

    # A filename starting with a double backslash looks like a UNC path.
    # Running the replacements below on it would collapse the leading double
    # slash into a single one, so it is returned untouched. See #458.
    if is_filename and inner[:2] == "\\\\":
        return inner
    return inner.replace("\\\\", "\\").replace('\\"', '"')
460 | |
461 | |
def dict_from_cookiejar(cj):
    """Returns a key/value dictionary from a CookieJar.

    Each cookie's ``name`` maps to its ``value``; for repeated names the
    cookie iterated last wins.

    :param cj: CookieJar object to extract cookies from.
    :rtype: dict
    """
    return {cookie.name: cookie.value for cookie in cj}
475 | |
476 | |
def add_dict_to_cookiejar(cj, cookie_dict):
    """Returns a CookieJar from a key/value dictionary.

    Delegates to ``cookiejar_from_dict(cookie_dict, cj)`` and returns its
    result.

    :param cj: CookieJar to insert cookies into.
    :param cookie_dict: Dict of key/values to insert into CookieJar.
    :rtype: CookieJar
    """
    updated_jar = cookiejar_from_dict(cookie_dict, cj)
    return updated_jar
486 | |
487 | |
def get_encodings_from_content(content):
    """Returns encodings from given content string.

    Emits a ``DeprecationWarning`` on every call. Matches are collected in
    this order: ``<meta charset=...>``, ``<meta content=...charset=...>``,
    then the XML declaration's ``encoding``.

    :param content: bytestring to extract encodings from.
    """
    warnings.warn(
        (
            "In requests 3.0, get_encodings_from_content will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    patterns = (
        re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I),
        re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I),
        re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]'),
    )

    encodings = []
    for pattern in patterns:
        encodings.extend(pattern.findall(content))
    return encodings
511 | |
512 | |
513 def _parse_content_type_header(header): | |
514 """Returns content type and parameters from given header | |
515 | |
516 :param header: string | |
517 :return: tuple containing content type and dictionary of | |
518 parameters | |
519 """ | |
520 | |
521 tokens = header.split(";") | |
522 content_type, params = tokens[0].strip(), tokens[1:] | |
523 params_dict = {} | |
524 items_to_strip = "\"' " | |
525 | |
526 for param in params: | |
527 param = param.strip() | |
528 if param: | |
529 key, value = param, True | |
530 index_of_equals = param.find("=") | |
531 if index_of_equals != -1: | |
532 key = param[:index_of_equals].strip(items_to_strip) | |
533 value = param[index_of_equals + 1 :].strip(items_to_strip) | |
534 params_dict[key.lower()] = value | |
535 return content_type, params_dict | |
536 | |
537 | |
def get_encoding_from_headers(headers):
    """Returns encodings from given HTTP Header Dict.

    An explicit ``charset`` parameter wins. Without one, content types
    containing ``text`` yield ``ISO-8859-1`` and those containing
    ``application/json`` yield ``utf-8``; anything else yields ``None``.

    :param headers: dictionary to extract encoding from.
    :rtype: str
    """
    raw_content_type = headers.get("content-type")
    if not raw_content_type:
        return None

    media_type, params = _parse_content_type_header(raw_content_type)

    if "charset" in params:
        return params["charset"].strip("'\"")

    if "text" in media_type:
        return "ISO-8859-1"

    if "application/json" in media_type:
        # Assume UTF-8 based on RFC 4627: https://www.ietf.org/rfc/rfc4627.txt since the charset was unset
        return "utf-8"

    return None
561 | |
562 | |
def stream_decode_response_unicode(iterator, r):
    """Stream decodes an iterator.

    When ``r.encoding`` is ``None`` the chunks are passed through untouched.
    Otherwise they are decoded incrementally with ``errors="replace"`` and
    empty decode results are not yielded.
    """
    encoding = r.encoding
    if encoding is None:
        yield from iterator
        return

    decoder = codecs.getincrementaldecoder(encoding)(errors="replace")
    decoded_chunks = (decoder.decode(chunk) for chunk in iterator)
    # filter(None, ...) drops the empty strings produced for incomplete
    # multi-byte sequences.
    yield from filter(None, decoded_chunks)

    # Flush whatever the decoder is still buffering.
    remainder = decoder.decode(b"", final=True)
    if remainder:
        yield remainder
578 | |
579 | |
def iter_slices(string, slice_length):
    """Iterate over slices of a string.

    A ``slice_length`` of ``None`` or a non-positive value yields the whole
    string as one slice; an empty string yields nothing.
    """
    total = len(string)
    step = slice_length
    if step is None or step <= 0:
        step = total
    # ``step`` is only zero for an empty string, where the range is empty
    # anyway; ``or 1`` just keeps range() from rejecting a zero step.
    for start in range(0, total, step or 1):
        yield string[start : start + step]
588 | |
589 | |
def get_unicode_from_response(r):
    """Returns the requested content back in unicode.

    Emits a ``DeprecationWarning`` on every call.

    :param r: Response object to get unicode content from.

    Tried:

    1. charset from content-type
    2. fall back and decode with ``errors="replace"``; if that raises
       ``TypeError`` the raw content is returned

    :rtype: str
    """
    warnings.warn(
        (
            "In requests 3.0, get_unicode_from_response will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    # Try charset from content-type
    encoding = get_encoding_from_headers(r.headers)

    if encoding:
        try:
            return str(r.content, encoding)
        except UnicodeError:
            # Strict decoding failed; retry leniently below.
            pass

    # Fall back:
    try:
        return str(r.content, encoding, errors="replace")
    except TypeError:
        return r.content
627 | |
628 | |
# The unreserved URI characters (RFC 3986)
UNRESERVED_SET = frozenset(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + "0123456789-._~"
)


def unquote_unreserved(uri):
    """Un-escape any percent-escape sequences in a URI that are unreserved
    characters. This leaves all reserved, illegal and non-ASCII bytes encoded.

    Raises ``InvalidURL`` when a two-character alphanumeric escape is not
    valid hexadecimal.

    :rtype: str
    """
    head, *segments = uri.split("%")
    rebuilt = [head]
    for segment in segments:
        hex_pair = segment[:2]
        if len(hex_pair) == 2 and hex_pair.isalnum():
            try:
                char = chr(int(hex_pair, 16))
            except ValueError:
                raise InvalidURL(f"Invalid percent-escape sequence: '{hex_pair}'")

            if char in UNRESERVED_SET:
                rebuilt.append(char + segment[2:])
                continue
        # Not an unreserved escape: put the '%' back and keep it encoded.
        rebuilt.append(f"%{segment}")
    return "".join(rebuilt)
657 | |
658 | |
def requote_uri(uri):
    """Re-quote the given URI.

    This function passes the given URI through an unquote/quote cycle to
    ensure that it is fully and consistently quoted.

    :rtype: str
    """
    safe_with_percent = "!#$%&'()*+,/:;=?@[]~"
    safe_without_percent = "!#$&'()*+,/:;=?@[]~"
    try:
        # Unquote only the unreserved characters.
        normalized = unquote_unreserved(uri)
    except InvalidURL:
        # We couldn't unquote the given URI, so let's try quoting it, but
        # there may be unquoted '%'s in the URI. We need to make sure they're
        # properly quoted so they do not cause issues elsewhere.
        return quote(uri, safe=safe_without_percent)
    # Then quote only illegal characters (do not quote reserved,
    # unreserved, or '%').
    return quote(normalized, safe=safe_with_percent)
679 | |
680 | |
def address_in_network(ip, net):
    """This function allows you to check if an IP belongs to a network subnet

    Example: returns True if ip = 192.168.1.1 and net = 192.168.1.0/24
             returns False if ip = 192.168.1.1 and net = 192.168.100.0/24

    :param ip: dotted IPv4 address.
    :param net: network in ``address/bits`` notation.
    :rtype: bool
    """

    def _as_int(dotted):
        # Pack the dotted quad and read it back as one unsigned integer.
        return struct.unpack("=L", socket.inet_aton(dotted))[0]

    ip_value = _as_int(ip)
    net_address, prefix_bits = net.split("/")
    mask_value = _as_int(dotted_netmask(int(prefix_bits)))
    network_value = _as_int(net_address) & mask_value
    return (ip_value & mask_value) == (network_value & mask_value)
694 | |
695 | |
def dotted_netmask(mask):
    """Converts mask from /xx format to xxx.xxx.xxx.xxx

    Example: if mask is 24 function returns 255.255.255.0

    :rtype: str
    """
    # Bits that belong to the host part, i.e. the low (32 - mask) bits.
    host_bits = (1 << 32 - mask) - 1
    packed = struct.pack(">I", 0xFFFFFFFF ^ host_bits)
    return socket.inet_ntoa(packed)
705 | |
706 | |
def is_ipv4_address(string_ip):
    """Return True when ``socket.inet_aton`` accepts ``string_ip``.

    :rtype: bool
    """
    with contextlib.suppress(OSError):
        socket.inet_aton(string_ip)
        return True
    return False
716 | |
717 | |
def is_valid_cidr(string_network):
    """
    Very simple check of the cidr format in no_proxy variable.

    Requires exactly one ``/``, an integer mask between 1 and 32, and an
    address part that ``socket.inet_aton`` accepts.

    :rtype: bool
    """
    if string_network.count("/") != 1:
        return False

    address, prefix = string_network.split("/")
    try:
        mask = int(prefix)
    except ValueError:
        return False

    if not 1 <= mask <= 32:
        return False

    try:
        socket.inet_aton(address)
    except OSError:
        return False
    return True
740 | |
741 | |
@contextlib.contextmanager
def set_environ(env_name, value):
    """Set the environment variable 'env_name' to 'value'

    Save previous value, yield, and then restore the previous value stored in
    the environment variable 'env_name'. A variable that did not exist before
    is deleted again on exit.

    If 'value' is None, do nothing"""
    if value is None:
        yield
        return

    previous = os.environ.get(env_name)
    os.environ[env_name] = value
    try:
        yield
    finally:
        if previous is None:
            del os.environ[env_name]
        else:
            os.environ[env_name] = previous
762 | |
763 | |
def should_bypass_proxies(url, no_proxy):
    """
    Returns whether we should bypass proxies or not.

    :param url: URL about to be requested.
    :param no_proxy: comma-separated no_proxy value, or ``None`` to read it
        from the environment.
    :rtype: bool
    """

    # Prioritize lowercase environment variables over uppercase
    # to keep a consistent behaviour with other http projects (curl, wget).
    def _env_lookup(name):
        return os.environ.get(name) or os.environ.get(name.upper())

    # Remember the caller's argument: it is what gets exported while
    # proxy_bypass() runs further down.
    explicit_no_proxy = no_proxy
    if no_proxy is None:
        no_proxy = _env_lookup("no_proxy")
    parts = urlparse(url)
    hostname = parts.hostname

    if hostname is None:
        # URLs don't always have hostnames, e.g. file:/// urls.
        return True

    if no_proxy:
        # Split the no_proxy value into its non-empty entries, ignoring spaces.
        entries = [entry for entry in no_proxy.replace(" ", "").split(",") if entry]

        if is_ipv4_address(hostname):
            for entry in entries:
                if is_valid_cidr(entry):
                    if address_in_network(hostname, entry):
                        return True
                elif hostname == entry:
                    # The entry is a plain IP (not CIDR notation) and equals
                    # the requested host.
                    return True
        else:
            # Entries are compared against the end of the hostname, both
            # with and without the port.
            host_with_port = hostname
            if parts.port:
                host_with_port += f":{parts.port}"

            if any(
                hostname.endswith(entry) or host_with_port.endswith(entry)
                for entry in entries
            ):
                # The URL does match something in no_proxy, so we don't want
                # to apply the proxies on this URL.
                return True

    with set_environ("no_proxy", explicit_no_proxy):
        try:
            bypass = proxy_bypass(hostname)
        except (TypeError, socket.gaierror):
            bypass = False

    return bool(bypass)
822 | |
823 | |
def get_environ_proxies(url, no_proxy=None):
    """
    Return a dict of environment proxies.

    The dict is empty when ``should_bypass_proxies`` says ``url`` must not
    go through a proxy.

    :rtype: dict
    """
    bypass = should_bypass_proxies(url, no_proxy=no_proxy)
    return {} if bypass else getproxies()
834 | |
835 | |
def select_proxy(url, proxies):
    """Select a proxy for the url, if applicable.

    Keys are tried from most to least specific: ``scheme://host``,
    ``scheme``, ``all://host``, ``all``. For URLs without a hostname only
    the scheme and ``all`` keys are consulted.

    :param url: The url being for the request
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    """
    proxies = proxies or {}
    parts = urlparse(url)
    scheme, hostname = parts.scheme, parts.hostname
    if hostname is None:
        return proxies.get(scheme, proxies.get("all"))

    candidate_keys = (
        scheme + "://" + hostname,
        scheme,
        "all://" + hostname,
        "all",
    )
    return next((proxies[key] for key in candidate_keys if key in proxies), None)
860 | |
861 | |
def resolve_proxies(request, proxies, trust_env=True):
    """This method takes proxy information from a request and configuration
    input to resolve a mapping of target proxies. This will consider settings
    such as NO_PROXY to strip proxy configurations.

    The input mapping is copied, never mutated. An environment proxy is only
    added when the URL's scheme has no entry yet.

    :param request: Request or PreparedRequest
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    :param trust_env: Boolean declaring whether to trust environment configs

    :rtype: dict
    """
    if proxies is None:
        proxies = {}
    url = request.url
    scheme = urlparse(url).scheme
    no_proxy = proxies.get("no_proxy")
    resolved = proxies.copy()

    if not trust_env or should_bypass_proxies(url, no_proxy=no_proxy):
        return resolved

    environ_proxies = get_environ_proxies(url, no_proxy=no_proxy)
    environ_proxy = environ_proxies.get(scheme, environ_proxies.get("all"))
    if environ_proxy:
        resolved.setdefault(scheme, environ_proxy)
    return resolved
887 | |
888 | |
def default_user_agent(name="python-requests"):
    """
    Return a string representing the default user agent.

    The result has the form ``<name>/<__version__>``.

    :rtype: str
    """
    return "{}/{}".format(name, __version__)
896 | |
897 | |
def default_headers():
    """
    Build the default ``User-Agent``, ``Accept-Encoding``, ``Accept`` and
    ``Connection`` headers.

    :rtype: requests.structures.CaseInsensitiveDict
    """
    defaults = {
        "User-Agent": default_user_agent(),
        "Accept-Encoding": DEFAULT_ACCEPT_ENCODING,
        "Accept": "*/*",
        "Connection": "keep-alive",
    }
    return CaseInsensitiveDict(defaults)
910 | |
911 | |
def parse_header_links(value):
    """Return a list of parsed link headers proxies.

    i.e. Link: <http:/.../front.jpeg>; rel=front; type="image/jpeg",<http://.../back.jpeg>; rel=back;type="image/jpeg"

    Each link becomes a dict with a ``url`` key plus one key per
    ``name=value`` parameter. Parameter parsing for a link stops at the
    first parameter that has no ``=``.

    :param value: content of a ``Link`` header.
    :rtype: list
    """

    links = []

    replace_chars = " '\""

    value = value.strip(replace_chars)
    if not value:
        return links

    for val in re.split(", *<", value):
        try:
            url, params = val.split(";", 1)
        except ValueError:
            url, params = val, ""

        link = {"url": url.strip("<> '\"")}

        for param in params.split(";"):
            # Split on the first "=" only, so a value that itself contains
            # "=" (e.g. title="a=b") is kept instead of aborting the parsing
            # of this and all remaining parameters.
            key, separator, param_value = param.partition("=")
            if not separator:
                break

            link[key.strip(replace_chars)] = param_value.strip(replace_chars)

        links.append(link)

    return links
947 | |
948 | |
949 # Null bytes; no need to recreate these on each call to guess_json_utf | |
950 _null = "\x00".encode("ascii") # encoding to ASCII for Python 3 | |
951 _null2 = _null * 2 | |
952 _null3 = _null * 3 | |
953 | |
954 | |
955 def guess_json_utf(data): | |
956 """ | |
957 :rtype: str | |
958 """ | |
959 # JSON always starts with two ASCII characters, so detection is as | |
960 # easy as counting the nulls and from their location and count | |
961 # determine the encoding. Also detect a BOM, if present. | |
962 sample = data[:4] | |
963 if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE): | |
964 return "utf-32" # BOM included | |
965 if sample[:3] == codecs.BOM_UTF8: | |
966 return "utf-8-sig" # BOM included, MS style (discouraged) | |
967 if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE): | |
968 return "utf-16" # BOM included | |
969 nullcount = sample.count(_null) | |
970 if nullcount == 0: | |
971 return "utf-8" | |
972 if nullcount == 2: | |
973 if sample[::2] == _null2: # 1st and 3rd are null | |
974 return "utf-16-be" | |
975 if sample[1::2] == _null2: # 2nd and 4th are null | |
976 return "utf-16-le" | |
977 # Did not detect 2 valid UTF-16 ascii-range characters | |
978 if nullcount == 3: | |
979 if sample[:3] == _null3: | |
980 return "utf-32-be" | |
981 if sample[1:] == _null3: | |
982 return "utf-32-le" | |
983 # Did not detect a valid UTF-32 ascii-range character | |
984 return None | |
985 | |
986 | |
def prepend_scheme_if_needed(url, new_scheme):
    """Return ``url`` with ``new_scheme`` added when it has no scheme.

    A scheme already present in ``url`` is kept; it is never replaced by
    ``new_scheme``.

    :param url: URL that may or may not start with a scheme.
    :param new_scheme: scheme to use when ``url`` carries none.
    :rtype: str
    """
    parts = parse_url(url)
    scheme, auth, _host, _port, path, query, fragment = parts

    # Backwards-compatibility shim: an earlier urlparse-based version swapped
    # netloc and path whenever no netloc was detected. The original defect
    # had no tests, so the swap is preserved now that parse_url is used.
    netloc = parts.netloc
    if not netloc:
        netloc, path = path, netloc

    # parse_url reports the netloc without the auth component; put it back.
    if auth:
        netloc = "@".join([auth, netloc])

    final_scheme = new_scheme if scheme is None else scheme
    final_path = "" if path is None else path

    return urlunparse((final_scheme, netloc, final_path, "", query, fragment))
1014 | |
1015 | |
def get_auth_from_url(url):
    """Extract the percent-decoded ``(username, password)`` pair from a URL.

    When either component cannot be decoded (for example because the URL
    carries no credentials), a pair of empty strings is returned instead.

    :param url: URL that may contain authentication components.
    :rtype: (str,str)
    """
    pieces = urlparse(url)

    try:
        return unquote(pieces.username), unquote(pieces.password)
    except (AttributeError, TypeError):
        # A missing component is None, which unquote cannot process.
        return "", ""
1030 | |
1031 | |
def check_header_validity(header):
    """Validate both parts of a header.

    Checks that neither the name nor the value contains leading whitespace,
    reserved characters, or return characters.

    :param header: tuple, in the format (name, value).
    """
    name, value = header
    # Index 0 selects the name validator, index 1 the value validator.
    for validator_index, part in enumerate((name, value)):
        _validate_header_part(header, part, validator_index)
1041 | |
1042 | |
def _validate_header_part(header, header_part, header_validator_index):
    """Validate a single part (name or value) of a header.

    :param header: the full header, used only in error messages.
    :param header_part: the name or the value to check; must be ``str`` or
        ``bytes``.
    :param header_validator_index: ``0`` to validate a header name, ``1`` to
        validate a header value.
    :raises InvalidHeader: if ``header_part`` is neither ``str`` nor
        ``bytes``, or if it does not match the selected validator.
    """
    # Pick the validator table that matches the type of the part.
    if isinstance(header_part, str):
        validator = _HEADER_VALIDATORS_STR[header_validator_index]
    elif isinstance(header_part, bytes):
        validator = _HEADER_VALIDATORS_BYTE[header_validator_index]
    else:
        raise InvalidHeader(
            f"Header part ({header_part!r}) from {header} "
            f"must be of type str or bytes, not {type(header_part)}"
        )

    if not validator.match(header_part):
        header_kind = "name" if header_validator_index == 0 else "value"
        # The trailing space after "return" matters: the two literals are
        # concatenated, and without it the message read "returncharacter(s)".
        raise InvalidHeader(
            f"Invalid leading whitespace, reserved character(s), or return "
            f"character(s) in header {header_kind}: {header_part!r}"
        )
1060 | |
1061 | |
def urldefragauth(url):
    """
    Return ``url`` stripped of its fragment and its authentication part.

    :rtype: str
    """
    parts = urlparse(url)
    netloc = parts.netloc
    path = parts.path

    # Same netloc/path swap as in func:`prepend_scheme_if_needed`.
    if not netloc:
        netloc, path = path, netloc

    # Keep only what follows the last "@", i.e. drop any "user:pass@" prefix.
    host = netloc.rpartition("@")[2]

    return urlunparse((parts.scheme, host, path, parts.params, parts.query, ""))
1077 | |
1078 | |
def rewind_body(prepared_request):
    """Seek the request body back to its recorded starting position so it
    can be read again on redirect.

    :param prepared_request: request whose ``body`` and ``_body_position``
        attributes are used for the rewind.
    :raises UnrewindableBodyError: if the body has no ``seek`` method, the
        recorded position is not an integer, or seeking raises ``OSError``.
    """
    seek = getattr(prepared_request.body, "seek", None)

    # Guard clause: without a seek method and an integer position there is
    # nothing to rewind to.
    if seek is None or not isinstance(
        prepared_request._body_position, integer_types
    ):
        raise UnrewindableBodyError("Unable to rewind request body for redirect.")

    try:
        seek(prepared_request._body_position)
    except OSError:
        raise UnrewindableBodyError(
            "An error occurred when rewinding request body for redirect."
        )