annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/include/kj/async-io.h @ 69:33d812a61356

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 17:55:14 -0400
parents
children
rev   line source
jpayne@69 1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
jpayne@69 2 // Licensed under the MIT License:
jpayne@69 3 //
jpayne@69 4 // Permission is hereby granted, free of charge, to any person obtaining a copy
jpayne@69 5 // of this software and associated documentation files (the "Software"), to deal
jpayne@69 6 // in the Software without restriction, including without limitation the rights
jpayne@69 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
jpayne@69 8 // copies of the Software, and to permit persons to whom the Software is
jpayne@69 9 // furnished to do so, subject to the following conditions:
jpayne@69 10 //
jpayne@69 11 // The above copyright notice and this permission notice shall be included in
jpayne@69 12 // all copies or substantial portions of the Software.
jpayne@69 13 //
jpayne@69 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
jpayne@69 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
jpayne@69 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
jpayne@69 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
jpayne@69 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
jpayne@69 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
jpayne@69 20 // THE SOFTWARE.
jpayne@69 21
jpayne@69 22 #pragma once
jpayne@69 23
jpayne@69 24 #include "async.h"
jpayne@69 25 #include <kj/function.h>
jpayne@69 26 #include <kj/thread.h>
jpayne@69 27 #include <kj/timer.h>
jpayne@69 28
jpayne@69 29 KJ_BEGIN_HEADER
jpayne@69 30
// Only a forward declaration is needed: `struct sockaddr*` appears solely as a pointer parameter
// (getsockname()/getpeername() below), and pointers to incomplete types are fine.
struct sockaddr;
jpayne@69 32
jpayne@69 33 namespace kj {
jpayne@69 34
// Forward declarations for types that this header names before (or without) their full
// definitions being visible.

// Platform-specific event-port types.
#if _WIN32
class Win32EventPort;
class AutoCloseHandle;
#else
class UnixEventPort;
#endif

// Stream, address and message types.
class AutoCloseFd;
class AncillaryMessage;
class AsyncIoStream;
class AsyncOutputStream;
class NetworkAddress;

// Filesystem types.
class File;
class ReadableFile;
jpayne@69 51 // =======================================================================================
jpayne@69 52 // Streaming I/O
jpayne@69 53
jpayne@69 54 class AsyncInputStream: private AsyncObject {
jpayne@69 55 // Asynchronous equivalent of InputStream (from io.h).
jpayne@69 56
jpayne@69 57 public:
jpayne@69 58 virtual Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes);
jpayne@69 59 virtual Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) = 0;
jpayne@69 60
jpayne@69 61 Promise<void> read(void* buffer, size_t bytes);
jpayne@69 62
jpayne@69 63 virtual Maybe<uint64_t> tryGetLength();
jpayne@69 64 // Get the remaining number of bytes that will be produced by this stream, if known.
jpayne@69 65 //
jpayne@69 66 // This is used e.g. to fill in the Content-Length header of an HTTP message. If unknown, the
jpayne@69 67 // HTTP implementation may need to fall back to Transfer-Encoding: chunked.
jpayne@69 68 //
jpayne@69 69 // The default implementation always returns null.
jpayne@69 70
jpayne@69 71 virtual Promise<uint64_t> pumpTo(
jpayne@69 72 AsyncOutputStream& output, uint64_t amount = kj::maxValue);
jpayne@69 73 // Read `amount` bytes from this stream (or to EOF) and write them to `output`, returning the
jpayne@69 74 // total bytes actually pumped (which is only less than `amount` if EOF was reached).
jpayne@69 75 //
jpayne@69 76 // Override this if your stream type knows how to pump itself to certain kinds of output
jpayne@69 77 // streams more efficiently than via the naive approach. You can use
jpayne@69 78 // kj::dynamicDowncastIfAvailable() to test for stream types you recognize, and if none match,
jpayne@69 79 // delegate to the default implementation.
jpayne@69 80 //
jpayne@69 81 // The default implementation first tries calling output.tryPumpFrom(), but if that fails, it
jpayne@69 82 // performs a naive pump by allocating a buffer and reading to it / writing from it in a loop.
jpayne@69 83
jpayne@69 84 Promise<Array<byte>> readAllBytes(uint64_t limit = kj::maxValue);
jpayne@69 85 Promise<String> readAllText(uint64_t limit = kj::maxValue);
jpayne@69 86 // Read until EOF and return as one big byte array or string. Throw an exception if EOF is not
jpayne@69 87 // seen before reading `limit` bytes.
jpayne@69 88 //
jpayne@69 89 // To prevent runaway memory allocation, consider using a more conservative value for `limit` than
jpayne@69 90 // the default, particularly on untrusted data streams which may never see EOF.
jpayne@69 91
jpayne@69 92 virtual void registerAncillaryMessageHandler(Function<void(ArrayPtr<AncillaryMessage>)> fn);
jpayne@69 93 // Register interest in checking for ancillary messages (aka control messages) when reading.
jpayne@69 94 // The provided callback will be called whenever any are encountered. The messages passed to
jpayne@69 95 // the function do not live beyond when function returns.
jpayne@69 96 // Only supported on Unix (the default impl throws UNIMPLEMENTED). Most apps will not use this.
jpayne@69 97
jpayne@69 98 virtual Maybe<Own<AsyncInputStream>> tryTee(uint64_t limit = kj::maxValue);
jpayne@69 99 // Primarily intended as an optimization for the `tee` call. Returns an input stream whose state
jpayne@69 100 // is independent from this one but which will return the exact same set of bytes read going
jpayne@69 101 // forward. limit is a total limit on the amount of memory, in bytes, which a tee implementation
jpayne@69 102 // may use to buffer stream data. An implementation must throw an exception if a read operation
jpayne@69 103 // would cause the limit to be exceeded. If tryTee() can see that the new limit is impossible to
jpayne@69 104 // satisfy, it should return nullptr so that the pessimized path is taken in newTee. This is
jpayne@69 105 // likely to arise if tryTee() is called twice with different limits on the same stream.
jpayne@69 106 };
jpayne@69 107
jpayne@69 108 class AsyncOutputStream: private AsyncObject {
jpayne@69 109 // Asynchronous equivalent of OutputStream (from io.h).
jpayne@69 110
jpayne@69 111 public:
jpayne@69 112 virtual Promise<void> write(const void* buffer, size_t size) KJ_WARN_UNUSED_RESULT = 0;
jpayne@69 113 virtual Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces)
jpayne@69 114 KJ_WARN_UNUSED_RESULT = 0;
jpayne@69 115
jpayne@69 116 virtual Maybe<Promise<uint64_t>> tryPumpFrom(
jpayne@69 117 AsyncInputStream& input, uint64_t amount = kj::maxValue);
jpayne@69 118 // Implements double-dispatch for AsyncInputStream::pumpTo().
jpayne@69 119 //
jpayne@69 120 // This method should only be called from within an implementation of pumpTo().
jpayne@69 121 //
jpayne@69 122 // This method examines the type of `input` to find optimized ways to pump data from it to this
jpayne@69 123 // output stream. If it finds one, it performs the pump. Otherwise, it returns null.
jpayne@69 124 //
jpayne@69 125 // The default implementation always returns null.
jpayne@69 126
jpayne@69 127 virtual Promise<void> whenWriteDisconnected() = 0;
jpayne@69 128 // Returns a promise that resolves when the stream has become disconnected such that new write()s
jpayne@69 129 // will fail with a DISCONNECTED exception. This is particularly useful, for example, to cancel
jpayne@69 130 // work early when it is detected that no one will receive the result.
jpayne@69 131 //
jpayne@69 132 // Note that not all streams are able to detect this condition without actually performing a
jpayne@69 133 // write(); such stream implementations may return a promise that never resolves. (In particular,
jpayne@69 134 // as of this writing, whenWriteDisconnected() is not implemented on Windows. Also, for TCP
jpayne@69 135 // streams, not all disconnects are detectable -- a power or network failure may lead the
jpayne@69 136 // connection to hang forever, or until configured socket options lead to a timeout.)
jpayne@69 137 //
jpayne@69 138 // Unlike most other asynchronous stream methods, it is safe to call whenWriteDisconnected()
jpayne@69 139 // multiple times without canceling the previous promises.
jpayne@69 140 };
jpayne@69 141
jpayne@69 142 class AsyncIoStream: public AsyncInputStream, public AsyncOutputStream {
jpayne@69 143 // A combination input and output stream.
jpayne@69 144
jpayne@69 145 public:
jpayne@69 146 virtual void shutdownWrite() = 0;
jpayne@69 147 // Cleanly shut down just the write end of the stream, while keeping the read end open.
jpayne@69 148
jpayne@69 149 virtual void abortRead() {}
jpayne@69 150 // Similar to shutdownWrite, but this will shut down the read end of the stream, and should only
jpayne@69 151 // be called when an error has occurred.
jpayne@69 152
jpayne@69 153 virtual void getsockopt(int level, int option, void* value, uint* length);
jpayne@69 154 virtual void setsockopt(int level, int option, const void* value, uint length);
jpayne@69 155 // Corresponds to getsockopt() and setsockopt() syscalls. Will throw an "unimplemented" exception
jpayne@69 156 // if the stream is not a socket or the option is not appropriate for the socket type. The
jpayne@69 157 // default implementations always throw "unimplemented".
jpayne@69 158
jpayne@69 159 virtual void getsockname(struct sockaddr* addr, uint* length);
jpayne@69 160 virtual void getpeername(struct sockaddr* addr, uint* length);
jpayne@69 161 // Corresponds to getsockname() and getpeername() syscalls. Will throw an "unimplemented"
jpayne@69 162 // exception if the stream is not a socket. The default implementations always throw
jpayne@69 163 // "unimplemented".
jpayne@69 164 //
jpayne@69 165 // Note that we don't provide methods that return NetworkAddress because it usually wouldn't
jpayne@69 166 // be useful. You can't connect() to or listen() on these addresses, obviously, because they are
jpayne@69 167 // ephemeral addresses for a single connection.
jpayne@69 168
jpayne@69 169 virtual kj::Maybe<int> getFd() const { return nullptr; }
jpayne@69 170 // Get the underlying Unix file descriptor, if any. Returns nullptr if this object actually
jpayne@69 171 // isn't wrapping a file descriptor.
jpayne@69 172 };
jpayne@69 173
jpayne@69 174 Promise<uint64_t> unoptimizedPumpTo(
jpayne@69 175 AsyncInputStream& input, AsyncOutputStream& output, uint64_t amount,
jpayne@69 176 uint64_t completedSoFar = 0);
jpayne@69 177 // Performs a pump using read() and write(), without calling the stream's pumpTo() nor
jpayne@69 178 // tryPumpFrom() methods. This is intended to be used as a fallback by implementations of pumpTo()
jpayne@69 179 // and tryPumpFrom() when they want to give up on optimization, but can't just call pumpTo() again
jpayne@69 180 // because this would recursively retry the optimization. unoptimizedPumpTo() should only be called
jpayne@69 181 // inside implementations of streams, never by the caller of a stream -- use the pumpTo() method
jpayne@69 182 // instead.
jpayne@69 183 //
jpayne@69 184 // `completedSoFar` is the number of bytes out of `amount` that have already been pumped. This is
jpayne@69 185 // provided for convenience for cases where the caller has already done some pumping before they
jpayne@69 186 // give up. Otherwise, a `.then()` would need to be used to add the bytes to the final result.
jpayne@69 187
jpayne@69 188 class AsyncCapabilityStream: public AsyncIoStream {
jpayne@69 189 // An AsyncIoStream that also allows transmitting new stream objects and file descriptors
jpayne@69 190 // (capabilities, in the object-capability model sense), in addition to bytes.
jpayne@69 191 //
jpayne@69 192 // Capabilities can be attached to bytes when they are written. On the receiving end, the read()
jpayne@69 193 // that receives the first byte of such a message will also receive the capabilities.
jpayne@69 194 //
jpayne@69 195 // Note that AsyncIoStream's regular byte-oriented methods can be used on AsyncCapabilityStream,
jpayne@69 196 // with the effect of silently dropping any capabilities attached to the respective bytes. E.g.
jpayne@69 197 // using `AsyncIoStream::tryRead()` to read bytes that had been sent with `writeWithFds()` will
jpayne@69 198 // silently drop the FDs (closing them if appropriate). Also note that pumping a stream with
jpayne@69 199 // `pumpTo()` always drops all capabilities attached to the pumped data. (TODO(someday): Do we
jpayne@69 200 // want a version of pumpTo() that preserves capabilities?)
jpayne@69 201 //
jpayne@69 202 // On Unix, KJ provides an implementation based on Unix domain sockets and file descriptor
jpayne@69 203 // passing via SCM_RIGHTS. Due to the nature of SCM_RIGHTS, if the application accidentally
jpayne@69 204 // read()s when it should have called receiveStream(), it will observe a NUL byte in the data
jpayne@69 205 // and the capability will be discarded. Of course, an application should not depend on this
jpayne@69 206 // behavior; it should avoid read()ing through a capability.
jpayne@69 207 //
jpayne@69 208 // KJ does not provide any inter-process implementation of this type on Windows, as there's no
jpayne@69 209 // obvious implementation there. Handle passing on Windows requires at least one of the processes
jpayne@69 210 // involved to have permission to modify the other's handle table, which is effectively full
jpayne@69 211 // control. Handle passing between mutually non-trusting processes would require a trusted
jpayne@69 212 // broker process to facilitate. One could possibly implement this type in terms of such a
jpayne@69 213 // broker, or in terms of direct handle passing if at least one process trusts the other.
jpayne@69 214
jpayne@69 215 public:
jpayne@69 216 virtual Promise<void> writeWithFds(ArrayPtr<const byte> data,
jpayne@69 217 ArrayPtr<const ArrayPtr<const byte>> moreData,
jpayne@69 218 ArrayPtr<const int> fds) = 0;
jpayne@69 219 Promise<void> writeWithFds(ArrayPtr<const byte> data,
jpayne@69 220 ArrayPtr<const ArrayPtr<const byte>> moreData,
jpayne@69 221 ArrayPtr<const AutoCloseFd> fds);
jpayne@69 222 // Write some data to the stream with some file descriptors attached to it.
jpayne@69 223 //
jpayne@69 224 // The maximum number of FDs that can be sent at a time is usually subject to an OS-imposed
jpayne@69 225 // limit. On Linux, this is 253. In practice, sending more than a handful of FDs at once is
jpayne@69 226 // probably a bad idea.
jpayne@69 227
jpayne@69 228 struct ReadResult {
jpayne@69 229 size_t byteCount;
jpayne@69 230 size_t capCount;
jpayne@69 231 };
jpayne@69 232
jpayne@69 233 virtual Promise<ReadResult> tryReadWithFds(void* buffer, size_t minBytes, size_t maxBytes,
jpayne@69 234 AutoCloseFd* fdBuffer, size_t maxFds) = 0;
jpayne@69 235 // Read data from the stream that may have file descriptors attached. Any attached descriptors
jpayne@69 236 // will be placed in `fdBuffer`. If multiple bundles of FDs are encountered in the course of
jpayne@69 237 // reading the amount of data requested by minBytes/maxBytes, then they will be concatenated. If
jpayne@69 238 // more FDs are received than fit in the buffer, then the excess will be discarded and closed --
jpayne@69 239 // this behavior, while ugly, is important to defend against denial-of-service attacks that may
jpayne@69 240 // fill up the FD table with garbage. Applications must think carefully about how many FDs they
jpayne@69 241 // really need to receive at once and set a well-defined limit.
jpayne@69 242
jpayne@69 243 virtual Promise<void> writeWithStreams(ArrayPtr<const byte> data,
jpayne@69 244 ArrayPtr<const ArrayPtr<const byte>> moreData,
jpayne@69 245 Array<Own<AsyncCapabilityStream>> streams) = 0;
jpayne@69 246 virtual Promise<ReadResult> tryReadWithStreams(
jpayne@69 247 void* buffer, size_t minBytes, size_t maxBytes,
jpayne@69 248 Own<AsyncCapabilityStream>* streamBuffer, size_t maxStreams) = 0;
jpayne@69 249 // Like above, but passes AsyncCapabilityStream objects. The stream implementations must be from
jpayne@69 250 // the same AsyncIoProvider.
jpayne@69 251
jpayne@69 252 // ---------------------------------------------------------------------------
jpayne@69 253 // Helpers for sending individual capabilities.
jpayne@69 254 //
jpayne@69 255 // These are equivalent to the above methods with the constraint that only one FD is
jpayne@69 256 // sent/received at a time and the corresponding data is a single zero-valued byte.
jpayne@69 257
jpayne@69 258 Promise<Own<AsyncCapabilityStream>> receiveStream();
jpayne@69 259 Promise<Maybe<Own<AsyncCapabilityStream>>> tryReceiveStream();
jpayne@69 260 Promise<void> sendStream(Own<AsyncCapabilityStream> stream);
jpayne@69 261 // Transfer a single stream.
jpayne@69 262
jpayne@69 263 Promise<AutoCloseFd> receiveFd();
jpayne@69 264 Promise<Maybe<AutoCloseFd>> tryReceiveFd();
jpayne@69 265 Promise<void> sendFd(int fd);
jpayne@69 266 // Transfer a single raw file descriptor.
jpayne@69 267 };
jpayne@69 268
jpayne@69 269 struct OneWayPipe {
jpayne@69 270 // A data pipe with an input end and an output end. (Typically backed by pipe() system call.)
jpayne@69 271
jpayne@69 272 Own<AsyncInputStream> in;
jpayne@69 273 Own<AsyncOutputStream> out;
jpayne@69 274 };
jpayne@69 275
jpayne@69 276 OneWayPipe newOneWayPipe(kj::Maybe<uint64_t> expectedLength = nullptr);
jpayne@69 277 // Constructs a OneWayPipe that operates in-process. The pipe does not do any buffering -- it waits
jpayne@69 278 // until both a read() and a write() call are pending, then resolves both.
jpayne@69 279 //
jpayne@69 280 // If `expectedLength` is non-null, then the pipe will be expected to transmit exactly that many
jpayne@69 281 // bytes. The input end's `tryGetLength()` will return the number of bytes left.
jpayne@69 282
jpayne@69 283 struct TwoWayPipe {
jpayne@69 284 // A data pipe that supports sending in both directions. Each end's output sends data to the
jpayne@69 285 // other end's input. (Typically backed by socketpair() system call.)
jpayne@69 286
jpayne@69 287 Own<AsyncIoStream> ends[2];
jpayne@69 288 };
jpayne@69 289
jpayne@69 290 TwoWayPipe newTwoWayPipe();
jpayne@69 291 // Constructs a TwoWayPipe that operates in-process. The pipe does not do any buffering -- it waits
jpayne@69 292 // until both a read() and a write() call are pending, then resolves both.
jpayne@69 293
jpayne@69 294 struct CapabilityPipe {
jpayne@69 295 // Like TwoWayPipe but allowing capability-passing.
jpayne@69 296
jpayne@69 297 Own<AsyncCapabilityStream> ends[2];
jpayne@69 298 };
jpayne@69 299
jpayne@69 300 CapabilityPipe newCapabilityPipe();
jpayne@69 301 // Like newTwoWayPipe() but creates a capability pipe.
jpayne@69 302 //
jpayne@69 303 // The requirement of `writeWithStreams()` that "The stream implementations must be from the same
jpayne@69 304 // AsyncIoProvider." does not apply to this pipe; any kind of AsyncCapabilityStream implementation
jpayne@69 305 // is supported.
jpayne@69 306 //
jpayne@69 307 // This implementation does not know how to convert streams to FDs or vice versa; if you write FDs
jpayne@69 308 // you must read FDs, and if you write streams you must read streams.
jpayne@69 309
jpayne@69 310 struct Tee {
jpayne@69 311 // Two AsyncInputStreams which each read the same data from some wrapped inner AsyncInputStream.
jpayne@69 312
jpayne@69 313 Own<AsyncInputStream> branches[2];
jpayne@69 314 };
jpayne@69 315
jpayne@69 316 Tee newTee(Own<AsyncInputStream> input, uint64_t limit = kj::maxValue);
jpayne@69 317 // Constructs a Tee that operates in-process. The tee buffers data if any read or pump operations is
jpayne@69 318 // called on one of the two input ends. If a read or pump operation is subsequently called on the
jpayne@69 319 // other input end, the buffered data is consumed.
jpayne@69 320 //
jpayne@69 321 // `pumpTo()` operations on the input ends will proactively read from the inner stream and block
jpayne@69 322 // while writing to the output stream. While one branch has an active `pumpTo()` operation, any
jpayne@69 323 // `tryRead()` operation on the other branch will not be allowed to read faster than allowed by the
jpayne@69 324 // pump's backpressure. (In other words, it will never cause buffering on the pump.) Similarly, if
jpayne@69 325 // there are `pumpTo()` operations active on both branches, the greater of the two backpressures is
jpayne@69 326 // respected -- the two pumps progress in lockstep, and there is no buffering.
jpayne@69 327 //
jpayne@69 328 // At no point will a branch's buffer be allowed to grow beyond `limit` bytes. If the buffer would
jpayne@69 329 // grow beyond the limit, an exception is generated, which both branches see once they have
jpayne@69 330 // exhausted their buffers.
jpayne@69 331 //
jpayne@69 332 // It is recommended that you use a more conservative value for `limit` than the default.
jpayne@69 333
jpayne@69 334 Own<AsyncOutputStream> newPromisedStream(Promise<Own<AsyncOutputStream>> promise);
jpayne@69 335 Own<AsyncIoStream> newPromisedStream(Promise<Own<AsyncIoStream>> promise);
jpayne@69 336 // Constructs an Async*Stream which waits for a promise to resolve, then forwards all calls to the
jpayne@69 337 // promised stream.
jpayne@69 338
jpayne@69 339 // =======================================================================================
jpayne@69 340 // Authenticated streams
jpayne@69 341
jpayne@69 342 class PeerIdentity {
jpayne@69 343 // PeerIdentity provides information about a connecting client. Various subclasses exist to
jpayne@69 344 // address different network types.
jpayne@69 345 public:
jpayne@69 346 virtual kj::String toString() = 0;
jpayne@69 347 // Returns a human-readable string identifying the peer. Where possible, this string will be
jpayne@69 348 // in the same format as the addresses you could pass to `kj::Network::parseAddress()`. However,
jpayne@69 349 // only certain subclasses of `PeerIdentity` guarantee this property.
jpayne@69 350 };
jpayne@69 351
jpayne@69 352 struct AuthenticatedStream {
jpayne@69 353 // A pair of an `AsyncIoStream` and a `PeerIdentity`. This is used as the return type of
jpayne@69 354 // `NetworkAddress::connectAuthenticated()` and `ConnectionReceiver::acceptAuthenticated()`.
jpayne@69 355
jpayne@69 356 Own<AsyncIoStream> stream;
jpayne@69 357 // The byte stream.
jpayne@69 358
jpayne@69 359 Own<PeerIdentity> peerIdentity;
jpayne@69 360 // An object indicating who is at the other end of the stream.
jpayne@69 361 //
jpayne@69 362 // Different subclasses of `PeerIdentity` are used in different situations:
jpayne@69 363 // - TCP connections will use NetworkPeerIdentity, which gives the network address of the client.
jpayne@69 364 // - Local (unix) socket connections will use LocalPeerIdentity, which identifies the UID
jpayne@69 365 // and PID of the process that initiated the connection.
jpayne@69 366 // - TLS connections will use TlsPeerIdentity which provides details of the client certificate,
jpayne@69 367 // if any was provided.
jpayne@69 368 // - When no meaningful peer identity can be provided, `UnknownPeerIdentity` is returned.
jpayne@69 369 //
jpayne@69 370 // Implementations of `Network`, `ConnectionReceiver`, `NetworkAddress`, etc. should document the
jpayne@69 371 // specific assumptions the caller can make about the type of `PeerIdentity`s used, allowing for
jpayne@69 372 // identities to be statically downcast if the right conditions are met. In the absence of
jpayne@69 373 // documented promises, RTTI may be needed to query the type.
jpayne@69 374 };
jpayne@69 375
jpayne@69 376 class NetworkPeerIdentity: public PeerIdentity {
jpayne@69 377 // PeerIdentity used for network protocols like TCP/IP. This identifies the remote peer.
jpayne@69 378 //
jpayne@69 379 // This is only "authenticated" to the extent that we know data written to the stream will be
jpayne@69 380 // routed to the given address. This does not preclude the possibility of man-in-the-middle
jpayne@69 381 // attacks by attackers who are able to manipulate traffic along the route.
jpayne@69 382 public:
jpayne@69 383 virtual NetworkAddress& getAddress() = 0;
jpayne@69 384 // Obtain the peer's address as a NetworkAddress object. The returned reference's lifetime is the
jpayne@69 385 // same as the `NetworkPeerIdentity`, but you can always call `clone()` on it to get a copy that
jpayne@69 386 // lives longer.
jpayne@69 387
jpayne@69 388 static kj::Own<NetworkPeerIdentity> newInstance(kj::Own<NetworkAddress> addr);
jpayne@69 389 // Construct an instance of this interface wrapping the given address.
jpayne@69 390 };
jpayne@69 391
jpayne@69 392 class LocalPeerIdentity: public PeerIdentity {
jpayne@69 393 // PeerIdentity used for connections between processes on the local machine -- in particular,
jpayne@69 394 // Unix sockets.
jpayne@69 395 //
jpayne@69 396 // (This interface probably isn't useful on Windows.)
jpayne@69 397 public:
jpayne@69 398 struct Credentials {
jpayne@69 399 kj::Maybe<int> pid;
jpayne@69 400 kj::Maybe<uint> uid;
jpayne@69 401
jpayne@69 402 // We don't cover groups at present because some systems produce a list of groups while others
jpayne@69 403 // only provide the peer's main group, the latter being pretty useless.
jpayne@69 404 };
jpayne@69 405
jpayne@69 406 virtual Credentials getCredentials() = 0;
jpayne@69 407 // Get the PID and UID of the peer process, if possible.
jpayne@69 408 //
jpayne@69 409 // Either ID may be null if the peer could not be identified. Some operating systems do not
jpayne@69 410 // support retrieving these credentials, or can only provide one or the other. Some situations
jpayne@69 411 // (like user and PID namespaces on Linux) may also make it impossible to represent the peer's
jpayne@69 412 // credentials accurately.
jpayne@69 413 //
jpayne@69 414 // Note the meaning here can be subtle. Multiple processes can potentially have the socket in
jpayne@69 415 // their file descriptor tables. The identified process is the one who called `connect()` or
jpayne@69 416 // `listen()`.
jpayne@69 417 //
jpayne@69 418 // On Linux this is implemented with SO_PEERCRED.
jpayne@69 419
jpayne@69 420 static kj::Own<LocalPeerIdentity> newInstance(Credentials creds);
jpayne@69 421 // Construct an instance of this interface wrapping the given credentials.
jpayne@69 422 };
jpayne@69 423
jpayne@69 424 class UnknownPeerIdentity: public PeerIdentity {
jpayne@69 425 public:
jpayne@69 426 static kj::Own<UnknownPeerIdentity> newInstance();
jpayne@69 427 // Get an instance of this interface. This actually always returns the same instance with no
jpayne@69 428 // memory allocation.
jpayne@69 429 };
jpayne@69 430
jpayne@69 431 // =======================================================================================
jpayne@69 432 // Accepting connections
jpayne@69 433
jpayne@69 434 class ConnectionReceiver: private AsyncObject {
jpayne@69 435 // Represents a server socket listening on a port.
jpayne@69 436
jpayne@69 437 public:
jpayne@69 438 virtual Promise<Own<AsyncIoStream>> accept() = 0;
jpayne@69 439 // Accept the next incoming connection.
jpayne@69 440
jpayne@69 441 virtual Promise<AuthenticatedStream> acceptAuthenticated();
jpayne@69 442 // Accept the next incoming connection, and also provide a PeerIdentity with any information
jpayne@69 443 // about the client.
jpayne@69 444 //
jpayne@69 445 // For backwards-compatibility, the default implementation of this method calls `accept()` and
jpayne@69 446 // then adds `UnknownPeerIdentity`.
jpayne@69 447
jpayne@69 448 virtual uint getPort() = 0;
jpayne@69 449 // Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't
jpayne@69 450 // specify a port when constructing the NetworkAddress -- one will have been assigned
jpayne@69 451 // automatically.
jpayne@69 452
jpayne@69 453 virtual void getsockopt(int level, int option, void* value, uint* length);
jpayne@69 454 virtual void setsockopt(int level, int option, const void* value, uint length);
jpayne@69 455 virtual void getsockname(struct sockaddr* addr, uint* length);
jpayne@69 456 // Same as the methods of AsyncIoStream.
jpayne@69 457 };
jpayne@69 458
jpayne@69 459 Own<ConnectionReceiver> newAggregateConnectionReceiver(Array<Own<ConnectionReceiver>> receivers);
jpayne@69 460 // Create a ConnectionReceiver that listens on several other ConnectionReceivers and returns
jpayne@69 461 // sockets from any of them.
jpayne@69 462
jpayne@69 463 // =======================================================================================
jpayne@69 464 // Datagram I/O
jpayne@69 465
jpayne@69 466 class AncillaryMessage {
jpayne@69 467 // Represents an ancillary message (aka control message) received using the recvmsg() system
jpayne@69 468 // call (or equivalent). Most apps will not use this.
jpayne@69 469
jpayne@69 470 public:
jpayne@69 471 inline AncillaryMessage(int level, int type, ArrayPtr<const byte> data);
jpayne@69 472 AncillaryMessage() = default;
jpayne@69 473
jpayne@69 474 inline int getLevel() const;
jpayne@69 475 // Originating protocol / socket level.
jpayne@69 476
jpayne@69 477 inline int getType() const;
jpayne@69 478 // Protocol-specific message type.
jpayne@69 479
jpayne@69 480 template <typename T>
jpayne@69 481 inline Maybe<const T&> as() const;
jpayne@69 482 // Interpret the ancillary message as the given struct type. Most ancillary messages are some
jpayne@69 483 // sort of struct, so this is a convenient way to access it. Returns nullptr if the message
jpayne@69 484 // is smaller than the struct -- this can happen if the message was truncated due to
jpayne@69 485 // insufficient ancillary buffer space.
jpayne@69 486
jpayne@69 487 template <typename T>
jpayne@69 488 inline ArrayPtr<const T> asArray() const;
jpayne@69 489 // Interpret the ancillary message as an array of items. If the message size does not evenly
jpayne@69 490 // divide into elements of type T, the remainder is discarded -- this can happen if the message
jpayne@69 491 // was truncated due to insufficient ancillary buffer space.
jpayne@69 492
jpayne@69 493 private:
jpayne@69 494 int level;
jpayne@69 495 int type;
jpayne@69 496 ArrayPtr<const byte> data;
jpayne@69 497 // Message data. In most cases you should use `as()` or `asArray()`.
jpayne@69 498 };
jpayne@69 499
class DatagramReceiver {
  // Class encapsulating the recvmsg() system call. You must specify the DatagramReceiver's
  // capacity in advance; if a received packet is larger than the capacity, it will be truncated.
  //
  // Typical use: call receive(), wait for the returned promise, then inspect the datagram with
  // getContent(), getAncillary() and getSource(). Instances are created with
  // DatagramPort::makeReceiver().

public:
  virtual Promise<void> receive() = 0;
  // Receive a new message, overwriting this object's content.
  //
  // receive() may reuse the same buffers for content and ancillary data with each call. Treat
  // any ArrayPtr previously obtained from getContent() or getAncillary() as invalid once
  // receive() is called again, and copy the data out if you need to keep it.

