jpayne@69
|
1 #
|
jpayne@69
|
2 # Module which deals with pickling of objects.
|
jpayne@69
|
3 #
|
jpayne@69
|
4 # multiprocessing/reduction.py
|
jpayne@69
|
5 #
|
jpayne@69
|
6 # Copyright (c) 2006-2008, R Oudkerk
|
jpayne@69
|
7 # Licensed to PSF under a Contributor Agreement.
|
jpayne@69
|
8 #
|
jpayne@69
|
9
|
jpayne@69
|
10 from abc import ABCMeta
|
jpayne@69
|
11 import copyreg
|
jpayne@69
|
12 import functools
|
jpayne@69
|
13 import io
|
jpayne@69
|
14 import os
|
jpayne@69
|
15 import pickle
|
jpayne@69
|
16 import socket
|
jpayne@69
|
17 import sys
|
jpayne@69
|
18
|
jpayne@69
|
19 from . import context
|
jpayne@69
|
20
|
jpayne@69
|
# Names exported by this module; the platform-specific sections further
# down extend this list with `__all__ += [...]`.
__all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump']
|
jpayne@69
|
22
|
jpayne@69
|
23
|
jpayne@69
|
# True on win32, or when the socket module exposes everything needed for
# SCM_RIGHTS fd passing (CMSG_LEN, SCM_RIGHTS and socket.sendmsg).
HAVE_SEND_HANDLE = (
    sys.platform == 'win32'
    or (
        hasattr(socket, 'CMSG_LEN')
        and hasattr(socket, 'SCM_RIGHTS')
        and hasattr(socket.socket, 'sendmsg')
    )
)
|
jpayne@69
|
28
|
jpayne@69
|
29 #
|
jpayne@69
|
30 # Pickler subclass
|
jpayne@69
|
31 #
|
jpayne@69
|
32
|
jpayne@69
|
class ForkingPickler(pickle.Pickler):
    '''Pickler subclass used by multiprocessing.

    Every instance gets a private dispatch table: a copy of the copyreg
    dispatch table overlaid with the reducers added through register().
    '''
    # Reducers added via register(), keyed by type.
    _extra_reducers = {}
    _copyreg_dispatch_table = copyreg.dispatch_table

    def __init__(self, *args, **kwargs):
        '''Create a pickler.

        Positional and keyword arguments are forwarded unchanged to
        pickle.Pickler, so keyword forms such as ``protocol=2`` work too.
        '''
        super().__init__(*args, **kwargs)
        # Copy first so that the reducers registered here never leak into
        # the global copyreg table.
        self.dispatch_table = self._copyreg_dispatch_table.copy()
        self.dispatch_table.update(self._extra_reducers)

    @classmethod
    def register(cls, type, reduce):
        '''Register a reduce function for a type.'''
        cls._extra_reducers[type] = reduce

    @classmethod
    def dumps(cls, obj, protocol=None):
        '''Pickle obj with this class and return the BytesIO buffer view.'''
        buf = io.BytesIO()
        cls(buf, protocol).dump(obj)
        return buf.getbuffer()

    loads = pickle.loads


# Module-level shortcut for ForkingPickler.register.
register = ForkingPickler.register
|
jpayne@69
|
57
|
jpayne@69
|
def dump(obj, file, protocol=None):
    '''Pickle obj to file with a ForkingPickler (stand-in for pickle.dump()).'''
    pickler = ForkingPickler(file, protocol)
    pickler.dump(obj)
|
jpayne@69
|
61
|
jpayne@69
|
62 #
|
jpayne@69
|
63 # Platform specific definitions
|
jpayne@69
|
64 #
|
jpayne@69
|
65
|
jpayne@69
|
66 if sys.platform == 'win32':
|
jpayne@69
|
67 # Windows
|
jpayne@69
|
68 __all__ += ['DupHandle', 'duplicate', 'steal_handle']
|
jpayne@69
|
69 import _winapi
|
jpayne@69
|
70
|
jpayne@69
|
def duplicate(handle, target_process=None, inheritable=False,
              *, source_process=None):
    '''Duplicate a handle.

    target_process and source_process are process handles (not pids);
    each defaults to the current process when left as None.
    '''
    this_process = _winapi.GetCurrentProcess()
    src = this_process if source_process is None else source_process
    dst = this_process if target_process is None else target_process
    return _winapi.DuplicateHandle(
        src, handle, dst, 0, inheritable, _winapi.DUPLICATE_SAME_ACCESS)
|
jpayne@69
|
82
|
jpayne@69
|
def steal_handle(source_pid, handle):
    '''Take over a handle owned by the process with pid source_pid.

    The handle is duplicated into the current process with
    DUPLICATE_CLOSE_SOURCE, and the duplicate is returned.
    '''
    owner = _winapi.OpenProcess(
        _winapi.PROCESS_DUP_HANDLE, False, source_pid)
    try:
        stolen = _winapi.DuplicateHandle(
            owner, handle,
            _winapi.GetCurrentProcess(), 0, False,
            _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE)
    finally:
        # The process handle was only needed for the duplication itself.
        _winapi.CloseHandle(owner)
    return stolen
