Mercurial > repos > rliterman > csp2
comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/multiprocessing/connection.py @ 68:5028fdace37b
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 16:23:26 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
67:0e9998148a16 | 68:5028fdace37b |
---|---|
1 # | |
2 # A higher level module for using sockets (or Windows named pipes) | |
3 # | |
4 # multiprocessing/connection.py | |
5 # | |
6 # Copyright (c) 2006-2008, R Oudkerk | |
7 # Licensed to PSF under a Contributor Agreement. | |
8 # | |
9 | |
10 __all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ] | |
11 | |
12 import io | |
13 import os | |
14 import sys | |
15 import socket | |
16 import struct | |
17 import time | |
18 import tempfile | |
19 import itertools | |
20 | |
21 import _multiprocessing | |
22 | |
23 from . import util | |
24 | |
25 from . import AuthenticationError, BufferTooShort | |
26 from .context import reduction | |
27 _ForkingPickler = reduction.ForkingPickler | |
28 | |
29 try: | |
30 import _winapi | |
31 from _winapi import WAIT_OBJECT_0, WAIT_ABANDONED_0, WAIT_TIMEOUT, INFINITE | |
32 except ImportError: | |
33 if sys.platform == 'win32': | |
34 raise | |
35 _winapi = None | |
36 | |
37 # | |
38 # | |
39 # | |
40 | |
# Buffer size, in bytes, requested for Windows named pipe instances.
BUFSIZE = 8192
# A very generous timeout (in seconds) when it comes to local connections...
CONNECTION_TIMEOUT = 20.0

# Per-process counter used to make generated pipe names unique.
_mmap_counter = itertools.count()

# Supported address families, listed in increasing order of preference:
# the last family available on this platform becomes the default.
families = ['AF_INET']
if hasattr(socket, 'AF_UNIX'):
    families.append('AF_UNIX')
if sys.platform == 'win32':
    families.append('AF_PIPE')
default_family = families[-1]
58 | |
def _init_timeout(timeout=CONNECTION_TIMEOUT):
    """Return the monotonic-clock deadline lying `timeout` seconds from now."""
    now = time.monotonic()
    return now + timeout
61 | |
62 def _check_timeout(t): | |
63 return time.monotonic() > t | |
64 | |
65 # | |
66 # | |
67 # | |
68 | |
def arbitrary_address(family):
    '''
    Build a fresh address suitable for a listener of the given family.

    'AF_INET' yields ('localhost', 0); 'AF_UNIX' yields a temporary path
    inside util.get_temp_dir(); 'AF_PIPE' yields a pipe name containing the
    process id and a per-process counter.  Any other family raises
    ValueError.
    '''
    if family == 'AF_INET':
        return ('localhost', 0)
    if family == 'AF_UNIX':
        return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
    if family == 'AF_PIPE':
        pipe_prefix = r'\\.\pipe\pyc-%d-%d-' % (os.getpid(),
                                                next(_mmap_counter))
        return tempfile.mktemp(prefix=pipe_prefix, dir="")
    raise ValueError('unrecognized family')
82 | |
83 def _validate_family(family): | |
84 ''' | |
85 Checks if the family is valid for the current environment. | |
86 ''' | |
87 if sys.platform != 'win32' and family == 'AF_PIPE': | |
88 raise ValueError('Family %s is not recognized.' % family) | |
89 | |
90 if sys.platform == 'win32' and family == 'AF_UNIX': | |
91 # double check | |
92 if not hasattr(socket, family): | |
93 raise ValueError('Family %s is not recognized.' % family) | |
94 | |
def address_type(address):
    '''
    Return the address family name inferred from the shape of `address`.

    A tuple (including tuple subclasses such as namedtuples) is 'AF_INET',
    a string starting with two backslashes is 'AF_PIPE' and any other
    string is 'AF_UNIX'.  Anything else raises ValueError.
    '''
    # isinstance() rather than an exact type comparison, so that tuple and
    # str subclasses are classified like their base types instead of being
    # rejected.
    if isinstance(address, tuple):
        return 'AF_INET'
    if isinstance(address, str):
        return 'AF_PIPE' if address.startswith('\\\\') else 'AF_UNIX'
    raise ValueError('address type of %r unrecognized' % address)
