annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/multiprocessing/synchronize.py @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
rev   line source
jpayne@68 1 #
jpayne@68 2 # Module implementing synchronization primitives
jpayne@68 3 #
jpayne@68 4 # multiprocessing/synchronize.py
jpayne@68 5 #
jpayne@68 6 # Copyright (c) 2006-2008, R Oudkerk
jpayne@68 7 # Licensed to PSF under a Contributor Agreement.
jpayne@68 8 #
jpayne@68 9
# Names exported by ``from multiprocessing.synchronize import *``.
# `Barrier` is defined in this module alongside the other primitives, so it
# is listed here as well.
__all__ = [
    'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event',
    'Barrier',
]
jpayne@68 13
jpayne@68 14 import threading
jpayne@68 15 import sys
jpayne@68 16 import tempfile
jpayne@68 17 import _multiprocessing
jpayne@68 18 import time
jpayne@68 19
jpayne@68 20 from . import context
jpayne@68 21 from . import process
jpayne@68 22 from . import util
jpayne@68 23
jpayne@68 24 # Try to import the mp.synchronize module cleanly, if it fails
jpayne@68 25 # raise ImportError for platforms lacking a working sem_open implementation.
jpayne@68 26 # See issue 3770
try:
    from _multiprocessing import SemLock, sem_unlink
except ImportError:
    # Replace the low-level import failure with a message that names the
    # missing sem_open support.
    raise ImportError(
        "This platform lacks a functioning sem_open"
        " implementation, therefore, the required"
        " synchronization primitives needed will not"
        " function, see issue 3770.")
jpayne@68 34
jpayne@68 35 #
jpayne@68 36 # Constants
jpayne@68 37 #
jpayne@68 38
# Values passed as the `kind` argument of `_multiprocessing.SemLock`.
RECURSIVE_MUTEX = 0
SEMAPHORE = 1
# Upper bound for semaphore values, as reported by the C extension.
SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
jpayne@68 41
jpayne@68 42 #
jpayne@68 43 # Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
jpayne@68 44 #
jpayne@68 45
class SemLock(object):
    """Common base for the lock and semaphore classes of this module.

    Each instance owns a `_multiprocessing.SemLock` object, stored as
    ``self._semlock``, and exposes that object's ``acquire`` and ``release``
    methods as its own.
    """

    # Source of random suffixes used by `_make_name`.
    _rand = tempfile._RandomNameSequence()

    def __init__(self, kind, value, maxvalue, *, ctx):
        """Create the underlying `_multiprocessing.SemLock`.

        `kind`, `value` and `maxvalue` are forwarded unchanged.  `ctx` is a
        multiprocessing context; when it is None the default context is
        used.  Raises FileExistsError if 100 generated names in a row are
        already taken.
        """
        if ctx is None:
            ctx = context._default_context.get_context()
        start_method = ctx.get_start_method()
        unlink_now = sys.platform == 'win32' or start_method == 'fork'

        # A generated name may collide with an existing semaphore; retry
        # with a fresh name a bounded number of times.
        for _ in range(100):
            try:
                semlock = _multiprocessing.SemLock(
                    kind, value, maxvalue, self._make_name(), unlink_now)
            except FileExistsError:
                continue
            self._semlock = semlock
            break
        else:
            raise FileExistsError('cannot find name for semaphore')

        util.debug('created semlock with handle %s' % semlock.handle)
        self._make_methods()

        if sys.platform != 'win32':
            def _forward_after_fork(obj):
                obj._semlock._after_fork()
            util.register_after_fork(self, _forward_after_fork)

        sem_name = self._semlock.name
        if sem_name is not None:
            # Only reached on Unix when the start method is not 'fork'.
            # The name is unlinked when this object is garbage collected
            # or when the process shuts down.
            from .resource_tracker import register
            register(sem_name, "semaphore")
            util.Finalize(self, SemLock._cleanup, (sem_name,),
                          exitpriority=0)

    @staticmethod
    def _cleanup(name):
        """Unlink the semaphore `name` and unregister it from the resource tracker."""
        from .resource_tracker import unregister
        sem_unlink(name)
        unregister(name, "semaphore")

    def _make_methods(self):
        """Bind ``acquire``/``release`` of the wrapped semlock onto this instance."""
        semlock = self._semlock
        self.acquire = semlock.acquire
        self.release = semlock.release

    def __enter__(self):
        return self._semlock.__enter__()

    def __exit__(self, *args):
        return self._semlock.__exit__(*args)

    def __getstate__(self):
        """Return ``(handle, kind, maxvalue, name)`` for transfer to a child.

        `context.assert_spawning` is called first, so this is only valid
        while a child process is being spawned.
        """
        context.assert_spawning(self)
        semlock = self._semlock
        if sys.platform != 'win32':
            handle = semlock.handle
        else:
            # On Windows the handle is duplicated for the child process.
            popen = context.get_spawning_popen()
            handle = popen.duplicate_for_child(semlock.handle)
        return (handle, semlock.kind, semlock.maxvalue, semlock.name)

    def __setstate__(self, state):
        """Rebuild the wrapped semlock from a tuple made by `__getstate__`."""
        self._semlock = _multiprocessing.SemLock._rebuild(*state)
        util.debug('recreated blocker with handle %r' % state[0])
        self._make_methods()

    @staticmethod
    def _make_name():
        """Return ``<semprefix>-<random suffix>`` for a new semaphore."""
        prefix = process.current_process()._config['semprefix']
        suffix = next(SemLock._rand)
        return '%s-%s' % (prefix, suffix)