|
jpayne@69
|
94
|
jpayne@69
|
def send_handle(conn, handle, destination_pid):
    '''Send a handle over a local connection, wrapped in a DupHandle.'''
    conn.send(
        DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid))
|
jpayne@69
|
99
|
jpayne@69
|
def recv_handle(conn):
    '''Receive a wrapped handle from a local connection and detach it.'''
    wrapper = conn.recv()
    return wrapper.detach()
|
jpayne@69
|
103
|
jpayne@69
|
class DupHandle(object):
    '''Picklable wrapper for a handle.'''

    def __init__(self, handle, access, pid=None):
        '''Duplicate handle into the process pid (default: this process).'''
        # With no explicit target we duplicate the handle in the current
        # process and let the receiving process steal it.
        target_pid = os.getpid() if pid is None else pid
        target = _winapi.OpenProcess(
            _winapi.PROCESS_DUP_HANDLE, False, target_pid)
        try:
            self._handle = _winapi.DuplicateHandle(
                _winapi.GetCurrentProcess(),
                handle, target, access, False, 0)
        finally:
            _winapi.CloseHandle(target)
        self._access = access
        self._pid = target_pid

    def detach(self):
        '''Get the handle.  This should only be called once.'''
        owner_pid = self._pid
        if owner_pid == os.getpid():
            # The handle has already been duplicated for this process.
            return self._handle
        # Otherwise steal the handle from the process that owns it.
        owner = _winapi.OpenProcess(
            _winapi.PROCESS_DUP_HANDLE, False, owner_pid)
        try:
            return _winapi.DuplicateHandle(
                owner, self._handle, _winapi.GetCurrentProcess(),
                self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
        finally:
            _winapi.CloseHandle(owner)
|
jpayne@69
|
136
|
jpayne@69
|
137 else:
|
jpayne@69
|
138 # Unix
|
jpayne@69
|
139 __all__ += ['DupFd', 'sendfds', 'recvfds']
|
jpayne@69
|
140 import array
|
jpayne@69
|
141
|
jpayne@69
|
# On MacOSX we should acknowledge receipt of fds -- see Issue14669
ACKNOWLEDGE = sys.platform == 'darwin'


def sendfds(sock, fds):
    '''Send an array of fds over an AF_UNIX socket.'''
    fd_array = array.array('i', fds)
    # The one data byte carries the fd count modulo 256; the fds
    # themselves travel as SCM_RIGHTS ancillary data.
    count_byte = bytes([len(fd_array) % 256])
    rights = (socket.SOL_SOCKET, socket.SCM_RIGHTS, fd_array)
    sock.sendmsg([count_byte], [rights])
    if ACKNOWLEDGE:
        if sock.recv(1) != b'A':
            raise RuntimeError('did not receive acknowledgement of fd')
|
jpayne@69
|
152
|
jpayne@69
|
def recvfds(sock, size):
    '''Receive an array of fds over an AF_UNIX socket.

    size is the number of fds to make room for.  Raises EOFError when
    neither data nor ancillary data arrives, and RuntimeError when the
    received message is not a single well-formed SCM_RIGHTS item.
    '''
    fd_array = array.array('i')
    room = fd_array.itemsize * size
    payload, ancdata, _flags, _addr = sock.recvmsg(
        1, socket.CMSG_SPACE(room))
    if not (payload or ancdata):
        raise EOFError
    try:
        if ACKNOWLEDGE:
            sock.send(b'A')
        item_count = len(ancdata)
        if item_count != 1:
            raise RuntimeError('received %d items of ancdata' % item_count)
        level, kind, data = ancdata[0]
        if level == socket.SOL_SOCKET and kind == socket.SCM_RIGHTS:
            if len(data) % fd_array.itemsize:
                raise ValueError
            fd_array.frombytes(data)
            # The data byte holds the sender's fd count modulo 256.
            if len(fd_array) % 256 != payload[0]:
                raise AssertionError(
                    "Len is {0:n} but msg[0] is {1!r}".format(
                        len(fd_array), payload[0]))
            return list(fd_array)
    except (ValueError, IndexError):
        # Malformed message: reported by the RuntimeError below.
        pass
    raise RuntimeError('Invalid data received')
|
jpayne@69
|
180
|
jpayne@69
|
def send_handle(conn, handle, destination_pid):
    '''Send a handle (an fd) over a local connection.'''
    # socket.fromfd() works on a duplicate of the connection's fd, so
    # closing the temporary socket leaves conn itself open.
    sock = socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM)
    with sock:
        sendfds(sock, [handle])
|
jpayne@69
|
185
|
jpayne@69
|
def recv_handle(conn):
    '''Receive a handle (an fd) over a local connection.'''
    sock = socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM)
    with sock:
        received = recvfds(sock, 1)
        return received[0]
