comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/setuptools/sandbox.py @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
comparison
equal deleted inserted replaced
67:0e9998148a16 68:5028fdace37b
1 from __future__ import annotations
2
3 import builtins
4 import contextlib
5 import functools
6 import itertools
7 import operator
8 import os
9 import pickle
10 import re
11 import sys
12 import tempfile
13 import textwrap
14 from types import TracebackType
15 from typing import TYPE_CHECKING
16
17 import pkg_resources
18 from pkg_resources import working_set
19
20 from distutils.errors import DistutilsError
21
# Bind ``_os`` to the underlying platform-specific module that backs ``os``
# so the sandbox can call (and later restore) the unwrapped implementations.
if sys.platform.startswith('java'):
    import org.python.modules.posix.PosixModule as _os  # pyright: ignore[reportMissingImports]
else:
    # ``os.name`` is the name of the implementation module registered in
    # ``sys.modules``.
    _os = sys.modules[os.name]
# Keep a reference to the builtin ``open`` before any sandbox replaces
# ``builtins.open``.
_open = open
27
28
29 if TYPE_CHECKING:
30 from typing_extensions import Self
31
32 __all__ = [
33 "AbstractSandbox",
34 "DirectorySandbox",
35 "SandboxViolation",
36 "run_setup",
37 ]
38
39
40 def _execfile(filename, globals, locals=None):
41 """
42 Python 3 implementation of execfile.
43 """
44 mode = 'rb'
45 with open(filename, mode) as stream:
46 script = stream.read()
47 if locals is None:
48 locals = globals
49 code = compile(script, filename, 'exec')
50 exec(code, globals, locals)
51
52
@contextlib.contextmanager
def save_argv(repl=None):
    """
    Context manager that snapshots ``sys.argv`` and restores it on exit.

    If *repl* is given, ``sys.argv`` is replaced in place with its contents
    for the duration of the block. The snapshot is yielded to the caller.
    """
    original_argv = list(sys.argv)
    if repl is not None:
        sys.argv[:] = repl
    try:
        yield original_argv
    finally:
        # Slice assignment keeps the identity of the sys.argv list object.
        sys.argv[:] = original_argv
62
63
@contextlib.contextmanager
def save_path():
    """
    Context manager that snapshots ``sys.path`` and restores it on exit.

    The snapshot (a copy of the list) is yielded to the caller.
    """
    original_path = list(sys.path)
    try:
        yield original_path
    finally:
        # Restore in place so existing references to sys.path stay valid.
        sys.path[:] = original_path
71
72
@contextlib.contextmanager
def override_temp(replacement):
    """
    Temporarily point ``tempfile.tempdir`` at *replacement*.

    The directory is created first if it does not exist, and the previous
    ``tempfile.tempdir`` value is put back when the block exits.
    """
    os.makedirs(replacement, exist_ok=True)

    previous = tempfile.tempdir
    tempfile.tempdir = replacement
    try:
        yield
    finally:
        tempfile.tempdir = previous
88
89
@contextlib.contextmanager
def pushd(target):
    """
    Change the working directory to *target* for the duration of the block.

    Yields the directory that was current on entry and changes back to it
    on exit.
    """
    previous_dir = os.getcwd()
    os.chdir(target)
    try:
        yield previous_dir
    finally:
        os.chdir(previous_dir)
98
99
class UnpickleableException(Exception):
    """
    Stand-in for an exception that could not itself be pickled.
    """

    @staticmethod
    def dump(type, exc):
        """
        Return ``(pickled type, pickled exc)``.

        If pickling either object fails, fall back to dumping an
        UnpickleableException that carries ``repr(exc)`` instead.
        """
        try:
            pickled_type = pickle.dumps(type)
            pickled_exc = pickle.dumps(exc)
        except Exception:
            # get UnpickleableException inside the sandbox
            from setuptools.sandbox import UnpickleableException as cls

            return cls.dump(cls, cls(repr(exc)))
        return pickled_type, pickled_exc
