annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/requests/utils.py @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
rev   line source
jpayne@68 1 """
jpayne@68 2 requests.utils
jpayne@68 3 ~~~~~~~~~~~~~~
jpayne@68 4
jpayne@68 5 This module provides utility functions that are used within Requests
jpayne@68 6 that are also useful for external consumption.
jpayne@68 7 """
jpayne@68 8
jpayne@68 9 import codecs
jpayne@68 10 import contextlib
jpayne@68 11 import io
jpayne@68 12 import os
jpayne@68 13 import re
jpayne@68 14 import socket
jpayne@68 15 import struct
jpayne@68 16 import sys
jpayne@68 17 import tempfile
jpayne@68 18 import warnings
jpayne@68 19 import zipfile
jpayne@68 20 from collections import OrderedDict
jpayne@68 21
jpayne@68 22 from urllib3.util import make_headers, parse_url
jpayne@68 23
jpayne@68 24 from . import certs
jpayne@68 25 from .__version__ import __version__
jpayne@68 26
jpayne@68 27 # to_native_string is unused here, but imported here for backwards compatibility
jpayne@68 28 from ._internal_utils import ( # noqa: F401
jpayne@68 29 _HEADER_VALIDATORS_BYTE,
jpayne@68 30 _HEADER_VALIDATORS_STR,
jpayne@68 31 HEADER_VALIDATORS,
jpayne@68 32 to_native_string,
jpayne@68 33 )
jpayne@68 34 from .compat import (
jpayne@68 35 Mapping,
jpayne@68 36 basestring,
jpayne@68 37 bytes,
jpayne@68 38 getproxies,
jpayne@68 39 getproxies_environment,
jpayne@68 40 integer_types,
jpayne@68 41 )
jpayne@68 42 from .compat import parse_http_list as _parse_list_header
jpayne@68 43 from .compat import (
jpayne@68 44 proxy_bypass,
jpayne@68 45 proxy_bypass_environment,
jpayne@68 46 quote,
jpayne@68 47 str,
jpayne@68 48 unquote,
jpayne@68 49 urlparse,
jpayne@68 50 urlunparse,
jpayne@68 51 )
jpayne@68 52 from .cookies import cookiejar_from_dict
jpayne@68 53 from .exceptions import (
jpayne@68 54 FileModeWarning,
jpayne@68 55 InvalidHeader,
jpayne@68 56 InvalidURL,
jpayne@68 57 UnrewindableBodyError,
jpayne@68 58 )
jpayne@68 59 from .structures import CaseInsensitiveDict
jpayne@68 60
# Candidate netrc file names, probed in the user's home directory
# ("_netrc" is the Windows naming convention).
NETRC_FILES = (".netrc", "_netrc")

# Default CA bundle path, resolved once at import via the bundled certs module.
DEFAULT_CA_BUNDLE_PATH = certs.where()

# Well-known default ports per URL scheme.
DEFAULT_PORTS = {"http": 80, "https": 443}

# Ensure that ', ' is used to preserve previous delimiter behavior.
DEFAULT_ACCEPT_ENCODING = ", ".join(
    re.split(r",\s*", make_headers(accept_encoding=True)["accept-encoding"])
)
jpayne@68 71
jpayne@68 72
if sys.platform == "win32":
    # provide a proxy_bypass version on Windows without DNS lookups

    def proxy_bypass_registry(host):
        # Consult HKCU Internet Settings: return True when the proxy is
        # enabled and `host` matches an entry in ProxyOverride; return
        # False when winreg is unavailable or the registry can't be read.
        try:
            import winreg
        except ImportError:
            return False

        try:
            internetSettings = winreg.OpenKey(
                winreg.HKEY_CURRENT_USER,
                r"Software\Microsoft\Windows\CurrentVersion\Internet Settings",
            )
            # ProxyEnable could be REG_SZ or REG_DWORD, normalizing it
            proxyEnable = int(winreg.QueryValueEx(internetSettings, "ProxyEnable")[0])
            # ProxyOverride is almost always a string
            proxyOverride = winreg.QueryValueEx(internetSettings, "ProxyOverride")[0]
        except (OSError, ValueError):
            return False
        if not proxyEnable or not proxyOverride:
            return False

        # make a check value list from the registry entry: replace the
        # '<local>' string by the localhost entry and the corresponding
        # canonical entry.
        proxyOverride = proxyOverride.split(";")
        # filter out empty strings to avoid re.match return true in the following code.
        proxyOverride = filter(None, proxyOverride)
        # now check if we match one of the registry values.
        for test in proxyOverride:
            if test == "<local>":
                # "<local>" means: bypass for bare hostnames (no dots).
                if "." not in host:
                    return True
            # Translate the registry glob pattern into a regex.
            test = test.replace(".", r"\.")  # mask dots
            test = test.replace("*", r".*")  # change glob sequence
            test = test.replace("?", r".")  # change glob char
            if re.match(test, host, re.I):
                return True
        return False

    def proxy_bypass(host):  # noqa
        """Return True, if the host should be bypassed.

        Checks proxy settings gathered from the environment, if specified,
        or the registry.
        """
        if getproxies_environment():
            return proxy_bypass_environment(host)
        else:
            return proxy_bypass_registry(host)
