"""
requests.auth
~~~~~~~~~~~~~

This module contains the authentication handlers for Requests.
"""

import hashlib
import os
import re
import threading
import time
import warnings
from base64 import b64encode

from ._internal_utils import to_native_string
from .compat import basestring, str, urlparse
from .cookies import extract_cookies_to_jar
from .utils import parse_dict_header

CONTENT_TYPE_FORM_URLENCODED = "application/x-www-form-urlencoded"
CONTENT_TYPE_MULTI_PART = "multipart/form-data"


def _basic_auth_str(username, password):
    """Returns a Basic Auth string.

    Text credentials are encoded as latin1, joined with ``:``, base64
    encoded and prefixed with ``"Basic "``.

    :param username: user name; anything that is not a string/bytes object
        is converted with ``str()`` after a ``DeprecationWarning``.
    :param password: password; same coercion rules as ``username``.
    :rtype: str
    """

    # "I want us to put a big-ol' comment on top of it that
    # says that this behaviour is dumb but we need to preserve
    # it because people are relying on it."
    #    - Lukasa
    #
    # These are here solely to maintain backwards compatibility
    # for things like ints. This will be removed in 3.0.0.
    if not isinstance(username, basestring):
        warnings.warn(
            "Non-string usernames will no longer be supported in Requests "
            "3.0.0. Please convert the object you've passed in ({!r}) to "
            "a string or bytes object in the near future to avoid "
            "problems.".format(username),
            category=DeprecationWarning,
        )
        username = str(username)

    if not isinstance(password, basestring):
        # Only the *type* of the password is reported, never its value.
        warnings.warn(
            "Non-string passwords will no longer be supported in Requests "
            "3.0.0. Please convert the object you've passed in ({!r}) to "
            "a string or bytes object in the near future to avoid "
            "problems.".format(type(password)),
            category=DeprecationWarning,
        )
        password = str(password)
    # -- End Removal --

    if isinstance(username, str):
        username = username.encode("latin1")

    if isinstance(password, str):
        password = password.encode("latin1")

    authstr = "Basic " + to_native_string(
        b64encode(b":".join((username, password))).strip()
    )

    return authstr


class AuthBase:
    """Base class that all auth implementations derive from"""

    def __call__(self, r):
        """Apply authentication to request ``r``; subclasses must override."""
        raise NotImplementedError("Auth hooks must be callable.")


class HTTPBasicAuth(AuthBase):
    """Attaches HTTP Basic Authentication to the given Request object."""

    def __init__(self, username, password):
        self.username = username
        self.password = password

    def __eq__(self, other):
        # Compared by credentials only; objects lacking either attribute
        # compare against ``None`` for that attribute.
        return all(
            [
                self.username == getattr(other, "username", None),
                self.password == getattr(other, "password", None),
            ]
        )

    def __ne__(self, other):
        return not self == other

    def __call__(self, r):
        """Set the ``Authorization`` header on ``r`` and return ``r``."""
        r.headers["Authorization"] = _basic_auth_str(self.username, self.password)
        return r


class HTTPProxyAuth(HTTPBasicAuth):
    """Attaches HTTP Proxy Authentication to a given Request object."""

    def __call__(self, r):
        """Set the ``Proxy-Authorization`` header on ``r`` and return ``r``."""
        r.headers["Proxy-Authorization"] = _basic_auth_str(self.username, self.password)
        return r


