annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/multiprocessing/connection.py @ 69:33d812a61356

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 17:55:14 -0400
parents
children
rev   line source
jpayne@69 1 #
jpayne@69 2 # A higher level module for using sockets (or Windows named pipes)
jpayne@69 3 #
jpayne@69 4 # multiprocessing/connection.py
jpayne@69 5 #
jpayne@69 6 # Copyright (c) 2006-2008, R Oudkerk
jpayne@69 7 # Licensed to PSF under a Contributor Agreement.
jpayne@69 8 #
jpayne@69 9
jpayne@69 10 __all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
jpayne@69 11
jpayne@69 12 import io
jpayne@69 13 import os
jpayne@69 14 import sys
jpayne@69 15 import socket
jpayne@69 16 import struct
jpayne@69 17 import time
jpayne@69 18 import tempfile
jpayne@69 19 import itertools
jpayne@69 20
jpayne@69 21 import _multiprocessing
jpayne@69 22
jpayne@69 23 from . import util
jpayne@69 24
jpayne@69 25 from . import AuthenticationError, BufferTooShort
jpayne@69 26 from .context import reduction
jpayne@69 27 _ForkingPickler = reduction.ForkingPickler
jpayne@69 28
jpayne@69 29 try:
jpayne@69 30 import _winapi
jpayne@69 31 from _winapi import WAIT_OBJECT_0, WAIT_ABANDONED_0, WAIT_TIMEOUT, INFINITE
jpayne@69 32 except ImportError:
jpayne@69 33 if sys.platform == 'win32':
jpayne@69 34 raise
jpayne@69 35 _winapi = None
jpayne@69 36
jpayne@69 37 #
jpayne@69 38 #
jpayne@69 39 #
jpayne@69 40
# Buffer size passed to CreateNamedPipe for Windows named pipes.
BUFSIZE = 8192
# A very generous timeout when it comes to local connections...
CONNECTION_TIMEOUT = 20.0

# Feeds the per-process sequence number embedded in generated pipe names.
_mmap_counter = itertools.count()

# 'AF_INET' is always available; each later check that succeeds both
# extends the list and takes over as the default.
families = ['AF_INET']
default_family = 'AF_INET'

if hasattr(socket, 'AF_UNIX'):
    families.append('AF_UNIX')
    default_family = 'AF_UNIX'

if sys.platform == 'win32':
    families.append('AF_PIPE')
    default_family = 'AF_PIPE'
jpayne@69 57
jpayne@69 58
def _init_timeout(timeout=CONNECTION_TIMEOUT):
    """Return the time.monotonic() value that lies `timeout` seconds ahead."""
    now = time.monotonic()
    return now + timeout
jpayne@69 61
jpayne@69 62 def _check_timeout(t):
jpayne@69 63 return time.monotonic() > t
jpayne@69 64
jpayne@69 65 #
jpayne@69 66 #
jpayne@69 67 #
jpayne@69 68
def arbitrary_address(family):
    '''
    Return an arbitrary free address for the given family

    `family` is one of 'AF_INET', 'AF_UNIX' or 'AF_PIPE'; anything else
    raises ValueError.
    '''
    if family == 'AF_INET':
        return ('localhost', 0)
    if family == 'AF_UNIX':
        return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
    if family == 'AF_PIPE':
        # The pipe name embeds the pid and a per-process counter value.
        pipe_prefix = r'\\.\pipe\pyc-%d-%d-' % (os.getpid(),
                                               next(_mmap_counter))
        return tempfile.mktemp(prefix=pipe_prefix, dir="")
    raise ValueError('unrecognized family')
jpayne@69 82
jpayne@69 83 def _validate_family(family):
jpayne@69 84 '''
jpayne@69 85 Checks if the family is valid for the current environment.
jpayne@69 86 '''
jpayne@69 87 if sys.platform != 'win32' and family == 'AF_PIPE':
jpayne@69 88 raise ValueError('Family %s is not recognized.' % family)
jpayne@69 89
jpayne@69 90 if sys.platform == 'win32' and family == 'AF_UNIX':
jpayne@69 91 # double check
jpayne@69 92 if not hasattr(socket, family):
jpayne@69 93 raise ValueError('Family %s is not recognized.' % family)
jpayne@69 94
def address_type(address):
    '''
    Return the types of the address

    This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'

    Tuples (including subclasses such as namedtuples) map to 'AF_INET';
    strings starting with two backslashes map to 'AF_PIPE'; any other
    string maps to 'AF_UNIX'.  Everything else raises ValueError.
    '''
    # isinstance() rather than an exact type comparison, so that tuple and
    # str subclasses are classified like their base types.
    if isinstance(address, tuple):
        return 'AF_INET'
    if isinstance(address, str):
        return 'AF_PIPE' if address.startswith('\\\\') else 'AF_UNIX'
    raise ValueError('address type of %r unrecognized' % (address,))