  template <typename T>
  struct MaybeTruncated {
    // A received value paired with a flag reporting whether it had to be cut short.

    T value;

    bool isTruncated;
    // True if the Receiver's capacity was insufficient to receive the value and therefore the
    // value is truncated.
  };

  virtual MaybeTruncated<ArrayPtr<const byte>> getContent() = 0;
  // Get the content of the most recently received datagram. If `isTruncated` is set, the
  // datagram was larger than `Capacity::content` bytes and its tail has been lost.

  virtual MaybeTruncated<ArrayPtr<const AncillaryMessage>> getAncillary() = 0;
  // Ancillary messages received with the datagram. See the recvmsg() system call and the cmsghdr
  // struct. Most apps don't need this.
  //
  // If the returned value is truncated, then the last message in the array may itself be
  // truncated, meaning its as<T>() method will return nullptr or its asArray<T>() method will
  // return fewer elements than expected. Truncation can also mean that additional messages were
  // available but discarded.
  //
  // NOTE(review): with the default `Capacity::ancillary` of 0 no space is reserved for ancillary
  // data, so presumably nothing is ever returned here unless a larger capacity was requested.

  virtual NetworkAddress& getSource() = 0;
  // Get the datagram sender's address.
  //
  // NOTE(review): the reference presumably points at storage owned by this receiver that the
  // next receive() overwrites; use clone() on it if you need to keep the address -- confirm.

