Mercurial > repos > rliterman > csp2
comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/multiprocessing/pool.py @ 69:33d812a61356
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 17:55:14 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
67:0e9998148a16 | 69:33d812a61356 |
---|---|
1 # | |
2 # Module providing the `Pool` class for managing a process pool | |
3 # | |
4 # multiprocessing/pool.py | |
5 # | |
6 # Copyright (c) 2006-2008, R Oudkerk | |
7 # Licensed to PSF under a Contributor Agreement. | |
8 # | |
9 | |
10 __all__ = ['Pool', 'ThreadPool'] | |
11 | |
12 # | |
13 # Imports | |
14 # | |
15 | |
16 import collections | |
17 import itertools | |
18 import os | |
19 import queue | |
20 import threading | |
21 import time | |
22 import traceback | |
23 import warnings | |
24 from queue import Empty | |
25 | |
26 # If threading is available then ThreadPool should be provided. Therefore | |
27 # we avoid top-level imports which are liable to fail on some systems. | |
28 from . import util | |
29 from . import get_context, TimeoutError | |
30 from .connection import wait | |
31 | |
32 # | |
33 # Constants representing the state of a pool | |
34 # | |
35 | |
# Pool lifecycle states.  Stored in Pool._state and mirrored into the
# handler threads' _state attributes so each thread can see shutdown
# requests without locking.
INIT = "INIT"            # set in __init__ before the pool is fully started
RUN = "RUN"              # normal operation; set at the end of __init__
CLOSE = "CLOSE"          # set by close(): no new tasks, drain what remains
TERMINATE = "TERMINATE"  # set by terminate(): abandon outstanding work

#
# Miscellaneous
#

# Source of unique job ids; result objects (ApplyResult, IMapIterator)
# draw from it so replies on the outqueue can be matched back to them.
job_counter = itertools.count()
46 | |
def mapstar(args):
    """Unpack *args* as (func, iterable, ...) and return the mapped results
    as a list.  Used by the pool to run one chunk of a map() call."""
    return [item for item in map(*args)]
49 | |
def starmapstar(args):
    """Unpack *args* as (func, iterable-of-argument-tuples) and apply the
    function to each tuple, returning the results as a list."""
    func, argument_tuples = args[0], args[1]
    return [func(*argtuple) for argtuple in argument_tuples]
52 | |
53 # | |
54 # Hack to embed stringification of remote traceback in local traceback | |
55 # | |
56 | |
class RemoteTraceback(Exception):
    """Holds the pre-formatted traceback text of an exception raised in a
    worker process; attached as __cause__ of the re-raised exception so the
    remote traceback shows up in the local traceback output."""
    def __init__(self, tb):
        # tb: already-formatted traceback string produced in the worker.
        self.tb = tb
    def __str__(self):
        return self.tb
62 | |
class ExceptionWithTraceback:
    """Picklable wrapper pairing an exception with its formatted traceback.

    Traceback objects themselves cannot be pickled, so the traceback is
    rendered to text immediately; __reduce__ makes unpickling in the parent
    go through rebuild_exc(), which restores the original exception with the
    remote traceback chained as its __cause__.
    """
    def __init__(self, exc, tb):
        tb = traceback.format_exception(type(exc), exc, tb)
        tb = ''.join(tb)
        self.exc = exc
        # Wrapped in triple quotes so it reads clearly inside the local
        # traceback display.
        self.tb = '\n"""\n%s"""' % tb
    def __reduce__(self):
        # Unpickles as rebuild_exc(self.exc, self.tb).
        return rebuild_exc, (self.exc, self.tb)
71 | |
def rebuild_exc(exc, tb):
    """Reconstruct a worker exception on unpickling, chaining the remote
    traceback text as its __cause__ (see ExceptionWithTraceback)."""
    exc.__cause__ = RemoteTraceback(tb)
    return exc
