annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/multiprocessing/resource_tracker.py @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
rev   line source
jpayne@68 1 ###############################################################################
jpayne@68 2 # Server process to keep track of unlinked resources (like shared memory
jpayne@68 3 # segments, semaphores etc.) and clean them.
jpayne@68 4 #
jpayne@68 5 # On Unix we run a server process which keeps track of unlinked
jpayne@68 6 # resources. The server ignores SIGINT and SIGTERM and reads from a
jpayne@68 7 # pipe. Every other process of the program has a copy of the writable
jpayne@68 8 # end of the pipe, so we get EOF when all other processes have exited.
jpayne@68 9 # Then the server process unlinks any remaining resource names.
jpayne@68 10 #
jpayne@68 11 # This is important because there may be system limits for such resources: for
jpayne@68 12 # instance, the system only supports a limited number of named semaphores, and
jpayne@68 13 # shared-memory segments live in the RAM. If a python process leaks such a
jpayne@68 14 # resource, this resource will not be removed till the next reboot. Without
jpayne@68 15 # this resource tracker process, "killall python" would probably leave unlinked
jpayne@68 16 # resources.
jpayne@68 17
jpayne@68 18 import os
jpayne@68 19 import signal
jpayne@68 20 import sys
jpayne@68 21 import threading
jpayne@68 22 import warnings
jpayne@68 23
jpayne@68 24 from . import spawn
jpayne@68 25 from . import util
jpayne@68 26
jpayne@68 27 __all__ = ['ensure_running', 'register', 'unregister']
jpayne@68 28
# True when this platform's signal module provides pthread_sigmask, which is
# used to block signals around the spawn of the tracker process.
_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')

# Signals that the tracker process ignores (and that get blocked while it is
# being spawned).
_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)

# Registry mapping a resource type name to the callable that releases a
# leaked resource of that type.  'noop' is the type carried by PROBE
# messages.
_CLEANUP_FUNCS = dict(noop=lambda: None)

if os.name == 'posix':
    import _multiprocessing
    import _posixshmem

    # POSIX-only resource types and their unlink functions.
    _CLEANUP_FUNCS['semaphore'] = _multiprocessing.sem_unlink
    _CLEANUP_FUNCS['shared_memory'] = _posixshmem.shm_unlink
jpayne@68 44
jpayne@68 45
class ResourceTracker(object):
    '''Client-side handle on the resource tracker server process.

    Holds the write end of the pipe to the tracker (``_fd``) and, when this
    process spawned the tracker itself, the tracker's pid (``_pid``).  All
    state changes are serialised by ``_lock``.
    '''

    def __init__(self):
        self._lock = threading.Lock()
        # Write end of the pipe to the tracker; None while not running.
        self._fd = None
        # Pid of the tracker if this process spawned it, else None.
        self._pid = None

    def _stop(self):
        '''Close the pipe to the tracker and reap it if we spawned it.

        Does nothing when no tracker is known to this process.
        '''
        with self._lock:
            if self._fd is None:
                # not running
                return

            # closing the "alive" file descriptor stops main()
            os.close(self._fd)
            self._fd = None

            # _pid is None if this process is a child of another python
            # process which started the resource_tracker: there is nothing
            # for us to wait on in that case.
            pid, self._pid = self._pid, None
            if pid is None:
                return
            try:
                os.waitpid(pid, 0)
            except ChildProcessError:
                # The resource_tracker has already been reaped.
                pass

    def getfd(self):
        '''Return the write end of the tracker pipe, starting the tracker
        first if necessary.'''
        self.ensure_running()
        return self._fd

    def ensure_running(self):
        '''Make sure that resource tracker process is running.

        This can be run from any process.  Usually a child process will use
        the resource created by its parent.'''
        with self._lock:
            if self._fd is not None:
                # resource tracker was launched before, is it still running?
                if self._check_alive():
                    # => still alive
                    return
                # => dead, launch it again
                os.close(self._fd)

                # Clean-up to avoid dangling processes.
                try:
                    # _pid can be None if this process is a child from another
                    # python process, which has started the resource_tracker.
                    if self._pid is not None:
                        os.waitpid(self._pid, 0)
                except ChildProcessError:
                    # The resource_tracker has already been terminated.
                    pass
                self._fd = None
                self._pid = None

                warnings.warn('resource_tracker: process died unexpectedly, '
                              'relaunching.  Some resources might leak.')

            # File descriptors the tracker process must inherit: our stderr
            # (when it has a real descriptor) and the read end of the pipe.
            fds_to_pass = []
            try:
                fds_to_pass.append(sys.stderr.fileno())
            except Exception:
                pass
            cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
            r, w = os.pipe()
            try:
                fds_to_pass.append(r)
                # process will outlive us, so no need to wait on pid
                exe = spawn.get_executable()
                args = [exe] + util._args_from_interpreter_flags()
                args += ['-c', cmd % r]
                # bpo-33613: Register a signal mask that will block the signals.
                # This signal mask will be inherited by the child that is going
                # to be spawned and will protect the child from a race condition
                # that can make the child die before it registers signal handlers
                # for SIGINT and SIGTERM. The mask is unregistered after spawning
                # the child.
                try:
                    if _HAVE_SIGMASK:
                        signal.pthread_sigmask(signal.SIG_BLOCK,
                                               _IGNORED_SIGNALS)
                    pid = util.spawnv_passfds(exe, args, fds_to_pass)
                finally:
                    if _HAVE_SIGMASK:
                        signal.pthread_sigmask(signal.SIG_UNBLOCK,
                                               _IGNORED_SIGNALS)
            except BaseException:
                # Spawning failed: do not leak the write end, then propagate.
                os.close(w)
                raise
            else:
                self._fd = w
                self._pid = pid
            finally:
                # The read end belongs to the tracker process only.
                os.close(r)

    def _check_alive(self):
        '''Check that the pipe has not been closed by sending a probe.'''
        try:
            # We cannot use send here as it calls ensure_running, creating
            # a cycle.
            os.write(self._fd, b'PROBE:0:noop\n')
        except OSError:
            return False
        else:
            return True

    def register(self, name, rtype):
        '''Register name of resource with resource tracker.'''
        self._send('REGISTER', name, rtype)

    def unregister(self, name, rtype):
        '''Unregister name of resource with resource tracker.'''
        self._send('UNREGISTER', name, rtype)

    def _send(self, cmd, name, rtype):
        '''Write one ``cmd:name:rtype`` line to the tracker pipe.

        Raises ValueError if the encoded message exceeds 512 bytes, and
        UnicodeEncodeError if it is not pure ASCII.
        '''
        msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
        if len(msg) > 512:
            # posix guarantees that writes to a pipe of less than PIPE_BUF
            # bytes are atomic, and that PIPE_BUF >= 512.  The guarantee is
            # about the whole write, so bound the message, not just the name.
            raise ValueError('name too long')
        self.ensure_running()
        nbytes = os.write(self._fd, msg)
        assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
            nbytes, len(msg))