jpayne@69 109
jpayne@69 110 #
jpayne@69 111 # Connection classes
jpayne@69 112 #
jpayne@69 113
jpayne@69 114 class _ConnectionBase:
jpayne@69 115 _handle = None
jpayne@69 116
jpayne@69 117 def __init__(self, handle, readable=True, writable=True):
jpayne@69 118 handle = handle.__index__()
jpayne@69 119 if handle < 0:
jpayne@69 120 raise ValueError("invalid handle")
jpayne@69 121 if not readable and not writable:
jpayne@69 122 raise ValueError(
jpayne@69 123 "at least one of `readable` and `writable` must be True")
jpayne@69 124 self._handle = handle
jpayne@69 125 self._readable = readable
jpayne@69 126 self._writable = writable
jpayne@69 127
jpayne@69 128 # XXX should we use util.Finalize instead of a __del__?
jpayne@69 129
jpayne@69 130 def __del__(self):
jpayne@69 131 if self._handle is not None:
jpayne@69 132 self._close()
jpayne@69 133
jpayne@69 134 def _check_closed(self):
jpayne@69 135 if self._handle is None:
jpayne@69 136 raise OSError("handle is closed")
jpayne@69 137
jpayne@69 138 def _check_readable(self):
jpayne@69 139 if not self._readable:
jpayne@69 140 raise OSError("connection is write-only")
jpayne@69 141
jpayne@69 142 def _check_writable(self):
jpayne@69 143 if not self._writable:
jpayne@69 144 raise OSError("connection is read-only")
jpayne@69 145
jpayne@69 146 def _bad_message_length(self):
jpayne@69 147 if self._writable:
jpayne@69 148 self._readable = False
jpayne@69 149 else:
jpayne@69 150 self.close()
jpayne@69 151 raise OSError("bad message length")
jpayne@69 152
jpayne@69 153 @property
jpayne@69 154 def closed(self):
jpayne@69 155 """True if the connection is closed"""
jpayne@69 156 return self._handle is None
jpayne@69 157
jpayne@69 158 @property
jpayne@69 159 def readable(self):
jpayne@69 160 """True if the connection is readable"""
jpayne@69 161 return self._readable
jpayne@69 162
jpayne@69 163 @property
jpayne@69 164 def writable(self):
jpayne@69 165 """True if the connection is writable"""
jpayne@69 166 return self._writable
jpayne@69 167
jpayne@69 168 def fileno(self):
jpayne@69 169 """File descriptor or handle of the connection"""
jpayne@69 170 self._check_closed()
jpayne@69 171 return self._handle
jpayne@69 172
jpayne@69 173 def close(self):
jpayne@69 174 """Close the connection"""
jpayne@69 175 if self._handle is not None:
jpayne@69 176 try:
jpayne@69 177 self._close()
jpayne@69 178 finally:
jpayne@69 179 self._handle = None
jpayne@69 180
jpayne@69 181 def send_bytes(self, buf, offset=0, size=None):
jpayne@69 182 """Send the bytes data from a bytes-like object"""
jpayne@69 183 self._check_closed()
jpayne@69 184 self._check_writable()
jpayne@69 185 m = memoryview(buf)
jpayne@69 186 # HACK for byte-indexing of non-bytewise buffers (e.g. array.array)
jpayne@69 187 if m.itemsize > 1:
jpayne@69 188 m = memoryview(bytes(m))
jpayne@69 189 n = len(m)
jpayne@69 190 if offset < 0:
jpayne@69 191 raise ValueError("offset is negative")
jpayne@69 192 if n < offset:
jpayne@69 193 raise ValueError("buffer length < offset")
jpayne@69 194 if size is None:
jpayne@69 195 size = n - offset
jpayne@69 196 elif size < 0:
jpayne@69 197 raise ValueError("size is negative")
jpayne@69 198 elif offset + size > n:
jpayne@69 199 raise ValueError("buffer length < offset + size")
jpayne@69 200 self._send_bytes(m[offset:offset + size])
jpayne@69 201
jpayne@69 202 def send(self, obj):
jpayne@69 203 """Send a (picklable) object"""
jpayne@69 204 self._check_closed()
jpayne@69 205 self._check_writable()
jpayne@69 206 self._send_bytes(_ForkingPickler.dumps(obj))
jpayne@69 207
jpayne@69 208 def recv_bytes(self, maxlength=None):
jpayne@69 209 """
jpayne@69 210 Receive bytes data as a bytes object.
jpayne@69 211 """
jpayne@69 212 self._check_closed()
jpayne@69 213 self._check_readable()
jpayne@69 214 if maxlength is not None and maxlength < 0:
jpayne@69 215 raise ValueError("negative maxlength")
jpayne@69 216 buf = self._recv_bytes(maxlength)
jpayne@69 217 if buf is None:
jpayne@69 218 self._bad_message_length()
jpayne@69 219 return buf.getvalue()
jpayne@69 220
jpayne@69 221 def recv_bytes_into(self, buf, offset=0):
jpayne@69 222 """
jpayne@69 223 Receive bytes data into a writeable bytes-like object.
jpayne@69 224 Return the number of bytes read.
jpayne@69 225 """
jpayne@69 226 self._check_closed()
jpayne@69 227 self._check_readable()
jpayne@69 228 with memoryview(buf) as m:
jpayne@69 229 # Get bytesize of arbitrary buffer
jpayne@69 230 itemsize = m.itemsize
jpayne@69 231 bytesize = itemsize * len(m)
jpayne@69 232 if offset < 0:
jpayne@69 233 raise ValueError("negative offset")
jpayne@69 234 elif offset > bytesize:
jpayne@69 235 raise ValueError("offset too large")
jpayne@69 236 result = self._recv_bytes()
jpayne@69 237 size = result.tell()
jpayne@69 238 if bytesize < offset + size:
jpayne@69 239 raise BufferTooShort(result.getvalue())
jpayne@69 240 # Message can fit in dest
jpayne@69 241 result.seek(0)
jpayne@69 242 result.readinto(m[offset // itemsize :
jpayne@69 243 (offset + size) // itemsize])
jpayne@69 244 return size
jpayne@69 245
jpayne@69 246 def recv(self):
jpayne@69 247 """Receive a (picklable) object"""
jpayne@69 248 self._check_closed()
jpayne@69 249 self._check_readable()
jpayne@69 250 buf = self._recv_bytes()
jpayne@69 251 return _ForkingPickler.loads(buf.getbuffer())
jpayne@69 252
jpayne@69 253 def poll(self, timeout=0.0):
jpayne@69 254 """Whether there is any input available to be read"""
jpayne@69 255 self._check_closed()
jpayne@69 256 self._check_readable()
jpayne@69 257 return self._poll(timeout)
jpayne@69 258
jpayne@69 259 def __enter__(self):
jpayne@69 260 return self
jpayne@69 261
jpayne@69 262 def __exit__(self, exc_type, exc_value, exc_tb):
jpayne@69 263 self.close()
jpayne@69 264
jpayne@69 265
if _winapi:

    class PipeConnection(_ConnectionBase):
        """
        Connection class based on a Windows named pipe.
        Overlapped I/O is used, so the handles must have been created
        with FILE_FLAG_OVERLAPPED.
        """
        # Set by wait() when its zero-length probe read consumed an empty
        # message; the next _recv_bytes() then returns that empty message.
        _got_empty_message = False

        def _close(self, _CloseHandle=_winapi.CloseHandle):
            _CloseHandle(self._handle)

        def _send_bytes(self, buf):
            """Write `buf` with one overlapped WriteFile and wait for it."""
            ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
            try:
                if err == _winapi.ERROR_IO_PENDING:
                    waitres = _winapi.WaitForMultipleObjects(
                        [ov.event], False, INFINITE)
                    assert waitres == WAIT_OBJECT_0
            except BaseException:
                # Cancel the pending write before propagating.
                ov.cancel()
                raise
            finally:
                nwritten, err = ov.GetOverlappedResult(True)
            assert err == 0
            assert nwritten == len(buf)

        def _recv_bytes(self, maxsize=None):
            """Return one message as a BytesIO positioned at its end.

            A broken pipe is reported as EOFError.
            """
            if self._got_empty_message:
                self._got_empty_message = False
                return io.BytesIO()

            # First read at most 128 bytes; ERROR_MORE_DATA hands the rest
            # of the message to _get_more_data().
            first_read = 128 if maxsize is None else min(maxsize, 128)
            try:
                ov, err = _winapi.ReadFile(self._handle, first_read,
                                           overlapped=True)
                try:
                    if err == _winapi.ERROR_IO_PENDING:
                        waitres = _winapi.WaitForMultipleObjects(
                            [ov.event], False, INFINITE)
                        assert waitres == WAIT_OBJECT_0
                except BaseException:
                    ov.cancel()
                    raise
                finally:
                    # NOTE: the returns below sit inside the finally
                    # clause, exactly as in the original statement order.
                    _, err = ov.GetOverlappedResult(True)
                    if err == 0:
                        f = io.BytesIO()
                        f.write(ov.getbuffer())
                        return f
                    elif err == _winapi.ERROR_MORE_DATA:
                        return self._get_more_data(ov, maxsize)
            except OSError as e:
                if e.winerror == _winapi.ERROR_BROKEN_PIPE:
                    raise EOFError
                raise
            raise RuntimeError("shouldn't get here; expected KeyboardInterrupt")

        def _poll(self, timeout):
            if self._got_empty_message:
                return True
            if _winapi.PeekNamedPipe(self._handle)[0] != 0:
                return True
            return bool(wait([self], timeout))

        def _get_more_data(self, ov, maxsize):
            """Read the remainder of a message whose first part is in `ov`."""
            head = ov.getbuffer()
            f = io.BytesIO()
            f.write(head)
            left = _winapi.PeekNamedPipe(self._handle)[1]
            assert left > 0
            if maxsize is not None and len(head) + left > maxsize:
                self._bad_message_length()
            ov, err = _winapi.ReadFile(self._handle, left, overlapped=True)
            rbytes, err = ov.GetOverlappedResult(True)
            assert err == 0
            assert rbytes == left
            f.write(ov.getbuffer())
            return f
jpayne@69 346
jpayne@69 347
class Connection(_ConnectionBase):
    """
    Connection class based on an arbitrary file descriptor (Unix only), or
    a socket handle (Windows).
    """

    # The low-level close/write/read primitives are picked once, when the
    # class body executes.
    if _winapi:
        def _close(self, _close=_multiprocessing.closesocket):
            _close(self._handle)
        _write = _multiprocessing.send
        _read = _multiprocessing.recv
    else:
        def _close(self, _close=os.close):
            _close(self._handle)
        _write = os.write
        _read = os.read

    def _send(self, buf, write=_write):
        """Write all of `buf`, looping while writes are partial."""
        pending = len(buf)
        while True:
            sent = write(self._handle, buf)
            pending -= sent
            if pending == 0:
                break
            buf = buf[sent:]

    def _recv(self, size, read=_read):
        """Read exactly `size` bytes and return them in a BytesIO.

        Raises EOFError when the stream ends before any byte arrived and
        OSError when it ends part-way through.
        """
        collected = io.BytesIO()
        handle = self._handle
        missing = size
        while missing > 0:
            chunk = read(handle, missing)
            got = len(chunk)
            if got == 0:
                if missing == size:
                    raise EOFError
                raise OSError("got end of file during message")
            collected.write(chunk)
            missing -= got
        return collected

    def _send_bytes(self, buf):
        """Send `buf` preceded by its length header."""
        n = len(buf)
        if n > 0x7fffffff:
            # Too long for a signed 32-bit header: send -1 as a marker,
            # then the real length as an unsigned 64-bit integer.
            self._send(struct.pack("!i", -1))
            self._send(struct.pack("!Q", n))
            self._send(buf)
            return

        # For wire compatibility with 3.7 and lower
        header = struct.pack("!i", n)
        if n > 16384:
            # The payload is large so Nagle's algorithm won't be triggered
            # and we'd better avoid the cost of concatenation.
            self._send(header)
            self._send(buf)
        else:
            # Issue #20540: concatenate before sending, to avoid delays due
            # to Nagle's algorithm on a TCP socket.
            # Also note we want to avoid sending a 0-length buffer separately,
            # to avoid "broken pipe" errors if the other end closed the pipe.
            self._send(header + buf)

    def _recv_bytes(self, maxsize=None):
        """Read one length-prefixed message.

        Returns None, without reading the payload, when the announced
        length exceeds `maxsize`.
        """
        header = self._recv(4)
        size, = struct.unpack("!i", header.getvalue())
        if size == -1:
            # Marker for the extended 8-byte length.
            header = self._recv(8)
            size, = struct.unpack("!Q", header.getvalue())
        if maxsize is not None and size > maxsize:
            return None
        return self._recv(size)

    def _poll(self, timeout):
        ready = wait([self], timeout)
        return bool(ready)
jpayne@69 426
jpayne@69 427
jpayne@69 428 #
jpayne@69 429 # Public functions
jpayne@69 430 #
jpayne@69 431
class Listener(object):
    '''
    Returns a listener object.

    This is a wrapper for a bound socket which is 'listening' for
    connections, or for a Windows named pipe.
    '''
    def __init__(self, address=None, family=None, backlog=1, authkey=None):
        # Check the key before anything is bound, so that a bad argument
        # cannot leave a listening socket or pipe behind.
        if authkey is not None and not isinstance(authkey, bytes):
            raise TypeError('authkey should be a byte string')

        family = family or (address and address_type(address)) \
                 or default_family
        address = address or arbitrary_address(family)

        _validate_family(family)
        if family == 'AF_PIPE':
            self._listener = PipeListener(address, backlog)
        else:
            self._listener = SocketListener(address, family, backlog)

        self._authkey = authkey

    def accept(self):
        '''
        Accept a connection on the bound socket or named pipe of `self`.

        Returns a `Connection` object.

        Raises OSError if the listener is closed.  When an authkey was
        given, the challenge handshake runs first; if it fails the new
        connection is closed and the error is re-raised.
        '''
        if self._listener is None:
            raise OSError('listener is closed')
        c = self._listener.accept()
        # `is not None` (not truthiness) to mirror Client(), which also
        # performs the handshake for an empty key.
        if self._authkey is not None:
            try:
                deliver_challenge(c, self._authkey)
                answer_challenge(c, self._authkey)
            except BaseException:
                # The connection is never handed to the caller, so it has
                # to be released here.
                c.close()
                raise
        return c

    def close(self):
        '''
        Close the bound socket or named pipe of `self`.
        '''
        listener = self._listener
        if listener is not None:
            # Cleared first so that later calls do nothing.
            self._listener = None
            listener.close()

    @property
    def address(self):
        return self._listener._address

    @property
    def last_accepted(self):
        return self._listener._last_accepted

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()
jpayne@69 491
jpayne@69 492
def Client(address, family=None, authkey=None):
    '''
    Returns a connection to the address of a `Listener`

    `family` defaults to the family deduced from `address`.  When
    `authkey` (bytes) is given, the challenge handshake runs before the
    connection is returned; if it fails the connection is closed and the
    error is re-raised.  A non-bytes `authkey` raises TypeError.
    '''
    # Check the key before connecting, so that a bad argument cannot leave
    # an open connection behind.
    if authkey is not None and not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')

    family = family or address_type(address)
    _validate_family(family)
    if family == 'AF_PIPE':
        c = PipeClient(address)
    else:
        c = SocketClient(address)

    if authkey is not None:
        try:
            answer_challenge(c, authkey)
            deliver_challenge(c, authkey)
        except BaseException:
            # Not returned to the caller, so release it here.
            c.close()
            raise

    return c
jpayne@69 512
jpayne@69 513
if sys.platform != 'win32':

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        if not duplex:
            # One-way: the first connection only reads, the second only
            # writes.
            read_fd, write_fd = os.pipe()
            return (Connection(read_fd, writable=False),
                    Connection(write_fd, readable=False))

        s1, s2 = socket.socketpair()
        s1.setblocking(True)
        s2.setblocking(True)
        # detach() hands the descriptor over to the Connection object.
        c1 = Connection(s1.detach())
        c2 = Connection(s2.detach())
        return c1, c2

else:

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        address = arbitrary_address('AF_PIPE')
        if duplex:
            openmode = _winapi.PIPE_ACCESS_DUPLEX
            access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
            obsize, ibsize = BUFSIZE, BUFSIZE
        else:
            openmode = _winapi.PIPE_ACCESS_INBOUND
            access = _winapi.GENERIC_WRITE
            obsize, ibsize = 0, BUFSIZE

        open_flags = (openmode | _winapi.FILE_FLAG_OVERLAPPED |
                      _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE)
        pipe_mode = (_winapi.PIPE_TYPE_MESSAGE |
                     _winapi.PIPE_READMODE_MESSAGE |
                     _winapi.PIPE_WAIT)
        h1 = _winapi.CreateNamedPipe(
            address, open_flags, pipe_mode,
            1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER,
            # default security descriptor: the handle cannot be inherited
            _winapi.NULL
            )
        h2 = _winapi.CreateFile(
            address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
            _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
            )
        _winapi.SetNamedPipeHandleState(
            h2, _winapi.PIPE_READMODE_MESSAGE, None, None
            )

        overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True)
        _, err = overlapped.GetOverlappedResult(True)
        assert err == 0

        # When not duplex, h1 is read-only and h2 is write-only.
        c1 = PipeConnection(h1, writable=duplex)
        c2 = PipeConnection(h2, readable=duplex)

        return c1, c2
jpayne@69 574
jpayne@69 575 #
jpayne@69 576 # Definitions for connections based on sockets
jpayne@69 577 #
jpayne@69 578
class SocketListener(object):
    '''
    Representation of a socket which is bound to an address and listening
    '''
    def __init__(self, address, family, backlog=1):
        """Create, bind and listen on a socket of the named `family`.

        `family` is the name of a socket-module attribute such as
        'AF_INET' or 'AF_UNIX'.  The socket is closed again if any setup
        step raises.
        """
        self._socket = socket.socket(getattr(socket, family))
        try:
            # SO_REUSEADDR has different semantics on Windows (issue #2550).
            if os.name == 'posix':
                self._socket.setsockopt(socket.SOL_SOCKET,
                                        socket.SO_REUSEADDR, 1)
            self._socket.setblocking(True)
            self._socket.bind(address)
            self._socket.listen(backlog)
            self._address = self._socket.getsockname()
        except BaseException:
            # Not only OSError: e.g. a TypeError from a malformed address
            # must not leave the socket open either.
            self._socket.close()
            raise
        self._family = family
        self._last_accepted = None

        # An abstract-namespace address has no file to remove, and
        # os.unlink() rejects its embedded NUL byte.
        if family == 'AF_UNIX' and not self._is_abstract_address(address):
            self._unlink = util.Finalize(
                self, os.unlink, args=(address,), exitpriority=0
                )
        else:
            self._unlink = None

    @staticmethod
    def _is_abstract_address(address):
        """Return True if `address` is a str/bytes starting with NUL."""
        if isinstance(address, bytes):
            return address[:1] == b'\0'
        if isinstance(address, str):
            return address[:1] == '\0'
        return False

    def accept(self):
        """Accept one connection and wrap its descriptor in a Connection."""
        s, self._last_accepted = self._socket.accept()
        s.setblocking(True)
        return Connection(s.detach())

    def close(self):
        """Close the socket, then run the unlink finalizer at most once."""
        try:
            self._socket.close()
        finally:
            unlink = self._unlink
            if unlink is not None:
                self._unlink = None
                unlink()
jpayne@69 620
jpayne@69 621
def SocketClient(address):
    '''
    Return a connection object connected to the socket given by `address`
    '''
    sock_family = getattr(socket, address_type(address))
    with socket.socket(sock_family) as sock:
        sock.setblocking(True)
        sock.connect(address)
        # detach() hands the descriptor over to the Connection object.
        return Connection(sock.detach())
jpayne@69 631
jpayne@69 632 #
jpayne@69 633 # Definitions for connections based on named pipes
jpayne@69 634 #
jpayne@69 635
if sys.platform == 'win32':

    class PipeListener(object):
        '''
        Representation of a named pipe
        '''
        def __init__(self, address, backlog=None):
            # `backlog` is accepted but not used by this class.
            self._address = address
            self._handle_queue = [self._new_handle(first=True)]

            self._last_accepted = None
            util.sub_debug('listener created with address=%r', self._address)
            # close() is the finalizer itself: calling it closes every
            # handle still in the queue.
            self.close = util.Finalize(
                self, PipeListener._finalize_pipe_listener,
                args=(self._handle_queue, self._address), exitpriority=0
                )

        def _new_handle(self, first=False):
            """Create one more instance of the named pipe at self._address."""
            flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
            if first:
                flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
            return _winapi.CreateNamedPipe(
                self._address, flags,
                _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
                _winapi.PIPE_WAIT,
                _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
                )

        def accept(self):
            """Wait for a client on the oldest queued handle and wrap it."""
            # Queue a fresh instance before taking the oldest handle.
            self._handle_queue.append(self._new_handle())
            handle = self._handle_queue.pop(0)
            try:
                ov = _winapi.ConnectNamedPipe(handle, overlapped=True)
            except OSError as e:
                if e.winerror != _winapi.ERROR_NO_DATA:
                    raise
                # ERROR_NO_DATA can occur if a client has already connected,
                # written data and then disconnected -- see Issue 14725.
            else:
                try:
                    _winapi.WaitForMultipleObjects(
                        [ov.event], False, INFINITE)
                except BaseException:
                    ov.cancel()
                    _winapi.CloseHandle(handle)
                    raise
                finally:
                    _, err = ov.GetOverlappedResult(True)
                    assert err == 0
            return PipeConnection(handle)

        @staticmethod
        def _finalize_pipe_listener(queue, address):
            util.sub_debug('closing listener with address=%r', address)
            for handle in queue:
                _winapi.CloseHandle(handle)

    def PipeClient(address):
        '''
        Return a connection object connected to the pipe given by `address`
        '''
        t = _init_timeout()
        # Retry while the pipe is busy or the wait timed out, until the
        # deadline `t` passes; any other OSError propagates at once.  The
        # loop is left only by `break` or by an exception, so no `else`
        # clause is needed on the `while`.
        while True:
            try:
                _winapi.WaitNamedPipe(address, 1000)
                h = _winapi.CreateFile(
                    address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE,
                    0, _winapi.NULL, _winapi.OPEN_EXISTING,
                    _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
                    )
            except OSError as e:
                if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT,
                                      _winapi.ERROR_PIPE_BUSY) or _check_timeout(t):
                    raise
            else:
                break

        _winapi.SetNamedPipeHandleState(
            h, _winapi.PIPE_READMODE_MESSAGE, None, None
            )
        return PipeConnection(h)
