Mercurial > repos > rliterman > csp2
comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/multiprocessing/synchronize.py @ 69:33d812a61356
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 17:55:14 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
67:0e9998148a16 | 69:33d812a61356 |
---|---|
1 # | |
2 # Module implementing synchronization primitives | |
3 # | |
4 # multiprocessing/synchronize.py | |
5 # | |
6 # Copyright (c) 2006-2008, R Oudkerk | |
7 # Licensed to PSF under a Contributor Agreement. | |
8 # | |
9 | |
# Public names exported by a star-import of this module.
# NOTE(review): `Barrier` is defined further down in this file but is not
# listed here -- confirm whether that omission is intentional.
__all__ = [
    'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
    ]
13 | |
14 import threading | |
15 import sys | |
16 import tempfile | |
17 import _multiprocessing | |
18 import time | |
19 | |
20 from . import context | |
21 from . import process | |
22 from . import util | |
23 | |
24 # Try to import the mp.synchronize module cleanly, if it fails | |
25 # raise ImportError for platforms lacking a working sem_open implementation. | |
26 # See issue 3770 | |
27 try: | |
28 from _multiprocessing import SemLock, sem_unlink | |
29 except (ImportError): | |
30 raise ImportError("This platform lacks a functioning sem_open" + | |
31 " implementation, therefore, the required" + | |
32 " synchronization primitives needed will not" + | |
33 " function, see issue 3770.") | |
34 | |
35 # | |
36 # Constants | |
37 # | |
38 | |
# Numeric "kind" codes handed to `_multiprocessing.SemLock` (0 and 1).
RECURSIVE_MUTEX, SEMAPHORE = range(2)
# Maximum semaphore value as reported by the C extension.
SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
41 | |
42 # | |
43 # Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock` | |
44 # | |
45 | |
class SemLock(object):
    """Base class wrapping a `_multiprocessing.SemLock` object.

    The wrapped object is stored in ``self._semlock``; its ``acquire`` and
    ``release`` bound methods are re-exported directly on the instance.
    """

    # Iterator producing the random suffix used by `_make_name()`.
    _rand = tempfile._RandomNameSequence()

    def __init__(self, kind, value, maxvalue, *, ctx):
        """Create the underlying semaphore.

        `kind`, `value` and `maxvalue` are forwarded to
        `_multiprocessing.SemLock`.  `ctx` is the multiprocessing context;
        when it is None the default context is looked up.

        Raises FileExistsError if 100 consecutive generated names are
        already taken.
        """
        if ctx is None:
            ctx = context._default_context.get_context()
        start_method = ctx.get_start_method()
        unlink_now = sys.platform == 'win32' or start_method == 'fork'

        # A generated name may already exist; retry with a fresh one.
        for _attempt in range(100):
            try:
                sl = self._semlock = _multiprocessing.SemLock(
                    kind, value, maxvalue, self._make_name(), unlink_now)
            except FileExistsError:
                continue
            break
        else:
            raise FileExistsError('cannot find name for semaphore')

        util.debug('created semlock with handle %s' % sl.handle)
        self._make_methods()

        if sys.platform != 'win32':
            def _after_fork(obj):
                obj._semlock._after_fork()
            util.register_after_fork(self, _after_fork)

        sem_name = self._semlock.name
        if sem_name is not None:
            # We only get here if we are on Unix with forking
            # disabled.  When the object is garbage collected or the
            # process shuts down we unlink the semaphore name
            from .resource_tracker import register
            register(sem_name, "semaphore")
            util.Finalize(self, SemLock._cleanup, (sem_name,),
                          exitpriority=0)

    @staticmethod
    def _cleanup(name):
        """Unlink the named semaphore and unregister it from the tracker."""
        from .resource_tracker import unregister
        sem_unlink(name)
        unregister(name, "semaphore")

    def _make_methods(self):
        """Expose the wrapped object's acquire/release on this instance."""
        wrapped = self._semlock
        self.acquire = wrapped.acquire
        self.release = wrapped.release

    def __enter__(self):
        return self._semlock.__enter__()

    def __exit__(self, *args):
        return self._semlock.__exit__(*args)

    def __getstate__(self):
        """Return ``(handle, kind, maxvalue, name)`` for a spawning child.

        `context.assert_spawning` is called first, so this is only usable
        while a child process is being spawned.
        """
        context.assert_spawning(self)
        sl = self._semlock
        if sys.platform != 'win32':
            handle = sl.handle
        else:
            # On Windows the handle is duplicated for the child process.
            popen = context.get_spawning_popen()
            handle = popen.duplicate_for_child(sl.handle)
        return (handle, sl.kind, sl.maxvalue, sl.name)

    def __setstate__(self, state):
        """Rebuild the wrapped object from the tuple made by __getstate__."""
        self._semlock = _multiprocessing.SemLock._rebuild(*state)
        util.debug('recreated blocker with handle %r' % state[0])
        self._make_methods()

    @staticmethod
    def _make_name():
        """Return '<semprefix>-<random suffix>' for a new semaphore."""
        prefix = process.current_process()._config['semprefix']
        suffix = next(SemLock._rand)
        return '%s-%s' % (prefix, suffix)
