jpayne@7: from __future__ import annotations jpayne@7: jpayne@7: import re jpayne@7: import typing jpayne@7: jpayne@7: from ..exceptions import LocationParseError jpayne@7: from .util import to_str jpayne@7: jpayne@7: # We only want to normalize urls with an HTTP(S) scheme. jpayne@7: # urllib3 infers URLs without a scheme (None) to be http. jpayne@7: _NORMALIZABLE_SCHEMES = ("http", "https", None) jpayne@7: jpayne@7: # Almost all of these patterns were derived from the jpayne@7: # 'rfc3986' module: https://github.com/python-hyper/rfc3986 jpayne@7: _PERCENT_RE = re.compile(r"%[a-fA-F0-9]{2}") jpayne@7: _SCHEME_RE = re.compile(r"^(?:[a-zA-Z][a-zA-Z0-9+-]*:|/)") jpayne@7: _URI_RE = re.compile( jpayne@7: r"^(?:([a-zA-Z][a-zA-Z0-9+.-]*):)?" jpayne@7: r"(?://([^\\/?#]*))?" jpayne@7: r"([^?#]*)" jpayne@7: r"(?:\?([^#]*))?" jpayne@7: r"(?:#(.*))?$", jpayne@7: re.UNICODE | re.DOTALL, jpayne@7: ) jpayne@7: jpayne@7: _IPV4_PAT = r"(?:[0-9]{1,3}\.){3}[0-9]{1,3}" jpayne@7: _HEX_PAT = "[0-9A-Fa-f]{1,4}" jpayne@7: _LS32_PAT = "(?:{hex}:{hex}|{ipv4})".format(hex=_HEX_PAT, ipv4=_IPV4_PAT) jpayne@7: _subs = {"hex": _HEX_PAT, "ls32": _LS32_PAT} jpayne@7: _variations = [ jpayne@7: # 6( h16 ":" ) ls32 jpayne@7: "(?:%(hex)s:){6}%(ls32)s", jpayne@7: # "::" 5( h16 ":" ) ls32 jpayne@7: "::(?:%(hex)s:){5}%(ls32)s", jpayne@7: # [ h16 ] "::" 4( h16 ":" ) ls32 jpayne@7: "(?:%(hex)s)?::(?:%(hex)s:){4}%(ls32)s", jpayne@7: # [ *1( h16 ":" ) h16 ] "::" 3( h16 ":" ) ls32 jpayne@7: "(?:(?:%(hex)s:)?%(hex)s)?::(?:%(hex)s:){3}%(ls32)s", jpayne@7: # [ *2( h16 ":" ) h16 ] "::" 2( h16 ":" ) ls32 jpayne@7: "(?:(?:%(hex)s:){0,2}%(hex)s)?::(?:%(hex)s:){2}%(ls32)s", jpayne@7: # [ *3( h16 ":" ) h16 ] "::" h16 ":" ls32 jpayne@7: "(?:(?:%(hex)s:){0,3}%(hex)s)?::%(hex)s:%(ls32)s", jpayne@7: # [ *4( h16 ":" ) h16 ] "::" ls32 jpayne@7: "(?:(?:%(hex)s:){0,4}%(hex)s)?::%(ls32)s", jpayne@7: # [ *5( h16 ":" ) h16 ] "::" h16 jpayne@7: "(?:(?:%(hex)s:){0,5}%(hex)s)?::%(hex)s", jpayne@7: # [ *6( h16 ":" ) h16 ] "::" 
jpayne@7: "(?:(?:%(hex)s:){0,6}%(hex)s)?::", jpayne@7: ] jpayne@7: jpayne@7: _UNRESERVED_PAT = r"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789._\-~" jpayne@7: _IPV6_PAT = "(?:" + "|".join([x % _subs for x in _variations]) + ")" jpayne@7: _ZONE_ID_PAT = "(?:%25|%)(?:[" + _UNRESERVED_PAT + "]|%[a-fA-F0-9]{2})+" jpayne@7: _IPV6_ADDRZ_PAT = r"\[" + _IPV6_PAT + r"(?:" + _ZONE_ID_PAT + r")?\]" jpayne@7: _REG_NAME_PAT = r"(?:[^\[\]%:/?#]|%[a-fA-F0-9]{2})*" jpayne@7: _TARGET_RE = re.compile(r"^(/[^?#]*)(?:\?([^#]*))?(?:#.*)?$") jpayne@7: jpayne@7: _IPV4_RE = re.compile("^" + _IPV4_PAT + "$") jpayne@7: _IPV6_RE = re.compile("^" + _IPV6_PAT + "$") jpayne@7: _IPV6_ADDRZ_RE = re.compile("^" + _IPV6_ADDRZ_PAT + "$") jpayne@7: _BRACELESS_IPV6_ADDRZ_RE = re.compile("^" + _IPV6_ADDRZ_PAT[2:-2] + "$") jpayne@7: _ZONE_ID_RE = re.compile("(" + _ZONE_ID_PAT + r")\]$") jpayne@7: jpayne@7: _HOST_PORT_PAT = ("^(%s|%s|%s)(?::0*?(|0|[1-9][0-9]{0,4}))?$") % ( jpayne@7: _REG_NAME_PAT, jpayne@7: _IPV4_PAT, jpayne@7: _IPV6_ADDRZ_PAT, jpayne@7: ) jpayne@7: _HOST_PORT_RE = re.compile(_HOST_PORT_PAT, re.UNICODE | re.DOTALL) jpayne@7: jpayne@7: _UNRESERVED_CHARS = set( jpayne@7: "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789._-~" jpayne@7: ) jpayne@7: _SUB_DELIM_CHARS = set("!$&'()*+,;=") jpayne@7: _USERINFO_CHARS = _UNRESERVED_CHARS | _SUB_DELIM_CHARS | {":"} jpayne@7: _PATH_CHARS = _USERINFO_CHARS | {"@", "/"} jpayne@7: _QUERY_CHARS = _FRAGMENT_CHARS = _PATH_CHARS | {"?"} jpayne@7: jpayne@7: jpayne@7: class Url( jpayne@7: typing.NamedTuple( jpayne@7: "Url", jpayne@7: [ jpayne@7: ("scheme", typing.Optional[str]), jpayne@7: ("auth", typing.Optional[str]), jpayne@7: ("host", typing.Optional[str]), jpayne@7: ("port", typing.Optional[int]), jpayne@7: ("path", typing.Optional[str]), jpayne@7: ("query", typing.Optional[str]), jpayne@7: ("fragment", typing.Optional[str]), jpayne@7: ], jpayne@7: ) jpayne@7: ): jpayne@7: """ jpayne@7: Data structure for representing an 
HTTP URL. Used as a return value for jpayne@7: :func:`parse_url`. Both the scheme and host are normalized as they are jpayne@7: both case-insensitive according to RFC 3986. jpayne@7: """ jpayne@7: jpayne@7: def __new__( # type: ignore[no-untyped-def] jpayne@7: cls, jpayne@7: scheme: str | None = None, jpayne@7: auth: str | None = None, jpayne@7: host: str | None = None, jpayne@7: port: int | None = None, jpayne@7: path: str | None = None, jpayne@7: query: str | None = None, jpayne@7: fragment: str | None = None, jpayne@7: ): jpayne@7: if path and not path.startswith("/"): jpayne@7: path = "/" + path jpayne@7: if scheme is not None: jpayne@7: scheme = scheme.lower() jpayne@7: return super().__new__(cls, scheme, auth, host, port, path, query, fragment) jpayne@7: jpayne@7: @property jpayne@7: def hostname(self) -> str | None: jpayne@7: """For backwards-compatibility with urlparse. We're nice like that.""" jpayne@7: return self.host jpayne@7: jpayne@7: @property jpayne@7: def request_uri(self) -> str: jpayne@7: """Absolute path including the query string.""" jpayne@7: uri = self.path or "/" jpayne@7: jpayne@7: if self.query is not None: jpayne@7: uri += "?" + self.query jpayne@7: jpayne@7: return uri jpayne@7: jpayne@7: @property jpayne@7: def authority(self) -> str | None: jpayne@7: """ jpayne@7: Authority component as defined in RFC 3986 3.2. jpayne@7: This includes userinfo (auth), host and port. jpayne@7: jpayne@7: i.e. jpayne@7: userinfo@host:port jpayne@7: """ jpayne@7: userinfo = self.auth jpayne@7: netloc = self.netloc jpayne@7: if netloc is None or userinfo is None: jpayne@7: return netloc jpayne@7: else: jpayne@7: return f"{userinfo}@{netloc}" jpayne@7: jpayne@7: @property jpayne@7: def netloc(self) -> str | None: jpayne@7: """ jpayne@7: Network location including host and port. jpayne@7: jpayne@7: If you need the equivalent of urllib.parse's ``netloc``, jpayne@7: use the ``authority`` property instead. 
jpayne@7: """ jpayne@7: if self.host is None: jpayne@7: return None jpayne@7: if self.port: jpayne@7: return f"{self.host}:{self.port}" jpayne@7: return self.host jpayne@7: jpayne@7: @property jpayne@7: def url(self) -> str: jpayne@7: """ jpayne@7: Convert self into a url jpayne@7: jpayne@7: This function should more or less round-trip with :func:`.parse_url`. The jpayne@7: returned url may not be exactly the same as the url inputted to jpayne@7: :func:`.parse_url`, but it should be equivalent by the RFC (e.g., urls jpayne@7: with a blank port will have : removed). jpayne@7: jpayne@7: Example: jpayne@7: jpayne@7: .. code-block:: python jpayne@7: jpayne@7: import urllib3 jpayne@7: jpayne@7: U = urllib3.util.parse_url("https://google.com/mail/") jpayne@7: jpayne@7: print(U.url) jpayne@7: # "https://google.com/mail/" jpayne@7: jpayne@7: print( urllib3.util.Url("https", "username:password", jpayne@7: "host.com", 80, "/path", "query", "fragment" jpayne@7: ).url jpayne@7: ) jpayne@7: # "https://username:password@host.com:80/path?query#fragment" jpayne@7: """ jpayne@7: scheme, auth, host, port, path, query, fragment = self jpayne@7: url = "" jpayne@7: jpayne@7: # We use "is not None" we want things to happen with empty strings (or 0 port) jpayne@7: if scheme is not None: jpayne@7: url += scheme + "://" jpayne@7: if auth is not None: jpayne@7: url += auth + "@" jpayne@7: if host is not None: jpayne@7: url += host jpayne@7: if port is not None: jpayne@7: url += ":" + str(port) jpayne@7: if path is not None: jpayne@7: url += path jpayne@7: if query is not None: jpayne@7: url += "?" + query jpayne@7: if fragment is not None: jpayne@7: url += "#" + fragment jpayne@7: jpayne@7: return url jpayne@7: jpayne@7: def __str__(self) -> str: jpayne@7: return self.url jpayne@7: jpayne@7: jpayne@7: @typing.overload jpayne@7: def _encode_invalid_chars( jpayne@7: component: str, allowed_chars: typing.Container[str] jpayne@7: ) -> str: # Abstract jpayne@7: ... 
@typing.overload
def _encode_invalid_chars(
    component: None, allowed_chars: typing.Container[str]
) -> None:  # Abstract
    ...


def _encode_invalid_chars(
    component: str | None, allowed_chars: typing.Container[str]
) -> str | None:
    """Percent-encode every byte of ``component`` that is not allowed.

    Existing ``%XX`` escapes are upper-cased. When every ``%`` in the
    component belongs to such an escape, the ``%`` bytes are kept as they are
    so that nothing gets encoded twice; otherwise ``%`` is handled like any
    other byte. ``None`` is returned unchanged.
    """
    if component is None:
        return component

    text = to_str(component)

    # Upper-case the escapes that are already present and count them.
    text, escape_count = _PERCENT_RE.subn(lambda m: m.group(0).upper(), text)

    raw = text.encode("utf-8", "surrogatepass")
    # True only if each '%' byte is the start of a valid escape sequence.
    keep_percent = escape_count == raw.count(b"%")

    out = bytearray()
    for code in raw:
        # 0x25 is '%'. Non-ASCII bytes are never looked up in allowed_chars.
        if (keep_percent and code == 0x25) or (
            code < 128 and chr(code) in allowed_chars
        ):
            out.append(code)
        else:
            out += b"%%%02X" % code

    return out.decode()


def _remove_path_dot_segments(path: str) -> str:
    """Collapse ``.`` and ``..`` segments of ``path``.

    See http://tools.ietf.org/html/rfc3986#section-5.2.4 for the pseudo-code
    this follows.
    """
    kept: list[str] = []

    for part in path.split("/"):
        if part == "..":
            # Step back one segment if there is one to drop.
            if kept:
                kept.pop()
        elif part != ".":
            # '.' is the current directory and is simply skipped;
            # anything else is retained.
            kept.append(part)

    # An absolute path must still begin with '/' after joining, so make sure
    # the first element is the empty string.
    if path.startswith("/") and (not kept or kept[0]):
        kept.insert(0, "")

    # A path that ends with '/.' or '/..' keeps a trailing '/'.
    if path.endswith(("/.", "/..")):
        kept.append("")

    return "/".join(kept)


@typing.overload
def _normalize_host(host: None, scheme: str | None) -> None:
    ...


@typing.overload
def _normalize_host(host: str, scheme: str | None) -> str:
    ...


def _normalize_host(host: str | None, scheme: str | None) -> str | None:
    """Normalize ``host`` when ``scheme`` is one of ``_NORMALIZABLE_SCHEMES``.

    Empty/None hosts, hosts of other schemes and IPv4 literals are returned
    unchanged. Bracketed IPv6 literals are lower-cased (with the zone ID
    handled separately); every other host is lower-cased and IDNA-encoded
    label by label.
    """
    if not host or scheme not in _NORMALIZABLE_SCHEMES:
        return host

    if _IPV6_ADDRZ_RE.match(host):
        zone = _ZONE_ID_RE.search(host)
        if zone is None:
            return host.lower()

        # IPv6 hosts of the form 'a::b%zone' are encoded in a URL as
        # 'a::b%25zone' per RFC 6874. Unquote the ZoneID separator as
        # necessary to return a valid RFC 4007 scoped IP.
        start, end = zone.span(1)
        zone_id = host[start:end]
        if zone_id.startswith("%25") and zone_id != "%25":
            zone_id = zone_id[3:]
        else:
            zone_id = zone_id[1:]
        zone_id = _encode_invalid_chars(zone_id, _UNRESERVED_CHARS)
        return f"{host[:start].lower()}%{zone_id}{host[end:]}"

    if _IPV4_RE.match(host):
        return host

    encoded_labels = (_idna_encode(label) for label in host.split("."))
    return to_str(b".".join(encoded_labels), "ascii")


def _idna_encode(name: str) -> bytes:
    """Lower-case one host label and return it as ASCII bytes.

    Non-ASCII labels are encoded with the optional ``idna`` module.

    :raises LocationParseError: if ``idna`` is needed but not installed, or
        if it rejects the label.
    """
    if name.isascii():
        return name.lower().encode("ascii")

    try:
        import idna
    except ImportError:
        raise LocationParseError(
            "Unable to parse URL without the 'idna' module"
        ) from None

    try:
        return idna.encode(name.lower(), strict=True, std3_rules=True)
    except idna.IDNAError:
        raise LocationParseError(
            f"Name '{name}' is not a valid IDNA label"
        ) from None


def _encode_target(target: str) -> str:
    """Percent-encode a request target so that it has no invalid characters.

    ``target`` is expected to start with '/'; in that case ``_TARGET_RE``
    always matches. Any fragment is dropped from the result.

    :raises LocationParseError: if ``target`` does not match ``_TARGET_RE``.
    """
    matched = _TARGET_RE.match(target)
    if not matched:  # Defensive:
        raise LocationParseError(f"{target!r} is not a valid request URI")

    raw_path, raw_query = matched.group(1), matched.group(2)
    result = _encode_invalid_chars(raw_path, _PATH_CHARS)
    if raw_query is not None:
        result += "?" + _encode_invalid_chars(raw_query, _QUERY_CHARS)
    return result


def parse_url(url: str) -> Url:
    """
    Given a url, return a parsed :class:`.Url` namedtuple. Best-effort is
    performed to parse incomplete urls. Fields not provided will be None.
    The parser aims to follow RFC 3986 and RFC 6874.

    The parser logic and helper functions are based heavily on
    work done in the ``rfc3986`` module.

    :param str url: URL to parse into a :class:`.Url` namedtuple.
    :raises LocationParseError: if the URL cannot be parsed or its port is
        outside 0-65535.

    Partly backwards-compatible with :mod:`urllib.parse`.

    Example:

    .. code-block:: python

        import urllib3

        print(urllib3.util.parse_url('http://google.com/mail/'))
        # Url(scheme='http', host='google.com', port=None, path='/mail/', ...)

        print(urllib3.util.parse_url('google.com:80'))
        # Url(scheme=None, host='google.com', port=80, path=None, ...)

        print(urllib3.util.parse_url('/foo?bar'))
        # Url(scheme=None, host=None, port=None, path='/foo', query='bar', ...)
    """
    if not url:
        # Empty
        return Url()

    source_url = url
    # Without a scheme (or leading '/'), treat the input as starting at the
    # authority so that "host:port" is not mistaken for "scheme:path".
    if not _SCHEME_RE.search(url):
        url = "//" + url

    scheme: str | None
    authority: str | None
    auth: str | None
    host: str | None
    port: str | None
    port_int: int | None
    path: str | None
    query: str | None
    fragment: str | None

    try:
        uri_match = _URI_RE.match(url)
        scheme, authority, path, query, fragment = uri_match.groups()  # type: ignore[union-attr]
        normalize_uri = scheme is None or scheme.lower() in _NORMALIZABLE_SCHEMES

        if scheme:
            scheme = scheme.lower()

        if not authority:
            auth = host = port = None
        else:
            # Split on the last '@' so userinfo may itself contain '@'.
            auth, _, host_port = authority.rpartition("@")
            auth = auth or None
            host, port = _HOST_PORT_RE.match(host_port).groups()  # type: ignore[union-attr]
            if auth and normalize_uri:
                auth = _encode_invalid_chars(auth, _USERINFO_CHARS)
            # "host:" with nothing after the colon means "no port".
            if port == "":
                port = None

        port_int = None if port is None else int(port)
        if port_int is not None and (port_int < 0 or port_int > 65535):
            raise LocationParseError(url)

        host = _normalize_host(host, scheme)

        if normalize_uri:
            if path:
                path = _remove_path_dot_segments(path)
                path = _encode_invalid_chars(path, _PATH_CHARS)
            if query:
                query = _encode_invalid_chars(query, _QUERY_CHARS)
            if fragment:
                fragment = _encode_invalid_chars(fragment, _FRAGMENT_CHARS)

    except (ValueError, AttributeError) as e:
        # AttributeError covers a failed regex match (``None.groups()``).
        raise LocationParseError(source_url) from e

    # For the sake of backwards compatibility an empty path is reported as ""
    # when a query or fragment is present, and as None otherwise.
    # TODO: Remove this when we break backwards compatibility.
    if not path:
        path = "" if (query is not None or fragment is not None) else None

    return Url(
        scheme=scheme,
        auth=auth,
        host=host,
        port=port_int,
        path=path,
        query=query,
        fragment=fragment,
    )