jpayne@68
|
1 ###############################################################################
|
jpayne@68
|
2 # Server process to keep track of unlinked resources (like shared memory
|
jpayne@68
|
3 # segments, semaphores etc.) and clean them.
|
jpayne@68
|
4 #
|
jpayne@68
|
5 # On Unix we run a server process which keeps track of unlinked
|
jpayne@68
|
6 # resources. The server ignores SIGINT and SIGTERM and reads from a
|
jpayne@68
|
7 # pipe. Every other process of the program has a copy of the writable
|
jpayne@68
|
8 # end of the pipe, so we get EOF when all other processes have exited.
|
jpayne@68
|
9 # Then the server process unlinks any remaining resource names.
|
jpayne@68
|
10 #
|
jpayne@68
|
# This is important because there may be system limits for such resources: for
# instance, the system only supports a limited number of named semaphores, and
# shared-memory segments live in RAM. If a Python process leaks such a
# resource, the resource will not be removed until the next reboot. Without
# this resource tracker process, "killall python" would probably leave behind
# resources that are never unlinked.
|
jpayne@68
|
17
|
jpayne@68
|
18 import os
|
jpayne@68
|
19 import signal
|
jpayne@68
|
20 import sys
|
jpayne@68
|
21 import threading
|
jpayne@68
|
22 import warnings
|
jpayne@68
|
23
|
jpayne@68
|
24 from . import spawn
|
jpayne@68
|
25 from . import util
|
jpayne@68
|
26
|
jpayne@68
|
# Names exported by ``from ... import *``; note that getfd is not included.
__all__ = [
    'ensure_running',
    'register',
    'unregister',
]
|
jpayne@68
|
28
|
jpayne@68
|
# pthread_sigmask() is not available on every platform, so probe for it.
_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')

# Signals that are blocked while spawning the tracker and that the tracker
# process then ignores, so ^C or "killall python" cannot stop it.
_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)

# Maps each resource type name (as sent over the pipe) to the function used
# to unlink a leaked resource of that type.  'noop' is the type carried by
# the liveness probe message; its placeholder function takes no argument.
_CLEANUP_FUNCS = {'noop': lambda: None}

if os.name == 'posix':
    import _multiprocessing
    import _posixshmem

    _CLEANUP_FUNCS['semaphore'] = _multiprocessing.sem_unlink
    _CLEANUP_FUNCS['shared_memory'] = _posixshmem.shm_unlink
