annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/multiprocessing/forkserver.py @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
rev   line source
jpayne@68 1 import errno
jpayne@68 2 import os
jpayne@68 3 import selectors
jpayne@68 4 import signal
jpayne@68 5 import socket
jpayne@68 6 import struct
jpayne@68 7 import sys
jpayne@68 8 import threading
jpayne@68 9 import warnings
jpayne@68 10
jpayne@68 11 from . import connection
jpayne@68 12 from . import process
jpayne@68 13 from .context import reduction
jpayne@68 14 from . import resource_tracker
jpayne@68 15 from . import spawn
jpayne@68 16 from . import util
jpayne@68 17
# Public API: each name is bound at the bottom of this module to a method of
# the module-level ForkServer instance.
__all__ = ['ensure_running', 'get_inherited_fds', 'connect_to_new_process',
           'set_forkserver_preload']

#
#
#

# Limit on the number of fds in one fork request.  The client refuses a
# request when len(fds) + 4 reaches this value, and the server asks recvfds()
# for at most MAXFDS_TO_SEND + 1 fds and raises if it receives more than
# MAXFDS_TO_SEND.
MAXFDS_TO_SEND = 256
# Wire format used by read_signed()/write_signed() for pids and return codes.
SIGNED_STRUCT = struct.Struct('q')     # large enough for pid_t
jpayne@68 27
jpayne@68 28 #
jpayne@68 29 # Forkserver class
jpayne@68 30 #
jpayne@68 31
class ForkServer(object):
    '''Client-side handle used to start and talk to the fork server process.

    State kept on the instance:
      _forkserver_address  -- address of the server's AF_UNIX listening socket
      _forkserver_alive_fd -- write end of the "alive" pipe; closing it asks
                              the server to stop
      _forkserver_pid      -- pid of the server process, or None if not started
      _inherited_fds       -- fds handed to this process by the fork server
                              (None unless set by _serve_one)
      _lock                -- serializes _stop() and ensure_running()
      _preload_modules     -- module names the server tries to import at start
    '''

    def __init__(self):
        self._forkserver_address = None
        self._forkserver_alive_fd = None
        self._forkserver_pid = None
        self._inherited_fds = None
        self._lock = threading.Lock()
        self._preload_modules = ['__main__']

    def _stop(self):
        '''Stop the server while holding the lock (used by unit tests).'''
        with self._lock:
            self._stop_unlocked()

    def _stop_unlocked(self):
        '''Stop the server; the caller must already hold self._lock.

        Does nothing if no server pid is recorded.
        '''
        if self._forkserver_pid is None:
            return

        # closing the "alive" file descriptor asks the server to stop
        os.close(self._forkserver_alive_fd)
        self._forkserver_alive_fd = None

        # reap the server process before forgetting its pid
        os.waitpid(self._forkserver_pid, 0)
        self._forkserver_pid = None

        # remove the socket file that ensure_running() bound
        os.unlink(self._forkserver_address)
        self._forkserver_address = None

    def set_forkserver_preload(self, modules_names):
        '''Set list of module names to try to load in forkserver process.

        Raises TypeError if any element of modules_names is not exactly a
        str; in that case the previously configured list is left untouched.
        '''
        # Validate the *argument*: checking self._preload_modules here would
        # only re-check the old value and let invalid input through.
        if not all(type(mod) is str for mod in modules_names):
            raise TypeError('module_names must be a list of strings')
        self._preload_modules = modules_names

    def get_inherited_fds(self):
        '''Return list of fds inherited from parent process.

        This returns None if the current process was not started by fork
        server.
        '''
        return self._inherited_fds

    def connect_to_new_process(self, fds):
        '''Request forkserver to create a child process.

        Returns a pair of fds (status_r, data_w).  The calling process can read
        the child process's pid and (eventually) its returncode from status_r.
        The calling process should write to data_w the pickled preparation and
        process data.

        Raises ValueError if len(fds) + 4 reaches MAXFDS_TO_SEND.
        '''
        self.ensure_running()
        # four extra fds are always sent ahead of the caller's fds (see allfds)
        if len(fds) + 4 >= MAXFDS_TO_SEND:
            raise ValueError('too many fds')
        with socket.socket(socket.AF_UNIX) as client:
            client.connect(self._forkserver_address)
            parent_r, child_w = os.pipe()
            child_r, parent_w = os.pipe()
            # Order matters: the server unpacks child_r and child_w first and
            # the child unpacks the alive fd and the resource tracker fd next.
            allfds = [child_r, child_w, self._forkserver_alive_fd,
                      resource_tracker.getfd()]
            allfds += fds
            try:
                reduction.sendfds(client, allfds)
                return parent_r, parent_w
            except:
                # sending failed: the caller never sees our ends, close them
                os.close(parent_r)
                os.close(parent_w)
                raise
            finally:
                # the child's ends are never used by this process
                os.close(child_r)
                os.close(child_w)

    def ensure_running(self):
        '''Make sure that a fork server is running.

        This can be called from any process.  Note that usually a child
        process will just reuse the forkserver started by its parent, so
        ensure_running() will do nothing.
        '''
        with self._lock:
            resource_tracker.ensure_running()
            if self._forkserver_pid is not None:
                # forkserver was launched before, is it still running?
                pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG)
                if not pid:
                    # still alive
                    return
                # dead, launch it again
                os.close(self._forkserver_alive_fd)
                self._forkserver_address = None
                self._forkserver_alive_fd = None
                self._forkserver_pid = None

            # command line run by the new interpreter; filled in below with
            # the listener fd, the alive pipe fd, the preload list and the
            # keyword arguments for main()
            cmd = ('from multiprocessing.forkserver import main; ' +
                   'main(%d, %d, %r, **%r)')

            if self._preload_modules:
                # only these preparation-data keys are forwarded to main()
                desired_keys = {'main_path', 'sys_path'}
                data = spawn.get_preparation_data('ignore')
                data = {x: y for x, y in data.items() if x in desired_keys}
            else:
                data = {}

            with socket.socket(socket.AF_UNIX) as listener:
                address = connection.arbitrary_address('AF_UNIX')
                listener.bind(address)
                os.chmod(address, 0o600)
                listener.listen()

                # all client processes own the write end of the "alive" pipe;
                # when they all terminate the read end becomes ready.
                alive_r, alive_w = os.pipe()
                try:
                    fds_to_pass = [listener.fileno(), alive_r]
                    cmd %= (listener.fileno(), alive_r, self._preload_modules,
                            data)
                    exe = spawn.get_executable()
                    args = [exe] + util._args_from_interpreter_flags()
                    args += ['-c', cmd]
                    pid = util.spawnv_passfds(exe, args, fds_to_pass)
                except:
                    # launch failed: do not leak the write end either
                    os.close(alive_w)
                    raise
                finally:
                    # the read end is only used by the server process
                    os.close(alive_r)
                self._forkserver_address = address
                self._forkserver_alive_fd = alive_w
                self._forkserver_pid = pid
