Mercurial > repos > rliterman > csp2
comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/include/kj/async.h @ 69:33d812a61356
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 17:55:14 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
67:0e9998148a16 | 69:33d812a61356 |
---|---|
1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors | |
2 // Licensed under the MIT License: | |
3 // | |
4 // Permission is hereby granted, free of charge, to any person obtaining a copy | |
5 // of this software and associated documentation files (the "Software"), to deal | |
6 // in the Software without restriction, including without limitation the rights | |
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
8 // copies of the Software, and to permit persons to whom the Software is | |
9 // furnished to do so, subject to the following conditions: | |
10 // | |
11 // The above copyright notice and this permission notice shall be included in | |
12 // all copies or substantial portions of the Software. | |
13 // | |
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
20 // THE SOFTWARE. | |
21 | |
22 #pragma once | |
23 | |
24 #include "async-prelude.h" | |
25 #include <kj/exception.h> | |
26 #include <kj/refcount.h> | |
27 | |
28 KJ_BEGIN_HEADER | |
29 | |
30 #ifndef KJ_USE_FIBERS | |
31 #if __BIONIC__ || __FreeBSD__ || __OpenBSD__ || KJ_NO_EXCEPTIONS | |
32 // These platforms don't support fibers. | |
33 #define KJ_USE_FIBERS 0 | |
34 #else | |
35 #define KJ_USE_FIBERS 1 | |
36 #endif | |
37 #else | |
38 #if KJ_NO_EXCEPTIONS && KJ_USE_FIBERS | |
39 #error "Fibers cannot be enabled when exceptions are disabled." | |
40 #endif | |
41 #endif | |
42 | |
43 namespace kj { | |
44 | |
45 class EventLoop; | |
46 class WaitScope; | |
47 | |
48 template <typename T> | |
49 class Promise; | |
50 template <typename T> | |
51 class ForkedPromise; | |
52 template <typename T> | |
53 class PromiseFulfiller; | |
54 template <typename T> | |
55 struct PromiseFulfillerPair; | |
56 | |
57 template <typename Func> | |
58 class FunctionParam; | |
59 | |
60 template <typename Func, typename T> | |
61 using PromiseForResult = _::ReducePromises<_::ReturnType<Func, T>>; | |
62 // Evaluates to the type of Promise for the result of calling functor type Func with parameter type | |
63 // T. If T is void, then the promise is for the result of calling Func with no arguments. If | |
64 // Func itself returns a promise, the promises are joined, so you never get Promise<Promise<T>>. | |
65 | |
66 // ======================================================================================= | |
67 | |
68 class AsyncObject { | |
69 // You may optionally inherit privately from this to indicate that the type is a KJ async object, | |
70 // meaning it deals with KJ async I/O making it tied to a specific thread and event loop. This | |
71 // enables some additional debug checks, but does not otherwise have any effect on behavior as | |
72 // long as there are no bugs. | |
73 // | |
74 // (We prefer inheritance rather than composition here because inheriting an empty type adds zero | |
75 // size to the derived class.) | |
76 | |
77 public: | |
78 ~AsyncObject(); | |
79 | |
80 private: | |
81 KJ_NORETURN(static void failed() noexcept); | |
82 }; | |
83 | |
84 class DisallowAsyncDestructorsScope { | |
85 // Create this type on the stack in order to specify that during its scope, no KJ async objects | |
86 // should be destroyed. If AsyncObject's destructor is called in this scope, the process will | |
87 // crash with std::terminate(). | |
88 // | |
89 // This is useful as a sort of "sanitizer" to catch bugs. When tearing down an object that is | |
90 // intended to be passed between threads, you can set up one of these scopes to catch whether | |
91 // the object contains any async objects, which are not legal to pass across threads. | |
92 | |
93 public: | |
94 explicit DisallowAsyncDestructorsScope(kj::StringPtr reason); | |
95 ~DisallowAsyncDestructorsScope(); | |
96 KJ_DISALLOW_COPY_AND_MOVE(DisallowAsyncDestructorsScope); | |
97 | |
98 private: | |
99 kj::StringPtr reason; | |
100 DisallowAsyncDestructorsScope* previousValue; | |
101 | |
102 friend class AsyncObject; | |
103 }; | |
104 | |
105 class AllowAsyncDestructorsScope { | |
106 // Negates the effect of DisallowAsyncDestructorsScope. | |
107 | |
108 public: | |
109 AllowAsyncDestructorsScope(); | |
110 ~AllowAsyncDestructorsScope(); | |
111 KJ_DISALLOW_COPY_AND_MOVE(AllowAsyncDestructorsScope); | |
112 | |
113 private: | |
114 DisallowAsyncDestructorsScope* previousValue; | |
115 }; | |
116 | |
117 // ======================================================================================= | |
118 // Promises | |
119 | |
120 template <typename T> | |
121 class Promise: protected _::PromiseBase { | |
122 // The basic primitive of asynchronous computation in KJ. Similar to "futures", but designed | |
123 // specifically for event loop concurrency. Similar to E promises and JavaScript Promises/A. | |
124 // | |
125 // A Promise represents a promise to produce a value of type T some time in the future. Once | |
126 // that value has been produced, the promise is "fulfilled". Alternatively, a promise can be | |
127 // "broken", with an Exception describing what went wrong. You may implicitly convert a value of | |
128 // type T to an already-fulfilled Promise<T>. You may implicitly convert the constant | |
129 // `kj::READY_NOW` to an already-fulfilled Promise<void>. You may also implicitly convert a | |
130 // `kj::Exception` to an already-broken promise of any type. | |
131 // | |
132 // Promises are linear types -- they are moveable but not copyable. If a Promise is destroyed | |
133 // or goes out of scope (without being moved elsewhere), any ongoing asynchronous operations | |
134 // meant to fulfill the promise will be canceled if possible. All methods of `Promise` (unless | |
135 // otherwise noted) actually consume the promise in the sense of move semantics. (Arguably they | |
136 // should be rvalue-qualified, but at the time this interface was created compilers didn't widely | |
137 // support that yet and anyway it would be pretty ugly typing kj::mv(promise).whatever().) If | |
138 // you want to use one Promise in two different places, you must fork it with `fork()`. | |
139 // | |
140 // To use the result of a Promise, you must call `then()` and supply a callback function to | |
141 // call with the result. `then()` returns another promise, for the result of the callback. | |
142 // Any time that this would result in Promise<Promise<T>>, the promises are collapsed into a | |
143 // simple Promise<T> that first waits for the outer promise, then the inner. Example: | |
144 // | |
145 // // Open a remote file, read the content, and then count the | |
146 // // number of lines of text. | |
147 // // Note that none of the calls here block. `file`, `content` | |
148 // // and `lineCount` are all initialized immediately before any | |
149 // // asynchronous operations occur. The lambda callbacks are | |
150 // // called later. | |
151 // Promise<Own<File>> file = openFtp("ftp://host/foo/bar"); | |
152 // Promise<String> content = file.then( | |
153 // [](Own<File> file) -> Promise<String> { | |
154 // return file->readAll(); | |
155 // }); | |
156 // Promise<int> lineCount = content.then( | |
157 // [](String text) -> int { | |
158 // uint count = 0; | |
159 // for (char c: text) count += (c == '\n'); | |
160 // return count; | |
161 // }); | |
162 // | |
163 // For `then()` to work, the current thread must have an active `EventLoop`. Each callback | |
164 // is scheduled to execute in that loop. Since `then()` schedules callbacks only on the current | |
165 // thread's event loop, you do not need to worry about two callbacks running at the same time. | |
166 // You will need to set up at least one `EventLoop` at the top level of your program before you | |
167 // can use promises. | |
168 // | |
169 // To adapt a non-Promise-based asynchronous API to promises, use `newAdaptedPromise()`. | |
170 // | |
171 // Systems using promises should consider supporting the concept of "pipelining". Pipelining | |
172 // means allowing a caller to start issuing method calls against a promised object before the | |
173 // promise has actually been fulfilled. This is particularly useful if the promise is for a | |
174 // remote object living across a network, as this can avoid round trips when chaining a series | |
175 // of calls. It is suggested that any class T which supports pipelining implement a subclass of | |
176 // Promise<T> which adds "eventual send" methods -- methods which, when called, say "please | |
177 // invoke the corresponding method on the promised value once it is available". These methods | |
178 // should in turn return promises for the eventual results of said invocations. Cap'n Proto, | |
179 // for example, implements the type `RemotePromise` which supports pipelining RPC requests -- see | |
180 // `capnp/capability.h`. | |
181 // | |
182 // KJ Promises are based on E promises: | |
183 // http://wiki.erights.org/wiki/Walnut/Distributed_Computing#Promises | |
184 // | |
185 // KJ Promises are also inspired in part by the evolving standards for JavaScript/ECMAScript | |
186 // promises, which are themselves influenced by E promises: | |
187 // http://promisesaplus.com/ | |
188 // https://github.com/domenic/promises-unwrapping | |
189 | |
190 public: | |
191 Promise(_::FixVoid<T> value); | |
192 // Construct an already-fulfilled Promise from a value of type T. For non-void promises, the | |
193 // parameter type is simply T. So, e.g., in a function that returns `Promise<int>`, you can | |
194 // say `return 123;` to return a promise that is already fulfilled to 123. | |
195 // | |
196 // For void promises, use `kj::READY_NOW` as the value, e.g. `return kj::READY_NOW`. | |
197 | |
198 Promise(kj::Exception&& e); | |
199 // Construct an already-broken Promise. | |
200 | |
201 inline Promise(decltype(nullptr)) {} | |
202 | |
203 template <typename Func, typename ErrorFunc = _::PropagateException> | |
204 PromiseForResult<Func, T> then(Func&& func, ErrorFunc&& errorHandler = _::PropagateException(), | |
205 SourceLocation location = {}) KJ_WARN_UNUSED_RESULT; | |
206 // Register a continuation function to be executed when the promise completes. The continuation | |
207 // (`func`) takes the promised value (an rvalue of type `T`) as its parameter. The continuation | |
208 // may return a new value; `then()` itself returns a promise for the continuation's eventual | |
209 // result. If the continuation itself returns a `Promise<U>`, then `then()` shall also return | |
210 // a `Promise<U>` which first waits for the original promise, then executes the continuation, | |
211 // then waits for the inner promise (i.e. it automatically "unwraps" the promise). | |
212 // | |
213 // In all cases, `then()` returns immediately. The continuation is executed later. The | |
214 // continuation is always executed on the same EventLoop (and, therefore, the same thread) which | |
215 // called `then()`, therefore no synchronization is necessary on state shared by the continuation | |
216 // and the surrounding scope. If no EventLoop is running on the current thread, `then()` throws | |
217 // an exception. | |
218 // | |
219 // You may also specify an error handler continuation as the second parameter. `errorHandler` | |
220 // must be a functor taking a parameter of type `kj::Exception&&`. It must return the same | |
221 // type as `func` returns (except when `func` returns `Promise<U>`, in which case `errorHandler` | |
222 // may return either `Promise<U>` or just `U`). The default error handler simply propagates the | |
223 // exception to the returned promise. | |
224 // | |
225 // Either `func` or `errorHandler` may, of course, throw an exception, in which case the promise | |
226 // is broken. When compiled with -fno-exceptions, the framework will still detect when a | |
227 // recoverable exception was thrown inside of a continuation and will consider the promise | |
228 // broken even though a (presumably garbage) result was returned. | |
229 // | |
230 // If the returned promise is destroyed before the callback runs, the callback will be canceled | |
231 // (it will never run). | |
232 // | |
233 // Note that `then()` -- like all other Promise methods -- consumes the promise on which it is | |
234 // called, in the sense of move semantics. After returning, the original promise is no longer | |
235 // valid, but `then()` returns a new promise. | |
236 // | |
237 // *Advanced implementation tips:* Most users will never need to worry about the below, but | |
238 // it is good to be aware of. | |
239 // | |
240 // As an optimization, if the callback function `func` does _not_ return another promise, then | |
241 // execution of `func` itself may be delayed until its result is known to be needed. The | |
242 // expectation here is that `func` is just doing some transformation on the results, not | |
243 // scheduling any other actions, therefore the system doesn't need to be proactive about | |
244 // evaluating it. This way, a chain of trivial then() transformations can be executed all at | |
245 // once without repeatedly re-scheduling through the event loop. Use the `eagerlyEvaluate()` | |
246 // method to suppress this behavior. | |
247 // | |
248 // On the other hand, if `func` _does_ return another promise, then the system evaluates `func` | |
249 // as soon as possible, because the promise it returns might be for a newly-scheduled | |
250 // long-running asynchronous task. | |
251 // | |
252 // As another optimization, when a callback function registered with `then()` is actually | |
253 // scheduled, it is scheduled to occur immediately, preempting other work in the event queue. | |
254 // This allows a long chain of `then`s to execute all at once, improving cache locality by | |
255 // clustering operations on the same data. However, this implies that starvation can occur | |
256 // if a chain of `then()`s takes a very long time to execute without ever stopping to wait for | |
257 // actual I/O. To solve this, use `kj::evalLater()` to yield control; this way, all other events | |
258 // in the queue will get a chance to run before your callback is executed. | |
259 | |
260 Promise<void> ignoreResult() KJ_WARN_UNUSED_RESULT { return then([](T&&) {}); } | |
261 // Convenience method to convert the promise to a void promise by ignoring the return value. | |
262 // | |
263 // You must still wait on the returned promise if you want the task to execute. | |
264 | |
265 template <typename ErrorFunc> | |
266 Promise<T> catch_(ErrorFunc&& errorHandler, SourceLocation location = {}) KJ_WARN_UNUSED_RESULT; | |
267 // Equivalent to `.then(identityFunc, errorHandler)`, where `identityFunc` is a function that | |
268 // just returns its input. | |
269 | |
270 T wait(WaitScope& waitScope, SourceLocation location = {}); | |
271 // Run the event loop until the promise is fulfilled, then return its result. If the promise | |
272 // is rejected, throw an exception. | |
273 // | |
274 // wait() is primarily useful at the top level of a program -- typically, within the function | |
275 // that allocated the EventLoop. For example, a program that performs one or two RPCs and then | |
276 // exits would likely use wait() in its main() function to wait on each RPC. On the other hand, | |
277 // server-side code generally cannot use wait(), because it has to be able to accept multiple | |
278 // requests at once. | |
279 // | |
280 // If the promise is rejected, `wait()` throws an exception. If the program was compiled without | |
281 // exceptions (-fno-exceptions), this will usually abort. In this case you really should first | |
282 // use `then()` to set an appropriate handler for the exception case, so that the promise you | |
283 // actually wait on never throws. | |
284 // | |
285 // `waitScope` is an object proving that the caller is in a scope where wait() is allowed. By | |
286 // convention, any function which might call wait(), or which might call another function which | |
287 // might call wait(), must take `WaitScope&` as one of its parameters. This is needed for two | |
288 // reasons: | |
289 // * `wait()` is not allowed during an event callback, because event callbacks are themselves | |
290 // called during some other `wait()`, and such recursive `wait()`s would only be able to | |
291 // complete in LIFO order, which might mean that the outer `wait()` ends up waiting longer | |
292 // than it is supposed to. To prevent this, a `WaitScope` cannot be constructed or used during | |
293 // an event callback. | |
294 // * Since `wait()` runs the event loop, unrelated event callbacks may execute before `wait()` | |
295 // returns. This means that anyone calling `wait()` must be reentrant -- state may change | |
296 // around them in arbitrary ways. Therefore, callers really need to know if a function they | |
297 // are calling might wait(), and the `WaitScope&` parameter makes this clear. | |
298 // | |
299 // Usually, there is only one `WaitScope` for each `EventLoop`, and it can only be used at the | |
300 // top level of the thread owning the loop. Calling `wait()` with this `WaitScope` is what | |
301 // actually causes the event loop to run at all. This top-level `WaitScope` cannot be used | |
302 // recursively, so cannot be used within an event callback. | |
303 // | |
304 // However, it is possible to obtain a `WaitScope` in lower-level code by using fibers. Use | |
305 // kj::startFiber() to start some code executing on an alternate call stack. That code will get | |
306 // its own `WaitScope` allowing it to operate in a synchronous style. In this case, `wait()` | |
307 // switches back to the main stack in order to run the event loop, returning to the fiber's stack | |
308 // once the awaited promise resolves. | |
309 | |
310 bool poll(WaitScope& waitScope, SourceLocation location = {}); | |
311 // Returns true if a call to wait() would complete without blocking, false if it would block. | |
312 // | |
313 // If the promise is not yet resolved, poll() will pump the event loop and poll for I/O in an | |
314 // attempt to resolve it. Only when there is nothing left to do will it return false. | |
315 // | |
316 // Generally, poll() is most useful in tests. Often, you may want to verify that a promise does | |
317 // not resolve until some specific event occurs. To do so, poll() the promise before the event to | |
318 // verify it isn't resolved, then trigger the event, then poll() again to verify that it resolves. | |
319 // The first poll() verifies that the promise doesn't resolve early, which would otherwise be | |
320 // hard to do deterministically. The second poll() allows you to check that the promise has | |
321 // resolved and avoid a wait() that might deadlock in the case that it hasn't. | |
322 // | |
323 // poll() is not supported in fibers; it will throw an exception. | |
324 | |
325 ForkedPromise<T> fork(SourceLocation location = {}) KJ_WARN_UNUSED_RESULT; | |
326 // Forks the promise, so that multiple different clients can independently wait on the result. | |
327 // `T` must be copy-constructible for this to work. Or, in the special case where `T` is | |
328 // `Own<U>`, `U` must have a method `Own<U> addRef()` which returns a new reference to the same | |
329 // (or an equivalent) object (probably implemented via reference counting). | |
330 | |
331 _::SplitTuplePromise<T> split(SourceLocation location = {}); | |
332 // Split a promise for a tuple into a tuple of promises. | |
333 // | |
334 // E.g. if you have `Promise<kj::Tuple<T, U>>`, `split()` returns | |
335 // `kj::Tuple<Promise<T>, Promise<U>>`. | |
336 | |
337 Promise<T> exclusiveJoin(Promise<T>&& other, SourceLocation location = {}) KJ_WARN_UNUSED_RESULT; | |
338 // Return a new promise that resolves when either the original promise resolves or `other` | |
339 // resolves (whichever comes first). The promise that didn't resolve first is canceled. | |
340 | |
341 // TODO(someday): inclusiveJoin(), or perhaps just join(), which waits for both completions | |
342 // and produces a tuple? | |
343 | |
344 template <typename... Attachments> | |
345 Promise<T> attach(Attachments&&... attachments) KJ_WARN_UNUSED_RESULT; | |
346 // "Attaches" one or more movable objects (often, Own<T>s) to the promise, such that they will | |
347 // be destroyed when the promise resolves. This is useful when a promise's callback contains | |
348 // pointers into some object and you want to make sure the object still exists when the callback | |
349 // runs -- after calling then(), use attach() to add necessary objects to the result. | |
350 | |
351 template <typename ErrorFunc> | |
352 Promise<T> eagerlyEvaluate(ErrorFunc&& errorHandler, SourceLocation location = {}) | |
353 KJ_WARN_UNUSED_RESULT; | |
354 Promise<T> eagerlyEvaluate(decltype(nullptr), SourceLocation location = {}) KJ_WARN_UNUSED_RESULT; | |
355 // Force eager evaluation of this promise. Use this if you are going to hold on to the promise | |
356 // for a while without consuming the result, but you want to make sure that the system actually | |
357 // processes it. | |
358 // | |
359 // `errorHandler` is a function that takes `kj::Exception&&`, like the second parameter to | |
360 // `then()`, or the parameter to `catch_()`. We make you specify this because otherwise it's | |
361 // easy to forget to handle errors in a promise that you never use. You may specify nullptr for | |
362 // the error handler if you are sure that ignoring errors is fine, or if you know that you'll | |
363 // eventually wait on the promise somewhere. | |
364 | |
365 template <typename ErrorFunc> | |
366 void detach(ErrorFunc&& errorHandler); | |
367 // Allows the promise to continue running in the background until it completes or the | |
368 // `EventLoop` is destroyed. Be careful when using this: since you can no longer cancel this | |
369 // promise, you need to make sure that the promise owns all the objects it touches or make sure | |
370 // those objects outlive the EventLoop. | |
371 // | |
372 // `errorHandler` is a function that takes `kj::Exception&&`, like the second parameter to | |
373 // `then()`, except that it must return void. | |
374 // | |
375 // This function exists mainly to implement the Cap'n Proto requirement that RPC calls cannot be | |
376 // canceled unless the callee explicitly permits it. | |
377 | |
378 kj::String trace(); | |
379 // Returns a dump of debug info about this promise. Not for production use. Requires RTTI. | |
380 // This method does NOT consume the promise as other methods do. | |
381 | |
382 private: | |
383 Promise(bool, _::OwnPromiseNode&& node): PromiseBase(kj::mv(node)) {} | |
384 // The dummy `bool` first parameter prevents ambiguity with the immediate-value constructor. | |
385 | |
386 friend class _::PromiseNode; | |
387 }; | |
388 | |
389 template <typename T> | |
390 class ForkedPromise { | |
391 // The result of `Promise::fork()` and `EventLoop::fork()`. Allows branches to be created. | |
392 // Like `Promise<T>`, this is a pass-by-move type. | |
393 | |
394 public: | |
395 inline ForkedPromise(decltype(nullptr)) {} | |
396 | |
397 Promise<T> addBranch(); | |
398 // Add a new branch to the fork. The branch is equivalent to the original promise. | |
399 | |
400 bool hasBranches(); | |
401 // Returns true if there are any branches that haven't been canceled. | |
402 | |
403 private: | |
404 Own<_::ForkHub<_::FixVoid<T>>> hub; | |
405 | |
406 inline ForkedPromise(bool, Own<_::ForkHub<_::FixVoid<T>>>&& hub): hub(kj::mv(hub)) {} | |
407 | |
408 friend class Promise<T>; | |
409 friend class EventLoop; | |
410 }; | |
411 | |
412 constexpr _::ReadyNow READY_NOW = _::ReadyNow(); | |
413 // Use this when you need a Promise<void> that is already fulfilled -- this value can be implicitly | |
414 // cast to `Promise<void>`. | |
415 | |
416 constexpr _::NeverDone NEVER_DONE = _::NeverDone(); | |
417 // The opposite of `READY_NOW`, return this when the promise should never resolve. This can be | |
418 // implicitly converted to any promise type. You may also call `NEVER_DONE.wait()` to wait | |
419 // forever (useful for servers). | |
420 | |
421 template <typename T, T value> | |
422 Promise<T> constPromise(); | |
423 // Construct a Promise which resolves to the given constant value. This function is equivalent to | |
424 // `Promise<T>(value)` except that it avoids an allocation. | |
425 | |
426 template <typename Func> | |
427 PromiseForResult<Func, void> evalLater(Func&& func) KJ_WARN_UNUSED_RESULT; | |
428 // Schedule for the given zero-parameter function to be executed in the event loop at some | |
429 // point in the near future. Returns a Promise for its result -- or, if `func()` itself returns | |
430 // a promise, `evalLater()` returns a Promise for the result of resolving that promise. | |
431 // | |
432 // Example usage: | |
433 // Promise<int> x = evalLater([]() { return 123; }); | |
434 // | |
435 // The above is exactly equivalent to: | |
436 // Promise<int> x = Promise<void>(READY_NOW).then([]() { return 123; }); | |
437 // | |
438 // If the returned promise is destroyed before the callback runs, the callback will be canceled | |
439 // (never called). | |
440 // | |
441 // If you schedule several evaluations with `evalLater` during the same callback, they are | |
442 // guaranteed to be executed in order. | |
443 | |
444 template <typename Func> | |
445 PromiseForResult<Func, void> evalNow(Func&& func) KJ_WARN_UNUSED_RESULT; | |
446 // Run `func()` and return a promise for its result. `func()` executes before `evalNow()` returns. | |
447 // If `func()` throws an exception, the exception is caught and wrapped in a promise -- this is the | |
448 // main reason why `evalNow()` is useful. | |
449 | |
450 template <typename Func> | |
451 PromiseForResult<Func, void> evalLast(Func&& func) KJ_WARN_UNUSED_RESULT; | |
452 // Like `evalLater()`, except that the function doesn't run until the event queue is otherwise | |
453 // completely empty and the thread is about to suspend waiting for I/O. | |
454 // | |
455 // This is useful when you need to perform some disruptive action and you want to make sure that | |
456 // you don't interrupt some other task between two .then() continuations. For example, say you want | |
457 // to cancel a read() operation on a socket and know for sure that if any bytes were read, you saw | |
458 // them. It could be that a read() has completed and bytes have been transferred to the target | |
459 // buffer, but the .then() callback that handles the read result hasn't executed yet. If you | |
460 // cancel the promise at this inopportune moment, the bytes in the buffer are lost. If you do | |
461 // evalLast(), then you can be sure that any pending .then() callbacks had a chance to finish out | |
462 // and if you didn't receive the read result yet, then you know nothing has been read, and you can | |
463 // simply drop the promise. | |
464 // | |
465 // If evalLast() is called multiple times, functions are executed in LIFO order. If the first | |
466 // callback enqueues new events, then later callbacks will not execute until those events are | |
467 // drained. | |
468 | |
469 ArrayPtr<void* const> getAsyncTrace(ArrayPtr<void*> space); | |
470 kj::String getAsyncTrace(); | |
471 // If the event loop is currently running in this thread, get a trace back through the promise | |
472 // chain leading to the currently-executing event. The format is the same as kj::getStackTrace() | |
473 // from exception.c++. | |
474 | |
475 template <typename Func> | |
476 PromiseForResult<Func, void> retryOnDisconnect(Func&& func) KJ_WARN_UNUSED_RESULT; | |
477 // Promises to run `func()` asynchronously, retrying once if it fails with a DISCONNECTED exception. | |
478 // If the retry also fails, the exception is passed through. | |
479 // | |
480 // `func()` should return a `Promise`. `retryOnDisconnect(func)` returns the same promise, except | |
481 // with the retry logic added. | |
482 | |
483 template <typename Func> | |
484 PromiseForResult<Func, WaitScope&> startFiber( | |
485 size_t stackSize, Func&& func, SourceLocation location = {}) KJ_WARN_UNUSED_RESULT; | |
486 // Executes `func()` in a fiber, returning a promise for the eventual result. `func()` will be | |
487 // passed a `WaitScope&` as its parameter, allowing it to call `.wait()` on promises. Thus, `func()` | |
488 // can be written in a synchronous, blocking style, instead of using `.then()`. This is often much | |
489 // easier to write and read, and may even be significantly faster if it allows the use of stack | |
490 // allocation rather than heap allocation. | |
491 // | |
492 // However, fibers have a major disadvantage: memory must be allocated for the fiber's call stack. | |
493 // The entire stack must be allocated at once, making it necessary to choose a stack size upfront | |
494 // that is big enough for whatever the fiber needs to do. Estimating this is often difficult. That | |
495 // said, over-estimating is not too terrible since pages of the stack will actually be allocated | |
496 // lazily when first accessed; actual memory usage will correspond to the "high watermark" of the | |
497 // actual stack usage. That said, this lazy allocation forces page faults, which can be quite slow. | |
498 // Worse, freeing a stack forces a TLB flush and shootdown -- all currently-executing threads will | |
499 // have to be interrupted to flush their CPU cores' TLB caches. | |
500 // | |
501 // In short, when performance matters, you should try to avoid creating fibers very frequently. | |
502 | |
503 class FiberPool final { | |
504 // A freelist pool of fibers with a set stack size. This improves CPU usage with fibers at | |
505 // the expense of memory usage. Fibers in this pool will always use the max amount of memory | |
506 // used until the pool is destroyed. | |
507 | |
508 public: | |
509 explicit FiberPool(size_t stackSize); | |
510 ~FiberPool() noexcept(false); | |
511 KJ_DISALLOW_COPY_AND_MOVE(FiberPool); | |
512 | |
513 void setMaxFreelist(size_t count); | |
514 // Set the maximum number of stacks to add to the freelist. If the freelist is full, stacks will | |
515 // be deleted rather than returned to the freelist. | |
516 | |
517 void useCoreLocalFreelists(); | |
518 // EXPERIMENTAL: Call to tell FiberPool to try to use core-local stack freelists, which | |
519 // in theory should increase L1/L2 cache efficacy for freelisted stacks. In practice, as of | |
520 // this writing, no performance advantage has yet been demonstrated. Note that currently this | |
521 // feature is only supported on Linux (the flag has no effect on other operating systems). | |
522 | |
523 template <typename Func> | |
524 PromiseForResult<Func, WaitScope&> startFiber( | |
525 Func&& func, SourceLocation location = {}) const KJ_WARN_UNUSED_RESULT; | |
526 // Executes `func()` in a fiber from this pool, returning a promise for the eventual result. | |
527 // `func()` will be passed a `WaitScope&` as its parameter, allowing it to call `.wait()` on | |
528 // promises. Thus, `func()` can be written in a synchronous, blocking style, instead of | |
529 // using `.then()`. This is often much easier to write and read, and may even be significantly | |
530 // faster if it allows the use of stack allocation rather than heap allocation. | |
531 | |
532 void runSynchronously(kj::FunctionParam<void()> func) const; | |
533 // Use one of the stacks in the pool to synchronously execute func(). No result is returned | |
534 // (func is `void()`). This is not the usual use case for fibers, but can be a nice optimization | |
535 // in programs that have many threads that mostly only need small stacks, but occasionally need | |
536 // a much bigger stack to run some deeply recursive algorithm. If the algorithm is run on each | |
537 // thread's normal call stack, then every thread's stack will tend to grow to be very big | |
538 // (usually, stacks automatically grow as needed, but do not shrink until the thread exits | |
539 // completely). If the threads can share a small set of big stacks that they use only when calling | |
540 // the deeply recursive algorithm, and use small stacks for everything else, overall memory usage | |
541 // is reduced. | |
542 // | |
543 // TODO(someday): If func() returns a value, return it from runSynchronously? Current use case | |
544 // doesn't need it. | |
545 | |
546 size_t getFreelistSize() const; | |
547 // Get the number of stacks currently in the freelist. Does not count stacks that are active. | |
548 | |
549 private: | |
550 class Impl; | |
551 Own<Impl> impl; | |
552 | |
553 friend class _::FiberStack; | |
554 friend class _::FiberBase; | |
555 }; | |
556 | |
557 template <typename T> | |
558 Promise<Array<T>> joinPromises(Array<Promise<T>>&& promises, SourceLocation location = {}); | |
559 // Join an array of promises into a promise for an array. Trailing continuations on promises are not | |
560 // evaluated until all promises have settled. Exceptions are propagated only after the last promise | |
561 // has settled. | |
562 // | |
563 // TODO(cleanup): It is likely that `joinPromisesFailFast()` is what everyone should be using. | |
564 // Deprecate this function. | |
565 | |
566 template <typename T> | |
567 Promise<Array<T>> joinPromisesFailFast(Array<Promise<T>>&& promises, SourceLocation location = {}); | |
568 // Join an array of promises into a promise for an array. Trailing continuations on promises are | |
569 // evaluated eagerly. If any promise results in an exception, the exception is immediately | |
570 // propagated to the returned join promise. | |
571 | |
572 // ======================================================================================= | |
573 // Hack for creating a lambda that holds an owned pointer. | |
574 | |
575 template <typename Func, typename MovedParam> | |
576 class CaptureByMove { | |
577 public: | |
578 inline CaptureByMove(Func&& func, MovedParam&& param) | |
579 : func(kj::mv(func)), param(kj::mv(param)) {} | |
580 | |
581 template <typename... Params> | |
582 inline auto operator()(Params&&... params) | |
583 -> decltype(kj::instance<Func>()(kj::instance<MovedParam&&>(), kj::fwd<Params>(params)...)) { | |
584 return func(kj::mv(param), kj::fwd<Params>(params)...); | |
585 } | |
586 | |
587 private: | |
588 Func func; | |
589 MovedParam param; | |
590 }; | |
591 | |
592 template <typename Func, typename MovedParam> | |
593 inline CaptureByMove<Func, Decay<MovedParam>> mvCapture(MovedParam&& param, Func&& func) | |
594 KJ_DEPRECATED("Use C++14 generalized captures instead."); | |
595 | |
596 template <typename Func, typename MovedParam> | |
597 inline CaptureByMove<Func, Decay<MovedParam>> mvCapture(MovedParam&& param, Func&& func) { | |
598 // Hack to create a "lambda" which captures a variable by moving it rather than copying or | |
599 // referencing. C++14 generalized captures make this obsolete, which is why the declaration is | |
600 // marked KJ_DEPRECATED. It was used for Promise continuations that own their state. Example: | |
601 // | |
602 // Own<Foo> ptr = makeFoo(); | |
603 // Promise<int> promise = callRpc(); | |
604 // promise.then(mvCapture(ptr, [](Own<Foo>&& ptr, int result) { | |
605 // return ptr->finish(result); | |
606 // })); | |
607 | |
608 return CaptureByMove<Func, Decay<MovedParam>>(kj::fwd<Func>(func), kj::mv(param)); | |
609 } | |
610 | |
611 // ======================================================================================= | |
612 // Hack for safely using a lambda as a coroutine. | |
613 | |
614 #if KJ_HAS_COROUTINE | |
615 | |
616 namespace _ { | |
617 | |
618 void throwMultipleCoCaptureInvocations(); | |
619 | |
620 template<typename Functor> | |
621 struct CaptureForCoroutine { | |
622 kj::Maybe<Functor> maybeFunctor; | |
623 | |
624 explicit CaptureForCoroutine(Functor&& f) : maybeFunctor(kj::mv(f)) {} | |
625 | |
626 template<typename ...Args> | |
627 static auto coInvoke(Functor functor, Args&&... args) | |
628 -> decltype(functor(kj::fwd<Args>(args)...)) { | |
629 // Since the functor is now in the local scope and no longer a member variable, it will be | |
630 // persisted in the coroutine state. | |
631 | |
632 // Note that `co_await functor(...)` can still return `void`. It just happens that | |
633 // `co_return voidReturn();` is explicitly allowed. | |
634 co_return co_await functor(kj::fwd<Args>(args)...); | |
635 } | |
636 | |
637 template<typename ...Args> | |
638 auto operator()(Args&&... args) { | |
639 if (maybeFunctor == nullptr) { | |
640 throwMultipleCoCaptureInvocations(); | |
641 } | |
642 auto localFunctor = kj::mv(*kj::_::readMaybe(maybeFunctor)); | |
643 maybeFunctor = nullptr; | |
644 return coInvoke(kj::mv(localFunctor), kj::fwd<Args>(args)...); | |
645 } | |
646 }; | |
647 | |
648 } // namespace _ | |
649 | |
650 template <typename Functor> | |
651 auto coCapture(Functor&& f) { | |
652 // Assuming `f()` returns a Promise<T> `p`, wrap `f` in such a way that it will outlive its | |
653 // returned Promise. Note that the returned object may only be invoked once. | |
654 // | |
655 // This function is meant to help address this pain point with functors that return a coroutine: | |
656 // https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#Rcoro-capture | |
657 // | |
658 // The two most common patterns where this may be useful look like so: | |
659 // ``` | |
660 // void addTask(Value myValue) { | |
661 // auto myFun = [myValue]() -> kj::Promise<void> { | |
662 // ... | |
663 // co_return; | |
664 // }; | |
665 // tasks.add(myFun()); | |
666 // } | |
667 // ``` | |
668 // and | |
669 // ``` | |
670 // kj::Promise<void> afterPromise(kj::Promise<void> promise, Value myValue) { | |
671 // auto myFun = [myValue]() -> kj::Promise<void> { | |
672 // ... | |
673 // co_return; | |
674 // }; | |
675 // return promise.then(kj::mv(myFun)); | |
676 // } | |
677 // ``` | |
678 // | |
679 // Note that there are potentially more optimal alternatives to both of these patterns: | |
680 // ``` | |
681 // void addTask(Value myValue) { | |
682 // auto myFun = [](auto myValue) -> kj::Promise<void> { | |
683 // ... | |
684 // co_return; | |
685 // }; | |
686 // tasks.add(myFun(myValue)); | |
687 // } | |
688 // ``` | |
689 // and | |
690 // ``` | |
691 // kj::Promise<void> afterPromise(kj::Promise<void> promise, Value myValue) { | |
692 // auto myFun = [&]() -> kj::Promise<void> { | |
693 // ... | |
694 // co_return; | |
695 // }; | |
696 // co_await promise; | |
697 // co_await myFun(); | |
698 // co_return; | |
699 // } | |
700 // ``` | |
701 // | |
702 // For situations where you are trying to capture a specific local variable, kj::mvCapture() can | |
703 // also be useful (note that it is marked KJ_DEPRECATED in favor of C++14 generalized captures): | |
704 // ``` | |
705 // kj::Promise<void> reactToPromise(kj::Promise<MyType> promise) { | |
706 // BigA a; | |
707 // TinyB b; | |
708 // | |
709 // doSomething(a, b); | |
710 // return promise.then(kj::mvCapture(b, [](TinyB b, MyType type) -> kj::Promise<void> { | |
711 // ... | |
712 // co_return; | |
713 // })); | |
714 // } | |
715 // ``` | |
716 | |
717 return _::CaptureForCoroutine(kj::mv(f)); | |
718 } | |
719 | |
720 #endif // KJ_HAS_COROUTINE | |
721 | |
722 // ======================================================================================= | |
723 // Advanced promise construction | |
724 | |
725 class PromiseRejector: private AsyncObject { | |
726 // Superclass of PromiseFulfiller containing the non-typed methods. Useful when you only really | |
727 // need to be able to reject a promise, and you need to operate on fulfillers of different types. | |
728 public: | |
729 virtual void reject(Exception&& exception) = 0; | |
730 virtual bool isWaiting() = 0; | |
731 }; | |
732 | |
733 template <typename T> | |
734 class PromiseFulfiller: public PromiseRejector { | |
735 // A callback which can be used to fulfill a promise. Only the first call to fulfill() or | |
736 // reject() matters; subsequent calls are ignored. | |
737 | |
738 public: | |
739 virtual void fulfill(T&& value) = 0; | |
740 // Fulfill the promise with the given value. | |
741 | |
742 virtual void reject(Exception&& exception) = 0; | |
743 // Reject the promise with an error. | |
744 | |
745 virtual bool isWaiting() = 0; | |
746 // Returns true if the promise is still unfulfilled and someone is potentially waiting for it. | |
747 // Returns false if fulfill()/reject() has already been called *or* if the promise to be | |
748 // fulfilled has been discarded and therefore the result will never be used anyway. | |
749 | |
750 template <typename Func> | |
751 bool rejectIfThrows(Func&& func); | |
752 // Call the function (with no arguments) and return true. If an exception is thrown, call | |
753 // `fulfiller.reject()` and then return false. When compiled with exceptions disabled, | |
754 // non-fatal exceptions are still detected and handled correctly. | |
755 }; | |
756 | |
757 template <> | |
758 class PromiseFulfiller<void>: public PromiseRejector { | |
759 // Specialization of PromiseFulfiller for void promises. See PromiseFulfiller<T>. | |
760 | |
761 public: | |
762 virtual void fulfill(_::Void&& value = _::Void()) = 0; | |
763 // Call with zero parameters. The parameter is a dummy that only exists so that subclasses don't | |
764 // have to specialize for <void>. | |
765 | |
766 virtual void reject(Exception&& exception) = 0; | |
767 virtual bool isWaiting() = 0; | |
768 | |
769 template <typename Func> | |
770 bool rejectIfThrows(Func&& func); | |
771 }; | |
772 | |
773 template <typename T, typename Adapter, typename... Params> | |
774 _::ReducePromises<T> newAdaptedPromise(Params&&... adapterConstructorParams); | |
775 // Creates a new promise which owns an instance of `Adapter` which encapsulates the operation | |
776 // that will eventually fulfill the promise. This is primarily useful for adapting non-KJ | |
777 // asynchronous APIs to use promises. | |
778 // | |
779 // An instance of `Adapter` will be allocated and owned by the returned `Promise`. A | |
780 // `PromiseFulfiller<T>&` will be passed as the first parameter to the adapter's constructor, | |
781 // and `adapterConstructorParams` will be forwarded as the subsequent parameters. The adapter | |
782 // is expected to perform some asynchronous operation and call the `PromiseFulfiller<T>` once | |
783 // it is finished. | |
784 // | |
785 // The adapter is destroyed when its owning Promise is destroyed. This may occur before the | |
786 // Promise has been fulfilled. In this case, the adapter's destructor should cancel the | |
787 // asynchronous operation. Once the adapter is destroyed, the fulfillment callback cannot be | |
788 // called. | |
789 // | |
790 // An adapter implementation should be carefully written to ensure that it cannot accidentally | |
791 // be left unfulfilled permanently because of an exception. Consider making liberal use of | |
792 // `PromiseFulfiller<T>::rejectIfThrows()`. | |
793 | |
794 template <typename T> | |
795 struct PromiseFulfillerPair { | |
796 _::ReducePromises<T> promise; | |
797 Own<PromiseFulfiller<T>> fulfiller; | |
798 }; | |
799 | |
800 template <typename T> | |
801 PromiseFulfillerPair<T> newPromiseAndFulfiller(SourceLocation location = {}); | |
802 // Construct a Promise and a separate PromiseFulfiller which can be used to fulfill the promise. | |
803 // If the PromiseFulfiller is destroyed before either of its methods is called, the Promise is | |
804 // implicitly rejected. | |
805 // | |
806 // Although this function is easier to use than `newAdaptedPromise()`, it has the serious drawback | |
807 // that there is no way to handle cancellation (i.e. detect when the Promise is discarded). | |
808 // | |
809 // You can arrange to fulfill a promise with another promise by using a promise type for T. E.g. | |
810 // `newPromiseAndFulfiller<Promise<U>>()` will produce a promise of type `Promise<U>` but the | |
811 // fulfiller will be of type `PromiseFulfiller<Promise<U>>`. Thus you pass a `Promise<U>` to the | |
812 // `fulfill()` callback, and the promises are chained. | |
813 | |
814 template <typename T> | |
815 class CrossThreadPromiseFulfiller: public kj::PromiseFulfiller<T> { | |
816 // Like PromiseFulfiller<T> but the methods are `const`, indicating they can safely be called | |
817 // from another thread. | |
818 | |
819 public: | |
820 virtual void fulfill(T&& value) const = 0; | |
821 virtual void reject(Exception&& exception) const = 0; | |
822 virtual bool isWaiting() const = 0; | |
823 | |
824 void fulfill(T&& value) override { return constThis()->fulfill(kj::fwd<T>(value)); } | |
825 void reject(Exception&& exception) override { return constThis()->reject(kj::mv(exception)); } | |
826 bool isWaiting() override { return constThis()->isWaiting(); } | |
827 | |
828 private: | |
829 const CrossThreadPromiseFulfiller* constThis() { return this; } | |
830 }; | |
831 | |
832 template <> | |
833 class CrossThreadPromiseFulfiller<void>: public kj::PromiseFulfiller<void> { | |
834 // Specialization of CrossThreadPromiseFulfiller for void promises. See | |
835 // CrossThreadPromiseFulfiller<T>. | |
836 | |
837 public: | |
838 virtual void fulfill(_::Void&& value = _::Void()) const = 0; | |
839 virtual void reject(Exception&& exception) const = 0; | |
840 virtual bool isWaiting() const = 0; | |
841 | |
842 void fulfill(_::Void&& value) override { return constThis()->fulfill(kj::mv(value)); } | |
843 void reject(Exception&& exception) override { return constThis()->reject(kj::mv(exception)); } | |
844 bool isWaiting() override { return constThis()->isWaiting(); } | |
845 | |
846 private: | |
847 const CrossThreadPromiseFulfiller* constThis() { return this; } | |
848 }; | |
849 | |
850 template <typename T> | |
851 struct PromiseCrossThreadFulfillerPair { | |
852 _::ReducePromises<T> promise; | |
853 Own<CrossThreadPromiseFulfiller<T>> fulfiller; | |
854 }; | |
855 | |
856 template <typename T> | |
857 PromiseCrossThreadFulfillerPair<T> newPromiseAndCrossThreadFulfiller(); | |
858 // Like `newPromiseAndFulfiller()`, but the fulfiller is allowed to be invoked from any thread, | |
859 // not just the one that called this method. Note that the Promise is still tied to the calling | |
860 // thread's event loop and *cannot* be used from another thread -- only the PromiseFulfiller is | |
861 // cross-thread. | |
862 | |
863 // ======================================================================================= | |
864 // Canceler | |
865 | |
866 class Canceler: private AsyncObject { | |
867 // A Canceler can wrap some set of Promises and then forcefully cancel them on-demand, or | |
868 // implicitly when the Canceler is destroyed. | |
869 // | |
870 // The cancellation is done in such a way that once cancel() (or the Canceler's destructor) | |
871 // returns, it's guaranteed that the promise has already been canceled and destroyed. This | |
872 // guarantee is important for enforcing ownership constraints. For example, imagine that Alice | |
873 // calls a method on Bob that returns a Promise. That Promise encapsulates a task that uses Bob's | |
874 // internal state. But, imagine that Alice does not own Bob, and indeed Bob might be destroyed | |
875 // at random without Alice having canceled the promise. In this case, it is necessary for Bob to | |
876 // ensure that the promise will be forcefully canceled. Bob can do this by constructing a | |
877 // Canceler and using it to wrap promises before returning them to callers. When Bob is | |
878 // destroyed, the Canceler is destroyed too, and all promises Bob wrapped with it throw errors. | |
879 // | |
880 // Note that another common strategy for cancellation is to use exclusiveJoin() to join a promise | |
881 // with some "cancellation promise" which only resolves if the operation should be canceled. The | |
882 // cancellation promise could itself be created by newPromiseAndFulfiller<void>(), and thus | |
883 // calling the PromiseFulfiller cancels the operation. There is a major problem with this | |
884 // approach: upon invoking the fulfiller, an arbitrary amount of time may pass before the | |
885 // exclusive-joined promise actually resolves and cancels its other fork. During that time, the | |
886 // task might continue to execute. If it holds pointers to objects that have been destroyed, this | |
887 // might cause segfaults. Thus, it is safer to use a Canceler. | |
888 | |
889 public: | |
890 inline Canceler() {} | |
891 ~Canceler() noexcept(false); | |
892 KJ_DISALLOW_COPY_AND_MOVE(Canceler); | |
893 | |
894 template <typename T> | |
895 Promise<T> wrap(Promise<T> promise) { | |
896 return newAdaptedPromise<T, AdapterImpl<T>>(*this, kj::mv(promise)); | |
897 } | |
898 | |
899 void cancel(StringPtr cancelReason); | |
900 void cancel(const Exception& exception); | |
901 // Cancel all previously-wrapped promises that have not already completed, causing them to throw | |
902 // the given exception. If you provide just a description message instead of an exception, then | |
903 // an exception object will be constructed from it -- but only if there are requests to cancel. | |
904 | |
905 void release(); | |
906 // Releases previously-wrapped promises, so that they will not be canceled regardless of what | |
907 // happens to this Canceler. | |
908 | |
909 bool isEmpty() const { return list == nullptr; } | |
910 // Returns true if no previously-wrapped promises are still executing. (If this returns true, then | |
911 // cancel() would be a no-op.) | |
912 | |
913 private: | |
914 class AdapterBase { | |
915 public: | |
916 AdapterBase(Canceler& canceler); | |
917 ~AdapterBase() noexcept(false); | |
918 | |
919 virtual void cancel(Exception&& e) = 0; | |
920 | |
921 void unlink(); | |
922 | |
923 private: | |
924 Maybe<Maybe<AdapterBase&>&> prev; | |
925 Maybe<AdapterBase&> next; | |
926 friend class Canceler; | |
927 }; | |
928 | |
929 template <typename T> | |
930 class AdapterImpl: public AdapterBase { | |
931 public: | |
932 AdapterImpl(PromiseFulfiller<T>& fulfiller, | |
933 Canceler& canceler, Promise<T> inner) | |
934 : AdapterBase(canceler), | |
935 fulfiller(fulfiller), | |
936 inner(inner.then( | |
937 [&fulfiller](T&& value) { fulfiller.fulfill(kj::mv(value)); }, | |
938 [&fulfiller](Exception&& e) { fulfiller.reject(kj::mv(e)); }) | |
939 .eagerlyEvaluate(nullptr)) {} | |
940 | |
941 void cancel(Exception&& e) override { | |
942 fulfiller.reject(kj::mv(e)); | |
943 inner = nullptr; | |
944 } | |
945 | |
946 private: | |
947 PromiseFulfiller<T>& fulfiller; | |
948 Promise<void> inner; | |
949 }; | |
950 | |
951 Maybe<AdapterBase&> list; | |
952 }; | |
953 | |
954 template <> | |
955 class Canceler::AdapterImpl<void>: public AdapterBase { | |
956 public: | |
957 AdapterImpl(kj::PromiseFulfiller<void>& fulfiller, | |
958 Canceler& canceler, kj::Promise<void> inner); | |
959 void cancel(kj::Exception&& e) override; | |
960 // These must be defined in async.c++ to prevent translation units compiled by MSVC from trying to | |
961 // link with symbols defined in async.c++ merely because they included async.h. | |
962 | |
963 private: | |
964 kj::PromiseFulfiller<void>& fulfiller; | |
965 kj::Promise<void> inner; | |
966 }; | |
967 | |
968 // ======================================================================================= | |
969 // TaskSet | |
970 | |
971 class TaskSet: private AsyncObject { | |
972 // Holds a collection of Promise<void>s and ensures that each executes to completion. Memory | |
973 // associated with each promise is automatically freed when the promise completes. Destroying | |
974 // the TaskSet itself automatically cancels all unfinished promises. | |
975 // | |
976 // This is useful for "daemon" objects that perform background tasks which aren't intended to | |
977 // fulfill any particular external promise, but which may need to be canceled (and thus can't | |
978 // use `Promise::detach()`). The daemon object holds a TaskSet to collect these tasks it is | |
979 // working on. This way, if the daemon itself is destroyed, the TaskSet is destroyed as well, | |
980 // and everything the daemon is doing is canceled. | |
981 | |
982 public: | |
983 class ErrorHandler { | |
984 public: | |
985 virtual void taskFailed(kj::Exception&& exception) = 0; | |
986 }; | |
987 | |
988 TaskSet(ErrorHandler& errorHandler, SourceLocation location = {}); | |
989 // `errorHandler` (i.e. its `taskFailed()` method) is invoked any time a task throws an exception. | |
990 // NOTE(review): this used to say "within the given EventLoop", but no loop is passed; confirm which. | |
991 | |
992 ~TaskSet() noexcept(false); | |
993 | |
994 void add(Promise<void>&& promise); | |
995 | |
996 kj::String trace(); | |
997 // Return debug info about all promises currently in the TaskSet. | |
998 | |
999 bool isEmpty() const { return tasks == nullptr; } | |
1000 // Returns true if no tasks are running. Const, matching Canceler::isEmpty(); it only reads `tasks`. | |
1001 | |
1002 Promise<void> onEmpty(); | |
1003 // Returns a promise that fulfills the next time the TaskSet is empty. Only one such promise can | |
1004 // exist at a time. | |
1005 | |
1006 void clear(); | |
1007 // Cancel all tasks. | |
1008 // | |
1009 // As always, it is not safe to cancel the task that is currently running, so you could not call | |
1010 // this from inside a task in the TaskSet. However, it IS safe to call this from the | |
1011 // `taskFailed()` callback. | |
1012 // | |
1013 // Calling this will always trigger onEmpty(), if anyone is listening. | |
1014 | |
1015 private: | |
1016 class Task; | |
1017 using OwnTask = Own<Task, _::PromiseDisposer>; | |
1018 | |
1019 TaskSet::ErrorHandler& errorHandler; | |
1020 Maybe<OwnTask> tasks; | |
1021 Maybe<Own<PromiseFulfiller<void>>> emptyFulfiller; | |
1022 SourceLocation location; | |
1023 }; | |
1024 | |
1025 // ======================================================================================= | |
1026 // Cross-thread execution. | |
1027 | |
1028 class Executor { | |
1029 // Executes code on another thread's event loop. | |
1030 // | |
1031 // Use `kj::getCurrentThreadExecutor()` to get an executor that schedules calls on the current | |
1032 // thread's event loop. You may then pass the reference to other threads to enable them to call | |
1033 // back to this one. | |
1034 | |
1035 public: | |
1036 Executor(EventLoop& loop, Badge<EventLoop>); | |
1037 ~Executor() noexcept(false); | |
1038 | |
1039 virtual kj::Own<const Executor> addRef() const = 0; | |
1040 // Add a reference to this Executor. The Executor will not be destroyed until all references are | |
1041 // dropped. This uses atomic refcounting for thread-safety. | |
1042 // | |
1043 // Use this when you can't guarantee that the target thread's event loop won't concurrently exit | |
1044 // (including due to an uncaught exception!) while another thread is still using the Executor. | |
1045 // Otherwise, the Executor object is destroyed when the owning event loop exits. | |
1046 // | |
1047 // If the target event loop has exited, then `execute{Async,Sync}` will throw DISCONNECTED | |
1048 // exceptions. | |
1049 | |
1050 bool isLive() const; | |
1051 // Returns true if the remote event loop still exists, false if it has been destroyed. In the | |
1052 // latter case, `execute{Async,Sync}()` will definitely throw. Of course, if this returns true, | |
1053 // it could still change to false at any moment, and `execute{Async,Sync}()` could still throw as | |
1054 // a result. | |
1055 // | |
1056 // TODO(cleanup): Should we have tryExecute{Async,Sync}() that return Maybes that are null if | |
1057 // the remote event loop exited? Currently there are multiple known use cases that check | |
1058 // isLive() after catching a DISCONNECTED exception to decide whether it is due to the executor | |
1059 // exiting, and then handling that case. This is borderline in violation of KJ exception | |
1060 // philosophy, but right now I'm not excited about the extra template metaprogramming needed | |
1061 // for "try" versions... | |
1062 | |
1063 template <typename Func> | |
1064 PromiseForResult<Func, void> executeAsync(Func&& func, SourceLocation location = {}) const; | |
1065 // Call from any thread to request that the given function be executed on the executor's thread, | |
1066 // returning a promise for the result. | |
1067 // | |
1068 // The Promise returned by executeAsync() belongs to the requesting thread, not the executor | |
1069 // thread. Hence, for example, continuations added to this promise with .then() will execute in | |
1070 // the requesting thread. | |
1071 // | |
1072 // If func() itself returns a Promise, that Promise is *not* returned verbatim to the requesting | |
1073 // thread -- after all, Promise objects cannot be used cross-thread. Instead, the executor thread | |
1074 // awaits the promise. Once it resolves to a final result, that result is transferred to the | |
1075 // requesting thread, resolving the promise that executeAsync() returned earlier. | |
1076 // | |
1077 // `func` will be destroyed in the requesting thread, after the final result has been returned | |
1078 // from the executor thread. This means that it is safe for `func` to capture objects that cannot | |
1079 // safely be destroyed from another thread. It is also safe for `func` to be an lvalue reference, | |
1080 // so long as the functor remains live until the promise completes or is canceled, and the | |
1081 // function is thread-safe. | |
1082 // | |
1083 // Of course, the body of `func` must be careful that any access it makes on these objects is | |
1084 // safe cross-thread. For example, it must not attempt to access Promise-related objects | |
1085 // cross-thread; you cannot create a `PromiseFulfiller` in one thread and then `fulfill()` it | |
1086 // from another. Unfortunately, the usual convention of using const-correctness to enforce | |
1087 // thread-safety does not work here, because applications can often ensure that `func` has | |
1088 // exclusive access to captured objects, and thus can safely mutate them even in non-thread-safe | |
1089 // ways; the const qualifier is not sufficient to express this. | |
1090 // | |
1091 // The final return value of `func` is transferred between threads, and hence is constructed and | |
1092 // destroyed in separate threads. It is the app's responsibility to make sure this is OK. | |
1093 // Alternatively, the app can perhaps arrange to send the return value back to the original | |
1094 // thread for destruction, if needed. | |
1095 // | |
1096 // If the requesting thread destroys the returned Promise, the destructor will block waiting for | |
1097 // the executor thread to acknowledge cancellation. This ensures that `func` can be destroyed | |
1098 // before the Promise's destructor returns. | |
1099 // | |
1100 // Multiple calls to executeAsync() from the same requesting thread to the same target thread | |
1101 // will be delivered in the same order in which they were requested. (However, if func() returns | |
1102 // a promise, delivery of subsequent calls is not blocked on that promise. In other words, this | |
1103 // call provides E-Order in the same way as Cap'n Proto.) | |
1104 | |
1105 template <typename Func> | |
1106 _::UnwrapPromise<PromiseForResult<Func, void>> executeSync( | |
1107 Func&& func, SourceLocation location = {}) const; | |
1108 // Schedules `func()` to execute on the executor thread, and then blocks the requesting thread | |
1109 // until `func()` completes. If `func()` returns a Promise, then the wait will continue until | |
1110 // that promise resolves, and the final result will be returned to the requesting thread. | |
1111 // | |
1112 // The requesting thread does not need to have an EventLoop. If it does have an EventLoop, that | |
1113 // loop will *not* execute while the thread is blocked. This method is particularly useful to | |
1114 // allow non-event-loop threads to perform I/O via a separate event-loop thread. | |
1115 // | |
1116 // As with `executeAsync()`, `func` is always destroyed on the requesting thread, after the | |
1117 // executor thread has signaled completion. The return value is transferred between threads. | |
1118 | |
1119 private: | |
1120 struct Impl; | |
1121 Own<Impl> impl; | |
1122 // To avoid including mutex.h... | |
1123 | |
1124 friend class EventLoop; | |
1125 friend class _::XThreadEvent; | |
1126 friend class _::XThreadPaf; | |
1127 | |
1128 void send(_::XThreadEvent& event, bool sync) const; | |
1129 void wait(); | |
1130 bool poll(); | |
1131 | |
1132 EventLoop& getLoop() const; | |
1133 }; | |
1134 | |
1135 const Executor& getCurrentThreadExecutor(); | |
1136 // Get the executor for the current thread's event loop. This reference can then be passed to other | |
1137 // threads. | |
1138 | |
1139 // ======================================================================================= | |
1140 // The EventLoop class | |
1141 | |
1142 class EventPort { | |
1143 // Interfaces between an `EventLoop` and events originating from outside of the loop's thread. | |
1144 // All such events come in through the `EventPort` implementation. | |
1145 // | |
1146 // An `EventPort` implementation may interface with low-level operating system APIs and/or other | |
1147 // threads. You can also write an `EventPort` which wraps some other (non-KJ) event loop | |
1148 // framework, allowing the two to coexist in a single thread. | |
1149 | |
1150 public: | |
1151 virtual bool wait() = 0; | |
1152 // Wait for an external event to arrive, sleeping if necessary. Once at least one event has | |
1153 // arrived, queue it to the event loop (e.g. by fulfilling a promise) and return. | |
1154 // | |
1155 // This is called during `Promise::wait()` whenever the event queue becomes empty, in order to | |
1156 // wait for new events to populate the queue. | |
1157 // | |
1158 // It is safe to return even if nothing has actually been queued, so long as calling `wait()` in | |
1159 // a loop will eventually sleep. (That is to say, false positives are fine.) | |
1160 // | |
1161 // Returns true if wake() has been called from another thread. (Precisely, returns true if | |
1162 // no previous call to wait `wait()` nor `poll()` has returned true since `wake()` was last | |
1163 // called.) | |
1164 | |
1165 virtual bool poll() = 0; | |
1166 // Check if any external events have arrived, but do not sleep. If any events have arrived, | |
1167 // add them to the event queue (e.g. by fulfilling promises) before returning. | |
1168 // | |
1169 // This may be called during `Promise::wait()` when the EventLoop has been executing for a while | |
1170 // without a break but is still non-empty. | |
1171 // | |
1172 // Returns true if wake() has been called from another thread. (Precisely, returns true if | |
1173 // no previous call to wait `wait()` nor `poll()` has returned true since `wake()` was last | |
1174 // called.) | |
1175 | |
1176 virtual void setRunnable(bool runnable); | |
1177 // Called to notify the `EventPort` when the `EventLoop` has work to do; specifically when it | |
1178 // transitions from empty -> runnable or runnable -> empty. This is typically useful when | |
1179 // integrating with an external event loop; if the loop is currently runnable then you should | |
1180 // arrange to call run() on it soon. The default implementation does nothing. | |
1181 | |
1182 virtual void wake() const; | |
1183 // Wake up the `EventPort`'s thread (which may be sleeping in `wait()`) from another thread. | |
1184 // | |
1185 // Unlike all other methods on this interface, `wake()` may be called from another thread, hence | |
1186 // it is `const`. | |
1187 // | |
1188 // Precisely, `wake()` causes the target thread to stop sleeping and not to sleep | |
1189 // again until `wait()` or `poll()` has returned true at least once. | |
1190 // | |
1191 // The default implementation throws an UNIMPLEMENTED exception. | |
1192 }; | |
1193 | |
1194 class EventLoop { | |
1195 // Represents a queue of events being executed in a loop. Most code won't interact with | |
1196 // EventLoop directly, but instead use `Promise`s to interact with it indirectly. See the | |
1197 // documentation for `Promise`. | |
1198 // | |
1199 // Each thread can have at most one current EventLoop. To make an `EventLoop` current for | |
1200 // the thread, create a `WaitScope`. Async APIs require that the thread has a current EventLoop, | |
1201 // or they will throw exceptions. APIs that use `Promise::wait()` additionally must explicitly | |
1202 // be passed a reference to the `WaitScope` to make the caller aware that they might block. | |
1203 // | |
1204 // Generally, you will want to construct an `EventLoop` at the top level of your program, e.g. | |
1205 // in the main() function, or in the start function of a thread. You can then use it to | |
1206 // construct some promises and wait on the result. Example: | |
1207 // | |
1208 //   int main() { | |
1209 //     MyEventPort eventPort; | |
1210 //     EventLoop loop(eventPort); | |
1211 //     WaitScope waitScope(loop);  // `loop` is now the current EventLoop for this thread. | |
1212 // | |
1213 //     // Now we can call an async function. | |
1214 //     Promise<String> textPromise = getHttp("http://example.com"); | |
1215 // | |
1216 //     // And we can wait for the promise to complete. Note that you can only use `wait()` | |
1217 //     // from the top level, not from inside a promise callback. | |
1218 //     String text = textPromise.wait(waitScope); | |
1219 //     print(text); | |
1220 //     return 0; | |
1221 //   } | |
1222 // | |
1223 // Most applications that do I/O will prefer to use `setupAsyncIo()` from `async-io.h` rather | |
1224 // than allocate an `EventLoop` directly. | |
1225 | |
1226 public: | |
1227 EventLoop(); | |
1228 // Construct an `EventLoop` which does not receive external events at all. | |
1229 | |
1230 explicit EventLoop(EventPort& port); | |
1231 // Construct an `EventLoop` which receives external events through the given `EventPort`. | |
1232 | |
1233 ~EventLoop() noexcept(false); | |
1234 | |
1235 void run(uint maxTurnCount = maxValue); | |
1236 // Run the event loop for `maxTurnCount` turns or until there is nothing left to be done, | |
1237 // whichever comes first. This never calls the `EventPort`'s `wait()` or `poll()`. It will | |
1238 // call the `EventPort`'s `setRunnable(false)` if the queue becomes empty. | |
1239 | |
1240 bool isRunnable(); | |
1241 // Returns true if `run()` would currently do anything, or false if the queue is empty. | |
1242 | |
1243 const Executor& getExecutor(); | |
1244 // Returns an Executor that can be used to schedule events on this EventLoop from another thread. | |
1245 // | |
1246 // Use the global function kj::getCurrentThreadExecutor() to get the current thread's EventLoop's | |
1247 // Executor. | |
1248 // | |
1249 // Note that this is only needed for cross-thread scheduling. To schedule code to run later in | |
1250 // the current thread, use `kj::evalLater()`, which will be more efficient. | |
1251 | |
1252 private: | |
1253 kj::Maybe<EventPort&> port; | |
1254 // If null, this thread doesn't receive I/O events from the OS. It can potentially receive | |
1255 // events from other threads via the Executor. | |
1256 | |
1257 bool running = false; | |
1258 // True while looping -- `wait()` is then not allowed. | |
1259 | |
1260 bool lastRunnableState = false; | |
1261 // The value most recently passed to `port.setRunnable()`. | |
1262 | |
1263 _::Event* head = nullptr; | |
1264 _::Event** tail = &head; | |
1265 _::Event** depthFirstInsertPoint = &head; | |
1266 _::Event** breadthFirstInsertPoint = &head; | |
1267 | |
1268 kj::Maybe<Own<Executor>> executor; | |
1269 // Allocated the first time getExecutor() is requested, making cross-thread requests possible. | |
1270 | |
1271 Own<TaskSet> daemons; | |
1272 | |
1273 _::Event* currentlyFiring = nullptr; | |
1274 | |
1275 bool turn(); | |
1276 void setRunnable(bool runnable); | |
1277 void enterScope(); | |
1278 void leaveScope(); | |
1279 | |
1280 void wait(); | |
1281 void poll(); | |
1282 | |
1283 friend void _::detach(kj::Promise<void>&& promise); | |
1284 friend void _::waitImpl(_::OwnPromiseNode&& node, _::ExceptionOrValue& result, | |
1285 WaitScope& waitScope, SourceLocation location); | |
1286 friend bool _::pollImpl(_::PromiseNode& node, WaitScope& waitScope, SourceLocation location); | |
1287 friend class _::Event; | |
1288 friend class WaitScope; | |
1289 friend class Executor; | |
1290 friend class _::XThreadEvent; | |
1291 friend class _::XThreadPaf; | |
1292 friend class _::FiberBase; | |
1293 friend class _::FiberStack; | |
1294 friend ArrayPtr<void* const> getAsyncTrace(ArrayPtr<void*> space); | |
1295 }; | |
1296 | |
1297 class WaitScope { | |
1298 // Represents a scope in which asynchronous programming can occur. A `WaitScope` should usually | |
1299 // be allocated on the stack and serves two purposes: | |
1300 // * While the `WaitScope` exists, its `EventLoop` is registered as the current loop for the | |
1301 //   thread. Most operations dealing with `Promise` (including all of its methods) do not work | |
1302 //   unless the thread has a current `EventLoop`. | |
1303 // * `WaitScope` may be passed to `Promise::wait()` to synchronously wait for a particular | |
1304 //   promise to complete. See `Promise::wait()` for an extended discussion. | |
1305 | |
1306 public: | |
1307 inline explicit WaitScope(EventLoop& loop): loop(loop) { loop.enterScope(); } | |
1308 inline ~WaitScope() { if (fiber == nullptr) loop.leaveScope(); } | |
1309 KJ_DISALLOW_COPY_AND_MOVE(WaitScope); | |
1310 | |
1311 uint poll(uint maxTurnCount = maxValue); | |
1312 // Pumps the event queue and polls for I/O until there's nothing left to do (without blocking) or | |
1313 // the maximum turn count has been reached. Returns the number of events popped off the event | |
1314 // queue. | |
1315 // | |
1316 // Not supported in fibers. | |
1317 | |
1318 void setBusyPollInterval(uint count) { busyPollInterval = count; } | |
1319 // Set the maximum number of events to run in a row before calling `poll()` on the `EventPort` | |
1320 // to check for new I/O. The default is `kj::maxValue`. | |
1321 // | |
1322 // This has no effect when used in a fiber. | |
1323 | |
1324 void runEventCallbacksOnStackPool(kj::Maybe<const FiberPool&> pool) { runningStacksPool = pool; } | |
1325 // Arranges to switch stacks while event callbacks are executing. This is an optimization that | |
1326 // is useful for programs that use extremely high thread counts, where each thread has its own | |
1327 // event loop, but each thread has relatively low event throughput, i.e. each thread spends | |
1328 // most of its time waiting for I/O. Normally, the biggest problem with having lots of threads | |
1329 // is that each thread must allocate a stack, and stacks can take a lot of memory if the | |
1330 // application commonly makes deep calls. But, most of that stack space is only needed while | |
1331 // the thread is executing, not while it's sleeping. So, if threads only switch to a big stack | |
1332 // during execution, switching back when it's time to sleep, and if those stacks are freelisted | |
1333 // so that they can be shared among threads, then a lot of memory is saved. | |
1334 // | |
1335 // We use the `FiberPool` type here because it implements a freelist of stacks, which is exactly | |
1336 // what we happen to want! In our case, though, we don't use those stacks to implement fibers; | |
1337 // we use them as the main thread stack. | |
1338 // | |
1339 // This has no effect if this `WaitScope` itself is for a fiber. | |
1340 // | |
1341 // Pass `nullptr` as the parameter to go back to running events on the main stack. | |
1342 | |
1343 void cancelAllDetached(); | |
1344 // HACK: Immediately cancel all detached promises. | |
1345 // | |
1346 // New code should not use detached promises, and therefore should not need this. | |
1347 // | |
1348 // This method exists to help existing code deal with the problems of detached promises, | |
1349 // especially at teardown time. | |
1350 // | |
1351 // This method may be removed in the future. | |
1352 | |
1353 private: | |
1354 EventLoop& loop; | |
1355 uint busyPollInterval = kj::maxValue; | |
1356 | |
1357 kj::Maybe<_::FiberBase&> fiber; | |
1358 kj::Maybe<const FiberPool&> runningStacksPool; | |
1359 | |
1360 explicit WaitScope(EventLoop& loop, _::FiberBase& fiber) | |
1361 : loop(loop), fiber(fiber) {} | |
1362 | |
1363 template <typename Func> | |
1364 inline void runOnStackPool(Func&& func) { | |
1365 KJ_IF_MAYBE(pool, runningStacksPool) { | |
1366 pool->runSynchronously(kj::fwd<Func>(func)); | |
1367 } else { | |
1368 func(); | |
1369 } | |
1370 } | |
1371 | |
1372 friend class EventLoop; | |
1373 friend class _::FiberBase; | |
1374 friend void _::waitImpl(_::OwnPromiseNode&& node, _::ExceptionOrValue& result, | |
1375 WaitScope& waitScope, SourceLocation location); | |
1376 friend bool _::pollImpl(_::PromiseNode& node, WaitScope& waitScope, SourceLocation location); | |
1377 }; | |
1378 | |
1379 } // namespace kj | |
1380 | |
1381 #define KJ_ASYNC_H_INCLUDED | |
1382 #include "async-inl.h" | |
1383 | |
1384 KJ_END_HEADER |