jpayne@68
|
1 #
|
jpayne@68
|
2 # Module providing the `Pool` class for managing a process pool
|
jpayne@68
|
3 #
|
jpayne@68
|
4 # multiprocessing/pool.py
|
jpayne@68
|
5 #
|
jpayne@68
|
6 # Copyright (c) 2006-2008, R Oudkerk
|
jpayne@68
|
7 # Licensed to PSF under a Contributor Agreement.
|
jpayne@68
|
8 #
|
jpayne@68
|
9
|
jpayne@68
|
10 __all__ = ['Pool', 'ThreadPool']
|
jpayne@68
|
11
|
jpayne@68
|
12 #
|
jpayne@68
|
13 # Imports
|
jpayne@68
|
14 #
|
jpayne@68
|
15
|
jpayne@68
|
16 import collections
|
jpayne@68
|
17 import itertools
|
jpayne@68
|
18 import os
|
jpayne@68
|
19 import queue
|
jpayne@68
|
20 import threading
|
jpayne@68
|
21 import time
|
jpayne@68
|
22 import traceback
|
jpayne@68
|
23 import warnings
|
jpayne@68
|
24 from queue import Empty
|
jpayne@68
|
25
|
jpayne@68
|
26 # If threading is available then ThreadPool should be provided. Therefore
|
jpayne@68
|
27 # we avoid top-level imports which are liable to fail on some systems.
|
jpayne@68
|
28 from . import util
|
jpayne@68
|
29 from . import get_context, TimeoutError
|
jpayne@68
|
30 from .connection import wait
|
jpayne@68
|
31
|
jpayne@68
|
32 #
|
jpayne@68
|
33 # Constants representing the state of a pool
|
jpayne@68
|
34 #
|
jpayne@68
|
35
|
jpayne@68
|
# Pool lifecycle states. Stored both in Pool._state and in each handler
# thread's ad-hoc `_state` attribute; the handler loops key off them.
INIT = "INIT"
RUN = "RUN"
CLOSE = "CLOSE"
TERMINATE = "TERMINATE"

#
# Miscellaneous
#

# Monotonically increasing ids; each ApplyResult/MapResult takes one so
# results coming back from workers can be routed through the cache.
job_counter = itertools.count()
|
jpayne@68
|
46
|
jpayne@68
|
def mapstar(args):
    """Unpack (func, *iterables) and return the mapped results as a list."""
    func, iterables = args[0], args[1:]
    return [value for value in map(func, *iterables)]
|
jpayne@68
|
49
|
jpayne@68
|
def starmapstar(args):
    """Unpack (func, seq_of_arg_tuples); apply func to each unpacked tuple."""
    func, argument_tuples = args[0], args[1]
    return [func(*argtuple) for argtuple in argument_tuples]
|
jpayne@68
|
52
|
jpayne@68
|
53 #
|
jpayne@68
|
54 # Hack to embed stringification of remote traceback in local traceback
|
jpayne@68
|
55 #
|
jpayne@68
|
56
|
jpayne@68
|
class RemoteTraceback(Exception):
    """Exception whose string form is a remote traceback already rendered
    to text in the worker process (attached as __cause__ by rebuild_exc)."""
    def __init__(self, tb):
        # tb: the pre-formatted traceback string from the worker.
        self.tb = tb
    def __str__(self):
        return self.tb
|
jpayne@68
|
62
|
jpayne@68
|
class ExceptionWithTraceback:
    """Pickling proxy: carries *exc* plus its traceback rendered to text,
    so the remote traceback survives the trip back to the parent."""
    def __init__(self, exc, tb):
        rendered = ''.join(
            traceback.format_exception(type(exc), exc, tb))
        self.exc = exc
        self.tb = '\n"""\n%s"""' % rendered
    def __reduce__(self):
        # Unpickles via rebuild_exc, which reattaches self.tb as __cause__.
        return rebuild_exc, (self.exc, self.tb)
|
jpayne@68
|
71
|
jpayne@68
|
def rebuild_exc(exc, tb):
    """Unpickling helper: reattach the remote traceback text as __cause__."""
    cause = RemoteTraceback(tb)
    exc.__cause__ = cause
    return exc
|
jpayne@68
|
75
|
jpayne@68
|
76 #
|
jpayne@68
|
77 # Code run by worker processes
|
jpayne@68
|
78 #
|
jpayne@68
|
79
|
jpayne@68
|
class MaybeEncodingError(Exception):
    """Wraps possible unpickleable errors, so they can be
    safely sent through the socket."""

    def __init__(self, exc, value):
        # Both the failing exception and the offending value are stored as
        # their repr() so this wrapper itself always pickles.
        self.exc = repr(exc)
        self.value = repr(value)
        super().__init__(self.exc, self.value)

    def __str__(self):
        return "Error sending result: '%s'. Reason: '%s'" % (
            self.value, self.exc)

    def __repr__(self):
        return "<%s: %s>" % (self.__class__.__name__, self)
