#
# Module providing various facilities to other parts of the package
#
# multiprocessing/util.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

import os
import itertools
import sys
import weakref
import atexit
import threading        # we want threading to install its
                        # cleanup function before multiprocessing does
from subprocess import _args_from_interpreter_flags

from . import process

__all__ = [
    'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
    'log_to_stderr', 'get_temp_dir', 'register_after_fork',
    'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
    'close_all_fds_except', 'SUBDEBUG', 'SUBWARNING',
    ]

#
# Logging
#

# Custom log levels interleaved with the standard ones: SUBDEBUG sits
# below DEBUG and SUBWARNING between INFO and WARNING.
NOTSET = 0
SUBDEBUG = 5
DEBUG = 10
INFO = 20
SUBWARNING = 25

LOGGER_NAME = 'multiprocessing'
DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'

# The module logger stays None until get_logger() is first called, so the
# logging helpers below are cheap no-ops while logging is disabled.
_logger = None
_log_to_stderr = False

def sub_debug(msg, *args):
    '''Log *msg* (%-style args in *args*) at SUBDEBUG level, if enabled.'''
    if _logger:
        _logger.log(SUBDEBUG, msg, *args)

def debug(msg, *args):
    '''Log *msg* (%-style args in *args*) at DEBUG level, if enabled.'''
    if _logger:
        _logger.log(DEBUG, msg, *args)

def info(msg, *args):
    '''Log *msg* (%-style args in *args*) at INFO level, if enabled.'''
    if _logger:
        _logger.log(INFO, msg, *args)

def sub_warning(msg, *args):
    '''Log *msg* (%-style args in *args*) at SUBWARNING level, if enabled.'''
    if _logger:
        _logger.log(SUBWARNING, msg, *args)

def get_logger():
    '''
    Returns logger used by multiprocessing
    '''
    global _logger
    import logging

    # NOTE(review): _acquireLock/_releaseLock are private logging APIs;
    # they serialize access against logging's own handler bookkeeping.
    logging._acquireLock()
    try:
        if not _logger:

            _logger = logging.getLogger(LOGGER_NAME)
            _logger.propagate = 0

            # XXX multiprocessing should cleanup before logging
            # Re-register _exit_function so it is the most recent atexit
            # handler: atexit runs handlers in LIFO order, so this makes
            # multiprocessing's cleanup run before logging's shutdown.
            if hasattr(atexit, 'unregister'):
                atexit.unregister(_exit_function)
                atexit.register(_exit_function)
            else:
                # Fallback for implementations lacking atexit.unregister:
                # manipulate the private handler list directly.
                atexit._exithandlers.remove((_exit_function, (), {}))
                atexit._exithandlers.append((_exit_function, (), {}))

    finally:
        logging._releaseLock()

    return _logger

def log_to_stderr(level=None):
    '''
    Turn on logging and add a handler which prints to stderr

    If *level* is given, the multiprocessing logger's level is set to it.
    Returns the multiprocessing logger.
    '''
    global _log_to_stderr
    import logging

    logger = get_logger()
    formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    if level:
        logger.setLevel(level)
    _log_to_stderr = True
    # _logger was set by get_logger() above, so this is the same object
    # as the local `logger`.
    return _logger

#
# Function returning a temp directory which will be removed on exit
#

def _remove_temp_dir(rmtree, tempdir):
    '''Finalizer: remove *tempdir* with *rmtree* and clear the cached path.

    *rmtree* is passed in explicitly (rather than looked up from shutil)
    so this still works if module globals were cleared at shutdown.
    '''
    rmtree(tempdir)

    current_process = process.current_process()
    # current_process() can be None if the finalizer is called
    # late during Python finalization
    if current_process is not None:
        current_process._config['tempdir'] = None

def get_temp_dir():
    '''Return this process's temp directory, creating it on first use.

    The directory is registered for removal at exit (see _remove_temp_dir)
    and cached in the current process's _config dict.
    '''
    # get name of a temp directory which will be automatically cleaned up
    tempdir = process.current_process()._config.get('tempdir')
    if tempdir is None:
        import shutil, tempfile
        tempdir = tempfile.mkdtemp(prefix='pymp-')
        info('created temp directory %s', tempdir)
        # keep a strong reference to shutil.rmtree(), since the finalizer
        # can be called late during Python shutdown
        Finalize(None, _remove_temp_dir, args=(shutil.rmtree, tempdir),
                 exitpriority=-100)
        process.current_process()._config['tempdir'] = tempdir
    return tempdir

#
# Support for reinitialization of objects when bootstrapping a child process
#

# Maps (registration-order counter, id(obj), func) -> obj.  Values are held
# weakly, so an entry disappears once its object is garbage collected.
_afterfork_registry = weakref.WeakValueDictionary()
_afterfork_counter = itertools.count()

