###############################################################################
# Server process to keep track of unlinked resources (like shared memory
# segments, semaphores etc.) and clean them.
#
# On Unix we run a server process which keeps track of unlinked
# resources. The server ignores SIGINT and SIGTERM and reads from a
# pipe. Every other process of the program has a copy of the writable
# end of the pipe, so we get EOF when all other processes have exited.
# Then the server process unlinks any remaining resource names.
#
# This is important because there may be system limits for such resources: for
# instance, the system only supports a limited number of named semaphores, and
# shared-memory segments live in the RAM. If a python process leaks such a
# resource, this resource will not be removed till the next reboot. Without
# this resource tracker process, "killall python" would probably leave unlinked
# resources.

import os
import signal
import sys
import threading
import warnings

from . import spawn
from . import util

__all__ = ['ensure_running', 'register', 'unregister']

# pthread_sigmask is not available on every platform; guard every use of it.
_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')
# Signals the tracker process ignores (see main()).
_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)

# Maps a resource type name to the callable used to unlink a leaked resource
# of that type.  'noop' is the type carried by PROBE messages.
_CLEANUP_FUNCS = {
    'noop': lambda: None,
}

if os.name == 'posix':
    import _multiprocessing
    import _posixshmem

    _CLEANUP_FUNCS.update({
        'semaphore': _multiprocessing.sem_unlink,
        'shared_memory': _posixshmem.shm_unlink,
    })


class ResourceTracker(object):
    '''Client-side handle on the resource tracker process.

    Holds the write end of the pipe read by main() (``_fd``) and, when this
    process spawned the tracker itself, the tracker's pid (``_pid``).
    '''

    def __init__(self):
        self._lock = threading.Lock()
        self._fd = None
        self._pid = None

    def _stop(self):
        '''Close this process's pipe end and reap the tracker if we own it.'''
        with self._lock:
            if self._fd is None:
                # not running
                return

            # closing the "alive" file descriptor stops main()
            os.close(self._fd)
            self._fd = None

            # _pid is None when the tracker was started by another process
            # (see ensure_running); there is then no child of ours to reap
            # and os.waitpid(None, 0) would raise TypeError.
            if self._pid is not None:
                os.waitpid(self._pid, 0)
                self._pid = None

    def getfd(self):
        '''Return the write end of the tracker pipe, starting the tracker
        first if needed.'''
        self.ensure_running()
        return self._fd

    def ensure_running(self):
        '''Make sure that resource tracker process is running.

        This can be run from any process.  Usually a child process will use
        the resource created by its parent.'''
        with self._lock:
            if self._fd is not None:
                # resource tracker was launched before, is it still running?
                if self._check_alive():
                    # => still alive
                    return
                # => dead, launch it again
                os.close(self._fd)

                # Clean-up to avoid dangling processes.
                try:
                    # _pid can be None if this process is a child from another
                    # python process, which has started the resource_tracker.
                    if self._pid is not None:
                        os.waitpid(self._pid, 0)
                except ChildProcessError:
                    # The resource_tracker has already been terminated.
                    pass
                self._fd = None
                self._pid = None

                warnings.warn('resource_tracker: process died unexpectedly, '
                              'relaunching. Some resources might leak.')

            fds_to_pass = []
            try:
                fds_to_pass.append(sys.stderr.fileno())
            except Exception:
                # Best effort: stderr may be closed or have no real fileno.
                pass
            cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
            r, w = os.pipe()
            try:
                fds_to_pass.append(r)
                # process will out live us, so no need to wait on pid
                exe = spawn.get_executable()
                args = [exe] + util._args_from_interpreter_flags()
                args += ['-c', cmd % r]
                # bpo-33613: Register a signal mask that will block the
                # signals.  This signal mask will be inherited by the child
                # that is going to be spawned and will protect the child from
                # a race condition that can make the child die before it
                # registers signal handlers for SIGINT and SIGTERM.  The
                # previous mask is restored after spawning the child, so
                # signals the caller had already blocked stay blocked.
                prev_sigmask = None
                try:
                    if _HAVE_SIGMASK:
                        prev_sigmask = signal.pthread_sigmask(
                            signal.SIG_BLOCK, _IGNORED_SIGNALS)
                    pid = util.spawnv_passfds(exe, args, fds_to_pass)
                finally:
                    if prev_sigmask is not None:
                        signal.pthread_sigmask(signal.SIG_SETMASK,
                                               prev_sigmask)
            except BaseException:
                # Spawning failed: do not leak the write end, then re-raise.
                os.close(w)
                raise
            else:
                self._fd = w
                self._pid = pid
            finally:
                # The read end belongs to the tracker process only.
                os.close(r)

    def _check_alive(self):
        '''Check that the pipe has not been closed by sending a probe.

        Returns True if the write succeeded, False if it raised OSError.'''
        try:
            # We cannot use send here as it calls ensure_running, creating
            # a cycle.
            os.write(self._fd, b'PROBE:0:noop\n')
        except OSError:
            return False
        else:
            return True

    def register(self, name, rtype):
        '''Register name of resource with resource tracker.'''
        self._send('REGISTER', name, rtype)

    def unregister(self, name, rtype):
        '''Unregister name of resource with resource tracker.'''
        self._send('UNREGISTER', name, rtype)

    def _send(self, cmd, name, rtype):
        '''Write one ``cmd:name:rtype`` line to the tracker pipe.

        Raises ValueError if the encoded message exceeds 512 bytes, and
        UnicodeEncodeError if it is not pure ASCII.'''
        self.ensure_running()
        msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
        if len(msg) > 512:
            # posix guarantees that writes to a pipe of less than PIPE_BUF
            # bytes are atomic, and that PIPE_BUF >= 512.  The bound applies
            # to the whole message, not just to the name.
            raise ValueError('msg too long')
        nbytes = os.write(self._fd, msg)
        assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
            nbytes, len(msg))