118
119
class ExceptionSaver:
    """
    Context manager that captures an exception raised in its block,
    stores it in pickled form, and can re-raise it later via ``resume()``.
    """

    def __enter__(self) -> Self:
        return self

    def __exit__(
        self,
        type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> bool:
        if exc:
            # Serialize the exception, remember its traceback, and tell the
            # interpreter to suppress it for now.
            self._saved = UnpickleableException.dump(type, exc)
            self._tb = tb
            return True

        # Nothing was raised: nothing to save, nothing to suppress.
        return False

    def resume(self):
        "restore and re-raise any exception"

        if '_saved' in vars(self):
            # Both pickles are loaded; only the exception instance is raised.
            _, exc = map(pickle.loads, self._saved)
            raise exc.with_traceback(self._tb)
153
154
@contextlib.contextmanager
def save_modules():
    """
    Context in which the set of imported modules is snapshotted and restored.

    An exception raised inside the context is captured, the module table is
    rolled back, and only then is the exception re-raised outside the
    context.
    """
    snapshot = sys.modules.copy()
    with ExceptionSaver() as pending:
        yield snapshot

    # Put back every module that was present on entry.
    sys.modules.update(snapshot)
    # remove any modules imported since
    newly_imported = [
        name
        for name in sys.modules
        # exclude any encodings modules. See #285
        if name not in snapshot and not name.startswith('encodings.')
    ]
    _clear_modules(newly_imported)

    pending.resume()
179
180
181 def _clear_modules(module_names):
182 for mod_name in list(module_names):
183 del sys.modules[mod_name]
184
185
@contextlib.contextmanager
def save_pkg_resources_state():
    """
    Snapshot ``pkg_resources`` state on entry and restore it on exit.

    The snapshot returned by ``pkg_resources.__getstate__()`` is yielded.
    """
    state = pkg_resources.__getstate__()
    try:
        yield state
    finally:
        pkg_resources.__setstate__(state)
193
194
@contextlib.contextmanager
def setup_context(setup_dir):
    """
    Establish the isolated environment in which a setup script runs.

    Saves pkg_resources state, ``sys.modules``, ``sys.path`` and
    ``sys.argv``; redirects ``tempfile.tempdir`` to ``<setup_dir>/temp``;
    and changes directory to *setup_dir*. Everything is undone in reverse
    order when the context exits.
    """
    scratch_dir = os.path.join(setup_dir, 'temp')
    with save_pkg_resources_state(), save_modules(), save_path():
        # Runs after the module table and path have been snapshotted.
        hide_setuptools()
        with save_argv(), override_temp(scratch_dir), pushd(setup_dir):
            # ensure setuptools commands are available
            __import__('setuptools')
            yield
208
209
# Top-level package names whose modules (including submodules) are matched
# by _needs_hiding() and removed from sys.modules by hide_setuptools().
_MODULES_TO_HIDE = {
    'setuptools',
    'distutils',
    'pkg_resources',
    'Cython',
    '_distutils_hack',
}
217
218
def _needs_hiding(mod_name):
    """
    Tell whether *mod_name* belongs to one of the packages to hide.

    Only the part before the first dot is compared against
    ``_MODULES_TO_HIDE``.

    >>> _needs_hiding('setuptools')
    True
    >>> _needs_hiding('pkg_resources')
    True
    >>> _needs_hiding('setuptools_plugin')
    False
    >>> _needs_hiding('setuptools.__init__')
    True
    >>> _needs_hiding('distutils')
    True
    >>> _needs_hiding('os')
    False
    >>> _needs_hiding('Cython')
    True
    """
    top_level, _, _ = mod_name.partition('.')
    return top_level in _MODULES_TO_HIDE
238
239
def hide_setuptools():
    """
    Remove references to setuptools' modules from sys.modules to allow the
    invocation to import the most appropriate setuptools. This technique is
    necessary to avoid issues such as #315 where setuptools upgrading itself
    would fail to find a function declared in the metadata.
    """
    shim = sys.modules.get('_distutils_hack')
    if shim is not None:
        shim._remove_shim()

    # Collect the names first; _clear_modules then deletes each of them.
    hidden = [name for name in sys.modules if _needs_hiding(name)]
    _clear_modules(hidden)
253
254
def run_setup(setup_script, args):
    """Run a distutils setup script, sandboxed in its directory"""
    script_dir = os.path.abspath(os.path.dirname(setup_script))
    with setup_context(script_dir):
        try:
            sys.argv[:] = [setup_script, *args]
            sys.path.insert(0, script_dir)
            # reset to include setup dir, w/clean callback list
            working_set.__init__()
            working_set.callbacks.append(lambda dist: dist.activate())

            with DirectorySandbox(script_dir):
                # Execute the script as if it were run as ``__main__``.
                namespace = {'__file__': setup_script, '__name__': '__main__'}
                _execfile(setup_script, namespace)
        except SystemExit as exit_exc:
            exit_args = exit_exc.args
            if exit_args and exit_args[0]:
                # A truthy first argument is propagated to the caller.
                raise
            # Normal exit, just return
273
274
class AbstractSandbox:
    """Wrap 'os' module and 'open()' builtin for virtualizing setup scripts

    While the sandbox is entered, the wrapper methods defined on this class
    are installed onto the ``os`` module and ``builtins.open`` is replaced
    by ``self._open``. Subclasses customize behavior through the
    ``_validate_path`` / ``_remap_*`` hooks at the bottom of the class.
    """

    # True only between __enter__ and __exit__; wrappers consult it to decide
    # whether to route paths through the remap hooks.
    _active = False

    def __init__(self):
        # Public names of the underlying os module for which this instance
        # also has an attribute, i.e. the functions that will be patched.
        self._attrs = [
            name
            for name in dir(_os)
            if not name.startswith('_') and hasattr(self, name)
        ]

    def _copy(self, source):
        """Set each name in ``self._attrs`` on ``os`` from *source*."""
        for name in self._attrs:
            setattr(os, name, getattr(source, name))

    def __enter__(self) -> None:
        """Install the wrappers on ``os`` and ``builtins.open``; mark active."""
        self._copy(self)
        builtins.open = self._open
        self._active = True

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ):
        """Undo __enter__: restore ``builtins.open`` and the ``os`` functions.

        Returns None, so an exception raised in the block is not suppressed.
        """
        self._active = False
        # Module-level ``_open`` (the saved builtin), not the class attribute.
        builtins.open = _open
        self._copy(_os)

    def run(self, func):
        """Run 'func' under os sandboxing"""
        with self:
            return func()

    # The ``_mk_*`` functions below are called while the class body executes
    # to build wrapper methods; each wrapper is stored into the class
    # namespace through ``locals()`` under the name of the os function it
    # wraps.

    def _mk_dual_path_wrapper(name: str):  # type: ignore[misc] # https://github.com/pypa/setuptools/pull/4099
        """Build a wrapper for an os function taking (src, dst) paths."""
        original = getattr(_os, name)

        def wrap(self, src, dst, *args, **kw):
            if self._active:
                src, dst = self._remap_pair(name, src, dst, *args, **kw)
            return original(src, dst, *args, **kw)

        return wrap

    for __name in ["rename", "link", "symlink"]:
        # Only wrap functions that the underlying os module provides.
        if hasattr(_os, __name):
            locals()[__name] = _mk_dual_path_wrapper(__name)

    def _mk_single_path_wrapper(name: str, original=None):  # type: ignore[misc] # https://github.com/pypa/setuptools/pull/4099
        """Build a wrapper for a function whose first argument is a path.

        *original* defaults to the attribute *name* of the underlying os
        module.
        """
        original = original or getattr(_os, name)

        def wrap(self, path, *args, **kw):
            if self._active:
                path = self._remap_input(name, path, *args, **kw)
            return original(path, *args, **kw)

        return wrap

    # Wrapper around the module-level ``_open`` (the saved builtin open);
    # __enter__ installs it as ``builtins.open``.
    _open = _mk_single_path_wrapper('open', _open)
    for __name in [
        "stat",
        "listdir",
        "chdir",
        "open",
        "chmod",
        "chown",
        "mkdir",
        "remove",
        "unlink",
        "rmdir",
        "utime",
        "lchown",
        "chroot",
        "lstat",
        "startfile",
        "mkfifo",
        "mknod",
        "pathconf",
        "access",
    ]:
        if hasattr(_os, __name):
            locals()[__name] = _mk_single_path_wrapper(__name)

    def _mk_single_with_return(name: str):  # type: ignore[misc] # https://github.com/pypa/setuptools/pull/4099
        """Build a wrapper that remaps both the input path and the result."""
        original = getattr(_os, name)

        def wrap(self, path, *args, **kw):
            if self._active:
                path = self._remap_input(name, path, *args, **kw)
                return self._remap_output(name, original(path, *args, **kw))
            return original(path, *args, **kw)

        return wrap

    for __name in ['readlink', 'tempnam']:
        if hasattr(_os, __name):
            locals()[__name] = _mk_single_with_return(__name)

    def _mk_query(name: str):  # type: ignore[misc] # https://github.com/pypa/setuptools/pull/4099
        """Build a wrapper for a function whose result is remapped as output."""
        original = getattr(_os, name)

        def wrap(self, *args, **kw):
            # The original is always called; only the result is remapped.
            retval = original(*args, **kw)
            if self._active:
                return self._remap_output(name, retval)
            return retval

        return wrap

    for __name in ['getcwd', 'tmpnam']:
        if hasattr(_os, __name):
            locals()[__name] = _mk_query(__name)

    def _validate_path(self, path):
        """Called to remap or validate any path, whether input or output"""
        return path

    def _remap_input(self, operation, path, *args, **kw):
        """Called for path inputs"""
        return self._validate_path(path)

    def _remap_output(self, operation, path):
        """Called for path outputs"""
        return self._validate_path(path)

    def _remap_pair(self, operation, src, dst, *args, **kw):
        """Called for path pairs like rename, link, and symlink operations"""
        return (
            self._remap_input(operation + '-from', src, *args, **kw),
            self._remap_input(operation + '-to', dst, *args, **kw),
        )
