jpayne@68
|
1 #
|
jpayne@68
|
2 # A higher level module for using sockets (or Windows named pipes)
|
jpayne@68
|
3 #
|
jpayne@68
|
4 # multiprocessing/connection.py
|
jpayne@68
|
5 #
|
jpayne@68
|
6 # Copyright (c) 2006-2008, R Oudkerk
|
jpayne@68
|
7 # Licensed to PSF under a Contributor Agreement.
|
jpayne@68
|
8 #
|
jpayne@68
|
9
|
jpayne@68
|
10 __all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
|
jpayne@68
|
11
|
jpayne@68
|
12 import io
|
jpayne@68
|
13 import os
|
jpayne@68
|
14 import sys
|
jpayne@68
|
15 import socket
|
jpayne@68
|
16 import struct
|
jpayne@68
|
17 import time
|
jpayne@68
|
18 import tempfile
|
jpayne@68
|
19 import itertools
|
jpayne@68
|
20
|
jpayne@68
|
21 import _multiprocessing
|
jpayne@68
|
22
|
jpayne@68
|
23 from . import util
|
jpayne@68
|
24
|
jpayne@68
|
25 from . import AuthenticationError, BufferTooShort
|
jpayne@68
|
26 from .context import reduction
|
jpayne@68
|
27 _ForkingPickler = reduction.ForkingPickler
|
jpayne@68
|
28
|
jpayne@68
|
29 try:
|
jpayne@68
|
30 import _winapi
|
jpayne@68
|
31 from _winapi import WAIT_OBJECT_0, WAIT_ABANDONED_0, WAIT_TIMEOUT, INFINITE
|
jpayne@68
|
32 except ImportError:
|
jpayne@68
|
33 if sys.platform == 'win32':
|
jpayne@68
|
34 raise
|
jpayne@68
|
35 _winapi = None
|
jpayne@68
|
36
|
jpayne@68
|
37 #
|
jpayne@68
|
38 #
|
jpayne@68
|
39 #
|
jpayne@68
|
40
|
jpayne@68
|
# Buffer size (in bytes) requested for named pipes created by this module.
BUFSIZE = 8192
# A very generous timeout when it comes to local connections...
CONNECTION_TIMEOUT = 20.

# Monotonic counter used to make generated named-pipe addresses unique.
_mmap_counter = itertools.count()

# `families` lists every address family usable on this platform and
# `default_family` is the one picked when the caller does not specify one.
# Later checks override earlier ones: AF_UNIX beats AF_INET when available,
# and AF_PIPE beats both on Windows.
default_family = 'AF_INET'
families = ['AF_INET']

if hasattr(socket, 'AF_UNIX'):
    default_family = 'AF_UNIX'
    families += ['AF_UNIX']

if sys.platform == 'win32':
    default_family = 'AF_PIPE'
    families += ['AF_PIPE']
|
jpayne@68
|
57
|
jpayne@68
|
58
|
jpayne@68
|
def _init_timeout(timeout=CONNECTION_TIMEOUT):
    """Return the `time.monotonic()` deadline that lies `timeout` seconds ahead."""
    return time.monotonic() + timeout
|
jpayne@68
|
61
|
jpayne@68
|
def _check_timeout(t):
    """Return True once the monotonic clock has passed the deadline `t`."""
    return time.monotonic() > t
|
jpayne@68
|
64
|
jpayne@68
|
65 #
|
jpayne@68
|
66 #
|
jpayne@68
|
67 #
|
jpayne@68
|
68
|
jpayne@68
|
def arbitrary_address(family):
    '''
    Return an arbitrary free address for the given family

    `family` is one of 'AF_INET', 'AF_UNIX' or 'AF_PIPE'; any other value
    raises ValueError.
    '''
    if family == 'AF_INET':
        # Port 0 leaves the choice of port to the operating system.
        return ('localhost', 0)
    elif family == 'AF_UNIX':
        # Only a name is generated here; nothing is created on disk.
        return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
    elif family == 'AF_PIPE':
        # The pid and a per-process counter are embedded in the pipe name.
        return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
                               (os.getpid(), next(_mmap_counter)), dir="")
    else:
        raise ValueError('unrecognized family')
|
jpayne@68
|
82
|
jpayne@68
|
def _validate_family(family):
    '''
    Checks if the family is valid for the current environment.

    Raises ValueError for 'AF_PIPE' off Windows, and for 'AF_UNIX' on a
    Windows build whose `socket` module lacks that attribute.  Returns None
    otherwise.
    '''
    if sys.platform != 'win32' and family == 'AF_PIPE':
        raise ValueError('Family %s is not recognized.' % family)

    if sys.platform == 'win32' and family == 'AF_UNIX':
        # double check
        if not hasattr(socket, family):
            raise ValueError('Family %s is not recognized.' % family)
|
jpayne@68
|
94
|
jpayne@68
|
def address_type(address):
    '''
    Return the types of the address

    This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'

    Only exact `tuple` and `str` instances are recognized; anything else
    raises ValueError.
    '''
    if type(address) is tuple:
        return 'AF_INET'
    elif type(address) is str and address.startswith('\\\\'):
        return 'AF_PIPE'
    elif type(address) is str:
        return 'AF_UNIX'
    else:
        # Wrap in a 1-tuple: a tuple subclass (e.g. a namedtuple) reaches
        # this branch and would otherwise be unpacked by `%`, raising
        # TypeError instead of the intended ValueError.
        raise ValueError('address type of %r unrecognized' % (address,))