jpayne@68 118
jpayne@68 119 #
jpayne@68 120 # Semaphore
jpayne@68 121 #
jpayne@68 122
class Semaphore(SemLock):
    """Semaphore of kind ``SEMAPHORE`` whose maximum is ``SEM_VALUE_MAX``."""

    def __init__(self, value=1, *, ctx):
        """Create the semaphore with initial counter `value` in context `ctx`."""
        SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX, ctx=ctx)

    def get_value(self):
        """Return the value reported by the wrapped semlock's ``_get_value``."""
        semlock = self._semlock
        return semlock._get_value()

    def __repr__(self):
        # Any failure while querying the value is shown as 'unknown'.
        try:
            current = self._semlock._get_value()
        except Exception:
            current = 'unknown'
        cls_name = self.__class__.__name__
        return '<%s(value=%s)>' % (cls_name, current)
jpayne@68 137
jpayne@68 138 #
jpayne@68 139 # Bounded semaphore
jpayne@68 140 #
jpayne@68 141
class BoundedSemaphore(Semaphore):
    """Semaphore whose maximum value equals its initial value."""

    def __init__(self, value=1, *, ctx):
        """Create the semaphore with `value` as both initial and maximum value."""
        # Calls SemLock.__init__ directly, bypassing Semaphore.__init__,
        # because the maximum differs from SEM_VALUE_MAX.
        SemLock.__init__(self, SEMAPHORE, value, value, ctx=ctx)

    def __repr__(self):
        # Any failure while querying the value is shown as 'unknown'.
        try:
            current = self._semlock._get_value()
        except Exception:
            current = 'unknown'
        details = (self.__class__.__name__, current, self._semlock.maxvalue)
        return '<%s(value=%s, maxvalue=%s)>' % details
jpayne@68 154
jpayne@68 155 #
jpayne@68 156 # Non-recursive lock
jpayne@68 157 #
jpayne@68 158
class Lock(SemLock):
    """Non-recursive lock: a ``SEMAPHORE``-kind semlock with value and maximum 1."""

    def __init__(self, *, ctx):
        SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)

    def __repr__(self):
        """Return ``<ClassName(owner=...)>`` describing who holds the lock.

        The owner is this process's name (plus ``|thread`` when not on
        'MainThread'), 'None', 'SomeOtherThread', 'SomeOtherProcess', or
        'unknown' if querying the semlock raises.
        """
        try:
            semlock = self._semlock
            if semlock._is_mine():
                owner = process.current_process().name
                thread_name = threading.current_thread().name
                if thread_name != 'MainThread':
                    owner += '|' + thread_name
            elif semlock._get_value() == 1:
                owner = 'None'
            elif semlock._count() > 0:
                owner = 'SomeOtherThread'
            else:
                owner = 'SomeOtherProcess'
        except Exception:
            owner = 'unknown'
        return '<%s(owner=%s)>' % (self.__class__.__name__, owner)
