annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/multiprocessing/queues.py @ 69:33d812a61356

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 17:55:14 -0400
parents
children
rev   line source
jpayne@69 1 #
jpayne@69 2 # Module implementing queues
jpayne@69 3 #
jpayne@69 4 # multiprocessing/queues.py
jpayne@69 5 #
jpayne@69 6 # Copyright (c) 2006-2008, R Oudkerk
jpayne@69 7 # Licensed to PSF under a Contributor Agreement.
jpayne@69 8 #
jpayne@69 9
jpayne@69 10 __all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']
jpayne@69 11
jpayne@69 12 import sys
jpayne@69 13 import os
jpayne@69 14 import threading
jpayne@69 15 import collections
jpayne@69 16 import time
jpayne@69 17 import weakref
jpayne@69 18 import errno
jpayne@69 19
jpayne@69 20 from queue import Empty, Full
jpayne@69 21
jpayne@69 22 import _multiprocessing
jpayne@69 23
jpayne@69 24 from . import connection
jpayne@69 25 from . import context
jpayne@69 26 _ForkingPickler = context.reduction.ForkingPickler
jpayne@69 27
jpayne@69 28 from .util import debug, info, Finalize, register_after_fork, is_exiting
jpayne@69 29
jpayne@69 30 #
jpayne@69 31 # Queue type using a pipe, buffer and thread
jpayne@69 32 #
jpayne@69 33
class Queue(object):
    """Queue type using a pipe, an in-process buffer and a feeder thread.

    ``put()`` appends objects to a local ``collections.deque``; a lazily
    started daemon thread (``QueueFeederThread``) pickles them and writes
    them to the pipe.  ``get()`` reads pickled bytes from the pipe under the
    reader lock and unpickles them after the lock has been released.  A
    bounded semaphore of size ``maxsize`` tracks the number of queued items.
    """

    def __init__(self, maxsize=0, *, ctx):
        """Create the pipe, locks and semaphore.

        ``maxsize <= 0`` selects ``SEM_VALUE_MAX`` as the capacity.  ``ctx``
        supplies the ``Lock`` and ``BoundedSemaphore`` factories.
        """
        if maxsize <= 0:
            # Can raise ImportError (see issues #3770 and #23400)
            from .synchronize import SEM_VALUE_MAX as maxsize
        self._maxsize = maxsize
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._opid = os.getpid()
        if sys.platform == 'win32':
            # No writer lock is used on win32 (see _feed).
            self._wlock = None
        else:
            self._wlock = ctx.Lock()
        self._sem = ctx.BoundedSemaphore(maxsize)
        # For use by concurrent.futures
        self._ignore_epipe = False

        self._after_fork()

        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

    def __getstate__(self):
        """Return the picklable state; only permitted while spawning."""
        context.assert_spawning(self)
        return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

    def __setstate__(self, state):
        """Restore the pickled state and rebuild the per-process members."""
        (self._ignore_epipe, self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        self._after_fork()

    def _after_fork(self):
        """(Re)initialise everything that is local to the current process."""
        debug('Queue._after_fork()')
        self._notempty = threading.Condition(threading.Lock())
        self._buffer = collections.deque()
        self._thread = None
        self._jointhread = None
        self._joincancelled = False
        self._closed = False
        self._close = None
        self._send_bytes = self._writer.send_bytes
        self._recv_bytes = self._reader.recv_bytes
        self._poll = self._reader.poll

    def put(self, obj, block=True, timeout=None):
        """Append ``obj`` to the buffer drained by the feeder thread.

        Raises ``ValueError`` if the queue is closed and ``queue.Full`` if
        the size semaphore cannot be acquired with the given
        ``block``/``timeout``.
        """
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty:
            # The feeder thread is started on first use.
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()

    def get(self, block=True, timeout=None):
        """Read one pickled object from the pipe and return it unpickled.

        Raises ``ValueError`` if the queue is closed and ``queue.Empty`` if
        the reader lock cannot be acquired or no data is available with the
        given ``block``/``timeout``.
        """
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if block and timeout is None:
            with self._rlock:
                res = self._recv_bytes()
            self._sem.release()
        else:
            if block:
                deadline = time.monotonic() + timeout
            if not self._rlock.acquire(block, timeout):
                raise Empty
            try:
                if block:
                    # Only the time left after acquiring the lock is spent
                    # polling the pipe.
                    timeout = deadline - time.monotonic()
                    if not self._poll(timeout):
                        raise Empty
                elif not self._poll():
                    raise Empty
                res = self._recv_bytes()
                self._sem.release()
            finally:
                self._rlock.release()
        # unserialize the data after having released the lock
        return _ForkingPickler.loads(res)

    def qsize(self):
        """Return ``maxsize`` minus the current value of the size semaphore."""
        # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
        return self._maxsize - self._sem._semlock._get_value()

    def empty(self):
        """Return True if the reading end of the pipe has no data ready."""
        return not self._poll()

    def full(self):
        """Return True if the size semaphore is exhausted."""
        return self._sem._semlock._is_zero()

    def get_nowait(self):
        """Equivalent to ``get(False)``."""
        return self.get(False)

    def put_nowait(self, obj):
        """Equivalent to ``put(obj, False)``."""
        return self.put(obj, False)

    def close(self):
        """Mark the queue closed, close the reader and signal the feeder."""
        self._closed = True
        try:
            self._reader.close()
        finally:
            # Run the close finalizer (if a feeder thread was started) even
            # when closing the reader failed.
            close = self._close
            if close:
                self._close = None
                close()

    def join_thread(self):
        """Join the feeder thread; the queue must have been closed first.

        Raises ``AssertionError`` if ``close()`` has not been called.
        """
        debug('Queue.join_thread()')
        # Raised explicitly rather than via ``assert`` so the guard survives
        # ``python -O``: the feeder thread only exits on the sentinel that
        # close() enqueues, so joining an unclosed queue would block.
        if not self._closed:
            raise AssertionError("Queue {0!r} not closed".format(self))
        if self._jointhread:
            self._jointhread()

    def cancel_join_thread(self):
        """Cancel the finalizer that joins the feeder thread."""
        debug('Queue.cancel_join_thread()')
        self._joincancelled = True
        # _jointhread stays None until the feeder thread has been started.
        if self._jointhread is not None:
            self._jointhread.cancel()

    def _start_thread(self):
        """Start the feeder thread and register its finalizers."""
        debug('Queue._start_thread()')

        # Start thread which transfers data from buffer to pipe
        self._buffer.clear()
        self._thread = threading.Thread(
            target=Queue._feed,
            args=(self._buffer, self._notempty, self._send_bytes,
                  self._wlock, self._writer.close, self._ignore_epipe,
                  self._on_queue_feeder_error, self._sem),
            name='QueueFeederThread'
        )
        self._thread.daemon = True

        debug('doing self._thread.start()')
        self._thread.start()
        debug('... done self._thread.start()')

        if not self._joincancelled:
            self._jointhread = Finalize(
                self._thread, Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5
            )

        # Send sentinel to the thread queue object when garbage collected
        self._close = Finalize(
            self, Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10
        )

    @staticmethod
    def _finalize_join(twr):
        """Join the thread referenced by weakref ``twr`` if it is still alive."""
        debug('joining queue thread')
        thread = twr()
        if thread is not None:
            thread.join()
            debug('... queue thread joined')
        else:
            debug('... queue thread already dead')

    @staticmethod
    def _finalize_close(buffer, notempty):
        """Enqueue the sentinel that tells the feeder thread to exit."""
        debug('telling queue thread to quit')
        with notempty:
            buffer.append(_sentinel)
            notempty.notify()

    @staticmethod
    def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
              onerror, queue_sem):
        """Feeder thread body: move objects from ``buffer`` into the pipe.

        Waits on ``notempty`` while the buffer is empty, then pickles and
        sends every buffered object.  On the sentinel it calls ``close`` and
        returns.
        """
        debug('starting thread to feed data to pipe')
        nacquire = notempty.acquire
        nrelease = notempty.release
        nwait = notempty.wait
        bpopleft = buffer.popleft
        sentinel = _sentinel
        if sys.platform != 'win32':
            wacquire = writelock.acquire
            wrelease = writelock.release
        else:
            wacquire = None

        while True:
            try:
                nacquire()
                try:
                    if not buffer:
                        nwait()
                finally:
                    nrelease()
                try:
                    while True:
                        obj = bpopleft()
                        if obj is sentinel:
                            debug('feeder thread got sentinel -- exiting')
                            close()
                            return

                        # serialize the data before acquiring the lock
                        obj = _ForkingPickler.dumps(obj)
                        if wacquire is None:
                            send_bytes(obj)
                        else:
                            wacquire()
                            try:
                                send_bytes(obj)
                            finally:
                                wrelease()
                except IndexError:
                    # Buffer drained; go back to waiting on the condition.
                    pass
            except Exception as e:
                if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
                    return
                # Since this runs in a daemon thread the resources it uses
                # may become unusable while the process is cleaning up.
                # We ignore errors which happen after the process has
                # started to cleanup.
                if is_exiting():
                    info('error in queue thread: %s', e)
                    return
                else:
                    # Since the object has not been sent in the queue, we need
                    # to decrease the size of the queue. The error acts as
                    # if the object had been silently removed from the queue
                    # and this step is necessary to have a properly working
                    # queue.
                    queue_sem.release()
                    onerror(e, obj)

    @staticmethod
    def _on_queue_feeder_error(e, obj):
        """
        Private API hook called when feeding data in the background thread
        raises an exception.  For overriding by concurrent.futures.
        """
        import traceback
        traceback.print_exc()


