Mercurial > repos > rliterman > csp2
comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/multiprocessing/managers.py @ 69:33d812a61356
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 17:55:14 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
67:0e9998148a16 | 69:33d812a61356 |
---|---|
1 # | |
2 # Module providing manager classes for dealing | |
3 # with shared objects | |
4 # | |
5 # multiprocessing/managers.py | |
6 # | |
7 # Copyright (c) 2006-2008, R Oudkerk | |
8 # Licensed to PSF under a Contributor Agreement. | |
9 # | |
10 | |
11 __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token', | |
12 'SharedMemoryManager' ] | |
13 | |
14 # | |
15 # Imports | |
16 # | |
17 | |
18 import sys | |
19 import threading | |
20 import signal | |
21 import array | |
22 import queue | |
23 import time | |
24 import os | |
25 from os import getpid | |
26 | |
27 from traceback import format_exc | |
28 | |
29 from . import connection | |
30 from .context import reduction, get_spawning_popen, ProcessError | |
31 from . import pool | |
32 from . import process | |
33 from . import util | |
34 from . import get_context | |
35 try: | |
36 from . import shared_memory | |
37 HAS_SHMEM = True | |
38 except ImportError: | |
39 HAS_SHMEM = False | |
40 | |
41 # | |
42 # Register some things for pickling | |
43 # | |
44 | |
def reduce_array(a):
    '''
    Reduction function for `array.array` objects

    Returns the `array.array` constructor together with the argument tuple
    (typecode, raw bytes of the contents) from which `a` can be rebuilt.
    '''
    rebuild_args = (a.typecode, a.tobytes())
    return array.array, rebuild_args
# Register reduce_array with `reduction` as the reducer for array.array.
reduction.register(array.array, reduce_array)

# Concrete types of the objects returned by the items(), keys() and values()
# methods of a dict, in that order.
view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
if view_types[0] is not list:       # only needed in Py3.0
    # Reducer for the view types: rebuilds the view as a plain list holding
    # the same elements.
    def rebuild_as_list(obj):
        return list, (list(obj),)
    for view_type in view_types:
        reduction.register(view_type, rebuild_as_list)
55 | |
56 # | |
57 # Type for identifying shared objects | |
58 # | |
59 | |
class Token(object):
    '''
    Type to uniquely identify a shared object

    A token carries the `typeid` the object was registered under, the
    `address` of the manager holding it and the object's `id` there.
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.typeid = typeid
        self.address = address
        self.id = id

    def __getstate__(self):
        # State is the plain (typeid, address, id) tuple
        state = (self.typeid, self.address, self.id)
        return state

    def __setstate__(self, state):
        # Inverse of __getstate__: unpack the three-tuple back into slots
        typeid, address, id = state
        self.typeid = typeid
        self.address = address
        self.id = id

    def __repr__(self):
        fields = (type(self).__name__, self.typeid, self.address, self.id)
        return '%s(typeid=%r, address=%r, id=%r)' % fields
78 | |
79 # | |
80 # Function for communication with a manager's server process | |
81 # | |
82 | |
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send a message to manager using connection `c` and return response

    The request is the tuple (id, methodname, args, kwds).  The reply is a
    (kind, result) pair: for kind '#RETURN' the result is returned, for
    anything else the exception built by convert_to_error() is raised.
    '''
    request = (id, methodname, args, kwds)
    c.send(request)
    kind, result = c.recv()
    if kind != '#RETURN':
        raise convert_to_error(kind, result)
    return result
92 | |
def convert_to_error(kind, result):
    '''
    Turn a non-'#RETURN' reply into the exception object to be raised

    '#ERROR' replies already carry the exception and it is returned as is.
    '#TRACEBACK' and '#UNSERIALIZABLE' replies must carry a str (otherwise
    TypeError is raised here) and are wrapped in RemoteError.  Any other
    kind yields a ValueError naming the unrecognized kind.
    '''
    if kind == '#ERROR':
        return result

    if kind not in ('#TRACEBACK', '#UNSERIALIZABLE'):
        return ValueError('Unrecognized message type {!r}'.format(kind))

    if not isinstance(result, str):
        raise TypeError(
            "Result {0!r} (kind '{1}') type is {2}, not str".format(
                result, kind, type(result)))

    if kind == '#UNSERIALIZABLE':
        return RemoteError('Unserializable message: %s\n' % result)
    return RemoteError(result)
107 | |
class RemoteError(Exception):
    '''
    Exception whose first argument is text received from the manager
    (see convert_to_error); str() frames that text between dashed rules
    '''
    def __str__(self):
        rule = '-' * 75
        return '\n' + rule + '\n' + str(self.args[0]) + rule
111 | |
112 # | |
113 # Functions for finding the method names of an object | |
114 # | |
115 | |
def all_methods(obj):
    '''
    Return a list of names of methods of `obj`

    Every name reported by dir(obj) whose attribute value is callable is
    included, in dir() order.
    '''
    return [name for name in dir(obj) if callable(getattr(obj, name))]
126 | |
def public_methods(obj):
    '''
    Return a list of names of methods of `obj` which do not start with '_'
    '''
    names = []
    for name in all_methods(obj):
        if name[0] != '_':
            names.append(name)
    return names