  struct Capacity {
    // Buffer sizes, in bytes, passed to DatagramPort::makeReceiver().

    size_t content = 8192;
    // How much space to allocate for the datagram content. If a datagram is received that is
    // larger than this, it will be truncated, with no way to recover the tail.

    size_t ancillary = 0;
    // How much space to allocate for ancillary messages. As with content, if the ancillary data
    // is larger than this, it will be truncated. The default of 0 reserves no ancillary space.
  };
};
jpayne@69 544
class DatagramPort {
  // A bound datagram (e.g. UDP) socket. Obtained from NetworkAddress::bindDatagramPort() or
  // LowLevelAsyncIoProvider::wrapDatagramSocketFd().

public:
  virtual Promise<size_t> send(const void* buffer, size_t size, NetworkAddress& destination) = 0;
  virtual Promise<size_t> send(ArrayPtr<const ArrayPtr<const byte>> pieces,
                               NetworkAddress& destination) = 0;
  // Send one datagram to `destination`. The first overload sends the `size` bytes starting at
  // `buffer`; the second presumably sends the concatenation of `pieces` as a single datagram
  // (gather-write).
  //
  // NOTE(review): the resolved size_t is presumably the number of bytes actually sent -- confirm
  // against the implementation.

  virtual Own<DatagramReceiver> makeReceiver(
      DatagramReceiver::Capacity capacity = DatagramReceiver::Capacity()) = 0;
  // Create a new `Receiver` that can be used to receive datagrams. `capacity` specifies how much
  // space to allocate for the received message. The `DatagramPort` must outlive the `Receiver`.

