jpayne@69
|
1 #
|
jpayne@69
|
2 # We use a background thread for sharing fds on Unix, and for sharing sockets on
|
jpayne@69
|
3 # Windows.
|
jpayne@69
|
4 #
|
jpayne@69
|
5 # A client which wants to pickle a resource registers it with the resource
|
jpayne@69
|
6 # sharer and gets an identifier in return. The unpickling process will connect
|
jpayne@69
|
7 # to the resource sharer, sends the identifier and its pid, and then receives
|
jpayne@69
|
8 # the resource.
|
jpayne@69
|
9 #
|
jpayne@69
|
10
|
jpayne@69
|
11 import os
|
jpayne@69
|
12 import signal
|
jpayne@69
|
13 import socket
|
jpayne@69
|
14 import sys
|
jpayne@69
|
15 import threading
|
jpayne@69
|
16
|
jpayne@69
|
17 from . import process
|
jpayne@69
|
18 from .context import reduction
|
jpayne@69
|
19 from . import util
|
jpayne@69
|
20
|
jpayne@69
|
21 __all__ = ['stop']
|
jpayne@69
|
22
|
jpayne@69
|
23
|
jpayne@69
|
24 if sys.platform == 'win32':
|
jpayne@69
|
25 __all__ += ['DupSocket']
|
jpayne@69
|
26
|
jpayne@69
|
27 class DupSocket(object):
|
jpayne@69
|
28 '''Picklable wrapper for a socket.'''
|
jpayne@69
|
29 def __init__(self, sock):
|
jpayne@69
|
30 new_sock = sock.dup()
|
jpayne@69
|
31 def send(conn, pid):
|
jpayne@69
|
32 share = new_sock.share(pid)
|
jpayne@69
|
33 conn.send_bytes(share)
|
jpayne@69
|
34 self._id = _resource_sharer.register(send, new_sock.close)
|
jpayne@69
|
35
|
jpayne@69
|
36 def detach(self):
|
jpayne@69
|
37 '''Get the socket. This should only be called once.'''
|
jpayne@69
|
38 with _resource_sharer.get_connection(self._id) as conn:
|
jpayne@69
|
39 share = conn.recv_bytes()
|
jpayne@69
|
40 return socket.fromshare(share)
|
jpayne@69
|
41
|
jpayne@69
|
42 else:
|
jpayne@69
|
43 __all__ += ['DupFd']
|
jpayne@69
|
44
|
jpayne@69
|
45 class DupFd(object):
|
jpayne@69
|
46 '''Wrapper for fd which can be used at any time.'''
|
jpayne@69
|
47 def __init__(self, fd):
|
jpayne@69
|
48 new_fd = os.dup(fd)
|
jpayne@69
|
49 def send(conn, pid):
|
jpayne@69
|
50 reduction.send_handle(conn, new_fd, pid)
|
jpayne@69
|
51 def close():
|
jpayne@69
|
52 os.close(new_fd)
|
jpayne@69
|
53 self._id = _resource_sharer.register(send, close)
|
jpayne@69
|
54
|
jpayne@69
|
55 def detach(self):
|
jpayne@69
|
56 '''Get the fd. This should only be called once.'''
|
jpayne@69
|
57 with _resource_sharer.get_connection(self._id) as conn:
|
jpayne@69
|
58 return reduction.recv_handle(conn)
|
jpayne@69
|
59
|
jpayne@69
|
60
|
jpayne@69
|
61 class _ResourceSharer(object):
|
jpayne@69
|
62 '''Manager for resources using background thread.'''
|
jpayne@69
|
63 def __init__(self):
|
jpayne@69
|
64 self._key = 0
|
jpayne@69
|
65 self._cache = {}
|
jpayne@69
|
66 self._old_locks = []
|
jpayne@69
|
67 self._lock = threading.Lock()
|
jpayne@69
|
68 self._listener = None
|
jpayne@69
|
69 self._address = None
|
jpayne@69
|
70 self._thread = None
|
jpayne@69
|
71 util.register_after_fork(self, _ResourceSharer._afterfork)
|
jpayne@69
|
72
|
jpayne@69
|
73 def register(self, send, close):
|
jpayne@69
|
74 '''Register resource, returning an identifier.'''
|
jpayne@69
|
75 with self._lock:
|
jpayne@69
|
76 if self._address is None:
|
jpayne@69
|
77 self._start()
|
jpayne@69
|
78 self._key += 1
|
jpayne@69
|
79 self._cache[self._key] = (send, close)
|
jpayne@69
|
80 return (self._address, self._key)
|
jpayne@69
|
81
|
jpayne@69
|
82 @staticmethod
|
jpayne@69
|
83 def get_connection(ident):
|
jpayne@69
|
84 '''Return connection from which to receive identified resource.'''
|
jpayne@69
|
85 from .connection import Client
|
jpayne@69
|
86 address, key = ident
|
jpayne@69
|
87 c = Client(address, authkey=process.current_process().authkey)
|
jpayne@69
|
88 c.send((key, os.getpid()))
|
jpayne@69
|
89 return c
|
jpayne@69
|
90
|
jpayne@69
|
91 def stop(self, timeout=None):
|
jpayne@69
|
92 '''Stop the background thread and clear registered resources.'''
|
jpayne@69
|
93 from .connection import Client
|
jpayne@69
|
94 with self._lock:
|
jpayne@69
|
95 if self._address is not None:
|
jpayne@69
|
96 c = Client(self._address,
|
jpayne@69
|
97 authkey=process.current_process().authkey)
|
jpayne@69
|
98 c.send(None)
|
jpayne@69
|
99 c.close()
|
jpayne@69
|
100 self._thread.join(timeout)
|
jpayne@69
|
101 if self._thread.is_alive():
|
jpayne@69
|
102 util.sub_warning('_ResourceSharer thread did '
|
jpayne@69
|
103 'not stop when asked')
|
jpayne@69
|
104 self._listener.close()
|
jpayne@69
|
105 self._thread = None
|
jpayne@69
|
106 self._address = None
|
jpayne@69
|
107 self._listener = None
|
jpayne@69
|
108 for key, (send, close) in self._cache.items():
|
jpayne@69
|
109 close()
|
jpayne@69
|
110 self._cache.clear()
|
jpayne@69
|
111
|
jpayne@69
|
112 def _afterfork(self):
|
jpayne@69
|
113 for key, (send, close) in self._cache.items():
|
jpayne@69
|
114 close()
|
jpayne@69
|
115 self._cache.clear()
|
jpayne@69
|
116 # If self._lock was locked at the time of the fork, it may be broken
|
jpayne@69
|
117 # -- see issue 6721. Replace it without letting it be gc'ed.
|
jpayne@69
|
118 self._old_locks.append(self._lock)
|
jpayne@69
|
119 self._lock = threading.Lock()
|
jpayne@69
|
120 if self._listener is not None:
|
jpayne@69
|
121 self._listener.close()
|
jpayne@69
|
122 self._listener = None
|
jpayne@69
|
123 self._address = None
|
jpayne@69
|
124 self._thread = None
|
jpayne@69
|
125
|
jpayne@69
|
126 def _start(self):
|
jpayne@69
|
127 from .connection import Listener
|
jpayne@69
|
128 assert self._listener is None, "Already have Listener"
|
jpayne@69
|
129 util.debug('starting listener and thread for sending handles')
|
jpayne@69
|
130 self._listener = Listener(authkey=process.current_process().authkey)
|
jpayne@69
|
131 self._address = self._listener.address
|
jpayne@69
|
132 t = threading.Thread(target=self._serve)
|
jpayne@69
|
133 t.daemon = True
|
jpayne@69
|
134 t.start()
|
jpayne@69
|
135 self._thread = t
|
jpayne@69
|
136
|
jpayne@69
|
137 def _serve(self):
|
jpayne@69
|
138 if hasattr(signal, 'pthread_sigmask'):
|
jpayne@69
|
139 signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals())
|
jpayne@69
|
140 while 1:
|
jpayne@69
|
141 try:
|
jpayne@69
|
142 with self._listener.accept() as conn:
|
jpayne@69
|
143 msg = conn.recv()
|
jpayne@69
|
144 if msg is None:
|
jpayne@69
|
145 break
|
jpayne@69
|
146 key, destination_pid = msg
|
jpayne@69
|
147 send, close = self._cache.pop(key)
|
jpayne@69
|
148 try:
|
jpayne@69
|
149 send(conn, destination_pid)
|
jpayne@69
|
150 finally:
|
jpayne@69
|
151 close()
|
jpayne@69
|
152 except:
|
jpayne@69
|
153 if not util.is_exiting():
|
jpayne@69
|
154 sys.excepthook(*sys.exc_info())
|
jpayne@69
|
155
|
jpayne@69
|
156
|
jpayne@69
|
157 _resource_sharer = _ResourceSharer()
|
jpayne@69
|
158 stop = _resource_sharer.stop
|