132 | |
133 # | |
134 # Server which is run in a process controlled by a manager | |
135 # | |
136 | |
class Server(object):
    '''
    Server class which runs in a process controlled by a manager object
    '''
    # Names of the methods which handle_request() will call for a client
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        '''
        Create the listener and the (initially empty) object bookkeeping

        `registry` maps a typeid to a (callable, exposed, method_to_typeid,
        proxytype) tuple, `serializer` is a key of `listener_client`.
        Raises TypeError if `authkey` is not a bytes object.
        '''
        if not isinstance(authkey, bytes):
            raise TypeError(
                "Authkey {0!r} is type {1!s}, not bytes".format(
                    authkey, type(authkey)))
        self.registry = registry
        self.authkey = process.AuthenticationString(authkey)
        Listener, Client = listener_client[serializer]

        # do authentication later
        self.listener = Listener(address=address, backlog=16)
        self.address = self.listener.address

        # ident -> (obj, exposed, method_to_typeid); '0' is a placeholder
        self.id_to_obj = {'0': (None, ())}
        # ident -> number of references currently held on the object
        self.id_to_refcount = {}
        # entries stashed by RebuildProxy for proxies rebuilt in this process
        self.id_to_local_proxy_obj = {}
        self.mutex = threading.Lock()

    def serve_forever(self):
        '''
        Run the server forever
        '''
        self.stop_event = threading.Event()
        process.current_process()._manager_server = self
        try:
            accepter = threading.Thread(target=self.accepter)
            accepter.daemon = True
            accepter.start()
            try:
                # wake up once a second to re-check the stop event
                while not self.stop_event.is_set():
                    self.stop_event.wait(1)
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            if sys.stdout != sys.__stdout__: # what about stderr?
                util.debug('resetting stdout, stderr')
                sys.stdout = sys.__stdout__
                sys.stderr = sys.__stderr__
            sys.exit(0)

    def accepter(self):
        '''
        Accept connections forever, handling each one in a new daemon thread
        '''
        while True:
            try:
                c = self.listener.accept()
            except OSError:
                continue
            t = threading.Thread(target=self.handle_request, args=(c,))
            t.daemon = True
            t.start()

    def handle_request(self, c):
        '''
        Handle a new connection

        Authenticates the peer, reads one (ignored, funcname, args, kwds)
        request, calls the named public method with the connection as first
        argument and sends back ('#RETURN', result) or
        ('#TRACEBACK', traceback text); finally closes the connection.
        '''
        funcname = result = request = None
        try:
            connection.deliver_challenge(c, self.authkey)
            connection.answer_challenge(c, self.authkey)
            request = c.recv()
            ignore, funcname, args, kwds = request
            # Explicit check instead of `assert`: assert statements are
            # removed under -O, which would allow a client to call any
            # attribute of the server rather than only the public methods.
            if funcname not in self.public:
                raise AssertionError('%r unrecognized' % funcname)
            func = getattr(self, funcname)
        except Exception:
            msg = ('#TRACEBACK', format_exc())
        else:
            try:
                result = func(c, *args, **kwds)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                msg = ('#RETURN', result)
        try:
            c.send(msg)
        except Exception as e:
            # sending the reply failed; try to report that instead
            try:
                c.send(('#TRACEBACK', format_exc()))
            except Exception:
                pass
            util.info('Failure to send message: %r', msg)
            util.info(' ... request was %r', request)
            util.info(' ... exception was %r', e)

        c.close()

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop_event.is_set():

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                try:
                    obj, exposed, gettypeid = id_to_obj[ident]
                except KeyError as ke:
                    # fall back to the entries stashed for local proxies
                    try:
                        obj, exposed, gettypeid = \
                            self.id_to_local_proxy_obj[ident]
                    except KeyError as second_ke:
                        raise ke

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    msg = ('#ERROR', e)
                else:
                    # a method listed in method_to_typeid has its result
                    # registered as a new shared object and sent as a proxy
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    # unexposed/missing method: try the fallback handlers
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception as e:
                    send(('#UNSERIALIZABLE', format_exc()))
            except Exception as e:
                util.info('exception in thread serving %r',
                        threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        '''Fallback for '#GETVALUE': return the object itself'''
        return obj

    def fallback_str(self, conn, ident, obj):
        '''Fallback for '__str__': return str(obj)'''
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        '''Fallback for '__repr__': return repr(obj)'''
        return repr(obj)

    # Handlers used by serve_client() when a request raised AttributeError
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        '''Do nothing'''
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting
        '''
        # Perhaps include debug info about 'c'?
        with self.mutex:
            result = []
            keys = list(self.id_to_refcount.keys())
            keys.sort()
            for ident in keys:
                if ident != '0':
                    result.append(' %s: refcount=%s\n %s' %
                                  (ident, self.id_to_refcount[ident],
                                   str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(result)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
        return len(self.id_to_refcount)

    def shutdown(self, c):
        '''
        Shutdown this process
        '''
        try:
            util.debug('manager received shutdown message')
            c.send(('#RETURN', None))
        except:
            import traceback
            traceback.print_exc()
        finally:
            self.stop_event.set()

    def create(*args, **kwds):
        '''
        Create a new shared object and return its id

        Called as create(self, c, typeid, *args, **kwds); returns the pair
        (ident, tuple of exposed method names).  Passing `typeid` or `c` as
        keyword arguments still works but emits a DeprecationWarning.
        '''
        if len(args) >= 3:
            self, c, typeid, *args = args
        elif not args:
            raise TypeError("descriptor 'create' of 'Server' object "
                            "needs an argument")
        else:
            if 'typeid' not in kwds:
                raise TypeError('create expected at least 2 positional '
                                'arguments, got %d' % (len(args)-1))
            typeid = kwds.pop('typeid')
            if len(args) >= 2:
                self, c, *args = args
                import warnings
                warnings.warn("Passing 'typeid' as keyword argument is deprecated",
                              DeprecationWarning, stacklevel=2)
            else:
                if 'c' not in kwds:
                    raise TypeError('create expected at least 2 positional '
                                    'arguments, got %d' % (len(args)-1))
                c = kwds.pop('c')
                self, *args = args
                import warnings
                warnings.warn("Passing 'c' as keyword argument is deprecated",
                              DeprecationWarning, stacklevel=2)
        args = tuple(args)

        with self.mutex:
            callable, exposed, method_to_typeid, proxytype = \
                      self.registry[typeid]

            if callable is None:
                # no factory registered: the single argument is the object
                if kwds or (len(args) != 1):
                    raise ValueError(
                        "Without callable, must have one non-keyword argument")
                obj = args[0]
            else:
                obj = callable(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                if not isinstance(method_to_typeid, dict):
                    raise TypeError(
                        "Method_to_typeid {0!r}: type {1!s}, not dict".format(
                            method_to_typeid, type(method_to_typeid)))
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                self.id_to_refcount[ident] = 0

        # incref() takes the mutex itself, so call it after releasing it
        self.incref(c, ident)
        return ident, tuple(exposed)
    create.__text_signature__ = '($self, c, typeid, /, *args, **kwds)'

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        return tuple(self.id_to_obj[token.id][1])

    def accept_connection(self, c, name):
        '''
        Spawn a new thread to serve this connection
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Increment the reference count of the object `ident`

        Raises KeyError if `ident` is neither tracked nor stashed in
        id_to_local_proxy_obj.
        '''
        with self.mutex:
            try:
                self.id_to_refcount[ident] += 1
            except KeyError as ke:
                # If no external references exist but an internal (to the
                # manager) still does and a new external reference is created
                # from it, restore the manager's tracking of it from the
                # previously stashed internal ref.
                if ident in self.id_to_local_proxy_obj:
                    self.id_to_refcount[ident] = 1
                    self.id_to_obj[ident] = \
                        self.id_to_local_proxy_obj[ident]
                    obj, exposed, gettypeid = self.id_to_obj[ident]
                    util.debug('Server re-enabled tracking & INCREF %r', ident)
                else:
                    raise ke

    def decref(self, c, ident):
        '''
        Decrement the reference count of the object `ident`, disposing of
        the object once the count reaches zero

        Raises AssertionError if the count is already zero or negative.
        '''
        if ident not in self.id_to_refcount and \
            ident in self.id_to_local_proxy_obj:
            util.debug('Server DECREF skipping %r', ident)
            return

        with self.mutex:
            if self.id_to_refcount[ident] <= 0:
                raise AssertionError(
                    "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
                        ident, self.id_to_obj[ident],
                        self.id_to_refcount[ident]))
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_refcount[ident]

        if ident not in self.id_to_refcount:
            # Two-step process in case the object turns out to contain other
            # proxy objects (e.g. a managed list of managed lists).
            # Otherwise, deleting self.id_to_obj[ident] would trigger the
            # deleting of the stored value (another managed object) which would
            # in turn attempt to acquire the mutex that is already held here.
            self.id_to_obj[ident] = (None, (), None)  # thread-safe
            util.debug('disposing of obj with id %r', ident)
            with self.mutex:
                del self.id_to_obj[ident]
484 | |
485 | |
486 # | |
487 # Class to represent state of a manager | |
488 # | |
489 | |
class State(object):
    '''
    Holder for the state of a manager; `value` is set to one of the
    constants INITIAL, STARTED or SHUTDOWN
    '''
    __slots__ = ['value']
    INITIAL = 0     # manager created, no server started or connected yet
    STARTED = 1     # set by BaseManager.start() and BaseManager.connect()
    SHUTDOWN = 2    # set by BaseManager._finalize_manager()
495 | |
496 # | |
497 # Mapping from serializer name to Listener and Client types | |
498 # | |
499 | |
# Keys are the serializer names accepted by Server and BaseManager; each
# value is the (Listener, Client) pair to use with that serializer.
listener_client = {
    'pickle' : (connection.Listener, connection.Client),
    'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
    }
504 | |
505 # | |
506 # Definition of BaseManager | |
507 # | |
508 | |
class BaseManager(object):
    '''
    Base class for managers
    '''
    # typeid -> (callable, exposed, method_to_typeid, proxytype); copied per
    # subclass on first register() so registrations do not leak upwards
    _registry = {}
    # server class instantiated by get_server() and _run_server()
    _Server = Server
    # Server process handle; replaced by start().  The class-level default
    # keeps join() usable on a manager which only called connect().
    _process = None

    def __init__(self, address=None, authkey=None, serializer='pickle',
                 ctx=None):
        '''
        Record the connection parameters; nothing is started or connected

        `authkey` defaults to the current process's authkey, `serializer`
        must be a key of `listener_client` and `ctx` defaults to
        get_context().
        '''
        if authkey is None:
            authkey = process.current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = process.AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]
        self._ctx = ctx or get_context()

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute

        Raises ProcessError unless the manager is still in its initial state.
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        # Use the class's _Server hook (as _run_server does) so a subclass
        # which overrides _Server gets its own server type here as well.
        return self._Server(self._registry, self._address,
                            self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'dummy')
        finally:
            # the connection was only needed to check the server responds
            conn.close()
        self._state.value = State.STARTED

    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object

        If given, `initializer(*initargs)` is called in the server process
        before the server is created.  Raises ProcessError unless the
        manager is in its initial state and TypeError if `initializer` is
        neither None nor callable.
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = self._ctx.Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__  + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer,
                    initializer=None, initargs=()):
        '''
        Create a server, report its address and run it
        '''
        # bpo-36368: protect server process from KeyboardInterrupt signals
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        if initializer is not None:
            initializer(*initargs)

        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(self, typeid, /, *args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple

        Raises AssertionError if the manager is not in the started state.
        '''
        # Explicit check instead of `assert` so that it also runs under -O.
        if self._state.value != State.STARTED:
            raise AssertionError('server not yet started')
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        if self._process is not None:
            self._process.join(timeout)
            if not self._process.is_alive():
                self._process = None

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        '''
        Start the manager if it is still in its initial state; return self

        Raises ProcessError if the manager is not started afterwards.
        '''
        if self._state.value == State.INITIAL:
            self.start()
        if self._state.value != State.STARTED:
            if self._state.value == State.INITIAL:
                raise ProcessError("Unable to start server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        '''Call the shutdown finalizer registered by start()'''
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # best effort: fall through to join()/terminate() below
                pass

            process.join(timeout=1.0)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    @property
    def address(self):
        '''Address stored on the manager (updated by start())'''
        return self._address

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type

        `proxytype` defaults to AutoProxy; `exposed` and `method_to_typeid`
        default to the proxy type's `_exposed_` / `_method_to_typeid_`
        attributes.  When `create_method` is true a method named `typeid`
        is added to the class which creates a shared object and returns a
        proxy for it.  Raises AssertionError if a key or value of
        `method_to_typeid` is not exactly of type str.
        '''
        if '_registry' not in cls.__dict__:
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in list(method_to_typeid.items()): # isinstance?
                # explicit raises instead of `assert` so the validation
                # is not dropped under -O
                if type(key) is not str:
                    raise AssertionError('%r is not a string' % key)
                if type(value) is not str:
                    raise AssertionError('%r is not a string' % value)

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            def temp(self, /, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                # drop the reference taken by create(); close the
                # temporary connection like the other helper methods do
                conn = self._Client(token.address, authkey=self._authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)
750 | |
751 # | |
752 # Subclass of set which gets cleared after a fork | |
753 # | |
754 | |
class ProcessLocalSet(set):
    '''
    Set which registers itself with util.register_after_fork() so that it
    is cleared after a fork; it always reduces to an empty set of its type
    '''
    def __init__(self):
        def _clear(instance):
            instance.clear()
        util.register_after_fork(self, _clear)

    def __reduce__(self):
        # the contents are deliberately not included in the reduction
        return self.__class__, ()
760 | |
761 # | |
762 # Definition of BaseProxy | |
763 # | |
764 | |
class BaseProxy(object):
    '''
    A base for proxies of shared objects
    '''
    # manager address -> (ForkAwareLocal, ProcessLocalSet) pair shared by
    # all proxies for that address; guarded by _mutex
    _address_to_local = {}
    _mutex = util.ForkAwareThreadLock()

    def __init__(self, token, serializer, manager=None,
                 authkey=None, exposed=None, incref=True, manager_owned=False):
        '''
        Set up a proxy for the shared object identified by `token`

        `serializer` must be a key of `listener_client`.  The authkey used
        is `authkey` if given, else the manager's, else the current
        process's.  Unless `incref` is false, _incref() is called.
        `exposed` is accepted but not used here.
        '''
        with BaseProxy._mutex:
            tls_idset = BaseProxy._address_to_local.get(token.address, None)
            if tls_idset is None:
                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
                BaseProxy._address_to_local[token.address] = tls_idset

        # self._tls is used to record the connection used by this
        # thread to communicate with the manager at token.address
        self._tls = tls_idset[0]

        # self._idset is used to record the identities of all shared
        # objects for which the current process owns references and
        # which are in the manager at token.address
        self._idset = tls_idset[1]

        self._token = token
        self._id = self._token.id
        self._manager = manager
        self._serializer = serializer
        self._Client = listener_client[serializer][1]

        # Should be set to True only when a proxy object is being created
        # on the manager server; primary use case: nested proxy objects.
        # RebuildProxy detects when a proxy is being created on the manager
        # and sets this value appropriately.
        self._owned_by_manager = manager_owned

        if authkey is not None:
            self._authkey = process.AuthenticationString(authkey)
        elif self._manager is not None:
            self._authkey = self._manager._authkey
        else:
            self._authkey = process.current_process().authkey

        if incref:
            self._incref()

        util.register_after_fork(self, BaseProxy._after_fork)

    def _connect(self):
        '''
        Open a connection to the manager and store it as this thread's
        connection in self._tls
        '''
        util.debug('making connection to manager')
        # name sent to the server: process name, plus the thread name
        # when not called from the thread named 'MainThread'
        name = process.current_process().name
        if threading.current_thread().name != 'MainThread':
            name += '|' + threading.current_thread().name
        conn = self._Client(self._token.address, authkey=self._authkey)
        dispatch(conn, None, 'accept_connection', (name,))
        self._tls.connection = conn

    def _callmethod(self, methodname, args=(), kwds={}):
        '''
        Try to call a method of the referent and return a copy of the result

        A '#PROXY' reply is turned into a new proxy object; any reply other
        than '#RETURN' or '#PROXY' raises the error from convert_to_error().
        '''
        try:
            conn = self._tls.connection
        except AttributeError:
            util.debug('thread %r does not own a connection',
                       threading.current_thread().name)
            self._connect()
            conn = self._tls.connection

        conn.send((self._id, methodname, args, kwds))
        kind, result = conn.recv()

        if kind == '#RETURN':
            return result
        elif kind == '#PROXY':
            exposed, token = result
            # NOTE(review): needs self._manager; _after_fork() sets it to
            # None, so this branch presumably fails then -- confirm
            proxytype = self._manager._registry[token.typeid][-1]
            token.address = self._token.address
            proxy = proxytype(
                token, self._serializer, manager=self._manager,
                authkey=self._authkey, exposed=exposed
                )
            # the new proxy holds its own reference (taken in its
            # constructor), so send a decref for the returned object
            conn = self._Client(token.address, authkey=self._authkey)
            dispatch(conn, None, 'decref', (token.id,))
            return proxy
        raise convert_to_error(kind, result)

    def _getvalue(self):
        '''
        Get a copy of the value of the referent
        '''
        return self._callmethod('#GETVALUE')

    def _incref(self):
        '''
        Tell the manager about a new reference to the referent and register
        a finalizer (self._close) which runs _decref()

        Does nothing when the proxy is owned by the manager.
        '''
        if self._owned_by_manager:
            util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
            return

        conn = self._Client(self._token.address, authkey=self._authkey)
        dispatch(conn, None, 'incref', (self._id,))
        util.debug('INCREF %r', self._token.id)

        self._idset.add(self._id)

        # None when there is no manager; otherwise the manager's State
        state = self._manager and self._manager._state

        self._close = util.Finalize(
            self, BaseProxy._decref,
            args=(self._token, self._authkey, state,
                  self._tls, self._idset, self._Client),
            exitpriority=10
            )

    @staticmethod
    def _decref(token, authkey, state, tls, idset, _Client):
        '''
        Finalizer callback: drop this process's reference to the referent
        and close the thread's connection once no references remain
        '''
        idset.discard(token.id)

        # check whether manager is still alive
        if state is None or state.value == State.STARTED:
            # tell manager this process no longer cares about referent
            try:
                util.debug('DECREF %r', token.id)
                conn = _Client(token.address, authkey=authkey)
                dispatch(conn, None, 'decref', (token.id,))
            except Exception as e:
                util.debug('... decref failed %s', e)

        else:
            util.debug('DECREF %r -- manager already shutdown', token.id)

        # check whether we can close this thread's connection because
        # the process owns no more references to objects for this manager
        if not idset and hasattr(tls, 'connection'):
            util.debug('thread %r has no more proxies so closing conn',
                       threading.current_thread().name)
            tls.connection.close()
            del tls.connection

    def _after_fork(self):
        '''
        After-fork callback registered in __init__: forget the manager and
        try to take a new reference to the referent
        '''
        self._manager = None
        try:
            self._incref()
        except Exception as e:
            # the proxy may just be for a manager which has shutdown
            util.info('incref failed: %s' % e)

    def __reduce__(self):
        '''
        Reduce to a RebuildProxy call; the authkey is only included while
        get_spawning_popen() returns a non-None value
        '''
        kwds = {}
        if get_spawning_popen() is not None:
            kwds['authkey'] = self._authkey

        if getattr(self, '_isauto', False):
            # auto-proxies are rebuilt through AutoProxy with their
            # exposed method names passed along
            kwds['exposed'] = self._exposed_
            return (RebuildProxy,
                    (AutoProxy, self._token, self._serializer, kwds))
        else:
            return (RebuildProxy,
                    (type(self), self._token, self._serializer, kwds))

    def __deepcopy__(self, memo):
        '''Deep copy yields the value returned by _getvalue(), not a proxy'''
        return self._getvalue()

    def __repr__(self):
        return '<%s object, typeid %r at %#x>' % \
               (type(self).__name__, self._token.typeid, id(self))

    def __str__(self):
        '''
        Return representation of the referent (or a fall-back if that fails)
        '''
        try:
            return self._callmethod('__repr__')
        except Exception:
            return repr(self)[:-1] + "; '__str__()' failed>"
939 | |
940 # | |
941 # Function used for unpickling | |
942 # | |
943 | |
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    `func` is called as func(token, serializer, incref=..., **kwds) and its
    result returned.  When running inside the manager server which owns
    `token`, the proxy is flagged as manager owned and the referent's entry
    is stashed in the server's id_to_local_proxy_obj.
    '''
    server = getattr(process.current_process(), '_manager_server', None)
    if server and server.address == token.address:
        util.debug('Rebuild a proxy owned by manager, token=%r', token)
        kwds['manager_owned'] = True
        local_objs = server.id_to_local_proxy_obj
        if token.id not in local_objs:
            local_objs[token.id] = server.id_to_obj[token.id]

    # A false 'incref' entry is passed on unchanged; otherwise incref is
    # requested only if the current process is not flagged as inheriting
    incref = kwds.pop('incref', True)
    if incref:
        incref = not getattr(process.current_process(), '_inheriting', False)
    return func(token, serializer, incref=incref, **kwds)
960 | |
961 # | |
962 # Functions to create proxies and proxy types | |
963 # | |
964 | |
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    Types are memoised in `_cache` (deliberately a shared default argument)
    under the key `(name, tuple(exposed))`.
    '''
    exposed = tuple(exposed)
    cache_key = (name, exposed)
    if cache_key in _cache:
        return _cache[cache_key]

    namespace = {}
    for method_name in exposed:
        # every generated method just forwards its arguments to _callmethod
        exec('''def %s(self, /, *args, **kwds):
    return self._callmethod(%r, args, kwds)''' % (method_name, method_name),
             namespace)

    ProxyType = type(name, (BaseProxy,), namespace)
    ProxyType._exposed_ = exposed
    _cache[cache_key] = ProxyType
    return ProxyType
985 | |
986 | |
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True, manager_owned=False):
    '''
    Return an auto-proxy for `token`

    If `exposed` is not given, the list of methods is fetched from the
    manager at `token.address` with a 'get_methods' request.  A missing
    `authkey` falls back to the manager's key and then to the current
    process's key.

    `manager_owned` is forwarded to the proxy type.  `RebuildProxy` adds
    this keyword when an auto-proxy is unpickled inside the manager process
    that owns the referent, so it has to be accepted here.
    '''
    _Client = listener_client[serializer][1]

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = process.current_process().authkey

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref, manager_owned=manager_owned)
    # marks the proxy so that pickling rebuilds it through AutoProxy again
    proxy._isauto = True
    return proxy
1011 | |
1012 # | |
1013 # Types/callables which we will register with SyncManager | |
1014 # | |
1015 | |
class Namespace(object):
    '''
    Plain attribute container; the keyword arguments given to the
    constructor become attributes of the instance.
    '''

    def __init__(self, /, **kwds):
        self.__dict__.update(kwds)

    def __repr__(self):
        # work on a snapshot of the attributes and leave out the ones
        # whose name starts with an underscore
        snapshot = list(self.__dict__.items())
        shown = sorted('%s=%r' % (name, value) for name, value in snapshot
                       if not name.startswith('_'))
        return '%s(%s)' % (self.__class__.__name__, ', '.join(shown))
1027 | |
class Value(object):
    '''
    Holder for a single value together with its typecode.  The `lock`
    argument is accepted but not used by this class.
    '''

    def __init__(self, typecode, value, lock=True):
        self._value = value
        self._typecode = typecode

    def get(self):
        return self._value

    def set(self, value):
        self._value = value

    # `value` is the attribute-style spelling of get()/set()
    value = property(get, set)

    def __repr__(self):
        parts = (type(self).__name__, self._typecode, self._value)
        return '%s(%r, %r)' % parts
1039 | |
def Array(typecode, sequence, lock=True):
    '''
    Return an `array.array` of the given typecode initialised from
    `sequence`.  The `lock` argument is accepted but not used.
    '''
    new_array = array.array(typecode, sequence)
    return new_array
1042 | |
1043 # | |
1044 # Proxy types used by SyncManager | |
1045 # | |
1046 | |
class IteratorProxy(BaseProxy):
    '''
    Proxy for an iterator/generator; every call is forwarded to the
    referent with `_callmethod`.
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')

    def __iter__(self):
        # the proxy is its own iterator
        return self

    def __next__(self, *args):
        forward = self._callmethod
        return forward('__next__', args)

    def send(self, *args):
        forward = self._callmethod
        return forward('send', args)

    def throw(self, *args):
        forward = self._callmethod
        return forward('throw', args)

    def close(self, *args):
        forward = self._callmethod
        return forward('close', args)
1059 | |
1060 | |
class AcquirerProxy(BaseProxy):
    '''
    Proxy for objects with `acquire()`/`release()`; also usable as a
    context manager.
    '''
    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True, timeout=None):
        # `timeout` is only sent along when the caller supplied one
        if timeout is None:
            call_args = (blocking,)
        else:
            call_args = (blocking, timeout)
        return self._callmethod('acquire', call_args)

    def release(self):
        return self._callmethod('release')

    def __enter__(self):
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
1072 | |
1073 | |
class ConditionProxy(AcquirerProxy):
    '''
    Proxy for a condition variable.  `wait_for()` evaluates its predicate
    in the calling process and uses the remote `wait()` in between.
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def notify(self, n=1):
        return self._callmethod('notify', (n,))

    def notify_all(self):
        return self._callmethod('notify_all')

    def wait_for(self, predicate, timeout=None):
        outcome = predicate()
        if outcome:
            return outcome

        # without a timeout there is no deadline and wait() gets None
        deadline = None if timeout is None else time.monotonic() + timeout
        remaining = None
        while not outcome:
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    # out of time: return the last (false) predicate value
                    break
            self.wait(remaining)
            outcome = predicate()
        return outcome
1099 | |
1100 | |
class EventProxy(BaseProxy):
    '''
    Proxy for an event object; each method is forwarded to the referent.
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')

    def is_set(self):
        forward = self._callmethod
        return forward('is_set')

    def set(self):
        forward = self._callmethod
        return forward('set')

    def clear(self):
        forward = self._callmethod
        return forward('clear')

    def wait(self, timeout=None):
        forward = self._callmethod
        return forward('wait', (timeout,))
1111 | |
1112 | |
class BarrierProxy(BaseProxy):
    '''
    Proxy for a barrier object.  The read-only attributes are fetched
    from the referent through its `__getattribute__`.
    '''
    _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def abort(self):
        return self._callmethod('abort')

    def reset(self):
        return self._callmethod('reset')

    @property
    def parties(self):
        attr_name = 'parties'
        return self._callmethod('__getattribute__', (attr_name,))

    @property
    def n_waiting(self):
        attr_name = 'n_waiting'
        return self._callmethod('__getattribute__', (attr_name,))

    @property
    def broken(self):
        attr_name = 'broken'
        return self._callmethod('__getattribute__', (attr_name,))
1130 | |
1131 | |
class NamespaceProxy(BaseProxy):
    '''
    Proxy whose attribute access is forwarded to the referent.  Names
    starting with an underscore are handled on the proxy object itself.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    def __getattr__(self, key):
        if key[0] != '_':
            # fetch _callmethod without going through this hook again
            remote = object.__getattribute__(self, '_callmethod')
            return remote('__getattribute__', (key,))
        return object.__getattribute__(self, key)

    def __setattr__(self, key, value):
        if key[0] != '_':
            remote = object.__getattribute__(self, '_callmethod')
            return remote('__setattr__', (key, value))
        return object.__setattr__(self, key, value)

    def __delattr__(self, key):
        if key[0] != '_':
            remote = object.__getattribute__(self, '_callmethod')
            return remote('__delattr__', (key,))
        return object.__delattr__(self, key)
1149 | |
1150 | |
class ValueProxy(BaseProxy):
    '''
    Proxy for an object with `get()`/`set()`; `value` is the matching
    property.
    '''
    _exposed_ = ('get', 'set')

    def get(self):
        forward = self._callmethod
        return forward('get')

    def set(self, value):
        forward = self._callmethod
        return forward('set', (value,))

    value = property(get, set)
1158 | |
1159 | |
# generated proxy type carrying the list methods that are simply forwarded
BaseListProxy = MakeProxyType(
    'BaseListProxy',
    (
        '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
        '__mul__', '__reversed__', '__rmul__', '__setitem__',
        'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
        'reverse', 'sort', '__imul__',
    ),
)
class ListProxy(BaseListProxy):
    '''
    List proxy whose in-place operators return the proxy itself, so that
    `proxy += x` and `proxy *= n` leave the name bound to the proxy.
    '''

    def __iadd__(self, value):
        # in-place addition is carried out as a remote extend()
        self._callmethod('extend', (value,))
        return self

    def __imul__(self, value):
        self._callmethod('__imul__', (value,))
        return self
1173 | |
1174 | |
# dict proxy; iterating it hands out an 'Iterator' proxy
DictProxy = MakeProxyType(
    'DictProxy',
    (
        '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
        '__setitem__', 'clear', 'copy', 'get', 'items',
        'keys', 'pop', 'popitem', 'setdefault', 'update', 'values',
    ),
)
DictProxy._method_to_typeid_ = {'__iter__': 'Iterator'}


ArrayProxy = MakeProxyType(
    'ArrayProxy',
    ('__len__', '__getitem__', '__setitem__'),
)


# base type for PoolProxy; note that the generated type is named 'PoolProxy'
BasePoolProxy = MakeProxyType(
    'PoolProxy',
    (
        'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
        'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
    ),
)
# results of these methods are handed back as proxies of the named typeid
BasePoolProxy._method_to_typeid_ = {
    **dict.fromkeys(('apply_async', 'map_async', 'starmap_async'),
                    'AsyncResult'),
    **dict.fromkeys(('imap', 'imap_unordered'), 'Iterator'),
}
class PoolProxy(BasePoolProxy):
    '''
    Pool proxy usable in a `with` statement; leaving the block calls
    `terminate()` on the pool.
    '''

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()
1206 | |
1207 # | |
1208 # Definition of SyncManager | |
1209 # | |
1210 | |
class SyncManager(BaseManager):
    '''
    `BaseManager` subclass on which a set of shared object types is
    registered.

    Those types are the ones meant for synchronizing threads, together
    with `dict`, `list` and `Namespace`.

    Started instances of this class are what the
    `multiprocessing.Manager()` function returns.
    '''
1221 | |
# queue types (no explicit proxy type is given for these)
SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)

# synchronization primitives
SyncManager.register('Event', threading.Event, proxytype=EventProxy)
SyncManager.register('Lock', threading.Lock, proxytype=AcquirerProxy)
SyncManager.register('RLock', threading.RLock, proxytype=AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore,
                     proxytype=AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     proxytype=AcquirerProxy)
SyncManager.register('Condition', threading.Condition,
                     proxytype=ConditionProxy)
SyncManager.register('Barrier', threading.Barrier, proxytype=BarrierProxy)

# containers, values and pools
SyncManager.register('Pool', pool.Pool, proxytype=PoolProxy)
SyncManager.register('list', list, proxytype=ListProxy)
SyncManager.register('dict', dict, proxytype=DictProxy)
SyncManager.register('Value', Value, proxytype=ValueProxy)
SyncManager.register('Array', Array, proxytype=ArrayProxy)
SyncManager.register('Namespace', Namespace, proxytype=NamespaceProxy)

# types only returned by proxy methods (PoolProxy methods and
# DictProxy.__iter__); no creation method is added for them
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)
1242 | |
1243 # | |
1244 # Definition of SharedMemoryManager and SharedMemoryServer | |
1245 # | |
1246 | |
1247 if HAS_SHMEM: | |
class _SharedMemoryTracker:
    "Manages one or more shared memory segments."

    def __init__(self, name, segment_names=None):
        """Create a tracker called `name`.

        `segment_names` is an optional list of block names to start with;
        it is stored by reference.  When omitted, every tracker gets its
        own new list (a shared `[]` default would make all trackers
        register into, and later unlink from, the same list).
        """
        self.shared_memory_context_name = name
        self.segment_names = [] if segment_names is None else segment_names

    def register_segment(self, segment_name):
        "Adds the supplied shared memory block name to tracker."
        util.debug(f"Register segment {segment_name!r} in pid {getpid()}")
        self.segment_names.append(segment_name)

    def destroy_segment(self, segment_name):
        """Calls unlink() on the shared memory block with the supplied name
        and removes it from the list of blocks being tracked."""
        util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}")
        # raises ValueError if the name is not being tracked
        self.segment_names.remove(segment_name)
        segment = shared_memory.SharedMemory(segment_name)
        segment.close()
        segment.unlink()

    def unlink(self):
        "Calls destroy_segment() on all tracked shared memory blocks."
        # iterate over a copy: destroy_segment() mutates the list
        for segment_name in self.segment_names[:]:
            self.destroy_segment(segment_name)

    def __del__(self):
        util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}")
        self.unlink()

    def __getstate__(self):
        "Pickle as the (name, segment_names) pair."
        return (self.shared_memory_context_name, self.segment_names)

    def __setstate__(self, state):
        "Restore from the pair produced by __getstate__()."
        self.__init__(*state)
1283 | |
1284 | |
class SharedMemoryServer(Server):
    "Server which additionally keeps track of shared memory blocks."

    public = Server.public + \
             ['track_segment', 'release_segment', 'list_segments']

    def __init__(self, *args, **kwargs):
        Server.__init__(self, *args, **kwargs)
        self.shared_memory_context = \
            _SharedMemoryTracker(f"shmm_{self.address}_{getpid()}")
        util.debug(f"SharedMemoryServer started by pid {getpid()}")

    def create(*args, **kwargs):
        """Create a new distributed-shared object (not backed by a shared
        memory block) and return its id to be used in a Proxy Object.

        The arguments are `(self, c, typeid, *args, **kwargs)` and are
        handed on unchanged to `Server.create`.  `typeid` is read from the
        third positional argument or from the `typeid` keyword.  Raises
        TypeError when no arguments, or too few to find `typeid`, are given.
        """
        if not args:
            raise TypeError("descriptor 'create' of 'SharedMemoryServer' "
                            "object needs an argument")
        # `self` is not a named parameter here, so it has to be taken from
        # the positional arguments before it can be used below
        self = args[0]
        if len(args) >= 3:
            typeid = args[2]
        elif 'typeid' in kwargs:
            typeid = kwargs['typeid']
        else:
            raise TypeError('create expected at least 2 positional '
                            'arguments, got %d' % (len(args)-1))
        # Unless set up as a shared proxy, don't make shared_memory_context
        # a standard part of kwargs.  This makes things easier for supplying
        # simple functions.
        if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"):
            kwargs['shared_memory_context'] = self.shared_memory_context
        return Server.create(*args, **kwargs)
    create.__text_signature__ = '($self, c, typeid, /, *args, **kwargs)'

    def shutdown(self, c):
        "Call unlink() on all tracked shared memory, terminate the Server."
        self.shared_memory_context.unlink()
        return Server.shutdown(self, c)

    def track_segment(self, c, segment_name):
        "Adds the supplied shared memory block name to Server's tracker."
        self.shared_memory_context.register_segment(segment_name)

    def release_segment(self, c, segment_name):
        """Calls unlink() on the shared memory block with the supplied name
        and removes it from the tracker instance inside the Server."""
        self.shared_memory_context.destroy_segment(segment_name)

    def list_segments(self, c):
        """Returns a list of names of shared memory blocks that the Server
        is currently tracking."""
        return self.shared_memory_context.segment_names
1335 | |
1336 | |
class SharedMemoryManager(BaseManager):
    """Like SyncManager but uses SharedMemoryServer instead of Server.

    Offers methods that create SharedMemory blocks and list-like
    ShareableList objects backed by shared memory, each of which is
    reported to the manager's server for tracking.  It also provides
    methods that create and return Proxy Objects that support
    synchronization across processes (i.e. multi-process-safe locks and
    semaphores).
    """

    _Server = SharedMemoryServer

    def __init__(self, *args, **kwargs):
        if os.name == "posix":
            # bpo-36867: Ensure the resource_tracker is running before
            # launching the manager process, so that concurrent
            # shared_memory manipulation both in the manager and in the
            # current process does not create two resource_tracker
            # processes.
            from . import resource_tracker
            resource_tracker.ensure_running()
        BaseManager.__init__(self, *args, **kwargs)
        util.debug(f"{self.__class__.__name__} created by pid {getpid()}")

    def __del__(self):
        # only logs; no cleanup is performed here
        util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}")

    def get_server(self):
        'Better than monkeypatching for now; merge into Server ultimately'
        current_state = self._state.value
        if current_state != State.INITIAL:
            # a server may only be built for a manager that was never started
            if current_state == State.STARTED:
                raise ProcessError("Already started SharedMemoryServer")
            if current_state == State.SHUTDOWN:
                raise ProcessError("SharedMemoryManager has shut down")
            raise ProcessError(
                "Unknown state {!r}".format(current_state))
        return self._Server(self._registry, self._address,
                            self._authkey, self._serializer)

    def SharedMemory(self, size):
        """Returns a new SharedMemory instance with the specified size in
        bytes, to be tracked by the manager."""
        with self._Client(self._address, authkey=self._authkey) as conn:
            block = shared_memory.SharedMemory(None, create=True, size=size)
            try:
                dispatch(conn, None, 'track_segment', (block.name,))
            except BaseException:
                # tracking failed: remove the block again before re-raising
                block.unlink()
                raise
        return block

    def ShareableList(self, sequence):
        """Returns a new ShareableList instance populated with the values
        from the input sequence, to be tracked by the manager."""
        with self._Client(self._address, authkey=self._authkey) as conn:
            shareable = shared_memory.ShareableList(sequence)
            try:
                dispatch(conn, None, 'track_segment', (shareable.shm.name,))
            except BaseException:
                # tracking failed: remove the block again before re-raising
                shareable.shm.unlink()
                raise
        return shareable