Mercurial > repos > rliterman > csp2
comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/wheel/wheelfile.py @ 69:33d812a61356
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 17:55:14 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
67:0e9998148a16 | 69:33d812a61356 |
---|---|
1 from __future__ import annotations | |
2 | |
3 import csv | |
4 import hashlib | |
5 import os.path | |
6 import re | |
7 import stat | |
8 import time | |
9 from io import StringIO, TextIOWrapper | |
10 from typing import IO, TYPE_CHECKING, Literal | |
11 from zipfile import ZIP_DEFLATED, ZipFile, ZipInfo | |
12 | |
13 from wheel.cli import WheelError | |
14 from wheel.util import log, urlsafe_b64decode, urlsafe_b64encode | |
15 | |
16 if TYPE_CHECKING: | |
17 from typing import Protocol, Sized, Union | |
18 | |
19 from typing_extensions import Buffer | |
20 | |
21 StrPath = Union[str, os.PathLike[str]] | |
22 | |
23 class SizedBuffer(Sized, Buffer, Protocol): ... | |
24 | |
25 | |
26 # Non-greedy matching of an optional build number may be too clever (more | |
27 # invalid wheel filenames will match). Separate regex for .dist-info? | |
28 WHEEL_INFO_RE = re.compile( | |
29 r"""^(?P<namever>(?P<name>[^\s-]+?)-(?P<ver>[^\s-]+?))(-(?P<build>\d[^\s-]*))? | |
30 -(?P<pyver>[^\s-]+?)-(?P<abi>[^\s-]+?)-(?P<plat>\S+)\.whl$""", | |
31 re.VERBOSE, | |
32 ) | |
33 MINIMUM_TIMESTAMP = 315532800 # 1980-01-01 00:00:00 UTC | |
34 | |
35 | |
36 def get_zipinfo_datetime(timestamp: float | None = None): | |
37 # Some applications need reproducible .whl files, but they can't do this without | |
38 # forcing the timestamp of the individual ZipInfo objects. See issue #143. | |
39 timestamp = int(os.environ.get("SOURCE_DATE_EPOCH", timestamp or time.time())) | |
40 timestamp = max(timestamp, MINIMUM_TIMESTAMP) | |
41 return time.gmtime(timestamp)[0:6] | |
42 | |
43 | |
44 class WheelFile(ZipFile): | |
45 """A ZipFile derivative class that also reads SHA-256 hashes from | |
46 .dist-info/RECORD and checks any read files against those. | |
47 """ | |
48 | |
49 _default_algorithm = hashlib.sha256 | |
50 | |
51 def __init__( | |
52 self, | |
53 file: StrPath, | |
54 mode: Literal["r", "w", "x", "a"] = "r", | |
55 compression: int = ZIP_DEFLATED, | |
56 ): | |
57 basename = os.path.basename(file) | |
58 self.parsed_filename = WHEEL_INFO_RE.match(basename) | |
59 if not basename.endswith(".whl") or self.parsed_filename is None: | |
60 raise WheelError(f"Bad wheel filename {basename!r}") | |
61 | |
62 ZipFile.__init__(self, file, mode, compression=compression, allowZip64=True) | |
63 | |
64 self.dist_info_path = "{}.dist-info".format( | |
65 self.parsed_filename.group("namever") | |
66 ) | |
67 self.record_path = self.dist_info_path + "/RECORD" | |
68 self._file_hashes: dict[str, tuple[None, None] | tuple[int, bytes]] = {} | |
69 self._file_sizes = {} | |
70 if mode == "r": | |
71 # Ignore RECORD and any embedded wheel signatures | |
72 self._file_hashes[self.record_path] = None, None | |
73 self._file_hashes[self.record_path + ".jws"] = None, None | |
74 self._file_hashes[self.record_path + ".p7s"] = None, None | |
75 | |
76 # Fill in the expected hashes by reading them from RECORD | |
77 try: | |
78 record = self.open(self.record_path) | |
79 except KeyError: | |
80 raise WheelError(f"Missing {self.record_path} file") from None | |
81 | |
82 with record: | |
83 for line in csv.reader( | |
84 TextIOWrapper(record, newline="", encoding="utf-8") | |
85 ): | |
86 path, hash_sum, size = line | |
87 if not hash_sum: | |
88 continue | |
89 | |
90 algorithm, hash_sum = hash_sum.split("=") | |
91 try: | |
92 hashlib.new(algorithm) | |
93 except ValueError: | |
94 raise WheelError( | |
95 f"Unsupported hash algorithm: {algorithm}" | |
96 ) from None | |
97 | |
98 if algorithm.lower() in {"md5", "sha1"}: | |
99 raise WheelError( | |
100 f"Weak hash algorithm ({algorithm}) is not permitted by " | |
101 f"PEP 427" | |
102 ) | |
103 | |
104 self._file_hashes[path] = ( | |
105 algorithm, | |
106 urlsafe_b64decode(hash_sum.encode("ascii")), | |
107 ) | |
108 | |
109 def open( | |
110 self, | |
111 name_or_info: str | ZipInfo, | |
112 mode: Literal["r", "w"] = "r", | |
113 pwd: bytes | None = None, | |
114 ) -> IO[bytes]: | |
115 def _update_crc(newdata: bytes) -> None: | |
116 eof = ef._eof | |
117 update_crc_orig(newdata) | |
118 running_hash.update(newdata) | |
119 if eof and running_hash.digest() != expected_hash: | |
120 raise WheelError(f"Hash mismatch for file '{ef_name}'") | |
121 | |
122 ef_name = ( | |
123 name_or_info.filename if isinstance(name_or_info, ZipInfo) else name_or_info | |
124 ) | |
125 if ( | |
126 mode == "r" | |
127 and not ef_name.endswith("/") | |
128 and ef_name not in self._file_hashes | |
129 ): | |
130 raise WheelError(f"No hash found for file '{ef_name}'") | |
131 | |
132 ef = ZipFile.open(self, name_or_info, mode, pwd) | |
133 if mode == "r" and not ef_name.endswith("/"): | |
134 algorithm, expected_hash = self._file_hashes[ef_name] | |
135 if expected_hash is not None: | |
136 # Monkey patch the _update_crc method to also check for the hash from | |
137 # RECORD | |
138 running_hash = hashlib.new(algorithm) | |
139 update_crc_orig, ef._update_crc = ef._update_crc, _update_crc | |
140 | |
141 return ef | |
142 | |
143 def write_files(self, base_dir: str): | |
144 log.info(f"creating '{self.filename}' and adding '{base_dir}' to it") | |
145 deferred: list[tuple[str, str]] = [] | |
146 for root, dirnames, filenames in os.walk(base_dir): | |
147 # Sort the directory names so that `os.walk` will walk them in a | |
148 # defined order on the next iteration. | |
149 dirnames.sort() | |
150 for name in sorted(filenames): | |
151 path = os.path.normpath(os.path.join(root, name)) | |
152 if os.path.isfile(path): | |
153 arcname = os.path.relpath(path, base_dir).replace(os.path.sep, "/") | |
154 if arcname == self.record_path: | |
155 pass | |
156 elif root.endswith(".dist-info"): | |
157 deferred.append((path, arcname)) | |
158 else: | |
159 self.write(path, arcname) | |
160 | |
161 deferred.sort() | |
162 for path, arcname in deferred: | |
163 self.write(path, arcname) | |
164 | |
165 def write( | |
166 self, | |
167 filename: str, | |
168 arcname: str | None = None, | |
169 compress_type: int | None = None, | |
170 ) -> None: | |
171 with open(filename, "rb") as f: | |
172 st = os.fstat(f.fileno()) | |
173 data = f.read() | |
174 | |
175 zinfo = ZipInfo( | |
176 arcname or filename, date_time=get_zipinfo_datetime(st.st_mtime) | |
177 ) | |
178 zinfo.external_attr = (stat.S_IMODE(st.st_mode) | stat.S_IFMT(st.st_mode)) << 16 | |
179 zinfo.compress_type = compress_type or self.compression | |
180 self.writestr(zinfo, data, compress_type) | |
181 | |
182 def writestr( | |
183 self, | |
184 zinfo_or_arcname: str | ZipInfo, | |
185 data: SizedBuffer | str, | |
186 compress_type: int | None = None, | |
187 ): | |
188 if isinstance(zinfo_or_arcname, str): | |
189 zinfo_or_arcname = ZipInfo( | |
190 zinfo_or_arcname, date_time=get_zipinfo_datetime() | |
191 ) | |
192 zinfo_or_arcname.compress_type = self.compression | |
193 zinfo_or_arcname.external_attr = (0o664 | stat.S_IFREG) << 16 | |
194 | |
195 if isinstance(data, str): | |
196 data = data.encode("utf-8") | |
197 | |
198 ZipFile.writestr(self, zinfo_or_arcname, data, compress_type) | |
199 fname = ( | |
200 zinfo_or_arcname.filename | |
201 if isinstance(zinfo_or_arcname, ZipInfo) | |
202 else zinfo_or_arcname | |
203 ) | |
204 log.info(f"adding '{fname}'") | |
205 if fname != self.record_path: | |
206 hash_ = self._default_algorithm(data) | |
207 self._file_hashes[fname] = ( | |
208 hash_.name, | |
209 urlsafe_b64encode(hash_.digest()).decode("ascii"), | |
210 ) | |
211 self._file_sizes[fname] = len(data) | |
212 | |
213 def close(self): | |
214 # Write RECORD | |
215 if self.fp is not None and self.mode == "w" and self._file_hashes: | |
216 data = StringIO() | |
217 writer = csv.writer(data, delimiter=",", quotechar='"', lineterminator="\n") | |
218 writer.writerows( | |
219 ( | |
220 (fname, algorithm + "=" + hash_, self._file_sizes[fname]) | |
221 for fname, (algorithm, hash_) in self._file_hashes.items() | |
222 ) | |
223 ) | |
224 writer.writerow((format(self.record_path), "", "")) | |
225 self.writestr(self.record_path, data.getvalue()) | |
226 | |
227 ZipFile.close(self) |