109 | |
110 # | |
111 # Connection classes | |
112 # | |
113 | |
114 class _ConnectionBase: | |
115 _handle = None | |
116 | |
117 def __init__(self, handle, readable=True, writable=True): | |
118 handle = handle.__index__() | |
119 if handle < 0: | |
120 raise ValueError("invalid handle") | |
121 if not readable and not writable: | |
122 raise ValueError( | |
123 "at least one of `readable` and `writable` must be True") | |
124 self._handle = handle | |
125 self._readable = readable | |
126 self._writable = writable | |
127 | |
128 # XXX should we use util.Finalize instead of a __del__? | |
129 | |
130 def __del__(self): | |
131 if self._handle is not None: | |
132 self._close() | |
133 | |
134 def _check_closed(self): | |
135 if self._handle is None: | |
136 raise OSError("handle is closed") | |
137 | |
138 def _check_readable(self): | |
139 if not self._readable: | |
140 raise OSError("connection is write-only") | |
141 | |
142 def _check_writable(self): | |
143 if not self._writable: | |
144 raise OSError("connection is read-only") | |
145 | |
146 def _bad_message_length(self): | |
147 if self._writable: | |
148 self._readable = False | |
149 else: | |
150 self.close() | |
151 raise OSError("bad message length") | |
152 | |
153 @property | |
154 def closed(self): | |
155 """True if the connection is closed""" | |
156 return self._handle is None | |
157 | |
158 @property | |
159 def readable(self): | |
160 """True if the connection is readable""" | |
161 return self._readable | |
162 | |
163 @property | |
164 def writable(self): | |
165 """True if the connection is writable""" | |
166 return self._writable | |
167 | |
168 def fileno(self): | |
169 """File descriptor or handle of the connection""" | |
170 self._check_closed() | |
171 return self._handle | |
172 | |
173 def close(self): | |
174 """Close the connection""" | |
175 if self._handle is not None: | |
176 try: | |
177 self._close() | |
178 finally: | |
179 self._handle = None | |
180 | |
181 def send_bytes(self, buf, offset=0, size=None): | |
182 """Send the bytes data from a bytes-like object""" | |
183 self._check_closed() | |
184 self._check_writable() | |
185 m = memoryview(buf) | |
186 # HACK for byte-indexing of non-bytewise buffers (e.g. array.array) | |
187 if m.itemsize > 1: | |
188 m = memoryview(bytes(m)) | |
189 n = len(m) | |
190 if offset < 0: | |
191 raise ValueError("offset is negative") | |
192 if n < offset: | |
193 raise ValueError("buffer length < offset") | |
194 if size is None: | |
195 size = n - offset | |
196 elif size < 0: | |
197 raise ValueError("size is negative") | |
198 elif offset + size > n: | |
199 raise ValueError("buffer length < offset + size") | |
200 self._send_bytes(m[offset:offset + size]) | |
201 | |
202 def send(self, obj): | |
203 """Send a (picklable) object""" | |
204 self._check_closed() | |
205 self._check_writable() | |
206 self._send_bytes(_ForkingPickler.dumps(obj)) | |
207 | |
208 def recv_bytes(self, maxlength=None): | |
209 """ | |
210 Receive bytes data as a bytes object. | |
211 """ | |
212 self._check_closed() | |
213 self._check_readable() | |
214 if maxlength is not None and maxlength < 0: | |
215 raise ValueError("negative maxlength") | |
216 buf = self._recv_bytes(maxlength) | |
217 if buf is None: | |
218 self._bad_message_length() | |
219 return buf.getvalue() | |
220 | |
221 def recv_bytes_into(self, buf, offset=0): | |
222 """ | |
223 Receive bytes data into a writeable bytes-like object. | |
224 Return the number of bytes read. | |
225 """ | |
226 self._check_closed() | |
227 self._check_readable() | |
228 with memoryview(buf) as m: | |
229 # Get bytesize of arbitrary buffer | |
230 itemsize = m.itemsize | |
231 bytesize = itemsize * len(m) | |
232 if offset < 0: | |
233 raise ValueError("negative offset") | |
234 elif offset > bytesize: | |
235 raise ValueError("offset too large") | |
236 result = self._recv_bytes() | |
237 size = result.tell() | |
238 if bytesize < offset + size: | |
239 raise BufferTooShort(result.getvalue()) | |
240 # Message can fit in dest | |
241 result.seek(0) | |
242 result.readinto(m[offset // itemsize : | |
243 (offset + size) // itemsize]) | |
244 return size | |
245 | |
246 def recv(self): | |
247 """Receive a (picklable) object""" | |
248 self._check_closed() | |
249 self._check_readable() | |
250 buf = self._recv_bytes() | |
251 return _ForkingPickler.loads(buf.getbuffer()) | |
252 | |
253 def poll(self, timeout=0.0): | |
254 """Whether there is any input available to be read""" | |
255 self._check_closed() | |
256 self._check_readable() | |
257 return self._poll(timeout) | |
258 | |
259 def __enter__(self): | |
260 return self | |
261 | |
262 def __exit__(self, exc_type, exc_value, exc_tb): | |
263 self.close() | |
264 | |
265 | |
266 if _winapi: | |
267 | |
class PipeConnection(_ConnectionBase):
    """
    Connection class based on a Windows named pipe.
    Overlapped I/O is used, so the handles must have been created
    with FILE_FLAG_OVERLAPPED.
    """
    # Set (by wait()) when a zero-length message has already been consumed
    # from the pipe; the next _recv_bytes() then returns an empty buffer.
    _got_empty_message = False

    def _close(self, _CloseHandle=_winapi.CloseHandle):
        # CloseHandle is bound as a default argument at definition time.
        _CloseHandle(self._handle)

    def _send_bytes(self, buf):
        """Write `buf` as one message, blocking until the write completes."""
        ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
        try:
            if err == _winapi.ERROR_IO_PENDING:
                waitres = _winapi.WaitForMultipleObjects(
                    [ov.event], False, INFINITE)
                assert waitres == WAIT_OBJECT_0
        except BaseException:
            # Interrupted while waiting: cancel the write, then fall
            # through to the finally clause to collect its result.
            ov.cancel()
            raise
        finally:
            nwritten, err = ov.GetOverlappedResult(True)
        assert err == 0
        assert nwritten == len(buf)

    def _recv_bytes(self, maxsize=None):
        """
        Read one message and return it in a BytesIO positioned at its end.

        A broken pipe is reported as EOFError.
        """
        if self._got_empty_message:
            self._got_empty_message = False
            return io.BytesIO()

        # Start with a small read; _get_more_data() fetches the remainder
        # of longer messages.
        first_read = 128 if maxsize is None else min(maxsize, 128)
        try:
            ov, err = _winapi.ReadFile(self._handle, first_read,
                                       overlapped=True)
            try:
                if err == _winapi.ERROR_IO_PENDING:
                    waitres = _winapi.WaitForMultipleObjects(
                        [ov.event], False, INFINITE)
                    assert waitres == WAIT_OBJECT_0
            except BaseException:
                ov.cancel()
                raise
            finally:
                # NOTE: returning from inside this finally clause discards
                # any exception in flight once data has been obtained.
                nread, err = ov.GetOverlappedResult(True)
                if err == 0:
                    f = io.BytesIO()
                    f.write(ov.getbuffer())
                    return f
                elif err == _winapi.ERROR_MORE_DATA:
                    return self._get_more_data(ov, maxsize)
        except OSError as e:
            if e.winerror == _winapi.ERROR_BROKEN_PIPE:
                raise EOFError
            else:
                raise
        raise RuntimeError("shouldn't get here; expected KeyboardInterrupt")

    def _poll(self, timeout):
        """Return True if a message is pending, waiting up to `timeout`."""
        if self._got_empty_message:
            return True
        if _winapi.PeekNamedPipe(self._handle)[0] != 0:
            return True
        return bool(wait([self], timeout))

    def _get_more_data(self, ov, maxsize):
        """Finish reading a message whose first part is held by `ov`."""
        head = ov.getbuffer()
        f = io.BytesIO()
        f.write(head)
        left = _winapi.PeekNamedPipe(self._handle)[1]
        assert left > 0
        if maxsize is not None and len(head) + left > maxsize:
            self._bad_message_length()
        ov, err = _winapi.ReadFile(self._handle, left, overlapped=True)
        rbytes, err = ov.GetOverlappedResult(True)
        assert err == 0
        assert rbytes == left
        f.write(ov.getbuffer())
        return f
346 | |
347 | |
class Connection(_ConnectionBase):
    """
    Connection class based on an arbitrary file descriptor (Unix only), or
    a socket handle (Windows).

    Each message is framed with a 4-byte signed big-endian length.  A
    length of -1 announces that the real length follows as an 8-byte
    unsigned big-endian integer.
    """

    # Pick the low-level primitives once, at class creation time.
    if _winapi:
        def _close(self, _close=_multiprocessing.closesocket):
            _close(self._handle)
        _write = _multiprocessing.send
        _read = _multiprocessing.recv
    else:
        def _close(self, _close=os.close):
            _close(self._handle)
        _write = os.write
        _read = os.read

    def _send(self, buf, write=_write):
        """Write all of `buf`, repeating after partial writes."""
        pending = len(buf)
        while True:
            sent = write(self._handle, buf)
            pending -= sent
            if pending == 0:
                return
            buf = buf[sent:]

    def _recv(self, size, read=_read):
        """
        Read exactly `size` bytes into a new BytesIO and return it.

        End of file before any byte arrived raises EOFError; end of file
        part way through raises OSError.
        """
        collected = io.BytesIO()
        handle = self._handle
        missing = size
        while missing > 0:
            chunk = read(handle, missing)
            got = len(chunk)
            if got == 0:
                if missing == size:
                    raise EOFError
                raise OSError("got end of file during message")
            collected.write(chunk)
            missing -= got
        return collected

    def _send_bytes(self, buf):
        """Send `buf` as one length-prefixed message."""
        n = len(buf)
        if n > 0x7fffffff:
            # Too long for the signed 32-bit header: send the -1 marker
            # followed by a 64-bit length.
            pre_header = struct.pack("!i", -1)
            header = struct.pack("!Q", n)
            for part in (pre_header, header, buf):
                self._send(part)
            return

        # For wire compatibility with 3.7 and lower
        header = struct.pack("!i", n)
        if n > 16384:
            # The payload is large so Nagle's algorithm won't be triggered
            # and we'd better avoid the cost of concatenation.
            self._send(header)
            self._send(buf)
        else:
            # Issue #20540: concatenate before sending, to avoid delays due
            # to Nagle's algorithm on a TCP socket.
            # Also note we want to avoid sending a 0-length buffer separately,
            # to avoid "broken pipe" errors if the other end closed the pipe.
            self._send(header + buf)

    def _recv_bytes(self, maxsize=None):
        """
        Read one framed message.

        Returns a BytesIO with the payload, or None (leaving the payload
        unread) when the announced size exceeds `maxsize`.
        """
        size, = struct.unpack("!i", self._recv(4).getvalue())
        if size == -1:
            size, = struct.unpack("!Q", self._recv(8).getvalue())
        if maxsize is not None and size > maxsize:
            return None
        return self._recv(size)

    def _poll(self, timeout):
        return bool(wait([self], timeout))
426 | |
427 | |
428 # | |
429 # Public functions | |
430 # | |
431 | |
class Listener(object):
    '''
    Returns a listener object.

    This is a wrapper for a bound socket which is 'listening' for
    connections, or for a Windows named pipe.
    '''
    def __init__(self, address=None, family=None, backlog=1, authkey=None):
        '''
        Bind a listener.

        `family` defaults to the family implied by `address`, or to the
        module default when no address is given; a missing `address` is
        generated with arbitrary_address().  `authkey`, when given, must be
        a byte string, otherwise TypeError is raised.
        '''
        # Validate authkey before anything is bound, so that a bad key
        # cannot leave a listening socket or pipe behind.
        if authkey is not None and not isinstance(authkey, bytes):
            raise TypeError('authkey should be a byte string')

        family = family or (address and address_type(address)) \
                 or default_family
        address = address or arbitrary_address(family)

        _validate_family(family)
        if family == 'AF_PIPE':
            self._listener = PipeListener(address, backlog)
        else:
            self._listener = SocketListener(address, family, backlog)

        self._authkey = authkey

    def accept(self):
        '''
        Accept a connection on the bound socket or named pipe of `self`.

        Returns a `Connection` object.  Raises OSError if the listener has
        been closed.  When an authkey is set, the challenge handshake is
        run in both directions before the connection is returned.
        '''
        if self._listener is None:
            raise OSError('listener is closed')
        c = self._listener.accept()
        if self._authkey:
            try:
                deliver_challenge(c, self._authkey)
                answer_challenge(c, self._authkey)
            except BaseException:
                # The caller never sees `c` on failure, so release it here
                # instead of waiting for garbage collection.
                c.close()
                raise
        return c

    def close(self):
        '''
        Close the bound socket or named pipe of `self`.

        Calling it more than once is harmless.
        '''
        listener = self._listener
        if listener is not None:
            # Clear the reference first so a failing close() is not retried.
            self._listener = None
            listener.close()

    @property
    def address(self):
        '''Address the underlying listener is bound to.'''
        return self._listener._address

    @property
    def last_accepted(self):
        '''Peer address recorded by the underlying listener, if any.'''
        return self._listener._last_accepted

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()
491 | |
492 | |
def Client(address, family=None, authkey=None):
    '''
    Returns a connection to the address of a `Listener`

    `family` defaults to the family implied by `address`.  `authkey`, when
    given, must be a byte string (TypeError otherwise); the challenge
    handshake is then run in both directions before the connection is
    returned.
    '''
    # Validate authkey before connecting, so that a bad key cannot leave an
    # open connection behind.
    if authkey is not None and not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')

    family = family or address_type(address)
    _validate_family(family)
    if family == 'AF_PIPE':
        c = PipeClient(address)
    else:
        c = SocketClient(address)

    if authkey is not None:
        try:
            answer_challenge(c, authkey)
            deliver_challenge(c, authkey)
        except BaseException:
            # The caller never sees `c` on failure, so release it here.
            c.close()
            raise

    return c
512 | |
513 | |
514 if sys.platform != 'win32': | |
515 | |
def Pipe(duplex=True):
    '''
    Returns pair of connection objects at either end of a pipe

    With `duplex` true both ends are readable and writable (backed by a
    socket pair); otherwise the first end is read-only and the second is
    write-only (backed by os.pipe()).
    '''
    if not duplex:
        read_fd, write_fd = os.pipe()
        reader = Connection(read_fd, writable=False)
        writer = Connection(write_fd, readable=False)
        return reader, writer

    left, right = socket.socketpair()
    for sock in (left, right):
        sock.setblocking(True)
    # detach() hands ownership of each descriptor to its Connection.
    first = Connection(left.detach())
    second = Connection(right.detach())
    return first, second
532 | |
533 else: | |
534 | |
def Pipe(duplex=True):
    '''
    Returns pair of connection objects at either end of a pipe

    The first connection wraps the server end of a freshly created named
    pipe and is always readable; the second wraps the client end and is
    always writable.  With `duplex` true both ends are readable and
    writable.
    '''
    address = arbitrary_address('AF_PIPE')
    if duplex:
        openmode = _winapi.PIPE_ACCESS_DUPLEX
        access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
        obsize, ibsize = BUFSIZE, BUFSIZE
    else:
        openmode = _winapi.PIPE_ACCESS_INBOUND
        access = _winapi.GENERIC_WRITE
        obsize, ibsize = 0, BUFSIZE

    server_flags = (openmode | _winapi.FILE_FLAG_OVERLAPPED |
                    _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE)
    pipe_mode = (_winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
                 _winapi.PIPE_WAIT)
    h1 = _winapi.CreateNamedPipe(
        address, server_flags, pipe_mode,
        1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER,
        # default security descriptor: the handle cannot be inherited
        _winapi.NULL
        )
    h2 = _winapi.CreateFile(
        address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
        _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
        )
    _winapi.SetNamedPipeHandleState(
        h2, _winapi.PIPE_READMODE_MESSAGE, None, None
        )

    # Complete the server side of the connection before wrapping handles.
    overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True)
    _, err = overlapped.GetOverlappedResult(True)
    assert err == 0

    server_end = PipeConnection(h1, writable=duplex)
    client_end = PipeConnection(h2, readable=duplex)
    return server_end, client_end
574 | |
575 # | |
576 # Definitions for connections based on sockets | |
577 # | |
578 | |
class SocketListener(object):
    '''
    Representation of a socket which is bound to an address and listening
    '''
    def __init__(self, address, family, backlog=1):
        '''
        Create a blocking socket of the named `family`, bind it to
        `address` and start listening with the given `backlog`.
        '''
        self._socket = socket.socket(getattr(socket, family))
        try:
            # SO_REUSEADDR has different semantics on Windows (issue #2550).
            if os.name == 'posix':
                self._socket.setsockopt(socket.SOL_SOCKET,
                                        socket.SO_REUSEADDR, 1)
            self._socket.setblocking(True)
            self._socket.bind(address)
            self._socket.listen(backlog)
            self._address = self._socket.getsockname()
        except BaseException:
            # Release the socket on any setup failure (not only OSError,
            # e.g. a TypeError for a malformed address), then re-raise.
            self._socket.close()
            raise
        self._family = family
        self._last_accepted = None

        # A filesystem-backed AF_UNIX socket leaves a file behind that must
        # be removed on close.  Abstract-namespace addresses (leading NUL
        # byte) have no file, and os.unlink() rejects their embedded NUL.
        if family == 'AF_UNIX' and not self._is_abstract_address(address):
            self._unlink = util.Finalize(
                self, os.unlink, args=(address,), exitpriority=0
                )
        else:
            self._unlink = None

    @staticmethod
    def _is_abstract_address(address):
        '''Return True if `address` starts with a NUL (abstract namespace).'''
        if isinstance(address, bytes):
            return address[:1] == b'\0'
        if isinstance(address, str):
            return address[:1] == '\0'
        return False

    def accept(self):
        '''
        Accept one connection, remember the peer address and return the
        new socket wrapped in a `Connection`.
        '''
        s, self._last_accepted = self._socket.accept()
        s.setblocking(True)
        # detach() transfers ownership of the descriptor to the Connection.
        return Connection(s.detach())

    def close(self):
        '''
        Close the listening socket and run the pending unlink finalizer,
        if there is one.
        '''
        try:
            self._socket.close()
        finally:
            unlink = self._unlink
            if unlink is not None:
                # Clear first so the unlink is attempted only once.
                self._unlink = None
                unlink()
620 | |
621 | |
def SocketClient(address):
    '''
    Return a connection object connected to the socket given by `address`

    The socket family is inferred from `address` with address_type().
    '''
    family_name = address_type(address)
    sock = socket.socket(getattr(socket, family_name))
    # The with-block closes the socket if connecting fails; on success the
    # descriptor has been detached and belongs to the Connection.
    with sock:
        sock.setblocking(True)
        sock.connect(address)
        return Connection(sock.detach())
631 | |
632 # | |
633 # Definitions for connections based on named pipes | |
634 # | |
635 | |
636 if sys.platform == 'win32': | |
637 | |
class PipeListener(object):
    '''
    Representation of a named pipe
    '''
    def __init__(self, address, backlog=None):
        # `backlog` is accepted but not used by this class.
        self._address = address
        # One pipe instance is created up front; accept() queues a
        # replacement before handing out the oldest one.
        self._handle_queue = [self._new_handle(first=True)]

        self._last_accepted = None
        util.sub_debug('listener created with address=%r', self._address)
        # close() is the finalizer itself: calling it closes every handle
        # still held in the queue.
        self.close = util.Finalize(
            self, PipeListener._finalize_pipe_listener,
            args=(self._handle_queue, self._address), exitpriority=0
            )

    def _new_handle(self, first=False):
        '''Create one more duplex, overlapped instance of the pipe.'''
        flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
        if first:
            flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
        pipe_mode = (_winapi.PIPE_TYPE_MESSAGE |
                     _winapi.PIPE_READMODE_MESSAGE |
                     _winapi.PIPE_WAIT)
        return _winapi.CreateNamedPipe(
            self._address, flags, pipe_mode,
            _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
            _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
            )

    def accept(self):
        '''Wait for a client and return a `PipeConnection` for it.'''
        self._handle_queue.append(self._new_handle())
        handle = self._handle_queue.pop(0)
        try:
            ov = _winapi.ConnectNamedPipe(handle, overlapped=True)
        except OSError as e:
            if e.winerror != _winapi.ERROR_NO_DATA:
                raise
            # ERROR_NO_DATA can occur if a client has already connected,
            # written data and then disconnected -- see Issue 14725.
        else:
            try:
                res = _winapi.WaitForMultipleObjects(
                    [ov.event], False, INFINITE)
            except BaseException:
                ov.cancel()
                _winapi.CloseHandle(handle)
                raise
            finally:
                _, err = ov.GetOverlappedResult(True)
                assert err == 0
        return PipeConnection(handle)

    @staticmethod
    def _finalize_pipe_listener(queue, address):
        '''Close every pipe handle remaining in `queue`.'''
        util.sub_debug('closing listener with address=%r', address)
        for pending_handle in queue:
            _winapi.CloseHandle(pending_handle)
693 | |
def PipeClient(address):
    '''
    Return a connection object connected to the pipe given by `address`

    Connecting is retried while the pipe reports a timeout or busy error,
    until the deadline from _init_timeout() has passed; the last OSError
    is then re-raised.  Any other OSError is raised immediately.
    '''
    deadline = _init_timeout()
    while True:
        try:
            _winapi.WaitNamedPipe(address, 1000)
            h = _winapi.CreateFile(
                address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE,
                0, _winapi.NULL, _winapi.OPEN_EXISTING,
                _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
                )
        except OSError as e:
            if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT,
                                  _winapi.ERROR_PIPE_BUSY) or \
                    _check_timeout(deadline):
                raise
        else:
            break
    # The loop can only be left through `break` or an exception, so the
    # former `while ... else: raise` clause was unreachable and is gone.

    _winapi.SetNamedPipeHandleState(
        h, _winapi.PIPE_READMODE_MESSAGE, None, None
        )
    return PipeConnection(h)