|
jpayne@68
|
95
|
jpayne@68
|
96
|
jpayne@68
|
def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
           wrap_exception=False):
    """Main loop run in each pool worker process.

    Pulls (job, i, func, args, kwds) tasks from *inqueue*, runs them, and
    puts (job, i, (success, value)) results on *outqueue*.  Exits on a None
    sentinel, on a broken queue, or after *maxtasks* completed tasks.
    """
    if (maxtasks is not None) and not (isinstance(maxtasks, int)
                                       and maxtasks >= 1):
        raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks))
    put = outqueue.put
    get = inqueue.get
    # Close the ends of the pipes this process doesn't use.
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    completed = 0
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = get()
        except (EOFError, OSError):
            util.debug('worker got EOFError or OSError -- exiting')
            break

        # None is the shutdown sentinel sent by the task handler.
        if task is None:
            util.debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception as e:
            # _helper_reraises_exception forwards caller-side errors; those
            # are deliberately not wrapped with the remote traceback text.
            if wrap_exception and func is not _helper_reraises_exception:
                e = ExceptionWithTraceback(e, e.__traceback__)
            result = (False, e)
        try:
            put((job, i, result))
        except Exception as e:
            # The result failed to pickle: send a picklable wrapper instead.
            wrapped = MaybeEncodingError(e, result[1])
            util.debug("Possible encoding error while sending result: %s" % (
                wrapped))
            put((job, i, (False, wrapped)))

        # Drop references promptly so large objects are freed between tasks.
        task = job = result = func = args = kwds = None
        completed += 1
    util.debug('worker exiting after %d tasks' % completed)
|
jpayne@68
|
141
|
jpayne@68
|
142 def _helper_reraises_exception(ex):
|
jpayne@68
|
143 'Pickle-able helper function for use by _guarded_task_generation.'
|
jpayne@68
|
144 raise ex
|
jpayne@68
|
145
|
jpayne@68
|
146 #
|
jpayne@68
|
147 # Class representing a process pool
|
jpayne@68
|
148 #
|
jpayne@68
|
149
|
jpayne@68
|
150 class _PoolCache(dict):
|
jpayne@68
|
151 """
|
jpayne@68
|
152 Class that implements a cache for the Pool class that will notify
|
jpayne@68
|
153 the pool management threads every time the cache is emptied. The
|
jpayne@68
|
154 notification is done by the use of a queue that is provided when
|
jpayne@68
|
155 instantiating the cache.
|
jpayne@68
|
156 """
|
jpayne@68
|
157 def __init__(self, /, *args, notifier=None, **kwds):
|
jpayne@68
|
158 self.notifier = notifier
|
jpayne@68
|
159 super().__init__(*args, **kwds)
|
jpayne@68
|
160
|
jpayne@68
|
161 def __delitem__(self, item):
|
jpayne@68
|
162 super().__delitem__(item)
|
jpayne@68
|
163
|
jpayne@68
|
164 # Notify that the cache is empty. This is important because the
|
jpayne@68
|
165 # pool keeps maintaining workers until the cache gets drained. This
|
jpayne@68
|
166 # eliminates a race condition in which a task is finished after the
|
jpayne@68
|
167 # the pool's _handle_workers method has enter another iteration of the
|
jpayne@68
|
168 # loop. In this situation, the only event that can wake up the pool
|
jpayne@68
|
169 # is the cache to be emptied (no more tasks available).
|
jpayne@68
|
170 if not self:
|
jpayne@68
|
171 self.notifier.put(None)
|
jpayne@68
|
172
|
jpayne@68
|
class Pool(object):
    '''
    Class which supports an async version of applying functions to arguments.
    '''
    # Whether worker exceptions are wrapped (ExceptionWithTraceback) so the
    # remote traceback text survives pickling back to the parent.
    _wrap_exception = True

    @staticmethod
    def Process(ctx, *args, **kwds):
        # Factory hook: create one worker from the multiprocessing context.
        return ctx.Process(*args, **kwds)
|
jpayne@68
|
182
|
jpayne@68
|
    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, context=None):
        """Create the pool: queues, worker processes, and the three handler
        threads (worker maintenance, task feeding, result collection).

        processes defaults to os.cpu_count(); initializer(*initargs) runs in
        each worker; maxtasksperchild recycles workers after that many tasks.
        Raises ValueError/TypeError on invalid arguments.
        """
        # Attributes initialized early to make sure that they exist in
        # __del__() if __init__() raises an exception
        self._pool = []
        self._state = INIT

        self._ctx = context or get_context()
        self._setup_queues()
        self._taskqueue = queue.SimpleQueue()
        # The _change_notifier queue exist to wake up self._handle_workers()
        # when the cache (self._cache) is empty or when there is a change in
        # the _state variable of the thread that runs _handle_workers.
        self._change_notifier = self._ctx.SimpleQueue()
        self._cache = _PoolCache(notifier=self._change_notifier)
        self._maxtasksperchild = maxtasksperchild
        self._initializer = initializer
        self._initargs = initargs

        if processes is None:
            processes = os.cpu_count() or 1
        if processes < 1:
            raise ValueError("Number of processes must be at least 1")

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        self._processes = processes
        try:
            self._repopulate_pool()
        except Exception:
            # Startup failed part-way: kill and reap whatever was started.
            for p in self._pool:
                if p.exitcode is None:
                    p.terminate()
            for p in self._pool:
                p.join()
            raise

        sentinels = self._get_sentinels()

        # Thread that reaps dead workers and starts replacements.
        self._worker_handler = threading.Thread(
            target=Pool._handle_workers,
            args=(self._cache, self._taskqueue, self._ctx, self.Process,
                  self._processes, self._pool, self._inqueue, self._outqueue,
                  self._initializer, self._initargs, self._maxtasksperchild,
                  self._wrap_exception, sentinels, self._change_notifier)
            )
        self._worker_handler.daemon = True
        self._worker_handler._state = RUN
        self._worker_handler.start()


        # Thread that moves tasks from _taskqueue into the worker inqueue.
        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue,
                  self._pool, self._cache)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN
        self._task_handler.start()

        # Thread that routes worker results into the waiting result objects.
        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()

        # Registered finalizer; terminate() and interpreter exit both
        # funnel through _terminate_pool exactly once.
        self._terminate = util.Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                  self._change_notifier, self._worker_handler, self._task_handler,
                  self._result_handler, self._cache),
            exitpriority=15
            )
        # Only now is the pool open for business.
        self._state = RUN
