jpayne@69: # jpayne@69: # Module providing manager classes for dealing jpayne@69: # with shared objects jpayne@69: # jpayne@69: # multiprocessing/managers.py jpayne@69: # jpayne@69: # Copyright (c) 2006-2008, R Oudkerk jpayne@69: # Licensed to PSF under a Contributor Agreement. jpayne@69: # jpayne@69: jpayne@69: __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token', jpayne@69: 'SharedMemoryManager' ] jpayne@69: jpayne@69: # jpayne@69: # Imports jpayne@69: # jpayne@69: jpayne@69: import sys jpayne@69: import threading jpayne@69: import signal jpayne@69: import array jpayne@69: import queue jpayne@69: import time jpayne@69: import os jpayne@69: from os import getpid jpayne@69: jpayne@69: from traceback import format_exc jpayne@69: jpayne@69: from . import connection jpayne@69: from .context import reduction, get_spawning_popen, ProcessError jpayne@69: from . import pool jpayne@69: from . import process jpayne@69: from . import util jpayne@69: from . import get_context jpayne@69: try: jpayne@69: from . import shared_memory jpayne@69: HAS_SHMEM = True jpayne@69: except ImportError: jpayne@69: HAS_SHMEM = False jpayne@69: jpayne@69: # jpayne@69: # Register some things for pickling jpayne@69: # jpayne@69: jpayne@69: def reduce_array(a): jpayne@69: return array.array, (a.typecode, a.tobytes()) jpayne@69: reduction.register(array.array, reduce_array) jpayne@69: jpayne@69: view_types = [type(getattr({}, name)()) for name in ('items','keys','values')] jpayne@69: if view_types[0] is not list: # only needed in Py3.0 jpayne@69: def rebuild_as_list(obj): jpayne@69: return list, (list(obj),) jpayne@69: for view_type in view_types: jpayne@69: reduction.register(view_type, rebuild_as_list) jpayne@69: jpayne@69: # jpayne@69: # Type for identifying shared objects jpayne@69: # jpayne@69: jpayne@69: class Token(object): jpayne@69: ''' jpayne@69: Type to uniquely indentify a shared object jpayne@69: ''' jpayne@69: __slots__ = ('typeid', 'address', 'id') jpayne@69: jpayne@69: def __init__(self, typeid, address, id): jpayne@69: (self.typeid, self.address, self.id) = (typeid, address, id) jpayne@69: jpayne@69: def __getstate__(self): jpayne@69: return (self.typeid, self.address, self.id) jpayne@69: jpayne@69: def __setstate__(self, state): jpayne@69: (self.typeid, self.address, self.id) = state jpayne@69: jpayne@69: def __repr__(self): jpayne@69: return '%s(typeid=%r, address=%r, id=%r)' % \ jpayne@69: (self.__class__.__name__, self.typeid, self.address, self.id) jpayne@69: jpayne@69: # jpayne@69: # Function for communication with a manager's server process jpayne@69: # jpayne@69: jpayne@69: def dispatch(c, id, methodname, args=(), kwds={}): jpayne@69: ''' jpayne@69: Send a message to manager using connection `c` and return response jpayne@69: ''' jpayne@69: c.send((id, methodname, args, kwds)) jpayne@69: kind, result = c.recv() jpayne@69: if kind == '#RETURN': jpayne@69: return result jpayne@69: raise convert_to_error(kind, result) jpayne@69: jpayne@69: def convert_to_error(kind, result): jpayne@69: if kind == '#ERROR': jpayne@69: return result jpayne@69: elif kind in ('#TRACEBACK', '#UNSERIALIZABLE'): jpayne@69: if not isinstance(result, str): jpayne@69: raise TypeError( jpayne@69: "Result {0!r} (kind '{1}') type is {2}, not str".format( jpayne@69: result, kind, type(result))) jpayne@69: if kind == '#UNSERIALIZABLE': jpayne@69: return RemoteError('Unserializable message: %s\n' % result) jpayne@69: else: jpayne@69: return RemoteError(result) jpayne@69: else: jpayne@69: return ValueError('Unrecognized message type {!r}'.format(kind)) jpayne@69: jpayne@69: class RemoteError(Exception): jpayne@69: def __str__(self): jpayne@69: return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75) jpayne@69: jpayne@69: # jpayne@69: # Functions for finding the method names of an object jpayne@69: # jpayne@69: jpayne@69: def all_methods(obj): jpayne@69: ''' jpayne@69: Return a list of names of methods of `obj` jpayne@69: ''' jpayne@69: temp = [] jpayne@69: for name in dir(obj): jpayne@69: func = getattr(obj, name) jpayne@69: if callable(func): jpayne@69: temp.append(name) jpayne@69: return temp jpayne@69: jpayne@69: def public_methods(obj): jpayne@69: ''' jpayne@69: Return a list of names of methods of `obj` which do not start with '_' jpayne@69: ''' jpayne@69: return [name for name in all_methods(obj) if name[0] != '_'] jpayne@69: jpayne@69: # jpayne@69: # Server which is run in a process controlled by a manager jpayne@69: # jpayne@69: jpayne@69: class Server(object): jpayne@69: ''' jpayne@69: Server class which runs in a process controlled by a manager object jpayne@69: ''' jpayne@69: public = ['shutdown', 'create', 'accept_connection', 'get_methods', jpayne@69: 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref'] jpayne@69: jpayne@69: def __init__(self, registry, address, authkey, serializer): jpayne@69: if not isinstance(authkey, bytes): jpayne@69: raise TypeError( jpayne@69: "Authkey {0!r} is type {1!s}, not bytes".format( jpayne@69: authkey, type(authkey))) jpayne@69: self.registry = registry jpayne@69: self.authkey = process.AuthenticationString(authkey) jpayne@69: Listener, Client = listener_client[serializer] jpayne@69: jpayne@69: # do authentication later jpayne@69: self.listener = Listener(address=address, backlog=16) jpayne@69: self.address = self.listener.address jpayne@69: jpayne@69: self.id_to_obj = {'0': (None, ())} jpayne@69: self.id_to_refcount = {} jpayne@69: self.id_to_local_proxy_obj = {} jpayne@69: self.mutex = threading.Lock() jpayne@69: jpayne@69: def serve_forever(self): jpayne@69: ''' jpayne@69: Run the server forever jpayne@69: ''' jpayne@69: self.stop_event = threading.Event() jpayne@69: process.current_process()._manager_server = self jpayne@69: try: jpayne@69: accepter = threading.Thread(target=self.accepter) jpayne@69: accepter.daemon = True jpayne@69: accepter.start() jpayne@69: try: jpayne@69: while not self.stop_event.is_set(): jpayne@69: self.stop_event.wait(1) jpayne@69: except (KeyboardInterrupt, SystemExit): jpayne@69: pass jpayne@69: finally: jpayne@69: if sys.stdout != sys.__stdout__: # what about stderr? jpayne@69: util.debug('resetting stdout, stderr') jpayne@69: sys.stdout = sys.__stdout__ jpayne@69: sys.stderr = sys.__stderr__ jpayne@69: sys.exit(0) jpayne@69: jpayne@69: def accepter(self): jpayne@69: while True: jpayne@69: try: jpayne@69: c = self.listener.accept() jpayne@69: except OSError: jpayne@69: continue jpayne@69: t = threading.Thread(target=self.handle_request, args=(c,)) jpayne@69: t.daemon = True jpayne@69: t.start() jpayne@69: jpayne@69: def handle_request(self, c): jpayne@69: ''' jpayne@69: Handle a new connection jpayne@69: ''' jpayne@69: funcname = result = request = None jpayne@69: try: jpayne@69: connection.deliver_challenge(c, self.authkey) jpayne@69: connection.answer_challenge(c, self.authkey) jpayne@69: request = c.recv() jpayne@69: ignore, funcname, args, kwds = request jpayne@69: assert funcname in self.public, '%r unrecognized' % funcname jpayne@69: func = getattr(self, funcname) jpayne@69: except Exception: jpayne@69: msg = ('#TRACEBACK', format_exc()) jpayne@69: else: jpayne@69: try: jpayne@69: result = func(c, *args, **kwds) jpayne@69: except Exception: jpayne@69: msg = ('#TRACEBACK', format_exc()) jpayne@69: else: jpayne@69: msg = ('#RETURN', result) jpayne@69: try: jpayne@69: c.send(msg) jpayne@69: except Exception as e: jpayne@69: try: jpayne@69: c.send(('#TRACEBACK', format_exc())) jpayne@69: except Exception: jpayne@69: pass jpayne@69: util.info('Failure to send message: %r', msg) jpayne@69: util.info(' ... request was %r', request) jpayne@69: util.info(' ... exception was %r', e) jpayne@69: jpayne@69: c.close() jpayne@69: jpayne@69: def serve_client(self, conn): jpayne@69: ''' jpayne@69: Handle requests from the proxies in a particular process/thread jpayne@69: ''' jpayne@69: util.debug('starting server thread to service %r', jpayne@69: threading.current_thread().name) jpayne@69: jpayne@69: recv = conn.recv jpayne@69: send = conn.send jpayne@69: id_to_obj = self.id_to_obj jpayne@69: jpayne@69: while not self.stop_event.is_set(): jpayne@69: jpayne@69: try: jpayne@69: methodname = obj = None jpayne@69: request = recv() jpayne@69: ident, methodname, args, kwds = request jpayne@69: try: jpayne@69: obj, exposed, gettypeid = id_to_obj[ident] jpayne@69: except KeyError as ke: jpayne@69: try: jpayne@69: obj, exposed, gettypeid = \ jpayne@69: self.id_to_local_proxy_obj[ident] jpayne@69: except KeyError as second_ke: jpayne@69: raise ke jpayne@69: jpayne@69: if methodname not in exposed: jpayne@69: raise AttributeError( jpayne@69: 'method %r of %r object is not in exposed=%r' % jpayne@69: (methodname, type(obj), exposed) jpayne@69: ) jpayne@69: jpayne@69: function = getattr(obj, methodname) jpayne@69: jpayne@69: try: jpayne@69: res = function(*args, **kwds) jpayne@69: except Exception as e: jpayne@69: msg = ('#ERROR', e) jpayne@69: else: jpayne@69: typeid = gettypeid and gettypeid.get(methodname, None) jpayne@69: if typeid: jpayne@69: rident, rexposed = self.create(conn, typeid, res) jpayne@69: token = Token(typeid, self.address, rident) jpayne@69: msg = ('#PROXY', (rexposed, token)) jpayne@69: else: jpayne@69: msg = ('#RETURN', res) jpayne@69: jpayne@69: except AttributeError: jpayne@69: if methodname is None: jpayne@69: msg = ('#TRACEBACK', format_exc()) jpayne@69: else: jpayne@69: try: jpayne@69: fallback_func = self.fallback_mapping[methodname] jpayne@69: result = fallback_func( jpayne@69: self, conn, ident, obj, *args, **kwds jpayne@69: ) jpayne@69: msg = ('#RETURN', result) jpayne@69: except Exception: jpayne@69: msg = ('#TRACEBACK', format_exc()) jpayne@69: jpayne@69: except EOFError: jpayne@69: util.debug('got EOF -- exiting thread serving %r', jpayne@69: threading.current_thread().name) jpayne@69: sys.exit(0) jpayne@69: jpayne@69: except Exception: jpayne@69: msg = ('#TRACEBACK', format_exc()) jpayne@69: jpayne@69: try: jpayne@69: try: jpayne@69: send(msg) jpayne@69: except Exception as e: jpayne@69: send(('#UNSERIALIZABLE', format_exc())) jpayne@69: except Exception as e: jpayne@69: util.info('exception in thread serving %r', jpayne@69: threading.current_thread().name) jpayne@69: util.info(' ... message was %r', msg) jpayne@69: util.info(' ... exception was %r', e) jpayne@69: conn.close() jpayne@69: sys.exit(1) jpayne@69: jpayne@69: def fallback_getvalue(self, conn, ident, obj): jpayne@69: return obj jpayne@69: jpayne@69: def fallback_str(self, conn, ident, obj): jpayne@69: return str(obj) jpayne@69: jpayne@69: def fallback_repr(self, conn, ident, obj): jpayne@69: return repr(obj) jpayne@69: jpayne@69: fallback_mapping = { jpayne@69: '__str__':fallback_str, jpayne@69: '__repr__':fallback_repr, jpayne@69: '#GETVALUE':fallback_getvalue jpayne@69: } jpayne@69: jpayne@69: def dummy(self, c): jpayne@69: pass jpayne@69: jpayne@69: def debug_info(self, c): jpayne@69: ''' jpayne@69: Return some info --- useful to spot problems with refcounting jpayne@69: ''' jpayne@69: # Perhaps include debug info about 'c'? jpayne@69: with self.mutex: jpayne@69: result = [] jpayne@69: keys = list(self.id_to_refcount.keys()) jpayne@69: keys.sort() jpayne@69: for ident in keys: jpayne@69: if ident != '0': jpayne@69: result.append(' %s: refcount=%s\n %s' % jpayne@69: (ident, self.id_to_refcount[ident], jpayne@69: str(self.id_to_obj[ident][0])[:75])) jpayne@69: return '\n'.join(result) jpayne@69: jpayne@69: def number_of_objects(self, c): jpayne@69: ''' jpayne@69: Number of shared objects jpayne@69: ''' jpayne@69: # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0' jpayne@69: return len(self.id_to_refcount) jpayne@69: jpayne@69: def shutdown(self, c): jpayne@69: ''' jpayne@69: Shutdown this process jpayne@69: ''' jpayne@69: try: jpayne@69: util.debug('manager received shutdown message') jpayne@69: c.send(('#RETURN', None)) jpayne@69: except: jpayne@69: import traceback jpayne@69: traceback.print_exc() jpayne@69: finally: jpayne@69: self.stop_event.set() jpayne@69: jpayne@69: def create(*args, **kwds): jpayne@69: ''' jpayne@69: Create a new shared object and return its id jpayne@69: ''' jpayne@69: if len(args) >= 3: jpayne@69: self, c, typeid, *args = args jpayne@69: elif not args: jpayne@69: raise TypeError("descriptor 'create' of 'Server' object " jpayne@69: "needs an argument") jpayne@69: else: jpayne@69: if 'typeid' not in kwds: jpayne@69: raise TypeError('create expected at least 2 positional ' jpayne@69: 'arguments, got %d' % (len(args)-1)) jpayne@69: typeid = kwds.pop('typeid') jpayne@69: if len(args) >= 2: jpayne@69: self, c, *args = args jpayne@69: import warnings jpayne@69: warnings.warn("Passing 'typeid' as keyword argument is deprecated", jpayne@69: DeprecationWarning, stacklevel=2) jpayne@69: else: jpayne@69: if 'c' not in kwds: jpayne@69: raise TypeError('create expected at least 2 positional ' jpayne@69: 'arguments, got %d' % (len(args)-1)) jpayne@69: c = kwds.pop('c') jpayne@69: self, *args = args jpayne@69: import warnings jpayne@69: warnings.warn("Passing 'c' as keyword argument is deprecated", jpayne@69: DeprecationWarning, stacklevel=2) jpayne@69: args = tuple(args) jpayne@69: jpayne@69: with self.mutex: jpayne@69: callable, exposed, method_to_typeid, proxytype = \ jpayne@69: self.registry[typeid] jpayne@69: jpayne@69: if callable is None: jpayne@69: if kwds or (len(args) != 1): jpayne@69: raise ValueError( jpayne@69: "Without callable, must have one non-keyword argument") jpayne@69: obj = args[0] jpayne@69: else: jpayne@69: obj = callable(*args, **kwds) jpayne@69: jpayne@69: if exposed is None: jpayne@69: exposed = public_methods(obj) jpayne@69: if method_to_typeid is not None: jpayne@69: if not isinstance(method_to_typeid, dict): jpayne@69: raise TypeError( jpayne@69: "Method_to_typeid {0!r}: type {1!s}, not dict".format( jpayne@69: method_to_typeid, type(method_to_typeid))) jpayne@69: exposed = list(exposed) + list(method_to_typeid) jpayne@69: jpayne@69: ident = '%x' % id(obj) # convert to string because xmlrpclib jpayne@69: # only has 32 bit signed integers jpayne@69: util.debug('%r callable returned object with id %r', typeid, ident) jpayne@69: jpayne@69: self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid) jpayne@69: if ident not in self.id_to_refcount: jpayne@69: self.id_to_refcount[ident] = 0 jpayne@69: jpayne@69: self.incref(c, ident) jpayne@69: return ident, tuple(exposed) jpayne@69: create.__text_signature__ = '($self, c, typeid, /, *args, **kwds)' jpayne@69: jpayne@69: def get_methods(self, c, token): jpayne@69: ''' jpayne@69: Return the methods of the shared object indicated by token jpayne@69: ''' jpayne@69: return tuple(self.id_to_obj[token.id][1]) jpayne@69: jpayne@69: def accept_connection(self, c, name): jpayne@69: ''' jpayne@69: Spawn a new thread to serve this connection jpayne@69: ''' jpayne@69: threading.current_thread().name = name jpayne@69: c.send(('#RETURN', None)) jpayne@69: self.serve_client(c) jpayne@69: jpayne@69: def incref(self, c, ident): jpayne@69: with self.mutex: jpayne@69: try: jpayne@69: self.id_to_refcount[ident] += 1 jpayne@69: except KeyError as ke: jpayne@69: # If no external references exist but an internal (to the jpayne@69: # manager) still does and a new external reference is created jpayne@69: # from it, restore the manager's tracking of it from the jpayne@69: # previously stashed internal ref. jpayne@69: if ident in self.id_to_local_proxy_obj: jpayne@69: self.id_to_refcount[ident] = 1 jpayne@69: self.id_to_obj[ident] = \ jpayne@69: self.id_to_local_proxy_obj[ident] jpayne@69: obj, exposed, gettypeid = self.id_to_obj[ident] jpayne@69: util.debug('Server re-enabled tracking & INCREF %r', ident) jpayne@69: else: jpayne@69: raise ke jpayne@69: jpayne@69: def decref(self, c, ident): jpayne@69: if ident not in self.id_to_refcount and \ jpayne@69: ident in self.id_to_local_proxy_obj: jpayne@69: util.debug('Server DECREF skipping %r', ident) jpayne@69: return jpayne@69: jpayne@69: with self.mutex: jpayne@69: if self.id_to_refcount[ident] <= 0: jpayne@69: raise AssertionError( jpayne@69: "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format( jpayne@69: ident, self.id_to_obj[ident], jpayne@69: self.id_to_refcount[ident])) jpayne@69: self.id_to_refcount[ident] -= 1 jpayne@69: if self.id_to_refcount[ident] == 0: jpayne@69: del self.id_to_refcount[ident] jpayne@69: jpayne@69: if ident not in self.id_to_refcount: jpayne@69: # Two-step process in case the object turns out to contain other jpayne@69: # proxy objects (e.g. a managed list of managed lists). jpayne@69: # Otherwise, deleting self.id_to_obj[ident] would trigger the jpayne@69: # deleting of the stored value (another managed object) which would jpayne@69: # in turn attempt to acquire the mutex that is already held here. jpayne@69: self.id_to_obj[ident] = (None, (), None) # thread-safe jpayne@69: util.debug('disposing of obj with id %r', ident) jpayne@69: with self.mutex: jpayne@69: del self.id_to_obj[ident] jpayne@69: jpayne@69: jpayne@69: # jpayne@69: # Class to represent state of a manager jpayne@69: # jpayne@69: jpayne@69: class State(object): jpayne@69: __slots__ = ['value'] jpayne@69: INITIAL = 0 jpayne@69: STARTED = 1 jpayne@69: SHUTDOWN = 2 jpayne@69: jpayne@69: # jpayne@69: # Mapping from serializer name to Listener and Client types jpayne@69: # jpayne@69: jpayne@69: listener_client = { jpayne@69: 'pickle' : (connection.Listener, connection.Client), jpayne@69: 'xmlrpclib' : (connection.XmlListener, connection.XmlClient) jpayne@69: } jpayne@69: jpayne@69: # jpayne@69: # Definition of BaseManager jpayne@69: # jpayne@69: jpayne@69: class BaseManager(object): jpayne@69: ''' jpayne@69: Base class for managers jpayne@69: ''' jpayne@69: _registry = {} jpayne@69: _Server = Server jpayne@69: jpayne@69: def __init__(self, address=None, authkey=None, serializer='pickle', jpayne@69: ctx=None): jpayne@69: if authkey is None: jpayne@69: authkey = process.current_process().authkey jpayne@69: self._address = address # XXX not final address if eg ('', 0) jpayne@69: self._authkey = process.AuthenticationString(authkey) jpayne@69: self._state = State() jpayne@69: self._state.value = State.INITIAL jpayne@69: self._serializer = serializer jpayne@69: self._Listener, self._Client = listener_client[serializer] jpayne@69: self._ctx = ctx or get_context() jpayne@69: jpayne@69: def get_server(self): jpayne@69: ''' jpayne@69: Return server object with serve_forever() method and address attribute jpayne@69: ''' jpayne@69: if self._state.value != State.INITIAL: jpayne@69: if self._state.value == State.STARTED: jpayne@69: raise ProcessError("Already started server") jpayne@69: elif self._state.value == State.SHUTDOWN: jpayne@69: raise ProcessError("Manager has shut down") jpayne@69: else: jpayne@69: raise ProcessError( jpayne@69: "Unknown state {!r}".format(self._state.value)) jpayne@69: return Server(self._registry, self._address, jpayne@69: self._authkey, self._serializer) jpayne@69: jpayne@69: def connect(self): jpayne@69: ''' jpayne@69: Connect manager object to the server process jpayne@69: ''' jpayne@69: Listener, Client = listener_client[self._serializer] jpayne@69: conn = Client(self._address, authkey=self._authkey) jpayne@69: dispatch(conn, None, 'dummy') jpayne@69: self._state.value = State.STARTED jpayne@69: jpayne@69: def start(self, initializer=None, initargs=()): jpayne@69: ''' jpayne@69: Spawn a server process for this manager object jpayne@69: ''' jpayne@69: if self._state.value != State.INITIAL: jpayne@69: if self._state.value == State.STARTED: jpayne@69: raise ProcessError("Already started server") jpayne@69: elif self._state.value == State.SHUTDOWN: jpayne@69: raise ProcessError("Manager has shut down") jpayne@69: else: jpayne@69: raise ProcessError( jpayne@69: "Unknown state {!r}".format(self._state.value)) jpayne@69: jpayne@69: if initializer is not None and not callable(initializer): jpayne@69: raise TypeError('initializer must be a callable') jpayne@69: jpayne@69: # pipe over which we will retrieve address of server jpayne@69: reader, writer = connection.Pipe(duplex=False) jpayne@69: jpayne@69: # spawn process which runs a server jpayne@69: self._process = self._ctx.Process( jpayne@69: target=type(self)._run_server, jpayne@69: args=(self._registry, self._address, self._authkey, jpayne@69: self._serializer, writer, initializer, initargs), jpayne@69: ) jpayne@69: ident = ':'.join(str(i) for i in self._process._identity) jpayne@69: self._process.name = type(self).__name__ + '-' + ident jpayne@69: self._process.start() jpayne@69: jpayne@69: # get address of server jpayne@69: writer.close() jpayne@69: self._address = reader.recv() jpayne@69: reader.close() jpayne@69: jpayne@69: # register a finalizer jpayne@69: self._state.value = State.STARTED jpayne@69: self.shutdown = util.Finalize( jpayne@69: self, type(self)._finalize_manager, jpayne@69: args=(self._process, self._address, self._authkey, jpayne@69: self._state, self._Client), jpayne@69: exitpriority=0 jpayne@69: ) jpayne@69: jpayne@69: @classmethod jpayne@69: def _run_server(cls, registry, address, authkey, serializer, writer, jpayne@69: initializer=None, initargs=()): jpayne@69: ''' jpayne@69: Create a server, report its address and run it jpayne@69: ''' jpayne@69: # bpo-36368: protect server process from KeyboardInterrupt signals jpayne@69: signal.signal(signal.SIGINT, signal.SIG_IGN) jpayne@69: jpayne@69: if initializer is not None: jpayne@69: initializer(*initargs) jpayne@69: jpayne@69: # create server jpayne@69: server = cls._Server(registry, address, authkey, serializer) jpayne@69: jpayne@69: # inform parent process of the server's address jpayne@69: writer.send(server.address) jpayne@69: writer.close() jpayne@69: jpayne@69: # run the manager jpayne@69: util.info('manager serving at %r', server.address) jpayne@69: server.serve_forever() jpayne@69: jpayne@69: def _create(self, typeid, /, *args, **kwds): jpayne@69: ''' jpayne@69: Create a new shared object; return the token and exposed tuple jpayne@69: ''' jpayne@69: assert self._state.value == State.STARTED, 'server not yet started' jpayne@69: conn = self._Client(self._address, authkey=self._authkey) jpayne@69: try: jpayne@69: id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds) jpayne@69: finally: jpayne@69: conn.close() jpayne@69: return Token(typeid, self._address, id), exposed jpayne@69: jpayne@69: def join(self, timeout=None): jpayne@69: ''' jpayne@69: Join the manager process (if it has been spawned) jpayne@69: ''' jpayne@69: if self._process is not None: jpayne@69: self._process.join(timeout) jpayne@69: if not self._process.is_alive(): jpayne@69: self._process = None jpayne@69: jpayne@69: def _debug_info(self): jpayne@69: ''' jpayne@69: Return some info about the servers shared objects and connections jpayne@69: ''' jpayne@69: conn = self._Client(self._address, authkey=self._authkey) jpayne@69: try: jpayne@69: return dispatch(conn, None, 'debug_info') jpayne@69: finally: jpayne@69: conn.close() jpayne@69: jpayne@69: def _number_of_objects(self): jpayne@69: ''' jpayne@69: Return the number of shared objects jpayne@69: ''' jpayne@69: conn = self._Client(self._address, authkey=self._authkey) jpayne@69: try: jpayne@69: return dispatch(conn, None, 'number_of_objects') jpayne@69: finally: jpayne@69: conn.close() jpayne@69: jpayne@69: def __enter__(self): jpayne@69: if self._state.value == State.INITIAL: jpayne@69: self.start() jpayne@69: if self._state.value != State.STARTED: jpayne@69: if self._state.value == State.INITIAL: jpayne@69: raise ProcessError("Unable to start server") jpayne@69: elif self._state.value == State.SHUTDOWN: jpayne@69: raise ProcessError("Manager has shut down") jpayne@69: else: jpayne@69: raise ProcessError( jpayne@69: "Unknown state {!r}".format(self._state.value)) jpayne@69: return self jpayne@69: jpayne@69: def __exit__(self, exc_type, exc_val, exc_tb): jpayne@69: self.shutdown() jpayne@69: jpayne@69: @staticmethod jpayne@69: def _finalize_manager(process, address, authkey, state, _Client): jpayne@69: ''' jpayne@69: Shutdown the manager process; will be registered as a finalizer jpayne@69: ''' jpayne@69: if process.is_alive(): jpayne@69: util.info('sending shutdown message to manager') jpayne@69: try: jpayne@69: conn = _Client(address, authkey=authkey) jpayne@69: try: jpayne@69: dispatch(conn, None, 'shutdown') jpayne@69: finally: jpayne@69: conn.close() jpayne@69: except Exception: jpayne@69: pass jpayne@69: jpayne@69: process.join(timeout=1.0) jpayne@69: if process.is_alive(): jpayne@69: util.info('manager still alive') jpayne@69: if hasattr(process, 'terminate'): jpayne@69: util.info('trying to `terminate()` manager process') jpayne@69: process.terminate() jpayne@69: process.join(timeout=0.1) jpayne@69: if process.is_alive(): jpayne@69: util.info('manager still alive after terminate') jpayne@69: jpayne@69: state.value = State.SHUTDOWN jpayne@69: try: jpayne@69: del BaseProxy._address_to_local[address] jpayne@69: except KeyError: jpayne@69: pass jpayne@69: jpayne@69: @property jpayne@69: def address(self): jpayne@69: return self._address jpayne@69: jpayne@69: @classmethod jpayne@69: def register(cls, typeid, callable=None, proxytype=None, exposed=None, jpayne@69: method_to_typeid=None, create_method=True): jpayne@69: ''' jpayne@69: Register a typeid with the manager type jpayne@69: ''' jpayne@69: if '_registry' not in cls.__dict__: jpayne@69: cls._registry = cls._registry.copy() jpayne@69: jpayne@69: if proxytype is None: jpayne@69: proxytype = AutoProxy jpayne@69: jpayne@69: exposed = exposed or getattr(proxytype, '_exposed_', None) jpayne@69: jpayne@69: method_to_typeid = method_to_typeid or \ jpayne@69: getattr(proxytype, '_method_to_typeid_', None) jpayne@69: jpayne@69: if method_to_typeid: jpayne@69: for key, value in list(method_to_typeid.items()): # isinstance? jpayne@69: assert type(key) is str, '%r is not a string' % key jpayne@69: assert type(value) is str, '%r is not a string' % value jpayne@69: jpayne@69: cls._registry[typeid] = ( jpayne@69: callable, exposed, method_to_typeid, proxytype jpayne@69: ) jpayne@69: jpayne@69: if create_method: jpayne@69: def temp(self, /, *args, **kwds): jpayne@69: util.debug('requesting creation of a shared %r object', typeid) jpayne@69: token, exp = self._create(typeid, *args, **kwds) jpayne@69: proxy = proxytype( jpayne@69: token, self._serializer, manager=self, jpayne@69: authkey=self._authkey, exposed=exp jpayne@69: ) jpayne@69: conn = self._Client(token.address, authkey=self._authkey) jpayne@69: dispatch(conn, None, 'decref', (token.id,)) jpayne@69: return proxy jpayne@69: temp.__name__ = typeid jpayne@69: setattr(cls, typeid, temp) jpayne@69: jpayne@69: # jpayne@69: # Subclass of set which get cleared after a fork jpayne@69: # jpayne@69: jpayne@69: class ProcessLocalSet(set): jpayne@69: def __init__(self): jpayne@69: util.register_after_fork(self, lambda obj: obj.clear()) jpayne@69: def __reduce__(self): jpayne@69: return type(self), () jpayne@69: jpayne@69: # jpayne@69: # Definition of BaseProxy jpayne@69: # jpayne@69: jpayne@69: class BaseProxy(object): jpayne@69: ''' jpayne@69: A base for proxies of shared objects jpayne@69: ''' jpayne@69: _address_to_local = {} jpayne@69: _mutex = util.ForkAwareThreadLock() jpayne@69: jpayne@69: def __init__(self, token, serializer, manager=None, jpayne@69: authkey=None, exposed=None, incref=True, manager_owned=False): jpayne@69: with BaseProxy._mutex: jpayne@69: tls_idset = BaseProxy._address_to_local.get(token.address, None) jpayne@69: if tls_idset is None: jpayne@69: tls_idset = util.ForkAwareLocal(), ProcessLocalSet() jpayne@69: BaseProxy._address_to_local[token.address] = tls_idset jpayne@69: jpayne@69: # self._tls is used to record the connection used by this jpayne@69: # thread to communicate with the manager at token.address jpayne@69: self._tls = tls_idset[0] jpayne@69: jpayne@69: # self._idset is used to record the identities of all shared jpayne@69: # objects for which the current process owns references and jpayne@69: # which are in the manager at token.address jpayne@69: self._idset = tls_idset[1] jpayne@69: jpayne@69: self._token = token jpayne@69: self._id = self._token.id jpayne@69: self._manager = manager jpayne@69: self._serializer = serializer jpayne@69: self._Client = listener_client[serializer][1] jpayne@69: jpayne@69: # Should be set to True only when a proxy object is being created jpayne@69: # on the manager server; primary use case: nested proxy objects. jpayne@69: # RebuildProxy detects when a proxy is being created on the manager jpayne@69: # and sets this value appropriately. jpayne@69: self._owned_by_manager = manager_owned jpayne@69: jpayne@69: if authkey is not None: jpayne@69: self._authkey = process.AuthenticationString(authkey) jpayne@69: elif self._manager is not None: jpayne@69: self._authkey = self._manager._authkey jpayne@69: else: jpayne@69: self._authkey = process.current_process().authkey jpayne@69: jpayne@69: if incref: jpayne@69: self._incref() jpayne@69: jpayne@69: util.register_after_fork(self, BaseProxy._after_fork) jpayne@69: jpayne@69: def _connect(self): jpayne@69: util.debug('making connection to manager') jpayne@69: name = process.current_process().name jpayne@69: if threading.current_thread().name != 'MainThread': jpayne@69: name += '|' + threading.current_thread().name jpayne@69: conn = self._Client(self._token.address, authkey=self._authkey) jpayne@69: dispatch(conn, None, 'accept_connection', (name,)) jpayne@69: self._tls.connection = conn jpayne@69: jpayne@69: def _callmethod(self, methodname, args=(), kwds={}): jpayne@69: ''' jpayne@69: Try to call a method of the referrent and return a copy of the result jpayne@69: ''' jpayne@69: try: jpayne@69: conn = self._tls.connection jpayne@69: except AttributeError: jpayne@69: util.debug('thread %r does not own a connection', jpayne@69: threading.current_thread().name) jpayne@69: self._connect() jpayne@69: conn = self._tls.connection jpayne@69: jpayne@69: conn.send((self._id, methodname, args, kwds)) jpayne@69: kind, result = conn.recv() jpayne@69: jpayne@69: if kind == '#RETURN': jpayne@69: return result jpayne@69: elif kind == '#PROXY': jpayne@69: exposed, token = result jpayne@69: proxytype = self._manager._registry[token.typeid][-1] jpayne@69: token.address = self._token.address jpayne@69: proxy = proxytype( jpayne@69: token, self._serializer, manager=self._manager, jpayne@69: authkey=self._authkey, exposed=exposed jpayne@69: ) jpayne@69: conn = self._Client(token.address, authkey=self._authkey) jpayne@69: dispatch(conn, None, 'decref', (token.id,)) jpayne@69: return proxy jpayne@69: raise convert_to_error(kind, result) jpayne@69: jpayne@69: def _getvalue(self): jpayne@69: ''' jpayne@69: Get a copy of the value of the referent jpayne@69: ''' jpayne@69: return self._callmethod('#GETVALUE') jpayne@69: jpayne@69: def _incref(self): jpayne@69: if self._owned_by_manager: jpayne@69: util.debug('owned_by_manager skipped INCREF of %r', self._token.id) jpayne@69: return jpayne@69: jpayne@69: conn = self._Client(self._token.address, authkey=self._authkey) jpayne@69: dispatch(conn, None, 'incref', (self._id,)) jpayne@69: util.debug('INCREF %r', self._token.id) jpayne@69: jpayne@69: self._idset.add(self._id) jpayne@69: jpayne@69: state = self._manager and self._manager._state jpayne@69: jpayne@69: self._close = util.Finalize( jpayne@69: self, BaseProxy._decref, jpayne@69: args=(self._token, self._authkey, state, jpayne@69: self._tls, self._idset, self._Client), jpayne@69: exitpriority=10 jpayne@69: ) jpayne@69: jpayne@69: @staticmethod jpayne@69: def _decref(token, authkey, state, tls, idset, _Client): jpayne@69: idset.discard(token.id) jpayne@69: jpayne@69: # check whether manager is still alive jpayne@69: if state is None or state.value == State.STARTED: jpayne@69: # tell manager this process no longer cares about referent jpayne@69: try: jpayne@69: util.debug('DECREF %r', token.id) jpayne@69: conn = _Client(token.address, authkey=authkey) jpayne@69: dispatch(conn, None, 'decref', (token.id,)) jpayne@69: except Exception as e: jpayne@69: util.debug('... decref failed %s', e) jpayne@69: jpayne@69: else: jpayne@69: util.debug('DECREF %r -- manager already shutdown', token.id) jpayne@69: jpayne@69: # check whether we can close this thread's connection because jpayne@69: # the process owns no more references to objects for this manager jpayne@69: if not idset and hasattr(tls, 'connection'): jpayne@69: util.debug('thread %r has no more proxies so closing conn', jpayne@69: threading.current_thread().name) jpayne@69: tls.connection.close() jpayne@69: del tls.connection jpayne@69: jpayne@69: def _after_fork(self): jpayne@69: self._manager = None jpayne@69: try: jpayne@69: self._incref() jpayne@69: except Exception as e: jpayne@69: # the proxy may just be for a manager which has shutdown jpayne@69: util.info('incref failed: %s' % e) jpayne@69: jpayne@69: def __reduce__(self): jpayne@69: kwds = {} jpayne@69: if get_spawning_popen() is not None: jpayne@69: kwds['authkey'] = self._authkey jpayne@69: jpayne@69: if getattr(self, '_isauto', False): jpayne@69: kwds['exposed'] = self._exposed_ jpayne@69: return (RebuildProxy, jpayne@69: (AutoProxy, self._token, self._serializer, kwds)) jpayne@69: else: jpayne@69: return (RebuildProxy, jpayne@69: (type(self), self._token, self._serializer, kwds)) jpayne@69: jpayne@69: def __deepcopy__(self, memo): jpayne@69: return self._getvalue() jpayne@69: jpayne@69: def __repr__(self): jpayne@69: return '<%s object, typeid %r at %#x>' % \ jpayne@69: (type(self).__name__, self._token.typeid, id(self)) jpayne@69: jpayne@69: def __str__(self): jpayne@69: ''' jpayne@69: Return representation of the referent (or a fall-back if that fails) jpayne@69: ''' jpayne@69: try: jpayne@69: return self._callmethod('__repr__') jpayne@69: except Exception: jpayne@69: return repr(self)[:-1] + "; '__str__()' failed>" jpayne@69: jpayne@69: # jpayne@69: # Function used for unpickling jpayne@69: # jpayne@69: jpayne@69: def RebuildProxy(func, token, serializer, kwds): jpayne@69: ''' jpayne@69: Function used for unpickling proxy objects. jpayne@69: ''' jpayne@69: server = getattr(process.current_process(), '_manager_server', None) jpayne@69: if server and server.address == token.address: jpayne@69: util.debug('Rebuild a proxy owned by manager, token=%r', token) jpayne@69: kwds['manager_owned'] = True jpayne@69: if token.id not in server.id_to_local_proxy_obj: jpayne@69: server.id_to_local_proxy_obj[token.id] = \ jpayne@69: server.id_to_obj[token.id] jpayne@69: incref = ( jpayne@69: kwds.pop('incref', True) and jpayne@69: not getattr(process.current_process(), '_inheriting', False) jpayne@69: ) jpayne@69: return func(token, serializer, incref=incref, **kwds) jpayne@69: jpayne@69: # jpayne@69: # Functions to create proxies and proxy types jpayne@69: # jpayne@69: jpayne@69: def MakeProxyType(name, exposed, _cache={}): jpayne@69: ''' jpayne@69: Return a proxy type whose methods are given by `exposed` jpayne@69: ''' jpayne@69: exposed = tuple(exposed) jpayne@69: try: jpayne@69: return _cache[(name, exposed)] jpayne@69: except KeyError: jpayne@69: pass jpayne@69: jpayne@69: dic = {} jpayne@69: jpayne@69: for meth in exposed: jpayne@69: exec('''def %s(self, /, *args, **kwds): jpayne@69: return self._callmethod(%r, args, kwds)''' % (meth, meth), dic) jpayne@69: jpayne@69: ProxyType = type(name, (BaseProxy,), dic) jpayne@69: ProxyType._exposed_ = exposed jpayne@69: _cache[(name, exposed)] = ProxyType jpayne@69: return ProxyType jpayne@69: jpayne@69: jpayne@69: def AutoProxy(token, serializer, manager=None, authkey=None, jpayne@69: exposed=None, incref=True): jpayne@69: ''' jpayne@69: Return an auto-proxy for `token` jpayne@69: ''' jpayne@69: _Client = listener_client[serializer][1] jpayne@69: jpayne@69: if exposed is None: jpayne@69: conn = _Client(token.address, authkey=authkey) jpayne@69: try: jpayne@69: exposed = dispatch(conn, None, 'get_methods', (token,)) jpayne@69: finally: jpayne@69: conn.close() jpayne@69: jpayne@69: if authkey is None and manager is not None: jpayne@69: authkey = manager._authkey jpayne@69: if authkey is None: jpayne@69: authkey = process.current_process().authkey jpayne@69: jpayne@69: ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed) jpayne@69: proxy = ProxyType(token, serializer, manager=manager, authkey=authkey, jpayne@69: incref=incref) jpayne@69: proxy._isauto = True jpayne@69: return proxy jpayne@69: jpayne@69: # jpayne@69: # Types/callables which we will register with SyncManager jpayne@69: # jpayne@69: jpayne@69: class Namespace(object): jpayne@69: def __init__(self, /, **kwds): jpayne@69: self.__dict__.update(kwds) jpayne@69: def __repr__(self): jpayne@69: items = list(self.__dict__.items()) jpayne@69: temp = [] jpayne@69: for name, value in items: jpayne@69: if not name.startswith('_'): jpayne@69: temp.append('%s=%r' % (name, value)) jpayne@69: temp.sort() jpayne@69: return '%s(%s)' % (self.__class__.__name__, ', '.join(temp)) jpayne@69: jpayne@69: class Value(object): jpayne@69: def __init__(self, typecode, value, lock=True): jpayne@69: self._typecode = typecode jpayne@69: self._value = value jpayne@69: def get(self): jpayne@69: return self._value jpayne@69: def set(self, value): jpayne@69: self._value = value jpayne@69: def __repr__(self): jpayne@69: return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value) jpayne@69: value = property(get, set) jpayne@69: jpayne@69: def Array(typecode, sequence, lock=True): jpayne@69: return array.array(typecode, sequence) jpayne@69: jpayne@69: # jpayne@69: # Proxy types used by SyncManager jpayne@69: # jpayne@69: jpayne@69: class IteratorProxy(BaseProxy): jpayne@69: _exposed_ = ('__next__', 'send', 'throw', 'close') jpayne@69: def __iter__(self): jpayne@69: return self jpayne@69: def __next__(self, *args): jpayne@69: return self._callmethod('__next__', args) jpayne@69: def send(self, *args): jpayne@69: return self._callmethod('send', args) jpayne@69: def throw(self, *args): jpayne@69: return self._callmethod('throw', args) jpayne@69: def close(self, *args): jpayne@69: return self._callmethod('close', args) jpayne@69: jpayne@69: jpayne@69: class AcquirerProxy(BaseProxy): jpayne@69: _exposed_ = ('acquire', 'release') jpayne@69: def acquire(self, blocking=True, timeout=None): jpayne@69: args = (blocking,) if timeout is None else (blocking, timeout) jpayne@69: return self._callmethod('acquire', args) jpayne@69: def release(self): jpayne@69: return self._callmethod('release') jpayne@69: def __enter__(self): jpayne@69: return self._callmethod('acquire') jpayne@69: def __exit__(self, exc_type, exc_val, exc_tb): jpayne@69: return self._callmethod('release') jpayne@69: jpayne@69: jpayne@69: class ConditionProxy(AcquirerProxy): jpayne@69: _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all') jpayne@69: def wait(self, timeout=None): jpayne@69: return self._callmethod('wait', (timeout,)) jpayne@69: def notify(self, n=1): jpayne@69: return self._callmethod('notify', (n,)) jpayne@69: def notify_all(self): jpayne@69: return self._callmethod('notify_all') jpayne@69: def wait_for(self, predicate, timeout=None): jpayne@69: result = predicate() jpayne@69: if result: jpayne@69: return result jpayne@69: if timeout is not None: jpayne@69: endtime = time.monotonic() + timeout jpayne@69: else: jpayne@69: endtime = None jpayne@69: waittime = None jpayne@69: while not result: jpayne@69: if endtime is not None: jpayne@69: waittime = endtime - time.monotonic() jpayne@69: if waittime <= 0: jpayne@69: break jpayne@69: self.wait(waittime) jpayne@69: result = predicate() jpayne@69: return result jpayne@69: jpayne@69: jpayne@69: class EventProxy(BaseProxy): jpayne@69: _exposed_ = ('is_set', 'set', 'clear', 'wait') jpayne@69: def is_set(self): jpayne@69: return self._callmethod('is_set') jpayne@69: def set(self): jpayne@69: return self._callmethod('set') jpayne@69: def clear(self): jpayne@69: return self._callmethod('clear') jpayne@69: def wait(self, timeout=None): jpayne@69: return self._callmethod('wait', (timeout,)) jpayne@69: jpayne@69: jpayne@69: class BarrierProxy(BaseProxy): jpayne@69: _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset') jpayne@69: def wait(self, timeout=None): jpayne@69: return self._callmethod('wait', (timeout,)) jpayne@69: def abort(self): jpayne@69: return self._callmethod('abort') jpayne@69: def reset(self): jpayne@69: return self._callmethod('reset') jpayne@69: @property jpayne@69: def parties(self): jpayne@69: return self._callmethod('__getattribute__', ('parties',)) jpayne@69: @property jpayne@69: def n_waiting(self): jpayne@69: return self._callmethod('__getattribute__', ('n_waiting',)) jpayne@69: @property jpayne@69: def broken(self): jpayne@69: return self._callmethod('__getattribute__', ('broken',)) jpayne@69: jpayne@69: jpayne@69: class NamespaceProxy(BaseProxy): jpayne@69: _exposed_ = ('__getattribute__', '__setattr__', '__delattr__') jpayne@69: def __getattr__(self, key): jpayne@69: if key[0] == '_': jpayne@69: return object.__getattribute__(self, key) jpayne@69: callmethod = object.__getattribute__(self, '_callmethod') jpayne@69: return callmethod('__getattribute__', (key,)) jpayne@69: def __setattr__(self, key, value): jpayne@69: if key[0] == '_': jpayne@69: return object.__setattr__(self, key, value) jpayne@69: callmethod = object.__getattribute__(self, '_callmethod') jpayne@69: return callmethod('__setattr__', (key, value)) jpayne@69: def __delattr__(self, key): jpayne@69: if key[0] == '_': jpayne@69: return object.__delattr__(self, key) jpayne@69: callmethod = object.__getattribute__(self, '_callmethod') jpayne@69: return callmethod('__delattr__', (key,)) jpayne@69: jpayne@69: jpayne@69: class ValueProxy(BaseProxy): jpayne@69: _exposed_ = ('get', 'set') jpayne@69: def get(self): jpayne@69: return self._callmethod('get') jpayne@69: def set(self, value): jpayne@69: return self._callmethod('set', (value,)) jpayne@69: value = property(get, set) jpayne@69: jpayne@69: jpayne@69: BaseListProxy = MakeProxyType('BaseListProxy', ( jpayne@69: '__add__', '__contains__', '__delitem__', '__getitem__', '__len__', jpayne@69: '__mul__', '__reversed__', '__rmul__', '__setitem__', jpayne@69: 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove', jpayne@69: 'reverse', 'sort', '__imul__' jpayne@69: )) jpayne@69: class ListProxy(BaseListProxy): jpayne@69: def __iadd__(self, value): jpayne@69: self._callmethod('extend', (value,)) jpayne@69: return self jpayne@69: def __imul__(self, value): jpayne@69: self._callmethod('__imul__', (value,)) jpayne@69: return self jpayne@69: jpayne@69: jpayne@69: DictProxy = MakeProxyType('DictProxy', ( jpayne@69: '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__', jpayne@69: '__setitem__', 'clear', 'copy', 'get', 'items', jpayne@69: 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values' jpayne@69: )) jpayne@69: DictProxy._method_to_typeid_ = { jpayne@69: '__iter__': 'Iterator', jpayne@69: } jpayne@69: jpayne@69: jpayne@69: ArrayProxy = MakeProxyType('ArrayProxy', ( jpayne@69: '__len__', '__getitem__', '__setitem__' jpayne@69: )) jpayne@69: jpayne@69: jpayne@69: BasePoolProxy = MakeProxyType('PoolProxy', ( jpayne@69: 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', jpayne@69: 'map', 'map_async', 'starmap', 'starmap_async', 'terminate', jpayne@69: )) jpayne@69: BasePoolProxy._method_to_typeid_ = { jpayne@69: 'apply_async': 'AsyncResult', jpayne@69: 'map_async': 'AsyncResult', jpayne@69: 'starmap_async': 'AsyncResult', jpayne@69: 'imap': 'Iterator', jpayne@69: 'imap_unordered': 'Iterator' jpayne@69: } jpayne@69: class PoolProxy(BasePoolProxy): jpayne@69: def __enter__(self): jpayne@69: return self jpayne@69: def __exit__(self, exc_type, exc_val, exc_tb): jpayne@69: self.terminate() jpayne@69: jpayne@69: # jpayne@69: # Definition of SyncManager jpayne@69: # jpayne@69: jpayne@69: class SyncManager(BaseManager): jpayne@69: ''' jpayne@69: Subclass of `BaseManager` which supports a number of shared object types. jpayne@69: jpayne@69: The types registered are those intended for the synchronization jpayne@69: of threads, plus `dict`, `list` and `Namespace`. jpayne@69: jpayne@69: The `multiprocessing.Manager()` function creates started instances of jpayne@69: this class. jpayne@69: ''' jpayne@69: jpayne@69: SyncManager.register('Queue', queue.Queue) jpayne@69: SyncManager.register('JoinableQueue', queue.Queue) jpayne@69: SyncManager.register('Event', threading.Event, EventProxy) jpayne@69: SyncManager.register('Lock', threading.Lock, AcquirerProxy) jpayne@69: SyncManager.register('RLock', threading.RLock, AcquirerProxy) jpayne@69: SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy) jpayne@69: SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore, jpayne@69: AcquirerProxy) jpayne@69: SyncManager.register('Condition', threading.Condition, ConditionProxy) jpayne@69: SyncManager.register('Barrier', threading.Barrier, BarrierProxy) jpayne@69: SyncManager.register('Pool', pool.Pool, PoolProxy) jpayne@69: SyncManager.register('list', list, ListProxy) jpayne@69: SyncManager.register('dict', dict, DictProxy) jpayne@69: SyncManager.register('Value', Value, ValueProxy) jpayne@69: SyncManager.register('Array', Array, ArrayProxy) jpayne@69: SyncManager.register('Namespace', Namespace, NamespaceProxy) jpayne@69: jpayne@69: # types returned by methods of PoolProxy jpayne@69: SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False) jpayne@69: SyncManager.register('AsyncResult', create_method=False) jpayne@69: jpayne@69: # jpayne@69: # Definition of SharedMemoryManager and SharedMemoryServer jpayne@69: # jpayne@69: jpayne@69: if HAS_SHMEM: jpayne@69: class _SharedMemoryTracker: jpayne@69: "Manages one or more shared memory segments." jpayne@69: jpayne@69: def __init__(self, name, segment_names=[]): jpayne@69: self.shared_memory_context_name = name jpayne@69: self.segment_names = segment_names jpayne@69: jpayne@69: def register_segment(self, segment_name): jpayne@69: "Adds the supplied shared memory block name to tracker." jpayne@69: util.debug(f"Register segment {segment_name!r} in pid {getpid()}") jpayne@69: self.segment_names.append(segment_name) jpayne@69: jpayne@69: def destroy_segment(self, segment_name): jpayne@69: """Calls unlink() on the shared memory block with the supplied name jpayne@69: and removes it from the list of blocks being tracked.""" jpayne@69: util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}") jpayne@69: self.segment_names.remove(segment_name) jpayne@69: segment = shared_memory.SharedMemory(segment_name) jpayne@69: segment.close() jpayne@69: segment.unlink() jpayne@69: jpayne@69: def unlink(self): jpayne@69: "Calls destroy_segment() on all tracked shared memory blocks." jpayne@69: for segment_name in self.segment_names[:]: jpayne@69: self.destroy_segment(segment_name) jpayne@69: jpayne@69: def __del__(self): jpayne@69: util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}") jpayne@69: self.unlink() jpayne@69: jpayne@69: def __getstate__(self): jpayne@69: return (self.shared_memory_context_name, self.segment_names) jpayne@69: jpayne@69: def __setstate__(self, state): jpayne@69: self.__init__(*state) jpayne@69: jpayne@69: jpayne@69: class SharedMemoryServer(Server): jpayne@69: jpayne@69: public = Server.public + \ jpayne@69: ['track_segment', 'release_segment', 'list_segments'] jpayne@69: jpayne@69: def __init__(self, *args, **kwargs): jpayne@69: Server.__init__(self, *args, **kwargs) jpayne@69: self.shared_memory_context = \ jpayne@69: _SharedMemoryTracker(f"shmm_{self.address}_{getpid()}") jpayne@69: util.debug(f"SharedMemoryServer started by pid {getpid()}") jpayne@69: jpayne@69: def create(*args, **kwargs): jpayne@69: """Create a new distributed-shared object (not backed by a shared jpayne@69: memory block) and return its id to be used in a Proxy Object.""" jpayne@69: # Unless set up as a shared proxy, don't make shared_memory_context jpayne@69: # a standard part of kwargs. This makes things easier for supplying jpayne@69: # simple functions. jpayne@69: if len(args) >= 3: jpayne@69: typeod = args[2] jpayne@69: elif 'typeid' in kwargs: jpayne@69: typeid = kwargs['typeid'] jpayne@69: elif not args: jpayne@69: raise TypeError("descriptor 'create' of 'SharedMemoryServer' " jpayne@69: "object needs an argument") jpayne@69: else: jpayne@69: raise TypeError('create expected at least 2 positional ' jpayne@69: 'arguments, got %d' % (len(args)-1)) jpayne@69: if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"): jpayne@69: kwargs['shared_memory_context'] = self.shared_memory_context jpayne@69: return Server.create(*args, **kwargs) jpayne@69: create.__text_signature__ = '($self, c, typeid, /, *args, **kwargs)' jpayne@69: jpayne@69: def shutdown(self, c): jpayne@69: "Call unlink() on all tracked shared memory, terminate the Server." jpayne@69: self.shared_memory_context.unlink() jpayne@69: return Server.shutdown(self, c) jpayne@69: jpayne@69: def track_segment(self, c, segment_name): jpayne@69: "Adds the supplied shared memory block name to Server's tracker." jpayne@69: self.shared_memory_context.register_segment(segment_name) jpayne@69: jpayne@69: def release_segment(self, c, segment_name): jpayne@69: """Calls unlink() on the shared memory block with the supplied name jpayne@69: and removes it from the tracker instance inside the Server.""" jpayne@69: self.shared_memory_context.destroy_segment(segment_name) jpayne@69: jpayne@69: def list_segments(self, c): jpayne@69: """Returns a list of names of shared memory blocks that the Server jpayne@69: is currently tracking.""" jpayne@69: return self.shared_memory_context.segment_names jpayne@69: jpayne@69: jpayne@69: class SharedMemoryManager(BaseManager): jpayne@69: """Like SyncManager but uses SharedMemoryServer instead of Server. jpayne@69: jpayne@69: It provides methods for creating and returning SharedMemory instances jpayne@69: and for creating a list-like object (ShareableList) backed by shared jpayne@69: memory. It also provides methods that create and return Proxy Objects jpayne@69: that support synchronization across processes (i.e. multi-process-safe jpayne@69: locks and semaphores). jpayne@69: """ jpayne@69: jpayne@69: _Server = SharedMemoryServer jpayne@69: jpayne@69: def __init__(self, *args, **kwargs): jpayne@69: if os.name == "posix": jpayne@69: # bpo-36867: Ensure the resource_tracker is running before jpayne@69: # launching the manager process, so that concurrent jpayne@69: # shared_memory manipulation both in the manager and in the jpayne@69: # current process does not create two resource_tracker jpayne@69: # processes. jpayne@69: from . import resource_tracker jpayne@69: resource_tracker.ensure_running() jpayne@69: BaseManager.__init__(self, *args, **kwargs) jpayne@69: util.debug(f"{self.__class__.__name__} created by pid {getpid()}") jpayne@69: jpayne@69: def __del__(self): jpayne@69: util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}") jpayne@69: pass jpayne@69: jpayne@69: def get_server(self): jpayne@69: 'Better than monkeypatching for now; merge into Server ultimately' jpayne@69: if self._state.value != State.INITIAL: jpayne@69: if self._state.value == State.STARTED: jpayne@69: raise ProcessError("Already started SharedMemoryServer") jpayne@69: elif self._state.value == State.SHUTDOWN: jpayne@69: raise ProcessError("SharedMemoryManager has shut down") jpayne@69: else: jpayne@69: raise ProcessError( jpayne@69: "Unknown state {!r}".format(self._state.value)) jpayne@69: return self._Server(self._registry, self._address, jpayne@69: self._authkey, self._serializer) jpayne@69: jpayne@69: def SharedMemory(self, size): jpayne@69: """Returns a new SharedMemory instance with the specified size in jpayne@69: bytes, to be tracked by the manager.""" jpayne@69: with self._Client(self._address, authkey=self._authkey) as conn: jpayne@69: sms = shared_memory.SharedMemory(None, create=True, size=size) jpayne@69: try: jpayne@69: dispatch(conn, None, 'track_segment', (sms.name,)) jpayne@69: except BaseException as e: jpayne@69: sms.unlink() jpayne@69: raise e jpayne@69: return sms jpayne@69: jpayne@69: def ShareableList(self, sequence): jpayne@69: """Returns a new ShareableList instance populated with the values jpayne@69: from the input sequence, to be tracked by the manager.""" jpayne@69: with self._Client(self._address, authkey=self._authkey) as conn: jpayne@69: sl = shared_memory.ShareableList(sequence) jpayne@69: try: jpayne@69: dispatch(conn, None, 'track_segment', (sl.shm.name,)) jpayne@69: except BaseException as e: jpayne@69: sl.shm.unlink() jpayne@69: raise e jpayne@69: return sl