408
409
410 if hasattr(os, 'devnull'):
411 _EXCEPTIONS = [os.devnull]
412 else:
413 _EXCEPTIONS = []
414
415
class DirectorySandbox(AbstractSandbox):
    """Restrict operations to a single subdirectory - pseudo-chroot"""

    # Operation names treated as writes by _remap_input (only the keys
    # matter).
    write_ops: dict[str, None] = dict.fromkeys([
        "open",
        "chmod",
        "chown",
        "mkdir",
        "remove",
        "unlink",
        "rmdir",
        "utime",
        "lchown",
        "chroot",
        "mkfifo",
        "mknod",
        "tempnam",
    ])

    # exempt writing to paths that match the pattern
    _exception_patterns: list[str | re.Pattern] = []

    def __init__(self, sandbox, exceptions=_EXCEPTIONS):
        """
        Confine writes to *sandbox*; *exceptions* lists path prefixes that
        remain writable. All paths are stored resolved and case-normalized.
        """
        sandbox_root = os.path.normcase(os.path.realpath(sandbox))
        self._sandbox = sandbox_root
        # Trailing separator, so sibling directories that merely share a
        # name prefix with the sandbox do not match.
        self._prefix = os.path.join(sandbox_root, '')
        self._exceptions = [
            os.path.normcase(os.path.realpath(exempt)) for exempt in exceptions
        ]
        AbstractSandbox.__init__(self)

    def _violation(self, operation, *args, **kw):
        """Raise SandboxViolation describing the rejected operation."""
        from setuptools.sandbox import SandboxViolation

        raise SandboxViolation(operation, args, kw)

    def _open(self, path, mode='r', *args, **kw):
        """Replacement for builtin open(): reject non-read modes outside the sandbox."""
        read_only = mode in ('r', 'rt', 'rb', 'rU', 'U')
        if not (read_only or self._ok(path)):
            self._violation("open", path, mode, *args, **kw)
        return _open(path, mode, *args, **kw)

    def tmpnam(self):
        """Always a violation."""
        self._violation("tmpnam")

    def _ok(self, path):
        """Tell whether *path* is exempted, is the sandbox, or lies inside it."""
        was_active = self._active
        # Deactivate while resolving the path so the os calls made by
        # realpath() are not themselves routed through the remap hooks.
        self._active = False
        try:
            resolved = os.path.normcase(os.path.realpath(path))
            return (
                self._exempted(resolved)
                or resolved == self._sandbox
                or resolved.startswith(self._prefix)
            )
        finally:
            self._active = was_active

    def _exempted(self, filepath):
        """Tell whether *filepath* matches an exception prefix or pattern."""
        # Prefixes are checked first, then the regex patterns; evaluation
        # stops at the first match.
        return any(
            filepath.startswith(prefix) for prefix in self._exceptions
        ) or any(
            re.match(pattern, filepath) for pattern in self._exception_patterns
        )

    def _remap_input(self, operation, path, *args, **kw):
        """Called for path inputs"""
        is_write = operation in self.write_ops
        if is_write and not self._ok(path):
            self._violation(operation, os.path.realpath(path), *args, **kw)
        return path

    def _remap_pair(self, operation, src, dst, *args, **kw):
        """Called for path pairs like rename, link, and symlink operations"""
        if not (self._ok(src) and self._ok(dst)):
            self._violation(operation, src, dst, *args, **kw)
        return (src, dst)

    def open(self, file, flags, mode: int = 0o777, *args, **kw):
        """Called for low-level os.open()"""
        wants_write = flags & WRITE_FLAGS
        if wants_write and not self._ok(file):
            self._violation("os.open", file, flags, mode, *args, **kw)
        return _os.open(file, flags, mode, *args, **kw)
