annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/multiprocessing/connection.py @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
rev   line source
jpayne@68 1 #
jpayne@68 2 # A higher level module for using sockets (or Windows named pipes)
jpayne@68 3 #
jpayne@68 4 # multiprocessing/connection.py
jpayne@68 5 #
jpayne@68 6 # Copyright (c) 2006-2008, R Oudkerk
jpayne@68 7 # Licensed to PSF under a Contributor Agreement.
jpayne@68 8 #
jpayne@68 9
jpayne@68 10 __all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
jpayne@68 11
jpayne@68 12 import io
jpayne@68 13 import os
jpayne@68 14 import sys
jpayne@68 15 import socket
jpayne@68 16 import struct
jpayne@68 17 import time
jpayne@68 18 import tempfile
jpayne@68 19 import itertools
jpayne@68 20
jpayne@68 21 import _multiprocessing
jpayne@68 22
jpayne@68 23 from . import util
jpayne@68 24
jpayne@68 25 from . import AuthenticationError, BufferTooShort
jpayne@68 26 from .context import reduction
jpayne@68 27 _ForkingPickler = reduction.ForkingPickler
jpayne@68 28
jpayne@68 29 try:
jpayne@68 30 import _winapi
jpayne@68 31 from _winapi import WAIT_OBJECT_0, WAIT_ABANDONED_0, WAIT_TIMEOUT, INFINITE
jpayne@68 32 except ImportError:
jpayne@68 33 if sys.platform == 'win32':
jpayne@68 34 raise
jpayne@68 35 _winapi = None
jpayne@68 36
jpayne@68 37 #
jpayne@68 38 #
jpayne@68 39 #
jpayne@68 40
# Buffer size passed for the in/out buffers when named pipes are created.
BUFSIZE = 8192
# A very generous timeout when it comes to local connections...
CONNECTION_TIMEOUT = 20.

# Monotonically increasing counter mixed into generated named-pipe addresses.
_mmap_counter = itertools.count()

# `families` lists every address family usable on this platform; the
# default is replaced by each more specific family that turns out to exist.
families = ['AF_INET']
default_family = 'AF_INET'

if hasattr(socket, 'AF_UNIX'):
    families.append('AF_UNIX')
    default_family = 'AF_UNIX'

if sys.platform == 'win32':
    families.append('AF_PIPE')
    default_family = 'AF_PIPE'
jpayne@68 57
jpayne@68 58
def _init_timeout(timeout=CONNECTION_TIMEOUT):
    """Return the monotonic-clock deadline lying `timeout` seconds ahead."""
    deadline = time.monotonic() + timeout
    return deadline
jpayne@68 61
def _check_timeout(t):
    """Return True once the monotonic clock has moved past deadline `t`."""
    now = time.monotonic()
    return now > t
jpayne@68 64
jpayne@68 65 #
jpayne@68 66 #
jpayne@68 67 #
jpayne@68 68
def arbitrary_address(family):
    '''
    Return an arbitrary free address for the given family.

    `family` is one of 'AF_INET', 'AF_UNIX' or 'AF_PIPE'; any other value
    raises ValueError.
    '''
    if family == 'AF_INET':
        # Port 0 is passed through to bind() unchanged.
        return ('localhost', 0)
    if family == 'AF_UNIX':
        return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
    if family == 'AF_PIPE':
        # The pid and a per-process counter are baked into the pipe name.
        pipe_prefix = r'\\.\pipe\pyc-%d-%d-' % (os.getpid(),
                                                next(_mmap_counter))
        return tempfile.mktemp(prefix=pipe_prefix, dir="")
    raise ValueError('unrecognized family')
jpayne@68 82
def _validate_family(family):
    '''
    Check that `family` can be used on the current platform.

    Raises ValueError for 'AF_PIPE' outside Windows, and for 'AF_UNIX' on
    Windows when the socket module does not provide it.  Returns None
    otherwise.
    '''
    on_windows = sys.platform == 'win32'

    if family == 'AF_PIPE' and not on_windows:
        raise ValueError('Family %s is not recognized.' % family)

    # double check
    if family == 'AF_UNIX' and on_windows and not hasattr(socket, family):
        raise ValueError('Family %s is not recognized.' % family)
jpayne@68 94
def address_type(address):
    '''
    Return the address family implied by the type of `address`.

    An exact tuple maps to 'AF_INET', an exact str starting with two
    backslashes to 'AF_PIPE', and any other exact str to 'AF_UNIX'.
    Everything else raises ValueError.
    '''
    kind = type(address)
    if kind is tuple:
        return 'AF_INET'
    if kind is str:
        return 'AF_PIPE' if address.startswith('\\\\') else 'AF_UNIX'
    raise ValueError('address type of %r unrecognized' % address)
