jpayne@68: # jpayne@68: # Module which deals with pickling of objects. jpayne@68: # jpayne@68: # multiprocessing/reduction.py jpayne@68: # jpayne@68: # Copyright (c) 2006-2008, R Oudkerk jpayne@68: # Licensed to PSF under a Contributor Agreement. jpayne@68: # jpayne@68: jpayne@68: from abc import ABCMeta jpayne@68: import copyreg jpayne@68: import functools jpayne@68: import io jpayne@68: import os jpayne@68: import pickle jpayne@68: import socket jpayne@68: import sys jpayne@68: jpayne@68: from . import context jpayne@68: jpayne@68: __all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump'] jpayne@68: jpayne@68: jpayne@68: HAVE_SEND_HANDLE = (sys.platform == 'win32' or jpayne@68: (hasattr(socket, 'CMSG_LEN') and jpayne@68: hasattr(socket, 'SCM_RIGHTS') and jpayne@68: hasattr(socket.socket, 'sendmsg'))) jpayne@68: jpayne@68: # jpayne@68: # Pickler subclass jpayne@68: # jpayne@68: jpayne@68: class ForkingPickler(pickle.Pickler): jpayne@68: '''Pickler subclass used by multiprocessing.''' jpayne@68: _extra_reducers = {} jpayne@68: _copyreg_dispatch_table = copyreg.dispatch_table jpayne@68: jpayne@68: def __init__(self, *args): jpayne@68: super().__init__(*args) jpayne@68: self.dispatch_table = self._copyreg_dispatch_table.copy() jpayne@68: self.dispatch_table.update(self._extra_reducers) jpayne@68: jpayne@68: @classmethod jpayne@68: def register(cls, type, reduce): jpayne@68: '''Register a reduce function for a type.''' jpayne@68: cls._extra_reducers[type] = reduce jpayne@68: jpayne@68: @classmethod jpayne@68: def dumps(cls, obj, protocol=None): jpayne@68: buf = io.BytesIO() jpayne@68: cls(buf, protocol).dump(obj) jpayne@68: return buf.getbuffer() jpayne@68: jpayne@68: loads = pickle.loads jpayne@68: jpayne@68: register = ForkingPickler.register jpayne@68: jpayne@68: def dump(obj, file, protocol=None): jpayne@68: '''Replacement for pickle.dump() using ForkingPickler.''' jpayne@68: ForkingPickler(file, protocol).dump(obj) jpayne@68: jpayne@68: # jpayne@68: # Platform specific definitions jpayne@68: # jpayne@68: jpayne@68: if sys.platform == 'win32': jpayne@68: # Windows jpayne@68: __all__ += ['DupHandle', 'duplicate', 'steal_handle'] jpayne@68: import _winapi jpayne@68: jpayne@68: def duplicate(handle, target_process=None, inheritable=False, jpayne@68: *, source_process=None): jpayne@68: '''Duplicate a handle. (target_process is a handle not a pid!)''' jpayne@68: current_process = _winapi.GetCurrentProcess() jpayne@68: if source_process is None: jpayne@68: source_process = current_process jpayne@68: if target_process is None: jpayne@68: target_process = current_process jpayne@68: return _winapi.DuplicateHandle( jpayne@68: source_process, handle, target_process, jpayne@68: 0, inheritable, _winapi.DUPLICATE_SAME_ACCESS) jpayne@68: jpayne@68: def steal_handle(source_pid, handle): jpayne@68: '''Steal a handle from process identified by source_pid.''' jpayne@68: source_process_handle = _winapi.OpenProcess( jpayne@68: _winapi.PROCESS_DUP_HANDLE, False, source_pid) jpayne@68: try: jpayne@68: return _winapi.DuplicateHandle( jpayne@68: source_process_handle, handle, jpayne@68: _winapi.GetCurrentProcess(), 0, False, jpayne@68: _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE) jpayne@68: finally: jpayne@68: _winapi.CloseHandle(source_process_handle) jpayne@68: jpayne@68: def send_handle(conn, handle, destination_pid): jpayne@68: '''Send a handle over a local connection.''' jpayne@68: dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid) jpayne@68: conn.send(dh) jpayne@68: jpayne@68: def recv_handle(conn): jpayne@68: '''Receive a handle over a local connection.''' jpayne@68: return conn.recv().detach() jpayne@68: jpayne@68: class DupHandle(object): jpayne@68: '''Picklable wrapper for a handle.''' jpayne@68: def __init__(self, handle, access, pid=None): jpayne@68: if pid is None: jpayne@68: # We just duplicate the handle in the current process and jpayne@68: # let the receiving process steal the handle. jpayne@68: pid = os.getpid() jpayne@68: proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid) jpayne@68: try: jpayne@68: self._handle = _winapi.DuplicateHandle( jpayne@68: _winapi.GetCurrentProcess(), jpayne@68: handle, proc, access, False, 0) jpayne@68: finally: jpayne@68: _winapi.CloseHandle(proc) jpayne@68: self._access = access jpayne@68: self._pid = pid jpayne@68: jpayne@68: def detach(self): jpayne@68: '''Get the handle. This should only be called once.''' jpayne@68: # retrieve handle from process which currently owns it jpayne@68: if self._pid == os.getpid(): jpayne@68: # The handle has already been duplicated for this process. jpayne@68: return self._handle jpayne@68: # We must steal the handle from the process whose pid is self._pid. jpayne@68: proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, jpayne@68: self._pid) jpayne@68: try: jpayne@68: return _winapi.DuplicateHandle( jpayne@68: proc, self._handle, _winapi.GetCurrentProcess(), jpayne@68: self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE) jpayne@68: finally: jpayne@68: _winapi.CloseHandle(proc) jpayne@68: jpayne@68: else: jpayne@68: # Unix jpayne@68: __all__ += ['DupFd', 'sendfds', 'recvfds'] jpayne@68: import array jpayne@68: jpayne@68: # On MacOSX we should acknowledge receipt of fds -- see Issue14669 jpayne@68: ACKNOWLEDGE = sys.platform == 'darwin' jpayne@68: jpayne@68: def sendfds(sock, fds): jpayne@68: '''Send an array of fds over an AF_UNIX socket.''' jpayne@68: fds = array.array('i', fds) jpayne@68: msg = bytes([len(fds) % 256]) jpayne@68: sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)]) jpayne@68: if ACKNOWLEDGE and sock.recv(1) != b'A': jpayne@68: raise RuntimeError('did not receive acknowledgement of fd') jpayne@68: jpayne@68: def recvfds(sock, size): jpayne@68: '''Receive an array of fds over an AF_UNIX socket.''' jpayne@68: a = array.array('i') jpayne@68: bytes_size = a.itemsize * size jpayne@68: msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_SPACE(bytes_size)) jpayne@68: if not msg and not ancdata: jpayne@68: raise EOFError jpayne@68: try: jpayne@68: if ACKNOWLEDGE: jpayne@68: sock.send(b'A') jpayne@68: if len(ancdata) != 1: jpayne@68: raise RuntimeError('received %d items of ancdata' % jpayne@68: len(ancdata)) jpayne@68: cmsg_level, cmsg_type, cmsg_data = ancdata[0] jpayne@68: if (cmsg_level == socket.SOL_SOCKET and jpayne@68: cmsg_type == socket.SCM_RIGHTS): jpayne@68: if len(cmsg_data) % a.itemsize != 0: jpayne@68: raise ValueError jpayne@68: a.frombytes(cmsg_data) jpayne@68: if len(a) % 256 != msg[0]: jpayne@68: raise AssertionError( jpayne@68: "Len is {0:n} but msg[0] is {1!r}".format( jpayne@68: len(a), msg[0])) jpayne@68: return list(a) jpayne@68: except (ValueError, IndexError): jpayne@68: pass jpayne@68: raise RuntimeError('Invalid data received') jpayne@68: jpayne@68: def send_handle(conn, handle, destination_pid): jpayne@68: '''Send a handle over a local connection.''' jpayne@68: with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: jpayne@68: sendfds(s, [handle]) jpayne@68: jpayne@68: def recv_handle(conn): jpayne@68: '''Receive a handle over a local connection.''' jpayne@68: with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: jpayne@68: return recvfds(s, 1)[0] jpayne@68: jpayne@68: def DupFd(fd): jpayne@68: '''Return a wrapper for an fd.''' jpayne@68: popen_obj = context.get_spawning_popen() jpayne@68: if popen_obj is not None: jpayne@68: return popen_obj.DupFd(popen_obj.duplicate_for_child(fd)) jpayne@68: elif HAVE_SEND_HANDLE: jpayne@68: from . import resource_sharer jpayne@68: return resource_sharer.DupFd(fd) jpayne@68: else: jpayne@68: raise ValueError('SCM_RIGHTS appears not to be available') jpayne@68: jpayne@68: # jpayne@68: # Try making some callable types picklable jpayne@68: # jpayne@68: jpayne@68: def _reduce_method(m): jpayne@68: if m.__self__ is None: jpayne@68: return getattr, (m.__class__, m.__func__.__name__) jpayne@68: else: jpayne@68: return getattr, (m.__self__, m.__func__.__name__) jpayne@68: class _C: jpayne@68: def f(self): jpayne@68: pass jpayne@68: register(type(_C().f), _reduce_method) jpayne@68: jpayne@68: jpayne@68: def _reduce_method_descriptor(m): jpayne@68: return getattr, (m.__objclass__, m.__name__) jpayne@68: register(type(list.append), _reduce_method_descriptor) jpayne@68: register(type(int.__add__), _reduce_method_descriptor) jpayne@68: jpayne@68: jpayne@68: def _reduce_partial(p): jpayne@68: return _rebuild_partial, (p.func, p.args, p.keywords or {}) jpayne@68: def _rebuild_partial(func, args, keywords): jpayne@68: return functools.partial(func, *args, **keywords) jpayne@68: register(functools.partial, _reduce_partial) jpayne@68: jpayne@68: # jpayne@68: # Make sockets picklable jpayne@68: # jpayne@68: jpayne@68: if sys.platform == 'win32': jpayne@68: def _reduce_socket(s): jpayne@68: from .resource_sharer import DupSocket jpayne@68: return _rebuild_socket, (DupSocket(s),) jpayne@68: def _rebuild_socket(ds): jpayne@68: return ds.detach() jpayne@68: register(socket.socket, _reduce_socket) jpayne@68: jpayne@68: else: jpayne@68: def _reduce_socket(s): jpayne@68: df = DupFd(s.fileno()) jpayne@68: return _rebuild_socket, (df, s.family, s.type, s.proto) jpayne@68: def _rebuild_socket(df, family, type, proto): jpayne@68: fd = df.detach() jpayne@68: return socket.socket(family, type, proto, fileno=fd) jpayne@68: register(socket.socket, _reduce_socket) jpayne@68: jpayne@68: jpayne@68: class AbstractReducer(metaclass=ABCMeta): jpayne@68: '''Abstract base class for use in implementing a Reduction class jpayne@68: suitable for use in replacing the standard reduction mechanism jpayne@68: used in multiprocessing.''' jpayne@68: ForkingPickler = ForkingPickler jpayne@68: register = register jpayne@68: dump = dump jpayne@68: send_handle = send_handle jpayne@68: recv_handle = recv_handle jpayne@68: jpayne@68: if sys.platform == 'win32': jpayne@68: steal_handle = steal_handle jpayne@68: duplicate = duplicate jpayne@68: DupHandle = DupHandle jpayne@68: else: jpayne@68: sendfds = sendfds jpayne@68: recvfds = recvfds jpayne@68: DupFd = DupFd jpayne@68: jpayne@68: _reduce_method = _reduce_method jpayne@68: _reduce_method_descriptor = _reduce_method_descriptor jpayne@68: _rebuild_partial = _rebuild_partial jpayne@68: _reduce_socket = _reduce_socket jpayne@68: _rebuild_socket = _rebuild_socket jpayne@68: jpayne@68: def __init__(self, *args): jpayne@68: register(type(_C().f), _reduce_method) jpayne@68: register(type(list.append), _reduce_method_descriptor) jpayne@68: register(type(int.__add__), _reduce_method_descriptor) jpayne@68: register(functools.partial, _reduce_partial) jpayne@68: register(socket.socket, _reduce_socket)