annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/multiprocessing/forkserver.py @ 69:33d812a61356

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 17:55:14 -0400
parents
children
rev   line source
jpayne@69 1 import errno
jpayne@69 2 import os
jpayne@69 3 import selectors
jpayne@69 4 import signal
jpayne@69 5 import socket
jpayne@69 6 import struct
jpayne@69 7 import sys
jpayne@69 8 import threading
jpayne@69 9 import warnings
jpayne@69 10
jpayne@69 11 from . import connection
jpayne@69 12 from . import process
jpayne@69 13 from .context import reduction
jpayne@69 14 from . import resource_tracker
jpayne@69 15 from . import spawn
jpayne@69 16 from . import util
jpayne@69 17
# Names exported by ``from multiprocessing.forkserver import *``.
__all__ = [
    'ensure_running',
    'get_inherited_fds',
    'connect_to_new_process',
    'set_forkserver_preload',
]
jpayne@69 20
jpayne@69 21 #
jpayne@69 22 #
jpayne@69 23 #
jpayne@69 24
# Cap on the number of file descriptors carried by a single fork request.
MAXFDS_TO_SEND = 256

# Wire format for pids and return codes: native signed 'q', which is
# large enough for pid_t.
SIGNED_STRUCT = struct.Struct('q')
jpayne@69 27
jpayne@69 28 #
jpayne@69 29 # Forkserver class
jpayne@69 30 #
jpayne@69 31
class ForkServer(object):
    '''Client-side handle on the fork server process.

    Keeps track of the server's socket address, the write end of its
    "alive" pipe, its pid and the module names it should preload, and
    knows how to start the server, stop it, and ask it for a new child.
    '''

    def __init__(self):
        # Address of the AF_UNIX socket the server listens on.
        self._forkserver_address = None
        # Write end of the "alive" pipe; closing it asks the server to stop.
        self._forkserver_alive_fd = None
        # Pid of the server started by ensure_running(), if any.
        self._forkserver_pid = None
        # Value handed back by get_inherited_fds().
        self._inherited_fds = None
        # Serialises starting and stopping the server.
        self._lock = threading.Lock()
        # Module names the server tries to import at start-up.
        self._preload_modules = ['__main__']

    def _stop(self):
        # Method used by unit tests to stop the server
        with self._lock:
            self._stop_unlocked()

    def _stop_unlocked(self):
        '''Stop the server; the caller must already hold self._lock.'''
        if self._forkserver_pid is None:
            return

        # closing the "alive" file descriptor asks the server to stop
        os.close(self._forkserver_alive_fd)
        self._forkserver_alive_fd = None

        # Reap the server process before forgetting its pid.
        os.waitpid(self._forkserver_pid, 0)
        self._forkserver_pid = None

        # Remove the socket file that ensure_running() bound.
        os.unlink(self._forkserver_address)
        self._forkserver_address = None

    def set_forkserver_preload(self, modules_names):
        '''Set list of module names to try to load in forkserver process.

        Raises TypeError if any entry of modules_names is not a str; in
        that case the previously stored list is left untouched.
        '''
        # Validate the argument itself, not the list currently stored on
        # the instance (which would let any input through).
        if not all(type(mod) is str for mod in modules_names):
            raise TypeError('module_names must be a list of strings')
        self._preload_modules = modules_names

    def get_inherited_fds(self):
        '''Return list of fds inherited from parent process.

        This returns None if the current process was not started by fork
        server.
        '''
        return self._inherited_fds

    def connect_to_new_process(self, fds):
        '''Request forkserver to create a child process.

        Returns a pair of fds (status_r, data_w).  The calling process can read
        the child process's pid and (eventually) its returncode from status_r.
        The calling process should write to data_w the pickled preparation and
        process data.

        Raises ValueError if fds plus the four fds added here would reach
        MAXFDS_TO_SEND.
        '''
        self.ensure_running()
        if len(fds) + 4 >= MAXFDS_TO_SEND:
            raise ValueError('too many fds')
        with socket.socket(socket.AF_UNIX) as client:
            client.connect(self._forkserver_address)
            parent_r, child_w = os.pipe()
            child_r, parent_w = os.pipe()
            # The first four entries are fixed; the caller's fds follow.
            allfds = [child_r, child_w, self._forkserver_alive_fd,
                      resource_tracker.getfd()]
            allfds += fds
            try:
                reduction.sendfds(client, allfds)
                return parent_r, parent_w
            except:
                # Deliberately bare: whatever went wrong, release our ends
                # of the pipes and re-raise.
                os.close(parent_r)
                os.close(parent_w)
                raise
            finally:
                # The child ends are never used in this process.
                os.close(child_r)
                os.close(child_w)

    def ensure_running(self):
        '''Make sure that a fork server is running.

        This can be called from any process.  Note that usually a child
        process will just reuse the forkserver started by its parent, so
        ensure_running() will do nothing.
        '''
        with self._lock:
            resource_tracker.ensure_running()
            if self._forkserver_pid is not None:
                # forkserver was launched before, is it still running?
                pid, _status = os.waitpid(self._forkserver_pid, os.WNOHANG)
                if not pid:
                    # still alive
                    return
                # dead, launch it again
                os.close(self._forkserver_alive_fd)
                self._forkserver_address = None
                self._forkserver_alive_fd = None
                self._forkserver_pid = None

            cmd = ('from multiprocessing.forkserver import main; ' +
                   'main(%d, %d, %r, **%r)')

            if self._preload_modules:
                # Only these two preparation entries are forwarded to main().
                desired_keys = {'main_path', 'sys_path'}
                data = spawn.get_preparation_data('ignore')
                data = {x: y for x, y in data.items() if x in desired_keys}
            else:
                data = {}

            with socket.socket(socket.AF_UNIX) as listener:
                address = connection.arbitrary_address('AF_UNIX')
                listener.bind(address)
                # Restrict the socket file to its owner.
                os.chmod(address, 0o600)
                listener.listen()

                # all client processes own the write end of the "alive" pipe;
                # when they all terminate the read end becomes ready.
                alive_r, alive_w = os.pipe()
                try:
                    fds_to_pass = [listener.fileno(), alive_r]
                    cmd %= (listener.fileno(), alive_r, self._preload_modules,
                            data)
                    exe = spawn.get_executable()
                    args = [exe] + util._args_from_interpreter_flags()
                    args += ['-c', cmd]
                    pid = util.spawnv_passfds(exe, args, fds_to_pass)
                except:
                    # Deliberately bare: on any failure drop the write end
                    # too, then re-raise.
                    os.close(alive_w)
                    raise
                finally:
                    # The read end is not kept in this process.
                    os.close(alive_r)
                self._forkserver_address = address
                self._forkserver_alive_fd = alive_w
                self._forkserver_pid = pid