|
jpayne@68
|
109
|
jpayne@68
|
110 #
|
jpayne@68
|
111 # Connection classes
|
jpayne@68
|
112 #
|
jpayne@68
|
113
|
jpayne@68
|
class _ConnectionBase:
    """Shared front end for the concrete connection classes.

    This class validates arguments and connection state, then delegates the
    actual I/O to `_close`, `_send_bytes`, `_recv_bytes` and `_poll`, which
    are not defined here and must be provided by subclasses.
    """
    # Class-level default so __del__ is safe even when __init__ raised
    # before the instance attribute was assigned.
    _handle = None

    def __init__(self, handle, readable=True, writable=True):
        """Wrap `handle` (anything supporting `__index__`).

        Raises ValueError for a negative handle or when both `readable`
        and `writable` are false.
        """
        handle = handle.__index__()
        if handle < 0:
            raise ValueError("invalid handle")
        if not readable and not writable:
            raise ValueError(
                "at least one of `readable` and `writable` must be True")
        self._handle = handle
        self._readable = readable
        self._writable = writable

    # XXX should we use util.Finalize instead of a __del__?

    def __del__(self):
        if self._handle is not None:
            self._close()

    def _check_closed(self):
        """Raise OSError if the connection has been closed."""
        if self._handle is None:
            raise OSError("handle is closed")

    def _check_readable(self):
        """Raise OSError if the connection cannot be read from."""
        if not self._readable:
            raise OSError("connection is write-only")

    def _check_writable(self):
        """Raise OSError if the connection cannot be written to."""
        if not self._writable:
            raise OSError("connection is read-only")

    def _bad_message_length(self):
        """Give up on reading after an over-long message; always raises OSError.

        A writable connection is only marked unreadable; a read-only one is
        closed outright.
        """
        if self._writable:
            self._readable = False
        else:
            self.close()
        raise OSError("bad message length")

    @property
    def closed(self):
        """True if the connection is closed"""
        return self._handle is None

    @property
    def readable(self):
        """True if the connection is readable"""
        return self._readable

    @property
    def writable(self):
        """True if the connection is writable"""
        return self._writable

    def fileno(self):
        """File descriptor or handle of the connection"""
        self._check_closed()
        return self._handle

    def close(self):
        """Close the connection"""
        if self._handle is not None:
            try:
                self._close()
            finally:
                # Forget the handle even if _close() raised, so a second
                # close() (or __del__) never closes it twice.
                self._handle = None

    def send_bytes(self, buf, offset=0, size=None):
        """Send the bytes data from a bytes-like object

        `offset` and `size` select a byte range of `buf`; `size=None` means
        "everything from `offset` to the end".  Raises ValueError when the
        range is negative or falls outside the buffer.
        """
        self._check_closed()
        self._check_writable()
        m = memoryview(buf)
        # HACK for byte-indexing of non-bytewise buffers (e.g. array.array)
        if m.itemsize > 1:
            m = memoryview(bytes(m))
        n = len(m)
        if offset < 0:
            raise ValueError("offset is negative")
        if n < offset:
            raise ValueError("buffer length < offset")
        if size is None:
            size = n - offset
        elif size < 0:
            raise ValueError("size is negative")
        elif offset + size > n:
            raise ValueError("buffer length < offset + size")
        self._send_bytes(m[offset:offset + size])

    def send(self, obj):
        """Send a (picklable) object"""
        self._check_closed()
        self._check_writable()
        self._send_bytes(_ForkingPickler.dumps(obj))

    def recv_bytes(self, maxlength=None):
        """
        Receive bytes data as a bytes object.

        Raises ValueError for a negative `maxlength`, and OSError (via
        `_bad_message_length`) when `_recv_bytes` reports the message as
        too long by returning None.
        """
        self._check_closed()
        self._check_readable()
        if maxlength is not None and maxlength < 0:
            raise ValueError("negative maxlength")
        buf = self._recv_bytes(maxlength)
        if buf is None:
            self._bad_message_length()
        return buf.getvalue()

    def recv_bytes_into(self, buf, offset=0):
        """
        Receive bytes data into a writeable bytes-like object.
        Return the number of bytes read.

        `offset` is a byte offset into `buf`.  Raises ValueError for an
        offset outside the buffer and BufferTooShort (carrying the whole
        message) when the message does not fit.
        """
        self._check_closed()
        self._check_readable()
        with memoryview(buf) as m:
            # Get bytesize of arbitrary buffer
            itemsize = m.itemsize
            bytesize = itemsize * len(m)
            if offset < 0:
                raise ValueError("negative offset")
            elif offset > bytesize:
                raise ValueError("offset too large")
            result = self._recv_bytes()
            # The stream position after writing equals the message length.
            size = result.tell()
            if bytesize < offset + size:
                raise BufferTooShort(result.getvalue())
            # Message can fit in dest
            result.seek(0)
            # The view is indexed in items, so convert the byte bounds.
            result.readinto(m[offset // itemsize :
                              (offset + size) // itemsize])
            return size

    def recv(self):
        """Receive a (picklable) object"""
        self._check_closed()
        self._check_readable()
        buf = self._recv_bytes()
        # NOTE: this unpickles whatever the peer sent; unpickling data from
        # an untrusted peer is unsafe.
        return _ForkingPickler.loads(buf.getbuffer())

    def poll(self, timeout=0.0):
        """Whether there is any input available to be read"""
        self._check_closed()
        self._check_readable()
        return self._poll(timeout)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()
|
jpayne@68
|
264
|
jpayne@68
|
265
|
jpayne@68
|
if _winapi:

    class PipeConnection(_ConnectionBase):
        """
        Connection class based on a Windows named pipe.
        Overlapped I/O is used, so the handles must have been created
        with FILE_FLAG_OVERLAPPED.
        """
        # When true, the next _recv_bytes() returns an empty message without
        # touching the pipe.  NOTE(review): nothing in this class sets the
        # flag to True; it is presumably set elsewhere in the module.
        _got_empty_message = False

        def _close(self, _CloseHandle=_winapi.CloseHandle):
            # CloseHandle is bound as a default argument so it stays
            # reachable from this function regardless of module globals.
            _CloseHandle(self._handle)

        def _send_bytes(self, buf):
            """Write `buf` to the pipe as one overlapped write and wait for it."""
            ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
            try:
                if err == _winapi.ERROR_IO_PENDING:
                    waitres = _winapi.WaitForMultipleObjects(
                        [ov.event], False, INFINITE)
                    assert waitres == WAIT_OBJECT_0
            except:
                # Deliberately bare: the pending write must be cancelled on
                # any exception (KeyboardInterrupt included) before it is
                # re-raised.
                ov.cancel()
                raise
            finally:
                nwritten, err = ov.GetOverlappedResult(True)
            assert err == 0
            assert nwritten == len(buf)

        def _recv_bytes(self, maxsize=None):
            """Read one message and return it in an `io.BytesIO`.

            Raises EOFError when the pipe is broken.  An over-long message
            is reported through `_bad_message_length` in `_get_more_data`.
            """
            if self._got_empty_message:
                self._got_empty_message = False
                return io.BytesIO()
            else:
                # Start with a small read; ERROR_MORE_DATA signals that the
                # message is longer and the rest is fetched separately.
                bsize = 128 if maxsize is None else min(maxsize, 128)
                try:
                    ov, err = _winapi.ReadFile(self._handle, bsize,
                                                overlapped=True)
                    try:
                        if err == _winapi.ERROR_IO_PENDING:
                            waitres = _winapi.WaitForMultipleObjects(
                                [ov.event], False, INFINITE)
                            assert waitres == WAIT_OBJECT_0
                    except:
                        # Deliberately bare: cancel the read on any
                        # exception, then re-raise.
                        ov.cancel()
                        raise
                    finally:
                        nread, err = ov.GetOverlappedResult(True)
                        if err == 0:
                            f = io.BytesIO()
                            f.write(ov.getbuffer())
                            return f
                        elif err == _winapi.ERROR_MORE_DATA:
                            return self._get_more_data(ov, maxsize)
                except OSError as e:
                    if e.winerror == _winapi.ERROR_BROKEN_PIPE:
                        raise EOFError
                    else:
                        raise
            raise RuntimeError("shouldn't get here; expected KeyboardInterrupt")

        def _poll(self, timeout):
            """Return True if a message is available within `timeout`."""
            if (self._got_empty_message or
                        _winapi.PeekNamedPipe(self._handle)[0] != 0):
                return True
            return bool(wait([self], timeout))

        def _get_more_data(self, ov, maxsize):
            """Read the remainder of a message whose first part is held by `ov`."""
            buf = ov.getbuffer()
            f = io.BytesIO()
            f.write(buf)
            # Element 1 of the PeekNamedPipe result is used as the number of
            # bytes still to read for this message.
            left = _winapi.PeekNamedPipe(self._handle)[1]
            assert left > 0
            if maxsize is not None and len(buf) + left > maxsize:
                self._bad_message_length()
            ov, err = _winapi.ReadFile(self._handle, left, overlapped=True)
            rbytes, err = ov.GetOverlappedResult(True)
            assert err == 0
            assert rbytes == left
            f.write(ov.getbuffer())
            return f
|
jpayne@68
|
346
|
jpayne@68
|
347
|
jpayne@68
|
class Connection(_ConnectionBase):
    """
    Connection class based on an arbitrary file descriptor (Unix only), or
    a socket handle (Windows).

    Wire format: each message is a 4-byte signed big-endian length followed
    by the payload.  A length of -1 means an 8-byte unsigned big-endian
    length follows instead.
    """

    # Choose the low-level close/write/read primitives once, at class
    # creation time, depending on whether _winapi is available.
    if _winapi:
        def _close(self, _close=_multiprocessing.closesocket):
            _close(self._handle)
        _write = _multiprocessing.send
        _read = _multiprocessing.recv
    else:
        def _close(self, _close=os.close):
            _close(self._handle)
        _write = os.write
        _read = os.read

    def _send(self, buf, write=_write):
        """Write all of `buf`, looping over partial writes."""
        remaining = len(buf)
        while True:
            n = write(self._handle, buf)
            remaining -= n
            if remaining == 0:
                break
            buf = buf[n:]

    def _recv(self, size, read=_read):
        """Read exactly `size` bytes and return them in an `io.BytesIO`.

        Raises EOFError if the stream ends before any byte was read, and
        OSError if it ends part-way through.
        """
        buf = io.BytesIO()
        handle = self._handle
        remaining = size
        while remaining > 0:
            chunk = read(handle, remaining)
            n = len(chunk)
            if n == 0:
                if remaining == size:
                    raise EOFError
                else:
                    raise OSError("got end of file during message")
            buf.write(chunk)
            remaining -= n
        return buf

    def _send_bytes(self, buf):
        """Send `buf` as one length-prefixed message."""
        n = len(buf)
        if n > 0x7fffffff:
            # Too large for the signed 32-bit header: send -1 as a marker,
            # then the real length as an unsigned 64-bit value.
            pre_header = struct.pack("!i", -1)
            header = struct.pack("!Q", n)
            self._send(pre_header)
            self._send(header)
            self._send(buf)
        else:
            # For wire compatibility with 3.7 and lower
            header = struct.pack("!i", n)
            if n > 16384:
                # The payload is large so Nagle's algorithm won't be triggered
                # and we'd better avoid the cost of concatenation.
                self._send(header)
                self._send(buf)
            else:
                # Issue #20540: concatenate before sending, to avoid delays due
                # to Nagle's algorithm on a TCP socket.
                # Also note we want to avoid sending a 0-length buffer separately,
                # to avoid "broken pipe" errors if the other end closed the pipe.
                self._send(header + buf)

    def _recv_bytes(self, maxsize=None):
        """Read one message; return None if it is longer than `maxsize`."""
        buf = self._recv(4)
        size, = struct.unpack("!i", buf.getvalue())
        if size == -1:
            buf = self._recv(8)
            size, = struct.unpack("!Q", buf.getvalue())
        if maxsize is not None and size > maxsize:
            # The payload is left unread; the caller handles the None.
            return None
        return self._recv(size)

    def _poll(self, timeout):
        """Return True if the connection becomes ready within `timeout`."""
        r = wait([self], timeout)
        return bool(r)
|
jpayne@68
|
426
|
jpayne@68
|
427
|
jpayne@68
|
428 #
|
jpayne@68
|
429 # Public functions
|
jpayne@68
|
430 #
|
jpayne@68
|
431
|
jpayne@68
|
class Listener(object):
    '''
    Returns a listener object.

    This is a wrapper for a bound socket which is 'listening' for
    connections, or for a Windows named pipe.
    '''
    def __init__(self, address=None, family=None, backlog=1, authkey=None):
        """Bind to `address` and start listening.

        Missing `family`/`address` values are derived from each other or
        from the module default.  Raises TypeError when `authkey` is
        neither None nor bytes.
        """
        # Validate first, so a bad authkey cannot leave behind an already
        # bound socket or created pipe.
        if authkey is not None and not isinstance(authkey, bytes):
            raise TypeError('authkey should be a byte string')

        family = (family or (address and address_type(address))
                  or default_family)
        address = address or arbitrary_address(family)

        _validate_family(family)
        if family == 'AF_PIPE':
            self._listener = PipeListener(address, backlog)
        else:
            self._listener = SocketListener(address, family, backlog)

        self._authkey = authkey

    def accept(self):
        '''
        Accept a connection on the bound socket or named pipe of `self`.

        Returns a `Connection` object.

        Raises OSError if the listener has been closed.  When an authkey
        was given, the challenge handshake is run and the connection is
        closed again if it fails.
        '''
        if self._listener is None:
            raise OSError('listener is closed')
        c = self._listener.accept()
        # `is not None` matches the test used by Client(), so an empty
        # bytes key is authenticated on both sides rather than on one only.
        if self._authkey is not None:
            try:
                deliver_challenge(c, self._authkey)
                answer_challenge(c, self._authkey)
            except BaseException:
                c.close()
                raise
        return c

    def close(self):
        '''
        Close the bound socket or named pipe of `self`.
        '''
        listener = self._listener
        if listener is not None:
            # Clear the attribute first so close() is idempotent even if
            # the underlying close raises.
            self._listener = None
            listener.close()

    @property
    def address(self):
        """Address the underlying listener is bound to."""
        return self._listener._address

    @property
    def last_accepted(self):
        """Value recorded by the underlying listener for its last accept."""
        return self._listener._last_accepted

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()
|
jpayne@68
|
491
|
jpayne@68
|
492
|
jpayne@68
|
def Client(address, family=None, authkey=None):
    '''
    Returns a connection to the address of a `Listener`

    `family` defaults to the family implied by `address`.  Raises TypeError
    when `authkey` is neither None nor bytes.  When an authkey is given the
    challenge handshake is run and the connection is closed if it fails.
    '''
    # Validate first, so a bad authkey cannot leak an open connection.
    if authkey is not None and not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')

    family = family or address_type(address)
    _validate_family(family)
    if family == 'AF_PIPE':
        c = PipeClient(address)
    else:
        c = SocketClient(address)

    if authkey is not None:
        try:
            # Mirror image of Listener.accept(): answer first, then challenge.
            answer_challenge(c, authkey)
            deliver_challenge(c, authkey)
        except BaseException:
            c.close()
            raise

    return c
|
jpayne@68
|
512
|
jpayne@68
|
513
|
jpayne@68
|
if sys.platform != 'win32':

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe

        With `duplex` true both ends are readable and writable (built on a
        socket pair); otherwise the first end is read-only and the second
        write-only (built on `os.pipe()`).
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            s1.setblocking(True)
            s2.setblocking(True)
            # detach() hands the descriptor over to the Connection object.
            c1 = Connection(s1.detach())
            c2 = Connection(s2.detach())
        else:
            fd1, fd2 = os.pipe()
            c1 = Connection(fd1, writable=False)
            c2 = Connection(fd2, readable=False)

        return c1, c2

else:

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe

        With `duplex` false the first end is read-only and the second
        write-only.
        '''
        address = arbitrary_address('AF_PIPE')
        if duplex:
            openmode = _winapi.PIPE_ACCESS_DUPLEX
            access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
            obsize, ibsize = BUFSIZE, BUFSIZE
        else:
            openmode = _winapi.PIPE_ACCESS_INBOUND
            access = _winapi.GENERIC_WRITE
            obsize, ibsize = 0, BUFSIZE

        # h1 is the pipe's creating end, restricted to a single instance.
        h1 = _winapi.CreateNamedPipe(
            address, openmode | _winapi.FILE_FLAG_OVERLAPPED |
            _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE,
            _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
            _winapi.PIPE_WAIT,
            1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER,
            # default security descriptor: the handle cannot be inherited
            _winapi.NULL
            )
        # h2 is the connecting end, opened by name.
        h2 = _winapi.CreateFile(
            address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
            _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
            )
        _winapi.SetNamedPipeHandleState(
            h2, _winapi.PIPE_READMODE_MESSAGE, None, None
            )

        overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True)
        _, err = overlapped.GetOverlappedResult(True)
        assert err == 0

        c1 = PipeConnection(h1, writable=duplex)
        c2 = PipeConnection(h2, readable=duplex)

        return c1, c2
|
jpayne@68
|
574
|
jpayne@68
|
575 #
|
jpayne@68
|
576 # Definitions for connections based on sockets
|
jpayne@68
|
577 #
|
jpayne@68
|
578
|
jpayne@68
|
class SocketListener(object):
    '''
    Representation of a socket which is bound to an address and listening
    '''
    def __init__(self, address, family, backlog=1):
        """Create, bind and start listening on a socket.

        `family` is the name of an attribute of the `socket` module, e.g.
        'AF_INET'.  If setup fails the socket is closed before the
        exception propagates.
        """
        self._socket = socket.socket(getattr(socket, family))
        try:
            # SO_REUSEADDR has different semantics on Windows (issue #2550).
            if os.name == 'posix':
                self._socket.setsockopt(socket.SOL_SOCKET,
                                        socket.SO_REUSEADDR, 1)
            self._socket.setblocking(True)
            self._socket.bind(address)
            self._socket.listen(backlog)
            self._address = self._socket.getsockname()
        except BaseException:
            # bind() can also raise TypeError/OverflowError for a malformed
            # address, so do not restrict the cleanup to OSError.
            self._socket.close()
            raise
        self._family = family
        self._last_accepted = None

        if family == 'AF_UNIX':
            # Remove the socket file when the listener is closed or
            # finalized.
            self._unlink = util.Finalize(
                self, os.unlink, args=(address,), exitpriority=0
                )
        else:
            self._unlink = None

    def accept(self):
        """Accept one connection and return it wrapped in a `Connection`."""
        s, self._last_accepted = self._socket.accept()
        s.setblocking(True)
        return Connection(s.detach())

    def close(self):
        """Close the listening socket and run the unlink finalizer, if any."""
        try:
            self._socket.close()
        finally:
            unlink = self._unlink
            if unlink is not None:
                # Clear first so the finalizer is only invoked once here.
                self._unlink = None
                unlink()
|
jpayne@68
|
620
|
jpayne@68
|
621
|
jpayne@68
|
def SocketClient(address):
    '''
    Return a connection object connected to the socket given by `address`
    '''
    family = address_type(address)
    # The `with` block closes the socket if connect() fails; on success
    # detach() has already handed the descriptor to the Connection.
    with socket.socket(getattr(socket, family)) as s:
        s.setblocking(True)
        s.connect(address)
        return Connection(s.detach())
|
jpayne@68
|
631
|
jpayne@68
|
632 #
|
jpayne@68
|
633 # Definitions for connections based on named pipes
|
jpayne@68
|
634 #
|
jpayne@68
|
635
|
jpayne@68
|
if sys.platform == 'win32':

    class PipeListener(object):
        '''
        Representation of a named pipe
        '''
        def __init__(self, address, backlog=None):
            """Create the first pipe instance for `address`.

            `backlog` is accepted but not used by this class.
            """
            self._address = address
            self._handle_queue = [self._new_handle(first=True)]

            self._last_accepted = None
            util.sub_debug('listener created with address=%r', self._address)
            # `close` is an instance attribute: calling it runs the
            # finalizer, which closes every handle still queued.
            self.close = util.Finalize(
                self, PipeListener._finalize_pipe_listener,
                args=(self._handle_queue, self._address), exitpriority=0
                )

        def _new_handle(self, first=False):
            """Create and return a new instance of the named pipe."""
            flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
            if first:
                flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
            return _winapi.CreateNamedPipe(
                self._address, flags,
                _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
                _winapi.PIPE_WAIT,
                _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
                )

        def accept(self):
            """Wait for a client and return a `PipeConnection` for it."""
            # Queue a fresh instance before serving the oldest one, so the
            # queue is never empty.
            self._handle_queue.append(self._new_handle())
            handle = self._handle_queue.pop(0)
            try:
                ov = _winapi.ConnectNamedPipe(handle, overlapped=True)
            except OSError as e:
                if e.winerror != _winapi.ERROR_NO_DATA:
                    raise
                # ERROR_NO_DATA can occur if a client has already connected,
                # written data and then disconnected -- see Issue 14725.
            else:
                try:
                    _winapi.WaitForMultipleObjects(
                        [ov.event], False, INFINITE)
                except:
                    # Deliberately bare: cancel and release the handle on
                    # any exception (KeyboardInterrupt included).
                    ov.cancel()
                    _winapi.CloseHandle(handle)
                    raise
                finally:
                    _, err = ov.GetOverlappedResult(True)
                    assert err == 0
            return PipeConnection(handle)

        @staticmethod
        def _finalize_pipe_listener(queue, address):
            """Close every pipe handle in `queue`."""
            util.sub_debug('closing listener with address=%r', address)
            for handle in queue:
                _winapi.CloseHandle(handle)

    def PipeClient(address):
        '''
        Return a connection object connected to the pipe given by `address`

        Retries while the pipe is busy or the wait times out, until the
        connection timeout expires; the last OSError is then re-raised.
        '''
        t = _init_timeout()
        # The loop only ever ends through `break` or an exception.
        while 1:
            try:
                _winapi.WaitNamedPipe(address, 1000)
                h = _winapi.CreateFile(
                    address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE,
                    0, _winapi.NULL, _winapi.OPEN_EXISTING,
                    _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
                    )
            except OSError as e:
                if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT,
                                      _winapi.ERROR_PIPE_BUSY) or _check_timeout(t):
                    raise
            else:
                break

        _winapi.SetNamedPipeHandleState(
            h, _winapi.PIPE_READMODE_MESSAGE, None, None
            )
        return PipeConnection(h)
|
jpayne@68
|
720
|
jpayne@68
|
721 #
|
jpayne@68
|
722 # Authentication stuff
|
jpayne@68
|
723 #
|
jpayne@68
|
724
|
jpayne@68
|
# Number of random bytes in each challenge.
MESSAGE_LENGTH = 20

# Fixed markers exchanged during the handshake.
CHALLENGE = b'#CHALLENGE#'
WELCOME = b'#WELCOME#'
FAILURE = b'#FAILURE#'

def deliver_challenge(connection, authkey):
    """Challenge the peer to prove that it knows `authkey`.

    Sends CHALLENGE plus a random message, then expects the HMAC-MD5 digest
    of that message keyed with `authkey`.  Replies WELCOME on a match;
    otherwise replies FAILURE and raises AuthenticationError.  Raises
    ValueError when `authkey` is not bytes.
    """
    import hmac
    if not isinstance(authkey, bytes):
        raise ValueError(
            "Authkey must be bytes, not {0!s}".format(type(authkey)))
    message = os.urandom(MESSAGE_LENGTH)
    connection.send_bytes(CHALLENGE + message)
    digest = hmac.new(authkey, message, 'md5').digest()
    response = connection.recv_bytes(256)        # reject large message
    # Constant-time comparison: a plain `==` can reveal through timing how
    # much of the digest was guessed correctly.
    if hmac.compare_digest(response, digest):
        connection.send_bytes(WELCOME)
    else:
        connection.send_bytes(FAILURE)
        raise AuthenticationError('digest received was wrong')

def answer_challenge(connection, authkey):
    """Answer a challenge sent by the peer's `deliver_challenge`.

    Raises AuthenticationError when the peer's message is not a challenge
    or when the peer rejects the digest, and ValueError when `authkey` is
    not bytes.
    """
    import hmac
    if not isinstance(authkey, bytes):
        raise ValueError(
            "Authkey must be bytes, not {0!s}".format(type(authkey)))
    message = connection.recv_bytes(256)         # reject large message
    # The message comes from the peer, so validate it with a real check:
    # an `assert` disappears under `python -O`.
    if message[:len(CHALLENGE)] != CHALLENGE:
        raise AuthenticationError('message = %r' % message)
    message = message[len(CHALLENGE):]
    digest = hmac.new(authkey, message, 'md5').digest()
    connection.send_bytes(digest)
    response = connection.recv_bytes(256)        # reject large message
    if response != WELCOME:
        raise AuthenticationError('digest sent was rejected')
|
jpayne@68
|
759
|
jpayne@68
|
760 #
|
jpayne@68
|
761 # Support for using xmlrpclib for serialization
|
jpayne@68
|
762 #
|
jpayne@68
|
763
|
jpayne@68
|
class ConnectionWrapper(object):
    """Connection whose `send`/`recv` use caller-supplied serializers.

    `dumps` turns an object into the bytes passed to `conn.send_bytes`;
    `loads` turns the bytes returned by `conn.recv_bytes` back into an
    object.
    """
    def __init__(self, conn, dumps, loads):
        self._conn = conn
        self._dumps = dumps
        self._loads = loads
        # Expose the wrapped connection's bound methods directly.
        for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
            obj = getattr(conn, attr)
            setattr(self, attr, obj)

    def send(self, obj):
        """Serialize `obj` with `dumps` and send it."""
        s = self._dumps(obj)
        self._conn.send_bytes(s)

    def recv(self):
        """Receive one message and deserialize it with `loads`."""
        s = self._conn.recv_bytes()
        return self._loads(s)
|
jpayne@68
|
778
|
jpayne@68
|
def _xml_dumps(obj):
    """Serialize `obj` as XML-RPC parameter markup, encoded as UTF-8 bytes."""
    # Imported here so the function works on its own, without relying on a
    # module-global `xmlrpclib` that another function must have set first.
    import xmlrpc.client as xmlrpclib
    # Positional arguments: methodname, methodresponse, encoding, allow_none.
    return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8')
|
jpayne@68
|
781
|
jpayne@68
|
def _xml_loads(s):
    """Decode UTF-8 XML-RPC data *s* and return the single object it holds.

    Relies on the module-level name ``xmlrpclib`` having been bound
    before the call.
    """
    params, _method = xmlrpclib.loads(s.decode('utf-8'))
    # Exactly one parameter is expected; unpacking fails otherwise.
    (obj,) = params
    return obj
|
jpayne@68
|
785
|
jpayne@68
|
class XmlListener(Listener):
    """Listener whose accepted connections exchange objects as XML-RPC data."""

    def accept(self):
        """Accept a connection and wrap it with the XML codecs."""
        # Bind the module-level ``xmlrpclib`` name used by the codecs;
        # the import is deferred until a listener actually accepts.
        global xmlrpclib
        import xmlrpc.client as xmlrpclib
        accepted = Listener.accept(self)
        return ConnectionWrapper(accepted, _xml_dumps, _xml_loads)
|
jpayne@68
|
792
|
jpayne@68
|
def XmlClient(*args, **kwds):
    """Create a Client connection that exchanges objects as XML-RPC data.

    All arguments are forwarded unchanged to ``Client``.
    """
    # Bind the module-level ``xmlrpclib`` name used by the codecs.
    global xmlrpclib
    import xmlrpc.client as xmlrpclib
    raw_connection = Client(*args, **kwds)
    return ConnectionWrapper(raw_connection, _xml_dumps, _xml_loads)
|
jpayne@68
|
797
|
jpayne@68
|
798 #
|
jpayne@68
|
799 # Wait
|
jpayne@68
|
800 #
|
jpayne@68
|
801
|
jpayne@68
|
802 if sys.platform == 'win32':
|
jpayne@68
|
803
|
jpayne@68
|
def _exhaustive_wait(handles, timeout):
    """Return every handle in *handles* that is currently signalled.

    The first wait uses *timeout*; each later pass polls with a timeout
    of zero over the handles that follow the one just reported.  (Only
    returning the first signalled handle might create starvation issues.)
    """
    pending = list(handles)
    signalled = []
    while pending:
        code = _winapi.WaitForMultipleObjects(pending, False, timeout)
        if code == WAIT_TIMEOUT:
            break
        count = len(pending)
        if WAIT_OBJECT_0 <= code < WAIT_OBJECT_0 + count:
            index = code - WAIT_OBJECT_0
        elif WAIT_ABANDONED_0 <= code < WAIT_ABANDONED_0 + count:
            index = code - WAIT_ABANDONED_0
        else:
            raise RuntimeError('Should not get here')
        signalled.append(pending[index])
        # Only handles after the reported one still need checking.
        pending = pending[index + 1:]
        timeout = 0
    return signalled
|
jpayne@68
|
823
|
jpayne@68
|
# Error codes that wait() treats as "object is ready" rather than
# re-raising the OSError.
_ready_errors = {
    _winapi.ERROR_BROKEN_PIPE,
    _winapi.ERROR_NETNAME_DELETED,
}
|
jpayne@68
|
825
|
jpayne@68
|
def wait(object_list, timeout=None):
    """Wait until an object in object_list is ready/readable.

    *timeout* is in seconds; None waits indefinitely and a negative
    value is treated as zero.  Returns the objects of object_list that
    are ready/readable, in their original order.
    """
    # Convert seconds to whole milliseconds.
    if timeout is None:
        timeout = INFINITE
    elif timeout < 0:
        timeout = 0
    else:
        timeout = int(timeout * 1000 + 0.5)

    object_list = list(object_list)
    handle_to_obj = {}
    pending_reads = []
    ready_objects = set()
    ready_handles = set()

    try:
        for obj in object_list:
            try:
                fileno = obj.fileno
            except AttributeError:
                # No fileno attribute: use the object's __index__() value
                # as the wait handle.
                handle_to_obj[obj.__index__()] = obj
                continue

            # start an overlapped read of length zero
            try:
                ov, err = _winapi.ReadFile(fileno(), 0, True)
            except OSError as exc:
                ov, err = None, exc.winerror
                if err not in _ready_errors:
                    raise

            if err == _winapi.ERROR_IO_PENDING:
                pending_reads.append(ov)
                handle_to_obj[ov.event] = obj
                continue

            # If obj.fileno() is an overlapped pipe handle and
            # err == 0 then there is a zero length message
            # in the pipe, but it HAS NOT been consumed...
            if ov and sys.getwindowsversion()[:2] >= (6, 2):
                # ... except on Windows 8 and later, where
                # the message HAS been consumed.
                try:
                    _, err = ov.GetOverlappedResult(False)
                except OSError as exc:
                    err = exc.winerror
                if not err and hasattr(obj, '_got_empty_message'):
                    obj._got_empty_message = True
            ready_objects.add(obj)
            # Something is already ready, so the wait below only polls.
            timeout = 0

        ready_handles = _exhaustive_wait(handle_to_obj.keys(), timeout)
    finally:
        # request that overlapped reads stop
        for ov in pending_reads:
            ov.cancel()

        # wait for all overlapped reads to stop
        for ov in pending_reads:
            try:
                _, err = ov.GetOverlappedResult(True)
            except OSError as exc:
                err = exc.winerror
                if err not in _ready_errors:
                    raise
            if err != _winapi.ERROR_OPERATION_ABORTED:
                obj = handle_to_obj[ov.event]
                ready_objects.add(obj)
                # If obj.fileno() is an overlapped pipe handle then
                # a zero length message HAS been consumed.
                if err == 0 and hasattr(obj, '_got_empty_message'):
                    obj._got_empty_message = True

    ready_objects.update(handle_to_obj[h] for h in ready_handles)
    return [obj for obj in object_list if obj in ready_objects]
|
jpayne@68
|
903
|
jpayne@68
|
904 else:
|
jpayne@68
|
905
|
jpayne@68
|
906 import selectors
|
jpayne@68
|
907
|
jpayne@68
|
# poll/select have the advantage of not requiring any extra file
# descriptor, contrarily to epoll/kqueue (also, they require a single
# syscall).  Use PollSelector when the selectors module provides it and
# fall back to SelectSelector otherwise.
_WaitSelector = getattr(selectors, 'PollSelector', selectors.SelectSelector)
|
jpayne@68
|
915
|
jpayne@68
|
def wait(object_list, timeout=None):
    """Wait until an object in object_list is ready/readable.

    *timeout* is in seconds; None means keep waiting until something is
    ready.  Returns the list of ready objects, or an empty list once the
    timeout has elapsed.
    """
    with _WaitSelector() as selector:
        for item in object_list:
            selector.register(item, selectors.EVENT_READ)

        # Fixed deadline so repeated select() calls share one time budget.
        deadline = None if timeout is None else time.monotonic() + timeout

        while True:
            events = selector.select(timeout)
            if events:
                return [key.fileobj for key, _mask in events]
            if timeout is None:
                continue
            timeout = deadline - time.monotonic()
            if timeout < 0:
                return events
|
jpayne@68
|
938
|
jpayne@68
|
939 #
|
jpayne@68
|
940 # Make connection and socket objects sharable if possible
|
jpayne@68
|
941 #
|
jpayne@68
|
942
|
jpayne@68
|
943 if sys.platform == 'win32':
|
jpayne@68
|
def reduce_connection(conn):
    """Reduce a socket-based Connection to (rebuild_connection, args)."""
    fd = conn.fileno()
    with socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) as sock:
        from . import resource_sharer
        dup = resource_sharer.DupSocket(sock)
        state = (dup, conn.readable, conn.writable)
        return rebuild_connection, state


def rebuild_connection(ds, readable, writable):
    """Build a Connection from the handle of the socket detached from *ds*."""
    handle = ds.detach().detach()
    return Connection(handle, readable, writable)


# Route Connection objects through reduce_connection in the reduction module.
reduction.register(Connection, reduce_connection)
|
jpayne@68
|
954
|
jpayne@68
|
def reduce_pipe_connection(conn):
    """Reduce a PipeConnection to (rebuild_pipe_connection, args)."""
    # Request only the access rights matching the connection's direction(s).
    access = 0
    if conn.readable:
        access |= _winapi.FILE_GENERIC_READ
    if conn.writable:
        access |= _winapi.FILE_GENERIC_WRITE
    dup = reduction.DupHandle(conn.fileno(), access)
    return rebuild_pipe_connection, (dup, conn.readable, conn.writable)


def rebuild_pipe_connection(dh, readable, writable):
    """Build a PipeConnection from the handle detached from *dh*."""
    return PipeConnection(dh.detach(), readable, writable)


# Route PipeConnection objects through reduce_pipe_connection in the
# reduction module.
reduction.register(PipeConnection, reduce_pipe_connection)
|
jpayne@68
|
964
|
jpayne@68
|
965 else:
|
jpayne@68
|
def reduce_connection(conn):
    """Reduce a Connection to (rebuild_connection, args)."""
    dup = reduction.DupFd(conn.fileno())
    state = (dup, conn.readable, conn.writable)
    return rebuild_connection, state


def rebuild_connection(df, readable, writable):
    """Build a Connection from the file descriptor detached from *df*."""
    return Connection(df.detach(), readable, writable)


# Route Connection objects through reduce_connection in the reduction module.
reduction.register(Connection, reduce_connection)
|