75 | |
76 # | |
77 # Code run by worker processes | |
78 # | |
79 | |
class MaybeEncodingError(Exception):
    """Wraps possible unpickleable errors, so they can be
    safely sent through the socket."""

    def __init__(self, exc, value):
        # Store only repr() strings: both the original exception and the
        # offending value may themselves be unpicklable.
        self.exc = repr(exc)
        self.value = repr(value)
        super().__init__(self.exc, self.value)

    def __str__(self):
        return f"Error sending result: '{self.value}'. Reason: '{self.exc}'"

    def __repr__(self):
        return f"<{self.__class__.__name__}: {self}>"
95 | |
96 | |
def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
           wrap_exception=False):
    """Main loop executed inside each pool worker process (or thread).

    Pulls (job, i, func, args, kwds) tasks from *inqueue*, runs
    func(*args, **kwds), and puts (job, i, (success, value)) on *outqueue*.
    Exits on a None sentinel, on a broken queue, or after *maxtasks*
    completed tasks (the maxtasksperchild mechanism).
    """
    if (maxtasks is not None) and not (isinstance(maxtasks, int)
                                       and maxtasks >= 1):
        raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks))
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        # Process-backed queues: close the pipe ends this worker never
        # uses so EOF is observed when the other side goes away.
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    completed = 0
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = get()
        except (EOFError, OSError):
            util.debug('worker got EOFError or OSError -- exiting')
            break

        if task is None:
            # Sentinel from the task handler: no more work.
            util.debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception as e:
            # _helper_reraises_exception re-raises an exception captured on
            # the parent side; do not wrap it a second time.
            if wrap_exception and func is not _helper_reraises_exception:
                e = ExceptionWithTraceback(e, e.__traceback__)
            result = (False, e)
        try:
            put((job, i, result))
        except Exception as e:
            # The result (or its exception) could not be sent/pickled;
            # substitute a picklable wrapper so the job still completes.
            wrapped = MaybeEncodingError(e, result[1])
            util.debug("Possible encoding error while sending result: %s" % (
                wrapped))
            put((job, i, (False, wrapped)))

        # Drop references promptly so large task/result objects can be
        # collected before the next get() blocks.
        task = job = result = func = args = kwds = None
        completed += 1
    util.debug('worker exiting after %d tasks' % completed)
141 | |
def _helper_reraises_exception(ex):
    'Pickle-able helper function for use by _guarded_task_generation.'
    # Runs inside a worker: re-raises an exception that occurred while
    # iterating the task iterable on the parent side, so it surfaces as
    # the job's result.
    raise ex