jpayne@68 124
jpayne@68 125
def dict_to_sequence(d):
    """Returns an internal sequence dictionary update."""
    # Mapping-like inputs become their items() view; everything else is
    # passed through untouched.
    return d.items() if hasattr(d, "items") else d
jpayne@68 133
jpayne@68 134
def super_len(o):
    """Best-effort count of the bytes *remaining* to be read from ``o``.

    Accepts strings (measured as UTF-8 bytes), anything with ``__len__`` or a
    ``len`` attribute, and file-like objects (via ``fileno``/``tell``/``seek``).
    Returns ``max(0, total_length - current_position)``, falling back to 0
    when no length can be determined.
    """
    total_length = None
    current_position = 0

    if isinstance(o, str):
        # Content length must be measured in encoded bytes, not characters.
        o = o.encode("utf-8")

    if hasattr(o, "__len__"):
        total_length = len(o)

    elif hasattr(o, "len"):
        # Some file-like objects expose a plain `len` attribute instead of
        # implementing __len__.
        total_length = o.len

    elif hasattr(o, "fileno"):
        try:
            fileno = o.fileno()
        except (io.UnsupportedOperation, AttributeError):
            # AttributeError is a surprising exception, seeing as how we've just checked
            # that `hasattr(o, 'fileno')`. It happens for objects obtained via
            # `Tarfile.extractfile()`, per issue 5229.
            pass
        else:
            total_length = os.fstat(fileno).st_size

            # Having used fstat to determine the file length, we need to
            # confirm that this file was opened up in binary mode.
            if "b" not in o.mode:
                warnings.warn(
                    (
                        "Requests has determined the content-length for this "
                        "request using the binary size of the file: however, the "
                        "file has been opened in text mode (i.e. without the 'b' "
                        "flag in the mode). This may lead to an incorrect "
                        "content-length. In Requests 3.0, support will be removed "
                        "for files in text mode."
                    ),
                    FileModeWarning,
                )

    if hasattr(o, "tell"):
        try:
            current_position = o.tell()
        except OSError:
            # This can happen in some weird situations, such as when the file
            # is actually a special file descriptor like stdin. In this
            # instance, we don't know what the length is, so set it to zero and
            # let requests chunk it instead.
            if total_length is not None:
                current_position = total_length
        else:
            if hasattr(o, "seek") and total_length is None:
                # StringIO and BytesIO have seek but no usable fileno
                try:
                    # seek to end of file
                    o.seek(0, 2)
                    total_length = o.tell()

                    # seek back to current position to support
                    # partially read file-like objects
                    o.seek(current_position or 0)
                except OSError:
                    total_length = 0

    if total_length is None:
        total_length = 0

    return max(0, total_length - current_position)
jpayne@68 202
jpayne@68 203
def get_netrc_auth(url, raise_errors=False):
    """Returns the Requests tuple auth for a given url from netrc.

    :param url: URL whose hostname is looked up in the netrc file.
    :param raise_errors: when True, re-raise netrc parse/read errors instead
        of silently returning None.
    :rtype: tuple of (login, password), or None when no entry is found
    """
    # The NETRC environment variable overrides the default search locations.
    netrc_file = os.environ.get("NETRC")
    if netrc_file is not None:
        netrc_locations = (netrc_file,)
    else:
        netrc_locations = (f"~/{f}" for f in NETRC_FILES)

    try:
        from netrc import NetrcParseError, netrc

        netrc_path = None

        for f in netrc_locations:
            try:
                loc = os.path.expanduser(f)
            except KeyError:
                # os.path.expanduser can fail when $HOME is undefined and
                # getpwuid fails. See https://bugs.python.org/issue20164 &
                # https://github.com/psf/requests/issues/1846
                return

            if os.path.exists(loc):
                netrc_path = loc
                break

        # Abort early if there isn't one.
        if netrc_path is None:
            return

        ri = urlparse(url)

        # Strip port numbers from netloc. This weird `if...encode`` dance is
        # used for Python 3.2, which doesn't support unicode literals.
        splitstr = b":"
        if isinstance(url, str):
            splitstr = splitstr.decode("ascii")
        host = ri.netloc.split(splitstr)[0]

        try:
            _netrc = netrc(netrc_path).authenticators(host)
            if _netrc:
                # Return with login / password
                login_i = 0 if _netrc[0] else 1
                return (_netrc[login_i], _netrc[2])
        except (NetrcParseError, OSError):
            # If there was a parsing error or a permissions issue reading the file,
            # we'll just skip netrc auth unless explicitly asked to raise errors.
            if raise_errors:
                raise

    # App Engine hackiness.
    except (ImportError, AttributeError):
        pass
jpayne@68 259
jpayne@68 260
def guess_filename(obj):
    """Tries to guess the filename of the given object."""
    name = getattr(obj, "name", None)
    if not (name and isinstance(name, basestring)):
        return None
    # Ignore pseudo-names such as "<stdin>" used by special file objects.
    if name[0] == "<" or name[-1] == ">":
        return None
    return os.path.basename(name)
