jpayne@69: # jpayne@69: # Module implementing queues jpayne@69: # jpayne@69: # multiprocessing/queues.py jpayne@69: # jpayne@69: # Copyright (c) 2006-2008, R Oudkerk jpayne@69: # Licensed to PSF under a Contributor Agreement. jpayne@69: # jpayne@69: jpayne@69: __all__ = ['Queue', 'SimpleQueue', 'JoinableQueue'] jpayne@69: jpayne@69: import sys jpayne@69: import os jpayne@69: import threading jpayne@69: import collections jpayne@69: import time jpayne@69: import weakref jpayne@69: import errno jpayne@69: jpayne@69: from queue import Empty, Full jpayne@69: jpayne@69: import _multiprocessing jpayne@69: jpayne@69: from . import connection jpayne@69: from . import context jpayne@69: _ForkingPickler = context.reduction.ForkingPickler jpayne@69: jpayne@69: from .util import debug, info, Finalize, register_after_fork, is_exiting jpayne@69: jpayne@69: # jpayne@69: # Queue type using a pipe, buffer and thread jpayne@69: # jpayne@69: jpayne@69: class Queue(object): jpayne@69: jpayne@69: def __init__(self, maxsize=0, *, ctx): jpayne@69: if maxsize <= 0: jpayne@69: # Can raise ImportError (see issues #3770 and #23400) jpayne@69: from .synchronize import SEM_VALUE_MAX as maxsize jpayne@69: self._maxsize = maxsize jpayne@69: self._reader, self._writer = connection.Pipe(duplex=False) jpayne@69: self._rlock = ctx.Lock() jpayne@69: self._opid = os.getpid() jpayne@69: if sys.platform == 'win32': jpayne@69: self._wlock = None jpayne@69: else: jpayne@69: self._wlock = ctx.Lock() jpayne@69: self._sem = ctx.BoundedSemaphore(maxsize) jpayne@69: # For use by concurrent.futures jpayne@69: self._ignore_epipe = False jpayne@69: jpayne@69: self._after_fork() jpayne@69: jpayne@69: if sys.platform != 'win32': jpayne@69: register_after_fork(self, Queue._after_fork) jpayne@69: jpayne@69: def __getstate__(self): jpayne@69: context.assert_spawning(self) jpayne@69: return (self._ignore_epipe, self._maxsize, self._reader, self._writer, jpayne@69: self._rlock, self._wlock, self._sem, self._opid) jpayne@69: jpayne@69: def __setstate__(self, state): jpayne@69: (self._ignore_epipe, self._maxsize, self._reader, self._writer, jpayne@69: self._rlock, self._wlock, self._sem, self._opid) = state jpayne@69: self._after_fork() jpayne@69: jpayne@69: def _after_fork(self): jpayne@69: debug('Queue._after_fork()') jpayne@69: self._notempty = threading.Condition(threading.Lock()) jpayne@69: self._buffer = collections.deque() jpayne@69: self._thread = None jpayne@69: self._jointhread = None jpayne@69: self._joincancelled = False jpayne@69: self._closed = False jpayne@69: self._close = None jpayne@69: self._send_bytes = self._writer.send_bytes jpayne@69: self._recv_bytes = self._reader.recv_bytes jpayne@69: self._poll = self._reader.poll jpayne@69: jpayne@69: def put(self, obj, block=True, timeout=None): jpayne@69: if self._closed: jpayne@69: raise ValueError(f"Queue {self!r} is closed") jpayne@69: if not self._sem.acquire(block, timeout): jpayne@69: raise Full jpayne@69: jpayne@69: with self._notempty: jpayne@69: if self._thread is None: jpayne@69: self._start_thread() jpayne@69: self._buffer.append(obj) jpayne@69: self._notempty.notify() jpayne@69: jpayne@69: def get(self, block=True, timeout=None): jpayne@69: if self._closed: jpayne@69: raise ValueError(f"Queue {self!r} is closed") jpayne@69: if block and timeout is None: jpayne@69: with self._rlock: jpayne@69: res = self._recv_bytes() jpayne@69: self._sem.release() jpayne@69: else: jpayne@69: if block: jpayne@69: deadline = time.monotonic() + timeout jpayne@69: if not self._rlock.acquire(block, timeout): jpayne@69: raise Empty jpayne@69: try: jpayne@69: if block: jpayne@69: timeout = deadline - time.monotonic() jpayne@69: if not self._poll(timeout): jpayne@69: raise Empty jpayne@69: elif not self._poll(): jpayne@69: raise Empty jpayne@69: res = self._recv_bytes() jpayne@69: self._sem.release() jpayne@69: finally: jpayne@69: self._rlock.release() jpayne@69: # unserialize the data after having released the lock jpayne@69: return _ForkingPickler.loads(res) jpayne@69: jpayne@69: def qsize(self): jpayne@69: # Raises NotImplementedError on Mac OSX because of broken sem_getvalue() jpayne@69: return self._maxsize - self._sem._semlock._get_value() jpayne@69: jpayne@69: def empty(self): jpayne@69: return not self._poll() jpayne@69: jpayne@69: def full(self): jpayne@69: return self._sem._semlock._is_zero() jpayne@69: jpayne@69: def get_nowait(self): jpayne@69: return self.get(False) jpayne@69: jpayne@69: def put_nowait(self, obj): jpayne@69: return self.put(obj, False) jpayne@69: jpayne@69: def close(self): jpayne@69: self._closed = True jpayne@69: try: jpayne@69: self._reader.close() jpayne@69: finally: jpayne@69: close = self._close jpayne@69: if close: jpayne@69: self._close = None jpayne@69: close() jpayne@69: jpayne@69: def join_thread(self): jpayne@69: debug('Queue.join_thread()') jpayne@69: assert self._closed, "Queue {0!r} not closed".format(self) jpayne@69: if self._jointhread: jpayne@69: self._jointhread() jpayne@69: jpayne@69: def cancel_join_thread(self): jpayne@69: debug('Queue.cancel_join_thread()') jpayne@69: self._joincancelled = True jpayne@69: try: jpayne@69: self._jointhread.cancel() jpayne@69: except AttributeError: jpayne@69: pass jpayne@69: jpayne@69: def _start_thread(self): jpayne@69: debug('Queue._start_thread()') jpayne@69: jpayne@69: # Start thread which transfers data from buffer to pipe jpayne@69: self._buffer.clear() jpayne@69: self._thread = threading.Thread( jpayne@69: target=Queue._feed, jpayne@69: args=(self._buffer, self._notempty, self._send_bytes, jpayne@69: self._wlock, self._writer.close, self._ignore_epipe, jpayne@69: self._on_queue_feeder_error, self._sem), jpayne@69: name='QueueFeederThread' jpayne@69: ) jpayne@69: self._thread.daemon = True jpayne@69: jpayne@69: debug('doing self._thread.start()') jpayne@69: self._thread.start() jpayne@69: debug('... done self._thread.start()') jpayne@69: jpayne@69: if not self._joincancelled: jpayne@69: self._jointhread = Finalize( jpayne@69: self._thread, Queue._finalize_join, jpayne@69: [weakref.ref(self._thread)], jpayne@69: exitpriority=-5 jpayne@69: ) jpayne@69: jpayne@69: # Send sentinel to the thread queue object when garbage collected jpayne@69: self._close = Finalize( jpayne@69: self, Queue._finalize_close, jpayne@69: [self._buffer, self._notempty], jpayne@69: exitpriority=10 jpayne@69: ) jpayne@69: jpayne@69: @staticmethod jpayne@69: def _finalize_join(twr): jpayne@69: debug('joining queue thread') jpayne@69: thread = twr() jpayne@69: if thread is not None: jpayne@69: thread.join() jpayne@69: debug('... queue thread joined') jpayne@69: else: jpayne@69: debug('... queue thread already dead') jpayne@69: jpayne@69: @staticmethod jpayne@69: def _finalize_close(buffer, notempty): jpayne@69: debug('telling queue thread to quit') jpayne@69: with notempty: jpayne@69: buffer.append(_sentinel) jpayne@69: notempty.notify() jpayne@69: jpayne@69: @staticmethod jpayne@69: def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe, jpayne@69: onerror, queue_sem): jpayne@69: debug('starting thread to feed data to pipe') jpayne@69: nacquire = notempty.acquire jpayne@69: nrelease = notempty.release jpayne@69: nwait = notempty.wait jpayne@69: bpopleft = buffer.popleft jpayne@69: sentinel = _sentinel jpayne@69: if sys.platform != 'win32': jpayne@69: wacquire = writelock.acquire jpayne@69: wrelease = writelock.release jpayne@69: else: jpayne@69: wacquire = None jpayne@69: jpayne@69: while 1: jpayne@69: try: jpayne@69: nacquire() jpayne@69: try: jpayne@69: if not buffer: jpayne@69: nwait() jpayne@69: finally: jpayne@69: nrelease() jpayne@69: try: jpayne@69: while 1: jpayne@69: obj = bpopleft() jpayne@69: if obj is sentinel: jpayne@69: debug('feeder thread got sentinel -- exiting') jpayne@69: close() jpayne@69: return jpayne@69: jpayne@69: # serialize the data before acquiring the lock jpayne@69: obj = _ForkingPickler.dumps(obj) jpayne@69: if wacquire is None: jpayne@69: send_bytes(obj) jpayne@69: else: jpayne@69: wacquire() jpayne@69: try: jpayne@69: send_bytes(obj) jpayne@69: finally: jpayne@69: wrelease() jpayne@69: except IndexError: jpayne@69: pass jpayne@69: except Exception as e: jpayne@69: if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE: jpayne@69: return jpayne@69: # Since this runs in a daemon thread the resources it uses jpayne@69: # may be become unusable while the process is cleaning up. jpayne@69: # We ignore errors which happen after the process has jpayne@69: # started to cleanup. jpayne@69: if is_exiting(): jpayne@69: info('error in queue thread: %s', e) jpayne@69: return jpayne@69: else: jpayne@69: # Since the object has not been sent in the queue, we need jpayne@69: # to decrease the size of the queue. The error acts as jpayne@69: # if the object had been silently removed from the queue jpayne@69: # and this step is necessary to have a properly working jpayne@69: # queue. jpayne@69: queue_sem.release() jpayne@69: onerror(e, obj) jpayne@69: jpayne@69: @staticmethod jpayne@69: def _on_queue_feeder_error(e, obj): jpayne@69: """ jpayne@69: Private API hook called when feeding data in the background thread jpayne@69: raises an exception. For overriding by concurrent.futures. jpayne@69: """ jpayne@69: import traceback jpayne@69: traceback.print_exc() jpayne@69: jpayne@69: jpayne@69: _sentinel = object() jpayne@69: jpayne@69: # jpayne@69: # A queue type which also supports join() and task_done() methods jpayne@69: # jpayne@69: # Note that if you do not call task_done() for each finished task then jpayne@69: # eventually the counter's semaphore may overflow causing Bad Things jpayne@69: # to happen. jpayne@69: # jpayne@69: jpayne@69: class JoinableQueue(Queue): jpayne@69: jpayne@69: def __init__(self, maxsize=0, *, ctx): jpayne@69: Queue.__init__(self, maxsize, ctx=ctx) jpayne@69: self._unfinished_tasks = ctx.Semaphore(0) jpayne@69: self._cond = ctx.Condition() jpayne@69: jpayne@69: def __getstate__(self): jpayne@69: return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks) jpayne@69: jpayne@69: def __setstate__(self, state): jpayne@69: Queue.__setstate__(self, state[:-2]) jpayne@69: self._cond, self._unfinished_tasks = state[-2:] jpayne@69: jpayne@69: def put(self, obj, block=True, timeout=None): jpayne@69: if self._closed: jpayne@69: raise ValueError(f"Queue {self!r} is closed") jpayne@69: if not self._sem.acquire(block, timeout): jpayne@69: raise Full jpayne@69: jpayne@69: with self._notempty, self._cond: jpayne@69: if self._thread is None: jpayne@69: self._start_thread() jpayne@69: self._buffer.append(obj) jpayne@69: self._unfinished_tasks.release() jpayne@69: self._notempty.notify() jpayne@69: jpayne@69: def task_done(self): jpayne@69: with self._cond: jpayne@69: if not self._unfinished_tasks.acquire(False): jpayne@69: raise ValueError('task_done() called too many times') jpayne@69: if self._unfinished_tasks._semlock._is_zero(): jpayne@69: self._cond.notify_all() jpayne@69: jpayne@69: def join(self): jpayne@69: with self._cond: jpayne@69: if not self._unfinished_tasks._semlock._is_zero(): jpayne@69: self._cond.wait() jpayne@69: jpayne@69: # jpayne@69: # Simplified Queue type -- really just a locked pipe jpayne@69: # jpayne@69: jpayne@69: class SimpleQueue(object): jpayne@69: jpayne@69: def __init__(self, *, ctx): jpayne@69: self._reader, self._writer = connection.Pipe(duplex=False) jpayne@69: self._rlock = ctx.Lock() jpayne@69: self._poll = self._reader.poll jpayne@69: if sys.platform == 'win32': jpayne@69: self._wlock = None jpayne@69: else: jpayne@69: self._wlock = ctx.Lock() jpayne@69: jpayne@69: def empty(self): jpayne@69: return not self._poll() jpayne@69: jpayne@69: def __getstate__(self): jpayne@69: context.assert_spawning(self) jpayne@69: return (self._reader, self._writer, self._rlock, self._wlock) jpayne@69: jpayne@69: def __setstate__(self, state): jpayne@69: (self._reader, self._writer, self._rlock, self._wlock) = state jpayne@69: self._poll = self._reader.poll jpayne@69: jpayne@69: def get(self): jpayne@69: with self._rlock: jpayne@69: res = self._reader.recv_bytes() jpayne@69: # unserialize the data after having released the lock jpayne@69: return _ForkingPickler.loads(res) jpayne@69: jpayne@69: def put(self, obj): jpayne@69: # serialize the data before acquiring the lock jpayne@69: obj = _ForkingPickler.dumps(obj) jpayne@69: if self._wlock is None: jpayne@69: # writes to a message oriented win32 pipe are atomic jpayne@69: self._writer.send_bytes(obj) jpayne@69: else: jpayne@69: with self._wlock: jpayne@69: self._writer.send_bytes(obj)