#
# Start-method contexts: a context object bundles the factory functions
# (Process, Lock, Queue, Pool, ...) that all share one start method.
#

import os
import sys
import threading

from . import process
from . import reduction

__all__ = ()

#
# Exceptions
#

class ProcessError(Exception):
    '''Base class of the exceptions defined in this module.'''
    pass

class BufferTooShort(ProcessError):
    pass

# NOTE: shadows the builtin TimeoutError inside this module; the name is
# re-exported as an attribute of BaseContext below.
class TimeoutError(ProcessError):
    pass

class AuthenticationError(ProcessError):
    pass

#
# Base type for contexts. Bound methods of an instance of this type are
# included in __all__ of __init__.py
#

class BaseContext(object):
    '''Common implementation shared by every context type.

    Most methods are thin factories: they import the implementing module
    lazily (inside the method) and pass `self.get_context()` along so the
    created object is tied to this context.
    '''

    # Re-export the exception types as attributes of every context.
    ProcessError = ProcessError
    BufferTooShort = BufferTooShort
    TimeoutError = TimeoutError
    AuthenticationError = AuthenticationError

    # staticmethod() keeps these module-level functions from being bound
    # to the context instance when accessed as attributes.
    current_process = staticmethod(process.current_process)
    parent_process = staticmethod(process.parent_process)
    active_children = staticmethod(process.active_children)

    def cpu_count(self):
        '''Returns the number of CPUs in the system

        Raises NotImplementedError if os.cpu_count() cannot determine it.
        '''
        num = os.cpu_count()
        if num is None:
            raise NotImplementedError('cannot determine number of cpus')
        return num

    def Manager(self):
        '''Returns a manager associated with a running server process

        The manager's methods such as `Lock()`, `Condition()` and `Queue()`
        can be used to create shared objects.
        '''
        from .managers import SyncManager
        m = SyncManager(ctx=self.get_context())
        # The manager is started before it is handed to the caller.
        m.start()
        return m

    def Pipe(self, duplex=True):
        '''Returns two connection objects connected by a pipe'''
        from .connection import Pipe
        return Pipe(duplex)

    def Lock(self):
        '''Returns a non-recursive lock object'''
        from .synchronize import Lock
        return Lock(ctx=self.get_context())

    def RLock(self):
        '''Returns a recursive lock object'''
        from .synchronize import RLock
        return RLock(ctx=self.get_context())

    def Condition(self, lock=None):
        '''Returns a condition object'''
        from .synchronize import Condition
        return Condition(lock, ctx=self.get_context())

    def Semaphore(self, value=1):
        '''Returns a semaphore object'''
        from .synchronize import Semaphore
        return Semaphore(value, ctx=self.get_context())

    def BoundedSemaphore(self, value=1):
        '''Returns a bounded semaphore object'''
        from .synchronize import BoundedSemaphore
        return BoundedSemaphore(value, ctx=self.get_context())

    def Event(self):
        '''Returns an event object'''
        from .synchronize import Event
        return Event(ctx=self.get_context())

    def Barrier(self, parties, action=None, timeout=None):
        '''Returns a barrier object'''
        from .synchronize import Barrier
        return Barrier(parties, action, timeout, ctx=self.get_context())

    def Queue(self, maxsize=0):
        '''Returns a queue object'''
        from .queues import Queue
        return Queue(maxsize, ctx=self.get_context())

    def JoinableQueue(self, maxsize=0):
        '''Returns a JoinableQueue object'''
        from .queues import JoinableQueue
        return JoinableQueue(maxsize, ctx=self.get_context())

    def SimpleQueue(self):
        '''Returns a SimpleQueue object'''
        from .queues import SimpleQueue
        return SimpleQueue(ctx=self.get_context())

    def Pool(self, processes=None, initializer=None, initargs=(),
             maxtasksperchild=None):
        '''Returns a process pool object'''
        from .pool import Pool
        return Pool(processes, initializer, initargs, maxtasksperchild,
                    context=self.get_context())

    def RawValue(self, typecode_or_type, *args):
        '''Returns a shared object'''
        from .sharedctypes import RawValue
        return RawValue(typecode_or_type, *args)

    def RawArray(self, typecode_or_type, size_or_initializer):
        '''Returns a shared array'''
        from .sharedctypes import RawArray
        return RawArray(typecode_or_type, size_or_initializer)

    def Value(self, typecode_or_type, *args, lock=True):
        '''Returns a synchronized shared object

        `lock` is keyword-only because it follows `*args`.
        '''
        from .sharedctypes import Value
        return Value(typecode_or_type, *args, lock=lock,
                     ctx=self.get_context())

    def Array(self, typecode_or_type, size_or_initializer, *, lock=True):
        '''Returns a synchronized shared array'''
        from .sharedctypes import Array
        return Array(typecode_or_type, size_or_initializer, lock=lock,
                     ctx=self.get_context())

    def freeze_support(self):
        '''Check whether this is a fake forked process in a frozen executable.
        If so then run code specified by commandline and exit.
        '''
        # Does nothing unless running on Windows with sys.frozen set.
        if sys.platform == 'win32' and getattr(sys, 'frozen', False):
            from .spawn import freeze_support
            freeze_support()

    def get_logger(self):
        '''Return package logger -- if it does not already exist then
        it is created.
        '''
        from .util import get_logger
        return get_logger()

    def log_to_stderr(self, level=None):
        '''Turn on logging and add a handler which prints to stderr'''
        from .util import log_to_stderr
        return log_to_stderr(level)

    def allow_connection_pickling(self):
        '''Install support for sending connections and sockets
        between processes
        '''
        # This is undocumented.  In previous versions of multiprocessing
        # its only effect was to make socket objects inheritable on Windows.
        # The imported name is unused: importing the module is the only
        # thing this method does.
        from . import connection

    def set_executable(self, executable):
        '''Sets the path to a python.exe or pythonw.exe binary used to run
        child processes instead of sys.executable when using the 'spawn'
        start method.  Useful for people embedding Python.
        '''
        from .spawn import set_executable
        set_executable(executable)

    def set_forkserver_preload(self, module_names):
        '''Set list of module names to try to load in forkserver process.
        This is really just a hint.
        '''
        from .forkserver import set_forkserver_preload
        set_forkserver_preload(module_names)

    def get_context(self, method=None):
        '''Return the context object for start method `method`.

        With `method=None` the context itself is returned.  Otherwise the
        name is looked up in the module-level `_concrete_contexts` table.

        Raises ValueError if no context is registered under `method`;
        the context's `_check_available()` hook may also raise.
        '''
        if method is None:
            return self
        try:
            ctx = _concrete_contexts[method]
        except KeyError:
            # "from None" hides the internal KeyError from the traceback.
            raise ValueError('cannot find context for %r' % method) from None
        ctx._check_available()
        return ctx

    def get_start_method(self, allow_none=False):
        '''Return the name of this context's start method.

        `allow_none` is accepted for signature compatibility with
        DefaultContext.get_start_method and is not used here.  `_name` is
        not defined on BaseContext itself; the concrete context classes
        in this file set it.
        '''
        return self._name

    def set_start_method(self, method, force=False):
        '''Always raises ValueError; DefaultContext overrides this.'''
        raise ValueError('cannot set start method of concrete context')

    @property
    def reducer(self):
        '''Controls how objects will be reduced to a form that can be
        shared with other processes.'''
        # Stored as the module-level global `reduction`, not per instance.
        return globals().get('reduction')

    @reducer.setter
    def reducer(self, reduction):
        # Rebinds this module's global `reduction`, so the new value is
        # seen through every context object.
        globals()['reduction'] = reduction

    def _check_available(self):
        '''Hook called by get_context(); subclasses raise if unusable.'''
        pass

