jpayne@68
|
1 import errno
|
jpayne@68
|
2 import os
|
jpayne@68
|
3 import selectors
|
jpayne@68
|
4 import signal
|
jpayne@68
|
5 import socket
|
jpayne@68
|
6 import struct
|
jpayne@68
|
7 import sys
|
jpayne@68
|
8 import threading
|
jpayne@68
|
9 import warnings
|
jpayne@68
|
10
|
jpayne@68
|
11 from . import connection
|
jpayne@68
|
12 from . import process
|
jpayne@68
|
13 from .context import reduction
|
jpayne@68
|
14 from . import resource_tracker
|
jpayne@68
|
15 from . import spawn
|
jpayne@68
|
16 from . import util
|
jpayne@68
|
17
|
jpayne@68
|
# Public API of this module; each name is bound near the end of the file to a
# method of the module-level ForkServer instance.
__all__ = ['ensure_running', 'get_inherited_fds', 'connect_to_new_process',
           'set_forkserver_preload']
|
jpayne@68
|
20
|
jpayne@68
|
21 #
|
jpayne@68
|
22 #
|
jpayne@68
|
23 #
|
jpayne@68
|
24
|
jpayne@68
|
# Limit on the number of fds carried by one fork request:
# connect_to_new_process() refuses requests that would reach it and main()
# refuses incoming messages that exceed it.
MAXFDS_TO_SEND = 256
# Wire format used by read_signed()/write_signed(): one signed 'long long'
# in native byte order.
SIGNED_STRUCT = struct.Struct('q')     # large enough for pid_t
|
jpayne@68
|
27
|
jpayne@68
|
28 #
|
jpayne@68
|
29 # Forkserver class
|
jpayne@68
|
30 #
|
jpayne@68
|
31
|
jpayne@68
|
class ForkServer(object):
    '''Client-side handle used to launch and talk to the fork server.

    The instance remembers the server's socket address, its pid and the
    write end of the "alive" pipe.  All state changes that launch or stop
    the server are serialised by an internal lock.
    '''

    def __init__(self):
        # Address of the AF_UNIX socket the server listens on.
        self._forkserver_address = None
        # Write end of the "alive" pipe; closing it asks the server to stop.
        self._forkserver_alive_fd = None
        # Pid of the launched server, or None when none has been launched.
        self._forkserver_pid = None
        # Extra fds handed over by the client; filled in by _serve_one().
        self._inherited_fds = None
        # Guards launching and stopping the server.
        self._lock = threading.Lock()
        # Modules the server tries to import before serving requests.
        self._preload_modules = ['__main__']

    def _stop(self):
        '''Stop the server while holding the lock (used by unit tests).'''
        with self._lock:
            self._stop_unlocked()

    def _stop_unlocked(self):
        '''Stop the server and forget its state; the caller holds the lock.

        Does nothing when no server has been launched.
        '''
        if self._forkserver_pid is None:
            return

        # closing the "alive" file descriptor asks the server to stop
        os.close(self._forkserver_alive_fd)
        self._forkserver_alive_fd = None

        # Reap the server so it does not linger as a zombie.
        os.waitpid(self._forkserver_pid, 0)
        self._forkserver_pid = None

        # Remove the socket file created by ensure_running().
        os.unlink(self._forkserver_address)
        self._forkserver_address = None

    def set_forkserver_preload(self, modules_names):
        '''Set list of module names to try to load in forkserver process.

        Raises TypeError if any element of modules_names is not a str; the
        previously configured list is left untouched in that case.
        '''
        # Validate the *argument*: checking self._preload_modules here would
        # only re-check the old value and let bad input through.
        if not all(type(mod) is str for mod in modules_names):
            raise TypeError('module_names must be a list of strings')
        self._preload_modules = modules_names

    def get_inherited_fds(self):
        '''Return list of fds inherited from parent process.

        This returns None if the current process was not started by fork
        server.
        '''
        return self._inherited_fds

    def connect_to_new_process(self, fds):
        '''Request forkserver to create a child process.

        Returns a pair of fds (status_r, data_w).  The calling process can read
        the child process's pid and (eventually) its returncode from status_r.
        The calling process should write to data_w the pickled preparation and
        process data.

        Raises ValueError when fds plus the four bookkeeping fds would reach
        MAXFDS_TO_SEND.
        '''
        self.ensure_running()
        # Four extra fds are always sent along with the caller's fds.
        if len(fds) + 4 >= MAXFDS_TO_SEND:
            raise ValueError('too many fds')
        with socket.socket(socket.AF_UNIX) as client:
            client.connect(self._forkserver_address)
            parent_r, child_w = os.pipe()
            child_r, parent_w = os.pipe()
            allfds = [child_r, child_w, self._forkserver_alive_fd,
                      resource_tracker.getfd()]
            allfds += fds
            try:
                reduction.sendfds(client, allfds)
                return parent_r, parent_w
            except BaseException:
                # Nothing will ever use our ends of the pipes; do not leak
                # them, whatever interrupted the request.
                os.close(parent_r)
                os.close(parent_w)
                raise
            finally:
                # The child's ends are never needed in this process once the
                # send has been attempted.
                os.close(child_r)
                os.close(child_w)

    def ensure_running(self):
        '''Make sure that a fork server is running.

        This can be called from any process.  Note that usually a child
        process will just reuse the forkserver started by its parent, so
        ensure_running() will do nothing.
        '''
        with self._lock:
            resource_tracker.ensure_running()
            if self._forkserver_pid is not None:
                # forkserver was launched before, is it still running?
                pid, _status = os.waitpid(self._forkserver_pid, os.WNOHANG)
                if not pid:
                    # still alive
                    return
                # dead, launch it again
                os.close(self._forkserver_alive_fd)
                self._forkserver_address = None
                self._forkserver_alive_fd = None
                self._forkserver_pid = None

            cmd = ('from multiprocessing.forkserver import main; ' +
                   'main(%d, %d, %r, **%r)')

            if self._preload_modules:
                # Only these keys are accepted as keyword arguments by main().
                desired_keys = {'main_path', 'sys_path'}
                data = spawn.get_preparation_data('ignore')
                data = {x: y for x, y in data.items() if x in desired_keys}
            else:
                data = {}

            with socket.socket(socket.AF_UNIX) as listener:
                address = connection.arbitrary_address('AF_UNIX')
                listener.bind(address)
                # Restrict the socket file to the owning user.
                os.chmod(address, 0o600)
                listener.listen()

                # all client processes own the write end of the "alive" pipe;
                # when they all terminate the read end becomes ready.
                alive_r, alive_w = os.pipe()
                try:
                    fds_to_pass = [listener.fileno(), alive_r]
                    cmd %= (listener.fileno(), alive_r, self._preload_modules,
                            data)
                    exe = spawn.get_executable()
                    args = [exe] + util._args_from_interpreter_flags()
                    args += ['-c', cmd]
                    pid = util.spawnv_passfds(exe, args, fds_to_pass)
                except BaseException:
                    # Launch failed: the write end will never be stored.
                    os.close(alive_w)
                    raise
                finally:
                    # The read end belongs to the server process only.
                    os.close(alive_r)
                self._forkserver_address = address
                self._forkserver_alive_fd = alive_w
                self._forkserver_pid = pid