jpayne@69 720
jpayne@69 721 #
jpayne@69 722 # Authentication stuff
jpayne@69 723 #
jpayne@69 724
# Number of random bytes in each challenge.
MESSAGE_LENGTH = 20

# Markers exchanged during the challenge handshake.
CHALLENGE = b'#CHALLENGE#'   # prefix of the challenge message
WELCOME = b'#WELCOME#'       # sent when the digest matches
FAILURE = b'#FAILURE#'       # sent when the digest does not match
jpayne@69 730
def deliver_challenge(connection, authkey):
    """Challenge the peer on `connection` to prove it knows `authkey`.

    Sends CHALLENGE plus MESSAGE_LENGTH random bytes, then expects the
    HMAC-MD5 digest of those bytes keyed with `authkey`.  Replies WELCOME
    on a match; otherwise replies FAILURE and raises AuthenticationError.
    Raises ValueError if `authkey` is not bytes.
    """
    import hmac
    if not isinstance(authkey, bytes):
        raise ValueError(
            "Authkey must be bytes, not {0!s}".format(type(authkey)))
    message = os.urandom(MESSAGE_LENGTH)
    connection.send_bytes(CHALLENGE + message)
    digest = hmac.new(authkey, message, 'md5').digest()
    response = connection.recv_bytes(256)        # reject large message
    # compare_digest() instead of ==, so the comparison time does not
    # depend on how many leading bytes of the response are correct.
    if hmac.compare_digest(response, digest):
        connection.send_bytes(WELCOME)
    else:
        connection.send_bytes(FAILURE)
        raise AuthenticationError('digest received was wrong')