_resource_tracker = ResourceTracker()
ensure_running = _resource_tracker.ensure_running
register = _resource_tracker.register
unregister = _resource_tracker.unregister
getfd = _resource_tracker.getfd


def main(fd):
    '''Run resource tracker.

    *fd* is the read end of the pipe created in ensure_running().  Lines of
    the form ``CMD:name:rtype`` are read until EOF; whatever is still
    registered at that point is unlinked with the matching cleanup function.
    '''
    # protect the process from ^C and "killall python" etc
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    if _HAVE_SIGMASK:
        # Undo the mask inherited from the spawning process now that the
        # signals are ignored.
        signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)

    for stream in (sys.stdin, sys.stdout):
        try:
            stream.close()
        except Exception:
            pass

    # One set of registered names per known resource type.
    cache = {rtype: set() for rtype in _CLEANUP_FUNCS}
    try:
        # keep track of registered/unregistered resources
        with open(fd, 'rb') as pipe:
            for line in pipe:
                try:
                    cmd, name, rtype = line.strip().decode('ascii').split(':')
                    cleanup_func = _CLEANUP_FUNCS.get(rtype, None)
                    if cleanup_func is None:
                        raise ValueError(
                            f'Cannot register {name} for automatic cleanup: '
                            f'unknown resource type {rtype}')

                    if cmd == 'REGISTER':
                        cache[rtype].add(name)
                    elif cmd == 'UNREGISTER':
                        cache[rtype].remove(name)
                    elif cmd == 'PROBE':
                        pass
                    else:
                        raise RuntimeError('unrecognized command %r' % cmd)
                except Exception:
                    # Report the bad message but keep serving: the loop must
                    # not stop because of one malformed line.
                    try:
                        sys.excepthook(*sys.exc_info())
                    except BaseException:
                        pass
    finally:
        # all processes have terminated; cleanup any remaining resources
        for rtype, rtype_cache in cache.items():
            if rtype_cache:
                try:
                    warnings.warn('resource_tracker: There appear to be %d '
                                  'leaked %s objects to clean up at shutdown' %
                                  (len(rtype_cache), rtype))
                except Exception:
                    pass
            for name in rtype_cache:
                # For some reason the process which created and registered
                # this resource has failed to unregister it. Presumably it
                # has died. We therefore unlink it.
                try:
                    _CLEANUP_FUNCS[rtype](name)
                except Exception as e:
                    warnings.warn('resource_tracker: %r: %s' % (name, e))