annotate requests/auth.py @ 16:dc2c003078e9 tip

planemo upload for repository https://toolrepo.galaxytrakr.org/view/jpayne/bioproject_to_srr_2/556cac4fb538
author jpayne
date Tue, 21 May 2024 01:09:25 -0400 (8 months ago)
parents 5eb2d5e3bf22
children
rev   line source
jpayne@7 1 """
jpayne@7 2 requests.auth
jpayne@7 3 ~~~~~~~~~~~~~
jpayne@7 4
jpayne@7 5 This module contains the authentication handlers for Requests.
jpayne@7 6 """
jpayne@7 7
jpayne@7 8 import hashlib
jpayne@7 9 import os
jpayne@7 10 import re
jpayne@7 11 import threading
jpayne@7 12 import time
jpayne@7 13 import warnings
jpayne@7 14 from base64 import b64encode
jpayne@7 15
jpayne@7 16 from ._internal_utils import to_native_string
jpayne@7 17 from .compat import basestring, str, urlparse
jpayne@7 18 from .cookies import extract_cookies_to_jar
jpayne@7 19 from .utils import parse_dict_header
jpayne@7 20
# Media-type strings for the two form-style request body encodings.
# NOTE(review): neither constant is referenced by the code visible in this
# module; presumably they are imported elsewhere in the package -- confirm
# before renaming or removing.
CONTENT_TYPE_FORM_URLENCODED = "application/x-www-form-urlencoded"
CONTENT_TYPE_MULTI_PART = "multipart/form-data"
jpayne@7 23
jpayne@7 24
def _basic_auth_str(username, password):
    """Build the value of an HTTP Basic authentication header.

    Text credentials are encoded as latin1, the two byte strings are joined
    with a colon, base64-encoded and prefixed with ``"Basic "``.

    :param username: user name as text or bytes. Any other object is
        converted with ``str()`` after a ``DeprecationWarning`` is issued.
    :param password: password, treated the same way as ``username``.
    :rtype: str
    """

    # Backwards-compatibility shim: callers historically passed things such
    # as ints here, so non-string values are still stringified instead of
    # rejected. The warnings announce removal of this leniency in 3.0.0.
    if not isinstance(username, basestring):
        username_notice = (
            "Non-string usernames will no longer be supported in Requests "
            "3.0.0. Please convert the object you've passed in ({!r}) to "
            "a string or bytes object in the near future to avoid "
            "problems."
        ).format(username)
        warnings.warn(username_notice, category=DeprecationWarning)
        username = str(username)

    if not isinstance(password, basestring):
        # Only the *type* of the password is interpolated into the message,
        # never its value.
        password_notice = (
            "Non-string passwords will no longer be supported in Requests "
            "3.0.0. Please convert the object you've passed in ({!r}) to "
            "a string or bytes object in the near future to avoid "
            "problems."
        ).format(type(password))
        warnings.warn(password_notice, category=DeprecationWarning)
        password = str(password)
    # -- End of the compatibility shim --

    # Text becomes latin1 bytes; values that are already bytes pass through.
    if isinstance(username, str):
        username = username.encode("latin1")
    if isinstance(password, str):
        password = password.encode("latin1")

    credentials = b":".join([username, password])
    token = b64encode(credentials).strip()
    return "Basic " + to_native_string(token)
jpayne@7 67
jpayne@7 68
class AuthBase:
    """Common ancestor for every authentication handler in this module."""

    def __call__(self, r):
        """Placeholder hook; always raises until a subclass overrides it.

        :param r: the request object an implementation would modify.
        :raises NotImplementedError: unconditionally.
        """
        message = "Auth hooks must be callable."
        raise NotImplementedError(message)
jpayne@7 74
jpayne@7 75
class HTTPBasicAuth(AuthBase):
    """Auth hook that sets an HTTP Basic ``Authorization`` header."""

    def __init__(self, username, password):
        """Remember the credentials used to build the header."""
        self.username, self.password = username, password

    def __eq__(self, other):
        """Compare by credentials.

        ``other`` may be any object; a missing ``username`` or ``password``
        attribute is treated as ``None``.
        """
        same_username = self.username == getattr(other, "username", None)
        same_password = self.password == getattr(other, "password", None)
        return bool(same_username and same_password)

    def __ne__(self, other):
        """Inverse of ``__eq__``."""
        return not (self == other)

    def __call__(self, r):
        """Write the Basic credentials into ``r.headers`` and return ``r``."""
        header_value = _basic_auth_str(self.username, self.password)
        r.headers["Authorization"] = header_value
        return r