jpayne@69 745
def answer_challenge(connection, authkey):
    """Answer the peer's challenge on `connection` using `authkey`.

    Expects a message starting with CHALLENGE, replies with the HMAC-MD5
    digest of the remainder keyed with `authkey`, then expects WELCOME.
    Raises AuthenticationError if the first message is not a challenge or
    the digest is rejected, and ValueError if `authkey` is not bytes.
    """
    import hmac
    if not isinstance(authkey, bytes):
        raise ValueError(
            "Authkey must be bytes, not {0!s}".format(type(authkey)))
    message = connection.recv_bytes(256)         # reject large message
    # An explicit raise rather than `assert`: assertions are removed under
    # -O, and this validates data received from the peer.
    if not message.startswith(CHALLENGE):
        raise AuthenticationError(
            'expected challenge, got message = %r' % (message,))
    message = message[len(CHALLENGE):]
    digest = hmac.new(authkey, message, 'md5').digest()
    connection.send_bytes(digest)
    response = connection.recv_bytes(256)        # reject large message
    if response != WELCOME:
        raise AuthenticationError('digest sent was rejected')
jpayne@69 759
jpayne@69 760 #
jpayne@69 761 # Support for using xmlrpclib for serialization
jpayne@69 762 #
jpayne@69 763
class ConnectionWrapper(object):
    """Connection proxy whose send()/recv() use caller-supplied codecs.

    `dumps` turns an object into the bytes handed to `conn.send_bytes`;
    `loads` turns the bytes from `conn.recv_bytes` back into an object.
    """
    def __init__(self, conn, dumps, loads):
        self._conn = conn
        self._dumps = dumps
        self._loads = loads
        # These are exposed unchanged, bound to the wrapped connection.
        for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
            setattr(self, attr, getattr(conn, attr))

    def send(self, obj):
        """Serialize `obj` with `dumps` and send the resulting bytes."""
        self._conn.send_bytes(self._dumps(obj))

    def recv(self):
        """Receive one bytes message and return `loads` applied to it."""
        return self._loads(self._conn.recv_bytes())
