comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/multiprocessing/synchronize.py @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
comparison
equal deleted inserted replaced
67:0e9998148a16 68:5028fdace37b
1 #
2 # Module implementing synchronization primitives
3 #
4 # multiprocessing/synchronize.py
5 #
6 # Copyright (c) 2006-2008, R Oudkerk
7 # Licensed to PSF under a Contributor Agreement.
8 #
9
# Public names exported by a star-import of this module.
__all__ = [
    'Lock',
    'RLock',
    'Semaphore',
    'BoundedSemaphore',
    'Condition',
    'Event',
]
13
14 import threading
15 import sys
16 import tempfile
17 import _multiprocessing
18 import time
19
20 from . import context
21 from . import process
22 from . import util
23
24 # Try to import the mp.synchronize module cleanly, if it fails
25 # raise ImportError for platforms lacking a working sem_open implementation.
26 # See issue 3770
try:
    from _multiprocessing import SemLock, sem_unlink
except ImportError as exc:
    # Re-raise with an explanatory message.  The original failure is chained
    # explicitly so the traceback reports it as the direct cause.
    raise ImportError("This platform lacks a functioning sem_open"
                      " implementation, therefore, the required"
                      " synchronization primitives needed will not"
                      " function, see issue 3770.") from exc
34
35 #
36 # Constants
37 #
38
# Values passed as the ``kind`` argument when building a SemLock.
RECURSIVE_MUTEX, SEMAPHORE = range(2)
# Largest semaphore value reported by the C implementation; used as the
# ``maxvalue`` of an unbounded Semaphore.
SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
41
42 #
43 # Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
44 #
45
class SemLock(object):
    """Base class for the semaphores and mutexes defined in this module.

    Wraps a ``_multiprocessing.SemLock`` instance (stored as ``_semlock``)
    and exposes its ``acquire`` and ``release`` methods directly on the
    wrapper.
    """

    # Generator of the random suffixes consumed by ``_make_name``.
    _rand = tempfile._RandomNameSequence()

    def __init__(self, kind, value, maxvalue, *, ctx):
        """Create the underlying semaphore, retrying on name collisions.

        ``kind``, ``value`` and ``maxvalue`` are forwarded unchanged to
        ``_multiprocessing.SemLock``.  ``ctx`` provides the start method;
        when it is ``None`` the default context is used.

        Raises ``FileExistsError`` when 100 candidate names in a row are
        already taken.
        """
        if ctx is None:
            ctx = context._default_context.get_context()
        start_method = ctx.get_start_method()
        unlink_now = start_method == 'fork' or sys.platform == 'win32'

        for _ in range(100):
            try:
                semlock = _multiprocessing.SemLock(
                    kind, value, maxvalue, self._make_name(), unlink_now)
            except FileExistsError:
                # Name already in use: try again with a fresh one.
                continue
            self._semlock = semlock
            break
        else:
            raise FileExistsError('cannot find name for semaphore')

        util.debug('created semlock with handle %s' % semlock.handle)
        self._make_methods()

        if sys.platform != 'win32':
            def _after_fork(obj):
                obj._semlock._after_fork()
            util.register_after_fork(self, _after_fork)

        if self._semlock.name is not None:
            # Only reached on Unix with forking disabled.  The semaphore
            # name is unlinked when this object is garbage collected or
            # the process shuts down.
            from .resource_tracker import register
            semaphore_name = self._semlock.name
            register(semaphore_name, "semaphore")
            util.Finalize(self, SemLock._cleanup, (semaphore_name,),
                          exitpriority=0)

    @staticmethod
    def _cleanup(name):
        """Unlink the named semaphore and unregister it from the tracker."""
        from .resource_tracker import unregister
        sem_unlink(name)
        unregister(name, "semaphore")

    def _make_methods(self):
        """Bind ``acquire``/``release`` of the wrapped semlock onto self."""
        semlock = self._semlock
        self.acquire = semlock.acquire
        self.release = semlock.release

    def __enter__(self):
        return self._semlock.__enter__()

    def __exit__(self, *args):
        return self._semlock.__exit__(*args)

    def __getstate__(self):
        """Return ``(handle, kind, maxvalue, name)`` for pickling.

        On win32 the handle is first duplicated for the child process.
        """
        context.assert_spawning(self)
        semlock = self._semlock
        if sys.platform != 'win32':
            handle = semlock.handle
        else:
            popen = context.get_spawning_popen()
            handle = popen.duplicate_for_child(semlock.handle)
        return (handle, semlock.kind, semlock.maxvalue, semlock.name)

    def __setstate__(self, state):
        """Rebuild the wrapped semlock from a ``__getstate__`` tuple."""
        self._semlock = _multiprocessing.SemLock._rebuild(*state)
        util.debug('recreated blocker with handle %r' % state[0])
        self._make_methods()

    @staticmethod
    def _make_name():
        """Return a candidate name: the process's semprefix plus a suffix."""
        prefix = process.current_process()._config['semprefix']
        return '%s-%s' % (prefix, next(SemLock._rand))