jpayne@68 266
jpayne@68 267
def extract_zipped_paths(path):
    """Replace nonexistent paths that look like they refer to a member of a zip
    archive with the location of an extracted copy of the target, or else
    just return the provided path unchanged.

    :param path: filesystem path, possibly pointing inside a zip archive.
    :rtype: str
    """
    if os.path.exists(path):
        # this is already a valid path, no need to do anything further
        return path

    # find the first valid part of the provided path and treat that as a zip archive
    # assume the rest of the path is the name of a member in the archive
    archive, member = os.path.split(path)
    while archive and not os.path.exists(archive):
        archive, prefix = os.path.split(archive)
        if not prefix:
            # If we don't check for an empty prefix after the split (in other
            # words, archive remains unchanged after the split), we _can_ end
            # up in an infinite loop on a rare corner case affecting a small
            # number of users
            break
        member = "/".join([prefix, member])

    if not zipfile.is_zipfile(archive):
        return path

    # Fix: close the archive deterministically instead of leaking the open
    # file handle until garbage collection.
    with zipfile.ZipFile(archive) as zip_file:
        if member not in zip_file.namelist():
            return path

        # we have a valid zip archive and a valid member of that archive
        tmp = tempfile.gettempdir()
        extracted_path = os.path.join(tmp, member.split("/")[-1])
        if not os.path.exists(extracted_path):
            # use read + write to avoid creating nested folders; we only want
            # the file, and this avoids a mkdir race condition
            with atomic_open(extracted_path) as file_handler:
                file_handler.write(zip_file.read(member))
    return extracted_path
jpayne@68 303
jpayne@68 304
@contextlib.contextmanager
def atomic_open(filename):
    """Write a file to the disk in an atomic fashion"""
    # Stage the bytes in a sibling temp file so the final rename stays on
    # the same filesystem.
    descriptor, staging_path = tempfile.mkstemp(dir=os.path.dirname(filename))
    try:
        with os.fdopen(descriptor, "wb") as staging_file:
            yield staging_file
        os.replace(staging_path, filename)
    except BaseException:
        # Clean up the partial temp file before propagating the failure.
        os.remove(staging_path)
        raise
jpayne@68 316
jpayne@68 317
def from_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. Unless it can not be represented as such, return an
    OrderedDict, e.g.,

    ::

        >>> from_key_val_list([('key', 'val')])
        OrderedDict([('key', 'val')])
        >>> from_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples
        >>> from_key_val_list({'key': 'val'})
        OrderedDict([('key', 'val')])

    :rtype: OrderedDict
    """
    if value is None:
        return None

    # Scalars cannot be interpreted as a sequence of key/value pairs.
    if not isinstance(value, (str, bytes, bool, int)):
        return OrderedDict(value)

    raise ValueError("cannot encode objects that are not 2-tuples")
jpayne@68 343
jpayne@68 344
def to_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. If it can be, return a list of tuples, e.g.,

    ::

        >>> to_key_val_list([('key', 'val')])
        [('key', 'val')]
        >>> to_key_val_list({'key': 'val'})
        [('key', 'val')]
        >>> to_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples

    :rtype: list
    """
    if value is None:
        return None

    # Scalars cannot be interpreted as a sequence of key/value pairs.
    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")

    pairs = value.items() if isinstance(value, Mapping) else value
    return list(pairs)
jpayne@68 372
jpayne@68 373
jpayne@68 374 # From mitsuhiko/werkzeug (used with permission).
# From mitsuhiko/werkzeug (used with permission).
def parse_list_header(value):
    """Parse a comma-separated list header as described by RFC 2068 Section 2.

    Elements may be quoted-strings (which may themselves contain commas);
    surrounding quotes are stripped and their contents unquoted. Items can
    appear multiple times and case is preserved.

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    To create a header from the :class:`list` again, use the
    :func:`dump_header` function.

    :param value: a string with a list header.
    :return: :class:`list`
    :rtype: list
    """
    return [
        unquote_header_value(item[1:-1]) if item[:1] == item[-1:] == '"' else item
        for item in _parse_list_header(value)
    ]
jpayne@68 404
jpayne@68 405
jpayne@68 406 # From mitsuhiko/werkzeug (used with permission).
# From mitsuhiko/werkzeug (used with permission).
def parse_dict_header(value):
    """Parse a ``key=value`` list header (RFC 2068 Section 2) into a dict.

    Keys without a value map to ``None``; quoted values are unquoted.

    >>> d = parse_dict_header('foo="is a fish", bar="as well"')
    >>> sorted(d.items())
    [('bar', 'as well'), ('foo', 'is a fish')]
    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    To create a header from the :class:`dict` again, use the
    :func:`dump_header` function.

    :param value: a string with a dict header.
    :return: :class:`dict`
    :rtype: dict
    """
    parsed = {}
    for item in _parse_list_header(value):
        name, sep, raw = item.partition("=")
        if not sep:
            # Flag-style entry with no '=': record the bare key with no value.
            parsed[item] = None
            continue
        if raw[:1] == raw[-1:] == '"':
            raw = unquote_header_value(raw[1:-1])
        parsed[name] = raw
    return parsed
