jpayne@69
|
1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
|
jpayne@69
|
2 // Licensed under the MIT License:
|
jpayne@69
|
3 //
|
jpayne@69
|
4 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
jpayne@69
|
5 // of this software and associated documentation files (the "Software"), to deal
|
jpayne@69
|
6 // in the Software without restriction, including without limitation the rights
|
jpayne@69
|
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
jpayne@69
|
8 // copies of the Software, and to permit persons to whom the Software is
|
jpayne@69
|
9 // furnished to do so, subject to the following conditions:
|
jpayne@69
|
10 //
|
jpayne@69
|
11 // The above copyright notice and this permission notice shall be included in
|
jpayne@69
|
12 // all copies or substantial portions of the Software.
|
jpayne@69
|
13 //
|
jpayne@69
|
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
jpayne@69
|
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
jpayne@69
|
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
jpayne@69
|
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
jpayne@69
|
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
jpayne@69
|
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
jpayne@69
|
20 // THE SOFTWARE.
|
jpayne@69
|
21
|
jpayne@69
|
22 #pragma once
|
jpayne@69
|
23
|
jpayne@69
|
24 #include "async.h"
|
jpayne@69
|
25 #include <kj/function.h>
|
jpayne@69
|
26 #include <kj/thread.h>
|
jpayne@69
|
27 #include <kj/timer.h>
|
jpayne@69
|
28
|
jpayne@69
|
29 KJ_BEGIN_HEADER
|
jpayne@69
|
30
|
jpayne@69
|
31 struct sockaddr;
|
jpayne@69
|
32
|
jpayne@69
|
33 namespace kj {
|
jpayne@69
|
34
|
jpayne@69
|
35 #if _WIN32
|
jpayne@69
|
36 class Win32EventPort;
|
jpayne@69
|
37 class AutoCloseHandle;
|
jpayne@69
|
38 #else
|
jpayne@69
|
39 class UnixEventPort;
|
jpayne@69
|
40 #endif
|
jpayne@69
|
41
|
jpayne@69
|
42 class AutoCloseFd;
|
jpayne@69
|
43 class NetworkAddress;
|
jpayne@69
|
44 class AsyncOutputStream;
|
jpayne@69
|
45 class AsyncIoStream;
|
jpayne@69
|
46 class AncillaryMessage;
|
jpayne@69
|
47
|
jpayne@69
|
48 class ReadableFile;
|
jpayne@69
|
49 class File;
|
jpayne@69
|
50
|
jpayne@69
|
51 // =======================================================================================
|
jpayne@69
|
52 // Streaming I/O
|
jpayne@69
|
53
|
jpayne@69
|
54 class AsyncInputStream: private AsyncObject {
|
jpayne@69
|
55 // Asynchronous equivalent of InputStream (from io.h).
|
jpayne@69
|
56
|
jpayne@69
|
57 public:
|
jpayne@69
|
58 virtual Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes);
|
jpayne@69
|
59 virtual Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) = 0;
|
jpayne@69
|
60
|
jpayne@69
|
61 Promise<void> read(void* buffer, size_t bytes);
|
jpayne@69
|
62
|
jpayne@69
|
63 virtual Maybe<uint64_t> tryGetLength();
|
jpayne@69
|
64 // Get the remaining number of bytes that will be produced by this stream, if known.
|
jpayne@69
|
65 //
|
jpayne@69
|
66 // This is used e.g. to fill in the Content-Length header of an HTTP message. If unknown, the
|
jpayne@69
|
67 // HTTP implementation may need to fall back to Transfer-Encoding: chunked.
|
jpayne@69
|
68 //
|
jpayne@69
|
69 // The default implementation always returns null.
|
jpayne@69
|
70
|
jpayne@69
|
71 virtual Promise<uint64_t> pumpTo(
|
jpayne@69
|
72 AsyncOutputStream& output, uint64_t amount = kj::maxValue);
|
jpayne@69
|
73 // Read `amount` bytes from this stream (or to EOF) and write them to `output`, returning the
|
jpayne@69
|
74 // total bytes actually pumped (which is only less than `amount` if EOF was reached).
|
jpayne@69
|
75 //
|
jpayne@69
|
76 // Override this if your stream type knows how to pump itself to certain kinds of output
|
jpayne@69
|
77 // streams more efficiently than via the naive approach. You can use
|
jpayne@69
|
78 // kj::dynamicDowncastIfAvailable() to test for stream types you recognize, and if none match,
|
jpayne@69
|
79 // delegate to the default implementation.
|
jpayne@69
|
80 //
|
jpayne@69
|
81 // The default implementation first tries calling output.tryPumpFrom(), but if that fails, it
|
jpayne@69
|
82 // performs a naive pump by allocating a buffer and reading to it / writing from it in a loop.
|
jpayne@69
|
83
|
jpayne@69
|
84 Promise<Array<byte>> readAllBytes(uint64_t limit = kj::maxValue);
|
jpayne@69
|
85 Promise<String> readAllText(uint64_t limit = kj::maxValue);
|
jpayne@69
|
86 // Read until EOF and return as one big byte array or string. Throw an exception if EOF is not
|
jpayne@69
|
87 // seen before reading `limit` bytes.
|
jpayne@69
|
88 //
|
jpayne@69
|
89 // To prevent runaway memory allocation, consider using a more conservative value for `limit` than
|
jpayne@69
|
90 // the default, particularly on untrusted data streams which may never see EOF.
|
jpayne@69
|
91
|
jpayne@69
|
92 virtual void registerAncillaryMessageHandler(Function<void(ArrayPtr<AncillaryMessage>)> fn);
|
jpayne@69
|
93 // Register interest in checking for ancillary messages (aka control messages) when reading.
|
jpayne@69
|
94 // The provided callback will be called whenever any are encountered. The messages passed to
|
jpayne@69
|
95 // the function do not live beyond when function returns.
|
jpayne@69
|
96 // Only supported on Unix (the default impl throws UNIMPLEMENTED). Most apps will not use this.
|
jpayne@69
|
97
|
jpayne@69
|
98 virtual Maybe<Own<AsyncInputStream>> tryTee(uint64_t limit = kj::maxValue);
|
jpayne@69
|
99 // Primarily intended as an optimization for the `tee` call. Returns an input stream whose state
|
jpayne@69
|
100 // is independent from this one but which will return the exact same set of bytes read going
|
jpayne@69
|
101 // forward. limit is a total limit on the amount of memory, in bytes, which a tee implementation
|
jpayne@69
|
102 // may use to buffer stream data. An implementation must throw an exception if a read operation
|
jpayne@69
|
103 // would cause the limit to be exceeded. If tryTee() can see that the new limit is impossible to
|
jpayne@69
|
104 // satisfy, it should return nullptr so that the pessimized path is taken in newTee. This is
|
jpayne@69
|
105 // likely to arise if tryTee() is called twice with different limits on the same stream.
|
jpayne@69
|
106 };
|
jpayne@69
|
107
|
jpayne@69
|
108 class AsyncOutputStream: private AsyncObject {
|
jpayne@69
|
109 // Asynchronous equivalent of OutputStream (from io.h).
|
jpayne@69
|
110
|
jpayne@69
|
111 public:
|
jpayne@69
|
112 virtual Promise<void> write(const void* buffer, size_t size) KJ_WARN_UNUSED_RESULT = 0;
|
jpayne@69
|
113 virtual Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces)
|
jpayne@69
|
114 KJ_WARN_UNUSED_RESULT = 0;
|
jpayne@69
|
115
|
jpayne@69
|
116 virtual Maybe<Promise<uint64_t>> tryPumpFrom(
|
jpayne@69
|
117 AsyncInputStream& input, uint64_t amount = kj::maxValue);
|
jpayne@69
|
118 // Implements double-dispatch for AsyncInputStream::pumpTo().
|
jpayne@69
|
119 //
|
jpayne@69
|
120 // This method should only be called from within an implementation of pumpTo().
|
jpayne@69
|
121 //
|
jpayne@69
|
122 // This method examines the type of `input` to find optimized ways to pump data from it to this
|
jpayne@69
|
123 // output stream. If it finds one, it performs the pump. Otherwise, it returns null.
|
jpayne@69
|
124 //
|
jpayne@69
|
125 // The default implementation always returns null.
|
jpayne@69
|
126
|
jpayne@69
|
127 virtual Promise<void> whenWriteDisconnected() = 0;
|
jpayne@69
|
128 // Returns a promise that resolves when the stream has become disconnected such that new write()s
|
jpayne@69
|
129 // will fail with a DISCONNECTED exception. This is particularly useful, for example, to cancel
|
jpayne@69
|
130 // work early when it is detected that no one will receive the result.
|
jpayne@69
|
131 //
|
jpayne@69
|
132 // Note that not all streams are able to detect this condition without actually performing a
|
jpayne@69
|
133 // write(); such stream implementations may return a promise that never resolves. (In particular,
|
jpayne@69
|
134 // as of this writing, whenWriteDisconnected() is not implemented on Windows. Also, for TCP
|
jpayne@69
|
135 // streams, not all disconnects are detectable -- a power or network failure may lead the
|
jpayne@69
|
136 // connection to hang forever, or until configured socket options lead to a timeout.)
|
jpayne@69
|
137 //
|
jpayne@69
|
138 // Unlike most other asynchronous stream methods, it is safe to call whenWriteDisconnected()
|
jpayne@69
|
139 // multiple times without canceling the previous promises.
|
jpayne@69
|
140 };
|
jpayne@69
|
141
|
jpayne@69
|
142 class AsyncIoStream: public AsyncInputStream, public AsyncOutputStream {
|
jpayne@69
|
143 // A combination input and output stream.
|
jpayne@69
|
144
|
jpayne@69
|
145 public:
|
jpayne@69
|
146 virtual void shutdownWrite() = 0;
|
jpayne@69
|
147 // Cleanly shut down just the write end of the stream, while keeping the read end open.
|
jpayne@69
|
148
|
jpayne@69
|
149 virtual void abortRead() {}
|
jpayne@69
|
150 // Similar to shutdownWrite, but this will shut down the read end of the stream, and should only
|
jpayne@69
|
151 // be called when an error has occurred.
|
jpayne@69
|
152
|
jpayne@69
|
153 virtual void getsockopt(int level, int option, void* value, uint* length);
|
jpayne@69
|
154 virtual void setsockopt(int level, int option, const void* value, uint length);
|
jpayne@69
|
155 // Corresponds to getsockopt() and setsockopt() syscalls. Will throw an "unimplemented" exception
|
jpayne@69
|
156 // if the stream is not a socket or the option is not appropriate for the socket type. The
|
jpayne@69
|
157 // default implementations always throw "unimplemented".
|
jpayne@69
|
158
|
jpayne@69
|
159 virtual void getsockname(struct sockaddr* addr, uint* length);
|
jpayne@69
|
160 virtual void getpeername(struct sockaddr* addr, uint* length);
|
jpayne@69
|
161 // Corresponds to getsockname() and getpeername() syscalls. Will throw an "unimplemented"
|
jpayne@69
|
162 // exception if the stream is not a socket. The default implementations always throw
|
jpayne@69
|
163 // "unimplemented".
|
jpayne@69
|
164 //
|
jpayne@69
|
165 // Note that we don't provide methods that return NetworkAddress because it usually wouldn't
|
jpayne@69
|
166 // be useful. You can't connect() to or listen() on these addresses, obviously, because they are
|
jpayne@69
|
167 // ephemeral addresses for a single connection.
|
jpayne@69
|
168
|
jpayne@69
|
169 virtual kj::Maybe<int> getFd() const { return nullptr; }
|
jpayne@69
|
170 // Get the underlying Unix file descriptor, if any. Returns nullptr if this object actually
|
jpayne@69
|
171 // isn't wrapping a file descriptor.
|
jpayne@69
|
172 };
|
jpayne@69
|
173
|
jpayne@69
|
174 Promise<uint64_t> unoptimizedPumpTo(
|
jpayne@69
|
175 AsyncInputStream& input, AsyncOutputStream& output, uint64_t amount,
|
jpayne@69
|
176 uint64_t completedSoFar = 0);
|
jpayne@69
|
177 // Performs a pump using read() and write(), without calling the stream's pumpTo() nor
|
jpayne@69
|
178 // tryPumpFrom() methods. This is intended to be used as a fallback by implementations of pumpTo()
|
jpayne@69
|
179 // and tryPumpFrom() when they want to give up on optimization, but can't just call pumpTo() again
|
jpayne@69
|
180 // because this would recursively retry the optimization. unoptimizedPumpTo() should only be called
|
jpayne@69
|
181 // inside implementations of streams, never by the caller of a stream -- use the pumpTo() method
|
jpayne@69
|
182 // instead.
|
jpayne@69
|
183 //
|
jpayne@69
|
184 // `completedSoFar` is the number of bytes out of `amount` that have already been pumped. This is
|
jpayne@69
|
185 // provided for convenience for cases where the caller has already done some pumping before they
|
jpayne@69
|
186 // give up. Otherwise, a `.then()` would need to be used to add the bytes to the final result.
|
jpayne@69
|
187
|
jpayne@69
|
188 class AsyncCapabilityStream: public AsyncIoStream {
|
jpayne@69
|
189 // An AsyncIoStream that also allows transmitting new stream objects and file descriptors
|
jpayne@69
|
190 // (capabilities, in the object-capability model sense), in addition to bytes.
|
jpayne@69
|
191 //
|
jpayne@69
|
192 // Capabilities can be attached to bytes when they are written. On the receiving end, the read()
|
jpayne@69
|
193 // that receives the first byte of such a message will also receive the capabilities.
|
jpayne@69
|
194 //
|
jpayne@69
|
195 // Note that AsyncIoStream's regular byte-oriented methods can be used on AsyncCapabilityStream,
|
jpayne@69
|
196 // with the effect of silently dropping any capabilities attached to the respective bytes. E.g.
|
jpayne@69
|
197 // using `AsyncIoStream::tryRead()` to read bytes that had been sent with `writeWithFds()` will
|
jpayne@69
|
198 // silently drop the FDs (closing them if appropriate). Also note that pumping a stream with
|
jpayne@69
|
199 // `pumpTo()` always drops all capabilities attached to the pumped data. (TODO(someday): Do we
|
jpayne@69
|
200 // want a version of pumpTo() that preserves capabilities?)
|
jpayne@69
|
201 //
|
jpayne@69
|
202 // On Unix, KJ provides an implementation based on Unix domain sockets and file descriptor
|
jpayne@69
|
203 // passing via SCM_RIGHTS. Due to the nature of SCM_RIGHTS, if the application accidentally
|
jpayne@69
|
204 // read()s when it should have called receiveStream(), it will observe a NUL byte in the data
|
jpayne@69
|
205 // and the capability will be discarded. Of course, an application should not depend on this
|
jpayne@69
|
206 // behavior; it should avoid read()ing through a capability.
|
jpayne@69
|
207 //
|
jpayne@69
|
208 // KJ does not provide any inter-process implementation of this type on Windows, as there's no
|
jpayne@69
|
209 // obvious implementation there. Handle passing on Windows requires at least one of the processes
|
jpayne@69
|
210 // involved to have permission to modify the other's handle table, which is effectively full
|
jpayne@69
|
211 // control. Handle passing between mutually non-trusting processes would require a trusted
|
jpayne@69
|
212 // broker process to facilitate. One could possibly implement this type in terms of such a
|
jpayne@69
|
213 // broker, or in terms of direct handle passing if at least one process trusts the other.
|
jpayne@69
|
214
|
jpayne@69
|
215 public:
|
jpayne@69
|
216 virtual Promise<void> writeWithFds(ArrayPtr<const byte> data,
|
jpayne@69
|
217 ArrayPtr<const ArrayPtr<const byte>> moreData,
|
jpayne@69
|
218 ArrayPtr<const int> fds) = 0;
|
jpayne@69
|
219 Promise<void> writeWithFds(ArrayPtr<const byte> data,
|
jpayne@69
|
220 ArrayPtr<const ArrayPtr<const byte>> moreData,
|
jpayne@69
|
221 ArrayPtr<const AutoCloseFd> fds);
|
jpayne@69
|
222 // Write some data to the stream with some file descriptors attached to it.
|
jpayne@69
|
223 //
|
jpayne@69
|
224 // The maximum number of FDs that can be sent at a time is usually subject to an OS-imposed
|
jpayne@69
|
225 // limit. On Linux, this is 253. In practice, sending more than a handful of FDs at once is
|
jpayne@69
|
226 // probably a bad idea.
|
jpayne@69
|
227
|
jpayne@69
|
228 struct ReadResult {
|
jpayne@69
|
229 size_t byteCount;
|
jpayne@69
|
230 size_t capCount;
|
jpayne@69
|
231 };
|
jpayne@69
|
232
|
jpayne@69
|
233 virtual Promise<ReadResult> tryReadWithFds(void* buffer, size_t minBytes, size_t maxBytes,
|
jpayne@69
|
234 AutoCloseFd* fdBuffer, size_t maxFds) = 0;
|
jpayne@69
|
235 // Read data from the stream that may have file descriptors attached. Any attached descriptors
|
jpayne@69
|
236 // will be placed in `fdBuffer`. If multiple bundles of FDs are encountered in the course of
|
jpayne@69
|
237 // reading the amount of data requested by minBytes/maxBytes, then they will be concatenated. If
|
jpayne@69
|
238 // more FDs are received than fit in the buffer, then the excess will be discarded and closed --
|
jpayne@69
|
239 // this behavior, while ugly, is important to defend against denial-of-service attacks that may
|
jpayne@69
|
240 // fill up the FD table with garbage. Applications must think carefully about how many FDs they
|
jpayne@69
|
241 // really need to receive at once and set a well-defined limit.
|
jpayne@69
|
242
|
jpayne@69
|
243 virtual Promise<void> writeWithStreams(ArrayPtr<const byte> data,
|
jpayne@69
|
244 ArrayPtr<const ArrayPtr<const byte>> moreData,
|
jpayne@69
|
245 Array<Own<AsyncCapabilityStream>> streams) = 0;
|
jpayne@69
|
246 virtual Promise<ReadResult> tryReadWithStreams(
|
jpayne@69
|
247 void* buffer, size_t minBytes, size_t maxBytes,
|
jpayne@69
|
248 Own<AsyncCapabilityStream>* streamBuffer, size_t maxStreams) = 0;
|
jpayne@69
|
249 // Like above, but passes AsyncCapabilityStream objects. The stream implementations must be from
|
jpayne@69
|
250 // the same AsyncIoProvider.
|
jpayne@69
|
251
|
jpayne@69
|
252 // ---------------------------------------------------------------------------
|
jpayne@69
|
253 // Helpers for sending individual capabilities.
|
jpayne@69
|
254 //
|
jpayne@69
|
255 // These are equivalent to the above methods with the constraint that only one FD is
|
jpayne@69
|
256 // sent/received at a time and the corresponding data is a single zero-valued byte.
|
jpayne@69
|
257
|
jpayne@69
|
258 Promise<Own<AsyncCapabilityStream>> receiveStream();
|
jpayne@69
|
259 Promise<Maybe<Own<AsyncCapabilityStream>>> tryReceiveStream();
|
jpayne@69
|
260 Promise<void> sendStream(Own<AsyncCapabilityStream> stream);
|
jpayne@69
|
261 // Transfer a single stream.
|
jpayne@69
|
262
|
jpayne@69
|
263 Promise<AutoCloseFd> receiveFd();
|
jpayne@69
|
264 Promise<Maybe<AutoCloseFd>> tryReceiveFd();
|
jpayne@69
|
265 Promise<void> sendFd(int fd);
|
jpayne@69
|
266 // Transfer a single raw file descriptor.
|
jpayne@69
|
267 };
|
jpayne@69
|
268
|
jpayne@69
|
269 struct OneWayPipe {
|
jpayne@69
|
270 // A data pipe with an input end and an output end. (Typically backed by pipe() system call.)
|
jpayne@69
|
271
|
jpayne@69
|
272 Own<AsyncInputStream> in;
|
jpayne@69
|
273 Own<AsyncOutputStream> out;
|
jpayne@69
|
274 };
|
jpayne@69
|
275
|
jpayne@69
|
276 OneWayPipe newOneWayPipe(kj::Maybe<uint64_t> expectedLength = nullptr);
|
jpayne@69
|
277 // Constructs a OneWayPipe that operates in-process. The pipe does not do any buffering -- it waits
|
jpayne@69
|
278 // until both a read() and a write() call are pending, then resolves both.
|
jpayne@69
|
279 //
|
jpayne@69
|
280 // If `expectedLength` is non-null, then the pipe will be expected to transmit exactly that many
|
jpayne@69
|
281 // bytes. The input end's `tryGetLength()` will return the number of bytes left.
|
jpayne@69
|
282
|
jpayne@69
|
283 struct TwoWayPipe {
|
jpayne@69
|
284 // A data pipe that supports sending in both directions. Each end's output sends data to the
|
jpayne@69
|
285 // other end's input. (Typically backed by socketpair() system call.)
|
jpayne@69
|
286
|
jpayne@69
|
287 Own<AsyncIoStream> ends[2];
|
jpayne@69
|
288 };
|
jpayne@69
|
289
|
jpayne@69
|
290 TwoWayPipe newTwoWayPipe();
|
jpayne@69
|
291 // Constructs a TwoWayPipe that operates in-process. The pipe does not do any buffering -- it waits
|
jpayne@69
|
292 // until both a read() and a write() call are pending, then resolves both.
|
jpayne@69
|
293
|
jpayne@69
|
294 struct CapabilityPipe {
|
jpayne@69
|
295 // Like TwoWayPipe but allowing capability-passing.
|
jpayne@69
|
296
|
jpayne@69
|
297 Own<AsyncCapabilityStream> ends[2];
|
jpayne@69
|
298 };
|
jpayne@69
|
299
|
jpayne@69
|
300 CapabilityPipe newCapabilityPipe();
|
jpayne@69
|
301 // Like newTwoWayPipe() but creates a capability pipe.
|
jpayne@69
|
302 //
|
jpayne@69
|
303 // The requirement of `writeWithStreams()` that "The stream implementations must be from the same
|
jpayne@69
|
304 // AsyncIoProvider." does not apply to this pipe; any kind of AsyncCapabilityStream implementation
|
jpayne@69
|
305 // is supported.
|
jpayne@69
|
306 //
|
jpayne@69
|
307 // This implementation does not know how to convert streams to FDs or vice versa; if you write FDs
|
jpayne@69
|
308 // you must read FDs, and if you write streams you must read streams.
|
jpayne@69
|
309
|
jpayne@69
|
310 struct Tee {
|
jpayne@69
|
311 // Two AsyncInputStreams which each read the same data from some wrapped inner AsyncInputStream.
|
jpayne@69
|
312
|
jpayne@69
|
313 Own<AsyncInputStream> branches[2];
|
jpayne@69
|
314 };
|
jpayne@69
|
315
|
jpayne@69
|
316 Tee newTee(Own<AsyncInputStream> input, uint64_t limit = kj::maxValue);
|
jpayne@69
|
317 // Constructs a Tee that operates in-process. The tee buffers data if any read or pump operations is
|
jpayne@69
|
318 // called on one of the two input ends. If a read or pump operation is subsequently called on the
|
jpayne@69
|
319 // other input end, the buffered data is consumed.
|
jpayne@69
|
320 //
|
jpayne@69
|
321 // `pumpTo()` operations on the input ends will proactively read from the inner stream and block
|
jpayne@69
|
322 // while writing to the output stream. While one branch has an active `pumpTo()` operation, any
|
jpayne@69
|
323 // `tryRead()` operation on the other branch will not be allowed to read faster than allowed by the
|
jpayne@69
|
324 // pump's backpressure. (In other words, it will never cause buffering on the pump.) Similarly, if
|
jpayne@69
|
325 // there are `pumpTo()` operations active on both branches, the greater of the two backpressures is
|
jpayne@69
|
326 // respected -- the two pumps progress in lockstep, and there is no buffering.
|
jpayne@69
|
327 //
|
jpayne@69
|
328 // At no point will a branch's buffer be allowed to grow beyond `limit` bytes. If the buffer would
|
jpayne@69
|
329 // grow beyond the limit, an exception is generated, which both branches see once they have
|
jpayne@69
|
330 // exhausted their buffers.
|
jpayne@69
|
331 //
|
jpayne@69
|
332 // It is recommended that you use a more conservative value for `limit` than the default.
|
jpayne@69
|
333
|
jpayne@69
|
334 Own<AsyncOutputStream> newPromisedStream(Promise<Own<AsyncOutputStream>> promise);
|
jpayne@69
|
335 Own<AsyncIoStream> newPromisedStream(Promise<Own<AsyncIoStream>> promise);
|
jpayne@69
|
336 // Constructs an Async*Stream which waits for a promise to resolve, then forwards all calls to the
|
jpayne@69
|
337 // promised stream.
|
jpayne@69
|
338
|
jpayne@69
|
339 // =======================================================================================
|
jpayne@69
|
340 // Authenticated streams
|
jpayne@69
|
341
|
jpayne@69
|
342 class PeerIdentity {
|
jpayne@69
|
343 // PeerIdentity provides information about a connecting client. Various subclasses exist to
|
jpayne@69
|
344 // address different network types.
|
jpayne@69
|
345 public:
|
jpayne@69
|
346 virtual kj::String toString() = 0;
|
jpayne@69
|
347 // Returns a human-readable string identifying the peer. Where possible, this string will be
|
jpayne@69
|
348 // in the same format as the addresses you could pass to `kj::Network::parseAddress()`. However,
|
jpayne@69
|
349 // only certain subclasses of `PeerIdentity` guarantee this property.
|
jpayne@69
|
350 };
|
jpayne@69
|
351
|
jpayne@69
|
352 struct AuthenticatedStream {
|
jpayne@69
|
353 // A pair of an `AsyncIoStream` and a `PeerIdentity`. This is used as the return type of
|
jpayne@69
|
354 // `NetworkAddress::connectAuthenticated()` and `ConnectionReceiver::acceptAuthenticated()`.
|
jpayne@69
|
355
|
jpayne@69
|
356 Own<AsyncIoStream> stream;
|
jpayne@69
|
357 // The byte stream.
|
jpayne@69
|
358
|
jpayne@69
|
359 Own<PeerIdentity> peerIdentity;
|
jpayne@69
|
360 // An object indicating who is at the other end of the stream.
|
jpayne@69
|
361 //
|
jpayne@69
|
362 // Different subclasses of `PeerIdentity` are used in different situations:
|
jpayne@69
|
363 // - TCP connections will use NetworkPeerIdentity, which gives the network address of the client.
|
jpayne@69
|
364 // - Local (unix) socket connections will use LocalPeerIdentity, which identifies the UID
|
jpayne@69
|
365 // and PID of the process that initiated the connection.
|
jpayne@69
|
366 // - TLS connections will use TlsPeerIdentity which provides details of the client certificate,
|
jpayne@69
|
367 // if any was provided.
|
jpayne@69
|
368 // - When no meaningful peer identity can be provided, `UnknownPeerIdentity` is returned.
|
jpayne@69
|
369 //
|
jpayne@69
|
370 // Implementations of `Network`, `ConnectionReceiver`, `NetworkAddress`, etc. should document the
|
jpayne@69
|
371 // specific assumptions the caller can make about the type of `PeerIdentity`s used, allowing for
|
jpayne@69
|
372 // identities to be statically downcast if the right conditions are met. In the absence of
|
jpayne@69
|
373 // documented promises, RTTI may be needed to query the type.
|
jpayne@69
|
374 };
|
jpayne@69
|
375
|
jpayne@69
|
376 class NetworkPeerIdentity: public PeerIdentity {
|
jpayne@69
|
377 // PeerIdentity used for network protocols like TCP/IP. This identifies the remote peer.
|
jpayne@69
|
378 //
|
jpayne@69
|
379 // This is only "authenticated" to the extent that we know data written to the stream will be
|
jpayne@69
|
380 // routed to the given address. This does not preclude the possibility of man-in-the-middle
|
jpayne@69
|
381 // attacks by attackers who are able to manipulate traffic along the route.
|
jpayne@69
|
382 public:
|
jpayne@69
|
383 virtual NetworkAddress& getAddress() = 0;
|
jpayne@69
|
384 // Obtain the peer's address as a NetworkAddress object. The returned reference's lifetime is the
|
jpayne@69
|
385 // same as the `NetworkPeerIdentity`, but you can always call `clone()` on it to get a copy that
|
jpayne@69
|
386 // lives longer.
|
jpayne@69
|
387
|
jpayne@69
|
388 static kj::Own<NetworkPeerIdentity> newInstance(kj::Own<NetworkAddress> addr);
|
jpayne@69
|
389 // Construct an instance of this interface wrapping the given address.
|
jpayne@69
|
390 };
|
jpayne@69
|
391
|
jpayne@69
|
392 class LocalPeerIdentity: public PeerIdentity {
|
jpayne@69
|
393 // PeerIdentity used for connections between processes on the local machine -- in particular,
|
jpayne@69
|
394 // Unix sockets.
|
jpayne@69
|
395 //
|
jpayne@69
|
396 // (This interface probably isn't useful on Windows.)
|
jpayne@69
|
397 public:
|
jpayne@69
|
398 struct Credentials {
|
jpayne@69
|
399 kj::Maybe<int> pid;
|
jpayne@69
|
400 kj::Maybe<uint> uid;
|
jpayne@69
|
401
|
jpayne@69
|
402 // We don't cover groups at present because some systems produce a list of groups while others
|
jpayne@69
|
403 // only provide the peer's main group, the latter being pretty useless.
|
jpayne@69
|
404 };
|
jpayne@69
|
405
|
jpayne@69
|
406 virtual Credentials getCredentials() = 0;
|
jpayne@69
|
407 // Get the PID and UID of the peer process, if possible.
|
jpayne@69
|
408 //
|
jpayne@69
|
409 // Either ID may be null if the peer could not be identified. Some operating systems do not
|
jpayne@69
|
410 // support retrieving these credentials, or can only provide one or the other. Some situations
|
jpayne@69
|
411 // (like user and PID namespaces on Linux) may also make it impossible to represent the peer's
|
jpayne@69
|
412 // credentials accurately.
|
jpayne@69
|
413 //
|
jpayne@69
|
414 // Note the meaning here can be subtle. Multiple processes can potentially have the socket in
|
jpayne@69
|
415 // their file descriptor tables. The identified process is the one who called `connect()` or
|
jpayne@69
|
416 // `listen()`.
|
jpayne@69
|
417 //
|
jpayne@69
|
418 // On Linux this is implemented with SO_PEERCRED.
|
jpayne@69
|
419
|
jpayne@69
|
420 static kj::Own<LocalPeerIdentity> newInstance(Credentials creds);
|
jpayne@69
|
421 // Construct an instance of this interface wrapping the given credentials.
|
jpayne@69
|
422 };
|
jpayne@69
|
423
|
jpayne@69
|
424 class UnknownPeerIdentity: public PeerIdentity {
|
jpayne@69
|
425 public:
|
jpayne@69
|
426 static kj::Own<UnknownPeerIdentity> newInstance();
|
jpayne@69
|
427 // Get an instance of this interface. This actually always returns the same instance with no
|
jpayne@69
|
428 // memory allocation.
|
jpayne@69
|
429 };
|
jpayne@69
|
430
|
jpayne@69
|
431 // =======================================================================================
|
jpayne@69
|
432 // Accepting connections
|
jpayne@69
|
433
|
jpayne@69
|
434 class ConnectionReceiver: private AsyncObject {
|
jpayne@69
|
435 // Represents a server socket listening on a port.
|
jpayne@69
|
436
|
jpayne@69
|
437 public:
|
jpayne@69
|
438 virtual Promise<Own<AsyncIoStream>> accept() = 0;
|
jpayne@69
|
439 // Accept the next incoming connection.
|
jpayne@69
|
440
|
jpayne@69
|
441 virtual Promise<AuthenticatedStream> acceptAuthenticated();
|
jpayne@69
|
442 // Accept the next incoming connection, and also provide a PeerIdentity with any information
|
jpayne@69
|
443 // about the client.
|
jpayne@69
|
444 //
|
jpayne@69
|
445 // For backwards-compatibility, the default implementation of this method calls `accept()` and
|
jpayne@69
|
446 // then adds `UnknownPeerIdentity`.
|
jpayne@69
|
447
|
jpayne@69
|
448 virtual uint getPort() = 0;
|
jpayne@69
|
449 // Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't
|
jpayne@69
|
450 // specify a port when constructing the NetworkAddress -- one will have been assigned
|
jpayne@69
|
451 // automatically.
|
jpayne@69
|
452
|
jpayne@69
|
453 virtual void getsockopt(int level, int option, void* value, uint* length);
|
jpayne@69
|
454 virtual void setsockopt(int level, int option, const void* value, uint length);
|
jpayne@69
|
455 virtual void getsockname(struct sockaddr* addr, uint* length);
|
jpayne@69
|
456 // Same as the methods of AsyncIoStream.
|
jpayne@69
|
457 };
|
jpayne@69
|
458
|
jpayne@69
|
459 Own<ConnectionReceiver> newAggregateConnectionReceiver(Array<Own<ConnectionReceiver>> receivers);
|
jpayne@69
|
460 // Create a ConnectionReceiver that listens on several other ConnectionReceivers and returns
|
jpayne@69
|
461 // sockets from any of them.
|
jpayne@69
|
462
|
jpayne@69
|
463 // =======================================================================================
|
jpayne@69
|
464 // Datagram I/O
|
jpayne@69
|
465
|
jpayne@69
|
466 class AncillaryMessage {
|
jpayne@69
|
467 // Represents an ancillary message (aka control message) received using the recvmsg() system
|
jpayne@69
|
468 // call (or equivalent). Most apps will not use this.
|
jpayne@69
|
469
|
jpayne@69
|
470 public:
|
jpayne@69
|
471 inline AncillaryMessage(int level, int type, ArrayPtr<const byte> data);
|
jpayne@69
|
472 AncillaryMessage() = default;
|
jpayne@69
|
473
|
jpayne@69
|
474 inline int getLevel() const;
|
jpayne@69
|
475 // Originating protocol / socket level.
|
jpayne@69
|
476
|
jpayne@69
|
477 inline int getType() const;
|
jpayne@69
|
478 // Protocol-specific message type.
|
jpayne@69
|
479
|
jpayne@69
|
480 template <typename T>
|
jpayne@69
|
481 inline Maybe<const T&> as() const;
|
jpayne@69
|
482 // Interpret the ancillary message as the given struct type. Most ancillary messages are some
|
jpayne@69
|
483 // sort of struct, so this is a convenient way to access it. Returns nullptr if the message
|
jpayne@69
|
484 // is smaller than the struct -- this can happen if the message was truncated due to
|
jpayne@69
|
485 // insufficient ancillary buffer space.
|
jpayne@69
|
486
|
jpayne@69
|
487 template <typename T>
|
jpayne@69
|
488 inline ArrayPtr<const T> asArray() const;
|
jpayne@69
|
489 // Interpret the ancillary message as an array of items. If the message size does not evenly
|
jpayne@69
|
490 // divide into elements of type T, the remainder is discarded -- this can happen if the message
|
jpayne@69
|
491 // was truncated due to insufficient ancillary buffer space.
|
jpayne@69
|
492
|
jpayne@69
|
493 private:
|
jpayne@69
|
494 int level;
|
jpayne@69
|
495 int type;
|
jpayne@69
|
496 ArrayPtr<const byte> data;
|
jpayne@69
|
497 // Message data. In most cases you should use `as()` or `asArray()`.
|
jpayne@69
|
498 };
|
jpayne@69
|
499
|
jpayne@69
|
500 class DatagramReceiver {
|
jpayne@69
|
501 // Class encapsulating the recvmsg() system call. You must specify the DatagramReceiver's
|
jpayne@69
|
502 // capacity in advance; if a received packet is larger than the capacity, it will be truncated.
|
jpayne@69
|
503
|
jpayne@69
|
504 public:
|
jpayne@69
|
505 virtual Promise<void> receive() = 0;
|
jpayne@69
|
506 // Receive a new message, overwriting this object's content.
|
jpayne@69
|
507 //
|
jpayne@69
|
508 // receive() may reuse the same buffers for content and ancillary data with each call.
|
jpayne@69
|
509
|
jpayne@69
|
510 template <typename T>
|
jpayne@69
|
511 struct MaybeTruncated {
|
jpayne@69
|
512 T value;
|
jpayne@69
|
513
|
jpayne@69
|
514 bool isTruncated;
|
jpayne@69
|
515 // True if the Receiver's capacity was insufficient to receive the value and therefore the
|
jpayne@69
|
516 // value is truncated.
|
jpayne@69
|
517 };
|
jpayne@69
|
518
|
jpayne@69
|
519 virtual MaybeTruncated<ArrayPtr<const byte>> getContent() = 0;
|
jpayne@69
|
520 // Get the content of the datagram.
|
jpayne@69
|
521
|
jpayne@69
|
522 virtual MaybeTruncated<ArrayPtr<const AncillaryMessage>> getAncillary() = 0;
|
jpayne@69
|
523 // Ancillary messages received with the datagram. See the recvmsg() system call and the cmsghdr
|
jpayne@69
|
524 // struct. Most apps don't need this.
|
jpayne@69
|
525 //
|
jpayne@69
|
526 // If the returned value is truncated, then the last message in the array may itself be
|
jpayne@69
|
527 // truncated, meaning its as<T>() method will return nullptr or its asArray<T>() method will
|
jpayne@69
|
528 // return fewer elements than expected. Truncation can also mean that additional messages were
|
jpayne@69
|
529 // available but discarded.
|
jpayne@69
|
530
|
jpayne@69
|
531 virtual NetworkAddress& getSource() = 0;
|
jpayne@69
|
532 // Get the datagram sender's address.
|
jpayne@69
|
533
|
jpayne@69
|
534 struct Capacity {
|
jpayne@69
|
535 size_t content = 8192;
|
jpayne@69
|
536 // How much space to allocate for the datagram content. If a datagram is received that is
|
jpayne@69
|
537 // larger than this, it will be truncated, with no way to recover the tail.
|
jpayne@69
|
538
|
jpayne@69
|
539 size_t ancillary = 0;
|
jpayne@69
|
540 // How much space to allocate for ancillary messages. As with content, if the ancillary data
|
jpayne@69
|
541 // is larger than this, it will be truncated.
|
jpayne@69
|
542 };
|
jpayne@69
|
543 };
|
jpayne@69
|
544
|
jpayne@69
|
545 class DatagramPort {
|
jpayne@69
|
546 public:
|
jpayne@69
|
547 virtual Promise<size_t> send(const void* buffer, size_t size, NetworkAddress& destination) = 0;
|
jpayne@69
|
548 virtual Promise<size_t> send(ArrayPtr<const ArrayPtr<const byte>> pieces,
|
jpayne@69
|
549 NetworkAddress& destination) = 0;
|
jpayne@69
|
550
|
jpayne@69
|
551 virtual Own<DatagramReceiver> makeReceiver(
|
jpayne@69
|
552 DatagramReceiver::Capacity capacity = DatagramReceiver::Capacity()) = 0;
|
jpayne@69
|
553 // Create a new `Receiver` that can be used to receive datagrams. `capacity` specifies how much
|
jpayne@69
|
554 // space to allocate for the received message. The `DatagramPort` must outlive the `Receiver`.
|
jpayne@69
|
555
|
jpayne@69
|
556 virtual uint getPort() = 0;
|
jpayne@69
|
557 // Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't
|
jpayne@69
|
558 // specify a port when constructing the NetworkAddress -- one will have been assigned
|
jpayne@69
|
559 // automatically.
|
jpayne@69
|
560
|
jpayne@69
|
561 virtual void getsockopt(int level, int option, void* value, uint* length);
|
jpayne@69
|
562 virtual void setsockopt(int level, int option, const void* value, uint length);
|
jpayne@69
|
563 // Same as the methods of AsyncIoStream.
|
jpayne@69
|
564 };
|
jpayne@69
|
565
|
jpayne@69
|
566 // =======================================================================================
|
jpayne@69
|
567 // Networks
|
jpayne@69
|
568
|
jpayne@69
|
569 class NetworkAddress: private AsyncObject {
|
jpayne@69
|
570 // Represents a remote address to which the application can connect.
|
jpayne@69
|
571
|
jpayne@69
|
572 public:
|
jpayne@69
|
573 virtual Promise<Own<AsyncIoStream>> connect() = 0;
|
jpayne@69
|
574 // Make a new connection to this address.
|
jpayne@69
|
575 //
|
jpayne@69
|
576 // The address must not be a wildcard ("*"). If it is an IP address, it must have a port number.
|
jpayne@69
|
577
|
jpayne@69
|
578 virtual Promise<AuthenticatedStream> connectAuthenticated();
|
jpayne@69
|
579 // Connect to the address and return both the connection and information about the peer identity.
|
jpayne@69
|
580 // This is especially useful when using TLS, to get certificate details.
|
jpayne@69
|
581 //
|
jpayne@69
|
582 // For backwards-compatibility, the default implementation of this method calls `connect()` and
|
jpayne@69
|
583 // then uses a `NetworkPeerIdentity` wrapping a clone of this `NetworkAddress` -- which is not
|
jpayne@69
|
584 // particularly useful.
|
jpayne@69
|
585
|
jpayne@69
|
586 virtual Own<ConnectionReceiver> listen() = 0;
|
jpayne@69
|
587 // Listen for incoming connections on this address.
|
jpayne@69
|
588 //
|
jpayne@69
|
589 // The address must be local.
|
jpayne@69
|
590
|
jpayne@69
|
591 virtual Own<DatagramPort> bindDatagramPort();
|
jpayne@69
|
592 // Open this address as a datagram (e.g. UDP) port.
|
jpayne@69
|
593 //
|
jpayne@69
|
594 // The address must be local.
|
jpayne@69
|
595
|
jpayne@69
|
596 virtual Own<NetworkAddress> clone() = 0;
|
jpayne@69
|
597 // Returns an equivalent copy of this NetworkAddress.
|
jpayne@69
|
598
|
jpayne@69
|
599 virtual String toString() = 0;
|
jpayne@69
|
600 // Produce a human-readable string which hopefully can be passed to Network::parseAddress()
|
jpayne@69
|
601 // to reproduce this address, although whether or not that works of course depends on the Network
|
jpayne@69
|
602 // implementation. This should be called only to display the address to human users, who will
|
jpayne@69
|
603 // hopefully know what they are able to do with it.
|
jpayne@69
|
604 };
|
jpayne@69
|
605
|
jpayne@69
|
606 class Network {
|
jpayne@69
|
607 // Factory for NetworkAddress instances, representing the network services offered by the
|
jpayne@69
|
608 // operating system.
|
jpayne@69
|
609 //
|
jpayne@69
|
610 // This interface typically represents broad authority, and well-designed code should limit its
|
jpayne@69
|
611 // use to high-level startup code and user interaction. Low-level APIs should accept
|
jpayne@69
|
612 // NetworkAddress instances directly and work from there, if at all possible.
|
jpayne@69
|
613
|
jpayne@69
|
614 public:
|
jpayne@69
|
615 virtual Promise<Own<NetworkAddress>> parseAddress(StringPtr addr, uint portHint = 0) = 0;
|
jpayne@69
|
616 // Construct a network address from a user-provided string. The format of the address
|
jpayne@69
|
617 // strings is not specified at the API level, and application code should make no assumptions
|
jpayne@69
|
618 // about them. These strings should always be provided by humans, and said humans will know
|
jpayne@69
|
619 // what format to use in their particular context.
|
jpayne@69
|
620 //
|
jpayne@69
|
621 // `portHint`, if provided, specifies the "standard" IP port number for the application-level
|
jpayne@69
|
622 // service in play. If the address turns out to be an IP address (v4 or v6), and it lacks a
|
jpayne@69
|
623 // port number, this port will be used. If `addr` lacks a port number *and* `portHint` is
|
jpayne@69
|
624 // omitted, then the returned address will only support listen() and bindDatagramPort()
|
jpayne@69
|
625 // (not connect()), and an unused port will be chosen each time one of those methods is called.
|
jpayne@69
|
626
|
jpayne@69
|
627 virtual Own<NetworkAddress> getSockaddr(const void* sockaddr, uint len) = 0;
|
jpayne@69
|
628 // Construct a network address from a legacy struct sockaddr.
|
jpayne@69
|
629
|
jpayne@69
|
630 virtual Own<Network> restrictPeers(
|
jpayne@69
|
631 kj::ArrayPtr<const kj::StringPtr> allow,
|
jpayne@69
|
632 kj::ArrayPtr<const kj::StringPtr> deny = nullptr) KJ_WARN_UNUSED_RESULT = 0;
|
jpayne@69
|
633 // Constructs a new Network instance wrapping this one which restricts which peer addresses are
|
jpayne@69
|
634 // permitted (both for outgoing and incoming connections).
|
jpayne@69
|
635 //
|
jpayne@69
|
636 // Communication will be allowed only with peers whose addresses match one of the patterns
|
jpayne@69
|
637 // specified in the `allow` array. If a `deny` array is specified, then any address which matches
|
jpayne@69
|
638 // a pattern in `deny` and *does not* match any more-specific pattern in `allow` will also be
|
jpayne@69
|
639 // denied.
|
jpayne@69
|
640 //
|
jpayne@69
|
641 // The syntax of address patterns depends on the network, except that three special patterns are
|
jpayne@69
|
642 // defined for all networks:
|
jpayne@69
|
643 // - "private": Matches network addresses that are reserved by standards for private networks,
|
jpayne@69
|
644 // such as "10.0.0.0/8" or "192.168.0.0/16". This is a superset of "local".
|
jpayne@69
|
645 // - "public": Opposite of "private".
|
jpayne@69
|
646 // - "local": Matches network addresses that are defined by standards to only be accessible from
|
jpayne@69
|
647 // the local machine, such as "127.0.0.0/8" or Unix domain addresses.
|
jpayne@69
|
648 // - "network": Opposite of "local".
|
jpayne@69
|
649 //
|
jpayne@69
|
650 // For the standard KJ network implementation, the following patterns are also recognized:
|
jpayne@69
|
651 // - Network blocks specified in CIDR notation (ipv4 and ipv6), such as "192.0.2.0/24" or
|
jpayne@69
|
652 // "2001:db8::/32".
|
jpayne@69
|
653 // - "unix" to match all Unix domain addresses. (In the future, we may support specifying a
|
jpayne@69
|
654 // glob.)
|
jpayne@69
|
655 // - "unix-abstract" to match Linux's "abstract unix domain" addresses. (In the future, we may
|
jpayne@69
|
656 // support specifying a glob.)
|
jpayne@69
|
657 //
|
jpayne@69
|
658 // Network restrictions apply *after* DNS resolution (otherwise they'd be useless).
|
jpayne@69
|
659 //
|
jpayne@69
|
660 // It is legal to parseAddress() a restricted address. An exception won't be thrown until
|
jpayne@69
|
661 // connect() is called.
|
jpayne@69
|
662 //
|
jpayne@69
|
663 // It's possible to listen() on a restricted address. However, connections will only be accepted
|
jpayne@69
|
664 // from non-restricted addresses; others will be dropped. If a particular listen address has no
|
jpayne@69
|
665 // valid peers (e.g. because it's a unix socket address and unix sockets are not allowed) then
|
jpayne@69
|
666 // listen() may throw (or may simply never receive any connections).
|
jpayne@69
|
667 //
|
jpayne@69
|
668 // Examples:
|
jpayne@69
|
669 //
|
jpayne@69
|
670 // auto restricted = network->restrictPeers({"public"});
|
jpayne@69
|
671 //
|
jpayne@69
|
672 // Allows connections only to/from public internet addresses. Use this when connecting to an
|
jpayne@69
|
673 // address specified by a third party that is not trusted and is not themselves already on your
|
jpayne@69
|
674 // private network.
|
jpayne@69
|
675 //
|
jpayne@69
|
676 // auto restricted = network->restrictPeers({"private"});
|
jpayne@69
|
677 //
|
jpayne@69
|
678 // Allows connections only to/from the private network. Use this on the server side to reject
|
jpayne@69
|
679 // connections from the public internet.
|
jpayne@69
|
680 //
|
jpayne@69
|
681 // auto restricted = network->restrictPeers({"192.0.2.0/24"}, {"192.0.2.3/32"});
|
jpayne@69
|
682 //
|
jpayne@69
|
683 // Allows connections only to/from 192.0.2.*, except 192.0.2.3 which is blocked.
|
jpayne@69
|
684 //
|
jpayne@69
|
685 // auto restricted = network->restrictPeers({"10.0.0.0/8", "10.1.2.3/32"}, {"10.1.2.0/24"});
|
jpayne@69
|
686 //
|
jpayne@69
|
687 // Allows connections to/from 10.*.*.*, with the exception of 10.1.2.* (which is denied), with an
|
jpayne@69
|
688 // exception to the exception of 10.1.2.3 (which is allowed, because it is matched by an allow
|
jpayne@69
|
689 // rule that is more specific than the deny rule).
|
jpayne@69
|
690 };
|
jpayne@69
|
691
|
jpayne@69
|
692 // =======================================================================================
|
jpayne@69
|
693 // I/O Provider
|
jpayne@69
|
694
|
jpayne@69
|
695 class AsyncIoProvider {
|
jpayne@69
|
696 // Class which constructs asynchronous wrappers around the operating system's I/O facilities.
|
jpayne@69
|
697 //
|
jpayne@69
|
698 // Generally, the implementation of this interface must integrate closely with a particular
|
jpayne@69
|
699 // `EventLoop` implementation. Typically, the EventLoop implementation itself will provide
|
jpayne@69
|
700 // an AsyncIoProvider.
|
jpayne@69
|
701
|
jpayne@69
|
702 public:
|
jpayne@69
|
703 virtual OneWayPipe newOneWayPipe() = 0;
|
jpayne@69
|
704 // Creates an input/output stream pair representing the ends of a one-way pipe (e.g. created with
|
jpayne@69
|
705 // the pipe(2) system call).
|
jpayne@69
|
706
|
jpayne@69
|
707 virtual TwoWayPipe newTwoWayPipe() = 0;
|
jpayne@69
|
708 // Creates two AsyncIoStreams representing the two ends of a two-way pipe (e.g. created with
|
jpayne@69
|
709 // socketpair(2) system call). Data written to one end can be read from the other.
|
jpayne@69
|
710
|
jpayne@69
|
711 virtual CapabilityPipe newCapabilityPipe();
|
jpayne@69
|
712 // Creates two AsyncCapabilityStreams representing the two ends of a two-way capability pipe.
|
jpayne@69
|
713 //
|
jpayne@69
|
714 // The default implementation throws an unimplemented exception. In particular this is not
|
jpayne@69
|
715 // implemented by the default AsyncIoProvider on Windows, since Windows lacks any sane way to
|
jpayne@69
|
716 // pass handles over a stream.
|
jpayne@69
|
717
|
jpayne@69
|
718 virtual Network& getNetwork() = 0;
|
jpayne@69
|
719 // Creates a new `Network` instance representing the networks exposed by the operating system.
|
jpayne@69
|
720 //
|
jpayne@69
|
721 // DO NOT CALL THIS except at the highest levels of your code, ideally in the main() function. If
|
jpayne@69
|
722 // you call this from low-level code, then you are preventing higher-level code from injecting an
|
jpayne@69
|
723 // alternative implementation. Instead, if your code needs to use network functionality, it
|
jpayne@69
|
724 // should ask for a `Network` as a constructor or method parameter, so that higher-level code can
|
jpayne@69
|
725 // chose what implementation to use. The system network is essentially a singleton. See:
|
jpayne@69
|
726 // http://www.object-oriented-security.org/lets-argue/singletons
|
jpayne@69
|
727 //
|
jpayne@69
|
728 // Code that uses the system network should not make any assumptions about what kinds of
|
jpayne@69
|
729 // addresses it will parse, as this could differ across platforms. String addresses should come
|
jpayne@69
|
730 // strictly from the user, who will know how to write them correctly for their system.
|
jpayne@69
|
731 //
|
jpayne@69
|
732 // With that said, KJ currently supports the following string address formats:
|
jpayne@69
|
733 // - IPv4: "1.2.3.4", "1.2.3.4:80"
|
jpayne@69
|
734 // - IPv6: "1234:5678::abcd", "[1234:5678::abcd]:80"
|
jpayne@69
|
735 // - Local IP wildcard (covers both v4 and v6): "*", "*:80"
|
jpayne@69
|
736 // - Symbolic names: "example.com", "example.com:80", "example.com:http", "1.2.3.4:http"
|
jpayne@69
|
737 // - Unix domain: "unix:/path/to/socket"
|
jpayne@69
|
738
|
jpayne@69
|
739 struct PipeThread {
|
jpayne@69
|
740 // A combination of a thread and a two-way pipe that communicates with that thread.
|
jpayne@69
|
741 //
|
jpayne@69
|
742 // The fields are intentionally ordered so that the pipe will be destroyed (and therefore
|
jpayne@69
|
743 // disconnected) before the thread is destroyed (and therefore joined). Thus if the thread
|
jpayne@69
|
744 // arranges to exit when it detects disconnect, destruction should be clean.
|
jpayne@69
|
745
|
jpayne@69
|
746 Own<Thread> thread;
|
jpayne@69
|
747 Own<AsyncIoStream> pipe;
|
jpayne@69
|
748 };
|
jpayne@69
|
749
|
jpayne@69
|
750 virtual PipeThread newPipeThread(
|
jpayne@69
|
751 Function<void(AsyncIoProvider&, AsyncIoStream&, WaitScope&)> startFunc) = 0;
|
jpayne@69
|
752 // Create a new thread and set up a two-way pipe (socketpair) which can be used to communicate
|
jpayne@69
|
753 // with it. One end of the pipe is passed to the thread's start function and the other end of
|
jpayne@69
|
754 // the pipe is returned. The new thread also gets its own `AsyncIoProvider` instance and will
|
jpayne@69
|
755 // already have an active `EventLoop` when `startFunc` is called.
|
jpayne@69
|
756 //
|
jpayne@69
|
757 // TODO(someday): I'm not entirely comfortable with this interface. It seems to be doing too
|
jpayne@69
|
758 // much at once but I'm not sure how to cleanly break it down.
|
jpayne@69
|
759
|
jpayne@69
|
760 virtual Timer& getTimer() = 0;
|
jpayne@69
|
761 // Returns a `Timer` based on real time. Time does not pass while event handlers are running --
|
jpayne@69
|
762 // it only updates when the event loop polls for system events. This means that calling `now()`
|
jpayne@69
|
763 // on this timer does not require a system call.
|
jpayne@69
|
764 //
|
jpayne@69
|
765 // This timer is not affected by changes to the system date. It is unspecified whether the timer
|
jpayne@69
|
766 // continues to count while the system is suspended.
|
jpayne@69
|
767 };
|
jpayne@69
|
768
|
jpayne@69
|
769 class LowLevelAsyncIoProvider {
|
jpayne@69
|
770 // Similar to `AsyncIoProvider`, but represents a lower-level interface that may differ on
|
jpayne@69
|
771 // different operating systems. You should prefer to use `AsyncIoProvider` over this interface
|
jpayne@69
|
772 // whenever possible, as `AsyncIoProvider` is portable and friendlier to dependency-injection.
|
jpayne@69
|
773 //
|
jpayne@69
|
774 // On Unix, this interface can be used to import native file descriptors into the async framework.
|
jpayne@69
|
775 // Different implementations of this interface might work on top of different event handling
|
jpayne@69
|
776 // primitives, such as poll vs. epoll vs. kqueue vs. some higher-level event library.
|
jpayne@69
|
777 //
|
jpayne@69
|
778 // On Windows, this interface can be used to import native SOCKETs into the async framework.
|
jpayne@69
|
779 // Different implementations of this interface might work on top of different event handling
|
jpayne@69
|
780 // primitives, such as I/O completion ports vs. completion routines.
|
jpayne@69
|
781
|
jpayne@69
|
782 public:
|
jpayne@69
|
783 enum Flags {
|
jpayne@69
|
784 // Flags controlling how to wrap a file descriptor.
|
jpayne@69
|
785
|
jpayne@69
|
786 TAKE_OWNERSHIP = 1 << 0,
|
jpayne@69
|
787 // The returned object should own the file descriptor, automatically closing it when destroyed.
|
jpayne@69
|
788 // The close-on-exec flag will be set on the descriptor if it is not already.
|
jpayne@69
|
789 //
|
jpayne@69
|
790 // If this flag is not used, then the file descriptor is not automatically closed and the
|
jpayne@69
|
791 // close-on-exec flag is not modified.
|
jpayne@69
|
792
|
jpayne@69
|
793 #if !_WIN32
|
jpayne@69
|
794 ALREADY_CLOEXEC = 1 << 1,
|
jpayne@69
|
795 // Indicates that the close-on-exec flag is known already to be set, so need not be set again.
|
jpayne@69
|
796 // Only relevant when combined with TAKE_OWNERSHIP.
|
jpayne@69
|
797 //
|
jpayne@69
|
798 // On Linux, all system calls which yield new file descriptors have flags or variants which
|
jpayne@69
|
799 // set the close-on-exec flag immediately. Unfortunately, other OS's do not.
|
jpayne@69
|
800
|
jpayne@69
|
801 ALREADY_NONBLOCK = 1 << 2
|
jpayne@69
|
802 // Indicates that the file descriptor is known already to be in non-blocking mode, so the flag
|
jpayne@69
|
803 // need not be set again. Otherwise, all wrap*Fd() methods will enable non-blocking mode
|
jpayne@69
|
804 // automatically.
|
jpayne@69
|
805 //
|
jpayne@69
|
806 // On Linux, all system calls which yield new file descriptors have flags or variants which
|
jpayne@69
|
807 // enable non-blocking mode immediately. Unfortunately, other OS's do not.
|
jpayne@69
|
808 #endif
|
jpayne@69
|
809 };
|
jpayne@69
|
810
|
jpayne@69
|
811 #if _WIN32
|
jpayne@69
|
812 typedef uintptr_t Fd;
|
jpayne@69
|
813 typedef AutoCloseHandle OwnFd;
|
jpayne@69
|
814 // On Windows, the `fd` parameter to each of these methods must be a SOCKET, and must have the
|
jpayne@69
|
815 // flag WSA_FLAG_OVERLAPPED (which socket() uses by default, but WSASocket() wants you to specify
|
jpayne@69
|
816 // explicitly).
|
jpayne@69
|
817 #else
|
jpayne@69
|
818 typedef int Fd;
|
jpayne@69
|
819 typedef AutoCloseFd OwnFd;
|
jpayne@69
|
820 // On Unix, any arbitrary file descriptor is supported.
|
jpayne@69
|
821 #endif
|
jpayne@69
|
822
|
jpayne@69
|
823 virtual Own<AsyncInputStream> wrapInputFd(Fd fd, uint flags = 0) = 0;
|
jpayne@69
|
824 // Create an AsyncInputStream wrapping a file descriptor.
|
jpayne@69
|
825 //
|
jpayne@69
|
826 // `flags` is a bitwise-OR of the values of the `Flags` enum.
|
jpayne@69
|
827
|
jpayne@69
|
828 virtual Own<AsyncOutputStream> wrapOutputFd(Fd fd, uint flags = 0) = 0;
|
jpayne@69
|
829 // Create an AsyncOutputStream wrapping a file descriptor.
|
jpayne@69
|
830 //
|
jpayne@69
|
831 // `flags` is a bitwise-OR of the values of the `Flags` enum.
|
jpayne@69
|
832
|
jpayne@69
|
833 virtual Own<AsyncIoStream> wrapSocketFd(Fd fd, uint flags = 0) = 0;
|
jpayne@69
|
834 // Create an AsyncIoStream wrapping a socket file descriptor.
|
jpayne@69
|
835 //
|
jpayne@69
|
836 // `flags` is a bitwise-OR of the values of the `Flags` enum.
|
jpayne@69
|
837
|
jpayne@69
|
838 #if !_WIN32
|
jpayne@69
|
839 virtual Own<AsyncCapabilityStream> wrapUnixSocketFd(Fd fd, uint flags = 0);
|
jpayne@69
|
840 // Like wrapSocketFd() but also support capability passing via SCM_RIGHTS. The socket must be
|
jpayne@69
|
841 // a Unix domain socket.
|
jpayne@69
|
842 //
|
jpayne@69
|
843 // The default implementation throws UNIMPLEMENTED, for backwards-compatibility with
|
jpayne@69
|
844 // LowLevelAsyncIoProvider implementations written before this method was added.
|
jpayne@69
|
845 #endif
|
jpayne@69
|
846
|
jpayne@69
|
847 virtual Promise<Own<AsyncIoStream>> wrapConnectingSocketFd(
|
jpayne@69
|
848 Fd fd, const struct sockaddr* addr, uint addrlen, uint flags = 0) = 0;
|
jpayne@69
|
849 // Create an AsyncIoStream wrapping a socket and initiate a connection to the given address.
|
jpayne@69
|
850 // The returned promise does not resolve until connection has completed.
|
jpayne@69
|
851 //
|
jpayne@69
|
852 // `flags` is a bitwise-OR of the values of the `Flags` enum.
|
jpayne@69
|
853
|
jpayne@69
|
854 class NetworkFilter {
|
jpayne@69
|
855 public:
|
jpayne@69
|
856 virtual bool shouldAllow(const struct sockaddr* addr, uint addrlen) = 0;
|
jpayne@69
|
857 // Returns true if incoming connections or datagrams from the given peer should be accepted.
|
jpayne@69
|
858 // If false, they will be dropped. This is used to implement kj::Network::restrictPeers().
|
jpayne@69
|
859
|
jpayne@69
|
860 static NetworkFilter& getAllAllowed();
|
jpayne@69
|
861 };
|
jpayne@69
|
862
|
jpayne@69
|
863 virtual Own<ConnectionReceiver> wrapListenSocketFd(
|
jpayne@69
|
864 Fd fd, NetworkFilter& filter, uint flags = 0) = 0;
|
jpayne@69
|
865 inline Own<ConnectionReceiver> wrapListenSocketFd(Fd fd, uint flags = 0) {
|
jpayne@69
|
866 return wrapListenSocketFd(fd, NetworkFilter::getAllAllowed(), flags);
|
jpayne@69
|
867 }
|
jpayne@69
|
868 // Create an AsyncIoStream wrapping a listen socket file descriptor. This socket should already
|
jpayne@69
|
869 // have had `bind()` and `listen()` called on it, so it's ready for `accept()`.
|
jpayne@69
|
870 //
|
jpayne@69
|
871 // `flags` is a bitwise-OR of the values of the `Flags` enum.
|
jpayne@69
|
872
|
jpayne@69
|
873 virtual Own<DatagramPort> wrapDatagramSocketFd(Fd fd, NetworkFilter& filter, uint flags = 0);
|
jpayne@69
|
874 inline Own<DatagramPort> wrapDatagramSocketFd(Fd fd, uint flags = 0) {
|
jpayne@69
|
875 return wrapDatagramSocketFd(fd, NetworkFilter::getAllAllowed(), flags);
|
jpayne@69
|
876 }
|
jpayne@69
|
877
|
jpayne@69
|
878 virtual Timer& getTimer() = 0;
|
jpayne@69
|
879 // Returns a `Timer` based on real time. Time does not pass while event handlers are running --
|
jpayne@69
|
880 // it only updates when the event loop polls for system events. This means that calling `now()`
|
jpayne@69
|
881 // on this timer does not require a system call.
|
jpayne@69
|
882 //
|
jpayne@69
|
883 // This timer is not affected by changes to the system date. It is unspecified whether the timer
|
jpayne@69
|
884 // continues to count while the system is suspended.
|
jpayne@69
|
885
|
jpayne@69
|
886 Own<AsyncInputStream> wrapInputFd(OwnFd&& fd, uint flags = 0);
|
jpayne@69
|
887 Own<AsyncOutputStream> wrapOutputFd(OwnFd&& fd, uint flags = 0);
|
jpayne@69
|
888 Own<AsyncIoStream> wrapSocketFd(OwnFd&& fd, uint flags = 0);
|
jpayne@69
|
889 #if !_WIN32
|
jpayne@69
|
890 Own<AsyncCapabilityStream> wrapUnixSocketFd(OwnFd&& fd, uint flags = 0);
|
jpayne@69
|
891 #endif
|
jpayne@69
|
892 Promise<Own<AsyncIoStream>> wrapConnectingSocketFd(
|
jpayne@69
|
893 OwnFd&& fd, const struct sockaddr* addr, uint addrlen, uint flags = 0);
|
jpayne@69
|
894 Own<ConnectionReceiver> wrapListenSocketFd(
|
jpayne@69
|
895 OwnFd&& fd, NetworkFilter& filter, uint flags = 0);
|
jpayne@69
|
896 Own<ConnectionReceiver> wrapListenSocketFd(OwnFd&& fd, uint flags = 0);
|
jpayne@69
|
897 Own<DatagramPort> wrapDatagramSocketFd(OwnFd&& fd, NetworkFilter& filter, uint flags = 0);
|
jpayne@69
|
898 Own<DatagramPort> wrapDatagramSocketFd(OwnFd&& fd, uint flags = 0);
|
jpayne@69
|
899 // Convenience wrappers which transfer ownership via AutoCloseFd (Unix) or AutoCloseHandle
|
jpayne@69
|
900 // (Windows). TAKE_OWNERSHIP will be implicitly added to `flags`.
|
jpayne@69
|
901 };
|
jpayne@69
|
902
|
jpayne@69
|
903 Own<AsyncIoProvider> newAsyncIoProvider(LowLevelAsyncIoProvider& lowLevel);
|
jpayne@69
|
904 // Make a new AsyncIoProvider wrapping a `LowLevelAsyncIoProvider`.
|
jpayne@69
|
905
|
jpayne@69
|
906 struct AsyncIoContext {
|
jpayne@69
|
907 Own<LowLevelAsyncIoProvider> lowLevelProvider;
|
jpayne@69
|
908 Own<AsyncIoProvider> provider;
|
jpayne@69
|
909 WaitScope& waitScope;
|
jpayne@69
|
910
|
jpayne@69
|
911 #if _WIN32
|
jpayne@69
|
912 Win32EventPort& win32EventPort;
|
jpayne@69
|
913 #else
|
jpayne@69
|
914 UnixEventPort& unixEventPort;
|
jpayne@69
|
915 // TEMPORARY: Direct access to underlying UnixEventPort, mainly for waiting on signals. This
|
jpayne@69
|
916 // field will go away at some point when we have a chance to improve these interfaces.
|
jpayne@69
|
917 #endif
|
jpayne@69
|
918 };
|
jpayne@69
|
919
|
jpayne@69
|
920 AsyncIoContext setupAsyncIo();
|
jpayne@69
|
921 // Convenience method which sets up the current thread with everything it needs to do async I/O.
|
jpayne@69
|
922 // The returned objects contain an `EventLoop` which is wrapping an appropriate `EventPort` for
|
jpayne@69
|
923 // doing I/O on the host system, so everything is ready for the thread to start making async calls
|
jpayne@69
|
924 // and waiting on promises.
|
jpayne@69
|
925 //
|
jpayne@69
|
926 // You would typically call this in your main() loop or in the start function of a thread.
|
jpayne@69
|
927 // Example:
|
jpayne@69
|
928 //
|
jpayne@69
|
929 // int main() {
|
jpayne@69
|
930 // auto ioContext = kj::setupAsyncIo();
|
jpayne@69
|
931 //
|
jpayne@69
|
932 // // Now we can call an async function.
|
jpayne@69
|
933 // Promise<String> textPromise = getHttp(*ioContext.provider, "http://example.com");
|
jpayne@69
|
934 //
|
jpayne@69
|
935 // // And we can wait for the promise to complete. Note that you can only use `wait()`
|
jpayne@69
|
936 // // from the top level, not from inside a promise callback.
|
jpayne@69
|
937 // String text = textPromise.wait(ioContext.waitScope);
|
jpayne@69
|
938 // print(text);
|
jpayne@69
|
939 // return 0;
|
jpayne@69
|
940 // }
|
jpayne@69
|
941 //
|
jpayne@69
|
942 // WARNING: An AsyncIoContext can only be used in the thread and process that created it. In
|
jpayne@69
|
943 // particular, note that after a fork(), an AsyncIoContext created in the parent process will
|
jpayne@69
|
944 // not work correctly in the child, even if the parent ceases to use its copy. In particular
|
jpayne@69
|
945 // note that this means that server processes which daemonize themselves at startup must wait
|
jpayne@69
|
946 // until after daemonization to create an AsyncIoContext.
|
jpayne@69
|
947
|
jpayne@69
|
948 // =======================================================================================
|
jpayne@69
|
949 // Convenience adapters.
|
jpayne@69
|
950
|
jpayne@69
|
951 class CapabilityStreamConnectionReceiver final: public ConnectionReceiver {
|
jpayne@69
|
952 // Trivial wrapper which allows an AsyncCapabilityStream to act as a ConnectionReceiver. accept()
|
jpayne@69
|
953 // calls receiveStream().
|
jpayne@69
|
954
|
jpayne@69
|
955 public:
|
jpayne@69
|
956 CapabilityStreamConnectionReceiver(AsyncCapabilityStream& inner)
|
jpayne@69
|
957 : inner(inner) {}
|
jpayne@69
|
958
|
jpayne@69
|
959 Promise<Own<AsyncIoStream>> accept() override;
|
jpayne@69
|
960 uint getPort() override;
|
jpayne@69
|
961
|
jpayne@69
|
962 Promise<AuthenticatedStream> acceptAuthenticated() override;
|
jpayne@69
|
963 // Always produces UnknownIdentity. Capability-based security patterns should not rely on
|
jpayne@69
|
964 // authenticating peers; the other end of the capability stream should only be given to
|
jpayne@69
|
965 // authorized parties in the first place.
|
jpayne@69
|
966
|
jpayne@69
|
967 private:
|
jpayne@69
|
968 AsyncCapabilityStream& inner;
|
jpayne@69
|
969 };
|
jpayne@69
|
970
|
jpayne@69
|
971 class CapabilityStreamNetworkAddress final: public NetworkAddress {
|
jpayne@69
|
972 // Trivial wrapper which allows an AsyncCapabilityStream to act as a NetworkAddress.
|
jpayne@69
|
973 //
|
jpayne@69
|
974 // connect() is implemented by calling provider.newCapabilityPipe(), sending one end over the
|
jpayne@69
|
975 // original capability stream, and returning the other end. If `provider` is null, then the
|
jpayne@69
|
976 // global kj::newCapabilityPipe() will be used, but this ONLY works if `inner` itself is agnostic
|
jpayne@69
|
977 // to the type of streams it receives, e.g. because it was also created using
|
jpayne@69
|
978 // kj::NewCapabilityPipe().
|
jpayne@69
|
979 //
|
jpayne@69
|
980 // listen().accept() is implemented by receiving new streams over the original stream.
|
jpayne@69
|
981 //
|
jpayne@69
|
982 // Note that clone() doesn't work (due to ownership issues) and toString() returns a static
|
jpayne@69
|
983 // string.
|
jpayne@69
|
984
|
jpayne@69
|
985 public:
|
jpayne@69
|
986 CapabilityStreamNetworkAddress(kj::Maybe<AsyncIoProvider&> provider, AsyncCapabilityStream& inner)
|
jpayne@69
|
987 : provider(provider), inner(inner) {}
|
jpayne@69
|
988
|
jpayne@69
|
989 Promise<Own<AsyncIoStream>> connect() override;
|
jpayne@69
|
990 Own<ConnectionReceiver> listen() override;
|
jpayne@69
|
991
|
jpayne@69
|
992 Own<NetworkAddress> clone() override;
|
jpayne@69
|
993 String toString() override;
|
jpayne@69
|
994
|
jpayne@69
|
995 Promise<AuthenticatedStream> connectAuthenticated() override;
|
jpayne@69
|
996 // Always produces UnknownIdentity. Capability-based security patterns should not rely on
|
jpayne@69
|
997 // authenticating peers; the other end of the capability stream should only be given to
|
jpayne@69
|
998 // authorized parties in the first place.
|
jpayne@69
|
999
|
jpayne@69
|
1000 private:
|
jpayne@69
|
1001 kj::Maybe<AsyncIoProvider&> provider;
|
jpayne@69
|
1002 AsyncCapabilityStream& inner;
|
jpayne@69
|
1003 };
|
jpayne@69
|
1004
|
jpayne@69
|
1005 class FileInputStream: public AsyncInputStream {
|
jpayne@69
|
1006 // InputStream that reads from a disk file -- and enables sendfile() optimization.
|
jpayne@69
|
1007 //
|
jpayne@69
|
1008 // Reads are performed synchronously -- no actual attempt is made to use asynchronous file I/O.
|
jpayne@69
|
1009 // True asynchronous file I/O is complicated and is mostly unnecessary in the presence of
|
jpayne@69
|
1010 // caching. Only certain niche programs can expect to benefit from it. For the rest, it's better
|
jpayne@69
|
1011 // to use regular syrchronous disk I/O, so that's what this class does.
|
jpayne@69
|
1012 //
|
jpayne@69
|
1013 // The real purpose of this class, aside from general convenience, is to enable sendfile()
|
jpayne@69
|
1014 // optimization. When you use this class's pumpTo() method, and the destination is a socket,
|
jpayne@69
|
1015 // the system will detect this and optimize to sendfile(), so that the file data never needs to
|
jpayne@69
|
1016 // be read into userspace.
|
jpayne@69
|
1017 //
|
jpayne@69
|
1018 // NOTE: As of this writing, sendfile() optimization is only implemented on Linux.
|
jpayne@69
|
1019
|
jpayne@69
|
1020 public:
|
jpayne@69
|
1021 FileInputStream(const ReadableFile& file, uint64_t offset = 0)
|
jpayne@69
|
1022 : file(file), offset(offset) {}
|
jpayne@69
|
1023
|
jpayne@69
|
1024 const ReadableFile& getUnderlyingFile() { return file; }
|
jpayne@69
|
1025 uint64_t getOffset() { return offset; }
|
jpayne@69
|
1026 void seek(uint64_t newOffset) { offset = newOffset; }
|
jpayne@69
|
1027
|
jpayne@69
|
1028 Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes);
|
jpayne@69
|
1029 Maybe<uint64_t> tryGetLength();
|
jpayne@69
|
1030
|
jpayne@69
|
1031 // (pumpTo() is not actually overridden here, but AsyncStreamFd's tryPumpFrom() will detect when
|
jpayne@69
|
1032 // the source is a file.)
|
jpayne@69
|
1033
|
jpayne@69
|
1034 private:
|
jpayne@69
|
1035 const ReadableFile& file;
|
jpayne@69
|
1036 uint64_t offset;
|
jpayne@69
|
1037 };
|
jpayne@69
|
1038
|
jpayne@69
|
1039 class FileOutputStream: public AsyncOutputStream {
|
jpayne@69
|
1040 // OutputStream that writes to a disk file.
|
jpayne@69
|
1041 //
|
jpayne@69
|
1042 // As with FileInputStream, calls are not actually async. Async would be even less useful here
|
jpayne@69
|
1043 // because writes should usually land in cache anyway.
|
jpayne@69
|
1044 //
|
jpayne@69
|
1045 // sendfile() optimization does not apply when writing to a file, but on Linux, splice() can
|
jpayne@69
|
1046 // be used to achieve a similar effect.
|
jpayne@69
|
1047 //
|
jpayne@69
|
1048 // NOTE: As of this writing, splice() optimization is not implemented.
|
jpayne@69
|
1049
|
jpayne@69
|
1050 public:
|
jpayne@69
|
1051 FileOutputStream(const File& file, uint64_t offset = 0)
|
jpayne@69
|
1052 : file(file), offset(offset) {}
|
jpayne@69
|
1053
|
jpayne@69
|
1054 const File& getUnderlyingFile() { return file; }
|
jpayne@69
|
1055 uint64_t getOffset() { return offset; }
|
jpayne@69
|
1056 void seek(uint64_t newOffset) { offset = newOffset; }
|
jpayne@69
|
1057
|
jpayne@69
|
1058 Promise<void> write(const void* buffer, size_t size);
|
jpayne@69
|
1059 Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces);
|
jpayne@69
|
1060 Promise<void> whenWriteDisconnected();
|
jpayne@69
|
1061
|
jpayne@69
|
1062 private:
|
jpayne@69
|
1063 const File& file;
|
jpayne@69
|
1064 uint64_t offset;
|
jpayne@69
|
1065 };
|
jpayne@69
|
1066
|
jpayne@69
|
1067 // =======================================================================================
|
jpayne@69
|
1068 // inline implementation details
|
jpayne@69
|
1069
|
jpayne@69
|
1070 inline AncillaryMessage::AncillaryMessage(
|
jpayne@69
|
1071 int level, int type, ArrayPtr<const byte> data)
|
jpayne@69
|
1072 : level(level), type(type), data(data) {}
|
jpayne@69
|
1073
|
jpayne@69
|
1074 inline int AncillaryMessage::getLevel() const { return level; }
|
jpayne@69
|
1075 inline int AncillaryMessage::getType() const { return type; }
|
jpayne@69
|
1076
|
jpayne@69
|
1077 template <typename T>
|
jpayne@69
|
1078 inline Maybe<const T&> AncillaryMessage::as() const {
|
jpayne@69
|
1079 if (data.size() >= sizeof(T)) {
|
jpayne@69
|
1080 return *reinterpret_cast<const T*>(data.begin());
|
jpayne@69
|
1081 } else {
|
jpayne@69
|
1082 return nullptr;
|
jpayne@69
|
1083 }
|
jpayne@69
|
1084 }
|
jpayne@69
|
1085
|
jpayne@69
|
1086 template <typename T>
|
jpayne@69
|
1087 inline ArrayPtr<const T> AncillaryMessage::asArray() const {
|
jpayne@69
|
1088 return arrayPtr(reinterpret_cast<const T*>(data.begin()), data.size() / sizeof(T));
|
jpayne@69
|
1089 }
|
jpayne@69
|
1090
|
jpayne@69
|
1091 class SecureNetworkWrapper {
|
jpayne@69
|
1092 // Abstract interface for a class which implements a "secure" network as a wrapper around an
|
jpayne@69
|
1093 // insecure one. "secure" means:
|
jpayne@69
|
1094 // * Connections to a server will only succeed if it can be verified that the requested hostname
|
jpayne@69
|
1095 // actually belongs to the responding server.
|
jpayne@69
|
1096 // * No man-in-the-middle attacker can potentially see the bytes sent and received.
|
jpayne@69
|
1097 //
|
jpayne@69
|
1098 // The typical implementation uses TLS. The object in this case could be configured to use cerain
|
jpayne@69
|
1099 // keys, certificates, etc. See kj/compat/tls.h for such an implementation.
|
jpayne@69
|
1100 //
|
jpayne@69
|
1101 // However, an implementation could use some other form of encryption, or might not need to use
|
jpayne@69
|
1102 // encryption at all. For example, imagine a kj::Network that exists only on a single machine,
|
jpayne@69
|
1103 // providing communications between various processes using unix sockets. Perhaps the "hostnames"
|
jpayne@69
|
1104 // are actually PIDs in this case. An implementation of such a network could verify the other
|
jpayne@69
|
1105 // side's identity using an `SCM_CREDENTIALS` auxiliary message, which cannot be forged. Once
|
jpayne@69
|
1106 // verified, there is no need to encrypt since unix sockets cannot be intercepted.
|
jpayne@69
|
1107
|
jpayne@69
|
1108 public:
|
jpayne@69
|
1109 virtual kj::Promise<kj::Own<kj::AsyncIoStream>> wrapServer(kj::Own<kj::AsyncIoStream> stream) = 0;
|
jpayne@69
|
1110 // Act as the server side of a connection. The given stream is already connected to a client, but
|
jpayne@69
|
1111 // no authentication has occurred. The returned stream represents the secure transport once
|
jpayne@69
|
1112 // established.
|
jpayne@69
|
1113
|
jpayne@69
|
1114 virtual kj::Promise<kj::Own<kj::AsyncIoStream>> wrapClient(
|
jpayne@69
|
1115 kj::Own<kj::AsyncIoStream> stream, kj::StringPtr expectedServerHostname) = 0;
|
jpayne@69
|
1116 // Act as the client side of a connection. The given stream is already connecetd to a server, but
|
jpayne@69
|
1117 // no authentication has occurred. This method will verify that the server actually is the given
|
jpayne@69
|
1118 // hostname, then return the stream representing a secure transport to that server.
|
jpayne@69
|
1119
|
jpayne@69
|
1120 virtual kj::Promise<kj::AuthenticatedStream> wrapServer(kj::AuthenticatedStream stream) = 0;
|
jpayne@69
|
1121 virtual kj::Promise<kj::AuthenticatedStream> wrapClient(
|
jpayne@69
|
1122 kj::AuthenticatedStream stream, kj::StringPtr expectedServerHostname) = 0;
|
jpayne@69
|
1123 // Same as above, but implementing kj::AuthenticatedStream, which provides PeerIdentity objects
|
jpayne@69
|
1124 // with more details about the peer. The SecureNetworkWrapper will provide its own implementation
|
jpayne@69
|
1125 // of PeerIdentity with the specific details it is able to authenticate.
|
jpayne@69
|
1126
|
jpayne@69
|
1127 virtual kj::Own<kj::ConnectionReceiver> wrapPort(kj::Own<kj::ConnectionReceiver> port) = 0;
|
jpayne@69
|
1128 // Wrap a connection listener. This is equivalent to calling wrapServer() on every connection
|
jpayne@69
|
1129 // received.
|
jpayne@69
|
1130
|
jpayne@69
|
1131 virtual kj::Own<kj::NetworkAddress> wrapAddress(
|
jpayne@69
|
1132 kj::Own<kj::NetworkAddress> address, kj::StringPtr expectedServerHostname) = 0;
|
jpayne@69
|
1133 // Wrap a NetworkAddress. This is equivalent to calling `wrapClient()` on every connection
|
jpayne@69
|
1134 // formed by calling `connect()` on the address.
|
jpayne@69
|
1135
|
jpayne@69
|
1136 virtual kj::Own<kj::Network> wrapNetwork(kj::Network& network) = 0;
|
jpayne@69
|
1137 // Wrap a whole `kj::Network`. This automatically wraps everything constructed using the network.
|
jpayne@69
|
1138 // The network will only accept address strings that can be authenticated, and will automatically
|
jpayne@69
|
1139 // authenticate servers against those addresses when connecting to them.
|
jpayne@69
|
1140 };
|
jpayne@69
|
1141
|
jpayne@69
|
1142 } // namespace kj
|
jpayne@69
|
1143
|
jpayne@69
|
1144 KJ_END_HEADER
|