jpayne@68 109
jpayne@68 110 #
jpayne@68 111 # Connection classes
jpayne@68 112 #
jpayne@68 113
class _ConnectionBase:
    """
    Shared front end of the connection classes.

    Subclasses are expected to supply `_close`, `_send_bytes`,
    `_recv_bytes` and `_poll`; this class layers state checks, argument
    validation and pickling on top of them.
    """
    _handle = None

    def __init__(self, handle, readable=True, writable=True):
        handle = handle.__index__()
        if handle < 0:
            raise ValueError("invalid handle")
        if not (readable or writable):
            raise ValueError(
                "at least one of `readable` and `writable` must be True")
        self._handle = handle
        self._readable = readable
        self._writable = writable

    # XXX should we use util.Finalize instead of a __del__?

    def __del__(self):
        # Falls back to the class-level None when __init__ never completed.
        if self._handle is not None:
            self._close()

    def _check_closed(self):
        if self._handle is None:
            raise OSError("handle is closed")

    def _check_readable(self):
        if not self._readable:
            raise OSError("connection is write-only")

    def _check_writable(self):
        if not self._writable:
            raise OSError("connection is read-only")

    def _bad_message_length(self):
        # A write-capable connection stays open but stops being readable;
        # a read-only one is closed outright.  OSError is raised either way.
        if not self._writable:
            self.close()
        else:
            self._readable = False
        raise OSError("bad message length")

    @property
    def closed(self):
        """True if the connection is closed"""
        return self._handle is None

    @property
    def readable(self):
        """True if the connection is readable"""
        return self._readable

    @property
    def writable(self):
        """True if the connection is writable"""
        return self._writable

    def fileno(self):
        """File descriptor or handle of the connection"""
        self._check_closed()
        return self._handle

    def close(self):
        """Close the connection"""
        if self._handle is None:
            return
        try:
            self._close()
        finally:
            # The handle is forgotten even when `_close` raises.
            self._handle = None

    def send_bytes(self, buf, offset=0, size=None):
        """Send the bytes data from a bytes-like object"""
        self._check_closed()
        self._check_writable()
        view = memoryview(buf)
        # HACK for byte-indexing of non-bytewise buffers (e.g. array.array)
        if view.itemsize > 1:
            view = memoryview(bytes(view))
        total = len(view)
        if offset < 0:
            raise ValueError("offset is negative")
        if total < offset:
            raise ValueError("buffer length < offset")
        if size is None:
            size = total - offset
        elif size < 0:
            raise ValueError("size is negative")
        elif offset + size > total:
            raise ValueError("buffer length < offset + size")
        self._send_bytes(view[offset:offset + size])

    def send(self, obj):
        """Send a (picklable) object"""
        self._check_closed()
        self._check_writable()
        pickled = _ForkingPickler.dumps(obj)
        self._send_bytes(pickled)

    def recv_bytes(self, maxlength=None):
        """
        Receive bytes data as a bytes object.
        """
        self._check_closed()
        self._check_readable()
        if maxlength is not None and maxlength < 0:
            raise ValueError("negative maxlength")
        payload = self._recv_bytes(maxlength)
        if payload is None:
            # `_recv_bytes` returns None for a message above `maxlength`.
            self._bad_message_length()
        return payload.getvalue()

    def recv_bytes_into(self, buf, offset=0):
        """
        Receive bytes data into a writeable bytes-like object.
        Return the number of bytes read.
        """
        self._check_closed()
        self._check_readable()
        with memoryview(buf) as m:
            # Get bytesize of arbitrary buffer
            itemsize = m.itemsize
            bytesize = itemsize * len(m)
            if offset < 0:
                raise ValueError("negative offset")
            elif offset > bytesize:
                raise ValueError("offset too large")
            result = self._recv_bytes()
            size = result.tell()
            if bytesize < offset + size:
                raise BufferTooShort(result.getvalue())
            # Message can fit in dest
            result.seek(0)
            first = offset // itemsize
            last = (offset + size) // itemsize
            result.readinto(m[first:last])
            return size

    def recv(self):
        """Receive a (picklable) object"""
        self._check_closed()
        self._check_readable()
        payload = self._recv_bytes()
        return _ForkingPickler.loads(payload.getbuffer())

    def poll(self, timeout=0.0):
        """Whether there is any input available to be read"""
        self._check_closed()
        self._check_readable()
        return self._poll(timeout)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()
