#
# Module implementing synchronization primitives
#
# multiprocessing/synchronize.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

__all__ = [
    'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
    ]

import threading
import sys
import tempfile
import _multiprocessing
import time

from . import context
from . import process
from . import util

# Try to import the mp.synchronize module cleanly, if it fails
# raise ImportError for platforms lacking a working sem_open implementation.
# See issue 3770
try:
    from _multiprocessing import SemLock, sem_unlink
except (ImportError):
    raise ImportError("This platform lacks a functioning sem_open" +
                      " implementation, therefore, the required" +
                      " synchronization primitives needed will not" +
                      " function, see issue 3770.")

#
# Constants
#

# Values passed as the `kind` argument of `_multiprocessing.SemLock`.
RECURSIVE_MUTEX, SEMAPHORE = list(range(2))
SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX

#
# Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
#

class SemLock(object):
    """Python-level wrapper around a `_multiprocessing.SemLock` object.

    The wrapped object is stored in `self._semlock`; its `acquire` and
    `release` methods are re-exported as instance attributes.
    """

    # Source of the random suffixes used by `_make_name()`.
    _rand = tempfile._RandomNameSequence()

    def __init__(self, kind, value, maxvalue, *, ctx):
        """Create the underlying semaphore.

        `kind`, `value` and `maxvalue` are forwarded unchanged to
        `_multiprocessing.SemLock`.  `ctx` is a multiprocessing context;
        when it is None the default context is used.

        Raises FileExistsError when 100 consecutive generated names all
        collide with existing semaphores.
        """
        if ctx is None:
            ctx = context._default_context.get_context()
        name = ctx.get_start_method()
        unlink_now = sys.platform == 'win32' or name == 'fork'
        # Retry with a fresh random name each time the name is taken.
        for i in range(100):
            try:
                sl = self._semlock = _multiprocessing.SemLock(
                    kind, value, maxvalue, self._make_name(),
                    unlink_now)
            except FileExistsError:
                pass
            else:
                break
        else:
            raise FileExistsError('cannot find name for semaphore')

        util.debug('created semlock with handle %s' % sl.handle)
        self._make_methods()

        if sys.platform != 'win32':
            def _after_fork(obj):
                obj._semlock._after_fork()
            util.register_after_fork(self, _after_fork)

        if self._semlock.name is not None:
            # We only get here if we are on Unix with forking
            # disabled.  When the object is garbage collected or the
            # process shuts down we unlink the semaphore name
            from .resource_tracker import register
            register(self._semlock.name, "semaphore")
            util.Finalize(self, SemLock._cleanup, (self._semlock.name,),
                          exitpriority=0)

    @staticmethod
    def _cleanup(name):
        """Unlink the semaphore `name` and unregister it from the tracker."""
        from .resource_tracker import unregister
        sem_unlink(name)
        unregister(name, "semaphore")

    def _make_methods(self):
        """Bind `acquire` and `release` of the wrapped semlock onto self."""
        self.acquire = self._semlock.acquire
        self.release = self._semlock.release

    def __enter__(self):
        return self._semlock.__enter__()

    def __exit__(self, *args):
        return self._semlock.__exit__(*args)

    def __getstate__(self):
        """Return `(handle, kind, maxvalue, name)` for pickling.

        `context.assert_spawning(self)` is called first.  On win32 the
        handle is duplicated for the child via the spawning Popen object.
        """
        context.assert_spawning(self)
        sl = self._semlock
        if sys.platform == 'win32':
            h = context.get_spawning_popen().duplicate_for_child(sl.handle)
        else:
            h = sl.handle
        return (h, sl.kind, sl.maxvalue, sl.name)

    def __setstate__(self, state):
        """Rebuild the wrapped semlock from a `__getstate__()` tuple."""
        self._semlock = _multiprocessing.SemLock._rebuild(*state)
        util.debug('recreated blocker with handle %r' % state[0])
        self._make_methods()

    @staticmethod
    def _make_name():
        """Return a candidate name: the process 'semprefix' plus a suffix."""
        return '%s-%s' % (process.current_process()._config['semprefix'],
                          next(SemLock._rand))

#
# Semaphore
#

class Semaphore(SemLock):
    """Semaphore whose maximum value is `SEM_VALUE_MAX`."""

    def __init__(self, value=1, *, ctx):
        SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX, ctx=ctx)

    def get_value(self):
        """Return the value reported by the wrapped semlock."""
        return self._semlock._get_value()

    def __repr__(self):
        try:
            value = self._semlock._get_value()
        except Exception:
            value = 'unknown'
        return '<%s(value=%s)>' % (self.__class__.__name__, value)