499
500
# Bitwise OR of the listed os.open() flags; a flag the underlying os module
# does not define contributes 0.
WRITE_FLAGS = functools.reduce(
    lambda mask, flag_name: mask | getattr(_os, flag_name, 0),
    ("O_WRONLY", "O_RDWR", "O_APPEND", "O_CREAT", "O_TRUNC", "O_TEMPORARY"),
    0,
)
508
509
class SandboxViolation(DistutilsError):
    """A setup script attempted to modify the filesystem outside the sandbox"""

    # Message template; filled in by __str__ from the exception's args.
    tmpl = textwrap.dedent(
        """
        SandboxViolation: {cmd}{args!r} {kwargs}

        The package setup script has attempted to modify files on your system
        that are not within the EasyInstall build area, and has been aborted.

        This package cannot be safely installed by EasyInstall, and may not
        support alternate installation locations even if you run its setup
        script by hand. Please inform the package's author and the EasyInstall
        maintainers to find out if a fix or workaround is available.
        """
    ).lstrip()

    def __str__(self) -> str:
        """Render the template; ``self.args`` must be (cmd, args, kwargs)."""
        cmd, args, kwargs = self.args
        # Same names the template could see through locals(), passed
        # explicitly.
        return self.tmpl.format(self=self, cmd=cmd, args=args, kwargs=kwargs)