jpayne@68 264
jpayne@68 265
if _winapi:

    class PipeConnection(_ConnectionBase):
        """
        Connection class based on a Windows named pipe.
        Overlapped I/O is used, so the handles must have been created
        with FILE_FLAG_OVERLAPPED.
        """
        # Set when a zero-length message has already been taken off the
        # pipe, so that the next `_recv_bytes` can report it.
        _got_empty_message = False

        def _close(self, _CloseHandle=_winapi.CloseHandle):
            _CloseHandle(self._handle)

        def _send_bytes(self, buf):
            ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
            try:
                if err == _winapi.ERROR_IO_PENDING:
                    waitres = _winapi.WaitForMultipleObjects(
                        [ov.event], False, INFINITE)
                    assert waitres == WAIT_OBJECT_0
            except:
                # Interrupted while waiting: cancel the write, then re-raise.
                ov.cancel()
                raise
            finally:
                nwritten, err = ov.GetOverlappedResult(True)
            assert err == 0
            assert nwritten == len(buf)

        def _recv_bytes(self, maxsize=None):
            if self._got_empty_message:
                self._got_empty_message = False
                return io.BytesIO()

            # Read at most 128 bytes first; a longer message is completed
            # by `_get_more_data`.
            bsize = 128 if maxsize is None else min(maxsize, 128)
            try:
                ov, err = _winapi.ReadFile(self._handle, bsize,
                                            overlapped=True)
                try:
                    if err == _winapi.ERROR_IO_PENDING:
                        waitres = _winapi.WaitForMultipleObjects(
                            [ov.event], False, INFINITE)
                        assert waitres == WAIT_OBJECT_0
                except:
                    ov.cancel()
                    raise
                finally:
                    # NOTE: the returns below sit inside `finally` on
                    # purpose; this shape is kept from the original code.
                    nread, err = ov.GetOverlappedResult(True)
                    if err == 0:
                        received = io.BytesIO()
                        received.write(ov.getbuffer())
                        return received
                    elif err == _winapi.ERROR_MORE_DATA:
                        return self._get_more_data(ov, maxsize)
            except OSError as e:
                if e.winerror == _winapi.ERROR_BROKEN_PIPE:
                    raise EOFError
                else:
                    raise
            raise RuntimeError("shouldn't get here; expected KeyboardInterrupt")

        def _poll(self, timeout):
            if self._got_empty_message:
                return True
            if _winapi.PeekNamedPipe(self._handle)[0] != 0:
                return True
            return bool(wait([self], timeout))

        def _get_more_data(self, ov, maxsize):
            head = ov.getbuffer()
            received = io.BytesIO()
            received.write(head)
            # Second item of PeekNamedPipe's result: what is still to be
            # read of the current message (asserted positive below).
            left = _winapi.PeekNamedPipe(self._handle)[1]
            assert left > 0
            if maxsize is not None and len(head) + left > maxsize:
                self._bad_message_length()
            ov, err = _winapi.ReadFile(self._handle, left, overlapped=True)
            rbytes, err = ov.GetOverlappedResult(True)
            assert err == 0
            assert rbytes == left
            received.write(ov.getbuffer())
            return received
jpayne@68 346
jpayne@68 347
class Connection(_ConnectionBase):
    """
    Connection class based on an arbitrary file descriptor (Unix only), or
    a socket handle (Windows).
    """

    # Pick the low-level primitives once, at class-creation time.
    if _winapi:
        def _close(self, _close=_multiprocessing.closesocket):
            _close(self._handle)
        _write = _multiprocessing.send
        _read = _multiprocessing.recv
    else:
        def _close(self, _close=os.close):
            _close(self._handle)
        _write = os.write
        _read = os.read

    def _send(self, buf, write=_write):
        # Keep writing until every byte of `buf` has been accepted.
        pending = len(buf)
        while True:
            written = write(self._handle, buf)
            pending -= written
            if pending == 0:
                return
            buf = buf[written:]

    def _recv(self, size, read=_read):
        # Read exactly `size` bytes into a BytesIO.  EOF before the first
        # byte raises EOFError; EOF part-way through raises OSError.
        collected = io.BytesIO()
        handle = self._handle
        missing = size
        while missing > 0:
            chunk = read(handle, missing)
            got = len(chunk)
            if got == 0:
                if missing == size:
                    raise EOFError
                raise OSError("got end of file during message")
            collected.write(chunk)
            missing -= got
        return collected

    def _send_bytes(self, buf):
        length = len(buf)
        if length > 0x7fffffff:
            # Too long for a signed 32-bit header: send -1 as a marker,
            # then the real length as an unsigned 64-bit value.
            self._send(struct.pack("!i", -1))
            self._send(struct.pack("!Q", length))
            self._send(buf)
            return

        # For wire compatibility with 3.7 and lower
        header = struct.pack("!i", length)
        if length > 16384:
            # The payload is large so Nagle's algorithm won't be triggered
            # and we'd better avoid the cost of concatenation.
            self._send(header)
            self._send(buf)
        else:
            # Issue #20540: concatenate before sending, to avoid delays due
            # to Nagle's algorithm on a TCP socket.
            # Also note we want to avoid sending a 0-length buffer separately,
            # to avoid "broken pipe" errors if the other end closed the pipe.
            self._send(header + buf)

    def _recv_bytes(self, maxsize=None):
        header = self._recv(4)
        size, = struct.unpack("!i", header.getvalue())
        if size == -1:
            # Marker for an extended, 64-bit length field.
            header = self._recv(8)
            size, = struct.unpack("!Q", header.getvalue())
        if maxsize is not None and size > maxsize:
            return None
        return self._recv(size)

    def _poll(self, timeout):
        ready = wait([self], timeout)
        return bool(ready)