#
# Bounded semaphore
#

class BoundedSemaphore(Semaphore):
    """Semaphore whose maximum value equals its initial value."""

    def __init__(self, value=1, *, ctx):
        SemLock.__init__(self, SEMAPHORE, value, value, ctx=ctx)

    def __repr__(self):
        try:
            value = self._semlock._get_value()
        except Exception:
            value = 'unknown'
        return '<%s(value=%s, maxvalue=%s)>' % \
               (self.__class__.__name__, value, self._semlock.maxvalue)

#
# Non-recursive lock
#

class Lock(SemLock):
    """Non-recursive lock: a SEMAPHORE-kind semlock with value and max 1."""

    def __init__(self, *, ctx):
        SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)

    def __repr__(self):
        # Any failure while querying the semlock degrades to 'unknown'
        # so that repr() itself never raises.
        try:
            if self._semlock._is_mine():
                name = process.current_process().name
                if threading.current_thread().name != 'MainThread':
                    name += '|' + threading.current_thread().name
            elif self._semlock._get_value() == 1:
                name = 'None'
            elif self._semlock._count() > 0:
                name = 'SomeOtherThread'
            else:
                name = 'SomeOtherProcess'
        except Exception:
            name = 'unknown'
        return '<%s(owner=%s)>' % (self.__class__.__name__, name)

#
# Recursive lock
#

class RLock(SemLock):
    """Recursive lock: a RECURSIVE_MUTEX-kind semlock."""

    def __init__(self, *, ctx):
        SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1, ctx=ctx)

    def __repr__(self):
        # Any failure while querying the semlock degrades to 'unknown'
        # so that repr() itself never raises.
        try:
            if self._semlock._is_mine():
                name = process.current_process().name
                if threading.current_thread().name != 'MainThread':
                    name += '|' + threading.current_thread().name
                count = self._semlock._count()
            elif self._semlock._get_value() == 1:
                name, count = 'None', 0
            elif self._semlock._count() > 0:
                name, count = 'SomeOtherThread', 'nonzero'
            else:
                name, count = 'SomeOtherProcess', 'nonzero'
        except Exception:
            name, count = 'unknown', 'unknown'
        return '<%s(%s, %s)>' % (self.__class__.__name__, name, count)

#
# Condition variable
#

class Condition(object):
    """Condition variable built from a lock and three semaphores.

    `_sleeping_count` is released by each waiter before it sleeps,
    `_woken_count` is released by each waiter when it wakes, and
    `_wait_semaphore` is what waiters actually block on.
    """

    def __init__(self, lock=None, *, ctx):
        """Use `lock` if given (and truthy), otherwise a new `ctx.RLock()`."""
        self._lock = lock or ctx.RLock()
        self._sleeping_count = ctx.Semaphore(0)
        self._woken_count = ctx.Semaphore(0)
        self._wait_semaphore = ctx.Semaphore(0)
        self._make_methods()

    def __getstate__(self):
        context.assert_spawning(self)
        return (self._lock, self._sleeping_count,
                self._woken_count, self._wait_semaphore)

    def __setstate__(self, state):
        (self._lock, self._sleeping_count,
         self._woken_count, self._wait_semaphore) = state
        self._make_methods()

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)

    def _make_methods(self):
        """Bind `acquire` and `release` of the underlying lock onto self."""
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    def __repr__(self):
        try:
            num_waiters = (self._sleeping_count._semlock._get_value() -
                           self._woken_count._semlock._get_value())
        except Exception:
            num_waiters = 'unknown'
        return '<%s(%s, %s)>' % (self.__class__.__name__, self._lock, num_waiters)

    def wait(self, timeout=None):
        """Release the lock, block on the wait semaphore, then reacquire.

        Returns the result of `self._wait_semaphore.acquire(True, timeout)`.
        The lock is released as many times as its `_count()` reports and
        reacquired the same number of times before returning.

        The ownership check is an `assert`, so it is skipped under `-O`.
        """
        assert self._lock._semlock._is_mine(), \
               'must acquire() condition before using wait()'

        # indicate that this thread is going to sleep
        self._sleeping_count.release()

        # release lock
        count = self._lock._semlock._count()
        for i in range(count):
            self._lock.release()

        try:
            # wait for notification or timeout
            return self._wait_semaphore.acquire(True, timeout)
        finally:
            # indicate that this thread has woken
            self._woken_count.release()

            # reacquire lock
            for i in range(count):
                self._lock.acquire()

    def notify(self, n=1):
        """Wake up to `n` sleepers and wait until each has woken.

        The checks below are `assert` statements, so they (including the
        non-blocking acquire inside the second one) are skipped under `-O`.
        """
        assert self._lock._semlock._is_mine(), 'lock is not owned'
        assert not self._wait_semaphore.acquire(
            False), ('notify: Should not have been able to acquire '
                     + '_wait_semaphore')

        # to take account of timeouts since last notify*() we subtract
        # woken_count from sleeping_count and rezero woken_count
        while self._woken_count.acquire(False):
            res = self._sleeping_count.acquire(False)
            assert res, ('notify: Bug in sleeping_count.acquire'
                         + ' - res should not be False')

        sleepers = 0
        while sleepers < n and self._sleeping_count.acquire(False):
            self._wait_semaphore.release()        # wake up one sleeper
            sleepers += 1

        if sleepers:
            for i in range(sleepers):
                self._woken_count.acquire()       # wait for a sleeper to wake

            # rezero wait_semaphore in case some timeouts just happened
            while self._wait_semaphore.acquire(False):
                pass

    def notify_all(self):
        """Wake every sleeper by calling `notify(n=sys.maxsize)`."""
        self.notify(n=sys.maxsize)

    def wait_for(self, predicate, timeout=None):
        """Wait until `predicate()` is truthy or `timeout` seconds elapse.

        Returns the last value returned by `predicate()`.  With
        `timeout=None` it waits without a deadline.
        """
        result = predicate()
        if result:
            return result
        if timeout is not None:
            endtime = time.monotonic() + timeout
        else:
            endtime = None
            waittime = None
        while not result:
            if endtime is not None:
                # Recompute the remaining time before every wait.
                waittime = endtime - time.monotonic()
                if waittime <= 0:
                    break
            self.wait(waittime)
            result = predicate()
        return result