def _run_after_forkers():
    '''Invoke every registered after-fork callback, in registration order.

    Exceptions from individual callbacks are logged and swallowed so one
    failing callback cannot prevent the others from running.
    '''
    items = list(_afterfork_registry.items())
    items.sort()
    for (index, ident, func), obj in items:
        try:
            func(obj)
        except Exception as e:
            info('after forker raised exception %s', e)

def register_after_fork(obj, func):
    '''Arrange for func(obj) to be called in a child process after fork.

    Only a weak reference to *obj* is kept; if it is collected the
    callback is silently dropped.
    '''
    _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj

#
# Finalization using weakrefs
#

# Maps (exitpriority, creation-order counter) -> Finalize instance.
_finalizer_registry = {}
_finalizer_counter = itertools.count()


class Finalize(object):
    '''
    Class which supports object finalization using weakrefs
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        if (exitpriority is not None) and not isinstance(exitpriority,int):
            raise TypeError(
                "Exitpriority ({0!r}) must be None or int, not {1!s}".format(
                    exitpriority, type(exitpriority)))

        if obj is not None:
            # The weakref callback is this Finalize instance itself, so the
            # finalizer fires automatically when obj is garbage collected.
            self._weakref = weakref.ref(obj, self)
        elif exitpriority is None:
            raise ValueError("Without object, exitpriority cannot be None")

        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        # _key doubles as the registry key and encodes the run order:
        # higher exitpriority (and later creation) runs first at exit.
        self._key = (exitpriority, next(_finalizer_counter))
        self._pid = os.getpid()

        _finalizer_registry[self._key] = self

    def __call__(self, wr=None,
                 # Need to bind these locally because the globals can have
                 # been cleared at shutdown
                 _finalizer_registry=_finalizer_registry,
                 sub_debug=sub_debug, getpid=os.getpid):
        '''
        Run the callback unless it has already been called or cancelled
        '''
        try:
            # Removing the key first makes the call idempotent: a second
            # invocation (or a concurrent one) hits the KeyError branch.
            del _finalizer_registry[self._key]
        except KeyError:
            sub_debug('finalizer no longer registered')
        else:
            if self._pid != getpid():
                # Finalizers registered in a parent must not run in a
                # forked child.
                sub_debug('finalizer ignored because different process')
                res = None
            else:
                sub_debug('finalizer calling %s with args %s and kwargs %s',
                          self._callback, self._args, self._kwargs)
                res = self._callback(*self._args, **self._kwargs)
            # Drop all references so the callback and its arguments can be
            # collected promptly.
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None
        return res

    def cancel(self):
        '''
        Cancel finalization of the object
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            pass
        else:
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None

    def still_active(self):
        '''
        Return whether this finalizer is still waiting to invoke callback
        '''
        return self._key in _finalizer_registry

    def __repr__(self):
        try:
            obj = self._weakref()
        except (AttributeError, TypeError):
            # _weakref may be None (already run/cancelled, or created with
            # obj=None), in which case calling it raises TypeError.
            obj = None

        if obj is None:
            return '<%s object, dead>' % self.__class__.__name__

        x = '<%s object, callback=%s' % (
                self.__class__.__name__,
                getattr(self._callback, '__name__', self._callback))
        if self._args:
            x += ', args=' + str(self._args)
        if self._kwargs:
            x += ', kwargs=' + str(self._kwargs)
        if self._key[0] is not None:
            x += ', exitpriority=' + str(self._key[0])
        return x + '>'


