annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/multiprocessing/context.py @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
rev   line source
jpayne@68 1 import os
jpayne@68 2 import sys
jpayne@68 3 import threading
jpayne@68 4
jpayne@68 5 from . import process
jpayne@68 6 from . import reduction
jpayne@68 7
jpayne@68 8 __all__ = ()
jpayne@68 9
jpayne@68 10 #
jpayne@68 11 # Exceptions
jpayne@68 12 #
jpayne@68 13
class ProcessError(Exception):
    '''Base class of the exception types defined in this module.'''


class BufferTooShort(ProcessError):
    '''ProcessError subclass; also reachable as BaseContext.BufferTooShort.'''


class TimeoutError(ProcessError):
    '''ProcessError subclass; note that it shadows the builtin TimeoutError
    inside this module.'''


class AuthenticationError(ProcessError):
    '''ProcessError subclass; also reachable as
    BaseContext.AuthenticationError.'''
jpayne@68 25
jpayne@68 26 #
jpayne@68 27 # Base type for contexts. Bound methods of an instance of this type are included in __all__ of __init__.py
jpayne@68 28 #
jpayne@68 29
class BaseContext(object):
    '''Shared API of every context object defined in this module.

    Most factory methods import their implementation lazily inside the
    method body and hand the result of self.get_context() to the created
    object, so the object is tied to this context's start method.
    '''

    # Expose the module's exception types as attributes of each context.
    ProcessError = ProcessError
    BufferTooShort = BufferTooShort
    TimeoutError = TimeoutError
    AuthenticationError = AuthenticationError

    # Plain functions from the process module, wrapped so they do not
    # receive the context instance as an implicit first argument.
    current_process = staticmethod(process.current_process)
    parent_process = staticmethod(process.parent_process)
    active_children = staticmethod(process.active_children)

    def cpu_count(self):
        '''Returns the number of CPUs in the system

        Raises NotImplementedError when os.cpu_count() reports None.
        '''
        count = os.cpu_count()
        if count is not None:
            return count
        raise NotImplementedError('cannot determine number of cpus')

    def Manager(self):
        '''Returns a manager associated with a running server process

        The manager's methods such as `Lock()`, `Condition()` and `Queue()`
        can be used to create shared objects.
        '''
        from .managers import SyncManager
        manager = SyncManager(ctx=self.get_context())
        # The manager is started before it is handed back to the caller.
        manager.start()
        return manager

    def Pipe(self, duplex=True):
        '''Returns two connection objects connected by a pipe'''
        from .connection import Pipe as make_pipe
        return make_pipe(duplex)

    def Lock(self):
        '''Returns a non-recursive lock object'''
        from .synchronize import Lock as lock_type
        return lock_type(ctx=self.get_context())

    def RLock(self):
        '''Returns a recursive lock object'''
        from .synchronize import RLock as rlock_type
        return rlock_type(ctx=self.get_context())

    def Condition(self, lock=None):
        '''Returns a condition object'''
        from .synchronize import Condition as condition_type
        return condition_type(lock, ctx=self.get_context())

    def Semaphore(self, value=1):
        '''Returns a semaphore object'''
        from .synchronize import Semaphore as semaphore_type
        return semaphore_type(value, ctx=self.get_context())

    def BoundedSemaphore(self, value=1):
        '''Returns a bounded semaphore object'''
        from .synchronize import BoundedSemaphore as bounded_type
        return bounded_type(value, ctx=self.get_context())

    def Event(self):
        '''Returns an event object'''
        from .synchronize import Event as event_type
        return event_type(ctx=self.get_context())

    def Barrier(self, parties, action=None, timeout=None):
        '''Returns a barrier object'''
        from .synchronize import Barrier as barrier_type
        return barrier_type(parties, action, timeout, ctx=self.get_context())

    def Queue(self, maxsize=0):
        '''Returns a queue object'''
        from .queues import Queue as queue_type
        return queue_type(maxsize, ctx=self.get_context())

    def JoinableQueue(self, maxsize=0):
        '''Returns a joinable queue object'''
        from .queues import JoinableQueue as queue_type
        return queue_type(maxsize, ctx=self.get_context())

    def SimpleQueue(self):
        '''Returns a simple queue object'''
        from .queues import SimpleQueue as queue_type
        return queue_type(ctx=self.get_context())

    def Pool(self, processes=None, initializer=None, initargs=(),
             maxtasksperchild=None):
        '''Returns a process pool object'''
        from .pool import Pool as pool_type
        return pool_type(
            processes,
            initializer,
            initargs,
            maxtasksperchild,
            context=self.get_context(),
        )

    def RawValue(self, typecode_or_type, *args):
        '''Returns a shared object'''
        from .sharedctypes import RawValue as raw_value
        return raw_value(typecode_or_type, *args)

    def RawArray(self, typecode_or_type, size_or_initializer):
        '''Returns a shared array'''
        from .sharedctypes import RawArray as raw_array
        return raw_array(typecode_or_type, size_or_initializer)

    def Value(self, typecode_or_type, *args, lock=True):
        '''Returns a synchronized shared object'''
        from .sharedctypes import Value as make_value
        return make_value(
            typecode_or_type, *args, lock=lock, ctx=self.get_context()
        )

    def Array(self, typecode_or_type, size_or_initializer, *, lock=True):
        '''Returns a synchronized shared array'''
        from .sharedctypes import Array as make_array
        return make_array(
            typecode_or_type,
            size_or_initializer,
            lock=lock,
            ctx=self.get_context(),
        )

    def freeze_support(self):
        '''Check whether this is a fake forked process in a frozen executable.
        If so then run code specified by commandline and exit.
        '''
        # Nothing to do unless running on Windows with sys.frozen set.
        if sys.platform != 'win32' or not getattr(sys, 'frozen', False):
            return
        from .spawn import freeze_support
        freeze_support()

    def get_logger(self):
        '''Return package logger -- if it does not already exist then
        it is created.
        '''
        from .util import get_logger as fetch_logger
        return fetch_logger()

    def log_to_stderr(self, level=None):
        '''Turn on logging and add a handler which prints to stderr'''
        from .util import log_to_stderr as enable_stderr_logging
        return enable_stderr_logging(level)

    def allow_connection_pickling(self):
        '''Install support for sending connections and sockets
        between processes
        '''
        # This is undocumented.  In previous versions of multiprocessing
        # its only effect was to make socket objects inheritable on Windows.
        # The only statement here is the import of the connection module.
        from . import connection

    def set_executable(self, executable):
        '''Sets the path to a python.exe or pythonw.exe binary used to run
        child processes instead of sys.executable when using the 'spawn'
        start method.  Useful for people embedding Python.
        '''
        from .spawn import set_executable as apply_executable
        apply_executable(executable)

    def set_forkserver_preload(self, module_names):
        '''Set list of module names to try to load in forkserver process.
        This is really just a hint.
        '''
        from .forkserver import set_forkserver_preload as apply_preload
        apply_preload(module_names)

    def get_context(self, method=None):
        '''Return self when method is None, else the context registered
        under that start-method name.

        Raises ValueError when the name is not registered; the looked-up
        context's _check_available() is called before it is returned.
        '''
        if method is None:
            return self
        try:
            found = _concrete_contexts[method]
        except KeyError:
            raise ValueError('cannot find context for %r' % method) from None
        else:
            found._check_available()
            return found

    def get_start_method(self, allow_none=False):
        '''Return this context's _name; allow_none is not consulted here.'''
        return self._name

    def set_start_method(self, method, force=False):
        '''Always raise ValueError: this implementation rejects any attempt
        to change the start method.'''
        raise ValueError('cannot set start method of concrete context')

    @property
    def reducer(self):
        '''Controls how objects will be reduced to a form that can be
        shared with other processes.'''
        # Stored as this module's global name 'reduction'.
        return globals().get('reduction')

    @reducer.setter
    def reducer(self, reduction):
        # Rebinds the module-level 'reduction' name, not an instance attribute.
        globals()['reduction'] = reduction

    def _check_available(self):
        '''Availability hook called by get_context(); a no-op here.'''
        pass
