annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/requests/auth.py @ 69:33d812a61356

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 17:55:14 -0400
parents
children
rev   line source
"""
requests.auth
~~~~~~~~~~~~~

This module contains the authentication handlers for Requests.
"""
jpayne@69 7
import hashlib
import os
import re
import threading
import time
import warnings
from base64 import b64encode

from ._internal_utils import to_native_string
from .compat import basestring, str, urlparse
from .cookies import extract_cookies_to_jar
from .utils import parse_dict_header
jpayne@69 20
# Media-type strings for the two standard HTML form encodings.
CONTENT_TYPE_FORM_URLENCODED = "application/x-www-form-urlencoded"
CONTENT_TYPE_MULTI_PART = "multipart/form-data"
jpayne@69 23
jpayne@69 24
def _basic_auth_str(username, password):
    """Returns a Basic Auth string.

    Builds the value for a Basic authentication header:
    ``"Basic "`` followed by the base64 encoding of ``username:password``.

    :param username: user name; text is encoded as latin1, bytes are used
        as-is, anything else is converted with ``str()`` after a
        ``DeprecationWarning``.
    :param password: password; handled the same way as ``username``.
    :rtype: str
    """

    # "I want us to put a big-ol' comment on top of it that
    # says that this behaviour is dumb but we need to preserve
    # it because people are relying on it."
    #    - Lukasa
    #
    # These are here solely to maintain backwards compatibility
    # for things like ints. This will be removed in 3.0.0.
    if not isinstance(username, basestring):
        warnings.warn(
            "Non-string usernames will no longer be supported in Requests "
            "3.0.0. Please convert the object you've passed in ({!r}) to "
            "a string or bytes object in the near future to avoid "
            "problems.".format(username),
            category=DeprecationWarning,
        )
        username = str(username)

    if not isinstance(password, basestring):
        # Only the *type* of the password is interpolated into the message,
        # so the password value itself never appears in the warning text.
        warnings.warn(
            "Non-string passwords will no longer be supported in Requests "
            "3.0.0. Please convert the object you've passed in ({!r}) to "
            "a string or bytes object in the near future to avoid "
            "problems.".format(type(password)),
            category=DeprecationWarning,
        )
        password = str(password)
    # -- End Removal --

    # Text credentials are turned into bytes before joining; bytes
    # credentials pass through untouched.
    if isinstance(username, str):
        username = username.encode("latin1")

    if isinstance(password, str):
        password = password.encode("latin1")

    authstr = "Basic " + to_native_string(
        b64encode(b":".join((username, password))).strip()
    )

    return authstr
