# Sandboxing helpers for running setup scripts.
#
# This module provides context managers that save and restore interpreter
# state (sys.argv, sys.path, sys.modules, tempfile.tempdir, the working
# directory, pkg_resources state) and sandbox classes that replace functions
# on the ``os`` module and the ``open`` builtin while they are active.
from __future__ import annotations

import builtins
import contextlib
import functools
import itertools
import operator
import os
import pickle
import re
import sys
import tempfile
import textwrap
from types import TracebackType
from typing import TYPE_CHECKING

import pkg_resources
from pkg_resources import working_set

from distutils.errors import DistutilsError

if sys.platform.startswith('java'):
    import org.python.modules.posix.PosixModule as _os  # pyright: ignore[reportMissingImports]
else:
    _os = sys.modules[os.name]
# Keep a reference to the real builtin so it can be restored and called
# while a sandbox has replaced ``builtins.open``.
_open = open


if TYPE_CHECKING:
    from typing_extensions import Self

__all__ = [
    "AbstractSandbox",
    "DirectorySandbox",
    "SandboxViolation",
    "run_setup",
]


def _execfile(filename, globals, locals=None):
    """
    Python 3 implementation of execfile.

    Read *filename* as bytes, compile it in ``'exec'`` mode and execute it
    in the given namespaces.  When *locals* is ``None`` the *globals*
    mapping is used for both.
    """
    mode = 'rb'
    with open(filename, mode) as stream:
        script = stream.read()
    if locals is None:
        locals = globals
    code = compile(script, filename, 'exec')
    exec(code, globals, locals)


@contextlib.contextmanager
def save_argv(repl=None):
    """
    Save ``sys.argv`` and restore it (in place) on exit.

    If *repl* is not ``None`` the contents of ``sys.argv`` are replaced with
    it for the duration of the context.  Yields the saved copy.
    """
    saved = sys.argv[:]
    if repl is not None:
        sys.argv[:] = repl
    try:
        yield saved
    finally:
        sys.argv[:] = saved


@contextlib.contextmanager
def save_path():
    """
    Save ``sys.path`` and restore it (in place) on exit.

    Yields the saved copy.
    """
    saved = sys.path[:]
    try:
        yield saved
    finally:
        sys.path[:] = saved


@contextlib.contextmanager
def override_temp(replacement):
    """
    Monkey-patch tempfile.tempdir with replacement, ensuring it exists
    """
    os.makedirs(replacement, exist_ok=True)

    saved = tempfile.tempdir

    tempfile.tempdir = replacement

    try:
        yield
    finally:
        tempfile.tempdir = saved


@contextlib.contextmanager
def pushd(target):
    """
    Change the working directory to *target* for the duration of the context.

    Yields the previous working directory, which is restored on exit.
    """
    saved = os.getcwd()
    os.chdir(target)
    try:
        yield saved
    finally:
        os.chdir(saved)


class UnpickleableException(Exception):
    """
    An exception representing another Exception that could not be pickled.
    """

    @staticmethod
    def dump(type, exc):
        """
        Always return a dumped (pickled) type and exc. If exc can't be pickled,
        wrap it in UnpickleableException first.

        Returns a 2-tuple ``(pickled_type, pickled_exc)``.  The fallback
        exception carries ``repr(exc)`` as its only argument.
        """
        try:
            return pickle.dumps(type), pickle.dumps(exc)
        except Exception:
            # get UnpickleableException inside the sandbox
            from setuptools.sandbox import UnpickleableException as cls

            return cls.dump(cls, cls(repr(exc)))


