annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/multiprocessing/reduction.py @ 69:33d812a61356

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 17:55:14 -0400
parents
children
rev   line source
jpayne@69 1 #
jpayne@69 2 # Module which deals with pickling of objects.
jpayne@69 3 #
jpayne@69 4 # multiprocessing/reduction.py
jpayne@69 5 #
jpayne@69 6 # Copyright (c) 2006-2008, R Oudkerk
jpayne@69 7 # Licensed to PSF under a Contributor Agreement.
jpayne@69 8 #
jpayne@69 9
jpayne@69 10 from abc import ABCMeta
jpayne@69 11 import copyreg
jpayne@69 12 import functools
jpayne@69 13 import io
jpayne@69 14 import os
jpayne@69 15 import pickle
jpayne@69 16 import socket
jpayne@69 17 import sys
jpayne@69 18
jpayne@69 19 from . import context
jpayne@69 20
jpayne@69 21 __all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump']
jpayne@69 22
jpayne@69 23
jpayne@69 24 HAVE_SEND_HANDLE = (sys.platform == 'win32' or
jpayne@69 25 (hasattr(socket, 'CMSG_LEN') and
jpayne@69 26 hasattr(socket, 'SCM_RIGHTS') and
jpayne@69 27 hasattr(socket.socket, 'sendmsg')))
jpayne@69 28
jpayne@69 29 #
jpayne@69 30 # Pickler subclass
jpayne@69 31 #
jpayne@69 32
jpayne@69 33 class ForkingPickler(pickle.Pickler):
jpayne@69 34 '''Pickler subclass used by multiprocessing.'''
jpayne@69 35 _extra_reducers = {}
jpayne@69 36 _copyreg_dispatch_table = copyreg.dispatch_table
jpayne@69 37
jpayne@69 38 def __init__(self, *args):
jpayne@69 39 super().__init__(*args)
jpayne@69 40 self.dispatch_table = self._copyreg_dispatch_table.copy()
jpayne@69 41 self.dispatch_table.update(self._extra_reducers)
jpayne@69 42
jpayne@69 43 @classmethod
jpayne@69 44 def register(cls, type, reduce):
jpayne@69 45 '''Register a reduce function for a type.'''
jpayne@69 46 cls._extra_reducers[type] = reduce
jpayne@69 47
jpayne@69 48 @classmethod
jpayne@69 49 def dumps(cls, obj, protocol=None):
jpayne@69 50 buf = io.BytesIO()
jpayne@69 51 cls(buf, protocol).dump(obj)
jpayne@69 52 return buf.getbuffer()
jpayne@69 53
jpayne@69 54 loads = pickle.loads
jpayne@69 55
jpayne@69 56 register = ForkingPickler.register
jpayne@69 57
jpayne@69 58 def dump(obj, file, protocol=None):
jpayne@69 59 '''Replacement for pickle.dump() using ForkingPickler.'''
jpayne@69 60 ForkingPickler(file, protocol).dump(obj)
jpayne@69 61
jpayne@69 62 #
jpayne@69 63 # Platform specific definitions
jpayne@69 64 #
jpayne@69 65
jpayne@69 66 if sys.platform == 'win32':
jpayne@69 67 # Windows
jpayne@69 68 __all__ += ['DupHandle', 'duplicate', 'steal_handle']
jpayne@69 69 import _winapi
jpayne@69 70
jpayne@69 71 def duplicate(handle, target_process=None, inheritable=False,
jpayne@69 72 *, source_process=None):
jpayne@69 73 '''Duplicate a handle. (target_process is a handle not a pid!)'''
jpayne@69 74 current_process = _winapi.GetCurrentProcess()
jpayne@69 75 if source_process is None:
jpayne@69 76 source_process = current_process
jpayne@69 77 if target_process is None:
jpayne@69 78 target_process = current_process
jpayne@69 79 return _winapi.DuplicateHandle(
jpayne@69 80 source_process, handle, target_process,
jpayne@69 81 0, inheritable, _winapi.DUPLICATE_SAME_ACCESS)
jpayne@69 82
jpayne@69 83 def steal_handle(source_pid, handle):
jpayne@69 84 '''Steal a handle from process identified by source_pid.'''
jpayne@69 85 source_process_handle = _winapi.OpenProcess(
jpayne@69 86 _winapi.PROCESS_DUP_HANDLE, False, source_pid)
jpayne@69 87 try:
jpayne@69 88 return _winapi.DuplicateHandle(
jpayne@69 89 source_process_handle, handle,
jpayne@69 90 _winapi.GetCurrentProcess(), 0, False,
jpayne@69 91 _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE)
jpayne@69 92 finally:
jpayne@69 93 _winapi.CloseHandle(source_process_handle)
jpayne@69 94
jpayne@69 95 def send_handle(conn, handle, destination_pid):
jpayne@69 96 '''Send a handle over a local connection.'''
jpayne@69 97 dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid)
jpayne@69 98 conn.send(dh)
jpayne@69 99
jpayne@69 100 def recv_handle(conn):
jpayne@69 101 '''Receive a handle over a local connection.'''
jpayne@69 102 return conn.recv().detach()
jpayne@69 103
jpayne@69 104 class DupHandle(object):
jpayne@69 105 '''Picklable wrapper for a handle.'''
jpayne@69 106 def __init__(self, handle, access, pid=None):
jpayne@69 107 if pid is None:
jpayne@69 108 # We just duplicate the handle in the current process and
jpayne@69 109 # let the receiving process steal the handle.
jpayne@69 110 pid = os.getpid()
jpayne@69 111 proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
jpayne@69 112 try:
jpayne@69 113 self._handle = _winapi.DuplicateHandle(
jpayne@69 114 _winapi.GetCurrentProcess(),
jpayne@69 115 handle, proc, access, False, 0)
jpayne@69 116 finally:
jpayne@69 117 _winapi.CloseHandle(proc)
jpayne@69 118 self._access = access
jpayne@69 119 self._pid = pid
jpayne@69 120
jpayne@69 121 def detach(self):
jpayne@69 122 '''Get the handle. This should only be called once.'''
jpayne@69 123 # retrieve handle from process which currently owns it
jpayne@69 124 if self._pid == os.getpid():
jpayne@69 125 # The handle has already been duplicated for this process.
jpayne@69 126 return self._handle
jpayne@69 127 # We must steal the handle from the process whose pid is self._pid.
jpayne@69 128 proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
jpayne@69 129 self._pid)
jpayne@69 130 try:
jpayne@69 131 return _winapi.DuplicateHandle(
jpayne@69 132 proc, self._handle, _winapi.GetCurrentProcess(),
jpayne@69 133 self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
jpayne@69 134 finally:
jpayne@69 135 _winapi.CloseHandle(proc)
jpayne@69 136
jpayne@69 137 else:
jpayne@69 138 # Unix
jpayne@69 139 __all__ += ['DupFd', 'sendfds', 'recvfds']
jpayne@69 140 import array
jpayne@69 141
jpayne@69 142 # On MacOSX we should acknowledge receipt of fds -- see Issue14669
jpayne@69 143 ACKNOWLEDGE = sys.platform == 'darwin'
jpayne@69 144
jpayne@69 145 def sendfds(sock, fds):
jpayne@69 146 '''Send an array of fds over an AF_UNIX socket.'''
jpayne@69 147 fds = array.array('i', fds)
jpayne@69 148 msg = bytes([len(fds) % 256])
jpayne@69 149 sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
jpayne@69 150 if ACKNOWLEDGE and sock.recv(1) != b'A':
jpayne@69 151 raise RuntimeError('did not receive acknowledgement of fd')
jpayne@69 152
jpayne@69 153 def recvfds(sock, size):
jpayne@69 154 '''Receive an array of fds over an AF_UNIX socket.'''
jpayne@69 155 a = array.array('i')
jpayne@69 156 bytes_size = a.itemsize * size
jpayne@69 157 msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_SPACE(bytes_size))
jpayne@69 158 if not msg and not ancdata:
jpayne@69 159 raise EOFError
jpayne@69 160 try:
jpayne@69 161 if ACKNOWLEDGE:
jpayne@69 162 sock.send(b'A')
jpayne@69 163 if len(ancdata) != 1:
jpayne@69 164 raise RuntimeError('received %d items of ancdata' %
jpayne@69 165 len(ancdata))
jpayne@69 166 cmsg_level, cmsg_type, cmsg_data = ancdata[0]
jpayne@69 167 if (cmsg_level == socket.SOL_SOCKET and
jpayne@69 168 cmsg_type == socket.SCM_RIGHTS):
jpayne@69 169 if len(cmsg_data) % a.itemsize != 0:
jpayne@69 170 raise ValueError
jpayne@69 171 a.frombytes(cmsg_data)
jpayne@69 172 if len(a) % 256 != msg[0]:
jpayne@69 173 raise AssertionError(
jpayne@69 174 "Len is {0:n} but msg[0] is {1!r}".format(
jpayne@69 175 len(a), msg[0]))
jpayne@69 176 return list(a)
jpayne@69 177 except (ValueError, IndexError):
jpayne@69 178 pass
jpayne@69 179 raise RuntimeError('Invalid data received')
jpayne@69 180
jpayne@69 181 def send_handle(conn, handle, destination_pid):
jpayne@69 182 '''Send a handle over a local connection.'''
jpayne@69 183 with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
jpayne@69 184 sendfds(s, [handle])
jpayne@69 185
jpayne@69 186 def recv_handle(conn):
jpayne@69 187 '''Receive a handle over a local connection.'''
jpayne@69 188 with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
jpayne@69 189 return recvfds(s, 1)[0]
jpayne@69 190
jpayne@69 191 def DupFd(fd):
jpayne@69 192 '''Return a wrapper for an fd.'''
jpayne@69 193 popen_obj = context.get_spawning_popen()
jpayne@69 194 if popen_obj is not None:
jpayne@69 195 return popen_obj.DupFd(popen_obj.duplicate_for_child(fd))
jpayne@69 196 elif HAVE_SEND_HANDLE:
jpayne@69 197 from . import resource_sharer
jpayne@69 198 return resource_sharer.DupFd(fd)
jpayne@69 199 else:
jpayne@69 200 raise ValueError('SCM_RIGHTS appears not to be available')
jpayne@69 201
jpayne@69 202 #
jpayne@69 203 # Try making some callable types picklable
jpayne@69 204 #
jpayne@69 205
jpayne@69 206 def _reduce_method(m):
jpayne@69 207 if m.__self__ is None:
jpayne@69 208 return getattr, (m.__class__, m.__func__.__name__)
jpayne@69 209 else:
jpayne@69 210 return getattr, (m.__self__, m.__func__.__name__)
jpayne@69 211 class _C:
jpayne@69 212 def f(self):
jpayne@69 213 pass
jpayne@69 214 register(type(_C().f), _reduce_method)
jpayne@69 215
jpayne@69 216
jpayne@69 217 def _reduce_method_descriptor(m):
jpayne@69 218 return getattr, (m.__objclass__, m.__name__)
jpayne@69 219 register(type(list.append), _reduce_method_descriptor)
jpayne@69 220 register(type(int.__add__), _reduce_method_descriptor)
jpayne@69 221
jpayne@69 222
jpayne@69 223 def _reduce_partial(p):
jpayne@69 224 return _rebuild_partial, (p.func, p.args, p.keywords or {})
jpayne@69 225 def _rebuild_partial(func, args, keywords):
jpayne@69 226 return functools.partial(func, *args, **keywords)
jpayne@69 227 register(functools.partial, _reduce_partial)
jpayne@69 228
jpayne@69 229 #
jpayne@69 230 # Make sockets picklable
jpayne@69 231 #
jpayne@69 232
jpayne@69 233 if sys.platform == 'win32':
jpayne@69 234 def _reduce_socket(s):
jpayne@69 235 from .resource_sharer import DupSocket
jpayne@69 236 return _rebuild_socket, (DupSocket(s),)
jpayne@69 237 def _rebuild_socket(ds):
jpayne@69 238 return ds.detach()
jpayne@69 239 register(socket.socket, _reduce_socket)
jpayne@69 240
jpayne@69 241 else:
jpayne@69 242 def _reduce_socket(s):
jpayne@69 243 df = DupFd(s.fileno())
jpayne@69 244 return _rebuild_socket, (df, s.family, s.type, s.proto)
jpayne@69 245 def _rebuild_socket(df, family, type, proto):
jpayne@69 246 fd = df.detach()
jpayne@69 247 return socket.socket(family, type, proto, fileno=fd)
jpayne@69 248 register(socket.socket, _reduce_socket)
jpayne@69 249
jpayne@69 250
jpayne@69 251 class AbstractReducer(metaclass=ABCMeta):
jpayne@69 252 '''Abstract base class for use in implementing a Reduction class
jpayne@69 253 suitable for use in replacing the standard reduction mechanism
jpayne@69 254 used in multiprocessing.'''
jpayne@69 255 ForkingPickler = ForkingPickler
jpayne@69 256 register = register
jpayne@69 257 dump = dump
jpayne@69 258 send_handle = send_handle
jpayne@69 259 recv_handle = recv_handle
jpayne@69 260
jpayne@69 261 if sys.platform == 'win32':
jpayne@69 262 steal_handle = steal_handle
jpayne@69 263 duplicate = duplicate
jpayne@69 264 DupHandle = DupHandle
jpayne@69 265 else:
jpayne@69 266 sendfds = sendfds
jpayne@69 267 recvfds = recvfds
jpayne@69 268 DupFd = DupFd
jpayne@69 269
jpayne@69 270 _reduce_method = _reduce_method
jpayne@69 271 _reduce_method_descriptor = _reduce_method_descriptor
jpayne@69 272 _rebuild_partial = _rebuild_partial
jpayne@69 273 _reduce_socket = _reduce_socket
jpayne@69 274 _rebuild_socket = _rebuild_socket
jpayne@69 275
jpayne@69 276 def __init__(self, *args):
jpayne@69 277 register(type(_C().f), _reduce_method)
jpayne@69 278 register(type(list.append), _reduce_method_descriptor)
jpayne@69 279 register(type(int.__add__), _reduce_method_descriptor)
jpayne@69 280 register(functools.partial, _reduce_partial)
jpayne@69 281 register(socket.socket, _reduce_socket)