720 | |
721 # | |
722 # Authentication stuff | |
723 # | |
724 | |
# Number of random bytes in each challenge.
MESSAGE_LENGTH = 20

# Protocol markers exchanged during the handshake.
CHALLENGE = b'#CHALLENGE#'
WELCOME = b'#WELCOME#'
FAILURE = b'#FAILURE#'

def deliver_challenge(connection, authkey):
    '''
    Challenge the peer on `connection` to prove that it knows `authkey`.

    Sends CHALLENGE followed by MESSAGE_LENGTH random bytes and expects the
    HMAC-MD5 digest of those bytes, keyed with `authkey`, in return.
    Replies WELCOME on success; on a mismatch replies FAILURE and raises
    AuthenticationError.  Raises ValueError if `authkey` is not bytes.
    '''
    import hmac
    if not isinstance(authkey, bytes):
        raise ValueError(
            "Authkey must be bytes, not {0!s}".format(type(authkey)))
    message = os.urandom(MESSAGE_LENGTH)
    connection.send_bytes(CHALLENGE + message)
    # MD5 is part of the wire protocol here and cannot be changed on one
    # side only.
    digest = hmac.new(authkey, message, 'md5').digest()
    response = connection.recv_bytes(256)        # reject large message
    # Constant-time comparison, so the response time does not reveal how
    # many leading bytes of a forged digest were correct.
    if hmac.compare_digest(response, digest):
        connection.send_bytes(WELCOME)
    else:
        connection.send_bytes(FAILURE)
        raise AuthenticationError('digest received was wrong')

