"""
requests.auth
~~~~~~~~~~~~~

This module contains the authentication handlers for Requests.
"""

import hashlib
import os
import re
import threading
import time
import warnings
from base64 import b64encode

from ._internal_utils import to_native_string
from .compat import basestring, str, urlparse
from .cookies import extract_cookies_to_jar
from .utils import parse_dict_header

CONTENT_TYPE_FORM_URLENCODED = "application/x-www-form-urlencoded"
CONTENT_TYPE_MULTI_PART = "multipart/form-data"


def _basic_auth_str(username, password):
    """Returns a Basic Auth string.

    :param username: user name. ``str`` values are encoded as latin1; any
        object that is not a ``basestring`` (as provided by ``.compat``) is
        first converted with ``str()`` and a ``DeprecationWarning`` is issued.
    :param password: password, handled the same way as ``username``.
    :returns: ``"Basic "`` followed by the base64 encoding of
        ``username:password``.
    """

    # "I want us to put a big-ol' comment on top of it that
    # says that this behaviour is dumb but we need to preserve
    # it because people are relying on it."
    #    - Lukasa
    #
    # These are here solely to maintain backwards compatibility
    # for things like ints. This will be removed in 3.0.0.
    if not isinstance(username, basestring):
        warnings.warn(
            "Non-string usernames will no longer be supported in Requests "
            "3.0.0. Please convert the object you've passed in ({!r}) to "
            "a string or bytes object in the near future to avoid "
            "problems.".format(username),
            category=DeprecationWarning,
        )
        username = str(username)

    if not isinstance(password, basestring):
        # Only the type is formatted into the message here, so the password
        # value itself never appears in the warning text.
        warnings.warn(
            "Non-string passwords will no longer be supported in Requests "
            "3.0.0. Please convert the object you've passed in ({!r}) to "
            "a string or bytes object in the near future to avoid "
            "problems.".format(type(password)),
            category=DeprecationWarning,
        )
        password = str(password)
    # -- End Removal --

    if isinstance(username, str):
        username = username.encode("latin1")

    if isinstance(password, str):
        password = password.encode("latin1")

    authstr = "Basic " + to_native_string(
        b64encode(b":".join((username, password))).strip()
    )

    return authstr


class AuthBase:
    """Base class that all auth implementations derive from"""

    def __call__(self, r):
        raise NotImplementedError("Auth hooks must be callable.")


class HTTPBasicAuth(AuthBase):
    """Attaches HTTP Basic Authentication to the given Request object."""

    def __init__(self, username, password):
        self.username = username
        self.password = password

    def __eq__(self, other):
        # Duck-typed comparison: ``other`` only needs matching ``username``
        # and ``password`` attributes; missing attributes compare as None.
        return all(
            [
                self.username == getattr(other, "username", None),
                self.password == getattr(other, "password", None),
            ]
        )

    def __ne__(self, other):
        return not self == other

    def __call__(self, r):
        """Set the ``Authorization`` header on ``r`` and return ``r``."""
        r.headers["Authorization"] = _basic_auth_str(self.username, self.password)
        return r


class HTTPProxyAuth(HTTPBasicAuth):
    """Attaches HTTP Proxy Authentication to a given Request object."""

    def __call__(self, r):
        """Set the ``Proxy-Authorization`` header on ``r`` and return ``r``."""
        r.headers["Proxy-Authorization"] = _basic_auth_str(self.username, self.password)
        return r


