jpayne@69
|
1 #
|
jpayne@69
|
2 # Module providing manager classes for dealing
|
jpayne@69
|
3 # with shared objects
|
jpayne@69
|
4 #
|
jpayne@69
|
5 # multiprocessing/managers.py
|
jpayne@69
|
6 #
|
jpayne@69
|
7 # Copyright (c) 2006-2008, R Oudkerk
|
jpayne@69
|
8 # Licensed to PSF under a Contributor Agreement.
|
jpayne@69
|
9 #
|
jpayne@69
|
10
|
jpayne@69
|
11 __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token',
|
jpayne@69
|
12 'SharedMemoryManager' ]
|
jpayne@69
|
13
|
jpayne@69
|
14 #
|
jpayne@69
|
15 # Imports
|
jpayne@69
|
16 #
|
jpayne@69
|
17
|
jpayne@69
|
18 import sys
|
jpayne@69
|
19 import threading
|
jpayne@69
|
20 import signal
|
jpayne@69
|
21 import array
|
jpayne@69
|
22 import queue
|
jpayne@69
|
23 import time
|
jpayne@69
|
24 import os
|
jpayne@69
|
25 from os import getpid
|
jpayne@69
|
26
|
jpayne@69
|
27 from traceback import format_exc
|
jpayne@69
|
28
|
jpayne@69
|
29 from . import connection
|
jpayne@69
|
30 from .context import reduction, get_spawning_popen, ProcessError
|
jpayne@69
|
31 from . import pool
|
jpayne@69
|
32 from . import process
|
jpayne@69
|
33 from . import util
|
jpayne@69
|
34 from . import get_context
|
jpayne@69
|
35 try:
|
jpayne@69
|
36 from . import shared_memory
|
jpayne@69
|
37 HAS_SHMEM = True
|
jpayne@69
|
38 except ImportError:
|
jpayne@69
|
39 HAS_SHMEM = False
|
jpayne@69
|
40
|
jpayne@69
|
41 #
|
jpayne@69
|
42 # Register some things for pickling
|
jpayne@69
|
43 #
|
jpayne@69
|
44
|
jpayne@69
|
def reduce_array(a):
    '''
    Return a pickle reduction for the array `a`

    The result is the pair ``(array.array, (typecode, raw_bytes))`` so that
    calling the first item with the second rebuilds an equal array.
    '''
    return array.array, (a.typecode, a.tobytes())
|
jpayne@69
|
# Register reduce_array as the reducer used for array.array instances.
reduction.register(array.array, reduce_array)

# Collect the concrete types returned by dict.items(), dict.keys() and
# dict.values(), and register a reducer that turns them into plain lists.
view_types = [type(getattr({}, name)()) for name in ('items', 'keys', 'values')]
if view_types[0] is not list:       # only needed in Py3.0
    def rebuild_as_list(obj):
        # Reduce a dict view to ``list(<copy of its contents>)``.
        return list, (list(obj),)
    for view_type in view_types:
        reduction.register(view_type, rebuild_as_list)
