annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/requests/auth.py @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
rev   line source
jpayne@68 1 """
jpayne@68 2 requests.auth
jpayne@68 3 ~~~~~~~~~~~~~
jpayne@68 4
jpayne@68 5 This module contains the authentication handlers for Requests.
jpayne@68 6 """
jpayne@68 7
jpayne@68 8 import hashlib
jpayne@68 9 import os
jpayne@68 10 import re
jpayne@68 11 import threading
jpayne@68 12 import time
jpayne@68 13 import warnings
jpayne@68 14 from base64 import b64encode
jpayne@68 15
jpayne@68 16 from ._internal_utils import to_native_string
jpayne@68 17 from .compat import basestring, str, urlparse
jpayne@68 18 from .cookies import extract_cookies_to_jar
jpayne@68 19 from .utils import parse_dict_header
jpayne@68 20
# Media-type strings for URL-encoded form bodies and multipart form bodies.
# NOTE(review): neither constant is referenced in the visible part of this
# module; presumably they are imported by other modules -- confirm before
# removing or renaming.
CONTENT_TYPE_FORM_URLENCODED = "application/x-www-form-urlencoded"
CONTENT_TYPE_MULTI_PART = "multipart/form-data"
jpayne@68 23
jpayne@68 24
def _basic_auth_str(username, password):
    """Build the value of an HTTP Basic ``Authorization`` header.

    :param username: user name; text is encoded as latin1, bytes are used
        as-is.
    :param password: password; handled the same way as ``username``.
    :rtype: str
    """

    # Backwards-compatibility shim: values that are not string-like (ints,
    # for example) are still accepted, but a DeprecationWarning is emitted
    # and the value is passed through ``str()``. The warning text announces
    # removal of this behaviour in Requests 3.0.0.
    if not isinstance(username, basestring):
        username_notice = (
            "Non-string usernames will no longer be supported in Requests "
            "3.0.0. Please convert the object you've passed in ({!r}) to "
            "a string or bytes object in the near future to avoid "
            "problems."
        ).format(username)
        warnings.warn(username_notice, category=DeprecationWarning)
        username = str(username)

    if not isinstance(password, basestring):
        # Only the *type* of the password is interpolated into the message,
        # never its value.
        password_notice = (
            "Non-string passwords will no longer be supported in Requests "
            "3.0.0. Please convert the object you've passed in ({!r}) to "
            "a string or bytes object in the near future to avoid "
            "problems."
        ).format(type(password))
        warnings.warn(password_notice, category=DeprecationWarning)
        password = str(password)
    # -- end of backwards-compatibility shim --

    # Text credentials become latin1 bytes; bytes pass through untouched.
    if isinstance(username, str):
        username = username.encode("latin1")
    if isinstance(password, str):
        password = password.encode("latin1")

    credentials = b":".join((username, password))
    token = b64encode(credentials).strip()
    return "Basic " + to_native_string(token)
jpayne@68 67
jpayne@68 68
class AuthBase:
    """Common parent for every authentication handler.

    Subclasses override ``__call__``; this base implementation always
    raises ``NotImplementedError``.
    """

    def __call__(self, r):
        message = "Auth hooks must be callable."
        raise NotImplementedError(message)
jpayne@68 74
jpayne@68 75
class HTTPBasicAuth(AuthBase):
    """Auth handler that sets an HTTP Basic ``Authorization`` header."""

    def __init__(self, username, password):
        self.username = username
        self.password = password

    def __eq__(self, other):
        # Duck-typed comparison: ``other`` only needs ``username`` and
        # ``password`` attributes; a missing attribute compares as ``None``.
        same_username = self.username == getattr(other, "username", None)
        same_password = self.password == getattr(other, "password", None)
        return bool(same_username and same_password)

    def __ne__(self, other):
        return not (self == other)

    def __call__(self, r):
        # Write the header on the request object and hand it back.
        header_value = _basic_auth_str(self.username, self.password)
        r.headers["Authorization"] = header_value
        return r
