comparison requests/utils.py @ 7:5eb2d5e3bf22

planemo upload for repository https://toolrepo.galaxytrakr.org/view/jpayne/bioproject_to_srr_2/556cac4fb538
author jpayne
date Sun, 05 May 2024 23:32:17 -0400
1 """
2 requests.utils
3 ~~~~~~~~~~~~~~
4
5 This module provides utility functions that are used within Requests
6 that are also useful for external consumption.
7 """
8
9 import codecs
10 import contextlib
11 import io
12 import os
13 import re
14 import socket
15 import struct
16 import sys
17 import tempfile
18 import warnings
19 import zipfile
20 from collections import OrderedDict
21
22 from urllib3.util import make_headers, parse_url
23
24 from . import certs
25 from .__version__ import __version__
26
27 # to_native_string is unused here, but imported here for backwards compatibility
28 from ._internal_utils import ( # noqa: F401
29 _HEADER_VALIDATORS_BYTE,
30 _HEADER_VALIDATORS_STR,
31 HEADER_VALIDATORS,
32 to_native_string,
33 )
34 from .compat import (
35 Mapping,
36 basestring,
37 bytes,
38 getproxies,
39 getproxies_environment,
40 integer_types,
41 )
42 from .compat import parse_http_list as _parse_list_header
43 from .compat import (
44 proxy_bypass,
45 proxy_bypass_environment,
46 quote,
47 str,
48 unquote,
49 urlparse,
50 urlunparse,
51 )
52 from .cookies import cookiejar_from_dict
53 from .exceptions import (
54 FileModeWarning,
55 InvalidHeader,
56 InvalidURL,
57 UnrewindableBodyError,
58 )
59 from .structures import CaseInsensitiveDict
60
61 NETRC_FILES = (".netrc", "_netrc")
62
63 DEFAULT_CA_BUNDLE_PATH = certs.where()
64
65 DEFAULT_PORTS = {"http": 80, "https": 443}
66
67 # Ensure that ', ' is used to preserve previous delimiter behavior.
68 DEFAULT_ACCEPT_ENCODING = ", ".join(
69 re.split(r",\s*", make_headers(accept_encoding=True)["accept-encoding"])
70 )
71
72
73 if sys.platform == "win32":
74 # provide a proxy_bypass version on Windows without DNS lookups
75
76 def proxy_bypass_registry(host):
77 try:
78 import winreg
79 except ImportError:
80 return False
81
82 try:
83 internetSettings = winreg.OpenKey(
84 winreg.HKEY_CURRENT_USER,
85 r"Software\Microsoft\Windows\CurrentVersion\Internet Settings",
86 )
87 # ProxyEnable could be REG_SZ or REG_DWORD, normalizing it
88 proxyEnable = int(winreg.QueryValueEx(internetSettings, "ProxyEnable")[0])
89 # ProxyOverride is almost always a string
90 proxyOverride = winreg.QueryValueEx(internetSettings, "ProxyOverride")[0]
91 except (OSError, ValueError):
92 return False
93 if not proxyEnable or not proxyOverride:
94 return False
95
96 # make a check value list from the registry entry: replace the
97 # '<local>' string by the localhost entry and the corresponding
98 # canonical entry.
99 proxyOverride = proxyOverride.split(";")
100 # now check if we match one of the registry values.
101 for test in proxyOverride:
102 if test == "<local>":
103 if "." not in host:
104 return True
105 test = test.replace(".", r"\.") # mask dots
106 test = test.replace("*", r".*") # change glob sequence
107 test = test.replace("?", r".") # change glob char
108 if re.match(test, host, re.I):
109 return True
110 return False
111
    def proxy_bypass(host):  # noqa
        """Return True if the host should be bypassed.

        Checks proxy settings gathered from the environment, if specified,
        or the registry.
        """
        if getproxies_environment():
            return proxy_bypass_environment(host)
        else:
            return proxy_bypass_registry(host)


def dict_to_sequence(d):
    """Returns the given dict-like object as a sequence of key/value pairs."""

    if hasattr(d, "items"):
        d = d.items()

    return d


def super_len(o):
    total_length = None
    current_position = 0

    if hasattr(o, "__len__"):
        total_length = len(o)

    elif hasattr(o, "len"):
        total_length = o.len

    elif hasattr(o, "fileno"):
        try:
            fileno = o.fileno()
        except (io.UnsupportedOperation, AttributeError):
            # AttributeError is a surprising exception, seeing as how we've just checked
            # that `hasattr(o, 'fileno')`. It happens for objects obtained via
            # `TarFile.extractfile()`, per issue 5229.
            pass
        else:
            total_length = os.fstat(fileno).st_size

            # Having used fstat to determine the file length, we need to
            # confirm that this file was opened up in binary mode.
            if "b" not in o.mode:
                warnings.warn(
                    (
                        "Requests has determined the content-length for this "
                        "request using the binary size of the file: however, the "
                        "file has been opened in text mode (i.e. without the 'b' "
                        "flag in the mode). This may lead to an incorrect "
                        "content-length. In Requests 3.0, support will be removed "
                        "for files in text mode."
                    ),
                    FileModeWarning,
                )

    if hasattr(o, "tell"):
        try:
            current_position = o.tell()
        except OSError:
            # This can happen in some weird situations, such as when the file
            # is actually a special file descriptor like stdin. In this
            # instance, we don't know what the length is, so set it to zero and
            # let requests chunk it instead.
            if total_length is not None:
                current_position = total_length
        else:
            if hasattr(o, "seek") and total_length is None:
                # StringIO and BytesIO have seek but no usable fileno
                try:
                    # seek to end of file
                    o.seek(0, 2)
                    total_length = o.tell()

                    # seek back to current position to support
                    # partially read file-like objects
                    o.seek(current_position or 0)
                except OSError:
                    total_length = 0

    if total_length is None:
        total_length = 0

    return max(0, total_length - current_position)
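
# Illustrative use of `super_len` (a sketch, not part of the upstream module):
#
#     >>> import io
#     >>> buf = io.BytesIO(b"hello world")
#     >>> super_len(buf)
#     11
#     >>> _ = buf.read(6)          # advance the read position
#     >>> super_len(buf)           # only the unread remainder is counted
#     5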


def get_netrc_auth(url, raise_errors=False):
    """Returns the Requests tuple auth for a given url from netrc."""

    netrc_file = os.environ.get("NETRC")
    if netrc_file is not None:
        netrc_locations = (netrc_file,)
    else:
        netrc_locations = (f"~/{f}" for f in NETRC_FILES)

    try:
        from netrc import NetrcParseError, netrc

        netrc_path = None

        for f in netrc_locations:
            try:
                loc = os.path.expanduser(f)
            except KeyError:
                # os.path.expanduser can fail when $HOME is undefined and
                # getpwuid fails. See https://bugs.python.org/issue20164 &
                # https://github.com/psf/requests/issues/1846
                return

            if os.path.exists(loc):
                netrc_path = loc
                break

        # Abort early if there isn't one.
        if netrc_path is None:
            return

        ri = urlparse(url)

        # Strip port numbers from netloc. This weird `if...encode` dance is
        # used for Python 3.2, which doesn't support unicode literals.
        splitstr = b":"
        if isinstance(url, str):
            splitstr = splitstr.decode("ascii")
        host = ri.netloc.split(splitstr)[0]

        try:
            _netrc = netrc(netrc_path).authenticators(host)
            if _netrc:
                # Return with login / password
                login_i = 0 if _netrc[0] else 1
                return (_netrc[login_i], _netrc[2])
        except (NetrcParseError, OSError):
            # If there was a parsing error or a permissions issue reading the file,
            # we'll just skip netrc auth unless explicitly asked to raise errors.
            if raise_errors:
                raise

    # App Engine hackiness.
    except (ImportError, AttributeError):
        pass
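
# Illustrative use of `get_netrc_auth` (a hedged sketch; the machine entry and
# credentials below are hypothetical, not taken from any real netrc file):
#
#     Given a ~/.netrc containing:
#         machine example.com login alice password s3cret
#
#     >>> get_netrc_auth("https://example.com/data")  # doctest: +SKIP
#     ('alice', 's3cret')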


def guess_filename(obj):
    """Tries to guess the filename of the given object."""
    name = getattr(obj, "name", None)
    if name and isinstance(name, basestring) and name[0] != "<" and name[-1] != ">":
        return os.path.basename(name)
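
# Illustrative use of `guess_filename` (a sketch; the object and its path are
# hypothetical):
#
#     >>> class Upload:
#     ...     name = "/tmp/report.csv"
#     >>> guess_filename(Upload())
#     'report.csv'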


def extract_zipped_paths(path):
    """Replace nonexistent paths that look like they refer to a member of a zip
    archive with the location of an extracted copy of the target, or else
    just return the provided path unchanged.
    """
    if os.path.exists(path):
        # this is already a valid path, no need to do anything further
        return path

    # find the first valid part of the provided path and treat that as a zip archive
    # assume the rest of the path is the name of a member in the archive
    archive, member = os.path.split(path)
    while archive and not os.path.exists(archive):
        archive, prefix = os.path.split(archive)
        if not prefix:
            # If the split leaves the prefix empty, `archive` did not change,
            # so bail out here to avoid an infinite loop in a rare corner case.
            break
        member = "/".join([prefix, member])

    if not zipfile.is_zipfile(archive):
        return path

    zip_file = zipfile.ZipFile(archive)
    if member not in zip_file.namelist():
        return path

    # we have a valid zip archive and a valid member of that archive
    tmp = tempfile.gettempdir()
    extracted_path = os.path.join(tmp, member.split("/")[-1])
    if not os.path.exists(extracted_path):
        # use read + write to avoid creating nested folders; we only want the
        # file itself, and this sidesteps an mkdir race condition
        with atomic_open(extracted_path) as file_handler:
            file_handler.write(zip_file.read(member))
    return extracted_path
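
# Illustrative use of `extract_zipped_paths` (a sketch; the archive path is
# hypothetical):
#
#     >>> extract_zipped_paths("/opt/bundle.zip/certs/cacert.pem")  # doctest: +SKIP
#     '/tmp/cacert.pem'
#
# If "/opt/bundle.zip" is a real zip archive containing "certs/cacert.pem",
# the member is extracted to the temp directory; otherwise the original path
# is returned unchanged.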


@contextlib.contextmanager
def atomic_open(filename):
    """Write a file to the disk in an atomic fashion"""
    tmp_descriptor, tmp_name = tempfile.mkstemp(dir=os.path.dirname(filename))
    try:
        with os.fdopen(tmp_descriptor, "wb") as tmp_handler:
            yield tmp_handler
        os.replace(tmp_name, filename)
    except BaseException:
        os.remove(tmp_name)
        raise
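
# Illustrative use of `atomic_open` (a sketch; the target path is hypothetical).
# The handle is opened in binary mode, and the file only appears at its final
# path once the block exits cleanly:
#
#     >>> with atomic_open("/tmp/settings.bin") as fh:  # doctest: +SKIP
#     ...     fh.write(b"payload")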


def from_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. If it can be, return an OrderedDict, e.g.,

    ::

        >>> from_key_val_list([('key', 'val')])
        OrderedDict([('key', 'val')])
        >>> from_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples
        >>> from_key_val_list({'key': 'val'})
        OrderedDict([('key', 'val')])

    :rtype: OrderedDict
    """
    if value is None:
        return None

    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")

    return OrderedDict(value)


def to_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. If it can be, return a list of tuples, e.g.,

    ::

        >>> to_key_val_list([('key', 'val')])
        [('key', 'val')]
        >>> to_key_val_list({'key': 'val'})
        [('key', 'val')]
        >>> to_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples

    :rtype: list
    """
    if value is None:
        return None

    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")

    if isinstance(value, Mapping):
        value = value.items()

    return list(value)


# From mitsuhiko/werkzeug (used with permission).
def parse_list_header(value):
    """Parse lists as described by RFC 2068 Section 2.

    In particular, parse comma-separated lists where the elements of
    the list may include quoted-strings. A quoted-string could
    contain a comma. A non-quoted string could have quotes in the
    middle. Quotes are removed automatically after parsing.

    It basically works like :func:`parse_set_header` just that items
    may appear multiple times and case sensitivity is preserved.

    The return value is a standard :class:`list`:

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    To create a header from the :class:`list` again, use the
    :func:`dump_header` function.

    :param value: a string with a list header.
    :return: :class:`list`
    :rtype: list
    """
    result = []
    for item in _parse_list_header(value):
        if item[:1] == item[-1:] == '"':
            item = unquote_header_value(item[1:-1])
        result.append(item)
    return result


# From mitsuhiko/werkzeug (used with permission).
def parse_dict_header(value):
    """Parse lists of key, value pairs as described by RFC 2068 Section 2 and
    convert them into a python dict:

    >>> d = parse_dict_header('foo="is a fish", bar="as well"')
    >>> type(d) is dict
    True
    >>> sorted(d.items())
    [('bar', 'as well'), ('foo', 'is a fish')]

    If there is no value for a key it will be `None`:

    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    To create a header from the :class:`dict` again, use the
    :func:`dump_header` function.

    :param value: a string with a dict header.
    :return: :class:`dict`
    :rtype: dict
    """
    result = {}
    for item in _parse_list_header(value):
        if "=" not in item:
            result[item] = None
            continue
        name, value = item.split("=", 1)
        if value[:1] == value[-1:] == '"':
            value = unquote_header_value(value[1:-1])
        result[name] = value
    return result


# From mitsuhiko/werkzeug (used with permission).
def unquote_header_value(value, is_filename=False):
    r"""Unquotes a header value. (Reversal of :func:`quote_header_value`).
    This does not use the real unquoting but what browsers are actually
    using for quoting.

    :param value: the header value to unquote.
    :rtype: str
    """
    if value and value[0] == value[-1] == '"':
        # this is not the real unquoting, but fixing this so that the
        # RFC is met will result in bugs with internet explorer and
        # probably some other browsers as well. IE for example is
        # uploading files with "C:\foo\bar.txt" as filename
        value = value[1:-1]

        # if this is a filename and the starting characters look like
        # a UNC path, then just return the value without quotes. Using the
        # replace sequence below on a UNC path has the effect of turning
        # the leading double slash into a single slash and then
        # _fix_ie_filename() doesn't work correctly. See #458.
        if not is_filename or value[:2] != "\\\\":
            return value.replace("\\\\", "\\").replace('\\"', '"')
    return value
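
# Illustrative use of `unquote_header_value` (a sketch, not part of the module):
#
#     >>> unquote_header_value('"token value"')
#     'token value'
#     >>> unquote_header_value('plain')   # unquoted values pass through
#     'plain'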


def dict_from_cookiejar(cj):
    """Returns a key/value dictionary from a CookieJar.

    :param cj: CookieJar object to extract cookies from.
    :rtype: dict
    """

    cookie_dict = {}

    for cookie in cj:
        cookie_dict[cookie.name] = cookie.value

    return cookie_dict


def add_dict_to_cookiejar(cj, cookie_dict):
    """Returns a CookieJar from a key/value dictionary.

    :param cj: CookieJar to insert cookies into.
    :param cookie_dict: Dict of key/values to insert into CookieJar.
    :rtype: CookieJar
    """

    return cookiejar_from_dict(cookie_dict, cj)
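
# Illustrative round trip between a dict and a CookieJar, using the module's
# own `cookiejar_from_dict` helper (a sketch, not part of the upstream file):
#
#     >>> jar = cookiejar_from_dict({"session": "abc123"})
#     >>> dict_from_cookiejar(jar)
#     {'session': 'abc123'}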


def get_encodings_from_content(content):
    """Returns encodings from given content string.

    :param content: bytestring to extract encodings from.
    """
    warnings.warn(
        (
            "In requests 3.0, get_encodings_from_content will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    charset_re = re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I)
    pragma_re = re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I)
    xml_re = re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]')

    return (
        charset_re.findall(content)
        + pragma_re.findall(content)
        + xml_re.findall(content)
    )


def _parse_content_type_header(header):
    """Returns content type and parameters from given header

    :param header: string
    :return: tuple containing content type and dictionary of
        parameters
    """

    tokens = header.split(";")
    content_type, params = tokens[0].strip(), tokens[1:]
    params_dict = {}
    items_to_strip = "\"' "

    for param in params:
        param = param.strip()
        if param:
            key, value = param, True
            index_of_equals = param.find("=")
            if index_of_equals != -1:
                key = param[:index_of_equals].strip(items_to_strip)
                value = param[index_of_equals + 1 :].strip(items_to_strip)
            params_dict[key.lower()] = value
    return content_type, params_dict
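
# Illustrative use of `_parse_content_type_header` (a sketch, not part of the
# upstream module). Bare parameters with no "=" map to True:
#
#     >>> _parse_content_type_header("text/html; charset=UTF-8")
#     ('text/html', {'charset': 'UTF-8'})
#     >>> _parse_content_type_header("multipart/form-data; boundary=abc; weird")
#     ('multipart/form-data', {'boundary': 'abc', 'weird': True})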


def get_encoding_from_headers(headers):
    """Returns encodings from given HTTP Header Dict.

    :param headers: dictionary to extract encoding from.
    :rtype: str
    """

    content_type = headers.get("content-type")

    if not content_type:
        return None

    content_type, params = _parse_content_type_header(content_type)

    if "charset" in params:
        return params["charset"].strip("'\"")

    if "text" in content_type:
        return "ISO-8859-1"

    if "application/json" in content_type:
        # Assume UTF-8 based on RFC 4627: https://www.ietf.org/rfc/rfc4627.txt since the charset was unset
        return "utf-8"


def stream_decode_response_unicode(iterator, r):
    """Stream decodes an iterator."""

    if r.encoding is None:
        yield from iterator
        return

    decoder = codecs.getincrementaldecoder(r.encoding)(errors="replace")
    for chunk in iterator:
        rv = decoder.decode(chunk)
        if rv:
            yield rv
    rv = decoder.decode(b"", final=True)
    if rv:
        yield rv


def iter_slices(string, slice_length):
    """Iterate over slices of a string."""
    pos = 0
    if slice_length is None or slice_length <= 0:
        slice_length = len(string)
    while pos < len(string):
        yield string[pos : pos + slice_length]
        pos += slice_length
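
# Illustrative use of `iter_slices` (a sketch, not part of the module):
#
#     >>> list(iter_slices("abcdef", 2))
#     ['ab', 'cd', 'ef']
#     >>> list(iter_slices("abcdef", None))   # falls back to one whole slice
#     ['abcdef']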


def get_unicode_from_response(r):
    """Returns the requested content back in unicode.

    :param r: Response object to get unicode content from.

    Tries:

    1. charset from content-type
    2. fall back and replace all unicode characters

    :rtype: str
    """
    warnings.warn(
        (
            "In requests 3.0, get_unicode_from_response will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    tried_encodings = []

    # Try charset from content-type
    encoding = get_encoding_from_headers(r.headers)

    if encoding:
        try:
            return str(r.content, encoding)
        except UnicodeError:
            tried_encodings.append(encoding)

    # Fall back: decode with errors replaced. If `encoding` is None, str()
    # raises TypeError and we return the raw bytes instead.
    try:
        return str(r.content, encoding, errors="replace")
    except TypeError:
        return r.content


# The unreserved URI characters (RFC 3986)
UNRESERVED_SET = frozenset(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + "0123456789-._~"
)


def unquote_unreserved(uri):
    """Un-escape any percent-escape sequences in a URI that are unreserved
    characters. This leaves all reserved, illegal and non-ASCII bytes encoded.

    :rtype: str
    """
    parts = uri.split("%")
    for i in range(1, len(parts)):
        h = parts[i][0:2]
        if len(h) == 2 and h.isalnum():
            try:
                c = chr(int(h, 16))
            except ValueError:
                raise InvalidURL(f"Invalid percent-escape sequence: '{h}'")

            if c in UNRESERVED_SET:
                parts[i] = c + parts[i][2:]
            else:
                parts[i] = f"%{parts[i]}"
        else:
            parts[i] = f"%{parts[i]}"
    return "".join(parts)


def requote_uri(uri):
    """Re-quote the given URI.

    This function passes the given URI through an unquote/quote cycle to
    ensure that it is fully and consistently quoted.

    :rtype: str
    """
    safe_with_percent = "!#$%&'()*+,/:;=?@[]~"
    safe_without_percent = "!#$&'()*+,/:;=?@[]~"
    try:
        # Unquote only the unreserved characters
        # Then quote only illegal characters (do not quote reserved,
        # unreserved, or '%')
        return quote(unquote_unreserved(uri), safe=safe_with_percent)
    except InvalidURL:
        # We couldn't unquote the given URI, so let's try quoting it, but
        # there may be unquoted '%'s in the URI. We need to make sure they're
        # properly quoted so they do not cause issues elsewhere.
        return quote(uri, safe=safe_without_percent)
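
# Illustrative behaviour of the unquote/requote pair (a sketch):
#
#     >>> unquote_unreserved("%41%2Fb")    # %41 ('A') is unreserved, %2F ('/') is not
#     'A%2Fb'
#     >>> requote_uri("http://example.com/a b")
#     'http://example.com/a%20b'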


def address_in_network(ip, net):
    """This function allows you to check if an IP belongs to a network subnet

    Example: returns True if ip = 192.168.1.1 and net = 192.168.1.0/24
             returns False if ip = 192.168.1.1 and net = 192.168.100.0/24

    :rtype: bool
    """
    ipaddr = struct.unpack("=L", socket.inet_aton(ip))[0]
    netaddr, bits = net.split("/")
    netmask = struct.unpack("=L", socket.inet_aton(dotted_netmask(int(bits))))[0]
    network = struct.unpack("=L", socket.inet_aton(netaddr))[0] & netmask
    return (ipaddr & netmask) == (network & netmask)


def dotted_netmask(mask):
    """Converts mask from /xx format to xxx.xxx.xxx.xxx

    Example: if mask is 24 function returns 255.255.255.0

    :rtype: str
    """
    bits = 0xFFFFFFFF ^ (1 << 32 - mask) - 1
    return socket.inet_ntoa(struct.pack(">I", bits))
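
# Illustrative use of the subnet helpers (a sketch, not part of the module):
#
#     >>> dotted_netmask(24)
#     '255.255.255.0'
#     >>> address_in_network("192.168.1.1", "192.168.1.0/24")
#     True
#     >>> address_in_network("192.168.1.1", "192.168.100.0/24")
#     False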


def is_ipv4_address(string_ip):
    """
    :rtype: bool
    """
    try:
        socket.inet_aton(string_ip)
    except OSError:
        return False
    return True


def is_valid_cidr(string_network):
    """
    Very simple check of the cidr format in no_proxy variable.

    :rtype: bool
    """
    if string_network.count("/") == 1:
        try:
            mask = int(string_network.split("/")[1])
        except ValueError:
            return False

        if mask < 1 or mask > 32:
            return False

        try:
            socket.inet_aton(string_network.split("/")[0])
        except OSError:
            return False
    else:
        return False
    return True
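
# Illustrative behaviour of `is_valid_cidr` (a sketch):
#
#     >>> is_valid_cidr("192.168.1.0/24")
#     True
#     >>> is_valid_cidr("192.168.1.0")      # no mask
#     False
#     >>> is_valid_cidr("192.168.1.0/33")   # mask out of range
#     False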


@contextlib.contextmanager
def set_environ(env_name, value):
    """Set the environment variable 'env_name' to 'value'

    Save previous value, yield, and then restore the previous value stored in
    the environment variable 'env_name'.

    If 'value' is None, do nothing"""
    value_changed = value is not None
    if value_changed:
        old_value = os.environ.get(env_name)
        os.environ[env_name] = value
    try:
        yield
    finally:
        if value_changed:
            if old_value is None:
                del os.environ[env_name]
            else:
                os.environ[env_name] = old_value


def should_bypass_proxies(url, no_proxy):
    """
    Returns whether we should bypass proxies or not.

    :rtype: bool
    """
    # Prioritize lowercase environment variables over uppercase
    # to keep a consistent behaviour with other http projects (curl, wget).
    def get_proxy(key):
        return os.environ.get(key) or os.environ.get(key.upper())

    # First check whether no_proxy is defined. If it is, check that the URL
    # we're getting isn't in the no_proxy list.
    no_proxy_arg = no_proxy
    if no_proxy is None:
        no_proxy = get_proxy("no_proxy")
    parsed = urlparse(url)

    if parsed.hostname is None:
        # URLs don't always have hostnames, e.g. file:/// urls.
        return True

    if no_proxy:
        # We need to check whether we match here. We need to see if we match
        # the end of the hostname, both with and without the port.
        no_proxy = (host for host in no_proxy.replace(" ", "").split(",") if host)

        if is_ipv4_address(parsed.hostname):
            for proxy_ip in no_proxy:
                if is_valid_cidr(proxy_ip):
                    if address_in_network(parsed.hostname, proxy_ip):
                        return True
                elif parsed.hostname == proxy_ip:
                    # If no_proxy ip was defined in plain IP notation instead of cidr notation &
                    # matches the IP of the index
                    return True
        else:
            host_with_port = parsed.hostname
            if parsed.port:
                host_with_port += f":{parsed.port}"

            for host in no_proxy:
                if parsed.hostname.endswith(host) or host_with_port.endswith(host):
                    # The URL does match something in no_proxy, so we don't want
                    # to apply the proxies on this URL.
                    return True

    with set_environ("no_proxy", no_proxy_arg):
        # parsed.hostname can be `None` in cases such as a file URI.
        try:
            bypass = proxy_bypass(parsed.hostname)
        except (TypeError, socket.gaierror):
            bypass = False

    if bypass:
        return True

    return False


def get_environ_proxies(url, no_proxy=None):
    """
    Return a dict of environment proxies.

    :rtype: dict
    """
    if should_bypass_proxies(url, no_proxy=no_proxy):
        return {}
    else:
        return getproxies()


def select_proxy(url, proxies):
    """Select a proxy for the url, if applicable.

    :param url: The url being used for the request
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    """
    proxies = proxies or {}
    urlparts = urlparse(url)
    if urlparts.hostname is None:
        return proxies.get(urlparts.scheme, proxies.get("all"))

    proxy_keys = [
        urlparts.scheme + "://" + urlparts.hostname,
        urlparts.scheme,
        "all://" + urlparts.hostname,
        "all",
    ]
    proxy = None
    for proxy_key in proxy_keys:
        if proxy_key in proxies:
            proxy = proxies[proxy_key]
            break

    return proxy
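
# Illustrative proxy selection order (a sketch; the proxy URLs are hypothetical):
#
#     >>> proxies = {"http://example.com": "http://proxy-a:3128",
#     ...            "http": "http://proxy-b:3128"}
#     >>> select_proxy("http://example.com/path", proxies)   # scheme://host wins
#     'http://proxy-a:3128'
#     >>> select_proxy("http://other.org/", proxies)         # falls back to scheme
#     'http://proxy-b:3128'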


def resolve_proxies(request, proxies, trust_env=True):
    """This method takes proxy information from a request and configuration
    input to resolve a mapping of target proxies. This will consider settings
    such as NO_PROXY to strip proxy configurations.

    :param request: Request or PreparedRequest
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    :param trust_env: Boolean declaring whether to trust environment configs

    :rtype: dict
    """
    proxies = proxies if proxies is not None else {}
    url = request.url
    scheme = urlparse(url).scheme
    no_proxy = proxies.get("no_proxy")
    new_proxies = proxies.copy()

    if trust_env and not should_bypass_proxies(url, no_proxy=no_proxy):
        environ_proxies = get_environ_proxies(url, no_proxy=no_proxy)

        proxy = environ_proxies.get(scheme, environ_proxies.get("all"))

        if proxy:
            new_proxies.setdefault(scheme, proxy)
    return new_proxies


def default_user_agent(name="python-requests"):
    """
    Return a string representing the default user agent.

    :rtype: str
    """
    return f"{name}/{__version__}"


def default_headers():
    """
    :rtype: requests.structures.CaseInsensitiveDict
    """
    return CaseInsensitiveDict(
        {
            "User-Agent": default_user_agent(),
            "Accept-Encoding": DEFAULT_ACCEPT_ENCODING,
            "Accept": "*/*",
            "Connection": "keep-alive",
        }
    )


def parse_header_links(value):
    """Return a list of parsed link headers.

    i.e. Link: <http:/.../front.jpeg>; rel=front; type="image/jpeg",<http://.../back.jpeg>; rel=back;type="image/jpeg"

    :rtype: list
    """

    links = []

    replace_chars = " '\""

    value = value.strip(replace_chars)
    if not value:
        return links

    for val in re.split(", *<", value):
        try:
            url, params = val.split(";", 1)
        except ValueError:
            url, params = val, ""

        link = {"url": url.strip("<> '\"")}

        for param in params.split(";"):
            try:
                key, value = param.split("=")
            except ValueError:
                break

            link[key.strip(replace_chars)] = value.strip(replace_chars)

        links.append(link)

    return links
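
# Illustrative use of `parse_header_links` (a sketch; the URL is hypothetical):
#
#     >>> parse_header_links('<https://example.com/page2>; rel="next"')
#     [{'url': 'https://example.com/page2', 'rel': 'next'}]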


# Null bytes; no need to recreate these on each call to guess_json_utf
_null = "\x00".encode("ascii")  # encoding to ASCII for Python 3
_null2 = _null * 2
_null3 = _null * 3


def guess_json_utf(data):
    """
    :rtype: str
    """
    # JSON always starts with two ASCII characters, so detection is as
    # easy as counting the nulls and from their location and count
    # determine the encoding. Also detect a BOM, if present.
    sample = data[:4]
    if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
        return "utf-32"  # BOM included
    if sample[:3] == codecs.BOM_UTF8:
        return "utf-8-sig"  # BOM included, MS style (discouraged)
    if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
        return "utf-16"  # BOM included
    nullcount = sample.count(_null)
    if nullcount == 0:
        return "utf-8"
    if nullcount == 2:
        if sample[::2] == _null2:  # 1st and 3rd are null
            return "utf-16-be"
        if sample[1::2] == _null2:  # 2nd and 4th are null
            return "utf-16-le"
        # Did not detect 2 valid UTF-16 ascii-range characters
    if nullcount == 3:
        if sample[:3] == _null3:
            return "utf-32-be"
        if sample[1:] == _null3:
            return "utf-32-le"
        # Did not detect a valid UTF-32 ascii-range character
    return None
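
# Illustrative detection by `guess_json_utf` (a sketch, not part of the module):
#
#     >>> guess_json_utf(b'{"a": 1}')
#     'utf-8'
#     >>> guess_json_utf('{"a": 1}'.encode("utf-16-le"))
#     'utf-16-le'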


def prepend_scheme_if_needed(url, new_scheme):
    """Given a URL that may or may not have a scheme, prepend the given scheme.
    Does not replace a present scheme with the one provided as an argument.

    :rtype: str
    """
    parsed = parse_url(url)
    scheme, auth, host, port, path, query, fragment = parsed

    # A defect in urlparse determines that there isn't a netloc present in some
    # urls. We previously assumed parsing was overly cautious, and swapped the
    # netloc and path. Due to a lack of tests on the original defect, this is
    # maintained with parse_url for backwards compatibility.
    netloc = parsed.netloc
    if not netloc:
        netloc, path = path, netloc

    if auth:
        # parse_url doesn't provide the netloc with auth
        # so we'll add it ourselves.
        netloc = "@".join([auth, netloc])
    if scheme is None:
        scheme = new_scheme
    if path is None:
        path = ""

    return urlunparse((scheme, netloc, path, "", query, fragment))
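
# Illustrative behaviour of `prepend_scheme_if_needed` (a sketch):
#
#     >>> prepend_scheme_if_needed("example.com/path", "http")
#     'http://example.com/path'
#     >>> prepend_scheme_if_needed("https://example.com/path", "http")  # scheme kept
#     'https://example.com/path'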


def get_auth_from_url(url):
    """Given a url with authentication components, extract them into a tuple of
    username,password.

    :rtype: (str,str)
    """
    parsed = urlparse(url)

    try:
        auth = (unquote(parsed.username), unquote(parsed.password))
    except (AttributeError, TypeError):
        auth = ("", "")

    return auth
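
# Illustrative use of `get_auth_from_url` (a sketch; the credentials are
# hypothetical). Percent-encoded characters are unquoted:
#
#     >>> get_auth_from_url("https://user:p%40ss@example.com/path")
#     ('user', 'p@ss')
#     >>> get_auth_from_url("https://example.com/path")   # no credentials present
#     ('', '')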


def check_header_validity(header):
    """Verifies that header parts don't contain leading whitespace,
    reserved characters, or return characters.

    :param header: tuple, in the format (name, value).
    """
    name, value = header
    _validate_header_part(header, name, 0)
    _validate_header_part(header, value, 1)
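
# Illustrative behaviour of `check_header_validity` (a sketch):
#
#     >>> check_header_validity(("X-Token", "abc123"))       # passes silently
#     >>> check_header_validity(("X-Token", "bad\nvalue"))   # doctest: +SKIP
#     Traceback (most recent call last):
#     ...
#     requests.exceptions.InvalidHeader: Invalid leading whitespace, ...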


def _validate_header_part(header, header_part, header_validator_index):
    if isinstance(header_part, str):
        validator = _HEADER_VALIDATORS_STR[header_validator_index]
    elif isinstance(header_part, bytes):
        validator = _HEADER_VALIDATORS_BYTE[header_validator_index]
    else:
        raise InvalidHeader(
            f"Header part ({header_part!r}) from {header} "
            f"must be of type str or bytes, not {type(header_part)}"
        )

    if not validator.match(header_part):
        header_kind = "name" if header_validator_index == 0 else "value"
        raise InvalidHeader(
            f"Invalid leading whitespace, reserved character(s), or return "
            f"character(s) in header {header_kind}: {header_part!r}"
        )


def urldefragauth(url):
    """
    Given a url remove the fragment and the authentication part.

    :rtype: str
    """
    scheme, netloc, path, params, query, fragment = urlparse(url)

    # see func:`prepend_scheme_if_needed`
    if not netloc:
        netloc, path = path, netloc

    netloc = netloc.rsplit("@", 1)[-1]

    return urlunparse((scheme, netloc, path, params, query, ""))
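
# Illustrative use of `urldefragauth` (a sketch; the URL is hypothetical):
#
#     >>> urldefragauth("https://user:pass@example.com/path#section")
#     'https://example.com/path'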


def rewind_body(prepared_request):
    """Move file pointer back to its recorded starting position
    so it can be read again on redirect.
    """
    body_seek = getattr(prepared_request.body, "seek", None)
    if body_seek is not None and isinstance(
        prepared_request._body_position, integer_types
    ):
        try:
            body_seek(prepared_request._body_position)
        except OSError:
            raise UnrewindableBodyError(
                "An error occurred when rewinding request body for redirect."
            )
    else:
        raise UnrewindableBodyError("Unable to rewind request body for redirect.")