annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/multiprocessing/managers.py @ 69:33d812a61356

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 17:55:14 -0400
parents
children
rev   line source
jpayne@69 1 #
jpayne@69 2 # Module providing manager classes for dealing
jpayne@69 3 # with shared objects
jpayne@69 4 #
jpayne@69 5 # multiprocessing/managers.py
jpayne@69 6 #
jpayne@69 7 # Copyright (c) 2006-2008, R Oudkerk
jpayne@69 8 # Licensed to PSF under a Contributor Agreement.
jpayne@69 9 #
jpayne@69 10
jpayne@69 11 __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token',
jpayne@69 12 'SharedMemoryManager' ]
jpayne@69 13
jpayne@69 14 #
jpayne@69 15 # Imports
jpayne@69 16 #
jpayne@69 17
jpayne@69 18 import sys
jpayne@69 19 import threading
jpayne@69 20 import signal
jpayne@69 21 import array
jpayne@69 22 import queue
jpayne@69 23 import time
jpayne@69 24 import os
jpayne@69 25 from os import getpid
jpayne@69 26
jpayne@69 27 from traceback import format_exc
jpayne@69 28
jpayne@69 29 from . import connection
jpayne@69 30 from .context import reduction, get_spawning_popen, ProcessError
jpayne@69 31 from . import pool
jpayne@69 32 from . import process
jpayne@69 33 from . import util
jpayne@69 34 from . import get_context
jpayne@69 35 try:
jpayne@69 36 from . import shared_memory
jpayne@69 37 HAS_SHMEM = True
jpayne@69 38 except ImportError:
jpayne@69 39 HAS_SHMEM = False
jpayne@69 40
jpayne@69 41 #
jpayne@69 42 # Register some things for pickling
jpayne@69 43 #
jpayne@69 44
jpayne@69 45 def reduce_array(a):
jpayne@69 46 return array.array, (a.typecode, a.tobytes())
jpayne@69 47 reduction.register(array.array, reduce_array)
jpayne@69 48
jpayne@69 49 view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
jpayne@69 50 if view_types[0] is not list: # only needed in Py3.0
jpayne@69 51 def rebuild_as_list(obj):
jpayne@69 52 return list, (list(obj),)
jpayne@69 53 for view_type in view_types:
jpayne@69 54 reduction.register(view_type, rebuild_as_list)
jpayne@69 55
jpayne@69 56 #
jpayne@69 57 # Type for identifying shared objects
jpayne@69 58 #
jpayne@69 59
jpayne@69 60 class Token(object):
jpayne@69 61 '''
jpayne@69 62 Type to uniquely indentify a shared object
jpayne@69 63 '''
jpayne@69 64 __slots__ = ('typeid', 'address', 'id')
jpayne@69 65
jpayne@69 66 def __init__(self, typeid, address, id):
jpayne@69 67 (self.typeid, self.address, self.id) = (typeid, address, id)
jpayne@69 68
jpayne@69 69 def __getstate__(self):
jpayne@69 70 return (self.typeid, self.address, self.id)
jpayne@69 71
jpayne@69 72 def __setstate__(self, state):
jpayne@69 73 (self.typeid, self.address, self.id) = state
jpayne@69 74
jpayne@69 75 def __repr__(self):
jpayne@69 76 return '%s(typeid=%r, address=%r, id=%r)' % \
jpayne@69 77 (self.__class__.__name__, self.typeid, self.address, self.id)
jpayne@69 78
jpayne@69 79 #
jpayne@69 80 # Function for communication with a manager's server process
jpayne@69 81 #
jpayne@69 82
jpayne@69 83 def dispatch(c, id, methodname, args=(), kwds={}):
jpayne@69 84 '''
jpayne@69 85 Send a message to manager using connection `c` and return response
jpayne@69 86 '''
jpayne@69 87 c.send((id, methodname, args, kwds))
jpayne@69 88 kind, result = c.recv()
jpayne@69 89 if kind == '#RETURN':
jpayne@69 90 return result
jpayne@69 91 raise convert_to_error(kind, result)
jpayne@69 92
jpayne@69 93 def convert_to_error(kind, result):
jpayne@69 94 if kind == '#ERROR':
jpayne@69 95 return result
jpayne@69 96 elif kind in ('#TRACEBACK', '#UNSERIALIZABLE'):
jpayne@69 97 if not isinstance(result, str):
jpayne@69 98 raise TypeError(
jpayne@69 99 "Result {0!r} (kind '{1}') type is {2}, not str".format(
jpayne@69 100 result, kind, type(result)))
jpayne@69 101 if kind == '#UNSERIALIZABLE':
jpayne@69 102 return RemoteError('Unserializable message: %s\n' % result)
jpayne@69 103 else:
jpayne@69 104 return RemoteError(result)
jpayne@69 105 else:
jpayne@69 106 return ValueError('Unrecognized message type {!r}'.format(kind))
jpayne@69 107
jpayne@69 108 class RemoteError(Exception):
jpayne@69 109 def __str__(self):
jpayne@69 110 return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
jpayne@69 111
jpayne@69 112 #
jpayne@69 113 # Functions for finding the method names of an object
jpayne@69 114 #
jpayne@69 115
jpayne@69 116 def all_methods(obj):
jpayne@69 117 '''
jpayne@69 118 Return a list of names of methods of `obj`
jpayne@69 119 '''
jpayne@69 120 temp = []
jpayne@69 121 for name in dir(obj):
jpayne@69 122 func = getattr(obj, name)
jpayne@69 123 if callable(func):
jpayne@69 124 temp.append(name)
jpayne@69 125 return temp
jpayne@69 126
jpayne@69 127 def public_methods(obj):
jpayne@69 128 '''
jpayne@69 129 Return a list of names of methods of `obj` which do not start with '_'
jpayne@69 130 '''
jpayne@69 131 return [name for name in all_methods(obj) if name[0] != '_']
jpayne@69 132
jpayne@69 133 #
jpayne@69 134 # Server which is run in a process controlled by a manager
jpayne@69 135 #
jpayne@69 136
jpayne@69 137 class Server(object):
jpayne@69 138 '''
jpayne@69 139 Server class which runs in a process controlled by a manager object
jpayne@69 140 '''
jpayne@69 141 public = ['shutdown', 'create', 'accept_connection', 'get_methods',
jpayne@69 142 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
jpayne@69 143
jpayne@69 144 def __init__(self, registry, address, authkey, serializer):
jpayne@69 145 if not isinstance(authkey, bytes):
jpayne@69 146 raise TypeError(
jpayne@69 147 "Authkey {0!r} is type {1!s}, not bytes".format(
jpayne@69 148 authkey, type(authkey)))
jpayne@69 149 self.registry = registry
jpayne@69 150 self.authkey = process.AuthenticationString(authkey)
jpayne@69 151 Listener, Client = listener_client[serializer]
jpayne@69 152
jpayne@69 153 # do authentication later
jpayne@69 154 self.listener = Listener(address=address, backlog=16)
jpayne@69 155 self.address = self.listener.address
jpayne@69 156
jpayne@69 157 self.id_to_obj = {'0': (None, ())}
jpayne@69 158 self.id_to_refcount = {}
jpayne@69 159 self.id_to_local_proxy_obj = {}
jpayne@69 160 self.mutex = threading.Lock()
jpayne@69 161
jpayne@69 162 def serve_forever(self):
jpayne@69 163 '''
jpayne@69 164 Run the server forever
jpayne@69 165 '''
jpayne@69 166 self.stop_event = threading.Event()
jpayne@69 167 process.current_process()._manager_server = self
jpayne@69 168 try:
jpayne@69 169 accepter = threading.Thread(target=self.accepter)
jpayne@69 170 accepter.daemon = True
jpayne@69 171 accepter.start()
jpayne@69 172 try:
jpayne@69 173 while not self.stop_event.is_set():
jpayne@69 174 self.stop_event.wait(1)
jpayne@69 175 except (KeyboardInterrupt, SystemExit):
jpayne@69 176 pass
jpayne@69 177 finally:
jpayne@69 178 if sys.stdout != sys.__stdout__: # what about stderr?
jpayne@69 179 util.debug('resetting stdout, stderr')
jpayne@69 180 sys.stdout = sys.__stdout__
jpayne@69 181 sys.stderr = sys.__stderr__
jpayne@69 182 sys.exit(0)
jpayne@69 183
jpayne@69 184 def accepter(self):
jpayne@69 185 while True:
jpayne@69 186 try:
jpayne@69 187 c = self.listener.accept()
jpayne@69 188 except OSError:
jpayne@69 189 continue
jpayne@69 190 t = threading.Thread(target=self.handle_request, args=(c,))
jpayne@69 191 t.daemon = True
jpayne@69 192 t.start()
jpayne@69 193
jpayne@69 194 def handle_request(self, c):
jpayne@69 195 '''
jpayne@69 196 Handle a new connection
jpayne@69 197 '''
jpayne@69 198 funcname = result = request = None
jpayne@69 199 try:
jpayne@69 200 connection.deliver_challenge(c, self.authkey)
jpayne@69 201 connection.answer_challenge(c, self.authkey)
jpayne@69 202 request = c.recv()
jpayne@69 203 ignore, funcname, args, kwds = request
jpayne@69 204 assert funcname in self.public, '%r unrecognized' % funcname
jpayne@69 205 func = getattr(self, funcname)
jpayne@69 206 except Exception:
jpayne@69 207 msg = ('#TRACEBACK', format_exc())
jpayne@69 208 else:
jpayne@69 209 try:
jpayne@69 210 result = func(c, *args, **kwds)
jpayne@69 211 except Exception:
jpayne@69 212 msg = ('#TRACEBACK', format_exc())
jpayne@69 213 else:
jpayne@69 214 msg = ('#RETURN', result)
jpayne@69 215 try:
jpayne@69 216 c.send(msg)
jpayne@69 217 except Exception as e:
jpayne@69 218 try:
jpayne@69 219 c.send(('#TRACEBACK', format_exc()))
jpayne@69 220 except Exception:
jpayne@69 221 pass
jpayne@69 222 util.info('Failure to send message: %r', msg)
jpayne@69 223 util.info(' ... request was %r', request)
jpayne@69 224 util.info(' ... exception was %r', e)
jpayne@69 225
jpayne@69 226 c.close()
jpayne@69 227
jpayne@69 228 def serve_client(self, conn):
jpayne@69 229 '''
jpayne@69 230 Handle requests from the proxies in a particular process/thread
jpayne@69 231 '''
jpayne@69 232 util.debug('starting server thread to service %r',
jpayne@69 233 threading.current_thread().name)
jpayne@69 234
jpayne@69 235 recv = conn.recv
jpayne@69 236 send = conn.send
jpayne@69 237 id_to_obj = self.id_to_obj
jpayne@69 238
jpayne@69 239 while not self.stop_event.is_set():
jpayne@69 240
jpayne@69 241 try:
jpayne@69 242 methodname = obj = None
jpayne@69 243 request = recv()
jpayne@69 244 ident, methodname, args, kwds = request
jpayne@69 245 try:
jpayne@69 246 obj, exposed, gettypeid = id_to_obj[ident]
jpayne@69 247 except KeyError as ke:
jpayne@69 248 try:
jpayne@69 249 obj, exposed, gettypeid = \
jpayne@69 250 self.id_to_local_proxy_obj[ident]
jpayne@69 251 except KeyError as second_ke:
jpayne@69 252 raise ke
jpayne@69 253
jpayne@69 254 if methodname not in exposed:
jpayne@69 255 raise AttributeError(
jpayne@69 256 'method %r of %r object is not in exposed=%r' %
jpayne@69 257 (methodname, type(obj), exposed)
jpayne@69 258 )
jpayne@69 259
jpayne@69 260 function = getattr(obj, methodname)
jpayne@69 261
jpayne@69 262 try:
jpayne@69 263 res = function(*args, **kwds)
jpayne@69 264 except Exception as e:
jpayne@69 265 msg = ('#ERROR', e)
jpayne@69 266 else:
jpayne@69 267 typeid = gettypeid and gettypeid.get(methodname, None)
jpayne@69 268 if typeid:
jpayne@69 269 rident, rexposed = self.create(conn, typeid, res)
jpayne@69 270 token = Token(typeid, self.address, rident)
jpayne@69 271 msg = ('#PROXY', (rexposed, token))
jpayne@69 272 else:
jpayne@69 273 msg = ('#RETURN', res)
jpayne@69 274
jpayne@69 275 except AttributeError:
jpayne@69 276 if methodname is None:
jpayne@69 277 msg = ('#TRACEBACK', format_exc())
jpayne@69 278 else:
jpayne@69 279 try:
jpayne@69 280 fallback_func = self.fallback_mapping[methodname]
jpayne@69 281 result = fallback_func(
jpayne@69 282 self, conn, ident, obj, *args, **kwds
jpayne@69 283 )
jpayne@69 284 msg = ('#RETURN', result)
jpayne@69 285 except Exception:
jpayne@69 286 msg = ('#TRACEBACK', format_exc())
jpayne@69 287
jpayne@69 288 except EOFError:
jpayne@69 289 util.debug('got EOF -- exiting thread serving %r',
jpayne@69 290 threading.current_thread().name)
jpayne@69 291 sys.exit(0)
jpayne@69 292
jpayne@69 293 except Exception:
jpayne@69 294 msg = ('#TRACEBACK', format_exc())
jpayne@69 295
jpayne@69 296 try:
jpayne@69 297 try:
jpayne@69 298 send(msg)
jpayne@69 299 except Exception as e:
jpayne@69 300 send(('#UNSERIALIZABLE', format_exc()))
jpayne@69 301 except Exception as e:
jpayne@69 302 util.info('exception in thread serving %r',
jpayne@69 303 threading.current_thread().name)
jpayne@69 304 util.info(' ... message was %r', msg)
jpayne@69 305 util.info(' ... exception was %r', e)
jpayne@69 306 conn.close()
jpayne@69 307 sys.exit(1)
jpayne@69 308
jpayne@69 309 def fallback_getvalue(self, conn, ident, obj):
jpayne@69 310 return obj
jpayne@69 311
jpayne@69 312 def fallback_str(self, conn, ident, obj):
jpayne@69 313 return str(obj)
jpayne@69 314
jpayne@69 315 def fallback_repr(self, conn, ident, obj):
jpayne@69 316 return repr(obj)
jpayne@69 317
jpayne@69 318 fallback_mapping = {
jpayne@69 319 '__str__':fallback_str,
jpayne@69 320 '__repr__':fallback_repr,
jpayne@69 321 '#GETVALUE':fallback_getvalue
jpayne@69 322 }
jpayne@69 323
jpayne@69 324 def dummy(self, c):
jpayne@69 325 pass
jpayne@69 326
jpayne@69 327 def debug_info(self, c):
jpayne@69 328 '''
jpayne@69 329 Return some info --- useful to spot problems with refcounting
jpayne@69 330 '''
jpayne@69 331 # Perhaps include debug info about 'c'?
jpayne@69 332 with self.mutex:
jpayne@69 333 result = []
jpayne@69 334 keys = list(self.id_to_refcount.keys())
jpayne@69 335 keys.sort()
jpayne@69 336 for ident in keys:
jpayne@69 337 if ident != '0':
jpayne@69 338 result.append(' %s: refcount=%s\n %s' %
jpayne@69 339 (ident, self.id_to_refcount[ident],
jpayne@69 340 str(self.id_to_obj[ident][0])[:75]))
jpayne@69 341 return '\n'.join(result)
jpayne@69 342
jpayne@69 343 def number_of_objects(self, c):
jpayne@69 344 '''
jpayne@69 345 Number of shared objects
jpayne@69 346 '''
jpayne@69 347 # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
jpayne@69 348 return len(self.id_to_refcount)
jpayne@69 349
jpayne@69 350 def shutdown(self, c):
jpayne@69 351 '''
jpayne@69 352 Shutdown this process
jpayne@69 353 '''
jpayne@69 354 try:
jpayne@69 355 util.debug('manager received shutdown message')
jpayne@69 356 c.send(('#RETURN', None))
jpayne@69 357 except:
jpayne@69 358 import traceback
jpayne@69 359 traceback.print_exc()
jpayne@69 360 finally:
jpayne@69 361 self.stop_event.set()
jpayne@69 362
jpayne@69 363 def create(*args, **kwds):
jpayne@69 364 '''
jpayne@69 365 Create a new shared object and return its id
jpayne@69 366 '''
jpayne@69 367 if len(args) >= 3:
jpayne@69 368 self, c, typeid, *args = args
jpayne@69 369 elif not args:
jpayne@69 370 raise TypeError("descriptor 'create' of 'Server' object "
jpayne@69 371 "needs an argument")
jpayne@69 372 else:
jpayne@69 373 if 'typeid' not in kwds:
jpayne@69 374 raise TypeError('create expected at least 2 positional '
jpayne@69 375 'arguments, got %d' % (len(args)-1))
jpayne@69 376 typeid = kwds.pop('typeid')
jpayne@69 377 if len(args) >= 2:
jpayne@69 378 self, c, *args = args
jpayne@69 379 import warnings
jpayne@69 380 warnings.warn("Passing 'typeid' as keyword argument is deprecated",
jpayne@69 381 DeprecationWarning, stacklevel=2)
jpayne@69 382 else:
jpayne@69 383 if 'c' not in kwds:
jpayne@69 384 raise TypeError('create expected at least 2 positional '
jpayne@69 385 'arguments, got %d' % (len(args)-1))
jpayne@69 386 c = kwds.pop('c')
jpayne@69 387 self, *args = args
jpayne@69 388 import warnings
jpayne@69 389 warnings.warn("Passing 'c' as keyword argument is deprecated",
jpayne@69 390 DeprecationWarning, stacklevel=2)
jpayne@69 391 args = tuple(args)
jpayne@69 392
jpayne@69 393 with self.mutex:
jpayne@69 394 callable, exposed, method_to_typeid, proxytype = \
jpayne@69 395 self.registry[typeid]
jpayne@69 396
jpayne@69 397 if callable is None:
jpayne@69 398 if kwds or (len(args) != 1):
jpayne@69 399 raise ValueError(
jpayne@69 400 "Without callable, must have one non-keyword argument")
jpayne@69 401 obj = args[0]
jpayne@69 402 else:
jpayne@69 403 obj = callable(*args, **kwds)
jpayne@69 404
jpayne@69 405 if exposed is None:
jpayne@69 406 exposed = public_methods(obj)
jpayne@69 407 if method_to_typeid is not None:
jpayne@69 408 if not isinstance(method_to_typeid, dict):
jpayne@69 409 raise TypeError(
jpayne@69 410 "Method_to_typeid {0!r}: type {1!s}, not dict".format(
jpayne@69 411 method_to_typeid, type(method_to_typeid)))
jpayne@69 412 exposed = list(exposed) + list(method_to_typeid)
jpayne@69 413
jpayne@69 414 ident = '%x' % id(obj) # convert to string because xmlrpclib
jpayne@69 415 # only has 32 bit signed integers
jpayne@69 416 util.debug('%r callable returned object with id %r', typeid, ident)
jpayne@69 417
jpayne@69 418 self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
jpayne@69 419 if ident not in self.id_to_refcount:
jpayne@69 420 self.id_to_refcount[ident] = 0
jpayne@69 421
jpayne@69 422 self.incref(c, ident)
jpayne@69 423 return ident, tuple(exposed)
jpayne@69 424 create.__text_signature__ = '($self, c, typeid, /, *args, **kwds)'
jpayne@69 425
jpayne@69 426 def get_methods(self, c, token):
jpayne@69 427 '''
jpayne@69 428 Return the methods of the shared object indicated by token
jpayne@69 429 '''
jpayne@69 430 return tuple(self.id_to_obj[token.id][1])
jpayne@69 431
jpayne@69 432 def accept_connection(self, c, name):
jpayne@69 433 '''
jpayne@69 434 Spawn a new thread to serve this connection
jpayne@69 435 '''
jpayne@69 436 threading.current_thread().name = name
jpayne@69 437 c.send(('#RETURN', None))
jpayne@69 438 self.serve_client(c)
jpayne@69 439
jpayne@69 440 def incref(self, c, ident):
jpayne@69 441 with self.mutex:
jpayne@69 442 try:
jpayne@69 443 self.id_to_refcount[ident] += 1
jpayne@69 444 except KeyError as ke:
jpayne@69 445 # If no external references exist but an internal (to the
jpayne@69 446 # manager) still does and a new external reference is created
jpayne@69 447 # from it, restore the manager's tracking of it from the
jpayne@69 448 # previously stashed internal ref.
jpayne@69 449 if ident in self.id_to_local_proxy_obj:
jpayne@69 450 self.id_to_refcount[ident] = 1
jpayne@69 451 self.id_to_obj[ident] = \
jpayne@69 452 self.id_to_local_proxy_obj[ident]
jpayne@69 453 obj, exposed, gettypeid = self.id_to_obj[ident]
jpayne@69 454 util.debug('Server re-enabled tracking & INCREF %r', ident)
jpayne@69 455 else:
jpayne@69 456 raise ke
jpayne@69 457
jpayne@69 458 def decref(self, c, ident):
jpayne@69 459 if ident not in self.id_to_refcount and \
jpayne@69 460 ident in self.id_to_local_proxy_obj:
jpayne@69 461 util.debug('Server DECREF skipping %r', ident)
jpayne@69 462 return
jpayne@69 463
jpayne@69 464 with self.mutex:
jpayne@69 465 if self.id_to_refcount[ident] <= 0:
jpayne@69 466 raise AssertionError(
jpayne@69 467 "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
jpayne@69 468 ident, self.id_to_obj[ident],
jpayne@69 469 self.id_to_refcount[ident]))
jpayne@69 470 self.id_to_refcount[ident] -= 1
jpayne@69 471 if self.id_to_refcount[ident] == 0:
jpayne@69 472 del self.id_to_refcount[ident]
jpayne@69 473
jpayne@69 474 if ident not in self.id_to_refcount:
jpayne@69 475 # Two-step process in case the object turns out to contain other
jpayne@69 476 # proxy objects (e.g. a managed list of managed lists).
jpayne@69 477 # Otherwise, deleting self.id_to_obj[ident] would trigger the
jpayne@69 478 # deleting of the stored value (another managed object) which would
jpayne@69 479 # in turn attempt to acquire the mutex that is already held here.
jpayne@69 480 self.id_to_obj[ident] = (None, (), None) # thread-safe
jpayne@69 481 util.debug('disposing of obj with id %r', ident)
jpayne@69 482 with self.mutex:
jpayne@69 483 del self.id_to_obj[ident]
jpayne@69 484
jpayne@69 485
jpayne@69 486 #
jpayne@69 487 # Class to represent state of a manager
jpayne@69 488 #
jpayne@69 489
jpayne@69 490 class State(object):
jpayne@69 491 __slots__ = ['value']
jpayne@69 492 INITIAL = 0
jpayne@69 493 STARTED = 1
jpayne@69 494 SHUTDOWN = 2
jpayne@69 495
jpayne@69 496 #
jpayne@69 497 # Mapping from serializer name to Listener and Client types
jpayne@69 498 #
jpayne@69 499
jpayne@69 500 listener_client = {
jpayne@69 501 'pickle' : (connection.Listener, connection.Client),
jpayne@69 502 'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
jpayne@69 503 }
jpayne@69 504
jpayne@69 505 #
jpayne@69 506 # Definition of BaseManager
jpayne@69 507 #
jpayne@69 508
jpayne@69 509 class BaseManager(object):
jpayne@69 510 '''
jpayne@69 511 Base class for managers
jpayne@69 512 '''
jpayne@69 513 _registry = {}
jpayne@69 514 _Server = Server
jpayne@69 515
jpayne@69 516 def __init__(self, address=None, authkey=None, serializer='pickle',
jpayne@69 517 ctx=None):
jpayne@69 518 if authkey is None:
jpayne@69 519 authkey = process.current_process().authkey
jpayne@69 520 self._address = address # XXX not final address if eg ('', 0)
jpayne@69 521 self._authkey = process.AuthenticationString(authkey)
jpayne@69 522 self._state = State()
jpayne@69 523 self._state.value = State.INITIAL
jpayne@69 524 self._serializer = serializer
jpayne@69 525 self._Listener, self._Client = listener_client[serializer]
jpayne@69 526 self._ctx = ctx or get_context()
jpayne@69 527
jpayne@69 528 def get_server(self):
jpayne@69 529 '''
jpayne@69 530 Return server object with serve_forever() method and address attribute
jpayne@69 531 '''
jpayne@69 532 if self._state.value != State.INITIAL:
jpayne@69 533 if self._state.value == State.STARTED:
jpayne@69 534 raise ProcessError("Already started server")
jpayne@69 535 elif self._state.value == State.SHUTDOWN:
jpayne@69 536 raise ProcessError("Manager has shut down")
jpayne@69 537 else:
jpayne@69 538 raise ProcessError(
jpayne@69 539 "Unknown state {!r}".format(self._state.value))
jpayne@69 540 return Server(self._registry, self._address,
jpayne@69 541 self._authkey, self._serializer)
jpayne@69 542
jpayne@69 543 def connect(self):
jpayne@69 544 '''
jpayne@69 545 Connect manager object to the server process
jpayne@69 546 '''
jpayne@69 547 Listener, Client = listener_client[self._serializer]
jpayne@69 548 conn = Client(self._address, authkey=self._authkey)
jpayne@69 549 dispatch(conn, None, 'dummy')
jpayne@69 550 self._state.value = State.STARTED
jpayne@69 551
jpayne@69 552 def start(self, initializer=None, initargs=()):
jpayne@69 553 '''
jpayne@69 554 Spawn a server process for this manager object
jpayne@69 555 '''
jpayne@69 556 if self._state.value != State.INITIAL:
jpayne@69 557 if self._state.value == State.STARTED:
jpayne@69 558 raise ProcessError("Already started server")
jpayne@69 559 elif self._state.value == State.SHUTDOWN:
jpayne@69 560 raise ProcessError("Manager has shut down")
jpayne@69 561 else:
jpayne@69 562 raise ProcessError(
jpayne@69 563 "Unknown state {!r}".format(self._state.value))
jpayne@69 564
jpayne@69 565 if initializer is not None and not callable(initializer):
jpayne@69 566 raise TypeError('initializer must be a callable')
jpayne@69 567
jpayne@69 568 # pipe over which we will retrieve address of server
jpayne@69 569 reader, writer = connection.Pipe(duplex=False)
jpayne@69 570
jpayne@69 571 # spawn process which runs a server
jpayne@69 572 self._process = self._ctx.Process(
jpayne@69 573 target=type(self)._run_server,
jpayne@69 574 args=(self._registry, self._address, self._authkey,
jpayne@69 575 self._serializer, writer, initializer, initargs),
jpayne@69 576 )
jpayne@69 577 ident = ':'.join(str(i) for i in self._process._identity)
jpayne@69 578 self._process.name = type(self).__name__ + '-' + ident
jpayne@69 579 self._process.start()
jpayne@69 580
jpayne@69 581 # get address of server
jpayne@69 582 writer.close()
jpayne@69 583 self._address = reader.recv()
jpayne@69 584 reader.close()
jpayne@69 585
jpayne@69 586 # register a finalizer
jpayne@69 587 self._state.value = State.STARTED
jpayne@69 588 self.shutdown = util.Finalize(
jpayne@69 589 self, type(self)._finalize_manager,
jpayne@69 590 args=(self._process, self._address, self._authkey,
jpayne@69 591 self._state, self._Client),
jpayne@69 592 exitpriority=0
jpayne@69 593 )
jpayne@69 594
jpayne@69 595 @classmethod
jpayne@69 596 def _run_server(cls, registry, address, authkey, serializer, writer,
jpayne@69 597 initializer=None, initargs=()):
jpayne@69 598 '''
jpayne@69 599 Create a server, report its address and run it
jpayne@69 600 '''
jpayne@69 601 # bpo-36368: protect server process from KeyboardInterrupt signals
jpayne@69 602 signal.signal(signal.SIGINT, signal.SIG_IGN)
jpayne@69 603
jpayne@69 604 if initializer is not None:
jpayne@69 605 initializer(*initargs)
jpayne@69 606
jpayne@69 607 # create server
jpayne@69 608 server = cls._Server(registry, address, authkey, serializer)
jpayne@69 609
jpayne@69 610 # inform parent process of the server's address
jpayne@69 611 writer.send(server.address)
jpayne@69 612 writer.close()
jpayne@69 613
jpayne@69 614 # run the manager
jpayne@69 615 util.info('manager serving at %r', server.address)
jpayne@69 616 server.serve_forever()
jpayne@69 617
jpayne@69 618 def _create(self, typeid, /, *args, **kwds):
jpayne@69 619 '''
jpayne@69 620 Create a new shared object; return the token and exposed tuple
jpayne@69 621 '''
jpayne@69 622 assert self._state.value == State.STARTED, 'server not yet started'
jpayne@69 623 conn = self._Client(self._address, authkey=self._authkey)
jpayne@69 624 try:
jpayne@69 625 id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
jpayne@69 626 finally:
jpayne@69 627 conn.close()
jpayne@69 628 return Token(typeid, self._address, id), exposed
jpayne@69 629
jpayne@69 630 def join(self, timeout=None):
jpayne@69 631 '''
jpayne@69 632 Join the manager process (if it has been spawned)
jpayne@69 633 '''
jpayne@69 634 if self._process is not None:
jpayne@69 635 self._process.join(timeout)
jpayne@69 636 if not self._process.is_alive():
jpayne@69 637 self._process = None
jpayne@69 638
jpayne@69 639 def _debug_info(self):
jpayne@69 640 '''
jpayne@69 641 Return some info about the servers shared objects and connections
jpayne@69 642 '''
jpayne@69 643 conn = self._Client(self._address, authkey=self._authkey)
jpayne@69 644 try:
jpayne@69 645 return dispatch(conn, None, 'debug_info')
jpayne@69 646 finally:
jpayne@69 647 conn.close()
jpayne@69 648
jpayne@69 649 def _number_of_objects(self):
jpayne@69 650 '''
jpayne@69 651 Return the number of shared objects
jpayne@69 652 '''
jpayne@69 653 conn = self._Client(self._address, authkey=self._authkey)
jpayne@69 654 try:
jpayne@69 655 return dispatch(conn, None, 'number_of_objects')
jpayne@69 656 finally:
jpayne@69 657 conn.close()
jpayne@69 658
jpayne@69 659 def __enter__(self):
jpayne@69 660 if self._state.value == State.INITIAL:
jpayne@69 661 self.start()
jpayne@69 662 if self._state.value != State.STARTED:
jpayne@69 663 if self._state.value == State.INITIAL:
jpayne@69 664 raise ProcessError("Unable to start server")
jpayne@69 665 elif self._state.value == State.SHUTDOWN:
jpayne@69 666 raise ProcessError("Manager has shut down")
jpayne@69 667 else:
jpayne@69 668 raise ProcessError(
jpayne@69 669 "Unknown state {!r}".format(self._state.value))
jpayne@69 670 return self
jpayne@69 671
jpayne@69 672 def __exit__(self, exc_type, exc_val, exc_tb):
jpayne@69 673 self.shutdown()
jpayne@69 674
jpayne@69 675 @staticmethod
jpayne@69 676 def _finalize_manager(process, address, authkey, state, _Client):
jpayne@69 677 '''
jpayne@69 678 Shutdown the manager process; will be registered as a finalizer
jpayne@69 679 '''
jpayne@69 680 if process.is_alive():
jpayne@69 681 util.info('sending shutdown message to manager')
jpayne@69 682 try:
jpayne@69 683 conn = _Client(address, authkey=authkey)
jpayne@69 684 try:
jpayne@69 685 dispatch(conn, None, 'shutdown')
jpayne@69 686 finally:
jpayne@69 687 conn.close()
jpayne@69 688 except Exception:
jpayne@69 689 pass
jpayne@69 690
jpayne@69 691 process.join(timeout=1.0)
jpayne@69 692 if process.is_alive():
jpayne@69 693 util.info('manager still alive')
jpayne@69 694 if hasattr(process, 'terminate'):
jpayne@69 695 util.info('trying to `terminate()` manager process')
jpayne@69 696 process.terminate()
jpayne@69 697 process.join(timeout=0.1)
jpayne@69 698 if process.is_alive():
jpayne@69 699 util.info('manager still alive after terminate')
jpayne@69 700
jpayne@69 701 state.value = State.SHUTDOWN
jpayne@69 702 try:
jpayne@69 703 del BaseProxy._address_to_local[address]
jpayne@69 704 except KeyError:
jpayne@69 705 pass
jpayne@69 706
jpayne@69 707 @property
jpayne@69 708 def address(self):
jpayne@69 709 return self._address
jpayne@69 710
jpayne@69 711 @classmethod
jpayne@69 712 def register(cls, typeid, callable=None, proxytype=None, exposed=None,
jpayne@69 713 method_to_typeid=None, create_method=True):
jpayne@69 714 '''
jpayne@69 715 Register a typeid with the manager type
jpayne@69 716 '''
jpayne@69 717 if '_registry' not in cls.__dict__:
jpayne@69 718 cls._registry = cls._registry.copy()
jpayne@69 719
jpayne@69 720 if proxytype is None:
jpayne@69 721 proxytype = AutoProxy
jpayne@69 722
jpayne@69 723 exposed = exposed or getattr(proxytype, '_exposed_', None)
jpayne@69 724
jpayne@69 725 method_to_typeid = method_to_typeid or \
jpayne@69 726 getattr(proxytype, '_method_to_typeid_', None)
jpayne@69 727
jpayne@69 728 if method_to_typeid:
jpayne@69 729 for key, value in list(method_to_typeid.items()): # isinstance?
jpayne@69 730 assert type(key) is str, '%r is not a string' % key
jpayne@69 731 assert type(value) is str, '%r is not a string' % value
jpayne@69 732
jpayne@69 733 cls._registry[typeid] = (
jpayne@69 734 callable, exposed, method_to_typeid, proxytype
jpayne@69 735 )
jpayne@69 736
jpayne@69 737 if create_method:
jpayne@69 738 def temp(self, /, *args, **kwds):
jpayne@69 739 util.debug('requesting creation of a shared %r object', typeid)
jpayne@69 740 token, exp = self._create(typeid, *args, **kwds)
jpayne@69 741 proxy = proxytype(
jpayne@69 742 token, self._serializer, manager=self,
jpayne@69 743 authkey=self._authkey, exposed=exp
jpayne@69 744 )
jpayne@69 745 conn = self._Client(token.address, authkey=self._authkey)
jpayne@69 746 dispatch(conn, None, 'decref', (token.id,))
jpayne@69 747 return proxy
jpayne@69 748 temp.__name__ = typeid
jpayne@69 749 setattr(cls, typeid, temp)
jpayne@69 750
jpayne@69 751 #
jpayne@69 752 # Subclass of set which get cleared after a fork
jpayne@69 753 #
jpayne@69 754
jpayne@69 755 class ProcessLocalSet(set):
jpayne@69 756 def __init__(self):
jpayne@69 757 util.register_after_fork(self, lambda obj: obj.clear())
jpayne@69 758 def __reduce__(self):
jpayne@69 759 return type(self), ()
jpayne@69 760
jpayne@69 761 #
jpayne@69 762 # Definition of BaseProxy
jpayne@69 763 #
jpayne@69 764
jpayne@69 765 class BaseProxy(object):
jpayne@69 766 '''
jpayne@69 767 A base for proxies of shared objects
jpayne@69 768 '''
jpayne@69 769 _address_to_local = {}
jpayne@69 770 _mutex = util.ForkAwareThreadLock()
jpayne@69 771
jpayne@69 772 def __init__(self, token, serializer, manager=None,
jpayne@69 773 authkey=None, exposed=None, incref=True, manager_owned=False):
jpayne@69 774 with BaseProxy._mutex:
jpayne@69 775 tls_idset = BaseProxy._address_to_local.get(token.address, None)
jpayne@69 776 if tls_idset is None:
jpayne@69 777 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
jpayne@69 778 BaseProxy._address_to_local[token.address] = tls_idset
jpayne@69 779
jpayne@69 780 # self._tls is used to record the connection used by this
jpayne@69 781 # thread to communicate with the manager at token.address
jpayne@69 782 self._tls = tls_idset[0]
jpayne@69 783
jpayne@69 784 # self._idset is used to record the identities of all shared
jpayne@69 785 # objects for which the current process owns references and
jpayne@69 786 # which are in the manager at token.address
jpayne@69 787 self._idset = tls_idset[1]
jpayne@69 788
jpayne@69 789 self._token = token
jpayne@69 790 self._id = self._token.id
jpayne@69 791 self._manager = manager
jpayne@69 792 self._serializer = serializer
jpayne@69 793 self._Client = listener_client[serializer][1]
jpayne@69 794
jpayne@69 795 # Should be set to True only when a proxy object is being created
jpayne@69 796 # on the manager server; primary use case: nested proxy objects.
jpayne@69 797 # RebuildProxy detects when a proxy is being created on the manager
jpayne@69 798 # and sets this value appropriately.
jpayne@69 799 self._owned_by_manager = manager_owned
jpayne@69 800
jpayne@69 801 if authkey is not None:
jpayne@69 802 self._authkey = process.AuthenticationString(authkey)
jpayne@69 803 elif self._manager is not None:
jpayne@69 804 self._authkey = self._manager._authkey
jpayne@69 805 else:
jpayne@69 806 self._authkey = process.current_process().authkey
jpayne@69 807
jpayne@69 808 if incref:
jpayne@69 809 self._incref()
jpayne@69 810
jpayne@69 811 util.register_after_fork(self, BaseProxy._after_fork)
jpayne@69 812
jpayne@69 813 def _connect(self):
jpayne@69 814 util.debug('making connection to manager')
jpayne@69 815 name = process.current_process().name
jpayne@69 816 if threading.current_thread().name != 'MainThread':
jpayne@69 817 name += '|' + threading.current_thread().name
jpayne@69 818 conn = self._Client(self._token.address, authkey=self._authkey)
jpayne@69 819 dispatch(conn, None, 'accept_connection', (name,))
jpayne@69 820 self._tls.connection = conn
jpayne@69 821
jpayne@69 822 def _callmethod(self, methodname, args=(), kwds={}):
jpayne@69 823 '''
jpayne@69 824 Try to call a method of the referrent and return a copy of the result
jpayne@69 825 '''
jpayne@69 826 try:
jpayne@69 827 conn = self._tls.connection
jpayne@69 828 except AttributeError:
jpayne@69 829 util.debug('thread %r does not own a connection',
jpayne@69 830 threading.current_thread().name)
jpayne@69 831 self._connect()
jpayne@69 832 conn = self._tls.connection
jpayne@69 833
jpayne@69 834 conn.send((self._id, methodname, args, kwds))
jpayne@69 835 kind, result = conn.recv()
jpayne@69 836
jpayne@69 837 if kind == '#RETURN':
jpayne@69 838 return result
jpayne@69 839 elif kind == '#PROXY':
jpayne@69 840 exposed, token = result
jpayne@69 841 proxytype = self._manager._registry[token.typeid][-1]
jpayne@69 842 token.address = self._token.address
jpayne@69 843 proxy = proxytype(
jpayne@69 844 token, self._serializer, manager=self._manager,
jpayne@69 845 authkey=self._authkey, exposed=exposed
jpayne@69 846 )
jpayne@69 847 conn = self._Client(token.address, authkey=self._authkey)
jpayne@69 848 dispatch(conn, None, 'decref', (token.id,))
jpayne@69 849 return proxy
jpayne@69 850 raise convert_to_error(kind, result)
jpayne@69 851
jpayne@69 852 def _getvalue(self):
jpayne@69 853 '''
jpayne@69 854 Get a copy of the value of the referent
jpayne@69 855 '''
jpayne@69 856 return self._callmethod('#GETVALUE')
jpayne@69 857
jpayne@69 858 def _incref(self):
jpayne@69 859 if self._owned_by_manager:
jpayne@69 860 util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
jpayne@69 861 return
jpayne@69 862
jpayne@69 863 conn = self._Client(self._token.address, authkey=self._authkey)
jpayne@69 864 dispatch(conn, None, 'incref', (self._id,))
jpayne@69 865 util.debug('INCREF %r', self._token.id)
jpayne@69 866
jpayne@69 867 self._idset.add(self._id)
jpayne@69 868
jpayne@69 869 state = self._manager and self._manager._state
jpayne@69 870
jpayne@69 871 self._close = util.Finalize(
jpayne@69 872 self, BaseProxy._decref,
jpayne@69 873 args=(self._token, self._authkey, state,
jpayne@69 874 self._tls, self._idset, self._Client),
jpayne@69 875 exitpriority=10
jpayne@69 876 )
jpayne@69 877
jpayne@69 878 @staticmethod
jpayne@69 879 def _decref(token, authkey, state, tls, idset, _Client):
jpayne@69 880 idset.discard(token.id)
jpayne@69 881
jpayne@69 882 # check whether manager is still alive
jpayne@69 883 if state is None or state.value == State.STARTED:
jpayne@69 884 # tell manager this process no longer cares about referent
jpayne@69 885 try:
jpayne@69 886 util.debug('DECREF %r', token.id)
jpayne@69 887 conn = _Client(token.address, authkey=authkey)
jpayne@69 888 dispatch(conn, None, 'decref', (token.id,))
jpayne@69 889 except Exception as e:
jpayne@69 890 util.debug('... decref failed %s', e)
jpayne@69 891
jpayne@69 892 else:
jpayne@69 893 util.debug('DECREF %r -- manager already shutdown', token.id)
jpayne@69 894
jpayne@69 895 # check whether we can close this thread's connection because
jpayne@69 896 # the process owns no more references to objects for this manager
jpayne@69 897 if not idset and hasattr(tls, 'connection'):
jpayne@69 898 util.debug('thread %r has no more proxies so closing conn',
jpayne@69 899 threading.current_thread().name)
jpayne@69 900 tls.connection.close()
jpayne@69 901 del tls.connection
jpayne@69 902
jpayne@69 903 def _after_fork(self):
jpayne@69 904 self._manager = None
jpayne@69 905 try:
jpayne@69 906 self._incref()
jpayne@69 907 except Exception as e:
jpayne@69 908 # the proxy may just be for a manager which has shutdown
jpayne@69 909 util.info('incref failed: %s' % e)
jpayne@69 910
jpayne@69 911 def __reduce__(self):
jpayne@69 912 kwds = {}
jpayne@69 913 if get_spawning_popen() is not None:
jpayne@69 914 kwds['authkey'] = self._authkey
jpayne@69 915
jpayne@69 916 if getattr(self, '_isauto', False):
jpayne@69 917 kwds['exposed'] = self._exposed_
jpayne@69 918 return (RebuildProxy,
jpayne@69 919 (AutoProxy, self._token, self._serializer, kwds))
jpayne@69 920 else:
jpayne@69 921 return (RebuildProxy,
jpayne@69 922 (type(self), self._token, self._serializer, kwds))
jpayne@69 923
jpayne@69 924 def __deepcopy__(self, memo):
jpayne@69 925 return self._getvalue()
jpayne@69 926
jpayne@69 927 def __repr__(self):
jpayne@69 928 return '<%s object, typeid %r at %#x>' % \
jpayne@69 929 (type(self).__name__, self._token.typeid, id(self))
jpayne@69 930
jpayne@69 931 def __str__(self):
jpayne@69 932 '''
jpayne@69 933 Return representation of the referent (or a fall-back if that fails)
jpayne@69 934 '''
jpayne@69 935 try:
jpayne@69 936 return self._callmethod('__repr__')
jpayne@69 937 except Exception:
jpayne@69 938 return repr(self)[:-1] + "; '__str__()' failed>"
jpayne@69 939
jpayne@69 940 #
jpayne@69 941 # Function used for unpickling
jpayne@69 942 #
jpayne@69 943
jpayne@69 944 def RebuildProxy(func, token, serializer, kwds):
jpayne@69 945 '''
jpayne@69 946 Function used for unpickling proxy objects.
jpayne@69 947 '''
jpayne@69 948 server = getattr(process.current_process(), '_manager_server', None)
jpayne@69 949 if server and server.address == token.address:
jpayne@69 950 util.debug('Rebuild a proxy owned by manager, token=%r', token)
jpayne@69 951 kwds['manager_owned'] = True
jpayne@69 952 if token.id not in server.id_to_local_proxy_obj:
jpayne@69 953 server.id_to_local_proxy_obj[token.id] = \
jpayne@69 954 server.id_to_obj[token.id]
jpayne@69 955 incref = (
jpayne@69 956 kwds.pop('incref', True) and
jpayne@69 957 not getattr(process.current_process(), '_inheriting', False)
jpayne@69 958 )
jpayne@69 959 return func(token, serializer, incref=incref, **kwds)
jpayne@69 960
jpayne@69 961 #
jpayne@69 962 # Functions to create proxies and proxy types
jpayne@69 963 #
jpayne@69 964
jpayne@69 965 def MakeProxyType(name, exposed, _cache={}):
jpayne@69 966 '''
jpayne@69 967 Return a proxy type whose methods are given by `exposed`
jpayne@69 968 '''
jpayne@69 969 exposed = tuple(exposed)
jpayne@69 970 try:
jpayne@69 971 return _cache[(name, exposed)]
jpayne@69 972 except KeyError:
jpayne@69 973 pass
jpayne@69 974
jpayne@69 975 dic = {}
jpayne@69 976
jpayne@69 977 for meth in exposed:
jpayne@69 978 exec('''def %s(self, /, *args, **kwds):
jpayne@69 979 return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)
jpayne@69 980
jpayne@69 981 ProxyType = type(name, (BaseProxy,), dic)
jpayne@69 982 ProxyType._exposed_ = exposed
jpayne@69 983 _cache[(name, exposed)] = ProxyType
jpayne@69 984 return ProxyType
jpayne@69 985
jpayne@69 986
jpayne@69 987 def AutoProxy(token, serializer, manager=None, authkey=None,
jpayne@69 988 exposed=None, incref=True):
jpayne@69 989 '''
jpayne@69 990 Return an auto-proxy for `token`
jpayne@69 991 '''
jpayne@69 992 _Client = listener_client[serializer][1]
jpayne@69 993
jpayne@69 994 if exposed is None:
jpayne@69 995 conn = _Client(token.address, authkey=authkey)
jpayne@69 996 try:
jpayne@69 997 exposed = dispatch(conn, None, 'get_methods', (token,))
jpayne@69 998 finally:
jpayne@69 999 conn.close()
jpayne@69 1000
jpayne@69 1001 if authkey is None and manager is not None:
jpayne@69 1002 authkey = manager._authkey
jpayne@69 1003 if authkey is None:
jpayne@69 1004 authkey = process.current_process().authkey
jpayne@69 1005
jpayne@69 1006 ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
jpayne@69 1007 proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
jpayne@69 1008 incref=incref)
jpayne@69 1009 proxy._isauto = True
jpayne@69 1010 return proxy
jpayne@69 1011
jpayne@69 1012 #
jpayne@69 1013 # Types/callables which we will register with SyncManager
jpayne@69 1014 #
jpayne@69 1015
jpayne@69 1016 class Namespace(object):
jpayne@69 1017 def __init__(self, /, **kwds):
jpayne@69 1018 self.__dict__.update(kwds)
jpayne@69 1019 def __repr__(self):
jpayne@69 1020 items = list(self.__dict__.items())
jpayne@69 1021 temp = []
jpayne@69 1022 for name, value in items:
jpayne@69 1023 if not name.startswith('_'):
jpayne@69 1024 temp.append('%s=%r' % (name, value))
jpayne@69 1025 temp.sort()
jpayne@69 1026 return '%s(%s)' % (self.__class__.__name__, ', '.join(temp))
jpayne@69 1027
jpayne@69 1028 class Value(object):
jpayne@69 1029 def __init__(self, typecode, value, lock=True):
jpayne@69 1030 self._typecode = typecode
jpayne@69 1031 self._value = value
jpayne@69 1032 def get(self):
jpayne@69 1033 return self._value
jpayne@69 1034 def set(self, value):
jpayne@69 1035 self._value = value
jpayne@69 1036 def __repr__(self):
jpayne@69 1037 return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
jpayne@69 1038 value = property(get, set)
jpayne@69 1039
jpayne@69 1040 def Array(typecode, sequence, lock=True):
jpayne@69 1041 return array.array(typecode, sequence)
jpayne@69 1042
jpayne@69 1043 #
jpayne@69 1044 # Proxy types used by SyncManager
jpayne@69 1045 #
jpayne@69 1046
jpayne@69 1047 class IteratorProxy(BaseProxy):
jpayne@69 1048 _exposed_ = ('__next__', 'send', 'throw', 'close')
jpayne@69 1049 def __iter__(self):
jpayne@69 1050 return self
jpayne@69 1051 def __next__(self, *args):
jpayne@69 1052 return self._callmethod('__next__', args)
jpayne@69 1053 def send(self, *args):
jpayne@69 1054 return self._callmethod('send', args)
jpayne@69 1055 def throw(self, *args):
jpayne@69 1056 return self._callmethod('throw', args)
jpayne@69 1057 def close(self, *args):
jpayne@69 1058 return self._callmethod('close', args)
jpayne@69 1059
jpayne@69 1060
jpayne@69 1061 class AcquirerProxy(BaseProxy):
jpayne@69 1062 _exposed_ = ('acquire', 'release')
jpayne@69 1063 def acquire(self, blocking=True, timeout=None):
jpayne@69 1064 args = (blocking,) if timeout is None else (blocking, timeout)
jpayne@69 1065 return self._callmethod('acquire', args)
jpayne@69 1066 def release(self):
jpayne@69 1067 return self._callmethod('release')
jpayne@69 1068 def __enter__(self):
jpayne@69 1069 return self._callmethod('acquire')
jpayne@69 1070 def __exit__(self, exc_type, exc_val, exc_tb):
jpayne@69 1071 return self._callmethod('release')
jpayne@69 1072
jpayne@69 1073
jpayne@69 1074 class ConditionProxy(AcquirerProxy):
jpayne@69 1075 _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
jpayne@69 1076 def wait(self, timeout=None):
jpayne@69 1077 return self._callmethod('wait', (timeout,))
jpayne@69 1078 def notify(self, n=1):
jpayne@69 1079 return self._callmethod('notify', (n,))
jpayne@69 1080 def notify_all(self):
jpayne@69 1081 return self._callmethod('notify_all')
jpayne@69 1082 def wait_for(self, predicate, timeout=None):
jpayne@69 1083 result = predicate()
jpayne@69 1084 if result:
jpayne@69 1085 return result
jpayne@69 1086 if timeout is not None:
jpayne@69 1087 endtime = time.monotonic() + timeout
jpayne@69 1088 else:
jpayne@69 1089 endtime = None
jpayne@69 1090 waittime = None
jpayne@69 1091 while not result:
jpayne@69 1092 if endtime is not None:
jpayne@69 1093 waittime = endtime - time.monotonic()
jpayne@69 1094 if waittime <= 0:
jpayne@69 1095 break
jpayne@69 1096 self.wait(waittime)
jpayne@69 1097 result = predicate()
jpayne@69 1098 return result
jpayne@69 1099
jpayne@69 1100
jpayne@69 1101 class EventProxy(BaseProxy):
jpayne@69 1102 _exposed_ = ('is_set', 'set', 'clear', 'wait')
jpayne@69 1103 def is_set(self):
jpayne@69 1104 return self._callmethod('is_set')
jpayne@69 1105 def set(self):
jpayne@69 1106 return self._callmethod('set')
jpayne@69 1107 def clear(self):
jpayne@69 1108 return self._callmethod('clear')
jpayne@69 1109 def wait(self, timeout=None):
jpayne@69 1110 return self._callmethod('wait', (timeout,))
jpayne@69 1111
jpayne@69 1112
jpayne@69 1113 class BarrierProxy(BaseProxy):
jpayne@69 1114 _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')
jpayne@69 1115 def wait(self, timeout=None):
jpayne@69 1116 return self._callmethod('wait', (timeout,))
jpayne@69 1117 def abort(self):
jpayne@69 1118 return self._callmethod('abort')
jpayne@69 1119 def reset(self):
jpayne@69 1120 return self._callmethod('reset')
jpayne@69 1121 @property
jpayne@69 1122 def parties(self):
jpayne@69 1123 return self._callmethod('__getattribute__', ('parties',))
jpayne@69 1124 @property
jpayne@69 1125 def n_waiting(self):
jpayne@69 1126 return self._callmethod('__getattribute__', ('n_waiting',))
jpayne@69 1127 @property
jpayne@69 1128 def broken(self):
jpayne@69 1129 return self._callmethod('__getattribute__', ('broken',))
jpayne@69 1130
jpayne@69 1131
jpayne@69 1132 class NamespaceProxy(BaseProxy):
jpayne@69 1133 _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
jpayne@69 1134 def __getattr__(self, key):
jpayne@69 1135 if key[0] == '_':
jpayne@69 1136 return object.__getattribute__(self, key)
jpayne@69 1137 callmethod = object.__getattribute__(self, '_callmethod')
jpayne@69 1138 return callmethod('__getattribute__', (key,))
jpayne@69 1139 def __setattr__(self, key, value):
jpayne@69 1140 if key[0] == '_':
jpayne@69 1141 return object.__setattr__(self, key, value)
jpayne@69 1142 callmethod = object.__getattribute__(self, '_callmethod')
jpayne@69 1143 return callmethod('__setattr__', (key, value))
jpayne@69 1144 def __delattr__(self, key):
jpayne@69 1145 if key[0] == '_':
jpayne@69 1146 return object.__delattr__(self, key)
jpayne@69 1147 callmethod = object.__getattribute__(self, '_callmethod')
jpayne@69 1148 return callmethod('__delattr__', (key,))
jpayne@69 1149
jpayne@69 1150
jpayne@69 1151 class ValueProxy(BaseProxy):
jpayne@69 1152 _exposed_ = ('get', 'set')
jpayne@69 1153 def get(self):
jpayne@69 1154 return self._callmethod('get')
jpayne@69 1155 def set(self, value):
jpayne@69 1156 return self._callmethod('set', (value,))
jpayne@69 1157 value = property(get, set)
jpayne@69 1158
jpayne@69 1159
jpayne@69 1160 BaseListProxy = MakeProxyType('BaseListProxy', (
jpayne@69 1161 '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
jpayne@69 1162 '__mul__', '__reversed__', '__rmul__', '__setitem__',
jpayne@69 1163 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
jpayne@69 1164 'reverse', 'sort', '__imul__'
jpayne@69 1165 ))
jpayne@69 1166 class ListProxy(BaseListProxy):
jpayne@69 1167 def __iadd__(self, value):
jpayne@69 1168 self._callmethod('extend', (value,))
jpayne@69 1169 return self
jpayne@69 1170 def __imul__(self, value):
jpayne@69 1171 self._callmethod('__imul__', (value,))
jpayne@69 1172 return self
jpayne@69 1173
jpayne@69 1174
jpayne@69 1175 DictProxy = MakeProxyType('DictProxy', (
jpayne@69 1176 '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
jpayne@69 1177 '__setitem__', 'clear', 'copy', 'get', 'items',
jpayne@69 1178 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
jpayne@69 1179 ))
jpayne@69 1180 DictProxy._method_to_typeid_ = {
jpayne@69 1181 '__iter__': 'Iterator',
jpayne@69 1182 }
jpayne@69 1183
jpayne@69 1184
jpayne@69 1185 ArrayProxy = MakeProxyType('ArrayProxy', (
jpayne@69 1186 '__len__', '__getitem__', '__setitem__'
jpayne@69 1187 ))
jpayne@69 1188
jpayne@69 1189
jpayne@69 1190 BasePoolProxy = MakeProxyType('PoolProxy', (
jpayne@69 1191 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
jpayne@69 1192 'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
jpayne@69 1193 ))
jpayne@69 1194 BasePoolProxy._method_to_typeid_ = {
jpayne@69 1195 'apply_async': 'AsyncResult',
jpayne@69 1196 'map_async': 'AsyncResult',
jpayne@69 1197 'starmap_async': 'AsyncResult',
jpayne@69 1198 'imap': 'Iterator',
jpayne@69 1199 'imap_unordered': 'Iterator'
jpayne@69 1200 }
jpayne@69 1201 class PoolProxy(BasePoolProxy):
jpayne@69 1202 def __enter__(self):
jpayne@69 1203 return self
jpayne@69 1204 def __exit__(self, exc_type, exc_val, exc_tb):
jpayne@69 1205 self.terminate()
jpayne@69 1206
jpayne@69 1207 #
jpayne@69 1208 # Definition of SyncManager
jpayne@69 1209 #
jpayne@69 1210
jpayne@69 1211 class SyncManager(BaseManager):
jpayne@69 1212 '''
jpayne@69 1213 Subclass of `BaseManager` which supports a number of shared object types.
jpayne@69 1214
jpayne@69 1215 The types registered are those intended for the synchronization
jpayne@69 1216 of threads, plus `dict`, `list` and `Namespace`.
jpayne@69 1217
jpayne@69 1218 The `multiprocessing.Manager()` function creates started instances of
jpayne@69 1219 this class.
jpayne@69 1220 '''
jpayne@69 1221
jpayne@69 1222 SyncManager.register('Queue', queue.Queue)
jpayne@69 1223 SyncManager.register('JoinableQueue', queue.Queue)
jpayne@69 1224 SyncManager.register('Event', threading.Event, EventProxy)
jpayne@69 1225 SyncManager.register('Lock', threading.Lock, AcquirerProxy)
jpayne@69 1226 SyncManager.register('RLock', threading.RLock, AcquirerProxy)
jpayne@69 1227 SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
jpayne@69 1228 SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
jpayne@69 1229 AcquirerProxy)
jpayne@69 1230 SyncManager.register('Condition', threading.Condition, ConditionProxy)
jpayne@69 1231 SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
jpayne@69 1232 SyncManager.register('Pool', pool.Pool, PoolProxy)
jpayne@69 1233 SyncManager.register('list', list, ListProxy)
jpayne@69 1234 SyncManager.register('dict', dict, DictProxy)
jpayne@69 1235 SyncManager.register('Value', Value, ValueProxy)
jpayne@69 1236 SyncManager.register('Array', Array, ArrayProxy)
jpayne@69 1237 SyncManager.register('Namespace', Namespace, NamespaceProxy)
jpayne@69 1238
jpayne@69 1239 # types returned by methods of PoolProxy
jpayne@69 1240 SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
jpayne@69 1241 SyncManager.register('AsyncResult', create_method=False)
jpayne@69 1242
jpayne@69 1243 #
jpayne@69 1244 # Definition of SharedMemoryManager and SharedMemoryServer
jpayne@69 1245 #
jpayne@69 1246
jpayne@69 1247 if HAS_SHMEM:
jpayne@69 1248 class _SharedMemoryTracker:
jpayne@69 1249 "Manages one or more shared memory segments."
jpayne@69 1250
jpayne@69 1251 def __init__(self, name, segment_names=[]):
jpayne@69 1252 self.shared_memory_context_name = name
jpayne@69 1253 self.segment_names = segment_names
jpayne@69 1254
jpayne@69 1255 def register_segment(self, segment_name):
jpayne@69 1256 "Adds the supplied shared memory block name to tracker."
jpayne@69 1257 util.debug(f"Register segment {segment_name!r} in pid {getpid()}")
jpayne@69 1258 self.segment_names.append(segment_name)
jpayne@69 1259
jpayne@69 1260 def destroy_segment(self, segment_name):
jpayne@69 1261 """Calls unlink() on the shared memory block with the supplied name
jpayne@69 1262 and removes it from the list of blocks being tracked."""
jpayne@69 1263 util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}")
jpayne@69 1264 self.segment_names.remove(segment_name)
jpayne@69 1265 segment = shared_memory.SharedMemory(segment_name)
jpayne@69 1266 segment.close()
jpayne@69 1267 segment.unlink()
jpayne@69 1268
jpayne@69 1269 def unlink(self):
jpayne@69 1270 "Calls destroy_segment() on all tracked shared memory blocks."
jpayne@69 1271 for segment_name in self.segment_names[:]:
jpayne@69 1272 self.destroy_segment(segment_name)
jpayne@69 1273
jpayne@69 1274 def __del__(self):
jpayne@69 1275 util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}")
jpayne@69 1276 self.unlink()
jpayne@69 1277
jpayne@69 1278 def __getstate__(self):
jpayne@69 1279 return (self.shared_memory_context_name, self.segment_names)
jpayne@69 1280
jpayne@69 1281 def __setstate__(self, state):
jpayne@69 1282 self.__init__(*state)
jpayne@69 1283
jpayne@69 1284
jpayne@69 1285 class SharedMemoryServer(Server):
jpayne@69 1286
jpayne@69 1287 public = Server.public + \
jpayne@69 1288 ['track_segment', 'release_segment', 'list_segments']
jpayne@69 1289
jpayne@69 1290 def __init__(self, *args, **kwargs):
jpayne@69 1291 Server.__init__(self, *args, **kwargs)
jpayne@69 1292 self.shared_memory_context = \
jpayne@69 1293 _SharedMemoryTracker(f"shmm_{self.address}_{getpid()}")
jpayne@69 1294 util.debug(f"SharedMemoryServer started by pid {getpid()}")
jpayne@69 1295
jpayne@69 1296 def create(*args, **kwargs):
jpayne@69 1297 """Create a new distributed-shared object (not backed by a shared
jpayne@69 1298 memory block) and return its id to be used in a Proxy Object."""
jpayne@69 1299 # Unless set up as a shared proxy, don't make shared_memory_context
jpayne@69 1300 # a standard part of kwargs. This makes things easier for supplying
jpayne@69 1301 # simple functions.
jpayne@69 1302 if len(args) >= 3:
jpayne@69 1303 typeod = args[2]
jpayne@69 1304 elif 'typeid' in kwargs:
jpayne@69 1305 typeid = kwargs['typeid']
jpayne@69 1306 elif not args:
jpayne@69 1307 raise TypeError("descriptor 'create' of 'SharedMemoryServer' "
jpayne@69 1308 "object needs an argument")
jpayne@69 1309 else:
jpayne@69 1310 raise TypeError('create expected at least 2 positional '
jpayne@69 1311 'arguments, got %d' % (len(args)-1))
jpayne@69 1312 if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"):
jpayne@69 1313 kwargs['shared_memory_context'] = self.shared_memory_context
jpayne@69 1314 return Server.create(*args, **kwargs)
jpayne@69 1315 create.__text_signature__ = '($self, c, typeid, /, *args, **kwargs)'
jpayne@69 1316
jpayne@69 1317 def shutdown(self, c):
jpayne@69 1318 "Call unlink() on all tracked shared memory, terminate the Server."
jpayne@69 1319 self.shared_memory_context.unlink()
jpayne@69 1320 return Server.shutdown(self, c)
jpayne@69 1321
jpayne@69 1322 def track_segment(self, c, segment_name):
jpayne@69 1323 "Adds the supplied shared memory block name to Server's tracker."
jpayne@69 1324 self.shared_memory_context.register_segment(segment_name)
jpayne@69 1325
jpayne@69 1326 def release_segment(self, c, segment_name):
jpayne@69 1327 """Calls unlink() on the shared memory block with the supplied name
jpayne@69 1328 and removes it from the tracker instance inside the Server."""
jpayne@69 1329 self.shared_memory_context.destroy_segment(segment_name)
jpayne@69 1330
jpayne@69 1331 def list_segments(self, c):
jpayne@69 1332 """Returns a list of names of shared memory blocks that the Server
jpayne@69 1333 is currently tracking."""
jpayne@69 1334 return self.shared_memory_context.segment_names
jpayne@69 1335
jpayne@69 1336
jpayne@69 1337 class SharedMemoryManager(BaseManager):
jpayne@69 1338 """Like SyncManager but uses SharedMemoryServer instead of Server.
jpayne@69 1339
jpayne@69 1340 It provides methods for creating and returning SharedMemory instances
jpayne@69 1341 and for creating a list-like object (ShareableList) backed by shared
jpayne@69 1342 memory. It also provides methods that create and return Proxy Objects
jpayne@69 1343 that support synchronization across processes (i.e. multi-process-safe
jpayne@69 1344 locks and semaphores).
jpayne@69 1345 """
jpayne@69 1346
jpayne@69 1347 _Server = SharedMemoryServer
jpayne@69 1348
jpayne@69 1349 def __init__(self, *args, **kwargs):
jpayne@69 1350 if os.name == "posix":
jpayne@69 1351 # bpo-36867: Ensure the resource_tracker is running before
jpayne@69 1352 # launching the manager process, so that concurrent
jpayne@69 1353 # shared_memory manipulation both in the manager and in the
jpayne@69 1354 # current process does not create two resource_tracker
jpayne@69 1355 # processes.
jpayne@69 1356 from . import resource_tracker
jpayne@69 1357 resource_tracker.ensure_running()
jpayne@69 1358 BaseManager.__init__(self, *args, **kwargs)
jpayne@69 1359 util.debug(f"{self.__class__.__name__} created by pid {getpid()}")
jpayne@69 1360
jpayne@69 1361 def __del__(self):
jpayne@69 1362 util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}")
jpayne@69 1363 pass
jpayne@69 1364
jpayne@69 1365 def get_server(self):
jpayne@69 1366 'Better than monkeypatching for now; merge into Server ultimately'
jpayne@69 1367 if self._state.value != State.INITIAL:
jpayne@69 1368 if self._state.value == State.STARTED:
jpayne@69 1369 raise ProcessError("Already started SharedMemoryServer")
jpayne@69 1370 elif self._state.value == State.SHUTDOWN:
jpayne@69 1371 raise ProcessError("SharedMemoryManager has shut down")
jpayne@69 1372 else:
jpayne@69 1373 raise ProcessError(
jpayne@69 1374 "Unknown state {!r}".format(self._state.value))
jpayne@69 1375 return self._Server(self._registry, self._address,
jpayne@69 1376 self._authkey, self._serializer)
jpayne@69 1377
jpayne@69 1378 def SharedMemory(self, size):
jpayne@69 1379 """Returns a new SharedMemory instance with the specified size in
jpayne@69 1380 bytes, to be tracked by the manager."""
jpayne@69 1381 with self._Client(self._address, authkey=self._authkey) as conn:
jpayne@69 1382 sms = shared_memory.SharedMemory(None, create=True, size=size)
jpayne@69 1383 try:
jpayne@69 1384 dispatch(conn, None, 'track_segment', (sms.name,))
jpayne@69 1385 except BaseException as e:
jpayne@69 1386 sms.unlink()
jpayne@69 1387 raise e
jpayne@69 1388 return sms
jpayne@69 1389
jpayne@69 1390 def ShareableList(self, sequence):
jpayne@69 1391 """Returns a new ShareableList instance populated with the values
jpayne@69 1392 from the input sequence, to be tracked by the manager."""
jpayne@69 1393 with self._Client(self._address, authkey=self._authkey) as conn:
jpayne@69 1394 sl = shared_memory.ShareableList(sequence)
jpayne@69 1395 try:
jpayne@69 1396 dispatch(conn, None, 'track_segment', (sl.shm.name,))
jpayne@69 1397 except BaseException as e:
jpayne@69 1398 sl.shm.unlink()
jpayne@69 1399 raise e
jpayne@69 1400 return sl