jpayne@68 160
jpayne@68 161 #
jpayne@68 162 #
jpayne@68 163 #
jpayne@68 164
def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
    '''Run forkserver.

    listener_fd -- fd wrapped below as the AF_UNIX listening socket on which
                   fork requests are accepted
    alive_r     -- read end of the "alive" pipe; once it becomes readable
                   the loop raises SystemExit
    preload     -- module names to import before serving; ImportError is
                   ignored for each of them
    main_path   -- if not None and '__main__' is in preload, passed to
                   spawn.import_main_path()
    sys_path    -- accepted but not referenced in this function

    The loop only ends by raising (SystemExit, or an unexpected error).
    '''
    if preload:
        if '__main__' in preload and main_path is not None:
            # flag the current process as "inheriting" only for the duration
            # of the main module import
            process.current_process()._inheriting = True
            try:
                spawn.import_main_path(main_path)
            finally:
                del process.current_process()._inheriting
        for modname in preload:
            try:
                __import__(modname)
            except ImportError:
                # preloading is best effort
                pass

    util._close_stdin()

    # pipe used as the signal wakeup fd; both ends are non-blocking
    sig_r, sig_w = os.pipe()
    os.set_blocking(sig_r, False)
    os.set_blocking(sig_w, False)

    def sigchld_handler(*_unused):
        # Dummy signal handler, doesn't do anything
        pass

    handlers = {
        # unblocking SIGCHLD allows the wakeup fd to notify our event loop
        signal.SIGCHLD: sigchld_handler,
        # protect the process from ^C
        signal.SIGINT: signal.SIG_IGN,
        }
    # remember the previous handlers so _serve_one() can restore them in
    # each forked child
    old_handlers = {sig: signal.signal(sig, val)
                    for (sig, val) in handlers.items()}

    # calling os.write() in the Python signal handler is racy
    signal.set_wakeup_fd(sig_w)

    # map child pids to client fds
    pid_to_fd = {}

    with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener, \
         selectors.DefaultSelector() as selector:
        _forkserver._forkserver_address = listener.getsockname()

        selector.register(listener, selectors.EVENT_READ)
        selector.register(alive_r, selectors.EVENT_READ)
        selector.register(sig_r, selectors.EVENT_READ)

        while True:
            try:
                # wait until at least one registered object is ready
                while True:
                    rfds = [key.fileobj for (key, events) in selector.select()]
                    if rfds:
                        break

                if alive_r in rfds:
                    # EOF because no more client processes left
                    assert os.read(alive_r, 1) == b'', "Not at EOF?"
                    raise SystemExit

                if sig_r in rfds:
                    # Got SIGCHLD
                    os.read(sig_r, 65536)  # exhaust
                    while True:
                        # Scan for child processes
                        try:
                            pid, sts = os.waitpid(-1, os.WNOHANG)
                        except ChildProcessError:
                            # no child processes at all
                            break
                        if pid == 0:
                            # children exist but none has changed state
                            break
                        child_w = pid_to_fd.pop(pid, None)
                        if child_w is not None:
                            # killed by a signal -> negative signal number,
                            # otherwise the plain exit status
                            if os.WIFSIGNALED(sts):
                                returncode = -os.WTERMSIG(sts)
                            else:
                                if not os.WIFEXITED(sts):
                                    raise AssertionError(
                                        "Child {0:n} status is {1:n}".format(
                                            pid,sts))
                                returncode = os.WEXITSTATUS(sts)
                            # Send exit code to client process
                            try:
                                write_signed(child_w, returncode)
                            except BrokenPipeError:
                                # client vanished
                                pass
                            os.close(child_w)
                        else:
                            # This shouldn't happen really
                            warnings.warn('forkserver: waitpid returned '
                                          'unexpected pid %d' % pid)

                if listener in rfds:
                    # Incoming fork request
                    with listener.accept()[0] as s:
                        # Receive fds from client
                        fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
                        if len(fds) > MAXFDS_TO_SEND:
                            raise RuntimeError(
                                "Too many ({0:n}) fds to send".format(
                                    len(fds)))
                        # the first two fds are the child's ends of the
                        # pipes; the remainder is handed to _serve_one()
                        child_r, child_w, *fds = fds
                        # close the connection before forking
                        s.close()
                        pid = os.fork()
                        if pid == 0:
                            # Child
                            code = 1
                            try:
                                listener.close()
                                selector.close()
                                # server-only fds, including the status fds
                                # of the children still being tracked
                                unused_fds = [alive_r, child_w, sig_r, sig_w]
                                unused_fds.extend(pid_to_fd.values())
                                code = _serve_one(child_r, fds,
                                                  unused_fds,
                                                  old_handlers)
                            except Exception:
                                sys.excepthook(*sys.exc_info())
                                sys.stderr.flush()
                            finally:
                                # always leave via os._exit(); code stays 1
                                # if _serve_one() did not return
                                os._exit(code)
                        else:
                            # Send pid to client process
                            try:
                                write_signed(child_w, pid)
                            except BrokenPipeError:
                                # client vanished
                                pass
                            # keep child_w to report the exit code later
                            pid_to_fd[pid] = child_w
                            os.close(child_r)
                            for fd in fds:
                                os.close(fd)

            except OSError as e:
                # ECONNABORTED is ignored and the loop continues; any other
                # OSError propagates
                if e.errno != errno.ECONNABORTED:
                    raise
