Mercurial > repos > rliterman > csp2
comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/include/kj/async-io.h @ 69:33d812a61356
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 17:55:14 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
67:0e9998148a16 | 69:33d812a61356 |
---|---|
1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors | |
2 // Licensed under the MIT License: | |
3 // | |
4 // Permission is hereby granted, free of charge, to any person obtaining a copy | |
5 // of this software and associated documentation files (the "Software"), to deal | |
6 // in the Software without restriction, including without limitation the rights | |
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
8 // copies of the Software, and to permit persons to whom the Software is | |
9 // furnished to do so, subject to the following conditions: | |
10 // | |
11 // The above copyright notice and this permission notice shall be included in | |
12 // all copies or substantial portions of the Software. | |
13 // | |
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
20 // THE SOFTWARE. | |
21 | |
22 #pragma once | |
23 | |
24 #include "async.h" | |
25 #include <kj/function.h> | |
26 #include <kj/thread.h> | |
27 #include <kj/timer.h> | |
28 | |
29 KJ_BEGIN_HEADER | |
30 | |
31 struct sockaddr; | |
32 | |
33 namespace kj { | |
34 | |
35 #if _WIN32 | |
36 class Win32EventPort; | |
37 class AutoCloseHandle; | |
38 #else | |
39 class UnixEventPort; | |
40 #endif | |
41 | |
42 class AutoCloseFd; | |
43 class NetworkAddress; | |
44 class AsyncOutputStream; | |
45 class AsyncIoStream; | |
46 class AncillaryMessage; | |
47 | |
48 class ReadableFile; | |
49 class File; | |
50 | |
51 // ======================================================================================= | |
52 // Streaming I/O | |
53 | |
54 class AsyncInputStream: private AsyncObject { | |
55 // Asynchronous equivalent of InputStream (from io.h). | |
56 | |
57 public: | |
58 virtual Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes); | |
59 virtual Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) = 0; | |
// NOTE(review): presumably these mirror InputStream::read() / tryRead() from io.h, i.e. they | |
// read between `minBytes` and `maxBytes` bytes into `buffer` and resolve to the count actually | |
// read -- confirm against io.h. Of the two, only tryRead() is pure virtual. | |
60 | |
61 Promise<void> read(void* buffer, size_t bytes); | |
62 | |
63 virtual Maybe<uint64_t> tryGetLength(); | |
64 // Get the remaining number of bytes that will be produced by this stream, if known. | |
65 // | |
66 // This is used e.g. to fill in the Content-Length header of an HTTP message. If unknown, the | |
67 // HTTP implementation may need to fall back to Transfer-Encoding: chunked. | |
68 // | |
69 // The default implementation always returns null. | |
70 | |
71 virtual Promise<uint64_t> pumpTo( | |
72 AsyncOutputStream& output, uint64_t amount = kj::maxValue); | |
73 // Read `amount` bytes from this stream (or to EOF) and write them to `output`, returning the | |
74 // total bytes actually pumped (which is only less than `amount` if EOF was reached). | |
75 // | |
76 // Override this if your stream type knows how to pump itself to certain kinds of output | |
77 // streams more efficiently than via the naive approach. You can use | |
78 // kj::dynamicDowncastIfAvailable() to test for stream types you recognize, and if none match, | |
79 // delegate to the default implementation. | |
80 // | |
81 // The default implementation first tries calling output.tryPumpFrom(), but if that fails, it | |
82 // performs a naive pump by allocating a buffer and reading to it / writing from it in a loop. | |
83 | |
84 Promise<Array<byte>> readAllBytes(uint64_t limit = kj::maxValue); | |
85 Promise<String> readAllText(uint64_t limit = kj::maxValue); | |
86 // Read until EOF and return as one big byte array or string. Throw an exception if EOF is not | |
87 // seen before reading `limit` bytes. | |
88 // | |
89 // To prevent runaway memory allocation, consider using a more conservative value for `limit` than | |
90 // the default, particularly on untrusted data streams which may never see EOF. | |
91 | |
92 virtual void registerAncillaryMessageHandler(Function<void(ArrayPtr<AncillaryMessage>)> fn); | |
93 // Register interest in checking for ancillary messages (aka control messages) when reading. | |
94 // The provided callback will be called whenever any are encountered. The messages passed to | |
95 // the function do not live beyond the point at which the function returns. | |
96 // Only supported on Unix (the default impl throws UNIMPLEMENTED). Most apps will not use this. | |
97 | |
98 virtual Maybe<Own<AsyncInputStream>> tryTee(uint64_t limit = kj::maxValue); | |
99 // Primarily intended as an optimization for the `tee` call. Returns an input stream whose state | |
100 // is independent from this one but which will return the exact same set of bytes read going | |
101 // forward. `limit` is a total limit on the amount of memory, in bytes, which a tee implementation | |
102 // may use to buffer stream data. An implementation must throw an exception if a read operation | |
103 // would cause the limit to be exceeded. If tryTee() can see that the new limit is impossible to | |
104 // satisfy, it should return nullptr so that the pessimized path is taken in newTee(). This is | |
105 // likely to arise if tryTee() is called twice with different limits on the same stream. | |
106 }; | |
107 | |
108 class AsyncOutputStream: private AsyncObject { | |
109 // Asynchronous equivalent of OutputStream (from io.h). | |
110 | |
111 public: | |
112 virtual Promise<void> write(const void* buffer, size_t size) KJ_WARN_UNUSED_RESULT = 0; | |
113 virtual Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) | |
114 KJ_WARN_UNUSED_RESULT = 0; | |
// Write a single buffer, or a sequence of pieces, to the stream. Both overloads are pure | |
// virtual. NOTE(review): KJ_WARN_UNUSED_RESULT presumably makes the compiler warn when the | |
// returned promise is discarded -- confirm against the macro's definition. | |
115 | |
116 virtual Maybe<Promise<uint64_t>> tryPumpFrom( | |
117 AsyncInputStream& input, uint64_t amount = kj::maxValue); | |
118 // Implements double-dispatch for AsyncInputStream::pumpTo(). | |
119 // | |
120 // This method should only be called from within an implementation of pumpTo(). | |
121 // | |
122 // This method examines the type of `input` to find optimized ways to pump data from it to this | |
123 // output stream. If it finds one, it performs the pump. Otherwise, it returns null. | |
124 // | |
125 // The default implementation always returns null. | |
126 | |
127 virtual Promise<void> whenWriteDisconnected() = 0; | |
128 // Returns a promise that resolves when the stream has become disconnected such that new write()s | |
129 // will fail with a DISCONNECTED exception. This is particularly useful, for example, to cancel | |
130 // work early when it is detected that no one will receive the result. | |
131 // | |
132 // Note that not all streams are able to detect this condition without actually performing a | |
133 // write(); such stream implementations may return a promise that never resolves. (In particular, | |
134 // as of this writing, whenWriteDisconnected() is not implemented on Windows. Also, for TCP | |
135 // streams, not all disconnects are detectable -- a power or network failure may lead the | |
136 // connection to hang forever, or until configured socket options lead to a timeout.) | |
137 // | |
138 // Unlike most other asynchronous stream methods, it is safe to call whenWriteDisconnected() | |
139 // multiple times without canceling the previous promises. | |
140 }; | |
141 | |
142 class AsyncIoStream: public AsyncInputStream, public AsyncOutputStream { | |
143 // A combination input and output stream. | |
144 | |
145 public: | |
146 virtual void shutdownWrite() = 0; | |
147 // Cleanly shut down just the write end of the stream, while keeping the read end open. | |
148 | |
149 virtual void abortRead() {} | |
150 // Similar to shutdownWrite, but this will shut down the read end of the stream, and should only | |
151 // be called when an error has occurred. The default implementation does nothing. | |
152 | |
153 virtual void getsockopt(int level, int option, void* value, uint* length); | |
154 virtual void setsockopt(int level, int option, const void* value, uint length); | |
155 // Corresponds to getsockopt() and setsockopt() syscalls. Will throw an "unimplemented" exception | |
156 // if the stream is not a socket or the option is not appropriate for the socket type. The | |
157 // default implementations always throw "unimplemented". | |
158 | |
159 virtual void getsockname(struct sockaddr* addr, uint* length); | |
160 virtual void getpeername(struct sockaddr* addr, uint* length); | |
161 // Corresponds to getsockname() and getpeername() syscalls. Will throw an "unimplemented" | |
162 // exception if the stream is not a socket. The default implementations always throw | |
163 // "unimplemented". | |
164 // | |
165 // Note that we don't provide methods that return NetworkAddress because it usually wouldn't | |
166 // be useful. You can't connect() to or listen() on these addresses, obviously, because they are | |
167 // ephemeral addresses for a single connection. | |
168 | |
169 virtual kj::Maybe<int> getFd() const { return nullptr; } | |
170 // Get the underlying Unix file descriptor, if any. Returns nullptr if this object actually | |
171 // isn't wrapping a file descriptor. The default implementation always returns nullptr. | |
172 }; | |
173 | |
174 Promise<uint64_t> unoptimizedPumpTo( | |
175 AsyncInputStream& input, AsyncOutputStream& output, uint64_t amount, | |
176 uint64_t completedSoFar = 0); | |
177 // Performs a pump using read() and write(), without calling the stream's pumpTo() nor | |
178 // tryPumpFrom() methods. This is intended to be used as a fallback by implementations of pumpTo() | |
179 // and tryPumpFrom() when they want to give up on optimization, but can't just call pumpTo() again | |
180 // because this would recursively retry the optimization. unoptimizedPumpTo() should only be called | |
181 // inside implementations of streams, never by the caller of a stream -- use the pumpTo() method | |
182 // instead. | |
183 // | |
184 // `completedSoFar` is the number of bytes out of `amount` that have already been pumped. This is | |
185 // provided for convenience for cases where the caller has already done some pumping before they | |
186 // give up. Otherwise, a `.then()` would need to be used to add the bytes to the final result. | |
187 | |
188 class AsyncCapabilityStream: public AsyncIoStream { | |
189 // An AsyncIoStream that also allows transmitting new stream objects and file descriptors | |
190 // (capabilities, in the object-capability model sense), in addition to bytes. | |
191 // | |
192 // Capabilities can be attached to bytes when they are written. On the receiving end, the read() | |
193 // that receives the first byte of such a message will also receive the capabilities. | |
194 // | |
195 // Note that AsyncIoStream's regular byte-oriented methods can be used on AsyncCapabilityStream, | |
196 // with the effect of silently dropping any capabilities attached to the respective bytes. E.g. | |
197 // using `AsyncIoStream::tryRead()` to read bytes that had been sent with `writeWithFds()` will | |
198 // silently drop the FDs (closing them if appropriate). Also note that pumping a stream with | |
199 // `pumpTo()` always drops all capabilities attached to the pumped data. (TODO(someday): Do we | |
200 // want a version of pumpTo() that preserves capabilities?) | |
201 // | |
202 // On Unix, KJ provides an implementation based on Unix domain sockets and file descriptor | |
203 // passing via SCM_RIGHTS. Due to the nature of SCM_RIGHTS, if the application accidentally | |
204 // read()s when it should have called receiveStream(), it will observe a NUL byte in the data | |
205 // and the capability will be discarded. Of course, an application should not depend on this | |
206 // behavior; it should avoid read()ing through a capability. | |
207 // | |
208 // KJ does not provide any inter-process implementation of this type on Windows, as there's no | |
209 // obvious implementation there. Handle passing on Windows requires at least one of the processes | |
210 // involved to have permission to modify the other's handle table, which is effectively full | |
211 // control. Handle passing between mutually non-trusting processes would require a trusted | |
212 // broker process to facilitate. One could possibly implement this type in terms of such a | |
213 // broker, or in terms of direct handle passing if at least one process trusts the other. | |
214 | |
215 public: | |
216 virtual Promise<void> writeWithFds(ArrayPtr<const byte> data, | |
217 ArrayPtr<const ArrayPtr<const byte>> moreData, | |
218 ArrayPtr<const int> fds) = 0; | |
219 Promise<void> writeWithFds(ArrayPtr<const byte> data, | |
220 ArrayPtr<const ArrayPtr<const byte>> moreData, | |
221 ArrayPtr<const AutoCloseFd> fds); | |
222 // Write some data to the stream with some file descriptors attached to it. | |
223 // | |
224 // The maximum number of FDs that can be sent at a time is usually subject to an OS-imposed | |
225 // limit. On Linux, this is 253. In practice, sending more than a handful of FDs at once is | |
226 // probably a bad idea. | |
227 | |
228 struct ReadResult { | |
229 size_t byteCount; | |
230 size_t capCount; | |
231 }; | |
// Result type of tryReadWithFds() and tryReadWithStreams(). NOTE(review): presumably `byteCount` | |
// is the number of bytes read and `capCount` the number of capabilities (FDs or streams) | |
// received -- confirm against the implementation. | |
232 | |
233 virtual Promise<ReadResult> tryReadWithFds(void* buffer, size_t minBytes, size_t maxBytes, | |
234 AutoCloseFd* fdBuffer, size_t maxFds) = 0; | |
235 // Read data from the stream that may have file descriptors attached. Any attached descriptors | |
236 // will be placed in `fdBuffer`. If multiple bundles of FDs are encountered in the course of | |
237 // reading the amount of data requested by minBytes/maxBytes, then they will be concatenated. If | |
238 // more FDs are received than fit in the buffer, then the excess will be discarded and closed -- | |
239 // this behavior, while ugly, is important to defend against denial-of-service attacks that may | |
240 // fill up the FD table with garbage. Applications must think carefully about how many FDs they | |
241 // really need to receive at once and set a well-defined limit. | |
242 | |
243 virtual Promise<void> writeWithStreams(ArrayPtr<const byte> data, | |
244 ArrayPtr<const ArrayPtr<const byte>> moreData, | |
245 Array<Own<AsyncCapabilityStream>> streams) = 0; | |
246 virtual Promise<ReadResult> tryReadWithStreams( | |
247 void* buffer, size_t minBytes, size_t maxBytes, | |
248 Own<AsyncCapabilityStream>* streamBuffer, size_t maxStreams) = 0; | |
249 // Like above, but passes AsyncCapabilityStream objects. The stream implementations must be from | |
250 // the same AsyncIoProvider. | |
251 | |
252 // --------------------------------------------------------------------------- | |
253 // Helpers for sending individual capabilities. | |
254 // | |
255 // These are equivalent to the above methods with the constraint that only one FD is | |
256 // sent/received at a time and the corresponding data is a single zero-valued byte. | |
257 | |
258 Promise<Own<AsyncCapabilityStream>> receiveStream(); | |
259 Promise<Maybe<Own<AsyncCapabilityStream>>> tryReceiveStream(); | |
260 Promise<void> sendStream(Own<AsyncCapabilityStream> stream); | |
261 // Transfer a single stream. | |
262 | |
263 Promise<AutoCloseFd> receiveFd(); | |
264 Promise<Maybe<AutoCloseFd>> tryReceiveFd(); | |
265 Promise<void> sendFd(int fd); | |
266 // Transfer a single raw file descriptor. | |
267 }; | |
268 | |
269 struct OneWayPipe { | |
270 // A data pipe with an input end and an output end. (Typically backed by the pipe() system call.) | |
271 | |
272 Own<AsyncInputStream> in; | |
273 Own<AsyncOutputStream> out; | |
274 }; | |
275 | |
276 OneWayPipe newOneWayPipe(kj::Maybe<uint64_t> expectedLength = nullptr); | |
277 // Constructs a OneWayPipe that operates in-process. The pipe does not do any buffering -- it waits | |
278 // until both a read() and a write() call are pending, then resolves both. | |
279 // | |
280 // If `expectedLength` is non-null, then the pipe will be expected to transmit exactly that many | |
281 // bytes. The input end's `tryGetLength()` will return the number of bytes left. | |
282 | |
283 struct TwoWayPipe { | |
284 // A data pipe that supports sending in both directions. Each end's output sends data to the | |
285 // other end's input. (Typically backed by the socketpair() system call.) | |
286 | |
287 Own<AsyncIoStream> ends[2]; | |
288 }; | |
289 | |
290 TwoWayPipe newTwoWayPipe(); | |
291 // Constructs a TwoWayPipe that operates in-process. The pipe does not do any buffering -- it waits | |
292 // until both a read() and a write() call are pending, then resolves both. | |
293 | |
294 struct CapabilityPipe { | |
295 // Like TwoWayPipe, but each end is an AsyncCapabilityStream, allowing capability-passing. | |
296 | |
297 Own<AsyncCapabilityStream> ends[2]; | |
298 }; | |
299 | |
300 CapabilityPipe newCapabilityPipe(); | |
301 // Like newTwoWayPipe() but creates a capability pipe. | |
302 // | |
303 // The requirement of `writeWithStreams()` that "The stream implementations must be from the same | |
304 // AsyncIoProvider." does not apply to this pipe; any kind of AsyncCapabilityStream implementation | |
305 // is supported. | |
306 // | |
307 // This implementation does not know how to convert streams to FDs or vice versa; if you write FDs | |
308 // you must read FDs, and if you write streams you must read streams. | |
309 | |
310 struct Tee { | |
311 // Two AsyncInputStreams which each read the same data from some wrapped inner AsyncInputStream. | |
// Returned by newTee(). | |
312 | |
313 Own<AsyncInputStream> branches[2]; | |
314 }; | |
315 | |
316 Tee newTee(Own<AsyncInputStream> input, uint64_t limit = kj::maxValue); | |
317 // Constructs a Tee that operates in-process. The tee buffers data if any read or pump operation is | |
318 // called on one of the two input ends. If a read or pump operation is subsequently called on the | |
319 // other input end, the buffered data is consumed. | |
320 // | |
321 // `pumpTo()` operations on the input ends will proactively read from the inner stream and block | |
322 // while writing to the output stream. While one branch has an active `pumpTo()` operation, any | |
323 // `tryRead()` operation on the other branch will not be allowed to read faster than allowed by the | |
324 // pump's backpressure. (In other words, it will never cause buffering on the pump.) Similarly, if | |
325 // there are `pumpTo()` operations active on both branches, the greater of the two backpressures is | |
326 // respected -- the two pumps progress in lockstep, and there is no buffering. | |
327 // | |
328 // At no point will a branch's buffer be allowed to grow beyond `limit` bytes. If the buffer would | |
329 // grow beyond the limit, an exception is generated, which both branches see once they have | |
330 // exhausted their buffers. | |
331 // | |
332 // It is recommended that you use a more conservative value for `limit` than the default. | |
333 | |
334 Own<AsyncOutputStream> newPromisedStream(Promise<Own<AsyncOutputStream>> promise); | |
335 Own<AsyncIoStream> newPromisedStream(Promise<Own<AsyncIoStream>> promise); | |
336 // Constructs an Async*Stream which waits for a promise to resolve, then forwards all calls to the | |
337 // promised stream. | |
338 | |
339 // ======================================================================================= | |
340 // Authenticated streams | |
341 | |
342 class PeerIdentity { | |
343 // PeerIdentity provides information about a connecting client. Various subclasses exist to | |
344 // address different network types. | |
// | |
// NOTE(review): no virtual destructor is declared here even though instances are held | |
// polymorphically (e.g. as Own<PeerIdentity> in AuthenticatedStream) -- confirm that kj::Own | |
// destroys the dynamic type correctly, and avoid `delete` through a PeerIdentity pointer. | |
345 public: | |
346 virtual kj::String toString() = 0; | |
347 // Returns a human-readable string identifying the peer. Where possible, this string will be | |
348 // in the same format as the addresses you could pass to `kj::Network::parseAddress()`. However, | |
349 // only certain subclasses of `PeerIdentity` guarantee this property. | |
350 }; | |
351 | |
352 struct AuthenticatedStream { | |
353 // A pair of an `AsyncIoStream` and a `PeerIdentity`. This is used as the return type of | |
354 // `NetworkAddress::connectAuthenticated()` and `ConnectionReceiver::acceptAuthenticated()`. | |
355 | |
356 Own<AsyncIoStream> stream; | |
357 // The byte stream. | |
358 | |
359 Own<PeerIdentity> peerIdentity; | |
360 // An object indicating who is at the other end of the stream. | |
361 // | |
362 // Different subclasses of `PeerIdentity` are used in different situations: | |
363 // - TCP connections will use NetworkPeerIdentity, which gives the network address of the client. | |
364 // - Local (Unix) socket connections will use LocalPeerIdentity, which identifies the UID | |
365 // and PID of the process that initiated the connection. | |
366 // - TLS connections will use TlsPeerIdentity which provides details of the client certificate, | |
367 // if any was provided. | |
368 // - When no meaningful peer identity can be provided, `UnknownPeerIdentity` is returned. | |
369 // | |
370 // Implementations of `Network`, `ConnectionReceiver`, `NetworkAddress`, etc. should document the | |
371 // specific assumptions the caller can make about the type of `PeerIdentity`s used, allowing for | |
372 // identities to be statically downcast if the right conditions are met. In the absence of | |
373 // documented promises, RTTI may be needed to query the type. | |
374 }; | |
375 | |
376 class NetworkPeerIdentity: public PeerIdentity { | |
377 // PeerIdentity used for network protocols like TCP/IP. This identifies the remote peer. | |
378 // | |
379 // This is only "authenticated" to the extent that we know data written to the stream will be | |
380 // routed to the given address. This does not preclude the possibility of man-in-the-middle | |
381 // attacks by attackers who are able to manipulate traffic along the route. | |
382 public: | |
383 virtual NetworkAddress& getAddress() = 0; | |
384 // Obtain the peer's address as a NetworkAddress object. The returned reference's lifetime is the | |
385 // same as that of the `NetworkPeerIdentity`, but you can always call `clone()` on it to get a | |
386 // copy that lives longer. | |
387 | |
388 static kj::Own<NetworkPeerIdentity> newInstance(kj::Own<NetworkAddress> addr); | |
389 // Construct an instance of this interface wrapping the given address. | |
390 }; | |
391 | |
392 class LocalPeerIdentity: public PeerIdentity { | |
393 // PeerIdentity used for connections between processes on the local machine -- in particular, | |
394 // Unix sockets. | |
395 // | |
396 // (This interface probably isn't useful on Windows.) | |
397 public: | |
398 struct Credentials { | |
399 kj::Maybe<int> pid; | |
400 kj::Maybe<uint> uid; | |
// PID and UID of the peer process. Either may be null if it could not be determined (see | |
// getCredentials()). | |
401 | |
402 // We don't cover groups at present because some systems produce a list of groups while others | |
403 // only provide the peer's main group, the latter being pretty useless. | |
404 }; | |
405 | |
406 virtual Credentials getCredentials() = 0; | |
407 // Get the PID and UID of the peer process, if possible. | |
408 // | |
409 // Either ID may be null if the peer could not be identified. Some operating systems do not | |
410 // support retrieving these credentials, or can only provide one or the other. Some situations | |
411 // (like user and PID namespaces on Linux) may also make it impossible to represent the peer's | |
412 // credentials accurately. | |
413 // | |
414 // Note the meaning here can be subtle. Multiple processes can potentially have the socket in | |
415 // their file descriptor tables. The identified process is the one that called `connect()` or | |
416 // `listen()`. | |
417 // | |
418 // On Linux this is implemented with SO_PEERCRED. | |
419 | |
420 static kj::Own<LocalPeerIdentity> newInstance(Credentials creds); | |
421 // Construct an instance of this interface wrapping the given credentials. | |
422 }; | |
423 | |
424 class UnknownPeerIdentity: public PeerIdentity { | |
// PeerIdentity used when no meaningful peer identity can be provided. It declares no | |
// information beyond the PeerIdentity interface itself. | |
425 public: | |
426 static kj::Own<UnknownPeerIdentity> newInstance(); | |
427 // Get an instance of this interface. This actually always returns the same instance with no | |
428 // memory allocation. | |
429 }; | |
430 | |
431 // ======================================================================================= | |
432 // Accepting connections | |
433 | |
434 class ConnectionReceiver: private AsyncObject { | |
435 // Represents a server socket listening on a port. | |
436 | |
437 public: | |
438 virtual Promise<Own<AsyncIoStream>> accept() = 0; | |
439 // Accept the next incoming connection. | |
440 | |
441 virtual Promise<AuthenticatedStream> acceptAuthenticated(); | |
442 // Accept the next incoming connection, and also provide a PeerIdentity with any information | |
443 // about the client. | |
444 // | |
445 // For backwards-compatibility, the default implementation of this method calls `accept()` and | |
446 // then adds `UnknownPeerIdentity`. | |
447 | |
448 virtual uint getPort() = 0; | |
449 // Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't | |
450 // specify a port when constructing the NetworkAddress -- one will have been assigned | |
451 // automatically. | |
452 | |
453 virtual void getsockopt(int level, int option, void* value, uint* length); | |
454 virtual void setsockopt(int level, int option, const void* value, uint length); | |
455 virtual void getsockname(struct sockaddr* addr, uint* length); | |
456 // Same as the identically named methods of AsyncIoStream, applied to the listening socket. | |
457 }; | |
458 | |
459 Own<ConnectionReceiver> newAggregateConnectionReceiver(Array<Own<ConnectionReceiver>> receivers); | |
460 // Create a ConnectionReceiver that listens on several other ConnectionReceivers and returns | |
461 // sockets from any of them. | |
462 | |
463 // ======================================================================================= | |
464 // Datagram I/O | |
465 | |
466 class AncillaryMessage { | |
467 // Represents an ancillary message (aka control message) received using the recvmsg() system | |
468 // call (or equivalent). Most apps will not use this. | |
469 | |
470 public: | |
471 inline AncillaryMessage(int level, int type, ArrayPtr<const byte> data); | |
472 AncillaryMessage() = default; | |
// NOTE(review): `level` and `type` have no default member initializers, so a message created | |
// with this defaulted constructor appears to leave them uninitialized until assigned -- confirm | |
// that callers never read them before assignment. | |
473 | |
474 inline int getLevel() const; | |
475 // Originating protocol / socket level. | |
476 | |
477 inline int getType() const; | |
478 // Protocol-specific message type. | |
479 | |
480 template <typename T> | |
481 inline Maybe<const T&> as() const; | |
482 // Interpret the ancillary message as the given struct type. Most ancillary messages are some | |
483 // sort of struct, so this is a convenient way to access it. Returns nullptr if the message | |
484 // is smaller than the struct -- this can happen if the message was truncated due to | |
485 // insufficient ancillary buffer space. | |
486 | |
487 template <typename T> | |
488 inline ArrayPtr<const T> asArray() const; | |
489 // Interpret the ancillary message as an array of items. If the message size does not evenly | |
490 // divide into elements of type T, the remainder is discarded -- this can happen if the message | |
491 // was truncated due to insufficient ancillary buffer space. | |
492 | |
493 private: | |
494 int level; | |
495 int type; | |
496 ArrayPtr<const byte> data; | |
497 // Message data. In most cases you should use `as()` or `asArray()`. | |
498 }; | |
499 | |
500 class DatagramReceiver { | |
501 // Class encapsulating the recvmsg() system call. You must specify the DatagramReceiver's | |
502 // capacity in advance; if a received packet is larger than the capacity, it will be truncated. | |
503 | |
504 public: | |
505 virtual Promise<void> receive() = 0; | |
506 // Receive a new message, overwriting this object's content. | |
507 // | |
508 // receive() may reuse the same buffers for content and ancillary data with each call. | |
509 | |
510 template <typename T> | |
511 struct MaybeTruncated { | |
512 T value; | |
// The received value, possibly cut short (see `isTruncated`). | |
513 | |
514 bool isTruncated; | |
515 // True if the DatagramReceiver's capacity was insufficient to receive the value and therefore | |
516 // the value is truncated. | |
517 }; | |
518 | |
519 virtual MaybeTruncated<ArrayPtr<const byte>> getContent() = 0; | |
520 // Get the content of the datagram. | |
521 | |
522 virtual MaybeTruncated<ArrayPtr<const AncillaryMessage>> getAncillary() = 0; | |
523 // Ancillary messages received with the datagram. See the recvmsg() system call and the cmsghdr | |
524 // struct. Most apps don't need this. | |
525 // | |
526 // If the returned value is truncated, then the last message in the array may itself be | |
527 // truncated, meaning its as<T>() method will return nullptr or its asArray<T>() method will | |
528 // return fewer elements than expected. Truncation can also mean that additional messages were | |
529 // available but discarded. | |
530 | |
531 virtual NetworkAddress& getSource() = 0; | |
532 // Get the datagram sender's address. | |
533 | |
534 struct Capacity { | |
535 size_t content = 8192; | |
536 // How much space to allocate for the datagram content. If a datagram is received that is | |
537 // larger than this, it will be truncated, with no way to recover the tail. | |
538 | |
539 size_t ancillary = 0; | |
540 // How much space to allocate for ancillary messages. As with content, if the ancillary data | |
541 // is larger than this, it will be truncated. | |
542 }; | |
543 }; | |
544 | |
545 class DatagramPort { | |
// A port for datagram I/O: datagrams are sent with send() and received through | |
// DatagramReceiver objects created by makeReceiver(). | |
546 public: | |
547 virtual Promise<size_t> send(const void* buffer, size_t size, NetworkAddress& destination) = 0; | |
548 virtual Promise<size_t> send(ArrayPtr<const ArrayPtr<const byte>> pieces, | |
549 NetworkAddress& destination) = 0; | |
// Send a datagram, given as a single buffer or as a sequence of pieces, to `destination`. | |
// NOTE(review): the resolved size_t is presumably the number of bytes sent -- confirm against | |
// the implementation. | |
550 | |
551 virtual Own<DatagramReceiver> makeReceiver( | |
552 DatagramReceiver::Capacity capacity = DatagramReceiver::Capacity()) = 0; | |
553 // Create a new `DatagramReceiver` that can be used to receive datagrams. `capacity` specifies | |
554 // how much space to allocate for the received message. The `DatagramPort` must outlive the | |
// `DatagramReceiver`. | |
555 | |
556 virtual uint getPort() = 0; | |
557 // Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't | |
558 // specify a port when constructing the NetworkAddress -- one will have been assigned | |
559 // automatically. | |
560 | |
561 virtual void getsockopt(int level, int option, void* value, uint* length); | |
562 virtual void setsockopt(int level, int option, const void* value, uint length); | |
563 // Same as the methods of AsyncIoStream. | |
564 }; | |
565 | |
566 // ======================================================================================= | |
567 // Networks | |
568 | |
569 class NetworkAddress: private AsyncObject { | |
570 // Represents a network address: one to which the application can connect, or (if the address | |
// is local) on which it can listen or bind a datagram port. | |
571 | |
572 public: | |
573 virtual Promise<Own<AsyncIoStream>> connect() = 0; | |
574 // Make a new connection to this address. | |
575 // | |
576 // The address must not be a wildcard ("*"). If it is an IP address, it must have a port number. | |
577 | |
578 virtual Promise<AuthenticatedStream> connectAuthenticated(); | |
579 // Connect to the address and return both the connection and information about the peer identity. | |
580 // This is especially useful when using TLS, to get certificate details. | |
581 // | |
582 // For backwards-compatibility, the default implementation of this method calls `connect()` and | |
583 // then uses a `NetworkPeerIdentity` wrapping a clone of this `NetworkAddress` -- which is not | |
584 // particularly useful. | |
585 | |
586 virtual Own<ConnectionReceiver> listen() = 0; | |
587 // Listen for incoming connections on this address. | |
588 // | |
589 // The address must be local. | |
590 | |
591 virtual Own<DatagramPort> bindDatagramPort(); | |
592 // Open this address as a datagram (e.g. UDP) port. | |
593 // | |
594 // The address must be local. | |
595 | |
596 virtual Own<NetworkAddress> clone() = 0; | |
597 // Returns an equivalent copy of this NetworkAddress. | |
598 | |
599 virtual String toString() = 0; | |
600 // Produce a human-readable string which hopefully can be passed to Network::parseAddress() | |
601 // to reproduce this address, although whether or not that works of course depends on the Network | |
602 // implementation. This should be called only to display the address to human users, who will | |
603 // hopefully know what they are able to do with it. | |
604 }; | |
605 | |
606 class Network { | |
607 // Factory for NetworkAddress instances, representing the network services offered by the | |
608 // operating system. | |
609 // | |
610 // This interface typically represents broad authority, and well-designed code should limit its | |
611 // use to high-level startup code and user interaction. Low-level APIs should accept | |
612 // NetworkAddress instances directly and work from there, if at all possible. | |
613 | |
614 public: | |
615 virtual Promise<Own<NetworkAddress>> parseAddress(StringPtr addr, uint portHint = 0) = 0; | |
616 // Construct a network address from a user-provided string. The format of the address | |
617 // strings is not specified at the API level, and application code should make no assumptions | |
618 // about them. These strings should always be provided by humans, and said humans will know | |
619 // what format to use in their particular context. | |
620 // | |
621 // `portHint`, if provided, specifies the "standard" IP port number for the application-level | |
622 // service in play. If the address turns out to be an IP address (v4 or v6), and it lacks a | |
623 // port number, this port will be used. If `addr` lacks a port number *and* `portHint` is | |
624 // omitted, then the returned address will only support listen() and bindDatagramPort() | |
625 // (not connect()), and an unused port will be chosen each time one of those methods is called. | |
626 | |
627 virtual Own<NetworkAddress> getSockaddr(const void* sockaddr, uint len) = 0; | |
628 // Construct a network address from a legacy struct sockaddr. | |
629 | |
630 virtual Own<Network> restrictPeers( | |
631 kj::ArrayPtr<const kj::StringPtr> allow, | |
632 kj::ArrayPtr<const kj::StringPtr> deny = nullptr) KJ_WARN_UNUSED_RESULT = 0; | |
633 // Constructs a new Network instance wrapping this one which restricts which peer addresses are | |
634 // permitted (both for outgoing and incoming connections). | |
635 // | |
636 // Communication will be allowed only with peers whose addresses match one of the patterns | |
637 // specified in the `allow` array. If a `deny` array is specified, then any address which matches | |
638 // a pattern in `deny` and *does not* match any more-specific pattern in `allow` will also be | |
639 // denied. | |
640 // | |
641 // The syntax of address patterns depends on the network, except that four special patterns are | |
642 // defined for all networks: | |
643 // - "private": Matches network addresses that are reserved by standards for private networks, | |
644 // such as "10.0.0.0/8" or "192.168.0.0/16". This is a superset of "local". | |
645 // - "public": Opposite of "private". | |
646 // - "local": Matches network addresses that are defined by standards to only be accessible from | |
647 // the local machine, such as "127.0.0.0/8" or Unix domain addresses. | |
648 // - "network": Opposite of "local". | |
649 // | |
650 // For the standard KJ network implementation, the following patterns are also recognized: | |
651 // - Network blocks specified in CIDR notation (ipv4 and ipv6), such as "192.0.2.0/24" or | |
652 // "2001:db8::/32". | |
653 // - "unix" to match all Unix domain addresses. (In the future, we may support specifying a | |
654 // glob.) | |
655 // - "unix-abstract" to match Linux's "abstract unix domain" addresses. (In the future, we may | |
656 // support specifying a glob.) | |
657 // | |
658 // Network restrictions apply *after* DNS resolution (otherwise they'd be useless). | |
659 // | |
660 // It is legal to parseAddress() a restricted address. An exception won't be thrown until | |
661 // connect() is called. | |
662 // | |
663 // It's possible to listen() on a restricted address. However, connections will only be accepted | |
664 // from non-restricted addresses; others will be dropped. If a particular listen address has no | |
665 // valid peers (e.g. because it's a unix socket address and unix sockets are not allowed) then | |
666 // listen() may throw (or may simply never receive any connections). | |
667 // | |
668 // Examples: | |
669 // | |
670 // auto restricted = network->restrictPeers({"public"}); | |
671 // | |
672 // Allows connections only to/from public internet addresses. Use this when connecting to an | |
673 // address specified by a third party that is not trusted and is not themselves already on your | |
674 // private network. | |
675 // | |
676 // auto restricted = network->restrictPeers({"private"}); | |
677 // | |
678 // Allows connections only to/from the private network. Use this on the server side to reject | |
679 // connections from the public internet. | |
680 // | |
681 // auto restricted = network->restrictPeers({"192.0.2.0/24"}, {"192.0.2.3/32"}); | |
682 // | |
683 // Allows connections only to/from 192.0.2.*, except 192.0.2.3 which is blocked. | |
684 // | |
685 // auto restricted = network->restrictPeers({"10.0.0.0/8", "10.1.2.3/32"}, {"10.1.2.0/24"}); | |
686 // | |
687 // Allows connections to/from 10.*.*.*, with the exception of 10.1.2.* (which is denied), with an | |
688 // exception to the exception of 10.1.2.3 (which is allowed, because it is matched by an allow | |
689 // rule that is more specific than the deny rule). | |
690 }; | |
691 | |
692 // ======================================================================================= | |
693 // I/O Provider | |
694 | |
695 class AsyncIoProvider { | |
696 // Class which constructs asynchronous wrappers around the operating system's I/O facilities. | |
697 // | |
698 // Generally, the implementation of this interface must integrate closely with a particular | |
699 // `EventLoop` implementation. Typically, the EventLoop implementation itself will provide | |
700 // an AsyncIoProvider. | |
701 | |
702 public: | |
703 virtual OneWayPipe newOneWayPipe() = 0; | |
704 // Creates an input/output stream pair representing the ends of a one-way pipe (e.g. created with | |
705 // the pipe(2) system call). | |
706 | |
707 virtual TwoWayPipe newTwoWayPipe() = 0; | |
708 // Creates two AsyncIoStreams representing the two ends of a two-way pipe (e.g. created with | |
709 // socketpair(2) system call). Data written to one end can be read from the other. | |
710 | |
711 virtual CapabilityPipe newCapabilityPipe(); | |
712 // Creates two AsyncCapabilityStreams representing the two ends of a two-way capability pipe. | |
713 // | |
714 // The default implementation throws an unimplemented exception. In particular this is not | |
715 // implemented by the default AsyncIoProvider on Windows, since Windows lacks any sane way to | |
716 // pass handles over a stream. | |
717 | |
718 virtual Network& getNetwork() = 0; | |
719 // Returns a `Network` instance representing the networks exposed by the operating system. | |
720 // | |
721 // DO NOT CALL THIS except at the highest levels of your code, ideally in the main() function. If | |
722 // you call this from low-level code, then you are preventing higher-level code from injecting an | |
723 // alternative implementation. Instead, if your code needs to use network functionality, it | |
724 // should ask for a `Network` as a constructor or method parameter, so that higher-level code can | |
725 // choose what implementation to use. The system network is essentially a singleton. See: | |
726 // http://www.object-oriented-security.org/lets-argue/singletons | |
727 // | |
728 // Code that uses the system network should not make any assumptions about what kinds of | |
729 // addresses it will parse, as this could differ across platforms. String addresses should come | |
730 // strictly from the user, who will know how to write them correctly for their system. | |
731 // | |
732 // With that said, KJ currently supports the following string address formats: | |
733 // - IPv4: "1.2.3.4", "1.2.3.4:80" | |
734 // - IPv6: "1234:5678::abcd", "[1234:5678::abcd]:80" | |
735 // - Local IP wildcard (covers both v4 and v6): "*", "*:80" | |
736 // - Symbolic names: "example.com", "example.com:80", "example.com:http", "1.2.3.4:http" | |
737 // - Unix domain: "unix:/path/to/socket" | |
738 | |
739 struct PipeThread { | |
740 // A combination of a thread and a two-way pipe that communicates with that thread. | |
741 // | |
742 // The fields are intentionally ordered so that the pipe will be destroyed (and therefore | |
743 // disconnected) before the thread is destroyed (and therefore joined). Thus if the thread | |
744 // arranges to exit when it detects disconnect, destruction should be clean. | |
745 | |
746 Own<Thread> thread; | |
747 Own<AsyncIoStream> pipe; | |
748 }; | |
749 | |
750 virtual PipeThread newPipeThread( | |
751 Function<void(AsyncIoProvider&, AsyncIoStream&, WaitScope&)> startFunc) = 0; | |
752 // Create a new thread and set up a two-way pipe (socketpair) which can be used to communicate | |
753 // with it. One end of the pipe is passed to the thread's start function and the other end of | |
754 // the pipe is returned. The new thread also gets its own `AsyncIoProvider` instance and will | |
755 // already have an active `EventLoop` when `startFunc` is called. | |
756 // | |
757 // TODO(someday): I'm not entirely comfortable with this interface. It seems to be doing too | |
758 // much at once but I'm not sure how to cleanly break it down. | |
759 | |
760 virtual Timer& getTimer() = 0; | |
761 // Returns a `Timer` based on real time. Time does not pass while event handlers are running -- | |
762 // it only updates when the event loop polls for system events. This means that calling `now()` | |
763 // on this timer does not require a system call. | |
764 // | |
765 // This timer is not affected by changes to the system date. It is unspecified whether the timer | |
766 // continues to count while the system is suspended. | |
767 }; | |
768 | |
769 class LowLevelAsyncIoProvider { | |
770 // Similar to `AsyncIoProvider`, but represents a lower-level interface that may differ on | |
771 // different operating systems. You should prefer to use `AsyncIoProvider` over this interface | |
772 // whenever possible, as `AsyncIoProvider` is portable and friendlier to dependency-injection. | |
773 // | |
774 // On Unix, this interface can be used to import native file descriptors into the async framework. | |
775 // Different implementations of this interface might work on top of different event handling | |
776 // primitives, such as poll vs. epoll vs. kqueue vs. some higher-level event library. | |
777 // | |
778 // On Windows, this interface can be used to import native SOCKETs into the async framework. | |
779 // Different implementations of this interface might work on top of different event handling | |
780 // primitives, such as I/O completion ports vs. completion routines. | |
781 | |
782 public: | |
783 enum Flags { | |
784 // Flags controlling how to wrap a file descriptor. | |
785 | |
786 TAKE_OWNERSHIP = 1 << 0, | |
787 // The returned object should own the file descriptor, automatically closing it when destroyed. | |
788 // The close-on-exec flag will be set on the descriptor if it is not already. | |
789 // | |
790 // If this flag is not used, then the file descriptor is not automatically closed and the | |
791 // close-on-exec flag is not modified. | |
792 | |
793 #if !_WIN32 | |
794 ALREADY_CLOEXEC = 1 << 1, | |
795 // Indicates that the close-on-exec flag is known already to be set, so need not be set again. | |
796 // Only relevant when combined with TAKE_OWNERSHIP. | |
797 // | |
798 // On Linux, all system calls which yield new file descriptors have flags or variants which | |
799 // set the close-on-exec flag immediately. Unfortunately, other OS's do not. | |
800 | |
801 ALREADY_NONBLOCK = 1 << 2 | |
802 // Indicates that the file descriptor is known already to be in non-blocking mode, so the flag | |
803 // need not be set again. Otherwise, all wrap*Fd() methods will enable non-blocking mode | |
804 // automatically. | |
805 // | |
806 // On Linux, all system calls which yield new file descriptors have flags or variants which | |
807 // enable non-blocking mode immediately. Unfortunately, other OS's do not. | |
808 #endif | |
809 }; | |
810 | |
811 #if _WIN32 | |
812 typedef uintptr_t Fd; | |
813 typedef AutoCloseHandle OwnFd; | |
814 // On Windows, the `fd` parameter to each of these methods must be a SOCKET, and must have the | |
815 // flag WSA_FLAG_OVERLAPPED (which socket() uses by default, but WSASocket() wants you to specify | |
816 // explicitly). | |
817 #else | |
818 typedef int Fd; | |
819 typedef AutoCloseFd OwnFd; | |
820 // On Unix, any arbitrary file descriptor is supported. | |
821 #endif | |
822 | |
823 virtual Own<AsyncInputStream> wrapInputFd(Fd fd, uint flags = 0) = 0; | |
824 // Create an AsyncInputStream wrapping a file descriptor. | |
825 // | |
826 // `flags` is a bitwise-OR of the values of the `Flags` enum. | |
827 | |
828 virtual Own<AsyncOutputStream> wrapOutputFd(Fd fd, uint flags = 0) = 0; | |
829 // Create an AsyncOutputStream wrapping a file descriptor. | |
830 // | |
831 // `flags` is a bitwise-OR of the values of the `Flags` enum. | |
832 | |
833 virtual Own<AsyncIoStream> wrapSocketFd(Fd fd, uint flags = 0) = 0; | |
834 // Create an AsyncIoStream wrapping a socket file descriptor. | |
835 // | |
836 // `flags` is a bitwise-OR of the values of the `Flags` enum. | |
837 | |
838 #if !_WIN32 | |
839 virtual Own<AsyncCapabilityStream> wrapUnixSocketFd(Fd fd, uint flags = 0); | |
840 // Like wrapSocketFd() but also support capability passing via SCM_RIGHTS. The socket must be | |
841 // a Unix domain socket. | |
842 // | |
843 // The default implementation throws UNIMPLEMENTED, for backwards-compatibility with | |
844 // LowLevelAsyncIoProvider implementations written before this method was added. | |
845 #endif | |
846 | |
847 virtual Promise<Own<AsyncIoStream>> wrapConnectingSocketFd( | |
848 Fd fd, const struct sockaddr* addr, uint addrlen, uint flags = 0) = 0; | |
849 // Create an AsyncIoStream wrapping a socket and initiate a connection to the given address. | |
850 // The returned promise does not resolve until connection has completed. | |
851 // | |
852 // `flags` is a bitwise-OR of the values of the `Flags` enum. | |
853 | |
854 class NetworkFilter { | |
855 public: | |
856 virtual bool shouldAllow(const struct sockaddr* addr, uint addrlen) = 0; | |
857 // Returns true if incoming connections or datagrams from the given peer should be accepted. | |
858 // If false, they will be dropped. This is used to implement kj::Network::restrictPeers(). | |
859 | |
860 static NetworkFilter& getAllAllowed(); | |
861 }; | |
862 | |
863 virtual Own<ConnectionReceiver> wrapListenSocketFd( | |
864 Fd fd, NetworkFilter& filter, uint flags = 0) = 0; | |
865 inline Own<ConnectionReceiver> wrapListenSocketFd(Fd fd, uint flags = 0) { | |
866 return wrapListenSocketFd(fd, NetworkFilter::getAllAllowed(), flags); | |
867 } | |
868 // Create a ConnectionReceiver wrapping a listen socket file descriptor. This socket should | |
869 // already have had `bind()` and `listen()` called on it, so it's ready for `accept()`. | |
870 // | |
871 // `flags` is a bitwise-OR of the values of the `Flags` enum. | |
872 | |
873 virtual Own<DatagramPort> wrapDatagramSocketFd(Fd fd, NetworkFilter& filter, uint flags = 0); | |
874 inline Own<DatagramPort> wrapDatagramSocketFd(Fd fd, uint flags = 0) { | |
875 return wrapDatagramSocketFd(fd, NetworkFilter::getAllAllowed(), flags); | |
876 } | |
    // Create a DatagramPort wrapping a datagram socket file descriptor. The overload without a
    // `filter` argument uses NetworkFilter::getAllAllowed().
    //
    // `flags` is a bitwise-OR of the values of the `Flags` enum.
    //
    // NOTE(review): unlike wrapListenSocketFd(), the virtual overload is not pure, so a default
    // implementation presumably exists elsewhere -- confirm what it does.
877 | |
878 virtual Timer& getTimer() = 0; | |
879 // Returns a `Timer` based on real time. Time does not pass while event handlers are running -- | |
880 // it only updates when the event loop polls for system events. This means that calling `now()` | |
881 // on this timer does not require a system call. | |
882 // | |
883 // This timer is not affected by changes to the system date. It is unspecified whether the timer | |
884 // continues to count while the system is suspended. | |
885 | |
886 Own<AsyncInputStream> wrapInputFd(OwnFd&& fd, uint flags = 0); | |
887 Own<AsyncOutputStream> wrapOutputFd(OwnFd&& fd, uint flags = 0); | |
888 Own<AsyncIoStream> wrapSocketFd(OwnFd&& fd, uint flags = 0); | |
889 #if !_WIN32 | |
890 Own<AsyncCapabilityStream> wrapUnixSocketFd(OwnFd&& fd, uint flags = 0); | |
891 #endif | |
892 Promise<Own<AsyncIoStream>> wrapConnectingSocketFd( | |
893 OwnFd&& fd, const struct sockaddr* addr, uint addrlen, uint flags = 0); | |
894 Own<ConnectionReceiver> wrapListenSocketFd( | |
895 OwnFd&& fd, NetworkFilter& filter, uint flags = 0); | |
896 Own<ConnectionReceiver> wrapListenSocketFd(OwnFd&& fd, uint flags = 0); | |
897 Own<DatagramPort> wrapDatagramSocketFd(OwnFd&& fd, NetworkFilter& filter, uint flags = 0); | |
898 Own<DatagramPort> wrapDatagramSocketFd(OwnFd&& fd, uint flags = 0); | |
899 // Convenience wrappers which transfer ownership via AutoCloseFd (Unix) or AutoCloseHandle | |
900 // (Windows). TAKE_OWNERSHIP will be implicitly added to `flags`. | |
901 }; | |
902 | |
903 Own<AsyncIoProvider> newAsyncIoProvider(LowLevelAsyncIoProvider& lowLevel); | |
904 // Make a new AsyncIoProvider wrapping a `LowLevelAsyncIoProvider`. | |
    //
    // NOTE(review): `lowLevel` is taken by reference, so it presumably must outlive the returned
    // provider -- confirm against the implementation.