jpayne@68 97
jpayne@68 98
class HTTPProxyAuth(HTTPBasicAuth):
    """Auth handler that sets a Basic ``Proxy-Authorization`` header."""

    def __call__(self, r):
        # Same credential encoding as the parent class, different header name.
        header_value = _basic_auth_str(self.username, self.password)
        r.headers["Proxy-Authorization"] = header_value
        return r
jpayne@68 105
jpayne@68 106
class HTTPDigestAuth(AuthBase):
    """Attaches HTTP Digest Authentication to the given Request object.

    The challenge, nonce bookkeeping, body position and retry counter are
    stored on a ``threading.local`` instance, so each thread using the same
    auth object sees its own copy of that state.
    """

    def __init__(self, username, password):
        self.username = username
        self.password = password
        # Keep state in per-thread local storage
        self._thread_local = threading.local()

    def init_per_thread_state(self):
        """Create the per-thread state attributes if this thread lacks them."""
        # Ensure state is initialized just once per-thread
        if not hasattr(self._thread_local, "init"):
            self._thread_local.init = True
            self._thread_local.last_nonce = ""
            self._thread_local.nonce_count = 0
            self._thread_local.chal = {}
            self._thread_local.pos = None
            self._thread_local.num_401_calls = None

    def build_digest_header(self, method, url):
        """Build a ``Digest ...`` header value from the stored challenge.

        Reads the challenge from ``self._thread_local.chal`` (``realm`` and
        ``nonce`` are required keys) and updates ``nonce_count`` and
        ``last_nonce`` on the per-thread state.

        :param method: HTTP method used in the A2 hash.
        :param url: request URL; only its path and query string are used.
        :returns: the header value, or ``None`` when the challenge names an
            algorithm this method does not implement or a ``qop`` that does
            not include ``auth``.
        :rtype: str
        """
        state = self._thread_local

        realm = state.chal["realm"]
        nonce = state.chal["nonce"]
        qop = state.chal.get("qop")
        algorithm = state.chal.get("algorithm")
        opaque = state.chal.get("opaque")

        # A challenge without an algorithm is treated as MD5.
        _algorithm = "MD5" if algorithm is None else algorithm.upper()

        digest_constructors = {
            "MD5": hashlib.md5,
            "MD5-SESS": hashlib.md5,
            "SHA": hashlib.sha1,
            "SHA-256": hashlib.sha256,
            "SHA-512": hashlib.sha512,
        }
        new_digest = digest_constructors.get(_algorithm)
        if new_digest is None:
            # Unsupported algorithm: no header can be produced.
            return None

        def hash_utf8(x):
            # Text is hashed as UTF-8; bytes are hashed unchanged.
            if isinstance(x, str):
                x = x.encode("utf-8")
            return new_digest(x).hexdigest()

        def KD(s, d):
            return hash_utf8(f"{s}:{d}")

        p_parsed = urlparse(url)
        #: path is request-uri defined in RFC 2616 which should not be empty
        path = p_parsed.path or "/"
        if p_parsed.query:
            path += f"?{p_parsed.query}"

        A1 = f"{self.username}:{realm}:{self.password}"
        A2 = f"{method}:{path}"

        HA1 = hash_utf8(A1)
        HA2 = hash_utf8(A2)

        # The counter grows while the server keeps issuing the same nonce
        # and restarts at 1 for a new one.
        if nonce == state.last_nonce:
            state.nonce_count += 1
        else:
            state.nonce_count = 1
        ncvalue = f"{state.nonce_count:08x}"

        # Client nonce: SHA-1 over counter, server nonce, current time and
        # eight random bytes, truncated to 16 hex characters.
        s = str(state.nonce_count).encode("utf-8")
        s += nonce.encode("utf-8")
        s += time.ctime().encode("utf-8")
        s += os.urandom(8)
        cnonce = hashlib.sha1(s).hexdigest()[:16]

        if _algorithm == "MD5-SESS":
            HA1 = hash_utf8(f"{HA1}:{nonce}:{cnonce}")

        if not qop:
            respdig = KD(HA1, f"{nonce}:{HA2}")
        elif qop == "auth" or "auth" in qop.split(","):
            noncebit = f"{nonce}:{ncvalue}:{cnonce}:auth:{HA2}"
            respdig = KD(HA1, noncebit)
        else:
            # qop without "auth" (e.g. only auth-int) is not implemented.
            return None

        state.last_nonce = nonce

        # XXX should the partial digests be encoded too?
        base = (
            f'username="{self.username}", realm="{realm}", nonce="{nonce}", '
            f'uri="{path}", response="{respdig}"'
        )
        if opaque:
            base += f', opaque="{opaque}"'
        if algorithm:
            base += f', algorithm="{algorithm}"'
        # No entity digest is emitted: auth-int is not implemented.
        if qop:
            base += f', qop="auth", nc={ncvalue}, cnonce="{cnonce}"'

        return f"Digest {base}"

    def handle_redirect(self, r, **kwargs):
        """Reset num_401_calls counter on redirects."""
        if r.is_redirect:
            self._thread_local.num_401_calls = 1

    def handle_401(self, r, **kwargs):
        """
        Takes the given response and tries digest-auth, if needed.

        When the response is a 4xx carrying a ``Digest`` challenge and no
        retry has been made yet, the request is copied, given an
        ``Authorization`` header and re-sent on ``r.connection``. If no
        header can be built for the challenge, the original response is
        returned instead of re-sending with ``Authorization`` set to
        ``None``.

        :param r: response passed to the ``response`` hook.
        :param kwargs: forwarded unchanged to ``r.connection.send``.
        :rtype: requests.Response
        """

        # If response is not 4xx, do not auth
        # See https://github.com/psf/requests/issues/3772
        if not 400 <= r.status_code < 500:
            self._thread_local.num_401_calls = 1
            return r

        if self._thread_local.pos is not None:
            # Rewind the file position indicator of the body to where
            # it was to resend the request.
            r.request.body.seek(self._thread_local.pos)
        s_auth = r.headers.get("www-authenticate", "")

        if "digest" in s_auth.lower() and self._thread_local.num_401_calls < 2:
            self._thread_local.num_401_calls += 1
            pat = re.compile(r"digest ", flags=re.IGNORECASE)
            self._thread_local.chal = parse_dict_header(pat.sub("", s_auth, count=1))

            # Consume content and release the original connection
            # to allow our new request to reuse the same one.
            r.content
            r.close()
            prep = r.request.copy()
            extract_cookies_to_jar(prep._cookies, r.request, r.raw)
            prep.prepare_cookies(prep._cookies)

            digest_header = self.build_digest_header(prep.method, prep.url)
            if digest_header is None:
                # The challenge cannot be answered (unsupported algorithm
                # or qop); give the caller the original response rather
                # than retrying with a ``None`` header value.
                self._thread_local.num_401_calls = 1
                return r

            prep.headers["Authorization"] = digest_header
            _r = r.connection.send(prep, **kwargs)
            _r.history.append(r)
            _r.request = prep

            return _r

        self._thread_local.num_401_calls = 1
        return r

    def __call__(self, r):
        """Prepare ``r`` for digest auth and register the response hooks.

        :param r: request object; its ``headers``, ``body``, ``method`` and
            ``url`` attributes and ``register_hook`` method are used.
        :returns: the same request object.
        """
        # Initialize per-thread state, if needed
        self.init_per_thread_state()
        # If we have a saved nonce, skip the 401
        if self._thread_local.last_nonce:
            digest_header = self.build_digest_header(r.method, r.url)
            # Only set the header when one could actually be built, so a
            # ``None`` value never ends up in the request headers.
            if digest_header is not None:
                r.headers["Authorization"] = digest_header
        try:
            self._thread_local.pos = r.body.tell()
        except AttributeError:
            # In the case of HTTPDigestAuth being reused and the body of
            # the previous request was a file-like object, pos has the
            # file position of the previous body. Ensure it's set to
            # None.
            self._thread_local.pos = None
        r.register_hook("response", self.handle_401)
        r.register_hook("response", self.handle_redirect)
        self._thread_local.num_401_calls = 1

        return r

    def __eq__(self, other):
        # Duck-typed comparison: a missing attribute on ``other`` compares
        # as ``None``.
        return all(
            [
                self.username == getattr(other, "username", None),
                self.password == getattr(other, "password", None),
            ]
        )

    def __ne__(self, other):
        return not self == other