|
jpayne@68
|
260
|
jpayne@68
|
    # Copy globals as function locals to make sure that they are available
    # during Python shutdown when the Pool is destroyed.
    def __del__(self, _warn=warnings.warn, RUN=RUN):
        # Warn about pools never close()d/terminate()d, then nudge the
        # worker-handler thread awake so it can notice shutdown.
        if self._state == RUN:
            _warn(f"unclosed running multiprocessing pool {self!r}",
                  ResourceWarning, source=self)
        if getattr(self, '_change_notifier', None) is not None:
            self._change_notifier.put(None)

    def __repr__(self):
        # Debug-friendly summary: qualified class name, state, worker count.
        cls = self.__class__
        return (f'<{cls.__module__}.{cls.__qualname__} '
                f'state={self._state} '
                f'pool_size={len(self._pool)}>')
|
jpayne@68
|
275
|
jpayne@68
|
276 def _get_sentinels(self):
|
jpayne@68
|
277 task_queue_sentinels = [self._outqueue._reader]
|
jpayne@68
|
278 self_notifier_sentinels = [self._change_notifier._reader]
|
jpayne@68
|
279 return [*task_queue_sentinels, *self_notifier_sentinels]
|
jpayne@68
|
280
|
jpayne@68
|
281 @staticmethod
|
jpayne@68
|
282 def _get_worker_sentinels(workers):
|
jpayne@68
|
283 return [worker.sentinel for worker in
|
jpayne@68
|
284 workers if hasattr(worker, "sentinel")]
|
jpayne@68
|
285
|
jpayne@68
|
286 @staticmethod
|
jpayne@68
|
287 def _join_exited_workers(pool):
|
jpayne@68
|
288 """Cleanup after any worker processes which have exited due to reaching
|
jpayne@68
|
289 their specified lifetime. Returns True if any workers were cleaned up.
|
jpayne@68
|
290 """
|
jpayne@68
|
291 cleaned = False
|
jpayne@68
|
292 for i in reversed(range(len(pool))):
|
jpayne@68
|
293 worker = pool[i]
|
jpayne@68
|
294 if worker.exitcode is not None:
|
jpayne@68
|
295 # worker exited
|
jpayne@68
|
296 util.debug('cleaning up worker %d' % i)
|
jpayne@68
|
297 worker.join()
|
jpayne@68
|
298 cleaned = True
|
jpayne@68
|
299 del pool[i]
|
jpayne@68
|
300 return cleaned
|
jpayne@68
|
301
|
jpayne@68
|
302 def _repopulate_pool(self):
|
jpayne@68
|
303 return self._repopulate_pool_static(self._ctx, self.Process,
|
jpayne@68
|
304 self._processes,
|
jpayne@68
|
305 self._pool, self._inqueue,
|
jpayne@68
|
306 self._outqueue, self._initializer,
|
jpayne@68
|
307 self._initargs,
|
jpayne@68
|
308 self._maxtasksperchild,
|
jpayne@68
|
309 self._wrap_exception)
|
jpayne@68
|
310
|
jpayne@68
|
311 @staticmethod
|
jpayne@68
|
312 def _repopulate_pool_static(ctx, Process, processes, pool, inqueue,
|
jpayne@68
|
313 outqueue, initializer, initargs,
|
jpayne@68
|
314 maxtasksperchild, wrap_exception):
|
jpayne@68
|
315 """Bring the number of pool processes up to the specified number,
|
jpayne@68
|
316 for use after reaping workers which have exited.
|
jpayne@68
|
317 """
|
jpayne@68
|
318 for i in range(processes - len(pool)):
|
jpayne@68
|
319 w = Process(ctx, target=worker,
|
jpayne@68
|
320 args=(inqueue, outqueue,
|
jpayne@68
|
321 initializer,
|
jpayne@68
|
322 initargs, maxtasksperchild,
|
jpayne@68
|
323 wrap_exception))
|
jpayne@68
|
324 w.name = w.name.replace('Process', 'PoolWorker')
|
jpayne@68
|
325 w.daemon = True
|
jpayne@68
|
326 w.start()
|
jpayne@68
|
327 pool.append(w)
|
jpayne@68
|
328 util.debug('added worker')
|
jpayne@68
|
329
|
jpayne@68
|
330 @staticmethod
|
jpayne@68
|
331 def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue,
|
jpayne@68
|
332 initializer, initargs, maxtasksperchild,
|
jpayne@68
|
333 wrap_exception):
|
jpayne@68
|
334 """Clean up any exited workers and start replacements for them.
|
jpayne@68
|
335 """
|
jpayne@68
|
336 if Pool._join_exited_workers(pool):
|
jpayne@68
|
337 Pool._repopulate_pool_static(ctx, Process, processes, pool,
|
jpayne@68
|
338 inqueue, outqueue, initializer,
|
jpayne@68
|
339 initargs, maxtasksperchild,
|
jpayne@68
|
340 wrap_exception)
|
jpayne@68
|
341
|
jpayne@68
|
342 def _setup_queues(self):
|
jpayne@68
|
343 self._inqueue = self._ctx.SimpleQueue()
|
jpayne@68
|
344 self._outqueue = self._ctx.SimpleQueue()
|
jpayne@68
|
345 self._quick_put = self._inqueue._writer.send
|
jpayne@68
|
346 self._quick_get = self._outqueue._reader.recv
|
jpayne@68
|
347
|
jpayne@68
|
348 def _check_running(self):
|
jpayne@68
|
349 if self._state != RUN:
|
jpayne@68
|
350 raise ValueError("Pool not running")
|
jpayne@68
|
351
|
jpayne@68
|
352 def apply(self, func, args=(), kwds={}):
|
jpayne@68
|
353 '''
|
jpayne@68
|
354 Equivalent of `func(*args, **kwds)`.
|
jpayne@68
|
355 Pool must be running.
|
jpayne@68
|
356 '''
|
jpayne@68
|
357 return self.apply_async(func, args, kwds).get()
|
jpayne@68
|
358
|
jpayne@68
|
359 def map(self, func, iterable, chunksize=None):
|
jpayne@68
|
360 '''
|
jpayne@68
|
361 Apply `func` to each element in `iterable`, collecting the results
|
jpayne@68
|
362 in a list that is returned.
|
jpayne@68
|
363 '''
|
jpayne@68
|
364 return self._map_async(func, iterable, mapstar, chunksize).get()
|
jpayne@68
|
365
|
jpayne@68
|
366 def starmap(self, func, iterable, chunksize=None):
|
jpayne@68
|
367 '''
|
jpayne@68
|
368 Like `map()` method but the elements of the `iterable` are expected to
|
jpayne@68
|
369 be iterables as well and will be unpacked as arguments. Hence
|
jpayne@68
|
370 `func` and (a, b) becomes func(a, b).
|
jpayne@68
|
371 '''
|
jpayne@68
|
372 return self._map_async(func, iterable, starmapstar, chunksize).get()
|
jpayne@68
|
373
|
jpayne@68
|
374 def starmap_async(self, func, iterable, chunksize=None, callback=None,
|
jpayne@68
|
375 error_callback=None):
|
jpayne@68
|
376 '''
|
jpayne@68
|
377 Asynchronous version of `starmap()` method.
|
jpayne@68
|
378 '''
|
jpayne@68
|
379 return self._map_async(func, iterable, starmapstar, chunksize,
|
jpayne@68
|
380 callback, error_callback)
|
jpayne@68
|
381
|
jpayne@68
|
382 def _guarded_task_generation(self, result_job, func, iterable):
|
jpayne@68
|
383 '''Provides a generator of tasks for imap and imap_unordered with
|
jpayne@68
|
384 appropriate handling for iterables which throw exceptions during
|
jpayne@68
|
385 iteration.'''
|
jpayne@68
|
386 try:
|
jpayne@68
|
387 i = -1
|
jpayne@68
|
388 for i, x in enumerate(iterable):
|
jpayne@68
|
389 yield (result_job, i, func, (x,), {})
|
jpayne@68
|
390 except Exception as e:
|
jpayne@68
|
391 yield (result_job, i+1, _helper_reraises_exception, (e,), {})
|
jpayne@68
|
392
|
jpayne@68
|
393 def imap(self, func, iterable, chunksize=1):
|
jpayne@68
|
394 '''
|
jpayne@68
|
395 Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
|
jpayne@68
|
396 '''
|
jpayne@68
|
397 self._check_running()
|
jpayne@68
|
398 if chunksize == 1:
|
jpayne@68
|
399 result = IMapIterator(self)
|
jpayne@68
|
400 self._taskqueue.put(
|
jpayne@68
|
401 (
|
jpayne@68
|
402 self._guarded_task_generation(result._job, func, iterable),
|
jpayne@68
|
403 result._set_length
|
jpayne@68
|
404 ))
|
jpayne@68
|
405 return result
|
jpayne@68
|
406 else:
|
jpayne@68
|
407 if chunksize < 1:
|
jpayne@68
|
408 raise ValueError(
|
jpayne@68
|
409 "Chunksize must be 1+, not {0:n}".format(
|
jpayne@68
|
410 chunksize))
|
jpayne@68
|
411 task_batches = Pool._get_tasks(func, iterable, chunksize)
|
jpayne@68
|
412 result = IMapIterator(self)
|
jpayne@68
|
413 self._taskqueue.put(
|
jpayne@68
|
414 (
|
jpayne@68
|
415 self._guarded_task_generation(result._job,
|
jpayne@68
|
416 mapstar,
|
jpayne@68
|
417 task_batches),
|
jpayne@68
|
418 result._set_length
|
jpayne@68
|
419 ))
|
jpayne@68
|
420 return (item for chunk in result for item in chunk)
|
jpayne@68
|
421
|
jpayne@68
|
422 def imap_unordered(self, func, iterable, chunksize=1):
|
jpayne@68
|
423 '''
|
jpayne@68
|
424 Like `imap()` method but ordering of results is arbitrary.
|
jpayne@68
|
425 '''
|
jpayne@68
|
426 self._check_running()
|
jpayne@68
|
427 if chunksize == 1:
|
jpayne@68
|
428 result = IMapUnorderedIterator(self)
|
jpayne@68
|
429 self._taskqueue.put(
|
jpayne@68
|
430 (
|
jpayne@68
|
431 self._guarded_task_generation(result._job, func, iterable),
|
jpayne@68
|
432 result._set_length
|
jpayne@68
|
433 ))
|
jpayne@68
|
434 return result
|
jpayne@68
|
435 else:
|
jpayne@68
|
436 if chunksize < 1:
|
jpayne@68
|
437 raise ValueError(
|
jpayne@68
|
438 "Chunksize must be 1+, not {0!r}".format(chunksize))
|
jpayne@68
|
439 task_batches = Pool._get_tasks(func, iterable, chunksize)
|
jpayne@68
|
440 result = IMapUnorderedIterator(self)
|
jpayne@68
|
441 self._taskqueue.put(
|
jpayne@68
|
442 (
|
jpayne@68
|
443 self._guarded_task_generation(result._job,
|
jpayne@68
|
444 mapstar,
|
jpayne@68
|
445 task_batches),
|
jpayne@68
|
446 result._set_length
|
jpayne@68
|
447 ))
|
jpayne@68
|
448 return (item for chunk in result for item in chunk)
|
jpayne@68
|
449
|
jpayne@68
|
450 def apply_async(self, func, args=(), kwds={}, callback=None,
|
jpayne@68
|
451 error_callback=None):
|
jpayne@68
|
452 '''
|
jpayne@68
|
453 Asynchronous version of `apply()` method.
|
jpayne@68
|
454 '''
|
jpayne@68
|
455 self._check_running()
|
jpayne@68
|
456 result = ApplyResult(self, callback, error_callback)
|
jpayne@68
|
457 self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
|
jpayne@68
|
458 return result
|
jpayne@68
|
459
|
jpayne@68
|
460 def map_async(self, func, iterable, chunksize=None, callback=None,
|
jpayne@68
|
461 error_callback=None):
|
jpayne@68
|
462 '''
|
jpayne@68
|
463 Asynchronous version of `map()` method.
|
jpayne@68
|
464 '''
|
jpayne@68
|
465 return self._map_async(func, iterable, mapstar, chunksize, callback,
|
jpayne@68
|
466 error_callback)
|
jpayne@68
|
467
|
jpayne@68
|
468 def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
|
jpayne@68
|
469 error_callback=None):
|
jpayne@68
|
470 '''
|
jpayne@68
|
471 Helper function to implement map, starmap and their async counterparts.
|
jpayne@68
|
472 '''
|
jpayne@68
|
473 self._check_running()
|
jpayne@68
|
474 if not hasattr(iterable, '__len__'):
|
jpayne@68
|
475 iterable = list(iterable)
|
jpayne@68
|
476
|
jpayne@68
|
477 if chunksize is None:
|
jpayne@68
|
478 chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
|
jpayne@68
|
479 if extra:
|
jpayne@68
|
480 chunksize += 1
|
jpayne@68
|
481 if len(iterable) == 0:
|
jpayne@68
|
482 chunksize = 0
|
jpayne@68
|
483
|
jpayne@68
|
484 task_batches = Pool._get_tasks(func, iterable, chunksize)
|
jpayne@68
|
485 result = MapResult(self, chunksize, len(iterable), callback,
|
jpayne@68
|
486 error_callback=error_callback)
|
jpayne@68
|
487 self._taskqueue.put(
|
jpayne@68
|
488 (
|
jpayne@68
|
489 self._guarded_task_generation(result._job,
|
jpayne@68
|
490 mapper,
|
jpayne@68
|
491 task_batches),
|
jpayne@68
|
492 None
|
jpayne@68
|
493 )
|
jpayne@68
|
494 )
|
jpayne@68
|
495 return result
|
jpayne@68
|
496
|
jpayne@68
|
    @staticmethod
    def _wait_for_updates(sentinels, change_notifier, timeout=None):
        # Block until any sentinel fires (worker death, result, notification),
        # then drain the notifier so stale wakeups don't accumulate.
        wait(sentinels, timeout=timeout)
        while not change_notifier.empty():
            change_notifier.get()

    @classmethod
    def _handle_workers(cls, cache, taskqueue, ctx, Process, processes,
                        pool, inqueue, outqueue, initializer, initargs,
                        maxtasksperchild, wrap_exception, sentinels,
                        change_notifier):
        """Worker-maintenance loop run in its own daemon thread: reap dead
        workers, start replacements, and finally signal shutdown."""
        thread = threading.current_thread()

        # Keep maintaining workers until the cache gets drained, unless the pool
        # is terminated.
        while thread._state == RUN or (cache and thread._state != TERMINATE):
            cls._maintain_pool(ctx, Process, processes, pool, inqueue,
                               outqueue, initializer, initargs,
                               maxtasksperchild, wrap_exception)

            # Re-read worker sentinels each pass: the pool membership changes.
            current_sentinels = [*cls._get_worker_sentinels(pool), *sentinels]

            cls._wait_for_updates(current_sentinels, change_notifier)
        # send sentinel to stop workers
        taskqueue.put(None)
        util.debug('worker handler exiting')