  virtual uint getPort() = 0;
  // Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't
  // specify a port when constructing the NetworkAddress -- one will have been assigned
  // automatically.

  virtual void getsockopt(int level, int option, void* value, uint* length);
  virtual void setsockopt(int level, int option, const void* value, uint length);
  // Same as the methods of AsyncIoStream.
};
jpayne@69 565
jpayne@69 566 // =======================================================================================
jpayne@69 567 // Networks
jpayne@69 568
class NetworkAddress: private AsyncObject {
  // Represents a remote address to which the application can connect.
  //
  // Instances are produced by Network::parseAddress() and Network::getSockaddr(). A local address
  // may also be used with listen() or bindDatagramPort().

public:
  virtual Promise<Own<AsyncIoStream>> connect() = 0;
  // Make a new connection to this address.
  //
  // The address must not be a wildcard ("*"). If it is an IP address, it must have a port number.

  virtual Promise<AuthenticatedStream> connectAuthenticated();
  // Connect to the address and return both the connection and information about the peer identity.
  // This is especially useful when using TLS, to get certificate details.
  //
  // For backwards-compatibility, the default implementation of this method calls `connect()` and
  // then uses a `NetworkPeerIdentity` wrapping a clone of this `NetworkAddress` -- which is not
  // particularly useful.