jpayne@68 426
jpayne@68 427
jpayne@68 428 #
jpayne@68 429 # Public functions
jpayne@68 430 #
jpayne@68 431
class Listener(object):
    '''
    Returns a listener object.

    This is a wrapper for a bound socket which is 'listening' for
    connections, or for a Windows named pipe.
    '''
    def __init__(self, address=None, family=None, backlog=1, authkey=None):
        # Validate the key before anything is bound, so that a bad
        # `authkey` cannot leave a freshly created socket or pipe behind.
        if authkey is not None and not isinstance(authkey, bytes):
            raise TypeError('authkey should be a byte string')

        # An explicit family wins; otherwise it is derived from the
        # address, and failing that the platform default is used.
        family = family or (address and address_type(address)) \
                 or default_family
        address = address or arbitrary_address(family)

        _validate_family(family)
        if family == 'AF_PIPE':
            self._listener = PipeListener(address, backlog)
        else:
            self._listener = SocketListener(address, family, backlog)

        self._authkey = authkey

    def accept(self):
        '''
        Accept a connection on the bound socket or named pipe of `self`.

        Returns a `Connection` object.  Raises OSError if the listener has
        been closed.
        '''
        if self._listener is None:
            raise OSError('listener is closed')
        c = self._listener.accept()
        # NOTE: an empty authkey counts as "no authentication" here,
        # whereas Client() tests `authkey is not None`.
        if self._authkey:
            deliver_challenge(c, self._authkey)
            answer_challenge(c, self._authkey)
        return c

    def close(self):
        '''
        Close the bound socket or named pipe of `self`.
        '''
        listener = self._listener
        if listener is not None:
            # Clear the attribute first so a second close() is a no-op.
            self._listener = None
            listener.close()

    @property
    def address(self):
        return self._listener._address

    @property
    def last_accepted(self):
        return self._listener._last_accepted

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()
jpayne@68 491
jpayne@68 492
def Client(address, family=None, authkey=None):
    '''
    Returns a connection to the address of a `Listener`

    `family` defaults to the family implied by `address`.  When `authkey`
    is given it must be a bytes object (TypeError otherwise) and the
    challenge/response handshake is run before the connection is returned.
    '''
    # Reject a bad key before connecting, so that no open connection is
    # left behind when TypeError is raised.
    if authkey is not None and not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')

    family = family or address_type(address)
    _validate_family(family)
    if family == 'AF_PIPE':
        c = PipeClient(address)
    else:
        c = SocketClient(address)

    if authkey is not None:
        # Mirror image of Listener.accept(): answer first, then challenge.
        answer_challenge(c, authkey)
        deliver_challenge(c, authkey)

    return c
jpayne@68 512
jpayne@68 513
if sys.platform != 'win32':

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe

        With `duplex` false the first connection is read-only and the
        second one write-only.
        '''
        if not duplex:
            read_fd, write_fd = os.pipe()
            c1 = Connection(read_fd, writable=False)
            c2 = Connection(write_fd, readable=False)
            return c1, c2

        s1, s2 = socket.socketpair()
        s1.setblocking(True)
        s2.setblocking(True)
        # detach() hands the raw descriptor over to the Connection.
        c1 = Connection(s1.detach())
        c2 = Connection(s2.detach())
        return c1, c2

else:

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe

        With `duplex` false the first connection is read-only and the
        second one write-only.
        '''
        address = arbitrary_address('AF_PIPE')
        if duplex:
            openmode = _winapi.PIPE_ACCESS_DUPLEX
            access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
            obsize, ibsize = BUFSIZE, BUFSIZE
        else:
            openmode = _winapi.PIPE_ACCESS_INBOUND
            access = _winapi.GENERIC_WRITE
            obsize, ibsize = 0, BUFSIZE

        open_flags = (openmode | _winapi.FILE_FLAG_OVERLAPPED |
                      _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE)
        pipe_mode = (_winapi.PIPE_TYPE_MESSAGE |
                     _winapi.PIPE_READMODE_MESSAGE |
                     _winapi.PIPE_WAIT)
        h1 = _winapi.CreateNamedPipe(
            address, open_flags, pipe_mode,
            1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER,
            # default security descriptor: the handle cannot be inherited
            _winapi.NULL
            )
        h2 = _winapi.CreateFile(
            address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
            _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
            )
        _winapi.SetNamedPipeHandleState(
            h2, _winapi.PIPE_READMODE_MESSAGE, None, None
            )

        connect_op = _winapi.ConnectNamedPipe(h1, overlapped=True)
        _, err = connect_op.GetOverlappedResult(True)
        assert err == 0

        c1 = PipeConnection(h1, writable=duplex)
        c2 = PipeConnection(h2, readable=duplex)

        return c1, c2
