annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/multiprocessing/context.py @ 69:33d812a61356

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 17:55:14 -0400
parents
children
rev   line source
jpayne@69 1 import os
jpayne@69 2 import sys
jpayne@69 3 import threading
jpayne@69 4
jpayne@69 5 from . import process
jpayne@69 6 from . import reduction
jpayne@69 7
jpayne@69 8 __all__ = ()
jpayne@69 9
jpayne@69 10 #
jpayne@69 11 # Exceptions
jpayne@69 12 #
jpayne@69 13
class ProcessError(Exception):
    '''Root of the exception hierarchy defined by this module.'''
jpayne@69 16
class BufferTooShort(ProcessError):
    '''ProcessError subclass named for a buffer that is too short.'''
jpayne@69 19
class TimeoutError(ProcessError):
    '''ProcessError subclass for timeouts.

    Note that inside this module the name shadows the builtin
    TimeoutError; this class derives from ProcessError, not OSError.
    '''
jpayne@69 22
class AuthenticationError(ProcessError):
    '''ProcessError subclass for authentication failures.'''
jpayne@69 25
jpayne@69 26 #
jpayne@69 27 # Base type for contexts. Bound methods of an instance of this type are included in __all__ of __init__.py
jpayne@69 28 #
jpayne@69 29
class BaseContext:
    '''Factory methods and helpers shared by every context type.

    Most methods import the implementing submodule lazily and hand the
    result of ``self.get_context()`` to the object they build, so the
    created object is tied to this context's start method.
    '''

    # Expose the module's exception types as attributes of the context.
    ProcessError = ProcessError
    BufferTooShort = BufferTooShort
    TimeoutError = TimeoutError
    AuthenticationError = AuthenticationError

    # Process-introspection helpers, forwarded unchanged from `process`.
    current_process = staticmethod(process.current_process)
    parent_process = staticmethod(process.parent_process)
    active_children = staticmethod(process.active_children)

    def cpu_count(self):
        '''Return the number of CPUs reported by os.cpu_count().

        Raises NotImplementedError when os.cpu_count() returns None.
        '''
        count = os.cpu_count()
        if count is not None:
            return count
        raise NotImplementedError('cannot determine number of cpus')

    def Manager(self):
        '''Create a SyncManager, start it and return it.

        The manager's methods such as `Lock()`, `Condition()` and
        `Queue()` can be used to create shared objects.
        '''
        from .managers import SyncManager
        manager = SyncManager(ctx=self.get_context())
        manager.start()
        return manager

    def Pipe(self, duplex=True):
        '''Return the result of connection.Pipe(duplex).'''
        from .connection import Pipe as make_pipe
        return make_pipe(duplex)

    def Lock(self):
        '''Return a non-recursive lock bound to this context.'''
        from .synchronize import Lock as lock_type
        return lock_type(ctx=self.get_context())

    def RLock(self):
        '''Return a recursive lock bound to this context.'''
        from .synchronize import RLock as rlock_type
        return rlock_type(ctx=self.get_context())

    def Condition(self, lock=None):
        '''Return a condition object, optionally wrapping *lock*.'''
        from .synchronize import Condition as condition_type
        return condition_type(lock, ctx=self.get_context())

    def Semaphore(self, value=1):
        '''Return a semaphore created with *value*.'''
        from .synchronize import Semaphore as semaphore_type
        return semaphore_type(value, ctx=self.get_context())

    def BoundedSemaphore(self, value=1):
        '''Return a bounded semaphore created with *value*.'''
        from .synchronize import BoundedSemaphore as semaphore_type
        return semaphore_type(value, ctx=self.get_context())

    def Event(self):
        '''Return an event object bound to this context.'''
        from .synchronize import Event as event_type
        return event_type(ctx=self.get_context())

    def Barrier(self, parties, action=None, timeout=None):
        '''Return a barrier built from *parties*, *action* and *timeout*.'''
        from .synchronize import Barrier as barrier_type
        return barrier_type(parties, action, timeout, ctx=self.get_context())

    def Queue(self, maxsize=0):
        '''Return a queues.Queue created with *maxsize*.'''
        from .queues import Queue as queue_type
        return queue_type(maxsize, ctx=self.get_context())

    def JoinableQueue(self, maxsize=0):
        '''Return a queues.JoinableQueue created with *maxsize*.'''
        from .queues import JoinableQueue as queue_type
        return queue_type(maxsize, ctx=self.get_context())

    def SimpleQueue(self):
        '''Return a queues.SimpleQueue bound to this context.'''
        from .queues import SimpleQueue as queue_type
        return queue_type(ctx=self.get_context())

    def Pool(self, processes=None, initializer=None, initargs=(),
             maxtasksperchild=None):
        '''Return a process pool object bound to this context.'''
        from .pool import Pool as pool_type
        return pool_type(
            processes,
            initializer,
            initargs,
            maxtasksperchild,
            context=self.get_context(),
        )

    def RawValue(self, typecode_or_type, *args):
        '''Return a shared object from sharedctypes.RawValue.'''
        from .sharedctypes import RawValue as make_raw_value
        return make_raw_value(typecode_or_type, *args)

    def RawArray(self, typecode_or_type, size_or_initializer):
        '''Return a shared array from sharedctypes.RawArray.'''
        from .sharedctypes import RawArray as make_raw_array
        return make_raw_array(typecode_or_type, size_or_initializer)

    def Value(self, typecode_or_type, *args, lock=True):
        '''Return a synchronized shared object from sharedctypes.Value.'''
        from .sharedctypes import Value as make_value
        return make_value(
            typecode_or_type, *args, lock=lock, ctx=self.get_context()
        )

    def Array(self, typecode_or_type, size_or_initializer, *, lock=True):
        '''Return a synchronized shared array from sharedctypes.Array.'''
        from .sharedctypes import Array as make_array
        return make_array(
            typecode_or_type,
            size_or_initializer,
            lock=lock,
            ctx=self.get_context(),
        )

    def freeze_support(self):
        '''Check whether this is a fake forked process in a frozen executable.
        If so then run code specified by commandline and exit.

        The check is only delegated to spawn.freeze_support() on win32
        when ``sys.frozen`` is truthy; otherwise this does nothing.
        '''
        if sys.platform != 'win32' or not getattr(sys, 'frozen', False):
            return
        from .spawn import freeze_support
        freeze_support()

    def get_logger(self):
        '''Return the logger provided by util.get_logger().'''
        from .util import get_logger as fetch_logger
        return fetch_logger()

    def log_to_stderr(self, level=None):
        '''Turn on logging and add a handler which prints to stderr.

        Returns whatever util.log_to_stderr(level) returns.
        '''
        from .util import log_to_stderr as enable_stderr_logging
        return enable_stderr_logging(level)

    def allow_connection_pickling(self):
        '''Install support for sending connections and sockets
        between processes
        '''
        # This is undocumented.  In previous versions of multiprocessing
        # its only effect was to make socket objects inheritable on Windows.
        # The import is the whole body: nothing from it is used here.
        from . import connection  # noqa: F401

    def set_executable(self, executable):
        '''Sets the path to a python.exe or pythonw.exe binary used to run
        child processes instead of sys.executable when using the 'spawn'
        start method.  Useful for people embedding Python.
        '''
        from .spawn import set_executable as apply_executable
        apply_executable(executable)

    def set_forkserver_preload(self, module_names):
        '''Set list of module names to try to load in forkserver process.
        This is really just a hint.
        '''
        from .forkserver import set_forkserver_preload as apply_preload
        apply_preload(module_names)

    def get_context(self, method=None):
        '''Return the context registered for start method *method*.

        With method=None the context itself is returned.  An unknown
        name raises ValueError; the chosen context's _check_available()
        is called before it is returned.
        '''
        if method is None:
            return self
        try:
            chosen = _concrete_contexts[method]
        except KeyError:
            raise ValueError('cannot find context for %r' % method) from None
        else:
            chosen._check_available()
            return chosen

    def get_start_method(self, allow_none=False):
        '''Return this context's ``_name``; *allow_none* is not consulted.'''
        return self._name

    def set_start_method(self, method, force=False):
        '''Always raise ValueError: this base implementation is fixed.'''
        raise ValueError('cannot set start method of concrete context')

    @property
    def reducer(self):
        '''Controls how objects will be reduced to a form that can be
        shared with other processes.'''
        # Stored as the module-level global named 'reduction'.
        return globals().get('reduction')

    @reducer.setter
    def reducer(self, reduction):
        # Rebinds the module-level global named 'reduction'.
        globals()['reduction'] = reduction

    def _check_available(self):
        '''Hook called by get_context(); the base version does nothing.'''
