annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/requests/utils.py @ 69:33d812a61356

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 17:55:14 -0400
parents
children
rev   line source
jpayne@69 1 """
jpayne@69 2 requests.utils
jpayne@69 3 ~~~~~~~~~~~~~~
jpayne@69 4
jpayne@69 5 This module provides utility functions that are used within Requests
jpayne@69 6 that are also useful for external consumption.
jpayne@69 7 """
jpayne@69 8
jpayne@69 9 import codecs
jpayne@69 10 import contextlib
jpayne@69 11 import io
jpayne@69 12 import os
jpayne@69 13 import re
jpayne@69 14 import socket
jpayne@69 15 import struct
jpayne@69 16 import sys
jpayne@69 17 import tempfile
jpayne@69 18 import warnings
jpayne@69 19 import zipfile
jpayne@69 20 from collections import OrderedDict
jpayne@69 21
jpayne@69 22 from urllib3.util import make_headers, parse_url
jpayne@69 23
jpayne@69 24 from . import certs
jpayne@69 25 from .__version__ import __version__
jpayne@69 26
jpayne@69 27 # to_native_string is unused here, but imported here for backwards compatibility
jpayne@69 28 from ._internal_utils import ( # noqa: F401
jpayne@69 29 _HEADER_VALIDATORS_BYTE,
jpayne@69 30 _HEADER_VALIDATORS_STR,
jpayne@69 31 HEADER_VALIDATORS,
jpayne@69 32 to_native_string,
jpayne@69 33 )
jpayne@69 34 from .compat import (
jpayne@69 35 Mapping,
jpayne@69 36 basestring,
jpayne@69 37 bytes,
jpayne@69 38 getproxies,
jpayne@69 39 getproxies_environment,
jpayne@69 40 integer_types,
jpayne@69 41 )
jpayne@69 42 from .compat import parse_http_list as _parse_list_header
jpayne@69 43 from .compat import (
jpayne@69 44 proxy_bypass,
jpayne@69 45 proxy_bypass_environment,
jpayne@69 46 quote,
jpayne@69 47 str,
jpayne@69 48 unquote,
jpayne@69 49 urlparse,
jpayne@69 50 urlunparse,
jpayne@69 51 )
jpayne@69 52 from .cookies import cookiejar_from_dict
jpayne@69 53 from .exceptions import (
jpayne@69 54 FileModeWarning,
jpayne@69 55 InvalidHeader,
jpayne@69 56 InvalidURL,
jpayne@69 57 UnrewindableBodyError,
jpayne@69 58 )
jpayne@69 59 from .structures import CaseInsensitiveDict
jpayne@69 60
# Candidate netrc filenames (POSIX ".netrc" and Windows-style "_netrc"),
# searched in the user's home directory by get_netrc_auth().
NETRC_FILES = (".netrc", "_netrc")

# Path to the CA bundle shipped with the `certs` module.
DEFAULT_CA_BUNDLE_PATH = certs.where()

# Default port for each supported URL scheme.
DEFAULT_PORTS = {"http": 80, "https": 443}

# Ensure that ', ' is used to preserve previous delimiter behavior.
DEFAULT_ACCEPT_ENCODING = ", ".join(
    re.split(r",\s*", make_headers(accept_encoding=True)["accept-encoding"])
)
jpayne@69 71
jpayne@69 72
if sys.platform == "win32":
    # provide a proxy_bypass version on Windows without DNS lookups

    def proxy_bypass_registry(host):
        """Check the Windows registry ProxyOverride list for *host*.

        Returns True when Internet Settings enable a proxy and *host*
        matches one of the semicolon-separated override patterns (glob
        wildcards allowed); False otherwise, or when the registry cannot
        be read.
        """
        try:
            import winreg
        except ImportError:
            return False

        try:
            internetSettings = winreg.OpenKey(
                winreg.HKEY_CURRENT_USER,
                r"Software\Microsoft\Windows\CurrentVersion\Internet Settings",
            )
            # ProxyEnable could be REG_SZ or REG_DWORD, normalizing it
            proxyEnable = int(winreg.QueryValueEx(internetSettings, "ProxyEnable")[0])
            # ProxyOverride is almost always a string
            proxyOverride = winreg.QueryValueEx(internetSettings, "ProxyOverride")[0]
        except (OSError, ValueError):
            return False
        if not proxyEnable or not proxyOverride:
            return False

        # make a check value list from the registry entry: replace the
        # '<local>' string by the localhost entry and the corresponding
        # canonical entry.
        proxyOverride = proxyOverride.split(";")
        # filter out empty strings to avoid re.match return true in the following code.
        proxyOverride = filter(None, proxyOverride)
        # now check if we match one of the registry values.
        for test in proxyOverride:
            if test == "<local>":
                # '<local>' means: bypass for bare hostnames (no dots).
                if "." not in host:
                    return True
            # Translate the glob pattern into a regular expression.
            test = test.replace(".", r"\.")  # mask dots
            test = test.replace("*", r".*")  # change glob sequence
            test = test.replace("?", r".")  # change glob char
            if re.match(test, host, re.I):
                return True
        return False

    def proxy_bypass(host):  # noqa
        """Return True, if the host should be bypassed.

        Checks proxy settings gathered from the environment, if specified,
        or the registry.
        """
        if getproxies_environment():
            return proxy_bypass_environment(host)
        else:
            return proxy_bypass_registry(host)