# Unique marker appended to the buffer to tell the feeder thread to exit.
_sentinel = object()
jpayne@69 280
jpayne@69 281 #
jpayne@69 282 # A queue type which also supports join() and task_done() methods
jpayne@69 283 #
jpayne@69 284 # Note that if you do not call task_done() for each finished task then
jpayne@69 285 # eventually the counter's semaphore may overflow causing Bad Things
jpayne@69 286 # to happen.
jpayne@69 287 #
jpayne@69 288
class JoinableQueue(Queue):
    """A ``Queue`` that also supports ``task_done()`` and ``join()``.

    Every ``put()`` releases the ``_unfinished_tasks`` semaphore once and
    every ``task_done()`` acquires it once, so its value is the number of
    items that were put but not yet marked done.
    """

    def __init__(self, maxsize=0, *, ctx):
        """Initialise the base queue plus the task counter and condition."""
        Queue.__init__(self, maxsize, ctx=ctx)
        self._unfinished_tasks = ctx.Semaphore(0)
        self._cond = ctx.Condition()

    def __getstate__(self):
        """Return the base state extended with the condition and counter."""
        return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)

    def __setstate__(self, state):
        """Restore the base state, then the two trailing extra members."""
        Queue.__setstate__(self, state[:-2])
        self._cond, self._unfinished_tasks = state[-2:]

    def put(self, obj, block=True, timeout=None):
        """Buffer ``obj`` and count it as an unfinished task.

        Raises ``ValueError`` if the queue is closed and ``queue.Full`` if
        the size semaphore cannot be acquired with the given
        ``block``/``timeout``.
        """
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if not self._sem.acquire(block, timeout):
            raise Full

        # Both locks are held so the buffer append and the counter
        # increment happen together.
        with self._notempty, self._cond:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._unfinished_tasks.release()
            self._notempty.notify()

    def task_done(self):
        """Mark one previously put item as processed.

        Raises ``ValueError`` if there is no unfinished task left to mark.
        Wakes every ``join()`` waiter once the counter reaches zero.
        """
        with self._cond:
            if not self._unfinished_tasks.acquire(False):
                raise ValueError('task_done() called too many times')
            if self._unfinished_tasks._semlock._is_zero():
                self._cond.notify_all()

    def join(self):
        """Wait on the condition unless the unfinished-task counter is zero."""
        with self._cond:
            if not self._unfinished_tasks._semlock._is_zero():
                self._cond.wait()