|
jpayne@68
|
523
|
jpayne@68
|
    @staticmethod
    def _handle_tasks(taskqueue, put, outqueue, pool, cache):
        """Task-feeding loop run in its own daemon thread: move tasks from
        the pool's taskqueue into the worker inqueue, then send sentinels."""
        thread = threading.current_thread()

        # Each taskqueue entry is (iterable_of_tasks, set_length_or_None);
        # a None entry is the shutdown sentinel from the worker handler.
        for taskseq, set_length in iter(taskqueue.get, None):
            task = None
            try:
                # iterating taskseq cannot fail
                for task in taskseq:
                    if thread._state != RUN:
                        util.debug('task handler found thread._state != RUN')
                        break
                    try:
                        put(task)
                    except Exception as e:
                        # Couldn't send (e.g. unpicklable args): fail just
                        # this task's slot in its result object.
                        job, idx = task[:2]
                        try:
                            cache[job]._set(idx, (False, e))
                        except KeyError:
                            pass
                else:
                    # Whole sequence was sent: report the final length to
                    # imap-style iterators, then wait for the next entry.
                    if set_length:
                        util.debug('doing set_length()')
                        idx = task[1] if task else -1
                        set_length(idx + 1)
                    continue
                # Inner loop broke (state changed): stop feeding entirely.
                break
            finally:
                # Drop references so large task payloads are freed promptly.
                task = taskseq = job = None
        else:
            util.debug('task handler got sentinel')

        try:
            # tell result handler to finish when cache is empty
            util.debug('task handler sending sentinel to result handler')
            outqueue.put(None)

            # tell workers there is no more work
            util.debug('task handler sending sentinel to workers')
            for p in pool:
                put(None)
        except OSError:
            util.debug('task handler got OSError when sending sentinels')

        util.debug('task handler exiting')