145 | |
146 # | |
147 # Class representing a process pool | |
148 # | |
149 | |
150 class _PoolCache(dict): | |
151 """ | |
152 Class that implements a cache for the Pool class that will notify | |
153 the pool management threads every time the cache is emptied. The | |
154 notification is done by the use of a queue that is provided when | |
155 instantiating the cache. | |
156 """ | |
157 def __init__(self, /, *args, notifier=None, **kwds): | |
158 self.notifier = notifier | |
159 super().__init__(*args, **kwds) | |
160 | |
161 def __delitem__(self, item): | |
162 super().__delitem__(item) | |
163 | |
164 # Notify that the cache is empty. This is important because the | |
165 # pool keeps maintaining workers until the cache gets drained. This | |
166 # eliminates a race condition in which a task is finished after the | |
167 # the pool's _handle_workers method has enter another iteration of the | |
168 # loop. In this situation, the only event that can wake up the pool | |
169 # is the cache to be emptied (no more tasks available). | |
170 if not self: | |
171 self.notifier.put(None) | |
172 | |
class Pool(object):
    '''
    Class which supports an async version of applying functions to arguments.
    '''
    # Wrap worker exceptions with ExceptionWithTraceback so the remote
    # traceback survives pickling; ThreadPool turns this off.
    _wrap_exception = True

    @staticmethod
    def Process(ctx, *args, **kwds):
        # Worker factory; ThreadPool overrides this to create threads.
        return ctx.Process(*args, **kwds)

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, context=None):
        """Start *processes* workers (default os.cpu_count()) plus the three
        management threads (worker/task/result handlers).

        Raises ValueError for processes < 1 and TypeError for a
        non-callable initializer.
        """
        # Attributes initialized early to make sure that they exist in
        # __del__() if __init__() raises an exception
        self._pool = []
        self._state = INIT

        self._ctx = context or get_context()
        self._setup_queues()
        self._taskqueue = queue.SimpleQueue()
        # The _change_notifier queue exist to wake up self._handle_workers()
        # when the cache (self._cache) is empty or when there is a change in
        # the _state variable of the thread that runs _handle_workers.
        self._change_notifier = self._ctx.SimpleQueue()
        self._cache = _PoolCache(notifier=self._change_notifier)
        self._maxtasksperchild = maxtasksperchild
        self._initializer = initializer
        self._initargs = initargs

        if processes is None:
            processes = os.cpu_count() or 1
        if processes < 1:
            raise ValueError("Number of processes must be at least 1")

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        self._processes = processes
        try:
            self._repopulate_pool()
        except Exception:
            # Start-up failed part way through: tear down whatever workers
            # did start before re-raising.
            for p in self._pool:
                if p.exitcode is None:
                    p.terminate()
            for p in self._pool:
                p.join()
            raise

        sentinels = self._get_sentinels()

        # Thread that replaces exited workers and decides when to stop them.
        self._worker_handler = threading.Thread(
            target=Pool._handle_workers,
            args=(self._cache, self._taskqueue, self._ctx, self.Process,
                  self._processes, self._pool, self._inqueue, self._outqueue,
                  self._initializer, self._initargs, self._maxtasksperchild,
                  self._wrap_exception, sentinels, self._change_notifier)
            )
        self._worker_handler.daemon = True
        self._worker_handler._state = RUN
        self._worker_handler.start()

        # Thread that feeds tasks from _taskqueue into the workers' inqueue.
        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue,
                  self._pool, self._cache)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN
        self._task_handler.start()

        # Thread that routes worker results to their result objects.
        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()

        # Registered cleanup; runs at most once, either via terminate()
        # or at interpreter shutdown.
        self._terminate = util.Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                  self._change_notifier, self._worker_handler, self._task_handler,
                  self._result_handler, self._cache),
            exitpriority=15
            )
        self._state = RUN

    # Copy globals as function locals to make sure that they are available
    # during Python shutdown when the Pool is destroyed.
    def __del__(self, _warn=warnings.warn, RUN=RUN):
        if self._state == RUN:
            _warn(f"unclosed running multiprocessing pool {self!r}",
                  ResourceWarning, source=self)
            if getattr(self, '_change_notifier', None) is not None:
                # Wake _handle_workers so it can observe the pool is gone.
                self._change_notifier.put(None)

    def __repr__(self):
        cls = self.__class__
        return (f'<{cls.__module__}.{cls.__qualname__} '
                f'state={self._state} '
                f'pool_size={len(self._pool)}>')

    def _get_sentinels(self):
        """Readable objects the worker handler waits on for activity."""
        task_queue_sentinels = [self._outqueue._reader]
        self_notifier_sentinels = [self._change_notifier._reader]
        return [*task_queue_sentinels, *self_notifier_sentinels]

    @staticmethod
    def _get_worker_sentinels(workers):
        """Sentinel handles that become ready when a worker process exits."""
        return [worker.sentinel for worker in
                workers if hasattr(worker, "sentinel")]

    @staticmethod
    def _join_exited_workers(pool):
        """Cleanup after any worker processes which have exited due to reaching
        their specified lifetime. Returns True if any workers were cleaned up.
        """
        cleaned = False
        # Iterate in reverse so del pool[i] does not shift unvisited items.
        for i in reversed(range(len(pool))):
            worker = pool[i]
            if worker.exitcode is not None:
                # worker exited
                util.debug('cleaning up worker %d' % i)
                worker.join()
                cleaned = True
                del pool[i]
        return cleaned

    def _repopulate_pool(self):
        return self._repopulate_pool_static(self._ctx, self.Process,
                                            self._processes,
                                            self._pool, self._inqueue,
                                            self._outqueue, self._initializer,
                                            self._initargs,
                                            self._maxtasksperchild,
                                            self._wrap_exception)

    @staticmethod
    def _repopulate_pool_static(ctx, Process, processes, pool, inqueue,
                                outqueue, initializer, initargs,
                                maxtasksperchild, wrap_exception):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        for i in range(processes - len(pool)):
            w = Process(ctx, target=worker,
                        args=(inqueue, outqueue,
                              initializer,
                              initargs, maxtasksperchild,
                              wrap_exception))
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            pool.append(w)
            util.debug('added worker')

    @staticmethod
    def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue,
                       initializer, initargs, maxtasksperchild,
                       wrap_exception):
        """Clean up any exited workers and start replacements for them.
        """
        if Pool._join_exited_workers(pool):
            Pool._repopulate_pool_static(ctx, Process, processes, pool,
                                         inqueue, outqueue, initializer,
                                         initargs, maxtasksperchild,
                                         wrap_exception)

    def _setup_queues(self):
        """Create the worker in/out queues; overridden by ThreadPool.

        _quick_put/_quick_get bypass the SimpleQueue wrappers and talk to
        the underlying pipe connections directly.
        """
        self._inqueue = self._ctx.SimpleQueue()
        self._outqueue = self._ctx.SimpleQueue()
        self._quick_put = self._inqueue._writer.send
        self._quick_get = self._outqueue._reader.recv

    def _check_running(self):
        # Guard used by every task-submitting method.
        if self._state != RUN:
            raise ValueError("Pool not running")

    def apply(self, func, args=(), kwds={}):
        '''
        Equivalent of `func(*args, **kwds)`.
        Pool must be running.
        '''
        return self.apply_async(func, args, kwds).get()

    def map(self, func, iterable, chunksize=None):
        '''
        Apply `func` to each element in `iterable`, collecting the results
        in a list that is returned.
        '''
        return self._map_async(func, iterable, mapstar, chunksize).get()

    def starmap(self, func, iterable, chunksize=None):
        '''
        Like `map()` method but the elements of the `iterable` are expected to
        be iterables as well and will be unpacked as arguments. Hence
        `func` and (a, b) becomes func(a, b).
        '''
        return self._map_async(func, iterable, starmapstar, chunksize).get()

    def starmap_async(self, func, iterable, chunksize=None, callback=None,
                      error_callback=None):
        '''
        Asynchronous version of `starmap()` method.
        '''
        return self._map_async(func, iterable, starmapstar, chunksize,
                               callback, error_callback)

    def _guarded_task_generation(self, result_job, func, iterable):
        '''Provides a generator of tasks for imap and imap_unordered with
        appropriate handling for iterables which throw exceptions during
        iteration.'''
        try:
            i = -1
            for i, x in enumerate(iterable):
                yield (result_job, i, func, (x,), {})
        except Exception as e:
            # Deliver the iteration error as the next "task": the worker
            # re-raises it via _helper_reraises_exception.
            yield (result_job, i+1, _helper_reraises_exception, (e,), {})

    def imap(self, func, iterable, chunksize=1):
        '''
        Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
        '''
        self._check_running()
        if chunksize == 1:
            result = IMapIterator(self)
            self._taskqueue.put(
                (
                    self._guarded_task_generation(result._job, func, iterable),
                    result._set_length
                ))
            return result
        else:
            if chunksize < 1:
                raise ValueError(
                    "Chunksize must be 1+, not {0:n}".format(
                        chunksize))
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapIterator(self)
            self._taskqueue.put(
                (
                    self._guarded_task_generation(result._job,
                                                  mapstar,
                                                  task_batches),
                    result._set_length
                ))
            # Flatten the per-chunk lists back into individual items.
            return (item for chunk in result for item in chunk)

    def imap_unordered(self, func, iterable, chunksize=1):
        '''
        Like `imap()` method but ordering of results is arbitrary.
        '''
        self._check_running()
        if chunksize == 1:
            result = IMapUnorderedIterator(self)
            self._taskqueue.put(
                (
                    self._guarded_task_generation(result._job, func, iterable),
                    result._set_length
                ))
            return result
        else:
            if chunksize < 1:
                raise ValueError(
                    "Chunksize must be 1+, not {0!r}".format(chunksize))
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapUnorderedIterator(self)
            self._taskqueue.put(
                (
                    self._guarded_task_generation(result._job,
                                                  mapstar,
                                                  task_batches),
                    result._set_length
                ))
            # Flatten the per-chunk lists back into individual items.
            return (item for chunk in result for item in chunk)

    def apply_async(self, func, args=(), kwds={}, callback=None,
                    error_callback=None):
        '''
        Asynchronous version of `apply()` method.
        '''
        self._check_running()
        result = ApplyResult(self, callback, error_callback)
        self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
        return result

    def map_async(self, func, iterable, chunksize=None, callback=None,
                  error_callback=None):
        '''
        Asynchronous version of `map()` method.
        '''
        return self._map_async(func, iterable, mapstar, chunksize, callback,
                               error_callback)

    def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
                   error_callback=None):
        '''
        Helper function to implement map, starmap and their async counterparts.
        '''
        self._check_running()
        if not hasattr(iterable, '__len__'):
            iterable = list(iterable)

        if chunksize is None:
            # Heuristic: roughly four chunks per worker.
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
            if extra:
                chunksize += 1
        if len(iterable) == 0:
            chunksize = 0

        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = MapResult(self, chunksize, len(iterable), callback,
                           error_callback=error_callback)
        self._taskqueue.put(
            (
                self._guarded_task_generation(result._job,
                                              mapper,
                                              task_batches),
                None
            )
        )
        return result

    @staticmethod
    def _wait_for_updates(sentinels, change_notifier, timeout=None):
        """Block until one of *sentinels* is ready, then drain the
        change-notifier queue so stale wake-ups are discarded."""
        wait(sentinels, timeout=timeout)
        while not change_notifier.empty():
            change_notifier.get()

    @classmethod
    def _handle_workers(cls, cache, taskqueue, ctx, Process, processes,
                        pool, inqueue, outqueue, initializer, initargs,
                        maxtasksperchild, wrap_exception, sentinels,
                        change_notifier):
        """Worker-handler thread body: keep the pool populated until the
        pool is closed and the cache has drained (or it is terminated)."""
        thread = threading.current_thread()

        # Keep maintaining workers until the cache gets drained, unless the pool
        # is terminated.
        while thread._state == RUN or (cache and thread._state != TERMINATE):
            cls._maintain_pool(ctx, Process, processes, pool, inqueue,
                               outqueue, initializer, initargs,
                               maxtasksperchild, wrap_exception)

            current_sentinels = [*cls._get_worker_sentinels(pool), *sentinels]

            cls._wait_for_updates(current_sentinels, change_notifier)
        # send sentinel to stop workers
        taskqueue.put(None)
        util.debug('worker handler exiting')

    @staticmethod
    def _handle_tasks(taskqueue, put, outqueue, pool, cache):
        """Task-handler thread body: move tasks from the taskqueue to the
        workers' inqueue, then distribute shutdown sentinels."""
        thread = threading.current_thread()

        for taskseq, set_length in iter(taskqueue.get, None):
            task = None
            try:
                # iterating taskseq cannot fail
                for task in taskseq:
                    if thread._state != RUN:
                        util.debug('task handler found thread._state != RUN')
                        break
                    try:
                        put(task)
                    except Exception as e:
                        # Task could not be sent (e.g. unpicklable args):
                        # fail just that job, keep going.
                        job, idx = task[:2]
                        try:
                            cache[job]._set(idx, (False, e))
                        except KeyError:
                            pass
                else:
                    # Whole sequence sent: tell imap iterators how many
                    # results to expect.
                    if set_length:
                        util.debug('doing set_length()')
                        idx = task[1] if task else -1
                        set_length(idx + 1)
                    continue
                break
            finally:
                # Drop references so large tasks are collected promptly.
                task = taskseq = job = None
        else:
            util.debug('task handler got sentinel')

        try:
            # tell result handler to finish when cache is empty
            util.debug('task handler sending sentinel to result handler')
            outqueue.put(None)

            # tell workers there is no more work
            util.debug('task handler sending sentinel to workers')
            for p in pool:
                put(None)
        except OSError:
            util.debug('task handler got OSError when sending sentinels')

        util.debug('task handler exiting')

    @staticmethod
    def _handle_results(outqueue, get, cache):
        """Result-handler thread body: deliver (job, i, obj) triples from
        the outqueue to the matching result object in *cache*."""
        thread = threading.current_thread()

        while 1:
            try:
                task = get()
            except (OSError, EOFError):
                util.debug('result handler got EOFError/OSError -- exiting')
                return

            if thread._state != RUN:
                assert thread._state == TERMINATE, "Thread not in TERMINATE"
                util.debug('result handler found thread._state=TERMINATE')
                break

            if task is None:
                util.debug('result handler got sentinel')
                break

            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                # Result object already discarded (e.g. iterator deleted).
                pass
            task = job = obj = None

        # Drain remaining results during an orderly close().
        while cache and thread._state != TERMINATE:
            try:
                task = get()
            except (OSError, EOFError):
                util.debug('result handler got EOFError/OSError -- exiting')
                return

            if task is None:
                util.debug('result handler ignoring extra sentinel')
                continue
            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass
            task = job = obj = None

        if hasattr(outqueue, '_reader'):
            util.debug('ensuring that outqueue is not full')
            # If we don't make room available in outqueue then
            # attempts to add the sentinel (None) to outqueue may
            # block.  There is guaranteed to be no more than 2 sentinels.
            try:
                for i in range(10):
                    if not outqueue._reader.poll():
                        break
                    get()
            except (OSError, EOFError):
                pass

        util.debug('result handler exiting: len(cache)=%s, thread._state=%s',
                   len(cache), thread._state)

    @staticmethod
    def _get_tasks(func, it, size):
        """Yield (func, chunk) pairs where each chunk has up to *size*
        items from *it*."""
        it = iter(it)
        while 1:
            x = tuple(itertools.islice(it, size))
            if not x:
                return
            yield (func, x)

    def __reduce__(self):
        raise NotImplementedError(
              'pool objects cannot be passed between processes or pickled'
              )

    def close(self):
        """Disallow new tasks; workers exit once pending work is done."""
        util.debug('closing pool')
        if self._state == RUN:
            self._state = CLOSE
            self._worker_handler._state = CLOSE
            self._change_notifier.put(None)

    def terminate(self):
        """Stop immediately, abandoning outstanding work."""
        util.debug('terminating pool')
        self._state = TERMINATE
        self._worker_handler._state = TERMINATE
        self._change_notifier.put(None)
        self._terminate()

    def join(self):
        """Wait for the handler threads and workers to finish.  The pool
        must have been close()d or terminate()d first."""
        util.debug('joining pool')
        if self._state == RUN:
            raise ValueError("Pool is still running")
        elif self._state not in (CLOSE, TERMINATE):
            raise ValueError("In unknown state")
        self._worker_handler.join()
        self._task_handler.join()
        self._result_handler.join()
        for p in self._pool:
            p.join()

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # task_handler may be blocked trying to put items on inqueue
        util.debug('removing tasks from inqueue until task handler finished')
        inqueue._rlock.acquire()
        while task_handler.is_alive() and inqueue._reader.poll():
            inqueue._reader.recv()
            time.sleep(0)

    @classmethod
    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier,
                        worker_handler, task_handler, result_handler, cache):
        """Finalizer installed by __init__: shut everything down in an
        order that avoids deadlocking the handler threads."""
        # this is guaranteed to only be called once
        util.debug('finalizing pool')

        # Notify all the workers to exit.
        worker_handler._state = TERMINATE
        task_handler._state = TERMINATE

        util.debug('helping task handler/workers to finish')
        cls._help_stuff_finish(inqueue, task_handler, len(pool))

        if (not result_handler.is_alive()) and (len(cache) != 0):
            raise AssertionError(
                "Cannot have cache with result_hander not alive")

        result_handler._state = TERMINATE
        change_notifier.put(None)
        outqueue.put(None)                  # sentinel

        # We must wait for the worker handler to exit before terminating
        # workers because we don't want workers to be restarted behind our back.
        util.debug('joining worker handler')
        if threading.current_thread() is not worker_handler:
            worker_handler.join()

        # Terminate workers which haven't already finished.
        if pool and hasattr(pool[0], 'terminate'):
            util.debug('terminating workers')
            for p in pool:
                if p.exitcode is None:
                    p.terminate()

        util.debug('joining task handler')
        if threading.current_thread() is not task_handler:
            task_handler.join()

        util.debug('joining result handler')
        if threading.current_thread() is not result_handler:
            result_handler.join()

        if pool and hasattr(pool[0], 'terminate'):
            util.debug('joining pool workers')
            for p in pool:
                if p.is_alive():
                    # worker has not yet exited
                    util.debug('cleaning up worker %d' % p.pid)
                    p.join()

    def __enter__(self):
        self._check_running()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Context-manager exit terminates rather than closes, matching the
        # documented behavior of multiprocessing.Pool.
        self.terminate()