jpayne@69 160
jpayne@69 161 #
jpayne@69 162 #
jpayne@69 163 #
jpayne@69 164
def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
    '''Run the fork server event loop.

    listener_fd is the fd of the AF_UNIX listening socket, alive_r is the
    read end of the "alive" pipe (the loop exits once it becomes readable)
    and preload is a list of module names to import before serving.
    main_path is handed to spawn.import_main_path() when '__main__' is
    among the preloads.  sys_path is accepted but not referenced in this
    function.
    '''
    if preload:
        if '__main__' in preload and main_path is not None:
            # Flag the current process as "inheriting" only while the
            # main module is being imported.
            process.current_process()._inheriting = True
            try:
                spawn.import_main_path(main_path)
            finally:
                del process.current_process()._inheriting
        for module_name in preload:
            try:
                __import__(module_name)
            except ImportError:
                # Preloading is best effort: skip what cannot be imported.
                pass

    util._close_stdin()

    # Pipe used as the signal wakeup channel; both ends are non-blocking.
    sig_r, sig_w = os.pipe()
    for pipe_end in (sig_r, sig_w):
        os.set_blocking(pipe_end, False)

    def sigchld_handler(*_unused):
        # Intentionally a no-op
        pass

    # Install our handlers, remembering the previous ones so that forked
    # children can restore them.
    old_handlers = {}
    # unblocking SIGCHLD allows the wakeup fd to notify our event loop
    old_handlers[signal.SIGCHLD] = signal.signal(signal.SIGCHLD,
                                                 sigchld_handler)
    # protect the process from ^C
    old_handlers[signal.SIGINT] = signal.signal(signal.SIGINT,
                                                signal.SIG_IGN)

    # Let the signal module do the write: calling os.write() in a Python
    # signal handler is racy.
    signal.set_wakeup_fd(sig_w)

    # child pid -> fd on which that child's client expects the return code
    pid_to_fd = {}

    with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener, \
         selectors.DefaultSelector() as selector:
        _forkserver._forkserver_address = listener.getsockname()

        for source in (listener, alive_r, sig_r):
            selector.register(source, selectors.EVENT_READ)

        while True:
            try:
                # Block until at least one registered source is readable.
                ready = []
                while not ready:
                    ready = [key.fileobj
                             for (key, _events) in selector.select()]

                if alive_r in ready:
                    # EOF because no more client processes left
                    assert os.read(alive_r, 1) == b'', "Not at EOF?"
                    raise SystemExit

                if sig_r in ready:
                    # Got SIGCHLD
                    os.read(sig_r, 65536)  # drain the wakeup pipe
                    while True:
                        # Reap every child that has already terminated.
                        try:
                            dead_pid, status = os.waitpid(-1, os.WNOHANG)
                        except ChildProcessError:
                            break
                        if dead_pid == 0:
                            break
                        status_fd = pid_to_fd.pop(dead_pid, None)
                        if status_fd is None:
                            # This shouldn't happen really
                            warnings.warn('forkserver: waitpid returned '
                                          'unexpected pid %d' % dead_pid)
                            continue
                        if os.WIFSIGNALED(status):
                            returncode = -os.WTERMSIG(status)
                        elif os.WIFEXITED(status):
                            returncode = os.WEXITSTATUS(status)
                        else:
                            raise AssertionError(
                                "Child {0:n} status is {1:n}".format(
                                    dead_pid, status))
                        # Send exit code to client process
                        try:
                            write_signed(status_fd, returncode)
                        except BrokenPipeError:
                            # client vanished
                            pass
                        os.close(status_fd)

                if listener in ready:
                    # Incoming fork request
                    with listener.accept()[0] as conn:
                        # Receive fds from client
                        received = reduction.recvfds(conn, MAXFDS_TO_SEND + 1)
                        if len(received) > MAXFDS_TO_SEND:
                            raise RuntimeError(
                                "Too many ({0:n}) fds to send".format(
                                    len(received)))
                        child_r, child_w, *fds = received
                        # Close the connection before forking.
                        conn.close()
                        pid = os.fork()
                        if pid == 0:
                            # Child: never returns from this branch.
                            code = 1
                            try:
                                listener.close()
                                selector.close()
                                unused_fds = [alive_r, child_w, sig_r, sig_w,
                                              *pid_to_fd.values()]
                                code = _serve_one(child_r, fds,
                                                  unused_fds,
                                                  old_handlers)
                            except Exception:
                                sys.excepthook(*sys.exc_info())
                                sys.stderr.flush()
                            finally:
                                os._exit(code)

                        # Parent: send pid to client process
                        try:
                            write_signed(child_w, pid)
                        except BrokenPipeError:
                            # client vanished
                            pass
                        pid_to_fd[pid] = child_w
                        os.close(child_r)
                        for fd in fds:
                            os.close(fd)

            except OSError as exc:
                # Only an aborted connection is tolerated here.
                if exc.errno != errno.ECONNABORTED:
                    raise