  virtual Own<ConnectionReceiver> listen() = 0;
  // Listen for incoming connections on this address.
  //
  // The address must be local.

  virtual Own<DatagramPort> bindDatagramPort();
  // Open this address as a datagram (e.g. UDP) port.
  //
  // The address must be local.
  //
  // NOTE(review): unlike connect() and listen(), this is not pure virtual; presumably the default
  // implementation throws an unimplemented exception for networks without datagram support --
  // confirm.

  virtual Own<NetworkAddress> clone() = 0;
  // Returns an equivalent copy of this NetworkAddress.

  virtual String toString() = 0;
  // Produce a human-readable string which hopefully can be passed to Network::parseAddress()
  // to reproduce this address, although whether or not that works of course depends on the Network
  // implementation. This should be called only to display the address to human users, who will
  // hopefully know what they are able to do with it.
};
jpayne@69 605
class Network {
  // Factory for NetworkAddress instances, representing the network services offered by the
  // operating system.
  //
  // This interface typically represents broad authority, and well-designed code should limit its
  // use to high-level startup code and user interaction. Low-level APIs should accept
  // NetworkAddress instances directly and work from there, if at all possible.

public:
  virtual Promise<Own<NetworkAddress>> parseAddress(StringPtr addr, uint portHint = 0) = 0;
  // Construct a network address from a user-provided string. The format of the address
  // strings is not specified at the API level, and application code should make no assumptions
  // about them. These strings should always be provided by humans, and said humans will know
  // what format to use in their particular context.
  //
  // `portHint`, if provided, specifies the "standard" IP port number for the application-level
  // service in play. If the address turns out to be an IP address (v4 or v6), and it lacks a
  // port number, this port will be used. If `addr` lacks a port number *and* `portHint` is
  // omitted, then the returned address will only support listen() and bindDatagramPort()
  // (not connect()), and an unused port will be chosen each time one of those methods is called.

  virtual Own<NetworkAddress> getSockaddr(const void* sockaddr, uint len) = 0;
  // Construct a network address from a legacy struct sockaddr. `len` is presumably the byte
  // length of `*sockaddr`, as it would be passed to bind() or connect().

  virtual Own<Network> restrictPeers(
      kj::ArrayPtr<const kj::StringPtr> allow,
      kj::ArrayPtr<const kj::StringPtr> deny = nullptr) KJ_WARN_UNUSED_RESULT = 0;
  // Constructs a new Network instance wrapping this one which restricts which peer addresses are
  // permitted (both for outgoing and incoming connections). NOTE(review): since the result wraps
  // this Network, this Network presumably must outlive it -- confirm.
  //
  // Communication will be allowed only with peers whose addresses match one of the patterns
  // specified in the `allow` array. If a `deny` array is specified, then any address which matches
  // a pattern in `deny` and *does not* match any more-specific pattern in `allow` will also be
  // denied.
  //
  // The syntax of address patterns depends on the network, except that four special patterns are
  // defined for all networks:
  // - "private": Matches network addresses that are reserved by standards for private networks,
  //   such as "10.0.0.0/8" or "192.168.0.0/16". This is a superset of "local".
  // - "public": Opposite of "private".
  // - "local": Matches network addresses that are defined by standards to only be accessible from
  //   the local machine, such as "127.0.0.0/8" or Unix domain addresses.
  // - "network": Opposite of "local".
  //
  // For the standard KJ network implementation, the following patterns are also recognized:
  // - Network blocks specified in CIDR notation (ipv4 and ipv6), such as "192.0.2.0/24" or
  //   "2001:db8::/32".
  // - "unix" to match all Unix domain addresses. (In the future, we may support specifying a
  //   glob.)
  // - "unix-abstract" to match Linux's "abstract unix domain" addresses. (In the future, we may
  //   support specifying a glob.)
  //
  // Network restrictions apply *after* DNS resolution (otherwise they'd be useless).
  //
  // It is legal to parseAddress() a restricted address. An exception won't be thrown until
  // connect() is called.
  //
  // It's possible to listen() on a restricted address. However, connections will only be accepted
  // from non-restricted addresses; others will be dropped. If a particular listen address has no
  // valid peers (e.g. because it's a unix socket address and unix sockets are not allowed) then
  // listen() may throw (or may simply never receive any connections).
  //
  // Examples:
  //
  //     auto restricted = network->restrictPeers({"public"});
  //
  // Allows connections only to/from public internet addresses. Use this when connecting to an
  // address specified by a third party that is not trusted and is not themselves already on your
  // private network.
  //
  //     auto restricted = network->restrictPeers({"private"});
  //
  // Allows connections only to/from the private network. Use this on the server side to reject
  // connections from the public internet.
  //
  //     auto restricted = network->restrictPeers({"192.0.2.0/24"}, {"192.0.2.3/32"});
  //
  // Allows connections only to/from 192.0.2.*, except 192.0.2.3 which is blocked.
  //
  //     auto restricted = network->restrictPeers({"10.0.0.0/8", "10.1.2.3/32"}, {"10.1.2.0/24"});
  //
  // Allows connections to/from 10.*.*.*, with the exception of 10.1.2.* (which is denied), with an
  // exception to the exception of 10.1.2.3 (which is allowed, because it is matched by an allow
  // rule that is more specific than the deny rule).
};
jpayne@69 691
jpayne@69 692 // =======================================================================================
jpayne@69 693 // I/O Provider
jpayne@69 694
class AsyncIoProvider {
  // Class which constructs asynchronous wrappers around the operating system's I/O facilities.
  //
  // Generally, the implementation of this interface must integrate closely with a particular
  // `EventLoop` implementation. Typically, the EventLoop implementation itself will provide
  // an AsyncIoProvider.

public:
  virtual OneWayPipe newOneWayPipe() = 0;
  // Creates an input/output stream pair representing the ends of a one-way pipe (e.g. created with
  // the pipe(2) system call).