jpayne@68 301
jpayne@68 302
def _serve_one(child_r, fds, unused_fds, handlers):
    '''Prepare a freshly forked child and run the process received on child_r.

    child_r    -- fd passed to spawn._main() to read the process data from
    fds        -- [alive fd, resource tracker fd, *fds inherited by the child]
    unused_fds -- fds to close before running the process
    handlers   -- mapping of signal number to the handler to install

    Returns whatever spawn._main() returns.
    '''
    # detach from the wakeup fd and put the given signal handlers in place
    signal.set_wakeup_fd(-1)
    for signum, previous in handlers.items():
        signal.signal(signum, previous)

    # close unnecessary stuff
    for descriptor in unused_fds:
        os.close(descriptor)

    # unpack first, then publish the fds on the module-level singletons
    alive_fd, tracker_fd, *inherited = fds
    _forkserver._forkserver_alive_fd = alive_fd
    resource_tracker._resource_tracker._fd = tracker_fd
    _forkserver._inherited_fds = inherited

    # Run process object received over pipe; a duplicate of child_r is
    # passed as the parent sentinel
    return spawn._main(child_r, os.dup(child_r))
jpayne@68 320
jpayne@68 321
jpayne@68 322 #
jpayne@68 323 # Read and write signed numbers
jpayne@68 324 #
jpayne@68 325
def read_signed(fd):
    '''Read one SIGNED_STRUCT-encoded integer from fd and return it.

    Keeps reading until SIGNED_STRUCT.size bytes have arrived; raises
    EOFError if a read returns no data before that.
    '''
    remaining = SIGNED_STRUCT.size
    chunks = []
    while remaining > 0:
        chunk = os.read(fd, remaining)
        if not chunk:
            raise EOFError('unexpected EOF')
        chunks.append(chunk)
        remaining -= len(chunk)
    (value,) = SIGNED_STRUCT.unpack(b''.join(chunks))
    return value
jpayne@68 335
def write_signed(fd, n):
    '''Write integer n to fd in SIGNED_STRUCT format.

    Repeats os.write() until every byte has been written; raises
    RuntimeError if a write reports zero bytes.
    '''
    payload = memoryview(SIGNED_STRUCT.pack(n))
    sent = 0
    total = len(payload)
    while sent < total:
        written = os.write(fd, payload[sent:])
        if written == 0:
            raise RuntimeError('should not get here')
        sent += written
jpayne@68 343
jpayne@68 344 #
jpayne@68 345 #
jpayne@68 346 #
jpayne@68 347
# Single module-level ForkServer instance.  main() and _serve_one() update
# its attributes directly, and the public functions listed in __all__ are
# its bound methods.
_forkserver = ForkServer()
ensure_running = _forkserver.ensure_running
get_inherited_fds = _forkserver.get_inherited_fds
connect_to_new_process = _forkserver.connect_to_new_process
set_forkserver_preload = _forkserver.set_forkserver_preload