118 | |
119 # | |
120 # Semaphore | |
121 # | |
122 | |
class Semaphore(SemLock):
    """Counting semaphore whose ceiling is `SEM_VALUE_MAX`."""

    def __init__(self, value=1, *, ctx):
        """Create a semaphore with initial counter `value` (default 1)."""
        SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX, ctx=ctx)

    def get_value(self):
        """Return the current counter as reported by the wrapped object."""
        return self._semlock._get_value()

    def __repr__(self):
        # Any failure while reading the counter is reported as 'unknown'.
        try:
            current = self._semlock._get_value()
        except Exception:
            current = 'unknown'
        cls_name = self.__class__.__name__
        return '<%s(value=%s)>' % (cls_name, current)
137 | |
138 # | |
139 # Bounded semaphore | |
140 # | |
141 | |
class BoundedSemaphore(Semaphore):
    """Semaphore whose maximum value equals its initial value."""

    def __init__(self, value=1, *, ctx):
        """Create the semaphore with `value` as both initial and max value."""
        # Deliberately bypasses Semaphore.__init__ so that `maxvalue` can be
        # set to `value` instead of SEM_VALUE_MAX.
        SemLock.__init__(self, SEMAPHORE, value, value, ctx=ctx)

    def __repr__(self):
        # Any failure while reading the counter is reported as 'unknown'.
        try:
            current = self._semlock._get_value()
        except Exception:
            current = 'unknown'
        details = (self.__class__.__name__, current, self._semlock.maxvalue)
        return '<%s(value=%s, maxvalue=%s)>' % details
154 | |
155 # | |
156 # Non-recursive lock | |
157 # | |
158 | |
class Lock(SemLock):
    """Non-recursive lock: a semaphore with initial and maximum value 1."""

    def __init__(self, *, ctx):
        SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)

    def __repr__(self):
        """Return '<Lock(owner=...)>' describing who appears to hold it."""
        try:
            sl = self._semlock
            if sl._is_mine():
                # Held by this process; append the thread unless it is main.
                owner = process.current_process().name
                thread_name = threading.current_thread().name
                if thread_name != 'MainThread':
                    owner += '|' + thread_name
            elif sl._get_value() == 1:
                owner = 'None'
            elif sl._count() > 0:
                owner = 'SomeOtherThread'
            else:
                owner = 'SomeOtherProcess'
        except Exception:
            owner = 'unknown'
        return '<%s(owner=%s)>' % (self.__class__.__name__, owner)
