Mercurial > repos > jpayne > bioproject_to_srr_2
view requests/auth.py @ 7:5eb2d5e3bf22
planemo upload for repository https://toolrepo.galaxytrakr.org/view/jpayne/bioproject_to_srr_2/556cac4fb538
author | jpayne |
---|---|
date | Sun, 05 May 2024 23:32:17 -0400 |
parents | |
children |
line wrap: on
line source
""" requests.auth ~~~~~~~~~~~~~ This module contains the authentication handlers for Requests. """ import hashlib import os import re import threading import time import warnings from base64 import b64encode from ._internal_utils import to_native_string from .compat import basestring, str, urlparse from .cookies import extract_cookies_to_jar from .utils import parse_dict_header CONTENT_TYPE_FORM_URLENCODED = "application/x-www-form-urlencoded" CONTENT_TYPE_MULTI_PART = "multipart/form-data" def _basic_auth_str(username, password): """Returns a Basic Auth string.""" # "I want us to put a big-ol' comment on top of it that # says that this behaviour is dumb but we need to preserve # it because people are relying on it." # - Lukasa # # These are here solely to maintain backwards compatibility # for things like ints. This will be removed in 3.0.0. if not isinstance(username, basestring): warnings.warn( "Non-string usernames will no longer be supported in Requests " "3.0.0. Please convert the object you've passed in ({!r}) to " "a string or bytes object in the near future to avoid " "problems.".format(username), category=DeprecationWarning, ) username = str(username) if not isinstance(password, basestring): warnings.warn( "Non-string passwords will no longer be supported in Requests " "3.0.0. Please convert the object you've passed in ({!r}) to " "a string or bytes object in the near future to avoid " "problems.".format(type(password)), category=DeprecationWarning, ) password = str(password) # -- End Removal -- if isinstance(username, str): username = username.encode("latin1") if isinstance(password, str): password = password.encode("latin1") authstr = "Basic " + to_native_string( b64encode(b":".join((username, password))).strip() ) return authstr class AuthBase: """Base class that all auth implementations derive from""" def __call__(self, r): raise NotImplementedError("Auth hooks must be callable.") class HTTPBasicAuth(AuthBase): """Attaches HTTP Basic Authentication to the given Request object.""" def __init__(self, username, password): self.username = username self.password = password def __eq__(self, other): return all( [ self.username == getattr(other, "username", None), self.password == getattr(other, "password", None), ] ) def __ne__(self, other): return not self == other def __call__(self, r): r.headers["Authorization"] = _basic_auth_str(self.username, self.password) return r class HTTPProxyAuth(HTTPBasicAuth): """Attaches HTTP Proxy Authentication to a given Request object.""" def __call__(self, r): r.headers["Proxy-Authorization"] = _basic_auth_str(self.username, self.password) return r class HTTPDigestAuth(AuthBase): """Attaches HTTP Digest Authentication to the given Request object.""" def __init__(self, username, password): self.username = username self.password = password # Keep state in per-thread local storage self._thread_local = threading.local() def init_per_thread_state(self): # Ensure state is initialized just once per-thread if not hasattr(self._thread_local, "init"): self._thread_local.init = True self._thread_local.last_nonce = "" self._thread_local.nonce_count = 0 self._thread_local.chal = {} self._thread_local.pos = None self._thread_local.num_401_calls = None def build_digest_header(self, method, url): """ :rtype: str """ realm = self._thread_local.chal["realm"] nonce = self._thread_local.chal["nonce"] qop = self._thread_local.chal.get("qop") algorithm = self._thread_local.chal.get("algorithm") opaque = self._thread_local.chal.get("opaque") hash_utf8 = None if algorithm is None: _algorithm = "MD5" else: _algorithm = algorithm.upper() # lambdas assume digest modules are imported at the top level if _algorithm == "MD5" or _algorithm == "MD5-SESS": def md5_utf8(x): if isinstance(x, str): x = x.encode("utf-8") return hashlib.md5(x).hexdigest() hash_utf8 = md5_utf8 elif _algorithm == "SHA": def sha_utf8(x): if isinstance(x, str): x = x.encode("utf-8") return hashlib.sha1(x).hexdigest() hash_utf8 = sha_utf8 elif _algorithm == "SHA-256": def sha256_utf8(x): if isinstance(x, str): x = x.encode("utf-8") return hashlib.sha256(x).hexdigest() hash_utf8 = sha256_utf8 elif _algorithm == "SHA-512": def sha512_utf8(x): if isinstance(x, str): x = x.encode("utf-8") return hashlib.sha512(x).hexdigest() hash_utf8 = sha512_utf8 KD = lambda s, d: hash_utf8(f"{s}:{d}") # noqa:E731 if hash_utf8 is None: return None # XXX not implemented yet entdig = None p_parsed = urlparse(url) #: path is request-uri defined in RFC 2616 which should not be empty path = p_parsed.path or "/" if p_parsed.query: path += f"?{p_parsed.query}" A1 = f"{self.username}:{realm}:{self.password}" A2 = f"{method}:{path}" HA1 = hash_utf8(A1) HA2 = hash_utf8(A2) if nonce == self._thread_local.last_nonce: self._thread_local.nonce_count += 1 else: self._thread_local.nonce_count = 1 ncvalue = f"{self._thread_local.nonce_count:08x}" s = str(self._thread_local.nonce_count).encode("utf-8") s += nonce.encode("utf-8") s += time.ctime().encode("utf-8") s += os.urandom(8) cnonce = hashlib.sha1(s).hexdigest()[:16] if _algorithm == "MD5-SESS": HA1 = hash_utf8(f"{HA1}:{nonce}:{cnonce}") if not qop: respdig = KD(HA1, f"{nonce}:{HA2}") elif qop == "auth" or "auth" in qop.split(","): noncebit = f"{nonce}:{ncvalue}:{cnonce}:auth:{HA2}" respdig = KD(HA1, noncebit) else: # XXX handle auth-int. return None self._thread_local.last_nonce = nonce # XXX should the partial digests be encoded too? base = ( f'username="{self.username}", realm="{realm}", nonce="{nonce}", ' f'uri="{path}", response="{respdig}"' ) if opaque: base += f', opaque="{opaque}"' if algorithm: base += f', algorithm="{algorithm}"' if entdig: base += f', digest="{entdig}"' if qop: base += f', qop="auth", nc={ncvalue}, cnonce="{cnonce}"' return f"Digest {base}" def handle_redirect(self, r, **kwargs): """Reset num_401_calls counter on redirects.""" if r.is_redirect: self._thread_local.num_401_calls = 1 def handle_401(self, r, **kwargs): """ Takes the given response and tries digest-auth, if needed. :rtype: requests.Response """ # If response is not 4xx, do not auth # See https://github.com/psf/requests/issues/3772 if not 400 <= r.status_code < 500: self._thread_local.num_401_calls = 1 return r if self._thread_local.pos is not None: # Rewind the file position indicator of the body to where # it was to resend the request. r.request.body.seek(self._thread_local.pos) s_auth = r.headers.get("www-authenticate", "") if "digest" in s_auth.lower() and self._thread_local.num_401_calls < 2: self._thread_local.num_401_calls += 1 pat = re.compile(r"digest ", flags=re.IGNORECASE) self._thread_local.chal = parse_dict_header(pat.sub("", s_auth, count=1)) # Consume content and release the original connection # to allow our new request to reuse the same one. r.content r.close() prep = r.request.copy() extract_cookies_to_jar(prep._cookies, r.request, r.raw) prep.prepare_cookies(prep._cookies) prep.headers["Authorization"] = self.build_digest_header( prep.method, prep.url ) _r = r.connection.send(prep, **kwargs) _r.history.append(r) _r.request = prep return _r self._thread_local.num_401_calls = 1 return r def __call__(self, r): # Initialize per-thread state, if needed self.init_per_thread_state() # If we have a saved nonce, skip the 401 if self._thread_local.last_nonce: r.headers["Authorization"] = self.build_digest_header(r.method, r.url) try: self._thread_local.pos = r.body.tell() except AttributeError: # In the case of HTTPDigestAuth being reused and the body of # the previous request was a file-like object, pos has the # file position of the previous body. Ensure it's set to # None. self._thread_local.pos = None r.register_hook("response", self.handle_401) r.register_hook("response", self.handle_redirect) self._thread_local.num_401_calls = 1 return r def __eq__(self, other): return all( [ self.username == getattr(other, "username", None), self.password == getattr(other, "password", None), ] ) def __ne__(self, other): return not self == other