jpayne@69 124
jpayne@69 125
def dict_to_sequence(d):
    """Returns an internal sequence dictionary update."""
    # Mapping-like objects are flattened to their (key, value) pairs;
    # anything else is passed through untouched.
    return d.items() if hasattr(d, "items") else d
jpayne@69 133
jpayne@69 134
def super_len(o):
    """Return the number of bytes remaining to be read from ``o``.

    Handles strings (measured after UTF-8 encoding), objects with
    ``__len__`` or a ``len`` attribute, real files (via ``os.fstat``), and
    seekable file-like objects.  Any current read position reported by
    ``tell()`` is subtracted, so partially consumed objects report only
    what is left.
    """
    total_length = None
    current_position = 0

    if isinstance(o, str):
        # Length must be measured in bytes, since that is what gets sent.
        o = o.encode("utf-8")

    if hasattr(o, "__len__"):
        total_length = len(o)

    elif hasattr(o, "len"):
        # Some file-like objects expose their size via a `len` attribute
        # instead of `__len__`.
        total_length = o.len

    elif hasattr(o, "fileno"):
        try:
            fileno = o.fileno()
        except (io.UnsupportedOperation, AttributeError):
            # AttributeError is a surprising exception, seeing as how we've just checked
            # that `hasattr(o, 'fileno')`. It happens for objects obtained via
            # `Tarfile.extractfile()`, per issue 5229.
            pass
        else:
            total_length = os.fstat(fileno).st_size

            # Having used fstat to determine the file length, we need to
            # confirm that this file was opened up in binary mode.
            if "b" not in o.mode:
                warnings.warn(
                    (
                        "Requests has determined the content-length for this "
                        "request using the binary size of the file: however, the "
                        "file has been opened in text mode (i.e. without the 'b' "
                        "flag in the mode). This may lead to an incorrect "
                        "content-length. In Requests 3.0, support will be removed "
                        "for files in text mode."
                    ),
                    FileModeWarning,
                )

    if hasattr(o, "tell"):
        try:
            current_position = o.tell()
        except OSError:
            # This can happen in some weird situations, such as when the file
            # is actually a special file descriptor like stdin. In this
            # instance, we don't know what the length is, so set it to zero and
            # let requests chunk it instead.
            if total_length is not None:
                current_position = total_length
        else:
            if hasattr(o, "seek") and total_length is None:
                # StringIO and BytesIO have seek but no usable fileno
                try:
                    # seek to end of file
                    o.seek(0, 2)
                    total_length = o.tell()

                    # seek back to current position to support
                    # partially read file-like objects
                    o.seek(current_position or 0)
                except OSError:
                    total_length = 0

    if total_length is None:
        total_length = 0

    # Never report a negative remaining length.
    return max(0, total_length - current_position)
jpayne@69 202
jpayne@69 203
def get_netrc_auth(url, raise_errors=False):
    """Returns the Requests tuple auth for a given url from netrc.

    :param url: URL whose host is looked up in the netrc file.
    :param raise_errors: when True, re-raise netrc parse/permission errors
        instead of silently skipping netrc authentication.
    :return: ``(login, password)`` tuple, or None when nothing matches.
    """

    # The NETRC environment variable overrides the default search locations.
    netrc_file = os.environ.get("NETRC")
    if netrc_file is not None:
        netrc_locations = (netrc_file,)
    else:
        netrc_locations = (f"~/{f}" for f in NETRC_FILES)

    try:
        from netrc import NetrcParseError, netrc

        netrc_path = None

        # Use the first candidate file that actually exists.
        for f in netrc_locations:
            try:
                loc = os.path.expanduser(f)
            except KeyError:
                # os.path.expanduser can fail when $HOME is undefined and
                # getpwuid fails. See https://bugs.python.org/issue20164 &
                # https://github.com/psf/requests/issues/1846
                return

            if os.path.exists(loc):
                netrc_path = loc
                break

        # Abort early if there isn't one.
        if netrc_path is None:
            return

        ri = urlparse(url)

        # Strip port numbers from netloc. This weird `if...encode`` dance is
        # used for Python 3.2, which doesn't support unicode literals.
        splitstr = b":"
        if isinstance(url, str):
            splitstr = splitstr.decode("ascii")
        host = ri.netloc.split(splitstr)[0]

        try:
            _netrc = netrc(netrc_path).authenticators(host)
            if _netrc:
                # Return with login / password
                login_i = 0 if _netrc[0] else 1
                return (_netrc[login_i], _netrc[2])
        except (NetrcParseError, OSError):
            # If there was a parsing error or a permissions issue reading the file,
            # we'll just skip netrc auth unless explicitly asked to raise errors.
            if raise_errors:
                raise

    # App Engine hackiness.
    except (ImportError, AttributeError):
        pass