179 | |
180 # | |
181 # Recursive lock | |
182 # | |
183 | |
class RLock(SemLock):
    """Recursive lock built on a RECURSIVE_MUTEX kind of SemLock."""

    def __init__(self, *, ctx):
        SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1, ctx=ctx)

    def __repr__(self):
        """Return '<RLock(owner, count)>' describing the apparent holder."""
        try:
            sl = self._semlock
            if sl._is_mine():
                # Held by this process; append the thread unless it is main.
                owner = process.current_process().name
                thread_name = threading.current_thread().name
                if thread_name != 'MainThread':
                    owner += '|' + thread_name
                depth = sl._count()
            elif sl._get_value() == 1:
                owner, depth = 'None', 0
            elif sl._count() > 0:
                owner, depth = 'SomeOtherThread', 'nonzero'
            else:
                owner, depth = 'SomeOtherProcess', 'nonzero'
        except Exception:
            owner, depth = 'unknown', 'unknown'
        return '<%s(%s, %s)>' % (self.__class__.__name__, owner, depth)
205 | |
206 # | |
207 # Condition variable | |
208 # | |
209 | |
class Condition(object):
    """Condition variable assembled from a lock and three semaphores.

    ``_sleeping_count`` is released once by every waiter about to sleep,
    ``_woken_count`` once by every waiter that has woken up (by notification
    or timeout), and ``_wait_semaphore`` is what waiters actually block on.
    """

    def __init__(self, lock=None, *, ctx):
        """Create the condition.

        `lock` is the lock to associate with the condition; when omitted a
        new ``ctx.RLock()`` is created.  `ctx` supplies the factories for the
        lock and the internal semaphores.
        """
        self._lock = lock or ctx.RLock()
        self._sleeping_count = ctx.Semaphore(0)
        self._woken_count = ctx.Semaphore(0)
        self._wait_semaphore = ctx.Semaphore(0)
        self._make_methods()

    def __getstate__(self):
        context.assert_spawning(self)
        return (self._lock, self._sleeping_count,
                self._woken_count, self._wait_semaphore)

    def __setstate__(self, state):
        (self._lock, self._sleeping_count,
         self._woken_count, self._wait_semaphore) = state
        self._make_methods()

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)

    def _make_methods(self):
        """Expose the underlying lock's acquire/release on this instance."""
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    def __repr__(self):
        try:
            num_waiters = (self._sleeping_count._semlock._get_value() -
                           self._woken_count._semlock._get_value())
        except Exception:
            num_waiters = 'unknown'
        return '<%s(%s, %s)>' % (self.__class__.__name__, self._lock, num_waiters)

    def wait(self, timeout=None):
        """Release the lock, block until notified or timed out, re-lock.

        Returns the result of acquiring the internal wait semaphore with
        `timeout`.  Raises AssertionError if the calling thread does not own
        the lock.
        """
        # Explicit raise rather than `assert` so that the check is still
        # performed when Python runs with -O.
        if not self._lock._semlock._is_mine():
            raise AssertionError(
                'must acquire() condition before using wait()')

        # indicate that this thread is going to sleep
        self._sleeping_count.release()

        # release lock (as many times as it is recursively held)
        count = self._lock._semlock._count()
        for _ in range(count):
            self._lock.release()

        try:
            # wait for notification or timeout
            return self._wait_semaphore.acquire(True, timeout)
        finally:
            # indicate that this thread has woken
            self._woken_count.release()

            # reacquire lock to the same recursion depth
            for _ in range(count):
                self._lock.acquire()

    def notify(self, n=1):
        """Wake up to `n` waiters.

        Raises AssertionError if the calling thread does not own the lock or
        if one of the internal counting invariants is violated.
        """
        # Explicit raises rather than `assert` so that the checks are still
        # performed when Python runs with -O.
        if not self._lock._semlock._is_mine():
            raise AssertionError('lock is not owned')
        if self._wait_semaphore.acquire(False):
            raise AssertionError(
                'notify: Should not have been able to acquire '
                '_wait_semaphore')

        # to take account of timeouts since last notify*() we subtract
        # woken_count from sleeping_count and rezero woken_count
        while self._woken_count.acquire(False):
            if not self._sleeping_count.acquire(False):
                raise AssertionError(
                    'notify: Bug in sleeping_count.acquire '
                    '- res should not be False')

        sleepers = 0
        while sleepers < n and self._sleeping_count.acquire(False):
            self._wait_semaphore.release()        # wake up one sleeper
            sleepers += 1

        if sleepers:
            for _ in range(sleepers):
                self._woken_count.acquire()       # wait for a sleeper to wake

            # rezero wait_semaphore in case some timeouts just happened
            while self._wait_semaphore.acquire(False):
                pass

    def notify_all(self):
        """Wake every waiter (notify with n=sys.maxsize)."""
        self.notify(n=sys.maxsize)

    def wait_for(self, predicate, timeout=None):
        """Wait until `predicate()` is truthy or `timeout` seconds elapse.

        Returns the last value returned by `predicate`.
        """
        result = predicate()
        if result:
            return result
        if timeout is not None:
            endtime = time.monotonic() + timeout
        else:
            endtime = None
            waittime = None
        while not result:
            if endtime is not None:
                waittime = endtime - time.monotonic()
                if waittime <= 0:
                    break
            self.wait(waittime)
            result = predicate()
        return result
