#
# Module providing various facilities to other parts of the package
#
# multiprocessing/util.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

import os
import itertools
import sys
import weakref
import atexit
import threading        # we want threading to install its
                        # cleanup function before multiprocessing does
from subprocess import _args_from_interpreter_flags

from . import process

__all__ = [
    'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
    'log_to_stderr', 'get_temp_dir', 'register_after_fork',
    'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
    'close_all_fds_except', 'SUBDEBUG', 'SUBWARNING',
    ]

#
# Logging
#

NOTSET = 0
SUBDEBUG = 5
DEBUG = 10
INFO = 20
SUBWARNING = 25
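# SUBDEBUG and SUBWARNING are multiprocessing-specific levels: SUBDEBUG (5)
# sits below logging.DEBUG (10), and SUBWARNING (25) sits between
# logging.INFO (20) and logging.WARNING (30).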

LOGGER_NAME = 'multiprocessing'
DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'

_logger = None
_log_to_stderr = False

def sub_debug(msg, *args):
    if _logger:
        _logger.log(SUBDEBUG, msg, *args)

def debug(msg, *args):
    if _logger:
        _logger.log(DEBUG, msg, *args)

def info(msg, *args):
    if _logger:
        _logger.log(INFO, msg, *args)

def sub_warning(msg, *args):
    if _logger:
        _logger.log(SUBWARNING, msg, *args)

def get_logger():
    '''
    Returns logger used by multiprocessing
    '''
    global _logger
    import logging

    logging._acquireLock()
    try:
        if not _logger:

            _logger = logging.getLogger(LOGGER_NAME)
            _logger.propagate = 0

            # XXX multiprocessing should cleanup before logging
            if hasattr(atexit, 'unregister'):
                atexit.unregister(_exit_function)
                atexit.register(_exit_function)
            else:
                atexit._exithandlers.remove((_exit_function, (), {}))
                atexit._exithandlers.append((_exit_function, (), {}))

    finally:
        logging._releaseLock()

    return _logger

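# Illustrative sketch (comment only, not executed here): application code can
# enable this module's logging through the public package-level wrapper, e.g.
#
#     import logging, multiprocessing
#     logger = multiprocessing.log_to_stderr(logging.INFO)
#     logger.info('starting workers')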
def log_to_stderr(level=None):
    '''
    Turn on logging and add a handler which prints to stderr
    '''
    global _log_to_stderr
    import logging

    logger = get_logger()
    formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    if level:
        logger.setLevel(level)
    _log_to_stderr = True
    return _logger

#
# Function returning a temp directory which will be removed on exit
#

def _remove_temp_dir(rmtree, tempdir):
    rmtree(tempdir)

    current_process = process.current_process()
    # current_process() can be None if the finalizer is called
    # late during Python finalization
    if current_process is not None:
        current_process._config['tempdir'] = None

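# Note: the Finalize registered below uses exitpriority=-100, so the temporary
# directory is removed only in the final _run_finalizers() pass of
# _exit_function(), after the finalizers with priority >= 0 have run and child
# processes have been joined.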
def get_temp_dir():
    # get name of a temp directory which will be automatically cleaned up
    tempdir = process.current_process()._config.get('tempdir')
    if tempdir is None:
        import shutil, tempfile
        tempdir = tempfile.mkdtemp(prefix='pymp-')
        info('created temp directory %s', tempdir)
        # keep a strong reference to shutil.rmtree(), since the finalizer
        # can be called late during Python shutdown
        Finalize(None, _remove_temp_dir, args=(shutil.rmtree, tempdir),
                 exitpriority=-100)
        process.current_process()._config['tempdir'] = tempdir
    return tempdir

#
# Support for reinitialization of objects when bootstrapping a child process
#

_afterfork_registry = weakref.WeakValueDictionary()
_afterfork_counter = itertools.count()

def _run_after_forkers():
    items = list(_afterfork_registry.items())
    items.sort()
    for (index, ident, func), obj in items:
        try:
            func(obj)
        except Exception as e:
            info('after forker raised exception %s', e)

def register_after_fork(obj, func):
    _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj
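# Illustrative sketch (comment only): an object can ask to be re-initialized
# in forked children, much like ForkAwareLocal does below, e.g. for a
# hypothetical cache object:
#
#     register_after_fork(cache, lambda obj: obj.clear())
#
# Entries live in a WeakValueDictionary keyed by (counter, id(obj), func), so
# they disappear once obj is garbage collected and run in registration order.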

#
# Finalization using weakrefs
#

_finalizer_registry = {}
_finalizer_counter = itertools.count()

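# Illustrative sketch (comment only): a finalizer can be tied to an object's
# lifetime or, with obj=None, to interpreter exit; e.g. for a hypothetical
# resource ``res`` with a close() method:
#
#     f = Finalize(res, res.close, exitpriority=10)
#     f.cancel()      # unregister without calling res.close()
#     f()             # or invoke it explicitly; it runs at most once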
class Finalize(object):
    '''
    Class which supports object finalization using weakrefs
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        if (exitpriority is not None) and not isinstance(exitpriority,int):
            raise TypeError(
                "Exitpriority ({0!r}) must be None or int, not {1!s}".format(
                    exitpriority, type(exitpriority)))

        if obj is not None:
            self._weakref = weakref.ref(obj, self)
        elif exitpriority is None:
            raise ValueError("Without object, exitpriority cannot be None")

        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        self._key = (exitpriority, next(_finalizer_counter))
        self._pid = os.getpid()

        _finalizer_registry[self._key] = self

    def __call__(self, wr=None,
                 # Need to bind these locally because the globals can have
                 # been cleared at shutdown
                 _finalizer_registry=_finalizer_registry,
                 sub_debug=sub_debug, getpid=os.getpid):
        '''
        Run the callback unless it has already been called or cancelled
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            sub_debug('finalizer no longer registered')
        else:
            if self._pid != getpid():
                sub_debug('finalizer ignored because different process')
                res = None
            else:
                sub_debug('finalizer calling %s with args %s and kwargs %s',
                          self._callback, self._args, self._kwargs)
                res = self._callback(*self._args, **self._kwargs)
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None
            return res

    def cancel(self):
        '''
        Cancel finalization of the object
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            pass
        else:
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None

    def still_active(self):
        '''
        Return whether this finalizer is still waiting to invoke callback
        '''
        return self._key in _finalizer_registry

    def __repr__(self):
        try:
            obj = self._weakref()
        except (AttributeError, TypeError):
            obj = None

        if obj is None:
            return '<%s object, dead>' % self.__class__.__name__

        x = '<%s object, callback=%s' % (
                self.__class__.__name__,
                getattr(self._callback, '__name__', self._callback))
        if self._args:
            x += ', args=' + str(self._args)
        if self._kwargs:
            x += ', kwargs=' + str(self._kwargs)
        if self._key[0] is not None:
            x += ', exitpriority=' + str(self._key[0])
        return x + '>'


def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.
    '''
    if _finalizer_registry is None:
        # This function may be called after this module's globals are
        # destroyed. See the _exit_function function in this module for more
        # notes.
        return

    if minpriority is None:
        f = lambda p : p[0] is not None
    else:
        f = lambda p : p[0] is not None and p[0] >= minpriority

    # Careful: _finalizer_registry may be mutated while this function
    # is running (either by a GC run or by another thread).

    # list(_finalizer_registry) should be atomic, while
    # list(_finalizer_registry.items()) is not.
    keys = [key for key in list(_finalizer_registry) if f(key)]
    keys.sort(reverse=True)

    for key in keys:
        finalizer = _finalizer_registry.get(key)
        # key may have been removed from the registry
        if finalizer is not None:
            sub_debug('calling %s', finalizer)
            try:
                finalizer()
            except Exception:
                import traceback
                traceback.print_exc()

    if minpriority is None:
        _finalizer_registry.clear()

#
# Clean up on exit
#

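# Note: _exiting is normally False or True, but this module's globals may be
# replaced with None late in interpreter shutdown, so "_exiting is None" is
# also treated as "exiting".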
def is_exiting():
    '''
    Returns true if the process is shutting down
    '''
    return _exiting or _exiting is None

_exiting = False

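# Shutdown sequence: run finalizers with priority >= 0, terminate daemonic
# children, join the remaining children, then run all remaining finalizers
# (including negative priorities such as the temp-directory cleanup).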
def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                   active_children=process.active_children,
                   current_process=process.current_process):
    # We hold on to references to functions in the arglist due to the
    # situation described below, where this function is called after this
    # module's globals are destroyed.

    global _exiting

    if not _exiting:
        _exiting = True

        info('process shutting down')
        debug('running all "atexit" finalizers with priority >= 0')
        _run_finalizers(0)

        if current_process() is not None:
            # We check if the current process is None here because if
            # it's None, any call to ``active_children()`` will raise
            # an AttributeError (active_children winds up trying to
            # get attributes from util._current_process). One
            # situation where this can happen is if someone has
            # manipulated sys.modules, causing this module to be
            # garbage collected. The destructor for the module type
            # then replaces all values in the module dict with None.
            # For instance, after setuptools runs a test it replaces
            # sys.modules with a copy created earlier. See issues
            # #9775 and #15881. Also related: #4106, #9205, and
            # #9207.

            for p in active_children():
                if p.daemon:
                    info('calling terminate() for daemon %s', p.name)
                    p._popen.terminate()

            for p in active_children():
                info('calling join() for process %s', p.name)
                p.join()

        debug('running the remaining "atexit" finalizers')
        _run_finalizers()

atexit.register(_exit_function)

#
# Some fork aware types
#

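# A lock inherited across fork() may be held by a thread that does not exist
# in the child, so ForkAwareThreadLock swaps in a fresh threading.Lock after
# fork; ForkAwareLocal similarly clears its per-thread state in the child.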
class ForkAwareThreadLock(object):
    def __init__(self):
        self._reset()
        register_after_fork(self, ForkAwareThreadLock._reset)

    def _reset(self):
        self._lock = threading.Lock()
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)


class ForkAwareLocal(threading.local):
    def __init__(self):
        register_after_fork(self, lambda obj : obj.__dict__.clear())
    def __reduce__(self):
        return type(self), ()

#
# Close fds except those specified
#

try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except Exception:
    MAXFD = 256

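# close_all_fds_except() closes every descriptor up to MAXFD except those
# passed in: the sentinels -1 and MAXFD bracket the sorted list so each
# os.closerange() call covers the gap between two consecutive kept fds.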
def close_all_fds_except(fds):
    fds = list(fds) + [-1, MAXFD]
    fds.sort()
    assert fds[-1] == MAXFD, 'fd too large'
    for i in range(len(fds) - 1):
        os.closerange(fds[i]+1, fds[i+1])

#
# Close sys.stdin and replace stdin with os.devnull
#

def _close_stdin():
    if sys.stdin is None:
        return

    try:
        sys.stdin.close()
    except (OSError, ValueError):
        pass

    try:
        fd = os.open(os.devnull, os.O_RDONLY)
        try:
            sys.stdin = open(fd, closefd=False)
        except:
            os.close(fd)
            raise
    except (OSError, ValueError):
        pass

#
# Flush standard streams, if any
#

def _flush_std_streams():
    try:
        sys.stdout.flush()
    except (AttributeError, ValueError):
        pass
    try:
        sys.stderr.flush()
    except (AttributeError, ValueError):
        pass

#
# Start a program with only specified fds kept open
#

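# Used when launching helper processes (e.g. the forkserver and the resource
# tracker): only the descriptors in ``passfds`` are inherited, and the
# temporary pipe is what _posixsubprocess.fork_exec() uses to report an exec
# failure; the parent closes both pipe ends immediately afterwards.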
def spawnv_passfds(path, args, passfds):
    import _posixsubprocess
    passfds = tuple(sorted(map(int, passfds)))
    errpipe_read, errpipe_write = os.pipe()
    try:
        return _posixsubprocess.fork_exec(
            args, [os.fsencode(path)], True, passfds, None, None,
            -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
            False, False, None)
    finally:
        os.close(errpipe_read)
        os.close(errpipe_write)


def close_fds(*fds):
    """Close each file descriptor given as an argument"""
    for fd in fds:
        os.close(fd)


def _cleanup_tests():
    """Clean up multiprocessing resources when the multiprocessing tests
    have completed."""

    from test import support

    # cleanup multiprocessing
    process._cleanup()

    # Stop the ForkServer process if it's running
    from multiprocessing import forkserver
    forkserver._forkserver._stop()

    # Stop the ResourceTracker process if it's running
    from multiprocessing import resource_tracker
    resource_tracker._resource_tracker._stop()

    # bpo-37421: Explicitly call _run_finalizers() to immediately remove
    # temporary directories created by multiprocessing.util.get_temp_dir().
    _run_finalizers()
    support.gc_collect()

    support.reap_children()