Mercurial > repos > rliterman > csp2
diff CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/wheel/wheelfile.py @ 69:33d812a61356
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 17:55:14 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/wheel/wheelfile.py Tue Mar 18 17:55:14 2025 -0400 @@ -0,0 +1,227 @@ +from __future__ import annotations + +import csv +import hashlib +import os.path +import re +import stat +import time +from io import StringIO, TextIOWrapper +from typing import IO, TYPE_CHECKING, Literal +from zipfile import ZIP_DEFLATED, ZipFile, ZipInfo + +from wheel.cli import WheelError +from wheel.util import log, urlsafe_b64decode, urlsafe_b64encode + +if TYPE_CHECKING: + from typing import Protocol, Sized, Union + + from typing_extensions import Buffer + + StrPath = Union[str, os.PathLike[str]] + + class SizedBuffer(Sized, Buffer, Protocol): ... + + +# Non-greedy matching of an optional build number may be too clever (more +# invalid wheel filenames will match). Separate regex for .dist-info? +WHEEL_INFO_RE = re.compile( + r"""^(?P<namever>(?P<name>[^\s-]+?)-(?P<ver>[^\s-]+?))(-(?P<build>\d[^\s-]*))? + -(?P<pyver>[^\s-]+?)-(?P<abi>[^\s-]+?)-(?P<plat>\S+)\.whl$""", + re.VERBOSE, +) +MINIMUM_TIMESTAMP = 315532800 # 1980-01-01 00:00:00 UTC + + +def get_zipinfo_datetime(timestamp: float | None = None): + # Some applications need reproducible .whl files, but they can't do this without + # forcing the timestamp of the individual ZipInfo objects. See issue #143. + timestamp = int(os.environ.get("SOURCE_DATE_EPOCH", timestamp or time.time())) + timestamp = max(timestamp, MINIMUM_TIMESTAMP) + return time.gmtime(timestamp)[0:6] + + +class WheelFile(ZipFile): + """A ZipFile derivative class that also reads SHA-256 hashes from + .dist-info/RECORD and checks any read files against those. + """ + + _default_algorithm = hashlib.sha256 + + def __init__( + self, + file: StrPath, + mode: Literal["r", "w", "x", "a"] = "r", + compression: int = ZIP_DEFLATED, + ): + basename = os.path.basename(file) + self.parsed_filename = WHEEL_INFO_RE.match(basename) + if not basename.endswith(".whl") or self.parsed_filename is None: + raise WheelError(f"Bad wheel filename {basename!r}") + + ZipFile.__init__(self, file, mode, compression=compression, allowZip64=True) + + self.dist_info_path = "{}.dist-info".format( + self.parsed_filename.group("namever") + ) + self.record_path = self.dist_info_path + "/RECORD" + self._file_hashes: dict[str, tuple[None, None] | tuple[int, bytes]] = {} + self._file_sizes = {} + if mode == "r": + # Ignore RECORD and any embedded wheel signatures + self._file_hashes[self.record_path] = None, None + self._file_hashes[self.record_path + ".jws"] = None, None + self._file_hashes[self.record_path + ".p7s"] = None, None + + # Fill in the expected hashes by reading them from RECORD + try: + record = self.open(self.record_path) + except KeyError: + raise WheelError(f"Missing {self.record_path} file") from None + + with record: + for line in csv.reader( + TextIOWrapper(record, newline="", encoding="utf-8") + ): + path, hash_sum, size = line + if not hash_sum: + continue + + algorithm, hash_sum = hash_sum.split("=") + try: + hashlib.new(algorithm) + except ValueError: + raise WheelError( + f"Unsupported hash algorithm: {algorithm}" + ) from None + + if algorithm.lower() in {"md5", "sha1"}: + raise WheelError( + f"Weak hash algorithm ({algorithm}) is not permitted by " + f"PEP 427" + ) + + self._file_hashes[path] = ( + algorithm, + urlsafe_b64decode(hash_sum.encode("ascii")), + ) + + def open( + self, + name_or_info: str | ZipInfo, + mode: Literal["r", "w"] = "r", + pwd: bytes | None = None, + ) -> IO[bytes]: + def _update_crc(newdata: bytes) -> None: + eof = ef._eof + update_crc_orig(newdata) + running_hash.update(newdata) + if eof and running_hash.digest() != expected_hash: + raise WheelError(f"Hash mismatch for file '{ef_name}'") + + ef_name = ( + name_or_info.filename if isinstance(name_or_info, ZipInfo) else name_or_info + ) + if ( + mode == "r" + and not ef_name.endswith("/") + and ef_name not in self._file_hashes + ): + raise WheelError(f"No hash found for file '{ef_name}'") + + ef = ZipFile.open(self, name_or_info, mode, pwd) + if mode == "r" and not ef_name.endswith("/"): + algorithm, expected_hash = self._file_hashes[ef_name] + if expected_hash is not None: + # Monkey patch the _update_crc method to also check for the hash from + # RECORD + running_hash = hashlib.new(algorithm) + update_crc_orig, ef._update_crc = ef._update_crc, _update_crc + + return ef + + def write_files(self, base_dir: str): + log.info(f"creating '{self.filename}' and adding '{base_dir}' to it") + deferred: list[tuple[str, str]] = [] + for root, dirnames, filenames in os.walk(base_dir): + # Sort the directory names so that `os.walk` will walk them in a + # defined order on the next iteration. + dirnames.sort() + for name in sorted(filenames): + path = os.path.normpath(os.path.join(root, name)) + if os.path.isfile(path): + arcname = os.path.relpath(path, base_dir).replace(os.path.sep, "/") + if arcname == self.record_path: + pass + elif root.endswith(".dist-info"): + deferred.append((path, arcname)) + else: + self.write(path, arcname) + + deferred.sort() + for path, arcname in deferred: + self.write(path, arcname) + + def write( + self, + filename: str, + arcname: str | None = None, + compress_type: int | None = None, + ) -> None: + with open(filename, "rb") as f: + st = os.fstat(f.fileno()) + data = f.read() + + zinfo = ZipInfo( + arcname or filename, date_time=get_zipinfo_datetime(st.st_mtime) + ) + zinfo.external_attr = (stat.S_IMODE(st.st_mode) | stat.S_IFMT(st.st_mode)) << 16 + zinfo.compress_type = compress_type or self.compression + self.writestr(zinfo, data, compress_type) + + def writestr( + self, + zinfo_or_arcname: str | ZipInfo, + data: SizedBuffer | str, + compress_type: int | None = None, + ): + if isinstance(zinfo_or_arcname, str): + zinfo_or_arcname = ZipInfo( + zinfo_or_arcname, date_time=get_zipinfo_datetime() + ) + zinfo_or_arcname.compress_type = self.compression + zinfo_or_arcname.external_attr = (0o664 | stat.S_IFREG) << 16 + + if isinstance(data, str): + data = data.encode("utf-8") + + ZipFile.writestr(self, zinfo_or_arcname, data, compress_type) + fname = ( + zinfo_or_arcname.filename + if isinstance(zinfo_or_arcname, ZipInfo) + else zinfo_or_arcname + ) + log.info(f"adding '{fname}'") + if fname != self.record_path: + hash_ = self._default_algorithm(data) + self._file_hashes[fname] = ( + hash_.name, + urlsafe_b64encode(hash_.digest()).decode("ascii"), + ) + self._file_sizes[fname] = len(data) + + def close(self): + # Write RECORD + if self.fp is not None and self.mode == "w" and self._file_hashes: + data = StringIO() + writer = csv.writer(data, delimiter=",", quotechar='"', lineterminator="\n") + writer.writerows( + ( + (fname, algorithm + "=" + hash_, self._file_sizes[fname]) + for fname, (algorithm, hash_) in self._file_hashes.items() + ) + ) + writer.writerow((format(self.record_path), "", "")) + self.writestr(self.record_path, data.getvalue()) + + ZipFile.close(self)