comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/multiprocessing/managers.py @ 69:33d812a61356

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 17:55:14 -0400
parents
children
comparison
equal deleted inserted replaced
67:0e9998148a16 69:33d812a61356
1 #
2 # Module providing manager classes for dealing
3 # with shared objects
4 #
5 # multiprocessing/managers.py
6 #
7 # Copyright (c) 2006-2008, R Oudkerk
8 # Licensed to PSF under a Contributor Agreement.
9 #
10
11 __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token',
12 'SharedMemoryManager' ]
13
14 #
15 # Imports
16 #
17
18 import sys
19 import threading
20 import signal
21 import array
22 import queue
23 import time
24 import os
25 from os import getpid
26
27 from traceback import format_exc
28
29 from . import connection
30 from .context import reduction, get_spawning_popen, ProcessError
31 from . import pool
32 from . import process
33 from . import util
34 from . import get_context
35 try:
36 from . import shared_memory
37 HAS_SHMEM = True
38 except ImportError:
39 HAS_SHMEM = False
40
41 #
42 # Register some things for pickling
43 #
44
45 def reduce_array(a):
46 return array.array, (a.typecode, a.tobytes())
47 reduction.register(array.array, reduce_array)
48
49 view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
50 if view_types[0] is not list: # only needed in Py3.0
51 def rebuild_as_list(obj):
52 return list, (list(obj),)
53 for view_type in view_types:
54 reduction.register(view_type, rebuild_as_list)
55
56 #
57 # Type for identifying shared objects
58 #
59
60 class Token(object):
61 '''
62 Type to uniquely indentify a shared object
63 '''
64 __slots__ = ('typeid', 'address', 'id')
65
66 def __init__(self, typeid, address, id):
67 (self.typeid, self.address, self.id) = (typeid, address, id)
68
69 def __getstate__(self):
70 return (self.typeid, self.address, self.id)
71
72 def __setstate__(self, state):
73 (self.typeid, self.address, self.id) = state
74
75 def __repr__(self):
76 return '%s(typeid=%r, address=%r, id=%r)' % \
77 (self.__class__.__name__, self.typeid, self.address, self.id)
78
79 #
80 # Function for communication with a manager's server process
81 #
82
83 def dispatch(c, id, methodname, args=(), kwds={}):
84 '''
85 Send a message to manager using connection `c` and return response
86 '''
87 c.send((id, methodname, args, kwds))
88 kind, result = c.recv()
89 if kind == '#RETURN':
90 return result
91 raise convert_to_error(kind, result)
92
93 def convert_to_error(kind, result):
94 if kind == '#ERROR':
95 return result
96 elif kind in ('#TRACEBACK', '#UNSERIALIZABLE'):
97 if not isinstance(result, str):
98 raise TypeError(
99 "Result {0!r} (kind '{1}') type is {2}, not str".format(
100 result, kind, type(result)))
101 if kind == '#UNSERIALIZABLE':
102 return RemoteError('Unserializable message: %s\n' % result)
103 else:
104 return RemoteError(result)
105 else:
106 return ValueError('Unrecognized message type {!r}'.format(kind))
107
108 class RemoteError(Exception):
109 def __str__(self):
110 return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
111
112 #
113 # Functions for finding the method names of an object
114 #
115
116 def all_methods(obj):
117 '''
118 Return a list of names of methods of `obj`
119 '''
120 temp = []
121 for name in dir(obj):
122 func = getattr(obj, name)
123 if callable(func):
124 temp.append(name)
125 return temp
126
127 def public_methods(obj):
128 '''
129 Return a list of names of methods of `obj` which do not start with '_'
130 '''
131 return [name for name in all_methods(obj) if name[0] != '_']
132
133 #
134 # Server which is run in a process controlled by a manager
135 #
136
137 class Server(object):
138 '''
139 Server class which runs in a process controlled by a manager object
140 '''
141 public = ['shutdown', 'create', 'accept_connection', 'get_methods',
142 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
143
144 def __init__(self, registry, address, authkey, serializer):
145 if not isinstance(authkey, bytes):
146 raise TypeError(
147 "Authkey {0!r} is type {1!s}, not bytes".format(
148 authkey, type(authkey)))
149 self.registry = registry
150 self.authkey = process.AuthenticationString(authkey)
151 Listener, Client = listener_client[serializer]
152
153 # do authentication later
154 self.listener = Listener(address=address, backlog=16)
155 self.address = self.listener.address
156
157 self.id_to_obj = {'0': (None, ())}
158 self.id_to_refcount = {}
159 self.id_to_local_proxy_obj = {}
160 self.mutex = threading.Lock()
161
162 def serve_forever(self):
163 '''
164 Run the server forever
165 '''
166 self.stop_event = threading.Event()
167 process.current_process()._manager_server = self
168 try:
169 accepter = threading.Thread(target=self.accepter)
170 accepter.daemon = True
171 accepter.start()
172 try:
173 while not self.stop_event.is_set():
174 self.stop_event.wait(1)
175 except (KeyboardInterrupt, SystemExit):
176 pass
177 finally:
178 if sys.stdout != sys.__stdout__: # what about stderr?
179 util.debug('resetting stdout, stderr')
180 sys.stdout = sys.__stdout__
181 sys.stderr = sys.__stderr__
182 sys.exit(0)
183
184 def accepter(self):
185 while True:
186 try:
187 c = self.listener.accept()
188 except OSError:
189 continue
190 t = threading.Thread(target=self.handle_request, args=(c,))
191 t.daemon = True
192 t.start()
193
194 def handle_request(self, c):
195 '''
196 Handle a new connection
197 '''
198 funcname = result = request = None
199 try:
200 connection.deliver_challenge(c, self.authkey)
201 connection.answer_challenge(c, self.authkey)
202 request = c.recv()
203 ignore, funcname, args, kwds = request
204 assert funcname in self.public, '%r unrecognized' % funcname
205 func = getattr(self, funcname)
206 except Exception:
207 msg = ('#TRACEBACK', format_exc())
208 else:
209 try:
210 result = func(c, *args, **kwds)
211 except Exception:
212 msg = ('#TRACEBACK', format_exc())
213 else:
214 msg = ('#RETURN', result)
215 try:
216 c.send(msg)
217 except Exception as e:
218 try:
219 c.send(('#TRACEBACK', format_exc()))
220 except Exception:
221 pass
222 util.info('Failure to send message: %r', msg)
223 util.info(' ... request was %r', request)
224 util.info(' ... exception was %r', e)
225
226 c.close()
227
228 def serve_client(self, conn):
229 '''
230 Handle requests from the proxies in a particular process/thread
231 '''
232 util.debug('starting server thread to service %r',
233 threading.current_thread().name)
234
235 recv = conn.recv
236 send = conn.send
237 id_to_obj = self.id_to_obj
238
239 while not self.stop_event.is_set():
240
241 try:
242 methodname = obj = None
243 request = recv()
244 ident, methodname, args, kwds = request
245 try:
246 obj, exposed, gettypeid = id_to_obj[ident]
247 except KeyError as ke:
248 try:
249 obj, exposed, gettypeid = \
250 self.id_to_local_proxy_obj[ident]
251 except KeyError as second_ke:
252 raise ke
253
254 if methodname not in exposed:
255 raise AttributeError(
256 'method %r of %r object is not in exposed=%r' %
257 (methodname, type(obj), exposed)
258 )
259
260 function = getattr(obj, methodname)
261
262 try:
263 res = function(*args, **kwds)
264 except Exception as e:
265 msg = ('#ERROR', e)
266 else:
267 typeid = gettypeid and gettypeid.get(methodname, None)
268 if typeid:
269 rident, rexposed = self.create(conn, typeid, res)
270 token = Token(typeid, self.address, rident)
271 msg = ('#PROXY', (rexposed, token))
272 else:
273 msg = ('#RETURN', res)
274
275 except AttributeError:
276 if methodname is None:
277 msg = ('#TRACEBACK', format_exc())
278 else:
279 try:
280 fallback_func = self.fallback_mapping[methodname]
281 result = fallback_func(
282 self, conn, ident, obj, *args, **kwds
283 )
284 msg = ('#RETURN', result)
285 except Exception:
286 msg = ('#TRACEBACK', format_exc())
287
288 except EOFError:
289 util.debug('got EOF -- exiting thread serving %r',
290 threading.current_thread().name)
291 sys.exit(0)
292
293 except Exception:
294 msg = ('#TRACEBACK', format_exc())
295
296 try:
297 try:
298 send(msg)
299 except Exception as e:
300 send(('#UNSERIALIZABLE', format_exc()))
301 except Exception as e:
302 util.info('exception in thread serving %r',
303 threading.current_thread().name)
304 util.info(' ... message was %r', msg)
305 util.info(' ... exception was %r', e)
306 conn.close()
307 sys.exit(1)
308
309 def fallback_getvalue(self, conn, ident, obj):
310 return obj
311
312 def fallback_str(self, conn, ident, obj):
313 return str(obj)
314
315 def fallback_repr(self, conn, ident, obj):
316 return repr(obj)
317
318 fallback_mapping = {
319 '__str__':fallback_str,
320 '__repr__':fallback_repr,
321 '#GETVALUE':fallback_getvalue
322 }
323
324 def dummy(self, c):
325 pass
326
327 def debug_info(self, c):
328 '''
329 Return some info --- useful to spot problems with refcounting
330 '''
331 # Perhaps include debug info about 'c'?
332 with self.mutex:
333 result = []
334 keys = list(self.id_to_refcount.keys())
335 keys.sort()
336 for ident in keys:
337 if ident != '0':
338 result.append(' %s: refcount=%s\n %s' %
339 (ident, self.id_to_refcount[ident],
340 str(self.id_to_obj[ident][0])[:75]))
341 return '\n'.join(result)
342
343 def number_of_objects(self, c):
344 '''
345 Number of shared objects
346 '''
347 # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
348 return len(self.id_to_refcount)
349
350 def shutdown(self, c):
351 '''
352 Shutdown this process
353 '''
354 try:
355 util.debug('manager received shutdown message')
356 c.send(('#RETURN', None))
357 except:
358 import traceback
359 traceback.print_exc()
360 finally:
361 self.stop_event.set()
362
363 def create(*args, **kwds):
364 '''
365 Create a new shared object and return its id
366 '''
367 if len(args) >= 3:
368 self, c, typeid, *args = args
369 elif not args:
370 raise TypeError("descriptor 'create' of 'Server' object "
371 "needs an argument")
372 else:
373 if 'typeid' not in kwds:
374 raise TypeError('create expected at least 2 positional '
375 'arguments, got %d' % (len(args)-1))
376 typeid = kwds.pop('typeid')
377 if len(args) >= 2:
378 self, c, *args = args
379 import warnings
380 warnings.warn("Passing 'typeid' as keyword argument is deprecated",
381 DeprecationWarning, stacklevel=2)
382 else:
383 if 'c' not in kwds:
384 raise TypeError('create expected at least 2 positional '
385 'arguments, got %d' % (len(args)-1))
386 c = kwds.pop('c')
387 self, *args = args
388 import warnings
389 warnings.warn("Passing 'c' as keyword argument is deprecated",
390 DeprecationWarning, stacklevel=2)
391 args = tuple(args)
392
393 with self.mutex:
394 callable, exposed, method_to_typeid, proxytype = \
395 self.registry[typeid]
396
397 if callable is None:
398 if kwds or (len(args) != 1):
399 raise ValueError(
400 "Without callable, must have one non-keyword argument")
401 obj = args[0]
402 else:
403 obj = callable(*args, **kwds)
404
405 if exposed is None:
406 exposed = public_methods(obj)
407 if method_to_typeid is not None:
408 if not isinstance(method_to_typeid, dict):
409 raise TypeError(
410 "Method_to_typeid {0!r}: type {1!s}, not dict".format(
411 method_to_typeid, type(method_to_typeid)))
412 exposed = list(exposed) + list(method_to_typeid)
413
414 ident = '%x' % id(obj) # convert to string because xmlrpclib
415 # only has 32 bit signed integers
416 util.debug('%r callable returned object with id %r', typeid, ident)
417
418 self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
419 if ident not in self.id_to_refcount:
420 self.id_to_refcount[ident] = 0
421
422 self.incref(c, ident)
423 return ident, tuple(exposed)
424 create.__text_signature__ = '($self, c, typeid, /, *args, **kwds)'
425
426 def get_methods(self, c, token):
427 '''
428 Return the methods of the shared object indicated by token
429 '''
430 return tuple(self.id_to_obj[token.id][1])
431
432 def accept_connection(self, c, name):
433 '''
434 Spawn a new thread to serve this connection
435 '''
436 threading.current_thread().name = name
437 c.send(('#RETURN', None))
438 self.serve_client(c)
439
440 def incref(self, c, ident):
441 with self.mutex:
442 try:
443 self.id_to_refcount[ident] += 1
444 except KeyError as ke:
445 # If no external references exist but an internal (to the
446 # manager) still does and a new external reference is created
447 # from it, restore the manager's tracking of it from the
448 # previously stashed internal ref.
449 if ident in self.id_to_local_proxy_obj:
450 self.id_to_refcount[ident] = 1
451 self.id_to_obj[ident] = \
452 self.id_to_local_proxy_obj[ident]
453 obj, exposed, gettypeid = self.id_to_obj[ident]
454 util.debug('Server re-enabled tracking & INCREF %r', ident)
455 else:
456 raise ke
457
458 def decref(self, c, ident):
459 if ident not in self.id_to_refcount and \
460 ident in self.id_to_local_proxy_obj:
461 util.debug('Server DECREF skipping %r', ident)
462 return
463
464 with self.mutex:
465 if self.id_to_refcount[ident] <= 0:
466 raise AssertionError(
467 "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
468 ident, self.id_to_obj[ident],
469 self.id_to_refcount[ident]))
470 self.id_to_refcount[ident] -= 1
471 if self.id_to_refcount[ident] == 0:
472 del self.id_to_refcount[ident]
473
474 if ident not in self.id_to_refcount:
475 # Two-step process in case the object turns out to contain other
476 # proxy objects (e.g. a managed list of managed lists).
477 # Otherwise, deleting self.id_to_obj[ident] would trigger the
478 # deleting of the stored value (another managed object) which would
479 # in turn attempt to acquire the mutex that is already held here.
480 self.id_to_obj[ident] = (None, (), None) # thread-safe
481 util.debug('disposing of obj with id %r', ident)
482 with self.mutex:
483 del self.id_to_obj[ident]
484
485
486 #
487 # Class to represent state of a manager
488 #
489
490 class State(object):
491 __slots__ = ['value']
492 INITIAL = 0
493 STARTED = 1
494 SHUTDOWN = 2
495
496 #
497 # Mapping from serializer name to Listener and Client types
498 #
499
500 listener_client = {
501 'pickle' : (connection.Listener, connection.Client),
502 'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
503 }
504
505 #
506 # Definition of BaseManager
507 #
508
509 class BaseManager(object):
510 '''
511 Base class for managers
512 '''
513 _registry = {}
514 _Server = Server
515
516 def __init__(self, address=None, authkey=None, serializer='pickle',
517 ctx=None):
518 if authkey is None:
519 authkey = process.current_process().authkey
520 self._address = address # XXX not final address if eg ('', 0)
521 self._authkey = process.AuthenticationString(authkey)
522 self._state = State()
523 self._state.value = State.INITIAL
524 self._serializer = serializer
525 self._Listener, self._Client = listener_client[serializer]
526 self._ctx = ctx or get_context()
527
528 def get_server(self):
529 '''
530 Return server object with serve_forever() method and address attribute
531 '''
532 if self._state.value != State.INITIAL:
533 if self._state.value == State.STARTED:
534 raise ProcessError("Already started server")
535 elif self._state.value == State.SHUTDOWN:
536 raise ProcessError("Manager has shut down")
537 else:
538 raise ProcessError(
539 "Unknown state {!r}".format(self._state.value))
540 return Server(self._registry, self._address,
541 self._authkey, self._serializer)
542
543 def connect(self):
544 '''
545 Connect manager object to the server process
546 '''
547 Listener, Client = listener_client[self._serializer]
548 conn = Client(self._address, authkey=self._authkey)
549 dispatch(conn, None, 'dummy')
550 self._state.value = State.STARTED
551
552 def start(self, initializer=None, initargs=()):
553 '''
554 Spawn a server process for this manager object
555 '''
556 if self._state.value != State.INITIAL:
557 if self._state.value == State.STARTED:
558 raise ProcessError("Already started server")
559 elif self._state.value == State.SHUTDOWN:
560 raise ProcessError("Manager has shut down")
561 else:
562 raise ProcessError(
563 "Unknown state {!r}".format(self._state.value))
564
565 if initializer is not None and not callable(initializer):
566 raise TypeError('initializer must be a callable')
567
568 # pipe over which we will retrieve address of server
569 reader, writer = connection.Pipe(duplex=False)
570
571 # spawn process which runs a server
572 self._process = self._ctx.Process(
573 target=type(self)._run_server,
574 args=(self._registry, self._address, self._authkey,
575 self._serializer, writer, initializer, initargs),
576 )
577 ident = ':'.join(str(i) for i in self._process._identity)
578 self._process.name = type(self).__name__ + '-' + ident
579 self._process.start()
580
581 # get address of server
582 writer.close()
583 self._address = reader.recv()
584 reader.close()
585
586 # register a finalizer
587 self._state.value = State.STARTED
588 self.shutdown = util.Finalize(
589 self, type(self)._finalize_manager,
590 args=(self._process, self._address, self._authkey,
591 self._state, self._Client),
592 exitpriority=0
593 )
594
595 @classmethod
596 def _run_server(cls, registry, address, authkey, serializer, writer,
597 initializer=None, initargs=()):
598 '''
599 Create a server, report its address and run it
600 '''
601 # bpo-36368: protect server process from KeyboardInterrupt signals
602 signal.signal(signal.SIGINT, signal.SIG_IGN)
603
604 if initializer is not None:
605 initializer(*initargs)
606
607 # create server
608 server = cls._Server(registry, address, authkey, serializer)
609
610 # inform parent process of the server's address
611 writer.send(server.address)
612 writer.close()
613
614 # run the manager
615 util.info('manager serving at %r', server.address)
616 server.serve_forever()
617
618 def _create(self, typeid, /, *args, **kwds):
619 '''
620 Create a new shared object; return the token and exposed tuple
621 '''
622 assert self._state.value == State.STARTED, 'server not yet started'
623 conn = self._Client(self._address, authkey=self._authkey)
624 try:
625 id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
626 finally:
627 conn.close()
628 return Token(typeid, self._address, id), exposed
629
630 def join(self, timeout=None):
631 '''
632 Join the manager process (if it has been spawned)
633 '''
634 if self._process is not None:
635 self._process.join(timeout)
636 if not self._process.is_alive():
637 self._process = None
638
639 def _debug_info(self):
640 '''
641 Return some info about the servers shared objects and connections
642 '''
643 conn = self._Client(self._address, authkey=self._authkey)
644 try:
645 return dispatch(conn, None, 'debug_info')
646 finally:
647 conn.close()
648
649 def _number_of_objects(self):
650 '''
651 Return the number of shared objects
652 '''
653 conn = self._Client(self._address, authkey=self._authkey)
654 try:
655 return dispatch(conn, None, 'number_of_objects')
656 finally:
657 conn.close()
658
659 def __enter__(self):
660 if self._state.value == State.INITIAL:
661 self.start()
662 if self._state.value != State.STARTED:
663 if self._state.value == State.INITIAL:
664 raise ProcessError("Unable to start server")
665 elif self._state.value == State.SHUTDOWN:
666 raise ProcessError("Manager has shut down")
667 else:
668 raise ProcessError(
669 "Unknown state {!r}".format(self._state.value))
670 return self
671
672 def __exit__(self, exc_type, exc_val, exc_tb):
673 self.shutdown()
674
675 @staticmethod
676 def _finalize_manager(process, address, authkey, state, _Client):
677 '''
678 Shutdown the manager process; will be registered as a finalizer
679 '''
680 if process.is_alive():
681 util.info('sending shutdown message to manager')
682 try:
683 conn = _Client(address, authkey=authkey)
684 try:
685 dispatch(conn, None, 'shutdown')
686 finally:
687 conn.close()
688 except Exception:
689 pass
690
691 process.join(timeout=1.0)
692 if process.is_alive():
693 util.info('manager still alive')
694 if hasattr(process, 'terminate'):
695 util.info('trying to `terminate()` manager process')
696 process.terminate()
697 process.join(timeout=0.1)
698 if process.is_alive():
699 util.info('manager still alive after terminate')
700
701 state.value = State.SHUTDOWN
702 try:
703 del BaseProxy._address_to_local[address]
704 except KeyError:
705 pass
706
707 @property
708 def address(self):
709 return self._address
710
711 @classmethod
712 def register(cls, typeid, callable=None, proxytype=None, exposed=None,
713 method_to_typeid=None, create_method=True):
714 '''
715 Register a typeid with the manager type
716 '''
717 if '_registry' not in cls.__dict__:
718 cls._registry = cls._registry.copy()
719
720 if proxytype is None:
721 proxytype = AutoProxy
722
723 exposed = exposed or getattr(proxytype, '_exposed_', None)
724
725 method_to_typeid = method_to_typeid or \
726 getattr(proxytype, '_method_to_typeid_', None)
727
728 if method_to_typeid:
729 for key, value in list(method_to_typeid.items()): # isinstance?
730 assert type(key) is str, '%r is not a string' % key
731 assert type(value) is str, '%r is not a string' % value
732
733 cls._registry[typeid] = (
734 callable, exposed, method_to_typeid, proxytype
735 )
736
737 if create_method:
738 def temp(self, /, *args, **kwds):
739 util.debug('requesting creation of a shared %r object', typeid)
740 token, exp = self._create(typeid, *args, **kwds)
741 proxy = proxytype(
742 token, self._serializer, manager=self,
743 authkey=self._authkey, exposed=exp
744 )
745 conn = self._Client(token.address, authkey=self._authkey)
746 dispatch(conn, None, 'decref', (token.id,))
747 return proxy
748 temp.__name__ = typeid
749 setattr(cls, typeid, temp)
750
751 #
752 # Subclass of set which get cleared after a fork
753 #
754
755 class ProcessLocalSet(set):
756 def __init__(self):
757 util.register_after_fork(self, lambda obj: obj.clear())
758 def __reduce__(self):
759 return type(self), ()
760
761 #
762 # Definition of BaseProxy
763 #
764
765 class BaseProxy(object):
766 '''
767 A base for proxies of shared objects
768 '''
769 _address_to_local = {}
770 _mutex = util.ForkAwareThreadLock()
771
772 def __init__(self, token, serializer, manager=None,
773 authkey=None, exposed=None, incref=True, manager_owned=False):
774 with BaseProxy._mutex:
775 tls_idset = BaseProxy._address_to_local.get(token.address, None)
776 if tls_idset is None:
777 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
778 BaseProxy._address_to_local[token.address] = tls_idset
779
780 # self._tls is used to record the connection used by this
781 # thread to communicate with the manager at token.address
782 self._tls = tls_idset[0]
783
784 # self._idset is used to record the identities of all shared
785 # objects for which the current process owns references and
786 # which are in the manager at token.address
787 self._idset = tls_idset[1]
788
789 self._token = token
790 self._id = self._token.id
791 self._manager = manager
792 self._serializer = serializer
793 self._Client = listener_client[serializer][1]
794
795 # Should be set to True only when a proxy object is being created
796 # on the manager server; primary use case: nested proxy objects.
797 # RebuildProxy detects when a proxy is being created on the manager
798 # and sets this value appropriately.
799 self._owned_by_manager = manager_owned
800
801 if authkey is not None:
802 self._authkey = process.AuthenticationString(authkey)
803 elif self._manager is not None:
804 self._authkey = self._manager._authkey
805 else:
806 self._authkey = process.current_process().authkey
807
808 if incref:
809 self._incref()
810
811 util.register_after_fork(self, BaseProxy._after_fork)
812
813 def _connect(self):
814 util.debug('making connection to manager')
815 name = process.current_process().name
816 if threading.current_thread().name != 'MainThread':
817 name += '|' + threading.current_thread().name
818 conn = self._Client(self._token.address, authkey=self._authkey)
819 dispatch(conn, None, 'accept_connection', (name,))
820 self._tls.connection = conn
821
822 def _callmethod(self, methodname, args=(), kwds={}):
823 '''
824 Try to call a method of the referrent and return a copy of the result
825 '''
826 try:
827 conn = self._tls.connection
828 except AttributeError:
829 util.debug('thread %r does not own a connection',
830 threading.current_thread().name)
831 self._connect()
832 conn = self._tls.connection
833
834 conn.send((self._id, methodname, args, kwds))
835 kind, result = conn.recv()
836
837 if kind == '#RETURN':
838 return result
839 elif kind == '#PROXY':
840 exposed, token = result
841 proxytype = self._manager._registry[token.typeid][-1]
842 token.address = self._token.address
843 proxy = proxytype(
844 token, self._serializer, manager=self._manager,
845 authkey=self._authkey, exposed=exposed
846 )
847 conn = self._Client(token.address, authkey=self._authkey)
848 dispatch(conn, None, 'decref', (token.id,))
849 return proxy
850 raise convert_to_error(kind, result)
851
852 def _getvalue(self):
853 '''
854 Get a copy of the value of the referent
855 '''
856 return self._callmethod('#GETVALUE')
857
858 def _incref(self):
859 if self._owned_by_manager:
860 util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
861 return
862
863 conn = self._Client(self._token.address, authkey=self._authkey)
864 dispatch(conn, None, 'incref', (self._id,))
865 util.debug('INCREF %r', self._token.id)
866
867 self._idset.add(self._id)
868
869 state = self._manager and self._manager._state
870
871 self._close = util.Finalize(
872 self, BaseProxy._decref,
873 args=(self._token, self._authkey, state,
874 self._tls, self._idset, self._Client),
875 exitpriority=10
876 )
877
878 @staticmethod
879 def _decref(token, authkey, state, tls, idset, _Client):
880 idset.discard(token.id)
881
882 # check whether manager is still alive
883 if state is None or state.value == State.STARTED:
884 # tell manager this process no longer cares about referent
885 try:
886 util.debug('DECREF %r', token.id)
887 conn = _Client(token.address, authkey=authkey)
888 dispatch(conn, None, 'decref', (token.id,))
889 except Exception as e:
890 util.debug('... decref failed %s', e)
891
892 else:
893 util.debug('DECREF %r -- manager already shutdown', token.id)
894
895 # check whether we can close this thread's connection because
896 # the process owns no more references to objects for this manager
897 if not idset and hasattr(tls, 'connection'):
898 util.debug('thread %r has no more proxies so closing conn',
899 threading.current_thread().name)
900 tls.connection.close()
901 del tls.connection
902
903 def _after_fork(self):
904 self._manager = None
905 try:
906 self._incref()
907 except Exception as e:
908 # the proxy may just be for a manager which has shutdown
909 util.info('incref failed: %s' % e)
910
911 def __reduce__(self):
912 kwds = {}
913 if get_spawning_popen() is not None:
914 kwds['authkey'] = self._authkey
915
916 if getattr(self, '_isauto', False):
917 kwds['exposed'] = self._exposed_
918 return (RebuildProxy,
919 (AutoProxy, self._token, self._serializer, kwds))
920 else:
921 return (RebuildProxy,
922 (type(self), self._token, self._serializer, kwds))
923
924 def __deepcopy__(self, memo):
925 return self._getvalue()
926
927 def __repr__(self):
928 return '<%s object, typeid %r at %#x>' % \
929 (type(self).__name__, self._token.typeid, id(self))
930
931 def __str__(self):
932 '''
933 Return representation of the referent (or a fall-back if that fails)
934 '''
935 try:
936 return self._callmethod('__repr__')
937 except Exception:
938 return repr(self)[:-1] + "; '__str__()' failed>"
939
940 #
941 # Function used for unpickling
942 #
943
944 def RebuildProxy(func, token, serializer, kwds):
945 '''
946 Function used for unpickling proxy objects.
947 '''
948 server = getattr(process.current_process(), '_manager_server', None)
949 if server and server.address == token.address:
950 util.debug('Rebuild a proxy owned by manager, token=%r', token)
951 kwds['manager_owned'] = True
952 if token.id not in server.id_to_local_proxy_obj:
953 server.id_to_local_proxy_obj[token.id] = \
954 server.id_to_obj[token.id]
955 incref = (
956 kwds.pop('incref', True) and
957 not getattr(process.current_process(), '_inheriting', False)
958 )
959 return func(token, serializer, incref=incref, **kwds)
960
961 #
962 # Functions to create proxies and proxy types
963 #
964
965 def MakeProxyType(name, exposed, _cache={}):
966 '''
967 Return a proxy type whose methods are given by `exposed`
968 '''
969 exposed = tuple(exposed)
970 try:
971 return _cache[(name, exposed)]
972 except KeyError:
973 pass
974
975 dic = {}
976
977 for meth in exposed:
978 exec('''def %s(self, /, *args, **kwds):
979 return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)
980
981 ProxyType = type(name, (BaseProxy,), dic)
982 ProxyType._exposed_ = exposed
983 _cache[(name, exposed)] = ProxyType
984 return ProxyType
985
986
987 def AutoProxy(token, serializer, manager=None, authkey=None,
988 exposed=None, incref=True):
989 '''
990 Return an auto-proxy for `token`
991 '''
992 _Client = listener_client[serializer][1]
993
994 if exposed is None:
995 conn = _Client(token.address, authkey=authkey)
996 try:
997 exposed = dispatch(conn, None, 'get_methods', (token,))
998 finally:
999 conn.close()
1000
1001 if authkey is None and manager is not None:
1002 authkey = manager._authkey
1003 if authkey is None:
1004 authkey = process.current_process().authkey
1005
1006 ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
1007 proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
1008 incref=incref)
1009 proxy._isauto = True
1010 return proxy
1011
1012 #
1013 # Types/callables which we will register with SyncManager
1014 #
1015
1016 class Namespace(object):
1017 def __init__(self, /, **kwds):
1018 self.__dict__.update(kwds)
1019 def __repr__(self):
1020 items = list(self.__dict__.items())
1021 temp = []
1022 for name, value in items:
1023 if not name.startswith('_'):
1024 temp.append('%s=%r' % (name, value))
1025 temp.sort()
1026 return '%s(%s)' % (self.__class__.__name__, ', '.join(temp))
1027
1028 class Value(object):
1029 def __init__(self, typecode, value, lock=True):
1030 self._typecode = typecode
1031 self._value = value
1032 def get(self):
1033 return self._value
1034 def set(self, value):
1035 self._value = value
1036 def __repr__(self):
1037 return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
1038 value = property(get, set)
1039
1040 def Array(typecode, sequence, lock=True):
1041 return array.array(typecode, sequence)
1042
1043 #
1044 # Proxy types used by SyncManager
1045 #
1046
1047 class IteratorProxy(BaseProxy):
1048 _exposed_ = ('__next__', 'send', 'throw', 'close')
1049 def __iter__(self):
1050 return self
1051 def __next__(self, *args):
1052 return self._callmethod('__next__', args)
1053 def send(self, *args):
1054 return self._callmethod('send', args)
1055 def throw(self, *args):
1056 return self._callmethod('throw', args)
1057 def close(self, *args):
1058 return self._callmethod('close', args)
1059
1060
1061 class AcquirerProxy(BaseProxy):
1062 _exposed_ = ('acquire', 'release')
1063 def acquire(self, blocking=True, timeout=None):
1064 args = (blocking,) if timeout is None else (blocking, timeout)
1065 return self._callmethod('acquire', args)
1066 def release(self):
1067 return self._callmethod('release')
1068 def __enter__(self):
1069 return self._callmethod('acquire')
1070 def __exit__(self, exc_type, exc_val, exc_tb):
1071 return self._callmethod('release')
1072
1073
1074 class ConditionProxy(AcquirerProxy):
1075 _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
1076 def wait(self, timeout=None):
1077 return self._callmethod('wait', (timeout,))
1078 def notify(self, n=1):
1079 return self._callmethod('notify', (n,))
1080 def notify_all(self):
1081 return self._callmethod('notify_all')
1082 def wait_for(self, predicate, timeout=None):
1083 result = predicate()
1084 if result:
1085 return result
1086 if timeout is not None:
1087 endtime = time.monotonic() + timeout
1088 else:
1089 endtime = None
1090 waittime = None
1091 while not result:
1092 if endtime is not None:
1093 waittime = endtime - time.monotonic()
1094 if waittime <= 0:
1095 break
1096 self.wait(waittime)
1097 result = predicate()
1098 return result
1099
1100
1101 class EventProxy(BaseProxy):
1102 _exposed_ = ('is_set', 'set', 'clear', 'wait')
1103 def is_set(self):
1104 return self._callmethod('is_set')
1105 def set(self):
1106 return self._callmethod('set')
1107 def clear(self):
1108 return self._callmethod('clear')
1109 def wait(self, timeout=None):
1110 return self._callmethod('wait', (timeout,))
1111
1112
1113 class BarrierProxy(BaseProxy):
1114 _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')
1115 def wait(self, timeout=None):
1116 return self._callmethod('wait', (timeout,))
1117 def abort(self):
1118 return self._callmethod('abort')
1119 def reset(self):
1120 return self._callmethod('reset')
1121 @property
1122 def parties(self):
1123 return self._callmethod('__getattribute__', ('parties',))
1124 @property
1125 def n_waiting(self):
1126 return self._callmethod('__getattribute__', ('n_waiting',))
1127 @property
1128 def broken(self):
1129 return self._callmethod('__getattribute__', ('broken',))
1130
1131
1132 class NamespaceProxy(BaseProxy):
1133 _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
1134 def __getattr__(self, key):
1135 if key[0] == '_':
1136 return object.__getattribute__(self, key)
1137 callmethod = object.__getattribute__(self, '_callmethod')
1138 return callmethod('__getattribute__', (key,))
1139 def __setattr__(self, key, value):
1140 if key[0] == '_':
1141 return object.__setattr__(self, key, value)
1142 callmethod = object.__getattribute__(self, '_callmethod')
1143 return callmethod('__setattr__', (key, value))
1144 def __delattr__(self, key):
1145 if key[0] == '_':
1146 return object.__delattr__(self, key)
1147 callmethod = object.__getattribute__(self, '_callmethod')
1148 return callmethod('__delattr__', (key,))
1149
1150
1151 class ValueProxy(BaseProxy):
1152 _exposed_ = ('get', 'set')
1153 def get(self):
1154 return self._callmethod('get')
1155 def set(self, value):
1156 return self._callmethod('set', (value,))
1157 value = property(get, set)
1158
1159
1160 BaseListProxy = MakeProxyType('BaseListProxy', (
1161 '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
1162 '__mul__', '__reversed__', '__rmul__', '__setitem__',
1163 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
1164 'reverse', 'sort', '__imul__'
1165 ))
1166 class ListProxy(BaseListProxy):
1167 def __iadd__(self, value):
1168 self._callmethod('extend', (value,))
1169 return self
1170 def __imul__(self, value):
1171 self._callmethod('__imul__', (value,))
1172 return self
1173
1174
1175 DictProxy = MakeProxyType('DictProxy', (
1176 '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
1177 '__setitem__', 'clear', 'copy', 'get', 'items',
1178 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
1179 ))
1180 DictProxy._method_to_typeid_ = {
1181 '__iter__': 'Iterator',
1182 }
1183
1184
1185 ArrayProxy = MakeProxyType('ArrayProxy', (
1186 '__len__', '__getitem__', '__setitem__'
1187 ))
1188
1189
1190 BasePoolProxy = MakeProxyType('PoolProxy', (
1191 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
1192 'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
1193 ))
1194 BasePoolProxy._method_to_typeid_ = {
1195 'apply_async': 'AsyncResult',
1196 'map_async': 'AsyncResult',
1197 'starmap_async': 'AsyncResult',
1198 'imap': 'Iterator',
1199 'imap_unordered': 'Iterator'
1200 }
1201 class PoolProxy(BasePoolProxy):
1202 def __enter__(self):
1203 return self
1204 def __exit__(self, exc_type, exc_val, exc_tb):
1205 self.terminate()
1206
1207 #
1208 # Definition of SyncManager
1209 #
1210
1211 class SyncManager(BaseManager):
1212 '''
1213 Subclass of `BaseManager` which supports a number of shared object types.
1214
1215 The types registered are those intended for the synchronization
1216 of threads, plus `dict`, `list` and `Namespace`.
1217
1218 The `multiprocessing.Manager()` function creates started instances of
1219 this class.
1220 '''
1221
1222 SyncManager.register('Queue', queue.Queue)
1223 SyncManager.register('JoinableQueue', queue.Queue)
1224 SyncManager.register('Event', threading.Event, EventProxy)
1225 SyncManager.register('Lock', threading.Lock, AcquirerProxy)
1226 SyncManager.register('RLock', threading.RLock, AcquirerProxy)
1227 SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
1228 SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
1229 AcquirerProxy)
1230 SyncManager.register('Condition', threading.Condition, ConditionProxy)
1231 SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
1232 SyncManager.register('Pool', pool.Pool, PoolProxy)
1233 SyncManager.register('list', list, ListProxy)
1234 SyncManager.register('dict', dict, DictProxy)
1235 SyncManager.register('Value', Value, ValueProxy)
1236 SyncManager.register('Array', Array, ArrayProxy)
1237 SyncManager.register('Namespace', Namespace, NamespaceProxy)
1238
1239 # types returned by methods of PoolProxy
1240 SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
1241 SyncManager.register('AsyncResult', create_method=False)
1242
1243 #
1244 # Definition of SharedMemoryManager and SharedMemoryServer
1245 #
1246
1247 if HAS_SHMEM:
1248 class _SharedMemoryTracker:
1249 "Manages one or more shared memory segments."
1250
1251 def __init__(self, name, segment_names=[]):
1252 self.shared_memory_context_name = name
1253 self.segment_names = segment_names
1254
1255 def register_segment(self, segment_name):
1256 "Adds the supplied shared memory block name to tracker."
1257 util.debug(f"Register segment {segment_name!r} in pid {getpid()}")
1258 self.segment_names.append(segment_name)
1259
1260 def destroy_segment(self, segment_name):
1261 """Calls unlink() on the shared memory block with the supplied name
1262 and removes it from the list of blocks being tracked."""
1263 util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}")
1264 self.segment_names.remove(segment_name)
1265 segment = shared_memory.SharedMemory(segment_name)
1266 segment.close()
1267 segment.unlink()
1268
1269 def unlink(self):
1270 "Calls destroy_segment() on all tracked shared memory blocks."
1271 for segment_name in self.segment_names[:]:
1272 self.destroy_segment(segment_name)
1273
1274 def __del__(self):
1275 util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}")
1276 self.unlink()
1277
1278 def __getstate__(self):
1279 return (self.shared_memory_context_name, self.segment_names)
1280
1281 def __setstate__(self, state):
1282 self.__init__(*state)
1283
1284
1285 class SharedMemoryServer(Server):
1286
1287 public = Server.public + \
1288 ['track_segment', 'release_segment', 'list_segments']
1289
1290 def __init__(self, *args, **kwargs):
1291 Server.__init__(self, *args, **kwargs)
1292 self.shared_memory_context = \
1293 _SharedMemoryTracker(f"shmm_{self.address}_{getpid()}")
1294 util.debug(f"SharedMemoryServer started by pid {getpid()}")
1295
1296 def create(*args, **kwargs):
1297 """Create a new distributed-shared object (not backed by a shared
1298 memory block) and return its id to be used in a Proxy Object."""
1299 # Unless set up as a shared proxy, don't make shared_memory_context
1300 # a standard part of kwargs. This makes things easier for supplying
1301 # simple functions.
1302 if len(args) >= 3:
1303 typeod = args[2]
1304 elif 'typeid' in kwargs:
1305 typeid = kwargs['typeid']
1306 elif not args:
1307 raise TypeError("descriptor 'create' of 'SharedMemoryServer' "
1308 "object needs an argument")
1309 else:
1310 raise TypeError('create expected at least 2 positional '
1311 'arguments, got %d' % (len(args)-1))
1312 if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"):
1313 kwargs['shared_memory_context'] = self.shared_memory_context
1314 return Server.create(*args, **kwargs)
1315 create.__text_signature__ = '($self, c, typeid, /, *args, **kwargs)'
1316
1317 def shutdown(self, c):
1318 "Call unlink() on all tracked shared memory, terminate the Server."
1319 self.shared_memory_context.unlink()
1320 return Server.shutdown(self, c)
1321
1322 def track_segment(self, c, segment_name):
1323 "Adds the supplied shared memory block name to Server's tracker."
1324 self.shared_memory_context.register_segment(segment_name)
1325
1326 def release_segment(self, c, segment_name):
1327 """Calls unlink() on the shared memory block with the supplied name
1328 and removes it from the tracker instance inside the Server."""
1329 self.shared_memory_context.destroy_segment(segment_name)
1330
1331 def list_segments(self, c):
1332 """Returns a list of names of shared memory blocks that the Server
1333 is currently tracking."""
1334 return self.shared_memory_context.segment_names
1335
1336
1337 class SharedMemoryManager(BaseManager):
1338 """Like SyncManager but uses SharedMemoryServer instead of Server.
1339
1340 It provides methods for creating and returning SharedMemory instances
1341 and for creating a list-like object (ShareableList) backed by shared
1342 memory. It also provides methods that create and return Proxy Objects
1343 that support synchronization across processes (i.e. multi-process-safe
1344 locks and semaphores).
1345 """
1346
1347 _Server = SharedMemoryServer
1348
1349 def __init__(self, *args, **kwargs):
1350 if os.name == "posix":
1351 # bpo-36867: Ensure the resource_tracker is running before
1352 # launching the manager process, so that concurrent
1353 # shared_memory manipulation both in the manager and in the
1354 # current process does not create two resource_tracker
1355 # processes.
1356 from . import resource_tracker
1357 resource_tracker.ensure_running()
1358 BaseManager.__init__(self, *args, **kwargs)
1359 util.debug(f"{self.__class__.__name__} created by pid {getpid()}")
1360
1361 def __del__(self):
1362 util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}")
1363 pass
1364
1365 def get_server(self):
1366 'Better than monkeypatching for now; merge into Server ultimately'
1367 if self._state.value != State.INITIAL:
1368 if self._state.value == State.STARTED:
1369 raise ProcessError("Already started SharedMemoryServer")
1370 elif self._state.value == State.SHUTDOWN:
1371 raise ProcessError("SharedMemoryManager has shut down")
1372 else:
1373 raise ProcessError(
1374 "Unknown state {!r}".format(self._state.value))
1375 return self._Server(self._registry, self._address,
1376 self._authkey, self._serializer)
1377
1378 def SharedMemory(self, size):
1379 """Returns a new SharedMemory instance with the specified size in
1380 bytes, to be tracked by the manager."""
1381 with self._Client(self._address, authkey=self._authkey) as conn:
1382 sms = shared_memory.SharedMemory(None, create=True, size=size)
1383 try:
1384 dispatch(conn, None, 'track_segment', (sms.name,))
1385 except BaseException as e:
1386 sms.unlink()
1387 raise e
1388 return sms
1389
1390 def ShareableList(self, sequence):
1391 """Returns a new ShareableList instance populated with the values
1392 from the input sequence, to be tracked by the manager."""
1393 with self._Client(self._address, authkey=self._authkey) as conn:
1394 sl = shared_memory.ShareableList(sequence)
1395 try:
1396 dispatch(conn, None, 'track_segment', (sl.shm.name,))
1397 except BaseException as e:
1398 sl.shm.unlink()
1399 raise e
1400 return sl