CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/multiprocessing/util.py @ 69:33d812a61356

commit message: planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author:         jpayne
date:           Tue, 18 Mar 2025 17:55:14 -0400
#
# Module providing various facilities to other parts of the package
#
# multiprocessing/util.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

import os
import itertools
import sys
import weakref
import atexit
import threading        # we want threading to install its
                        # cleanup function before multiprocessing does
from subprocess import _args_from_interpreter_flags

from . import process

__all__ = [
    'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
    'log_to_stderr', 'get_temp_dir', 'register_after_fork',
    'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
    'close_all_fds_except', 'SUBDEBUG', 'SUBWARNING',
    ]

#
# Logging
#

NOTSET = 0
SUBDEBUG = 5
DEBUG = 10
INFO = 20
SUBWARNING = 25

LOGGER_NAME = 'multiprocessing'
DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'

_logger = None
_log_to_stderr = False

def sub_debug(msg, *args):
    if _logger:
        _logger.log(SUBDEBUG, msg, *args)

def debug(msg, *args):
    if _logger:
        _logger.log(DEBUG, msg, *args)

def info(msg, *args):
    if _logger:
        _logger.log(INFO, msg, *args)

def sub_warning(msg, *args):
    if _logger:
        _logger.log(SUBWARNING, msg, *args)

def get_logger():
    '''
    Returns logger used by multiprocessing
    '''
    global _logger
    import logging

    logging._acquireLock()
    try:
        if not _logger:

            _logger = logging.getLogger(LOGGER_NAME)
            _logger.propagate = 0

            # XXX multiprocessing should cleanup before logging
            if hasattr(atexit, 'unregister'):
                atexit.unregister(_exit_function)
                atexit.register(_exit_function)
            else:
                atexit._exithandlers.remove((_exit_function, (), {}))
                atexit._exithandlers.append((_exit_function, (), {}))

    finally:
        logging._releaseLock()

    return _logger

def log_to_stderr(level=None):
    '''
    Turn on logging and add a handler which prints to stderr
    '''
    global _log_to_stderr
    import logging

    logger = get_logger()
    formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    if level:
        logger.setLevel(level)
    _log_to_stderr = True
    return _logger
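
# Illustrative usage sketch (not part of the original module): applications
# normally reach these helpers through the top-level package, e.g.
#
#     import logging
#     import multiprocessing
#
#     logger = multiprocessing.log_to_stderr()   # wraps log_to_stderr() above
#     logger.setLevel(logging.INFO)
#     # worker processes now emit lines such as
#     # "[INFO/Process-1] child process calling self.run()" on stderr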

#
# Function returning a temp directory which will be removed on exit
#

def _remove_temp_dir(rmtree, tempdir):
    rmtree(tempdir)

    current_process = process.current_process()
    # current_process() can be None if the finalizer is called
    # late during Python finalization
    if current_process is not None:
        current_process._config['tempdir'] = None

def get_temp_dir():
    # get name of a temp directory which will be automatically cleaned up
    tempdir = process.current_process()._config.get('tempdir')
    if tempdir is None:
        import shutil, tempfile
        tempdir = tempfile.mkdtemp(prefix='pymp-')
        info('created temp directory %s', tempdir)
        # keep a strong reference to shutil.rmtree(), since the finalizer
        # can be called late during Python shutdown
        Finalize(None, _remove_temp_dir, args=(shutil.rmtree, tempdir),
                 exitpriority=-100)
        process.current_process()._config['tempdir'] = tempdir
    return tempdir
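
# Illustrative sketch (not part of the original module): get_temp_dir() lazily
# creates one per-process directory (used internally, e.g. for AF_UNIX listener
# addresses) and registers its removal as a low-priority finalizer.
#
#     from multiprocessing import util
#
#     d = util.get_temp_dir()            # e.g. '/tmp/pymp-xxxxxxxx'
#     assert d == util.get_temp_dir()    # repeated calls return the same path
#     # the Finalize(..., exitpriority=-100) above deletes it at process exit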

#
# Support for reinitialization of objects when bootstrapping a child process
#

_afterfork_registry = weakref.WeakValueDictionary()
_afterfork_counter = itertools.count()

def _run_after_forkers():
    items = list(_afterfork_registry.items())
    items.sort()
    for (index, ident, func), obj in items:
        try:
            func(obj)
        except Exception as e:
            info('after forker raised exception %s', e)

def register_after_fork(obj, func):
    _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj
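
# Illustrative sketch (not part of the original module): register_after_fork()
# lets an object reinitialise per-process state in a forked child, the same
# pattern ForkAwareThreadLock uses further below.
#
#     from multiprocessing import util
#
#     class ConnectionPool:                  # hypothetical caller
#         def __init__(self):
#             self._connections = []
#             # drop inherited connections in any forked child process
#             util.register_after_fork(self, ConnectionPool._reset)
#
#         def _reset(self):
#             self._connections = []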

#
# Finalization using weakrefs
#

_finalizer_registry = {}
_finalizer_counter = itertools.count()


class Finalize(object):
    '''
    Class which supports object finalization using weakrefs
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        if (exitpriority is not None) and not isinstance(exitpriority,int):
            raise TypeError(
                "Exitpriority ({0!r}) must be None or int, not {1!s}".format(
                    exitpriority, type(exitpriority)))

        if obj is not None:
            self._weakref = weakref.ref(obj, self)
        elif exitpriority is None:
            raise ValueError("Without object, exitpriority cannot be None")

        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        self._key = (exitpriority, next(_finalizer_counter))
        self._pid = os.getpid()

        _finalizer_registry[self._key] = self

    def __call__(self, wr=None,
                 # Need to bind these locally because the globals can have
                 # been cleared at shutdown
                 _finalizer_registry=_finalizer_registry,
                 sub_debug=sub_debug, getpid=os.getpid):
        '''
        Run the callback unless it has already been called or cancelled
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            sub_debug('finalizer no longer registered')
        else:
            if self._pid != getpid():
                sub_debug('finalizer ignored because different process')
                res = None
            else:
                sub_debug('finalizer calling %s with args %s and kwargs %s',
                          self._callback, self._args, self._kwargs)
                res = self._callback(*self._args, **self._kwargs)
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None
            return res

    def cancel(self):
        '''
        Cancel finalization of the object
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            pass
        else:
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None

    def still_active(self):
        '''
        Return whether this finalizer is still waiting to invoke callback
        '''
        return self._key in _finalizer_registry

    def __repr__(self):
        try:
            obj = self._weakref()
        except (AttributeError, TypeError):
            obj = None

        if obj is None:
            return '<%s object, dead>' % self.__class__.__name__

        x = '<%s object, callback=%s' % (
                self.__class__.__name__,
                getattr(self._callback, '__name__', self._callback))
        if self._args:
            x += ', args=' + str(self._args)
        if self._kwargs:
            x += ', kwargs=' + str(self._kwargs)
        if self._key[0] is not None:
            x += ', exitpriority=' + str(self._key[0])
        return x + '>'
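
# Illustrative sketch (not part of the original module): a Finalize instance
# ties a cleanup callback to an object's lifetime (via the weakref) and, when
# exitpriority is given, also to interpreter shutdown via _exit_function().
#
#     from multiprocessing.util import Finalize
#
#     class TempResource:                    # hypothetical caller
#         def __init__(self, path):
#             self.path = path
#             # runs when the object is garbage collected or at process
#             # exit, whichever comes first; note args must not hold self
#             self._finalizer = Finalize(self, TempResource._cleanup,
#                                        args=(path,), exitpriority=10)
#
#         @staticmethod
#         def _cleanup(path):
#             print('removing', path)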


def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.
    '''
    if _finalizer_registry is None:
        # This function may be called after this module's globals are
        # destroyed. See the _exit_function function in this module for more
        # notes.
        return

    if minpriority is None:
        f = lambda p : p[0] is not None
    else:
        f = lambda p : p[0] is not None and p[0] >= minpriority

    # Careful: _finalizer_registry may be mutated while this function
    # is running (either by a GC run or by another thread).

    # list(_finalizer_registry) should be atomic, while
    # list(_finalizer_registry.items()) is not.
    keys = [key for key in list(_finalizer_registry) if f(key)]
    keys.sort(reverse=True)

    for key in keys:
        finalizer = _finalizer_registry.get(key)
        # key may have been removed from the registry
        if finalizer is not None:
            sub_debug('calling %s', finalizer)
            try:
                finalizer()
            except Exception:
                import traceback
                traceback.print_exc()

    if minpriority is None:
        _finalizer_registry.clear()
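
# Illustrative note (not part of the original module): registry keys are
# (exitpriority, creation_index) tuples and are sorted in reverse, e.g.
#
#     Finalize(None, a, exitpriority=100)    # key (100, 0)
#     Finalize(None, b, exitpriority=0)      # key (0, 1)
#     Finalize(None, c, exitpriority=0)      # key (0, 2)
#
# so _run_finalizers(0) calls a, then c, then b: highest priority first and,
# within a priority, most recently created first.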

#
# Clean up on exit
#

def is_exiting():
    '''
    Returns true if the process is shutting down
    '''
    return _exiting or _exiting is None

_exiting = False

def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                   active_children=process.active_children,
                   current_process=process.current_process):
    # We hold on to references to functions in the arglist due to the
    # situation described below, where this function is called after this
    # module's globals are destroyed.

    global _exiting

    if not _exiting:
        _exiting = True

        info('process shutting down')
        debug('running all "atexit" finalizers with priority >= 0')
        _run_finalizers(0)

        if current_process() is not None:
            # We check if the current process is None here because if
            # it's None, any call to ``active_children()`` will raise
            # an AttributeError (active_children winds up trying to
            # get attributes from util._current_process). One
            # situation where this can happen is if someone has
            # manipulated sys.modules, causing this module to be
            # garbage collected. The destructor for the module type
            # then replaces all values in the module dict with None.
            # For instance, after setuptools runs a test it replaces
            # sys.modules with a copy created earlier. See issues
            # #9775 and #15881. Also related: #4106, #9205, and
            # #9207.

            for p in active_children():
                if p.daemon:
                    info('calling terminate() for daemon %s', p.name)
                    p._popen.terminate()

            for p in active_children():
                info('calling join() for process %s', p.name)
                p.join()

        debug('running the remaining "atexit" finalizers')
        _run_finalizers()

atexit.register(_exit_function)

#
# Some fork aware types
#

class ForkAwareThreadLock(object):
    def __init__(self):
        self._reset()
        register_after_fork(self, ForkAwareThreadLock._reset)

    def _reset(self):
        self._lock = threading.Lock()
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)


class ForkAwareLocal(threading.local):
    def __init__(self):
        register_after_fork(self, lambda obj : obj.__dict__.clear())
    def __reduce__(self):
        return type(self), ()
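
# Illustrative sketch (not part of the original module): ForkAwareThreadLock is
# intended for module-level locks that could otherwise be inherited in a locked
# state by a forked child; the after-fork hook swaps in a fresh lock there.
#
#     from multiprocessing.util import ForkAwareThreadLock
#
#     _registry = {}                         # hypothetical shared module state
#     _registry_lock = ForkAwareThreadLock()
#
#     def register(name, value):
#         with _registry_lock:               # usable as a context manager
#             _registry[name] = value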

#
# Close fds except those specified
#

try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except Exception:
    MAXFD = 256

def close_all_fds_except(fds):
    fds = list(fds) + [-1, MAXFD]
    fds.sort()
    assert fds[-1] == MAXFD, 'fd too large'
    for i in range(len(fds) - 1):
        os.closerange(fds[i]+1, fds[i+1])
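
# Illustrative sketch (not part of the original module): a freshly started
# child that should keep only the standard streams and one inherited pipe
# could call
#
#     close_all_fds_except([0, 1, 2, pipe_fd])    # pipe_fd is hypothetical
#
# Every other descriptor up to MAXFD - 1 is closed; since os.closerange(a, b)
# closes fds a..b-1, the listed descriptors themselves stay open.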

#
# Close sys.stdin and replace stdin with os.devnull
#

def _close_stdin():
    if sys.stdin is None:
        return

    try:
        sys.stdin.close()
    except (OSError, ValueError):
        pass

    try:
        fd = os.open(os.devnull, os.O_RDONLY)
        try:
            sys.stdin = open(fd, closefd=False)
        except:
            os.close(fd)
            raise
    except (OSError, ValueError):
        pass

#
# Flush standard streams, if any
#

def _flush_std_streams():
    try:
        sys.stdout.flush()
    except (AttributeError, ValueError):
        pass
    try:
        sys.stderr.flush()
    except (AttributeError, ValueError):
        pass

#
# Start a program with only specified fds kept open
#

def spawnv_passfds(path, args, passfds):
    import _posixsubprocess
    passfds = tuple(sorted(map(int, passfds)))
    errpipe_read, errpipe_write = os.pipe()
    try:
        return _posixsubprocess.fork_exec(
            args, [os.fsencode(path)], True, passfds, None, None,
            -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
            False, False, None)
    finally:
        os.close(errpipe_read)
        os.close(errpipe_write)


def close_fds(*fds):
    """Close each file descriptor given as an argument"""
    for fd in fds:
        os.close(fd)


def _cleanup_tests():
    """Cleanup multiprocessing resources when multiprocessing tests
    completed."""

    from test import support

    # cleanup multiprocessing
    process._cleanup()

    # Stop the ForkServer process if it's running
    from multiprocessing import forkserver
    forkserver._forkserver._stop()

    # Stop the ResourceTracker process if it's running
    from multiprocessing import resource_tracker
    resource_tracker._resource_tracker._stop()

    # bpo-37421: Explicitly call _run_finalizers() to remove immediately
    # temporary directories created by multiprocessing.util.get_temp_dir().
    _run_finalizers()
    support.gc_collect()

    support.reap_children()