def answer_challenge(connection, authkey):
    '''
    Answer a challenge received on `connection` using `authkey`.

    Expects a message starting with CHALLENGE, replies with the HMAC-MD5
    digest of the remainder and then expects WELCOME.  Raises
    AssertionError for a malformed challenge, AuthenticationError if the
    peer does not answer WELCOME, and ValueError if `authkey` is not bytes.
    '''
    import hmac
    if not isinstance(authkey, bytes):
        raise ValueError(
            "Authkey must be bytes, not {0!s}".format(type(authkey)))
    message = connection.recv_bytes(256)         # reject large message
    # Explicit raise instead of an assert statement: the check validates
    # data from the peer and must not disappear under `python -O`.
    if message[:len(CHALLENGE)] != CHALLENGE:
        raise AssertionError('message = %r' % message)
    message = message[len(CHALLENGE):]
    digest = hmac.new(authkey, message, 'md5').digest()
    connection.send_bytes(digest)
    response = connection.recv_bytes(256)        # reject large message
    if response != WELCOME:
        raise AuthenticationError('digest sent was rejected')
759 | |
760 # | |
761 # Support for using xmlrpclib for serialization | |
762 # | |
763 | |
class ConnectionWrapper(object):
    '''
    Wrap a connection so that send()/recv() use a custom serializer.

    `dumps` turns an object into the bytes to send and `loads` does the
    reverse.  fileno, close, poll, recv_bytes and send_bytes are taken
    over from the wrapped connection unchanged.
    '''
    def __init__(self, conn, dumps, loads):
        self._conn = conn
        self._dumps = dumps
        self._loads = loads
        # Re-export the bound methods of the wrapped connection.
        for name in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
            setattr(self, name, getattr(conn, name))

    def send(self, obj):
        '''Serialize `obj` with the configured dumps() and send it.'''
        self._conn.send_bytes(self._dumps(obj))

    def recv(self):
        '''Receive one message and decode it with the configured loads().'''
        return self._loads(self._conn.recv_bytes())