jpayne@68 574
jpayne@68 575 #
jpayne@68 576 # Definitions for connections based on sockets
jpayne@68 577 #
jpayne@68 578
class SocketListener(object):
    '''
    Representation of a socket which is bound to an address and listening
    '''
    def __init__(self, address, family, backlog=1):
        sock = socket.socket(getattr(socket, family))
        self._socket = sock
        try:
            # SO_REUSEADDR has different semantics on Windows (issue #2550).
            if os.name == 'posix':
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.setblocking(True)
            sock.bind(address)
            sock.listen(backlog)
            # Ask the socket for the address it actually ended up with.
            self._address = sock.getsockname()
        except OSError:
            sock.close()
            raise
        self._family = family
        self._last_accepted = None

        # For AF_UNIX the path given as `address` is removed through a
        # finalizer, which close() also triggers.
        self._unlink = None
        if family == 'AF_UNIX':
            self._unlink = util.Finalize(
                self, os.unlink, args=(address,), exitpriority=0
                )

    def accept(self):
        # Remember the peer address and return a blocking Connection.
        conn_sock, self._last_accepted = self._socket.accept()
        conn_sock.setblocking(True)
        return Connection(conn_sock.detach())

    def close(self):
        try:
            self._socket.close()
        finally:
            # Take the finalizer out of the instance before calling it.
            unlink, self._unlink = self._unlink, None
            if unlink is not None:
                unlink()
jpayne@68 620
jpayne@68 621
def SocketClient(address):
    '''
    Return a connection object connected to the socket given by `address`
    '''
    family_name = address_type(address)
    sock = socket.socket(getattr(socket, family_name))
    with sock:
        sock.setblocking(True)
        sock.connect(address)
        # The detached descriptor now belongs to the Connection.
        return Connection(sock.detach())
jpayne@68 631
jpayne@68 632 #
jpayne@68 633 # Definitions for connections based on named pipes
jpayne@68 634 #
jpayne@68 635
if sys.platform == 'win32':

    class PipeListener(object):
        '''
        Representation of a named pipe
        '''
        def __init__(self, address, backlog=None):
            # `backlog` is accepted but not used by this class.
            self._address = address
            self._handle_queue = [self._new_handle(first=True)]

            self._last_accepted = None
            util.sub_debug('listener created with address=%r', self._address)
            # close() is a finalizer that closes whatever handles are
            # still queued.
            self.close = util.Finalize(
                self, PipeListener._finalize_pipe_listener,
                args=(self._handle_queue, self._address), exitpriority=0
                )

        def _new_handle(self, first=False):
            open_flags = (_winapi.PIPE_ACCESS_DUPLEX |
                          _winapi.FILE_FLAG_OVERLAPPED)
            if first:
                open_flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
            pipe_mode = (_winapi.PIPE_TYPE_MESSAGE |
                         _winapi.PIPE_READMODE_MESSAGE |
                         _winapi.PIPE_WAIT)
            return _winapi.CreateNamedPipe(
                self._address, open_flags, pipe_mode,
                _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
                )

        def accept(self):
            # Queue a replacement instance before taking the oldest one.
            self._handle_queue.append(self._new_handle())
            handle = self._handle_queue.pop(0)
            try:
                ov = _winapi.ConnectNamedPipe(handle, overlapped=True)
            except OSError as e:
                if e.winerror != _winapi.ERROR_NO_DATA:
                    raise
                # ERROR_NO_DATA can occur if a client has already connected,
                # written data and then disconnected -- see Issue 14725.
            else:
                try:
                    _winapi.WaitForMultipleObjects(
                        [ov.event], False, INFINITE)
                except:
                    ov.cancel()
                    _winapi.CloseHandle(handle)
                    raise
                finally:
                    _, err = ov.GetOverlappedResult(True)
                    assert err == 0
            return PipeConnection(handle)

        @staticmethod
        def _finalize_pipe_listener(queue, address):
            util.sub_debug('closing listener with address=%r', address)
            for handle in queue:
                _winapi.CloseHandle(handle)

    def PipeClient(address):
        '''
        Return a connection object connected to the pipe given by `address`
        '''
        deadline = _init_timeout()
        while 1:
            try:
                _winapi.WaitNamedPipe(address, 1000)
                h = _winapi.CreateFile(
                    address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE,
                    0, _winapi.NULL, _winapi.OPEN_EXISTING,
                    _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
                    )
            except OSError as e:
                # Retry only on ERROR_SEM_TIMEOUT / ERROR_PIPE_BUSY, and
                # only until the deadline has passed.
                retryable = e.winerror in (_winapi.ERROR_SEM_TIMEOUT,
                                           _winapi.ERROR_PIPE_BUSY)
                if not retryable or _check_timeout(deadline):
                    raise
            else:
                break
        else:
            # Not reachable: the `while 1` loop ends only by break or raise.
            raise

        _winapi.SetNamedPipeHandleState(
            h, _winapi.PIPE_READMODE_MESSAGE, None, None
            )
        return PipeConnection(h)