jpayne@68 163
jpayne@68 164
# Single tracker handle shared by the whole process; the module-level
# functions below are simply its bound methods.
_resource_tracker = ResourceTracker()
getfd = _resource_tracker.getfd
unregister = _resource_tracker.unregister
register = _resource_tracker.register
ensure_running = _resource_tracker.ensure_running
jpayne@68 170
def main(fd):
    '''Run resource tracker.

    Reads ``cmd:name:rtype`` lines from the file descriptor *fd* until EOF,
    keeping a per-type set of registered names.  Once the pipe is exhausted
    every name still registered is passed to the cleanup function of its
    resource type.  *fd* is closed when reading ends.
    '''
    # protect the process from ^C and "killall python" etc
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    if _HAVE_SIGMASK:
        signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)

    for f in (sys.stdin, sys.stdout):
        try:
            f.close()
        except Exception:
            pass

    cache = {rtype: set() for rtype in _CLEANUP_FUNCS}
    try:
        # keep track of registered/unregistered resources
        with open(fd, 'rb') as f:
            for line in f:
                try:
                    text = line.strip().decode('ascii')
                    # The command is everything before the first colon and
                    # the resource type everything after the last one, so a
                    # resource name may itself contain colons.  A line with
                    # fewer than two colons still raises ValueError here.
                    cmd, rest = text.split(':', 1)
                    name, rtype = rest.rsplit(':', 1)
                    cleanup_func = _CLEANUP_FUNCS.get(rtype, None)
                    if cleanup_func is None:
                        raise ValueError(
                            f'Cannot register {name} for automatic cleanup: '
                            f'unknown resource type {rtype}')

                    if cmd == 'REGISTER':
                        cache[rtype].add(name)
                    elif cmd == 'UNREGISTER':
                        cache[rtype].remove(name)
                    elif cmd == 'PROBE':
                        pass
                    else:
                        raise RuntimeError('unrecognized command %r' % cmd)
                except Exception:
                    # A bad line must not stop the tracker: report it and
                    # carry on, even if reporting itself fails.
                    try:
                        sys.excepthook(*sys.exc_info())
                    except BaseException:
                        pass
    finally:
        # all processes have terminated; cleanup any remaining resources
        for rtype, rtype_cache in cache.items():
            if rtype_cache:
                try:
                    warnings.warn('resource_tracker: There appear to be %d '
                                  'leaked %s objects to clean up at shutdown' %
                                  (len(rtype_cache), rtype))
                except Exception:
                    pass
            for name in rtype_cache:
                # For some reason the process which created and registered this
                # resource has failed to unregister it. Presumably it has
                # died.  We therefore unlink it.
                try:
                    _CLEANUP_FUNCS[rtype](name)
                except Exception as e:
                    warnings.warn('resource_tracker: %r: %s' % (name, e))