118
119 #
120 # Semaphore
121 #
122
class Semaphore(SemLock):
    """Counting semaphore whose maximum value is ``SEM_VALUE_MAX``."""

    def __init__(self, value=1, *, ctx):
        """Create the semaphore with initial counter ``value``."""
        SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX, ctx=ctx)

    def get_value(self):
        """Return the value reported by the wrapped semlock."""
        return self._semlock._get_value()

    def __repr__(self):
        # The value may not be readable; fall back to a placeholder
        # instead of letting repr() fail.
        try:
            current = self._semlock._get_value()
        except Exception:
            current = 'unknown'
        return '<%s(value=%s)>' % (type(self).__name__, current)
137
138 #
139 # Bounded semaphore
140 #
141
class BoundedSemaphore(Semaphore):
    """Semaphore whose maximum value equals its initial value."""

    def __init__(self, value=1, *, ctx):
        """Create the semaphore using ``value`` as both start and maximum."""
        # Deliberately bypasses Semaphore.__init__, which would use
        # SEM_VALUE_MAX as the maximum.
        SemLock.__init__(self, SEMAPHORE, value, value, ctx=ctx)

    def __repr__(self):
        # The value may not be readable; fall back to a placeholder
        # instead of letting repr() fail.
        try:
            current = self._semlock._get_value()
        except Exception:
            current = 'unknown'
        limit = self._semlock.maxvalue
        return '<%s(value=%s, maxvalue=%s)>' % (
            type(self).__name__, current, limit)
154
155 #
156 # Non-recursive lock
157 #
158
class Lock(SemLock):
    """Non-recursive lock: a semaphore with initial and maximum value 1."""

    def __init__(self, *, ctx):
        SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)

    def __repr__(self):
        """Describe the lock together with a best-effort guess at its owner."""
        try:
            semlock = self._semlock
            if semlock._is_mine():
                # Held by this process; append the thread name unless it
                # is the main thread.
                owner = process.current_process().name
                thread_name = threading.current_thread().name
                if thread_name != 'MainThread':
                    owner = owner + '|' + thread_name
            elif semlock._get_value() == 1:
                owner = 'None'
            elif semlock._count() > 0:
                owner = 'SomeOtherThread'
            else:
                owner = 'SomeOtherProcess'
        except Exception:
            owner = 'unknown'
        return '<%s(owner=%s)>' % (type(self).__name__, owner)