jpayne@68 720
jpayne@68 721 #
jpayne@68 722 # Authentication stuff
jpayne@68 723 #
jpayne@68 724
# Number of random bytes generated for each challenge.
MESSAGE_LENGTH = 20

# Prefix put in front of the random bytes of a challenge.
CHALLENGE = b'#CHALLENGE#'
# Replies sent by the challenger after checking the returned digest.
WELCOME = b'#WELCOME#'
FAILURE = b'#FAILURE#'
jpayne@68 730
def deliver_challenge(connection, authkey):
    '''
    Challenge the peer on `connection` to prove that it knows `authkey`.

    Sends CHALLENGE followed by MESSAGE_LENGTH random bytes and expects the
    HMAC-MD5 digest of those bytes, keyed with `authkey`, in reply.
    Answers WELCOME on a match; otherwise answers FAILURE and raises
    AuthenticationError.  Raises ValueError if `authkey` is not bytes.
    '''
    import hmac
    if not isinstance(authkey, bytes):
        raise ValueError(
            "Authkey must be bytes, not {0!s}".format(type(authkey)))
    message = os.urandom(MESSAGE_LENGTH)
    connection.send_bytes(CHALLENGE + message)
    digest = hmac.new(authkey, message, 'md5').digest()
    response = connection.recv_bytes(256)        # reject large message
    # Constant-time comparison: a plain `==` can reveal, through timing,
    # how many leading bytes of a guessed digest were correct.
    if hmac.compare_digest(response, digest):
        connection.send_bytes(WELCOME)
    else:
        connection.send_bytes(FAILURE)
        raise AuthenticationError('digest received was wrong')
jpayne@68 745
def answer_challenge(connection, authkey):
    '''
    Answer a challenge received on `connection` using `authkey`.

    Expects a message starting with CHALLENGE, replies with the HMAC-MD5
    digest of the remainder keyed with `authkey`, then expects WELCOME.
    Raises AuthenticationError if the challenge is malformed or the digest
    is rejected, and ValueError if `authkey` is not bytes.
    '''
    import hmac
    if not isinstance(authkey, bytes):
        raise ValueError(
            "Authkey must be bytes, not {0!s}".format(type(authkey)))
    message = connection.recv_bytes(256)         # reject large message
    # Checked explicitly rather than with `assert`, which is removed under
    # `python -O` and would let an arbitrary message be signed.
    if message[:len(CHALLENGE)] != CHALLENGE:
        raise AuthenticationError('message = %r' % message)
    message = message[len(CHALLENGE):]
    digest = hmac.new(authkey, message, 'md5').digest()
    connection.send_bytes(digest)
    response = connection.recv_bytes(256)        # reject large message
    if response != WELCOME:
        raise AuthenticationError('digest sent was rejected')
jpayne@68 759
jpayne@68 760 #
jpayne@68 761 # Support for using xmlrpclib for serialization
jpayne@68 762 #
jpayne@68 763
class ConnectionWrapper(object):
    '''
    Connection look-alike whose send()/recv() use a custom serializer.

    `dumps` turns an object into the bytes to send and `loads` turns
    received bytes back into an object.  The byte-level methods are taken
    straight from the wrapped connection.
    '''
    def __init__(self, conn, dumps, loads):
        self._conn = conn
        self._dumps = dumps
        self._loads = loads
        # Expose the wrapped connection's bound methods unchanged.
        self.fileno = conn.fileno
        self.close = conn.close
        self.poll = conn.poll
        self.recv_bytes = conn.recv_bytes
        self.send_bytes = conn.send_bytes

    def send(self, obj):
        encoded = self._dumps(obj)
        self._conn.send_bytes(encoded)

    def recv(self):
        raw = self._conn.recv_bytes()
        return self._loads(raw)
jpayne@68 778
def _xml_dumps(obj):
    """Serialize `obj` as a one-parameter XML-RPC payload, UTF-8 encoded."""
    # Imported here so the function does not depend on a global that only
    # exists after XmlListener.accept() or XmlClient() has been called.
    import xmlrpc.client as xmlrpclib
    # The trailing 1 is `allow_none`.
    return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8')