316 | |
317 # | |
318 # Event | |
319 # | |
320 | |
class Event(object):
    """Event flag built from a Condition and a semaphore.

    The flag is represented by ``_flag``: a semaphore value of 1 means the
    event is set, 0 means it is clear.  All accesses to the flag happen
    while holding ``_cond``.
    """

    def __init__(self, *, ctx):
        """Create a cleared event using factories from `ctx`."""
        self._cond = ctx.Condition(ctx.Lock())
        self._flag = ctx.Semaphore(0)

    def is_set(self):
        """Return True if the event is currently set."""
        with self._cond:
            # Probe the flag without consuming it.
            if self._flag.acquire(False):
                self._flag.release()
                return True
            return False

    def set(self):
        """Set the event and wake every waiter."""
        with self._cond:
            # Drain first so that the flag never exceeds 1.
            self._flag.acquire(False)
            self._flag.release()
            self._cond.notify_all()

    def clear(self):
        """Reset the event to the unset state."""
        with self._cond:
            self._flag.acquire(False)

    def wait(self, timeout=None):
        """Block until the event is set or `timeout` seconds elapse.

        Returns True if the event is set on return, False otherwise.
        """
        with self._cond:
            if self._flag.acquire(False):
                self._flag.release()
            else:
                self._cond.wait(timeout)

            if self._flag.acquire(False):
                self._flag.release()
                return True
            return False

    def __repr__(self):
        """Return '<Event at 0x... set>' or '<Event at 0x... unset>'.

        Note that this calls `is_set()` and therefore briefly acquires the
        internal condition.
        """
        set_status = 'set' if self.is_set() else 'unset'
        return '<%s at %#x %s>' % (
            type(self).__qualname__, id(self), set_status)
355 | |
356 # | |
357 # Barrier | |
358 # | |
359 | |
class Barrier(threading.Barrier):
    """`threading.Barrier` subclass keeping state and count in a buffer.

    ``_state`` and ``_count`` are properties backed by slots 0 and 1 of an
    int array viewed over a `BufferWrapper` from this package's heap module.
    """

    def __init__(self, parties, action=None, timeout=None, *, ctx):
        """Create the barrier; `ctx` supplies the Condition used to wait."""
        import struct
        from .heap import BufferWrapper
        # Room for two C ints: one for the state, one for the count.
        buffer = BufferWrapper(2 * struct.calcsize('i'))
        condition = ctx.Condition()
        self.__setstate__((parties, action, timeout, condition, buffer))
        self._state = 0
        self._count = 0

    def __setstate__(self, state):
        """Restore attributes from the tuple and rebuild the int view."""
        (self._parties, self._action, self._timeout,
         self._cond, self._wrapper) = state
        view = self._wrapper.create_memoryview()
        self._array = view.cast('i')

    def __getstate__(self):
        """Return ``(parties, action, timeout, cond, wrapper)``."""
        return (self._parties, self._action, self._timeout,
                self._cond, self._wrapper)

    @property
    def _state(self):
        # Slot 0 of the int array.
        return self._array[0]

    @_state.setter
    def _state(self, new_state):
        self._array[0] = new_state

    @property
    def _count(self):
        # Slot 1 of the int array.
        return self._array[1]

    @_count.setter
    def _count(self, new_count):
        self._array[1] = new_count