jpayne@69 778
jpayne@69 779 def _xml_dumps(obj):
jpayne@69 780 return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8')
jpayne@69 781
jpayne@69 782 def _xml_loads(s):
jpayne@69 783 (obj,), method = xmlrpclib.loads(s.decode('utf-8'))
jpayne@69 784 return obj
jpayne@69 785
class XmlListener(Listener):
    """Listener whose accepted connections serialize with XML-RPC."""

    def accept(self):
        """Accept a connection and wrap it in a ConnectionWrapper."""
        # Publish xmlrpc.client under the module-global name `xmlrpclib`.
        global xmlrpclib
        import xmlrpc.client as xmlrpclib
        accepted = Listener.accept(self)
        return ConnectionWrapper(accepted, _xml_dumps, _xml_loads)
jpayne@69 792
def XmlClient(*args, **kwds):
    """Return Client(*args, **kwds) wrapped to serialize with XML-RPC."""
    # Publish xmlrpc.client under the module-global name `xmlrpclib`.
    global xmlrpclib
    import xmlrpc.client as xmlrpclib
    raw_connection = Client(*args, **kwds)
    return ConnectionWrapper(raw_connection, _xml_dumps, _xml_loads)
jpayne@69 797
jpayne@69 798 #
jpayne@69 799 # Wait
jpayne@69 800 #
jpayne@69 801
if sys.platform == 'win32':

    def _exhaustive_wait(handles, timeout):
        # Return ALL handles which are currently signalled.  (Only
        # returning the first signalled might create starvation issues.)
        remaining = list(handles)
        ready = []
        while remaining:
            res = _winapi.WaitForMultipleObjects(remaining, False, timeout)
            if res == WAIT_TIMEOUT:
                break
            if WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(remaining):
                index = res - WAIT_OBJECT_0
            elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(remaining):
                index = res - WAIT_ABANDONED_0
            else:
                raise RuntimeError('Should not get here')
            ready.append(remaining[index])
            # Only handles after the signalled one are re-checked, and
            # without blocking.
            remaining = remaining[index + 1:]
            timeout = 0
        return ready

    # winerror codes that count as "ready" instead of being re-raised.
    _ready_errors = {_winapi.ERROR_BROKEN_PIPE, _winapi.ERROR_NETNAME_DELETED}

    def wait(object_list, timeout=None):
        '''
        Wait till an object in object_list is ready/readable.

        Returns list of those objects in object_list which are ready/readable.
        '''
        # Convert seconds to the integer milliseconds used by the wait call.
        if timeout is None:
            timeout = INFINITE
        elif timeout < 0:
            timeout = 0
        else:
            timeout = int(timeout * 1000 + 0.5)

        object_list = list(object_list)
        owner_of = {}           # wait handle -> object from object_list
        pending_reads = []      # overlapped reads still in flight
        ready_objects = set()
        ready_handles = set()

        try:
            for o in object_list:
                try:
                    fileno = o.fileno
                except AttributeError:
                    # No fileno(): the object itself converts to a handle.
                    owner_of[o.__index__()] = o
                else:
                    # start an overlapped read of length zero
                    try:
                        ov, err = _winapi.ReadFile(fileno(), 0, True)
                    except OSError as e:
                        ov, err = None, e.winerror
                        if err not in _ready_errors:
                            raise
                    if err == _winapi.ERROR_IO_PENDING:
                        pending_reads.append(ov)
                        owner_of[ov.event] = o
                    else:
                        # If o.fileno() is an overlapped pipe handle and
                        # err == 0 then there is a zero length message
                        # in the pipe, but it HAS NOT been consumed...
                        if ov and sys.getwindowsversion()[:2] >= (6, 2):
                            # ... except on Windows 8 and later, where
                            # the message HAS been consumed.
                            try:
                                _, err = ov.GetOverlappedResult(False)
                            except OSError as e:
                                err = e.winerror
                            if not err and hasattr(o, '_got_empty_message'):
                                o._got_empty_message = True
                        ready_objects.add(o)
                        # Something is ready already: do not block below.
                        timeout = 0

            ready_handles = _exhaustive_wait(owner_of.keys(), timeout)
        finally:
            # request that overlapped reads stop
            for ov in pending_reads:
                ov.cancel()

            # wait for all overlapped reads to stop
            for ov in pending_reads:
                try:
                    _, err = ov.GetOverlappedResult(True)
                except OSError as e:
                    err = e.winerror
                    if err not in _ready_errors:
                        raise
                if err != _winapi.ERROR_OPERATION_ABORTED:
                    o = owner_of[ov.event]
                    ready_objects.add(o)
                    if err == 0:
                        # If o.fileno() is an overlapped pipe handle then
                        # a zero length message HAS been consumed.
                        if hasattr(o, '_got_empty_message'):
                            o._got_empty_message = True

        ready_objects.update(owner_of[h] for h in ready_handles)
        # The result keeps the order of object_list.
        return [o for o in object_list if o in ready_objects]