734 | |
735 # | |
736 # Class whose instances are returned by `Pool.apply_async()` | |
737 # | |
738 | |
class ApplyResult(object):
    """Asynchronous result returned by Pool.apply_async().

    Registers itself in the pool's cache under a fresh job id; the result
    handler thread delivers the outcome through _set(), which runs any
    callback and wakes waiters.
    """

    def __init__(self, pool, callback, error_callback):
        self._pool = pool
        self._event = threading.Event()   # set once the outcome is known
        self._job = next(job_counter)
        self._cache = pool._cache
        self._callback = callback
        self._error_callback = error_callback
        # Result handler looks us up by job id.
        self._cache[self._job] = self

    def ready(self):
        """Return True once the call has completed (success or failure)."""
        return self._event.is_set()

    def successful(self):
        """Return whether the call completed without raising.

        Raises ValueError if the result is not ready yet.
        """
        if not self.ready():
            raise ValueError("{0!r} not ready".format(self))
        return self._success

    def wait(self, timeout=None):
        """Block until the result is ready or *timeout* seconds elapse."""
        self._event.wait(timeout)

    def get(self, timeout=None):
        """Return the result, re-raising the worker's exception on failure.

        Raises multiprocessing's TimeoutError if not ready within *timeout*.
        """
        self.wait(timeout)
        if not self.ready():
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value

    def _set(self, i, obj):
        # Called by the result handler thread with obj = (success, value).
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        if self._error_callback and not self._success:
            self._error_callback(self._value)
        self._event.set()
        # Removing the cache entry may notify the pool that all
        # outstanding work is done (see _PoolCache.__delitem__).
        del self._cache[self._job]
        self._pool = None

