annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/multiprocessing/util.py @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
rev   line source
jpayne@68 1 #
jpayne@68 2 # Module providing various facilities to other parts of the package
jpayne@68 3 #
jpayne@68 4 # multiprocessing/util.py
jpayne@68 5 #
jpayne@68 6 # Copyright (c) 2006-2008, R Oudkerk
jpayne@68 7 # Licensed to PSF under a Contributor Agreement.
jpayne@68 8 #
jpayne@68 9
jpayne@68 10 import os
jpayne@68 11 import itertools
jpayne@68 12 import sys
jpayne@68 13 import weakref
jpayne@68 14 import atexit
jpayne@68 15 import threading # we want threading to install it's
jpayne@68 16 # cleanup function before multiprocessing does
jpayne@68 17 from subprocess import _args_from_interpreter_flags
jpayne@68 18
jpayne@68 19 from . import process
jpayne@68 20
jpayne@68 21 __all__ = [
jpayne@68 22 'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
jpayne@68 23 'log_to_stderr', 'get_temp_dir', 'register_after_fork',
jpayne@68 24 'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
jpayne@68 25 'close_all_fds_except', 'SUBDEBUG', 'SUBWARNING',
jpayne@68 26 ]
jpayne@68 27
jpayne@68 28 #
jpayne@68 29 # Logging
jpayne@68 30 #
jpayne@68 31
jpayne@68 32 NOTSET = 0
jpayne@68 33 SUBDEBUG = 5
jpayne@68 34 DEBUG = 10
jpayne@68 35 INFO = 20
jpayne@68 36 SUBWARNING = 25
jpayne@68 37
jpayne@68 38 LOGGER_NAME = 'multiprocessing'
jpayne@68 39 DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'
jpayne@68 40
jpayne@68 41 _logger = None
jpayne@68 42 _log_to_stderr = False
jpayne@68 43
jpayne@68 44 def sub_debug(msg, *args):
jpayne@68 45 if _logger:
jpayne@68 46 _logger.log(SUBDEBUG, msg, *args)
jpayne@68 47
jpayne@68 48 def debug(msg, *args):
jpayne@68 49 if _logger:
jpayne@68 50 _logger.log(DEBUG, msg, *args)
jpayne@68 51
jpayne@68 52 def info(msg, *args):
jpayne@68 53 if _logger:
jpayne@68 54 _logger.log(INFO, msg, *args)
jpayne@68 55
jpayne@68 56 def sub_warning(msg, *args):
jpayne@68 57 if _logger:
jpayne@68 58 _logger.log(SUBWARNING, msg, *args)
jpayne@68 59
jpayne@68 60 def get_logger():
jpayne@68 61 '''
jpayne@68 62 Returns logger used by multiprocessing
jpayne@68 63 '''
jpayne@68 64 global _logger
jpayne@68 65 import logging
jpayne@68 66
jpayne@68 67 logging._acquireLock()
jpayne@68 68 try:
jpayne@68 69 if not _logger:
jpayne@68 70
jpayne@68 71 _logger = logging.getLogger(LOGGER_NAME)
jpayne@68 72 _logger.propagate = 0
jpayne@68 73
jpayne@68 74 # XXX multiprocessing should cleanup before logging
jpayne@68 75 if hasattr(atexit, 'unregister'):
jpayne@68 76 atexit.unregister(_exit_function)
jpayne@68 77 atexit.register(_exit_function)
jpayne@68 78 else:
jpayne@68 79 atexit._exithandlers.remove((_exit_function, (), {}))
jpayne@68 80 atexit._exithandlers.append((_exit_function, (), {}))
jpayne@68 81
jpayne@68 82 finally:
jpayne@68 83 logging._releaseLock()
jpayne@68 84
jpayne@68 85 return _logger
jpayne@68 86
jpayne@68 87 def log_to_stderr(level=None):
jpayne@68 88 '''
jpayne@68 89 Turn on logging and add a handler which prints to stderr
jpayne@68 90 '''
jpayne@68 91 global _log_to_stderr
jpayne@68 92 import logging
jpayne@68 93
jpayne@68 94 logger = get_logger()
jpayne@68 95 formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
jpayne@68 96 handler = logging.StreamHandler()
jpayne@68 97 handler.setFormatter(formatter)
jpayne@68 98 logger.addHandler(handler)
jpayne@68 99
jpayne@68 100 if level:
jpayne@68 101 logger.setLevel(level)
jpayne@68 102 _log_to_stderr = True
jpayne@68 103 return _logger
jpayne@68 104
jpayne@68 105 #
jpayne@68 106 # Function returning a temp directory which will be removed on exit
jpayne@68 107 #
jpayne@68 108
jpayne@68 109 def _remove_temp_dir(rmtree, tempdir):
jpayne@68 110 rmtree(tempdir)
jpayne@68 111
jpayne@68 112 current_process = process.current_process()
jpayne@68 113 # current_process() can be None if the finalizer is called
jpayne@68 114 # late during Python finalization
jpayne@68 115 if current_process is not None:
jpayne@68 116 current_process._config['tempdir'] = None
jpayne@68 117
jpayne@68 118 def get_temp_dir():
jpayne@68 119 # get name of a temp directory which will be automatically cleaned up
jpayne@68 120 tempdir = process.current_process()._config.get('tempdir')
jpayne@68 121 if tempdir is None:
jpayne@68 122 import shutil, tempfile
jpayne@68 123 tempdir = tempfile.mkdtemp(prefix='pymp-')
jpayne@68 124 info('created temp directory %s', tempdir)
jpayne@68 125 # keep a strong reference to shutil.rmtree(), since the finalizer
jpayne@68 126 # can be called late during Python shutdown
jpayne@68 127 Finalize(None, _remove_temp_dir, args=(shutil.rmtree, tempdir),
jpayne@68 128 exitpriority=-100)
jpayne@68 129 process.current_process()._config['tempdir'] = tempdir
jpayne@68 130 return tempdir
jpayne@68 131
jpayne@68 132 #
jpayne@68 133 # Support for reinitialization of objects when bootstrapping a child process
jpayne@68 134 #
jpayne@68 135
jpayne@68 136 _afterfork_registry = weakref.WeakValueDictionary()
jpayne@68 137 _afterfork_counter = itertools.count()
jpayne@68 138
jpayne@68 139 def _run_after_forkers():
jpayne@68 140 items = list(_afterfork_registry.items())
jpayne@68 141 items.sort()
jpayne@68 142 for (index, ident, func), obj in items:
jpayne@68 143 try:
jpayne@68 144 func(obj)
jpayne@68 145 except Exception as e:
jpayne@68 146 info('after forker raised exception %s', e)
jpayne@68 147
jpayne@68 148 def register_after_fork(obj, func):
jpayne@68 149 _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj
jpayne@68 150
jpayne@68 151 #
jpayne@68 152 # Finalization using weakrefs
jpayne@68 153 #
jpayne@68 154
jpayne@68 155 _finalizer_registry = {}
jpayne@68 156 _finalizer_counter = itertools.count()
jpayne@68 157
jpayne@68 158
jpayne@68 159 class Finalize(object):
jpayne@68 160 '''
jpayne@68 161 Class which supports object finalization using weakrefs
jpayne@68 162 '''
jpayne@68 163 def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
jpayne@68 164 if (exitpriority is not None) and not isinstance(exitpriority,int):
jpayne@68 165 raise TypeError(
jpayne@68 166 "Exitpriority ({0!r}) must be None or int, not {1!s}".format(
jpayne@68 167 exitpriority, type(exitpriority)))
jpayne@68 168
jpayne@68 169 if obj is not None:
jpayne@68 170 self._weakref = weakref.ref(obj, self)
jpayne@68 171 elif exitpriority is None:
jpayne@68 172 raise ValueError("Without object, exitpriority cannot be None")
jpayne@68 173
jpayne@68 174 self._callback = callback
jpayne@68 175 self._args = args
jpayne@68 176 self._kwargs = kwargs or {}
jpayne@68 177 self._key = (exitpriority, next(_finalizer_counter))
jpayne@68 178 self._pid = os.getpid()
jpayne@68 179
jpayne@68 180 _finalizer_registry[self._key] = self
jpayne@68 181
jpayne@68 182 def __call__(self, wr=None,
jpayne@68 183 # Need to bind these locally because the globals can have
jpayne@68 184 # been cleared at shutdown
jpayne@68 185 _finalizer_registry=_finalizer_registry,
jpayne@68 186 sub_debug=sub_debug, getpid=os.getpid):
jpayne@68 187 '''
jpayne@68 188 Run the callback unless it has already been called or cancelled
jpayne@68 189 '''
jpayne@68 190 try:
jpayne@68 191 del _finalizer_registry[self._key]
jpayne@68 192 except KeyError:
jpayne@68 193 sub_debug('finalizer no longer registered')
jpayne@68 194 else:
jpayne@68 195 if self._pid != getpid():
jpayne@68 196 sub_debug('finalizer ignored because different process')
jpayne@68 197 res = None
jpayne@68 198 else:
jpayne@68 199 sub_debug('finalizer calling %s with args %s and kwargs %s',
jpayne@68 200 self._callback, self._args, self._kwargs)
jpayne@68 201 res = self._callback(*self._args, **self._kwargs)
jpayne@68 202 self._weakref = self._callback = self._args = \
jpayne@68 203 self._kwargs = self._key = None
jpayne@68 204 return res
jpayne@68 205
jpayne@68 206 def cancel(self):
jpayne@68 207 '''
jpayne@68 208 Cancel finalization of the object
jpayne@68 209 '''
jpayne@68 210 try:
jpayne@68 211 del _finalizer_registry[self._key]
jpayne@68 212 except KeyError:
jpayne@68 213 pass
jpayne@68 214 else:
jpayne@68 215 self._weakref = self._callback = self._args = \
jpayne@68 216 self._kwargs = self._key = None
jpayne@68 217
jpayne@68 218 def still_active(self):
jpayne@68 219 '''
jpayne@68 220 Return whether this finalizer is still waiting to invoke callback
jpayne@68 221 '''
jpayne@68 222 return self._key in _finalizer_registry
jpayne@68 223
jpayne@68 224 def __repr__(self):
jpayne@68 225 try:
jpayne@68 226 obj = self._weakref()
jpayne@68 227 except (AttributeError, TypeError):
jpayne@68 228 obj = None
jpayne@68 229
jpayne@68 230 if obj is None:
jpayne@68 231 return '<%s object, dead>' % self.__class__.__name__
jpayne@68 232
jpayne@68 233 x = '<%s object, callback=%s' % (
jpayne@68 234 self.__class__.__name__,
jpayne@68 235 getattr(self._callback, '__name__', self._callback))
jpayne@68 236 if self._args:
jpayne@68 237 x += ', args=' + str(self._args)
jpayne@68 238 if self._kwargs:
jpayne@68 239 x += ', kwargs=' + str(self._kwargs)
jpayne@68 240 if self._key[0] is not None:
jpayne@68 241 x += ', exitpriority=' + str(self._key[0])
jpayne@68 242 return x + '>'
jpayne@68 243
jpayne@68 244
jpayne@68 245 def _run_finalizers(minpriority=None):
jpayne@68 246 '''
jpayne@68 247 Run all finalizers whose exit priority is not None and at least minpriority
jpayne@68 248
jpayne@68 249 Finalizers with highest priority are called first; finalizers with
jpayne@68 250 the same priority will be called in reverse order of creation.
jpayne@68 251 '''
jpayne@68 252 if _finalizer_registry is None:
jpayne@68 253 # This function may be called after this module's globals are
jpayne@68 254 # destroyed. See the _exit_function function in this module for more
jpayne@68 255 # notes.
jpayne@68 256 return
jpayne@68 257
jpayne@68 258 if minpriority is None:
jpayne@68 259 f = lambda p : p[0] is not None
jpayne@68 260 else:
jpayne@68 261 f = lambda p : p[0] is not None and p[0] >= minpriority
jpayne@68 262
jpayne@68 263 # Careful: _finalizer_registry may be mutated while this function
jpayne@68 264 # is running (either by a GC run or by another thread).
jpayne@68 265
jpayne@68 266 # list(_finalizer_registry) should be atomic, while
jpayne@68 267 # list(_finalizer_registry.items()) is not.
jpayne@68 268 keys = [key for key in list(_finalizer_registry) if f(key)]
jpayne@68 269 keys.sort(reverse=True)
jpayne@68 270
jpayne@68 271 for key in keys:
jpayne@68 272 finalizer = _finalizer_registry.get(key)
jpayne@68 273 # key may have been removed from the registry
jpayne@68 274 if finalizer is not None:
jpayne@68 275 sub_debug('calling %s', finalizer)
jpayne@68 276 try:
jpayne@68 277 finalizer()
jpayne@68 278 except Exception:
jpayne@68 279 import traceback
jpayne@68 280 traceback.print_exc()
jpayne@68 281
jpayne@68 282 if minpriority is None:
jpayne@68 283 _finalizer_registry.clear()
jpayne@68 284
jpayne@68 285 #
jpayne@68 286 # Clean up on exit
jpayne@68 287 #
jpayne@68 288
jpayne@68 289 def is_exiting():
jpayne@68 290 '''
jpayne@68 291 Returns true if the process is shutting down
jpayne@68 292 '''
jpayne@68 293 return _exiting or _exiting is None
jpayne@68 294
jpayne@68 295 _exiting = False
jpayne@68 296
jpayne@68 297 def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
jpayne@68 298 active_children=process.active_children,
jpayne@68 299 current_process=process.current_process):
jpayne@68 300 # We hold on to references to functions in the arglist due to the
jpayne@68 301 # situation described below, where this function is called after this
jpayne@68 302 # module's globals are destroyed.
jpayne@68 303
jpayne@68 304 global _exiting
jpayne@68 305
jpayne@68 306 if not _exiting:
jpayne@68 307 _exiting = True
jpayne@68 308
jpayne@68 309 info('process shutting down')
jpayne@68 310 debug('running all "atexit" finalizers with priority >= 0')
jpayne@68 311 _run_finalizers(0)
jpayne@68 312
jpayne@68 313 if current_process() is not None:
jpayne@68 314 # We check if the current process is None here because if
jpayne@68 315 # it's None, any call to ``active_children()`` will raise
jpayne@68 316 # an AttributeError (active_children winds up trying to
jpayne@68 317 # get attributes from util._current_process). One
jpayne@68 318 # situation where this can happen is if someone has
jpayne@68 319 # manipulated sys.modules, causing this module to be
jpayne@68 320 # garbage collected. The destructor for the module type
jpayne@68 321 # then replaces all values in the module dict with None.
jpayne@68 322 # For instance, after setuptools runs a test it replaces
jpayne@68 323 # sys.modules with a copy created earlier. See issues
jpayne@68 324 # #9775 and #15881. Also related: #4106, #9205, and
jpayne@68 325 # #9207.
jpayne@68 326
jpayne@68 327 for p in active_children():
jpayne@68 328 if p.daemon:
jpayne@68 329 info('calling terminate() for daemon %s', p.name)
jpayne@68 330 p._popen.terminate()
jpayne@68 331
jpayne@68 332 for p in active_children():
jpayne@68 333 info('calling join() for process %s', p.name)
jpayne@68 334 p.join()
jpayne@68 335
jpayne@68 336 debug('running the remaining "atexit" finalizers')
jpayne@68 337 _run_finalizers()
jpayne@68 338
jpayne@68 339 atexit.register(_exit_function)
jpayne@68 340
jpayne@68 341 #
jpayne@68 342 # Some fork aware types
jpayne@68 343 #
jpayne@68 344
jpayne@68 345 class ForkAwareThreadLock(object):
jpayne@68 346 def __init__(self):
jpayne@68 347 self._reset()
jpayne@68 348 register_after_fork(self, ForkAwareThreadLock._reset)
jpayne@68 349
jpayne@68 350 def _reset(self):
jpayne@68 351 self._lock = threading.Lock()
jpayne@68 352 self.acquire = self._lock.acquire
jpayne@68 353 self.release = self._lock.release
jpayne@68 354
jpayne@68 355 def __enter__(self):
jpayne@68 356 return self._lock.__enter__()
jpayne@68 357
jpayne@68 358 def __exit__(self, *args):
jpayne@68 359 return self._lock.__exit__(*args)
jpayne@68 360
jpayne@68 361
jpayne@68 362 class ForkAwareLocal(threading.local):
jpayne@68 363 def __init__(self):
jpayne@68 364 register_after_fork(self, lambda obj : obj.__dict__.clear())
jpayne@68 365 def __reduce__(self):
jpayne@68 366 return type(self), ()
jpayne@68 367
jpayne@68 368 #
jpayne@68 369 # Close fds except those specified
jpayne@68 370 #
jpayne@68 371
jpayne@68 372 try:
jpayne@68 373 MAXFD = os.sysconf("SC_OPEN_MAX")
jpayne@68 374 except Exception:
jpayne@68 375 MAXFD = 256
jpayne@68 376
jpayne@68 377 def close_all_fds_except(fds):
jpayne@68 378 fds = list(fds) + [-1, MAXFD]
jpayne@68 379 fds.sort()
jpayne@68 380 assert fds[-1] == MAXFD, 'fd too large'
jpayne@68 381 for i in range(len(fds) - 1):
jpayne@68 382 os.closerange(fds[i]+1, fds[i+1])
jpayne@68 383 #
jpayne@68 384 # Close sys.stdin and replace stdin with os.devnull
jpayne@68 385 #
jpayne@68 386
jpayne@68 387 def _close_stdin():
jpayne@68 388 if sys.stdin is None:
jpayne@68 389 return
jpayne@68 390
jpayne@68 391 try:
jpayne@68 392 sys.stdin.close()
jpayne@68 393 except (OSError, ValueError):
jpayne@68 394 pass
jpayne@68 395
jpayne@68 396 try:
jpayne@68 397 fd = os.open(os.devnull, os.O_RDONLY)
jpayne@68 398 try:
jpayne@68 399 sys.stdin = open(fd, closefd=False)
jpayne@68 400 except:
jpayne@68 401 os.close(fd)
jpayne@68 402 raise
jpayne@68 403 except (OSError, ValueError):
jpayne@68 404 pass
jpayne@68 405
jpayne@68 406 #
jpayne@68 407 # Flush standard streams, if any
jpayne@68 408 #
jpayne@68 409
jpayne@68 410 def _flush_std_streams():
jpayne@68 411 try:
jpayne@68 412 sys.stdout.flush()
jpayne@68 413 except (AttributeError, ValueError):
jpayne@68 414 pass
jpayne@68 415 try:
jpayne@68 416 sys.stderr.flush()
jpayne@68 417 except (AttributeError, ValueError):
jpayne@68 418 pass
jpayne@68 419
jpayne@68 420 #
jpayne@68 421 # Start a program with only specified fds kept open
jpayne@68 422 #
jpayne@68 423
jpayne@68 424 def spawnv_passfds(path, args, passfds):
jpayne@68 425 import _posixsubprocess
jpayne@68 426 passfds = tuple(sorted(map(int, passfds)))
jpayne@68 427 errpipe_read, errpipe_write = os.pipe()
jpayne@68 428 try:
jpayne@68 429 return _posixsubprocess.fork_exec(
jpayne@68 430 args, [os.fsencode(path)], True, passfds, None, None,
jpayne@68 431 -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
jpayne@68 432 False, False, None)
jpayne@68 433 finally:
jpayne@68 434 os.close(errpipe_read)
jpayne@68 435 os.close(errpipe_write)
jpayne@68 436
jpayne@68 437
jpayne@68 438 def close_fds(*fds):
jpayne@68 439 """Close each file descriptor given as an argument"""
jpayne@68 440 for fd in fds:
jpayne@68 441 os.close(fd)
jpayne@68 442
jpayne@68 443
jpayne@68 444 def _cleanup_tests():
jpayne@68 445 """Cleanup multiprocessing resources when multiprocessing tests
jpayne@68 446 completed."""
jpayne@68 447
jpayne@68 448 from test import support
jpayne@68 449
jpayne@68 450 # cleanup multiprocessing
jpayne@68 451 process._cleanup()
jpayne@68 452
jpayne@68 453 # Stop the ForkServer process if it's running
jpayne@68 454 from multiprocessing import forkserver
jpayne@68 455 forkserver._forkserver._stop()
jpayne@68 456
jpayne@68 457 # Stop the ResourceTracker process if it's running
jpayne@68 458 from multiprocessing import resource_tracker
jpayne@68 459 resource_tracker._resource_tracker._stop()
jpayne@68 460
jpayne@68 461 # bpo-37421: Explicitly call _run_finalizers() to remove immediately
jpayne@68 462 # temporary directories created by multiprocessing.util.get_temp_dir().
jpayne@68 463 _run_finalizers()
jpayne@68 464 support.gc_collect()
jpayne@68 465
jpayne@68 466 support.reap_children()