|
jpayne@68
|
569
|
jpayne@68
|
    @staticmethod
    def _handle_results(outqueue, get, cache):
        """Result-routing loop run in its own daemon thread: deliver worker
        results to their ApplyResult/MapResult objects via the cache."""
        thread = threading.current_thread()

        # Phase 1: normal operation — route results until a sentinel arrives
        # or the thread is told to terminate.
        while 1:
            try:
                task = get()
            except (OSError, EOFError):
                util.debug('result handler got EOFError/OSError -- exiting')
                return

            if thread._state != RUN:
                assert thread._state == TERMINATE, "Thread not in TERMINATE"
                util.debug('result handler found thread._state=TERMINATE')
                break

            if task is None:
                util.debug('result handler got sentinel')
                break

            job, i, obj = task
            try:
                # KeyError: the result object may already have been removed.
                cache[job]._set(i, obj)
            except KeyError:
                pass
            task = job = obj = None

        # Phase 2: drain any results still owed to outstanding jobs.
        while cache and thread._state != TERMINATE:
            try:
                task = get()
            except (OSError, EOFError):
                util.debug('result handler got EOFError/OSError -- exiting')
                return

            if task is None:
                util.debug('result handler ignoring extra sentinel')
                continue
            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass
            task = job = obj = None

        if hasattr(outqueue, '_reader'):
            util.debug('ensuring that outqueue is not full')
            # If we don't make room available in outqueue then
            # attempts to add the sentinel (None) to outqueue may
            # block. There is guaranteed to be no more than 2 sentinels.
            try:
                for i in range(10):
                    if not outqueue._reader.poll():
                        break
                    get()
            except (OSError, EOFError):
                pass

        util.debug('result handler exiting: len(cache)=%s, thread._state=%s',
                   len(cache), thread._state)