jpayne@68 439
jpayne@68 440
jpayne@68 441 # From mitsuhiko/werkzeug (used with permission).
# From mitsuhiko/werkzeug (used with permission).
def unquote_header_value(value, is_filename=False):
    r"""Unquotes a header value. (Reversal of :func:`quote_header_value`).
    This does not use the real unquoting but what browsers are actually
    using for quoting.

    :param value: the header value to unquote.
    :rtype: str
    """
    if not (value and value[0] == value[-1] == '"'):
        # Not a quoted-string: hand it back untouched.
        return value

    # this is not the real unquoting, but fixing this so that the
    # RFC is met will result in bugs with internet explorer and
    # probably some other browsers as well. IE for example is
    # uploading files with "C:\foo\bar.txt" as filename
    inner = value[1:-1]

    # if this is a filename and the starting characters look like
    # a UNC path, then just return the value without quotes. Using the
    # replace sequence below on a UNC path has the effect of turning
    # the leading double slash into a single slash and then
    # _fix_ie_filename() doesn't work correctly. See #458.
    if is_filename and inner[:2] == "\\\\":
        return inner
    return inner.replace("\\\\", "\\").replace('\\"', '"')
jpayne@68 465
jpayne@68 466
def dict_from_cookiejar(cj):
    """Returns a key/value dictionary from a CookieJar.

    :param cj: CookieJar object to extract cookies from.
    :rtype: dict
    """
    # Later cookies with a duplicate name overwrite earlier ones.
    return {cookie.name: cookie.value for cookie in cj}
jpayne@68 476
jpayne@68 477
def add_dict_to_cookiejar(cj, cookie_dict):
    """Returns a CookieJar from a key/value dictionary.

    :param cj: CookieJar to insert cookies into.
    :param cookie_dict: Dict of key/values to insert into CookieJar.
    :rtype: CookieJar
    """
    # Thin delegate: cookiejar_from_dict (from .cookies) does the insertion,
    # passing the target jar straight through.
    return cookiejar_from_dict(cookie_dict, cj)
jpayne@68 487
jpayne@68 488
def get_encodings_from_content(content):
    """Returns encodings from given content string.

    Deprecated; scheduled for removal in requests 3.0.

    :param content: bytestring to extract encodings from.
    """
    warnings.warn(
        (
            "In requests 3.0, get_encodings_from_content will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    # Probe, in order, the three common places a document declares its
    # encoding: <meta charset=...>, the http-equiv pragma, and the XML prolog.
    patterns = (
        re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I),
        re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I),
        re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]'),
    )
    found = []
    for pattern in patterns:
        found.extend(pattern.findall(content))
    return found
jpayne@68 512
jpayne@68 513
jpayne@68 514 def _parse_content_type_header(header):
jpayne@68 515 """Returns content type and parameters from given header
jpayne@68 516
jpayne@68 517 :param header: string
jpayne@68 518 :return: tuple containing content type and dictionary of
jpayne@68 519 parameters
jpayne@68 520 """
jpayne@68 521
jpayne@68 522 tokens = header.split(";")
jpayne@68 523 content_type, params = tokens[0].strip(), tokens[1:]
jpayne@68 524 params_dict = {}
jpayne@68 525 items_to_strip = "\"' "
jpayne@68 526
jpayne@68 527 for param in params:
jpayne@68 528 param = param.strip()
jpayne@68 529 if param:
jpayne@68 530 key, value = param, True
jpayne@68 531 index_of_equals = param.find("=")
jpayne@68 532 if index_of_equals != -1:
jpayne@68 533 key = param[:index_of_equals].strip(items_to_strip)
jpayne@68 534 value = param[index_of_equals + 1 :].strip(items_to_strip)
jpayne@68 535 params_dict[key.lower()] = value
jpayne@68 536 return content_type, params_dict
jpayne@68 537
jpayne@68 538
def get_encoding_from_headers(headers):
    """Returns encodings from given HTTP Header Dict.

    :param headers: dictionary to extract encoding from.
    :rtype: str
    """
    content_type = headers.get("content-type")
    if not content_type:
        return None

    content_type, params = _parse_content_type_header(content_type)

    # An explicit charset parameter always wins.
    if "charset" in params:
        return params["charset"].strip("'\"")

    # HTTP's historical default for text/* content.
    if "text" in content_type:
        return "ISO-8859-1"

    if "application/json" in content_type:
        # Assume UTF-8 based on RFC 4627: https://www.ietf.org/rfc/rfc4627.txt since the charset was unset
        return "utf-8"

    return None
jpayne@68 562
jpayne@68 563
def stream_decode_response_unicode(iterator, r):
    """Stream decodes an iterator."""
    encoding = r.encoding
    if encoding is None:
        # No declared encoding: pass the raw chunks straight through.
        yield from iterator
        return

    decoder = codecs.getincrementaldecoder(encoding)(errors="replace")
    for chunk in iterator:
        decoded = decoder.decode(chunk)
        if decoded:
            yield decoded
    # Flush whatever partial sequence the decoder is still buffering.
    tail = decoder.decode(b"", final=True)
    if tail:
        yield tail
jpayne@68 579
jpayne@68 580
def iter_slices(string, slice_length):
    """Iterate over slices of a string."""
    total = len(string)
    if slice_length is None or slice_length <= 0:
        # Missing or non-positive length means "one slice with everything".
        slice_length = total
    if total:
        for start in range(0, total, slice_length):
            yield string[start : start + slice_length]