778 | |
779 def _xml_dumps(obj): | |
780 return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8') | |
781 | |
782 def _xml_loads(s): | |
783 (obj,), method = xmlrpclib.loads(s.decode('utf-8')) | |
784 return obj | |
785 | |
class XmlListener(Listener):
    '''Listener whose accepted connections serialize objects as XML-RPC.'''
    def accept(self):
        # Lazily bind the module-level name `xmlrpclib`.
        global xmlrpclib
        import xmlrpc.client as xmlrpclib
        connection = Listener.accept(self)
        return ConnectionWrapper(connection, _xml_dumps, _xml_loads)
792 | |
def XmlClient(*args, **kwds):
    '''
    Like Client(), but the returned connection serializes objects as
    XML-RPC data.  All arguments are passed through to Client().
    '''
    # Lazily bind the module-level name `xmlrpclib`.
    global xmlrpclib
    import xmlrpc.client as xmlrpclib
    connection = Client(*args, **kwds)
    return ConnectionWrapper(connection, _xml_dumps, _xml_loads)
797 | |
798 # | |
799 # Wait | |
800 # | |
801 | |
802 if sys.platform == 'win32': | |
803 | |
804 def _exhaustive_wait(handles, timeout): | |
805 # Return ALL handles which are currently signalled. (Only | |
806 # returning the first signalled might create starvation issues.) | |
807 L = list(handles) | |
808 ready = [] | |
809 while L: | |
810 res = _winapi.WaitForMultipleObjects(L, False, timeout) | |
811 if res == WAIT_TIMEOUT: | |
812 break | |
813 elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L): | |
814 res -= WAIT_OBJECT_0 | |
815 elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L): | |
816 res -= WAIT_ABANDONED_0 | |
817 else: | |
818 raise RuntimeError('Should not get here') | |
819 ready.append(L[res]) | |
820 L = L[res+1:] | |
821 timeout = 0 | |
822 return ready | |
823 | |
# Errors from an overlapped read which mean "ready" (the read will fail
# promptly) rather than a failure of wait() itself.
_ready_errors = {_winapi.ERROR_BROKEN_PIPE, _winapi.ERROR_NETNAME_DELETED}