AsyncResult = ApplyResult       # create alias -- see #17805
781 | |
782 # | |
783 # Class whose instances are returned by `Pool.map_async()` | |
784 # | |
785 | |
class MapResult(ApplyResult):
    """Result object for map_async()/starmap_async(): assembles per-chunk
    results into one list of *length* elements, in order."""

    def __init__(self, pool, chunksize, length, callback, error_callback):
        ApplyResult.__init__(self, pool, callback,
                             error_callback=error_callback)
        self._success = True
        self._value = [None] * length     # filled in chunk by chunk
        self._chunksize = chunksize
        if chunksize <= 0:
            # Empty input: nothing to wait for, mark ready immediately.
            self._number_left = 0
            self._event.set()
            del self._cache[self._job]
        else:
            # Number of chunks still outstanding.
            self._number_left = length//chunksize + bool(length % chunksize)

    def _set(self, i, success_result):
        # Called once per chunk by the result handler; i is the chunk index.
        self._number_left -= 1
        success, result = success_result
        if success and self._success:
            # Splice the chunk's results into their slots.
            self._value[i*self._chunksize:(i+1)*self._chunksize] = result
            if self._number_left == 0:
                if self._callback:
                    self._callback(self._value)
                del self._cache[self._job]
                self._event.set()
                self._pool = None
        else:
            if not success and self._success:
                # only store first exception
                self._success = False
                self._value = result
            if self._number_left == 0:
                # only consider the result ready once all jobs are done
                if self._error_callback:
                    self._error_callback(self._value)
                del self._cache[self._job]
                self._event.set()
                self._pool = None