def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.
    '''
    if _finalizer_registry is None:
        # This function may be called after this module's globals are
        # destroyed.  See the _exit_function function in this module for more
        # notes.
        return

    if minpriority is None:
        f = lambda p : p[0] is not None
    else:
        f = lambda p : p[0] is not None and p[0] >= minpriority

    # Careful: _finalizer_registry may be mutated while this function
    # is running (either by a GC run or by another thread).

    # list(_finalizer_registry) should be atomic, while
    # list(_finalizer_registry.items()) is not.
    keys = [key for key in list(_finalizer_registry) if f(key)]
    keys.sort(reverse=True)

    for key in keys:
        finalizer = _finalizer_registry.get(key)
        # key may have been removed from the registry
        if finalizer is not None:
            sub_debug('calling %s', finalizer)
            try:
                finalizer()
            except Exception:
                import traceback
                traceback.print_exc()

    if minpriority is None:
        # A full run also discards the priority-None finalizers that the
        # filter above skipped.
        _finalizer_registry.clear()

#
# Clean up on exit
#

def is_exiting():
    '''
    Returns true if the process is shutting down
    '''
    # _exiting can become None when module globals are torn down during
    # interpreter shutdown; treat that as "exiting" too.
    return _exiting or _exiting is None

_exiting = False

def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                   active_children=process.active_children,
                   current_process=process.current_process):
    # We hold on to references to functions in the arglist due to the
    # situation described below, where this function is called after this
    # module's globals are destroyed.

    global _exiting

    if not _exiting:
        _exiting = True

        info('process shutting down')
        debug('running all "atexit" finalizers with priority >= 0')
        _run_finalizers(0)

        if current_process() is not None:
            # We check if the current process is None here because if
            # it's None, any call to ``active_children()`` will raise
            # an AttributeError (active_children winds up trying to
            # get attributes from util._current_process).  One
            # situation where this can happen is if someone has
            # manipulated sys.modules, causing this module to be
            # garbage collected.  The destructor for the module type
            # then replaces all values in the module dict with None.
            # For instance, after setuptools runs a test it replaces
            # sys.modules with a copy created earlier.  See issues
            # #9775 and #15881.  Also related: #4106, #9205, and
            # #9207.

            for p in active_children():
                if p.daemon:
                    info('calling terminate() for daemon %s', p.name)
                    p._popen.terminate()

            for p in active_children():
                info('calling join() for process %s', p.name)
                p.join()

        debug('running the remaining "atexit" finalizers')
        _run_finalizers()

atexit.register(_exit_function)

#
# Some fork aware types
#

class ForkAwareThreadLock(object):
    '''A lock wrapper that is replaced by a fresh, unlocked lock after fork.

    This avoids a child process deadlocking on a lock that was held by
    another thread in the parent at fork time.
    '''
    def __init__(self):
        self._reset()
        register_after_fork(self, ForkAwareThreadLock._reset)

    def _reset(self):
        # Rebind acquire/release so existing references to the bound
        # methods also point at the new lock.
        self._lock = threading.Lock()
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)


class ForkAwareLocal(threading.local):
    '''Thread-local storage whose contents are cleared in forked children.'''
    def __init__(self):
        register_after_fork(self, lambda obj : obj.__dict__.clear())
    def __reduce__(self):
        # Pickling transfers only the type; the state is deliberately
        # dropped (it is process/thread specific).
        return type(self), ()

#
# Close fds except those specified
#

try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except Exception:
    MAXFD = 256

def close_all_fds_except(fds):
    '''Close every file descriptor below MAXFD except those in *fds*.

    The -1/MAXFD sentinels turn the sorted fd list into a sequence of
    gaps, each closed with one os.closerange() call.
    '''
    fds = list(fds) + [-1, MAXFD]
    fds.sort()
    assert fds[-1] == MAXFD, 'fd too large'
    for i in range(len(fds) - 1):
        os.closerange(fds[i]+1, fds[i+1])

#
# Close sys.stdin and replace stdin with os.devnull
#

def _close_stdin():
    '''Close sys.stdin and rebind it to os.devnull, ignoring I/O errors.'''
    if sys.stdin is None:
        return

    try:
        sys.stdin.close()
    except (OSError, ValueError):
        pass

    try:
        fd = os.open(os.devnull, os.O_RDONLY)
        try:
            # closefd=False: the fd is managed here, not by the file object.
            sys.stdin = open(fd, closefd=False)
        except:
            os.close(fd)
            raise
    except (OSError, ValueError):
        pass

#
# Flush standard streams, if any
#

def _flush_std_streams():
    '''Flush sys.stdout and sys.stderr, tolerating missing/closed streams.'''
    try:
        sys.stdout.flush()
    except (AttributeError, ValueError):
        pass
    try:
        sys.stderr.flush()
    except (AttributeError, ValueError):
        pass

#
# Start a program with only specified fds kept open
#

def spawnv_passfds(path, args, passfds):
    '''Spawn *path* with argv *args*, keeping only *passfds* open.

    NOTE(review): relies on the private _posixsubprocess.fork_exec(),
    whose positional signature is CPython-version specific — verify
    against the interpreter version this ships with.
    '''
    import _posixsubprocess
    passfds = tuple(sorted(map(int, passfds)))
    errpipe_read, errpipe_write = os.pipe()
    try:
        return _posixsubprocess.fork_exec(
            args, [os.fsencode(path)], True, passfds, None, None,
            -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
            False, False, None)
    finally:
        os.close(errpipe_read)
        os.close(errpipe_write)


def close_fds(*fds):
    """Close each file descriptor given as an argument"""
    for fd in fds:
        os.close(fd)


def _cleanup_tests():
    """Cleanup multiprocessing resources when multiprocessing tests
    completed."""

    from test import support

    # cleanup multiprocessing
    process._cleanup()

    # Stop the ForkServer process if it's running
    from multiprocessing import forkserver
    forkserver._forkserver._stop()

    # Stop the ResourceTracker process if it's running
    from multiprocessing import resource_tracker
    resource_tracker._resource_tracker._stop()

    # bpo-37421: Explicitly call _run_finalizers() to remove immediately
    # temporary directories created by multiprocessing.util.get_temp_dir().
    _run_finalizers()
    support.gc_collect()

    support.reap_children()