jpayne@69 259
jpayne@69 260
def guess_filename(obj):
    """Tries to guess the filename of the given object."""
    candidate = getattr(obj, "name", None)
    # Pseudo-files such as "<stdin>" carry angle-bracketed names; only a
    # real-looking path yields a filename.
    if not candidate or not isinstance(candidate, basestring):
        return None
    if candidate.startswith("<") or candidate.endswith(">"):
        return None
    return os.path.basename(candidate)
jpayne@69 266
jpayne@69 267
def extract_zipped_paths(path):
    """Replace nonexistent paths that look like they refer to a member of a zip
    archive with the location of an extracted copy of the target, or else
    just return the provided path unchanged.
    """
    if os.path.exists(path):
        # Already a real filesystem path; nothing to do.
        return path

    # Walk upward until an existing prefix is found; treat that prefix as a
    # zip archive and the remainder as the name of a member inside it.
    archive, member = os.path.split(path)
    while archive and not os.path.exists(archive):
        archive, leading = os.path.split(archive)
        if not leading:
            # The split made no progress (archive unchanged); bail out here
            # rather than loop forever on this rare corner case.
            break
        member = "/".join([leading, member])

    if not zipfile.is_zipfile(archive):
        return path

    zip_file = zipfile.ZipFile(archive)
    if member not in zip_file.namelist():
        return path

    # Valid archive and member: extract just that file to the temp directory.
    extracted_path = os.path.join(tempfile.gettempdir(), member.split("/")[-1])
    if not os.path.exists(extracted_path):
        # read + write (rather than ZipFile.extract) avoids creating nested
        # folders and sidesteps a mkdir race condition.
        with atomic_open(extracted_path) as file_handler:
            file_handler.write(zip_file.read(member))
    return extracted_path
jpayne@69 303
jpayne@69 304
@contextlib.contextmanager
def atomic_open(filename):
    """Write a file to the disk in an atomic fashion"""
    # Stage the bytes in a sibling temp file, then rename into place so
    # readers never observe a partially written target.
    fd, staging_path = tempfile.mkstemp(dir=os.path.dirname(filename))
    try:
        with os.fdopen(fd, "wb") as staging_handle:
            yield staging_handle
        os.replace(staging_path, filename)
    except BaseException:
        # Any failure — including one raised by the caller's `with` body —
        # discards the staged file and propagates.
        os.remove(staging_path)
        raise
jpayne@69 316
jpayne@69 317
def from_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. Unless it can not be represented as such, return an
    OrderedDict, e.g.,

    ::

        >>> from_key_val_list([('key', 'val')])
        OrderedDict([('key', 'val')])
        >>> from_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples
        >>> from_key_val_list({'key': 'val'})
        OrderedDict([('key', 'val')])

    :rtype: OrderedDict
    """
    if value is None:
        return None
    # Scalars cannot be interpreted as (key, value) pairs.
    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")
    return OrderedDict(value)
jpayne@69 343
jpayne@69 344
def to_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. If it can be, return a list of tuples, e.g.,

    ::

        >>> to_key_val_list([('key', 'val')])
        [('key', 'val')]
        >>> to_key_val_list({'key': 'val'})
        [('key', 'val')]
        >>> to_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples

    :rtype: list
    """
    if value is None:
        return None
    # Scalars cannot be interpreted as (key, value) pairs.
    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")
    pairs = value.items() if isinstance(value, Mapping) else value
    return list(pairs)
jpayne@69 372
jpayne@69 373
jpayne@69 374 # From mitsuhiko/werkzeug (used with permission).
def parse_list_header(value):
    """Parse lists as described by RFC 2068 Section 2.

    In particular, parse comma-separated lists where the elements of
    the list may include quoted-strings. A quoted-string could
    contain a comma. A non-quoted string could have quotes in the
    middle. Quotes are removed automatically after parsing.

    It basically works like :func:`parse_set_header` just that items
    may appear multiple times and case sensitivity is preserved.

    The return value is a standard :class:`list`:

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    To create a header from the :class:`list` again, use the
    :func:`dump_header` function.

    :param value: a string with a list header.
    :return: :class:`list`
    :rtype: list
    """
    parsed = []
    for element in _parse_list_header(value):
        # Fully quoted elements get their surrounding quotes stripped.
        if element[:1] == element[-1:] == '"':
            element = unquote_header_value(element[1:-1])
        parsed.append(element)
    return parsed
