jpayne@68: from __future__ import annotations jpayne@68: jpayne@68: import builtins jpayne@68: import contextlib jpayne@68: import functools jpayne@68: import itertools jpayne@68: import operator jpayne@68: import os jpayne@68: import pickle jpayne@68: import re jpayne@68: import sys jpayne@68: import tempfile jpayne@68: import textwrap jpayne@68: from types import TracebackType jpayne@68: from typing import TYPE_CHECKING jpayne@68: jpayne@68: import pkg_resources jpayne@68: from pkg_resources import working_set jpayne@68: jpayne@68: from distutils.errors import DistutilsError jpayne@68: jpayne@68: if sys.platform.startswith('java'): jpayne@68: import org.python.modules.posix.PosixModule as _os # pyright: ignore[reportMissingImports] jpayne@68: else: jpayne@68: _os = sys.modules[os.name] jpayne@68: _open = open jpayne@68: jpayne@68: jpayne@68: if TYPE_CHECKING: jpayne@68: from typing_extensions import Self jpayne@68: jpayne@68: __all__ = [ jpayne@68: "AbstractSandbox", jpayne@68: "DirectorySandbox", jpayne@68: "SandboxViolation", jpayne@68: "run_setup", jpayne@68: ] jpayne@68: jpayne@68: jpayne@68: def _execfile(filename, globals, locals=None): jpayne@68: """ jpayne@68: Python 3 implementation of execfile. jpayne@68: """ jpayne@68: mode = 'rb' jpayne@68: with open(filename, mode) as stream: jpayne@68: script = stream.read() jpayne@68: if locals is None: jpayne@68: locals = globals jpayne@68: code = compile(script, filename, 'exec') jpayne@68: exec(code, globals, locals) jpayne@68: jpayne@68: jpayne@68: @contextlib.contextmanager jpayne@68: def save_argv(repl=None): jpayne@68: saved = sys.argv[:] jpayne@68: if repl is not None: jpayne@68: sys.argv[:] = repl jpayne@68: try: jpayne@68: yield saved jpayne@68: finally: jpayne@68: sys.argv[:] = saved jpayne@68: jpayne@68: jpayne@68: @contextlib.contextmanager jpayne@68: def save_path(): jpayne@68: saved = sys.path[:] jpayne@68: try: jpayne@68: yield saved jpayne@68: finally: jpayne@68: sys.path[:] = saved jpayne@68: jpayne@68: jpayne@68: @contextlib.contextmanager jpayne@68: def override_temp(replacement): jpayne@68: """ jpayne@68: Monkey-patch tempfile.tempdir with replacement, ensuring it exists jpayne@68: """ jpayne@68: os.makedirs(replacement, exist_ok=True) jpayne@68: jpayne@68: saved = tempfile.tempdir jpayne@68: jpayne@68: tempfile.tempdir = replacement jpayne@68: jpayne@68: try: jpayne@68: yield jpayne@68: finally: jpayne@68: tempfile.tempdir = saved jpayne@68: jpayne@68: jpayne@68: @contextlib.contextmanager jpayne@68: def pushd(target): jpayne@68: saved = os.getcwd() jpayne@68: os.chdir(target) jpayne@68: try: jpayne@68: yield saved jpayne@68: finally: jpayne@68: os.chdir(saved) jpayne@68: jpayne@68: jpayne@68: class UnpickleableException(Exception): jpayne@68: """ jpayne@68: An exception representing another Exception that could not be pickled. jpayne@68: """ jpayne@68: jpayne@68: @staticmethod jpayne@68: def dump(type, exc): jpayne@68: """ jpayne@68: Always return a dumped (pickled) type and exc. If exc can't be pickled, jpayne@68: wrap it in UnpickleableException first. jpayne@68: """ jpayne@68: try: jpayne@68: return pickle.dumps(type), pickle.dumps(exc) jpayne@68: except Exception: jpayne@68: # get UnpickleableException inside the sandbox jpayne@68: from setuptools.sandbox import UnpickleableException as cls jpayne@68: jpayne@68: return cls.dump(cls, cls(repr(exc))) jpayne@68: jpayne@68: jpayne@68: class ExceptionSaver: jpayne@68: """ jpayne@68: A Context Manager that will save an exception, serialize, and restore it jpayne@68: later. jpayne@68: """ jpayne@68: jpayne@68: def __enter__(self) -> Self: jpayne@68: return self jpayne@68: jpayne@68: def __exit__( jpayne@68: self, jpayne@68: type: type[BaseException] | None, jpayne@68: exc: BaseException | None, jpayne@68: tb: TracebackType | None, jpayne@68: ) -> bool: jpayne@68: if not exc: jpayne@68: return False jpayne@68: jpayne@68: # dump the exception jpayne@68: self._saved = UnpickleableException.dump(type, exc) jpayne@68: self._tb = tb jpayne@68: jpayne@68: # suppress the exception jpayne@68: return True jpayne@68: jpayne@68: def resume(self): jpayne@68: "restore and re-raise any exception" jpayne@68: jpayne@68: if '_saved' not in vars(self): jpayne@68: return jpayne@68: jpayne@68: type, exc = map(pickle.loads, self._saved) jpayne@68: raise exc.with_traceback(self._tb) jpayne@68: jpayne@68: jpayne@68: @contextlib.contextmanager jpayne@68: def save_modules(): jpayne@68: """ jpayne@68: Context in which imported modules are saved. jpayne@68: jpayne@68: Translates exceptions internal to the context into the equivalent exception jpayne@68: outside the context. jpayne@68: """ jpayne@68: saved = sys.modules.copy() jpayne@68: with ExceptionSaver() as saved_exc: jpayne@68: yield saved jpayne@68: jpayne@68: sys.modules.update(saved) jpayne@68: # remove any modules imported since jpayne@68: del_modules = ( jpayne@68: mod_name jpayne@68: for mod_name in sys.modules jpayne@68: if mod_name not in saved jpayne@68: # exclude any encodings modules. See #285 jpayne@68: and not mod_name.startswith('encodings.') jpayne@68: ) jpayne@68: _clear_modules(del_modules) jpayne@68: jpayne@68: saved_exc.resume() jpayne@68: jpayne@68: jpayne@68: def _clear_modules(module_names): jpayne@68: for mod_name in list(module_names): jpayne@68: del sys.modules[mod_name] jpayne@68: jpayne@68: jpayne@68: @contextlib.contextmanager jpayne@68: def save_pkg_resources_state(): jpayne@68: saved = pkg_resources.__getstate__() jpayne@68: try: jpayne@68: yield saved jpayne@68: finally: jpayne@68: pkg_resources.__setstate__(saved) jpayne@68: jpayne@68: jpayne@68: @contextlib.contextmanager jpayne@68: def setup_context(setup_dir): jpayne@68: temp_dir = os.path.join(setup_dir, 'temp') jpayne@68: with save_pkg_resources_state(): jpayne@68: with save_modules(): jpayne@68: with save_path(): jpayne@68: hide_setuptools() jpayne@68: with save_argv(): jpayne@68: with override_temp(temp_dir): jpayne@68: with pushd(setup_dir): jpayne@68: # ensure setuptools commands are available jpayne@68: __import__('setuptools') jpayne@68: yield jpayne@68: jpayne@68: jpayne@68: _MODULES_TO_HIDE = { jpayne@68: 'setuptools', jpayne@68: 'distutils', jpayne@68: 'pkg_resources', jpayne@68: 'Cython', jpayne@68: '_distutils_hack', jpayne@68: } jpayne@68: jpayne@68: jpayne@68: def _needs_hiding(mod_name): jpayne@68: """ jpayne@68: >>> _needs_hiding('setuptools') jpayne@68: True jpayne@68: >>> _needs_hiding('pkg_resources') jpayne@68: True jpayne@68: >>> _needs_hiding('setuptools_plugin') jpayne@68: False jpayne@68: >>> _needs_hiding('setuptools.__init__') jpayne@68: True jpayne@68: >>> _needs_hiding('distutils') jpayne@68: True jpayne@68: >>> _needs_hiding('os') jpayne@68: False jpayne@68: >>> _needs_hiding('Cython') jpayne@68: True jpayne@68: """ jpayne@68: base_module = mod_name.split('.', 1)[0] jpayne@68: return base_module in _MODULES_TO_HIDE jpayne@68: jpayne@68: jpayne@68: def hide_setuptools(): jpayne@68: """ jpayne@68: Remove references to setuptools' modules from sys.modules to allow the jpayne@68: invocation to import the most appropriate setuptools. This technique is jpayne@68: necessary to avoid issues such as #315 where setuptools upgrading itself jpayne@68: would fail to find a function declared in the metadata. jpayne@68: """ jpayne@68: _distutils_hack = sys.modules.get('_distutils_hack', None) jpayne@68: if _distutils_hack is not None: jpayne@68: _distutils_hack._remove_shim() jpayne@68: jpayne@68: modules = filter(_needs_hiding, sys.modules) jpayne@68: _clear_modules(modules) jpayne@68: jpayne@68: jpayne@68: def run_setup(setup_script, args): jpayne@68: """Run a distutils setup script, sandboxed in its directory""" jpayne@68: setup_dir = os.path.abspath(os.path.dirname(setup_script)) jpayne@68: with setup_context(setup_dir): jpayne@68: try: jpayne@68: sys.argv[:] = [setup_script] + list(args) jpayne@68: sys.path.insert(0, setup_dir) jpayne@68: # reset to include setup dir, w/clean callback list jpayne@68: working_set.__init__() jpayne@68: working_set.callbacks.append(lambda dist: dist.activate()) jpayne@68: jpayne@68: with DirectorySandbox(setup_dir): jpayne@68: ns = dict(__file__=setup_script, __name__='__main__') jpayne@68: _execfile(setup_script, ns) jpayne@68: except SystemExit as v: jpayne@68: if v.args and v.args[0]: jpayne@68: raise jpayne@68: # Normal exit, just return jpayne@68: jpayne@68: jpayne@68: class AbstractSandbox: jpayne@68: """Wrap 'os' module and 'open()' builtin for virtualizing setup scripts""" jpayne@68: jpayne@68: _active = False jpayne@68: jpayne@68: def __init__(self): jpayne@68: self._attrs = [ jpayne@68: name jpayne@68: for name in dir(_os) jpayne@68: if not name.startswith('_') and hasattr(self, name) jpayne@68: ] jpayne@68: jpayne@68: def _copy(self, source): jpayne@68: for name in self._attrs: jpayne@68: setattr(os, name, getattr(source, name)) jpayne@68: jpayne@68: def __enter__(self) -> None: jpayne@68: self._copy(self) jpayne@68: builtins.open = self._open jpayne@68: self._active = True jpayne@68: jpayne@68: def __exit__( jpayne@68: self, jpayne@68: exc_type: type[BaseException] | None, jpayne@68: exc_value: BaseException | None, jpayne@68: traceback: TracebackType | None, jpayne@68: ): jpayne@68: self._active = False jpayne@68: builtins.open = _open jpayne@68: self._copy(_os) jpayne@68: jpayne@68: def run(self, func): jpayne@68: """Run 'func' under os sandboxing""" jpayne@68: with self: jpayne@68: return func() jpayne@68: jpayne@68: def _mk_dual_path_wrapper(name: str): # type: ignore[misc] # https://github.com/pypa/setuptools/pull/4099 jpayne@68: original = getattr(_os, name) jpayne@68: jpayne@68: def wrap(self, src, dst, *args, **kw): jpayne@68: if self._active: jpayne@68: src, dst = self._remap_pair(name, src, dst, *args, **kw) jpayne@68: return original(src, dst, *args, **kw) jpayne@68: jpayne@68: return wrap jpayne@68: jpayne@68: for __name in ["rename", "link", "symlink"]: jpayne@68: if hasattr(_os, __name): jpayne@68: locals()[__name] = _mk_dual_path_wrapper(__name) jpayne@68: jpayne@68: def _mk_single_path_wrapper(name: str, original=None): # type: ignore[misc] # https://github.com/pypa/setuptools/pull/4099 jpayne@68: original = original or getattr(_os, name) jpayne@68: jpayne@68: def wrap(self, path, *args, **kw): jpayne@68: if self._active: jpayne@68: path = self._remap_input(name, path, *args, **kw) jpayne@68: return original(path, *args, **kw) jpayne@68: jpayne@68: return wrap jpayne@68: jpayne@68: _open = _mk_single_path_wrapper('open', _open) jpayne@68: for __name in [ jpayne@68: "stat", jpayne@68: "listdir", jpayne@68: "chdir", jpayne@68: "open", jpayne@68: "chmod", jpayne@68: "chown", jpayne@68: "mkdir", jpayne@68: "remove", jpayne@68: "unlink", jpayne@68: "rmdir", jpayne@68: "utime", jpayne@68: "lchown", jpayne@68: "chroot", jpayne@68: "lstat", jpayne@68: "startfile", jpayne@68: "mkfifo", jpayne@68: "mknod", jpayne@68: "pathconf", jpayne@68: "access", jpayne@68: ]: jpayne@68: if hasattr(_os, __name): jpayne@68: locals()[__name] = _mk_single_path_wrapper(__name) jpayne@68: jpayne@68: def _mk_single_with_return(name: str): # type: ignore[misc] # https://github.com/pypa/setuptools/pull/4099 jpayne@68: original = getattr(_os, name) jpayne@68: jpayne@68: def wrap(self, path, *args, **kw): jpayne@68: if self._active: jpayne@68: path = self._remap_input(name, path, *args, **kw) jpayne@68: return self._remap_output(name, original(path, *args, **kw)) jpayne@68: return original(path, *args, **kw) jpayne@68: jpayne@68: return wrap jpayne@68: jpayne@68: for __name in ['readlink', 'tempnam']: jpayne@68: if hasattr(_os, __name): jpayne@68: locals()[__name] = _mk_single_with_return(__name) jpayne@68: jpayne@68: def _mk_query(name: str): # type: ignore[misc] # https://github.com/pypa/setuptools/pull/4099 jpayne@68: original = getattr(_os, name) jpayne@68: jpayne@68: def wrap(self, *args, **kw): jpayne@68: retval = original(*args, **kw) jpayne@68: if self._active: jpayne@68: return self._remap_output(name, retval) jpayne@68: return retval jpayne@68: jpayne@68: return wrap jpayne@68: jpayne@68: for __name in ['getcwd', 'tmpnam']: jpayne@68: if hasattr(_os, __name): jpayne@68: locals()[__name] = _mk_query(__name) jpayne@68: jpayne@68: def _validate_path(self, path): jpayne@68: """Called to remap or validate any path, whether input or output""" jpayne@68: return path jpayne@68: jpayne@68: def _remap_input(self, operation, path, *args, **kw): jpayne@68: """Called for path inputs""" jpayne@68: return self._validate_path(path) jpayne@68: jpayne@68: def _remap_output(self, operation, path): jpayne@68: """Called for path outputs""" jpayne@68: return self._validate_path(path) jpayne@68: jpayne@68: def _remap_pair(self, operation, src, dst, *args, **kw): jpayne@68: """Called for path pairs like rename, link, and symlink operations""" jpayne@68: return ( jpayne@68: self._remap_input(operation + '-from', src, *args, **kw), jpayne@68: self._remap_input(operation + '-to', dst, *args, **kw), jpayne@68: ) jpayne@68: jpayne@68: jpayne@68: if hasattr(os, 'devnull'): jpayne@68: _EXCEPTIONS = [os.devnull] jpayne@68: else: jpayne@68: _EXCEPTIONS = [] jpayne@68: jpayne@68: jpayne@68: class DirectorySandbox(AbstractSandbox): jpayne@68: """Restrict operations to a single subdirectory - pseudo-chroot""" jpayne@68: jpayne@68: write_ops: dict[str, None] = dict.fromkeys([ jpayne@68: "open", jpayne@68: "chmod", jpayne@68: "chown", jpayne@68: "mkdir", jpayne@68: "remove", jpayne@68: "unlink", jpayne@68: "rmdir", jpayne@68: "utime", jpayne@68: "lchown", jpayne@68: "chroot", jpayne@68: "mkfifo", jpayne@68: "mknod", jpayne@68: "tempnam", jpayne@68: ]) jpayne@68: jpayne@68: _exception_patterns: list[str | re.Pattern] = [] jpayne@68: "exempt writing to paths that match the pattern" jpayne@68: jpayne@68: def __init__(self, sandbox, exceptions=_EXCEPTIONS): jpayne@68: self._sandbox = os.path.normcase(os.path.realpath(sandbox)) jpayne@68: self._prefix = os.path.join(self._sandbox, '') jpayne@68: self._exceptions = [ jpayne@68: os.path.normcase(os.path.realpath(path)) for path in exceptions jpayne@68: ] jpayne@68: AbstractSandbox.__init__(self) jpayne@68: jpayne@68: def _violation(self, operation, *args, **kw): jpayne@68: from setuptools.sandbox import SandboxViolation jpayne@68: jpayne@68: raise SandboxViolation(operation, args, kw) jpayne@68: jpayne@68: def _open(self, path, mode='r', *args, **kw): jpayne@68: if mode not in ('r', 'rt', 'rb', 'rU', 'U') and not self._ok(path): jpayne@68: self._violation("open", path, mode, *args, **kw) jpayne@68: return _open(path, mode, *args, **kw) jpayne@68: jpayne@68: def tmpnam(self): jpayne@68: self._violation("tmpnam") jpayne@68: jpayne@68: def _ok(self, path): jpayne@68: active = self._active jpayne@68: try: jpayne@68: self._active = False jpayne@68: realpath = os.path.normcase(os.path.realpath(path)) jpayne@68: return ( jpayne@68: self._exempted(realpath) jpayne@68: or realpath == self._sandbox jpayne@68: or realpath.startswith(self._prefix) jpayne@68: ) jpayne@68: finally: jpayne@68: self._active = active jpayne@68: jpayne@68: def _exempted(self, filepath): jpayne@68: start_matches = ( jpayne@68: filepath.startswith(exception) for exception in self._exceptions jpayne@68: ) jpayne@68: pattern_matches = ( jpayne@68: re.match(pattern, filepath) for pattern in self._exception_patterns jpayne@68: ) jpayne@68: candidates = itertools.chain(start_matches, pattern_matches) jpayne@68: return any(candidates) jpayne@68: jpayne@68: def _remap_input(self, operation, path, *args, **kw): jpayne@68: """Called for path inputs""" jpayne@68: if operation in self.write_ops and not self._ok(path): jpayne@68: self._violation(operation, os.path.realpath(path), *args, **kw) jpayne@68: return path jpayne@68: jpayne@68: def _remap_pair(self, operation, src, dst, *args, **kw): jpayne@68: """Called for path pairs like rename, link, and symlink operations""" jpayne@68: if not self._ok(src) or not self._ok(dst): jpayne@68: self._violation(operation, src, dst, *args, **kw) jpayne@68: return (src, dst) jpayne@68: jpayne@68: def open(self, file, flags, mode: int = 0o777, *args, **kw): jpayne@68: """Called for low-level os.open()""" jpayne@68: if flags & WRITE_FLAGS and not self._ok(file): jpayne@68: self._violation("os.open", file, flags, mode, *args, **kw) jpayne@68: return _os.open(file, flags, mode, *args, **kw) jpayne@68: jpayne@68: jpayne@68: WRITE_FLAGS = functools.reduce( jpayne@68: operator.or_, jpayne@68: [ jpayne@68: getattr(_os, a, 0) jpayne@68: for a in "O_WRONLY O_RDWR O_APPEND O_CREAT O_TRUNC O_TEMPORARY".split() jpayne@68: ], jpayne@68: ) jpayne@68: jpayne@68: jpayne@68: class SandboxViolation(DistutilsError): jpayne@68: """A setup script attempted to modify the filesystem outside the sandbox""" jpayne@68: jpayne@68: tmpl = textwrap.dedent( jpayne@68: """ jpayne@68: SandboxViolation: {cmd}{args!r} {kwargs} jpayne@68: jpayne@68: The package setup script has attempted to modify files on your system jpayne@68: that are not within the EasyInstall build area, and has been aborted. jpayne@68: jpayne@68: This package cannot be safely installed by EasyInstall, and may not jpayne@68: support alternate installation locations even if you run its setup jpayne@68: script by hand. Please inform the package's author and the EasyInstall jpayne@68: maintainers to find out if a fix or workaround is available. jpayne@68: """ jpayne@68: ).lstrip() jpayne@68: jpayne@68: def __str__(self) -> str: jpayne@68: cmd, args, kwargs = self.args jpayne@68: return self.tmpl.format(**locals())