|
jpayne@68
|
629
|
jpayne@68
|
630 @staticmethod
|
jpayne@68
|
631 def _get_tasks(func, it, size):
|
jpayne@68
|
632 it = iter(it)
|
jpayne@68
|
633 while 1:
|
jpayne@68
|
634 x = tuple(itertools.islice(it, size))
|
jpayne@68
|
635 if not x:
|
jpayne@68
|
636 return
|
jpayne@68
|
637 yield (func, x)
|
jpayne@68
|
638
|
jpayne@68
|
639 def __reduce__(self):
|
jpayne@68
|
640 raise NotImplementedError(
|
jpayne@68
|
641 'pool objects cannot be passed between processes or pickled'
|
jpayne@68
|
642 )
|
jpayne@68
|
643
|
jpayne@68
|
644 def close(self):
|
jpayne@68
|
645 util.debug('closing pool')
|
jpayne@68
|
646 if self._state == RUN:
|
jpayne@68
|
647 self._state = CLOSE
|
jpayne@68
|
648 self._worker_handler._state = CLOSE
|
jpayne@68
|
649 self._change_notifier.put(None)
|
jpayne@68
|
650
|
jpayne@68
|
651 def terminate(self):
|
jpayne@68
|
652 util.debug('terminating pool')
|
jpayne@68
|
653 self._state = TERMINATE
|
jpayne@68
|
654 self._worker_handler._state = TERMINATE
|
jpayne@68
|
655 self._change_notifier.put(None)
|
jpayne@68
|
656 self._terminate()
|
jpayne@68
|
657
|
jpayne@68
|
658 def join(self):
|
jpayne@68
|
659 util.debug('joining pool')
|
jpayne@68
|
660 if self._state == RUN:
|
jpayne@68
|
661 raise ValueError("Pool is still running")
|
jpayne@68
|
662 elif self._state not in (CLOSE, TERMINATE):
|
jpayne@68
|
663 raise ValueError("In unknown state")
|
jpayne@68
|
664 self._worker_handler.join()
|
jpayne@68
|
665 self._task_handler.join()
|
jpayne@68
|
666 self._result_handler.join()
|
jpayne@68
|
667 for p in self._pool:
|
jpayne@68
|
668 p.join()
|
jpayne@68
|
669
|
jpayne@68
|
    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        """Drain the worker inqueue so a blocked task handler can finish."""
        # task_handler may be blocked trying to put items on inqueue
        util.debug('removing tasks from inqueue until task handler finished')
        # NOTE: the read lock is acquired and deliberately never released —
        # this is shutdown-only code; workers must not consume these items.
        inqueue._rlock.acquire()
        while task_handler.is_alive() and inqueue._reader.poll():
            inqueue._reader.recv()
            time.sleep(0)