jpayne@68 179
jpayne@68 180 #
jpayne@68 181 # Recursive lock
jpayne@68 182 #
jpayne@68 183
class RLock(SemLock):
    """Recursive lock: a ``RECURSIVE_MUTEX``-kind semlock with value and maximum 1."""

    def __init__(self, *, ctx):
        SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1, ctx=ctx)

    def __repr__(self):
        """Return ``<ClassName(owner, count)>`` describing the lock state.

        When this thread holds the lock, the count is the semlock's
        ``_count()``.  When the lock is free it is 0; when held elsewhere
        it is 'nonzero'.  Both fields are 'unknown' if querying the
        semlock raises.
        """
        try:
            semlock = self._semlock
            if semlock._is_mine():
                owner = process.current_process().name
                thread_name = threading.current_thread().name
                if thread_name != 'MainThread':
                    owner += '|' + thread_name
                depth = semlock._count()
            elif semlock._get_value() == 1:
                owner, depth = 'None', 0
            elif semlock._count() > 0:
                owner, depth = 'SomeOtherThread', 'nonzero'
            else:
                owner, depth = 'SomeOtherProcess', 'nonzero'
        except Exception:
            owner, depth = 'unknown', 'unknown'
        return '<%s(%s, %s)>' % (self.__class__.__name__, owner, depth)
jpayne@68 205
jpayne@68 206 #
jpayne@68 207 # Condition variable
jpayne@68 208 #
jpayne@68 209
class Condition(object):
    """Condition variable built from a lock and three semaphores.

    ``_sleeping_count`` is released once by every waiter before it sleeps,
    ``_woken_count`` once by every waiter after it wakes, and
    ``_wait_semaphore`` is what waiters actually block on.
    """

    def __init__(self, lock=None, *, ctx):
        """Create the condition.

        `lock` is the lock to associate with the condition; when it is
        falsy a new ``ctx.RLock()`` is created.  `ctx` supplies the
        ``RLock`` and ``Semaphore`` factories.
        """
        self._lock = lock or ctx.RLock()
        self._sleeping_count = ctx.Semaphore(0)
        self._woken_count = ctx.Semaphore(0)
        self._wait_semaphore = ctx.Semaphore(0)
        self._make_methods()

    def __getstate__(self):
        # Only valid while a child process is being spawned.
        context.assert_spawning(self)
        return (self._lock, self._sleeping_count,
                self._woken_count, self._wait_semaphore)

    def __setstate__(self, state):
        (self._lock, self._sleeping_count,
         self._woken_count, self._wait_semaphore) = state
        self._make_methods()

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)

    def _make_methods(self):
        """Expose the associated lock's ``acquire``/``release`` on this object."""
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    def __repr__(self):
        # Waiters = threads that announced sleeping minus those that woke;
        # shown as 'unknown' if the semaphores cannot be queried.
        try:
            num_waiters = (self._sleeping_count._semlock._get_value() -
                           self._woken_count._semlock._get_value())
        except Exception:
            num_waiters = 'unknown'
        return '<%s(%s, %s)>' % (self.__class__.__name__, self._lock, num_waiters)

    def wait(self, timeout=None):
        """Release the lock, block until notified or `timeout`, then reacquire.

        Returns the result of acquiring ``_wait_semaphore``.  The lock must
        be held by the caller; this is checked with an assertion.
        """
        assert self._lock._semlock._is_mine(), \
               'must acquire() condition before using wait()'

        # indicate that this thread is going to sleep
        self._sleeping_count.release()

        # release the lock as many times as the semlock's count reports
        count = self._lock._semlock._count()
        for _ in range(count):
            self._lock.release()

        try:
            # wait for notification or timeout
            return self._wait_semaphore.acquire(True, timeout)
        finally:
            # indicate that this thread has woken
            self._woken_count.release()

            # reacquire the lock the same number of times it was released
            for _ in range(count):
                self._lock.acquire()

    def notify(self, n=1):
        """Wake up to `n` waiters.

        The lock must be held by the caller; this and the internal
        invariants are checked with assertions.
        """
        assert self._lock._semlock._is_mine(), 'lock is not owned'
        assert not self._wait_semaphore.acquire(
            False), ('notify: Should not have been able to acquire '
                     '_wait_semaphore')

        # to take account of timeouts since last notify*() we subtract
        # woken_count from sleeping_count and rezero woken_count
        while self._woken_count.acquire(False):
            res = self._sleeping_count.acquire(False)
            assert res, ('notify: Bug in sleeping_count.acquire '
                         '- res should not be False')

        sleepers = 0
        while sleepers < n and self._sleeping_count.acquire(False):
            self._wait_semaphore.release()        # wake up one sleeper
            sleepers += 1

        if sleepers:
            for _ in range(sleepers):
                self._woken_count.acquire()       # wait for a sleeper to wake

            # rezero wait_semaphore in case some timeouts just happened
            while self._wait_semaphore.acquire(False):
                pass

    def notify_all(self):
        """Wake every current waiter by calling `notify` with ``sys.maxsize``."""
        self.notify(n=sys.maxsize)

    def wait_for(self, predicate, timeout=None):
        """Wait until `predicate()` is truthy or `timeout` seconds elapse.

        Returns the last value returned by `predicate`.  With
        ``timeout=None`` there is no deadline.
        """
        result = predicate()
        if result:
            return result
        endtime = None if timeout is None else time.monotonic() + timeout
        waittime = None
        while not result:
            if endtime is not None:
                waittime = endtime - time.monotonic()
                if waittime <= 0:
                    break
            self.wait(waittime)
            result = predicate()
        return result