jpayne@68 215
jpayne@68 216 #
jpayne@68 217 # Type of default context -- underlying context can be set at most once
jpayne@68 218 #
jpayne@68 219
class Process(process.BaseProcess):
    '''Process type whose launcher is looked up through the module's
    default context each time _Popen is called.'''

    # No start method is fixed on this class.
    _start_method = None

    @staticmethod
    def _Popen(process_obj):
        # Resolve the context at call time and delegate to the _Popen of
        # that context's own Process type.
        active_context = _default_context.get_context()
        return active_context.Process._Popen(process_obj)
jpayne@68 225
class DefaultContext(BaseContext):
    '''Context that delegates to an underlying context chosen lazily.

    Until a start method is set explicitly (or a context is first
    requested), no underlying context is selected; the first request
    falls back to the context given to the constructor.
    '''

    Process = Process

    def __init__(self, context):
        '''Remember `context` as the fallback; nothing is selected yet.'''
        self._default_context = context
        self._actual_context = None

    def get_context(self, method=None):
        '''Return the selected context, or the one registered for `method`.

        With method=None the fallback context becomes the selected one if
        nothing was selected before.  Otherwise the lookup is delegated to
        BaseContext.get_context(), which raises ValueError for unknown
        names.
        '''
        if method is None:
            if self._actual_context is None:
                self._actual_context = self._default_context
            return self._actual_context
        else:
            return super().get_context(method)

    def set_start_method(self, method, force=False):
        '''Select the underlying context by start-method name.

        Raises RuntimeError if a context is already selected and `force`
        is false.  With force=True and method=None the selection is
        cleared instead.
        '''
        if self._actual_context is not None and not force:
            raise RuntimeError('context has already been set')
        if method is None and force:
            self._actual_context = None
            return
        self._actual_context = self.get_context(method)

    def get_start_method(self, allow_none=False):
        '''Return the _name of the selected context.

        If nothing is selected yet, return None when `allow_none` is true;
        otherwise select the fallback context first.
        '''
        if self._actual_context is None:
            if allow_none:
                return None
            self._actual_context = self._default_context
        return self._actual_context._name

    def get_all_start_methods(self):
        '''Return the supported start methods, the default one first.

        The set of methods is unchanged: only 'spawn' on win32; elsewhere
        'fork' and 'spawn', plus 'forkserver' when
        reduction.HAVE_SEND_HANDLE is true.
        '''
        if sys.platform == 'win32':
            return ['spawn']
        methods = ['fork', 'spawn']
        if reduction.HAVE_SEND_HANDLE:
            methods.append('forkserver')
        # The fallback context is not always 'fork' (this module uses
        # 'spawn' on darwin), so move its name to the front rather than
        # assuming the hard-coded order already starts with the default.
        default_name = getattr(self._default_context, '_name', None)
        if default_name in methods:
            methods.remove(default_name)
            methods.insert(0, default_name)
        return methods