824 | |
825 # | |
826 # Class whose instances are returned by `Pool.imap()` | |
827 # | |
828 | |
class IMapIterator(object):
    """Ordered iterator returned by Pool.imap().

    Results are delivered by the result handler thread via _set(); results
    that arrive out of order are parked in _unsorted until all their
    predecessors have arrived.
    """

    def __init__(self, pool):
        self._pool = pool
        self._cond = threading.Condition(threading.Lock())
        self._job = next(job_counter)
        self._cache = pool._cache
        self._items = collections.deque()   # results ready to be yielded
        self._index = 0                     # index of the next result to yield
        self._length = None                 # total result count, once known
        self._unsorted = {}                 # early results, keyed by index
        self._cache[self._job] = self

    def __iter__(self):
        return self

    def next(self, timeout=None):
        """Return the next result, waiting at most *timeout* seconds.

        Raises StopIteration when exhausted, or multiprocessing's
        TimeoutError if no result arrives in time.
        """
        with self._cond:
            try:
                item = self._items.popleft()
            except IndexError:
                if self._index == self._length:
                    self._pool = None
                    raise StopIteration from None
                self._cond.wait(timeout)
                # Re-check after the wait: either a result arrived, the
                # stream ended, or we timed out.
                try:
                    item = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        self._pool = None
                        raise StopIteration from None
                    raise TimeoutError from None

        success, value = item
        if success:
            return value
        raise value

    __next__ = next                    # XXX

    def _set(self, i, obj):
        # Called by the result handler thread with the i-th result.
        with self._cond:
            if self._index == i:
                self._items.append(obj)
                self._index += 1
                # Release any consecutive results that arrived early.
                while self._index in self._unsorted:
                    obj = self._unsorted.pop(self._index)
                    self._items.append(obj)
                    self._index += 1
                self._cond.notify()
            else:
                self._unsorted[i] = obj

            if self._index == self._length:
                # All results delivered: drop the cache entry so the pool
                # can wind down.
                del self._cache[self._job]
                self._pool = None

    def _set_length(self, length):
        # Called by the task handler once the task generator is exhausted.
        with self._cond:
            self._length = length
            if self._index == self._length:
                self._cond.notify()
                del self._cache[self._job]
                self._pool = None