905 | |
906 struct AsyncIoContext { | |
    // The bundle returned by `setupAsyncIo()`: both I/O providers, a reference to a `WaitScope`,
    // and a reference to the platform-specific event port.
907 Own<LowLevelAsyncIoProvider> lowLevelProvider; | |
908 Own<AsyncIoProvider> provider; | |
909 WaitScope& waitScope; | |
910 | |
911 #if _WIN32 | |
912 Win32EventPort& win32EventPort; | |
913 #else | |
914 UnixEventPort& unixEventPort; | |
915 // TEMPORARY: Direct access to underlying UnixEventPort, mainly for waiting on signals. This | |
916 // field will go away at some point when we have a chance to improve these interfaces. | |
917 #endif | |
918 }; | |
919 | |
920 AsyncIoContext setupAsyncIo(); | |
921 // Convenience method which sets up the current thread with everything it needs to do async I/O. | |
922 // The returned objects contain an `EventLoop` which is wrapping an appropriate `EventPort` for | |
923 // doing I/O on the host system, so everything is ready for the thread to start making async calls | |
924 // and waiting on promises. | |
925 // | |
926 // You would typically call this in your main() function or in the start function of a thread. | |
927 // Example: | |
928 // | |
929 // int main() { | |
930 // auto ioContext = kj::setupAsyncIo(); | |
931 // | |
932 // // Now we can call an async function. | |
933 // Promise<String> textPromise = getHttp(*ioContext.provider, "http://example.com"); | |
934 // | |
935 // // And we can wait for the promise to complete. Note that you can only use `wait()` | |
936 // // from the top level, not from inside a promise callback. | |
937 // String text = textPromise.wait(ioContext.waitScope); | |
938 // print(text); | |
939 // return 0; | |
940 // } | |
941 // | |
942 // WARNING: An AsyncIoContext can only be used in the thread and process that created it. In | |
943 // particular, note that after a fork(), an AsyncIoContext created in the parent process will | |
944 // not work correctly in the child, even if the parent ceases to use its copy. This means that | |
945 // server processes which daemonize themselves at startup must wait | |
946 // until after daemonization to create an AsyncIoContext. | |
947 | |
948 // ======================================================================================= | |
949 // Convenience adapters. | |
950 | |
951 class CapabilityStreamConnectionReceiver final: public ConnectionReceiver { | |
952 // Trivial wrapper which allows an AsyncCapabilityStream to act as a ConnectionReceiver. accept() | |
953 // calls receiveStream(). | |
954 | |
955 public: | |
956 CapabilityStreamConnectionReceiver(AsyncCapabilityStream& inner) | |
957 : inner(inner) {} | |
    // `inner` is stored by reference and is not owned, so it must outlive this receiver.
958 | |
959 Promise<Own<AsyncIoStream>> accept() override; | |
960 uint getPort() override; | |
961 | |
962 Promise<AuthenticatedStream> acceptAuthenticated() override; | |
963 // Always produces UnknownIdentity. Capability-based security patterns should not rely on | |
964 // authenticating peers; the other end of the capability stream should only be given to | |
965 // authorized parties in the first place. | |
966 | |
967 private: | |
968 AsyncCapabilityStream& inner; | |
969 }; | |
970 | |
971 class CapabilityStreamNetworkAddress final: public NetworkAddress { | |
972 // Trivial wrapper which allows an AsyncCapabilityStream to act as a NetworkAddress. | |
973 // | |
974 // connect() is implemented by calling provider.newCapabilityPipe(), sending one end over the | |
975 // original capability stream, and returning the other end. If `provider` is null, then the | |
976 // global kj::newCapabilityPipe() will be used, but this ONLY works if `inner` itself is agnostic | |
977 // to the type of streams it receives, e.g. because it was also created using | |
978 // kj::newCapabilityPipe(). | |
979 // | |
980 // listen().accept() is implemented by receiving new streams over the original stream. | |
981 // | |
982 // Note that clone() doesn't work (due to ownership issues) and toString() returns a static | |
983 // string. | |
984 | |
985 public: | |
986 CapabilityStreamNetworkAddress(kj::Maybe<AsyncIoProvider&> provider, AsyncCapabilityStream& inner) | |
987 : provider(provider), inner(inner) {} | |
988 | |
989 Promise<Own<AsyncIoStream>> connect() override; | |
990 Own<ConnectionReceiver> listen() override; | |
991 | |
992 Own<NetworkAddress> clone() override; | |
993 String toString() override; | |
994 | |
995 Promise<AuthenticatedStream> connectAuthenticated() override; | |
996 // Always produces UnknownIdentity. Capability-based security patterns should not rely on | |
997 // authenticating peers; the other end of the capability stream should only be given to | |
998 // authorized parties in the first place. | |
999 | |
1000 private: | |
1001 kj::Maybe<AsyncIoProvider&> provider; | |
1002 AsyncCapabilityStream& inner; | |
1003 }; | |
1004 | |
1005 class FileInputStream: public AsyncInputStream { | |
1006 // InputStream that reads from a disk file -- and enables sendfile() optimization. | |
1007 // | |
1008 // Reads are performed synchronously -- no actual attempt is made to use asynchronous file I/O. | |
1009 // True asynchronous file I/O is complicated and is mostly unnecessary in the presence of | |
1010 // caching. Only certain niche programs can expect to benefit from it. For the rest, it's better | |
1011 // to use regular synchronous disk I/O, so that's what this class does. | |
1012 // | |
1013 // The real purpose of this class, aside from general convenience, is to enable sendfile() | |
1014 // optimization. When you use this class's pumpTo() method, and the destination is a socket, | |
1015 // the system will detect this and optimize to sendfile(), so that the file data never needs to | |
1016 // be read into userspace. | |
1017 // | |
1018 // NOTE: As of this writing, sendfile() optimization is only implemented on Linux. | |
1019 | |
1020 public: | |
1021 FileInputStream(const ReadableFile& file, uint64_t offset = 0) | |
1022 : file(file), offset(offset) {} | |
     // `file` is stored by reference and is not owned, so it must outlive this stream.
1023 | |
1024 const ReadableFile& getUnderlyingFile() { return file; } | |
1025 uint64_t getOffset() { return offset; } | |
1026 void seek(uint64_t newOffset) { offset = newOffset; } | |
1027 | |
1028 Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes); | |
1029 Maybe<uint64_t> tryGetLength(); | |
1030 | |
1031 // (pumpTo() is not actually overridden here, but AsyncStreamFd's tryPumpFrom() will detect when | |
1032 // the source is a file.) | |
1033 | |
1034 private: | |
1035 const ReadableFile& file; | |
1036 uint64_t offset; | |
1037 }; | |
1038 | |
1039 class FileOutputStream: public AsyncOutputStream { | |
1040 // OutputStream that writes to a disk file. | |
1041 // | |
1042 // As with FileInputStream, calls are not actually async. Async would be even less useful here | |
1043 // because writes should usually land in cache anyway. | |
1044 // | |
1045 // sendfile() optimization does not apply when writing to a file, but on Linux, splice() can | |
1046 // be used to achieve a similar effect. | |
1047 // | |
1048 // NOTE: As of this writing, splice() optimization is not implemented. | |
1049 | |
1050 public: | |
1051 FileOutputStream(const File& file, uint64_t offset = 0) | |
1052 : file(file), offset(offset) {} | |
     // `file` is stored by reference and is not owned, so it must outlive this stream.
1053 | |
1054 const File& getUnderlyingFile() { return file; } | |
1055 uint64_t getOffset() { return offset; } | |
1056 void seek(uint64_t newOffset) { offset = newOffset; } | |
1057 | |
1058 Promise<void> write(const void* buffer, size_t size); | |
1059 Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces); | |
1060 Promise<void> whenWriteDisconnected(); | |
1061 | |
1062 private: | |
1063 const File& file; | |
1064 uint64_t offset; | |
1065 }; | |
1066 | |
1067 // ======================================================================================= | |
1068 // inline implementation details | |
1069 | |
1070 inline AncillaryMessage::AncillaryMessage( | |
1071 int level, int type, ArrayPtr<const byte> data) | |
1072 : level(level), type(type), data(data) {} | |
     // Member-wise initialization only.
     //
     // NOTE(review): `data` is an ArrayPtr, presumably a non-owning view, in which case the
     // referenced bytes must outlive this object -- confirm.