|
jpayne@69
|
55
|
jpayne@69
|
56 #
|
jpayne@69
|
57 # Type for identifying shared objects
|
jpayne@69
|
58 #
|
jpayne@69
|
59
|
jpayne@69
|
class Token(object):
    '''
    Type to uniquely identify a shared object

    A token carries three fields: the `typeid` the object was created
    under, the `address` of the manager that owns it, and the object's `id`.
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        (self.typeid, self.address, self.id) = (typeid, address, id)

    def __getstate__(self):
        # With __slots__ there is no instance __dict__, so the pickled state
        # is given explicitly as a 3-tuple.
        return (self.typeid, self.address, self.id)

    def __setstate__(self, state):
        # Inverse of __getstate__: `state` is (typeid, address, id).
        (self.typeid, self.address, self.id) = state

    def __repr__(self):
        return '%s(typeid=%r, address=%r, id=%r)' % \
               (self.__class__.__name__, self.typeid, self.address, self.id)
|
jpayne@69
|
78
|
jpayne@69
|
79 #
|
jpayne@69
|
80 # Function for communication with a manager's server process
|
jpayne@69
|
81 #
|
jpayne@69
|
82
|
jpayne@69
|
def dispatch(c, id, methodname, args=(), kwds=None):
    '''
    Send a message to manager using connection `c` and return response

    The request is sent as the tuple ``(id, methodname, args, kwds)`` and
    the reply is read back as a ``(kind, result)`` pair.  When `kind` is
    '#RETURN' the result is returned; any other kind is turned into an
    exception by convert_to_error() and raised.
    '''
    # A fresh dict per call avoids sharing one mutable default between calls.
    if kwds is None:
        kwds = {}
    c.send((id, methodname, args, kwds))
    kind, result = c.recv()
    if kind == '#RETURN':
        return result
    raise convert_to_error(kind, result)
|
jpayne@69
|
92
|
jpayne@69
|
def convert_to_error(kind, result):
    '''
    Turn a non-'#RETURN' reply into an exception object and return it

    '#ERROR' replies already carry an exception, which is returned as is.
    '#TRACEBACK' and '#UNSERIALIZABLE' replies carry text and are wrapped
    in RemoteError.  Any other kind yields a ValueError.

    Raises TypeError if a '#TRACEBACK' or '#UNSERIALIZABLE' reply does not
    carry a str.
    '''
    if kind == '#ERROR':
        return result
    elif kind in ('#TRACEBACK', '#UNSERIALIZABLE'):
        if not isinstance(result, str):
            raise TypeError(
                "Result {0!r} (kind '{1}') type is {2}, not str".format(
                    result, kind, type(result)))
        if kind == '#UNSERIALIZABLE':
            return RemoteError('Unserializable message: %s\n' % result)
        else:
            return RemoteError(result)
    else:
        return ValueError('Unrecognized message type {!r}'.format(kind))
|
jpayne@69
|
107
|
jpayne@69
|
class RemoteError(Exception):
    '''
    Exception wrapping text received from the manager

    str() frames the first constructor argument between two rules of 75
    dashes.
    '''
    def __str__(self):
        # Tolerate RemoteError() built without arguments instead of failing
        # with IndexError while the error is being displayed.
        body = str(self.args[0]) if self.args else ''
        return ('\n' + '-'*75 + '\n' + body + '-'*75)
|
jpayne@69
|
111
|
jpayne@69
|
112 #
|
jpayne@69
|
113 # Functions for finding the method names of an object
|
jpayne@69
|
114 #
|
jpayne@69
|
115
|
jpayne@69
|
def all_methods(obj):
    '''
    Return a list of names of methods of `obj`

    Every name reported by dir(obj) whose attribute value is callable is
    included, in the order dir() yields them.
    '''
    return [name for name in dir(obj) if callable(getattr(obj, name))]
|
jpayne@69
|
126
|
jpayne@69
|
def public_methods(obj):
    '''
    Return a list of names of methods of `obj` which do not start with '_'
    '''
    return [name for name in all_methods(obj) if not name.startswith('_')]
|
jpayne@69
|
132
|
jpayne@69
|
133 #
|
jpayne@69
|
134 # Server which is run in a process controlled by a manager
|
jpayne@69
|
135 #
|
jpayne@69
|
136
|
jpayne@69
|
class Server(object):
    '''
    Server class which runs in a process controlled by a manager object
    '''
    # Names of the methods a freshly connected client may invoke through
    # handle_request().
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        '''
        Create the listener and the bookkeeping tables

        `registry` maps typeids to (callable, exposed, method_to_typeid,
        proxytype) tuples, `authkey` must be bytes and `serializer` is a
        key of listener_client.
        '''
        if not isinstance(authkey, bytes):
            raise TypeError(
                "Authkey {0!r} is type {1!s}, not bytes".format(
                    authkey, type(authkey)))
        self.registry = registry
        self.authkey = process.AuthenticationString(authkey)
        Listener, Client = listener_client[serializer]

        # do authentication later
        self.listener = Listener(address=address, backlog=16)
        self.address = self.listener.address

        self.id_to_obj = {'0': (None, ())}
        self.id_to_refcount = {}
        self.id_to_local_proxy_obj = {}
        self.mutex = threading.Lock()

    def serve_forever(self):
        '''
        Run the server forever
        '''
        self.stop_event = threading.Event()
        process.current_process()._manager_server = self
        try:
            accepter = threading.Thread(target=self.accepter)
            accepter.daemon = True
            accepter.start()
            try:
                # Poll once a second until shutdown() sets the event.
                while not self.stop_event.is_set():
                    self.stop_event.wait(1)
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            if sys.stdout != sys.__stdout__: # what about stderr?
                util.debug('resetting stdout, stderr')
                sys.stdout = sys.__stdout__
                sys.stderr = sys.__stderr__
            sys.exit(0)

    def accepter(self):
        '''
        Accept connections forever, handling each one in a daemon thread
        '''
        while True:
            try:
                c = self.listener.accept()
            except OSError:
                continue
            t = threading.Thread(target=self.handle_request, args=(c,))
            t.daemon = True
            t.start()

    def handle_request(self, c):
        '''
        Handle a new connection

        Authenticates the peer, reads one request of the form
        (ignored, funcname, args, kwds), runs the named public method and
        sends back either ('#RETURN', result) or ('#TRACEBACK', text).
        The connection is closed when the handler finishes, whichever way
        it finishes.
        '''
        funcname = result = request = None
        try:
            try:
                connection.deliver_challenge(c, self.authkey)
                connection.answer_challenge(c, self.authkey)
                request = c.recv()
                ignore, funcname, args, kwds = request
                # Explicit check rather than `assert`: asserts are removed
                # under -O, which would let a client call any attribute.
                if funcname not in self.public:
                    raise AssertionError('%r unrecognized' % funcname)
                func = getattr(self, funcname)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                try:
                    result = func(c, *args, **kwds)
                except Exception:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    msg = ('#RETURN', result)
            try:
                c.send(msg)
            except Exception as e:
                try:
                    c.send(('#TRACEBACK', format_exc()))
                except Exception:
                    pass
                util.info('Failure to send message: %r', msg)
                util.info(' ... request was %r', request)
                util.info(' ... exception was %r', e)
        finally:
            # Also reached when serve_client() leaves via sys.exit().
            c.close()

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread

        Each request is (ident, methodname, args, kwds).  The reply kind is
        '#RETURN', '#PROXY', '#ERROR', '#TRACEBACK' or '#UNSERIALIZABLE'.
        The loop ends when stop_event is set; the thread exits on EOF or
        when a reply cannot be sent at all.
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop_event.is_set():

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                try:
                    obj, exposed, gettypeid = id_to_obj[ident]
                except KeyError as ke:
                    # Fall back to objects only referenced from inside the
                    # manager; report the original KeyError if absent too.
                    try:
                        obj, exposed, gettypeid = \
                            self.id_to_local_proxy_obj[ident]
                    except KeyError:
                        raise ke

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    msg = ('#ERROR', e)
                else:
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        # The result is registered as a new shared object
                        # and a token is returned in its place.
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception:
                    send(('#UNSERIALIZABLE', format_exc()))
            except Exception as e:
                util.info('exception in thread serving %r',
                          threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        return obj

    def fallback_str(self, conn, ident, obj):
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        return repr(obj)

    # Used by serve_client() when a request ends in AttributeError: maps
    # the requested method name to a plain function taking
    # (server, conn, ident, obj).
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting
        '''
        # Perhaps include debug info about 'c'?
        with self.mutex:
            result = []
            for ident in sorted(self.id_to_refcount):
                if ident != '0':
                    result.append(' %s: refcount=%s\n %s' %
                                  (ident, self.id_to_refcount[ident],
                                   str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(result)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
        return len(self.id_to_refcount)

    def shutdown(self, c):
        '''
        Shutdown this process
        '''
        try:
            util.debug('manager received shutdown message')
            c.send(('#RETURN', None))
        except BaseException:
            # Report any failure but never let it prevent the stop event
            # from being set below.
            import traceback
            traceback.print_exc()
        finally:
            self.stop_event.set()

    def create(*args, **kwds):
        '''
        Create a new shared object and return its id

        Called as create(self, c, typeid, *args, **kwds).  Passing `c` or
        `typeid` by keyword still works but emits a DeprecationWarning.
        Returns the pair (ident, exposed_method_names).
        '''
        if len(args) >= 3:
            self, c, typeid, *args = args
        elif not args:
            raise TypeError("descriptor 'create' of 'Server' object "
                            "needs an argument")
        else:
            if 'typeid' not in kwds:
                raise TypeError('create expected at least 2 positional '
                                'arguments, got %d' % (len(args)-1))
            typeid = kwds.pop('typeid')
            if len(args) >= 2:
                self, c, *args = args
                import warnings
                warnings.warn("Passing 'typeid' as keyword argument is deprecated",
                              DeprecationWarning, stacklevel=2)
            else:
                if 'c' not in kwds:
                    raise TypeError('create expected at least 2 positional '
                                    'arguments, got %d' % (len(args)-1))
                c = kwds.pop('c')
                self, *args = args
                import warnings
                warnings.warn("Passing 'c' as keyword argument is deprecated",
                              DeprecationWarning, stacklevel=2)
        args = tuple(args)

        with self.mutex:
            factory, exposed, method_to_typeid, proxytype = \
                self.registry[typeid]

            if factory is None:
                # No factory registered: the single positional argument is
                # itself the object to share.
                if kwds or (len(args) != 1):
                    raise ValueError(
                        "Without callable, must have one non-keyword argument")
                obj = args[0]
            else:
                obj = factory(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                if not isinstance(method_to_typeid, dict):
                    raise TypeError(
                        "Method_to_typeid {0!r}: type {1!s}, not dict".format(
                            method_to_typeid, type(method_to_typeid)))
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                self.id_to_refcount[ident] = 0

        # incref() takes the mutex itself, so it is called after the lock
        # above has been released.
        self.incref(c, ident)
        return ident, tuple(exposed)
    create.__text_signature__ = '($self, c, typeid, /, *args, **kwds)'

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        return tuple(self.id_to_obj[token.id][1])

    def accept_connection(self, c, name):
        '''
        Spawn a new thread to serve this connection
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Increment the reference count of the object `ident`

        Raises KeyError if the object is tracked neither in id_to_refcount
        nor in id_to_local_proxy_obj.
        '''
        with self.mutex:
            try:
                self.id_to_refcount[ident] += 1
            except KeyError:
                # If no external references exist but an internal (to the
                # manager) still does and a new external reference is created
                # from it, restore the manager's tracking of it from the
                # previously stashed internal ref.
                if ident in self.id_to_local_proxy_obj:
                    self.id_to_refcount[ident] = 1
                    self.id_to_obj[ident] = \
                        self.id_to_local_proxy_obj[ident]
                    util.debug('Server re-enabled tracking & INCREF %r', ident)
                else:
                    raise

    def decref(self, c, ident):
        '''
        Decrement the reference count of `ident`, disposing of it at zero
        '''
        if ident not in self.id_to_refcount and \
                ident in self.id_to_local_proxy_obj:
            util.debug('Server DECREF skipping %r', ident)
            return

        with self.mutex:
            if self.id_to_refcount[ident] <= 0:
                raise AssertionError(
                    "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
                        ident, self.id_to_obj[ident],
                        self.id_to_refcount[ident]))
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_refcount[ident]

        if ident not in self.id_to_refcount:
            # Two-step process in case the object turns out to contain other
            # proxy objects (e.g. a managed list of managed lists).
            # Otherwise, deleting self.id_to_obj[ident] would trigger the
            # deleting of the stored value (another managed object) which would
            # in turn attempt to acquire the mutex that is already held here.
            self.id_to_obj[ident] = (None, (), None)  # thread-safe
            util.debug('disposing of obj with id %r', ident)
            with self.mutex:
                del self.id_to_obj[ident]
|
jpayne@69
|
484
|
jpayne@69
|
485
|
jpayne@69
|
486 #
|
jpayne@69
|
487 # Class to represent state of a manager
|
jpayne@69
|
488 #
|
jpayne@69
|
489
|
jpayne@69
|
class State(object):
    '''
    Holder for the lifecycle state of a manager

    The single slot `value` is set to one of the constants below.
    '''
    __slots__ = ['value']
    INITIAL = 0
    STARTED = 1
    SHUTDOWN = 2
|
jpayne@69
|
495
|
jpayne@69
|
496 #
|
jpayne@69
|
497 # Mapping from serializer name to Listener and Client types
|
jpayne@69
|
498 #
|
jpayne@69
|
499
|
jpayne@69
|
# Each serializer name maps to the (Listener, Client) pair implementing it.
listener_client = {
    'pickle' : (connection.Listener, connection.Client),
    'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
    }
|
jpayne@69
|
504
|
jpayne@69
|
505 #
|
jpayne@69
|
506 # Definition of BaseManager
|
jpayne@69
|
507 #
|
jpayne@69
|
508
|
jpayne@69
|
class BaseManager(object):
    '''
    Base class for managers
    '''
    # typeid -> (callable, exposed, method_to_typeid, proxytype); copied per
    # subclass on first register() so registrations do not leak upwards.
    _registry = {}
    # Server class instantiated by get_server() and _run_server().
    _Server = Server

    def __init__(self, address=None, authkey=None, serializer='pickle',
                 ctx=None):
        '''
        Record connection settings; no process is started here

        `authkey` defaults to the current process's authkey, `serializer`
        must be a key of listener_client and `ctx` defaults to
        get_context().
        '''
        if authkey is None:
            authkey = process.current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = process.AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]
        self._ctx = ctx or get_context()

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        # Use the class-level hook so a subclass overriding _Server gets the
        # same server type here as in _run_server().
        return self._Server(self._registry, self._address,
                            self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process
        '''
        Listener, Client = listener_client[self._serializer]
        conn = Client(self._address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'dummy')
        finally:
            # The connection is only used for this one round trip.
            conn.close()
        self._state.value = State.STARTED

    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object

        If given, `initializer(*initargs)` is run in the server process
        before the server is created.
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = self._ctx.Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__ + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer,
                    initializer=None, initargs=()):
        '''
        Create a server, report its address and run it
        '''
        # bpo-36368: protect server process from KeyboardInterrupt signals
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        if initializer is not None:
            initializer(*initargs)

        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(self, typeid, /, *args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        assert self._state.value == State.STARTED, 'server not yet started'
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        # _process is only assigned by start(); treat "never started" the
        # same as "already joined" instead of raising AttributeError.
        if getattr(self, '_process', None) is not None:
            self._process.join(timeout)
            if not self._process.is_alive():
                self._process = None

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        # Start the server on entry unless it has been started already.
        if self._state.value == State.INITIAL:
            self.start()
        if self._state.value != State.STARTED:
            if self._state.value == State.INITIAL:
                raise ProcessError("Unable to start server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # Best effort: fall through to join/terminate below.
                pass

            process.join(timeout=1.0)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    @property
    def address(self):
        return self._address

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type

        `proxytype` defaults to AutoProxy.  `exposed` and
        `method_to_typeid` fall back to the proxy type's `_exposed_` and
        `_method_to_typeid_` attributes.  With `create_method` true, a
        method named `typeid` that creates a shared object and returns its
        proxy is added to the class.
        '''
        if '_registry' not in cls.__dict__:
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in method_to_typeid.items(): # isinstance?
                assert type(key) is str, '%r is not a string' % key
                assert type(value) is str, '%r is not a string' % value

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            def temp(self, /, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                # Send a decref for the new object, then close the
                # short-lived connection used for it.
                conn = self._Client(token.address, authkey=self._authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)
|
jpayne@69
|
750
|
jpayne@69
|
751 #
|
jpayne@69
|
752 # Subclass of set which get cleared after a fork
|
jpayne@69
|
753 #
|
jpayne@69
|
754
|
jpayne@69
|
class ProcessLocalSet(set):
    '''
    A set whose contents are local to one process.

    The instance registers an after-fork callback that clears it, and it
    pickles as an empty instance of its own type.
    '''
    def __init__(self):
        # The callback receives the registered object and empties it.
        util.register_after_fork(self, lambda obj: obj.clear())
    def __reduce__(self):
        # Never transfer the members: rebuild as an empty set.
        return type(self), ()
|
jpayne@69
|
760
|
jpayne@69
|
761 #
|
jpayne@69
|
762 # Definition of BaseProxy
|
jpayne@69
|
763 #
|
jpayne@69
|
764
|
jpayne@69
|
class BaseProxy(object):
    '''
    A base for proxies of shared objects

    A proxy holds a token identifying a referent that lives in a manager
    process and forwards method calls to it over a connection.
    '''
    # Maps a manager address to a (thread-local storage, id set) pair that
    # is shared by every proxy in this process referring to that address.
    _address_to_local = {}
    # Guards lookups and insertions in _address_to_local.
    _mutex = util.ForkAwareThreadLock()

    def __init__(self, token, serializer, manager=None,
                 authkey=None, exposed=None, incref=True, manager_owned=False):
        '''
        Create a proxy for the referent identified by `token`.

        `authkey` falls back to the manager's key and then to the current
        process's key.  Unless `incref` is false, the referent's reference
        count is incremented.  `exposed` is accepted but not used here.
        '''
        with BaseProxy._mutex:
            tls_idset = BaseProxy._address_to_local.get(token.address, None)
            if tls_idset is None:
                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
                BaseProxy._address_to_local[token.address] = tls_idset

        # self._tls is used to record the connection used by this
        # thread to communicate with the manager at token.address
        self._tls = tls_idset[0]

        # self._idset is used to record the identities of all shared
        # objects for which the current process owns references and
        # which are in the manager at token.address
        self._idset = tls_idset[1]

        self._token = token
        self._id = self._token.id
        self._manager = manager
        self._serializer = serializer
        self._Client = listener_client[serializer][1]

        # Should be set to True only when a proxy object is being created
        # on the manager server; primary use case: nested proxy objects.
        # RebuildProxy detects when a proxy is being created on the manager
        # and sets this value appropriately.
        self._owned_by_manager = manager_owned

        if authkey is not None:
            self._authkey = process.AuthenticationString(authkey)
        elif self._manager is not None:
            self._authkey = self._manager._authkey
        else:
            self._authkey = process.current_process().authkey

        if incref:
            self._incref()

        util.register_after_fork(self, BaseProxy._after_fork)

    def _connect(self):
        '''
        Open a connection to the manager for the current thread and store
        it in the thread-local storage.
        '''
        util.debug('making connection to manager')
        name = process.current_process().name
        if threading.current_thread().name != 'MainThread':
            name += '|' + threading.current_thread().name
        conn = self._Client(self._token.address, authkey=self._authkey)
        dispatch(conn, None, 'accept_connection', (name,))
        self._tls.connection = conn

    def _callmethod(self, methodname, args=(), kwds={}):
        '''
        Try to call a method of the referent and return a copy of the result

        A '#RETURN' reply yields the result itself; a '#PROXY' reply yields
        a new proxy built from the manager's registry; any other reply is
        converted to an exception and raised.
        '''
        # `kwds` is only forwarded, never mutated, so the shared default
        # dict is harmless here.
        try:
            conn = self._tls.connection
        except AttributeError:
            util.debug('thread %r does not own a connection',
                       threading.current_thread().name)
            self._connect()
            conn = self._tls.connection

        conn.send((self._id, methodname, args, kwds))
        kind, result = conn.recv()

        if kind == '#RETURN':
            return result
        elif kind == '#PROXY':
            exposed, token = result
            # NOTE(review): requires self._manager to be set; it is None
            # after a fork (see _after_fork) -- confirm this path cannot
            # be reached in that state.
            proxytype = self._manager._registry[token.typeid][-1]
            token.address = self._token.address
            proxy = proxytype(
                token, self._serializer, manager=self._manager,
                authkey=self._authkey, exposed=exposed
                )
            conn = self._Client(token.address, authkey=self._authkey)
            dispatch(conn, None, 'decref', (token.id,))
            return proxy
        raise convert_to_error(kind, result)

    def _getvalue(self):
        '''
        Get a copy of the value of the referent
        '''
        return self._callmethod('#GETVALUE')

    def _incref(self):
        '''
        Tell the manager this process holds a reference to the referent and
        arrange for the matching decref when the proxy is finalized.
        '''
        if self._owned_by_manager:
            util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
            return

        conn = self._Client(self._token.address, authkey=self._authkey)
        dispatch(conn, None, 'incref', (self._id,))
        util.debug('INCREF %r', self._token.id)

        self._idset.add(self._id)

        # None when there is no manager, otherwise the manager's state.
        state = self._manager and self._manager._state

        self._close = util.Finalize(
            self, BaseProxy._decref,
            args=(self._token, self._authkey, state,
                  self._tls, self._idset, self._Client),
            exitpriority=10
            )

    @staticmethod
    def _decref(token, authkey, state, tls, idset, _Client):
        '''
        Finalizer: release this process's reference to the referent and
        close the thread's connection once no references remain.
        '''
        idset.discard(token.id)

        # check whether manager is still alive
        if state is None or state.value == State.STARTED:
            # tell manager this process no longer cares about referent
            try:
                util.debug('DECREF %r', token.id)
                conn = _Client(token.address, authkey=authkey)
                dispatch(conn, None, 'decref', (token.id,))
            except Exception as e:
                # Best effort: a failed decref is only logged.
                util.debug('... decref failed %s', e)

        else:
            util.debug('DECREF %r -- manager already shutdown', token.id)

        # check whether we can close this thread's connection because
        # the process owns no more references to objects for this manager
        if not idset and hasattr(tls, 'connection'):
            util.debug('thread %r has no more proxies so closing conn',
                       threading.current_thread().name)
            tls.connection.close()
            del tls.connection

    def _after_fork(self):
        '''
        After-fork callback: forget the manager and take a fresh reference
        on the referent, logging (not raising) on failure.
        '''
        self._manager = None
        try:
            self._incref()
        except Exception as e:
            # the proxy may just be for a manager which has shutdown
            util.info('incref failed: %s' % e)

    def __reduce__(self):
        '''
        Pickle as a call to RebuildProxy; the authkey is included only
        while a process is being spawned.
        '''
        kwds = {}
        if get_spawning_popen() is not None:
            kwds['authkey'] = self._authkey

        if getattr(self, '_isauto', False):
            kwds['exposed'] = self._exposed_
            return (RebuildProxy,
                    (AutoProxy, self._token, self._serializer, kwds))
        else:
            return (RebuildProxy,
                    (type(self), self._token, self._serializer, kwds))

    def __deepcopy__(self, memo):
        # A deep copy is a copy of the referent's value, not another proxy.
        return self._getvalue()

    def __repr__(self):
        return '<%s object, typeid %r at %#x>' % \
               (type(self).__name__, self._token.typeid, id(self))

    def __str__(self):
        '''
        Return representation of the referent (or a fall-back if that fails)
        '''
        try:
            return self._callmethod('__repr__')
        except Exception:
            return repr(self)[:-1] + "; '__str__()' failed>"
|
jpayne@69
|
939
|
jpayne@69
|
940 #
|
jpayne@69
|
941 # Function used for unpickling
|
jpayne@69
|
942 #
|
jpayne@69
|
943
|
jpayne@69
|
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    `func` is the proxy factory to call with `token` and `serializer`;
    `kwds` holds extra keyword arguments and is modified in place.
    '''
    server = getattr(process.current_process(), '_manager_server', None)
    if server and server.address == token.address:
        # The proxy is being rebuilt inside the manager that owns the
        # referent, so mark it as manager-owned.
        util.debug('Rebuild a proxy owned by manager, token=%r', token)
        kwds['manager_owned'] = True
        if token.id not in server.id_to_local_proxy_obj:
            server.id_to_local_proxy_obj[token.id] = \
                server.id_to_obj[token.id]
    # Skip the incref when the caller asked for that or while the current
    # process is flagged as `_inheriting`.
    incref = (
        kwds.pop('incref', True) and
        not getattr(process.current_process(), '_inheriting', False)
        )
    return func(token, serializer, incref=incref, **kwds)
|
jpayne@69
|
960
|
jpayne@69
|
961 #
|
jpayne@69
|
962 # Functions to create proxies and proxy types
|
jpayne@69
|
963 #
|
jpayne@69
|
964
|
jpayne@69
|
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    The result is a subclass of BaseProxy called `name` with one
    forwarding method per entry of `exposed`.  Types are cached by
    `(name, tuple(exposed))`, so equal requests return the same class.
    '''
    # `_cache` is a deliberately shared mutable default used as the cache.
    exposed = tuple(exposed)
    try:
        return _cache[(name, exposed)]
    except KeyError:
        pass

    dic = {}

    # Each generated method forwards its arguments to _callmethod under
    # its own name.  Method names come from `exposed`; they are executed
    # as source code, so they must not come from untrusted input.
    for meth in exposed:
        exec('''def %s(self, /, *args, **kwds):
        return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)

    ProxyType = type(name, (BaseProxy,), dic)
    ProxyType._exposed_ = exposed
    _cache[(name, exposed)] = ProxyType
    return ProxyType
|
jpayne@69
|
985
|
jpayne@69
|
986
|
jpayne@69
|
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True, manager_owned=False):
    '''
    Return an auto-proxy for `token`

    When `exposed` is None the method names are requested from the manager
    with 'get_methods'.  `authkey` falls back to the manager's key and
    then to the current process's key.  `manager_owned` is passed on to
    the proxy constructor; RebuildProxy supplies it when a proxy is rebuilt
    inside the manager process.
    '''
    _Client = listener_client[serializer][1]

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = process.current_process().authkey

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref, manager_owned=manager_owned)
    # Marks the proxy so that pickling rebuilds it through AutoProxy.
    proxy._isauto = True
    return proxy
|
jpayne@69
|
1011
|
jpayne@69
|
1012 #
|
jpayne@69
|
1013 # Types/callables which we will register with SyncManager
|
jpayne@69
|
1014 #
|
jpayne@69
|
1015
|
jpayne@69
|
class Namespace(object):
    '''
    Simple attribute container; keyword arguments become attributes.
    '''
    def __init__(self, /, **kwds):
        self.__dict__.update(kwds)
    def __repr__(self):
        # Show public attributes only, sorted for a stable result.
        items = list(self.__dict__.items())
        temp = []
        for name, value in items:
            if not name.startswith('_'):
                temp.append('%s=%r' % (name, value))
        temp.sort()
        return '%s(%s)' % (self.__class__.__name__, ', '.join(temp))
|
jpayne@69
|
1027
|
jpayne@69
|
class Value(object):
    '''
    Holder for a single value together with its typecode.

    `lock` is accepted but not used by this class.
    '''
    def __init__(self, typecode, value, lock=True):
        self._typecode = typecode
        self._value = value
    def get(self):
        return self._value
    def set(self, value):
        self._value = value
    def __repr__(self):
        return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
    # Attribute-style access to the same getter and setter.
    value = property(get, set)
|
jpayne@69
|
1039
|
jpayne@69
|
def Array(typecode, sequence, lock=True):
    '''
    Return an `array.array` of `typecode` initialized from `sequence`.

    `lock` is accepted but not used.
    '''
    return array.array(typecode, sequence)
|
jpayne@69
|
1042
|
jpayne@69
|
1043 #
|
jpayne@69
|
1044 # Proxy types used by SyncManager
|
jpayne@69
|
1045 #
|
jpayne@69
|
1046
|
jpayne@69
|
class IteratorProxy(BaseProxy):
    '''
    Proxy for an iterator or generator; each call is forwarded to the
    referent with its positional arguments.
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')
    def __iter__(self):
        # The proxy is its own iterator.
        return self
    def __next__(self, *args):
        return self._callmethod('__next__', args)
    def send(self, *args):
        return self._callmethod('send', args)
    def throw(self, *args):
        return self._callmethod('throw', args)
    def close(self, *args):
        return self._callmethod('close', args)
|
jpayne@69
|
1059
|
jpayne@69
|
1060
|
jpayne@69
|
class AcquirerProxy(BaseProxy):
    '''
    Proxy for objects with acquire()/release(); usable as a context
    manager.
    '''
    _exposed_ = ('acquire', 'release')
    def acquire(self, blocking=True, timeout=None):
        # Forward `timeout` only when one was given.
        args = (blocking,) if timeout is None else (blocking, timeout)
        return self._callmethod('acquire', args)
    def release(self):
        return self._callmethod('release')
    def __enter__(self):
        return self._callmethod('acquire')
    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
|
jpayne@69
|
1072
|
jpayne@69
|
1073
|
jpayne@69
|
class ConditionProxy(AcquirerProxy):
    '''
    Proxy for a condition variable.
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))
    def notify(self, n=1):
        return self._callmethod('notify', (n,))
    def notify_all(self):
        return self._callmethod('notify_all')
    def wait_for(self, predicate, timeout=None):
        '''
        Wait until `predicate()` is true or `timeout` seconds have passed.

        The predicate is evaluated in the calling process, not in the
        manager.  Returns the last value of the predicate.
        '''
        result = predicate()
        if result:
            return result
        if timeout is not None:
            endtime = time.monotonic() + timeout
        else:
            endtime = None
            waittime = None
        while not result:
            if endtime is not None:
                # Wait only for what is left of the overall timeout.
                waittime = endtime - time.monotonic()
                if waittime <= 0:
                    break
            self.wait(waittime)
            result = predicate()
        return result
|
jpayne@69
|
1099
|
jpayne@69
|
1100
|
jpayne@69
|
class EventProxy(BaseProxy):
    '''
    Proxy for an event object; every method is forwarded to the referent.
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')
    def is_set(self):
        return self._callmethod('is_set')
    def set(self):
        return self._callmethod('set')
    def clear(self):
        return self._callmethod('clear')
    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))
|
jpayne@69
|
1111
|
jpayne@69
|
1112
|
jpayne@69
|
class BarrierProxy(BaseProxy):
    '''
    Proxy for a barrier object.

    The read-only properties are fetched from the referent through
    `__getattribute__` on every access.
    '''
    _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')
    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))
    def abort(self):
        return self._callmethod('abort')
    def reset(self):
        return self._callmethod('reset')
    @property
    def parties(self):
        return self._callmethod('__getattribute__', ('parties',))
    @property
    def n_waiting(self):
        return self._callmethod('__getattribute__', ('n_waiting',))
    @property
    def broken(self):
        return self._callmethod('__getattribute__', ('broken',))