893 | |
894 # | |
895 # Class whose instances are returned by `Pool.imap_unordered()` | |
896 # | |
897 | |
class IMapUnorderedIterator(IMapIterator):
    """Iterator returned by Pool.imap_unordered(): yields results in
    completion order, so no reorder buffer is needed."""

    def _set(self, i, obj):
        # Called by the result handler thread; the index i is ignored
        # because ordering is not preserved.
        with self._cond:
            self._items.append(obj)
            self._index += 1
            self._cond.notify()
            if self._index == self._length:
                del self._cache[self._job]
                self._pool = None
908 | |
909 # | |
910 # | |
911 # | |
912 | |
class ThreadPool(Pool):
    """A Pool whose workers are threads (via multiprocessing.dummy) rather
    than processes, so tasks and results never cross a pickle boundary."""

    # Exceptions need no traceback wrapping: they propagate natively
    # between threads.
    _wrap_exception = False

    @staticmethod
    def Process(ctx, *args, **kwds):
        # The multiprocessing context is irrelevant for threads; delegate
        # to the thread-backed Process clone.
        from .dummy import Process
        return Process(*args, **kwds)

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        # Plain thread-safe queues: there are no pipe connections, so the
        # "quick" accessors are just the queue methods themselves.
        self._inqueue = queue.SimpleQueue()
        self._outqueue = queue.SimpleQueue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

    def _get_sentinels(self):
        # Only the change notifier is waitable; thread workers have no
        # process sentinels.
        return [self._change_notifier._reader]

    @staticmethod
    def _get_worker_sentinels(workers):
        return []

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        """Drain pending tasks, then queue one sentinel per worker so all
        worker threads wake up and exit."""
        while True:
            try:
                inqueue.get(block=False)
            except queue.Empty:
                break
        for _ in range(size):
            inqueue.put(None)

    def _wait_for_updates(self, sentinels, change_notifier, timeout):
        # Threads expose nothing waitable, so just sleep for the interval.
        time.sleep(timeout)