jpayne@69
|
1 from __future__ import annotations
|
jpayne@69
|
2
|
jpayne@69
|
3 import csv
|
jpayne@69
|
4 import hashlib
|
jpayne@69
|
5 import os.path
|
jpayne@69
|
6 import re
|
jpayne@69
|
7 import stat
|
jpayne@69
|
8 import time
|
jpayne@69
|
9 from io import StringIO, TextIOWrapper
|
jpayne@69
|
10 from typing import IO, TYPE_CHECKING, Literal
|
jpayne@69
|
11 from zipfile import ZIP_DEFLATED, ZipFile, ZipInfo
|
jpayne@69
|
12
|
jpayne@69
|
13 from wheel.cli import WheelError
|
jpayne@69
|
14 from wheel.util import log, urlsafe_b64decode, urlsafe_b64encode
|
jpayne@69
|
15
|
jpayne@69
|
16 if TYPE_CHECKING:
|
jpayne@69
|
17 from typing import Protocol, Sized, Union
|
jpayne@69
|
18
|
jpayne@69
|
19 from typing_extensions import Buffer
|
jpayne@69
|
20
|
jpayne@69
|
21 StrPath = Union[str, os.PathLike[str]]
|
jpayne@69
|
22
|
jpayne@69
|
23 class SizedBuffer(Sized, Buffer, Protocol): ...
|
jpayne@69
|
24
|
jpayne@69
|
25
|
jpayne@69
|
# Non-greedy matching of an optional build number may be too clever (more
# invalid wheel filenames will match). Separate regex for .dist-info?
# Named groups: namever (name + version combined), name, ver, build (optional,
# must start with a digit), pyver, abi, plat — per the PEP 427 filename scheme.
WHEEL_INFO_RE = re.compile(
    r"""^(?P<namever>(?P<name>[^\s-]+?)-(?P<ver>[^\s-]+?))(-(?P<build>\d[^\s-]*))?
-(?P<pyver>[^\s-]+?)-(?P<abi>[^\s-]+?)-(?P<plat>\S+)\.whl$""",
    re.VERBOSE,
)
jpayne@69
|
33 MINIMUM_TIMESTAMP = 315532800 # 1980-01-01 00:00:00 UTC
|
jpayne@69
|
34
|
jpayne@69
|
35
|
jpayne@69
|
36 def get_zipinfo_datetime(timestamp: float | None = None):
|
jpayne@69
|
37 # Some applications need reproducible .whl files, but they can't do this without
|
jpayne@69
|
38 # forcing the timestamp of the individual ZipInfo objects. See issue #143.
|
jpayne@69
|
39 timestamp = int(os.environ.get("SOURCE_DATE_EPOCH", timestamp or time.time()))
|
jpayne@69
|
40 timestamp = max(timestamp, MINIMUM_TIMESTAMP)
|
jpayne@69
|
41 return time.gmtime(timestamp)[0:6]
|
jpayne@69
|
42
|
jpayne@69
|
43
|
jpayne@69
|
class WheelFile(ZipFile):
    """A ZipFile derivative class that also reads SHA-256 hashes from
    .dist-info/RECORD and checks any read files against those.
    """

    # Hash constructor used for files added in write mode.
    _default_algorithm = hashlib.sha256

    def __init__(
        self,
        file: StrPath,
        mode: Literal["r", "w", "x", "a"] = "r",
        compression: int = ZIP_DEFLATED,
    ):
        # Reject anything that does not look like a PEP 427 wheel filename
        # before touching the archive itself.
        basename = os.path.basename(file)
        self.parsed_filename = WHEEL_INFO_RE.match(basename)
        if not basename.endswith(".whl") or self.parsed_filename is None:
            raise WheelError(f"Bad wheel filename {basename!r}")

        ZipFile.__init__(self, file, mode, compression=compression, allowZip64=True)

        # "<name>-<version>.dist-info" inside the archive, derived from the
        # filename's "namever" group.
        self.dist_info_path = "{}.dist-info".format(
            self.parsed_filename.group("namever")
        )
        self.record_path = self.dist_info_path + "/RECORD"
        # Archive path -> (algorithm, digest). (None, None) marks entries that
        # are deliberately exempt from verification; in read mode the digest is
        # raw bytes, in write mode it is stored as an urlsafe-base64 str.
        self._file_hashes: dict[
            str, tuple[None, None] | tuple[str, bytes] | tuple[str, str]
        ] = {}
        # Archive path -> uncompressed size in bytes, used when emitting RECORD.
        self._file_sizes: dict[str, int] = {}
        if mode == "r":
            # Ignore RECORD and any embedded wheel signatures
            self._file_hashes[self.record_path] = None, None
            self._file_hashes[self.record_path + ".jws"] = None, None
            self._file_hashes[self.record_path + ".p7s"] = None, None

            # Fill in the expected hashes by reading them from RECORD
            try:
                record = self.open(self.record_path)
            except KeyError:
                raise WheelError(f"Missing {self.record_path} file") from None

            with record:
                for line in csv.reader(
                    TextIOWrapper(record, newline="", encoding="utf-8")
                ):
                    path, hash_sum, size = line
                    if not hash_sum:
                        continue

                    # RECORD hash entries take the form "<algorithm>=<b64digest>".
                    algorithm, hash_sum = hash_sum.split("=")
                    try:
                        hashlib.new(algorithm)
                    except ValueError:
                        raise WheelError(
                            f"Unsupported hash algorithm: {algorithm}"
                        ) from None

                    if algorithm.lower() in {"md5", "sha1"}:
                        raise WheelError(
                            f"Weak hash algorithm ({algorithm}) is not permitted by "
                            f"PEP 427"
                        )

                    self._file_hashes[path] = (
                        algorithm,
                        urlsafe_b64decode(hash_sum.encode("ascii")),
                    )

    def open(
        self,
        name_or_info: str | ZipInfo,
        mode: Literal["r", "w"] = "r",
        pwd: bytes | None = None,
    ) -> IO[bytes]:
        """Open an archive member; on read, verify the content against the
        hash recorded in RECORD, raising WheelError on mismatch.
        """

        def _update_crc(newdata: bytes) -> None:
            # Wrapper around zipfile's internal CRC updater: feed every chunk
            # into the running hash too, and compare digests once the reader
            # reports EOF for this member.
            eof = ef._eof
            update_crc_orig(newdata)
            running_hash.update(newdata)
            if eof and running_hash.digest() != expected_hash:
                raise WheelError(f"Hash mismatch for file '{ef_name}'")

        ef_name = (
            name_or_info.filename if isinstance(name_or_info, ZipInfo) else name_or_info
        )
        # Directory entries (trailing "/") carry no content and have no hash.
        if (
            mode == "r"
            and not ef_name.endswith("/")
            and ef_name not in self._file_hashes
        ):
            raise WheelError(f"No hash found for file '{ef_name}'")

        ef = ZipFile.open(self, name_or_info, mode, pwd)
        if mode == "r" and not ef_name.endswith("/"):
            algorithm, expected_hash = self._file_hashes[ef_name]
            if expected_hash is not None:
                # Monkey patch the _update_crc method to also check for the hash from
                # RECORD
                running_hash = hashlib.new(algorithm)
                update_crc_orig, ef._update_crc = ef._update_crc, _update_crc

        return ef

    def write_files(self, base_dir: str):
        """Recursively add the contents of *base_dir* to the archive, writing
        .dist-info files last so their metadata can reflect every other entry.
        """
        log.info(f"creating '{self.filename}' and adding '{base_dir}' to it")
        deferred: list[tuple[str, str]] = []
        for root, dirnames, filenames in os.walk(base_dir):
            # Sort the directory names so that `os.walk` will walk them in a
            # defined order on the next iteration.
            dirnames.sort()
            for name in sorted(filenames):
                path = os.path.normpath(os.path.join(root, name))
                if os.path.isfile(path):
                    arcname = os.path.relpath(path, base_dir).replace(os.path.sep, "/")
                    if arcname == self.record_path:
                        # RECORD is regenerated in close(); never copy one in.
                        pass
                    elif root.endswith(".dist-info"):
                        deferred.append((path, arcname))
                    else:
                        self.write(path, arcname)

        deferred.sort()
        for path, arcname in deferred:
            self.write(path, arcname)

    def write(
        self,
        filename: str,
        arcname: str | None = None,
        compress_type: int | None = None,
    ) -> None:
        """Add the file *filename* under *arcname*, preserving its Unix mode
        and (1980-clamped) modification time in the zip entry.
        """
        with open(filename, "rb") as f:
            st = os.fstat(f.fileno())
            data = f.read()

        zinfo = ZipInfo(
            arcname or filename, date_time=get_zipinfo_datetime(st.st_mtime)
        )
        # Store the full Unix mode (file type + permission bits) in the upper
        # 16 bits of external_attr, as zipfile convention expects.
        zinfo.external_attr = (stat.S_IMODE(st.st_mode) | stat.S_IFMT(st.st_mode)) << 16
        zinfo.compress_type = compress_type or self.compression
        self.writestr(zinfo, data, compress_type)

    def writestr(
        self,
        zinfo_or_arcname: str | ZipInfo,
        data: SizedBuffer | str,
        compress_type: int | None = None,
    ):
        """Write *data* to the archive and record its hash and size for the
        RECORD file (unless the entry being written is RECORD itself).
        """
        if isinstance(zinfo_or_arcname, str):
            zinfo_or_arcname = ZipInfo(
                zinfo_or_arcname, date_time=get_zipinfo_datetime()
            )
            zinfo_or_arcname.compress_type = self.compression
            # Default synthesized entries to a regular file, rw-rw-r--.
            zinfo_or_arcname.external_attr = (0o664 | stat.S_IFREG) << 16

        if isinstance(data, str):
            data = data.encode("utf-8")

        ZipFile.writestr(self, zinfo_or_arcname, data, compress_type)
        fname = (
            zinfo_or_arcname.filename
            if isinstance(zinfo_or_arcname, ZipInfo)
            else zinfo_or_arcname
        )
        log.info(f"adding '{fname}'")
        if fname != self.record_path:
            hash_ = self._default_algorithm(data)
            self._file_hashes[fname] = (
                hash_.name,
                urlsafe_b64encode(hash_.digest()).decode("ascii"),
            )
            self._file_sizes[fname] = len(data)

    def close(self):
        # Write RECORD
        if self.fp is not None and self.mode == "w" and self._file_hashes:
            data = StringIO()
            writer = csv.writer(data, delimiter=",", quotechar='"', lineterminator="\n")
            writer.writerows(
                (
                    (fname, algorithm + "=" + hash_, self._file_sizes[fname])
                    for fname, (algorithm, hash_) in self._file_hashes.items()
                )
            )
            # RECORD lists itself last, with empty hash and size fields.
            writer.writerow((format(self.record_path), "", ""))
            self.writestr(self.record_path, data.getvalue())

        ZipFile.close(self)