  virtual TwoWayPipe newTwoWayPipe() = 0;
  // Creates two AsyncIoStreams representing the two ends of a two-way pipe (e.g. created with
  // socketpair(2) system call). Data written to one end can be read from the other.

  virtual CapabilityPipe newCapabilityPipe();
  // Creates two AsyncCapabilityStreams representing the two ends of a two-way capability pipe.
  //
  // The default implementation throws an unimplemented exception. In particular this is not
  // implemented by the default AsyncIoProvider on Windows, since Windows lacks any sane way to
  // pass handles over a stream.

  virtual Network& getNetwork() = 0;
  // Returns the `Network` instance representing the networks exposed by the operating system.
  // The result is a reference; the caller does not take ownership of it.
  //
  // DO NOT CALL THIS except at the highest levels of your code, ideally in the main() function. If
  // you call this from low-level code, then you are preventing higher-level code from injecting an
  // alternative implementation. Instead, if your code needs to use network functionality, it
  // should ask for a `Network` as a constructor or method parameter, so that higher-level code can
  // choose what implementation to use. The system network is essentially a singleton. See:
  //     http://www.object-oriented-security.org/lets-argue/singletons
  //
  // Code that uses the system network should not make any assumptions about what kinds of
  // addresses it will parse, as this could differ across platforms. String addresses should come
  // strictly from the user, who will know how to write them correctly for their system.
  //
  // With that said, KJ currently supports the following string address formats:
  // - IPv4: "1.2.3.4", "1.2.3.4:80"
  // - IPv6: "1234:5678::abcd", "[1234:5678::abcd]:80"
  // - Local IP wildcard (covers both v4 and v6): "*", "*:80"
  // - Symbolic names: "example.com", "example.com:80", "example.com:http", "1.2.3.4:http"
  // - Unix domain: "unix:/path/to/socket"

  struct PipeThread {
    // A combination of a thread and a two-way pipe that communicates with that thread.
    //
    // The fields are intentionally ordered so that the pipe will be destroyed (and therefore
    // disconnected) before the thread is destroyed (and therefore joined). Thus if the thread
    // arranges to exit when it detects disconnect, destruction should be clean. Do not reorder.

    Own<Thread> thread;
    Own<AsyncIoStream> pipe;
  };

  virtual PipeThread newPipeThread(
      Function<void(AsyncIoProvider&, AsyncIoStream&, WaitScope&)> startFunc) = 0;
  // Create a new thread and set up a two-way pipe (socketpair) which can be used to communicate
  // with it. One end of the pipe is passed to the thread's start function and the other end of
  // the pipe is returned. The new thread also gets its own `AsyncIoProvider` instance and will
  // already have an active `EventLoop` when `startFunc` is called.
  //
  // TODO(someday): I'm not entirely comfortable with this interface. It seems to be doing too
  //   much at once but I'm not sure how to cleanly break it down.

  virtual Timer& getTimer() = 0;
  // Returns a `Timer` based on real time. Time does not pass while event handlers are running --
  // it only updates when the event loop polls for system events. This means that calling `now()`
  // on this timer does not require a system call.
  //
  // This timer is not affected by changes to the system date. It is unspecified whether the timer
  // continues to count while the system is suspended.
};
jpayne@69 768
class LowLevelAsyncIoProvider {
  // Similar to `AsyncIoProvider`, but represents a lower-level interface that may differ on
  // different operating systems. You should prefer to use `AsyncIoProvider` over this interface
  // whenever possible, as `AsyncIoProvider` is portable and friendlier to dependency-injection.
  //
  // On Unix, this interface can be used to import native file descriptors into the async framework.
  // Different implementations of this interface might work on top of different event handling
  // primitives, such as poll vs. epoll vs. kqueue vs. some higher-level event library.
  //
  // On Windows, this interface can be used to import native SOCKETs into the async framework.
  // Different implementations of this interface might work on top of different event handling
  // primitives, such as I/O completion ports vs. completion routines.
  //
  // Note on overloads: several wrap*() methods below have non-virtual convenience overloads that
  // share a name with a virtual method. A subclass that overrides the virtual method hides those
  // overloads (ordinary C++ name hiding) unless it re-exposes them with a `using` declaration, or
  // callers invoke them through a `LowLevelAsyncIoProvider&`.

public:
  enum Flags {
    // Flags controlling how to wrap a file descriptor.

    TAKE_OWNERSHIP = 1 << 0,
    // The returned object should own the file descriptor, automatically closing it when destroyed.
    // The close-on-exec flag will be set on the descriptor if it is not already.
    //
    // If this flag is not used, then the file descriptor is not automatically closed and the
    // close-on-exec flag is not modified.

#if !_WIN32
    ALREADY_CLOEXEC = 1 << 1,
    // Indicates that the close-on-exec flag is known already to be set, so need not be set again.
    // Only relevant when combined with TAKE_OWNERSHIP.
    //
    // On Linux, all system calls which yield new file descriptors have flags or variants which
    // set the close-on-exec flag immediately. Unfortunately, other OS's do not.

    ALREADY_NONBLOCK = 1 << 2
    // Indicates that the file descriptor is known already to be in non-blocking mode, so the flag
    // need not be set again. Otherwise, all wrap*Fd() methods will enable non-blocking mode
    // automatically.
    //
    // On Linux, all system calls which yield new file descriptors have flags or variants which
    // enable non-blocking mode immediately. Unfortunately, other OS's do not.
#endif
  };

#if _WIN32
  typedef uintptr_t Fd;
  typedef AutoCloseHandle OwnFd;
  // On Windows, the `fd` parameter to each of these methods must be a SOCKET, and must have the
  // flag WSA_FLAG_OVERLAPPED (which socket() uses by default, but WSASocket() wants you to specify
  // explicitly).
#else
  typedef int Fd;
  typedef AutoCloseFd OwnFd;
  // On Unix, any arbitrary file descriptor is supported.
#endif

  virtual Own<AsyncInputStream> wrapInputFd(Fd fd, uint flags = 0) = 0;
  // Create an AsyncInputStream wrapping a file descriptor.
  //
  // `flags` is a bitwise-OR of the values of the `Flags` enum.

  virtual Own<AsyncOutputStream> wrapOutputFd(Fd fd, uint flags = 0) = 0;
  // Create an AsyncOutputStream wrapping a file descriptor.
  //
  // `flags` is a bitwise-OR of the values of the `Flags` enum.

  virtual Own<AsyncIoStream> wrapSocketFd(Fd fd, uint flags = 0) = 0;
  // Create an AsyncIoStream wrapping a socket file descriptor.
  //
  // `flags` is a bitwise-OR of the values of the `Flags` enum.

#if !_WIN32
  virtual Own<AsyncCapabilityStream> wrapUnixSocketFd(Fd fd, uint flags = 0);
  // Like wrapSocketFd() but also support capability passing via SCM_RIGHTS. The socket must be
  // a Unix domain socket.
  //
  // The default implementation throws UNIMPLEMENTED, for backwards-compatibility with
  // LowLevelAsyncIoProvider implementations written before this method was added.
#endif

  virtual Promise<Own<AsyncIoStream>> wrapConnectingSocketFd(
      Fd fd, const struct sockaddr* addr, uint addrlen, uint flags = 0) = 0;
  // Create an AsyncIoStream wrapping a socket and initiate a connection to the given address.
  // The returned promise does not resolve until connection has completed.
  //
  // `flags` is a bitwise-OR of the values of the `Flags` enum.

  class NetworkFilter {
    // Policy deciding which peers a listen or datagram socket will talk to.

  public:
    virtual bool shouldAllow(const struct sockaddr* addr, uint addrlen) = 0;
    // Returns true if incoming connections or datagrams from the given peer should be accepted.
    // If false, they will be dropped. This is used to implement kj::Network::restrictPeers().

    static NetworkFilter& getAllAllowed();
    // Shared filter used by the wrap*SocketFd() overloads that take no filter. Presumably its
    // shouldAllow() always returns true, as the name suggests.
  };

  virtual Own<ConnectionReceiver> wrapListenSocketFd(
      Fd fd, NetworkFilter& filter, uint flags = 0) = 0;
  inline Own<ConnectionReceiver> wrapListenSocketFd(Fd fd, uint flags = 0) {
    return wrapListenSocketFd(fd, NetworkFilter::getAllAllowed(), flags);
  }
  // Create a ConnectionReceiver wrapping a listen socket file descriptor. This socket should
  // already have had `bind()` and `listen()` called on it, so it's ready for `accept()`.
  //
  // Incoming connections are checked against `filter`; the overload without `filter` uses
  // NetworkFilter::getAllAllowed(). NOTE(review): `filter` is taken by reference and presumably
  // must outlive the returned receiver -- confirm.
  //
  // `flags` is a bitwise-OR of the values of the `Flags` enum.

