jpayne@68
|
1 #
|
jpayne@68
|
2 # Module implementing synchronization primitives
|
jpayne@68
|
3 #
|
jpayne@68
|
4 # multiprocessing/synchronize.py
|
jpayne@68
|
5 #
|
jpayne@68
|
6 # Copyright (c) 2006-2008, R Oudkerk
|
jpayne@68
|
7 # Licensed to PSF under a Contributor Agreement.
|
jpayne@68
|
8 #
|
jpayne@68
|
9
|
jpayne@68
|
10 __all__ = [
|
jpayne@68
|
11 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
|
jpayne@68
|
12 ]
|
jpayne@68
|
13
|
jpayne@68
|
14 import threading
|
jpayne@68
|
15 import sys
|
jpayne@68
|
16 import tempfile
|
jpayne@68
|
17 import _multiprocessing
|
jpayne@68
|
18 import time
|
jpayne@68
|
19
|
jpayne@68
|
20 from . import context
|
jpayne@68
|
21 from . import process
|
jpayne@68
|
22 from . import util
|
jpayne@68
|
23
|
jpayne@68
|
24 # Try to import the mp.synchronize module cleanly, if it fails
|
jpayne@68
|
25 # raise ImportError for platforms lacking a working sem_open implementation.
|
jpayne@68
|
26 # See issue 3770
|
jpayne@68
|
27 try:
|
jpayne@68
|
28 from _multiprocessing import SemLock, sem_unlink
|
jpayne@68
|
29 except (ImportError):
|
jpayne@68
|
30 raise ImportError("This platform lacks a functioning sem_open" +
|
jpayne@68
|
31 " implementation, therefore, the required" +
|
jpayne@68
|
32 " synchronization primitives needed will not" +
|
jpayne@68
|
33 " function, see issue 3770.")
|
jpayne@68
|
34
|
jpayne@68
|
35 #
|
jpayne@68
|
36 # Constants
|
jpayne@68
|
37 #
|
jpayne@68
|
38
|
jpayne@68
|
39 RECURSIVE_MUTEX, SEMAPHORE = list(range(2))
|
jpayne@68
|
40 SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
|
jpayne@68
|
41
|
jpayne@68
|
42 #
|
jpayne@68
|
43 # Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
|
jpayne@68
|
44 #
|
jpayne@68
|
45
|
jpayne@68
|
46 class SemLock(object):
|
jpayne@68
|
47
|
jpayne@68
|
48 _rand = tempfile._RandomNameSequence()
|
jpayne@68
|
49
|
jpayne@68
|
50 def __init__(self, kind, value, maxvalue, *, ctx):
|
jpayne@68
|
51 if ctx is None:
|
jpayne@68
|
52 ctx = context._default_context.get_context()
|
jpayne@68
|
53 name = ctx.get_start_method()
|
jpayne@68
|
54 unlink_now = sys.platform == 'win32' or name == 'fork'
|
jpayne@68
|
55 for i in range(100):
|
jpayne@68
|
56 try:
|
jpayne@68
|
57 sl = self._semlock = _multiprocessing.SemLock(
|
jpayne@68
|
58 kind, value, maxvalue, self._make_name(),
|
jpayne@68
|
59 unlink_now)
|
jpayne@68
|
60 except FileExistsError:
|
jpayne@68
|
61 pass
|
jpayne@68
|
62 else:
|
jpayne@68
|
63 break
|
jpayne@68
|
64 else:
|
jpayne@68
|
65 raise FileExistsError('cannot find name for semaphore')
|
jpayne@68
|
66
|
jpayne@68
|
67 util.debug('created semlock with handle %s' % sl.handle)
|
jpayne@68
|
68 self._make_methods()
|
jpayne@68
|
69
|
jpayne@68
|
70 if sys.platform != 'win32':
|
jpayne@68
|
71 def _after_fork(obj):
|
jpayne@68
|
72 obj._semlock._after_fork()
|
jpayne@68
|
73 util.register_after_fork(self, _after_fork)
|
jpayne@68
|
74
|
jpayne@68
|
75 if self._semlock.name is not None:
|
jpayne@68
|
76 # We only get here if we are on Unix with forking
|
jpayne@68
|
77 # disabled. When the object is garbage collected or the
|
jpayne@68
|
78 # process shuts down we unlink the semaphore name
|
jpayne@68
|
79 from .resource_tracker import register
|
jpayne@68
|
80 register(self._semlock.name, "semaphore")
|
jpayne@68
|
81 util.Finalize(self, SemLock._cleanup, (self._semlock.name,),
|
jpayne@68
|
82 exitpriority=0)
|
jpayne@68
|
83
|
jpayne@68
|
84 @staticmethod
|
jpayne@68
|
85 def _cleanup(name):
|
jpayne@68
|
86 from .resource_tracker import unregister
|
jpayne@68
|
87 sem_unlink(name)
|
jpayne@68
|
88 unregister(name, "semaphore")
|
jpayne@68
|
89
|
jpayne@68
|
90 def _make_methods(self):
|
jpayne@68
|
91 self.acquire = self._semlock.acquire
|
jpayne@68
|
92 self.release = self._semlock.release
|
jpayne@68
|
93
|
jpayne@68
|
94 def __enter__(self):
|
jpayne@68
|
95 return self._semlock.__enter__()
|
jpayne@68
|
96
|
jpayne@68
|
97 def __exit__(self, *args):
|
jpayne@68
|
98 return self._semlock.__exit__(*args)
|
jpayne@68
|
99
|
jpayne@68
|
100 def __getstate__(self):
|
jpayne@68
|
101 context.assert_spawning(self)
|
jpayne@68
|
102 sl = self._semlock
|
jpayne@68
|
103 if sys.platform == 'win32':
|
jpayne@68
|
104 h = context.get_spawning_popen().duplicate_for_child(sl.handle)
|
jpayne@68
|
105 else:
|
jpayne@68
|
106 h = sl.handle
|
jpayne@68
|
107 return (h, sl.kind, sl.maxvalue, sl.name)
|
jpayne@68
|
108
|
jpayne@68
|
109 def __setstate__(self, state):
|
jpayne@68
|
110 self._semlock = _multiprocessing.SemLock._rebuild(*state)
|
jpayne@68
|
111 util.debug('recreated blocker with handle %r' % state[0])
|
jpayne@68
|
112 self._make_methods()
|
jpayne@68
|
113
|
jpayne@68
|
114 @staticmethod
|
jpayne@68
|
115 def _make_name():
|
jpayne@68
|
116 return '%s-%s' % (process.current_process()._config['semprefix'],
|
jpayne@68
|
117 next(SemLock._rand))
|
jpayne@68
|
118
|
jpayne@68
|
119 #
|
jpayne@68
|
120 # Semaphore
|
jpayne@68
|
121 #
|
jpayne@68
|
122
|
jpayne@68
|
123 class Semaphore(SemLock):
|
jpayne@68
|
124
|
jpayne@68
|
125 def __init__(self, value=1, *, ctx):
|
jpayne@68
|
126 SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX, ctx=ctx)
|
jpayne@68
|
127
|
jpayne@68
|
128 def get_value(self):
|
jpayne@68
|
129 return self._semlock._get_value()
|
jpayne@68
|
130
|
jpayne@68
|
131 def __repr__(self):
|
jpayne@68
|
132 try:
|
jpayne@68
|
133 value = self._semlock._get_value()
|
jpayne@68
|
134 except Exception:
|
jpayne@68
|
135 value = 'unknown'
|
jpayne@68
|
136 return '<%s(value=%s)>' % (self.__class__.__name__, value)
|
jpayne@68
|
137
|
jpayne@68
|
138 #
|
jpayne@68
|
139 # Bounded semaphore
|
jpayne@68
|
140 #
|
jpayne@68
|
141
|
jpayne@68
|
142 class BoundedSemaphore(Semaphore):
|
jpayne@68
|
143
|
jpayne@68
|
144 def __init__(self, value=1, *, ctx):
|
jpayne@68
|
145 SemLock.__init__(self, SEMAPHORE, value, value, ctx=ctx)
|
jpayne@68
|
146
|
jpayne@68
|
147 def __repr__(self):
|
jpayne@68
|
148 try:
|
jpayne@68
|
149 value = self._semlock._get_value()
|
jpayne@68
|
150 except Exception:
|
jpayne@68
|
151 value = 'unknown'
|
jpayne@68
|
152 return '<%s(value=%s, maxvalue=%s)>' % \
|
jpayne@68
|
153 (self.__class__.__name__, value, self._semlock.maxvalue)
|
jpayne@68
|
154
|
jpayne@68
|
155 #
|
jpayne@68
|
156 # Non-recursive lock
|
jpayne@68
|
157 #
|
jpayne@68
|
158
|
jpayne@68
|
159 class Lock(SemLock):
|
jpayne@68
|
160
|
jpayne@68
|
161 def __init__(self, *, ctx):
|
jpayne@68
|
162 SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
|
jpayne@68
|
163
|
jpayne@68
|
164 def __repr__(self):
|
jpayne@68
|
165 try:
|
jpayne@68
|
166 if self._semlock._is_mine():
|
jpayne@68
|
167 name = process.current_process().name
|
jpayne@68
|
168 if threading.current_thread().name != 'MainThread':
|
jpayne@68
|
169 name += '|' + threading.current_thread().name
|
jpayne@68
|
170 elif self._semlock._get_value() == 1:
|
jpayne@68
|
171 name = 'None'
|
jpayne@68
|
172 elif self._semlock._count() > 0:
|
jpayne@68
|
173 name = 'SomeOtherThread'
|
jpayne@68
|
174 else:
|
jpayne@68
|
175 name = 'SomeOtherProcess'
|
jpayne@68
|
176 except Exception:
|
jpayne@68
|
177 name = 'unknown'
|
jpayne@68
|
178 return '<%s(owner=%s)>' % (self.__class__.__name__, name)
|
jpayne@68
|
179
|
jpayne@68
|
180 #
|
jpayne@68
|
181 # Recursive lock
|
jpayne@68
|
182 #
|
jpayne@68
|
183
|
jpayne@68
|
184 class RLock(SemLock):
|
jpayne@68
|
185
|
jpayne@68
|
186 def __init__(self, *, ctx):
|
jpayne@68
|
187 SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1, ctx=ctx)
|
jpayne@68
|
188
|
jpayne@68
|
189 def __repr__(self):
|
jpayne@68
|
190 try:
|
jpayne@68
|
191 if self._semlock._is_mine():
|
jpayne@68
|
192 name = process.current_process().name
|
jpayne@68
|
193 if threading.current_thread().name != 'MainThread':
|
jpayne@68
|
194 name += '|' + threading.current_thread().name
|
jpayne@68
|
195 count = self._semlock._count()
|
jpayne@68
|
196 elif self._semlock._get_value() == 1:
|
jpayne@68
|
197 name, count = 'None', 0
|
jpayne@68
|
198 elif self._semlock._count() > 0:
|
jpayne@68
|
199 name, count = 'SomeOtherThread', 'nonzero'
|
jpayne@68
|
200 else:
|
jpayne@68
|
201 name, count = 'SomeOtherProcess', 'nonzero'
|
jpayne@68
|
202 except Exception:
|
jpayne@68
|
203 name, count = 'unknown', 'unknown'
|
jpayne@68
|
204 return '<%s(%s, %s)>' % (self.__class__.__name__, name, count)
|
jpayne@68
|
205
|
jpayne@68
|
206 #
|
jpayne@68
|
207 # Condition variable
|
jpayne@68
|
208 #
|
jpayne@68
|
209
|
jpayne@68
|
210 class Condition(object):
|
jpayne@68
|
211
|
jpayne@68
|
212 def __init__(self, lock=None, *, ctx):
|
jpayne@68
|
213 self._lock = lock or ctx.RLock()
|
jpayne@68
|
214 self._sleeping_count = ctx.Semaphore(0)
|
jpayne@68
|
215 self._woken_count = ctx.Semaphore(0)
|
jpayne@68
|
216 self._wait_semaphore = ctx.Semaphore(0)
|
jpayne@68
|
217 self._make_methods()
|
jpayne@68
|
218
|
jpayne@68
|
219 def __getstate__(self):
|
jpayne@68
|
220 context.assert_spawning(self)
|
jpayne@68
|
221 return (self._lock, self._sleeping_count,
|
jpayne@68
|
222 self._woken_count, self._wait_semaphore)
|
jpayne@68
|
223
|
jpayne@68
|
224 def __setstate__(self, state):
|
jpayne@68
|
225 (self._lock, self._sleeping_count,
|
jpayne@68
|
226 self._woken_count, self._wait_semaphore) = state
|
jpayne@68
|
227 self._make_methods()
|
jpayne@68
|
228
|
jpayne@68
|
229 def __enter__(self):
|
jpayne@68
|
230 return self._lock.__enter__()
|
jpayne@68
|
231
|
jpayne@68
|
232 def __exit__(self, *args):
|
jpayne@68
|
233 return self._lock.__exit__(*args)
|
jpayne@68
|
234
|
jpayne@68
|
235 def _make_methods(self):
|
jpayne@68
|
236 self.acquire = self._lock.acquire
|
jpayne@68
|
237 self.release = self._lock.release
|
jpayne@68
|
238
|
jpayne@68
|
239 def __repr__(self):
|
jpayne@68
|
240 try:
|
jpayne@68
|
241 num_waiters = (self._sleeping_count._semlock._get_value() -
|
jpayne@68
|
242 self._woken_count._semlock._get_value())
|
jpayne@68
|
243 except Exception:
|
jpayne@68
|
244 num_waiters = 'unknown'
|
jpayne@68
|
245 return '<%s(%s, %s)>' % (self.__class__.__name__, self._lock, num_waiters)
|
jpayne@68
|
246
|
jpayne@68
|
247 def wait(self, timeout=None):
|
jpayne@68
|
248 assert self._lock._semlock._is_mine(), \
|
jpayne@68
|
249 'must acquire() condition before using wait()'
|
jpayne@68
|
250
|
jpayne@68
|
251 # indicate that this thread is going to sleep
|
jpayne@68
|
252 self._sleeping_count.release()
|
jpayne@68
|
253
|
jpayne@68
|
254 # release lock
|
jpayne@68
|
255 count = self._lock._semlock._count()
|
jpayne@68
|
256 for i in range(count):
|
jpayne@68
|
257 self._lock.release()
|
jpayne@68
|
258
|
jpayne@68
|
259 try:
|
jpayne@68
|
260 # wait for notification or timeout
|
jpayne@68
|
261 return self._wait_semaphore.acquire(True, timeout)
|
jpayne@68
|
262 finally:
|
jpayne@68
|
263 # indicate that this thread has woken
|
jpayne@68
|
264 self._woken_count.release()
|
jpayne@68
|
265
|
jpayne@68
|
266 # reacquire lock
|
jpayne@68
|
267 for i in range(count):
|
jpayne@68
|
268 self._lock.acquire()
|
jpayne@68
|
269
|
jpayne@68
|
270 def notify(self, n=1):
|
jpayne@68
|
271 assert self._lock._semlock._is_mine(), 'lock is not owned'
|
jpayne@68
|
272 assert not self._wait_semaphore.acquire(
|
jpayne@68
|
273 False), ('notify: Should not have been able to acquire'
|
jpayne@68
|
274 + '_wait_semaphore')
|
jpayne@68
|
275
|
jpayne@68
|
276 # to take account of timeouts since last notify*() we subtract
|
jpayne@68
|
277 # woken_count from sleeping_count and rezero woken_count
|
jpayne@68
|
278 while self._woken_count.acquire(False):
|
jpayne@68
|
279 res = self._sleeping_count.acquire(False)
|
jpayne@68
|
280 assert res, ('notify: Bug in sleeping_count.acquire'
|
jpayne@68
|
281 + '- res should not be False')
|
jpayne@68
|
282
|
jpayne@68
|
283 sleepers = 0
|
jpayne@68
|
284 while sleepers < n and self._sleeping_count.acquire(False):
|
jpayne@68
|
285 self._wait_semaphore.release() # wake up one sleeper
|
jpayne@68
|
286 sleepers += 1
|
jpayne@68
|
287
|
jpayne@68
|
288 if sleepers:
|
jpayne@68
|
289 for i in range(sleepers):
|
jpayne@68
|
290 self._woken_count.acquire() # wait for a sleeper to wake
|
jpayne@68
|
291
|
jpayne@68
|
292 # rezero wait_semaphore in case some timeouts just happened
|
jpayne@68
|
293 while self._wait_semaphore.acquire(False):
|
jpayne@68
|
294 pass
|
jpayne@68
|
295
|
jpayne@68
|
296 def notify_all(self):
|
jpayne@68
|
297 self.notify(n=sys.maxsize)
|
jpayne@68
|
298
|
jpayne@68
|
299 def wait_for(self, predicate, timeout=None):
|
jpayne@68
|
300 result = predicate()
|
jpayne@68
|
301 if result:
|
jpayne@68
|
302 return result
|
jpayne@68
|
303 if timeout is not None:
|
jpayne@68
|
304 endtime = time.monotonic() + timeout
|
jpayne@68
|
305 else:
|
jpayne@68
|
306 endtime = None
|
jpayne@68
|
307 waittime = None
|
jpayne@68
|
308 while not result:
|
jpayne@68
|
309 if endtime is not None:
|
jpayne@68
|
310 waittime = endtime - time.monotonic()
|
jpayne@68
|
311 if waittime <= 0:
|
jpayne@68
|
312 break
|
jpayne@68
|
313 self.wait(waittime)
|
jpayne@68
|
314 result = predicate()
|
jpayne@68
|
315 return result
|
jpayne@68
|
316
|
jpayne@68
|
317 #
|
jpayne@68
|
318 # Event
|
jpayne@68
|
319 #
|
jpayne@68
|
320
|
jpayne@68
|
321 class Event(object):
|
jpayne@68
|
322
|
jpayne@68
|
323 def __init__(self, *, ctx):
|
jpayne@68
|
324 self._cond = ctx.Condition(ctx.Lock())
|
jpayne@68
|
325 self._flag = ctx.Semaphore(0)
|
jpayne@68
|
326
|
jpayne@68
|
327 def is_set(self):
|
jpayne@68
|
328 with self._cond:
|
jpayne@68
|
329 if self._flag.acquire(False):
|
jpayne@68
|
330 self._flag.release()
|
jpayne@68
|
331 return True
|
jpayne@68
|
332 return False
|
jpayne@68
|
333
|
jpayne@68
|
334 def set(self):
|
jpayne@68
|
335 with self._cond:
|
jpayne@68
|
336 self._flag.acquire(False)
|
jpayne@68
|
337 self._flag.release()
|
jpayne@68
|
338 self._cond.notify_all()
|
jpayne@68
|
339
|
jpayne@68
|
340 def clear(self):
|
jpayne@68
|
341 with self._cond:
|
jpayne@68
|
342 self._flag.acquire(False)
|
jpayne@68
|
343
|
jpayne@68
|
344 def wait(self, timeout=None):
|
jpayne@68
|
345 with self._cond:
|
jpayne@68
|
346 if self._flag.acquire(False):
|
jpayne@68
|
347 self._flag.release()
|
jpayne@68
|
348 else:
|
jpayne@68
|
349 self._cond.wait(timeout)
|
jpayne@68
|
350
|
jpayne@68
|
351 if self._flag.acquire(False):
|
jpayne@68
|
352 self._flag.release()
|
jpayne@68
|
353 return True
|
jpayne@68
|
354 return False
|
jpayne@68
|
355
|
jpayne@68
|
356 #
|
jpayne@68
|
357 # Barrier
|
jpayne@68
|
358 #
|
jpayne@68
|
359
|
jpayne@68
|
360 class Barrier(threading.Barrier):
|
jpayne@68
|
361
|
jpayne@68
|
362 def __init__(self, parties, action=None, timeout=None, *, ctx):
|
jpayne@68
|
363 import struct
|
jpayne@68
|
364 from .heap import BufferWrapper
|
jpayne@68
|
365 wrapper = BufferWrapper(struct.calcsize('i') * 2)
|
jpayne@68
|
366 cond = ctx.Condition()
|
jpayne@68
|
367 self.__setstate__((parties, action, timeout, cond, wrapper))
|
jpayne@68
|
368 self._state = 0
|
jpayne@68
|
369 self._count = 0
|
jpayne@68
|
370
|
jpayne@68
|
371 def __setstate__(self, state):
|
jpayne@68
|
372 (self._parties, self._action, self._timeout,
|
jpayne@68
|
373 self._cond, self._wrapper) = state
|
jpayne@68
|
374 self._array = self._wrapper.create_memoryview().cast('i')
|
jpayne@68
|
375
|
jpayne@68
|
376 def __getstate__(self):
|
jpayne@68
|
377 return (self._parties, self._action, self._timeout,
|
jpayne@68
|
378 self._cond, self._wrapper)
|
jpayne@68
|
379
|
jpayne@68
|
380 @property
|
jpayne@68
|
381 def _state(self):
|
jpayne@68
|
382 return self._array[0]
|
jpayne@68
|
383
|
jpayne@68
|
384 @_state.setter
|
jpayne@68
|
385 def _state(self, value):
|
jpayne@68
|
386 self._array[0] = value
|
jpayne@68
|
387
|
jpayne@68
|
388 @property
|
jpayne@68
|
389 def _count(self):
|
jpayne@68
|
390 return self._array[1]
|
jpayne@68
|
391
|
jpayne@68
|
392 @_count.setter
|
jpayne@68
|
393 def _count(self, value):
|
jpayne@68
|
394 self._array[1] = value
|