|
jpayne@68
|
678
|
jpayne@68
|
    @classmethod
    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier,
                        worker_handler, task_handler, result_handler, cache):
        """Tear the pool down: stop handler threads, terminate workers, and
        join everything. Invoked exactly once via util.Finalize."""
        # this is guaranteed to only be called once
        util.debug('finalizing pool')

        # The order below matters: handlers are signalled first, then the
        # worker handler is joined before any worker is terminated.
        worker_handler._state = TERMINATE
        task_handler._state = TERMINATE

        util.debug('helping task handler/workers to finish')
        cls._help_stuff_finish(inqueue, task_handler, len(pool))

        if (not result_handler.is_alive()) and (len(cache) != 0):
            raise AssertionError(
                "Cannot have cache with result_hander not alive")

        result_handler._state = TERMINATE
        # Wake the worker handler and unblock the result handler.
        change_notifier.put(None)
        outqueue.put(None)                  # sentinel

        # We must wait for the worker handler to exit before terminating
        # workers because we don't want workers to be restarted behind our back.
        util.debug('joining worker handler')
        if threading.current_thread() is not worker_handler:
            worker_handler.join()

        # Terminate workers which haven't already finished.
        if pool and hasattr(pool[0], 'terminate'):
            util.debug('terminating workers')
            for p in pool:
                if p.exitcode is None:
                    p.terminate()

        util.debug('joining task handler')
        if threading.current_thread() is not task_handler:
            task_handler.join()

        util.debug('joining result handler')
        if threading.current_thread() is not result_handler:
            result_handler.join()

        # hasattr(..., 'terminate') distinguishes process workers from
        # thread workers (ThreadPool), which cannot be force-terminated.
        if pool and hasattr(pool[0], 'terminate'):
            util.debug('joining pool workers')
            for p in pool:
                if p.is_alive():
                    # worker has not yet exited
                    util.debug('cleaning up worker %d' % p.pid)
                    p.join()
