annotate requests/utils.py @ 8:832f269deeb0

planemo upload for repository https://toolrepo.galaxytrakr.org/view/jpayne/bioproject_to_srr_2/556cac4fb538
author jpayne
date Sun, 05 May 2024 23:47:10 -0400
parents 5eb2d5e3bf22
children
rev   line source
jpayne@7 1 """
jpayne@7 2 requests.utils
jpayne@7 3 ~~~~~~~~~~~~~~
jpayne@7 4
jpayne@7 5 This module provides utility functions that are used within Requests
jpayne@7 6 that are also useful for external consumption.
jpayne@7 7 """
jpayne@7 8
jpayne@7 9 import codecs
jpayne@7 10 import contextlib
jpayne@7 11 import io
jpayne@7 12 import os
jpayne@7 13 import re
jpayne@7 14 import socket
jpayne@7 15 import struct
jpayne@7 16 import sys
jpayne@7 17 import tempfile
jpayne@7 18 import warnings
jpayne@7 19 import zipfile
jpayne@7 20 from collections import OrderedDict
jpayne@7 21
jpayne@7 22 from urllib3.util import make_headers, parse_url
jpayne@7 23
jpayne@7 24 from . import certs
jpayne@7 25 from .__version__ import __version__
jpayne@7 26
jpayne@7 27 # to_native_string is unused here, but imported here for backwards compatibility
jpayne@7 28 from ._internal_utils import ( # noqa: F401
jpayne@7 29 _HEADER_VALIDATORS_BYTE,
jpayne@7 30 _HEADER_VALIDATORS_STR,
jpayne@7 31 HEADER_VALIDATORS,
jpayne@7 32 to_native_string,
jpayne@7 33 )
jpayne@7 34 from .compat import (
jpayne@7 35 Mapping,
jpayne@7 36 basestring,
jpayne@7 37 bytes,
jpayne@7 38 getproxies,
jpayne@7 39 getproxies_environment,
jpayne@7 40 integer_types,
jpayne@7 41 )
jpayne@7 42 from .compat import parse_http_list as _parse_list_header
jpayne@7 43 from .compat import (
jpayne@7 44 proxy_bypass,
jpayne@7 45 proxy_bypass_environment,
jpayne@7 46 quote,
jpayne@7 47 str,
jpayne@7 48 unquote,
jpayne@7 49 urlparse,
jpayne@7 50 urlunparse,
jpayne@7 51 )
jpayne@7 52 from .cookies import cookiejar_from_dict
jpayne@7 53 from .exceptions import (
jpayne@7 54 FileModeWarning,
jpayne@7 55 InvalidHeader,
jpayne@7 56 InvalidURL,
jpayne@7 57 UnrewindableBodyError,
jpayne@7 58 )
jpayne@7 59 from .structures import CaseInsensitiveDict
jpayne@7 60
jpayne@7 61 NETRC_FILES = (".netrc", "_netrc")
jpayne@7 62
jpayne@7 63 DEFAULT_CA_BUNDLE_PATH = certs.where()
jpayne@7 64
jpayne@7 65 DEFAULT_PORTS = {"http": 80, "https": 443}
jpayne@7 66
jpayne@7 67 # Ensure that ', ' is used to preserve previous delimiter behavior.
jpayne@7 68 DEFAULT_ACCEPT_ENCODING = ", ".join(
jpayne@7 69 re.split(r",\s*", make_headers(accept_encoding=True)["accept-encoding"])
jpayne@7 70 )
jpayne@7 71
jpayne@7 72
jpayne@7 73 if sys.platform == "win32":
jpayne@7 74 # provide a proxy_bypass version on Windows without DNS lookups
jpayne@7 75
jpayne@7 76 def proxy_bypass_registry(host):
jpayne@7 77 try:
jpayne@7 78 import winreg
jpayne@7 79 except ImportError:
jpayne@7 80 return False
jpayne@7 81
jpayne@7 82 try:
jpayne@7 83 internetSettings = winreg.OpenKey(
jpayne@7 84 winreg.HKEY_CURRENT_USER,
jpayne@7 85 r"Software\Microsoft\Windows\CurrentVersion\Internet Settings",
jpayne@7 86 )
jpayne@7 87 # ProxyEnable could be REG_SZ or REG_DWORD, normalizing it
jpayne@7 88 proxyEnable = int(winreg.QueryValueEx(internetSettings, "ProxyEnable")[0])
jpayne@7 89 # ProxyOverride is almost always a string
jpayne@7 90 proxyOverride = winreg.QueryValueEx(internetSettings, "ProxyOverride")[0]
jpayne@7 91 except (OSError, ValueError):
jpayne@7 92 return False
jpayne@7 93 if not proxyEnable or not proxyOverride:
jpayne@7 94 return False
jpayne@7 95
jpayne@7 96 # make a check value list from the registry entry: replace the
jpayne@7 97 # '<local>' string by the localhost entry and the corresponding
jpayne@7 98 # canonical entry.
jpayne@7 99 proxyOverride = proxyOverride.split(";")
jpayne@7 100 # now check if we match one of the registry values.
jpayne@7 101 for test in proxyOverride:
jpayne@7 102 if test == "<local>":
jpayne@7 103 if "." not in host:
jpayne@7 104 return True
jpayne@7 105 test = test.replace(".", r"\.") # mask dots
jpayne@7 106 test = test.replace("*", r".*") # change glob sequence
jpayne@7 107 test = test.replace("?", r".") # change glob char
jpayne@7 108 if re.match(test, host, re.I):
jpayne@7 109 return True
jpayne@7 110 return False
jpayne@7 111
jpayne@7 112 def proxy_bypass(host): # noqa
jpayne@7 113 """Return True, if the host should be bypassed.
jpayne@7 114
jpayne@7 115 Checks proxy settings gathered from the environment, if specified,
jpayne@7 116 or the registry.
jpayne@7 117 """
jpayne@7 118 if getproxies_environment():
jpayne@7 119 return proxy_bypass_environment(host)
jpayne@7 120 else:
jpayne@7 121 return proxy_bypass_registry(host)
jpayne@7 122
jpayne@7 123
jpayne@7 124 def dict_to_sequence(d):
jpayne@7 125 """Returns an internal sequence dictionary update."""
jpayne@7 126
jpayne@7 127 if hasattr(d, "items"):
jpayne@7 128 d = d.items()
jpayne@7 129
jpayne@7 130 return d
jpayne@7 131
jpayne@7 132
jpayne@7 133 def super_len(o):
jpayne@7 134 total_length = None
jpayne@7 135 current_position = 0
jpayne@7 136
jpayne@7 137 if hasattr(o, "__len__"):
jpayne@7 138 total_length = len(o)
jpayne@7 139
jpayne@7 140 elif hasattr(o, "len"):
jpayne@7 141 total_length = o.len
jpayne@7 142
jpayne@7 143 elif hasattr(o, "fileno"):
jpayne@7 144 try:
jpayne@7 145 fileno = o.fileno()
jpayne@7 146 except (io.UnsupportedOperation, AttributeError):
jpayne@7 147 # AttributeError is a surprising exception, seeing as how we've just checked
jpayne@7 148 # that `hasattr(o, 'fileno')`. It happens for objects obtained via
jpayne@7 149 # `Tarfile.extractfile()`, per issue 5229.
jpayne@7 150 pass
jpayne@7 151 else:
jpayne@7 152 total_length = os.fstat(fileno).st_size
jpayne@7 153
jpayne@7 154 # Having used fstat to determine the file length, we need to
jpayne@7 155 # confirm that this file was opened up in binary mode.
jpayne@7 156 if "b" not in o.mode:
jpayne@7 157 warnings.warn(
jpayne@7 158 (
jpayne@7 159 "Requests has determined the content-length for this "
jpayne@7 160 "request using the binary size of the file: however, the "
jpayne@7 161 "file has been opened in text mode (i.e. without the 'b' "
jpayne@7 162 "flag in the mode). This may lead to an incorrect "
jpayne@7 163 "content-length. In Requests 3.0, support will be removed "
jpayne@7 164 "for files in text mode."
jpayne@7 165 ),
jpayne@7 166 FileModeWarning,
jpayne@7 167 )
jpayne@7 168
jpayne@7 169 if hasattr(o, "tell"):
jpayne@7 170 try:
jpayne@7 171 current_position = o.tell()
jpayne@7 172 except OSError:
jpayne@7 173 # This can happen in some weird situations, such as when the file
jpayne@7 174 # is actually a special file descriptor like stdin. In this
jpayne@7 175 # instance, we don't know what the length is, so set it to zero and
jpayne@7 176 # let requests chunk it instead.
jpayne@7 177 if total_length is not None:
jpayne@7 178 current_position = total_length
jpayne@7 179 else:
jpayne@7 180 if hasattr(o, "seek") and total_length is None:
jpayne@7 181 # StringIO and BytesIO have seek but no usable fileno
jpayne@7 182 try:
jpayne@7 183 # seek to end of file
jpayne@7 184 o.seek(0, 2)
jpayne@7 185 total_length = o.tell()
jpayne@7 186
jpayne@7 187 # seek back to current position to support
jpayne@7 188 # partially read file-like objects
jpayne@7 189 o.seek(current_position or 0)
jpayne@7 190 except OSError:
jpayne@7 191 total_length = 0
jpayne@7 192
jpayne@7 193 if total_length is None:
jpayne@7 194 total_length = 0
jpayne@7 195
jpayne@7 196 return max(0, total_length - current_position)
jpayne@7 197
jpayne@7 198
jpayne@7 199 def get_netrc_auth(url, raise_errors=False):
jpayne@7 200 """Returns the Requests tuple auth for a given url from netrc."""
jpayne@7 201
jpayne@7 202 netrc_file = os.environ.get("NETRC")
jpayne@7 203 if netrc_file is not None:
jpayne@7 204 netrc_locations = (netrc_file,)
jpayne@7 205 else:
jpayne@7 206 netrc_locations = (f"~/{f}" for f in NETRC_FILES)
jpayne@7 207
jpayne@7 208 try:
jpayne@7 209 from netrc import NetrcParseError, netrc
jpayne@7 210
jpayne@7 211 netrc_path = None
jpayne@7 212
jpayne@7 213 for f in netrc_locations:
jpayne@7 214 try:
jpayne@7 215 loc = os.path.expanduser(f)
jpayne@7 216 except KeyError:
jpayne@7 217 # os.path.expanduser can fail when $HOME is undefined and
jpayne@7 218 # getpwuid fails. See https://bugs.python.org/issue20164 &
jpayne@7 219 # https://github.com/psf/requests/issues/1846
jpayne@7 220 return
jpayne@7 221
jpayne@7 222 if os.path.exists(loc):
jpayne@7 223 netrc_path = loc
jpayne@7 224 break
jpayne@7 225
jpayne@7 226 # Abort early if there isn't one.
jpayne@7 227 if netrc_path is None:
jpayne@7 228 return
jpayne@7 229
jpayne@7 230 ri = urlparse(url)
jpayne@7 231
jpayne@7 232 # Strip port numbers from netloc. This weird `if...encode`` dance is
jpayne@7 233 # used for Python 3.2, which doesn't support unicode literals.
jpayne@7 234 splitstr = b":"
jpayne@7 235 if isinstance(url, str):
jpayne@7 236 splitstr = splitstr.decode("ascii")
jpayne@7 237 host = ri.netloc.split(splitstr)[0]
jpayne@7 238
jpayne@7 239 try:
jpayne@7 240 _netrc = netrc(netrc_path).authenticators(host)
jpayne@7 241 if _netrc:
jpayne@7 242 # Return with login / password
jpayne@7 243 login_i = 0 if _netrc[0] else 1
jpayne@7 244 return (_netrc[login_i], _netrc[2])
jpayne@7 245 except (NetrcParseError, OSError):
jpayne@7 246 # If there was a parsing error or a permissions issue reading the file,
jpayne@7 247 # we'll just skip netrc auth unless explicitly asked to raise errors.
jpayne@7 248 if raise_errors:
jpayne@7 249 raise
jpayne@7 250
jpayne@7 251 # App Engine hackiness.
jpayne@7 252 except (ImportError, AttributeError):
jpayne@7 253 pass
jpayne@7 254
jpayne@7 255
jpayne@7 256 def guess_filename(obj):
jpayne@7 257 """Tries to guess the filename of the given object."""
jpayne@7 258 name = getattr(obj, "name", None)
jpayne@7 259 if name and isinstance(name, basestring) and name[0] != "<" and name[-1] != ">":
jpayne@7 260 return os.path.basename(name)
jpayne@7 261
jpayne@7 262
jpayne@7 263 def extract_zipped_paths(path):
jpayne@7 264 """Replace nonexistent paths that look like they refer to a member of a zip
jpayne@7 265 archive with the location of an extracted copy of the target, or else
jpayne@7 266 just return the provided path unchanged.
jpayne@7 267 """
jpayne@7 268 if os.path.exists(path):
jpayne@7 269 # this is already a valid path, no need to do anything further
jpayne@7 270 return path
jpayne@7 271
jpayne@7 272 # find the first valid part of the provided path and treat that as a zip archive
jpayne@7 273 # assume the rest of the path is the name of a member in the archive
jpayne@7 274 archive, member = os.path.split(path)
jpayne@7 275 while archive and not os.path.exists(archive):
jpayne@7 276 archive, prefix = os.path.split(archive)
jpayne@7 277 if not prefix:
jpayne@7 278 # If we don't check for an empty prefix after the split (in other words, archive remains unchanged after the split),
jpayne@7 279 # we _can_ end up in an infinite loop on a rare corner case affecting a small number of users
jpayne@7 280 break
jpayne@7 281 member = "/".join([prefix, member])
jpayne@7 282
jpayne@7 283 if not zipfile.is_zipfile(archive):
jpayne@7 284 return path
jpayne@7 285
jpayne@7 286 zip_file = zipfile.ZipFile(archive)
jpayne@7 287 if member not in zip_file.namelist():
jpayne@7 288 return path
jpayne@7 289
jpayne@7 290 # we have a valid zip archive and a valid member of that archive
jpayne@7 291 tmp = tempfile.gettempdir()
jpayne@7 292 extracted_path = os.path.join(tmp, member.split("/")[-1])
jpayne@7 293 if not os.path.exists(extracted_path):
jpayne@7 294 # use read + write to avoid the creating nested folders, we only want the file, avoids mkdir racing condition
jpayne@7 295 with atomic_open(extracted_path) as file_handler:
jpayne@7 296 file_handler.write(zip_file.read(member))
jpayne@7 297 return extracted_path
jpayne@7 298
jpayne@7 299
jpayne@7 300 @contextlib.contextmanager
jpayne@7 301 def atomic_open(filename):
jpayne@7 302 """Write a file to the disk in an atomic fashion"""
jpayne@7 303 tmp_descriptor, tmp_name = tempfile.mkstemp(dir=os.path.dirname(filename))
jpayne@7 304 try:
jpayne@7 305 with os.fdopen(tmp_descriptor, "wb") as tmp_handler:
jpayne@7 306 yield tmp_handler
jpayne@7 307 os.replace(tmp_name, filename)
jpayne@7 308 except BaseException:
jpayne@7 309 os.remove(tmp_name)
jpayne@7 310 raise
jpayne@7 311
jpayne@7 312
jpayne@7 313 def from_key_val_list(value):
jpayne@7 314 """Take an object and test to see if it can be represented as a
jpayne@7 315 dictionary. Unless it can not be represented as such, return an
jpayne@7 316 OrderedDict, e.g.,
jpayne@7 317
jpayne@7 318 ::
jpayne@7 319
jpayne@7 320 >>> from_key_val_list([('key', 'val')])
jpayne@7 321 OrderedDict([('key', 'val')])
jpayne@7 322 >>> from_key_val_list('string')
jpayne@7 323 Traceback (most recent call last):
jpayne@7 324 ...
jpayne@7 325 ValueError: cannot encode objects that are not 2-tuples
jpayne@7 326 >>> from_key_val_list({'key': 'val'})
jpayne@7 327 OrderedDict([('key', 'val')])
jpayne@7 328
jpayne@7 329 :rtype: OrderedDict
jpayne@7 330 """
jpayne@7 331 if value is None:
jpayne@7 332 return None
jpayne@7 333
jpayne@7 334 if isinstance(value, (str, bytes, bool, int)):
jpayne@7 335 raise ValueError("cannot encode objects that are not 2-tuples")
jpayne@7 336
jpayne@7 337 return OrderedDict(value)
jpayne@7 338
jpayne@7 339
jpayne@7 340 def to_key_val_list(value):
jpayne@7 341 """Take an object and test to see if it can be represented as a
jpayne@7 342 dictionary. If it can be, return a list of tuples, e.g.,
jpayne@7 343
jpayne@7 344 ::
jpayne@7 345
jpayne@7 346 >>> to_key_val_list([('key', 'val')])
jpayne@7 347 [('key', 'val')]
jpayne@7 348 >>> to_key_val_list({'key': 'val'})
jpayne@7 349 [('key', 'val')]
jpayne@7 350 >>> to_key_val_list('string')
jpayne@7 351 Traceback (most recent call last):
jpayne@7 352 ...
jpayne@7 353 ValueError: cannot encode objects that are not 2-tuples
jpayne@7 354
jpayne@7 355 :rtype: list
jpayne@7 356 """
jpayne@7 357 if value is None:
jpayne@7 358 return None
jpayne@7 359
jpayne@7 360 if isinstance(value, (str, bytes, bool, int)):
jpayne@7 361 raise ValueError("cannot encode objects that are not 2-tuples")
jpayne@7 362
jpayne@7 363 if isinstance(value, Mapping):
jpayne@7 364 value = value.items()
jpayne@7 365
jpayne@7 366 return list(value)
jpayne@7 367
jpayne@7 368
jpayne@7 369 # From mitsuhiko/werkzeug (used with permission).
jpayne@7 370 def parse_list_header(value):
jpayne@7 371 """Parse lists as described by RFC 2068 Section 2.
jpayne@7 372
jpayne@7 373 In particular, parse comma-separated lists where the elements of
jpayne@7 374 the list may include quoted-strings. A quoted-string could
jpayne@7 375 contain a comma. A non-quoted string could have quotes in the
jpayne@7 376 middle. Quotes are removed automatically after parsing.
jpayne@7 377
jpayne@7 378 It basically works like :func:`parse_set_header` just that items
jpayne@7 379 may appear multiple times and case sensitivity is preserved.
jpayne@7 380
jpayne@7 381 The return value is a standard :class:`list`:
jpayne@7 382
jpayne@7 383 >>> parse_list_header('token, "quoted value"')
jpayne@7 384 ['token', 'quoted value']
jpayne@7 385
jpayne@7 386 To create a header from the :class:`list` again, use the
jpayne@7 387 :func:`dump_header` function.
jpayne@7 388
jpayne@7 389 :param value: a string with a list header.
jpayne@7 390 :return: :class:`list`
jpayne@7 391 :rtype: list
jpayne@7 392 """
jpayne@7 393 result = []
jpayne@7 394 for item in _parse_list_header(value):
jpayne@7 395 if item[:1] == item[-1:] == '"':
jpayne@7 396 item = unquote_header_value(item[1:-1])
jpayne@7 397 result.append(item)
jpayne@7 398 return result
jpayne@7 399
jpayne@7 400
jpayne@7 401 # From mitsuhiko/werkzeug (used with permission).
jpayne@7 402 def parse_dict_header(value):
jpayne@7 403 """Parse lists of key, value pairs as described by RFC 2068 Section 2 and
jpayne@7 404 convert them into a python dict:
jpayne@7 405
jpayne@7 406 >>> d = parse_dict_header('foo="is a fish", bar="as well"')
jpayne@7 407 >>> type(d) is dict
jpayne@7 408 True
jpayne@7 409 >>> sorted(d.items())
jpayne@7 410 [('bar', 'as well'), ('foo', 'is a fish')]
jpayne@7 411
jpayne@7 412 If there is no value for a key it will be `None`:
jpayne@7 413
jpayne@7 414 >>> parse_dict_header('key_without_value')
jpayne@7 415 {'key_without_value': None}
jpayne@7 416
jpayne@7 417 To create a header from the :class:`dict` again, use the
jpayne@7 418 :func:`dump_header` function.
jpayne@7 419
jpayne@7 420 :param value: a string with a dict header.
jpayne@7 421 :return: :class:`dict`
jpayne@7 422 :rtype: dict
jpayne@7 423 """
jpayne@7 424 result = {}
jpayne@7 425 for item in _parse_list_header(value):
jpayne@7 426 if "=" not in item:
jpayne@7 427 result[item] = None
jpayne@7 428 continue
jpayne@7 429 name, value = item.split("=", 1)
jpayne@7 430 if value[:1] == value[-1:] == '"':
jpayne@7 431 value = unquote_header_value(value[1:-1])
jpayne@7 432 result[name] = value
jpayne@7 433 return result
jpayne@7 434
jpayne@7 435
jpayne@7 436 # From mitsuhiko/werkzeug (used with permission).
jpayne@7 437 def unquote_header_value(value, is_filename=False):
jpayne@7 438 r"""Unquotes a header value. (Reversal of :func:`quote_header_value`).
jpayne@7 439 This does not use the real unquoting but what browsers are actually
jpayne@7 440 using for quoting.
jpayne@7 441
jpayne@7 442 :param value: the header value to unquote.
jpayne@7 443 :rtype: str
jpayne@7 444 """
jpayne@7 445 if value and value[0] == value[-1] == '"':
jpayne@7 446 # this is not the real unquoting, but fixing this so that the
jpayne@7 447 # RFC is met will result in bugs with internet explorer and
jpayne@7 448 # probably some other browsers as well. IE for example is
jpayne@7 449 # uploading files with "C:\foo\bar.txt" as filename
jpayne@7 450 value = value[1:-1]
jpayne@7 451
jpayne@7 452 # if this is a filename and the starting characters look like
jpayne@7 453 # a UNC path, then just return the value without quotes. Using the
jpayne@7 454 # replace sequence below on a UNC path has the effect of turning
jpayne@7 455 # the leading double slash into a single slash and then
jpayne@7 456 # _fix_ie_filename() doesn't work correctly. See #458.
jpayne@7 457 if not is_filename or value[:2] != "\\\\":
jpayne@7 458 return value.replace("\\\\", "\\").replace('\\"', '"')
jpayne@7 459 return value
jpayne@7 460
jpayne@7 461
jpayne@7 462 def dict_from_cookiejar(cj):
jpayne@7 463 """Returns a key/value dictionary from a CookieJar.
jpayne@7 464
jpayne@7 465 :param cj: CookieJar object to extract cookies from.
jpayne@7 466 :rtype: dict
jpayne@7 467 """
jpayne@7 468
jpayne@7 469 cookie_dict = {}
jpayne@7 470
jpayne@7 471 for cookie in cj:
jpayne@7 472 cookie_dict[cookie.name] = cookie.value
jpayne@7 473
jpayne@7 474 return cookie_dict
jpayne@7 475
jpayne@7 476
jpayne@7 477 def add_dict_to_cookiejar(cj, cookie_dict):
jpayne@7 478 """Returns a CookieJar from a key/value dictionary.
jpayne@7 479
jpayne@7 480 :param cj: CookieJar to insert cookies into.
jpayne@7 481 :param cookie_dict: Dict of key/values to insert into CookieJar.
jpayne@7 482 :rtype: CookieJar
jpayne@7 483 """
jpayne@7 484
jpayne@7 485 return cookiejar_from_dict(cookie_dict, cj)
jpayne@7 486
jpayne@7 487
jpayne@7 488 def get_encodings_from_content(content):
jpayne@7 489 """Returns encodings from given content string.
jpayne@7 490
jpayne@7 491 :param content: bytestring to extract encodings from.
jpayne@7 492 """
jpayne@7 493 warnings.warn(
jpayne@7 494 (
jpayne@7 495 "In requests 3.0, get_encodings_from_content will be removed. For "
jpayne@7 496 "more information, please see the discussion on issue #2266. (This"
jpayne@7 497 " warning should only appear once.)"
jpayne@7 498 ),
jpayne@7 499 DeprecationWarning,
jpayne@7 500 )
jpayne@7 501
jpayne@7 502 charset_re = re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I)
jpayne@7 503 pragma_re = re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I)
jpayne@7 504 xml_re = re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]')
jpayne@7 505
jpayne@7 506 return (
jpayne@7 507 charset_re.findall(content)
jpayne@7 508 + pragma_re.findall(content)
jpayne@7 509 + xml_re.findall(content)
jpayne@7 510 )
jpayne@7 511
jpayne@7 512
jpayne@7 513 def _parse_content_type_header(header):
jpayne@7 514 """Returns content type and parameters from given header
jpayne@7 515
jpayne@7 516 :param header: string
jpayne@7 517 :return: tuple containing content type and dictionary of
jpayne@7 518 parameters
jpayne@7 519 """
jpayne@7 520
jpayne@7 521 tokens = header.split(";")
jpayne@7 522 content_type, params = tokens[0].strip(), tokens[1:]
jpayne@7 523 params_dict = {}
jpayne@7 524 items_to_strip = "\"' "
jpayne@7 525
jpayne@7 526 for param in params:
jpayne@7 527 param = param.strip()
jpayne@7 528 if param:
jpayne@7 529 key, value = param, True
jpayne@7 530 index_of_equals = param.find("=")
jpayne@7 531 if index_of_equals != -1:
jpayne@7 532 key = param[:index_of_equals].strip(items_to_strip)
jpayne@7 533 value = param[index_of_equals + 1 :].strip(items_to_strip)
jpayne@7 534 params_dict[key.lower()] = value
jpayne@7 535 return content_type, params_dict
jpayne@7 536
jpayne@7 537
jpayne@7 538 def get_encoding_from_headers(headers):
jpayne@7 539 """Returns encodings from given HTTP Header Dict.
jpayne@7 540
jpayne@7 541 :param headers: dictionary to extract encoding from.
jpayne@7 542 :rtype: str
jpayne@7 543 """
jpayne@7 544
jpayne@7 545 content_type = headers.get("content-type")
jpayne@7 546
jpayne@7 547 if not content_type:
jpayne@7 548 return None
jpayne@7 549
jpayne@7 550 content_type, params = _parse_content_type_header(content_type)
jpayne@7 551
jpayne@7 552 if "charset" in params:
jpayne@7 553 return params["charset"].strip("'\"")
jpayne@7 554
jpayne@7 555 if "text" in content_type:
jpayne@7 556 return "ISO-8859-1"
jpayne@7 557
jpayne@7 558 if "application/json" in content_type:
jpayne@7 559 # Assume UTF-8 based on RFC 4627: https://www.ietf.org/rfc/rfc4627.txt since the charset was unset
jpayne@7 560 return "utf-8"
jpayne@7 561
jpayne@7 562
jpayne@7 563 def stream_decode_response_unicode(iterator, r):
jpayne@7 564 """Stream decodes an iterator."""
jpayne@7 565
jpayne@7 566 if r.encoding is None:
jpayne@7 567 yield from iterator
jpayne@7 568 return
jpayne@7 569
jpayne@7 570 decoder = codecs.getincrementaldecoder(r.encoding)(errors="replace")
jpayne@7 571 for chunk in iterator:
jpayne@7 572 rv = decoder.decode(chunk)
jpayne@7 573 if rv:
jpayne@7 574 yield rv
jpayne@7 575 rv = decoder.decode(b"", final=True)
jpayne@7 576 if rv:
jpayne@7 577 yield rv
jpayne@7 578
jpayne@7 579
jpayne@7 580 def iter_slices(string, slice_length):
jpayne@7 581 """Iterate over slices of a string."""
jpayne@7 582 pos = 0
jpayne@7 583 if slice_length is None or slice_length <= 0:
jpayne@7 584 slice_length = len(string)
jpayne@7 585 while pos < len(string):
jpayne@7 586 yield string[pos : pos + slice_length]
jpayne@7 587 pos += slice_length
jpayne@7 588
jpayne@7 589
jpayne@7 590 def get_unicode_from_response(r):
jpayne@7 591 """Returns the requested content back in unicode.
jpayne@7 592
jpayne@7 593 :param r: Response object to get unicode content from.
jpayne@7 594
jpayne@7 595 Tried:
jpayne@7 596
jpayne@7 597 1. charset from content-type
jpayne@7 598 2. fall back and replace all unicode characters
jpayne@7 599
jpayne@7 600 :rtype: str
jpayne@7 601 """
jpayne@7 602 warnings.warn(
jpayne@7 603 (
jpayne@7 604 "In requests 3.0, get_unicode_from_response will be removed. For "
jpayne@7 605 "more information, please see the discussion on issue #2266. (This"
jpayne@7 606 " warning should only appear once.)"
jpayne@7 607 ),
jpayne@7 608 DeprecationWarning,
jpayne@7 609 )
jpayne@7 610
jpayne@7 611 tried_encodings = []
jpayne@7 612
jpayne@7 613 # Try charset from content-type
jpayne@7 614 encoding = get_encoding_from_headers(r.headers)
jpayne@7 615
jpayne@7 616 if encoding:
jpayne@7 617 try:
jpayne@7 618 return str(r.content, encoding)
jpayne@7 619 except UnicodeError:
jpayne@7 620 tried_encodings.append(encoding)
jpayne@7 621
jpayne@7 622 # Fall back:
jpayne@7 623 try:
jpayne@7 624 return str(r.content, encoding, errors="replace")
jpayne@7 625 except TypeError:
jpayne@7 626 return r.content
jpayne@7 627
jpayne@7 628
jpayne@7 629 # The unreserved URI characters (RFC 3986)
jpayne@7 630 UNRESERVED_SET = frozenset(
jpayne@7 631 "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + "0123456789-._~"
jpayne@7 632 )
jpayne@7 633
jpayne@7 634
jpayne@7 635 def unquote_unreserved(uri):
jpayne@7 636 """Un-escape any percent-escape sequences in a URI that are unreserved
jpayne@7 637 characters. This leaves all reserved, illegal and non-ASCII bytes encoded.
jpayne@7 638
jpayne@7 639 :rtype: str
jpayne@7 640 """
jpayne@7 641 parts = uri.split("%")
jpayne@7 642 for i in range(1, len(parts)):
jpayne@7 643 h = parts[i][0:2]
jpayne@7 644 if len(h) == 2 and h.isalnum():
jpayne@7 645 try:
jpayne@7 646 c = chr(int(h, 16))
jpayne@7 647 except ValueError:
jpayne@7 648 raise InvalidURL(f"Invalid percent-escape sequence: '{h}'")
jpayne@7 649
jpayne@7 650 if c in UNRESERVED_SET:
jpayne@7 651 parts[i] = c + parts[i][2:]
jpayne@7 652 else:
jpayne@7 653 parts[i] = f"%{parts[i]}"
jpayne@7 654 else:
jpayne@7 655 parts[i] = f"%{parts[i]}"
jpayne@7 656 return "".join(parts)
jpayne@7 657
jpayne@7 658
jpayne@7 659 def requote_uri(uri):
jpayne@7 660 """Re-quote the given URI.
jpayne@7 661
jpayne@7 662 This function passes the given URI through an unquote/quote cycle to
jpayne@7 663 ensure that it is fully and consistently quoted.
jpayne@7 664
jpayne@7 665 :rtype: str
jpayne@7 666 """
jpayne@7 667 safe_with_percent = "!#$%&'()*+,/:;=?@[]~"
jpayne@7 668 safe_without_percent = "!#$&'()*+,/:;=?@[]~"
jpayne@7 669 try:
jpayne@7 670 # Unquote only the unreserved characters
jpayne@7 671 # Then quote only illegal characters (do not quote reserved,
jpayne@7 672 # unreserved, or '%')
jpayne@7 673 return quote(unquote_unreserved(uri), safe=safe_with_percent)
jpayne@7 674 except InvalidURL:
jpayne@7 675 # We couldn't unquote the given URI, so let's try quoting it, but
jpayne@7 676 # there may be unquoted '%'s in the URI. We need to make sure they're
jpayne@7 677 # properly quoted so they do not cause issues elsewhere.
jpayne@7 678 return quote(uri, safe=safe_without_percent)
jpayne@7 679
jpayne@7 680
jpayne@7 681 def address_in_network(ip, net):
jpayne@7 682 """This function allows you to check if an IP belongs to a network subnet
jpayne@7 683
jpayne@7 684 Example: returns True if ip = 192.168.1.1 and net = 192.168.1.0/24
jpayne@7 685 returns False if ip = 192.168.1.1 and net = 192.168.100.0/24
jpayne@7 686
jpayne@7 687 :rtype: bool
jpayne@7 688 """
jpayne@7 689 ipaddr = struct.unpack("=L", socket.inet_aton(ip))[0]
jpayne@7 690 netaddr, bits = net.split("/")
jpayne@7 691 netmask = struct.unpack("=L", socket.inet_aton(dotted_netmask(int(bits))))[0]
jpayne@7 692 network = struct.unpack("=L", socket.inet_aton(netaddr))[0] & netmask
jpayne@7 693 return (ipaddr & netmask) == (network & netmask)
jpayne@7 694
jpayne@7 695
jpayne@7 696 def dotted_netmask(mask):
jpayne@7 697 """Converts mask from /xx format to xxx.xxx.xxx.xxx
jpayne@7 698
jpayne@7 699 Example: if mask is 24 function returns 255.255.255.0
jpayne@7 700
jpayne@7 701 :rtype: str
jpayne@7 702 """
jpayne@7 703 bits = 0xFFFFFFFF ^ (1 << 32 - mask) - 1
jpayne@7 704 return socket.inet_ntoa(struct.pack(">I", bits))
jpayne@7 705
jpayne@7 706
jpayne@7 707 def is_ipv4_address(string_ip):
jpayne@7 708 """
jpayne@7 709 :rtype: bool
jpayne@7 710 """
jpayne@7 711 try:
jpayne@7 712 socket.inet_aton(string_ip)
jpayne@7 713 except OSError:
jpayne@7 714 return False
jpayne@7 715 return True
jpayne@7 716
jpayne@7 717
jpayne@7 718 def is_valid_cidr(string_network):
jpayne@7 719 """
jpayne@7 720 Very simple check of the cidr format in no_proxy variable.
jpayne@7 721
jpayne@7 722 :rtype: bool
jpayne@7 723 """
jpayne@7 724 if string_network.count("/") == 1:
jpayne@7 725 try:
jpayne@7 726 mask = int(string_network.split("/")[1])
jpayne@7 727 except ValueError:
jpayne@7 728 return False
jpayne@7 729
jpayne@7 730 if mask < 1 or mask > 32:
jpayne@7 731 return False
jpayne@7 732
jpayne@7 733 try:
jpayne@7 734 socket.inet_aton(string_network.split("/")[0])
jpayne@7 735 except OSError:
jpayne@7 736 return False
jpayne@7 737 else:
jpayne@7 738 return False
jpayne@7 739 return True
jpayne@7 740
jpayne@7 741
jpayne@7 742 @contextlib.contextmanager
jpayne@7 743 def set_environ(env_name, value):
jpayne@7 744 """Set the environment variable 'env_name' to 'value'
jpayne@7 745
jpayne@7 746 Save previous value, yield, and then restore the previous value stored in
jpayne@7 747 the environment variable 'env_name'.
jpayne@7 748
jpayne@7 749 If 'value' is None, do nothing"""
jpayne@7 750 value_changed = value is not None
jpayne@7 751 if value_changed:
jpayne@7 752 old_value = os.environ.get(env_name)
jpayne@7 753 os.environ[env_name] = value
jpayne@7 754 try:
jpayne@7 755 yield
jpayne@7 756 finally:
jpayne@7 757 if value_changed:
jpayne@7 758 if old_value is None:
jpayne@7 759 del os.environ[env_name]
jpayne@7 760 else:
jpayne@7 761 os.environ[env_name] = old_value
jpayne@7 762
jpayne@7 763
jpayne@7 764 def should_bypass_proxies(url, no_proxy):
jpayne@7 765 """
jpayne@7 766 Returns whether we should bypass proxies or not.
jpayne@7 767
jpayne@7 768 :rtype: bool
jpayne@7 769 """
jpayne@7 770 # Prioritize lowercase environment variables over uppercase
jpayne@7 771 # to keep a consistent behaviour with other http projects (curl, wget).
jpayne@7 772 def get_proxy(key):
jpayne@7 773 return os.environ.get(key) or os.environ.get(key.upper())
jpayne@7 774
jpayne@7 775 # First check whether no_proxy is defined. If it is, check that the URL
jpayne@7 776 # we're getting isn't in the no_proxy list.
jpayne@7 777 no_proxy_arg = no_proxy
jpayne@7 778 if no_proxy is None:
jpayne@7 779 no_proxy = get_proxy("no_proxy")
jpayne@7 780 parsed = urlparse(url)
jpayne@7 781
jpayne@7 782 if parsed.hostname is None:
jpayne@7 783 # URLs don't always have hostnames, e.g. file:/// urls.
jpayne@7 784 return True
jpayne@7 785
jpayne@7 786 if no_proxy:
jpayne@7 787 # We need to check whether we match here. We need to see if we match
jpayne@7 788 # the end of the hostname, both with and without the port.
jpayne@7 789 no_proxy = (host for host in no_proxy.replace(" ", "").split(",") if host)
jpayne@7 790
jpayne@7 791 if is_ipv4_address(parsed.hostname):
jpayne@7 792 for proxy_ip in no_proxy:
jpayne@7 793 if is_valid_cidr(proxy_ip):
jpayne@7 794 if address_in_network(parsed.hostname, proxy_ip):
jpayne@7 795 return True
jpayne@7 796 elif parsed.hostname == proxy_ip:
jpayne@7 797 # If no_proxy ip was defined in plain IP notation instead of cidr notation &
jpayne@7 798 # matches the IP of the index
jpayne@7 799 return True
jpayne@7 800 else:
jpayne@7 801 host_with_port = parsed.hostname
jpayne@7 802 if parsed.port:
jpayne@7 803 host_with_port += f":{parsed.port}"
jpayne@7 804
jpayne@7 805 for host in no_proxy:
jpayne@7 806 if parsed.hostname.endswith(host) or host_with_port.endswith(host):
jpayne@7 807 # The URL does match something in no_proxy, so we don't want
jpayne@7 808 # to apply the proxies on this URL.
jpayne@7 809 return True
jpayne@7 810
jpayne@7 811 with set_environ("no_proxy", no_proxy_arg):
jpayne@7 812 # parsed.hostname can be `None` in cases such as a file URI.
jpayne@7 813 try:
jpayne@7 814 bypass = proxy_bypass(parsed.hostname)
jpayne@7 815 except (TypeError, socket.gaierror):
jpayne@7 816 bypass = False
jpayne@7 817
jpayne@7 818 if bypass:
jpayne@7 819 return True
jpayne@7 820
jpayne@7 821 return False
jpayne@7 822
jpayne@7 823
jpayne@7 824 def get_environ_proxies(url, no_proxy=None):
jpayne@7 825 """
jpayne@7 826 Return a dict of environment proxies.
jpayne@7 827
jpayne@7 828 :rtype: dict
jpayne@7 829 """
jpayne@7 830 if should_bypass_proxies(url, no_proxy=no_proxy):
jpayne@7 831 return {}
jpayne@7 832 else:
jpayne@7 833 return getproxies()
jpayne@7 834
jpayne@7 835
jpayne@7 836 def select_proxy(url, proxies):
jpayne@7 837 """Select a proxy for the url, if applicable.
jpayne@7 838
jpayne@7 839 :param url: The url being for the request
jpayne@7 840 :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
jpayne@7 841 """
jpayne@7 842 proxies = proxies or {}
jpayne@7 843 urlparts = urlparse(url)
jpayne@7 844 if urlparts.hostname is None:
jpayne@7 845 return proxies.get(urlparts.scheme, proxies.get("all"))
jpayne@7 846
jpayne@7 847 proxy_keys = [
jpayne@7 848 urlparts.scheme + "://" + urlparts.hostname,
jpayne@7 849 urlparts.scheme,
jpayne@7 850 "all://" + urlparts.hostname,
jpayne@7 851 "all",
jpayne@7 852 ]
jpayne@7 853 proxy = None
jpayne@7 854 for proxy_key in proxy_keys:
jpayne@7 855 if proxy_key in proxies:
jpayne@7 856 proxy = proxies[proxy_key]
jpayne@7 857 break
jpayne@7 858
jpayne@7 859 return proxy
jpayne@7 860
jpayne@7 861
jpayne@7 862 def resolve_proxies(request, proxies, trust_env=True):
jpayne@7 863 """This method takes proxy information from a request and configuration
jpayne@7 864 input to resolve a mapping of target proxies. This will consider settings
jpayne@7 865 such a NO_PROXY to strip proxy configurations.
jpayne@7 866
jpayne@7 867 :param request: Request or PreparedRequest
jpayne@7 868 :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
jpayne@7 869 :param trust_env: Boolean declaring whether to trust environment configs
jpayne@7 870
jpayne@7 871 :rtype: dict
jpayne@7 872 """
jpayne@7 873 proxies = proxies if proxies is not None else {}
jpayne@7 874 url = request.url
jpayne@7 875 scheme = urlparse(url).scheme
jpayne@7 876 no_proxy = proxies.get("no_proxy")
jpayne@7 877 new_proxies = proxies.copy()
jpayne@7 878
jpayne@7 879 if trust_env and not should_bypass_proxies(url, no_proxy=no_proxy):
jpayne@7 880 environ_proxies = get_environ_proxies(url, no_proxy=no_proxy)
jpayne@7 881
jpayne@7 882 proxy = environ_proxies.get(scheme, environ_proxies.get("all"))
jpayne@7 883
jpayne@7 884 if proxy:
jpayne@7 885 new_proxies.setdefault(scheme, proxy)
jpayne@7 886 return new_proxies
jpayne@7 887
jpayne@7 888
jpayne@7 889 def default_user_agent(name="python-requests"):
jpayne@7 890 """
jpayne@7 891 Return a string representing the default user agent.
jpayne@7 892
jpayne@7 893 :rtype: str
jpayne@7 894 """
jpayne@7 895 return f"{name}/{__version__}"
jpayne@7 896
jpayne@7 897
jpayne@7 898 def default_headers():
jpayne@7 899 """
jpayne@7 900 :rtype: requests.structures.CaseInsensitiveDict
jpayne@7 901 """
jpayne@7 902 return CaseInsensitiveDict(
jpayne@7 903 {
jpayne@7 904 "User-Agent": default_user_agent(),
jpayne@7 905 "Accept-Encoding": DEFAULT_ACCEPT_ENCODING,
jpayne@7 906 "Accept": "*/*",
jpayne@7 907 "Connection": "keep-alive",
jpayne@7 908 }
jpayne@7 909 )
jpayne@7 910
jpayne@7 911
jpayne@7 912 def parse_header_links(value):
jpayne@7 913 """Return a list of parsed link headers proxies.
jpayne@7 914
jpayne@7 915 i.e. Link: <http:/.../front.jpeg>; rel=front; type="image/jpeg",<http://.../back.jpeg>; rel=back;type="image/jpeg"
jpayne@7 916
jpayne@7 917 :rtype: list
jpayne@7 918 """
jpayne@7 919
jpayne@7 920 links = []
jpayne@7 921
jpayne@7 922 replace_chars = " '\""
jpayne@7 923
jpayne@7 924 value = value.strip(replace_chars)
jpayne@7 925 if not value:
jpayne@7 926 return links
jpayne@7 927
jpayne@7 928 for val in re.split(", *<", value):
jpayne@7 929 try:
jpayne@7 930 url, params = val.split(";", 1)
jpayne@7 931 except ValueError:
jpayne@7 932 url, params = val, ""
jpayne@7 933
jpayne@7 934 link = {"url": url.strip("<> '\"")}
jpayne@7 935
jpayne@7 936 for param in params.split(";"):
jpayne@7 937 try:
jpayne@7 938 key, value = param.split("=")
jpayne@7 939 except ValueError:
jpayne@7 940 break
jpayne@7 941
jpayne@7 942 link[key.strip(replace_chars)] = value.strip(replace_chars)
jpayne@7 943
jpayne@7 944 links.append(link)
jpayne@7 945
jpayne@7 946 return links
jpayne@7 947
jpayne@7 948
jpayne@7 949 # Null bytes; no need to recreate these on each call to guess_json_utf
jpayne@7 950 _null = "\x00".encode("ascii") # encoding to ASCII for Python 3
jpayne@7 951 _null2 = _null * 2
jpayne@7 952 _null3 = _null * 3
jpayne@7 953
jpayne@7 954
jpayne@7 955 def guess_json_utf(data):
jpayne@7 956 """
jpayne@7 957 :rtype: str
jpayne@7 958 """
jpayne@7 959 # JSON always starts with two ASCII characters, so detection is as
jpayne@7 960 # easy as counting the nulls and from their location and count
jpayne@7 961 # determine the encoding. Also detect a BOM, if present.
jpayne@7 962 sample = data[:4]
jpayne@7 963 if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
jpayne@7 964 return "utf-32" # BOM included
jpayne@7 965 if sample[:3] == codecs.BOM_UTF8:
jpayne@7 966 return "utf-8-sig" # BOM included, MS style (discouraged)
jpayne@7 967 if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
jpayne@7 968 return "utf-16" # BOM included
jpayne@7 969 nullcount = sample.count(_null)
jpayne@7 970 if nullcount == 0:
jpayne@7 971 return "utf-8"
jpayne@7 972 if nullcount == 2:
jpayne@7 973 if sample[::2] == _null2: # 1st and 3rd are null
jpayne@7 974 return "utf-16-be"
jpayne@7 975 if sample[1::2] == _null2: # 2nd and 4th are null
jpayne@7 976 return "utf-16-le"
jpayne@7 977 # Did not detect 2 valid UTF-16 ascii-range characters
jpayne@7 978 if nullcount == 3:
jpayne@7 979 if sample[:3] == _null3:
jpayne@7 980 return "utf-32-be"
jpayne@7 981 if sample[1:] == _null3:
jpayne@7 982 return "utf-32-le"
jpayne@7 983 # Did not detect a valid UTF-32 ascii-range character
jpayne@7 984 return None
jpayne@7 985
jpayne@7 986
jpayne@7 987 def prepend_scheme_if_needed(url, new_scheme):
jpayne@7 988 """Given a URL that may or may not have a scheme, prepend the given scheme.
jpayne@7 989 Does not replace a present scheme with the one provided as an argument.
jpayne@7 990
jpayne@7 991 :rtype: str
jpayne@7 992 """
jpayne@7 993 parsed = parse_url(url)
jpayne@7 994 scheme, auth, host, port, path, query, fragment = parsed
jpayne@7 995
jpayne@7 996 # A defect in urlparse determines that there isn't a netloc present in some
jpayne@7 997 # urls. We previously assumed parsing was overly cautious, and swapped the
jpayne@7 998 # netloc and path. Due to a lack of tests on the original defect, this is
jpayne@7 999 # maintained with parse_url for backwards compatibility.
jpayne@7 1000 netloc = parsed.netloc
jpayne@7 1001 if not netloc:
jpayne@7 1002 netloc, path = path, netloc
jpayne@7 1003
jpayne@7 1004 if auth:
jpayne@7 1005 # parse_url doesn't provide the netloc with auth
jpayne@7 1006 # so we'll add it ourselves.
jpayne@7 1007 netloc = "@".join([auth, netloc])
jpayne@7 1008 if scheme is None:
jpayne@7 1009 scheme = new_scheme
jpayne@7 1010 if path is None:
jpayne@7 1011 path = ""
jpayne@7 1012
jpayne@7 1013 return urlunparse((scheme, netloc, path, "", query, fragment))
jpayne@7 1014
jpayne@7 1015
jpayne@7 1016 def get_auth_from_url(url):
jpayne@7 1017 """Given a url with authentication components, extract them into a tuple of
jpayne@7 1018 username,password.
jpayne@7 1019
jpayne@7 1020 :rtype: (str,str)
jpayne@7 1021 """
jpayne@7 1022 parsed = urlparse(url)
jpayne@7 1023
jpayne@7 1024 try:
jpayne@7 1025 auth = (unquote(parsed.username), unquote(parsed.password))
jpayne@7 1026 except (AttributeError, TypeError):
jpayne@7 1027 auth = ("", "")
jpayne@7 1028
jpayne@7 1029 return auth
jpayne@7 1030
jpayne@7 1031
jpayne@7 1032 def check_header_validity(header):
jpayne@7 1033 """Verifies that header parts don't contain leading whitespace
jpayne@7 1034 reserved characters, or return characters.
jpayne@7 1035
jpayne@7 1036 :param header: tuple, in the format (name, value).
jpayne@7 1037 """
jpayne@7 1038 name, value = header
jpayne@7 1039 _validate_header_part(header, name, 0)
jpayne@7 1040 _validate_header_part(header, value, 1)
jpayne@7 1041
jpayne@7 1042
jpayne@7 1043 def _validate_header_part(header, header_part, header_validator_index):
jpayne@7 1044 if isinstance(header_part, str):
jpayne@7 1045 validator = _HEADER_VALIDATORS_STR[header_validator_index]
jpayne@7 1046 elif isinstance(header_part, bytes):
jpayne@7 1047 validator = _HEADER_VALIDATORS_BYTE[header_validator_index]
jpayne@7 1048 else:
jpayne@7 1049 raise InvalidHeader(
jpayne@7 1050 f"Header part ({header_part!r}) from {header} "
jpayne@7 1051 f"must be of type str or bytes, not {type(header_part)}"
jpayne@7 1052 )
jpayne@7 1053
jpayne@7 1054 if not validator.match(header_part):
jpayne@7 1055 header_kind = "name" if header_validator_index == 0 else "value"
jpayne@7 1056 raise InvalidHeader(
jpayne@7 1057 f"Invalid leading whitespace, reserved character(s), or return"
jpayne@7 1058 f"character(s) in header {header_kind}: {header_part!r}"
jpayne@7 1059 )
jpayne@7 1060
jpayne@7 1061
jpayne@7 1062 def urldefragauth(url):
jpayne@7 1063 """
jpayne@7 1064 Given a url remove the fragment and the authentication part.
jpayne@7 1065
jpayne@7 1066 :rtype: str
jpayne@7 1067 """
jpayne@7 1068 scheme, netloc, path, params, query, fragment = urlparse(url)
jpayne@7 1069
jpayne@7 1070 # see func:`prepend_scheme_if_needed`
jpayne@7 1071 if not netloc:
jpayne@7 1072 netloc, path = path, netloc
jpayne@7 1073
jpayne@7 1074 netloc = netloc.rsplit("@", 1)[-1]
jpayne@7 1075
jpayne@7 1076 return urlunparse((scheme, netloc, path, params, query, ""))
jpayne@7 1077
jpayne@7 1078
jpayne@7 1079 def rewind_body(prepared_request):
jpayne@7 1080 """Move file pointer back to its recorded starting position
jpayne@7 1081 so it can be read again on redirect.
jpayne@7 1082 """
jpayne@7 1083 body_seek = getattr(prepared_request.body, "seek", None)
jpayne@7 1084 if body_seek is not None and isinstance(
jpayne@7 1085 prepared_request._body_position, integer_types
jpayne@7 1086 ):
jpayne@7 1087 try:
jpayne@7 1088 body_seek(prepared_request._body_position)
jpayne@7 1089 except OSError:
jpayne@7 1090 raise UnrewindableBodyError(
jpayne@7 1091 "An error occurred when rewinding request body for redirect."
jpayne@7 1092 )
jpayne@7 1093 else:
jpayne@7 1094 raise UnrewindableBodyError("Unable to rewind request body for redirect.")