annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/wheel/wheelfile.py @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
rev   line source
jpayne@68 1 from __future__ import annotations
jpayne@68 2
jpayne@68 3 import csv
jpayne@68 4 import hashlib
jpayne@68 5 import os.path
jpayne@68 6 import re
jpayne@68 7 import stat
jpayne@68 8 import time
jpayne@68 9 from io import StringIO, TextIOWrapper
jpayne@68 10 from typing import IO, TYPE_CHECKING, Literal
jpayne@68 11 from zipfile import ZIP_DEFLATED, ZipFile, ZipInfo
jpayne@68 12
jpayne@68 13 from wheel.cli import WheelError
jpayne@68 14 from wheel.util import log, urlsafe_b64decode, urlsafe_b64encode
jpayne@68 15
jpayne@68 16 if TYPE_CHECKING:
jpayne@68 17 from typing import Protocol, Sized, Union
jpayne@68 18
jpayne@68 19 from typing_extensions import Buffer
jpayne@68 20
jpayne@68 21 StrPath = Union[str, os.PathLike[str]]
jpayne@68 22
jpayne@68 23 class SizedBuffer(Sized, Buffer, Protocol): ...
jpayne@68 24
jpayne@68 25
jpayne@68 26 # Non-greedy matching of an optional build number may be too clever (more
jpayne@68 27 # invalid wheel filenames will match). Separate regex for .dist-info?
# Non-greedy matching of an optional build number may be too clever (more
# invalid wheel filenames will match). Separate regex for .dist-info?
WHEEL_INFO_RE = re.compile(
    r"""^(?P<namever>(?P<name>[^\s-]+?)-(?P<ver>[^\s-]+?))(-(?P<build>\d[^\s-]*))?
     -(?P<pyver>[^\s-]+?)-(?P<abi>[^\s-]+?)-(?P<plat>\S+)\.whl$""",
    re.VERBOSE,
)
MINIMUM_TIMESTAMP = 315532800  # 1980-01-01 00:00:00 UTC


def get_zipinfo_datetime(timestamp: float | None = None) -> tuple[int, ...]:
    """Return a (year, month, day, hour, minute, second) tuple for ZipInfo.date_time.

    Some applications need reproducible .whl files, but they can't do this
    without forcing the timestamp of the individual ZipInfo objects, so the
    SOURCE_DATE_EPOCH environment variable, when set, overrides *timestamp*.
    See issue #143.  The result is clamped to 1980-01-01, the earliest
    date representable in ZIP metadata.

    :param timestamp: POSIX timestamp to convert; ``None`` means "now"
    """
    if timestamp is None:
        timestamp = time.time()
    # NOTE: a falsy timestamp (0 == the Unix epoch) is a legitimate value and
    # must not fall back to the current time; only None means "use now".
    timestamp = int(os.environ.get("SOURCE_DATE_EPOCH", timestamp))
    timestamp = max(timestamp, MINIMUM_TIMESTAMP)
    return time.gmtime(timestamp)[0:6]