def wait(object_list, timeout=None):
    '''
    Wait till an object in object_list is ready/readable.

    Returns list of those objects in object_list which are ready/readable.

    `timeout` is in seconds; None waits indefinitely and a negative value
    is treated as zero.  Objects with a fileno() are probed with a
    zero-length overlapped read; any other object is used as a wait handle
    through its __index__().
    '''
    if timeout is None:
        timeout = INFINITE
    elif timeout < 0:
        timeout = 0
    else:
        # seconds -> milliseconds, rounded to the nearest integer
        timeout = int(timeout * 1000 + 0.5)

    object_list = list(object_list)
    waithandle_to_obj = {}
    ov_list = []
    ready_objects = set()
    ready_handles = set()

    try:
        for obj in object_list:
            try:
                fileno = getattr(obj, 'fileno')
            except AttributeError:
                waithandle_to_obj[obj.__index__()] = obj
                continue

            # start an overlapped read of length zero
            try:
                ov, err = _winapi.ReadFile(fileno(), 0, True)
            except OSError as e:
                ov, err = None, e.winerror
                if err not in _ready_errors:
                    raise

            if err == _winapi.ERROR_IO_PENDING:
                ov_list.append(ov)
                waithandle_to_obj[ov.event] = obj
                continue

            # If obj.fileno() is an overlapped pipe handle and
            # err == 0 then there is a zero length message
            # in the pipe, but it HAS NOT been consumed...
            if ov and sys.getwindowsversion()[:2] >= (6, 2):
                # ... except on Windows 8 and later, where
                # the message HAS been consumed.
                try:
                    _, err = ov.GetOverlappedResult(False)
                except OSError as e:
                    err = e.winerror
                if not err and hasattr(obj, '_got_empty_message'):
                    obj._got_empty_message = True
            ready_objects.add(obj)
            # Something is already ready, so do not block below.
            timeout = 0

        ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
    finally:
        # request that overlapped reads stop
        for ov in ov_list:
            ov.cancel()

        # wait for all overlapped reads to stop
        for ov in ov_list:
            try:
                _, err = ov.GetOverlappedResult(True)
            except OSError as e:
                err = e.winerror
                if err not in _ready_errors:
                    raise
            if err != _winapi.ERROR_OPERATION_ABORTED:
                obj = waithandle_to_obj[ov.event]
                ready_objects.add(obj)
                if err == 0:
                    # If obj.fileno() is an overlapped pipe handle then
                    # a zero length message HAS been consumed.
                    if hasattr(obj, '_got_empty_message'):
                        obj._got_empty_message = True

    ready_objects.update(waithandle_to_obj[h] for h in ready_handles)
    # Report in the order of the caller's list.
    return [obj for obj in object_list if obj in ready_objects]