jpayne@68 781
def _xml_loads(s):
    """Decode the UTF-8 XML-RPC payload `s` and return its one parameter."""
    # Imported here so the function does not depend on a global that only
    # exists after XmlListener.accept() or XmlClient() has been called.
    import xmlrpc.client as xmlrpclib
    (obj,), method = xmlrpclib.loads(s.decode('utf-8'))
    return obj
jpayne@68 785
class XmlListener(Listener):
    '''
    Listener whose accepted connections serialize objects with XML-RPC.
    '''
    def accept(self):
        # Binds the module-level name `xmlrpclib` as a side effect.
        global xmlrpclib
        import xmlrpc.client as xmlrpclib
        connection = Listener.accept(self)
        return ConnectionWrapper(connection, _xml_dumps, _xml_loads)
jpayne@68 792
def XmlClient(*args, **kwds):
    '''
    Like Client(), but the returned connection serializes with XML-RPC.
    '''
    # Binds the module-level name `xmlrpclib` as a side effect.
    global xmlrpclib
    import xmlrpc.client as xmlrpclib
    connection = Client(*args, **kwds)
    return ConnectionWrapper(connection, _xml_dumps, _xml_loads)
jpayne@68 797
jpayne@68 798 #
jpayne@68 799 # Wait
jpayne@68 800 #
jpayne@68 801
jpayne@68 802 if sys.platform == 'win32':
jpayne@68 803
def _exhaustive_wait(handles, timeout):
    # Return ALL handles which are currently signalled.  (Only
    # returning the first signalled might create starvation issues.)
    pending = list(handles)
    signalled = []
    while pending:
        res = _winapi.WaitForMultipleObjects(pending, False, timeout)
        if res == WAIT_TIMEOUT:
            break
        count = len(pending)
        if WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + count:
            index = res - WAIT_OBJECT_0
        elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + count:
            index = res - WAIT_ABANDONED_0
        else:
            raise RuntimeError('Should not get here')
        signalled.append(pending[index])
        # Only handles after the one just reported are polled again, and
        # from now on without blocking.
        pending = pending[index + 1:]
        timeout = 0
    return signalled
jpayne@68 823
jpayne@68 824 _ready_errors = {_winapi.ERROR_BROKEN_PIPE, _winapi.ERROR_NETNAME_DELETED}
jpayne@68 825
def wait(object_list, timeout=None):
    '''
    Wait till an object in object_list is ready/readable.

    Objects with a fileno attribute are probed with a zero-length
    overlapped read on fileno(); any other object is waited on directly,
    using o.__index__() as the wait handle.

    A timeout of None waits without limit and a negative timeout is
    treated as zero.

    Returns list of those objects in object_list which are ready/readable,
    in the order they appear in object_list.
    '''
    # Normalise the timeout: None -> INFINITE, negative -> 0, anything
    # else is scaled by 1000 and rounded to the nearest integer
    # (presumably seconds -> milliseconds; confirm against the
    # WaitForMultipleObjects contract).
    if timeout is None:
        timeout = INFINITE
    elif timeout < 0:
        timeout = 0
    else:
        timeout = int(timeout * 1000 + 0.5)

    # Materialise the input: it is iterated once here and once more to
    # build the ordered result.
    object_list = list(object_list)
    waithandle_to_obj = {}      # wait handle -> object it stands for
    ov_list = []                # overlapped reads that are still pending
    ready_objects = set()
    ready_handles = set()

    try:
        for o in object_list:
            try:
                fileno = getattr(o, 'fileno')
            except AttributeError:
                # No fileno: the object itself supplies the wait handle.
                waithandle_to_obj[o.__index__()] = o
            else:
                # start an overlapped read of length zero
                try:
                    ov, err = _winapi.ReadFile(fileno(), 0, True)
                except OSError as e:
                    # Errors listed in _ready_errors are not raised; the
                    # object is reported as ready by the branch below.
                    ov, err = None, e.winerror
                    if err not in _ready_errors:
                        raise
                if err == _winapi.ERROR_IO_PENDING:
                    # Read still in flight: wait on its event instead.
                    ov_list.append(ov)
                    waithandle_to_obj[ov.event] = o
                else:
                    # If o.fileno() is an overlapped pipe handle and
                    # err == 0 then there is a zero length message
                    # in the pipe, but it HAS NOT been consumed...
                    if ov and sys.getwindowsversion()[:2] >= (6, 2):
                        # ... except on Windows 8 and later, where
                        # the message HAS been consumed.
                        try:
                            _, err = ov.GetOverlappedResult(False)
                        except OSError as e:
                            err = e.winerror
                        if not err and hasattr(o, '_got_empty_message'):
                            o._got_empty_message = True
                    ready_objects.add(o)
                    # Something is ready already, so the remaining
                    # handles are only polled, not waited on.
                    timeout = 0

        ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
    finally:
        # request that overlapped reads stop
        for ov in ov_list:
            ov.cancel()

        # wait for all overlapped reads to stop
        for ov in ov_list:
            try:
                _, err = ov.GetOverlappedResult(True)
            except OSError as e:
                err = e.winerror
                if err not in _ready_errors:
                    raise
            # Any outcome other than "operation aborted" marks the
            # object as ready.
            if err != _winapi.ERROR_OPERATION_ABORTED:
                o = waithandle_to_obj[ov.event]
                ready_objects.add(o)
                if err == 0:
                    # If o.fileno() is an overlapped pipe handle then
                    # a zero length message HAS been consumed.
                    if hasattr(o, '_got_empty_message'):
                        o._got_empty_message = True

    # Add the objects whose handles were signalled, then report the
    # ready ones in object_list order.
    ready_objects.update(waithandle_to_obj[h] for h in ready_handles)
    return [o for o in object_list if o in ready_objects]