|
jpayne@68
|
160
|
jpayne@68
|
161 #
|
jpayne@68
|
162 #
|
jpayne@68
|
163 #
|
jpayne@68
|
164
|
jpayne@68
|
def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
    '''Run forkserver.

    Preloads the requested modules, installs the signal plumbing and then
    loops forever serving three event sources: the "alive" pipe (EOF means
    exit), the signal wakeup pipe (reap children and report their exit
    codes) and the listening socket (fork a child for each request).
    '''
    if preload:
        if '__main__' in preload and main_path is not None:
            process.current_process()._inheriting = True
            try:
                spawn.import_main_path(main_path)
            finally:
                del process.current_process()._inheriting
        for module_name in preload:
            try:
                __import__(module_name)
            except ImportError:
                # Preloading is best effort only.
                continue

    util._close_stdin()

    sig_r, sig_w = os.pipe()
    for pipe_end in (sig_r, sig_w):
        os.set_blocking(pipe_end, False)

    def sigchld_handler(*_unused):
        # Dummy signal handler, doesn't do anything
        pass

    wanted_handlers = (
        # unblocking SIGCHLD allows the wakeup fd to notify our event loop
        (signal.SIGCHLD, sigchld_handler),
        # protect the process from ^C
        (signal.SIGINT, signal.SIG_IGN),
    )
    # Remember the previous handlers so forked children can restore them.
    old_handlers = {}
    for signum, handler in wanted_handlers:
        old_handlers[signum] = signal.signal(signum, handler)

    # calling os.write() in the Python signal handler is racy
    signal.set_wakeup_fd(sig_w)

    # map child pids to client fds
    pid_to_fd = {}

    with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener, \
         selectors.DefaultSelector() as selector:
        _forkserver._forkserver_address = listener.getsockname()

        for source in (listener, alive_r, sig_r):
            selector.register(source, selectors.EVENT_READ)

        while True:
            try:
                # Wait until at least one source is readable.
                ready = []
                while not ready:
                    ready = [key.fileobj
                             for (key, _events) in selector.select()]

                if alive_r in ready:
                    # EOF because no more client processes left
                    assert os.read(alive_r, 1) == b'', "Not at EOF?"
                    raise SystemExit

                if sig_r in ready:
                    # Got SIGCHLD
                    os.read(sig_r, 65536)  # exhaust
                    while True:
                        # Scan for child processes
                        try:
                            pid, sts = os.waitpid(-1, os.WNOHANG)
                        except ChildProcessError:
                            break
                        if pid == 0:
                            break
                        child_w = pid_to_fd.pop(pid, None)
                        if child_w is None:
                            # This shouldn't happen really
                            warnings.warn('forkserver: waitpid returned '
                                          'unexpected pid %d' % pid)
                            continue
                        if os.WIFSIGNALED(sts):
                            returncode = -os.WTERMSIG(sts)
                        elif os.WIFEXITED(sts):
                            returncode = os.WEXITSTATUS(sts)
                        else:
                            raise AssertionError(
                                "Child {0:n} status is {1:n}".format(
                                    pid, sts))
                        # Send exit code to client process
                        try:
                            write_signed(child_w, returncode)
                        except BrokenPipeError:
                            # client vanished
                            pass
                        os.close(child_w)

                if listener in ready:
                    # Incoming fork request
                    conn, _peer = listener.accept()
                    with conn:
                        # Receive fds from client
                        received = reduction.recvfds(conn, MAXFDS_TO_SEND + 1)
                        if len(received) > MAXFDS_TO_SEND:
                            raise RuntimeError(
                                "Too many ({0:n}) fds to send".format(
                                    len(received)))
                        child_r, child_w, *fds = received
                        # Close the connection before forking.
                        conn.close()
                        pid = os.fork()
                        if pid != 0:
                            # Parent: send pid to client process
                            try:
                                write_signed(child_w, pid)
                            except BrokenPipeError:
                                # client vanished
                                pass
                            pid_to_fd[pid] = child_w
                            os.close(child_r)
                            for passed_fd in fds:
                                os.close(passed_fd)
                        else:
                            # Child
                            code = 1
                            try:
                                listener.close()
                                selector.close()
                                unused_fds = [alive_r, child_w, sig_r, sig_w,
                                              *pid_to_fd.values()]
                                code = _serve_one(child_r, fds,
                                                  unused_fds,
                                                  old_handlers)
                            except Exception:
                                sys.excepthook(*sys.exc_info())
                                sys.stderr.flush()
                            finally:
                                # Never return into the server loop.
                                os._exit(code)

            except OSError as exc:
                # An aborted connection is ignored; anything else is fatal.
                if exc.errno != errno.ECONNABORTED:
                    raise