179
180 #
181 # Recursive lock
182 #
183
class RLock(SemLock):
    """Recursive lock built on a ``RECURSIVE_MUTEX`` semlock."""

    def __init__(self, *, ctx):
        SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1, ctx=ctx)

    def __repr__(self):
        """Describe the lock as ``<RLock(owner, count)>`` on a best-effort basis."""
        try:
            semlock = self._semlock
            if semlock._is_mine():
                # Held by this process; append the thread name unless it
                # is the main thread, then report the recursion count.
                owner = process.current_process().name
                thread_name = threading.current_thread().name
                if thread_name != 'MainThread':
                    owner = owner + '|' + thread_name
                depth = semlock._count()
            elif semlock._get_value() == 1:
                owner, depth = 'None', 0
            elif semlock._count() > 0:
                owner, depth = 'SomeOtherThread', 'nonzero'
            else:
                owner, depth = 'SomeOtherProcess', 'nonzero'
        except Exception:
            owner, depth = 'unknown', 'unknown'
        return '<%s(%s, %s)>' % (type(self).__name__, owner, depth)
205
206 #
207 # Condition variable
208 #
209
class Condition(object):
    """Condition variable built from a lock and three semaphores.

    ``_sleeping_count`` is released once by every waiter before it blocks,
    ``_woken_count`` once by every waiter after it stops blocking, and
    ``_wait_semaphore`` is what waiters actually block on.
    """

    def __init__(self, lock=None, *, ctx):
        """Create the condition around ``lock`` (default: ``ctx.RLock()``)."""
        self._lock = lock or ctx.RLock()
        self._sleeping_count = ctx.Semaphore(0)
        self._woken_count = ctx.Semaphore(0)
        self._wait_semaphore = ctx.Semaphore(0)
        self._make_methods()

    def __getstate__(self):
        context.assert_spawning(self)
        return (self._lock, self._sleeping_count,
                self._woken_count, self._wait_semaphore)

    def __setstate__(self, state):
        (self._lock, self._sleeping_count,
         self._woken_count, self._wait_semaphore) = state
        self._make_methods()

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)

    def _make_methods(self):
        """Expose the lock's ``acquire``/``release`` on the condition."""
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    def __repr__(self):
        try:
            num_waiters = (self._sleeping_count._semlock._get_value() -
                           self._woken_count._semlock._get_value())
        except Exception:
            num_waiters = 'unknown'
        return '<%s(%s, %s)>' % (self.__class__.__name__, self._lock, num_waiters)

    def wait(self, timeout=None):
        """Release the lock, block until notified or timed out, re-acquire.

        Returns the result of acquiring ``_wait_semaphore`` with the given
        ``timeout``.  The caller must hold the lock.
        """
        assert self._lock._semlock._is_mine(), \
            'must acquire() condition before using wait()'

        # indicate that this thread is going to sleep
        self._sleeping_count.release()

        # release lock (as many times as it is currently held)
        count = self._lock._semlock._count()
        for _ in range(count):
            self._lock.release()

        try:
            # wait for notification or timeout
            return self._wait_semaphore.acquire(True, timeout)
        finally:
            # indicate that this thread has woken
            self._woken_count.release()

            # reacquire lock
            for _ in range(count):
                self._lock.acquire()

    def notify(self, n=1):
        """Wake up to ``n`` waiters.  The caller must hold the lock."""
        assert self._lock._semlock._is_mine(), 'lock is not owned'
        assert not self._wait_semaphore.acquire(False), (
            'notify: Should not have been able to acquire '
            '_wait_semaphore')

        # to take account of timeouts since last notify*() we subtract
        # woken_count from sleeping_count and rezero woken_count
        while self._woken_count.acquire(False):
            res = self._sleeping_count.acquire(False)
            assert res, ('notify: Bug in sleeping_count.acquire '
                         '- res should not be False')

        sleepers = 0
        while sleepers < n and self._sleeping_count.acquire(False):
            self._wait_semaphore.release()        # wake up one sleeper
            sleepers += 1

        if sleepers:
            for _ in range(sleepers):
                self._woken_count.acquire()       # wait for a sleeper to wake

            # rezero wait_semaphore in case some timeouts just happened
            while self._wait_semaphore.acquire(False):
                pass

    def notify_all(self):
        """Wake every current waiter."""
        self.notify(n=sys.maxsize)

    def wait_for(self, predicate, timeout=None):
        """Wait until ``predicate()`` is truthy or ``timeout`` elapses.

        Returns the last value returned by ``predicate``.
        """
        result = predicate()
        if result:
            return result
        if timeout is not None:
            endtime = time.monotonic() + timeout
        else:
            endtime = None
            waittime = None
        while not result:
            if endtime is not None:
                # Only the time still remaining is passed to wait().
                waittime = endtime - time.monotonic()
                if waittime <= 0:
                    break
            self.wait(waittime)
            result = predicate()
        return result