jpayne@69 67
jpayne@69 68
class AuthBase:
    """Base class that all auth implementations derive from"""

    def __call__(self, r):
        """Apply authentication to ``r``; subclasses must override this.

        :raises NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError("Auth hooks must be callable.")
jpayne@69 74
jpayne@69 75
class HTTPBasicAuth(AuthBase):
    """Attaches HTTP Basic Authentication to the given Request object."""

    def __init__(self, username, password):
        """Store the credentials used to build the header on each call."""
        self.username = username
        self.password = password

    def __eq__(self, other):
        """Compare by credentials.

        ``other`` may be any object; missing ``username``/``password``
        attributes are treated as ``None``.
        """
        return all(
            [
                self.username == getattr(other, "username", None),
                self.password == getattr(other, "password", None),
            ]
        )

    def __ne__(self, other):
        """Return the negation of :meth:`__eq__`."""
        return not self == other

    def __call__(self, r):
        """Set the ``Authorization`` header on ``r`` and return ``r``."""
        r.headers["Authorization"] = _basic_auth_str(self.username, self.password)
        return r
jpayne@69 97
jpayne@69 98
class HTTPProxyAuth(HTTPBasicAuth):
    """Attaches HTTP Proxy Authentication to a given Request object."""

    def __call__(self, r):
        """Set the ``Proxy-Authorization`` header on ``r`` and return ``r``."""
        r.headers["Proxy-Authorization"] = _basic_auth_str(self.username, self.password)
        return r
jpayne@69 105
jpayne@69 106
class HTTPDigestAuth(AuthBase):
    """Attaches HTTP Digest Authentication to the given Request object."""

    def __init__(self, username, password):
        """Store the credentials and create the per-thread state holder."""
        self.username = username
        self.password = password
        # Keep state in per-thread local storage
        self._thread_local = threading.local()

    def init_per_thread_state(self):
        """Populate the thread-local state the first time a thread uses it."""
        # Ensure state is initialized just once per-thread
        if not hasattr(self._thread_local, "init"):
            self._thread_local.init = True
            self._thread_local.last_nonce = ""
            self._thread_local.nonce_count = 0
            self._thread_local.chal = {}
            self._thread_local.pos = None
            self._thread_local.num_401_calls = None

    def build_digest_header(self, method, url):
        """Build the ``Digest ...`` header value from the stored challenge.

        Reads the challenge previously saved in the thread-local ``chal``
        dict (``realm`` and ``nonce`` are required keys; ``qop``,
        ``algorithm`` and ``opaque`` are optional) and updates the
        thread-local ``nonce_count`` and ``last_nonce``.

        :param method: HTTP method of the request being authenticated.
        :param url: URL of the request; only its path and query are used.
        :return: the header value, or ``None`` when the challenge names an
            algorithm that is not handled here or a ``qop`` without an
            ``auth`` option.
        :rtype: str
        """

        realm = self._thread_local.chal["realm"]
        nonce = self._thread_local.chal["nonce"]
        qop = self._thread_local.chal.get("qop")
        algorithm = self._thread_local.chal.get("algorithm")
        opaque = self._thread_local.chal.get("opaque")

        if algorithm is None:
            _algorithm = "MD5"
        else:
            _algorithm = algorithm.upper()

        # Challenge algorithm name -> hashlib constructor. Names missing
        # from this table are unsupported.
        digest_constructors = {
            "MD5": hashlib.md5,
            "MD5-SESS": hashlib.md5,
            "SHA": hashlib.sha1,
            "SHA-256": hashlib.sha256,
            "SHA-512": hashlib.sha512,
        }
        new_digest = digest_constructors.get(_algorithm)
        if new_digest is None:
            return None

        def hash_utf8(x):
            # Text is hashed as its UTF-8 encoding; bytes are hashed as-is.
            if isinstance(x, str):
                x = x.encode("utf-8")
            return new_digest(x).hexdigest()

        def KD(s, d):
            # Hash of "secret:data".
            return hash_utf8(f"{s}:{d}")

        # XXX not implemented yet
        entdig = None
        p_parsed = urlparse(url)
        #: path is request-uri defined in RFC 2616 which should not be empty
        path = p_parsed.path or "/"
        if p_parsed.query:
            path += f"?{p_parsed.query}"

        A1 = f"{self.username}:{realm}:{self.password}"
        A2 = f"{method}:{path}"

        HA1 = hash_utf8(A1)
        HA2 = hash_utf8(A2)

        # The request counter restarts whenever a different nonce is seen.
        if nonce == self._thread_local.last_nonce:
            self._thread_local.nonce_count += 1
        else:
            self._thread_local.nonce_count = 1
        ncvalue = f"{self._thread_local.nonce_count:08x}"

        # Client nonce: first 16 hex digits of a SHA-1 over the counter, the
        # server nonce, the current time string and 8 random bytes.
        s = str(self._thread_local.nonce_count).encode("utf-8")
        s += nonce.encode("utf-8")
        s += time.ctime().encode("utf-8")
        s += os.urandom(8)

        cnonce = hashlib.sha1(s).hexdigest()[:16]
        if _algorithm == "MD5-SESS":
            HA1 = hash_utf8(f"{HA1}:{nonce}:{cnonce}")

        if not qop:
            respdig = KD(HA1, f"{nonce}:{HA2}")
        elif qop == "auth" or "auth" in qop.split(","):
            noncebit = f"{nonce}:{ncvalue}:{cnonce}:auth:{HA2}"
            respdig = KD(HA1, noncebit)
        else:
            # XXX handle auth-int.
            return None

        self._thread_local.last_nonce = nonce

        # XXX should the partial digests be encoded too?
        base = (
            f'username="{self.username}", realm="{realm}", nonce="{nonce}", '
            f'uri="{path}", response="{respdig}"'
        )
        if opaque:
            base += f', opaque="{opaque}"'
        if algorithm:
            base += f', algorithm="{algorithm}"'
        if entdig:
            base += f', digest="{entdig}"'
        if qop:
            base += f', qop="auth", nc={ncvalue}, cnonce="{cnonce}"'

        return f"Digest {base}"

    def handle_redirect(self, r, **kwargs):
        """Reset num_401_calls counter on redirects."""
        if r.is_redirect:
            self._thread_local.num_401_calls = 1

    def handle_401(self, r, **kwargs):
        """
        Takes the given response and tries digest-auth, if needed.

        When the response is a 4xx carrying a ``www-authenticate`` header
        that mentions ``digest`` and no retry has been made yet, the
        challenge is stored, the request is copied, given an
        ``Authorization`` header and re-sent over ``r.connection``; the new
        response is returned with ``r`` appended to its history. Otherwise
        ``r`` is returned unchanged.

        :rtype: requests.Response
        """

        # If response is not 4xx, do not auth
        # See https://github.com/psf/requests/issues/3772
        if not 400 <= r.status_code < 500:
            self._thread_local.num_401_calls = 1
            return r

        if self._thread_local.pos is not None:
            # Rewind the file position indicator of the body to where
            # it was to resend the request.
            r.request.body.seek(self._thread_local.pos)
        s_auth = r.headers.get("www-authenticate", "")

        if "digest" in s_auth.lower() and self._thread_local.num_401_calls < 2:
            self._thread_local.num_401_calls += 1
            # Strip the first "digest " token (any case), then parse the
            # remaining challenge parameters.
            pat = re.compile(r"digest ", flags=re.IGNORECASE)
            self._thread_local.chal = parse_dict_header(pat.sub("", s_auth, count=1))

            # Consume content and release the original connection
            # to allow our new request to reuse the same one.
            r.content
            r.close()
            prep = r.request.copy()
            extract_cookies_to_jar(prep._cookies, r.request, r.raw)
            prep.prepare_cookies(prep._cookies)

            prep.headers["Authorization"] = self.build_digest_header(
                prep.method, prep.url
            )
            _r = r.connection.send(prep, **kwargs)
            _r.history.append(r)
            _r.request = prep

            return _r

        self._thread_local.num_401_calls = 1
        return r

    def __call__(self, r):
        """Prepare ``r`` for digest auth and return it.

        Adds an ``Authorization`` header straight away when a nonce from an
        earlier challenge is stored, records the body position if the body
        supports ``tell()``, and registers the ``response`` hooks that
        handle challenges and redirects.
        """
        # Initialize per-thread state, if needed
        self.init_per_thread_state()
        # If we have a saved nonce, skip the 401
        if self._thread_local.last_nonce:
            r.headers["Authorization"] = self.build_digest_header(r.method, r.url)
        try:
            self._thread_local.pos = r.body.tell()
        except AttributeError:
            # In the case of HTTPDigestAuth being reused and the body of
            # the previous request was a file-like object, pos has the
            # file position of the previous body. Ensure it's set to
            # None.
            self._thread_local.pos = None
        r.register_hook("response", self.handle_401)
        r.register_hook("response", self.handle_redirect)
        self._thread_local.num_401_calls = 1

        return r

    def __eq__(self, other):
        """Compare by credentials.

        ``other`` may be any object; missing ``username``/``password``
        attributes are treated as ``None``.
        """
        return all(
            [
                self.username == getattr(other, "username", None),
                self.password == getattr(other, "password", None),
            ]
        )

    def __ne__(self, other):
        """Return the negation of :meth:`__eq__`."""
        return not self == other