jpayne@69 404
jpayne@69 405
jpayne@69 406 # From mitsuhiko/werkzeug (used with permission).
def parse_dict_header(value):
    """Parse lists of key, value pairs as described by RFC 2068 Section 2 and
    convert them into a python dict:

    >>> d = parse_dict_header('foo="is a fish", bar="as well"')
    >>> type(d) is dict
    True
    >>> sorted(d.items())
    [('bar', 'as well'), ('foo', 'is a fish')]

    If there is no value for a key it will be `None`:

    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    To create a header from the :class:`dict` again, use the
    :func:`dump_header` function.

    :param value: a string with a dict header.
    :return: :class:`dict`
    :rtype: dict
    """
    pairs = {}
    for item in _parse_list_header(value):
        # A bare token (no '=') maps to None.
        if "=" not in item:
            pairs[item] = None
            continue
        name, value = item.split("=", 1)
        # Quoted values get their surrounding quotes stripped.
        if value[:1] == value[-1:] == '"':
            value = unquote_header_value(value[1:-1])
        pairs[name] = value
    return pairs
jpayne@69 439
jpayne@69 440
jpayne@69 441 # From mitsuhiko/werkzeug (used with permission).
def unquote_header_value(value, is_filename=False):
    r"""Unquotes a header value. (Reversal of :func:`quote_header_value`).
    This does not use the real unquoting but what browsers are actually
    using for quoting.

    :param value: the header value to unquote.
    :rtype: str
    """
    # Values that are not fully quoted are returned untouched.
    if not (value and value[0] == value[-1] == '"'):
        return value

    # This is not the real unquoting, but fixing this so that the RFC is
    # met will result in bugs with internet explorer and probably some
    # other browsers as well. IE for example is uploading files with
    # "C:\foo\bar.txt" as filename.
    value = value[1:-1]

    if is_filename and value[:2] == "\\\\":
        # A filename starting like a UNC path keeps its quoting: applying
        # the replacements below would collapse the leading double slash
        # and break _fix_ie_filename(). See #458.
        return value
    return value.replace("\\\\", "\\").replace('\\"', '"')
jpayne@69 465
jpayne@69 466
def dict_from_cookiejar(cj):
    """Returns a key/value dictionary from a CookieJar.

    :param cj: CookieJar object to extract cookies from.
    :rtype: dict
    """
    # Each cookie contributes one name -> value entry; later duplicates win.
    return {item.name: item.value for item in cj}
jpayne@69 476
jpayne@69 477
def add_dict_to_cookiejar(cj, cookie_dict):
    """Returns a CookieJar from a key/value dictionary.

    :param cj: CookieJar to insert cookies into.
    :param cookie_dict: Dict of key/values to insert into CookieJar.
    :rtype: CookieJar
    """
    # cookiejar_from_dict inserts the entries into `cj` and returns it.
    jar = cookiejar_from_dict(cookie_dict, cj)
    return jar
jpayne@69 487
jpayne@69 488
def get_encodings_from_content(content):
    """Returns encodings from given content string.

    :param content: bytestring to extract encodings from.
    """
    warnings.warn(
        (
            "In requests 3.0, get_encodings_from_content will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    # Probe the three places an HTML/XML document can declare its encoding:
    # <meta charset=...>, <meta http-equiv content=...charset=...>, and the
    # XML declaration.
    patterns = (
        re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I),
        re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I),
        re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]'),
    )
    found = []
    for pattern in patterns:
        found.extend(pattern.findall(content))
    return found
jpayne@69 512
jpayne@69 513
jpayne@69 514 def _parse_content_type_header(header):
jpayne@69 515 """Returns content type and parameters from given header
jpayne@69 516
jpayne@69 517 :param header: string
jpayne@69 518 :return: tuple containing content type and dictionary of
jpayne@69 519 parameters
jpayne@69 520 """
jpayne@69 521
jpayne@69 522 tokens = header.split(";")
jpayne@69 523 content_type, params = tokens[0].strip(), tokens[1:]
jpayne@69 524 params_dict = {}
jpayne@69 525 items_to_strip = "\"' "
jpayne@69 526
jpayne@69 527 for param in params:
jpayne@69 528 param = param.strip()
jpayne@69 529 if param:
jpayne@69 530 key, value = param, True
jpayne@69 531 index_of_equals = param.find("=")
jpayne@69 532 if index_of_equals != -1:
jpayne@69 533 key = param[:index_of_equals].strip(items_to_strip)
jpayne@69 534 value = param[index_of_equals + 1 :].strip(items_to_strip)
jpayne@69 535 params_dict[key.lower()] = value
jpayne@69 536 return content_type, params_dict
jpayne@69 537
jpayne@69 538
def get_encoding_from_headers(headers):
    """Returns encodings from given HTTP Header Dict.

    :param headers: dictionary to extract encoding from.
    :rtype: str
    """
    content_type = headers.get("content-type")
    if not content_type:
        return None

    content_type, params = _parse_content_type_header(content_type)

    # An explicit charset parameter always wins.
    if "charset" in params:
        return params["charset"].strip("'\"")

    # HTTP's historical default for text/* media types.
    if "text" in content_type:
        return "ISO-8859-1"

    if "application/json" in content_type:
        # Assume UTF-8 based on RFC 4627: https://www.ietf.org/rfc/rfc4627.txt since the charset was unset
        return "utf-8"

    return None
