jpayne@69: from __future__ import annotations jpayne@69: jpayne@69: import csv jpayne@69: import hashlib jpayne@69: import os.path jpayne@69: import re jpayne@69: import stat jpayne@69: import time jpayne@69: from io import StringIO, TextIOWrapper jpayne@69: from typing import IO, TYPE_CHECKING, Literal jpayne@69: from zipfile import ZIP_DEFLATED, ZipFile, ZipInfo jpayne@69: jpayne@69: from wheel.cli import WheelError jpayne@69: from wheel.util import log, urlsafe_b64decode, urlsafe_b64encode jpayne@69: jpayne@69: if TYPE_CHECKING: jpayne@69: from typing import Protocol, Sized, Union jpayne@69: jpayne@69: from typing_extensions import Buffer jpayne@69: jpayne@69: StrPath = Union[str, os.PathLike[str]] jpayne@69: jpayne@69: class SizedBuffer(Sized, Buffer, Protocol): ... jpayne@69: jpayne@69: jpayne@69: # Non-greedy matching of an optional build number may be too clever (more jpayne@69: # invalid wheel filenames will match). Separate regex for .dist-info? jpayne@69: WHEEL_INFO_RE = re.compile( jpayne@69: r"""^(?P(?P[^\s-]+?)-(?P[^\s-]+?))(-(?P\d[^\s-]*))? jpayne@69: -(?P[^\s-]+?)-(?P[^\s-]+?)-(?P\S+)\.whl$""", jpayne@69: re.VERBOSE, jpayne@69: ) jpayne@69: MINIMUM_TIMESTAMP = 315532800 # 1980-01-01 00:00:00 UTC jpayne@69: jpayne@69: jpayne@69: def get_zipinfo_datetime(timestamp: float | None = None): jpayne@69: # Some applications need reproducible .whl files, but they can't do this without jpayne@69: # forcing the timestamp of the individual ZipInfo objects. See issue #143. jpayne@69: timestamp = int(os.environ.get("SOURCE_DATE_EPOCH", timestamp or time.time())) jpayne@69: timestamp = max(timestamp, MINIMUM_TIMESTAMP) jpayne@69: return time.gmtime(timestamp)[0:6] jpayne@69: jpayne@69: jpayne@69: class WheelFile(ZipFile): jpayne@69: """A ZipFile derivative class that also reads SHA-256 hashes from jpayne@69: .dist-info/RECORD and checks any read files against those. jpayne@69: """ jpayne@69: jpayne@69: _default_algorithm = hashlib.sha256 jpayne@69: jpayne@69: def __init__( jpayne@69: self, jpayne@69: file: StrPath, jpayne@69: mode: Literal["r", "w", "x", "a"] = "r", jpayne@69: compression: int = ZIP_DEFLATED, jpayne@69: ): jpayne@69: basename = os.path.basename(file) jpayne@69: self.parsed_filename = WHEEL_INFO_RE.match(basename) jpayne@69: if not basename.endswith(".whl") or self.parsed_filename is None: jpayne@69: raise WheelError(f"Bad wheel filename {basename!r}") jpayne@69: jpayne@69: ZipFile.__init__(self, file, mode, compression=compression, allowZip64=True) jpayne@69: jpayne@69: self.dist_info_path = "{}.dist-info".format( jpayne@69: self.parsed_filename.group("namever") jpayne@69: ) jpayne@69: self.record_path = self.dist_info_path + "/RECORD" jpayne@69: self._file_hashes: dict[str, tuple[None, None] | tuple[int, bytes]] = {} jpayne@69: self._file_sizes = {} jpayne@69: if mode == "r": jpayne@69: # Ignore RECORD and any embedded wheel signatures jpayne@69: self._file_hashes[self.record_path] = None, None jpayne@69: self._file_hashes[self.record_path + ".jws"] = None, None jpayne@69: self._file_hashes[self.record_path + ".p7s"] = None, None jpayne@69: jpayne@69: # Fill in the expected hashes by reading them from RECORD jpayne@69: try: jpayne@69: record = self.open(self.record_path) jpayne@69: except KeyError: jpayne@69: raise WheelError(f"Missing {self.record_path} file") from None jpayne@69: jpayne@69: with record: jpayne@69: for line in csv.reader( jpayne@69: TextIOWrapper(record, newline="", encoding="utf-8") jpayne@69: ): jpayne@69: path, hash_sum, size = line jpayne@69: if not hash_sum: jpayne@69: continue jpayne@69: jpayne@69: algorithm, hash_sum = hash_sum.split("=") jpayne@69: try: jpayne@69: hashlib.new(algorithm) jpayne@69: except ValueError: jpayne@69: raise WheelError( jpayne@69: f"Unsupported hash algorithm: {algorithm}" jpayne@69: ) from None jpayne@69: jpayne@69: if algorithm.lower() in {"md5", "sha1"}: jpayne@69: raise WheelError( jpayne@69: f"Weak hash algorithm ({algorithm}) is not permitted by " jpayne@69: f"PEP 427" jpayne@69: ) jpayne@69: jpayne@69: self._file_hashes[path] = ( jpayne@69: algorithm, jpayne@69: urlsafe_b64decode(hash_sum.encode("ascii")), jpayne@69: ) jpayne@69: jpayne@69: def open( jpayne@69: self, jpayne@69: name_or_info: str | ZipInfo, jpayne@69: mode: Literal["r", "w"] = "r", jpayne@69: pwd: bytes | None = None, jpayne@69: ) -> IO[bytes]: jpayne@69: def _update_crc(newdata: bytes) -> None: jpayne@69: eof = ef._eof jpayne@69: update_crc_orig(newdata) jpayne@69: running_hash.update(newdata) jpayne@69: if eof and running_hash.digest() != expected_hash: jpayne@69: raise WheelError(f"Hash mismatch for file '{ef_name}'") jpayne@69: jpayne@69: ef_name = ( jpayne@69: name_or_info.filename if isinstance(name_or_info, ZipInfo) else name_or_info jpayne@69: ) jpayne@69: if ( jpayne@69: mode == "r" jpayne@69: and not ef_name.endswith("/") jpayne@69: and ef_name not in self._file_hashes jpayne@69: ): jpayne@69: raise WheelError(f"No hash found for file '{ef_name}'") jpayne@69: jpayne@69: ef = ZipFile.open(self, name_or_info, mode, pwd) jpayne@69: if mode == "r" and not ef_name.endswith("/"): jpayne@69: algorithm, expected_hash = self._file_hashes[ef_name] jpayne@69: if expected_hash is not None: jpayne@69: # Monkey patch the _update_crc method to also check for the hash from jpayne@69: # RECORD jpayne@69: running_hash = hashlib.new(algorithm) jpayne@69: update_crc_orig, ef._update_crc = ef._update_crc, _update_crc jpayne@69: jpayne@69: return ef jpayne@69: jpayne@69: def write_files(self, base_dir: str): jpayne@69: log.info(f"creating '{self.filename}' and adding '{base_dir}' to it") jpayne@69: deferred: list[tuple[str, str]] = [] jpayne@69: for root, dirnames, filenames in os.walk(base_dir): jpayne@69: # Sort the directory names so that `os.walk` will walk them in a jpayne@69: # defined order on the next iteration. jpayne@69: dirnames.sort() jpayne@69: for name in sorted(filenames): jpayne@69: path = os.path.normpath(os.path.join(root, name)) jpayne@69: if os.path.isfile(path): jpayne@69: arcname = os.path.relpath(path, base_dir).replace(os.path.sep, "/") jpayne@69: if arcname == self.record_path: jpayne@69: pass jpayne@69: elif root.endswith(".dist-info"): jpayne@69: deferred.append((path, arcname)) jpayne@69: else: jpayne@69: self.write(path, arcname) jpayne@69: jpayne@69: deferred.sort() jpayne@69: for path, arcname in deferred: jpayne@69: self.write(path, arcname) jpayne@69: jpayne@69: def write( jpayne@69: self, jpayne@69: filename: str, jpayne@69: arcname: str | None = None, jpayne@69: compress_type: int | None = None, jpayne@69: ) -> None: jpayne@69: with open(filename, "rb") as f: jpayne@69: st = os.fstat(f.fileno()) jpayne@69: data = f.read() jpayne@69: jpayne@69: zinfo = ZipInfo( jpayne@69: arcname or filename, date_time=get_zipinfo_datetime(st.st_mtime) jpayne@69: ) jpayne@69: zinfo.external_attr = (stat.S_IMODE(st.st_mode) | stat.S_IFMT(st.st_mode)) << 16 jpayne@69: zinfo.compress_type = compress_type or self.compression jpayne@69: self.writestr(zinfo, data, compress_type) jpayne@69: jpayne@69: def writestr( jpayne@69: self, jpayne@69: zinfo_or_arcname: str | ZipInfo, jpayne@69: data: SizedBuffer | str, jpayne@69: compress_type: int | None = None, jpayne@69: ): jpayne@69: if isinstance(zinfo_or_arcname, str): jpayne@69: zinfo_or_arcname = ZipInfo( jpayne@69: zinfo_or_arcname, date_time=get_zipinfo_datetime() jpayne@69: ) jpayne@69: zinfo_or_arcname.compress_type = self.compression jpayne@69: zinfo_or_arcname.external_attr = (0o664 | stat.S_IFREG) << 16 jpayne@69: jpayne@69: if isinstance(data, str): jpayne@69: data = data.encode("utf-8") jpayne@69: jpayne@69: ZipFile.writestr(self, zinfo_or_arcname, data, compress_type) jpayne@69: fname = ( jpayne@69: zinfo_or_arcname.filename jpayne@69: if isinstance(zinfo_or_arcname, ZipInfo) jpayne@69: else zinfo_or_arcname jpayne@69: ) jpayne@69: log.info(f"adding '{fname}'") jpayne@69: if fname != self.record_path: jpayne@69: hash_ = self._default_algorithm(data) jpayne@69: self._file_hashes[fname] = ( jpayne@69: hash_.name, jpayne@69: urlsafe_b64encode(hash_.digest()).decode("ascii"), jpayne@69: ) jpayne@69: self._file_sizes[fname] = len(data) jpayne@69: jpayne@69: def close(self): jpayne@69: # Write RECORD jpayne@69: if self.fp is not None and self.mode == "w" and self._file_hashes: jpayne@69: data = StringIO() jpayne@69: writer = csv.writer(data, delimiter=",", quotechar='"', lineterminator="\n") jpayne@69: writer.writerows( jpayne@69: ( jpayne@69: (fname, algorithm + "=" + hash_, self._file_sizes[fname]) jpayne@69: for fname, (algorithm, hash_) in self._file_hashes.items() jpayne@69: ) jpayne@69: ) jpayne@69: writer.writerow((format(self.record_path), "", "")) jpayne@69: self.writestr(self.record_path, data.getvalue()) jpayne@69: jpayne@69: ZipFile.close(self)