jpayne@69
|
1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
|
jpayne@69
|
2 // Licensed under the MIT License:
|
jpayne@69
|
3 //
|
jpayne@69
|
4 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
jpayne@69
|
5 // of this software and associated documentation files (the "Software"), to deal
|
jpayne@69
|
6 // in the Software without restriction, including without limitation the rights
|
jpayne@69
|
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
jpayne@69
|
8 // copies of the Software, and to permit persons to whom the Software is
|
jpayne@69
|
9 // furnished to do so, subject to the following conditions:
|
jpayne@69
|
10 //
|
jpayne@69
|
11 // The above copyright notice and this permission notice shall be included in
|
jpayne@69
|
12 // all copies or substantial portions of the Software.
|
jpayne@69
|
13 //
|
jpayne@69
|
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
jpayne@69
|
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
jpayne@69
|
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
jpayne@69
|
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
jpayne@69
|
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
jpayne@69
|
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
jpayne@69
|
20 // THE SOFTWARE.
|
jpayne@69
|
21
|
jpayne@69
|
22 #pragma once
|
jpayne@69
|
23
|
jpayne@69
|
24 #include "async-prelude.h"
|
jpayne@69
|
25 #include <kj/exception.h>
|
jpayne@69
|
26 #include <kj/refcount.h>
|
jpayne@69
|
27
|
jpayne@69
|
28 KJ_BEGIN_HEADER
|
jpayne@69
|
29
|
jpayne@69
|
30 #ifndef KJ_USE_FIBERS
|
jpayne@69
|
31 #if __BIONIC__ || __FreeBSD__ || __OpenBSD__ || KJ_NO_EXCEPTIONS
|
jpayne@69
|
32 // These platforms don't support fibers.
|
jpayne@69
|
33 #define KJ_USE_FIBERS 0
|
jpayne@69
|
34 #else
|
jpayne@69
|
35 #define KJ_USE_FIBERS 1
|
jpayne@69
|
36 #endif
|
jpayne@69
|
37 #else
|
jpayne@69
|
38 #if KJ_NO_EXCEPTIONS && KJ_USE_FIBERS
|
jpayne@69
|
39 #error "Fibers cannot be enabled when exceptions are disabled."
|
jpayne@69
|
40 #endif
|
jpayne@69
|
41 #endif
|
jpayne@69
|
42
|
jpayne@69
|
43 namespace kj {
|
jpayne@69
|
44
|
jpayne@69
|
45 class EventLoop;
|
jpayne@69
|
46 class WaitScope;
|
jpayne@69
|
47
|
jpayne@69
|
48 template <typename T>
|
jpayne@69
|
49 class Promise;
|
jpayne@69
|
50 template <typename T>
|
jpayne@69
|
51 class ForkedPromise;
|
jpayne@69
|
52 template <typename T>
|
jpayne@69
|
53 class PromiseFulfiller;
|
jpayne@69
|
54 template <typename T>
|
jpayne@69
|
55 struct PromiseFulfillerPair;
|
jpayne@69
|
56
|
jpayne@69
|
57 template <typename Func>
|
jpayne@69
|
58 class FunctionParam;
|
jpayne@69
|
59
|
jpayne@69
|
60 template <typename Func, typename T>
|
jpayne@69
|
61 using PromiseForResult = _::ReducePromises<_::ReturnType<Func, T>>;
|
jpayne@69
|
62 // Evaluates to the type of Promise for the result of calling functor type Func with parameter type
|
jpayne@69
|
63 // T. If T is void, then the promise is for the result of calling Func with no arguments. If
|
jpayne@69
|
64 // Func itself returns a promise, the promises are joined, so you never get Promise<Promise<T>>.
|
jpayne@69
|
65
|
jpayne@69
|
66 // =======================================================================================
|
jpayne@69
|
67
|
jpayne@69
|
68 class AsyncObject {
|
jpayne@69
|
69 // You may optionally inherit privately from this to indicate that the type is a KJ async object,
|
jpayne@69
|
70 // meaning it deals with KJ async I/O making it tied to a specific thread and event loop. This
|
jpayne@69
|
71 // enables some additional debug checks, but does not otherwise have any effect on behavior as
|
jpayne@69
|
72 // long as there are no bugs.
|
jpayne@69
|
73 //
|
jpayne@69
|
74 // (We prefer inheritance rather than composition here because inheriting an empty type adds zero
|
jpayne@69
|
75 // size to the derived class.)
|
jpayne@69
|
76
|
jpayne@69
|
77 public:
|
jpayne@69
|
78 ~AsyncObject();
|
jpayne@69
|
79
|
jpayne@69
|
80 private:
|
jpayne@69
|
81 KJ_NORETURN(static void failed() noexcept);
|
jpayne@69
|
82 };
|
jpayne@69
|
83
|
jpayne@69
|
84 class DisallowAsyncDestructorsScope {
|
jpayne@69
|
85 // Create this type on the stack in order to specify that during its scope, no KJ async objects
|
jpayne@69
|
86 // should be destroyed. If AsyncObject's destructor is called in this scope, the process will
|
jpayne@69
|
87 // crash with std::terminate().
|
jpayne@69
|
88 //
|
jpayne@69
|
89 // This is useful as a sort of "sanitizer" to catch bugs. When tearing down an object that is
|
jpayne@69
|
90 // intended to be passed between threads, you can set up one of these scopes to catch whether
|
jpayne@69
|
91 // the object contains any async objects, which are not legal to pass across threads.
|
jpayne@69
|
92
|
jpayne@69
|
93 public:
|
jpayne@69
|
94 explicit DisallowAsyncDestructorsScope(kj::StringPtr reason);
|
jpayne@69
|
95 ~DisallowAsyncDestructorsScope();
|
jpayne@69
|
96 KJ_DISALLOW_COPY_AND_MOVE(DisallowAsyncDestructorsScope);
|
jpayne@69
|
97
|
jpayne@69
|
98 private:
|
jpayne@69
|
99 kj::StringPtr reason;
|
jpayne@69
|
100 DisallowAsyncDestructorsScope* previousValue;
|
jpayne@69
|
101
|
jpayne@69
|
102 friend class AsyncObject;
|
jpayne@69
|
103 };
|
jpayne@69
|
104
|
jpayne@69
|
105 class AllowAsyncDestructorsScope {
|
jpayne@69
|
106 // Negates the effect of DisallowAsyncDestructorsScope.
|
jpayne@69
|
107
|
jpayne@69
|
108 public:
|
jpayne@69
|
109 AllowAsyncDestructorsScope();
|
jpayne@69
|
110 ~AllowAsyncDestructorsScope();
|
jpayne@69
|
111 KJ_DISALLOW_COPY_AND_MOVE(AllowAsyncDestructorsScope);
|
jpayne@69
|
112
|
jpayne@69
|
113 private:
|
jpayne@69
|
114 DisallowAsyncDestructorsScope* previousValue;
|
jpayne@69
|
115 };
|
jpayne@69
|
116
|
jpayne@69
|
117 // =======================================================================================
|
jpayne@69
|
118 // Promises
|
jpayne@69
|
119
|
jpayne@69
|
120 template <typename T>
|
jpayne@69
|
121 class Promise: protected _::PromiseBase {
|
jpayne@69
|
122 // The basic primitive of asynchronous computation in KJ. Similar to "futures", but designed
|
jpayne@69
|
123 // specifically for event loop concurrency. Similar to E promises and JavaScript Promises/A.
|
jpayne@69
|
124 //
|
jpayne@69
|
125 // A Promise represents a promise to produce a value of type T some time in the future. Once
|
jpayne@69
|
126 // that value has been produced, the promise is "fulfilled". Alternatively, a promise can be
|
jpayne@69
|
127 // "broken", with an Exception describing what went wrong. You may implicitly convert a value of
|
jpayne@69
|
128 // type T to an already-fulfilled Promise<T>. You may implicitly convert the constant
|
jpayne@69
|
129 // `kj::READY_NOW` to an already-fulfilled Promise<void>. You may also implicitly convert a
|
jpayne@69
|
130 // `kj::Exception` to an already-broken promise of any type.
|
jpayne@69
|
131 //
|
jpayne@69
|
132 // Promises are linear types -- they are moveable but not copyable. If a Promise is destroyed
|
jpayne@69
|
133 // or goes out of scope (without being moved elsewhere), any ongoing asynchronous operations
|
jpayne@69
|
134 // meant to fulfill the promise will be canceled if possible. All methods of `Promise` (unless
|
jpayne@69
|
135 // otherwise noted) actually consume the promise in the sense of move semantics. (Arguably they
|
jpayne@69
|
136 // should be rvalue-qualified, but at the time this interface was created compilers didn't widely
|
jpayne@69
|
137 // support that yet and anyway it would be pretty ugly typing kj::mv(promise).whatever().) If
|
jpayne@69
|
138 // you want to use one Promise in two different places, you must fork it with `fork()`.
|
jpayne@69
|
139 //
|
jpayne@69
|
140 // To use the result of a Promise, you must call `then()` and supply a callback function to
|
jpayne@69
|
141 // call with the result. `then()` returns another promise, for the result of the callback.
|
jpayne@69
|
142 // Any time that this would result in Promise<Promise<T>>, the promises are collapsed into a
|
jpayne@69
|
143 // simple Promise<T> that first waits for the outer promise, then the inner. Example:
|
jpayne@69
|
144 //
|
jpayne@69
|
145 // // Open a remote file, read the content, and then count the
|
jpayne@69
|
146 // // number of lines of text.
|
jpayne@69
|
147 // // Note that none of the calls here block. `file`, `content`
|
jpayne@69
|
148 // // and `lineCount` are all initialized immediately before any
|
jpayne@69
|
149 // // asynchronous operations occur. The lambda callbacks are
|
jpayne@69
|
150 // // called later.
|
jpayne@69
|
151 // Promise<Own<File>> file = openFtp("ftp://host/foo/bar");
|
jpayne@69
|
152 // Promise<String> content = file.then(
|
jpayne@69
|
153 // [](Own<File> file) -> Promise<String> {
|
jpayne@69
|
154 // return file.readAll();
|
jpayne@69
|
155 // });
|
jpayne@69
|
156 // Promise<int> lineCount = content.then(
|
jpayne@69
|
157 // [](String text) -> int {
|
jpayne@69
|
158 // uint count = 0;
|
jpayne@69
|
159 // for (char c: text) count += (c == '\n');
|
jpayne@69
|
160 // return count;
|
jpayne@69
|
161 // });
|
jpayne@69
|
162 //
|
jpayne@69
|
163 // For `then()` to work, the current thread must have an active `EventLoop`. Each callback
|
jpayne@69
|
164 // is scheduled to execute in that loop. Since `then()` schedules callbacks only on the current
|
jpayne@69
|
165 // thread's event loop, you do not need to worry about two callbacks running at the same time.
|
jpayne@69
|
166 // You will need to set up at least one `EventLoop` at the top level of your program before you
|
jpayne@69
|
167 // can use promises.
|
jpayne@69
|
168 //
|
jpayne@69
|
169 // To adapt a non-Promise-based asynchronous API to promises, use `newAdaptedPromise()`.
|
jpayne@69
|
170 //
|
jpayne@69
|
171 // Systems using promises should consider supporting the concept of "pipelining". Pipelining
|
jpayne@69
|
172 // means allowing a caller to start issuing method calls against a promised object before the
|
jpayne@69
|
173 // promise has actually been fulfilled. This is particularly useful if the promise is for a
|
jpayne@69
|
174 // remote object living across a network, as this can avoid round trips when chaining a series
|
jpayne@69
|
175 // of calls. It is suggested that any class T which supports pipelining implement a subclass of
|
jpayne@69
|
176 // Promise<T> which adds "eventual send" methods -- methods which, when called, say "please
|
jpayne@69
|
177 // invoke the corresponding method on the promised value once it is available". These methods
|
jpayne@69
|
178 // should in turn return promises for the eventual results of said invocations. Cap'n Proto,
|
jpayne@69
|
179 // for example, implements the type `RemotePromise` which supports pipelining RPC requests -- see
|
jpayne@69
|
180 // `capnp/capability.h`.
|
jpayne@69
|
181 //
|
jpayne@69
|
182 // KJ Promises are based on E promises:
|
jpayne@69
|
183 // http://wiki.erights.org/wiki/Walnut/Distributed_Computing#Promises
|
jpayne@69
|
184 //
|
jpayne@69
|
185 // KJ Promises are also inspired in part by the evolving standards for JavaScript/ECMAScript
|
jpayne@69
|
186 // promises, which are themselves influenced by E promises:
|
jpayne@69
|
187 // http://promisesaplus.com/
|
jpayne@69
|
188 // https://github.com/domenic/promises-unwrapping
|
jpayne@69
|
189
|
jpayne@69
|
190 public:
|
jpayne@69
|
191 Promise(_::FixVoid<T> value);
|
jpayne@69
|
192 // Construct an already-fulfilled Promise from a value of type T. For non-void promises, the
|
jpayne@69
|
193 // parameter type is simply T. So, e.g., in a function that returns `Promise<int>`, you can
|
jpayne@69
|
194 // say `return 123;` to return a promise that is already fulfilled to 123.
|
jpayne@69
|
195 //
|
jpayne@69
|
196 // For void promises, use `kj::READY_NOW` as the value, e.g. `return kj::READY_NOW`.
|
jpayne@69
|
197
|
jpayne@69
|
198 Promise(kj::Exception&& e);
|
jpayne@69
|
199 // Construct an already-broken Promise.
|
jpayne@69
|
200
|
jpayne@69
|
201 inline Promise(decltype(nullptr)) {}
|
jpayne@69
|
202
|
jpayne@69
|
203 template <typename Func, typename ErrorFunc = _::PropagateException>
|
jpayne@69
|
204 PromiseForResult<Func, T> then(Func&& func, ErrorFunc&& errorHandler = _::PropagateException(),
|
jpayne@69
|
205 SourceLocation location = {}) KJ_WARN_UNUSED_RESULT;
|
jpayne@69
|
206 // Register a continuation function to be executed when the promise completes. The continuation
|
jpayne@69
|
207 // (`func`) takes the promised value (an rvalue of type `T`) as its parameter. The continuation
|
jpayne@69
|
208 // may return a new value; `then()` itself returns a promise for the continuation's eventual
|
jpayne@69
|
209 // result. If the continuation itself returns a `Promise<U>`, then `then()` shall also return
|
jpayne@69
|
210 // a `Promise<U>` which first waits for the original promise, then executes the continuation,
|
jpayne@69
|
211 // then waits for the inner promise (i.e. it automatically "unwraps" the promise).
|
jpayne@69
|
212 //
|
jpayne@69
|
213 // In all cases, `then()` returns immediately. The continuation is executed later. The
|
jpayne@69
|
214 // continuation is always executed on the same EventLoop (and, therefore, the same thread) which
|
jpayne@69
|
215 // called `then()`, therefore no synchronization is necessary on state shared by the continuation
|
jpayne@69
|
216 // and the surrounding scope. If no EventLoop is running on the current thread, `then()` throws
|
jpayne@69
|
217 // an exception.
|
jpayne@69
|
218 //
|
jpayne@69
|
219 // You may also specify an error handler continuation as the second parameter. `errorHandler`
|
jpayne@69
|
220 // must be a functor taking a parameter of type `kj::Exception&&`. It must return the same
|
jpayne@69
|
221 // type as `func` returns (except when `func` returns `Promise<U>`, in which case `errorHandler`
|
jpayne@69
|
222 // may return either `Promise<U>` or just `U`). The default error handler simply propagates the
|
jpayne@69
|
223 // exception to the returned promise.
|
jpayne@69
|
224 //
|
jpayne@69
|
225 // Either `func` or `errorHandler` may, of course, throw an exception, in which case the promise
|
jpayne@69
|
226 // is broken. When compiled with -fno-exceptions, the framework will still detect when a
|
jpayne@69
|
227 // recoverable exception was thrown inside of a continuation and will consider the promise
|
jpayne@69
|
228 // broken even though a (presumably garbage) result was returned.
|
jpayne@69
|
229 //
|
jpayne@69
|
230 // If the returned promise is destroyed before the callback runs, the callback will be canceled
|
jpayne@69
|
231 // (it will never run).
|
jpayne@69
|
232 //
|
jpayne@69
|
233 // Note that `then()` -- like all other Promise methods -- consumes the promise on which it is
|
jpayne@69
|
234 // called, in the sense of move semantics. After returning, the original promise is no longer
|
jpayne@69
|
235 // valid, but `then()` returns a new promise.
|
jpayne@69
|
236 //
|
jpayne@69
|
237 // *Advanced implementation tips:* Most users will never need to worry about the below, but
|
jpayne@69
|
238 // it is good to be aware of.
|
jpayne@69
|
239 //
|
jpayne@69
|
240 // As an optimization, if the callback function `func` does _not_ return another promise, then
|
jpayne@69
|
241 // execution of `func` itself may be delayed until its result is known to be needed. The
|
jpayne@69
|
242 // expectation here is that `func` is just doing some transformation on the results, not
|
jpayne@69
|
243 // scheduling any other actions, therefore the system doesn't need to be proactive about
|
jpayne@69
|
244 // evaluating it. This way, a chain of trivial then() transformations can be executed all at
|
jpayne@69
|
245 // once without repeatedly re-scheduling through the event loop. Use the `eagerlyEvaluate()`
|
jpayne@69
|
246 // method to suppress this behavior.
|
jpayne@69
|
247 //
|
jpayne@69
|
248 // On the other hand, if `func` _does_ return another promise, then the system evaluates `func`
|
jpayne@69
|
249 // as soon as possible, because the promise it returns might be for a newly-scheduled
|
jpayne@69
|
250 // long-running asynchronous task.
|
jpayne@69
|
251 //
|
jpayne@69
|
252 // As another optimization, when a callback function registered with `then()` is actually
|
jpayne@69
|
253 // scheduled, it is scheduled to occur immediately, preempting other work in the event queue.
|
jpayne@69
|
254 // This allows a long chain of `then`s to execute all at once, improving cache locality by
|
jpayne@69
|
255 // clustering operations on the same data. However, this implies that starvation can occur
|
jpayne@69
|
256 // if a chain of `then()`s takes a very long time to execute without ever stopping to wait for
|
jpayne@69
|
257 // actual I/O. To solve this, use `kj::evalLater()` to yield control; this way, all other events
|
jpayne@69
|
258 // in the queue will get a chance to run before your callback is executed.
|
jpayne@69
|
259
|
jpayne@69
|
260 Promise<void> ignoreResult() KJ_WARN_UNUSED_RESULT { return then([](T&&) {}); }
|
jpayne@69
|
261 // Convenience method to convert the promise to a void promise by ignoring the return value.
|
jpayne@69
|
262 //
|
jpayne@69
|
263 // You must still wait on the returned promise if you want the task to execute.
|
jpayne@69
|
264
|
jpayne@69
|
265 template <typename ErrorFunc>
|
jpayne@69
|
266 Promise<T> catch_(ErrorFunc&& errorHandler, SourceLocation location = {}) KJ_WARN_UNUSED_RESULT;
|
jpayne@69
|
267 // Equivalent to `.then(identityFunc, errorHandler)`, where `identifyFunc` is a function that
|
jpayne@69
|
268 // just returns its input.
|
jpayne@69
|
269
|
jpayne@69
|
270 T wait(WaitScope& waitScope, SourceLocation location = {});
|
jpayne@69
|
271 // Run the event loop until the promise is fulfilled, then return its result. If the promise
|
jpayne@69
|
272 // is rejected, throw an exception.
|
jpayne@69
|
273 //
|
jpayne@69
|
274 // wait() is primarily useful at the top level of a program -- typically, within the function
|
jpayne@69
|
275 // that allocated the EventLoop. For example, a program that performs one or two RPCs and then
|
jpayne@69
|
276 // exits would likely use wait() in its main() function to wait on each RPC. On the other hand,
|
jpayne@69
|
277 // server-side code generally cannot use wait(), because it has to be able to accept multiple
|
jpayne@69
|
278 // requests at once.
|
jpayne@69
|
279 //
|
jpayne@69
|
280 // If the promise is rejected, `wait()` throws an exception. If the program was compiled without
|
jpayne@69
|
281 // exceptions (-fno-exceptions), this will usually abort. In this case you really should first
|
jpayne@69
|
282 // use `then()` to set an appropriate handler for the exception case, so that the promise you
|
jpayne@69
|
283 // actually wait on never throws.
|
jpayne@69
|
284 //
|
jpayne@69
|
285 // `waitScope` is an object proving that the caller is in a scope where wait() is allowed. By
|
jpayne@69
|
286 // convention, any function which might call wait(), or which might call another function which
|
jpayne@69
|
287 // might call wait(), must take `WaitScope&` as one of its parameters. This is needed for two
|
jpayne@69
|
288 // reasons:
|
jpayne@69
|
289 // * `wait()` is not allowed during an event callback, because event callbacks are themselves
|
jpayne@69
|
290 // called during some other `wait()`, and such recursive `wait()`s would only be able to
|
jpayne@69
|
291 // complete in LIFO order, which might mean that the outer `wait()` ends up waiting longer
|
jpayne@69
|
292 // than it is supposed to. To prevent this, a `WaitScope` cannot be constructed or used during
|
jpayne@69
|
293 // an event callback.
|
jpayne@69
|
294 // * Since `wait()` runs the event loop, unrelated event callbacks may execute before `wait()`
|
jpayne@69
|
295 // returns. This means that anyone calling `wait()` must be reentrant -- state may change
|
jpayne@69
|
296 // around them in arbitrary ways. Therefore, callers really need to know if a function they
|
jpayne@69
|
297 // are calling might wait(), and the `WaitScope&` parameter makes this clear.
|
jpayne@69
|
298 //
|
jpayne@69
|
299 // Usually, there is only one `WaitScope` for each `EventLoop`, and it can only be used at the
|
jpayne@69
|
300 // top level of the thread owning the loop. Calling `wait()` with this `WaitScope` is what
|
jpayne@69
|
301 // actually causes the event loop to run at all. This top-level `WaitScope` cannot be used
|
jpayne@69
|
302 // recursively, so cannot be used within an event callback.
|
jpayne@69
|
303 //
|
jpayne@69
|
304 // However, it is possible to obtain a `WaitScope` in lower-level code by using fibers. Use
|
jpayne@69
|
305 // kj::startFiber() to start some code executing on an alternate call stack. That code will get
|
jpayne@69
|
306 // its own `WaitScope` allowing it to operate in a synchronous style. In this case, `wait()`
|
jpayne@69
|
307 // switches back to the main stack in order to run the event loop, returning to the fiber's stack
|
jpayne@69
|
308 // once the awaited promise resolves.
|
jpayne@69
|
309
|
jpayne@69
|
310 bool poll(WaitScope& waitScope, SourceLocation location = {});
|
jpayne@69
|
311 // Returns true if a call to wait() would complete without blocking, false if it would block.
|
jpayne@69
|
312 //
|
jpayne@69
|
313 // If the promise is not yet resolved, poll() will pump the event loop and poll for I/O in an
|
jpayne@69
|
314 // attempt to resolve it. Only when there is nothing left to do will it return false.
|
jpayne@69
|
315 //
|
jpayne@69
|
316 // Generally, poll() is most useful in tests. Often, you may want to verify that a promise does
|
jpayne@69
|
317 // not resolve until some specific event occurs. To do so, poll() the promise before the event to
|
jpayne@69
|
318 // verify it isn't resolved, then trigger the event, then poll() again to verify that it resolves.
|
jpayne@69
|
319 // The first poll() verifies that the promise doesn't resolve early, which would otherwise be
|
jpayne@69
|
320 // hard to do deterministically. The second poll() allows you to check that the promise has
|
jpayne@69
|
321 // resolved and avoid a wait() that might deadlock in the case that it hasn't.
|
jpayne@69
|
322 //
|
jpayne@69
|
323 // poll() is not supported in fibers; it will throw an exception.
|
jpayne@69
|
324
|
jpayne@69
|
325 ForkedPromise<T> fork(SourceLocation location = {}) KJ_WARN_UNUSED_RESULT;
|
jpayne@69
|
326 // Forks the promise, so that multiple different clients can independently wait on the result.
|
jpayne@69
|
327 // `T` must be copy-constructable for this to work. Or, in the special case where `T` is
|
jpayne@69
|
328 // `Own<U>`, `U` must have a method `Own<U> addRef()` which returns a new reference to the same
|
jpayne@69
|
329 // (or an equivalent) object (probably implemented via reference counting).
|
jpayne@69
|
330
|
jpayne@69
|
331 _::SplitTuplePromise<T> split(SourceLocation location = {});
|
jpayne@69
|
332 // Split a promise for a tuple into a tuple of promises.
|
jpayne@69
|
333 //
|
jpayne@69
|
334 // E.g. if you have `Promise<kj::Tuple<T, U>>`, `split()` returns
|
jpayne@69
|
335 // `kj::Tuple<Promise<T>, Promise<U>>`.
|
jpayne@69
|
336
|
jpayne@69
|
337 Promise<T> exclusiveJoin(Promise<T>&& other, SourceLocation location = {}) KJ_WARN_UNUSED_RESULT;
|
jpayne@69
|
338 // Return a new promise that resolves when either the original promise resolves or `other`
|
jpayne@69
|
339 // resolves (whichever comes first). The promise that didn't resolve first is canceled.
|
jpayne@69
|
340
|
jpayne@69
|
341 // TODO(someday): inclusiveJoin(), or perhaps just join(), which waits for both completions
|
jpayne@69
|
342 // and produces a tuple?
|
jpayne@69
|
343
|
jpayne@69
|
344 template <typename... Attachments>
|
jpayne@69
|
345 Promise<T> attach(Attachments&&... attachments) KJ_WARN_UNUSED_RESULT;
|
jpayne@69
|
346 // "Attaches" one or more movable objects (often, Own<T>s) to the promise, such that they will
|
jpayne@69
|
347 // be destroyed when the promise resolves. This is useful when a promise's callback contains
|
jpayne@69
|
348 // pointers into some object and you want to make sure the object still exists when the callback
|
jpayne@69
|
349 // runs -- after calling then(), use attach() to add necessary objects to the result.
|
jpayne@69
|
350
|
jpayne@69
|
351 template <typename ErrorFunc>
|
jpayne@69
|
352 Promise<T> eagerlyEvaluate(ErrorFunc&& errorHandler, SourceLocation location = {})
|
jpayne@69
|
353 KJ_WARN_UNUSED_RESULT;
|
jpayne@69
|
354 Promise<T> eagerlyEvaluate(decltype(nullptr), SourceLocation location = {}) KJ_WARN_UNUSED_RESULT;
|
jpayne@69
|
355 // Force eager evaluation of this promise. Use this if you are going to hold on to the promise
|
jpayne@69
|
356 // for awhile without consuming the result, but you want to make sure that the system actually
|
jpayne@69
|
357 // processes it.
|
jpayne@69
|
358 //
|
jpayne@69
|
359 // `errorHandler` is a function that takes `kj::Exception&&`, like the second parameter to
|
jpayne@69
|
360 // `then()`, or the parameter to `catch_()`. We make you specify this because otherwise it's
|
jpayne@69
|
361 // easy to forget to handle errors in a promise that you never use. You may specify nullptr for
|
jpayne@69
|
362 // the error handler if you are sure that ignoring errors is fine, or if you know that you'll
|
jpayne@69
|
363 // eventually wait on the promise somewhere.
|
jpayne@69
|
364
|
jpayne@69
|
365 template <typename ErrorFunc>
|
jpayne@69
|
366 void detach(ErrorFunc&& errorHandler);
|
jpayne@69
|
367 // Allows the promise to continue running in the background until it completes or the
|
jpayne@69
|
368 // `EventLoop` is destroyed. Be careful when using this: since you can no longer cancel this
|
jpayne@69
|
369 // promise, you need to make sure that the promise owns all the objects it touches or make sure
|
jpayne@69
|
370 // those objects outlive the EventLoop.
|
jpayne@69
|
371 //
|
jpayne@69
|
372 // `errorHandler` is a function that takes `kj::Exception&&`, like the second parameter to
|
jpayne@69
|
373 // `then()`, except that it must return void.
|
jpayne@69
|
374 //
|
jpayne@69
|
375 // This function exists mainly to implement the Cap'n Proto requirement that RPC calls cannot be
|
jpayne@69
|
376 // canceled unless the callee explicitly permits it.
|
jpayne@69
|
377
|
jpayne@69
|
378 kj::String trace();
|
jpayne@69
|
379 // Returns a dump of debug info about this promise. Not for production use. Requires RTTI.
|
jpayne@69
|
380 // This method does NOT consume the promise as other methods do.
|
jpayne@69
|
381
|
jpayne@69
|
382 private:
|
jpayne@69
|
383 Promise(bool, _::OwnPromiseNode&& node): PromiseBase(kj::mv(node)) {}
|
jpayne@69
|
384 // Second parameter prevent ambiguity with immediate-value constructor.
|
jpayne@69
|
385
|
jpayne@69
|
386 friend class _::PromiseNode;
|
jpayne@69
|
387 };
|
jpayne@69
|
388
|
jpayne@69
|
389 template <typename T>
|
jpayne@69
|
390 class ForkedPromise {
|
jpayne@69
|
391 // The result of `Promise::fork()` and `EventLoop::fork()`. Allows branches to be created.
|
jpayne@69
|
392 // Like `Promise<T>`, this is a pass-by-move type.
|
jpayne@69
|
393
|
jpayne@69
|
394 public:
|
jpayne@69
|
395 inline ForkedPromise(decltype(nullptr)) {}
|
jpayne@69
|
396
|
jpayne@69
|
397 Promise<T> addBranch();
|
jpayne@69
|
398 // Add a new branch to the fork. The branch is equivalent to the original promise.
|
jpayne@69
|
399
|
jpayne@69
|
400 bool hasBranches();
|
jpayne@69
|
401 // Returns true if there are any branches that haven't been canceled.
|
jpayne@69
|
402
|
jpayne@69
|
403 private:
|
jpayne@69
|
404 Own<_::ForkHub<_::FixVoid<T>>> hub;
|
jpayne@69
|
405
|
jpayne@69
|
406 inline ForkedPromise(bool, Own<_::ForkHub<_::FixVoid<T>>>&& hub): hub(kj::mv(hub)) {}
|
jpayne@69
|
407
|
jpayne@69
|
408 friend class Promise<T>;
|
jpayne@69
|
409 friend class EventLoop;
|
jpayne@69
|
410 };
|
jpayne@69
|
411
|
jpayne@69
|
412 constexpr _::ReadyNow READY_NOW = _::ReadyNow();
|
jpayne@69
|
413 // Use this when you need a Promise<void> that is already fulfilled -- this value can be implicitly
|
jpayne@69
|
414 // cast to `Promise<void>`.
|
jpayne@69
|
415
|
jpayne@69
|
416 constexpr _::NeverDone NEVER_DONE = _::NeverDone();
|
jpayne@69
|
417 // The opposite of `READY_NOW`, return this when the promise should never resolve. This can be
|
jpayne@69
|
418 // implicitly converted to any promise type. You may also call `NEVER_DONE.wait()` to wait
|
jpayne@69
|
419 // forever (useful for servers).
|
jpayne@69
|
420
|
jpayne@69
|
421 template <typename T, T value>
|
jpayne@69
|
422 Promise<T> constPromise();
|
jpayne@69
|
423 // Construct a Promise which resolves to the given constant value. This function is equivalent to
|
jpayne@69
|
424 // `Promise<T>(value)` except that it avoids an allocation.
|
jpayne@69
|
425
|
jpayne@69
|
426 template <typename Func>
|
jpayne@69
|
427 PromiseForResult<Func, void> evalLater(Func&& func) KJ_WARN_UNUSED_RESULT;
|
jpayne@69
|
428 // Schedule for the given zero-parameter function to be executed in the event loop at some
|
jpayne@69
|
429 // point in the near future. Returns a Promise for its result -- or, if `func()` itself returns
|
jpayne@69
|
430 // a promise, `evalLater()` returns a Promise for the result of resolving that promise.
|
jpayne@69
|
431 //
|
jpayne@69
|
432 // Example usage:
|
jpayne@69
|
433 // Promise<int> x = evalLater([]() { return 123; });
|
jpayne@69
|
434 //
|
jpayne@69
|
435 // The above is exactly equivalent to:
|
jpayne@69
|
436 // Promise<int> x = Promise<void>(READY_NOW).then([]() { return 123; });
|
jpayne@69
|
437 //
|
jpayne@69
|
438 // If the returned promise is destroyed before the callback runs, the callback will be canceled
|
jpayne@69
|
439 // (never called).
|
jpayne@69
|
440 //
|
jpayne@69
|
441 // If you schedule several evaluations with `evalLater` during the same callback, they are
|
jpayne@69
|
442 // guaranteed to be executed in order.
|
jpayne@69
|
443
|
jpayne@69
|
444 template <typename Func>
|
jpayne@69
|
445 PromiseForResult<Func, void> evalNow(Func&& func) KJ_WARN_UNUSED_RESULT;
|
jpayne@69
|
446 // Run `func()` and return a promise for its result. `func()` executes before `evalNow()` returns.
|
jpayne@69
|
447 // If `func()` throws an exception, the exception is caught and wrapped in a promise -- this is the
|
jpayne@69
|
448 // main reason why `evalNow()` is useful.
|
jpayne@69
|
449
|
jpayne@69
|
450 template <typename Func>
|
jpayne@69
|
451 PromiseForResult<Func, void> evalLast(Func&& func) KJ_WARN_UNUSED_RESULT;
|
jpayne@69
|
452 // Like `evalLater()`, except that the function doesn't run until the event queue is otherwise
|
jpayne@69
|
453 // completely empty and the thread is about to suspend waiting for I/O.
|
jpayne@69
|
454 //
|
jpayne@69
|
455 // This is useful when you need to perform some disruptive action and you want to make sure that
|
jpayne@69
|
456 // you don't interrupt some other task between two .then() continuations. For example, say you want
|
jpayne@69
|
457 // to cancel a read() operation on a socket and know for sure that if any bytes were read, you saw
|
jpayne@69
|
458 // them. It could be that a read() has completed and bytes have been transferred to the target
|
jpayne@69
|
459 // buffer, but the .then() callback that handles the read result hasn't executed yet. If you
|
jpayne@69
|
460 // cancel the promise at this inopportune moment, the bytes in the buffer are lost. If you do
|
jpayne@69
|
461 // evalLast(), then you can be sure that any pending .then() callbacks had a chance to finish out
|
jpayne@69
|
462 // and if you didn't receive the read result yet, then you know nothing has been read, and you can
|
jpayne@69
|
463 // simply drop the promise.
|
jpayne@69
|
464 //
|
jpayne@69
|
465 // If evalLast() is called multiple times, functions are executed in LIFO order. If the first
|
jpayne@69
|
466 // callback enqueues new events, then latter callbacks will not execute until those events are
|
jpayne@69
|
467 // drained.
|
jpayne@69
|
468
|
jpayne@69
|
469 ArrayPtr<void* const> getAsyncTrace(ArrayPtr<void*> space);
|
jpayne@69
|
470 kj::String getAsyncTrace();
|
jpayne@69
|
471 // If the event loop is currently running in this thread, get a trace back through the promise
|
jpayne@69
|
472 // chain leading to the currently-executing event. The format is the same as kj::getStackTrace()
|
jpayne@69
|
473 // from exception.c++.
|
jpayne@69
|
474
|
jpayne@69
|
475 template <typename Func>
|
jpayne@69
|
476 PromiseForResult<Func, void> retryOnDisconnect(Func&& func) KJ_WARN_UNUSED_RESULT;
|
jpayne@69
|
477 // Promises to run `func()` asynchronously, retrying once if it fails with a DISCONNECTED exception.
|
jpayne@69
|
478 // If the retry also fails, the exception is passed through.
|
jpayne@69
|
479 //
|
jpayne@69
|
480 // `func()` should return a `Promise`. `retryOnDisconnect(func)` returns the same promise, except
|
jpayne@69
|
481 // with the retry logic added.
|
jpayne@69
|
482
|
jpayne@69
|
483 template <typename Func>
|
jpayne@69
|
484 PromiseForResult<Func, WaitScope&> startFiber(
|
jpayne@69
|
485 size_t stackSize, Func&& func, SourceLocation location = {}) KJ_WARN_UNUSED_RESULT;
|
jpayne@69
|
486 // Executes `func()` in a fiber, returning a promise for the eventual reseult. `func()` will be
|
jpayne@69
|
487 // passed a `WaitScope&` as its parameter, allowing it to call `.wait()` on promises. Thus, `func()`
|
jpayne@69
|
488 // can be written in a synchronous, blocking style, instead of using `.then()`. This is often much
|
jpayne@69
|
489 // easier to write and read, and may even be significantly faster if it allows the use of stack
|
jpayne@69
|
490 // allocation rather than heap allocation.
|
jpayne@69
|
491 //
|
jpayne@69
|
492 // However, fibers have a major disadvantage: memory must be allocated for the fiber's call stack.
|
jpayne@69
|
493 // The entire stack must be allocated at once, making it necessary to choose a stack size upfront
|
jpayne@69
|
494 // that is big enough for whatever the fiber needs to do. Estimating this is often difficult. That
|
jpayne@69
|
495 // said, over-estimating is not too terrible since pages of the stack will actually be allocated
|
jpayne@69
|
496 // lazily when first accessed; actual memory usage will correspond to the "high watermark" of the
|
jpayne@69
|
497 // actual stack usage. That said, this lazy allocation forces page faults, which can be quite slow.
|
jpayne@69
|
498 // Worse, freeing a stack forces a TLB flush and shootdown -- all currently-executing threads will
|
jpayne@69
|
499 // have to be interrupted to flush their CPU cores' TLB caches.
|
jpayne@69
|
500 //
|
jpayne@69
|
501 // In short, when performance matters, you should try to avoid creating fibers very frequently.
|
jpayne@69
|
502
|
jpayne@69
|
503 class FiberPool final {
|
jpayne@69
|
504 // A freelist pool of fibers with a set stack size. This improves CPU usage with fibers at
|
jpayne@69
|
505 // the expense of memory usage. Fibers in this pool will always use the max amount of memory
|
jpayne@69
|
506 // used until the pool is destroyed.
|
jpayne@69
|
507
|
jpayne@69
|
508 public:
|
jpayne@69
|
509 explicit FiberPool(size_t stackSize);
|
jpayne@69
|
510 ~FiberPool() noexcept(false);
|
jpayne@69
|
511 KJ_DISALLOW_COPY_AND_MOVE(FiberPool);
|
jpayne@69
|
512
|
jpayne@69
|
513 void setMaxFreelist(size_t count);
|
jpayne@69
|
514 // Set the maximum number of stacks to add to the freelist. If the freelist is full, stacks will
|
jpayne@69
|
515 // be deleted rather than returned to the freelist.
|
jpayne@69
|
516
|
jpayne@69
|
517 void useCoreLocalFreelists();
|
jpayne@69
|
518 // EXPERIMENTAL: Call to tell FiberPool to try to use core-local stack freelists, which
|
jpayne@69
|
519 // in theory should increase L1/L2 cache efficacy for freelisted stacks. In practice, as of
|
jpayne@69
|
520 // this writing, no performance advantage has yet been demonstrated. Note that currently this
|
jpayne@69
|
521 // feature is only supported on Linux (the flag has no effect on other operating systems).
|
jpayne@69
|
522
|
jpayne@69
|
523 template <typename Func>
|
jpayne@69
|
524 PromiseForResult<Func, WaitScope&> startFiber(
|
jpayne@69
|
525 Func&& func, SourceLocation location = {}) const KJ_WARN_UNUSED_RESULT;
|
jpayne@69
|
526 // Executes `func()` in a fiber from this pool, returning a promise for the eventual result.
|
jpayne@69
|
527 // `func()` will be passed a `WaitScope&` as its parameter, allowing it to call `.wait()` on
|
jpayne@69
|
528 // promises. Thus, `func()` can be written in a synchronous, blocking style, instead of
|
jpayne@69
|
529 // using `.then()`. This is often much easier to write and read, and may even be significantly
|
jpayne@69
|
530 // faster if it allows the use of stack allocation rather than heap allocation.
|
jpayne@69
|
531
|
jpayne@69
|
532 void runSynchronously(kj::FunctionParam<void()> func) const;
|
jpayne@69
|
533 // Use one of the stacks in the pool to synchronously execute func(), returning the result that
|
jpayne@69
|
534 // func() returns. This is not the usual use case for fibers, but can be a nice optimization
|
jpayne@69
|
535 // in programs that have many threads that mostly only need small stacks, but occasionally need
|
jpayne@69
|
536 // a much bigger stack to run some deeply recursive algorithm. If the algorithm is run on each
|
jpayne@69
|
537 // thread's normal call stack, then every thread's stack will tend to grow to be very big
|
jpayne@69
|
538 // (usually, stacks automatically grow as needed, but do not shrink until the thread exits
|
jpayne@69
|
539 // completely). If the thread can share a small set of big stacks that they use only when calling
|
jpayne@69
|
540 // the deeply recursive algorithm, and use small stacks for everything else, overall memory usage
|
jpayne@69
|
541 // is reduced.
|
jpayne@69
|
542 //
|
jpayne@69
|
543 // TODO(someday): If func() returns a value, return it from runSynchronously? Current use case
|
jpayne@69
|
544 // doesn't need it.
|
jpayne@69
|
545
|
jpayne@69
|
546 size_t getFreelistSize() const;
|
jpayne@69
|
547 // Get the number of stacks currently in the freelist. Does not count stacks that are active.
|
jpayne@69
|
548
|
jpayne@69
|
549 private:
|
jpayne@69
|
550 class Impl;
|
jpayne@69
|
551 Own<Impl> impl;
|
jpayne@69
|
552
|
jpayne@69
|
553 friend class _::FiberStack;
|
jpayne@69
|
554 friend class _::FiberBase;
|
jpayne@69
|
555 };
|
jpayne@69
|
556
|
jpayne@69
|
557 template <typename T>
|
jpayne@69
|
558 Promise<Array<T>> joinPromises(Array<Promise<T>>&& promises, SourceLocation location = {});
|
jpayne@69
|
559 // Join an array of promises into a promise for an array. Trailing continuations on promises are not
|
jpayne@69
|
560 // evaluated until all promises have settled. Exceptions are propagated only after the last promise
|
jpayne@69
|
561 // has settled.
|
jpayne@69
|
562 //
|
jpayne@69
|
563 // TODO(cleanup): It is likely that `joinPromisesFailFast()` is what everyone should be using.
|
jpayne@69
|
564 // Deprecate this function.
|
jpayne@69
|
565
|
jpayne@69
|
566 template <typename T>
|
jpayne@69
|
567 Promise<Array<T>> joinPromisesFailFast(Array<Promise<T>>&& promises, SourceLocation location = {});
|
jpayne@69
|
568 // Join an array of promises into a promise for an array. Trailing continuations on promises are
|
jpayne@69
|
569 // evaluated eagerly. If any promise results in an exception, the exception is immediately
|
jpayne@69
|
570 // propagated to the returned join promise.
|
jpayne@69
|
571
|
jpayne@69
|
572 // =======================================================================================
|
jpayne@69
|
573 // Hack for creating a lambda that holds an owned pointer.
|
jpayne@69
|
574
|
jpayne@69
|
575 template <typename Func, typename MovedParam>
|
jpayne@69
|
576 class CaptureByMove {
|
jpayne@69
|
577 public:
|
jpayne@69
|
578 inline CaptureByMove(Func&& func, MovedParam&& param)
|
jpayne@69
|
579 : func(kj::mv(func)), param(kj::mv(param)) {}
|
jpayne@69
|
580
|
jpayne@69
|
581 template <typename... Params>
|
jpayne@69
|
582 inline auto operator()(Params&&... params)
|
jpayne@69
|
583 -> decltype(kj::instance<Func>()(kj::instance<MovedParam&&>(), kj::fwd<Params>(params)...)) {
|
jpayne@69
|
584 return func(kj::mv(param), kj::fwd<Params>(params)...);
|
jpayne@69
|
585 }
|
jpayne@69
|
586
|
jpayne@69
|
587 private:
|
jpayne@69
|
588 Func func;
|
jpayne@69
|
589 MovedParam param;
|
jpayne@69
|
590 };
|
jpayne@69
|
591
|
jpayne@69
|
592 template <typename Func, typename MovedParam>
|
jpayne@69
|
593 inline CaptureByMove<Func, Decay<MovedParam>> mvCapture(MovedParam&& param, Func&& func)
|
jpayne@69
|
594 KJ_DEPRECATED("Use C++14 generalized captures instead.");
|
jpayne@69
|
595
|
jpayne@69
|
596 template <typename Func, typename MovedParam>
|
jpayne@69
|
597 inline CaptureByMove<Func, Decay<MovedParam>> mvCapture(MovedParam&& param, Func&& func) {
|
jpayne@69
|
598 // Hack to create a "lambda" which captures a variable by moving it rather than copying or
|
jpayne@69
|
599 // referencing. C++14 generalized captures should make this obsolete, but for now in C++11 this
|
jpayne@69
|
600 // is commonly needed for Promise continuations that own their state. Example usage:
|
jpayne@69
|
601 //
|
jpayne@69
|
602 // Own<Foo> ptr = makeFoo();
|
jpayne@69
|
603 // Promise<int> promise = callRpc();
|
jpayne@69
|
604 // promise.then(mvCapture(ptr, [](Own<Foo>&& ptr, int result) {
|
jpayne@69
|
605 // return ptr->finish(result);
|
jpayne@69
|
606 // }));
|
jpayne@69
|
607
|
jpayne@69
|
608 return CaptureByMove<Func, Decay<MovedParam>>(kj::fwd<Func>(func), kj::mv(param));
|
jpayne@69
|
609 }
|
jpayne@69
|
610
|
jpayne@69
|
611 // =======================================================================================
|
jpayne@69
|
612 // Hack for safely using a lambda as a coroutine.
|
jpayne@69
|
613
|
jpayne@69
|
614 #if KJ_HAS_COROUTINE
|
jpayne@69
|
615
|
jpayne@69
|
616 namespace _ {
|
jpayne@69
|
617
|
jpayne@69
|
618 void throwMultipleCoCaptureInvocations();
|
jpayne@69
|
619
|
jpayne@69
|
620 template<typename Functor>
|
jpayne@69
|
621 struct CaptureForCoroutine {
|
jpayne@69
|
622 kj::Maybe<Functor> maybeFunctor;
|
jpayne@69
|
623
|
jpayne@69
|
624 explicit CaptureForCoroutine(Functor&& f) : maybeFunctor(kj::mv(f)) {}
|
jpayne@69
|
625
|
jpayne@69
|
626 template<typename ...Args>
|
jpayne@69
|
627 static auto coInvoke(Functor functor, Args&&... args)
|
jpayne@69
|
628 -> decltype(functor(kj::fwd<Args>(args)...)) {
|
jpayne@69
|
629 // Since the functor is now in the local scope and no longer a member variable, it will be
|
jpayne@69
|
630 // persisted in the coroutine state.
|
jpayne@69
|
631
|
jpayne@69
|
632 // Note that `co_await functor(...)` can still return `void`. It just happens that
|
jpayne@69
|
633 // `co_return voidReturn();` is explicitly allowed.
|
jpayne@69
|
634 co_return co_await functor(kj::fwd<Args>(args)...);
|
jpayne@69
|
635 }
|
jpayne@69
|
636
|
jpayne@69
|
637 template<typename ...Args>
|
jpayne@69
|
638 auto operator()(Args&&... args) {
|
jpayne@69
|
639 if (maybeFunctor == nullptr) {
|
jpayne@69
|
640 throwMultipleCoCaptureInvocations();
|
jpayne@69
|
641 }
|
jpayne@69
|
642 auto localFunctor = kj::mv(*kj::_::readMaybe(maybeFunctor));
|
jpayne@69
|
643 maybeFunctor = nullptr;
|
jpayne@69
|
644 return coInvoke(kj::mv(localFunctor), kj::fwd<Args>(args)...);
|
jpayne@69
|
645 }
|
jpayne@69
|
646 };
|
jpayne@69
|
647
|
jpayne@69
|
648 } // namespace _
|
jpayne@69
|
649
|
jpayne@69
|
650 template <typename Functor>
|
jpayne@69
|
651 auto coCapture(Functor&& f) {
|
jpayne@69
|
652 // Assuming `f()` returns a Promise<T> `p`, wrap `f` in such a way that it will outlive its
|
jpayne@69
|
653 // returned Promise. Note that the returned object may only be invoked once.
|
jpayne@69
|
654 //
|
jpayne@69
|
655 // This function is meant to help address this pain point with functors that return a coroutine:
|
jpayne@69
|
656 // https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#Rcoro-capture
|
jpayne@69
|
657 //
|
jpayne@69
|
658 // The two most common patterns where this may be useful look like so:
|
jpayne@69
|
659 // ```
|
jpayne@69
|
660 // void addTask(Value myValue) {
|
jpayne@69
|
661 // auto myFun = [myValue]() -> kj::Promise<void> {
|
jpayne@69
|
662 // ...
|
jpayne@69
|
663 // co_return;
|
jpayne@69
|
664 // };
|
jpayne@69
|
665 // tasks.add(myFun());
|
jpayne@69
|
666 // }
|
jpayne@69
|
667 // ```
|
jpayne@69
|
668 // and
|
jpayne@69
|
669 // ```
|
jpayne@69
|
670 // kj::Promise<void> afterPromise(kj::Promise<void> promise, Value myValue) {
|
jpayne@69
|
671 // auto myFun = [myValue]() -> kj::Promise<void> {
|
jpayne@69
|
672 // ...
|
jpayne@69
|
673 // co_return;
|
jpayne@69
|
674 // };
|
jpayne@69
|
675 // return promise.then(kj::mv(myFun));
|
jpayne@69
|
676 // }
|
jpayne@69
|
677 // ```
|
jpayne@69
|
678 //
|
jpayne@69
|
679 // Note that there are potentially more optimal alternatives to both of these patterns:
|
jpayne@69
|
680 // ```
|
jpayne@69
|
681 // void addTask(Value myValue) {
|
jpayne@69
|
682 // auto myFun = [](auto myValue) -> kj::Promise<void> {
|
jpayne@69
|
683 // ...
|
jpayne@69
|
684 // co_return;
|
jpayne@69
|
685 // };
|
jpayne@69
|
686 // tasks.add(myFun(myValue));
|
jpayne@69
|
687 // }
|
jpayne@69
|
688 // ```
|
jpayne@69
|
689 // and
|
jpayne@69
|
690 // ```
|
jpayne@69
|
691 // kj::Promise<void> afterPromise(kj::Promise<void> promise, Value myValue) {
|
jpayne@69
|
692 // auto myFun = [&]() -> kj::Promise<void> {
|
jpayne@69
|
693 // ...
|
jpayne@69
|
694 // co_return;
|
jpayne@69
|
695 // };
|
jpayne@69
|
696 // co_await promise;
|
jpayne@69
|
697 // co_await myFun();
|
jpayne@69
|
698 // co_return;
|
jpayne@69
|
699 // }
|
jpayne@69
|
700 // ```
|
jpayne@69
|
701 //
|
jpayne@69
|
702 // For situations where you are trying to capture a specific local variable, kj::mvCapture() can
|
jpayne@69
|
703 // also be useful:
|
jpayne@69
|
704 // ```
|
jpayne@69
|
705 // kj::Promise<void> reactToPromise(kj::Promise<MyType> promise) {
|
jpayne@69
|
706 // BigA a;
|
jpayne@69
|
707 // TinyB b;
|
jpayne@69
|
708 //
|
jpayne@69
|
709 // doSomething(a, b);
|
jpayne@69
|
710 // return promise.then(kj::mvCapture(b, [](TinyB b, MyType type) -> kj::Promise<void> {
|
jpayne@69
|
711 // ...
|
jpayne@69
|
712 // co_return;
|
jpayne@69
|
713 // });
|
jpayne@69
|
714 // }
|
jpayne@69
|
715 // ```
|
jpayne@69
|
716
|
jpayne@69
|
717 return _::CaptureForCoroutine(kj::mv(f));
|
jpayne@69
|
718 }
|
jpayne@69
|
719
|
jpayne@69
|
720 #endif // KJ_HAS_COROUTINE
|
jpayne@69
|
721
|
jpayne@69
|
722 // =======================================================================================
|
jpayne@69
|
723 // Advanced promise construction
|
jpayne@69
|
724
|
jpayne@69
|
725 class PromiseRejector: private AsyncObject {
|
jpayne@69
|
726 // Superclass of PromiseFulfiller containing the non-typed methods. Useful when you only really
|
jpayne@69
|
727 // need to be able to reject a promise, and you need to operate on fulfillers of different types.
|
jpayne@69
|
728 public:
|
jpayne@69
|
729 virtual void reject(Exception&& exception) = 0;
|
jpayne@69
|
730 virtual bool isWaiting() = 0;
|
jpayne@69
|
731 };
|
jpayne@69
|
732
|
jpayne@69
|
733 template <typename T>
|
jpayne@69
|
734 class PromiseFulfiller: public PromiseRejector {
|
jpayne@69
|
735 // A callback which can be used to fulfill a promise. Only the first call to fulfill() or
|
jpayne@69
|
736 // reject() matters; subsequent calls are ignored.
|
jpayne@69
|
737
|
jpayne@69
|
738 public:
|
jpayne@69
|
739 virtual void fulfill(T&& value) = 0;
|
jpayne@69
|
740 // Fulfill the promise with the given value.
|
jpayne@69
|
741
|
jpayne@69
|
742 virtual void reject(Exception&& exception) = 0;
|
jpayne@69
|
743 // Reject the promise with an error.
|
jpayne@69
|
744
|
jpayne@69
|
745 virtual bool isWaiting() = 0;
|
jpayne@69
|
746 // Returns true if the promise is still unfulfilled and someone is potentially waiting for it.
|
jpayne@69
|
747 // Returns false if fulfill()/reject() has already been called *or* if the promise to be
|
jpayne@69
|
748 // fulfilled has been discarded and therefore the result will never be used anyway.
|
jpayne@69
|
749
|
jpayne@69
|
750 template <typename Func>
|
jpayne@69
|
751 bool rejectIfThrows(Func&& func);
|
jpayne@69
|
752 // Call the function (with no arguments) and return true. If an exception is thrown, call
|
jpayne@69
|
753 // `fulfiller.reject()` and then return false. When compiled with exceptions disabled,
|
jpayne@69
|
754 // non-fatal exceptions are still detected and handled correctly.
|
jpayne@69
|
755 };
|
jpayne@69
|
756
|
jpayne@69
|
757 template <>
|
jpayne@69
|
758 class PromiseFulfiller<void>: public PromiseRejector {
|
jpayne@69
|
759 // Specialization of PromiseFulfiller for void promises. See PromiseFulfiller<T>.
|
jpayne@69
|
760
|
jpayne@69
|
761 public:
|
jpayne@69
|
762 virtual void fulfill(_::Void&& value = _::Void()) = 0;
|
jpayne@69
|
763 // Call with zero parameters. The parameter is a dummy that only exists so that subclasses don't
|
jpayne@69
|
764 // have to specialize for <void>.
|
jpayne@69
|
765
|
jpayne@69
|
766 virtual void reject(Exception&& exception) = 0;
|
jpayne@69
|
767 virtual bool isWaiting() = 0;
|
jpayne@69
|
768
|
jpayne@69
|
769 template <typename Func>
|
jpayne@69
|
770 bool rejectIfThrows(Func&& func);
|
jpayne@69
|
771 };
|
jpayne@69
|
772
|
jpayne@69
|
773 template <typename T, typename Adapter, typename... Params>
|
jpayne@69
|
774 _::ReducePromises<T> newAdaptedPromise(Params&&... adapterConstructorParams);
|
jpayne@69
|
775 // Creates a new promise which owns an instance of `Adapter` which encapsulates the operation
|
jpayne@69
|
776 // that will eventually fulfill the promise. This is primarily useful for adapting non-KJ
|
jpayne@69
|
777 // asynchronous APIs to use promises.
|
jpayne@69
|
778 //
|
jpayne@69
|
779 // An instance of `Adapter` will be allocated and owned by the returned `Promise`. A
|
jpayne@69
|
780 // `PromiseFulfiller<T>&` will be passed as the first parameter to the adapter's constructor,
|
jpayne@69
|
781 // and `adapterConstructorParams` will be forwarded as the subsequent parameters. The adapter
|
jpayne@69
|
782 // is expected to perform some asynchronous operation and call the `PromiseFulfiller<T>` once
|
jpayne@69
|
783 // it is finished.
|
jpayne@69
|
784 //
|
jpayne@69
|
785 // The adapter is destroyed when its owning Promise is destroyed. This may occur before the
|
jpayne@69
|
786 // Promise has been fulfilled. In this case, the adapter's destructor should cancel the
|
jpayne@69
|
787 // asynchronous operation. Once the adapter is destroyed, the fulfillment callback cannot be
|
jpayne@69
|
788 // called.
|
jpayne@69
|
789 //
|
jpayne@69
|
790 // An adapter implementation should be carefully written to ensure that it cannot accidentally
|
jpayne@69
|
791 // be left unfulfilled permanently because of an exception. Consider making liberal use of
|
jpayne@69
|
792 // `PromiseFulfiller<T>::rejectIfThrows()`.
|
jpayne@69
|
793
|
jpayne@69
|
794 template <typename T>
|
jpayne@69
|
795 struct PromiseFulfillerPair {
|
jpayne@69
|
796 _::ReducePromises<T> promise;
|
jpayne@69
|
797 Own<PromiseFulfiller<T>> fulfiller;
|
jpayne@69
|
798 };
|
jpayne@69
|
799
|
jpayne@69
|
800 template <typename T>
|
jpayne@69
|
801 PromiseFulfillerPair<T> newPromiseAndFulfiller(SourceLocation location = {});
|
jpayne@69
|
802 // Construct a Promise and a separate PromiseFulfiller which can be used to fulfill the promise.
|
jpayne@69
|
803 // If the PromiseFulfiller is destroyed before either of its methods are called, the Promise is
|
jpayne@69
|
804 // implicitly rejected.
|
jpayne@69
|
805 //
|
jpayne@69
|
806 // Although this function is easier to use than `newAdaptedPromise()`, it has the serious drawback
|
jpayne@69
|
807 // that there is no way to handle cancellation (i.e. detect when the Promise is discarded).
|
jpayne@69
|
808 //
|
jpayne@69
|
809 // You can arrange to fulfill a promise with another promise by using a promise type for T. E.g.
|
jpayne@69
|
810 // `newPromiseAndFulfiller<Promise<U>>()` will produce a promise of type `Promise<U>` but the
|
jpayne@69
|
811 // fulfiller will be of type `PromiseFulfiller<Promise<U>>`. Thus you pass a `Promise<U>` to the
|
jpayne@69
|
812 // `fulfill()` callback, and the promises are chained.
|
jpayne@69
|
813
|
jpayne@69
|
814 template <typename T>
|
jpayne@69
|
815 class CrossThreadPromiseFulfiller: public kj::PromiseFulfiller<T> {
|
jpayne@69
|
816 // Like PromiseFulfiller<T> but the methods are `const`, indicating they can safely be called
|
jpayne@69
|
817 // from another thread.
|
jpayne@69
|
818
|
jpayne@69
|
819 public:
|
jpayne@69
|
820 virtual void fulfill(T&& value) const = 0;
|
jpayne@69
|
821 virtual void reject(Exception&& exception) const = 0;
|
jpayne@69
|
822 virtual bool isWaiting() const = 0;
|
jpayne@69
|
823
|
jpayne@69
|
824 void fulfill(T&& value) override { return constThis()->fulfill(kj::fwd<T>(value)); }
|
jpayne@69
|
825 void reject(Exception&& exception) override { return constThis()->reject(kj::mv(exception)); }
|
jpayne@69
|
826 bool isWaiting() override { return constThis()->isWaiting(); }
|
jpayne@69
|
827
|
jpayne@69
|
828 private:
|
jpayne@69
|
829 const CrossThreadPromiseFulfiller* constThis() { return this; }
|
jpayne@69
|
830 };
|
jpayne@69
|
831
|
jpayne@69
|
832 template <>
|
jpayne@69
|
833 class CrossThreadPromiseFulfiller<void>: public kj::PromiseFulfiller<void> {
|
jpayne@69
|
834 // Specialization of CrossThreadPromiseFulfiller for void promises. See
|
jpayne@69
|
835 // CrossThreadPromiseFulfiller<T>.
|
jpayne@69
|
836
|
jpayne@69
|
837 public:
|
jpayne@69
|
838 virtual void fulfill(_::Void&& value = _::Void()) const = 0;
|
jpayne@69
|
839 virtual void reject(Exception&& exception) const = 0;
|
jpayne@69
|
840 virtual bool isWaiting() const = 0;
|
jpayne@69
|
841
|
jpayne@69
|
842 void fulfill(_::Void&& value) override { return constThis()->fulfill(kj::mv(value)); }
|
jpayne@69
|
843 void reject(Exception&& exception) override { return constThis()->reject(kj::mv(exception)); }
|
jpayne@69
|
844 bool isWaiting() override { return constThis()->isWaiting(); }
|
jpayne@69
|
845
|
jpayne@69
|
846 private:
|
jpayne@69
|
847 const CrossThreadPromiseFulfiller* constThis() { return this; }
|
jpayne@69
|
848 };
|
jpayne@69
|
849
|
jpayne@69
|
850 template <typename T>
|
jpayne@69
|
851 struct PromiseCrossThreadFulfillerPair {
|
jpayne@69
|
852 _::ReducePromises<T> promise;
|
jpayne@69
|
853 Own<CrossThreadPromiseFulfiller<T>> fulfiller;
|
jpayne@69
|
854 };
|
jpayne@69
|
855
|
jpayne@69
|
856 template <typename T>
|
jpayne@69
|
857 PromiseCrossThreadFulfillerPair<T> newPromiseAndCrossThreadFulfiller();
|
jpayne@69
|
858 // Like `newPromiseAndFulfiller()`, but the fulfiller is allowed to be invoked from any thread,
|
jpayne@69
|
859 // not just the one that called this method. Note that the Promise is still tied to the calling
|
jpayne@69
|
860 // thread's event loop and *cannot* be used from another thread -- only the PromiseFulfiller is
|
jpayne@69
|
861 // cross-thread.
|
jpayne@69
|
862
|
jpayne@69
|
863 // =======================================================================================
|
jpayne@69
|
864 // Canceler
|
jpayne@69
|
865
|
jpayne@69
|
866 class Canceler: private AsyncObject {
|
jpayne@69
|
867 // A Canceler can wrap some set of Promises and then forcefully cancel them on-demand, or
|
jpayne@69
|
868 // implicitly when the Canceler is destroyed.
|
jpayne@69
|
869 //
|
jpayne@69
|
870 // The cancellation is done in such a way that once cancel() (or the Canceler's destructor)
|
jpayne@69
|
871 // returns, it's guaranteed that the promise has already been canceled and destroyed. This
|
jpayne@69
|
872 // guarantee is important for enforcing ownership constraints. For example, imagine that Alice
|
jpayne@69
|
873 // calls a method on Bob that returns a Promise. That Promise encapsulates a task that uses Bob's
|
jpayne@69
|
874 // internal state. But, imagine that Alice does not own Bob, and indeed Bob might be destroyed
|
jpayne@69
|
875 // at random without Alice having canceled the promise. In this case, it is necessary for Bob to
|
jpayne@69
|
876 // ensure that the promise will be forcefully canceled. Bob can do this by constructing a
|
jpayne@69
|
877 // Canceler and using it to wrap promises before returning them to callers. When Bob is
|
jpayne@69
|
878 // destroyed, the Canceler is destroyed too, and all promises Bob wrapped with it throw errors.
|
jpayne@69
|
879 //
|
jpayne@69
|
880 // Note that another common strategy for cancellation is to use exclusiveJoin() to join a promise
|
jpayne@69
|
881 // with some "cancellation promise" which only resolves if the operation should be canceled. The
|
jpayne@69
|
882 // cancellation promise could itself be created by newPromiseAndFulfiller<void>(), and thus
|
jpayne@69
|
883 // calling the PromiseFulfiller cancels the operation. There is a major problem with this
|
jpayne@69
|
884 // approach: upon invoking the fulfiller, an arbitrary amount of time may pass before the
|
jpayne@69
|
885 // exclusive-joined promise actually resolves and cancels its other fork. During that time, the
|
jpayne@69
|
886 // task might continue to execute. If it holds pointers to objects that have been destroyed, this
|
jpayne@69
|
887 // might cause segfaults. Thus, it is safer to use a Canceler.
|
jpayne@69
|
888
|
jpayne@69
|
889 public:
|
jpayne@69
|
890 inline Canceler() {}
|
jpayne@69
|
891 ~Canceler() noexcept(false);
|
jpayne@69
|
892 KJ_DISALLOW_COPY_AND_MOVE(Canceler);
|
jpayne@69
|
893
|
jpayne@69
|
894 template <typename T>
|
jpayne@69
|
895 Promise<T> wrap(Promise<T> promise) {
|
jpayne@69
|
896 return newAdaptedPromise<T, AdapterImpl<T>>(*this, kj::mv(promise));
|
jpayne@69
|
897 }
|
jpayne@69
|
898
|
jpayne@69
|
899 void cancel(StringPtr cancelReason);
|
jpayne@69
|
900 void cancel(const Exception& exception);
|
jpayne@69
|
901 // Cancel all previously-wrapped promises that have not already completed, causing them to throw
|
jpayne@69
|
902 // the given exception. If you provide just a description message instead of an exception, then
|
jpayne@69
|
903 // an exception object will be constructed from it -- but only if there are requests to cancel.
|
jpayne@69
|
904
|
jpayne@69
|
905 void release();
|
jpayne@69
|
906 // Releases previously-wrapped promises, so that they will not be canceled regardless of what
|
jpayne@69
|
907 // happens to this Canceler.
|
jpayne@69
|
908
|
jpayne@69
|
909 bool isEmpty() const { return list == nullptr; }
|
jpayne@69
|
910 // Indicates if any previously-wrapped promises are still executing. (If this returns true, then
|
jpayne@69
|
911 // cancel() would be a no-op.)
|
jpayne@69
|
912
|
jpayne@69
|
913 private:
|
jpayne@69
|
914 class AdapterBase {
|
jpayne@69
|
915 public:
|
jpayne@69
|
916 AdapterBase(Canceler& canceler);
|
jpayne@69
|
917 ~AdapterBase() noexcept(false);
|
jpayne@69
|
918
|
jpayne@69
|
919 virtual void cancel(Exception&& e) = 0;
|
jpayne@69
|
920
|
jpayne@69
|
921 void unlink();
|
jpayne@69
|
922
|
jpayne@69
|
923 private:
|
jpayne@69
|
924 Maybe<Maybe<AdapterBase&>&> prev;
|
jpayne@69
|
925 Maybe<AdapterBase&> next;
|
jpayne@69
|
926 friend class Canceler;
|
jpayne@69
|
927 };
|
jpayne@69
|
928
|
jpayne@69
|
929 template <typename T>
|
jpayne@69
|
930 class AdapterImpl: public AdapterBase {
|
jpayne@69
|
931 public:
|
jpayne@69
|
932 AdapterImpl(PromiseFulfiller<T>& fulfiller,
|
jpayne@69
|
933 Canceler& canceler, Promise<T> inner)
|
jpayne@69
|
934 : AdapterBase(canceler),
|
jpayne@69
|
935 fulfiller(fulfiller),
|
jpayne@69
|
936 inner(inner.then(
|
jpayne@69
|
937 [&fulfiller](T&& value) { fulfiller.fulfill(kj::mv(value)); },
|
jpayne@69
|
938 [&fulfiller](Exception&& e) { fulfiller.reject(kj::mv(e)); })
|
jpayne@69
|
939 .eagerlyEvaluate(nullptr)) {}
|
jpayne@69
|
940
|
jpayne@69
|
941 void cancel(Exception&& e) override {
|
jpayne@69
|
942 fulfiller.reject(kj::mv(e));
|
jpayne@69
|
943 inner = nullptr;
|
jpayne@69
|
944 }
|
jpayne@69
|
945
|
jpayne@69
|
946 private:
|
jpayne@69
|
947 PromiseFulfiller<T>& fulfiller;
|
jpayne@69
|
948 Promise<void> inner;
|
jpayne@69
|
949 };
|
jpayne@69
|
950
|
jpayne@69
|
951 Maybe<AdapterBase&> list;
|
jpayne@69
|
952 };
|
jpayne@69
|
953
|
jpayne@69
|
954 template <>
|
jpayne@69
|
955 class Canceler::AdapterImpl<void>: public AdapterBase {
|
jpayne@69
|
956 public:
|
jpayne@69
|
957 AdapterImpl(kj::PromiseFulfiller<void>& fulfiller,
|
jpayne@69
|
958 Canceler& canceler, kj::Promise<void> inner);
|
jpayne@69
|
959 void cancel(kj::Exception&& e) override;
|
jpayne@69
|
960 // These must be defined in async.c++ to prevent translation units compiled by MSVC from trying to
|
jpayne@69
|
961 // link with symbols defined in async.c++ merely because they included async.h.
|
jpayne@69
|
962
|
jpayne@69
|
963 private:
|
jpayne@69
|
964 kj::PromiseFulfiller<void>& fulfiller;
|
jpayne@69
|
965 kj::Promise<void> inner;
|
jpayne@69
|
966 };
|
jpayne@69
|
967
|
jpayne@69
|
968 // =======================================================================================
|
jpayne@69
|
969 // TaskSet
|
jpayne@69
|
970
|
jpayne@69
|
971 class TaskSet: private AsyncObject {
|
jpayne@69
|
972 // Holds a collection of Promise<void>s and ensures that each executes to completion. Memory
|
jpayne@69
|
973 // associated with each promise is automatically freed when the promise completes. Destroying
|
jpayne@69
|
974 // the TaskSet itself automatically cancels all unfinished promises.
|
jpayne@69
|
975 //
|
jpayne@69
|
976 // This is useful for "daemon" objects that perform background tasks which aren't intended to
|
jpayne@69
|
977 // fulfill any particular external promise, but which may need to be canceled (and thus can't
|
jpayne@69
|
978 // use `Promise::detach()`). The daemon object holds a TaskSet to collect these tasks it is
|
jpayne@69
|
979 // working on. This way, if the daemon itself is destroyed, the TaskSet is destroyed as well,
|
jpayne@69
|
980 // and everything the daemon is doing is canceled.
|
jpayne@69
|
981
|
jpayne@69
|
982 public:
|
jpayne@69
|
983 class ErrorHandler {
|
jpayne@69
|
984 public:
|
jpayne@69
|
985 virtual void taskFailed(kj::Exception&& exception) = 0;
|
jpayne@69
|
986 };
|
jpayne@69
|
987
|
jpayne@69
|
988 TaskSet(ErrorHandler& errorHandler, SourceLocation location = {});
|
jpayne@69
|
989 // `errorHandler` will be executed any time a task throws an exception, and will execute within
|
jpayne@69
|
990 // the given EventLoop.
|
jpayne@69
|
991
|
jpayne@69
|
992 ~TaskSet() noexcept(false);
|
jpayne@69
|
993
|
jpayne@69
|
994 void add(Promise<void>&& promise);
|
jpayne@69
|
995
|
jpayne@69
|
996 kj::String trace();
|
jpayne@69
|
997 // Return debug info about all promises currently in the TaskSet.
|
jpayne@69
|
998
|
jpayne@69
|
999 bool isEmpty() { return tasks == nullptr; }
|
jpayne@69
|
1000 // Check if any tasks are running.
|
jpayne@69
|
1001
|
jpayne@69
|
1002 Promise<void> onEmpty();
|
jpayne@69
|
1003 // Returns a promise that fulfills the next time the TaskSet is empty. Only one such promise can
|
jpayne@69
|
1004 // exist at a time.
|
jpayne@69
|
1005
|
jpayne@69
|
1006 void clear();
|
jpayne@69
|
1007 // Cancel all tasks.
|
jpayne@69
|
1008 //
|
jpayne@69
|
1009 // As always, it is not safe to cancel the task that is currently running, so you could not call
|
jpayne@69
|
1010 // this from inside a task in the TaskSet. However, it IS safe to call this from the
|
jpayne@69
|
1011 // `taskFailed()` callback.
|
jpayne@69
|
1012 //
|
jpayne@69
|
1013 // Calling this will always trigger onEmpty(), if anyone is listening.
|
jpayne@69
|
1014
|
jpayne@69
|
1015 private:
|
jpayne@69
|
1016 class Task;
|
jpayne@69
|
1017 using OwnTask = Own<Task, _::PromiseDisposer>;
|
jpayne@69
|
1018
|
jpayne@69
|
1019 TaskSet::ErrorHandler& errorHandler;
|
jpayne@69
|
1020 Maybe<OwnTask> tasks;
|
jpayne@69
|
1021 Maybe<Own<PromiseFulfiller<void>>> emptyFulfiller;
|
jpayne@69
|
1022 SourceLocation location;
|
jpayne@69
|
1023 };
|
jpayne@69
|
1024
|
jpayne@69
|
1025 // =======================================================================================
|
jpayne@69
|
1026 // Cross-thread execution.
|
jpayne@69
|
1027
|
jpayne@69
|
1028 class Executor {
|
jpayne@69
|
1029 // Executes code on another thread's event loop.
|
jpayne@69
|
1030 //
|
jpayne@69
|
1031 // Use `kj::getCurrentThreadExecutor()` to get an executor that schedules calls on the current
|
jpayne@69
|
1032 // thread's event loop. You may then pass the reference to other threads to enable them to call
|
jpayne@69
|
1033 // back to this one.
|
jpayne@69
|
1034
|
jpayne@69
|
1035 public:
|
jpayne@69
|
1036 Executor(EventLoop& loop, Badge<EventLoop>);
|
jpayne@69
|
1037 ~Executor() noexcept(false);
|
jpayne@69
|
1038
|
jpayne@69
|
1039 virtual kj::Own<const Executor> addRef() const = 0;
|
jpayne@69
|
1040 // Add a reference to this Executor. The Executor will not be destroyed until all references are
|
jpayne@69
|
1041 // dropped. This uses atomic refcounting for thread-safety.
|
jpayne@69
|
1042 //
|
jpayne@69
|
1043 // Use this when you can't guarantee that the target thread's event loop won't concurrently exit
|
jpayne@69
|
1044 // (including due to an uncaught exception!) while another thread is still using the Executor.
|
jpayne@69
|
1045 // Otherwise, the Executor object is destroyed when the owning event loop exits.
|
jpayne@69
|
1046 //
|
jpayne@69
|
1047 // If the target event loop has exited, then `execute{Async,Sync}` will throw DISCONNECTED
|
jpayne@69
|
1048 // exceptions.
|
jpayne@69
|
1049
|
jpayne@69
|
1050 bool isLive() const;
|
jpayne@69
|
1051 // Returns true if the remote event loop still exists, false if it has been destroyed. In the
|
jpayne@69
|
1052 // latter case, `execute{Async,Sync}()` will definitely throw. Of course, if this returns true,
|
jpayne@69
|
1053 // it could still change to false at any moment, and `execute{Async,Sync}()` could still throw as
|
jpayne@69
|
1054 // a result.
|
jpayne@69
|
1055 //
|
jpayne@69
|
1056 // TODO(cleanup): Should we have tryExecute{Async,Sync}() that return Maybes that are null if
|
jpayne@69
|
1057 // the remote event loop exited? Currently there are multiple known use cases that check
|
jpayne@69
|
1058 // isLive() after catching a DISCONNECTED exception to decide whether it is due to the executor
|
jpayne@69
|
1059 // exiting, and then handling that case. This is borderline in violation of KJ exception
|
jpayne@69
|
1060 // philosophy, but right now I'm not excited about the extra template metaprogramming needed
|
jpayne@69
|
1061 // for "try" versions...
|
jpayne@69
|
1062
|
jpayne@69
|
1063 template <typename Func>
|
jpayne@69
|
1064 PromiseForResult<Func, void> executeAsync(Func&& func, SourceLocation location = {}) const;
|
jpayne@69
|
1065 // Call from any thread to request that the given function be executed on the executor's thread,
|
jpayne@69
|
1066 // returning a promise for the result.
|
jpayne@69
|
1067 //
|
jpayne@69
|
1068 // The Promise returned by executeAsync() belongs to the requesting thread, not the executor
|
jpayne@69
|
1069 // thread. Hence, for example, continuations added to this promise with .then() will execute in
|
jpayne@69
|
1070 // the requesting thread.
|
jpayne@69
|
1071 //
|
jpayne@69
|
1072 // If func() itself returns a Promise, that Promise is *not* returned verbatim to the requesting
|
jpayne@69
|
1073 // thread -- after all, Promise objects cannot be used cross-thread. Instead, the executor thread
|
jpayne@69
|
1074 // awaits the promise. Once it resolves to a final result, that result is transferred to the
|
jpayne@69
|
1075 // requesting thread, resolving the promise that executeAsync() returned earlier.
|
jpayne@69
|
1076 //
|
jpayne@69
|
1077 // `func` will be destroyed in the requesting thread, after the final result has been returned
|
jpayne@69
|
1078 // from the executor thread. This means that it is safe for `func` to capture objects that cannot
|
jpayne@69
|
1079 // safely be destroyed from another thread. It is also safe for `func` to be an lvalue reference,
|
jpayne@69
|
1080 // so long as the functor remains live until the promise completes or is canceled, and the
|
jpayne@69
|
1081 // function is thread-safe.
|
jpayne@69
|
1082 //
|
jpayne@69
|
1083 // Of course, the body of `func` must be careful that any access it makes on these objects is
|
jpayne@69
|
1084 // safe cross-thread. For example, it must not attempt to access Promise-related objects
|
jpayne@69
|
1085 // cross-thread; you cannot create a `PromiseFulfiller` in one thread and then `fulfill()` it
|
jpayne@69
|
1086 // from another. Unfortunately, the usual convention of using const-correctness to enforce
|
jpayne@69
|
1087 // thread-safety does not work here, because applications can often ensure that `func` has
|
jpayne@69
|
1088 // exclusive access to captured objects, and thus can safely mutate them even in non-thread-safe
|
jpayne@69
|
1089 // ways; the const qualifier is not sufficient to express this.
|
jpayne@69
|
1090 //
|
jpayne@69
|
1091 // The final return value of `func` is transferred between threads, and hence is constructed and
|
jpayne@69
|
1092 // destroyed in separate threads. It is the app's responsibility to make sure this is OK.
|
jpayne@69
|
1093 // Alternatively, the app can perhaps arrange to send the return value back to the original
|
jpayne@69
|
1094 // thread for destruction, if needed.
|
jpayne@69
|
1095 //
|
jpayne@69
|
1096 // If the requesting thread destroys the returned Promise, the destructor will block waiting for
|
jpayne@69
|
1097 // the executor thread to acknowledge cancellation. This ensures that `func` can be destroyed
|
jpayne@69
|
1098 // before the Promise's destructor returns.
|
jpayne@69
|
1099 //
|
jpayne@69
|
1100 // Multiple calls to executeAsync() from the same requesting thread to the same target thread
|
jpayne@69
|
1101 // will be delivered in the same order in which they were requested. (However, if func() returns
|
jpayne@69
|
1102 // a promise, delivery of subsequent calls is not blocked on that promise. In other words, this
|
jpayne@69
|
1103 // call provides E-Order in the same way as Cap'n Proto.)
|
jpayne@69
|
1104
|
jpayne@69
|
1105 template <typename Func>
|
jpayne@69
|
1106 _::UnwrapPromise<PromiseForResult<Func, void>> executeSync(
|
jpayne@69
|
1107 Func&& func, SourceLocation location = {}) const;
|
jpayne@69
|
1108 // Schedules `func()` to execute on the executor thread, and then blocks the requesting thread
|
jpayne@69
|
1109 // until `func()` completes. If `func()` returns a Promise, then the wait will continue until
|
jpayne@69
|
1110 // that promise resolves, and the final result will be returned to the requesting thread.
|
jpayne@69
|
1111 //
|
jpayne@69
|
1112 // The requesting thread does not need to have an EventLoop. If it does have an EventLoop, that
|
jpayne@69
|
1113 // loop will *not* execute while the thread is blocked. This method is particularly useful to
|
jpayne@69
|
1114 // allow non-event-loop threads to perform I/O via a separate event-loop thread.
|
jpayne@69
|
1115 //
|
jpayne@69
|
1116 // As with `executeAsync()`, `func` is always destroyed on the requesting thread, after the
|
jpayne@69
|
1117 // executor thread has signaled completion. The return value is transferred between threads.
|
jpayne@69
|
1118
|
jpayne@69
|
1119 private:
|
jpayne@69
|
1120 struct Impl;
|
jpayne@69
|
1121 Own<Impl> impl;
|
jpayne@69
|
1122 // To avoid including mutex.h...
|
jpayne@69
|
1123
|
jpayne@69
|
1124 friend class EventLoop;
|
jpayne@69
|
1125 friend class _::XThreadEvent;
|
jpayne@69
|
1126 friend class _::XThreadPaf;
|
jpayne@69
|
1127
|
jpayne@69
|
1128 void send(_::XThreadEvent& event, bool sync) const;
|
jpayne@69
|
1129 void wait();
|
jpayne@69
|
1130 bool poll();
|
jpayne@69
|
1131
|
jpayne@69
|
1132 EventLoop& getLoop() const;
|
jpayne@69
|
1133 };
|
jpayne@69
|
1134
|
jpayne@69
|
1135 const Executor& getCurrentThreadExecutor();
|
jpayne@69
|
1136 // Get the executor for the current thread's event loop. This reference can then be passed to other
|
jpayne@69
|
1137 // threads.
|
jpayne@69
|
1138
|
jpayne@69
|
1139 // =======================================================================================
|
jpayne@69
|
1140 // The EventLoop class
|
jpayne@69
|
1141
|
jpayne@69
|
1142 class EventPort {
|
jpayne@69
|
1143 // Interfaces between an `EventLoop` and events originating from outside of the loop's thread.
|
jpayne@69
|
1144 // All such events come in through the `EventPort` implementation.
|
jpayne@69
|
1145 //
|
jpayne@69
|
1146 // An `EventPort` implementation may interface with low-level operating system APIs and/or other
|
jpayne@69
|
1147 // threads. You can also write an `EventPort` which wraps some other (non-KJ) event loop
|
jpayne@69
|
1148 // framework, allowing the two to coexist in a single thread.
|
jpayne@69
|
1149
|
jpayne@69
|
1150 public:
|
jpayne@69
|
1151 virtual bool wait() = 0;
|
jpayne@69
|
1152 // Wait for an external event to arrive, sleeping if necessary. Once at least one event has
|
jpayne@69
|
1153 // arrived, queue it to the event loop (e.g. by fulfilling a promise) and return.
|
jpayne@69
|
1154 //
|
jpayne@69
|
1155 // This is called during `Promise::wait()` whenever the event queue becomes empty, in order to
|
jpayne@69
|
1156 // wait for new events to populate the queue.
|
jpayne@69
|
1157 //
|
jpayne@69
|
1158 // It is safe to return even if nothing has actually been queued, so long as calling `wait()` in
|
jpayne@69
|
1159 // a loop will eventually sleep. (That is to say, false positives are fine.)
|
jpayne@69
|
1160 //
|
jpayne@69
|
1161 // Returns true if wake() has been called from another thread. (Precisely, returns true if
|
jpayne@69
|
1162 // no previous call to wait `wait()` nor `poll()` has returned true since `wake()` was last
|
jpayne@69
|
1163 // called.)
|
jpayne@69
|
1164
|
jpayne@69
|
1165 virtual bool poll() = 0;
|
jpayne@69
|
1166 // Check if any external events have arrived, but do not sleep. If any events have arrived,
|
jpayne@69
|
1167 // add them to the event queue (e.g. by fulfilling promises) before returning.
|
jpayne@69
|
1168 //
|
jpayne@69
|
1169 // This may be called during `Promise::wait()` when the EventLoop has been executing for a while
|
jpayne@69
|
1170 // without a break but is still non-empty.
|
jpayne@69
|
1171 //
|
jpayne@69
|
1172 // Returns true if wake() has been called from another thread. (Precisely, returns true if
|
jpayne@69
|
1173 // no previous call to wait `wait()` nor `poll()` has returned true since `wake()` was last
|
jpayne@69
|
1174 // called.)
|
jpayne@69
|
1175
|
jpayne@69
|
1176 virtual void setRunnable(bool runnable);
|
jpayne@69
|
1177 // Called to notify the `EventPort` when the `EventLoop` has work to do; specifically when it
|
jpayne@69
|
1178 // transitions from empty -> runnable or runnable -> empty. This is typically useful when
|
jpayne@69
|
1179 // integrating with an external event loop; if the loop is currently runnable then you should
|
jpayne@69
|
1180 // arrange to call run() on it soon. The default implementation does nothing.
|
jpayne@69
|
1181
|
jpayne@69
|
1182 virtual void wake() const;
|
jpayne@69
|
1183 // Wake up the EventPort's thread from another thread.
|
jpayne@69
|
1184 //
|
jpayne@69
|
1185 // Unlike all other methods on this interface, `wake()` may be called from another thread, hence
|
jpayne@69
|
1186 // it is `const`.
|
jpayne@69
|
1187 //
|
jpayne@69
|
1188 // Technically speaking, `wake()` causes the target thread to cease sleeping and not to sleep
|
jpayne@69
|
1189 // again until `wait()` or `poll()` has returned true at least once.
|
jpayne@69
|
1190 //
|
jpayne@69
|
1191 // The default implementation throws an UNIMPLEMENTED exception.
|
jpayne@69
|
1192 };
|
jpayne@69
|
1193
|
jpayne@69
|
1194 class EventLoop {
|
jpayne@69
|
1195 // Represents a queue of events being executed in a loop. Most code won't interact with
|
jpayne@69
|
1196 // EventLoop directly, but instead use `Promise`s to interact with it indirectly. See the
|
jpayne@69
|
1197 // documentation for `Promise`.
|
jpayne@69
|
1198 //
|
jpayne@69
|
1199 // Each thread can have at most one current EventLoop. To make an `EventLoop` current for
|
jpayne@69
|
1200 // the thread, create a `WaitScope`. Async APIs require that the thread has a current EventLoop,
|
jpayne@69
|
1201 // or they will throw exceptions. APIs that use `Promise::wait()` additionally must explicitly
|
jpayne@69
|
1202 // be passed a reference to the `WaitScope` to make the caller aware that they might block.
|
jpayne@69
|
1203 //
|
jpayne@69
|
1204 // Generally, you will want to construct an `EventLoop` at the top level of your program, e.g.
|
jpayne@69
|
1205 // in the main() function, or in the start function of a thread. You can then use it to
|
jpayne@69
|
1206 // construct some promises and wait on the result. Example:
|
jpayne@69
|
1207 //
|
jpayne@69
|
1208 // int main() {
|
jpayne@69
|
1209 // // `loop` becomes the official EventLoop for the thread.
|
jpayne@69
|
1210 // MyEventPort eventPort;
|
jpayne@69
|
1211 // EventLoop loop(eventPort);
|
jpayne@69
|
1212 //
|
jpayne@69
|
1213 // // Now we can call an async function.
|
jpayne@69
|
1214 // Promise<String> textPromise = getHttp("http://example.com");
|
jpayne@69
|
1215 //
|
jpayne@69
|
1216 // // And we can wait for the promise to complete. Note that you can only use `wait()`
|
jpayne@69
|
1217 // // from the top level, not from inside a promise callback.
|
jpayne@69
|
1218 // String text = textPromise.wait();
|
jpayne@69
|
1219 // print(text);
|
jpayne@69
|
1220 // return 0;
|
jpayne@69
|
1221 // }
|
jpayne@69
|
1222 //
|
jpayne@69
|
1223 // Most applications that do I/O will prefer to use `setupAsyncIo()` from `async-io.h` rather
|
jpayne@69
|
1224 // than allocate an `EventLoop` directly.
|
jpayne@69
|
1225
|
jpayne@69
|
1226 public:
|
jpayne@69
|
1227 EventLoop();
|
jpayne@69
|
1228 // Construct an `EventLoop` which does not receive external events at all.
|
jpayne@69
|
1229
|
jpayne@69
|
1230 explicit EventLoop(EventPort& port);
|
jpayne@69
|
1231 // Construct an `EventLoop` which receives external events through the given `EventPort`.
|
jpayne@69
|
1232
|
jpayne@69
|
1233 ~EventLoop() noexcept(false);
|
jpayne@69
|
1234
|
jpayne@69
|
1235 void run(uint maxTurnCount = maxValue);
|
jpayne@69
|
1236 // Run the event loop for `maxTurnCount` turns or until there is nothing left to be done,
|
jpayne@69
|
1237 // whichever comes first. This never calls the `EventPort`'s `sleep()` or `poll()`. It will
|
jpayne@69
|
1238 // call the `EventPort`'s `setRunnable(false)` if the queue becomes empty.
|
jpayne@69
|
1239
|
jpayne@69
|
1240 bool isRunnable();
|
jpayne@69
|
1241 // Returns true if run() would currently do anything, or false if the queue is empty.
|
jpayne@69
|
1242
|
jpayne@69
|
1243 const Executor& getExecutor();
|
jpayne@69
|
1244 // Returns an Executor that can be used to schedule events on this EventLoop from another thread.
|
jpayne@69
|
1245 //
|
jpayne@69
|
1246 // Use the global function kj::getCurrentThreadExecutor() to get the current thread's EventLoop's
|
jpayne@69
|
1247 // Executor.
|
jpayne@69
|
1248 //
|
jpayne@69
|
1249 // Note that this is only needed for cross-thread scheduling. To schedule code to run later in
|
jpayne@69
|
1250 // the current thread, use `kj::evalLater()`, which will be more efficient.
|
jpayne@69
|
1251
|
jpayne@69
|
1252 private:
|
jpayne@69
|
1253 kj::Maybe<EventPort&> port;
|
jpayne@69
|
1254 // If null, this thread doesn't receive I/O events from the OS. It can potentially receive
|
jpayne@69
|
1255 // events from other threads via the Executor.
|
jpayne@69
|
1256
|
jpayne@69
|
1257 bool running = false;
|
jpayne@69
|
1258 // True while looping -- wait() is then not allowed.
|
jpayne@69
|
1259
|
jpayne@69
|
1260 bool lastRunnableState = false;
|
jpayne@69
|
1261 // What did we last pass to port.setRunnable()?
|
jpayne@69
|
1262
|
jpayne@69
|
1263 _::Event* head = nullptr;
|
jpayne@69
|
1264 _::Event** tail = &head;
|
jpayne@69
|
1265 _::Event** depthFirstInsertPoint = &head;
|
jpayne@69
|
1266 _::Event** breadthFirstInsertPoint = &head;
|
jpayne@69
|
1267
|
jpayne@69
|
1268 kj::Maybe<Own<Executor>> executor;
|
jpayne@69
|
1269 // Allocated the first time getExecutor() is requested, making cross-thread request possible.
|
jpayne@69
|
1270
|
jpayne@69
|
1271 Own<TaskSet> daemons;
|
jpayne@69
|
1272
|
jpayne@69
|
1273 _::Event* currentlyFiring = nullptr;
|
jpayne@69
|
1274
|
jpayne@69
|
1275 bool turn();
|
jpayne@69
|
1276 void setRunnable(bool runnable);
|
jpayne@69
|
1277 void enterScope();
|
jpayne@69
|
1278 void leaveScope();
|
jpayne@69
|
1279
|
jpayne@69
|
1280 void wait();
|
jpayne@69
|
1281 void poll();
|
jpayne@69
|
1282
|
jpayne@69
|
1283 friend void _::detach(kj::Promise<void>&& promise);
|
jpayne@69
|
1284 friend void _::waitImpl(_::OwnPromiseNode&& node, _::ExceptionOrValue& result,
|
jpayne@69
|
1285 WaitScope& waitScope, SourceLocation location);
|
jpayne@69
|
1286 friend bool _::pollImpl(_::PromiseNode& node, WaitScope& waitScope, SourceLocation location);
|
jpayne@69
|
1287 friend class _::Event;
|
jpayne@69
|
1288 friend class WaitScope;
|
jpayne@69
|
1289 friend class Executor;
|
jpayne@69
|
1290 friend class _::XThreadEvent;
|
jpayne@69
|
1291 friend class _::XThreadPaf;
|
jpayne@69
|
1292 friend class _::FiberBase;
|
jpayne@69
|
1293 friend class _::FiberStack;
|
jpayne@69
|
1294 friend ArrayPtr<void* const> getAsyncTrace(ArrayPtr<void*> space);
|
jpayne@69
|
1295 };
|
jpayne@69
|
1296
|
jpayne@69
|
1297 class WaitScope {
|
jpayne@69
|
1298 // Represents a scope in which asynchronous programming can occur. A `WaitScope` should usually
|
jpayne@69
|
1299 // be allocated on the stack and serves two purposes:
|
jpayne@69
|
1300 // * While the `WaitScope` exists, its `EventLoop` is registered as the current loop for the
|
jpayne@69
|
1301 // thread. Most operations dealing with `Promise` (including all of its methods) do not work
|
jpayne@69
|
1302 // unless the thread has a current `EventLoop`.
|
jpayne@69
|
1303 // * `WaitScope` may be passed to `Promise::wait()` to synchronously wait for a particular
|
jpayne@69
|
1304 // promise to complete. See `Promise::wait()` for an extended discussion.
|
jpayne@69
|
1305
|
jpayne@69
|
1306 public:
|
jpayne@69
|
1307 inline explicit WaitScope(EventLoop& loop): loop(loop) { loop.enterScope(); }
|
jpayne@69
|
1308 inline ~WaitScope() { if (fiber == nullptr) loop.leaveScope(); }
|
jpayne@69
|
1309 KJ_DISALLOW_COPY_AND_MOVE(WaitScope);
|
jpayne@69
|
1310
|
jpayne@69
|
1311 uint poll(uint maxTurnCount = maxValue);
|
jpayne@69
|
1312 // Pumps the event queue and polls for I/O until there's nothing left to do (without blocking) or
|
jpayne@69
|
1313 // the maximum turn count has been reached. Returns the number of events popped off the event
|
jpayne@69
|
1314 // queue.
|
jpayne@69
|
1315 //
|
jpayne@69
|
1316 // Not supported in fibers.
|
jpayne@69
|
1317
|
jpayne@69
|
1318 void setBusyPollInterval(uint count) { busyPollInterval = count; }
|
jpayne@69
|
1319 // Set the maximum number of events to run in a row before calling poll() on the EventPort to
|
jpayne@69
|
1320 // check for new I/O.
|
jpayne@69
|
1321 //
|
jpayne@69
|
1322 // This has no effect when used in a fiber.
|
jpayne@69
|
1323
|
jpayne@69
|
1324 void runEventCallbacksOnStackPool(kj::Maybe<const FiberPool&> pool) { runningStacksPool = pool; }
|
jpayne@69
|
1325 // Arranges to switch stacks while event callbacks are executing. This is an optimization that
|
jpayne@69
|
1326 // is useful for programs that use extremely high thread counts, where each thread has its own
|
jpayne@69
|
1327 // event loop, but each thread has relatively low event throughput, i.e. each thread spends
|
jpayne@69
|
1328 // most of its time waiting for I/O. Normally, the biggest problem with having lots of threads
|
jpayne@69
|
1329 // is that each thread must allocate a stack, and stacks can take a lot of memory if the
|
jpayne@69
|
1330 // application commonly makes deep calls. But, most of that stack space is only needed while
|
jpayne@69
|
1331 // the thread is executing, not while it's sleeping. So, if threads only switch to a big stack
|
jpayne@69
|
1332 // during execution, switching back when it's time to sleep, and if those stacks are freelisted
|
jpayne@69
|
1333 // so that they can be shared among threads, then a lot of memory is saved.
|
jpayne@69
|
1334 //
|
jpayne@69
|
1335 // We use the `FiberPool` type here because it implements a freelist of stacks, which is exactly
|
jpayne@69
|
1336 // what we happen to want! In our case, though, we don't use those stacks to implement fibers;
|
jpayne@69
|
1337 // we use them as the main thread stack.
|
jpayne@69
|
1338 //
|
jpayne@69
|
1339 // This has no effect if this WaitScope itself is for a fiber.
|
jpayne@69
|
1340 //
|
jpayne@69
|
1341 // Pass `nullptr` as the parameter to go back to running events on the main stack.
|
jpayne@69
|
1342
|
jpayne@69
|
1343 void cancelAllDetached();
|
jpayne@69
|
1344 // HACK: Immediately cancel all detached promises.
|
jpayne@69
|
1345 //
|
jpayne@69
|
1346 // New code should not use detached promises, and therefore should not need this.
|
jpayne@69
|
1347 //
|
jpayne@69
|
1348 // This method exists to help existing code deal with the problems of detached promises,
|
jpayne@69
|
1349 // especially at teardown time.
|
jpayne@69
|
1350 //
|
jpayne@69
|
1351 // This method may be removed in the future.
|
jpayne@69
|
1352
|
jpayne@69
|
1353 private:
|
jpayne@69
|
1354 EventLoop& loop;
|
jpayne@69
|
1355 uint busyPollInterval = kj::maxValue;
|
jpayne@69
|
1356
|
jpayne@69
|
1357 kj::Maybe<_::FiberBase&> fiber;
|
jpayne@69
|
1358 kj::Maybe<const FiberPool&> runningStacksPool;
|
jpayne@69
|
1359
|
jpayne@69
|
1360 explicit WaitScope(EventLoop& loop, _::FiberBase& fiber)
|
jpayne@69
|
1361 : loop(loop), fiber(fiber) {}
|
jpayne@69
|
1362
|
jpayne@69
|
1363 template <typename Func>
|
jpayne@69
|
1364 inline void runOnStackPool(Func&& func) {
|
jpayne@69
|
1365 KJ_IF_MAYBE(pool, runningStacksPool) {
|
jpayne@69
|
1366 pool->runSynchronously(kj::fwd<Func>(func));
|
jpayne@69
|
1367 } else {
|
jpayne@69
|
1368 func();
|
jpayne@69
|
1369 }
|
jpayne@69
|
1370 }
|
jpayne@69
|
1371
|
jpayne@69
|
1372 friend class EventLoop;
|
jpayne@69
|
1373 friend class _::FiberBase;
|
jpayne@69
|
1374 friend void _::waitImpl(_::OwnPromiseNode&& node, _::ExceptionOrValue& result,
|
jpayne@69
|
1375 WaitScope& waitScope, SourceLocation location);
|
jpayne@69
|
1376 friend bool _::pollImpl(_::PromiseNode& node, WaitScope& waitScope, SourceLocation location);
|
jpayne@69
|
1377 };
|
jpayne@69
|
1378
|
jpayne@69
|
1379 } // namespace kj
|
jpayne@69
|
1380
|
jpayne@69
|
1381 #define KJ_ASYNC_H_INCLUDED
|
jpayne@69
|
1382 #include "async-inl.h"
|
jpayne@69
|
1383
|
jpayne@69
|
1384 KJ_END_HEADER
|