|
jpayne@68
|
301
|
jpayne@68
|
302
|
jpayne@68
|
def _serve_one(child_r, fds, unused_fds, handlers):
    '''Set up a freshly forked child and run the process sent over child_r.

    handlers maps signal numbers to the handler to install; unused_fds are
    closed; fds must hold the "alive" fd, the resource tracker fd and then
    any number of inherited fds.  Returns the value of spawn._main().
    '''
    # close unnecessary stuff and reset signal handlers
    signal.set_wakeup_fd(-1)
    for signum in handlers:
        signal.signal(signum, handlers[signum])
    for unused in unused_fds:
        os.close(unused)

    alive_fd, tracker_fd, *inherited = fds
    _forkserver._forkserver_alive_fd = alive_fd
    resource_tracker._resource_tracker._fd = tracker_fd
    _forkserver._inherited_fds = inherited

    # Run process object received over pipe
    parent_sentinel = os.dup(child_r)
    return spawn._main(child_r, parent_sentinel)
|
jpayne@68
|
320
|
jpayne@68
|
321
|
jpayne@68
|
322 #
|
jpayne@68
|
323 # Read and write signed numbers
|
jpayne@68
|
324 #
|
jpayne@68
|
325
|
jpayne@68
|
def read_signed(fd):
    '''Read one SIGNED_STRUCT-encoded integer from fd and return it.

    Keeps reading until the full struct has arrived; raises EOFError if
    the descriptor reaches end of file first.
    '''
    remaining = SIGNED_STRUCT.size
    chunks = []
    while remaining > 0:
        chunk = os.read(fd, remaining)
        if not chunk:
            raise EOFError('unexpected EOF')
        chunks.append(chunk)
        remaining -= len(chunk)
    (value,) = SIGNED_STRUCT.unpack(b''.join(chunks))
    return value
|
jpayne@68
|
335
|
jpayne@68
|
def write_signed(fd, n):
    '''Write n to fd encoded with SIGNED_STRUCT, retrying on short writes.'''
    payload = SIGNED_STRUCT.pack(n)
    sent = 0
    while sent < len(payload):
        written = os.write(fd, payload[sent:])
        if written == 0:
            raise RuntimeError('should not get here')
        sent += written
|
jpayne@68
|
343
|
jpayne@68
|
344 #
|
jpayne@68
|
345 #
|
jpayne@68
|
346 #
|
jpayne@68
|
347
|
jpayne@68
|
# Single module-level ForkServer instance; its bound methods are exposed
# below as the module-level functions listed in __all__.
_forkserver = ForkServer()
ensure_running = _forkserver.ensure_running
get_inherited_fds = _forkserver.get_inherited_fds
connect_to_new_process = _forkserver.connect_to_new_process
set_forkserver_preload = _forkserver.set_forkserver_preload
|