jpayne@68 316
jpayne@68 317 #
jpayne@68 318 # Event
jpayne@68 319 #
jpayne@68 320
class Event(object):
    """Boolean flag that can be set, cleared and waited on.

    The flag is held in a semaphore (``_flag``); every access to it happens
    while holding the condition ``_cond``.
    """

    def __init__(self, *, ctx):
        self._cond = ctx.Condition(ctx.Lock())
        self._flag = ctx.Semaphore(0)

    def is_set(self):
        """Return True if the flag is currently set, else False."""
        with self._cond:
            # Probe the flag without blocking, restoring it if it was taken.
            if not self._flag.acquire(False):
                return False
            self._flag.release()
            return True

    def set(self):
        """Set the flag and notify all waiters."""
        with self._cond:
            # Non-blocking acquire before the release, so repeated set()
            # calls do not increase the flag further.
            self._flag.acquire(False)
            self._flag.release()
            self._cond.notify_all()

    def clear(self):
        """Clear the flag with a non-blocking acquire."""
        with self._cond:
            self._flag.acquire(False)

    def wait(self, timeout=None):
        """Wait on the condition if the flag is not set.

        Returns True if the flag is set when the wait finishes, else False.
        """
        with self._cond:
            already_set = self._flag.acquire(False)
            if already_set:
                self._flag.release()
            else:
                self._cond.wait(timeout)

            # Probe the flag again to compute the return value.
            if not self._flag.acquire(False):
                return False
            self._flag.release()
            return True
jpayne@68 355
jpayne@68 356 #
jpayne@68 357 # Barrier
jpayne@68 358 #
jpayne@68 359
class Barrier(threading.Barrier):
    """`threading.Barrier` subclass keeping `_state` and `_count` in a buffer.

    The two values are stored as C ints in a buffer obtained from
    `heap.BufferWrapper` and accessed through the properties below.
    """

    def __init__(self, parties, action=None, timeout=None, *, ctx):
        """Create the barrier using a condition obtained from `ctx`."""
        import struct
        from .heap import BufferWrapper
        int_size = struct.calcsize('i')
        # Room for two C ints: slot 0 is the state, slot 1 the count.
        storage = BufferWrapper(2 * int_size)
        condition = ctx.Condition()
        self.__setstate__((parties, action, timeout, condition, storage))
        self._state = 0
        self._count = 0

    def __setstate__(self, state):
        """Restore attributes from `state` and create the int view of the buffer."""
        (self._parties, self._action, self._timeout,
         self._cond, self._wrapper) = state
        view = self._wrapper.create_memoryview()
        self._array = view.cast('i')

    def __getstate__(self):
        """Return ``(parties, action, timeout, cond, wrapper)``."""
        return (self._parties, self._action, self._timeout,
                self._cond, self._wrapper)

    # Slot 0 of the int view.
    @property
    def _state(self):
        return self._array[0]

    @_state.setter
    def _state(self, value):
        self._array[0] = value

    # Slot 1 of the int view.
    @property
    def _count(self):
        return self._array[1]

    @_count.setter
    def _count(self, value):
        self._array[1] = value