class ExceptionSaver:
    """
    A Context Manager that will save an exception, serialize, and restore it
    later.

    Any exception raised inside the ``with`` block is pickled and suppressed;
    call :meth:`resume` afterwards to re-raise it.
    """

    def __enter__(self) -> Self:
        return self

    def __exit__(
        self,
        type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> bool:
        # Compare against None rather than testing truthiness: an exception
        # instance may define __bool__/__len__ and evaluate as false, and it
        # must still be captured.
        if exc is None:
            return False

        # dump the exception
        self._saved = UnpickleableException.dump(type, exc)
        self._tb = tb

        # suppress the exception
        return True

    def resume(self):
        "restore and re-raise any exception"

        # ``_saved`` only exists on the instance if __exit__ captured something.
        if '_saved' not in vars(self):
            return

        # The pickles were produced by __exit__ on this same object, not read
        # from an external source.  Both are loaded; only the instance is used.
        _, exc = map(pickle.loads, self._saved)
        raise exc.with_traceback(self._tb)


@contextlib.contextmanager
def save_modules():
    """
    Context in which imported modules are saved.

    Translates exceptions internal to the context into the equivalent exception
    outside the context.
    """
    saved = sys.modules.copy()
    with ExceptionSaver() as saved_exc:
        yield saved

    # Put back every module that was present on entry ...
    sys.modules.update(saved)
    # remove any modules imported since
    del_modules = (
        mod_name
        for mod_name in sys.modules
        if mod_name not in saved
        # exclude any encodings modules. See #285
        and not mod_name.startswith('encodings.')
    )
    _clear_modules(del_modules)

    # ... and only then re-raise whatever the body raised.
    saved_exc.resume()


def _clear_modules(module_names):
    """Delete each name in *module_names* from ``sys.modules``."""
    # Materialize first: callers pass lazy iterables over sys.modules, which
    # must not be mutated while it is being iterated.
    for mod_name in list(module_names):
        del sys.modules[mod_name]


@contextlib.contextmanager
def save_pkg_resources_state():
    """
    Snapshot ``pkg_resources`` state via ``__getstate__`` and restore it with
    ``__setstate__`` on exit.  Yields the snapshot.
    """
    saved = pkg_resources.__getstate__()
    try:
        yield saved
    finally:
        pkg_resources.__setstate__(saved)


@contextlib.contextmanager
def setup_context(setup_dir):
    """
    Prepare the interpreter for running a setup script located in *setup_dir*.

    Stacks the save/restore contexts defined in this module, hides the
    already-imported setuptools-related modules, points ``tempfile`` at
    ``<setup_dir>/temp``, changes into *setup_dir* and imports setuptools
    afresh before yielding.
    """
    temp_dir = os.path.join(setup_dir, 'temp')
    with save_pkg_resources_state():
        with save_modules():
            with save_path():
                hide_setuptools()
                with save_argv():
                    with override_temp(temp_dir):
                        with pushd(setup_dir):
                            # ensure setuptools commands are available
                            __import__('setuptools')
                            yield


# Top-level package names whose modules are removed from sys.modules by
# hide_setuptools().
_MODULES_TO_HIDE = {
    'setuptools',
    'distutils',
    'pkg_resources',
    'Cython',
    '_distutils_hack',
}


def _needs_hiding(mod_name):
    """
    Return whether the top-level package of *mod_name* is in
    ``_MODULES_TO_HIDE``.

    >>> _needs_hiding('setuptools')
    True
    >>> _needs_hiding('pkg_resources')
    True
    >>> _needs_hiding('setuptools_plugin')
    False
    >>> _needs_hiding('setuptools.__init__')
    True
    >>> _needs_hiding('distutils')
    True
    >>> _needs_hiding('os')
    False
    >>> _needs_hiding('Cython')
    True
    """
    base_module = mod_name.split('.', 1)[0]
    return base_module in _MODULES_TO_HIDE


def hide_setuptools():
    """
    Remove references to setuptools' modules from sys.modules to allow the
    invocation to import the most appropriate setuptools. This technique is
    necessary to avoid issues such as #315 where setuptools upgrading itself
    would fail to find a function declared in the metadata.
    """
    _distutils_hack = sys.modules.get('_distutils_hack', None)
    if _distutils_hack is not None:
        _distutils_hack._remove_shim()

    modules = filter(_needs_hiding, sys.modules)
    _clear_modules(modules)


def run_setup(setup_script, args):
    """Run a distutils setup script, sandboxed in its directory

    *setup_script* is executed as ``__main__`` with ``sys.argv`` set to the
    script path followed by *args*.  A ``SystemExit`` with no arguments or a
    falsy first argument is treated as a normal exit; any other
    ``SystemExit`` propagates.
    """
    setup_dir = os.path.abspath(os.path.dirname(setup_script))
    with setup_context(setup_dir):
        try:
            sys.argv[:] = [setup_script, *args]
            sys.path.insert(0, setup_dir)
            # reset to include setup dir, w/clean callback list
            working_set.__init__()
            working_set.callbacks.append(lambda dist: dist.activate())

            with DirectorySandbox(setup_dir):
                ns = dict(__file__=setup_script, __name__='__main__')
                _execfile(setup_script, ns)
        except SystemExit as v:
            if v.args and v.args[0]:
                raise
            # Normal exit, just return


class AbstractSandbox:
    """Wrap 'os' module and 'open()' builtin for virtualizing setup scripts

    While the sandbox is entered, the wrapped functions defined on this
    class replace their counterparts on the ``os`` module, and
    ``builtins.open`` is replaced by ``self._open``.  The wrappers route
    paths through ``_remap_input`` / ``_remap_output`` / ``_remap_pair``,
    which subclasses override; here they return paths unchanged.
    """

    # True only between __enter__ and __exit__; wrappers call the original
    # function untouched when this is False.
    _active = False

    def __init__(self):
        # Public names of the real os module that this object also provides,
        # i.e. the functions to swap in and out.
        self._attrs = [
            name
            for name in dir(_os)
            if not name.startswith('_') and hasattr(self, name)
        ]

    def _copy(self, source):
        """Set every name in ``self._attrs`` on ``os`` from *source*."""
        for name in self._attrs:
            setattr(os, name, getattr(source, name))

    def __enter__(self) -> None:
        self._copy(self)
        builtins.open = self._open
        self._active = True

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ):
        # Restores the real builtin and the real os functions (not whatever
        # was installed before __enter__).
        self._active = False
        builtins.open = _open
        self._copy(_os)

    def run(self, func):
        """Run 'func' under os sandboxing"""
        with self:
            return func()

    # The _mk_* functions below are called while the class body executes to
    # build methods; they are not meant to be called on instances.

    def _mk_dual_path_wrapper(name: str):  # type: ignore[misc] # https://github.com/pypa/setuptools/pull/4099
        """Build a method wrapping ``_os.<name>(src, dst, ...)``."""
        original = getattr(_os, name)

        def wrap(self, src, dst, *args, **kw):
            if self._active:
                src, dst = self._remap_pair(name, src, dst, *args, **kw)
            return original(src, dst, *args, **kw)

        return wrap

    for __name in ["rename", "link", "symlink"]:
        if hasattr(_os, __name):
            locals()[__name] = _mk_dual_path_wrapper(__name)

    def _mk_single_path_wrapper(name: str, original=None):  # type: ignore[misc] # https://github.com/pypa/setuptools/pull/4099
        """Build a method wrapping a function whose first argument is a path.

        *original* defaults to ``_os.<name>``.
        """
        original = original or getattr(_os, name)

        def wrap(self, path, *args, **kw):
            if self._active:
                path = self._remap_input(name, path, *args, **kw)
            return original(path, *args, **kw)

        return wrap

    # Wrapper around the real builtin open(); the ``_open`` on the right-hand
    # side is still the module-level name at this point.
    _open = _mk_single_path_wrapper('open', _open)
    for __name in [
        "stat",
        "listdir",
        "chdir",
        "open",
        "chmod",
        "chown",
        "mkdir",
        "remove",
        "unlink",
        "rmdir",
        "utime",
        "lchown",
        "chroot",
        "lstat",
        "startfile",
        "mkfifo",
        "mknod",
        "pathconf",
        "access",
    ]:
        if hasattr(_os, __name):
            locals()[__name] = _mk_single_path_wrapper(__name)

    def _mk_single_with_return(name: str):  # type: ignore[misc] # https://github.com/pypa/setuptools/pull/4099
        """Build a method wrapping a path function whose result is also a path."""
        original = getattr(_os, name)

        def wrap(self, path, *args, **kw):
            if self._active:
                path = self._remap_input(name, path, *args, **kw)
                return self._remap_output(name, original(path, *args, **kw))
            return original(path, *args, **kw)

        return wrap

    for __name in ['readlink', 'tempnam']:
        if hasattr(_os, __name):
            locals()[__name] = _mk_single_with_return(__name)

    def _mk_query(name: str):  # type: ignore[misc] # https://github.com/pypa/setuptools/pull/4099
        """Build a method wrapping a function whose result is a path."""
        original = getattr(_os, name)

        def wrap(self, *args, **kw):
            retval = original(*args, **kw)
            if self._active:
                return self._remap_output(name, retval)
            return retval

        return wrap

    for __name in ['getcwd', 'tmpnam']:
        if hasattr(_os, __name):
            locals()[__name] = _mk_query(__name)

    def _validate_path(self, path):
        """Called to remap or validate any path, whether input or output"""
        return path

    def _remap_input(self, operation, path, *args, **kw):
        """Called for path inputs"""
        return self._validate_path(path)

    def _remap_output(self, operation, path):
        """Called for path outputs"""
        return self._validate_path(path)

    def _remap_pair(self, operation, src, dst, *args, **kw):
        """Called for path pairs like rename, link, and symlink operations"""
        return (
            self._remap_input(operation + '-from', src, *args, **kw),
            self._remap_input(operation + '-to', dst, *args, **kw),
        )


