jpayne@69: // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors jpayne@69: // Licensed under the MIT License: jpayne@69: // jpayne@69: // Permission is hereby granted, free of charge, to any person obtaining a copy jpayne@69: // of this software and associated documentation files (the "Software"), to deal jpayne@69: // in the Software without restriction, including without limitation the rights jpayne@69: // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell jpayne@69: // copies of the Software, and to permit persons to whom the Software is jpayne@69: // furnished to do so, subject to the following conditions: jpayne@69: // jpayne@69: // The above copyright notice and this permission notice shall be included in jpayne@69: // all copies or substantial portions of the Software. jpayne@69: // jpayne@69: // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR jpayne@69: // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, jpayne@69: // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE jpayne@69: // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER jpayne@69: // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, jpayne@69: // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN jpayne@69: // THE SOFTWARE. 
jpayne@69: jpayne@69: #pragma once jpayne@69: jpayne@69: #include "async.h" jpayne@69: #include jpayne@69: #include jpayne@69: #include jpayne@69: jpayne@69: KJ_BEGIN_HEADER jpayne@69: jpayne@69: struct sockaddr; jpayne@69: jpayne@69: namespace kj { jpayne@69: jpayne@69: #if _WIN32 jpayne@69: class Win32EventPort; jpayne@69: class AutoCloseHandle; jpayne@69: #else jpayne@69: class UnixEventPort; jpayne@69: #endif jpayne@69: jpayne@69: class AutoCloseFd; jpayne@69: class NetworkAddress; jpayne@69: class AsyncOutputStream; jpayne@69: class AsyncIoStream; jpayne@69: class AncillaryMessage; jpayne@69: jpayne@69: class ReadableFile; jpayne@69: class File; jpayne@69: jpayne@69: // ======================================================================================= jpayne@69: // Streaming I/O jpayne@69: jpayne@69: class AsyncInputStream: private AsyncObject { jpayne@69: // Asynchronous equivalent of InputStream (from io.h). jpayne@69: jpayne@69: public: jpayne@69: virtual Promise read(void* buffer, size_t minBytes, size_t maxBytes); jpayne@69: virtual Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) = 0; jpayne@69: jpayne@69: Promise read(void* buffer, size_t bytes); jpayne@69: jpayne@69: virtual Maybe tryGetLength(); jpayne@69: // Get the remaining number of bytes that will be produced by this stream, if known. jpayne@69: // jpayne@69: // This is used e.g. to fill in the Content-Length header of an HTTP message. If unknown, the jpayne@69: // HTTP implementation may need to fall back to Transfer-Encoding: chunked. jpayne@69: // jpayne@69: // The default implementation always returns null. jpayne@69: jpayne@69: virtual Promise pumpTo( jpayne@69: AsyncOutputStream& output, uint64_t amount = kj::maxValue); jpayne@69: // Read `amount` bytes from this stream (or to EOF) and write them to `output`, returning the jpayne@69: // total bytes actually pumped (which is only less than `amount` if EOF was reached). 
jpayne@69: // jpayne@69: // Override this if your stream type knows how to pump itself to certain kinds of output jpayne@69: // streams more efficiently than via the naive approach. You can use jpayne@69: // kj::dynamicDowncastIfAvailable() to test for stream types you recognize, and if none match, jpayne@69: // delegate to the default implementation. jpayne@69: // jpayne@69: // The default implementation first tries calling output.tryPumpFrom(), but if that fails, it jpayne@69: // performs a naive pump by allocating a buffer and reading to it / writing from it in a loop. jpayne@69: jpayne@69: Promise> readAllBytes(uint64_t limit = kj::maxValue); jpayne@69: Promise readAllText(uint64_t limit = kj::maxValue); jpayne@69: // Read until EOF and return as one big byte array or string. Throw an exception if EOF is not jpayne@69: // seen before reading `limit` bytes. jpayne@69: // jpayne@69: // To prevent runaway memory allocation, consider using a more conservative value for `limit` than jpayne@69: // the default, particularly on untrusted data streams which may never see EOF. jpayne@69: jpayne@69: virtual void registerAncillaryMessageHandler(Function)> fn); jpayne@69: // Register interest in checking for ancillary messages (aka control messages) when reading. jpayne@69: // The provided callback will be called whenever any are encountered. The messages passed to jpayne@69: // the function do not live beyond when the function returns. jpayne@69: // Only supported on Unix (the default impl throws UNIMPLEMENTED). Most apps will not use this. jpayne@69: jpayne@69: virtual Maybe> tryTee(uint64_t limit = kj::maxValue); jpayne@69: // Primarily intended as an optimization for the `tee` call. Returns an input stream whose state jpayne@69: // is independent from this one but which will return the exact same set of bytes read going jpayne@69: // forward. limit is a total limit on the amount of memory, in bytes, which a tee implementation jpayne@69: // may use to buffer stream data. 
An implementation must throw an exception if a read operation jpayne@69: // would cause the limit to be exceeded. If tryTee() can see that the new limit is impossible to jpayne@69: // satisfy, it should return nullptr so that the pessimized path is taken in newTee. This is jpayne@69: // likely to arise if tryTee() is called twice with different limits on the same stream. jpayne@69: }; jpayne@69: jpayne@69: class AsyncOutputStream: private AsyncObject { jpayne@69: // Asynchronous equivalent of OutputStream (from io.h). jpayne@69: jpayne@69: public: jpayne@69: virtual Promise write(const void* buffer, size_t size) KJ_WARN_UNUSED_RESULT = 0; jpayne@69: virtual Promise write(ArrayPtr> pieces) jpayne@69: KJ_WARN_UNUSED_RESULT = 0; jpayne@69: jpayne@69: virtual Maybe> tryPumpFrom( jpayne@69: AsyncInputStream& input, uint64_t amount = kj::maxValue); jpayne@69: // Implements double-dispatch for AsyncInputStream::pumpTo(). jpayne@69: // jpayne@69: // This method should only be called from within an implementation of pumpTo(). jpayne@69: // jpayne@69: // This method examines the type of `input` to find optimized ways to pump data from it to this jpayne@69: // output stream. If it finds one, it performs the pump. Otherwise, it returns null. jpayne@69: // jpayne@69: // The default implementation always returns null. jpayne@69: jpayne@69: virtual Promise whenWriteDisconnected() = 0; jpayne@69: // Returns a promise that resolves when the stream has become disconnected such that new write()s jpayne@69: // will fail with a DISCONNECTED exception. This is particularly useful, for example, to cancel jpayne@69: // work early when it is detected that no one will receive the result. jpayne@69: // jpayne@69: // Note that not all streams are able to detect this condition without actually performing a jpayne@69: // write(); such stream implementations may return a promise that never resolves. 
(In particular, jpayne@69: // as of this writing, whenWriteDisconnected() is not implemented on Windows. Also, for TCP jpayne@69: // streams, not all disconnects are detectable -- a power or network failure may lead the jpayne@69: // connection to hang forever, or until configured socket options lead to a timeout.) jpayne@69: // jpayne@69: // Unlike most other asynchronous stream methods, it is safe to call whenWriteDisconnected() jpayne@69: // multiple times without canceling the previous promises. jpayne@69: }; jpayne@69: jpayne@69: class AsyncIoStream: public AsyncInputStream, public AsyncOutputStream { jpayne@69: // A combination input and output stream. jpayne@69: jpayne@69: public: jpayne@69: virtual void shutdownWrite() = 0; jpayne@69: // Cleanly shut down just the write end of the stream, while keeping the read end open. jpayne@69: jpayne@69: virtual void abortRead() {} jpayne@69: // Similar to shutdownWrite, but this will shut down the read end of the stream, and should only jpayne@69: // be called when an error has occurred. jpayne@69: jpayne@69: virtual void getsockopt(int level, int option, void* value, uint* length); jpayne@69: virtual void setsockopt(int level, int option, const void* value, uint length); jpayne@69: // Corresponds to getsockopt() and setsockopt() syscalls. Will throw an "unimplemented" exception jpayne@69: // if the stream is not a socket or the option is not appropriate for the socket type. The jpayne@69: // default implementations always throw "unimplemented". jpayne@69: jpayne@69: virtual void getsockname(struct sockaddr* addr, uint* length); jpayne@69: virtual void getpeername(struct sockaddr* addr, uint* length); jpayne@69: // Corresponds to getsockname() and getpeername() syscalls. Will throw an "unimplemented" jpayne@69: // exception if the stream is not a socket. The default implementations always throw jpayne@69: // "unimplemented". 
jpayne@69: // jpayne@69: // Note that we don't provide methods that return NetworkAddress because it usually wouldn't jpayne@69: // be useful. You can't connect() to or listen() on these addresses, obviously, because they are jpayne@69: // ephemeral addresses for a single connection. jpayne@69: jpayne@69: virtual kj::Maybe getFd() const { return nullptr; } jpayne@69: // Get the underlying Unix file descriptor, if any. Returns nullptr if this object actually jpayne@69: // isn't wrapping a file descriptor. jpayne@69: }; jpayne@69: jpayne@69: Promise unoptimizedPumpTo( jpayne@69: AsyncInputStream& input, AsyncOutputStream& output, uint64_t amount, jpayne@69: uint64_t completedSoFar = 0); jpayne@69: // Performs a pump using read() and write(), without calling the stream's pumpTo() nor jpayne@69: // tryPumpFrom() methods. This is intended to be used as a fallback by implementations of pumpTo() jpayne@69: // and tryPumpFrom() when they want to give up on optimization, but can't just call pumpTo() again jpayne@69: // because this would recursively retry the optimization. unoptimizedPumpTo() should only be called jpayne@69: // inside implementations of streams, never by the caller of a stream -- use the pumpTo() method jpayne@69: // instead. jpayne@69: // jpayne@69: // `completedSoFar` is the number of bytes out of `amount` that have already been pumped. This is jpayne@69: // provided for convenience for cases where the caller has already done some pumping before they jpayne@69: // give up. Otherwise, a `.then()` would need to be used to add the bytes to the final result. jpayne@69: jpayne@69: class AsyncCapabilityStream: public AsyncIoStream { jpayne@69: // An AsyncIoStream that also allows transmitting new stream objects and file descriptors jpayne@69: // (capabilities, in the object-capability model sense), in addition to bytes. jpayne@69: // jpayne@69: // Capabilities can be attached to bytes when they are written. 
On the receiving end, the read() jpayne@69: // that receives the first byte of such a message will also receive the capabilities. jpayne@69: // jpayne@69: // Note that AsyncIoStream's regular byte-oriented methods can be used on AsyncCapabilityStream, jpayne@69: // with the effect of silently dropping any capabilities attached to the respective bytes. E.g. jpayne@69: // using `AsyncIoStream::tryRead()` to read bytes that had been sent with `writeWithFds()` will jpayne@69: // silently drop the FDs (closing them if appropriate). Also note that pumping a stream with jpayne@69: // `pumpTo()` always drops all capabilities attached to the pumped data. (TODO(someday): Do we jpayne@69: // want a version of pumpTo() that preserves capabilities?) jpayne@69: // jpayne@69: // On Unix, KJ provides an implementation based on Unix domain sockets and file descriptor jpayne@69: // passing via SCM_RIGHTS. Due to the nature of SCM_RIGHTS, if the application accidentally jpayne@69: // read()s when it should have called receiveStream(), it will observe a NUL byte in the data jpayne@69: // and the capability will be discarded. Of course, an application should not depend on this jpayne@69: // behavior; it should avoid read()ing through a capability. jpayne@69: // jpayne@69: // KJ does not provide any inter-process implementation of this type on Windows, as there's no jpayne@69: // obvious implementation there. Handle passing on Windows requires at least one of the processes jpayne@69: // involved to have permission to modify the other's handle table, which is effectively full jpayne@69: // control. Handle passing between mutually non-trusting processes would require a trusted jpayne@69: // broker process to facilitate. One could possibly implement this type in terms of such a jpayne@69: // broker, or in terms of direct handle passing if at least one process trusts the other. 
jpayne@69: jpayne@69: public: jpayne@69: virtual Promise writeWithFds(ArrayPtr data, jpayne@69: ArrayPtr> moreData, jpayne@69: ArrayPtr fds) = 0; jpayne@69: Promise writeWithFds(ArrayPtr data, jpayne@69: ArrayPtr> moreData, jpayne@69: ArrayPtr fds); jpayne@69: // Write some data to the stream with some file descriptors attached to it. jpayne@69: // jpayne@69: // The maximum number of FDs that can be sent at a time is usually subject to an OS-imposed jpayne@69: // limit. On Linux, this is 253. In practice, sending more than a handful of FDs at once is jpayne@69: // probably a bad idea. jpayne@69: jpayne@69: struct ReadResult { jpayne@69: size_t byteCount; jpayne@69: size_t capCount; jpayne@69: }; jpayne@69: jpayne@69: virtual Promise tryReadWithFds(void* buffer, size_t minBytes, size_t maxBytes, jpayne@69: AutoCloseFd* fdBuffer, size_t maxFds) = 0; jpayne@69: // Read data from the stream that may have file descriptors attached. Any attached descriptors jpayne@69: // will be placed in `fdBuffer`. If multiple bundles of FDs are encountered in the course of jpayne@69: // reading the amount of data requested by minBytes/maxBytes, then they will be concatenated. If jpayne@69: // more FDs are received than fit in the buffer, then the excess will be discarded and closed -- jpayne@69: // this behavior, while ugly, is important to defend against denial-of-service attacks that may jpayne@69: // fill up the FD table with garbage. Applications must think carefully about how many FDs they jpayne@69: // really need to receive at once and set a well-defined limit. jpayne@69: jpayne@69: virtual Promise writeWithStreams(ArrayPtr data, jpayne@69: ArrayPtr> moreData, jpayne@69: Array> streams) = 0; jpayne@69: virtual Promise tryReadWithStreams( jpayne@69: void* buffer, size_t minBytes, size_t maxBytes, jpayne@69: Own* streamBuffer, size_t maxStreams) = 0; jpayne@69: // Like above, but passes AsyncCapabilityStream objects. 
The stream implementations must be from jpayne@69: // the same AsyncIoProvider. jpayne@69: jpayne@69: // --------------------------------------------------------------------------- jpayne@69: // Helpers for sending individual capabilities. jpayne@69: // jpayne@69: // These are equivalent to the above methods with the constraint that only one FD is jpayne@69: // sent/received at a time and the corresponding data is a single zero-valued byte. jpayne@69: jpayne@69: Promise> receiveStream(); jpayne@69: Promise>> tryReceiveStream(); jpayne@69: Promise sendStream(Own stream); jpayne@69: // Transfer a single stream. jpayne@69: jpayne@69: Promise receiveFd(); jpayne@69: Promise> tryReceiveFd(); jpayne@69: Promise sendFd(int fd); jpayne@69: // Transfer a single raw file descriptor. jpayne@69: }; jpayne@69: jpayne@69: struct OneWayPipe { jpayne@69: // A data pipe with an input end and an output end. (Typically backed by pipe() system call.) jpayne@69: jpayne@69: Own in; jpayne@69: Own out; jpayne@69: }; jpayne@69: jpayne@69: OneWayPipe newOneWayPipe(kj::Maybe expectedLength = nullptr); jpayne@69: // Constructs a OneWayPipe that operates in-process. The pipe does not do any buffering -- it waits jpayne@69: // until both a read() and a write() call are pending, then resolves both. jpayne@69: // jpayne@69: // If `expectedLength` is non-null, then the pipe will be expected to transmit exactly that many jpayne@69: // bytes. The input end's `tryGetLength()` will return the number of bytes left. jpayne@69: jpayne@69: struct TwoWayPipe { jpayne@69: // A data pipe that supports sending in both directions. Each end's output sends data to the jpayne@69: // other end's input. (Typically backed by socketpair() system call.) jpayne@69: jpayne@69: Own ends[2]; jpayne@69: }; jpayne@69: jpayne@69: TwoWayPipe newTwoWayPipe(); jpayne@69: // Constructs a TwoWayPipe that operates in-process. 
The pipe does not do any buffering -- it waits jpayne@69: // until both a read() and a write() call are pending, then resolves both. jpayne@69: jpayne@69: struct CapabilityPipe { jpayne@69: // Like TwoWayPipe but allowing capability-passing. jpayne@69: jpayne@69: Own ends[2]; jpayne@69: }; jpayne@69: jpayne@69: CapabilityPipe newCapabilityPipe(); jpayne@69: // Like newTwoWayPipe() but creates a capability pipe. jpayne@69: // jpayne@69: // The requirement of `writeWithStreams()` that "The stream implementations must be from the same jpayne@69: // AsyncIoProvider." does not apply to this pipe; any kind of AsyncCapabilityStream implementation jpayne@69: // is supported. jpayne@69: // jpayne@69: // This implementation does not know how to convert streams to FDs or vice versa; if you write FDs jpayne@69: // you must read FDs, and if you write streams you must read streams. jpayne@69: jpayne@69: struct Tee { jpayne@69: // Two AsyncInputStreams which each read the same data from some wrapped inner AsyncInputStream. jpayne@69: jpayne@69: Own branches[2]; jpayne@69: }; jpayne@69: jpayne@69: Tee newTee(Own input, uint64_t limit = kj::maxValue); jpayne@69: // Constructs a Tee that operates in-process. The tee buffers data if any read or pump operation is jpayne@69: // called on one of the two input ends. If a read or pump operation is subsequently called on the jpayne@69: // other input end, the buffered data is consumed. jpayne@69: // jpayne@69: // `pumpTo()` operations on the input ends will proactively read from the inner stream and block jpayne@69: // while writing to the output stream. While one branch has an active `pumpTo()` operation, any jpayne@69: // `tryRead()` operation on the other branch will not be allowed to read faster than allowed by the jpayne@69: // pump's backpressure. (In other words, it will never cause buffering on the pump.) 
Similarly, if jpayne@69: // there are `pumpTo()` operations active on both branches, the greater of the two backpressures is jpayne@69: // respected -- the two pumps progress in lockstep, and there is no buffering. jpayne@69: // jpayne@69: // At no point will a branch's buffer be allowed to grow beyond `limit` bytes. If the buffer would jpayne@69: // grow beyond the limit, an exception is generated, which both branches see once they have jpayne@69: // exhausted their buffers. jpayne@69: // jpayne@69: // It is recommended that you use a more conservative value for `limit` than the default. jpayne@69: jpayne@69: Own newPromisedStream(Promise> promise); jpayne@69: Own newPromisedStream(Promise> promise); jpayne@69: // Constructs an Async*Stream which waits for a promise to resolve, then forwards all calls to the jpayne@69: // promised stream. jpayne@69: jpayne@69: // ======================================================================================= jpayne@69: // Authenticated streams jpayne@69: jpayne@69: class PeerIdentity { jpayne@69: // PeerIdentity provides information about a connecting client. Various subclasses exist to jpayne@69: // address different network types. jpayne@69: public: jpayne@69: virtual kj::String toString() = 0; jpayne@69: // Returns a human-readable string identifying the peer. Where possible, this string will be jpayne@69: // in the same format as the addresses you could pass to `kj::Network::parseAddress()`. However, jpayne@69: // only certain subclasses of `PeerIdentity` guarantee this property. jpayne@69: }; jpayne@69: jpayne@69: struct AuthenticatedStream { jpayne@69: // A pair of an `AsyncIoStream` and a `PeerIdentity`. This is used as the return type of jpayne@69: // `NetworkAddress::connectAuthenticated()` and `ConnectionReceiver::acceptAuthenticated()`. jpayne@69: jpayne@69: Own stream; jpayne@69: // The byte stream. jpayne@69: jpayne@69: Own peerIdentity; jpayne@69: // An object indicating who is at the other end of the stream. 
jpayne@69: // jpayne@69: // Different subclasses of `PeerIdentity` are used in different situations: jpayne@69: // - TCP connections will use NetworkPeerIdentity, which gives the network address of the client. jpayne@69: // - Local (unix) socket connections will use LocalPeerIdentity, which identifies the UID jpayne@69: // and PID of the process that initiated the connection. jpayne@69: // - TLS connections will use TlsPeerIdentity which provides details of the client certificate, jpayne@69: // if any was provided. jpayne@69: // - When no meaningful peer identity can be provided, `UnknownPeerIdentity` is returned. jpayne@69: // jpayne@69: // Implementations of `Network`, `ConnectionReceiver`, `NetworkAddress`, etc. should document the jpayne@69: // specific assumptions the caller can make about the type of `PeerIdentity`s used, allowing for jpayne@69: // identities to be statically downcast if the right conditions are met. In the absence of jpayne@69: // documented promises, RTTI may be needed to query the type. jpayne@69: }; jpayne@69: jpayne@69: class NetworkPeerIdentity: public PeerIdentity { jpayne@69: // PeerIdentity used for network protocols like TCP/IP. This identifies the remote peer. jpayne@69: // jpayne@69: // This is only "authenticated" to the extent that we know data written to the stream will be jpayne@69: // routed to the given address. This does not preclude the possibility of man-in-the-middle jpayne@69: // attacks by attackers who are able to manipulate traffic along the route. jpayne@69: public: jpayne@69: virtual NetworkAddress& getAddress() = 0; jpayne@69: // Obtain the peer's address as a NetworkAddress object. The returned reference's lifetime is the jpayne@69: // same as the `NetworkPeerIdentity`, but you can always call `clone()` on it to get a copy that jpayne@69: // lives longer. jpayne@69: jpayne@69: static kj::Own newInstance(kj::Own addr); jpayne@69: // Construct an instance of this interface wrapping the given address. 
jpayne@69: }; jpayne@69: jpayne@69: class LocalPeerIdentity: public PeerIdentity { jpayne@69: // PeerIdentity used for connections between processes on the local machine -- in particular, jpayne@69: // Unix sockets. jpayne@69: // jpayne@69: // (This interface probably isn't useful on Windows.) jpayne@69: public: jpayne@69: struct Credentials { jpayne@69: kj::Maybe pid; jpayne@69: kj::Maybe uid; jpayne@69: jpayne@69: // We don't cover groups at present because some systems produce a list of groups while others jpayne@69: // only provide the peer's main group, the latter being pretty useless. jpayne@69: }; jpayne@69: jpayne@69: virtual Credentials getCredentials() = 0; jpayne@69: // Get the PID and UID of the peer process, if possible. jpayne@69: // jpayne@69: // Either ID may be null if the peer could not be identified. Some operating systems do not jpayne@69: // support retrieving these credentials, or can only provide one or the other. Some situations jpayne@69: // (like user and PID namespaces on Linux) may also make it impossible to represent the peer's jpayne@69: // credentials accurately. jpayne@69: // jpayne@69: // Note the meaning here can be subtle. Multiple processes can potentially have the socket in jpayne@69: // their file descriptor tables. The identified process is the one who called `connect()` or jpayne@69: // `listen()`. jpayne@69: // jpayne@69: // On Linux this is implemented with SO_PEERCRED. jpayne@69: jpayne@69: static kj::Own newInstance(Credentials creds); jpayne@69: // Construct an instance of this interface wrapping the given credentials. jpayne@69: }; jpayne@69: jpayne@69: class UnknownPeerIdentity: public PeerIdentity { jpayne@69: public: jpayne@69: static kj::Own newInstance(); jpayne@69: // Get an instance of this interface. This actually always returns the same instance with no jpayne@69: // memory allocation. 
jpayne@69: }; jpayne@69: jpayne@69: // ======================================================================================= jpayne@69: // Accepting connections jpayne@69: jpayne@69: class ConnectionReceiver: private AsyncObject { jpayne@69: // Represents a server socket listening on a port. jpayne@69: jpayne@69: public: jpayne@69: virtual Promise> accept() = 0; jpayne@69: // Accept the next incoming connection. jpayne@69: jpayne@69: virtual Promise acceptAuthenticated(); jpayne@69: // Accept the next incoming connection, and also provide a PeerIdentity with any information jpayne@69: // about the client. jpayne@69: // jpayne@69: // For backwards-compatibility, the default implementation of this method calls `accept()` and jpayne@69: // then adds `UnknownPeerIdentity`. jpayne@69: jpayne@69: virtual uint getPort() = 0; jpayne@69: // Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't jpayne@69: // specify a port when constructing the NetworkAddress -- one will have been assigned jpayne@69: // automatically. jpayne@69: jpayne@69: virtual void getsockopt(int level, int option, void* value, uint* length); jpayne@69: virtual void setsockopt(int level, int option, const void* value, uint length); jpayne@69: virtual void getsockname(struct sockaddr* addr, uint* length); jpayne@69: // Same as the methods of AsyncIoStream. jpayne@69: }; jpayne@69: jpayne@69: Own newAggregateConnectionReceiver(Array> receivers); jpayne@69: // Create a ConnectionReceiver that listens on several other ConnectionReceivers and returns jpayne@69: // sockets from any of them. jpayne@69: jpayne@69: // ======================================================================================= jpayne@69: // Datagram I/O jpayne@69: jpayne@69: class AncillaryMessage { jpayne@69: // Represents an ancillary message (aka control message) received using the recvmsg() system jpayne@69: // call (or equivalent). Most apps will not use this. 
jpayne@69: jpayne@69: public: jpayne@69: inline AncillaryMessage(int level, int type, ArrayPtr data); jpayne@69: AncillaryMessage() = default; jpayne@69: jpayne@69: inline int getLevel() const; jpayne@69: // Originating protocol / socket level. jpayne@69: jpayne@69: inline int getType() const; jpayne@69: // Protocol-specific message type. jpayne@69: jpayne@69: template jpayne@69: inline Maybe as() const; jpayne@69: // Interpret the ancillary message as the given struct type. Most ancillary messages are some jpayne@69: // sort of struct, so this is a convenient way to access it. Returns nullptr if the message jpayne@69: // is smaller than the struct -- this can happen if the message was truncated due to jpayne@69: // insufficient ancillary buffer space. jpayne@69: jpayne@69: template jpayne@69: inline ArrayPtr asArray() const; jpayne@69: // Interpret the ancillary message as an array of items. If the message size does not evenly jpayne@69: // divide into elements of type T, the remainder is discarded -- this can happen if the message jpayne@69: // was truncated due to insufficient ancillary buffer space. jpayne@69: jpayne@69: private: jpayne@69: int level; jpayne@69: int type; jpayne@69: ArrayPtr data; jpayne@69: // Message data. In most cases you should use `as()` or `asArray()`. jpayne@69: }; jpayne@69: jpayne@69: class DatagramReceiver { jpayne@69: // Class encapsulating the recvmsg() system call. You must specify the DatagramReceiver's jpayne@69: // capacity in advance; if a received packet is larger than the capacity, it will be truncated. jpayne@69: jpayne@69: public: jpayne@69: virtual Promise receive() = 0; jpayne@69: // Receive a new message, overwriting this object's content. jpayne@69: // jpayne@69: // receive() may reuse the same buffers for content and ancillary data with each call. 
jpayne@69: jpayne@69: template jpayne@69: struct MaybeTruncated { jpayne@69: T value; jpayne@69: jpayne@69: bool isTruncated; jpayne@69: // True if the Receiver's capacity was insufficient to receive the value and therefore the jpayne@69: // value is truncated. jpayne@69: }; jpayne@69: jpayne@69: virtual MaybeTruncated> getContent() = 0; jpayne@69: // Get the content of the datagram. jpayne@69: jpayne@69: virtual MaybeTruncated> getAncillary() = 0; jpayne@69: // Ancillary messages received with the datagram. See the recvmsg() system call and the cmsghdr jpayne@69: // struct. Most apps don't need this. jpayne@69: // jpayne@69: // If the returned value is truncated, then the last message in the array may itself be jpayne@69: // truncated, meaning its as() method will return nullptr or its asArray() method will jpayne@69: // return fewer elements than expected. Truncation can also mean that additional messages were jpayne@69: // available but discarded. jpayne@69: jpayne@69: virtual NetworkAddress& getSource() = 0; jpayne@69: // Get the datagram sender's address. jpayne@69: jpayne@69: struct Capacity { jpayne@69: size_t content = 8192; jpayne@69: // How much space to allocate for the datagram content. If a datagram is received that is jpayne@69: // larger than this, it will be truncated, with no way to recover the tail. jpayne@69: jpayne@69: size_t ancillary = 0; jpayne@69: // How much space to allocate for ancillary messages. As with content, if the ancillary data jpayne@69: // is larger than this, it will be truncated. 
jpayne@69: }; jpayne@69: }; jpayne@69: jpayne@69: class DatagramPort { jpayne@69: public: jpayne@69: virtual Promise send(const void* buffer, size_t size, NetworkAddress& destination) = 0; jpayne@69: virtual Promise send(ArrayPtr> pieces, jpayne@69: NetworkAddress& destination) = 0; jpayne@69: jpayne@69: virtual Own makeReceiver( jpayne@69: DatagramReceiver::Capacity capacity = DatagramReceiver::Capacity()) = 0; jpayne@69: // Create a new `DatagramReceiver` that can be used to receive datagrams. `capacity` specifies how much jpayne@69: // space to allocate for the received message. The `DatagramPort` must outlive the `DatagramReceiver`. jpayne@69: jpayne@69: virtual uint getPort() = 0; jpayne@69: // Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't jpayne@69: // specify a port when constructing the NetworkAddress -- one will have been assigned jpayne@69: // automatically. jpayne@69: jpayne@69: virtual void getsockopt(int level, int option, void* value, uint* length); jpayne@69: virtual void setsockopt(int level, int option, const void* value, uint length); jpayne@69: // Same as the methods of AsyncIoStream. jpayne@69: }; jpayne@69: jpayne@69: // ======================================================================================= jpayne@69: // Networks jpayne@69: jpayne@69: class NetworkAddress: private AsyncObject { jpayne@69: // Represents a remote address to which the application can connect. jpayne@69: jpayne@69: public: jpayne@69: virtual Promise> connect() = 0; jpayne@69: // Make a new connection to this address. jpayne@69: // jpayne@69: // The address must not be a wildcard ("*"). If it is an IP address, it must have a port number. jpayne@69: jpayne@69: virtual Promise connectAuthenticated(); jpayne@69: // Connect to the address and return both the connection and information about the peer identity. jpayne@69: // This is especially useful when using TLS, to get certificate details. 
jpayne@69: // jpayne@69: // For backwards-compatibility, the default implementation of this method calls `connect()` and jpayne@69: // then uses a `NetworkPeerIdentity` wrapping a clone of this `NetworkAddress` -- which is not jpayne@69: // particularly useful. jpayne@69: jpayne@69: virtual Own listen() = 0; jpayne@69: // Listen for incoming connections on this address. jpayne@69: // jpayne@69: // The address must be local. jpayne@69: jpayne@69: virtual Own bindDatagramPort(); jpayne@69: // Open this address as a datagram (e.g. UDP) port. jpayne@69: // jpayne@69: // The address must be local. jpayne@69: jpayne@69: virtual Own clone() = 0; jpayne@69: // Returns an equivalent copy of this NetworkAddress. jpayne@69: jpayne@69: virtual String toString() = 0; jpayne@69: // Produce a human-readable string which hopefully can be passed to Network::parseAddress() jpayne@69: // to reproduce this address, although whether or not that works of course depends on the Network jpayne@69: // implementation. This should be called only to display the address to human users, who will jpayne@69: // hopefully know what they are able to do with it. jpayne@69: }; jpayne@69: jpayne@69: class Network { jpayne@69: // Factory for NetworkAddress instances, representing the network services offered by the jpayne@69: // operating system. jpayne@69: // jpayne@69: // This interface typically represents broad authority, and well-designed code should limit its jpayne@69: // use to high-level startup code and user interaction. Low-level APIs should accept jpayne@69: // NetworkAddress instances directly and work from there, if at all possible. jpayne@69: jpayne@69: public: jpayne@69: virtual Promise> parseAddress(StringPtr addr, uint portHint = 0) = 0; jpayne@69: // Construct a network address from a user-provided string. The format of the address jpayne@69: // strings is not specified at the API level, and application code should make no assumptions jpayne@69: // about them. 
These strings should always be provided by humans, and said humans will know jpayne@69: // what format to use in their particular context. jpayne@69: // jpayne@69: // `portHint`, if provided, specifies the "standard" IP port number for the application-level jpayne@69: // service in play. If the address turns out to be an IP address (v4 or v6), and it lacks a jpayne@69: // port number, this port will be used. If `addr` lacks a port number *and* `portHint` is jpayne@69: // omitted, then the returned address will only support listen() and bindDatagramPort() jpayne@69: // (not connect()), and an unused port will be chosen each time one of those methods is called. jpayne@69: jpayne@69: virtual Own getSockaddr(const void* sockaddr, uint len) = 0; jpayne@69: // Construct a network address from a legacy struct sockaddr. jpayne@69: jpayne@69: virtual Own restrictPeers( jpayne@69: kj::ArrayPtr allow, jpayne@69: kj::ArrayPtr deny = nullptr) KJ_WARN_UNUSED_RESULT = 0; jpayne@69: // Constructs a new Network instance wrapping this one which restricts which peer addresses are jpayne@69: // permitted (both for outgoing and incoming connections). jpayne@69: // jpayne@69: // Communication will be allowed only with peers whose addresses match one of the patterns jpayne@69: // specified in the `allow` array. If a `deny` array is specified, then any address which matches jpayne@69: // a pattern in `deny` and *does not* match any more-specific pattern in `allow` will also be jpayne@69: // denied. jpayne@69: // jpayne@69: // The syntax of address patterns depends on the network, except that four special patterns are jpayne@69: // defined for all networks: jpayne@69: // - "private": Matches network addresses that are reserved by standards for private networks, jpayne@69: // such as "10.0.0.0/8" or "192.168.0.0/16". This is a superset of "local". jpayne@69: // - "public": Opposite of "private".
jpayne@69: // - "local": Matches network addresses that are defined by standards to only be accessible from jpayne@69: // the local machine, such as "127.0.0.0/8" or Unix domain addresses. jpayne@69: // - "network": Opposite of "local". jpayne@69: // jpayne@69: // For the standard KJ network implementation, the following patterns are also recognized: jpayne@69: // - Network blocks specified in CIDR notation (ipv4 and ipv6), such as "192.0.2.0/24" or jpayne@69: // "2001:db8::/32". jpayne@69: // - "unix" to match all Unix domain addresses. (In the future, we may support specifying a jpayne@69: // glob.) jpayne@69: // - "unix-abstract" to match Linux's "abstract unix domain" addresses. (In the future, we may jpayne@69: // support specifying a glob.) jpayne@69: // jpayne@69: // Network restrictions apply *after* DNS resolution (otherwise they'd be useless). jpayne@69: // jpayne@69: // It is legal to parseAddress() a restricted address. An exception won't be thrown until jpayne@69: // connect() is called. jpayne@69: // jpayne@69: // It's possible to listen() on a restricted address. However, connections will only be accepted jpayne@69: // from non-restricted addresses; others will be dropped. If a particular listen address has no jpayne@69: // valid peers (e.g. because it's a unix socket address and unix sockets are not allowed) then jpayne@69: // listen() may throw (or may simply never receive any connections). jpayne@69: // jpayne@69: // Examples: jpayne@69: // jpayne@69: // auto restricted = network->restrictPeers({"public"}); jpayne@69: // jpayne@69: // Allows connections only to/from public internet addresses. Use this when connecting to an jpayne@69: // address specified by a third party that is not trusted and is not themselves already on your jpayne@69: // private network. jpayne@69: // jpayne@69: // auto restricted = network->restrictPeers({"private"}); jpayne@69: // jpayne@69: // Allows connections only to/from the private network. 
Use this on the server side to reject jpayne@69: // connections from the public internet. jpayne@69: // jpayne@69: // auto restricted = network->restrictPeers({"192.0.2.0/24"}, {"192.0.2.3/32"}); jpayne@69: // jpayne@69: // Allows connections only to/from 192.0.2.*, except 192.0.2.3 which is blocked. jpayne@69: // jpayne@69: // auto restricted = network->restrictPeers({"10.0.0.0/8", "10.1.2.3/32"}, {"10.1.2.0/24"}); jpayne@69: // jpayne@69: // Allows connections to/from 10.*.*.*, with the exception of 10.1.2.* (which is denied), with an jpayne@69: // exception to the exception of 10.1.2.3 (which is allowed, because it is matched by an allow jpayne@69: // rule that is more specific than the deny rule). jpayne@69: }; jpayne@69: jpayne@69: // ======================================================================================= jpayne@69: // I/O Provider jpayne@69: jpayne@69: class AsyncIoProvider { jpayne@69: // Class which constructs asynchronous wrappers around the operating system's I/O facilities. jpayne@69: // jpayne@69: // Generally, the implementation of this interface must integrate closely with a particular jpayne@69: // `EventLoop` implementation. Typically, the EventLoop implementation itself will provide jpayne@69: // an AsyncIoProvider. jpayne@69: jpayne@69: public: jpayne@69: virtual OneWayPipe newOneWayPipe() = 0; jpayne@69: // Creates an input/output stream pair representing the ends of a one-way pipe (e.g. created with jpayne@69: // the pipe(2) system call). jpayne@69: jpayne@69: virtual TwoWayPipe newTwoWayPipe() = 0; jpayne@69: // Creates two AsyncIoStreams representing the two ends of a two-way pipe (e.g. created with jpayne@69: // socketpair(2) system call). Data written to one end can be read from the other. jpayne@69: jpayne@69: virtual CapabilityPipe newCapabilityPipe(); jpayne@69: // Creates two AsyncCapabilityStreams representing the two ends of a two-way capability pipe. 
jpayne@69: // jpayne@69: // The default implementation throws an unimplemented exception. In particular this is not jpayne@69: // implemented by the default AsyncIoProvider on Windows, since Windows lacks any sane way to jpayne@69: // pass handles over a stream. jpayne@69: jpayne@69: virtual Network& getNetwork() = 0; jpayne@69: // Creates a new `Network` instance representing the networks exposed by the operating system. jpayne@69: // jpayne@69: // DO NOT CALL THIS except at the highest levels of your code, ideally in the main() function. If jpayne@69: // you call this from low-level code, then you are preventing higher-level code from injecting an jpayne@69: // alternative implementation. Instead, if your code needs to use network functionality, it jpayne@69: // should ask for a `Network` as a constructor or method parameter, so that higher-level code can jpayne@69: // choose what implementation to use. The system network is essentially a singleton. See: jpayne@69: // http://www.object-oriented-security.org/lets-argue/singletons jpayne@69: // jpayne@69: // Code that uses the system network should not make any assumptions about what kinds of jpayne@69: // addresses it will parse, as this could differ across platforms. String addresses should come jpayne@69: // strictly from the user, who will know how to write them correctly for their system. jpayne@69: // jpayne@69: // With that said, KJ currently supports the following string address formats: jpayne@69: // - IPv4: "1.2.3.4", "1.2.3.4:80" jpayne@69: // - IPv6: "1234:5678::abcd", "[1234:5678::abcd]:80" jpayne@69: // - Local IP wildcard (covers both v4 and v6): "*", "*:80" jpayne@69: // - Symbolic names: "example.com", "example.com:80", "example.com:http", "1.2.3.4:http" jpayne@69: // - Unix domain: "unix:/path/to/socket" jpayne@69: jpayne@69: struct PipeThread { jpayne@69: // A combination of a thread and a two-way pipe that communicates with that thread.
jpayne@69: // jpayne@69: // The fields are intentionally ordered so that the pipe will be destroyed (and therefore jpayne@69: // disconnected) before the thread is destroyed (and therefore joined). Thus if the thread jpayne@69: // arranges to exit when it detects disconnect, destruction should be clean. jpayne@69: jpayne@69: Own thread; jpayne@69: Own pipe; jpayne@69: }; jpayne@69: jpayne@69: virtual PipeThread newPipeThread( jpayne@69: Function startFunc) = 0; jpayne@69: // Create a new thread and set up a two-way pipe (socketpair) which can be used to communicate jpayne@69: // with it. One end of the pipe is passed to the thread's start function and the other end of jpayne@69: // the pipe is returned. The new thread also gets its own `AsyncIoProvider` instance and will jpayne@69: // already have an active `EventLoop` when `startFunc` is called. jpayne@69: // jpayne@69: // TODO(someday): I'm not entirely comfortable with this interface. It seems to be doing too jpayne@69: // much at once but I'm not sure how to cleanly break it down. jpayne@69: jpayne@69: virtual Timer& getTimer() = 0; jpayne@69: // Returns a `Timer` based on real time. Time does not pass while event handlers are running -- jpayne@69: // it only updates when the event loop polls for system events. This means that calling `now()` jpayne@69: // on this timer does not require a system call. jpayne@69: // jpayne@69: // This timer is not affected by changes to the system date. It is unspecified whether the timer jpayne@69: // continues to count while the system is suspended. jpayne@69: }; jpayne@69: jpayne@69: class LowLevelAsyncIoProvider { jpayne@69: // Similar to `AsyncIoProvider`, but represents a lower-level interface that may differ on jpayne@69: // different operating systems. You should prefer to use `AsyncIoProvider` over this interface jpayne@69: // whenever possible, as `AsyncIoProvider` is portable and friendlier to dependency-injection. 
jpayne@69: // jpayne@69: // On Unix, this interface can be used to import native file descriptors into the async framework. jpayne@69: // Different implementations of this interface might work on top of different event handling jpayne@69: // primitives, such as poll vs. epoll vs. kqueue vs. some higher-level event library. jpayne@69: // jpayne@69: // On Windows, this interface can be used to import native SOCKETs into the async framework. jpayne@69: // Different implementations of this interface might work on top of different event handling jpayne@69: // primitives, such as I/O completion ports vs. completion routines. jpayne@69: jpayne@69: public: jpayne@69: enum Flags { jpayne@69: // Flags controlling how to wrap a file descriptor. jpayne@69: jpayne@69: TAKE_OWNERSHIP = 1 << 0, jpayne@69: // The returned object should own the file descriptor, automatically closing it when destroyed. jpayne@69: // The close-on-exec flag will be set on the descriptor if it is not already. jpayne@69: // jpayne@69: // If this flag is not used, then the file descriptor is not automatically closed and the jpayne@69: // close-on-exec flag is not modified. jpayne@69: jpayne@69: #if !_WIN32 jpayne@69: ALREADY_CLOEXEC = 1 << 1, jpayne@69: // Indicates that the close-on-exec flag is known already to be set, so need not be set again. jpayne@69: // Only relevant when combined with TAKE_OWNERSHIP. jpayne@69: // jpayne@69: // On Linux, all system calls which yield new file descriptors have flags or variants which jpayne@69: // set the close-on-exec flag immediately. Unfortunately, other OS's do not. jpayne@69: jpayne@69: ALREADY_NONBLOCK = 1 << 2 jpayne@69: // Indicates that the file descriptor is known already to be in non-blocking mode, so the flag jpayne@69: // need not be set again. Otherwise, all wrap*Fd() methods will enable non-blocking mode jpayne@69: // automatically. 
jpayne@69: // jpayne@69: // On Linux, all system calls which yield new file descriptors have flags or variants which jpayne@69: // enable non-blocking mode immediately. Unfortunately, other OS's do not. jpayne@69: #endif jpayne@69: }; jpayne@69: jpayne@69: #if _WIN32 jpayne@69: typedef uintptr_t Fd; jpayne@69: typedef AutoCloseHandle OwnFd; jpayne@69: // On Windows, the `fd` parameter to each of these methods must be a SOCKET, and must have the jpayne@69: // flag WSA_FLAG_OVERLAPPED (which socket() uses by default, but WSASocket() wants you to specify jpayne@69: // explicitly). jpayne@69: #else jpayne@69: typedef int Fd; jpayne@69: typedef AutoCloseFd OwnFd; jpayne@69: // On Unix, any arbitrary file descriptor is supported. jpayne@69: #endif jpayne@69: jpayne@69: virtual Own wrapInputFd(Fd fd, uint flags = 0) = 0; jpayne@69: // Create an AsyncInputStream wrapping a file descriptor. jpayne@69: // jpayne@69: // `flags` is a bitwise-OR of the values of the `Flags` enum. jpayne@69: jpayne@69: virtual Own wrapOutputFd(Fd fd, uint flags = 0) = 0; jpayne@69: // Create an AsyncOutputStream wrapping a file descriptor. jpayne@69: // jpayne@69: // `flags` is a bitwise-OR of the values of the `Flags` enum. jpayne@69: jpayne@69: virtual Own wrapSocketFd(Fd fd, uint flags = 0) = 0; jpayne@69: // Create an AsyncIoStream wrapping a socket file descriptor. jpayne@69: // jpayne@69: // `flags` is a bitwise-OR of the values of the `Flags` enum. jpayne@69: jpayne@69: #if !_WIN32 jpayne@69: virtual Own wrapUnixSocketFd(Fd fd, uint flags = 0); jpayne@69: // Like wrapSocketFd() but also support capability passing via SCM_RIGHTS. The socket must be jpayne@69: // a Unix domain socket. jpayne@69: // jpayne@69: // The default implementation throws UNIMPLEMENTED, for backwards-compatibility with jpayne@69: // LowLevelAsyncIoProvider implementations written before this method was added. 
jpayne@69: #endif jpayne@69: jpayne@69: virtual Promise> wrapConnectingSocketFd( jpayne@69: Fd fd, const struct sockaddr* addr, uint addrlen, uint flags = 0) = 0; jpayne@69: // Create an AsyncIoStream wrapping a socket and initiate a connection to the given address. jpayne@69: // The returned promise does not resolve until connection has completed. jpayne@69: // jpayne@69: // `flags` is a bitwise-OR of the values of the `Flags` enum. jpayne@69: jpayne@69: class NetworkFilter { jpayne@69: public: jpayne@69: virtual bool shouldAllow(const struct sockaddr* addr, uint addrlen) = 0; jpayne@69: // Returns true if incoming connections or datagrams from the given peer should be accepted. jpayne@69: // If false, they will be dropped. This is used to implement kj::Network::restrictPeers(). jpayne@69: jpayne@69: static NetworkFilter& getAllAllowed(); jpayne@69: }; jpayne@69: jpayne@69: virtual Own wrapListenSocketFd( jpayne@69: Fd fd, NetworkFilter& filter, uint flags = 0) = 0; jpayne@69: inline Own wrapListenSocketFd(Fd fd, uint flags = 0) { jpayne@69: return wrapListenSocketFd(fd, NetworkFilter::getAllAllowed(), flags); jpayne@69: } jpayne@69: // Create an AsyncIoStream wrapping a listen socket file descriptor. This socket should already jpayne@69: // have had `bind()` and `listen()` called on it, so it's ready for `accept()`. jpayne@69: // jpayne@69: // `flags` is a bitwise-OR of the values of the `Flags` enum. jpayne@69: jpayne@69: virtual Own wrapDatagramSocketFd(Fd fd, NetworkFilter& filter, uint flags = 0); jpayne@69: inline Own wrapDatagramSocketFd(Fd fd, uint flags = 0) { jpayne@69: return wrapDatagramSocketFd(fd, NetworkFilter::getAllAllowed(), flags); jpayne@69: } jpayne@69: jpayne@69: virtual Timer& getTimer() = 0; jpayne@69: // Returns a `Timer` based on real time. Time does not pass while event handlers are running -- jpayne@69: // it only updates when the event loop polls for system events. 
This means that calling `now()` jpayne@69: // on this timer does not require a system call. jpayne@69: // jpayne@69: // This timer is not affected by changes to the system date. It is unspecified whether the timer jpayne@69: // continues to count while the system is suspended. jpayne@69: jpayne@69: Own wrapInputFd(OwnFd&& fd, uint flags = 0); jpayne@69: Own wrapOutputFd(OwnFd&& fd, uint flags = 0); jpayne@69: Own wrapSocketFd(OwnFd&& fd, uint flags = 0); jpayne@69: #if !_WIN32 jpayne@69: Own wrapUnixSocketFd(OwnFd&& fd, uint flags = 0); jpayne@69: #endif jpayne@69: Promise> wrapConnectingSocketFd( jpayne@69: OwnFd&& fd, const struct sockaddr* addr, uint addrlen, uint flags = 0); jpayne@69: Own wrapListenSocketFd( jpayne@69: OwnFd&& fd, NetworkFilter& filter, uint flags = 0); jpayne@69: Own wrapListenSocketFd(OwnFd&& fd, uint flags = 0); jpayne@69: Own wrapDatagramSocketFd(OwnFd&& fd, NetworkFilter& filter, uint flags = 0); jpayne@69: Own wrapDatagramSocketFd(OwnFd&& fd, uint flags = 0); jpayne@69: // Convenience wrappers which transfer ownership via AutoCloseFd (Unix) or AutoCloseHandle jpayne@69: // (Windows). TAKE_OWNERSHIP will be implicitly added to `flags`. jpayne@69: }; jpayne@69: jpayne@69: Own newAsyncIoProvider(LowLevelAsyncIoProvider& lowLevel); jpayne@69: // Make a new AsyncIoProvider wrapping a `LowLevelAsyncIoProvider`. jpayne@69: jpayne@69: struct AsyncIoContext { jpayne@69: Own lowLevelProvider; jpayne@69: Own provider; jpayne@69: WaitScope& waitScope; jpayne@69: jpayne@69: #if _WIN32 jpayne@69: Win32EventPort& win32EventPort; jpayne@69: #else jpayne@69: UnixEventPort& unixEventPort; jpayne@69: // TEMPORARY: Direct access to underlying UnixEventPort, mainly for waiting on signals. This jpayne@69: // field will go away at some point when we have a chance to improve these interfaces. 
jpayne@69: #endif jpayne@69: }; jpayne@69: jpayne@69: AsyncIoContext setupAsyncIo(); jpayne@69: // Convenience method which sets up the current thread with everything it needs to do async I/O. jpayne@69: // The returned objects contain an `EventLoop` which is wrapping an appropriate `EventPort` for jpayne@69: // doing I/O on the host system, so everything is ready for the thread to start making async calls jpayne@69: // and waiting on promises. jpayne@69: // jpayne@69: // You would typically call this in your main() loop or in the start function of a thread. jpayne@69: // Example: jpayne@69: // jpayne@69: // int main() { jpayne@69: // auto ioContext = kj::setupAsyncIo(); jpayne@69: // jpayne@69: // // Now we can call an async function. jpayne@69: // Promise textPromise = getHttp(*ioContext.provider, "http://example.com"); jpayne@69: // jpayne@69: // // And we can wait for the promise to complete. Note that you can only use `wait()` jpayne@69: // // from the top level, not from inside a promise callback. jpayne@69: // String text = textPromise.wait(ioContext.waitScope); jpayne@69: // print(text); jpayne@69: // return 0; jpayne@69: // } jpayne@69: // jpayne@69: // WARNING: An AsyncIoContext can only be used in the thread and process that created it. In jpayne@69: // particular, note that after a fork(), an AsyncIoContext created in the parent process will jpayne@69: // not work correctly in the child, even if the parent ceases to use its copy. In particular jpayne@69: // note that this means that server processes which daemonize themselves at startup must wait jpayne@69: // until after daemonization to create an AsyncIoContext. jpayne@69: jpayne@69: // ======================================================================================= jpayne@69: // Convenience adapters. 
jpayne@69: jpayne@69: class CapabilityStreamConnectionReceiver final: public ConnectionReceiver { jpayne@69: // Trivial wrapper which allows an AsyncCapabilityStream to act as a ConnectionReceiver. accept() jpayne@69: // calls receiveStream(). jpayne@69: jpayne@69: public: jpayne@69: CapabilityStreamConnectionReceiver(AsyncCapabilityStream& inner) jpayne@69: : inner(inner) {} jpayne@69: jpayne@69: Promise> accept() override; jpayne@69: uint getPort() override; jpayne@69: jpayne@69: Promise acceptAuthenticated() override; jpayne@69: // Always produces UnknownIdentity. Capability-based security patterns should not rely on jpayne@69: // authenticating peers; the other end of the capability stream should only be given to jpayne@69: // authorized parties in the first place. jpayne@69: jpayne@69: private: jpayne@69: AsyncCapabilityStream& inner; jpayne@69: }; jpayne@69: jpayne@69: class CapabilityStreamNetworkAddress final: public NetworkAddress { jpayne@69: // Trivial wrapper which allows an AsyncCapabilityStream to act as a NetworkAddress. jpayne@69: // jpayne@69: // connect() is implemented by calling provider.newCapabilityPipe(), sending one end over the jpayne@69: // original capability stream, and returning the other end. If `provider` is null, then the jpayne@69: // global kj::newCapabilityPipe() will be used, but this ONLY works if `inner` itself is agnostic jpayne@69: // to the type of streams it receives, e.g. because it was also created using jpayne@69: // kj::newCapabilityPipe(). jpayne@69: // jpayne@69: // listen().accept() is implemented by receiving new streams over the original stream. jpayne@69: // jpayne@69: // Note that clone() doesn't work (due to ownership issues) and toString() returns a static jpayne@69: // string.
jpayne@69: jpayne@69: public: jpayne@69: CapabilityStreamNetworkAddress(kj::Maybe provider, AsyncCapabilityStream& inner) jpayne@69: : provider(provider), inner(inner) {} jpayne@69: jpayne@69: Promise> connect() override; jpayne@69: Own listen() override; jpayne@69: jpayne@69: Own clone() override; jpayne@69: String toString() override; jpayne@69: jpayne@69: Promise connectAuthenticated() override; jpayne@69: // Always produces UnknownIdentity. Capability-based security patterns should not rely on jpayne@69: // authenticating peers; the other end of the capability stream should only be given to jpayne@69: // authorized parties in the first place. jpayne@69: jpayne@69: private: jpayne@69: kj::Maybe provider; jpayne@69: AsyncCapabilityStream& inner; jpayne@69: }; jpayne@69: jpayne@69: class FileInputStream: public AsyncInputStream { jpayne@69: // InputStream that reads from a disk file -- and enables sendfile() optimization. jpayne@69: // jpayne@69: // Reads are performed synchronously -- no actual attempt is made to use asynchronous file I/O. jpayne@69: // True asynchronous file I/O is complicated and is mostly unnecessary in the presence of jpayne@69: // caching. Only certain niche programs can expect to benefit from it. For the rest, it's better jpayne@69: // to use regular synchronous disk I/O, so that's what this class does. jpayne@69: // jpayne@69: // The real purpose of this class, aside from general convenience, is to enable sendfile() jpayne@69: // optimization. When you use this class's pumpTo() method, and the destination is a socket, jpayne@69: // the system will detect this and optimize to sendfile(), so that the file data never needs to jpayne@69: // be read into userspace. jpayne@69: // jpayne@69: // NOTE: As of this writing, sendfile() optimization is only implemented on Linux.
jpayne@69: jpayne@69: public: jpayne@69: FileInputStream(const ReadableFile& file, uint64_t offset = 0) jpayne@69: : file(file), offset(offset) {} jpayne@69: jpayne@69: const ReadableFile& getUnderlyingFile() { return file; } jpayne@69: uint64_t getOffset() { return offset; } jpayne@69: void seek(uint64_t newOffset) { offset = newOffset; } jpayne@69: jpayne@69: Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes); jpayne@69: Maybe tryGetLength(); jpayne@69: jpayne@69: // (pumpTo() is not actually overridden here, but AsyncStreamFd's tryPumpFrom() will detect when jpayne@69: // the source is a file.) jpayne@69: jpayne@69: private: jpayne@69: const ReadableFile& file; jpayne@69: uint64_t offset; jpayne@69: }; jpayne@69: jpayne@69: class FileOutputStream: public AsyncOutputStream { jpayne@69: // OutputStream that writes to a disk file. jpayne@69: // jpayne@69: // As with FileInputStream, calls are not actually async. Async would be even less useful here jpayne@69: // because writes should usually land in cache anyway. jpayne@69: // jpayne@69: // sendfile() optimization does not apply when writing to a file, but on Linux, splice() can jpayne@69: // be used to achieve a similar effect. jpayne@69: // jpayne@69: // NOTE: As of this writing, splice() optimization is not implemented. 
jpayne@69: jpayne@69: public: jpayne@69: FileOutputStream(const File& file, uint64_t offset = 0) jpayne@69: : file(file), offset(offset) {} jpayne@69: jpayne@69: const File& getUnderlyingFile() { return file; } jpayne@69: uint64_t getOffset() { return offset; } jpayne@69: void seek(uint64_t newOffset) { offset = newOffset; } jpayne@69: jpayne@69: Promise write(const void* buffer, size_t size); jpayne@69: Promise write(ArrayPtr> pieces); jpayne@69: Promise whenWriteDisconnected(); jpayne@69: jpayne@69: private: jpayne@69: const File& file; jpayne@69: uint64_t offset; jpayne@69: }; jpayne@69: jpayne@69: // ======================================================================================= jpayne@69: // inline implementation details jpayne@69: jpayne@69: inline AncillaryMessage::AncillaryMessage( jpayne@69: int level, int type, ArrayPtr data) jpayne@69: : level(level), type(type), data(data) {} jpayne@69: jpayne@69: inline int AncillaryMessage::getLevel() const { return level; } jpayne@69: inline int AncillaryMessage::getType() const { return type; } jpayne@69: jpayne@69: template jpayne@69: inline Maybe AncillaryMessage::as() const { jpayne@69: if (data.size() >= sizeof(T)) { jpayne@69: return *reinterpret_cast(data.begin()); jpayne@69: } else { jpayne@69: return nullptr; jpayne@69: } jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: inline ArrayPtr AncillaryMessage::asArray() const { jpayne@69: return arrayPtr(reinterpret_cast(data.begin()), data.size() / sizeof(T)); jpayne@69: } jpayne@69: jpayne@69: class SecureNetworkWrapper { jpayne@69: // Abstract interface for a class which implements a "secure" network as a wrapper around an jpayne@69: // insecure one. "secure" means: jpayne@69: // * Connections to a server will only succeed if it can be verified that the requested hostname jpayne@69: // actually belongs to the responding server. jpayne@69: // * No man-in-the-middle attacker can potentially see the bytes sent and received. 
jpayne@69: // jpayne@69: // The typical implementation uses TLS. The object in this case could be configured to use certain jpayne@69: // keys, certificates, etc. See kj/compat/tls.h for such an implementation. jpayne@69: // jpayne@69: // However, an implementation could use some other form of encryption, or might not need to use jpayne@69: // encryption at all. For example, imagine a kj::Network that exists only on a single machine, jpayne@69: // providing communications between various processes using unix sockets. Perhaps the "hostnames" jpayne@69: // are actually PIDs in this case. An implementation of such a network could verify the other jpayne@69: // side's identity using an `SCM_CREDENTIALS` auxiliary message, which cannot be forged. Once jpayne@69: // verified, there is no need to encrypt since unix sockets cannot be intercepted. jpayne@69: jpayne@69: public: jpayne@69: virtual kj::Promise> wrapServer(kj::Own stream) = 0; jpayne@69: // Act as the server side of a connection. The given stream is already connected to a client, but jpayne@69: // no authentication has occurred. The returned stream represents the secure transport once jpayne@69: // established. jpayne@69: jpayne@69: virtual kj::Promise> wrapClient( jpayne@69: kj::Own stream, kj::StringPtr expectedServerHostname) = 0; jpayne@69: // Act as the client side of a connection. The given stream is already connected to a server, but jpayne@69: // no authentication has occurred. This method will verify that the server actually is the given jpayne@69: // hostname, then return the stream representing a secure transport to that server. jpayne@69: jpayne@69: virtual kj::Promise wrapServer(kj::AuthenticatedStream stream) = 0; jpayne@69: virtual kj::Promise wrapClient( jpayne@69: kj::AuthenticatedStream stream, kj::StringPtr expectedServerHostname) = 0; jpayne@69: // Same as above, but implementing kj::AuthenticatedStream, which provides PeerIdentity objects jpayne@69: // with more details about the peer.
The SecureNetworkWrapper will provide its own implementation jpayne@69: // of PeerIdentity with the specific details it is able to authenticate. jpayne@69: jpayne@69: virtual kj::Own wrapPort(kj::Own port) = 0; jpayne@69: // Wrap a connection listener. This is equivalent to calling wrapServer() on every connection jpayne@69: // received. jpayne@69: jpayne@69: virtual kj::Own wrapAddress( jpayne@69: kj::Own address, kj::StringPtr expectedServerHostname) = 0; jpayne@69: // Wrap a NetworkAddress. This is equivalent to calling `wrapClient()` on every connection jpayne@69: // formed by calling `connect()` on the address. jpayne@69: jpayne@69: virtual kj::Own wrapNetwork(kj::Network& network) = 0; jpayne@69: // Wrap a whole `kj::Network`. This automatically wraps everything constructed using the network. jpayne@69: // The network will only accept address strings that can be authenticated, and will automatically jpayne@69: // authenticate servers against those addresses when connecting to them. jpayne@69: }; jpayne@69: jpayne@69: } // namespace kj jpayne@69: jpayne@69: KJ_END_HEADER