jpayne@69 902 return [o for o in object_list if o in ready_objects]
jpayne@69 903
jpayne@69 904 else:
jpayne@69 905
import selectors

# Prefer poll(), falling back to select().  Unlike epoll/kqueue, neither
# requires an extra file descriptor, and each wait is a single syscall.
_WaitSelector = getattr(selectors, 'PollSelector', selectors.SelectSelector)

def wait(object_list, timeout=None):
    '''
    Block until at least one object in object_list is ready/readable.

    If timeout is None the call blocks for as long as it takes; otherwise
    it gives up once that many seconds have passed.  Returns the list of
    ready objects, which is empty when the timeout expired first.
    '''
    with _WaitSelector() as selector:
        for obj in object_list:
            selector.register(obj, selectors.EVENT_READ)

        deadline = None if timeout is None else time.monotonic() + timeout

        while True:
            ready = selector.select(timeout)
            if ready:
                return [key.fileobj for key, _events in ready]
            if deadline is None:
                # No timeout requested: just wait again.
                continue
            # Nothing ready yet; retry with whatever time remains.
            timeout = deadline - time.monotonic()
            if timeout < 0:
                return ready
jpayne@69 938
jpayne@69 939 #
jpayne@69 940 # Make connection and socket objects sharable if possible
jpayne@69 941 #
jpayne@69 942
jpayne@69 943 if sys.platform == 'win32':
def reduce_connection(conn):
    '''
    Reduce a socket-based Connection (win32 branch) for pickling.

    Returns rebuild_connection together with the arguments it needs: a
    DupSocket for the connection's handle plus the readable/writable
    flags.
    '''
    # Wrap the handle in a temporary socket object so that DupSocket can
    # be built from it; the `with` closes that temporary socket afterwards.
    with socket.fromfd(conn.fileno(), socket.AF_INET,
                       socket.SOCK_STREAM) as tmp_sock:
        from . import resource_sharer
        dup_sock = resource_sharer.DupSocket(tmp_sock)
        rebuild_args = (dup_sock, conn.readable, conn.writable)
        return rebuild_connection, rebuild_args