#
# Type of default context -- underlying context can be set at most once
#

class Process(process.BaseProcess):
    '''Process type whose start method is resolved when _Popen is called.'''
    _start_method = None
    @staticmethod
    def _Popen(process_obj):
        # Delegate to the Process type of whichever context is current
        # at call time.
        return _default_context.get_context().Process._Popen(process_obj)

class DefaultContext(BaseContext):
    '''Context that forwards to an underlying "actual" context.

    The actual context stays unset until it is first needed or until
    set_start_method() chooses one explicitly.
    '''
    Process = Process

    def __init__(self, context):
        # `context` is the fallback used when no start method was chosen.
        self._default_context = context
        self._actual_context = None

    def get_context(self, method=None):
        '''Return the context for `method`, or the actual context if None.

        Calling this with `method=None` fixes the actual context to the
        fallback if none has been chosen yet.
        '''
        if method is None:
            if self._actual_context is None:
                self._actual_context = self._default_context
            return self._actual_context
        else:
            return super().get_context(method)

    def set_start_method(self, method, force=False):
        '''Select the actual context by start-method name.

        Raises RuntimeError if a context was already selected and `force`
        is false.  With `force=True` and `method=None` the selection is
        cleared again.
        '''
        if self._actual_context is not None and not force:
            raise RuntimeError('context has already been set')
        if method is None and force:
            self._actual_context = None
            return
        self._actual_context = self.get_context(method)

    def get_start_method(self, allow_none=False):
        '''Return the name of the start method in use.

        If none was selected yet: return None when `allow_none` is true,
        otherwise fix the selection to the fallback context first.
        '''
        if self._actual_context is None:
            if allow_none:
                return None
            self._actual_context = self._default_context
        return self._actual_context._name

    def get_all_start_methods(self):
        '''Return the list of supported start methods, default first.'''
        if sys.platform == 'win32':
            return ['spawn']
        # The documented contract is that the first entry is the default.
        # This module makes 'spawn' the default on macOS, so it has to be
        # listed ahead of 'fork' there.
        if sys.platform == 'darwin':
            methods = ['spawn', 'fork']
        else:
            methods = ['fork', 'spawn']
        if reduction.HAVE_SEND_HANDLE:
            methods.append('forkserver')
        return methods