jpayne@69 327
jpayne@69 328 #
jpayne@69 329 # Simplified Queue type -- really just a locked pipe
jpayne@69 330 #
jpayne@69 331
class SimpleQueue(object):
    """Simplified queue type -- really just a locked pipe.

    Objects are pickled and written straight to a one-way pipe; there is no
    buffer, feeder thread or size limit.
    """

    def __init__(self, *, ctx):
        """Create the pipe and its locks; ``ctx`` supplies the ``Lock`` factory."""
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._poll = self._reader.poll
        if sys.platform == 'win32':
            # No writer lock is needed on win32 (see put()).
            self._wlock = None
        else:
            self._wlock = ctx.Lock()

    def close(self):
        """Close both ends of the underlying pipe."""
        self._reader.close()
        self._writer.close()

    def empty(self):
        """Return True if the reading end of the pipe has no data ready."""
        return not self._poll()

    def __getstate__(self):
        """Return the picklable state; only permitted while spawning."""
        context.assert_spawning(self)
        return (self._reader, self._writer, self._rlock, self._wlock)

    def __setstate__(self, state):
        """Restore the pickled state and rebind the cached ``poll`` method."""
        (self._reader, self._writer, self._rlock, self._wlock) = state
        self._poll = self._reader.poll

    def get(self):
        """Read one pickled object from the pipe and return it unpickled."""
        with self._rlock:
            res = self._reader.recv_bytes()
        # unserialize the data after having released the lock
        return _ForkingPickler.loads(res)

    def put(self, obj):
        """Pickle ``obj`` and write it to the pipe."""
        # serialize the data before acquiring the lock
        obj = _ForkingPickler.dumps(obj)
        if self._wlock is None:
            # writes to a message oriented win32 pipe are atomic
            self._writer.send_bytes(obj)
        else:
            with self._wlock:
                self._writer.send_bytes(obj)