jpayne@68 264
jpayne@68 265 #
jpayne@68 266 # Context types for fixed start method
jpayne@68 267 #
jpayne@68 268
if sys.platform == 'win32':

    class SpawnProcess(process.BaseProcess):
        '''Process type launched through popen_spawn_win32.Popen.'''
        _start_method = 'spawn'

        @staticmethod
        def _Popen(process_obj):
            from .popen_spawn_win32 import Popen as launcher
            return launcher(process_obj)

    class SpawnContext(BaseContext):
        '''Context fixed to the 'spawn' start method.'''
        _name = 'spawn'
        Process = SpawnProcess

    # 'spawn' is the only start method registered on Windows.
    _concrete_contexts = {
        'spawn': SpawnContext(),
    }
    _default_context = DefaultContext(_concrete_contexts['spawn'])

else:

    class ForkProcess(process.BaseProcess):
        '''Process type launched through popen_fork.Popen.'''
        _start_method = 'fork'

        @staticmethod
        def _Popen(process_obj):
            from .popen_fork import Popen as launcher
            return launcher(process_obj)

    class SpawnProcess(process.BaseProcess):
        '''Process type launched through popen_spawn_posix.Popen.'''
        _start_method = 'spawn'

        @staticmethod
        def _Popen(process_obj):
            from .popen_spawn_posix import Popen as launcher
            return launcher(process_obj)

    class ForkServerProcess(process.BaseProcess):
        '''Process type launched through popen_forkserver.Popen.'''
        _start_method = 'forkserver'

        @staticmethod
        def _Popen(process_obj):
            from .popen_forkserver import Popen as launcher
            return launcher(process_obj)

    class ForkContext(BaseContext):
        '''Context fixed to the 'fork' start method.'''
        _name = 'fork'
        Process = ForkProcess

    class SpawnContext(BaseContext):
        '''Context fixed to the 'spawn' start method.'''
        _name = 'spawn'
        Process = SpawnProcess

    class ForkServerContext(BaseContext):
        '''Context fixed to the 'forkserver' start method.'''
        _name = 'forkserver'
        Process = ForkServerProcess

        def _check_available(self):
            # Rejected when reduction.HAVE_SEND_HANDLE is false.
            if reduction.HAVE_SEND_HANDLE:
                return
            raise ValueError('forkserver start method not available')

    _concrete_contexts = {
        'fork': ForkContext(),
        'spawn': SpawnContext(),
        'forkserver': ForkServerContext(),
    }
    # bpo-33725: running arbitrary code after fork() is no longer reliable
    # on macOS since macOS 10.14 (Mojave).  Use spawn by default instead.
    _default_context = DefaultContext(
        _concrete_contexts['spawn' if sys.platform == 'darwin' else 'fork']
    )
jpayne@68 336
jpayne@68 337 #
jpayne@68 338 # Force the start method
jpayne@68 339 #
jpayne@68 340
def _force_start_method(method):
    '''Point the default context at the context registered for `method`.

    Assigns _actual_context directly, so the "already set" check in
    DefaultContext.set_start_method and the context's _check_available()
    are both bypassed.  An unregistered name raises KeyError.
    '''
    forced_context = _concrete_contexts[method]
    _default_context._actual_context = forced_context
jpayne@68 343
jpayne@68 344 #
jpayne@68 345 # Check that the current thread is spawning a child process
jpayne@68 346 #
jpayne@68 347
# Thread-local storage holding the 'spawning_popen' attribute.
_tls = threading.local()


def get_spawning_popen():
    '''Return the object stored by set_spawning_popen() in the current
    thread, or None if nothing has been stored there.'''
    try:
        return _tls.spawning_popen
    except AttributeError:
        return None


def set_spawning_popen(popen):
    '''Store `popen` as the current thread's spawning popen object.'''
    setattr(_tls, 'spawning_popen', popen)
jpayne@68 355
def assert_spawning(obj):
    '''Raise RuntimeError unless get_spawning_popen() returns a value
    other than None for the current thread.

    `obj` is used only to put its type name into the error message.
    '''
    if get_spawning_popen() is not None:
        return
    type_name = type(obj).__name__
    raise RuntimeError(
        '%s objects should only be shared between processes'
        ' through inheritance' % type_name
    )