#
# Module which deals with pickling of objects.
#
# multiprocessing/reduction.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

from abc import ABCMeta
import copyreg
import functools
import io
import os
import pickle
import socket
import sys

from . import context

__all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump']


# True on Windows, or when the socket module exposes everything needed to
# pass file descriptors with SCM_RIGHTS (CMSG_LEN, SCM_RIGHTS and sendmsg).
HAVE_SEND_HANDLE = (sys.platform == 'win32' or
                    (hasattr(socket, 'CMSG_LEN') and
                     hasattr(socket, 'SCM_RIGHTS') and
                     hasattr(socket.socket, 'sendmsg')))

#
# Pickler subclass
#

class ForkingPickler(pickle.Pickler):
    '''Pickler subclass used by multiprocessing.'''
    # Reducers added through register(); shared by every instance because
    # the dict lives on the class.
    _extra_reducers = {}
    _copyreg_dispatch_table = copyreg.dispatch_table

    def __init__(self, *args):
        '''Create a pickler; *args* are forwarded to pickle.Pickler.'''
        super().__init__(*args)
        # Per-instance dispatch table: start from a copy of copyreg's table
        # and overlay the reducers registered on this class.
        self.dispatch_table = self._copyreg_dispatch_table.copy()
        self.dispatch_table.update(self._extra_reducers)

    @classmethod
    def register(cls, type, reduce):
        '''Register a reduce function for a type.'''
        cls._extra_reducers[type] = reduce

    @classmethod
    def dumps(cls, obj, protocol=None):
        '''Pickle *obj* into an in-memory buffer using this pickler class.

        Returns the result of io.BytesIO.getbuffer() on that buffer rather
        than a bytes object.
        '''
        buf = io.BytesIO()
        cls(buf, protocol).dump(obj)
        return buf.getbuffer()

    # Plain alias of pickle.loads.
    loads = pickle.loads

register = ForkingPickler.register

def dump(obj, file, protocol=None):
    '''Replacement for pickle.dump() using ForkingPickler.'''
    ForkingPickler(file, protocol).dump(obj)

#
# Platform specific definitions
#

if sys.platform == 'win32':
    # Windows
    __all__ += ['DupHandle', 'duplicate', 'steal_handle']
    import _winapi

    def duplicate(handle, target_process=None, inheritable=False,
                  *, source_process=None):
        '''Duplicate a handle.  (target_process is a handle not a pid!)

        Both source_process and target_process default to the current
        process when left as None.
        '''
        current_process = _winapi.GetCurrentProcess()
        if source_process is None:
            source_process = current_process
        if target_process is None:
            target_process = current_process
        return _winapi.DuplicateHandle(
            source_process, handle, target_process,
            0, inheritable, _winapi.DUPLICATE_SAME_ACCESS)

    def steal_handle(source_pid, handle):
        '''Steal a handle from process identified by source_pid.'''
        source_process_handle = _winapi.OpenProcess(
            _winapi.PROCESS_DUP_HANDLE, False, source_pid)
        try:
            # DUPLICATE_CLOSE_SOURCE is what turns the copy into a "steal".
            return _winapi.DuplicateHandle(
                source_process_handle, handle,
                _winapi.GetCurrentProcess(), 0, False,
                _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE)
        finally:
            # Always release the process handle opened above.
            _winapi.CloseHandle(source_process_handle)

    def send_handle(conn, handle, destination_pid):
        '''Send a handle over a local connection.'''
        dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid)
        conn.send(dh)

    def recv_handle(conn):
        '''Receive a handle over a local connection.'''
        return conn.recv().detach()

    class DupHandle(object):
        '''Picklable wrapper for a handle.'''
        def __init__(self, handle, access, pid=None):
            '''Duplicate *handle* into the process identified by *pid*.

            When *pid* is None the duplicate is made in the current process.
            '''
            if pid is None:
                # We just duplicate the handle in the current process and
                # let the receiving process steal the handle.
                pid = os.getpid()
            proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
            try:
                self._handle = _winapi.DuplicateHandle(
                    _winapi.GetCurrentProcess(),
                    handle, proc, access, False, 0)
            finally:
                _winapi.CloseHandle(proc)
            self._access = access
            self._pid = pid

        def detach(self):
            '''Get the handle.  This should only be called once.'''
            # retrieve handle from process which currently owns it
            if self._pid == os.getpid():
                # The handle has already been duplicated for this process.
                return self._handle
            # We must steal the handle from the process whose pid is self._pid.
            proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
                                       self._pid)
            try:
                return _winapi.DuplicateHandle(
                    proc, self._handle, _winapi.GetCurrentProcess(),
                    self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
            finally:
                _winapi.CloseHandle(proc)

else:
    # Unix
    __all__ += ['DupFd', 'sendfds', 'recvfds']
    import array

    # On MacOSX we should acknowledge receipt of fds -- see Issue14669
    ACKNOWLEDGE = sys.platform == 'darwin'

    def sendfds(sock, fds):
        '''Send an array of fds over an AF_UNIX socket.

        The one-byte message body carries len(fds) modulo 256 so that the
        receiver can cross-check the number of descriptors.  Raises
        RuntimeError if an acknowledgement is expected (ACKNOWLEDGE) and
        the byte read back is not b'A'.
        '''
        fds = array.array('i', fds)
        msg = bytes([len(fds) % 256])
        sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
        if ACKNOWLEDGE and sock.recv(1) != b'A':
            raise RuntimeError('did not receive acknowledgement of fd')

    def recvfds(sock, size):
        '''Receive an array of fds over an AF_UNIX socket.

        *size* is the number of descriptors to make room for.  Returns the
        received descriptors as a list of ints.

        Raises EOFError when neither data nor ancillary data arrives,
        AssertionError when the descriptor count disagrees with the count
        byte in the message, and RuntimeError for any other malformed
        input.
        '''
        a = array.array('i')
        bytes_size = a.itemsize * size
        msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_SPACE(bytes_size))
        if not msg and not ancdata:
            raise EOFError
        try:
            if ACKNOWLEDGE:
                sock.send(b'A')
            if len(ancdata) != 1:
                raise RuntimeError('received %d items of ancdata' %
                                   len(ancdata))
            cmsg_level, cmsg_type, cmsg_data = ancdata[0]
            if (cmsg_level == socket.SOL_SOCKET and
                cmsg_type == socket.SCM_RIGHTS):
                if len(cmsg_data) % a.itemsize != 0:
                    # Handled below: reported as 'Invalid data received'.
                    raise ValueError
                a.frombytes(cmsg_data)
                if len(a) % 256 != msg[0]:
                    raise AssertionError(
                        "Len is {0:n} but msg[0] is {1!r}".format(
                            len(a), msg[0]))
                return list(a)
        except (ValueError, IndexError):
            # Fall through to the generic error below.
            pass
        raise RuntimeError('Invalid data received')

    def send_handle(conn, handle, destination_pid):
        '''Send a handle over a local connection.

        destination_pid is not used by this implementation.
        '''
        with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
            sendfds(s, [handle])

    def recv_handle(conn):
        '''Receive a handle over a local connection.'''
        with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
            return recvfds(s, 1)[0]

    def DupFd(fd):
        '''Return a wrapper for an fd.

        Uses the spawning popen object when context reports one, otherwise
        resource_sharer.DupFd if HAVE_SEND_HANDLE is true.  Raises
        ValueError when neither mechanism is available.
        '''
        popen_obj = context.get_spawning_popen()
        if popen_obj is not None:
            return popen_obj.DupFd(popen_obj.duplicate_for_child(fd))
        elif HAVE_SEND_HANDLE:
            # Imported only on the path that needs it.
            from . import resource_sharer
            return resource_sharer.DupFd(fd)
        else:
            raise ValueError('SCM_RIGHTS appears not to be available')