903 | |
904 else: | |
905 | |
906 import selectors | |
907 | |
908 # poll/select have the advantage of not requiring any extra file | |
909 # descriptor, contrarily to epoll/kqueue (also, they require a single | |
910 # syscall). | |
# Prefer PollSelector when the platform provides it, otherwise fall back
# to SelectSelector (see the comment above about poll/select).
_WaitSelector = getattr(selectors, 'PollSelector', selectors.SelectSelector)

def wait(object_list, timeout=None):
    '''
    Wait till an object in object_list is ready/readable.

    Returns list of those objects in object_list which are ready/readable.

    `timeout` is in seconds; None means wait until something is ready.  An
    empty list is returned once the timeout has expired.
    '''
    with _WaitSelector() as selector:
        for candidate in object_list:
            selector.register(candidate, selectors.EVENT_READ)

        bounded = timeout is not None
        if bounded:
            deadline = time.monotonic() + timeout

        while True:
            events = selector.select(timeout)
            if events:
                return [key.fileobj for key, _mask in events]
            if bounded:
                # select() returned without events: retry with the time
                # remaining, or give up once the deadline has passed.
                timeout = deadline - time.monotonic()
                if timeout < 0:
                    return events
938 | |
939 # | |
940 # Make connection and socket objects sharable if possible | |
941 # | |
942 | |
943 if sys.platform == 'win32': | |
def reduce_connection(conn):
    '''
    Reduction function for `Connection`: returns rebuild_connection and
    its arguments (a DupSocket plus the readable/writable flags).
    '''
    sock_handle = conn.fileno()
    with socket.fromfd(sock_handle, socket.AF_INET, socket.SOCK_STREAM) as s:
        from . import resource_sharer
        shared = resource_sharer.DupSocket(s)
        return rebuild_connection, (shared, conn.readable, conn.writable)