  virtual Own<DatagramPort> wrapDatagramSocketFd(Fd fd, NetworkFilter& filter, uint flags = 0);
  inline Own<DatagramPort> wrapDatagramSocketFd(Fd fd, uint flags = 0) {
    return wrapDatagramSocketFd(fd, NetworkFilter::getAllAllowed(), flags);
  }
  // Create a DatagramPort wrapping a datagram (e.g. UDP) socket file descriptor. Incoming
  // datagrams are checked against `filter`; the overload without `filter` uses
  // NetworkFilter::getAllAllowed().
  //
  // NOTE(review): unlike most wrap*() methods this one is not pure virtual; presumably the
  // default implementation throws UNIMPLEMENTED like wrapUnixSocketFd() -- confirm.
  //
  // `flags` is a bitwise-OR of the values of the `Flags` enum.

  virtual Timer& getTimer() = 0;
  // Returns a `Timer` based on real time. Time does not pass while event handlers are running --
  // it only updates when the event loop polls for system events. This means that calling `now()`
  // on this timer does not require a system call.
  //
  // This timer is not affected by changes to the system date. It is unspecified whether the timer
  // continues to count while the system is suspended.

  Own<AsyncInputStream> wrapInputFd(OwnFd&& fd, uint flags = 0);
  Own<AsyncOutputStream> wrapOutputFd(OwnFd&& fd, uint flags = 0);
  Own<AsyncIoStream> wrapSocketFd(OwnFd&& fd, uint flags = 0);
#if !_WIN32
  Own<AsyncCapabilityStream> wrapUnixSocketFd(OwnFd&& fd, uint flags = 0);
#endif
  Promise<Own<AsyncIoStream>> wrapConnectingSocketFd(
      OwnFd&& fd, const struct sockaddr* addr, uint addrlen, uint flags = 0);
  Own<ConnectionReceiver> wrapListenSocketFd(
      OwnFd&& fd, NetworkFilter& filter, uint flags = 0);
  Own<ConnectionReceiver> wrapListenSocketFd(OwnFd&& fd, uint flags = 0);
  Own<DatagramPort> wrapDatagramSocketFd(OwnFd&& fd, NetworkFilter& filter, uint flags = 0);
  Own<DatagramPort> wrapDatagramSocketFd(OwnFd&& fd, uint flags = 0);
  // Convenience wrappers which transfer ownership via AutoCloseFd (Unix) or AutoCloseHandle
  // (Windows). TAKE_OWNERSHIP will be implicitly added to `flags`.
};
jpayne@69 902
Own<AsyncIoProvider> newAsyncIoProvider(LowLevelAsyncIoProvider& lowLevel);
// Make a new AsyncIoProvider wrapping a `LowLevelAsyncIoProvider`.
//
// NOTE(review): `lowLevel` is taken by reference, so presumably it must outlive the returned
// provider -- confirm against the implementation.
jpayne@69 905
struct AsyncIoContext {
  // Everything the current thread needs to do async I/O; returned by setupAsyncIo().
  //
  // Members are destroyed in reverse declaration order, so `provider` is destroyed before
  // `lowLevelProvider`. NOTE(review): this presumably matters because `provider` wraps
  // `lowLevelProvider` (see newAsyncIoProvider()) -- keep this order.

  Own<LowLevelAsyncIoProvider> lowLevelProvider;
  Own<AsyncIoProvider> provider;
  WaitScope& waitScope;
  // Pass to Promise::wait() to block this thread until a promise resolves.

#if _WIN32
  Win32EventPort& win32EventPort;
  // Direct access to the underlying Win32EventPort (Windows counterpart of `unixEventPort`).
#else
  UnixEventPort& unixEventPort;
  // TEMPORARY: Direct access to underlying UnixEventPort, mainly for waiting on signals. This
  //   field will go away at some point when we have a chance to improve these interfaces.
#endif
};
jpayne@69 919
AsyncIoContext setupAsyncIo();
// Convenience method which sets up the current thread with everything it needs to do async I/O.
// The returned objects contain an `EventLoop` which is wrapping an appropriate `EventPort` for
// doing I/O on the host system, so everything is ready for the thread to start making async calls
// and waiting on promises. Keep the returned context alive for as long as the thread does async
// I/O: it owns both providers.
//
// You would typically call this in your main() function or in the start function of a thread.
// Example:
//
//     int main() {
//       auto ioContext = kj::setupAsyncIo();
//
//       // Now we can call an async function.
//       Promise<String> textPromise = getHttp(*ioContext.provider, "http://example.com");
//
//       // And we can wait for the promise to complete.  Note that you can only use `wait()`
//       // from the top level, not from inside a promise callback.
//       String text = textPromise.wait(ioContext.waitScope);
//       print(text);
//       return 0;
//     }
//
// WARNING: An AsyncIoContext can only be used in the thread and process that created it. In
//   particular, note that after a fork(), an AsyncIoContext created in the parent process will
//   not work correctly in the child, even if the parent ceases to use its copy. This also means
//   that server processes which daemonize themselves at startup must wait until after
//   daemonization to create an AsyncIoContext.
jpayne@69 947
jpayne@69 948 // =======================================================================================
jpayne@69 949 // Convenience adapters.
jpayne@69 950
class CapabilityStreamConnectionReceiver final: public ConnectionReceiver {
  // Trivial wrapper which allows an AsyncCapabilityStream to act as a ConnectionReceiver. accept()
  // calls receiveStream().
  //
  // `inner` is held by reference, so the stream must outlive this receiver.

public:
  CapabilityStreamConnectionReceiver(AsyncCapabilityStream& inner)
      : inner(inner) {}

  Promise<Own<AsyncIoStream>> accept() override;
  uint getPort() override;

  Promise<AuthenticatedStream> acceptAuthenticated() override;
  // Always produces UnknownIdentity. Capability-based security patterns should not rely on
  // authenticating peers; the other end of the capability stream should only be given to
  // authorized parties in the first place.

private:
  AsyncCapabilityStream& inner;
};
jpayne@69 970
class CapabilityStreamNetworkAddress final: public NetworkAddress {
  // Trivial wrapper which allows an AsyncCapabilityStream to act as a NetworkAddress.
  //
  // connect() is implemented by calling provider.newCapabilityPipe(), sending one end over the
  // original capability stream, and returning the other end. If `provider` is null, then the
  // global kj::newCapabilityPipe() will be used, but this ONLY works if `inner` itself is agnostic
  // to the type of streams it receives, e.g. because it was also created using
  // kj::newCapabilityPipe().
  //
  // listen().accept() is implemented by receiving new streams over the original stream.
  //
  // Note that clone() doesn't work (due to ownership issues) and toString() returns a static
  // string.
  //
  // Both `inner` and (if present) `provider` are held by reference and must outlive this object.

public:
  CapabilityStreamNetworkAddress(kj::Maybe<AsyncIoProvider&> provider, AsyncCapabilityStream& inner)
      : provider(provider), inner(inner) {}

  Promise<Own<AsyncIoStream>> connect() override;
  Own<ConnectionReceiver> listen() override;

  Own<NetworkAddress> clone() override;
  String toString() override;