|
jpayne@69
|
190
|
jpayne@69
|
def DupFd(fd):
    '''Return a wrapper for an fd.'''
    spawning = context.get_spawning_popen()
    if spawning is not None:
        # A child is being spawned: let its popen object carry the fd.
        return spawning.DupFd(spawning.duplicate_for_child(fd))
    if not HAVE_SEND_HANDLE:
        raise ValueError('SCM_RIGHTS appears not to be available')
    from . import resource_sharer
    return resource_sharer.DupFd(fd)
|
jpayne@69
|
201
|
jpayne@69
|
202 #
|
jpayne@69
|
203 # Try making some callable types picklable
|
jpayne@69
|
204 #
|
jpayne@69
|
205
|
jpayne@69
|
def _reduce_method(m):
    '''Reduce a method to a getattr() call on the object it is bound to.'''
    # With no bound object, fall back to m.__class__ as the lookup target.
    owner = m.__class__ if m.__self__ is None else m.__self__
    return getattr, (owner, m.__func__.__name__)
|
jpayne@69
|
# Throwaway class: an instance of it supplies a bound method whose type is
# registered with _reduce_method.
class _C:
    def f(self):
        pass
register(type(_C().f), _reduce_method)
|
jpayne@69
|
215
|
jpayne@69
|
216
|
jpayne@69
|
def _reduce_method_descriptor(m):
    '''Reduce a method descriptor to a getattr() call on its owning class.'''
    owner, name = m.__objclass__, m.__name__
    return getattr, (owner, name)
|
jpayne@69
|
# list.append and int.__add__ are used only to obtain their descriptor
# types; both types are reduced with _reduce_method_descriptor.
register(type(list.append), _reduce_method_descriptor)
register(type(int.__add__), _reduce_method_descriptor)
|
jpayne@69
|
221
|
jpayne@69
|
222
|
jpayne@69
|
def _reduce_partial(p):
    '''Reduce a functools.partial to its function, args and keywords.'''
    keywords = p.keywords or {}
    return _rebuild_partial, (p.func, p.args, keywords)


def _rebuild_partial(func, args, keywords):
    '''Recreate a functools.partial from the pieces of _reduce_partial().'''
    rebuilt = functools.partial(func, *args, **keywords)
    return rebuilt
|
jpayne@69
|
# Pickle functools.partial objects via _reduce_partial.
register(functools.partial, _reduce_partial)
|
jpayne@69
|
228
|
jpayne@69
|
229 #
|
jpayne@69
|
230 # Make sockets picklable
|
jpayne@69
|
231 #
|
jpayne@69
|
232
|
jpayne@69
|
if sys.platform == 'win32':
    def _reduce_socket(s):
        '''Reduce a socket by wrapping it in a resource_sharer.DupSocket.'''
        from .resource_sharer import DupSocket
        wrapped = DupSocket(s)
        return _rebuild_socket, (wrapped,)

    def _rebuild_socket(ds):
        '''Recreate the socket by detaching it from its wrapper.'''
        return ds.detach()

else:
    def _reduce_socket(s):
        '''Reduce a socket to a wrapped fd plus family, type and proto.'''
        return _rebuild_socket, (DupFd(s.fileno()), s.family, s.type, s.proto)

    def _rebuild_socket(df, family, type, proto):
        '''Recreate the socket around the fd detached from df.'''
        return socket.socket(family, type, proto, fileno=df.detach())

# Whichever variant was defined above is the one registered.
register(socket.socket, _reduce_socket)
|
jpayne@69
|
249
|
jpayne@69
|
250
|
jpayne@69
|
class AbstractReducer(metaclass=ABCMeta):
    '''Abstract base class for implementing a Reduction class that can
    replace the standard reduction mechanism used in multiprocessing.

    The class body re-exports this module's functions and classes as
    class attributes.
    '''
    ForkingPickler = ForkingPickler
    register = register
    dump = dump
    send_handle = send_handle
    recv_handle = recv_handle

    # Platform-specific helpers: only one of the two sets exists.
    if sys.platform != 'win32':
        sendfds = sendfds
        recvfds = recvfds
        DupFd = DupFd
    else:
        steal_handle = steal_handle
        duplicate = duplicate
        DupHandle = DupHandle

    _reduce_method = _reduce_method
    _reduce_method_descriptor = _reduce_method_descriptor
    _rebuild_partial = _rebuild_partial
    _reduce_socket = _reduce_socket
    _rebuild_socket = _rebuild_socket

    def __init__(self, *args):
        '''Register the module's reducers again; args are ignored.'''
        reducers = (
            (type(_C().f), _reduce_method),
            (type(list.append), _reduce_method_descriptor),
            (type(int.__add__), _reduce_method_descriptor),
            (functools.partial, _reduce_partial),
            (socket.socket, _reduce_socket),
        )
        for kind, reducer in reducers:
            register(kind, reducer)
|