# Default paths that DirectorySandbox allows writes to even though they lie
# outside the sandbox directory.
if hasattr(os, 'devnull'):
    _EXCEPTIONS = [os.devnull]
else:
    _EXCEPTIONS = []


class DirectorySandbox(AbstractSandbox):
    """Restrict operations to a single subdirectory - pseudo-chroot

    Operations listed in ``write_ops``, writing ``open()`` calls and all
    path-pair operations raise ``SandboxViolation`` unless the paths involved
    resolve to the sandbox directory, a location beneath it, or an exempted
    path.
    """

    # Names of the single-path operations that are checked by _remap_input.
    write_ops: dict[str, None] = dict.fromkeys([
        "open",
        "chmod",
        "chown",
        "mkdir",
        "remove",
        "unlink",
        "rmdir",
        "utime",
        "lchown",
        "chroot",
        "mkfifo",
        "mknod",
        "tempnam",
    ])

    _exception_patterns: list[str | re.Pattern] = []
    "exempt writing to paths that match the pattern"

    def __init__(self, sandbox, exceptions=_EXCEPTIONS):
        # *exceptions* is only iterated, never mutated, so sharing the
        # module-level default list between instances is harmless.
        self._sandbox = os.path.normcase(os.path.realpath(sandbox))
        # Sandbox path with a trailing separator, for prefix comparisons.
        self._prefix = os.path.join(self._sandbox, '')
        self._exceptions = [
            os.path.normcase(os.path.realpath(path)) for path in exceptions
        ]
        AbstractSandbox.__init__(self)

    def _violation(self, operation, *args, **kw):
        """Raise ``SandboxViolation`` for *operation* with the given arguments."""
        # NOTE(review): imported at call time rather than using the
        # module-level class -- presumably for the same reason as the import
        # in UnpickleableException.dump; confirm.
        from setuptools.sandbox import SandboxViolation

        raise SandboxViolation(operation, args, kw)

    def _open(self, path, mode='r', *args, **kw):
        """Replacement for the ``open`` builtin.

        Any mode other than the read-only spellings listed below is treated
        as a write and requires *path* to pass ``_ok``.
        """
        if mode not in ('r', 'rt', 'rb', 'rU', 'U') and not self._ok(path):
            self._violation("open", path, mode, *args, **kw)
        return _open(path, mode, *args, **kw)

    def tmpnam(self):
        """Always a violation."""
        self._violation("tmpnam")

    def _ok(self, path):
        """Return whether *path* resolves to an allowed location."""
        active = self._active
        try:
            # Deactivate while resolving the path so the os functions used by
            # realpath go straight to the originals.
            self._active = False
            realpath = os.path.normcase(os.path.realpath(path))
            return (
                self._exempted(realpath)
                or realpath == self._sandbox
                or realpath.startswith(self._prefix)
            )
        finally:
            self._active = active

    def _exempted(self, filepath):
        """Return whether *filepath* starts with an exception path or matches
        one of ``_exception_patterns``."""
        start_matches = (
            filepath.startswith(exception) for exception in self._exceptions
        )
        pattern_matches = (
            re.match(pattern, filepath) for pattern in self._exception_patterns
        )
        candidates = itertools.chain(start_matches, pattern_matches)
        return any(candidates)

    def _remap_input(self, operation, path, *args, **kw):
        """Called for path inputs"""
        if operation in self.write_ops and not self._ok(path):
            self._violation(operation, os.path.realpath(path), *args, **kw)
        return path

    def _remap_pair(self, operation, src, dst, *args, **kw):
        """Called for path pairs like rename, link, and symlink operations"""
        if not self._ok(src) or not self._ok(dst):
            self._violation(operation, src, dst, *args, **kw)
        return (src, dst)

    def open(self, file, flags, mode: int = 0o777, *args, **kw):
        """Called for low-level os.open()"""
        if flags & WRITE_FLAGS and not self._ok(file):
            self._violation("os.open", file, flags, mode, *args, **kw)
        return _os.open(file, flags, mode, *args, **kw)


# Bitwise OR of the os.open() flags that DirectorySandbox.open treats as
# writing; flags missing on this platform contribute 0.
WRITE_FLAGS = functools.reduce(
    operator.or_,
    [
        getattr(_os, a, 0)
        for a in "O_WRONLY O_RDWR O_APPEND O_CREAT O_TRUNC O_TEMPORARY".split()
    ],
)


class SandboxViolation(DistutilsError):
    """A setup script attempted to modify the filesystem outside the sandbox

    Instances are created with three arguments: the operation name, the
    positional arguments and the keyword arguments of the offending call.
    """

    tmpl = textwrap.dedent(
        """
        SandboxViolation: {cmd}{args!r} {kwargs}

        The package setup script has attempted to modify files on your system
        that are not within the EasyInstall build area, and has been aborted.

        This package cannot be safely installed by EasyInstall, and may not
        support alternate installation locations even if you run its setup
        script by hand. Please inform the package's author and the EasyInstall
        maintainers to find out if a fix or workaround is available.
        """
    ).lstrip()

    def __str__(self) -> str:
        cmd, args, kwargs = self.args
        # Pass the fields explicitly instead of **locals().
        return self.tmpl.format(cmd=cmd, args=args, kwargs=kwargs)