jpayne@68 42
jpayne@68 43
class WheelFile(ZipFile):
    """A ZipFile derivative class that also reads SHA-256 hashes from
    .dist-info/RECORD and checks any read files against those.
    """

    # Hash algorithm used for RECORD entries written by this class
    # (PEP 427 forbids md5/sha1; sha256 is the accepted standard).
    _default_algorithm = hashlib.sha256

    def __init__(
        self,
        file: StrPath,
        mode: Literal["r", "w", "x", "a"] = "r",
        compression: int = ZIP_DEFLATED,
    ):
        """Open *file* as a wheel archive.

        :param file: path to a ``.whl`` file whose basename matches the wheel
            filename convention (``name-version[-build]-pyver-abi-plat.whl``)
        :param mode: same modes as :class:`zipfile.ZipFile`
        :param compression: compression method used for members written later
        :raises WheelError: if the filename is not a valid wheel filename, or
            (in read mode) RECORD is missing, lists an unknown hash algorithm,
            or uses a weak (md5/sha1) algorithm
        """
        basename = os.path.basename(file)
        self.parsed_filename = WHEEL_INFO_RE.match(basename)
        if not basename.endswith(".whl") or self.parsed_filename is None:
            raise WheelError(f"Bad wheel filename {basename!r}")

        ZipFile.__init__(self, file, mode, compression=compression, allowZip64=True)

        # e.g. "mypkg-1.0.dist-info", derived from the parsed filename
        self.dist_info_path = "{}.dist-info".format(
            self.parsed_filename.group("namever")
        )
        self.record_path = self.dist_info_path + "/RECORD"
        # archive name -> (algorithm, digest).  (None, None) marks members that
        # are deliberately not verified (RECORD itself and its signatures).
        # Read mode stores raw digest bytes; write mode (see writestr) stores
        # the urlsafe-b64 digest as str, ready for RECORD serialization.
        self._file_hashes: dict[
            str, tuple[None, None] | tuple[str, bytes] | tuple[str, str]
        ] = {}
        self._file_sizes: dict[str, int] = {}
        if mode == "r":
            # Ignore RECORD and any embedded wheel signatures
            self._file_hashes[self.record_path] = None, None
            self._file_hashes[self.record_path + ".jws"] = None, None
            self._file_hashes[self.record_path + ".p7s"] = None, None

            # Fill in the expected hashes by reading them from RECORD
            try:
                record = self.open(self.record_path)
            except KeyError:
                raise WheelError(f"Missing {self.record_path} file") from None

            with record:
                # newline="" keeps csv in charge of line-ending handling
                for line in csv.reader(
                    TextIOWrapper(record, newline="", encoding="utf-8")
                ):
                    path, hash_sum, size = line
                    # Entries with an empty hash column are left unverified
                    if not hash_sum:
                        continue

                    # RECORD hash format is "<algorithm>=<urlsafe-b64-digest>"
                    algorithm, hash_sum = hash_sum.split("=")
                    try:
                        hashlib.new(algorithm)
                    except ValueError:
                        raise WheelError(
                            f"Unsupported hash algorithm: {algorithm}"
                        ) from None

                    if algorithm.lower() in {"md5", "sha1"}:
                        raise WheelError(
                            f"Weak hash algorithm ({algorithm}) is not permitted by "
                            f"PEP 427"
                        )

                    self._file_hashes[path] = (
                        algorithm,
                        urlsafe_b64decode(hash_sum.encode("ascii")),
                    )

    def open(
        self,
        name_or_info: str | ZipInfo,
        mode: Literal["r", "w"] = "r",
        pwd: bytes | None = None,
    ) -> IO[bytes]:
        """Open an archive member, verifying its content hash on read.

        In read mode, a member (other than a directory entry) must have a
        hash recorded in RECORD, and its content is checked against that hash
        as it is consumed.

        :raises WheelError: if the member has no RECORD hash, or the digest of
            the bytes read does not match the recorded one at end of file
        """

        def _update_crc(newdata: bytes) -> None:
            # Wrapper around ZipExtFile's own CRC updater: feed the running
            # hash the same decompressed bytes, and at EOF compare the final
            # digest with the one from RECORD.
            eof = ef._eof
            update_crc_orig(newdata)
            running_hash.update(newdata)
            if eof and running_hash.digest() != expected_hash:
                raise WheelError(f"Hash mismatch for file '{ef_name}'")

        ef_name = (
            name_or_info.filename if isinstance(name_or_info, ZipInfo) else name_or_info
        )
        # Directory entries (trailing "/") carry no content, so no hash check
        if (
            mode == "r"
            and not ef_name.endswith("/")
            and ef_name not in self._file_hashes
        ):
            raise WheelError(f"No hash found for file '{ef_name}'")

        ef = ZipFile.open(self, name_or_info, mode, pwd)
        if mode == "r" and not ef_name.endswith("/"):
            # expected_hash is None for RECORD/signature files, which are
            # exempt from verification (see __init__)
            algorithm, expected_hash = self._file_hashes[ef_name]
            if expected_hash is not None:
                # Monkey patch the _update_crc method to also check for the hash from
                # RECORD
                running_hash = hashlib.new(algorithm)
                update_crc_orig, ef._update_crc = ef._update_crc, _update_crc

        return ef

    def write_files(self, base_dir: str):
        """Recursively add every file under *base_dir* to the archive.

        Files are added in a deterministic (sorted) order for reproducible
        wheels.  RECORD itself is skipped (it is generated in close()), and
        all other ``.dist-info`` files are deferred so they end up last in
        the archive.
        """
        log.info(f"creating '{self.filename}' and adding '{base_dir}' to it")
        deferred: list[tuple[str, str]] = []
        for root, dirnames, filenames in os.walk(base_dir):
            # Sort the directory names so that `os.walk` will walk them in a
            # defined order on the next iteration.
            dirnames.sort()
            for name in sorted(filenames):
                path = os.path.normpath(os.path.join(root, name))
                if os.path.isfile(path):
                    # Archive names always use forward slashes
                    arcname = os.path.relpath(path, base_dir).replace(os.path.sep, "/")
                    if arcname == self.record_path:
                        pass
                    elif root.endswith(".dist-info"):
                        deferred.append((path, arcname))
                    else:
                        self.write(path, arcname)

        deferred.sort()
        for path, arcname in deferred:
            self.write(path, arcname)

    def write(
        self,
        filename: str,
        arcname: str | None = None,
        compress_type: int | None = None,
    ) -> None:
        """Add the file *filename* to the archive as *arcname*.

        Preserves the file's permission bits and type in the ZIP external
        attributes and derives the member timestamp from the file's mtime
        (subject to SOURCE_DATE_EPOCH, see get_zipinfo_datetime).
        """
        with open(filename, "rb") as f:
            st = os.fstat(f.fileno())
            data = f.read()

        zinfo = ZipInfo(
            arcname or filename, date_time=get_zipinfo_datetime(st.st_mtime)
        )
        # Upper 16 bits of external_attr hold the POSIX mode + file type
        zinfo.external_attr = (stat.S_IMODE(st.st_mode) | stat.S_IFMT(st.st_mode)) << 16
        zinfo.compress_type = compress_type or self.compression
        self.writestr(zinfo, data, compress_type)

    def writestr(
        self,
        zinfo_or_arcname: str | ZipInfo,
        data: SizedBuffer | str,
        compress_type: int | None = None,
    ):
        """Write *data* to the archive and record its hash and size for RECORD.

        A plain arcname is promoted to a ZipInfo with a reproducible timestamp
        and regular-file 0o664 permissions.  str data is encoded as UTF-8.
        """
        if isinstance(zinfo_or_arcname, str):
            zinfo_or_arcname = ZipInfo(
                zinfo_or_arcname, date_time=get_zipinfo_datetime()
            )
            zinfo_or_arcname.compress_type = self.compression
            zinfo_or_arcname.external_attr = (0o664 | stat.S_IFREG) << 16

        if isinstance(data, str):
            data = data.encode("utf-8")

        ZipFile.writestr(self, zinfo_or_arcname, data, compress_type)
        fname = (
            zinfo_or_arcname.filename
            if isinstance(zinfo_or_arcname, ZipInfo)
            else zinfo_or_arcname
        )
        log.info(f"adding '{fname}'")
        # RECORD must not list a hash for itself
        if fname != self.record_path:
            hash_ = self._default_algorithm(data)
            self._file_hashes[fname] = (
                hash_.name,
                urlsafe_b64encode(hash_.digest()).decode("ascii"),
            )
            self._file_sizes[fname] = len(data)

    def close(self):
        # Write RECORD
        if self.fp is not None and self.mode == "w" and self._file_hashes:
            data = StringIO()
            writer = csv.writer(data, delimiter=",", quotechar='"', lineterminator="\n")
            # One "path,algorithm=b64digest,size" row per member written
            writer.writerows(
                (
                    (fname, algorithm + "=" + hash_, self._file_sizes[fname])
                    for fname, (algorithm, hash_) in self._file_hashes.items()
                )
            )
            # RECORD's own row has empty hash and size columns by convention
            writer.writerow((format(self.record_path), "", ""))
            self.writestr(self.record_path, data.getvalue())

        ZipFile.close(self)