class HTTPDigestAuth(AuthBase):
    """Attaches HTTP Digest Authentication to the given Request object."""

    def __init__(self, username, password):
        self.username = username
        self.password = password
        # Keep state in per-thread local storage
        self._thread_local = threading.local()

    def init_per_thread_state(self):
        """Create the digest bookkeeping attributes for the calling thread."""
        # Ensure state is initialized just once per-thread
        if not hasattr(self._thread_local, "init"):
            self._thread_local.init = True
            self._thread_local.last_nonce = ""
            self._thread_local.nonce_count = 0
            self._thread_local.chal = {}
            self._thread_local.pos = None
            self._thread_local.num_401_calls = None

    def build_digest_header(self, method, url):
        """Build the ``Authorization`` header value for the stored challenge.

        Reads the challenge from ``self._thread_local.chal`` (which must
        contain ``realm`` and ``nonce``) and updates the per-thread nonce
        counter and last-seen nonce.

        :param method: HTTP method of the request being authenticated.
        :param url: URL of the request; its path and query form the
            ``uri`` field of the header.
        :returns: the header value, or ``None`` when the challenge names an
            unsupported algorithm or offers no ``auth`` quality of
            protection.
        :rtype: str
        """

        realm = self._thread_local.chal["realm"]
        nonce = self._thread_local.chal["nonce"]
        qop = self._thread_local.chal.get("qop")
        algorithm = self._thread_local.chal.get("algorithm")
        opaque = self._thread_local.chal.get("opaque")

        if algorithm is None:
            _algorithm = "MD5"
        else:
            _algorithm = algorithm.upper()

        # Supported challenge algorithms and the hashlib constructor each
        # one maps to.
        hash_factories = {
            "MD5": hashlib.md5,
            "MD5-SESS": hashlib.md5,
            "SHA": hashlib.sha1,
            "SHA-256": hashlib.sha256,
            "SHA-512": hashlib.sha512,
        }
        hash_factory = hash_factories.get(_algorithm)
        if hash_factory is None:
            # Unknown algorithm: no header can be produced.
            return None

        def hash_utf8(x):
            """Hex digest of ``x``, encoding text as UTF-8 first."""
            if isinstance(x, str):
                x = x.encode("utf-8")
            return hash_factory(x).hexdigest()

        def KD(s, d):
            """Keyed digest: hash of ``secret:data``."""
            return hash_utf8(f"{s}:{d}")

        # XXX not implemented yet
        entdig = None
        p_parsed = urlparse(url)
        #: path is request-uri defined in RFC 2616 which should not be empty
        path = p_parsed.path or "/"
        if p_parsed.query:
            path += f"?{p_parsed.query}"

        A1 = f"{self.username}:{realm}:{self.password}"
        A2 = f"{method}:{path}"

        HA1 = hash_utf8(A1)
        HA2 = hash_utf8(A2)

        # The nonce count restarts whenever the server hands out a new nonce.
        if nonce == self._thread_local.last_nonce:
            self._thread_local.nonce_count += 1
        else:
            self._thread_local.nonce_count = 1
        ncvalue = f"{self._thread_local.nonce_count:08x}"

        # Client nonce: hash of the counter, server nonce, current time and
        # random bytes, truncated to 16 hex characters.
        s = str(self._thread_local.nonce_count).encode("utf-8")
        s += nonce.encode("utf-8")
        s += time.ctime().encode("utf-8")
        s += os.urandom(8)

        cnonce = hashlib.sha1(s).hexdigest()[:16]
        if _algorithm == "MD5-SESS":
            HA1 = hash_utf8(f"{HA1}:{nonce}:{cnonce}")

        if not qop:
            respdig = KD(HA1, f"{nonce}:{HA2}")
        elif "auth" in [option.strip() for option in qop.split(",")]:
            # qop is a comma-separated list that may carry whitespace around
            # the separators (e.g. "auth-int, auth"); strip each option so
            # "auth" is recognised wherever it appears in the list.
            noncebit = f"{nonce}:{ncvalue}:{cnonce}:auth:{HA2}"
            respdig = KD(HA1, noncebit)
        else:
            # XXX handle auth-int.
            return None

        self._thread_local.last_nonce = nonce

        # XXX should the partial digests be encoded too?
        base = (
            f'username="{self.username}", realm="{realm}", nonce="{nonce}", '
            f'uri="{path}", response="{respdig}"'
        )
        if opaque:
            base += f', opaque="{opaque}"'
        if algorithm:
            base += f', algorithm="{algorithm}"'
        if entdig:
            base += f', digest="{entdig}"'
        if qop:
            base += f', qop="auth", nc={ncvalue}, cnonce="{cnonce}"'

        return f"Digest {base}"

    def handle_redirect(self, r, **kwargs):
        """Reset num_401_calls counter on redirects."""
        if r.is_redirect:
            self._thread_local.num_401_calls = 1

    def handle_401(self, r, **kwargs):
        """
        Takes the given response and tries digest-auth, if needed.

        When the response is a 4xx carrying a digest challenge and no retry
        has been made yet, the request is copied, given an ``Authorization``
        header and re-sent over ``r.connection``; the new response is
        returned with ``r`` appended to its history. Otherwise ``r`` is
        returned unchanged and the retry counter is reset.

        :param r: the response delivered to the ``response`` hook.
        :param kwargs: forwarded to ``r.connection.send``.
        :rtype: requests.Response
        """

        # If response is not 4xx, do not auth
        # See https://github.com/psf/requests/issues/3772
        if not 400 <= r.status_code < 500:
            self._thread_local.num_401_calls = 1
            return r

        if self._thread_local.pos is not None:
            # Rewind the file position indicator of the body to where
            # it was to resend the request.
            r.request.body.seek(self._thread_local.pos)
        s_auth = r.headers.get("www-authenticate", "")

        if "digest" in s_auth.lower() and self._thread_local.num_401_calls < 2:
            self._thread_local.num_401_calls += 1
            # Drop the leading scheme name so only the challenge parameters
            # are parsed.
            pat = re.compile(r"digest ", flags=re.IGNORECASE)
            self._thread_local.chal = parse_dict_header(pat.sub("", s_auth, count=1))

            # Consume content and release the original connection
            # to allow our new request to reuse the same one.
            r.content
            r.close()
            prep = r.request.copy()
            extract_cookies_to_jar(prep._cookies, r.request, r.raw)
            prep.prepare_cookies(prep._cookies)

            prep.headers["Authorization"] = self.build_digest_header(
                prep.method, prep.url
            )
            _r = r.connection.send(prep, **kwargs)
            _r.history.append(r)
            _r.request = prep

            return _r

        self._thread_local.num_401_calls = 1
        return r

    def __call__(self, r):
        """Prepare request ``r`` for digest authentication and return it.

        Adds an ``Authorization`` header up front when a nonce from an
        earlier challenge is available, remembers the body position so the
        body can be rewound for a retry, and registers the response hooks.
        """
        # Initialize per-thread state, if needed
        self.init_per_thread_state()
        # If we have a saved nonce, skip the 401
        if self._thread_local.last_nonce:
            r.headers["Authorization"] = self.build_digest_header(r.method, r.url)
        try:
            self._thread_local.pos = r.body.tell()
        except AttributeError:
            # In the case of HTTPDigestAuth being reused and the body of
            # the previous request was a file-like object, pos has the
            # file position of the previous body. Ensure it's set to
            # None.
            self._thread_local.pos = None
        r.register_hook("response", self.handle_401)
        r.register_hook("response", self.handle_redirect)
        self._thread_local.num_401_calls = 1

        return r

    def __eq__(self, other):
        # Compared by credentials only; per-thread digest state is ignored.
        return all(
            [
                self.username == getattr(other, "username", None),
                self.password == getattr(other, "password", None),
            ]
        )

    def __ne__(self, other):
        return not self == other