#
# Event
#

class Event(object):
    """Event flag built from a Condition and a Semaphore.

    The flag is represented by `_flag`: a non-blocking acquire succeeding
    means the event is set (the semaphore is then released again).
    """

    def __init__(self, *, ctx):
        self._cond = ctx.Condition(ctx.Lock())
        self._flag = ctx.Semaphore(0)

    def is_set(self):
        """Return True if the flag semaphore can currently be acquired."""
        with self._cond:
            if self._flag.acquire(False):
                self._flag.release()
                return True
            return False

    def set(self):
        """Set the flag and notify all waiters."""
        with self._cond:
            # Drain first so that repeated set() calls do not stack up.
            self._flag.acquire(False)
            self._flag.release()
            self._cond.notify_all()

    def clear(self):
        """Reset the flag with a non-blocking acquire."""
        with self._cond:
            self._flag.acquire(False)

    def wait(self, timeout=None):
        """Wait for the flag if it is not set; return whether it is set."""
        with self._cond:
            if self._flag.acquire(False):
                self._flag.release()
            else:
                self._cond.wait(timeout)

            # Re-check after waiting: the wait may have timed out.
            if self._flag.acquire(False):
                self._flag.release()
                return True
            return False

#
# Barrier
#

class Barrier(threading.Barrier):
    """`threading.Barrier` subclass keeping `_state` and `_count` in a buffer.

    The two integers live in a buffer obtained from `.heap.BufferWrapper`
    and are accessed through the `_state` and `_count` properties.
    """

    def __init__(self, parties, action=None, timeout=None, *, ctx):
        import struct
        from .heap import BufferWrapper
        # Room for two C ints: index 0 is _state, index 1 is _count.
        wrapper = BufferWrapper(struct.calcsize('i') * 2)
        cond = ctx.Condition()
        self.__setstate__((parties, action, timeout, cond, wrapper))
        self._state = 0
        self._count = 0

    def __setstate__(self, state):
        (self._parties, self._action, self._timeout,
         self._cond, self._wrapper) = state
        self._array = self._wrapper.create_memoryview().cast('i')

    def __getstate__(self):
        return (self._parties, self._action, self._timeout,
                self._cond, self._wrapper)

    @property
    def _state(self):
        return self._array[0]

    @_state.setter
    def _state(self, value):
        self._array[0] = value

    @property
    def _count(self):
        return self._array[1]

    @_count.setter
    def _count(self, value):
        self._array[1] = value