jpayne@7 97
jpayne@7 98
class HTTPProxyAuth(HTTPBasicAuth):
    """Auth hook that sets a Basic ``Proxy-Authorization`` header."""

    def __call__(self, r):
        """Write the Basic credentials into ``r.headers`` and return ``r``.

        Identical to the parent hook except for the header name.
        """
        header_value = _basic_auth_str(self.username, self.password)
        r.headers["Proxy-Authorization"] = header_value
        return r
jpayne@7 105
jpayne@7 106
class HTTPDigestAuth(AuthBase):
    """Auth hook implementing HTTP Digest Authentication for a request.

    The last challenge, the nonce counter, the saved body position and the
    401 retry counter are kept in a ``threading.local`` object.
    """

    def __init__(self, username, password):
        """Store the credentials and create the thread-local state holder."""
        self.username, self.password = username, password
        # Challenge and nonce bookkeeping lives in thread-local storage.
        self._thread_local = threading.local()

    def init_per_thread_state(self):
        """Set up the calling thread's state; a no-op after the first call."""
        state = self._thread_local
        if hasattr(state, "init"):
            return
        state.init = True
        state.last_nonce = ""
        state.nonce_count = 0
        state.chal = {}
        state.pos = None
        state.num_401_calls = None

    def build_digest_header(self, method, url):
        """Compute the ``Authorization`` value for the stored challenge.

        Reads the challenge from the thread-local state and updates the
        nonce counter and last-seen nonce there.

        :param method: HTTP method of the request being signed.
        :param url: URL of the request; its path and query form the ``uri``.
        :returns: the ``"Digest ..."`` header value, or ``None`` when the
            challenge names an algorithm or qop this code does not handle.
        :rtype: str
        """
        state = self._thread_local
        challenge = state.chal

        realm = challenge["realm"]
        nonce = challenge["nonce"]
        qop = challenge.get("qop")
        algorithm = challenge.get("algorithm")
        opaque = challenge.get("opaque")

        # A challenge without an algorithm is treated as plain MD5.
        algorithm_name = "MD5" if algorithm is None else algorithm.upper()

        # Challenge algorithm -> name of the hashlib constructor to use.
        digest_name = {
            "MD5": "md5",
            "MD5-SESS": "md5",
            "SHA": "sha1",
            "SHA-256": "sha256",
            "SHA-512": "sha512",
        }.get(algorithm_name)
        if digest_name is None:
            return None

        def hash_utf8(value):
            # Text is hashed as UTF-8; bytes are hashed as given. The
            # constructor is looked up only when a hash is actually needed.
            if isinstance(value, str):
                value = value.encode("utf-8")
            return getattr(hashlib, digest_name)(value).hexdigest()

        def keyed_digest(secret, data):
            return hash_utf8(f"{secret}:{data}")

        # XXX not implemented yet
        entdig = None

        parsed_url = urlparse(url)
        #: path is request-uri defined in RFC 2616 which should not be empty
        path = parsed_url.path or "/"
        if parsed_url.query:
            path += f"?{parsed_url.query}"

        a1 = f"{self.username}:{realm}:{self.password}"
        a2 = f"{method}:{path}"
        ha1 = hash_utf8(a1)
        ha2 = hash_utf8(a2)

        # The counter keeps increasing while the server reuses its nonce and
        # restarts at 1 as soon as a different nonce arrives.
        state.nonce_count = (
            state.nonce_count + 1 if nonce == state.last_nonce else 1
        )
        ncvalue = f"{state.nonce_count:08x}"

        # Client nonce: hash of the counter, the server nonce, the current
        # time and eight random bytes, cut to 16 hex characters.
        cnonce_seed = b"".join(
            [
                str(state.nonce_count).encode("utf-8"),
                nonce.encode("utf-8"),
                time.ctime().encode("utf-8"),
                os.urandom(8),
            ]
        )
        cnonce = hashlib.sha1(cnonce_seed).hexdigest()[:16]

        if algorithm_name == "MD5-SESS":
            ha1 = hash_utf8(f"{ha1}:{nonce}:{cnonce}")

        if not qop:
            respdig = keyed_digest(ha1, f"{nonce}:{ha2}")
        elif qop == "auth" or "auth" in qop.split(","):
            noncebit = f"{nonce}:{ncvalue}:{cnonce}:auth:{ha2}"
            respdig = keyed_digest(ha1, noncebit)
        else:
            # XXX handle auth-int.
            return None

        state.last_nonce = nonce

        # XXX should the partial digests be encoded too?
        fragments = [
            f'username="{self.username}", realm="{realm}", nonce="{nonce}", '
            f'uri="{path}", response="{respdig}"'
        ]
        if opaque:
            fragments.append(f', opaque="{opaque}"')
        if algorithm:
            fragments.append(f', algorithm="{algorithm}"')
        if entdig:
            fragments.append(f', digest="{entdig}"')
        if qop:
            fragments.append(f', qop="auth", nc={ncvalue}, cnonce="{cnonce}"')
        base = "".join(fragments)

        return f"Digest {base}"

    def handle_redirect(self, r, **kwargs):
        """Response hook: put the 401 counter back to 1 on a redirect."""
        if not r.is_redirect:
            return
        self._thread_local.num_401_calls = 1

    def handle_401(self, r, **kwargs):
        """Response hook that answers a Digest challenge by resending.

        :param r: the response that was just received.
        :param kwargs: forwarded unchanged to ``r.connection.send``.
        :returns: the response to the re-sent request, with ``r`` appended
            to its history, or ``r`` itself when no retry is made.
        :rtype: requests.Response
        """
        state = self._thread_local

        # Only 4xx responses are candidates for authentication.
        # See https://github.com/psf/requests/issues/3772
        if not 400 <= r.status_code < 500:
            state.num_401_calls = 1
            return r

        if state.pos is not None:
            # Put the body's file position back where it was when the
            # request was prepared so the body can be sent again.
            r.request.body.seek(state.pos)

        challenge_header = r.headers.get("www-authenticate", "")
        wants_retry = (
            "digest" in challenge_header.lower() and state.num_401_calls < 2
        )
        if not wants_retry:
            state.num_401_calls = 1
            return r

        state.num_401_calls += 1
        # Drop the first "digest " token (any case) before parsing the rest.
        challenge_fields = re.sub(
            r"digest ", "", challenge_header, count=1, flags=re.IGNORECASE
        )
        state.chal = parse_dict_header(challenge_fields)

        # Consume content and release the original connection
        # to allow our new request to reuse the same one.
        r.content
        r.close()

        retry_request = r.request.copy()
        extract_cookies_to_jar(retry_request._cookies, r.request, r.raw)
        retry_request.prepare_cookies(retry_request._cookies)
        retry_request.headers["Authorization"] = self.build_digest_header(
            retry_request.method, retry_request.url
        )

        retry_response = r.connection.send(retry_request, **kwargs)
        retry_response.history.append(r)
        retry_response.request = retry_request
        return retry_response

    def __call__(self, r):
        """Prepare ``r`` for digest auth and return it.

        Adds an ``Authorization`` header right away when a nonce from an
        earlier exchange is stored, records the body position, and
        registers the two response hooks.
        """
        # Initialize per-thread state, if needed
        self.init_per_thread_state()
        state = self._thread_local

        # If we have a saved nonce, skip the 401
        if state.last_nonce:
            r.headers["Authorization"] = self.build_digest_header(r.method, r.url)

        try:
            body_position = r.body.tell()
        except AttributeError:
            # When this object is reused after a request whose body was
            # file-like, the stored position belongs to that earlier body;
            # reset it for a body that has no ``tell``.
            body_position = None
        state.pos = body_position

        r.register_hook("response", self.handle_401)
        r.register_hook("response", self.handle_redirect)
        state.num_401_calls = 1

        return r

    def __eq__(self, other):
        """Compare by credentials.

        ``other`` may be any object; a missing ``username`` or ``password``
        attribute is treated as ``None``.
        """
        same_username = self.username == getattr(other, "username", None)
        same_password = self.password == getattr(other, "password", None)
        return bool(same_username and same_password)

    def __ne__(self, other):
        """Inverse of ``__eq__``."""
        return not (self == other)