jpayne@68: # jpayne@68: # Module providing the `Pool` class for managing a process pool jpayne@68: # jpayne@68: # multiprocessing/pool.py jpayne@68: # jpayne@68: # Copyright (c) 2006-2008, R Oudkerk jpayne@68: # Licensed to PSF under a Contributor Agreement. jpayne@68: # jpayne@68: jpayne@68: __all__ = ['Pool', 'ThreadPool'] jpayne@68: jpayne@68: # jpayne@68: # Imports jpayne@68: # jpayne@68: jpayne@68: import collections jpayne@68: import itertools jpayne@68: import os jpayne@68: import queue jpayne@68: import threading jpayne@68: import time jpayne@68: import traceback jpayne@68: import warnings jpayne@68: from queue import Empty jpayne@68: jpayne@68: # If threading is available then ThreadPool should be provided. Therefore jpayne@68: # we avoid top-level imports which are liable to fail on some systems. jpayne@68: from . import util jpayne@68: from . import get_context, TimeoutError jpayne@68: from .connection import wait jpayne@68: jpayne@68: # jpayne@68: # Constants representing the state of a pool jpayne@68: # jpayne@68: jpayne@68: INIT = "INIT" jpayne@68: RUN = "RUN" jpayne@68: CLOSE = "CLOSE" jpayne@68: TERMINATE = "TERMINATE" jpayne@68: jpayne@68: # jpayne@68: # Miscellaneous jpayne@68: # jpayne@68: jpayne@68: job_counter = itertools.count() jpayne@68: jpayne@68: def mapstar(args): jpayne@68: return list(map(*args)) jpayne@68: jpayne@68: def starmapstar(args): jpayne@68: return list(itertools.starmap(args[0], args[1])) jpayne@68: jpayne@68: # jpayne@68: # Hack to embed stringification of remote traceback in local traceback jpayne@68: # jpayne@68: jpayne@68: class RemoteTraceback(Exception): jpayne@68: def __init__(self, tb): jpayne@68: self.tb = tb jpayne@68: def __str__(self): jpayne@68: return self.tb jpayne@68: jpayne@68: class ExceptionWithTraceback: jpayne@68: def __init__(self, exc, tb): jpayne@68: tb = traceback.format_exception(type(exc), exc, tb) jpayne@68: tb = ''.join(tb) jpayne@68: self.exc = exc jpayne@68: self.tb = '\n"""\n%s"""' % tb jpayne@68: def __reduce__(self): jpayne@68: return rebuild_exc, (self.exc, self.tb) jpayne@68: jpayne@68: def rebuild_exc(exc, tb): jpayne@68: exc.__cause__ = RemoteTraceback(tb) jpayne@68: return exc jpayne@68: jpayne@68: # jpayne@68: # Code run by worker processes jpayne@68: # jpayne@68: jpayne@68: class MaybeEncodingError(Exception): jpayne@68: """Wraps possible unpickleable errors, so they can be jpayne@68: safely sent through the socket.""" jpayne@68: jpayne@68: def __init__(self, exc, value): jpayne@68: self.exc = repr(exc) jpayne@68: self.value = repr(value) jpayne@68: super(MaybeEncodingError, self).__init__(self.exc, self.value) jpayne@68: jpayne@68: def __str__(self): jpayne@68: return "Error sending result: '%s'. Reason: '%s'" % (self.value, jpayne@68: self.exc) jpayne@68: jpayne@68: def __repr__(self): jpayne@68: return "<%s: %s>" % (self.__class__.__name__, self) jpayne@68: jpayne@68: jpayne@68: def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None, jpayne@68: wrap_exception=False): jpayne@68: if (maxtasks is not None) and not (isinstance(maxtasks, int) jpayne@68: and maxtasks >= 1): jpayne@68: raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks)) jpayne@68: put = outqueue.put jpayne@68: get = inqueue.get jpayne@68: if hasattr(inqueue, '_writer'): jpayne@68: inqueue._writer.close() jpayne@68: outqueue._reader.close() jpayne@68: jpayne@68: if initializer is not None: jpayne@68: initializer(*initargs) jpayne@68: jpayne@68: completed = 0 jpayne@68: while maxtasks is None or (maxtasks and completed < maxtasks): jpayne@68: try: jpayne@68: task = get() jpayne@68: except (EOFError, OSError): jpayne@68: util.debug('worker got EOFError or OSError -- exiting') jpayne@68: break jpayne@68: jpayne@68: if task is None: jpayne@68: util.debug('worker got sentinel -- exiting') jpayne@68: break jpayne@68: jpayne@68: job, i, func, args, kwds = task jpayne@68: try: jpayne@68: result = (True, func(*args, **kwds)) jpayne@68: except Exception as e: jpayne@68: if wrap_exception and func is not _helper_reraises_exception: jpayne@68: e = ExceptionWithTraceback(e, e.__traceback__) jpayne@68: result = (False, e) jpayne@68: try: jpayne@68: put((job, i, result)) jpayne@68: except Exception as e: jpayne@68: wrapped = MaybeEncodingError(e, result[1]) jpayne@68: util.debug("Possible encoding error while sending result: %s" % ( jpayne@68: wrapped)) jpayne@68: put((job, i, (False, wrapped))) jpayne@68: jpayne@68: task = job = result = func = args = kwds = None jpayne@68: completed += 1 jpayne@68: util.debug('worker exiting after %d tasks' % completed) jpayne@68: jpayne@68: def _helper_reraises_exception(ex): jpayne@68: 'Pickle-able helper function for use by _guarded_task_generation.' jpayne@68: raise ex jpayne@68: jpayne@68: # jpayne@68: # Class representing a process pool jpayne@68: # jpayne@68: jpayne@68: class _PoolCache(dict): jpayne@68: """ jpayne@68: Class that implements a cache for the Pool class that will notify jpayne@68: the pool management threads every time the cache is emptied. The jpayne@68: notification is done by the use of a queue that is provided when jpayne@68: instantiating the cache. jpayne@68: """ jpayne@68: def __init__(self, /, *args, notifier=None, **kwds): jpayne@68: self.notifier = notifier jpayne@68: super().__init__(*args, **kwds) jpayne@68: jpayne@68: def __delitem__(self, item): jpayne@68: super().__delitem__(item) jpayne@68: jpayne@68: # Notify that the cache is empty. This is important because the jpayne@68: # pool keeps maintaining workers until the cache gets drained. This jpayne@68: # eliminates a race condition in which a task is finished after the jpayne@68: # the pool's _handle_workers method has enter another iteration of the jpayne@68: # loop. In this situation, the only event that can wake up the pool jpayne@68: # is the cache to be emptied (no more tasks available). jpayne@68: if not self: jpayne@68: self.notifier.put(None) jpayne@68: jpayne@68: class Pool(object): jpayne@68: ''' jpayne@68: Class which supports an async version of applying functions to arguments. jpayne@68: ''' jpayne@68: _wrap_exception = True jpayne@68: jpayne@68: @staticmethod jpayne@68: def Process(ctx, *args, **kwds): jpayne@68: return ctx.Process(*args, **kwds) jpayne@68: jpayne@68: def __init__(self, processes=None, initializer=None, initargs=(), jpayne@68: maxtasksperchild=None, context=None): jpayne@68: # Attributes initialized early to make sure that they exist in jpayne@68: # __del__() if __init__() raises an exception jpayne@68: self._pool = [] jpayne@68: self._state = INIT jpayne@68: jpayne@68: self._ctx = context or get_context() jpayne@68: self._setup_queues() jpayne@68: self._taskqueue = queue.SimpleQueue() jpayne@68: # The _change_notifier queue exist to wake up self._handle_workers() jpayne@68: # when the cache (self._cache) is empty or when there is a change in jpayne@68: # the _state variable of the thread that runs _handle_workers. jpayne@68: self._change_notifier = self._ctx.SimpleQueue() jpayne@68: self._cache = _PoolCache(notifier=self._change_notifier) jpayne@68: self._maxtasksperchild = maxtasksperchild jpayne@68: self._initializer = initializer jpayne@68: self._initargs = initargs jpayne@68: jpayne@68: if processes is None: jpayne@68: processes = os.cpu_count() or 1 jpayne@68: if processes < 1: jpayne@68: raise ValueError("Number of processes must be at least 1") jpayne@68: jpayne@68: if initializer is not None and not callable(initializer): jpayne@68: raise TypeError('initializer must be a callable') jpayne@68: jpayne@68: self._processes = processes jpayne@68: try: jpayne@68: self._repopulate_pool() jpayne@68: except Exception: jpayne@68: for p in self._pool: jpayne@68: if p.exitcode is None: jpayne@68: p.terminate() jpayne@68: for p in self._pool: jpayne@68: p.join() jpayne@68: raise jpayne@68: jpayne@68: sentinels = self._get_sentinels() jpayne@68: jpayne@68: self._worker_handler = threading.Thread( jpayne@68: target=Pool._handle_workers, jpayne@68: args=(self._cache, self._taskqueue, self._ctx, self.Process, jpayne@68: self._processes, self._pool, self._inqueue, self._outqueue, jpayne@68: self._initializer, self._initargs, self._maxtasksperchild, jpayne@68: self._wrap_exception, sentinels, self._change_notifier) jpayne@68: ) jpayne@68: self._worker_handler.daemon = True jpayne@68: self._worker_handler._state = RUN jpayne@68: self._worker_handler.start() jpayne@68: jpayne@68: jpayne@68: self._task_handler = threading.Thread( jpayne@68: target=Pool._handle_tasks, jpayne@68: args=(self._taskqueue, self._quick_put, self._outqueue, jpayne@68: self._pool, self._cache) jpayne@68: ) jpayne@68: self._task_handler.daemon = True jpayne@68: self._task_handler._state = RUN jpayne@68: self._task_handler.start() jpayne@68: jpayne@68: self._result_handler = threading.Thread( jpayne@68: target=Pool._handle_results, jpayne@68: args=(self._outqueue, self._quick_get, self._cache) jpayne@68: ) jpayne@68: self._result_handler.daemon = True jpayne@68: self._result_handler._state = RUN jpayne@68: self._result_handler.start() jpayne@68: jpayne@68: self._terminate = util.Finalize( jpayne@68: self, self._terminate_pool, jpayne@68: args=(self._taskqueue, self._inqueue, self._outqueue, self._pool, jpayne@68: self._change_notifier, self._worker_handler, self._task_handler, jpayne@68: self._result_handler, self._cache), jpayne@68: exitpriority=15 jpayne@68: ) jpayne@68: self._state = RUN jpayne@68: jpayne@68: # Copy globals as function locals to make sure that they are available jpayne@68: # during Python shutdown when the Pool is destroyed. jpayne@68: def __del__(self, _warn=warnings.warn, RUN=RUN): jpayne@68: if self._state == RUN: jpayne@68: _warn(f"unclosed running multiprocessing pool {self!r}", jpayne@68: ResourceWarning, source=self) jpayne@68: if getattr(self, '_change_notifier', None) is not None: jpayne@68: self._change_notifier.put(None) jpayne@68: jpayne@68: def __repr__(self): jpayne@68: cls = self.__class__ jpayne@68: return (f'<{cls.__module__}.{cls.__qualname__} ' jpayne@68: f'state={self._state} ' jpayne@68: f'pool_size={len(self._pool)}>') jpayne@68: jpayne@68: def _get_sentinels(self): jpayne@68: task_queue_sentinels = [self._outqueue._reader] jpayne@68: self_notifier_sentinels = [self._change_notifier._reader] jpayne@68: return [*task_queue_sentinels, *self_notifier_sentinels] jpayne@68: jpayne@68: @staticmethod jpayne@68: def _get_worker_sentinels(workers): jpayne@68: return [worker.sentinel for worker in jpayne@68: workers if hasattr(worker, "sentinel")] jpayne@68: jpayne@68: @staticmethod jpayne@68: def _join_exited_workers(pool): jpayne@68: """Cleanup after any worker processes which have exited due to reaching jpayne@68: their specified lifetime. Returns True if any workers were cleaned up. jpayne@68: """ jpayne@68: cleaned = False jpayne@68: for i in reversed(range(len(pool))): jpayne@68: worker = pool[i] jpayne@68: if worker.exitcode is not None: jpayne@68: # worker exited jpayne@68: util.debug('cleaning up worker %d' % i) jpayne@68: worker.join() jpayne@68: cleaned = True jpayne@68: del pool[i] jpayne@68: return cleaned jpayne@68: jpayne@68: def _repopulate_pool(self): jpayne@68: return self._repopulate_pool_static(self._ctx, self.Process, jpayne@68: self._processes, jpayne@68: self._pool, self._inqueue, jpayne@68: self._outqueue, self._initializer, jpayne@68: self._initargs, jpayne@68: self._maxtasksperchild, jpayne@68: self._wrap_exception) jpayne@68: jpayne@68: @staticmethod jpayne@68: def _repopulate_pool_static(ctx, Process, processes, pool, inqueue, jpayne@68: outqueue, initializer, initargs, jpayne@68: maxtasksperchild, wrap_exception): jpayne@68: """Bring the number of pool processes up to the specified number, jpayne@68: for use after reaping workers which have exited. jpayne@68: """ jpayne@68: for i in range(processes - len(pool)): jpayne@68: w = Process(ctx, target=worker, jpayne@68: args=(inqueue, outqueue, jpayne@68: initializer, jpayne@68: initargs, maxtasksperchild, jpayne@68: wrap_exception)) jpayne@68: w.name = w.name.replace('Process', 'PoolWorker') jpayne@68: w.daemon = True jpayne@68: w.start() jpayne@68: pool.append(w) jpayne@68: util.debug('added worker') jpayne@68: jpayne@68: @staticmethod jpayne@68: def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue, jpayne@68: initializer, initargs, maxtasksperchild, jpayne@68: wrap_exception): jpayne@68: """Clean up any exited workers and start replacements for them. jpayne@68: """ jpayne@68: if Pool._join_exited_workers(pool): jpayne@68: Pool._repopulate_pool_static(ctx, Process, processes, pool, jpayne@68: inqueue, outqueue, initializer, jpayne@68: initargs, maxtasksperchild, jpayne@68: wrap_exception) jpayne@68: jpayne@68: def _setup_queues(self): jpayne@68: self._inqueue = self._ctx.SimpleQueue() jpayne@68: self._outqueue = self._ctx.SimpleQueue() jpayne@68: self._quick_put = self._inqueue._writer.send jpayne@68: self._quick_get = self._outqueue._reader.recv jpayne@68: jpayne@68: def _check_running(self): jpayne@68: if self._state != RUN: jpayne@68: raise ValueError("Pool not running") jpayne@68: jpayne@68: def apply(self, func, args=(), kwds={}): jpayne@68: ''' jpayne@68: Equivalent of `func(*args, **kwds)`. jpayne@68: Pool must be running. jpayne@68: ''' jpayne@68: return self.apply_async(func, args, kwds).get() jpayne@68: jpayne@68: def map(self, func, iterable, chunksize=None): jpayne@68: ''' jpayne@68: Apply `func` to each element in `iterable`, collecting the results jpayne@68: in a list that is returned. jpayne@68: ''' jpayne@68: return self._map_async(func, iterable, mapstar, chunksize).get() jpayne@68: jpayne@68: def starmap(self, func, iterable, chunksize=None): jpayne@68: ''' jpayne@68: Like `map()` method but the elements of the `iterable` are expected to jpayne@68: be iterables as well and will be unpacked as arguments. Hence jpayne@68: `func` and (a, b) becomes func(a, b). jpayne@68: ''' jpayne@68: return self._map_async(func, iterable, starmapstar, chunksize).get() jpayne@68: jpayne@68: def starmap_async(self, func, iterable, chunksize=None, callback=None, jpayne@68: error_callback=None): jpayne@68: ''' jpayne@68: Asynchronous version of `starmap()` method. jpayne@68: ''' jpayne@68: return self._map_async(func, iterable, starmapstar, chunksize, jpayne@68: callback, error_callback) jpayne@68: jpayne@68: def _guarded_task_generation(self, result_job, func, iterable): jpayne@68: '''Provides a generator of tasks for imap and imap_unordered with jpayne@68: appropriate handling for iterables which throw exceptions during jpayne@68: iteration.''' jpayne@68: try: jpayne@68: i = -1 jpayne@68: for i, x in enumerate(iterable): jpayne@68: yield (result_job, i, func, (x,), {}) jpayne@68: except Exception as e: jpayne@68: yield (result_job, i+1, _helper_reraises_exception, (e,), {}) jpayne@68: jpayne@68: def imap(self, func, iterable, chunksize=1): jpayne@68: ''' jpayne@68: Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. jpayne@68: ''' jpayne@68: self._check_running() jpayne@68: if chunksize == 1: jpayne@68: result = IMapIterator(self) jpayne@68: self._taskqueue.put( jpayne@68: ( jpayne@68: self._guarded_task_generation(result._job, func, iterable), jpayne@68: result._set_length jpayne@68: )) jpayne@68: return result jpayne@68: else: jpayne@68: if chunksize < 1: jpayne@68: raise ValueError( jpayne@68: "Chunksize must be 1+, not {0:n}".format( jpayne@68: chunksize)) jpayne@68: task_batches = Pool._get_tasks(func, iterable, chunksize) jpayne@68: result = IMapIterator(self) jpayne@68: self._taskqueue.put( jpayne@68: ( jpayne@68: self._guarded_task_generation(result._job, jpayne@68: mapstar, jpayne@68: task_batches), jpayne@68: result._set_length jpayne@68: )) jpayne@68: return (item for chunk in result for item in chunk) jpayne@68: jpayne@68: def imap_unordered(self, func, iterable, chunksize=1): jpayne@68: ''' jpayne@68: Like `imap()` method but ordering of results is arbitrary. jpayne@68: ''' jpayne@68: self._check_running() jpayne@68: if chunksize == 1: jpayne@68: result = IMapUnorderedIterator(self) jpayne@68: self._taskqueue.put( jpayne@68: ( jpayne@68: self._guarded_task_generation(result._job, func, iterable), jpayne@68: result._set_length jpayne@68: )) jpayne@68: return result jpayne@68: else: jpayne@68: if chunksize < 1: jpayne@68: raise ValueError( jpayne@68: "Chunksize must be 1+, not {0!r}".format(chunksize)) jpayne@68: task_batches = Pool._get_tasks(func, iterable, chunksize) jpayne@68: result = IMapUnorderedIterator(self) jpayne@68: self._taskqueue.put( jpayne@68: ( jpayne@68: self._guarded_task_generation(result._job, jpayne@68: mapstar, jpayne@68: task_batches), jpayne@68: result._set_length jpayne@68: )) jpayne@68: return (item for chunk in result for item in chunk) jpayne@68: jpayne@68: def apply_async(self, func, args=(), kwds={}, callback=None, jpayne@68: error_callback=None): jpayne@68: ''' jpayne@68: Asynchronous version of `apply()` method. jpayne@68: ''' jpayne@68: self._check_running() jpayne@68: result = ApplyResult(self, callback, error_callback) jpayne@68: self._taskqueue.put(([(result._job, 0, func, args, kwds)], None)) jpayne@68: return result jpayne@68: jpayne@68: def map_async(self, func, iterable, chunksize=None, callback=None, jpayne@68: error_callback=None): jpayne@68: ''' jpayne@68: Asynchronous version of `map()` method. jpayne@68: ''' jpayne@68: return self._map_async(func, iterable, mapstar, chunksize, callback, jpayne@68: error_callback) jpayne@68: jpayne@68: def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, jpayne@68: error_callback=None): jpayne@68: ''' jpayne@68: Helper function to implement map, starmap and their async counterparts. jpayne@68: ''' jpayne@68: self._check_running() jpayne@68: if not hasattr(iterable, '__len__'): jpayne@68: iterable = list(iterable) jpayne@68: jpayne@68: if chunksize is None: jpayne@68: chunksize, extra = divmod(len(iterable), len(self._pool) * 4) jpayne@68: if extra: jpayne@68: chunksize += 1 jpayne@68: if len(iterable) == 0: jpayne@68: chunksize = 0 jpayne@68: jpayne@68: task_batches = Pool._get_tasks(func, iterable, chunksize) jpayne@68: result = MapResult(self, chunksize, len(iterable), callback, jpayne@68: error_callback=error_callback) jpayne@68: self._taskqueue.put( jpayne@68: ( jpayne@68: self._guarded_task_generation(result._job, jpayne@68: mapper, jpayne@68: task_batches), jpayne@68: None jpayne@68: ) jpayne@68: ) jpayne@68: return result jpayne@68: jpayne@68: @staticmethod jpayne@68: def _wait_for_updates(sentinels, change_notifier, timeout=None): jpayne@68: wait(sentinels, timeout=timeout) jpayne@68: while not change_notifier.empty(): jpayne@68: change_notifier.get() jpayne@68: jpayne@68: @classmethod jpayne@68: def _handle_workers(cls, cache, taskqueue, ctx, Process, processes, jpayne@68: pool, inqueue, outqueue, initializer, initargs, jpayne@68: maxtasksperchild, wrap_exception, sentinels, jpayne@68: change_notifier): jpayne@68: thread = threading.current_thread() jpayne@68: jpayne@68: # Keep maintaining workers until the cache gets drained, unless the pool jpayne@68: # is terminated. jpayne@68: while thread._state == RUN or (cache and thread._state != TERMINATE): jpayne@68: cls._maintain_pool(ctx, Process, processes, pool, inqueue, jpayne@68: outqueue, initializer, initargs, jpayne@68: maxtasksperchild, wrap_exception) jpayne@68: jpayne@68: current_sentinels = [*cls._get_worker_sentinels(pool), *sentinels] jpayne@68: jpayne@68: cls._wait_for_updates(current_sentinels, change_notifier) jpayne@68: # send sentinel to stop workers jpayne@68: taskqueue.put(None) jpayne@68: util.debug('worker handler exiting') jpayne@68: jpayne@68: @staticmethod jpayne@68: def _handle_tasks(taskqueue, put, outqueue, pool, cache): jpayne@68: thread = threading.current_thread() jpayne@68: jpayne@68: for taskseq, set_length in iter(taskqueue.get, None): jpayne@68: task = None jpayne@68: try: jpayne@68: # iterating taskseq cannot fail jpayne@68: for task in taskseq: jpayne@68: if thread._state != RUN: jpayne@68: util.debug('task handler found thread._state != RUN') jpayne@68: break jpayne@68: try: jpayne@68: put(task) jpayne@68: except Exception as e: jpayne@68: job, idx = task[:2] jpayne@68: try: jpayne@68: cache[job]._set(idx, (False, e)) jpayne@68: except KeyError: jpayne@68: pass jpayne@68: else: jpayne@68: if set_length: jpayne@68: util.debug('doing set_length()') jpayne@68: idx = task[1] if task else -1 jpayne@68: set_length(idx + 1) jpayne@68: continue jpayne@68: break jpayne@68: finally: jpayne@68: task = taskseq = job = None jpayne@68: else: jpayne@68: util.debug('task handler got sentinel') jpayne@68: jpayne@68: try: jpayne@68: # tell result handler to finish when cache is empty jpayne@68: util.debug('task handler sending sentinel to result handler') jpayne@68: outqueue.put(None) jpayne@68: jpayne@68: # tell workers there is no more work jpayne@68: util.debug('task handler sending sentinel to workers') jpayne@68: for p in pool: jpayne@68: put(None) jpayne@68: except OSError: jpayne@68: util.debug('task handler got OSError when sending sentinels') jpayne@68: jpayne@68: util.debug('task handler exiting') jpayne@68: jpayne@68: @staticmethod jpayne@68: def _handle_results(outqueue, get, cache): jpayne@68: thread = threading.current_thread() jpayne@68: jpayne@68: while 1: jpayne@68: try: jpayne@68: task = get() jpayne@68: except (OSError, EOFError): jpayne@68: util.debug('result handler got EOFError/OSError -- exiting') jpayne@68: return jpayne@68: jpayne@68: if thread._state != RUN: jpayne@68: assert thread._state == TERMINATE, "Thread not in TERMINATE" jpayne@68: util.debug('result handler found thread._state=TERMINATE') jpayne@68: break jpayne@68: jpayne@68: if task is None: jpayne@68: util.debug('result handler got sentinel') jpayne@68: break jpayne@68: jpayne@68: job, i, obj = task jpayne@68: try: jpayne@68: cache[job]._set(i, obj) jpayne@68: except KeyError: jpayne@68: pass jpayne@68: task = job = obj = None jpayne@68: jpayne@68: while cache and thread._state != TERMINATE: jpayne@68: try: jpayne@68: task = get() jpayne@68: except (OSError, EOFError): jpayne@68: util.debug('result handler got EOFError/OSError -- exiting') jpayne@68: return jpayne@68: jpayne@68: if task is None: jpayne@68: util.debug('result handler ignoring extra sentinel') jpayne@68: continue jpayne@68: job, i, obj = task jpayne@68: try: jpayne@68: cache[job]._set(i, obj) jpayne@68: except KeyError: jpayne@68: pass jpayne@68: task = job = obj = None jpayne@68: jpayne@68: if hasattr(outqueue, '_reader'): jpayne@68: util.debug('ensuring that outqueue is not full') jpayne@68: # If we don't make room available in outqueue then jpayne@68: # attempts to add the sentinel (None) to outqueue may jpayne@68: # block. There is guaranteed to be no more than 2 sentinels. jpayne@68: try: jpayne@68: for i in range(10): jpayne@68: if not outqueue._reader.poll(): jpayne@68: break jpayne@68: get() jpayne@68: except (OSError, EOFError): jpayne@68: pass jpayne@68: jpayne@68: util.debug('result handler exiting: len(cache)=%s, thread._state=%s', jpayne@68: len(cache), thread._state) jpayne@68: jpayne@68: @staticmethod jpayne@68: def _get_tasks(func, it, size): jpayne@68: it = iter(it) jpayne@68: while 1: jpayne@68: x = tuple(itertools.islice(it, size)) jpayne@68: if not x: jpayne@68: return jpayne@68: yield (func, x) jpayne@68: jpayne@68: def __reduce__(self): jpayne@68: raise NotImplementedError( jpayne@68: 'pool objects cannot be passed between processes or pickled' jpayne@68: ) jpayne@68: jpayne@68: def close(self): jpayne@68: util.debug('closing pool') jpayne@68: if self._state == RUN: jpayne@68: self._state = CLOSE jpayne@68: self._worker_handler._state = CLOSE jpayne@68: self._change_notifier.put(None) jpayne@68: jpayne@68: def terminate(self): jpayne@68: util.debug('terminating pool') jpayne@68: self._state = TERMINATE jpayne@68: self._worker_handler._state = TERMINATE jpayne@68: self._change_notifier.put(None) jpayne@68: self._terminate() jpayne@68: jpayne@68: def join(self): jpayne@68: util.debug('joining pool') jpayne@68: if self._state == RUN: jpayne@68: raise ValueError("Pool is still running") jpayne@68: elif self._state not in (CLOSE, TERMINATE): jpayne@68: raise ValueError("In unknown state") jpayne@68: self._worker_handler.join() jpayne@68: self._task_handler.join() jpayne@68: self._result_handler.join() jpayne@68: for p in self._pool: jpayne@68: p.join() jpayne@68: jpayne@68: @staticmethod jpayne@68: def _help_stuff_finish(inqueue, task_handler, size): jpayne@68: # task_handler may be blocked trying to put items on inqueue jpayne@68: util.debug('removing tasks from inqueue until task handler finished') jpayne@68: inqueue._rlock.acquire() jpayne@68: while task_handler.is_alive() and inqueue._reader.poll(): jpayne@68: inqueue._reader.recv() jpayne@68: time.sleep(0) jpayne@68: jpayne@68: @classmethod jpayne@68: def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier, jpayne@68: worker_handler, task_handler, result_handler, cache): jpayne@68: # this is guaranteed to only be called once jpayne@68: util.debug('finalizing pool') jpayne@68: jpayne@68: worker_handler._state = TERMINATE jpayne@68: task_handler._state = TERMINATE jpayne@68: jpayne@68: util.debug('helping task handler/workers to finish') jpayne@68: cls._help_stuff_finish(inqueue, task_handler, len(pool)) jpayne@68: jpayne@68: if (not result_handler.is_alive()) and (len(cache) != 0): jpayne@68: raise AssertionError( jpayne@68: "Cannot have cache with result_hander not alive") jpayne@68: jpayne@68: result_handler._state = TERMINATE jpayne@68: change_notifier.put(None) jpayne@68: outqueue.put(None) # sentinel jpayne@68: jpayne@68: # We must wait for the worker handler to exit before terminating jpayne@68: # workers because we don't want workers to be restarted behind our back. jpayne@68: util.debug('joining worker handler') jpayne@68: if threading.current_thread() is not worker_handler: jpayne@68: worker_handler.join() jpayne@68: jpayne@68: # Terminate workers which haven't already finished. jpayne@68: if pool and hasattr(pool[0], 'terminate'): jpayne@68: util.debug('terminating workers') jpayne@68: for p in pool: jpayne@68: if p.exitcode is None: jpayne@68: p.terminate() jpayne@68: jpayne@68: util.debug('joining task handler') jpayne@68: if threading.current_thread() is not task_handler: jpayne@68: task_handler.join() jpayne@68: jpayne@68: util.debug('joining result handler') jpayne@68: if threading.current_thread() is not result_handler: jpayne@68: result_handler.join() jpayne@68: jpayne@68: if pool and hasattr(pool[0], 'terminate'): jpayne@68: util.debug('joining pool workers') jpayne@68: for p in pool: jpayne@68: if p.is_alive(): jpayne@68: # worker has not yet exited jpayne@68: util.debug('cleaning up worker %d' % p.pid) jpayne@68: p.join() jpayne@68: jpayne@68: def __enter__(self): jpayne@68: self._check_running() jpayne@68: return self jpayne@68: jpayne@68: def __exit__(self, exc_type, exc_val, exc_tb): jpayne@68: self.terminate() jpayne@68: jpayne@68: # jpayne@68: # Class whose instances are returned by `Pool.apply_async()` jpayne@68: # jpayne@68: jpayne@68: class ApplyResult(object): jpayne@68: jpayne@68: def __init__(self, pool, callback, error_callback): jpayne@68: self._pool = pool jpayne@68: self._event = threading.Event() jpayne@68: self._job = next(job_counter) jpayne@68: self._cache = pool._cache jpayne@68: self._callback = callback jpayne@68: self._error_callback = error_callback jpayne@68: self._cache[self._job] = self jpayne@68: jpayne@68: def ready(self): jpayne@68: return self._event.is_set() jpayne@68: jpayne@68: def successful(self): jpayne@68: if not self.ready(): jpayne@68: raise ValueError("{0!r} not ready".format(self)) jpayne@68: return self._success jpayne@68: jpayne@68: def wait(self, timeout=None): jpayne@68: self._event.wait(timeout) jpayne@68: jpayne@68: def get(self, timeout=None): jpayne@68: self.wait(timeout) jpayne@68: if not self.ready(): jpayne@68: raise TimeoutError jpayne@68: if self._success: jpayne@68: return self._value jpayne@68: else: jpayne@68: raise self._value jpayne@68: jpayne@68: def _set(self, i, obj): jpayne@68: self._success, self._value = obj jpayne@68: if self._callback and self._success: jpayne@68: self._callback(self._value) jpayne@68: if self._error_callback and not self._success: jpayne@68: self._error_callback(self._value) jpayne@68: self._event.set() jpayne@68: del self._cache[self._job] jpayne@68: self._pool = None jpayne@68: jpayne@68: AsyncResult = ApplyResult # create alias -- see #17805 jpayne@68: jpayne@68: # jpayne@68: # Class whose instances are returned by `Pool.map_async()` jpayne@68: # jpayne@68: jpayne@68: class MapResult(ApplyResult): jpayne@68: jpayne@68: def __init__(self, pool, chunksize, length, callback, error_callback): jpayne@68: ApplyResult.__init__(self, pool, callback, jpayne@68: error_callback=error_callback) jpayne@68: self._success = True jpayne@68: self._value = [None] * length jpayne@68: self._chunksize = chunksize jpayne@68: if chunksize <= 0: jpayne@68: self._number_left = 0 jpayne@68: self._event.set() jpayne@68: del self._cache[self._job] jpayne@68: else: jpayne@68: self._number_left = length//chunksize + bool(length % chunksize) jpayne@68: jpayne@68: def _set(self, i, success_result): jpayne@68: self._number_left -= 1 jpayne@68: success, result = success_result jpayne@68: if success and self._success: jpayne@68: self._value[i*self._chunksize:(i+1)*self._chunksize] = result jpayne@68: if self._number_left == 0: jpayne@68: if self._callback: jpayne@68: self._callback(self._value) jpayne@68: del self._cache[self._job] jpayne@68: self._event.set() jpayne@68: self._pool = None jpayne@68: else: jpayne@68: if not success and self._success: jpayne@68: # only store first exception jpayne@68: self._success = False jpayne@68: self._value = result jpayne@68: if self._number_left == 0: jpayne@68: # only consider the result ready once all jobs are done jpayne@68: if self._error_callback: jpayne@68: self._error_callback(self._value) jpayne@68: del self._cache[self._job] jpayne@68: self._event.set() jpayne@68: self._pool = None jpayne@68: jpayne@68: # jpayne@68: # Class whose instances are returned by `Pool.imap()` jpayne@68: # jpayne@68: jpayne@68: class IMapIterator(object): jpayne@68: jpayne@68: def __init__(self, pool): jpayne@68: self._pool = pool jpayne@68: self._cond = threading.Condition(threading.Lock()) jpayne@68: self._job = next(job_counter) jpayne@68: self._cache = pool._cache jpayne@68: self._items = collections.deque() jpayne@68: self._index = 0 jpayne@68: self._length = None jpayne@68: self._unsorted = {} jpayne@68: self._cache[self._job] = self jpayne@68: jpayne@68: def __iter__(self): jpayne@68: return self jpayne@68: jpayne@68: def next(self, timeout=None): jpayne@68: with self._cond: jpayne@68: try: jpayne@68: item = self._items.popleft() jpayne@68: except IndexError: jpayne@68: if self._index == self._length: jpayne@68: self._pool = None jpayne@68: raise StopIteration from None jpayne@68: self._cond.wait(timeout) jpayne@68: try: jpayne@68: item = self._items.popleft() jpayne@68: except IndexError: jpayne@68: if self._index == self._length: jpayne@68: self._pool = None jpayne@68: raise StopIteration from None jpayne@68: raise TimeoutError from None jpayne@68: jpayne@68: success, value = item jpayne@68: if success: jpayne@68: return value jpayne@68: raise value jpayne@68: jpayne@68: __next__ = next # XXX jpayne@68: jpayne@68: def _set(self, i, obj): jpayne@68: with self._cond: jpayne@68: if self._index == i: jpayne@68: self._items.append(obj) jpayne@68: self._index += 1 jpayne@68: while self._index in self._unsorted: jpayne@68: obj = self._unsorted.pop(self._index) jpayne@68: self._items.append(obj) jpayne@68: self._index += 1 jpayne@68: self._cond.notify() jpayne@68: else: jpayne@68: self._unsorted[i] = obj jpayne@68: jpayne@68: if self._index == self._length: jpayne@68: del self._cache[self._job] jpayne@68: self._pool = None jpayne@68: jpayne@68: def _set_length(self, length): jpayne@68: with self._cond: jpayne@68: self._length = length jpayne@68: if self._index == self._length: jpayne@68: self._cond.notify() jpayne@68: del self._cache[self._job] jpayne@68: self._pool = None jpayne@68: jpayne@68: # jpayne@68: # Class whose instances are returned by `Pool.imap_unordered()` jpayne@68: # jpayne@68: jpayne@68: class IMapUnorderedIterator(IMapIterator): jpayne@68: jpayne@68: def _set(self, i, obj): jpayne@68: with self._cond: jpayne@68: self._items.append(obj) jpayne@68: self._index += 1 jpayne@68: self._cond.notify() jpayne@68: if self._index == self._length: jpayne@68: del self._cache[self._job] jpayne@68: self._pool = None jpayne@68: jpayne@68: # jpayne@68: # jpayne@68: # jpayne@68: jpayne@68: class ThreadPool(Pool): jpayne@68: _wrap_exception = False jpayne@68: jpayne@68: @staticmethod jpayne@68: def Process(ctx, *args, **kwds): jpayne@68: from .dummy import Process jpayne@68: return Process(*args, **kwds) jpayne@68: jpayne@68: def __init__(self, processes=None, initializer=None, initargs=()): jpayne@68: Pool.__init__(self, processes, initializer, initargs) jpayne@68: jpayne@68: def _setup_queues(self): jpayne@68: self._inqueue = queue.SimpleQueue() jpayne@68: self._outqueue = queue.SimpleQueue() jpayne@68: self._quick_put = self._inqueue.put jpayne@68: self._quick_get = self._outqueue.get jpayne@68: jpayne@68: def _get_sentinels(self): jpayne@68: return [self._change_notifier._reader] jpayne@68: jpayne@68: @staticmethod jpayne@68: def _get_worker_sentinels(workers): jpayne@68: return [] jpayne@68: jpayne@68: @staticmethod jpayne@68: def _help_stuff_finish(inqueue, task_handler, size): jpayne@68: # drain inqueue, and put sentinels at its head to make workers finish jpayne@68: try: jpayne@68: while True: jpayne@68: inqueue.get(block=False) jpayne@68: except queue.Empty: jpayne@68: pass jpayne@68: for i in range(size): jpayne@68: inqueue.put(None) jpayne@68: jpayne@68: def _wait_for_updates(self, sentinels, change_notifier, timeout): jpayne@68: time.sleep(timeout)