|
jpayne@68
|
44
|
jpayne@68
|
45
|
jpayne@68
|
class ResourceTracker(object):
    """Client-side handle on the resource tracker server process.

    The tracker process is launched lazily by :meth:`ensure_running`.  This
    object holds the write end of the pipe to the tracker (``_fd``) and, when
    this process launched the tracker itself, the tracker's pid (``_pid``).
    Each (un)registration is sent as one newline-terminated
    ``CMD:NAME:RTYPE`` line.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._fd = None   # write end of the pipe to the tracker, or None
        self._pid = None  # tracker pid if this process launched it, else None

    def _stop(self):
        """Close our end of the tracker pipe and reap the tracker if ours.

        Does nothing when no tracker fd is known to this process.
        """
        with self._lock:
            if self._fd is None:
                # not running
                return

            # closing the "alive" file descriptor stops main()
            os.close(self._fd)
            self._fd = None

            # _pid is None when this process did not launch the tracker
            # itself (see ensure_running); then there is no child to reap.
            if self._pid is not None:
                os.waitpid(self._pid, 0)
                self._pid = None

    def getfd(self):
        """Return the write end of the tracker pipe, launching it if needed."""
        self.ensure_running()
        return self._fd

    def ensure_running(self):
        '''Make sure that resource tracker process is running.

        This can be run from any process.  Usually a child process will use
        the resource created by its parent.'''
        with self._lock:
            if self._fd is not None:
                # resource tracker was launched before, is it still running?
                if self._check_alive():
                    # => still alive
                    return
                # => dead, launch it again
                os.close(self._fd)

                # Clean-up to avoid dangling processes.
                try:
                    # _pid can be None if this process is a child from another
                    # python process, which has started the resource_tracker.
                    if self._pid is not None:
                        os.waitpid(self._pid, 0)
                except ChildProcessError:
                    # The resource_tracker has already been terminated.
                    pass
                self._fd = None
                self._pid = None

                warnings.warn('resource_tracker: process died unexpectedly, '
                              'relaunching. Some resources might leak.')

            fds_to_pass = []
            try:
                fds_to_pass.append(sys.stderr.fileno())
            except Exception:
                # sys.stderr may have no usable file descriptor; the tracker
                # then simply does not inherit it.
                pass
            cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
            r, w = os.pipe()
            try:
                fds_to_pass.append(r)
                # process will out live us, so no need to wait on pid
                exe = spawn.get_executable()
                args = [exe] + util._args_from_interpreter_flags()
                args += ['-c', cmd % r]
                # bpo-33613: Register a signal mask that will block the signals.
                # This signal mask will be inherited by the child that is going
                # to be spawned and will protect the child from a race condition
                # that can make the child die before it registers signal handlers
                # for SIGINT and SIGTERM. The mask is unregistered after spawning
                # the child.
                try:
                    if _HAVE_SIGMASK:
                        signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
                    pid = util.spawnv_passfds(exe, args, fds_to_pass)
                finally:
                    if _HAVE_SIGMASK:
                        signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)
            except BaseException:
                # Spawning failed: do not leak the write end.
                os.close(w)
                raise
            else:
                self._fd = w
                self._pid = pid
            finally:
                # Only the tracker needs the read end; closing our copy lets
                # it see EOF once every write end is gone.
                os.close(r)

    def _check_alive(self):
        '''Check that the pipe has not been closed by sending a probe.'''
        try:
            # We cannot use send here as it calls ensure_running, creating
            # a cycle.
            os.write(self._fd, b'PROBE:0:noop\n')
        except OSError:
            return False
        else:
            return True

    def register(self, name, rtype):
        '''Register name of resource with resource tracker.'''
        self._send('REGISTER', name, rtype)

    def unregister(self, name, rtype):
        '''Unregister name of resource with resource tracker.'''
        self._send('UNREGISTER', name, rtype)

    def _send(self, cmd, name, rtype):
        """Write one ``cmd:name:rtype`` line to the tracker pipe.

        Raises:
            ValueError: if the encoded message exceeds 512 bytes.  This check
                happens before the tracker is launched.
            UnicodeEncodeError: if the message is not pure ASCII.
        """
        msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
        if len(msg) > 512:
            # posix guarantees that writes to a pipe of at most PIPE_BUF
            # bytes are atomic, and that PIPE_BUF >= 512.  The whole message,
            # not just the name, must fit, or concurrent writers from
            # different processes could interleave their lines.
            raise ValueError('name too long')
        self.ensure_running()
        nbytes = os.write(self._fd, msg)
        assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
            nbytes, len(msg))
|
jpayne@68
|
163
|
jpayne@68
|
164
|
jpayne@68
|
# One tracker per process.  The module-level functions below are bound
# methods of this single instance, so they share its pipe fd and its lock.
_resource_tracker = ResourceTracker()

ensure_running, register, unregister, getfd = (
    _resource_tracker.ensure_running,
    _resource_tracker.register,
    _resource_tracker.unregister,
    _resource_tracker.getfd,
)
|
jpayne@68
|
170
|
jpayne@68
|
def main(fd):
    '''Run resource tracker.

    Reads ``CMD:NAME:RTYPE`` lines from the pipe read end *fd* until EOF,
    i.e. until every process holding a write end has exited.  While reading,
    it keeps a per-type set of registered names.  Afterwards, every name
    still registered is passed to its type's cleanup function.  Malformed
    or invalid lines are reported via ``sys.excepthook`` and skipped.
    '''
    # protect the process from ^C and "killall python" etc
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    if _HAVE_SIGMASK:
        # The launching process may have blocked these signals around the
        # spawn; they are ignored now, so it is safe to unblock them.
        signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)

    for f in (sys.stdin, sys.stdout):
        try:
            f.close()
        except Exception:
            pass

    cache = {rtype: set() for rtype in _CLEANUP_FUNCS}
    try:
        # keep track of registered/unregistered resources
        with open(fd, 'rb') as f:
            for line in f:
                try:
                    # The commands and known resource types contain no ':',
                    # but a resource name may.  So split the command off the
                    # left and the type off the right.  A line without two
                    # separators raises ValueError here.
                    cmd, rest = line.strip().decode('ascii').split(':', 1)
                    name, rtype = rest.rsplit(':', 1)
                    if _CLEANUP_FUNCS.get(rtype, None) is None:
                        raise ValueError(
                            f'Cannot register {name} for automatic cleanup: '
                            f'unknown resource type {rtype}')

                    if cmd == 'REGISTER':
                        cache[rtype].add(name)
                    elif cmd == 'UNREGISTER':
                        cache[rtype].remove(name)
                    elif cmd == 'PROBE':
                        pass
                    else:
                        raise RuntimeError('unrecognized command %r' % cmd)
                except Exception:
                    # Report the bad message but keep serving the pipe;
                    # reporting itself must never kill the tracker.
                    try:
                        sys.excepthook(*sys.exc_info())
                    except BaseException:
                        pass
    finally:
        # all processes have terminated; cleanup any remaining resources
        for rtype, rtype_cache in cache.items():
            if rtype_cache:
                try:
                    warnings.warn('resource_tracker: There appear to be %d '
                                  'leaked %s objects to clean up at shutdown' %
                                  (len(rtype_cache), rtype))
                except Exception:
                    pass
            for name in rtype_cache:
                # For some reason the process which created and registered this
                # resource has failed to unregister it. Presumably it has
                # died. We therefore unlink it.
                try:
                    _CLEANUP_FUNCS[rtype](name)
                except Exception as e:
                    warnings.warn('resource_tracker: %r: %s' % (name, e))
|