class HTTPDigestAuth(AuthBase):
    """Attaches HTTP Digest Authentication to the given Request object."""

    def __init__(self, username, password):
        self.username = username
        self.password = password
        # Keep state in per-thread local storage
        self._thread_local = threading.local()

    def init_per_thread_state(self):
        """Create the per-thread digest state the first time a thread uses it."""
        # Ensure state is initialized just once per-thread
        if not hasattr(self._thread_local, "init"):
            self._thread_local.init = True
            self._thread_local.last_nonce = ""
            self._thread_local.nonce_count = 0
            self._thread_local.chal = {}
            self._thread_local.pos = None
            self._thread_local.num_401_calls = None

    def build_digest_header(self, method, url):
        """Build the Digest ``Authorization`` header value.

        The challenge parameters are read from the per-thread ``chal`` dict
        (filled in by :meth:`handle_401`), and the per-thread nonce counter
        and last-seen nonce are updated as a side effect.

        :param method: HTTP method of the request being authenticated.
        :param url: request URL; only its path and query string are used.
        :returns: the header value, or ``None`` when the challenge names an
            unsupported algorithm or a ``qop`` that does not offer ``auth``.
        :rtype: str or None
        :raises KeyError: if the stored challenge has no ``realm`` or
            ``nonce`` entry.
        """

        realm = self._thread_local.chal["realm"]
        nonce = self._thread_local.chal["nonce"]
        qop = self._thread_local.chal.get("qop")
        algorithm = self._thread_local.chal.get("algorithm")
        opaque = self._thread_local.chal.get("opaque")

        # MD5 is assumed when the challenge does not name an algorithm.
        _algorithm = "MD5" if algorithm is None else algorithm.upper()

        # Map the challenge's algorithm name to the hashlib constructor.
        # The -SESS variant uses the same hash; it only changes how HA1 is
        # derived further down.
        hash_constructors = {
            "MD5": hashlib.md5,
            "MD5-SESS": hashlib.md5,
            "SHA": hashlib.sha1,
            "SHA-256": hashlib.sha256,
            "SHA-512": hashlib.sha512,
        }
        hash_constructor = hash_constructors.get(_algorithm)
        if hash_constructor is None:
            return None

        def hash_utf8(x):
            # Text is hashed as UTF-8; bytes are hashed unchanged.
            if isinstance(x, str):
                x = x.encode("utf-8")
            return hash_constructor(x).hexdigest()

        def KD(s, d):
            return hash_utf8(f"{s}:{d}")

        # XXX not implemented yet
        entdig = None
        p_parsed = urlparse(url)
        #: path is request-uri defined in RFC 2616 which should not be empty
        path = p_parsed.path or "/"
        if p_parsed.query:
            path += f"?{p_parsed.query}"

        A1 = f"{self.username}:{realm}:{self.password}"
        A2 = f"{method}:{path}"

        HA1 = hash_utf8(A1)
        HA2 = hash_utf8(A2)

        # The nonce count restarts whenever the server hands out a new nonce.
        if nonce == self._thread_local.last_nonce:
            self._thread_local.nonce_count += 1
        else:
            self._thread_local.nonce_count = 1
        ncvalue = f"{self._thread_local.nonce_count:08x}"

        # Client nonce: a hash over the counter, the server nonce, the
        # current time and 8 random bytes, truncated to 16 hex characters.
        s = str(self._thread_local.nonce_count).encode("utf-8")
        s += nonce.encode("utf-8")
        s += time.ctime().encode("utf-8")
        s += os.urandom(8)

        cnonce = hashlib.sha1(s).hexdigest()[:16]
        if _algorithm == "MD5-SESS":
            HA1 = hash_utf8(f"{HA1}:{nonce}:{cnonce}")

        if not qop:
            respdig = KD(HA1, f"{nonce}:{HA2}")
        elif "auth" in [option.strip() for option in qop.split(",")]:
            # Options are stripped so that a challenge such as
            # qop="auth-int, auth" (space after the comma) is still
            # recognised as offering "auth".
            noncebit = f"{nonce}:{ncvalue}:{cnonce}:auth:{HA2}"
            respdig = KD(HA1, noncebit)
        else:
            # XXX handle auth-int.
            return None

        self._thread_local.last_nonce = nonce

        # XXX should the partial digests be encoded too?
        base = (
            f'username="{self.username}", realm="{realm}", nonce="{nonce}", '
            f'uri="{path}", response="{respdig}"'
        )
        if opaque:
            base += f', opaque="{opaque}"'
        if algorithm:
            base += f', algorithm="{algorithm}"'
        if entdig:
            base += f', digest="{entdig}"'
        if qop:
            base += f', qop="auth", nc={ncvalue}, cnonce="{cnonce}"'

        return f"Digest {base}"

    def handle_redirect(self, r, **kwargs):
        """Reset num_401_calls counter on redirects."""
        if r.is_redirect:
            self._thread_local.num_401_calls = 1

    def handle_401(self, r, **kwargs):
        """
        Takes the given response and tries digest-auth, if needed.

        When the response is a 4xx carrying a Digest challenge and no retry
        has been made yet, the request is copied, given an ``Authorization``
        header and re-sent on the same connection. Otherwise the original
        response is returned unchanged.

        :param r: the response passed to the ``response`` hook.
        :param kwargs: forwarded to ``r.connection.send`` on retry.
        :rtype: requests.Response
        """

        # If response is not 4xx, do not auth
        # See https://github.com/psf/requests/issues/3772
        if not 400 <= r.status_code < 500:
            self._thread_local.num_401_calls = 1
            return r

        if self._thread_local.pos is not None:
            # Rewind the file position indicator of the body to where
            # it was to resend the request.
            r.request.body.seek(self._thread_local.pos)
        s_auth = r.headers.get("www-authenticate", "")

        if "digest" in s_auth.lower() and self._thread_local.num_401_calls < 2:
            self._thread_local.num_401_calls += 1
            # Drop the leading "Digest " scheme token (first match only) and
            # parse the remainder as the challenge parameters.
            pat = re.compile(r"digest ", flags=re.IGNORECASE)
            self._thread_local.chal = parse_dict_header(pat.sub("", s_auth, count=1))

            # Consume content and release the original connection
            # to allow our new request to reuse the same one.
            r.content
            r.close()
            prep = r.request.copy()
            extract_cookies_to_jar(prep._cookies, r.request, r.raw)
            prep.prepare_cookies(prep._cookies)

            prep.headers["Authorization"] = self.build_digest_header(
                prep.method, prep.url
            )
            _r = r.connection.send(prep, **kwargs)
            _r.history.append(r)
            _r.request = prep

            return _r

        self._thread_local.num_401_calls = 1
        return r

    def __call__(self, r):
        """Prepare ``r`` for digest auth and register the response hooks.

        :param r: the request being prepared.
        :returns: the same request object.
        """
        # Initialize per-thread state, if needed
        self.init_per_thread_state()
        # If we have a saved nonce, skip the 401
        if self._thread_local.last_nonce:
            r.headers["Authorization"] = self.build_digest_header(r.method, r.url)
        try:
            self._thread_local.pos = r.body.tell()
        except AttributeError:
            # In the case of HTTPDigestAuth being reused and the body of
            # the previous request was a file-like object, pos has the
            # file position of the previous body. Ensure it's set to
            # None.
            self._thread_local.pos = None
        r.register_hook("response", self.handle_401)
        r.register_hook("response", self.handle_redirect)
        self._thread_local.num_401_calls = 1

        return r

    def __eq__(self, other):
        # Same duck-typed comparison as HTTPBasicAuth: only the credentials
        # are compared, not the per-thread challenge state.
        return all(
            [
                self.username == getattr(other, "username", None),
                self.password == getattr(other, "password", None),
            ]
        )

    def __ne__(self, other):
        return not self == other