#
# Context types for fixed start method
#

if sys.platform != 'win32':

    class ForkProcess(process.BaseProcess):
        _start_method = 'fork'
        @staticmethod
        def _Popen(process_obj):
            from .popen_fork import Popen
            return Popen(process_obj)

    class SpawnProcess(process.BaseProcess):
        _start_method = 'spawn'
        @staticmethod
        def _Popen(process_obj):
            from .popen_spawn_posix import Popen
            return Popen(process_obj)

    class ForkServerProcess(process.BaseProcess):
        _start_method = 'forkserver'
        @staticmethod
        def _Popen(process_obj):
            from .popen_forkserver import Popen
            return Popen(process_obj)

    class ForkContext(BaseContext):
        _name = 'fork'
        Process = ForkProcess

    class SpawnContext(BaseContext):
        _name = 'spawn'
        Process = SpawnProcess

    class ForkServerContext(BaseContext):
        _name = 'forkserver'
        Process = ForkServerProcess
        def _check_available(self):
            # get_context('forkserver') fails when HAVE_SEND_HANDLE is
            # false.
            if not reduction.HAVE_SEND_HANDLE:
                raise ValueError('forkserver start method not available')

    # One shared instance per start method; get_context() hands these out.
    _concrete_contexts = {
        'fork': ForkContext(),
        'spawn': SpawnContext(),
        'forkserver': ForkServerContext(),
    }
    if sys.platform == 'darwin':
        # bpo-33725: running arbitrary code after fork() is no longer reliable
        # on macOS since macOS 10.14 (Mojave). Use spawn by default instead.
        _default_context = DefaultContext(_concrete_contexts['spawn'])
    else:
        _default_context = DefaultContext(_concrete_contexts['fork'])

else:

    class SpawnProcess(process.BaseProcess):
        _start_method = 'spawn'
        @staticmethod
        def _Popen(process_obj):
            from .popen_spawn_win32 import Popen
            return Popen(process_obj)

    class SpawnContext(BaseContext):
        _name = 'spawn'
        Process = SpawnProcess

    _concrete_contexts = {
        'spawn': SpawnContext(),
    }
    _default_context = DefaultContext(_concrete_contexts['spawn'])

#
# Force the start method
#

def _force_start_method(method):
    # Overwrites the selection directly, bypassing the "already set" and
    # availability checks done by DefaultContext.set_start_method().
    # An unknown `method` raises KeyError.
    _default_context._actual_context = _concrete_contexts[method]

#
# Check that the current thread is spawning a child process
#

# Thread-local, so the marker set by one thread is not seen by others.
_tls = threading.local()

def get_spawning_popen():
    '''Return the value stored by set_spawning_popen() in this thread,
    or None if nothing was stored.'''
    return getattr(_tls, 'spawning_popen', None)

def set_spawning_popen(popen):
    '''Record `popen` as this thread's spawning-popen marker.'''
    _tls.spawning_popen = popen

def assert_spawning(obj):
    '''Raise RuntimeError unless this thread has a spawning-popen marker.

    `obj` is only used to name its type in the error message.
    '''
    if get_spawning_popen() is None:
        raise RuntimeError(
            '%s objects should only be shared between processes'
            ' through inheritance' % type(obj).__name__
            )