jpayne@69 562
jpayne@69 563
def stream_decode_response_unicode(iterator, r):
    """Stream decodes an iterator."""
    encoding = r.encoding
    if encoding is None:
        # No declared encoding: pass raw chunks straight through.
        yield from iterator
        return

    decoder = codecs.getincrementaldecoder(encoding)(errors="replace")
    for piece in iterator:
        decoded = decoder.decode(piece)
        if decoded:
            yield decoded
    # Flush whatever the incremental decoder is still buffering.
    tail = decoder.decode(b"", final=True)
    if tail:
        yield tail
jpayne@69 579
jpayne@69 580
def iter_slices(string, slice_length):
    """Iterate over slices of a string."""
    if slice_length is None or slice_length <= 0:
        # Degenerate length: emit the whole string as a single slice.
        slice_length = len(string)
    if slice_length:
        for start in range(0, len(string), slice_length):
            yield string[start : start + slice_length]
jpayne@69 589
jpayne@69 590
def get_unicode_from_response(r):
    """Returns the requested content back in unicode.

    :param r: Response object to get unicode content from.

    Tried:

    1. charset from content-type
    2. fall back and replace all unicode characters

    :rtype: str
    """
    warnings.warn(
        (
            "In requests 3.0, get_unicode_from_response will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    attempted = []

    # First shot: honor the charset advertised in the Content-Type header.
    enc = get_encoding_from_headers(r.headers)
    if enc:
        try:
            return str(r.content, enc)
        except UnicodeError:
            attempted.append(enc)

    # Fall back: decode again, replacing undecodable characters.
    try:
        return str(r.content, enc, errors="replace")
    except TypeError:
        # `enc` was None, so just hand back the raw content.
        return r.content
jpayne@69 628
jpayne@69 629
# The unreserved URI characters (RFC 3986): characters that may appear in a
# URI without percent-encoding. unquote_unreserved() decodes escapes of
# exactly this set and leaves everything else encoded.
UNRESERVED_SET = frozenset(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + "0123456789-._~"
)
jpayne@69 634
jpayne@69 635
def unquote_unreserved(uri):
    """Un-escape any percent-escape sequences in a URI that are unreserved
    characters. This leaves all reserved, illegal and non-ASCII bytes encoded.

    :rtype: str
    """
    pieces = uri.split("%")
    # pieces[0] precedes the first '%'; each later piece begins with what
    # should be a two-character hex escape.
    for idx in range(1, len(pieces)):
        hex_pair = pieces[idx][0:2]
        if len(hex_pair) != 2 or not hex_pair.isalnum():
            # Not a complete escape; restore the '%' and move on.
            pieces[idx] = f"%{pieces[idx]}"
            continue
        try:
            char = chr(int(hex_pair, 16))
        except ValueError:
            raise InvalidURL(f"Invalid percent-escape sequence: '{hex_pair}'")
        if char in UNRESERVED_SET:
            pieces[idx] = char + pieces[idx][2:]
        else:
            pieces[idx] = f"%{pieces[idx]}"
    return "".join(pieces)
jpayne@69 658
jpayne@69 659
def requote_uri(uri):
    """Re-quote the given URI.

    This function passes the given URI through an unquote/quote cycle to
    ensure that it is fully and consistently quoted.

    :rtype: str
    """
    keep_percent = "!#$%&'()*+,/:;=?@[]~"
    drop_percent = "!#$&'()*+,/:;=?@[]~"
    try:
        # Unquote only the unreserved characters, then quote only illegal
        # characters (reserved, unreserved and '%' stay untouched).
        return quote(unquote_unreserved(uri), safe=keep_percent)
    except InvalidURL:
        # The URI contains an invalid percent-escape, so unquoting failed.
        # Quote it wholesale — including stray '%'s — so they are properly
        # escaped and cause no issues elsewhere.
        return quote(uri, safe=drop_percent)
jpayne@69 680
jpayne@69 681
def address_in_network(ip, net):
    """This function allows you to check if an IP belongs to a network subnet

    Example: returns True if ip = 192.168.1.1 and net = 192.168.1.0/24
             returns False if ip = 192.168.1.1 and net = 192.168.100.0/24

    :rtype: bool
    """
    netaddr, bits = net.split("/")
    # Convert everything to unsigned 32-bit integers in native byte order.
    mask = struct.unpack("=L", socket.inet_aton(dotted_netmask(int(bits))))[0]
    address = struct.unpack("=L", socket.inet_aton(ip))[0]
    network = struct.unpack("=L", socket.inet_aton(netaddr))[0] & mask
    return (address & mask) == (network & mask)
jpayne@69 695
jpayne@69 696
def dotted_netmask(mask):
    """Converts mask from /xx format to xxx.xxx.xxx.xxx

    Example: if mask is 24 function returns 255.255.255.0

    :rtype: str
    """
    # Set the top `mask` bits of a 32-bit value, then render it dotted-quad.
    host_bits = 32 - mask
    netmask_int = 0xFFFFFFFF ^ (1 << host_bits) - 1
    return socket.inet_ntoa(struct.pack(">I", netmask_int))
jpayne@69 706
jpayne@69 707
def is_ipv4_address(string_ip):
    """
    :rtype: bool
    """
    # inet_aton raises OSError for anything it cannot parse as IPv4.
    try:
        socket.inet_aton(string_ip)
    except OSError:
        return False
    else:
        return True
jpayne@69 717
jpayne@69 718
def is_valid_cidr(string_network):
    """
    Very simple check of the cidr format in no_proxy variable.

    :rtype: bool
    """
    # Must be exactly "<address>/<prefix>".
    if string_network.count("/") != 1:
        return False

    address, _, prefix = string_network.partition("/")
    try:
        prefix_len = int(prefix)
    except ValueError:
        return False
    # Prefix length outside 1..32 is not a usable IPv4 CIDR.
    if not 1 <= prefix_len <= 32:
        return False
    try:
        socket.inet_aton(address)
    except OSError:
        return False
    return True
jpayne@69 741
jpayne@69 742
@contextlib.contextmanager
def set_environ(env_name, value):
    """Set the environment variable 'env_name' to 'value'

    Save previous value, yield, and then restore the previous value stored in
    the environment variable 'env_name'.

    If 'value' is None, do nothing"""
    should_set = value is not None
    if should_set:
        previous = os.environ.get(env_name)
        os.environ[env_name] = value
    try:
        yield
    finally:
        # Restore (or remove) the variable even if the body raised.
        if should_set:
            if previous is None:
                os.environ.pop(env_name)
            else:
                os.environ[env_name] = previous
jpayne@69 763
jpayne@69 764
def should_bypass_proxies(url, no_proxy):
    """
    Returns whether we should bypass proxies or not.

    :param url: URL being requested.
    :param no_proxy: explicit no_proxy value; when None, the ``no_proxy`` /
        ``NO_PROXY`` environment variables are consulted instead.
    :rtype: bool
    """

    # Prioritize lowercase environment variables over uppercase
    # to keep a consistent behaviour with other http projects (curl, wget).
    def get_proxy(key):
        return os.environ.get(key) or os.environ.get(key.upper())

    # First check whether no_proxy is defined. If it is, check that the URL
    # we're getting isn't in the no_proxy list.
    no_proxy_arg = no_proxy
    if no_proxy is None:
        no_proxy = get_proxy("no_proxy")
    parsed = urlparse(url)

    if parsed.hostname is None:
        # URLs don't always have hostnames, e.g. file:/// urls.
        return True

    if no_proxy:
        # We need to check whether we match here. We need to see if we match
        # the end of the hostname, both with and without the port.
        no_proxy = (host for host in no_proxy.replace(" ", "").split(",") if host)

        if is_ipv4_address(parsed.hostname):
            # IP hosts are matched against CIDR ranges or exact IPs.
            for proxy_ip in no_proxy:
                if is_valid_cidr(proxy_ip):
                    if address_in_network(parsed.hostname, proxy_ip):
                        return True
                elif parsed.hostname == proxy_ip:
                    # If no_proxy ip was defined in plain IP notation instead of cidr notation &
                    # matches the IP of the index
                    return True
        else:
            # Name hosts are matched by suffix, with and without the port.
            host_with_port = parsed.hostname
            if parsed.port:
                host_with_port += f":{parsed.port}"

            for host in no_proxy:
                if parsed.hostname.endswith(host) or host_with_port.endswith(host):
                    # The URL does match something in no_proxy, so we don't want
                    # to apply the proxies on this URL.
                    return True

    # Finally defer to the platform's own proxy_bypass check, with the
    # caller's explicit no_proxy temporarily mirrored into the environment.
    with set_environ("no_proxy", no_proxy_arg):
        # parsed.hostname can be `None` in cases such as a file URI.
        try:
            bypass = proxy_bypass(parsed.hostname)
        except (TypeError, socket.gaierror):
            bypass = False

    if bypass:
        return True

    return False
jpayne@69 824
jpayne@69 825
def get_environ_proxies(url, no_proxy=None):
    """
    Return a dict of environment proxies.

    :rtype: dict
    """
    # An empty mapping means "use no proxy" when the URL is exempt.
    if should_bypass_proxies(url, no_proxy=no_proxy):
        return {}
    return getproxies()
jpayne@69 836
jpayne@69 837
def select_proxy(url, proxies):
    """Select a proxy for the url, if applicable.

    :param url: The url being for the request
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    """
    proxies = proxies or {}
    parts = urlparse(url)
    if parts.hostname is None:
        # Hostname-less URLs (e.g. file://) can only match on scheme.
        return proxies.get(parts.scheme, proxies.get("all"))

    # Most specific key wins: scheme+host, scheme, all+host, then all.
    candidate_keys = (
        f"{parts.scheme}://{parts.hostname}",
        parts.scheme,
        f"all://{parts.hostname}",
        "all",
    )
    for key in candidate_keys:
        if key in proxies:
            return proxies[key]
    return None
jpayne@69 862
jpayne@69 863
def resolve_proxies(request, proxies, trust_env=True):
    """This method takes proxy information from a request and configuration
    input to resolve a mapping of target proxies. This will consider settings
    such as NO_PROXY to strip proxy configurations.

    :param request: Request or PreparedRequest
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    :param trust_env: Boolean declaring whether to trust environment configs

    :rtype: dict
    """
    proxies = {} if proxies is None else proxies
    url = request.url
    scheme = urlparse(url).scheme
    no_proxy = proxies.get("no_proxy")
    resolved = proxies.copy()

    if trust_env and not should_bypass_proxies(url, no_proxy=no_proxy):
        env_proxies = get_environ_proxies(url, no_proxy=no_proxy)
        env_proxy = env_proxies.get(scheme, env_proxies.get("all"))
        if env_proxy:
            # An environment proxy only fills in a missing scheme entry;
            # explicit caller configuration always wins.
            resolved.setdefault(scheme, env_proxy)
    return resolved
jpayne@69 889
jpayne@69 890
def default_user_agent(name="python-requests"):
    """
    Return a string representing the default user agent.

    :rtype: str
    """
    # e.g. "python-requests/<package version>"
    return "/".join((name, __version__))
jpayne@69 898
jpayne@69 899
def default_headers():
    """
    :rtype: requests.structures.CaseInsensitiveDict
    """
    headers = CaseInsensitiveDict()
    headers["User-Agent"] = default_user_agent()
    headers["Accept-Encoding"] = DEFAULT_ACCEPT_ENCODING
    headers["Accept"] = "*/*"
    headers["Connection"] = "keep-alive"
    return headers
jpayne@69 912
jpayne@69 913
def parse_header_links(value):
    """Return a list of parsed link headers proxies.

    i.e. Link: <http:/.../front.jpeg>; rel=front; type="image/jpeg",<http://.../back.jpeg>; rel=back;type="image/jpeg"

    :rtype: list
    """

    links = []

    # Characters stripped from both URLs and param names/values.
    replace_chars = " '\""

    value = value.strip(replace_chars)
    if not value:
        return links

    # Each comma-separated entry starts with "<url>"; split on ", <" so that
    # commas inside quoted parameter values are less likely to break entries.
    for val in re.split(", *<", value):
        try:
            url, params = val.split(";", 1)
        except ValueError:
            # No parameters for this link.
            url, params = val, ""

        link = {"url": url.strip("<> '\"")}

        for param in params.split(";"):
            try:
                # Split on the first "=" only, so quoted values that
                # themselves contain "=" (e.g. title="a=b") survive intact.
                # A param with no "=" at all still aborts the param loop.
                key, value = param.split("=", 1)
            except ValueError:
                break

            link[key.strip(replace_chars)] = value.strip(replace_chars)

        links.append(link)

    return links
jpayne@69 949
jpayne@69 950
# Null bytes, cached at module level so guess_json_utf doesn't rebuild them
# on every call.
_null = b"\x00"
_null2 = _null * 2
_null3 = _null * 3


def guess_json_utf(data):
    """
    :rtype: str
    """
    # JSON always opens with two ASCII characters, so the encoding can be
    # deduced from how many null bytes appear in the first four bytes and
    # where they sit. A BOM, when present, is detected directly.
    sample = data[:4]
    if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
        return "utf-32"  # BOM included
    if sample[:3] == codecs.BOM_UTF8:
        return "utf-8-sig"  # BOM included, MS style (discouraged)
    if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
        return "utf-16"  # BOM included
    nullcount = sample.count(_null)
    if nullcount == 0:
        return "utf-8"
    if nullcount == 2:
        # Nulls in even positions mean big-endian UTF-16; odd positions
        # mean little-endian.
        if sample[::2] == _null2:
            return "utf-16-be"
        if sample[1::2] == _null2:
            return "utf-16-le"
        # Did not detect 2 valid UTF-16 ascii-range characters
    elif nullcount == 3:
        if sample[:3] == _null3:
            return "utf-32-be"
        if sample[1:] == _null3:
            return "utf-32-le"
        # Did not detect a valid UTF-32 ascii-range character
    return None
jpayne@69 987
jpayne@69 988
def prepend_scheme_if_needed(url, new_scheme):
    """Given a URL that may or may not have a scheme, prepend the given scheme.
    Does not replace a present scheme with the one provided as an argument.

    :rtype: str
    """
    parsed = parse_url(url)

    # A defect in urlparse determines that there isn't a netloc present in some
    # urls. We previously assumed parsing was overly cautious, and swapped the
    # netloc and path. Due to a lack of tests on the original defect, this is
    # maintained with parse_url for backwards compatibility.
    netloc, path = parsed.netloc, parsed.path
    if not netloc:
        netloc, path = path, netloc

    if parsed.auth:
        # parse_url doesn't provide the netloc with auth
        # so we'll add it ourselves.
        netloc = "@".join([parsed.auth, netloc])

    scheme = parsed.scheme if parsed.scheme is not None else new_scheme
    if path is None:
        path = ""

    return urlunparse((scheme, netloc, path, "", parsed.query, parsed.fragment))
jpayne@69 1016
jpayne@69 1017
def get_auth_from_url(url):
    """Given a url with authentication components, extract them into a tuple of
    username,password.

    :rtype: (str,str)
    """
    parsed = urlparse(url)
    try:
        # unquote raises TypeError when username/password are None (no
        # credentials in the URL); AttributeError is defensive for odd
        # parse results. Either way we fall back to empty strings.
        return unquote(parsed.username), unquote(parsed.password)
    except (AttributeError, TypeError):
        return "", ""
jpayne@69 1032
jpayne@69 1033
def check_header_validity(header):
    """Verifies that header parts don't contain leading whitespace
    reserved characters, or return characters.

    :param header: tuple, in the format (name, value).
    """
    name, value = header
    # Index 0 selects the name validator, index 1 the value validator.
    for index, part in enumerate((name, value)):
        _validate_header_part(header, part, index)
jpayne@69 1043
jpayne@69 1044
def _validate_header_part(header, header_part, header_validator_index):
    # Pick the validator matching the part's type; anything other than
    # str/bytes is rejected outright.
    if isinstance(header_part, str):
        validator = _HEADER_VALIDATORS_STR[header_validator_index]
    elif isinstance(header_part, bytes):
        validator = _HEADER_VALIDATORS_BYTE[header_validator_index]
    else:
        raise InvalidHeader(
            f"Header part ({header_part!r}) from {header} "
            f"must be of type str or bytes, not {type(header_part)}"
        )

    if validator.match(header_part):
        return

    # Index 0 is the header name, index 1 the header value.
    header_kind = "name" if header_validator_index == 0 else "value"
    raise InvalidHeader(
        f"Invalid leading whitespace, reserved character(s), or return "
        f"character(s) in header {header_kind}: {header_part!r}"
    )
jpayne@69 1062
jpayne@69 1063
def urldefragauth(url):
    """
    Given a url remove the fragment and the authentication part.

    :rtype: str
    """
    parts = urlparse(url)
    netloc, path = parts.netloc, parts.path

    # see func:`prepend_scheme_if_needed`
    if not netloc:
        netloc, path = path, netloc

    # Drop everything up to the last "@" — i.e. the credentials, if any.
    netloc = netloc.rsplit("@", 1)[-1]

    return urlunparse((parts.scheme, netloc, path, parts.params, parts.query, ""))
jpayne@69 1079
jpayne@69 1080
def rewind_body(prepared_request):
    """Move file pointer back to its recorded starting position
    so it can be read again on redirect.
    """
    body_seek = getattr(prepared_request.body, "seek", None)
    # Rewinding requires both a seekable body and a recorded numeric
    # starting position from when the request was prepared.
    if body_seek is None or not isinstance(
        prepared_request._body_position, integer_types
    ):
        raise UnrewindableBodyError("Unable to rewind request body for redirect.")
    try:
        body_seek(prepared_request._body_position)
    except OSError:
        raise UnrewindableBodyError(
            "An error occurred when rewinding request body for redirect."
        )