def rebuild_connection(ds, readable, writable):
    '''Recreate a `Connection` from the arguments of reduce_connection().'''
    rebuilt_socket = ds.detach()
    return Connection(rebuilt_socket.detach(), readable, writable)

reduction.register(Connection, reduce_connection)
954 | |
def reduce_pipe_connection(conn):
    '''
    Reduction function for `PipeConnection`: returns
    rebuild_pipe_connection and its arguments (a DupHandle plus the
    readable/writable flags).
    '''
    read_access = _winapi.FILE_GENERIC_READ if conn.readable else 0
    write_access = _winapi.FILE_GENERIC_WRITE if conn.writable else 0
    shared = reduction.DupHandle(conn.fileno(), read_access | write_access)
    return rebuild_pipe_connection, (shared, conn.readable, conn.writable)

def rebuild_pipe_connection(dh, readable, writable):
    '''Recreate a `PipeConnection` from reduce_pipe_connection() output.'''
    return PipeConnection(dh.detach(), readable, writable)

reduction.register(PipeConnection, reduce_pipe_connection)
964 | |
965 else: | |
def reduce_connection(conn):
    '''
    Reduction function for `Connection`: returns rebuild_connection and
    its arguments (a DupFd plus the readable/writable flags).
    '''
    shared = reduction.DupFd(conn.fileno())
    return rebuild_connection, (shared, conn.readable, conn.writable)

def rebuild_connection(df, readable, writable):
    '''Recreate a `Connection` from the arguments of reduce_connection().'''
    return Connection(df.detach(), readable, writable)

reduction.register(Connection, reduce_connection)