|
jpayne@69
|
1130
|
jpayne@69
|
1131
|
jpayne@69
|
class NamespaceProxy(BaseProxy):
    '''
    Proxy that forwards attribute access to the referent.

    Names starting with an underscore are handled locally on the proxy
    object; all other names are read, set or deleted on the referent.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
    def __getattr__(self, key):
        if key[0] == '_':
            return object.__getattribute__(self, key)
        # Look up _callmethod through object to bypass the overrides here.
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__getattribute__', (key,))
    def __setattr__(self, key, value):
        if key[0] == '_':
            return object.__setattr__(self, key, value)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__setattr__', (key, value))
    def __delattr__(self, key):
        if key[0] == '_':
            return object.__delattr__(self, key)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__delattr__', (key,))
|
jpayne@69
|
1149
|
jpayne@69
|
1150
|
jpayne@69
|
class ValueProxy(BaseProxy):
    '''
    Proxy exposing the referent's get()/set(), also as a `value` property.
    '''
    _exposed_ = ('get', 'set')
    def get(self):
        return self._callmethod('get')
    def set(self, value):
        return self._callmethod('set', (value,))
    value = property(get, set)
|
jpayne@69
|
1158
|
jpayne@69
|
1159
|
jpayne@69
|
# Generated proxy type forwarding the listed methods to the referent.
BaseListProxy = MakeProxyType('BaseListProxy', (
    '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
    '__mul__', '__reversed__', '__rmul__', '__setitem__',
    'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
    'reverse', 'sort', '__imul__'
    ))
class ListProxy(BaseListProxy):
    '''
    List proxy whose in-place operators modify the referent and return
    the proxy itself, so that `proxy += x` keeps `proxy` a proxy.
    '''
    def __iadd__(self, value):
        self._callmethod('extend', (value,))
        return self
    def __imul__(self, value):
        self._callmethod('__imul__', (value,))
        return self
|
jpayne@69
|
1173
|
jpayne@69
|
1174
|
jpayne@69
|
# Generated proxy type forwarding the listed methods to the referent.
DictProxy = MakeProxyType('DictProxy', (
    '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
    '__setitem__', 'clear', 'copy', 'get', 'items',
    'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
    ))
# The result of __iter__ is associated with the 'Iterator' typeid.
DictProxy._method_to_typeid_ = {
    '__iter__': 'Iterator',
    }
|
jpayne@69
|
1183
|
jpayne@69
|
1184
|
jpayne@69
|
# Generated proxy type exposing length and item access only.
ArrayProxy = MakeProxyType('ArrayProxy', (
    '__len__', '__getitem__', '__setitem__'
    ))
|
jpayne@69
|
1188
|
jpayne@69
|
1189
|
jpayne@69
|
# Generated proxy type forwarding the listed pool methods to the referent.
BasePoolProxy = MakeProxyType('PoolProxy', (
    'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
    'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
    ))
# Typeids associated with the results of the listed methods.
BasePoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'starmap_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator'
    }
class PoolProxy(BasePoolProxy):
    '''
    Pool proxy usable as a context manager; leaving the block calls
    terminate().
    '''
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()
|
jpayne@69
|
1206
|
jpayne@69
|
1207 #
|
jpayne@69
|
1208 # Definition of SyncManager
|
jpayne@69
|
1209 #
|
jpayne@69
|
1210
|
jpayne@69
|
class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads, plus `dict`, `list` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''

# Each call registers a typeid with its factory and, where given, the
# proxy type used for it.
SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
SyncManager.register('Pool', pool.Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)
|
jpayne@69
|
1242
|
jpayne@69
|
1243 #
|
jpayne@69
|
1244 # Definition of SharedMemoryManager and SharedMemoryServer
|
jpayne@69
|
1245 #
|
jpayne@69
|
1246
|
jpayne@69
|
1247 if HAS_SHMEM:
|
jpayne@69
|
class _SharedMemoryTracker:
    "Manages one or more shared memory segments."

    def __init__(self, name, segment_names=None):
        """Create a tracker called `name`.

        `segment_names` is the list of block names to start with; it is
        stored as given (not copied).  When omitted, every tracker gets
        its own new empty list.
        """
        self.shared_memory_context_name = name
        # A fresh list per instance: a shared default list would make all
        # trackers register into (and destroy) the same segments.
        self.segment_names = [] if segment_names is None else segment_names

    def register_segment(self, segment_name):
        "Adds the supplied shared memory block name to tracker."
        util.debug(f"Register segment {segment_name!r} in pid {getpid()}")
        self.segment_names.append(segment_name)

    def destroy_segment(self, segment_name):
        """Calls unlink() on the shared memory block with the supplied name
        and removes it from the list of blocks being tracked."""
        util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}")
        self.segment_names.remove(segment_name)
        segment = shared_memory.SharedMemory(segment_name)
        segment.close()
        segment.unlink()

    def unlink(self):
        "Calls destroy_segment() on all tracked shared memory blocks."
        # Iterate over a copy: destroy_segment() removes from the list.
        for segment_name in self.segment_names[:]:
            self.destroy_segment(segment_name)

    def __del__(self):
        util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}")
        self.unlink()

    def __getstate__(self):
        return (self.shared_memory_context_name, self.segment_names)

    def __setstate__(self, state):
        self.__init__(*state)
|
jpayne@69
|
1283
|
jpayne@69
|
1284
|
jpayne@69
|
class SharedMemoryServer(Server):
    """Server that additionally tracks shared memory blocks through a
    _SharedMemoryTracker."""

    public = Server.public + \
             ['track_segment', 'release_segment', 'list_segments']

    def __init__(self, *args, **kwargs):
        Server.__init__(self, *args, **kwargs)
        self.shared_memory_context = \
            _SharedMemoryTracker(f"shmm_{self.address}_{getpid()}")
        util.debug(f"SharedMemoryServer started by pid {getpid()}")

    def create(*args, **kwargs):
        """Create a new distributed-shared object (not backed by a shared
        memory block) and return its id to be used in a Proxy Object."""
        # The signature is (*args, **kwargs), so `self` has to be taken
        # from the positional arguments explicitly.
        if not args:
            raise TypeError("descriptor 'create' of 'SharedMemoryServer' "
                            "object needs an argument")
        self = args[0]
        if len(args) >= 3:
            typeid = args[2]
        elif 'typeid' in kwargs:
            typeid = kwargs['typeid']
        else:
            raise TypeError('create expected at least 2 positional '
                            'arguments, got %d' % (len(args)-1))
        # Unless set up as a shared proxy, don't make shared_memory_context
        # a standard part of kwargs.  This makes things easier for supplying
        # simple functions.
        if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"):
            kwargs['shared_memory_context'] = self.shared_memory_context
        return Server.create(*args, **kwargs)
    create.__text_signature__ = '($self, c, typeid, /, *args, **kwargs)'

    def shutdown(self, c):
        "Call unlink() on all tracked shared memory, terminate the Server."
        self.shared_memory_context.unlink()
        return Server.shutdown(self, c)

    def track_segment(self, c, segment_name):
        "Adds the supplied shared memory block name to Server's tracker."
        self.shared_memory_context.register_segment(segment_name)

    def release_segment(self, c, segment_name):
        """Calls unlink() on the shared memory block with the supplied name
        and removes it from the tracker instance inside the Server."""
        self.shared_memory_context.destroy_segment(segment_name)

    def list_segments(self, c):
        """Returns a list of names of shared memory blocks that the Server
        is currently tracking."""
        return self.shared_memory_context.segment_names
|
jpayne@69
|
1335
|
jpayne@69
|
1336
|
jpayne@69
|
class SharedMemoryManager(BaseManager):
    """Like SyncManager but uses SharedMemoryServer instead of Server.

    It provides methods for creating and returning SharedMemory instances
    and for creating a list-like object (ShareableList) backed by shared
    memory.  It also provides methods that create and return Proxy Objects
    that support synchronization across processes (i.e. multi-process-safe
    locks and semaphores).
    """

    _Server = SharedMemoryServer

    def __init__(self, *args, **kwargs):
        if os.name == "posix":
            # bpo-36867: Ensure the resource_tracker is running before
            # launching the manager process, so that concurrent
            # shared_memory manipulation both in the manager and in the
            # current process does not create two resource_tracker
            # processes.
            from . import resource_tracker
            resource_tracker.ensure_running()
        BaseManager.__init__(self, *args, **kwargs)
        util.debug(f"{self.__class__.__name__} created by pid {getpid()}")

    def __del__(self):
        util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}")

    def get_server(self):
        'Better than monkeypatching for now; merge into Server ultimately'
        # A server may only be built while the manager is in its initial
        # state; any other state is reported as a ProcessError.
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started SharedMemoryServer")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("SharedMemoryManager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        return self._Server(self._registry, self._address,
                            self._authkey, self._serializer)

    def SharedMemory(self, size):
        """Returns a new SharedMemory instance with the specified size in
        bytes, to be tracked by the manager."""
        with self._Client(self._address, authkey=self._authkey) as conn:
            sms = shared_memory.SharedMemory(None, create=True, size=size)
            try:
                dispatch(conn, None, 'track_segment', (sms.name,))
            except BaseException:
                # The manager is not tracking the block, so remove it here
                # before propagating the error.
                sms.unlink()
                raise
        return sms

    def ShareableList(self, sequence):
        """Returns a new ShareableList instance populated with the values
        from the input sequence, to be tracked by the manager."""
        with self._Client(self._address, authkey=self._authkey) as conn:
            sl = shared_memory.ShareableList(sequence)
            try:
                dispatch(conn, None, 'track_segment', (sl.shm.name,))
            except BaseException:
                # The manager is not tracking the block, so remove it here
                # before propagating the error.
                sl.shm.unlink()
                raise
        return sl
|