jpayne@68 589
jpayne@68 590
def get_unicode_from_response(r):
    """Returns the requested content back in unicode.

    Deprecated; scheduled for removal in requests 3.0.

    :param r: Response object to get unicode content from.

    Tried:

    1. charset from content-type
    2. fall back and replace all unicode characters

    :rtype: str
    """
    warnings.warn(
        (
            "In requests 3.0, get_unicode_from_response will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    # First attempt: honor the charset advertised in the response headers.
    encoding = get_encoding_from_headers(r.headers)
    if encoding:
        try:
            return str(r.content, encoding)
        except UnicodeError:
            pass

    # Fall back: decode with replacement characters; if even that is not
    # possible (e.g. encoding is None), hand back the raw content.
    try:
        return str(r.content, encoding, errors="replace")
    except TypeError:
        return r.content
jpayne@68 628
jpayne@68 629
# The unreserved URI characters (RFC 3986): characters that never need
# percent-encoding and may safely be left (or made) unescaped in a URI.
UNRESERVED_SET = frozenset(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + "0123456789-._~"
)
jpayne@68 634
jpayne@68 635
def unquote_unreserved(uri):
    """Un-escape any percent-escape sequences in a URI that are unreserved
    characters. This leaves all reserved, illegal and non-ASCII bytes encoded.

    :rtype: str
    """
    segments = uri.split("%")
    # Every segment after the first originally followed a '%' character.
    for index, segment in enumerate(segments[1:], start=1):
        hex_pair = segment[0:2]
        if len(hex_pair) != 2 or not hex_pair.isalnum():
            # Not a candidate escape sequence: restore the '%' verbatim.
            segments[index] = f"%{segment}"
            continue
        try:
            decoded = chr(int(hex_pair, 16))
        except ValueError:
            raise InvalidURL(f"Invalid percent-escape sequence: '{hex_pair}'")

        if decoded in UNRESERVED_SET:
            segments[index] = decoded + segment[2:]
        else:
            segments[index] = f"%{segment}"
    return "".join(segments)
jpayne@68 658
jpayne@68 659
def requote_uri(uri):
    """Re-quote the given URI.

    This function passes the given URI through an unquote/quote cycle to
    ensure that it is fully and consistently quoted.

    :rtype: str
    """
    try:
        # Unquote only the unreserved characters
        # Then quote only illegal characters (do not quote reserved,
        # unreserved, or '%')
        return quote(unquote_unreserved(uri), safe="!#$%&'()*+,/:;=?@[]~")
    except InvalidURL:
        # We couldn't unquote the given URI, so let's try quoting it, but
        # there may be unquoted '%'s in the URI. We need to make sure they're
        # properly quoted so they do not cause issues elsewhere. Note that
        # '%' is deliberately absent from the safe set here.
        return quote(uri, safe="!#$&'()*+,/:;=?@[]~")
jpayne@68 680
jpayne@68 681
def address_in_network(ip, net):
    """This function allows you to check if an IP belongs to a network subnet

    Example: returns True if ip = 192.168.1.1 and net = 192.168.1.0/24
             returns False if ip = 192.168.1.1 and net = 192.168.100.0/24

    :rtype: bool
    """
    host_addr = struct.unpack("=L", socket.inet_aton(ip))[0]
    net_str, bits = net.split("/")
    mask = struct.unpack("=L", socket.inet_aton(dotted_netmask(int(bits))))[0]
    net_addr = struct.unpack("=L", socket.inet_aton(net_str))[0]
    # Both addresses, masked to the network prefix, must agree.
    return (host_addr & mask) == (net_addr & mask)
jpayne@68 695
jpayne@68 696
def dotted_netmask(mask):
    """Converts mask from /xx format to xxx.xxx.xxx.xxx

    Example: if mask is 24 function returns 255.255.255.0

    :rtype: str
    """
    # Set the top `mask` bits of a 32-bit word, then render as a dotted quad.
    host_bits = 32 - mask
    packed = struct.pack(">I", 0xFFFFFFFF ^ ((1 << host_bits) - 1))
    return socket.inet_ntoa(packed)
jpayne@68 706
jpayne@68 707
def is_ipv4_address(string_ip):
    """
    :rtype: bool
    """
    # inet_aton raises OSError for anything that is not a dotted-quad IPv4.
    try:
        socket.inet_aton(string_ip)
        return True
    except OSError:
        return False
jpayne@68 717
jpayne@68 718
def is_valid_cidr(string_network):
    """
    Very simple check of the cidr format in no_proxy variable.

    :rtype: bool
    """
    # Exactly one '/' separates the address from the prefix length.
    if string_network.count("/") != 1:
        return False

    address, _, mask_str = string_network.partition("/")
    try:
        mask = int(mask_str)
    except ValueError:
        return False
    # Prefix lengths outside 1..32 are rejected (note: /0 is not accepted).
    if not 1 <= mask <= 32:
        return False

    try:
        socket.inet_aton(address)
    except OSError:
        return False
    return True
jpayne@68 741
jpayne@68 742
@contextlib.contextmanager
def set_environ(env_name, value):
    """Set the environment variable 'env_name' to 'value'

    Save previous value, yield, and then restore the previous value stored in
    the environment variable 'env_name'.

    If 'value' is None, do nothing"""
    if value is None:
        # No change requested: just run the body.
        yield
        return

    previous = os.environ.get(env_name)
    os.environ[env_name] = value
    try:
        yield
    finally:
        # Restore (or remove) the variable even if the body raised.
        if previous is None:
            del os.environ[env_name]
        else:
            os.environ[env_name] = previous
jpayne@68 763
jpayne@68 764
def should_bypass_proxies(url, no_proxy):
    """
    Returns whether we should bypass proxies or not.

    :param url: the URL being requested.
    :param no_proxy: comma-separated host list that overrides the no_proxy
        environment variable, or None to read the environment instead.
    :rtype: bool
    """

    # Prioritize lowercase environment variables over uppercase
    # to keep a consistent behaviour with other http projects (curl, wget).
    def get_proxy(key):
        return os.environ.get(key) or os.environ.get(key.upper())

    # First check whether no_proxy is defined. If it is, check that the URL
    # we're getting isn't in the no_proxy list.
    no_proxy_arg = no_proxy
    if no_proxy is None:
        no_proxy = get_proxy("no_proxy")
    parsed = urlparse(url)

    if parsed.hostname is None:
        # URLs don't always have hostnames, e.g. file:/// urls.
        return True

    if no_proxy:
        # We need to check whether we match here. We need to see if we match
        # the end of the hostname, both with and without the port.
        no_proxy = (host for host in no_proxy.replace(" ", "").split(",") if host)

        if is_ipv4_address(parsed.hostname):
            # IP hostnames are matched against CIDR ranges or exact IPs.
            for proxy_ip in no_proxy:
                if is_valid_cidr(proxy_ip):
                    if address_in_network(parsed.hostname, proxy_ip):
                        return True
                elif parsed.hostname == proxy_ip:
                    # If no_proxy ip was defined in plain IP notation instead of cidr notation &
                    # matches the IP of the index
                    return True
        else:
            # Name hostnames are matched as suffixes, with and without port.
            host_with_port = parsed.hostname
            if parsed.port:
                host_with_port += f":{parsed.port}"

            for host in no_proxy:
                if parsed.hostname.endswith(host) or host_with_port.endswith(host):
                    # The URL does match something in no_proxy, so we don't want
                    # to apply the proxies on this URL.
                    return True

    # Let the platform's proxy_bypass see the caller-supplied no_proxy value.
    with set_environ("no_proxy", no_proxy_arg):
        # parsed.hostname can be `None` in cases such as a file URI.
        try:
            bypass = proxy_bypass(parsed.hostname)
        except (TypeError, socket.gaierror):
            bypass = False

    if bypass:
        return True

    return False
jpayne@68 824
jpayne@68 825
jpayne@68 826 def get_environ_proxies(url, no_proxy=None):
jpayne@68 827 """
jpayne@68 828 Return a dict of environment proxies.
jpayne@68 829
jpayne@68 830 :rtype: dict
jpayne@68 831 """
jpayne@68 832 if should_bypass_proxies(url, no_proxy=no_proxy):
jpayne@68 833 return {}
jpayne@68 834 else:
jpayne@68 835 return getproxies()
jpayne@68 836
jpayne@68 837
jpayne@68 838 def select_proxy(url, proxies):
jpayne@68 839 """Select a proxy for the url, if applicable.
jpayne@68 840
jpayne@68 841 :param url: The url being for the request
jpayne@68 842 :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
jpayne@68 843 """
jpayne@68 844 proxies = proxies or {}
jpayne@68 845 urlparts = urlparse(url)
jpayne@68 846 if urlparts.hostname is None:
jpayne@68 847 return proxies.get(urlparts.scheme, proxies.get("all"))
jpayne@68 848
jpayne@68 849 proxy_keys = [
jpayne@68 850 urlparts.scheme + "://" + urlparts.hostname,
jpayne@68 851 urlparts.scheme,
jpayne@68 852 "all://" + urlparts.hostname,
jpayne@68 853 "all",
jpayne@68 854 ]
jpayne@68 855 proxy = None
jpayne@68 856 for proxy_key in proxy_keys:
jpayne@68 857 if proxy_key in proxies:
jpayne@68 858 proxy = proxies[proxy_key]
jpayne@68 859 break
jpayne@68 860
jpayne@68 861 return proxy
jpayne@68 862
jpayne@68 863
jpayne@68 864 def resolve_proxies(request, proxies, trust_env=True):
jpayne@68 865 """This method takes proxy information from a request and configuration
jpayne@68 866 input to resolve a mapping of target proxies. This will consider settings
jpayne@68 867 such as NO_PROXY to strip proxy configurations.
jpayne@68 868
jpayne@68 869 :param request: Request or PreparedRequest
jpayne@68 870 :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
jpayne@68 871 :param trust_env: Boolean declaring whether to trust environment configs
jpayne@68 872
jpayne@68 873 :rtype: dict
jpayne@68 874 """
jpayne@68 875 proxies = proxies if proxies is not None else {}
jpayne@68 876 url = request.url
jpayne@68 877 scheme = urlparse(url).scheme
jpayne@68 878 no_proxy = proxies.get("no_proxy")
jpayne@68 879 new_proxies = proxies.copy()
jpayne@68 880
jpayne@68 881 if trust_env and not should_bypass_proxies(url, no_proxy=no_proxy):
jpayne@68 882 environ_proxies = get_environ_proxies(url, no_proxy=no_proxy)
jpayne@68 883
jpayne@68 884 proxy = environ_proxies.get(scheme, environ_proxies.get("all"))
jpayne@68 885
jpayne@68 886 if proxy:
jpayne@68 887 new_proxies.setdefault(scheme, proxy)
jpayne@68 888 return new_proxies
jpayne@68 889
jpayne@68 890
jpayne@68 891 def default_user_agent(name="python-requests"):
jpayne@68 892 """
jpayne@68 893 Return a string representing the default user agent.
jpayne@68 894
jpayne@68 895 :rtype: str
jpayne@68 896 """
jpayne@68 897 return f"{name}/{__version__}"
jpayne@68 898
jpayne@68 899
jpayne@68 900 def default_headers():
jpayne@68 901 """
jpayne@68 902 :rtype: requests.structures.CaseInsensitiveDict
jpayne@68 903 """
jpayne@68 904 return CaseInsensitiveDict(
jpayne@68 905 {
jpayne@68 906 "User-Agent": default_user_agent(),
jpayne@68 907 "Accept-Encoding": DEFAULT_ACCEPT_ENCODING,
jpayne@68 908 "Accept": "*/*",
jpayne@68 909 "Connection": "keep-alive",
jpayne@68 910 }
jpayne@68 911 )
jpayne@68 912
jpayne@68 913
jpayne@68 914 def parse_header_links(value):
jpayne@68 915 """Return a list of parsed link headers proxies.
jpayne@68 916
jpayne@68 917 i.e. Link: <http:/.../front.jpeg>; rel=front; type="image/jpeg",<http://.../back.jpeg>; rel=back;type="image/jpeg"
jpayne@68 918
jpayne@68 919 :rtype: list
jpayne@68 920 """
jpayne@68 921
jpayne@68 922 links = []
jpayne@68 923
jpayne@68 924 replace_chars = " '\""
jpayne@68 925
jpayne@68 926 value = value.strip(replace_chars)
jpayne@68 927 if not value:
jpayne@68 928 return links
jpayne@68 929
jpayne@68 930 for val in re.split(", *<", value):
jpayne@68 931 try:
jpayne@68 932 url, params = val.split(";", 1)
jpayne@68 933 except ValueError:
jpayne@68 934 url, params = val, ""
jpayne@68 935
jpayne@68 936 link = {"url": url.strip("<> '\"")}
jpayne@68 937
jpayne@68 938 for param in params.split(";"):
jpayne@68 939 try:
jpayne@68 940 key, value = param.split("=")
jpayne@68 941 except ValueError:
jpayne@68 942 break
jpayne@68 943
jpayne@68 944 link[key.strip(replace_chars)] = value.strip(replace_chars)
jpayne@68 945
jpayne@68 946 links.append(link)
jpayne@68 947
jpayne@68 948 return links
jpayne@68 949
jpayne@68 950
jpayne@68 951 # Null bytes; no need to recreate these on each call to guess_json_utf
jpayne@68 952 _null = "\x00".encode("ascii") # encoding to ASCII for Python 3
jpayne@68 953 _null2 = _null * 2
jpayne@68 954 _null3 = _null * 3
jpayne@68 955
jpayne@68 956
jpayne@68 957 def guess_json_utf(data):
jpayne@68 958 """
jpayne@68 959 :rtype: str
jpayne@68 960 """
jpayne@68 961 # JSON always starts with two ASCII characters, so detection is as
jpayne@68 962 # easy as counting the nulls and from their location and count
jpayne@68 963 # determine the encoding. Also detect a BOM, if present.
jpayne@68 964 sample = data[:4]
jpayne@68 965 if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
jpayne@68 966 return "utf-32" # BOM included
jpayne@68 967 if sample[:3] == codecs.BOM_UTF8:
jpayne@68 968 return "utf-8-sig" # BOM included, MS style (discouraged)
jpayne@68 969 if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
jpayne@68 970 return "utf-16" # BOM included
jpayne@68 971 nullcount = sample.count(_null)
jpayne@68 972 if nullcount == 0:
jpayne@68 973 return "utf-8"
jpayne@68 974 if nullcount == 2:
jpayne@68 975 if sample[::2] == _null2: # 1st and 3rd are null
jpayne@68 976 return "utf-16-be"
jpayne@68 977 if sample[1::2] == _null2: # 2nd and 4th are null
jpayne@68 978 return "utf-16-le"
jpayne@68 979 # Did not detect 2 valid UTF-16 ascii-range characters
jpayne@68 980 if nullcount == 3:
jpayne@68 981 if sample[:3] == _null3:
jpayne@68 982 return "utf-32-be"
jpayne@68 983 if sample[1:] == _null3:
jpayne@68 984 return "utf-32-le"
jpayne@68 985 # Did not detect a valid UTF-32 ascii-range character
jpayne@68 986 return None
jpayne@68 987
jpayne@68 988
jpayne@68 989 def prepend_scheme_if_needed(url, new_scheme):
jpayne@68 990 """Given a URL that may or may not have a scheme, prepend the given scheme.
jpayne@68 991 Does not replace a present scheme with the one provided as an argument.
jpayne@68 992
jpayne@68 993 :rtype: str
jpayne@68 994 """
jpayne@68 995 parsed = parse_url(url)
jpayne@68 996 scheme, auth, host, port, path, query, fragment = parsed
jpayne@68 997
jpayne@68 998 # A defect in urlparse determines that there isn't a netloc present in some
jpayne@68 999 # urls. We previously assumed parsing was overly cautious, and swapped the
jpayne@68 1000 # netloc and path. Due to a lack of tests on the original defect, this is
jpayne@68 1001 # maintained with parse_url for backwards compatibility.
jpayne@68 1002 netloc = parsed.netloc
jpayne@68 1003 if not netloc:
jpayne@68 1004 netloc, path = path, netloc
jpayne@68 1005
jpayne@68 1006 if auth:
jpayne@68 1007 # parse_url doesn't provide the netloc with auth
jpayne@68 1008 # so we'll add it ourselves.
jpayne@68 1009 netloc = "@".join([auth, netloc])
jpayne@68 1010 if scheme is None:
jpayne@68 1011 scheme = new_scheme
jpayne@68 1012 if path is None:
jpayne@68 1013 path = ""
jpayne@68 1014
jpayne@68 1015 return urlunparse((scheme, netloc, path, "", query, fragment))
jpayne@68 1016
jpayne@68 1017
jpayne@68 1018 def get_auth_from_url(url):
jpayne@68 1019 """Given a url with authentication components, extract them into a tuple of
jpayne@68 1020 username,password.
jpayne@68 1021
jpayne@68 1022 :rtype: (str,str)
jpayne@68 1023 """
jpayne@68 1024 parsed = urlparse(url)
jpayne@68 1025
jpayne@68 1026 try:
jpayne@68 1027 auth = (unquote(parsed.username), unquote(parsed.password))
jpayne@68 1028 except (AttributeError, TypeError):
jpayne@68 1029 auth = ("", "")
jpayne@68 1030
jpayne@68 1031 return auth
jpayne@68 1032
jpayne@68 1033
jpayne@68 1034 def check_header_validity(header):
jpayne@68 1035 """Verifies that header parts don't contain leading whitespace
jpayne@68 1036 reserved characters, or return characters.
jpayne@68 1037
jpayne@68 1038 :param header: tuple, in the format (name, value).
jpayne@68 1039 """
jpayne@68 1040 name, value = header
jpayne@68 1041 _validate_header_part(header, name, 0)
jpayne@68 1042 _validate_header_part(header, value, 1)
jpayne@68 1043
jpayne@68 1044
jpayne@68 1045 def _validate_header_part(header, header_part, header_validator_index):
jpayne@68 1046 if isinstance(header_part, str):
jpayne@68 1047 validator = _HEADER_VALIDATORS_STR[header_validator_index]
jpayne@68 1048 elif isinstance(header_part, bytes):
jpayne@68 1049 validator = _HEADER_VALIDATORS_BYTE[header_validator_index]
jpayne@68 1050 else:
jpayne@68 1051 raise InvalidHeader(
jpayne@68 1052 f"Header part ({header_part!r}) from {header} "
jpayne@68 1053 f"must be of type str or bytes, not {type(header_part)}"
jpayne@68 1054 )
jpayne@68 1055
jpayne@68 1056 if not validator.match(header_part):
jpayne@68 1057 header_kind = "name" if header_validator_index == 0 else "value"
jpayne@68 1058 raise InvalidHeader(
jpayne@68 1059 f"Invalid leading whitespace, reserved character(s), or return "
jpayne@68 1060 f"character(s) in header {header_kind}: {header_part!r}"
jpayne@68 1061 )
jpayne@68 1062
jpayne@68 1063
jpayne@68 1064 def urldefragauth(url):
jpayne@68 1065 """
jpayne@68 1066 Given a url remove the fragment and the authentication part.
jpayne@68 1067
jpayne@68 1068 :rtype: str
jpayne@68 1069 """
jpayne@68 1070 scheme, netloc, path, params, query, fragment = urlparse(url)
jpayne@68 1071
jpayne@68 1072 # see func:`prepend_scheme_if_needed`
jpayne@68 1073 if not netloc:
jpayne@68 1074 netloc, path = path, netloc
jpayne@68 1075
jpayne@68 1076 netloc = netloc.rsplit("@", 1)[-1]
jpayne@68 1077
jpayne@68 1078 return urlunparse((scheme, netloc, path, params, query, ""))
jpayne@68 1079
jpayne@68 1080
jpayne@68 1081 def rewind_body(prepared_request):
jpayne@68 1082 """Move file pointer back to its recorded starting position
jpayne@68 1083 so it can be read again on redirect.
jpayne@68 1084 """
jpayne@68 1085 body_seek = getattr(prepared_request.body, "seek", None)
jpayne@68 1086 if body_seek is not None and isinstance(
jpayne@68 1087 prepared_request._body_position, integer_types
jpayne@68 1088 ):
jpayne@68 1089 try:
jpayne@68 1090 body_seek(prepared_request._body_position)
jpayne@68 1091 except OSError:
jpayne@68 1092 raise UnrewindableBodyError(
jpayne@68 1093 "An error occurred when rewinding request body for redirect."
jpayne@68 1094 )
jpayne@68 1095 else:
jpayne@68 1096 raise UnrewindableBodyError("Unable to rewind request body for redirect.")