jpayne@69: import os jpayne@69: import sys jpayne@69: import threading jpayne@69: jpayne@69: from . import process jpayne@69: from . import reduction jpayne@69: jpayne@69: __all__ = () jpayne@69: jpayne@69: # jpayne@69: # Exceptions jpayne@69: # jpayne@69: jpayne@69: class ProcessError(Exception): jpayne@69: pass jpayne@69: jpayne@69: class BufferTooShort(ProcessError): jpayne@69: pass jpayne@69: jpayne@69: class TimeoutError(ProcessError): jpayne@69: pass jpayne@69: jpayne@69: class AuthenticationError(ProcessError): jpayne@69: pass jpayne@69: jpayne@69: # jpayne@69: # Base type for contexts. Bound methods of an instance of this type are included in __all__ of __init__.py jpayne@69: # jpayne@69: jpayne@69: class BaseContext(object): jpayne@69: jpayne@69: ProcessError = ProcessError jpayne@69: BufferTooShort = BufferTooShort jpayne@69: TimeoutError = TimeoutError jpayne@69: AuthenticationError = AuthenticationError jpayne@69: jpayne@69: current_process = staticmethod(process.current_process) jpayne@69: parent_process = staticmethod(process.parent_process) jpayne@69: active_children = staticmethod(process.active_children) jpayne@69: jpayne@69: def cpu_count(self): jpayne@69: '''Returns the number of CPUs in the system''' jpayne@69: num = os.cpu_count() jpayne@69: if num is None: jpayne@69: raise NotImplementedError('cannot determine number of cpus') jpayne@69: else: jpayne@69: return num jpayne@69: jpayne@69: def Manager(self): jpayne@69: '''Returns a manager associated with a running server process jpayne@69: jpayne@69: The managers methods such as `Lock()`, `Condition()` and `Queue()` jpayne@69: can be used to create shared objects. jpayne@69: ''' jpayne@69: from .managers import SyncManager jpayne@69: m = SyncManager(ctx=self.get_context()) jpayne@69: m.start() jpayne@69: return m jpayne@69: jpayne@69: def Pipe(self, duplex=True): jpayne@69: '''Returns two connection object connected by a pipe''' jpayne@69: from .connection import Pipe jpayne@69: return Pipe(duplex) jpayne@69: jpayne@69: def Lock(self): jpayne@69: '''Returns a non-recursive lock object''' jpayne@69: from .synchronize import Lock jpayne@69: return Lock(ctx=self.get_context()) jpayne@69: jpayne@69: def RLock(self): jpayne@69: '''Returns a recursive lock object''' jpayne@69: from .synchronize import RLock jpayne@69: return RLock(ctx=self.get_context()) jpayne@69: jpayne@69: def Condition(self, lock=None): jpayne@69: '''Returns a condition object''' jpayne@69: from .synchronize import Condition jpayne@69: return Condition(lock, ctx=self.get_context()) jpayne@69: jpayne@69: def Semaphore(self, value=1): jpayne@69: '''Returns a semaphore object''' jpayne@69: from .synchronize import Semaphore jpayne@69: return Semaphore(value, ctx=self.get_context()) jpayne@69: jpayne@69: def BoundedSemaphore(self, value=1): jpayne@69: '''Returns a bounded semaphore object''' jpayne@69: from .synchronize import BoundedSemaphore jpayne@69: return BoundedSemaphore(value, ctx=self.get_context()) jpayne@69: jpayne@69: def Event(self): jpayne@69: '''Returns an event object''' jpayne@69: from .synchronize import Event jpayne@69: return Event(ctx=self.get_context()) jpayne@69: jpayne@69: def Barrier(self, parties, action=None, timeout=None): jpayne@69: '''Returns a barrier object''' jpayne@69: from .synchronize import Barrier jpayne@69: return Barrier(parties, action, timeout, ctx=self.get_context()) jpayne@69: jpayne@69: def Queue(self, maxsize=0): jpayne@69: '''Returns a queue object''' jpayne@69: from .queues import Queue jpayne@69: return Queue(maxsize, ctx=self.get_context()) jpayne@69: jpayne@69: def JoinableQueue(self, maxsize=0): jpayne@69: '''Returns a queue object''' jpayne@69: from .queues import JoinableQueue jpayne@69: return JoinableQueue(maxsize, ctx=self.get_context()) jpayne@69: jpayne@69: def SimpleQueue(self): jpayne@69: '''Returns a queue object''' jpayne@69: from .queues import SimpleQueue jpayne@69: return SimpleQueue(ctx=self.get_context()) jpayne@69: jpayne@69: def Pool(self, processes=None, initializer=None, initargs=(), jpayne@69: maxtasksperchild=None): jpayne@69: '''Returns a process pool object''' jpayne@69: from .pool import Pool jpayne@69: return Pool(processes, initializer, initargs, maxtasksperchild, jpayne@69: context=self.get_context()) jpayne@69: jpayne@69: def RawValue(self, typecode_or_type, *args): jpayne@69: '''Returns a shared object''' jpayne@69: from .sharedctypes import RawValue jpayne@69: return RawValue(typecode_or_type, *args) jpayne@69: jpayne@69: def RawArray(self, typecode_or_type, size_or_initializer): jpayne@69: '''Returns a shared array''' jpayne@69: from .sharedctypes import RawArray jpayne@69: return RawArray(typecode_or_type, size_or_initializer) jpayne@69: jpayne@69: def Value(self, typecode_or_type, *args, lock=True): jpayne@69: '''Returns a synchronized shared object''' jpayne@69: from .sharedctypes import Value jpayne@69: return Value(typecode_or_type, *args, lock=lock, jpayne@69: ctx=self.get_context()) jpayne@69: jpayne@69: def Array(self, typecode_or_type, size_or_initializer, *, lock=True): jpayne@69: '''Returns a synchronized shared array''' jpayne@69: from .sharedctypes import Array jpayne@69: return Array(typecode_or_type, size_or_initializer, lock=lock, jpayne@69: ctx=self.get_context()) jpayne@69: jpayne@69: def freeze_support(self): jpayne@69: '''Check whether this is a fake forked process in a frozen executable. jpayne@69: If so then run code specified by commandline and exit. jpayne@69: ''' jpayne@69: if sys.platform == 'win32' and getattr(sys, 'frozen', False): jpayne@69: from .spawn import freeze_support jpayne@69: freeze_support() jpayne@69: jpayne@69: def get_logger(self): jpayne@69: '''Return package logger -- if it does not already exist then jpayne@69: it is created. jpayne@69: ''' jpayne@69: from .util import get_logger jpayne@69: return get_logger() jpayne@69: jpayne@69: def log_to_stderr(self, level=None): jpayne@69: '''Turn on logging and add a handler which prints to stderr''' jpayne@69: from .util import log_to_stderr jpayne@69: return log_to_stderr(level) jpayne@69: jpayne@69: def allow_connection_pickling(self): jpayne@69: '''Install support for sending connections and sockets jpayne@69: between processes jpayne@69: ''' jpayne@69: # This is undocumented. In previous versions of multiprocessing jpayne@69: # its only effect was to make socket objects inheritable on Windows. jpayne@69: from . import connection jpayne@69: jpayne@69: def set_executable(self, executable): jpayne@69: '''Sets the path to a python.exe or pythonw.exe binary used to run jpayne@69: child processes instead of sys.executable when using the 'spawn' jpayne@69: start method. Useful for people embedding Python. jpayne@69: ''' jpayne@69: from .spawn import set_executable jpayne@69: set_executable(executable) jpayne@69: jpayne@69: def set_forkserver_preload(self, module_names): jpayne@69: '''Set list of module names to try to load in forkserver process. jpayne@69: This is really just a hint. jpayne@69: ''' jpayne@69: from .forkserver import set_forkserver_preload jpayne@69: set_forkserver_preload(module_names) jpayne@69: jpayne@69: def get_context(self, method=None): jpayne@69: if method is None: jpayne@69: return self jpayne@69: try: jpayne@69: ctx = _concrete_contexts[method] jpayne@69: except KeyError: jpayne@69: raise ValueError('cannot find context for %r' % method) from None jpayne@69: ctx._check_available() jpayne@69: return ctx jpayne@69: jpayne@69: def get_start_method(self, allow_none=False): jpayne@69: return self._name jpayne@69: jpayne@69: def set_start_method(self, method, force=False): jpayne@69: raise ValueError('cannot set start method of concrete context') jpayne@69: jpayne@69: @property jpayne@69: def reducer(self): jpayne@69: '''Controls how objects will be reduced to a form that can be jpayne@69: shared with other processes.''' jpayne@69: return globals().get('reduction') jpayne@69: jpayne@69: @reducer.setter jpayne@69: def reducer(self, reduction): jpayne@69: globals()['reduction'] = reduction jpayne@69: jpayne@69: def _check_available(self): jpayne@69: pass jpayne@69: jpayne@69: # jpayne@69: # Type of default context -- underlying context can be set at most once jpayne@69: # jpayne@69: jpayne@69: class Process(process.BaseProcess): jpayne@69: _start_method = None jpayne@69: @staticmethod jpayne@69: def _Popen(process_obj): jpayne@69: return _default_context.get_context().Process._Popen(process_obj) jpayne@69: jpayne@69: class DefaultContext(BaseContext): jpayne@69: Process = Process jpayne@69: jpayne@69: def __init__(self, context): jpayne@69: self._default_context = context jpayne@69: self._actual_context = None jpayne@69: jpayne@69: def get_context(self, method=None): jpayne@69: if method is None: jpayne@69: if self._actual_context is None: jpayne@69: self._actual_context = self._default_context jpayne@69: return self._actual_context jpayne@69: else: jpayne@69: return super().get_context(method) jpayne@69: jpayne@69: def set_start_method(self, method, force=False): jpayne@69: if self._actual_context is not None and not force: jpayne@69: raise RuntimeError('context has already been set') jpayne@69: if method is None and force: jpayne@69: self._actual_context = None jpayne@69: return jpayne@69: self._actual_context = self.get_context(method) jpayne@69: jpayne@69: def get_start_method(self, allow_none=False): jpayne@69: if self._actual_context is None: jpayne@69: if allow_none: jpayne@69: return None jpayne@69: self._actual_context = self._default_context jpayne@69: return self._actual_context._name jpayne@69: jpayne@69: def get_all_start_methods(self): jpayne@69: if sys.platform == 'win32': jpayne@69: return ['spawn'] jpayne@69: else: jpayne@69: if reduction.HAVE_SEND_HANDLE: jpayne@69: return ['fork', 'spawn', 'forkserver'] jpayne@69: else: jpayne@69: return ['fork', 'spawn'] jpayne@69: jpayne@69: # jpayne@69: # Context types for fixed start method jpayne@69: # jpayne@69: jpayne@69: if sys.platform != 'win32': jpayne@69: jpayne@69: class ForkProcess(process.BaseProcess): jpayne@69: _start_method = 'fork' jpayne@69: @staticmethod jpayne@69: def _Popen(process_obj): jpayne@69: from .popen_fork import Popen jpayne@69: return Popen(process_obj) jpayne@69: jpayne@69: class SpawnProcess(process.BaseProcess): jpayne@69: _start_method = 'spawn' jpayne@69: @staticmethod jpayne@69: def _Popen(process_obj): jpayne@69: from .popen_spawn_posix import Popen jpayne@69: return Popen(process_obj) jpayne@69: jpayne@69: class ForkServerProcess(process.BaseProcess): jpayne@69: _start_method = 'forkserver' jpayne@69: @staticmethod jpayne@69: def _Popen(process_obj): jpayne@69: from .popen_forkserver import Popen jpayne@69: return Popen(process_obj) jpayne@69: jpayne@69: class ForkContext(BaseContext): jpayne@69: _name = 'fork' jpayne@69: Process = ForkProcess jpayne@69: jpayne@69: class SpawnContext(BaseContext): jpayne@69: _name = 'spawn' jpayne@69: Process = SpawnProcess jpayne@69: jpayne@69: class ForkServerContext(BaseContext): jpayne@69: _name = 'forkserver' jpayne@69: Process = ForkServerProcess jpayne@69: def _check_available(self): jpayne@69: if not reduction.HAVE_SEND_HANDLE: jpayne@69: raise ValueError('forkserver start method not available') jpayne@69: jpayne@69: _concrete_contexts = { jpayne@69: 'fork': ForkContext(), jpayne@69: 'spawn': SpawnContext(), jpayne@69: 'forkserver': ForkServerContext(), jpayne@69: } jpayne@69: if sys.platform == 'darwin': jpayne@69: # bpo-33725: running arbitrary code after fork() is no longer reliable jpayne@69: # on macOS since macOS 10.14 (Mojave). Use spawn by default instead. jpayne@69: _default_context = DefaultContext(_concrete_contexts['spawn']) jpayne@69: else: jpayne@69: _default_context = DefaultContext(_concrete_contexts['fork']) jpayne@69: jpayne@69: else: jpayne@69: jpayne@69: class SpawnProcess(process.BaseProcess): jpayne@69: _start_method = 'spawn' jpayne@69: @staticmethod jpayne@69: def _Popen(process_obj): jpayne@69: from .popen_spawn_win32 import Popen jpayne@69: return Popen(process_obj) jpayne@69: jpayne@69: class SpawnContext(BaseContext): jpayne@69: _name = 'spawn' jpayne@69: Process = SpawnProcess jpayne@69: jpayne@69: _concrete_contexts = { jpayne@69: 'spawn': SpawnContext(), jpayne@69: } jpayne@69: _default_context = DefaultContext(_concrete_contexts['spawn']) jpayne@69: jpayne@69: # jpayne@69: # Force the start method jpayne@69: # jpayne@69: jpayne@69: def _force_start_method(method): jpayne@69: _default_context._actual_context = _concrete_contexts[method] jpayne@69: jpayne@69: # jpayne@69: # Check that the current thread is spawning a child process jpayne@69: # jpayne@69: jpayne@69: _tls = threading.local() jpayne@69: jpayne@69: def get_spawning_popen(): jpayne@69: return getattr(_tls, 'spawning_popen', None) jpayne@69: jpayne@69: def set_spawning_popen(popen): jpayne@69: _tls.spawning_popen = popen jpayne@69: jpayne@69: def assert_spawning(obj): jpayne@69: if get_spawning_popen() is None: jpayne@69: raise RuntimeError( jpayne@69: '%s objects should only be shared between processes' jpayne@69: ' through inheritance' % type(obj).__name__ jpayne@69: )