|
jpayne@68
|
727
|
jpayne@68
|
728 def __enter__(self):
|
jpayne@68
|
729 self._check_running()
|
jpayne@68
|
730 return self
|
jpayne@68
|
731
|
jpayne@68
|
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit the context manager by terminating the pool.

        Note this calls terminate(), not close(): outstanding jobs are
        abandoned rather than drained.  Exceptions are not suppressed.
        """
        self.terminate()
|
jpayne@68
|
734
|
jpayne@68
|
735 #
|
jpayne@68
|
736 # Class whose instances are returned by `Pool.apply_async()`
|
jpayne@68
|
737 #
|
jpayne@68
|
738
|
jpayne@68
|
class ApplyResult(object):
    """Result handle returned by `Pool.apply_async()`.

    On creation the handle registers itself in the pool's job cache
    under a fresh job id; the pool's result-handler thread later
    delivers the outcome through `_set()`, which wakes any waiters.
    """

    def __init__(self, pool, callback, error_callback):
        self._event = threading.Event()
        self._job = next(job_counter)
        self._pool = pool
        self._cache = pool._cache
        self._callback = callback
        self._error_callback = error_callback
        # Register so the result handler can route this job's output here.
        self._cache[self._job] = self

    def ready(self):
        """Return True once the result (or error) has been delivered."""
        return self._event.is_set()

    def successful(self):
        """Return True if the call completed without raising.

        Raises ValueError if the result is not ready yet.
        """
        if not self.ready():
            raise ValueError("{0!r} not ready".format(self))
        return self._success

    def wait(self, timeout=None):
        """Block until the result is delivered, or `timeout` elapses."""
        self._event.wait(timeout)

    def get(self, timeout=None):
        """Return the call's result, re-raising any remote exception.

        Raises TimeoutError if nothing arrives within `timeout`.
        """
        self.wait(timeout)
        if not self.ready():
            raise TimeoutError
        if not self._success:
            raise self._value
        return self._value

    def _set(self, i, obj):
        # Called by the result-handler thread; `obj` is a
        # (success, value) pair.  `i` is unused for single results.
        self._success, self._value = obj
        if self._success:
            if self._callback:
                self._callback(self._value)
        elif self._error_callback:
            self._error_callback(self._value)
        self._event.set()
        del self._cache[self._job]
        self._pool = None  # break the reference back to the pool
|
jpayne@68
|
779
|
jpayne@68
|
AsyncResult = ApplyResult       # backward-compatible alias -- see bpo-17805
|
jpayne@68
|
781
|
jpayne@68
|
782 #
|
jpayne@68
|
783 # Class whose instances are returned by `Pool.map_async()`
|
jpayne@68
|
784 #
|
jpayne@68
|
785
|
jpayne@68
|
class MapResult(ApplyResult):
    """Result handle returned by `Pool.map_async()`.

    Collects per-chunk results into one flat list and only becomes
    ready once every chunk has been delivered.  The first failing
    chunk's exception replaces the list as the stored value.
    """

    def __init__(self, pool, chunksize, length, callback, error_callback):
        ApplyResult.__init__(self, pool, callback,
                             error_callback=error_callback)
        self._success = True
        self._value = [None] * length
        self._chunksize = chunksize
        if chunksize <= 0:
            # Empty job: there is nothing to wait for, so mark the
            # result ready and deregister immediately.
            self._number_left = 0
            self._event.set()
            del self._cache[self._job]
        else:
            # Number of chunks is ceil(length / chunksize).
            whole, remainder = divmod(length, chunksize)
            self._number_left = whole + bool(remainder)

    def _set(self, i, success_result):
        # Called once per chunk by the result-handler thread.
        self._number_left -= 1
        success, result = success_result
        if success and self._success:
            # Splice this chunk's results into their slots.
            start = i * self._chunksize
            self._value[start:start + self._chunksize] = result
            if self._number_left == 0:
                self._finalize()
        else:
            if not success and self._success:
                # only store first exception
                self._success = False
                self._value = result
            if self._number_left == 0:
                # only consider the result ready once all jobs are done
                self._finalize()

    def _finalize(self):
        # Every chunk has arrived: fire the appropriate callback,
        # deregister the job and wake anyone blocked in wait()/get().
        if self._success:
            if self._callback:
                self._callback(self._value)
        elif self._error_callback:
            self._error_callback(self._value)
        del self._cache[self._job]
        self._event.set()
        self._pool = None
|
jpayne@68
|
824
|
jpayne@68
|
825 #
|
jpayne@68
|
826 # Class whose instances are returned by `Pool.imap()`
|
jpayne@68
|
827 #
|
jpayne@68
|
828
|
jpayne@68
|
class IMapIterator(object):
    """Iterator returned by `Pool.imap()`.

    Results arrive from the result-handler thread via `_set()` and are
    handed out strictly in submission order; out-of-order arrivals are
    buffered in `_unsorted` until their predecessors show up.
    """

    def __init__(self, pool):
        self._cond = threading.Condition(threading.Lock())
        self._job = next(job_counter)
        self._pool = pool
        self._cache = pool._cache
        self._items = collections.deque()   # ready, in-order results
        self._index = 0                     # next index to hand out
        self._length = None                 # total count, once known
        self._unsorted = {}                 # early arrivals, keyed by index
        self._cache[self._job] = self

    def __iter__(self):
        return self

    def next(self, timeout=None):
        """Return the next result, waiting up to `timeout` seconds.

        Raises StopIteration when the sequence is exhausted and
        TimeoutError when nothing arrives in time.
        """
        with self._cond:
            try:
                entry = self._items.popleft()
            except IndexError:
                # Queue empty: either we're done, or we must wait once.
                if self._index == self._length:
                    self._pool = None
                    raise StopIteration from None
                self._cond.wait(timeout)
                try:
                    entry = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        self._pool = None
                        raise StopIteration from None
                    raise TimeoutError from None

        success, value = entry
        if not success:
            raise value
        return value

    __next__ = next                    # XXX

    def _set(self, i, obj):
        # Called by the result-handler thread with result index `i`.
        with self._cond:
            if self._index == i:
                # In-order arrival: release it, plus any buffered
                # successors that are now unblocked.
                self._items.append(obj)
                self._index += 1
                while self._index in self._unsorted:
                    self._items.append(self._unsorted.pop(self._index))
                    self._index += 1
                self._cond.notify()
            else:
                # Early arrival: park it until its predecessors land.
                self._unsorted[i] = obj

            if self._index == self._length:
                del self._cache[self._job]
                self._pool = None

    def _set_length(self, length):
        # Told the total number of results once the task feeder knows it.
        with self._cond:
            self._length = length
            if self._index == self._length:
                # Everything already delivered: wake waiters and detach.
                self._cond.notify()
                del self._cache[self._job]
                self._pool = None
|
jpayne@68
|
893
|
jpayne@68
|
894 #
|
jpayne@68
|
895 # Class whose instances are returned by `Pool.imap_unordered()`
|
jpayne@68
|
896 #
|
jpayne@68
|
897
|
jpayne@68
|
class IMapUnorderedIterator(IMapIterator):
    """Iterator returned by `Pool.imap_unordered()`.

    Identical to IMapIterator except results are handed out in
    completion order, so no reorder buffering is needed.
    """

    def _set(self, i, obj):
        # Release every result immediately, whatever its index.
        with self._cond:
            self._items.append(obj)
            self._index = self._index + 1
            self._cond.notify()
            if self._length == self._index:
                # Last result: deregister the job and detach the pool.
                del self._cache[self._job]
                self._pool = None
|
jpayne@68
|
908
|
jpayne@68
|
909 #
|
jpayne@68
|
910 #
|
jpayne@68
|
911 #
|
jpayne@68
|
912
|
jpayne@68
|
class ThreadPool(Pool):
    """A Pool variant whose workers are threads, not processes.

    Because workers share the interpreter, exceptions need no pickling
    wrapper and plain `queue.SimpleQueue`s replace pipes.
    """

    # Worker exceptions cross no process boundary; no wrapping needed.
    _wrap_exception = False

    @staticmethod
    def Process(ctx, *args, **kwds):
        """Create a worker; the context argument is ignored for threads."""
        from .dummy import Process
        return Process(*args, **kwds)

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        # In-process queues: no pickling, no pipes.
        self._inqueue = queue.SimpleQueue()
        self._quick_put = self._inqueue.put
        self._outqueue = queue.SimpleQueue()
        self._quick_get = self._outqueue.get

    def _get_sentinels(self):
        return [self._change_notifier._reader]

    @staticmethod
    def _get_worker_sentinels(workers):
        # Threads expose no waitable sentinel handles.
        return []

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # Discard any pending tasks, then enqueue one sentinel per
        # worker so every worker thread shuts itself down.
        while True:
            try:
                inqueue.get(block=False)
            except queue.Empty:
                break
        for _ in range(size):
            inqueue.put(None)

    def _wait_for_updates(self, sentinels, change_notifier, timeout):
        # Threads have nothing selectable to wait on; just sleep.
        time.sleep(timeout)
|