def rebuild_connection(ds, readable, writable):
    '''
    Recreate a Connection from the arguments made by reduce_connection().
    '''
    # ds.detach() presumably returns a socket object (it is detached again
    # below to obtain the raw handle) -- confirm against resource_sharer.
    duplicate = ds.detach()
    return Connection(duplicate.detach(), readable, writable)

# Register reduce_connection as the reducer for Connection objects.
reduction.register(Connection, reduce_connection)
jpayne@69 954
def reduce_pipe_connection(conn):
    '''
    Reduce a PipeConnection for pickling.

    Returns rebuild_pipe_connection together with the arguments it needs:
    a DupHandle for the connection's handle plus the readable/writable
    flags.
    '''
    # Request only the access rights that match the connection's direction.
    access = 0
    if conn.readable:
        access |= _winapi.FILE_GENERIC_READ
    if conn.writable:
        access |= _winapi.FILE_GENERIC_WRITE
    dup_handle = reduction.DupHandle(conn.fileno(), access)
    rebuild_args = (dup_handle, conn.readable, conn.writable)
    return rebuild_pipe_connection, rebuild_args
def rebuild_pipe_connection(dh, readable, writable):
    '''
    Recreate a PipeConnection from the arguments made by
    reduce_pipe_connection().
    '''
    return PipeConnection(dh.detach(), readable, writable)

# Register reduce_pipe_connection as the reducer for PipeConnection objects.
reduction.register(PipeConnection, reduce_pipe_connection)
jpayne@69 964
jpayne@69 965 else:
def reduce_connection(conn):
    '''
    Reduce a Connection (non-win32 branch) for pickling.

    Returns rebuild_connection together with the arguments it needs: a
    DupFd for the connection's file descriptor plus the readable/writable
    flags.
    '''
    dup_fd = reduction.DupFd(conn.fileno())
    rebuild_args = (dup_fd, conn.readable, conn.writable)
    return rebuild_connection, rebuild_args
def rebuild_connection(df, readable, writable):
    '''
    Recreate a Connection from the arguments made by reduce_connection().
    '''
    return Connection(df.detach(), readable, writable)

# Register reduce_connection as the reducer for Connection objects.
reduction.register(Connection, reduce_connection)