jpayne@69 215
jpayne@69 216 #
jpayne@69 217 # Type of default context -- underlying context can be set at most once
jpayne@69 218 #
jpayne@69 219
class Process(process.BaseProcess):
    '''Process type whose launcher is picked from the default context.'''

    _start_method = None

    @staticmethod
    def _Popen(process_obj):
        '''Delegate to the _Popen of the default context's Process type.'''
        active_context = _default_context.get_context()
        return active_context.Process._Popen(process_obj)
jpayne@69 225
class DefaultContext(BaseContext):
    '''Context that defers to an underlying concrete context.

    The underlying context starts out unset and is filled in either
    explicitly through set_start_method() or lazily, from the context
    given to the constructor, the first time it is needed.
    '''

    Process = Process

    def __init__(self, context):
        # `context` is the fallback used when no start method was chosen.
        self._default_context = context
        self._actual_context = None

    def get_context(self, method=None):
        '''Return the context for *method*.

        With method=None the underlying context is returned, first
        being set to the fallback if it was still unset.  Any other
        value is resolved by BaseContext.get_context().
        '''
        if method is not None:
            return super().get_context(method)
        if self._actual_context is None:
            self._actual_context = self._default_context
        return self._actual_context

    def set_start_method(self, method, force=False):
        '''Select the underlying context by start method name.

        Raises RuntimeError if a context is already set and *force* is
        false.  Passing method=None together with force=True clears the
        selection instead.
        '''
        already_chosen = self._actual_context is not None
        if already_chosen and not force:
            raise RuntimeError('context has already been set')
        if force and method is None:
            self._actual_context = None
            return
        self._actual_context = self.get_context(method)

    def get_start_method(self, allow_none=False):
        '''Return the ``_name`` of the underlying context.

        If nothing is set yet, return None when *allow_none* is true;
        otherwise adopt the fallback context and return its name.
        '''
        current = self._actual_context
        if current is None:
            if allow_none:
                return None
            current = self._actual_context = self._default_context
        return current._name

    def get_all_start_methods(self):
        '''Return the list of start method names for this platform.'''
        if sys.platform == 'win32':
            return ['spawn']
        methods = ['fork', 'spawn']
        # 'forkserver' is listed only when reduction.HAVE_SEND_HANDLE is set.
        if reduction.HAVE_SEND_HANDLE:
            methods.append('forkserver')
        return methods
