CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/multiprocessing/queues.py @ 69:33d812a61356
comparison against parent changeset 67:0e9998148a16

description: planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author:      jpayne
date:        Tue, 18 Mar 2025 17:55:14 -0400

#
# Module implementing queues
#
# multiprocessing/queues.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']

import sys
import os
import threading
import collections
import time
import weakref
import errno

from queue import Empty, Full

import _multiprocessing

from . import connection
from . import context
_ForkingPickler = context.reduction.ForkingPickler

from .util import debug, info, Finalize, register_after_fork, is_exiting

#
# Queue type using a pipe, buffer and thread
#

class Queue(object):

    def __init__(self, maxsize=0, *, ctx):
        if maxsize <= 0:
            # Can raise ImportError (see issues #3770 and #23400)
            from .synchronize import SEM_VALUE_MAX as maxsize
        self._maxsize = maxsize
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._opid = os.getpid()
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = ctx.Lock()
        self._sem = ctx.BoundedSemaphore(maxsize)
        # For use by concurrent.futures
        self._ignore_epipe = False

        self._after_fork()

        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

    def __getstate__(self):
        context.assert_spawning(self)
        return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

    def __setstate__(self, state):
        (self._ignore_epipe, self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        self._after_fork()

    def _after_fork(self):
        debug('Queue._after_fork()')
        self._notempty = threading.Condition(threading.Lock())
        self._buffer = collections.deque()
        self._thread = None
        self._jointhread = None
        self._joincancelled = False
        self._closed = False
        self._close = None
        self._send_bytes = self._writer.send_bytes
        self._recv_bytes = self._reader.recv_bytes
        self._poll = self._reader.poll

    def put(self, obj, block=True, timeout=None):
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()

    def get(self, block=True, timeout=None):
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if block and timeout is None:
            with self._rlock:
                res = self._recv_bytes()
            self._sem.release()
        else:
            if block:
                deadline = time.monotonic() + timeout
            if not self._rlock.acquire(block, timeout):
                raise Empty
            try:
                if block:
                    timeout = deadline - time.monotonic()
                    if not self._poll(timeout):
                        raise Empty
                elif not self._poll():
                    raise Empty
                res = self._recv_bytes()
                self._sem.release()
            finally:
                self._rlock.release()
        # unserialize the data after having released the lock
        return _ForkingPickler.loads(res)

    def qsize(self):
        # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
        return self._maxsize - self._sem._semlock._get_value()

    def empty(self):
        return not self._poll()

    def full(self):
        return self._sem._semlock._is_zero()

    def get_nowait(self):
        return self.get(False)

    def put_nowait(self, obj):
        return self.put(obj, False)

    def close(self):
        self._closed = True
        try:
            self._reader.close()
        finally:
            close = self._close
            if close:
                self._close = None
                close()

    def join_thread(self):
        debug('Queue.join_thread()')
        assert self._closed, "Queue {0!r} not closed".format(self)
        if self._jointhread:
            self._jointhread()

    def cancel_join_thread(self):
        debug('Queue.cancel_join_thread()')
        self._joincancelled = True
        try:
            self._jointhread.cancel()
        except AttributeError:
            pass

    def _start_thread(self):
        debug('Queue._start_thread()')

        # Start thread which transfers data from buffer to pipe
        self._buffer.clear()
        self._thread = threading.Thread(
            target=Queue._feed,
            args=(self._buffer, self._notempty, self._send_bytes,
                  self._wlock, self._writer.close, self._ignore_epipe,
                  self._on_queue_feeder_error, self._sem),
            name='QueueFeederThread'
        )
        self._thread.daemon = True

        debug('doing self._thread.start()')
        self._thread.start()
        debug('... done self._thread.start()')

        if not self._joincancelled:
            self._jointhread = Finalize(
                self._thread, Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5
                )

        # Send sentinel to the thread queue object when garbage collected
        self._close = Finalize(
            self, Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10
            )

    @staticmethod
    def _finalize_join(twr):
        debug('joining queue thread')
        thread = twr()
        if thread is not None:
            thread.join()
            debug('... queue thread joined')
        else:
            debug('... queue thread already dead')

    @staticmethod
    def _finalize_close(buffer, notempty):
        debug('telling queue thread to quit')
        with notempty:
            buffer.append(_sentinel)
            notempty.notify()

    @staticmethod
    def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
              onerror, queue_sem):
        debug('starting thread to feed data to pipe')
        nacquire = notempty.acquire
        nrelease = notempty.release
        nwait = notempty.wait
        bpopleft = buffer.popleft
        sentinel = _sentinel
        if sys.platform != 'win32':
            wacquire = writelock.acquire
            wrelease = writelock.release
        else:
            wacquire = None

        while 1:
            try:
                nacquire()
                try:
                    if not buffer:
                        nwait()
                finally:
                    nrelease()
                try:
                    while 1:
                        obj = bpopleft()
                        if obj is sentinel:
                            debug('feeder thread got sentinel -- exiting')
                            close()
                            return

                        # serialize the data before acquiring the lock
                        obj = _ForkingPickler.dumps(obj)
                        if wacquire is None:
                            send_bytes(obj)
                        else:
                            wacquire()
                            try:
                                send_bytes(obj)
                            finally:
                                wrelease()
                except IndexError:
                    pass
            except Exception as e:
                if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
                    return
                # Since this runs in a daemon thread the resources it uses
                # may become unusable while the process is cleaning up.
                # We ignore errors which happen after the process has
                # started to clean up.
                if is_exiting():
                    info('error in queue thread: %s', e)
                    return
                else:
                    # Since the object has not been sent in the queue, we need
                    # to decrease the size of the queue. The error acts as
                    # if the object had been silently removed from the queue
                    # and this step is necessary to have a properly working
                    # queue.
                    queue_sem.release()
                    onerror(e, obj)

    @staticmethod
    def _on_queue_feeder_error(e, obj):
        """
        Private API hook called when feeding data in the background thread
        raises an exception. For overriding by concurrent.futures.
        """
        import traceback
        traceback.print_exc()


_sentinel = object()

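#
# Usage sketch (illustrative only, not part of the upstream module): Queue is
# normally obtained through a multiprocessing context rather than constructed
# directly, because __init__ takes a keyword-only ``ctx`` argument.  Assuming
# only the public multiprocessing API, a minimal producer/consumer script
# would look like this:
#
#     import multiprocessing as mp
#
#     def worker(q):
#         q.put('hello from the child')
#
#     if __name__ == '__main__':
#         q = mp.Queue()                  # an instance of the Queue class above
#         p = mp.Process(target=worker, args=(q,))
#         p.start()
#         print(q.get())                  # blocks until the child puts an item
#         p.join()
#
# put() only appends to the in-process buffer; the daemon feeder thread
# started by _start_thread() pickles the object and writes it to the pipe.
#
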
#
# A queue type which also supports join() and task_done() methods
#
# Note that if you do not call task_done() for each finished task then
# eventually the counter's semaphore may overflow causing Bad Things
# to happen.
#

class JoinableQueue(Queue):

    def __init__(self, maxsize=0, *, ctx):
        Queue.__init__(self, maxsize, ctx=ctx)
        self._unfinished_tasks = ctx.Semaphore(0)
        self._cond = ctx.Condition()

    def __getstate__(self):
        return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)

    def __setstate__(self, state):
        Queue.__setstate__(self, state[:-2])
        self._cond, self._unfinished_tasks = state[-2:]

    def put(self, obj, block=True, timeout=None):
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty, self._cond:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._unfinished_tasks.release()
            self._notempty.notify()

    def task_done(self):
        with self._cond:
            if not self._unfinished_tasks.acquire(False):
                raise ValueError('task_done() called too many times')
            if self._unfinished_tasks._semlock._is_zero():
                self._cond.notify_all()

    def join(self):
        with self._cond:
            if not self._unfinished_tasks._semlock._is_zero():
                self._cond.wait()

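#
# Usage sketch (illustrative only, not part of the upstream module): every
# put() must eventually be matched by a task_done() call, and join() blocks
# until that has happened for all items.  Assuming only the public
# multiprocessing API:
#
#     import multiprocessing as mp
#
#     def worker(q):
#         while True:
#             item = q.get()
#             try:
#                 pass                    # process item here
#             finally:
#                 q.task_done()
#
#     if __name__ == '__main__':
#         q = mp.JoinableQueue()
#         mp.Process(target=worker, args=(q,), daemon=True).start()
#         for item in range(3):
#             q.put(item)
#         q.join()                        # returns after three task_done() calls
#
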
#
# Simplified Queue type -- really just a locked pipe
#

class SimpleQueue(object):

    def __init__(self, *, ctx):
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._poll = self._reader.poll
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = ctx.Lock()

    def empty(self):
        return not self._poll()

    def __getstate__(self):
        context.assert_spawning(self)
        return (self._reader, self._writer, self._rlock, self._wlock)

    def __setstate__(self, state):
        (self._reader, self._writer, self._rlock, self._wlock) = state
        self._poll = self._reader.poll

    def get(self):
        with self._rlock:
            res = self._reader.recv_bytes()
        # unserialize the data after having released the lock
        return _ForkingPickler.loads(res)

    def put(self, obj):
        # serialize the data before acquiring the lock
        obj = _ForkingPickler.dumps(obj)
        if self._wlock is None:
            # writes to a message oriented win32 pipe are atomic
            self._writer.send_bytes(obj)
        else:
            with self._wlock:
                self._writer.send_bytes(obj)
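
#
# Usage sketch (illustrative only, not part of the upstream module):
# SimpleQueue has no buffer, feeder thread or maxsize; put() and get() talk to
# the underlying pipe directly under the appropriate lock.  Assuming only the
# public multiprocessing API:
#
#     import os
#     import multiprocessing as mp
#
#     def child(q):
#         q.put(os.getpid())
#
#     if __name__ == '__main__':
#         q = mp.SimpleQueue()
#         p = mp.Process(target=child, args=(q,))
#         p.start()
#         print('child pid:', q.get())
#         p.join()
#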