#
# Module implementing queues
#
# multiprocessing/queues.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']

import sys
import os
import threading
import collections
import time
import weakref
import errno

from queue import Empty, Full

import _multiprocessing

from . import connection
from . import context
_ForkingPickler = context.reduction.ForkingPickler

from .util import debug, info, Finalize, register_after_fork, is_exiting

#
# Queue type using a pipe, buffer and thread
#

class Queue(object):

    def __init__(self, maxsize=0, *, ctx):
        if maxsize <= 0:
            # Can raise ImportError (see issues #3770 and #23400)
            from .synchronize import SEM_VALUE_MAX as maxsize
        self._maxsize = maxsize
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._opid = os.getpid()
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = ctx.Lock()
        self._sem = ctx.BoundedSemaphore(maxsize)
        # For use by concurrent.futures
        self._ignore_epipe = False

        self._after_fork()

        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

    def __getstate__(self):
        context.assert_spawning(self)
        return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

    def __setstate__(self, state):
        (self._ignore_epipe, self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        self._after_fork()

    def _after_fork(self):
        debug('Queue._after_fork()')
        self._notempty = threading.Condition(threading.Lock())
        self._buffer = collections.deque()
        self._thread = None
        self._jointhread = None
        self._joincancelled = False
        self._closed = False
        self._close = None
        self._send_bytes = self._writer.send_bytes
        self._recv_bytes = self._reader.recv_bytes
        self._poll = self._reader.poll

    def put(self, obj, block=True, timeout=None):
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()

    def get(self, block=True, timeout=None):
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if block and timeout is None:
            with self._rlock:
                res = self._recv_bytes()
            self._sem.release()
        else:
            if block:
                deadline = time.monotonic() + timeout
            if not self._rlock.acquire(block, timeout):
                raise Empty
            try:
                if block:
                    timeout = deadline - time.monotonic()
                    if not self._poll(timeout):
                        raise Empty
                elif not self._poll():
                    raise Empty
                res = self._recv_bytes()
                self._sem.release()
            finally:
                self._rlock.release()
        # unserialize the data after having released the lock
        return _ForkingPickler.loads(res)

    def qsize(self):
        # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
        return self._maxsize - self._sem._semlock._get_value()

    def empty(self):
        return not self._poll()

    def full(self):
        return self._sem._semlock._is_zero()

    def get_nowait(self):
        return self.get(False)

    def put_nowait(self, obj):
        return self.put(obj, False)

    def close(self):
        self._closed = True
        try:
            self._reader.close()
        finally:
            close = self._close
            if close:
                self._close = None
                close()

    def join_thread(self):
        debug('Queue.join_thread()')
        assert self._closed, "Queue {0!r} not closed".format(self)
        if self._jointhread:
            self._jointhread()

    def cancel_join_thread(self):
        debug('Queue.cancel_join_thread()')
        self._joincancelled = True
        try:
            self._jointhread.cancel()
        except AttributeError:
            pass

    def _start_thread(self):
        debug('Queue._start_thread()')

        # Start thread which transfers data from buffer to pipe
        self._buffer.clear()
        self._thread = threading.Thread(
            target=Queue._feed,
            args=(self._buffer, self._notempty, self._send_bytes,
                  self._wlock, self._writer.close, self._ignore_epipe,
                  self._on_queue_feeder_error, self._sem),
            name='QueueFeederThread'
        )
        self._thread.daemon = True

        debug('doing self._thread.start()')
        self._thread.start()
        debug('... done self._thread.start()')

        if not self._joincancelled:
            self._jointhread = Finalize(
                self._thread, Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5
                )

        # Send sentinel to the thread queue object when garbage collected
        self._close = Finalize(
            self, Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10
            )

    @staticmethod
    def _finalize_join(twr):
        debug('joining queue thread')
        thread = twr()
        if thread is not None:
            thread.join()
            debug('... queue thread joined')
        else:
            debug('... queue thread already dead')

    @staticmethod
    def _finalize_close(buffer, notempty):
        debug('telling queue thread to quit')
        with notempty:
            buffer.append(_sentinel)
            notempty.notify()

    @staticmethod
    def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
              onerror, queue_sem):
        debug('starting thread to feed data to pipe')
        nacquire = notempty.acquire
        nrelease = notempty.release
        nwait = notempty.wait
        bpopleft = buffer.popleft
        sentinel = _sentinel
        if sys.platform != 'win32':
            wacquire = writelock.acquire
            wrelease = writelock.release
        else:
            wacquire = None

        while 1:
            try:
                nacquire()
                try:
                    if not buffer:
                        nwait()
                finally:
                    nrelease()
                try:
                    while 1:
                        obj = bpopleft()
                        if obj is sentinel:
                            debug('feeder thread got sentinel -- exiting')
                            close()
                            return

                        # serialize the data before acquiring the lock
                        obj = _ForkingPickler.dumps(obj)
                        if wacquire is None:
                            send_bytes(obj)
                        else:
                            wacquire()
                            try:
                                send_bytes(obj)
                            finally:
                                wrelease()
                except IndexError:
                    pass
            except Exception as e:
                if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
                    return
                # Since this runs in a daemon thread the resources it uses
                # may become unusable while the process is cleaning up.
                # We ignore errors which happen after the process has
                # started to clean up.
                if is_exiting():
                    info('error in queue thread: %s', e)
                    return
                else:
                    # Since the object has not been sent in the queue, we need
                    # to decrease the size of the queue. The error acts as
                    # if the object had been silently removed from the queue
                    # and this step is necessary to have a properly working
                    # queue.
                    queue_sem.release()
                    onerror(e, obj)

    @staticmethod
    def _on_queue_feeder_error(e, obj):
        """
        Private API hook called when feeding data in the background thread
        raises an exception.  For overriding by concurrent.futures.
        """
        import traceback
        traceback.print_exc()


_sentinel = object()

#
# A queue type which also supports join() and task_done() methods
#
# Note that if you do not call task_done() for each finished task then
# eventually the counter's semaphore may overflow, causing Bad Things
# to happen.
#

class JoinableQueue(Queue):

    def __init__(self, maxsize=0, *, ctx):
        Queue.__init__(self, maxsize, ctx=ctx)
        self._unfinished_tasks = ctx.Semaphore(0)
        self._cond = ctx.Condition()

    def __getstate__(self):
        return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)

    def __setstate__(self, state):
        Queue.__setstate__(self, state[:-2])
        self._cond, self._unfinished_tasks = state[-2:]

    def put(self, obj, block=True, timeout=None):
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty, self._cond:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._unfinished_tasks.release()
            self._notempty.notify()

    def task_done(self):
        with self._cond:
            if not self._unfinished_tasks.acquire(False):
                raise ValueError('task_done() called too many times')
            if self._unfinished_tasks._semlock._is_zero():
                self._cond.notify_all()

    def join(self):
        with self._cond:
            if not self._unfinished_tasks._semlock._is_zero():
                self._cond.wait()

#
# Simplified Queue type -- really just a locked pipe
#

class SimpleQueue(object):

    def __init__(self, *, ctx):
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._poll = self._reader.poll
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = ctx.Lock()

    def empty(self):
        return not self._poll()

    def __getstate__(self):
        context.assert_spawning(self)
        return (self._reader, self._writer, self._rlock, self._wlock)

    def __setstate__(self, state):
        (self._reader, self._writer, self._rlock, self._wlock) = state
        self._poll = self._reader.poll

    def get(self):
        with self._rlock:
            res = self._reader.recv_bytes()
        # unserialize the data after having released the lock
        return _ForkingPickler.loads(res)

    def put(self, obj):
        # serialize the data before acquiring the lock
        obj = _ForkingPickler.dumps(obj)
        if self._wlock is None:
            # writes to a message oriented win32 pipe are atomic
            self._writer.send_bytes(obj)
        else:
            with self._wlock:
                self._writer.send_bytes(obj)
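
#
# Minimal usage sketch (illustrative only, not part of the original module).
# In application code these classes are normally obtained through a context,
# e.g. multiprocessing.Queue() or multiprocessing.get_context().Queue(),
# rather than constructed here directly; the names below (mp_ctx, q, sq, jq)
# are arbitrary, and a single process is enough to exercise the API even
# though the queues are designed for inter-process use.
#

if __name__ == '__main__':
    import multiprocessing as mp

    mp_ctx = mp.get_context()

    # Queue: buffered, backed by a feeder thread that pushes items into a pipe.
    q = mp_ctx.Queue(maxsize=2)
    q.put('hello')
    q.put('world')
    print(q.get(), q.get())         # -> hello world

    # SimpleQueue: just a locked pipe, no maxsize and no feeder thread.
    sq = mp_ctx.SimpleQueue()
    sq.put(42)
    print(sq.get())                 # -> 42

    # JoinableQueue: join() blocks until every put() has a matching task_done().
    jq = mp_ctx.JoinableQueue()
    jq.put('task')
    print(jq.get())                 # -> task
    jq.task_done()
    jq.join()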