316
317 #
318 # Event
319 #
320
class Event(object):
    """Flag that can be set, cleared and waited on.

    The flag is stored in the ``_flag`` semaphore (set when it can be
    acquired) and every access to it is guarded by ``_cond``.
    """

    def __init__(self, *, ctx):
        self._cond = ctx.Condition(ctx.Lock())
        self._flag = ctx.Semaphore(0)

    def is_set(self):
        """Return True if the flag is currently set."""
        with self._cond:
            # Probe the flag without consuming it.
            if self._flag.acquire(False):
                self._flag.release()
                return True
            return False

    def set(self):
        """Set the flag and wake every waiter."""
        with self._cond:
            # Drain first so the semaphore never goes above 1.
            self._flag.acquire(False)
            self._flag.release()
            self._cond.notify_all()

    def clear(self):
        """Reset the flag."""
        with self._cond:
            self._flag.acquire(False)

    def wait(self, timeout=None):
        """Block until the flag is set or ``timeout`` elapses.

        Returns True if the flag is set on return, False otherwise.
        """
        with self._cond:
            if self._flag.acquire(False):
                self._flag.release()
            else:
                self._cond.wait(timeout)

            # Re-check: the wait above may have ended by timeout.
            if self._flag.acquire(False):
                self._flag.release()
                return True
            return False

    def __repr__(self):
        """Return ``<Event at 0x... set|unset>``.

        Uses ``is_set()``, so the internal condition is acquired briefly.
        """
        status = 'set' if self.is_set() else 'unset'
        return '<%s at %#x %s>' % (type(self).__qualname__, id(self), status)
355
356 #
357 # Barrier
358 #
359
class Barrier(threading.Barrier):
    """``threading.Barrier`` variant keeping ``_state``/``_count`` in a buffer.

    The two integers live in an ``'i'``-typed memoryview created from a
    ``heap.BufferWrapper``; the properties below read and write its slots.
    """

    def __init__(self, parties, action=None, timeout=None, *, ctx):
        import struct
        from .heap import BufferWrapper
        # Room for two C ints: slot 0 is ``_state``, slot 1 is ``_count``.
        int_size = struct.calcsize('i')
        buffer_wrapper = BufferWrapper(int_size * 2)
        condition = ctx.Condition()
        self.__setstate__(
            (parties, action, timeout, condition, buffer_wrapper))
        self._state = 0
        self._count = 0

    def __setstate__(self, state):
        """Restore the attributes from the 5-tuple built by ``__getstate__``."""
        parties, action, timeout, condition, buffer_wrapper = state
        self._parties = parties
        self._action = action
        self._timeout = timeout
        self._cond = condition
        self._wrapper = buffer_wrapper
        self._array = self._wrapper.create_memoryview().cast('i')

    def __getstate__(self):
        """Return ``(parties, action, timeout, cond, wrapper)``."""
        return (self._parties, self._action, self._timeout,
                self._cond, self._wrapper)

    # ``_state`` is stored in slot 0 of the buffer.
    @property
    def _state(self):
        return self._array[0]

    @_state.setter
    def _state(self, value):
        self._array[0] = value

    # ``_count`` is stored in slot 1 of the buffer.
    @property
    def _count(self):
        return self._array[1]

    @_count.setter
    def _count(self, value):
        self._array[1] = value