#
# We use a background thread for sharing fds on Unix, and for sharing sockets
# on Windows.
#
# A client which wants to pickle a resource registers it with the resource
# sharer and gets an identifier in return.  The unpickling process connects
# to the resource sharer, sends the identifier and its pid, and then receives
# the resource.
#

import os
import signal
import socket
import sys
import threading

from . import process
from .context import reduction
from . import util

__all__ = ['stop']


if sys.platform == 'win32':
    __all__ += ['DupSocket']

    class DupSocket(object):
        '''Picklable wrapper for a socket.'''

        def __init__(self, sock):
            '''Duplicate *sock* and register the duplicate with the sharer.'''
            # Work on a duplicate so the caller keeps ownership of `sock`.
            new_sock = sock.dup()

            def send(conn, pid):
                # Export the duplicate for process `pid` and ship the bytes.
                share = new_sock.share(pid)
                conn.send_bytes(share)

            self._id = _resource_sharer.register(send, new_sock.close)

        def detach(self):
            '''Get the socket.  This should only be called once.

            The sharer removes the registration when it serves the request,
            so a second call has nothing left to fetch.
            '''
            with _resource_sharer.get_connection(self._id) as conn:
                share = conn.recv_bytes()
                return socket.fromshare(share)

else:
    __all__ += ['DupFd']

    class DupFd(object):
        '''Wrapper for fd which can be used at any time.'''

        def __init__(self, fd):
            '''Duplicate *fd* and register the duplicate with the sharer.'''
            # Work on a duplicate so the caller keeps ownership of `fd`.
            new_fd = os.dup(fd)

            def send(conn, pid):
                reduction.send_handle(conn, new_fd, pid)

            def close():
                os.close(new_fd)

            self._id = _resource_sharer.register(send, close)

        def detach(self):
            '''Get the fd.  This should only be called once.

            The sharer removes the registration when it serves the request,
            so a second call has nothing left to fetch.
            '''
            with _resource_sharer.get_connection(self._id) as conn:
                return reduction.recv_handle(conn)


class _ResourceSharer(object):
    '''Manager for resources using background thread.'''

    def __init__(self):
        self._key = 0            # last identifier handed out by register()
        self._cache = {}         # key -> (send, close) callables
        self._old_locks = []     # locks retired by _afterfork(), kept alive
        self._lock = threading.Lock()
        self._listener = None
        self._address = None
        self._thread = None
        util.register_after_fork(self, _ResourceSharer._afterfork)

    def register(self, send, close):
        '''Register resource, returning an identifier.

        *send* is called as ``send(conn, pid)`` by the serving thread when a
        request for this resource arrives; *close* is called with no
        arguments once the resource has been sent or is being discarded.

        Returns an ``(address, key)`` tuple suitable for get_connection().
        The listener and serving thread are started on first use.
        '''
        with self._lock:
            if self._address is None:
                self._start()
            self._key += 1
            self._cache[self._key] = (send, close)
            return (self._address, self._key)

    @staticmethod
    def get_connection(ident):
        '''Return connection from which to receive identified resource.

        *ident* is the ``(address, key)`` tuple returned by register().  The
        key and the pid of the calling process are sent as the request.
        '''
        from .connection import Client
        address, key = ident
        c = Client(address, authkey=process.current_process().authkey)
        c.send((key, os.getpid()))
        return c

    def stop(self, timeout=None):
        '''Stop the background thread and clear registered resources.

        *timeout* is passed to ``Thread.join``; if the thread is still alive
        afterwards a warning is emitted and shutdown continues regardless.
        Does nothing if the sharer was never started.
        '''
        from .connection import Client
        with self._lock:
            if self._address is None:
                return
            # A `None` message tells _serve() to leave its loop.  The
            # context manager closes the connection even if send() fails.
            with Client(self._address,
                        authkey=process.current_process().authkey) as c:
                c.send(None)
            self._thread.join(timeout)
            if self._thread.is_alive():
                util.sub_warning('_ResourceSharer thread did '
                                 'not stop when asked')
            self._listener.close()
            self._thread = None
            self._address = None
            self._listener = None
            # Release every resource that was registered but never fetched.
            for _send, close in self._cache.values():
                close()
            self._cache.clear()

    def _afterfork(self):
        '''Reset state after a fork (registered via util.register_after_fork).'''
        for _send, close in self._cache.values():
            close()
        self._cache.clear()
        # If self._lock was locked at the time of the fork, it may be broken
        # -- see issue 6721.  Replace it without letting it be gc'ed.
        self._old_locks.append(self._lock)
        self._lock = threading.Lock()
        if self._listener is not None:
            self._listener.close()
        self._listener = None
        self._address = None
        self._thread = None

    def _start(self):
        '''Create the listener and start the daemon thread running _serve().'''
        from .connection import Listener
        assert self._listener is None, "Already have Listener"
        util.debug('starting listener and thread for sending handles')
        self._listener = Listener(authkey=process.current_process().authkey)
        self._address = self._listener.address
        t = threading.Thread(target=self._serve)
        t.daemon = True
        t.start()
        self._thread = t

    def _serve(self):
        '''Thread body: answer resource requests until a None message arrives.'''
        # Block every signal in this thread where the platform allows it.
        if hasattr(signal, 'pthread_sigmask'):
            signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals())
        while True:
            try:
                with self._listener.accept() as conn:
                    msg = conn.recv()
                    if msg is None:
                        # Shutdown request sent by stop().
                        break
                    key, destination_pid = msg
                    # pop(): each registration can be served only once.
                    send, close = self._cache.pop(key)
                    try:
                        send(conn, destination_pid)
                    finally:
                        close()
            except:
                # Deliberately broad: one failed request must not end the
                # serving loop.  Report it unless the interpreter is exiting.
                if not util.is_exiting():
                    sys.excepthook(*sys.exc_info())


_resource_sharer = _ResourceSharer()
stop = _resource_sharer.stop