jpayne@69 264
jpayne@69 265 #
jpayne@69 266 # Context types for fixed start method
jpayne@69 267 #
jpayne@69 268
if sys.platform != 'win32':

    class ForkProcess(process.BaseProcess):
        '''Process type launched through popen_fork.Popen.'''

        _start_method = 'fork'

        @staticmethod
        def _Popen(process_obj):
            from .popen_fork import Popen as fork_popen
            return fork_popen(process_obj)

    class SpawnProcess(process.BaseProcess):
        '''Process type launched through popen_spawn_posix.Popen.'''

        _start_method = 'spawn'

        @staticmethod
        def _Popen(process_obj):
            from .popen_spawn_posix import Popen as spawn_popen
            return spawn_popen(process_obj)

    class ForkServerProcess(process.BaseProcess):
        '''Process type launched through popen_forkserver.Popen.'''

        _start_method = 'forkserver'

        @staticmethod
        def _Popen(process_obj):
            from .popen_forkserver import Popen as forkserver_popen
            return forkserver_popen(process_obj)

    class ForkContext(BaseContext):
        '''Context fixed to the 'fork' start method.'''

        _name = 'fork'
        Process = ForkProcess

    class SpawnContext(BaseContext):
        '''Context fixed to the 'spawn' start method.'''

        _name = 'spawn'
        Process = SpawnProcess

    class ForkServerContext(BaseContext):
        '''Context fixed to the 'forkserver' start method.'''

        _name = 'forkserver'
        Process = ForkServerProcess

        def _check_available(self):
            '''Raise ValueError unless reduction.HAVE_SEND_HANDLE is set.'''
            if reduction.HAVE_SEND_HANDLE:
                return
            raise ValueError('forkserver start method not available')

    # One shared instance per start method, keyed by the context's name.
    _concrete_contexts = {
        context._name: context
        for context in (ForkContext(), SpawnContext(), ForkServerContext())
    }

    # bpo-33725: running arbitrary code after fork() is no longer reliable
    # on macOS since macOS 10.14 (Mojave).  Use spawn by default instead.
    _default_context = DefaultContext(
        _concrete_contexts['spawn' if sys.platform == 'darwin' else 'fork']
    )

else:

    class SpawnProcess(process.BaseProcess):
        '''Process type launched through popen_spawn_win32.Popen.'''

        _start_method = 'spawn'

        @staticmethod
        def _Popen(process_obj):
            from .popen_spawn_win32 import Popen as spawn_popen
            return spawn_popen(process_obj)

    class SpawnContext(BaseContext):
        '''Context fixed to the 'spawn' start method.'''

        _name = 'spawn'
        Process = SpawnProcess

    # 'spawn' is the only start method registered in this branch.
    _concrete_contexts = {
        'spawn': SpawnContext(),
    }
    _default_context = DefaultContext(_concrete_contexts['spawn'])
jpayne@69 336
jpayne@69 337 #
jpayne@69 338 # Force the start method
jpayne@69 339 #
jpayne@69 340
def _force_start_method(method):
    '''Point the default context at the concrete context for *method*.

    The assignment is unconditional: it does not go through
    set_start_method() and raises KeyError for an unregistered name.
    '''
    forced_context = _concrete_contexts[method]
    _default_context._actual_context = forced_context
jpayne@69 343
jpayne@69 344 #
jpayne@69 345 # Check that the current thread is spawning a child process
jpayne@69 346 #
jpayne@69 347
jpayne@69 348 _tls = threading.local()
jpayne@69 349
def get_spawning_popen():
    '''Return the current thread's ``spawning_popen`` value, or None.'''
    try:
        return _tls.spawning_popen
    except AttributeError:
        # Nothing has been stored for this thread yet.
        return None
jpayne@69 352
def set_spawning_popen(popen):
    '''Store *popen* as the current thread's ``spawning_popen`` value.'''
    setattr(_tls, 'spawning_popen', popen)
jpayne@69 355
def assert_spawning(obj):
    '''Raise RuntimeError unless get_spawning_popen() is not None.

    The type name of *obj* is used only to build the error message.
    '''
    if get_spawning_popen() is not None:
        return
    type_name = type(obj).__name__
    raise RuntimeError(
        '%s objects should only be shared between processes'
        ' through inheritance' % type_name
    )