#
# Try making some callable types picklable
#

def _reduce_method(m):
    '''Reduce a bound method to a getattr() call on its owner by name.'''
    if m.__self__ is None:
        return getattr, (m.__class__, m.__func__.__name__)
    else:
        return getattr, (m.__self__, m.__func__.__name__)
class _C:
    # Exists only so that type(_C().f) yields the bound-method type.
    def f(self):
        pass
register(type(_C().f), _reduce_method)


def _reduce_method_descriptor(m):
    '''Reduce a method descriptor to a getattr() call on its __objclass__.'''
    return getattr, (m.__objclass__, m.__name__)
register(type(list.append), _reduce_method_descriptor)
register(type(int.__add__), _reduce_method_descriptor)


def _reduce_partial(p):
    '''Reduce a functools.partial to its func, args and keywords.'''
    return _rebuild_partial, (p.func, p.args, p.keywords or {})
def _rebuild_partial(func, args, keywords):
    '''Rebuild a functools.partial from the parts made by _reduce_partial.'''
    return functools.partial(func, *args, **keywords)
register(functools.partial, _reduce_partial)

#
# Make sockets picklable
#

if sys.platform == 'win32':
    def _reduce_socket(s):
        '''Reduce a socket by wrapping it in resource_sharer.DupSocket.'''
        from .resource_sharer import DupSocket
        return _rebuild_socket, (DupSocket(s),)
    def _rebuild_socket(ds):
        '''Rebuild a socket by detaching the received wrapper.'''
        return ds.detach()
    register(socket.socket, _reduce_socket)

else:
    def _reduce_socket(s):
        '''Reduce a socket to a DupFd wrapper plus family, type and proto.'''
        df = DupFd(s.fileno())
        return _rebuild_socket, (df, s.family, s.type, s.proto)
    def _rebuild_socket(df, family, type, proto):
        '''Rebuild a socket around the fd detached from the wrapper.'''
        fd = df.detach()
        return socket.socket(family, type, proto, fileno=fd)
    register(socket.socket, _reduce_socket)


class AbstractReducer(metaclass=ABCMeta):
    '''Abstract base class for use in implementing a Reduction class
    suitable for use in replacing the standard reduction mechanism
    used in multiprocessing.'''
    # Re-export the module-level API as class attributes.
    ForkingPickler = ForkingPickler
    register = register
    dump = dump
    send_handle = send_handle
    recv_handle = recv_handle

    # The platform-specific helpers exist on one platform only.
    if sys.platform == 'win32':
        steal_handle = steal_handle
        duplicate = duplicate
        DupHandle = DupHandle
    else:
        sendfds = sendfds
        recvfds = recvfds
        DupFd = DupFd

    _reduce_method = _reduce_method
    _reduce_method_descriptor = _reduce_method_descriptor
    _rebuild_partial = _rebuild_partial
    _reduce_socket = _reduce_socket
    _rebuild_socket = _rebuild_socket

    def __init__(self, *args):
        '''Register the module's standard reducers again; *args* is ignored.'''
        register(type(_C().f), _reduce_method)
        register(type(list.append), _reduce_method_descriptor)
        register(type(int.__add__), _reduce_method_descriptor)
        register(functools.partial, _reduce_partial)
        register(socket.socket, _reduce_socket)