jpayne@68 903
jpayne@68 904 else:
jpayne@68 905
import selectors

# Prefer poll, falling back to select: unlike epoll/kqueue, neither needs
# an extra file descriptor (and they require a single syscall).
_WaitSelector = getattr(selectors, 'PollSelector', selectors.SelectSelector)


def wait(object_list, timeout=None):
    '''
    Block until at least one object in object_list is ready/readable.

    Every object is registered for read events with a fresh selector.  A
    timeout of None waits indefinitely; otherwise the wait ends once
    `timeout` (measured with time.monotonic()) has elapsed.

    Returns the list of ready objects, or an empty list on timeout.
    '''
    bounded = timeout is not None
    with _WaitSelector() as selector:
        for obj in object_list:
            selector.register(obj, selectors.EVENT_READ)

        if bounded:
            deadline = time.monotonic() + timeout

        while True:
            ready = selector.select(timeout)
            if ready:
                return [key.fileobj for key, _ in ready]
            # Nothing was ready: retry with whatever is left of the time
            # budget, giving up once it has run out.
            if bounded:
                timeout = deadline - time.monotonic()
                if timeout < 0:
                    return ready
jpayne@68 938
jpayne@68 939 #
jpayne@68 940 # Make connection and socket objects sharable if possible
jpayne@68 941 #
jpayne@68 942
jpayne@68 943 if sys.platform == 'win32':
def reduce_connection(conn):
    '''
    Reducer for Connection objects.

    Wraps the connection's handle in a temporary AF_INET stream socket,
    makes a resource_sharer.DupSocket from it, and returns
    (rebuild_connection, args) where args is the DupSocket plus the
    connection's readable and writable flags.
    '''
    with socket.fromfd(conn.fileno(), socket.AF_INET,
                       socket.SOCK_STREAM) as sock:
        from . import resource_sharer
        shared = resource_sharer.DupSocket(sock)
        args = (shared, conn.readable, conn.writable)
        return rebuild_connection, args
def rebuild_connection(ds, readable, writable):
    '''
    Build a Connection from the arguments produced by reduce_connection.

    `ds` is detached to obtain a socket, which is detached in turn to
    obtain the handle the new Connection is created from.
    '''
    handle = ds.detach().detach()
    return Connection(handle, readable, writable)
jpayne@68 953 reduction.register(Connection, reduce_connection)
jpayne@68 954
def reduce_pipe_connection(conn):
    '''
    Reducer for PipeConnection objects.

    Duplicates the pipe handle via reduction.DupHandle with an access mask
    that mirrors the connection's readable/writable flags, and returns
    (rebuild_pipe_connection, args).
    '''
    access = 0
    if conn.readable:
        access |= _winapi.FILE_GENERIC_READ
    if conn.writable:
        access |= _winapi.FILE_GENERIC_WRITE
    duplicate = reduction.DupHandle(conn.fileno(), access)
    args = (duplicate, conn.readable, conn.writable)
    return rebuild_pipe_connection, args
def rebuild_pipe_connection(dh, readable, writable):
    '''
    Build a PipeConnection from the arguments produced by
    reduce_pipe_connection, using the handle detached from `dh`.
    '''
    return PipeConnection(dh.detach(), readable, writable)
jpayne@68 963 reduction.register(PipeConnection, reduce_pipe_connection)
jpayne@68 964
jpayne@68 965 else:
def reduce_connection(conn):
    '''
    Reducer for Connection objects.

    Wraps the connection's file descriptor in a reduction.DupFd and
    returns (rebuild_connection, args), where args is the DupFd plus the
    connection's readable and writable flags.
    '''
    duplicate = reduction.DupFd(conn.fileno())
    args = (duplicate, conn.readable, conn.writable)
    return rebuild_connection, args
def rebuild_connection(df, readable, writable):
    '''
    Build a Connection from the arguments produced by reduce_connection,
    using the file descriptor detached from `df`.
    '''
    return Connection(df.detach(), readable, writable)
jpayne@68 972 reduction.register(Connection, reduce_connection)