1073 | |
1074 inline int AncillaryMessage::getLevel() const { return level; } | |
1075 inline int AncillaryMessage::getType() const { return type; } | |
     // Trivial accessors for the `level` and `type` values passed to the constructor.
1076 | |
1077 template <typename T> | |
1078 inline Maybe<const T&> AncillaryMessage::as() const { | |
     // Returns the start of the payload reinterpreted as a `const T&` when the payload holds at
     // least sizeof(T) bytes, and null otherwise. Nothing is copied; the reference points into
     // `data`.
     //
     // NOTE(review): the cast assumes `data.begin()` is suitably aligned for `T` -- confirm for
     // the types callers use.
1079 if (data.size() >= sizeof(T)) { | |
1080 return *reinterpret_cast<const T*>(data.begin()); | |
1081 } else { | |
1082 return nullptr; | |
1083 } | |
1084 } | |
1085 | |
1086 template <typename T> | |
1087 inline ArrayPtr<const T> AncillaryMessage::asArray() const { | |
     // Views the payload as an array of `T` without copying. The element count is
     // data.size() / sizeof(T), so any trailing bytes that do not fill a whole `T` are left out.
1088 return arrayPtr(reinterpret_cast<const T*>(data.begin()), data.size() / sizeof(T)); | |
1089 } | |
1090 | |
1091 class SecureNetworkWrapper { | |
1092 // Abstract interface for a class which implements a "secure" network as a wrapper around an | |
1093 // insecure one. "secure" means: | |
1094 // * Connections to a server will only succeed if it can be verified that the requested hostname | |
1095 // actually belongs to the responding server. | |
1096 // * No man-in-the-middle attacker can potentially see the bytes sent and received. | |
1097 // | |
1098 // The typical implementation uses TLS. The object in this case could be configured to use certain | |
1099 // keys, certificates, etc. See kj/compat/tls.h for such an implementation. | |
1100 // | |
1101 // However, an implementation could use some other form of encryption, or might not need to use | |
1102 // encryption at all. For example, imagine a kj::Network that exists only on a single machine, | |
1103 // providing communications between various processes using unix sockets. Perhaps the "hostnames" | |
1104 // are actually PIDs in this case. An implementation of such a network could verify the other | |
1105 // side's identity using an `SCM_CREDENTIALS` auxiliary message, which cannot be forged. Once | |
1106 // verified, there is no need to encrypt since unix sockets cannot be intercepted. | |
1107 | |
1108 public: | |
1109 virtual kj::Promise<kj::Own<kj::AsyncIoStream>> wrapServer(kj::Own<kj::AsyncIoStream> stream) = 0; | |
1110 // Act as the server side of a connection. The given stream is already connected to a client, but | |
1111 // no authentication has occurred. The returned stream represents the secure transport once | |
1112 // established. | |
1113 | |
1114 virtual kj::Promise<kj::Own<kj::AsyncIoStream>> wrapClient( | |
1115 kj::Own<kj::AsyncIoStream> stream, kj::StringPtr expectedServerHostname) = 0; | |
1116 // Act as the client side of a connection. The given stream is already connected to a server, but | |
1117 // no authentication has occurred. This method will verify that the server actually is the given | |
1118 // hostname, then return the stream representing a secure transport to that server. | |
1119 | |
1120 virtual kj::Promise<kj::AuthenticatedStream> wrapServer(kj::AuthenticatedStream stream) = 0; | |
1121 virtual kj::Promise<kj::AuthenticatedStream> wrapClient( | |
1122 kj::AuthenticatedStream stream, kj::StringPtr expectedServerHostname) = 0; | |
1123 // Same as above, but implementing kj::AuthenticatedStream, which provides PeerIdentity objects | |
1124 // with more details about the peer. The SecureNetworkWrapper will provide its own implementation | |
1125 // of PeerIdentity with the specific details it is able to authenticate. | |
1126 | |
1127 virtual kj::Own<kj::ConnectionReceiver> wrapPort(kj::Own<kj::ConnectionReceiver> port) = 0; | |
1128 // Wrap a connection listener. This is equivalent to calling wrapServer() on every connection | |
1129 // received. | |
1130 | |
1131 virtual kj::Own<kj::NetworkAddress> wrapAddress( | |
1132 kj::Own<kj::NetworkAddress> address, kj::StringPtr expectedServerHostname) = 0; | |
1133 // Wrap a NetworkAddress. This is equivalent to calling `wrapClient()` on every connection | |
1134 // formed by calling `connect()` on the address. | |
1135 | |
1136 virtual kj::Own<kj::Network> wrapNetwork(kj::Network& network) = 0; | |
1137 // Wrap a whole `kj::Network`. This automatically wraps everything constructed using the network. | |
1138 // The network will only accept address strings that can be authenticated, and will automatically | |
1139 // authenticate servers against those addresses when connecting to them. | |
1140 }; | |
1141 | |
1142 } // namespace kj | |
1143 | |
1144 KJ_END_HEADER |