jpayne@69 301
jpayne@69 302
def _serve_one(child_r, fds, unused_fds, handlers):
    '''Run one process object in the current (forked) process.

    Restores the signal handlers given in handlers, closes every fd in
    unused_fds, stores the received fds on the module-level fork server
    and resource tracker objects, then hands child_r to spawn._main()
    and returns its result.
    '''
    # Undo the signal set-up: no wakeup fd, previous handlers back in place.
    signal.set_wakeup_fd(-1)
    for signum, previous in handlers.items():
        signal.signal(signum, previous)

    # Close everything this process has no use for.
    for unused in unused_fds:
        os.close(unused)

    # fds layout: alive fd, resource tracker fd, then the inherited fds.
    alive_fd, tracker_fd, *inherited = fds
    _forkserver._forkserver_alive_fd = alive_fd
    resource_tracker._resource_tracker._fd = tracker_fd
    _forkserver._inherited_fds = inherited

    # Run process object received over pipe; a duplicate of child_r is
    # passed along as the parent sentinel.
    return spawn._main(child_r, os.dup(child_r))
jpayne@69 320
jpayne@69 321
jpayne@69 322 #
jpayne@69 323 # Read and write signed numbers
jpayne@69 324 #
jpayne@69 325
def read_signed(fd):
    '''Read one SIGNED_STRUCT-encoded integer from fd and return it.

    Keeps reading until SIGNED_STRUCT.size bytes have arrived; raises
    EOFError if fd hits end-of-file first.
    '''
    remaining = SIGNED_STRUCT.size
    chunks = []
    while remaining > 0:
        chunk = os.read(fd, remaining)
        if not chunk:
            raise EOFError('unexpected EOF')
        chunks.append(chunk)
        remaining -= len(chunk)
    (value,) = SIGNED_STRUCT.unpack(b''.join(chunks))
    return value
jpayne@69 335
def write_signed(fd, n):
    '''Write the integer n to fd in SIGNED_STRUCT encoding.

    Repeats os.write() until the whole packed value has been written;
    raises RuntimeError if a write reports zero bytes.
    '''
    payload = SIGNED_STRUCT.pack(n)
    offset = 0
    while offset < len(payload):
        written = os.write(fd, payload[offset:])
        if written == 0:
            raise RuntimeError('should not get here')
        offset += written
jpayne@69 343
jpayne@69 344 #
jpayne@69 345 #
jpayne@69 346 #
jpayne@69 347
# Single module-wide ForkServer instance.
_forkserver = ForkServer()

# Module-level aliases for the bound methods of that instance.
set_forkserver_preload = _forkserver.set_forkserver_preload
get_inherited_fds = _forkserver.get_inherited_fds
connect_to_new_process = _forkserver.connect_to_new_process
ensure_running = _forkserver.ensure_running