  Promise<AuthenticatedStream> connectAuthenticated() override;
  // Always produces UnknownIdentity. Capability-based security patterns should not rely on
  // authenticating peers; the other end of the capability stream should only be given to
  // authorized parties in the first place.

private:
  kj::Maybe<AsyncIoProvider&> provider;
  AsyncCapabilityStream& inner;
};
jpayne@69 1004
jpayne@69 1005 class FileInputStream: public AsyncInputStream {
jpayne@69 1006 // InputStream that reads from a disk file -- and enables sendfile() optimization.
jpayne@69 1007 //
jpayne@69 1008 // Reads are performed synchronously -- no actual attempt is made to use asynchronous file I/O.
jpayne@69 1009 // True asynchronous file I/O is complicated and is mostly unnecessary in the presence of
jpayne@69 1010 // caching. Only certain niche programs can expect to benefit from it. For the rest, it's better
jpayne@69 1011 // to use regular syrchronous disk I/O, so that's what this class does.
jpayne@69 1012 //
jpayne@69 1013 // The real purpose of this class, aside from general convenience, is to enable sendfile()
jpayne@69 1014 // optimization. When you use this class's pumpTo() method, and the destination is a socket,
jpayne@69 1015 // the system will detect this and optimize to sendfile(), so that the file data never needs to
jpayne@69 1016 // be read into userspace.
jpayne@69 1017 //
jpayne@69 1018 // NOTE: As of this writing, sendfile() optimization is only implemented on Linux.
jpayne@69 1019
jpayne@69 1020 public:
jpayne@69 1021 FileInputStream(const ReadableFile& file, uint64_t offset = 0)
jpayne@69 1022 : file(file), offset(offset) {}
jpayne@69 1023
jpayne@69 1024 const ReadableFile& getUnderlyingFile() { return file; }
jpayne@69 1025 uint64_t getOffset() { return offset; }
jpayne@69 1026 void seek(uint64_t newOffset) { offset = newOffset; }
jpayne@69 1027
jpayne@69 1028 Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes);
jpayne@69 1029 Maybe<uint64_t> tryGetLength();
jpayne@69 1030
jpayne@69 1031 // (pumpTo() is not actually overridden here, but AsyncStreamFd's tryPumpFrom() will detect when
jpayne@69 1032 // the source is a file.)
jpayne@69 1033
jpayne@69 1034 private:
jpayne@69 1035 const ReadableFile& file;
jpayne@69 1036 uint64_t offset;
jpayne@69 1037 };
jpayne@69 1038
jpayne@69 1039 class FileOutputStream: public AsyncOutputStream {
jpayne@69 1040 // OutputStream that writes to a disk file.
jpayne@69 1041 //
jpayne@69 1042 // As with FileInputStream, calls are not actually async. Async would be even less useful here
jpayne@69 1043 // because writes should usually land in cache anyway.
jpayne@69 1044 //
jpayne@69 1045 // sendfile() optimization does not apply when writing to a file, but on Linux, splice() can
jpayne@69 1046 // be used to achieve a similar effect.
jpayne@69 1047 //
jpayne@69 1048 // NOTE: As of this writing, splice() optimization is not implemented.
jpayne@69 1049
jpayne@69 1050 public:
jpayne@69 1051 FileOutputStream(const File& file, uint64_t offset = 0)
jpayne@69 1052 : file(file), offset(offset) {}
jpayne@69 1053
jpayne@69 1054 const File& getUnderlyingFile() { return file; }
jpayne@69 1055 uint64_t getOffset() { return offset; }
jpayne@69 1056 void seek(uint64_t newOffset) { offset = newOffset; }
jpayne@69 1057
jpayne@69 1058 Promise<void> write(const void* buffer, size_t size);
jpayne@69 1059 Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces);
jpayne@69 1060 Promise<void> whenWriteDisconnected();
jpayne@69 1061
jpayne@69 1062 private:
jpayne@69 1063 const File& file;
jpayne@69 1064 uint64_t offset;
jpayne@69 1065 };
jpayne@69 1066
jpayne@69 1067 // =======================================================================================
jpayne@69 1068 // inline implementation details
jpayne@69 1069
jpayne@69 1070 inline AncillaryMessage::AncillaryMessage(
jpayne@69 1071 int level, int type, ArrayPtr<const byte> data)
jpayne@69 1072 : level(level), type(type), data(data) {}
jpayne@69 1073
jpayne@69 1074 inline int AncillaryMessage::getLevel() const { return level; }
jpayne@69 1075 inline int AncillaryMessage::getType() const { return type; }
jpayne@69 1076
jpayne@69 1077 template <typename T>
jpayne@69 1078 inline Maybe<const T&> AncillaryMessage::as() const {
jpayne@69 1079 if (data.size() >= sizeof(T)) {
jpayne@69 1080 return *reinterpret_cast<const T*>(data.begin());
jpayne@69 1081 } else {
jpayne@69 1082 return nullptr;
jpayne@69 1083 }
jpayne@69 1084 }
jpayne@69 1085
jpayne@69 1086 template <typename T>
jpayne@69 1087 inline ArrayPtr<const T> AncillaryMessage::asArray() const {
jpayne@69 1088 return arrayPtr(reinterpret_cast<const T*>(data.begin()), data.size() / sizeof(T));
jpayne@69 1089 }
jpayne@69 1090
jpayne@69 1091 class SecureNetworkWrapper {
jpayne@69 1092 // Abstract interface for a class which implements a "secure" network as a wrapper around an
jpayne@69 1093 // insecure one. "secure" means:
jpayne@69 1094 // * Connections to a server will only succeed if it can be verified that the requested hostname
jpayne@69 1095 // actually belongs to the responding server.
jpayne@69 1096 // * No man-in-the-middle attacker can potentially see the bytes sent and received.
jpayne@69 1097 //
jpayne@69 1098 // The typical implementation uses TLS. The object in this case could be configured to use cerain
jpayne@69 1099 // keys, certificates, etc. See kj/compat/tls.h for such an implementation.
jpayne@69 1100 //
jpayne@69 1101 // However, an implementation could use some other form of encryption, or might not need to use
jpayne@69 1102 // encryption at all. For example, imagine a kj::Network that exists only on a single machine,
jpayne@69 1103 // providing communications between various processes using unix sockets. Perhaps the "hostnames"
jpayne@69 1104 // are actually PIDs in this case. An implementation of such a network could verify the other
jpayne@69 1105 // side's identity using an `SCM_CREDENTIALS` auxiliary message, which cannot be forged. Once
jpayne@69 1106 // verified, there is no need to encrypt since unix sockets cannot be intercepted.
jpayne@69 1107
jpayne@69 1108 public:
jpayne@69 1109 virtual kj::Promise<kj::Own<kj::AsyncIoStream>> wrapServer(kj::Own<kj::AsyncIoStream> stream) = 0;
jpayne@69 1110 // Act as the server side of a connection. The given stream is already connected to a client, but
jpayne@69 1111 // no authentication has occurred. The returned stream represents the secure transport once
jpayne@69 1112 // established.
jpayne@69 1113
jpayne@69 1114 virtual kj::Promise<kj::Own<kj::AsyncIoStream>> wrapClient(
jpayne@69 1115 kj::Own<kj::AsyncIoStream> stream, kj::StringPtr expectedServerHostname) = 0;
jpayne@69 1116 // Act as the client side of a connection. The given stream is already connecetd to a server, but
jpayne@69 1117 // no authentication has occurred. This method will verify that the server actually is the given
jpayne@69 1118 // hostname, then return the stream representing a secure transport to that server.
jpayne@69 1119
jpayne@69 1120 virtual kj::Promise<kj::AuthenticatedStream> wrapServer(kj::AuthenticatedStream stream) = 0;
jpayne@69 1121 virtual kj::Promise<kj::AuthenticatedStream> wrapClient(
jpayne@69 1122 kj::AuthenticatedStream stream, kj::StringPtr expectedServerHostname) = 0;
jpayne@69 1123 // Same as above, but implementing kj::AuthenticatedStream, which provides PeerIdentity objects
jpayne@69 1124 // with more details about the peer. The SecureNetworkWrapper will provide its own implementation
jpayne@69 1125 // of PeerIdentity with the specific details it is able to authenticate.
jpayne@69 1126
jpayne@69 1127 virtual kj::Own<kj::ConnectionReceiver> wrapPort(kj::Own<kj::ConnectionReceiver> port) = 0;
jpayne@69 1128 // Wrap a connection listener. This is equivalent to calling wrapServer() on every connection
jpayne@69 1129 // received.
jpayne@69 1130
jpayne@69 1131 virtual kj::Own<kj::NetworkAddress> wrapAddress(
jpayne@69 1132 kj::Own<kj::NetworkAddress> address, kj::StringPtr expectedServerHostname) = 0;
jpayne@69 1133 // Wrap a NetworkAddress. This is equivalent to calling `wrapClient()` on every connection
jpayne@69 1134 // formed by calling `connect()` on the address.
jpayne@69 1135
jpayne@69 1136 virtual kj::Own<kj::Network> wrapNetwork(kj::Network& network) = 0;
jpayne@69 1137 // Wrap a whole `kj::Network`. This automatically wraps everything constructed using the network.
jpayne@69 1138 // The network will only accept address strings that can be authenticated, and will automatically
jpayne@69 1139 // authenticate servers against those addresses when connecting to them.
jpayne@69 1140 };
jpayne@69 1141
jpayne@69 1142 } // namespace kj
jpayne@69 1143
jpayne@69 1144 KJ_END_HEADER