#
# We use a background thread for sharing fds on Unix, and for sharing sockets
# on Windows.
#
# A client which wants to pickle a resource registers it with the resource
# sharer and gets an identifier in return.  The unpickling process will
# connect to the resource sharer, send the identifier and its pid, and then
# receive the resource.
#

import os
import signal
import socket
import sys
import threading

from . import process
from .context import reduction
from . import util

__all__ = ['stop']


if sys.platform == 'win32':
    __all__.append('DupSocket')

    class DupSocket:
        '''Picklable wrapper around a duplicated socket.'''

        def __init__(self, sock):
            # Work on a private duplicate so the caller's socket is untouched.
            duplicate = sock.dup()

            def _send(conn, pid):
                # Share the duplicate with process `pid` and ship the bytes.
                conn.send_bytes(duplicate.share(pid))

            self._id = _resource_sharer.register(_send, duplicate.close)

        def detach(self):
            '''Fetch the socket from the sharer.  Call this only once.'''
            with _resource_sharer.get_connection(self._id) as conn:
                payload = conn.recv_bytes()
                return socket.fromshare(payload)

else:
    __all__.append('DupFd')

    class DupFd:
        '''Wrapper for an fd which can be retrieved at any later time.'''

        def __init__(self, fd):
            # Work on a private duplicate so the caller's fd is untouched.
            duplicate = os.dup(fd)

            def _send(conn, pid):
                reduction.send_handle(conn, duplicate, pid)

            def _close():
                os.close(duplicate)

            self._id = _resource_sharer.register(_send, _close)

        def detach(self):
            '''Fetch the fd from the sharer.  Call this only once.'''
            with _resource_sharer.get_connection(self._id) as conn:
                return reduction.recv_handle(conn)


class _ResourceSharer:
    '''Hands out registered resources from a background thread.'''

    def __init__(self):
        self._key = 0               # last identifier handed out
        self._cache = {}            # key -> (send, close) callbacks
        self._old_locks = []        # locks retired by _afterfork, kept alive
        self._lock = threading.Lock()
        self._listener = None
        self._address = None
        self._thread = None
        util.register_after_fork(self, _ResourceSharer._afterfork)

    def register(self, send, close):
        '''Store the (send, close) callbacks and return their identifier.

        The listener and serving thread are started on first use.  The
        identifier is the pair (listener address, integer key).
        '''
        with self._lock:
            if self._address is None:
                self._start()
            key = self._key + 1
            self._key = key
            self._cache[key] = (send, close)
            return (self._address, key)

    @staticmethod
    def get_connection(ident):
        '''Open a connection on which the identified resource will arrive.

        `ident` is an (address, key) pair as returned by register().  The
        key and this process's pid are sent as the request.
        '''
        from .connection import Client
        address, key = ident
        authkey = process.current_process().authkey
        conn = Client(address, authkey=authkey)
        conn.send((key, os.getpid()))
        return conn

    def stop(self, timeout=None):
        '''Shut down the background thread and drop all registered resources.

        `timeout` is passed to the thread's join(); a warning is emitted if
        the thread is still alive afterwards.
        '''
        from .connection import Client
        with self._lock:
            if self._address is None:
                # Never started (or already stopped): nothing to do.
                return
            # A None message tells the serving loop to exit.
            conn = Client(self._address,
                          authkey=process.current_process().authkey)
            conn.send(None)
            conn.close()
            self._thread.join(timeout)
            if self._thread.is_alive():
                util.sub_warning('_ResourceSharer thread did '
                                 'not stop when asked')
            self._listener.close()
            self._thread = None
            self._address = None
            self._listener = None
            for _send, close in self._cache.values():
                close()
            self._cache.clear()

    def _afterfork(self):
        # Release every resource still registered at fork time.
        for _send, close in self._cache.values():
            close()
        self._cache.clear()
        # If self._lock was locked at the time of the fork, it may be broken
        # -- see issue 6721.  Replace it without letting it be gc'ed.
        self._old_locks.append(self._lock)
        self._lock = threading.Lock()
        if self._listener is not None:
            self._listener.close()
        self._listener = None
        self._address = None
        self._thread = None

    def _start(self):
        # Create the listener and launch the daemon thread that serves it.
        from .connection import Listener
        assert self._listener is None, "Already have Listener"
        util.debug('starting listener and thread for sending handles')
        self._listener = Listener(authkey=process.current_process().authkey)
        self._address = self._listener.address
        worker = threading.Thread(target=self._serve, daemon=True)
        worker.start()
        self._thread = worker

    def _serve(self):
        # Block every signal in this thread where the platform allows it.
        if hasattr(signal, 'pthread_sigmask'):
            signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals())
        while True:
            try:
                with self._listener.accept() as conn:
                    request = conn.recv()
                    if request is None:
                        # Shutdown request sent by stop().
                        break
                    key, destination_pid = request
                    # Each registration is served at most once.
                    send, close = self._cache.pop(key)
                    try:
                        send(conn, destination_pid)
                    finally:
                        close()
            except BaseException:
                # Report the failure and keep serving, unless the
                # interpreter is already exiting.
                if not util.is_exiting():
                    sys.excepthook(*sys.exc_info())


_resource_sharer = _ResourceSharer()
stop = _resource_sharer.stop