Mercurial > repos > rliterman > csp2
comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/include/kj/async-inl.h @ 69:33d812a61356
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 17:55:14 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
67:0e9998148a16 | 69:33d812a61356 |
---|---|
1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors | |
2 // Licensed under the MIT License: | |
3 // | |
4 // Permission is hereby granted, free of charge, to any person obtaining a copy | |
5 // of this software and associated documentation files (the "Software"), to deal | |
6 // in the Software without restriction, including without limitation the rights | |
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
8 // copies of the Software, and to permit persons to whom the Software is | |
9 // furnished to do so, subject to the following conditions: | |
10 // | |
11 // The above copyright notice and this permission notice shall be included in | |
12 // all copies or substantial portions of the Software. | |
13 // | |
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
20 // THE SOFTWARE. | |
21 | |
22 // This file contains extended inline implementation details that are required along with async.h. | |
23 // We move this all into a separate file to make async.h more readable. | |
24 // | |
25 // Non-inline declarations here are defined in async.c++. | |
26 | |
27 #pragma once | |
28 | |
29 #ifndef KJ_ASYNC_H_INCLUDED | |
30 #error "Do not include this directly; include kj/async.h." | |
31 #include "async.h" // help IDE parse this file | |
32 #endif | |
33 | |
34 #if _MSC_VER && KJ_HAS_COROUTINE | |
35 #include <intrin.h> | |
36 #endif | |
37 | |
38 #include <kj/list.h> | |
39 | |
40 KJ_BEGIN_HEADER | |
41 | |
42 namespace kj { | |
43 namespace _ { // private | |
44 | |
45 template <typename T> | |
46 class ExceptionOr; | |
47 | |
48 class ExceptionOrValue { | |
49 public: | |
50 ExceptionOrValue(bool, Exception&& exception): exception(kj::mv(exception)) {} | |
51 KJ_DISALLOW_COPY(ExceptionOrValue); | |
52 | |
53 void addException(Exception&& exception) { | |
54 if (this->exception == nullptr) { | |
55 this->exception = kj::mv(exception); | |
56 } | |
57 } | |
58 | |
59 template <typename T> | |
60 ExceptionOr<T>& as() { return *static_cast<ExceptionOr<T>*>(this); } | |
61 template <typename T> | |
62 const ExceptionOr<T>& as() const { return *static_cast<const ExceptionOr<T>*>(this); } | |
63 | |
64 Maybe<Exception> exception; | |
65 | |
66 protected: | |
67 // Allow subclasses to have move constructor / assignment. | |
68 ExceptionOrValue() = default; | |
69 ExceptionOrValue(ExceptionOrValue&& other) = default; | |
70 ExceptionOrValue& operator=(ExceptionOrValue&& other) = default; | |
71 }; | |
72 | |
73 template <typename T> | |
74 class ExceptionOr: public ExceptionOrValue { | |
75 public: | |
76 ExceptionOr() = default; | |
77 ExceptionOr(T&& value): value(kj::mv(value)) {} | |
78 ExceptionOr(bool, Exception&& exception): ExceptionOrValue(false, kj::mv(exception)) {} | |
79 ExceptionOr(ExceptionOr&&) = default; | |
80 ExceptionOr& operator=(ExceptionOr&&) = default; | |
81 | |
82 Maybe<T> value; | |
83 }; | |
84 | |
85 template <typename T> | |
86 inline T convertToReturn(ExceptionOr<T>&& result) { | |
87 KJ_IF_MAYBE(value, result.value) { | |
88 KJ_IF_MAYBE(exception, result.exception) { | |
89 throwRecoverableException(kj::mv(*exception)); | |
90 } | |
91 return _::returnMaybeVoid(kj::mv(*value)); | |
92 } else KJ_IF_MAYBE(exception, result.exception) { | |
93 throwFatalException(kj::mv(*exception)); | |
94 } else { | |
95 // Result contained neither a value nor an exception? | |
96 KJ_UNREACHABLE; | |
97 } | |
98 } | |
99 | |
100 inline void convertToReturn(ExceptionOr<Void>&& result) { | |
101 // Override <void> case to use throwRecoverableException(). | |
102 | |
103 if (result.value != nullptr) { | |
104 KJ_IF_MAYBE(exception, result.exception) { | |
105 throwRecoverableException(kj::mv(*exception)); | |
106 } | |
107 } else KJ_IF_MAYBE(exception, result.exception) { | |
108 throwRecoverableException(kj::mv(*exception)); | |
109 } else { | |
110 // Result contained neither a value nor an exception? | |
111 KJ_UNREACHABLE; | |
112 } | |
113 } | |
114 | |
115 class TraceBuilder { | |
116 // Helper for methods that build a call trace. | |
117 public: | |
118 TraceBuilder(ArrayPtr<void*> space) | |
119 : start(space.begin()), current(space.begin()), limit(space.end()) {} | |
120 | |
121 inline void add(void* addr) { | |
122 if (current < limit) { | |
123 *current++ = addr; | |
124 } | |
125 } | |
126 | |
127 inline bool full() const { return current == limit; } | |
128 | |
129 ArrayPtr<void*> finish() { | |
130 return arrayPtr(start, current); | |
131 } | |
132 | |
133 String toString(); | |
134 | |
135 private: | |
136 void** start; | |
137 void** current; | |
138 void** limit; | |
139 }; | |
140 | |
141 struct alignas(void*) PromiseArena { | |
142 // Space in which a chain of promises may be allocated. See PromiseDisposer. | |
143 byte bytes[1024]; | |
144 }; | |
145 | |
146 class Event: private AsyncObject { | |
147 // An event waiting to be executed. Not for direct use by applications -- promises use this | |
148 // internally. | |
149 | |
150 public: | |
151 Event(SourceLocation location); | |
152 Event(kj::EventLoop& loop, SourceLocation location); | |
153 ~Event() noexcept(false); | |
154 KJ_DISALLOW_COPY_AND_MOVE(Event); | |
155 | |
156 void armDepthFirst(); | |
157 // Enqueue this event so that `fire()` will be called from the event loop soon. | |
158 // | |
159 // Events scheduled in this way are executed in depth-first order: if an event callback arms | |
160 // more events, those events are placed at the front of the queue (in the order in which they | |
161 // were armed), so that they run immediately after the first event's callback returns. | |
162 // | |
163 // Depth-first event scheduling is appropriate for events that represent simple continuations | |
164 // of a previous event that should be globbed together for performance. Depth-first scheduling | |
165 // can lead to starvation, so any long-running task must occasionally yield with | |
166 // `armBreadthFirst()`. (Promise::then() uses depth-first whereas evalLater() uses | |
167 // breadth-first.) | |
168 // | |
169 // To use breadth-first scheduling instead, use `armBreadthFirst()`. | |
170 | |
171 void armBreadthFirst(); | |
172 // Like `armDepthFirst()` except that the event is placed at the end of the queue. | |
173 | |
174 void armLast(); | |
175 // Enqueues this event to happen after all other events have run to completion and there is | |
176 // really nothing left to do except wait for I/O. | |
177 | |
178 bool isNext(); | |
179 // True if the Event has been armed and is next in line to be fired. This can be used after | |
180 // calling PromiseNode::onReady(event) to determine if a promise being waited is immediately | |
181 // ready, in which case continuations may be optimistically run without returning to the event | |
182 // loop. Note that this optimization is only valid if we know that we would otherwise immediately | |
183 // return to the event loop without running more application code. So this turns out to be useful | |
184 // in fairly narrow circumstances, chiefly when a coroutine is about to suspend, but discovers it | |
185 // doesn't need to. | |
186 // | |
187 // Returns false if the event loop is not currently running. This ensures that promise | |
188 // continuations don't execute except under a call to .wait(). | |
189 | |
190 void disarm(); | |
191 // If the event is armed but hasn't fired, cancel it. (Destroying the event does this | |
192 // implicitly.) | |
193 | |
194 virtual void traceEvent(TraceBuilder& builder) = 0; | |
195 // Build a trace of the callers leading up to this event. `builder` will be populated with | |
196 // "return addresses" of the promise chain waiting on this event. The return addresses may | |
197 // actually the addresses of lambdas passed to .then(), but in any case, feeding them into | |
198 // addr2line should produce useful source code locations. | |
199 // | |
200 // `traceEvent()` may be called from an async signal handler while `fire()` is executing. It | |
201 // must not allocate nor take locks. | |
202 | |
203 String traceEvent(); | |
204 // Helper that builds a trace and stringifies it. | |
205 | |
206 protected: | |
207 virtual Maybe<Own<Event>> fire() = 0; | |
208 // Fire the event. Possibly returns a pointer to itself, which will be discarded by the | |
209 // caller. This is the only way that an event can delete itself as a result of firing, as | |
210 // doing so from within fire() will throw an exception. | |
211 | |
212 private: | |
213 friend class kj::EventLoop; | |
214 EventLoop& loop; | |
215 Event* next; | |
216 Event** prev; | |
217 bool firing = false; | |
218 | |
219 static constexpr uint MAGIC_LIVE_VALUE = 0x1e366381u; | |
220 uint live = MAGIC_LIVE_VALUE; | |
221 SourceLocation location; | |
222 }; | |
223 | |
224 class PromiseArenaMember { | |
225 // An object that is allocated in a PromiseArena. `PromiseNode` inherits this, and most | |
226 // arena-allocated objects are `PromiseNode` subclasses, but `TaskSet::Task`, ForkHub, and | |
227 // potentially other objects that commonly live on the end of a promise chain can also leverage | |
228 // this. | |
229 | |
230 public: | |
231 virtual void destroy() = 0; | |
232 // Destroys and frees the node. | |
233 // | |
234 // If the node was allocated using allocPromise<T>(), then destroy() must call | |
235 // freePromise<T>(this). If it was allocated some other way, then it is `destroy()`'s | |
236 // responsibility to complete any necessary cleanup of memory, e.g. call `delete this`. | |
237 // | |
238 // We use this instead of a virtual destructor for two reasons: | |
239 // 1. Coroutine nodes are not independent objects, they have to call destroy() on the coroutine | |
240 // handle to delete themselves. | |
241 // 2. XThreadEvents sometimes leave it up to a different thread to actually delete the object. | |
242 | |
243 private: | |
244 PromiseArena* arena = nullptr; | |
245 // If non-null, then this PromiseNode is the last node allocated within the given arena, and | |
246 // therefore owns the arena. After this node is destroyed, the arena should be deleted. | |
247 // | |
248 // PromiseNodes are allocated within the arena starting from the end, and `PromiseNode`s | |
249 // allocated this way are required to have `PromiseNode` itself as their leftmost inherited type, | |
250 // so that the pointers match. Thus, the space in `arena` from its start to the location of the | |
251 // `PromiseNode` is known to be available for subsequent allocations (which should then take | |
252 // ownership of the arena). | |
253 | |
254 friend class PromiseDisposer; | |
255 }; | |
256 | |
257 class PromiseNode: public PromiseArenaMember, private AsyncObject { | |
258 // A Promise<T> contains a chain of PromiseNodes tracking the pending transformations. | |
259 // | |
260 // To reduce generated code bloat, PromiseNode is not a template. Instead, it makes very hacky | |
261 // use of pointers to ExceptionOrValue which actually point to ExceptionOr<T>, but are only | |
262 // so down-cast in the few places that really need to be templated. Luckily this is all | |
263 // internal implementation details. | |
264 | |
265 public: | |
266 virtual void onReady(Event* event) noexcept = 0; | |
267 // Arms the given event when ready. | |
268 // | |
269 // May be called multiple times. If called again before the event was armed, the old event will | |
270 // never be armed, only the new one. If called again after the event was armed, the new event | |
271 // will be armed immediately. Can be called with nullptr to un-register the existing event. | |
272 | |
273 virtual void setSelfPointer(OwnPromiseNode* selfPtr) noexcept; | |
274 // Tells the node that `selfPtr` is the pointer that owns this node, and will continue to own | |
275 // this node until it is destroyed or setSelfPointer() is called again. ChainPromiseNode uses | |
276 // this to shorten redundant chains. The default implementation does nothing; only | |
277 // ChainPromiseNode should implement this. | |
278 | |
279 virtual void get(ExceptionOrValue& output) noexcept = 0; | |
280 // Get the result. `output` points to an ExceptionOr<T> into which the result will be written. | |
281 // Can only be called once, and only after the node is ready. Must be called directly from the | |
282 // event loop, with no application code on the stack. | |
283 | |
284 virtual void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) = 0; | |
285 // Build a trace of this promise chain, showing what it is currently waiting on. | |
286 // | |
287 // Since traces are ordered callee-before-caller, PromiseNode::tracePromise() should typically | |
288 // recurse to its child first, then after the child returns, add itself to the trace. | |
289 // | |
290 // If `stopAtNextEvent` is true, then the trace should stop as soon as it hits a PromiseNode that | |
291 // also implements Event, and should not trace that node or its children. This is used in | |
292 // conjunction with Event::traceEvent(). The chain of Events is often more sparse than the chain | |
293 // of PromiseNodes, because a TransformPromiseNode (which implements .then()) is not itself an | |
294 // Event. TransformPromiseNode instead tells its child node to directly notify its *parent* node | |
295 // when it is ready, and then TransformPromiseNode applies the .then() transformation during the | |
296 // call to .get(). | |
297 // | |
298 // So, when we trace the chain of Events backwards, we end up hoping over segments of | |
299 // TransformPromiseNodes (and other similar types). In order to get those added to the trace, | |
300 // each Event must call back down the PromiseNode chain in the opposite direction, using this | |
301 // method. | |
302 // | |
303 // `tracePromise()` may be called from an async signal handler while `get()` is executing. It | |
304 // must not allocate nor take locks. | |
305 | |
306 template <typename T> | |
307 static OwnPromiseNode from(T&& promise) { | |
308 // Given a Promise, extract the PromiseNode. | |
309 return kj::mv(promise.node); | |
310 } | |
311 template <typename T> | |
312 static PromiseNode& from(T& promise) { | |
313 // Given a Promise, extract the PromiseNode. | |
314 return *promise.node; | |
315 } | |
316 template <typename T> | |
317 static T to(OwnPromiseNode&& node) { | |
318 // Construct a Promise from a PromiseNode. (T should be a Promise type.) | |
319 return T(false, kj::mv(node)); | |
320 } | |
321 | |
322 protected: | |
323 class OnReadyEvent { | |
324 // Helper class for implementing onReady(). | |
325 | |
326 public: | |
327 void init(Event* newEvent); | |
328 | |
329 void arm(); | |
330 void armBreadthFirst(); | |
331 // Arms the event if init() has already been called and makes future calls to init() | |
332 // automatically arm the event. | |
333 | |
334 inline void traceEvent(TraceBuilder& builder) { | |
335 if (event != nullptr && !builder.full()) event->traceEvent(builder); | |
336 } | |
337 | |
338 private: | |
339 Event* event = nullptr; | |
340 }; | |
341 }; | |
342 | |
343 class PromiseDisposer { | |
344 public: | |
345 template <typename T> | |
346 static constexpr bool canArenaAllocate() { | |
347 // We can only use arena allocation for types that fit in an arena and have pointer-size | |
348 // alignment. Anything else will need to be allocated as a separate heap object. | |
349 return sizeof(T) <= sizeof(PromiseArena) && alignof(T) <= alignof(void*); | |
350 } | |
351 | |
352 static void dispose(PromiseArenaMember* node) { | |
353 PromiseArena* arena = node->arena; | |
354 node->destroy(); | |
355 delete arena; // reminder: `delete` automatically ignores null pointers | |
356 } | |
357 | |
358 template <typename T, typename D = PromiseDisposer, typename... Params> | |
359 static kj::Own<T, D> alloc(Params&&... params) noexcept { | |
360 // Implements allocPromise(). | |
361 T* ptr; | |
362 if (!canArenaAllocate<T>()) { | |
363 // Node too big (or needs weird alignment), fall back to regular heap allocation. | |
364 ptr = new T(kj::fwd<Params>(params)...); | |
365 } else { | |
366 // Start a new arena. | |
367 // | |
368 // NOTE: As in append() (below), we don't implement exception-safety because it causes code | |
369 // bloat and these constructors probably don't throw. Instead this function is noexcept, so | |
370 // if a constructor does throw, it'll crash rather than leak memory. | |
371 auto* arena = new PromiseArena; | |
372 ptr = reinterpret_cast<T*>(arena + 1) - 1; | |
373 ctor(*ptr, kj::fwd<Params>(params)...); | |
374 ptr->arena = arena; | |
375 KJ_IREQUIRE(reinterpret_cast<void*>(ptr) == | |
376 reinterpret_cast<void*>(static_cast<PromiseArenaMember*>(ptr)), | |
377 "PromiseArenaMember must be the leftmost inherited type."); | |
378 } | |
379 return kj::Own<T, D>(ptr); | |
380 } | |
381 | |
382 template <typename T, typename D = PromiseDisposer, typename... Params> | |
383 static kj::Own<T, D> append( | |
384 OwnPromiseNode&& next, Params&&... params) noexcept { | |
385 // Implements appendPromise(). | |
386 | |
387 PromiseArena* arena = next->arena; | |
388 | |
389 if (!canArenaAllocate<T>() || arena == nullptr || | |
390 reinterpret_cast<byte*>(next.get()) - reinterpret_cast<byte*>(arena) < sizeof(T)) { | |
391 // No arena available, or not enough space, or weird alignment needed. Start new arena. | |
392 return alloc<T, D>(kj::mv(next), kj::fwd<Params>(params)...); | |
393 } else { | |
394 // Append to arena. | |
395 // | |
396 // NOTE: When we call ctor(), it takes ownership of `next`, so we shouldn't assume `next` | |
397 // still exists after it returns. So we have to remove ownership of the arena before that. | |
398 // In theory if we wanted this to be exception-safe, we'd also have to arrange to delete | |
399 // the arena if the constructor throws. However, in practice none of the PromiseNode | |
400 // constructors throw, so we just mark the whole method noexcept in order to avoid the | |
401 // code bloat to handle this case. | |
402 next->arena = nullptr; | |
403 T* ptr = reinterpret_cast<T*>(next.get()) - 1; | |
404 ctor(*ptr, kj::mv(next), kj::fwd<Params>(params)...); | |
405 ptr->arena = arena; | |
406 KJ_IREQUIRE(reinterpret_cast<void*>(ptr) == | |
407 reinterpret_cast<void*>(static_cast<PromiseArenaMember*>(ptr)), | |
408 "PromiseArenaMember must be the leftmost inherited type."); | |
409 return kj::Own<T, D>(ptr); | |
410 } | |
411 } | |
412 }; | |
413 | |
414 template <typename T, typename... Params> | |
415 static kj::Own<T, PromiseDisposer> allocPromise(Params&&... params) { | |
416 // Allocate a PromiseNode without appending it to any existing promise arena. Space for a new | |
417 // arena will be allocated. | |
418 return PromiseDisposer::alloc<T>(kj::fwd<Params>(params)...); | |
419 } | |
420 | |
421 template <typename T, bool arena = PromiseDisposer::canArenaAllocate<T>()> | |
422 struct FreePromiseNode; | |
423 template <typename T> | |
424 struct FreePromiseNode<T, true> { | |
425 static inline void free(T* ptr) { | |
426 // The object will have been allocated in an arena, so we only want to run the destructor. | |
427 // The arena's memory will be freed separately. | |
428 kj::dtor(*ptr); | |
429 } | |
430 }; | |
431 template <typename T> | |
432 struct FreePromiseNode<T, false> { | |
433 static inline void free(T* ptr) { | |
434 // The object will have been allocated separately on the heap. | |
435 return delete ptr; | |
436 } | |
437 }; | |
438 | |
439 template <typename T> | |
440 static void freePromise(T* ptr) { | |
441 // Free a PromiseNode originally allocated using `allocPromise<T>()`. The implementation of | |
442 // PromiseNode::destroy() must call this for any type that is allocated using allocPromise(). | |
443 FreePromiseNode<T>::free(ptr); | |
444 } | |
445 | |
446 template <typename T, typename... Params> | |
447 static kj::Own<T, PromiseDisposer> appendPromise(OwnPromiseNode&& next, Params&&... params) { | |
448 // Append a promise to the arena that currently ends with `next`. `next` is also still passed as | |
449 // the first parameter to the new object's constructor. | |
450 // | |
451 // This is semantically the same as `allocPromise()` except that it may avoid the underlying | |
452 // memory allocation. `next` must end up being destroyed before the new object (i.e. the new | |
453 // object must never transfer away ownership of `next`). | |
454 return PromiseDisposer::append<T>(kj::mv(next), kj::fwd<Params>(params)...); | |
455 } | |
456 | |
457 // ------------------------------------------------------------------- | |
458 | |
459 inline ReadyNow::operator Promise<void>() const { | |
460 return PromiseNode::to<Promise<void>>(readyNow()); | |
461 } | |
462 | |
463 template <typename T> | |
464 inline NeverDone::operator Promise<T>() const { | |
465 return PromiseNode::to<Promise<T>>(neverDone()); | |
466 } | |
467 | |
468 // ------------------------------------------------------------------- | |
469 | |
470 class ImmediatePromiseNodeBase: public PromiseNode { | |
471 public: | |
472 ImmediatePromiseNodeBase(); | |
473 ~ImmediatePromiseNodeBase() noexcept(false); | |
474 | |
475 void onReady(Event* event) noexcept override; | |
476 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; | |
477 }; | |
478 | |
479 template <typename T> | |
480 class ImmediatePromiseNode final: public ImmediatePromiseNodeBase { | |
481 // A promise that has already been resolved to an immediate value or exception. | |
482 | |
483 public: | |
484 ImmediatePromiseNode(ExceptionOr<T>&& result): result(kj::mv(result)) {} | |
485 void destroy() override { freePromise(this); } | |
486 | |
487 void get(ExceptionOrValue& output) noexcept override { | |
488 output.as<T>() = kj::mv(result); | |
489 } | |
490 | |
491 private: | |
492 ExceptionOr<T> result; | |
493 }; | |
494 | |
495 class ImmediateBrokenPromiseNode final: public ImmediatePromiseNodeBase { | |
496 public: | |
497 ImmediateBrokenPromiseNode(Exception&& exception); | |
498 void destroy() override; | |
499 | |
500 void get(ExceptionOrValue& output) noexcept override; | |
501 | |
502 private: | |
503 Exception exception; | |
504 }; | |
505 | |
506 template <typename T, T value> | |
507 class ConstPromiseNode: public ImmediatePromiseNodeBase { | |
508 public: | |
509 void destroy() override {} | |
510 void get(ExceptionOrValue& output) noexcept override { | |
511 output.as<T>() = value; | |
512 } | |
513 }; | |
514 | |
515 // ------------------------------------------------------------------- | |
516 | |
517 class AttachmentPromiseNodeBase: public PromiseNode { | |
518 public: | |
519 AttachmentPromiseNodeBase(OwnPromiseNode&& dependency); | |
520 | |
521 void onReady(Event* event) noexcept override; | |
522 void get(ExceptionOrValue& output) noexcept override; | |
523 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; | |
524 | |
525 private: | |
526 OwnPromiseNode dependency; | |
527 | |
528 void dropDependency(); | |
529 | |
530 template <typename> | |
531 friend class AttachmentPromiseNode; | |
532 }; | |
533 | |
534 template <typename Attachment> | |
535 class AttachmentPromiseNode final: public AttachmentPromiseNodeBase { | |
536 // A PromiseNode that holds on to some object (usually, an Own<T>, but could be any movable | |
537 // object) until the promise resolves. | |
538 | |
539 public: | |
540 AttachmentPromiseNode(OwnPromiseNode&& dependency, Attachment&& attachment) | |
541 : AttachmentPromiseNodeBase(kj::mv(dependency)), | |
542 attachment(kj::mv<Attachment>(attachment)) {} | |
543 void destroy() override { freePromise(this); } | |
544 | |
545 ~AttachmentPromiseNode() noexcept(false) { | |
546 // We need to make sure the dependency is deleted before we delete the attachment because the | |
547 // dependency may be using the attachment. | |
548 dropDependency(); | |
549 } | |
550 | |
551 private: | |
552 Attachment attachment; | |
553 }; | |
554 | |
555 // ------------------------------------------------------------------- | |
556 | |
557 #if __GNUC__ >= 8 && !__clang__ | |
558 // GCC 8's class-memaccess warning rightly does not like the memcpy()'s below, but there's no | |
559 // "legal" way for us to extract the content of a PTMF so too bad. | |
560 #pragma GCC diagnostic push | |
561 #pragma GCC diagnostic ignored "-Wclass-memaccess" | |
562 #if __GNUC__ >= 11 | |
563 // GCC 11's array-bounds is similarly upset with us for digging into "private" implementation | |
564 // details. But the format is well-defined by the ABI which cannot change so please just let us | |
565 // do it kthx. | |
566 #pragma GCC diagnostic ignored "-Warray-bounds" | |
567 #endif | |
568 #endif | |
569 | |
570 template <typename T, typename ReturnType, typename... ParamTypes> | |
571 void* getMethodStartAddress(T& obj, ReturnType (T::*method)(ParamTypes...)); | |
572 template <typename T, typename ReturnType, typename... ParamTypes> | |
573 void* getMethodStartAddress(const T& obj, ReturnType (T::*method)(ParamTypes...) const); | |
574 // Given an object and a pointer-to-method, return the start address of the method's code. The | |
575 // intent is that this address can be used in a trace; addr2line should map it to the start of | |
576 // the function's definition. For virtual methods, this does a vtable lookup on `obj` to determine | |
577 // the address of the specific implementation (otherwise, `obj` wouldn't be needed). | |
578 // | |
579 // Note that if the method is overloaded or is a template, you will need to explicitly specify | |
580 // the param and return types, otherwise the compiler won't know which overload / template | |
581 // specialization you are requesting. | |
582 | |
583 class PtmfHelper { | |
584 // This class is a private helper for GetFunctorStartAddress and getMethodStartAddress(). The | |
585 // class represents the internal representation of a pointer-to-member-function. | |
586 | |
587 template <typename... ParamTypes> | |
588 friend struct GetFunctorStartAddress; | |
589 template <typename T, typename ReturnType, typename... ParamTypes> | |
590 friend void* getMethodStartAddress(T& obj, ReturnType (T::*method)(ParamTypes...)); | |
591 template <typename T, typename ReturnType, typename... ParamTypes> | |
592 friend void* getMethodStartAddress(const T& obj, ReturnType (T::*method)(ParamTypes...) const); | |
593 | |
594 #if __GNUG__ | |
595 | |
596 void* ptr; | |
597 ptrdiff_t adj; | |
598 // Layout of a pointer-to-member-function used by GCC and compatible compilers. | |
599 | |
600 void* apply(const void* obj) { | |
601 #if defined(__arm__) || defined(__mips__) || defined(__aarch64__) | |
602 if (adj & 1) { | |
603 ptrdiff_t voff = (ptrdiff_t)ptr; | |
604 #else | |
605 ptrdiff_t voff = (ptrdiff_t)ptr; | |
606 if (voff & 1) { | |
607 voff &= ~1; | |
608 #endif | |
609 return *(void**)(*(char**)obj + voff); | |
610 } else { | |
611 return ptr; | |
612 } | |
613 } | |
614 | |
615 #define BODY \ | |
616 PtmfHelper result; \ | |
617 static_assert(sizeof(p) == sizeof(result), "unknown ptmf layout"); \ | |
618 memcpy(&result, &p, sizeof(result)); \ | |
619 return result | |
620 | |
621 #else // __GNUG__ | |
622 | |
623 void* apply(const void* obj) { return nullptr; } | |
624 // TODO(port): PTMF instruction address extraction | |
625 | |
626 #define BODY return PtmfHelper{} | |
627 | |
628 #endif // __GNUG__, else | |
629 | |
630 template <typename R, typename C, typename... P, typename F> | |
631 static PtmfHelper from(F p) { BODY; } | |
632 // Create a PtmfHelper from some arbitrary pointer-to-member-function which is not | |
633 // overloaded nor a template. In this case the compiler is able to deduce the full function | |
634 // signature directly given the name since there is only one function with that name. | |
635 | |
636 template <typename R, typename C, typename... P> | |
637 static PtmfHelper from(R (C::*p)(NoInfer<P>...)) { BODY; } | |
638 template <typename R, typename C, typename... P> | |
639 static PtmfHelper from(R (C::*p)(NoInfer<P>...) const) { BODY; } | |
640 // Create a PtmfHelper from some poniter-to-member-function which is a template. In this case | |
641 // the function must match exactly the containing type C, return type R, and parameter types P... | |
642 // GetFunctorStartAddress normally specifies exactly the correct C and R, but can only make a | |
643 // guess at P. Luckily, if the function parameters are template parameters then it's not | |
644 // necessary to be precise about P. | |
645 #undef BODY | |
646 }; | |
647 | |
648 #if __GNUC__ >= 8 && !__clang__ | |
649 #pragma GCC diagnostic pop | |
650 #endif | |
651 | |
652 template <typename T, typename ReturnType, typename... ParamTypes> | |
653 void* getMethodStartAddress(T& obj, ReturnType (T::*method)(ParamTypes...)) { | |
654 return PtmfHelper::from<ReturnType, T, ParamTypes...>(method).apply(&obj); | |
655 } | |
656 template <typename T, typename ReturnType, typename... ParamTypes> | |
657 void* getMethodStartAddress(const T& obj, ReturnType (T::*method)(ParamTypes...) const) { | |
658 return PtmfHelper::from<ReturnType, T, ParamTypes...>(method).apply(&obj); | |
659 } | |
660 | |
661 template <typename... ParamTypes> | |
662 struct GetFunctorStartAddress { | |
663 // Given a functor (any object defining operator()), return the start address of the function, | |
664 // suitable for passing to addr2line to obtain a source file/line for debugging purposes. | |
665 // | |
666 // This turns out to be incredibly hard to implement in the presence of overloaded or templated | |
667 // functors. Therefore, we impose these specific restrictions, specific to our use case: | |
668 // - Overloading is not allowed, but templating is. (Generally we only intend to support lambdas | |
669 // anyway.) | |
670 // - The template parameters to GetFunctorStartAddress specify a hint as to the expected | |
671 // parameter types. If the functor is templated, its parameters must match exactly these types. | |
672 // (If it's not templated, ParamTypes are ignored.) | |
673 | |
674 template <typename Func> | |
675 static void* apply(Func&& func) { | |
676 typedef decltype(func(instance<ParamTypes>()...)) ReturnType; | |
677 return PtmfHelper::from<ReturnType, Decay<Func>, ParamTypes...>( | |
678 &Decay<Func>::operator()).apply(&func); | |
679 } | |
680 }; | |
681 | |
682 template <> | |
683 struct GetFunctorStartAddress<Void&&>: public GetFunctorStartAddress<> {}; | |
684 // Hack for TransformPromiseNode use case: an input type of `Void` indicates that the function | |
685 // actually has no parameters. | |
686 | |
687 class TransformPromiseNodeBase: public PromiseNode { | |
688 public: | |
689 TransformPromiseNodeBase(OwnPromiseNode&& dependency, void* continuationTracePtr); | |
690 | |
691 void onReady(Event* event) noexcept override; | |
692 void get(ExceptionOrValue& output) noexcept override; | |
693 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; | |
694 | |
695 private: | |
696 OwnPromiseNode dependency; | |
697 void* continuationTracePtr; | |
698 | |
699 void dropDependency(); | |
700 void getDepResult(ExceptionOrValue& output); | |
701 | |
702 virtual void getImpl(ExceptionOrValue& output) = 0; | |
703 | |
704 template <typename, typename, typename, typename> | |
705 friend class TransformPromiseNode; | |
706 }; | |
707 | |
708 template <typename T, typename DepT, typename Func, typename ErrorFunc> | |
709 class TransformPromiseNode final: public TransformPromiseNodeBase { | |
710 // A PromiseNode that transforms the result of another PromiseNode through an application-provided | |
711 // function (implements `then()`). | |
712 | |
713 public: | |
714 TransformPromiseNode(OwnPromiseNode&& dependency, Func&& func, ErrorFunc&& errorHandler, | |
715 void* continuationTracePtr) | |
716 : TransformPromiseNodeBase(kj::mv(dependency), continuationTracePtr), | |
717 func(kj::fwd<Func>(func)), errorHandler(kj::fwd<ErrorFunc>(errorHandler)) {} | |
718 void destroy() override { freePromise(this); } | |
719 | |
720 ~TransformPromiseNode() noexcept(false) { | |
721 // We need to make sure the dependency is deleted before we delete the continuations because it | |
722 // is a common pattern for the continuations to hold ownership of objects that might be in-use | |
723 // by the dependency. | |
724 dropDependency(); | |
725 } | |
726 | |
727 private: | |
728 Func func; | |
729 ErrorFunc errorHandler; | |
730 | |
731 void getImpl(ExceptionOrValue& output) override { | |
732 ExceptionOr<DepT> depResult; | |
733 getDepResult(depResult); | |
734 KJ_IF_MAYBE(depException, depResult.exception) { | |
735 output.as<T>() = handle( | |
736 MaybeVoidCaller<Exception, FixVoid<ReturnType<ErrorFunc, Exception>>>::apply( | |
737 errorHandler, kj::mv(*depException))); | |
738 } else KJ_IF_MAYBE(depValue, depResult.value) { | |
739 output.as<T>() = handle(MaybeVoidCaller<DepT, T>::apply(func, kj::mv(*depValue))); | |
740 } | |
741 } | |
742 | |
743 ExceptionOr<T> handle(T&& value) { | |
744 return kj::mv(value); | |
745 } | |
746 ExceptionOr<T> handle(PropagateException::Bottom&& value) { | |
747 return ExceptionOr<T>(false, value.asException()); | |
748 } | |
749 }; | |
750 | |
751 // ------------------------------------------------------------------- | |
752 | |
753 class ForkHubBase; | |
754 using OwnForkHubBase = Own<ForkHubBase, ForkHubBase>; | |
755 | |
756 class ForkBranchBase: public PromiseNode { | |
757 public: | |
758 ForkBranchBase(OwnForkHubBase&& hub); | |
759 ~ForkBranchBase() noexcept(false); | |
760 | |
761 void hubReady() noexcept; | |
762 // Called by the hub to indicate that it is ready. | |
763 | |
764 // implements PromiseNode ------------------------------------------ | |
765 void onReady(Event* event) noexcept override; | |
766 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; | |
767 | |
768 protected: | |
769 inline ExceptionOrValue& getHubResultRef(); | |
770 | |
771 void releaseHub(ExceptionOrValue& output); | |
772 // Release the hub. If an exception is thrown, add it to `output`. | |
773 | |
774 private: | |
775 OnReadyEvent onReadyEvent; | |
776 | |
777 OwnForkHubBase hub; | |
778 ForkBranchBase* next = nullptr; | |
779 ForkBranchBase** prevPtr = nullptr; | |
780 | |
781 friend class ForkHubBase; | |
782 }; | |
783 | |
784 template <typename T> T copyOrAddRef(T& t) { return t; } | |
785 template <typename T> Own<T> copyOrAddRef(Own<T>& t) { return t->addRef(); } | |
786 template <typename T> Maybe<Own<T>> copyOrAddRef(Maybe<Own<T>>& t) { | |
787 return t.map([](Own<T>& ptr) { | |
788 return ptr->addRef(); | |
789 }); | |
790 } | |
791 | |
792 template <typename T> | |
793 class ForkBranch final: public ForkBranchBase { | |
794 // A PromiseNode that implements one branch of a fork -- i.e. one of the branches that receives | |
795 // a const reference. | |
796 | |
797 public: | |
798 ForkBranch(OwnForkHubBase&& hub): ForkBranchBase(kj::mv(hub)) {} | |
799 void destroy() override { freePromise(this); } | |
800 | |
801 void get(ExceptionOrValue& output) noexcept override { | |
802 ExceptionOr<T>& hubResult = getHubResultRef().template as<T>(); | |
803 KJ_IF_MAYBE(value, hubResult.value) { | |
804 output.as<T>().value = copyOrAddRef(*value); | |
805 } else { | |
806 output.as<T>().value = nullptr; | |
807 } | |
808 output.exception = hubResult.exception; | |
809 releaseHub(output); | |
810 } | |
811 }; | |
812 | |
813 template <typename T, size_t index> | |
814 class SplitBranch final: public ForkBranchBase { | |
815 // A PromiseNode that implements one branch of a fork -- i.e. one of the branches that receives | |
816 // a const reference. | |
817 | |
818 public: | |
819 SplitBranch(OwnForkHubBase&& hub): ForkBranchBase(kj::mv(hub)) {} | |
820 void destroy() override { freePromise(this); } | |
821 | |
822 typedef kj::Decay<decltype(kj::get<index>(kj::instance<T>()))> Element; | |
823 | |
824 void get(ExceptionOrValue& output) noexcept override { | |
825 ExceptionOr<T>& hubResult = getHubResultRef().template as<T>(); | |
826 KJ_IF_MAYBE(value, hubResult.value) { | |
827 output.as<Element>().value = kj::mv(kj::get<index>(*value)); | |
828 } else { | |
829 output.as<Element>().value = nullptr; | |
830 } | |
831 output.exception = hubResult.exception; | |
832 releaseHub(output); | |
833 } | |
834 }; | |
835 | |
836 // ------------------------------------------------------------------- | |
837 | |
838 class ForkHubBase: public PromiseArenaMember, protected Event { | |
839 public: | |
840 ForkHubBase(OwnPromiseNode&& inner, ExceptionOrValue& resultRef, SourceLocation location); | |
841 | |
842 inline ExceptionOrValue& getResultRef() { return resultRef; } | |
843 | |
844 inline bool isShared() const { return refcount > 1; } | |
845 | |
846 Own<ForkHubBase, ForkHubBase> addRef() { | |
847 ++refcount; | |
848 return Own<ForkHubBase, ForkHubBase>(this); | |
849 } | |
850 | |
851 static void dispose(ForkHubBase* obj) { | |
852 if (--obj->refcount == 0) { | |
853 PromiseDisposer::dispose(obj); | |
854 } | |
855 } | |
856 | |
857 private: | |
858 uint refcount = 1; | |
859 // We manually implement refcounting for ForkHubBase so that we can use it together with | |
860 // PromiseDisposer's arena allocation. | |
861 | |
862 OwnPromiseNode inner; | |
863 ExceptionOrValue& resultRef; | |
864 | |
865 ForkBranchBase* headBranch = nullptr; | |
866 ForkBranchBase** tailBranch = &headBranch; | |
867 // Tail becomes null once the inner promise is ready and all branches have been notified. | |
868 | |
869 Maybe<Own<Event>> fire() override; | |
870 void traceEvent(TraceBuilder& builder) override; | |
871 | |
872 friend class ForkBranchBase; | |
873 }; | |
874 | |
875 template <typename T> | |
876 class ForkHub final: public ForkHubBase { | |
877 // A PromiseNode that implements the hub of a fork. The first call to Promise::fork() replaces | |
878 // the promise's outer node with a ForkHub, and subsequent calls add branches to that hub (if | |
879 // possible). | |
880 | |
881 public: | |
882 ForkHub(OwnPromiseNode&& inner, SourceLocation location) | |
883 : ForkHubBase(kj::mv(inner), result, location) {} | |
884 void destroy() override { freePromise(this); } | |
885 | |
886 Promise<_::UnfixVoid<T>> addBranch() { | |
887 return _::PromiseNode::to<Promise<_::UnfixVoid<T>>>( | |
888 allocPromise<ForkBranch<T>>(addRef())); | |
889 } | |
890 | |
891 _::SplitTuplePromise<T> split(SourceLocation location) { | |
892 return splitImpl(MakeIndexes<tupleSize<T>()>(), location); | |
893 } | |
894 | |
895 private: | |
896 ExceptionOr<T> result; | |
897 | |
898 template <size_t... indexes> | |
899 _::SplitTuplePromise<T> splitImpl(Indexes<indexes...>, SourceLocation location) { | |
900 return kj::tuple(addSplit<indexes>(location)...); | |
901 } | |
902 | |
903 template <size_t index> | |
904 ReducePromises<typename SplitBranch<T, index>::Element> addSplit(SourceLocation location) { | |
905 return _::PromiseNode::to<ReducePromises<typename SplitBranch<T, index>::Element>>( | |
906 maybeChain(allocPromise<SplitBranch<T, index>>(addRef()), | |
907 implicitCast<typename SplitBranch<T, index>::Element*>(nullptr), | |
908 location)); | |
909 } | |
910 }; | |
911 | |
912 inline ExceptionOrValue& ForkBranchBase::getHubResultRef() { | |
913 return hub->getResultRef(); | |
914 } | |
915 | |
916 // ------------------------------------------------------------------- | |
917 | |
918 class ChainPromiseNode final: public PromiseNode, public Event { | |
919 // Promise node which reduces Promise<Promise<T>> to Promise<T>. | |
920 // | |
921 // `Event` is only a public base class because otherwise we can't cast Own<ChainPromiseNode> to | |
922 // Own<Event>. Ugh, templates and private... | |
923 | |
924 public: | |
925 explicit ChainPromiseNode(OwnPromiseNode inner, SourceLocation location); | |
926 ~ChainPromiseNode() noexcept(false); | |
927 void destroy() override; | |
928 | |
929 void onReady(Event* event) noexcept override; | |
930 void setSelfPointer(OwnPromiseNode* selfPtr) noexcept override; | |
931 void get(ExceptionOrValue& output) noexcept override; | |
932 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; | |
933 | |
934 private: | |
935 enum State { | |
936 STEP1, | |
937 STEP2 | |
938 }; | |
939 | |
940 State state; | |
941 | |
942 OwnPromiseNode inner; | |
943 // In STEP1, a PromiseNode for a Promise<T>. | |
944 // In STEP2, a PromiseNode for a T. | |
945 | |
946 Event* onReadyEvent = nullptr; | |
947 OwnPromiseNode* selfPtr = nullptr; | |
948 | |
949 Maybe<Own<Event>> fire() override; | |
950 void traceEvent(TraceBuilder& builder) override; | |
951 }; | |
952 | |
953 template <typename T> | |
954 OwnPromiseNode maybeChain(OwnPromiseNode&& node, Promise<T>*, SourceLocation location) { | |
955 return appendPromise<ChainPromiseNode>(kj::mv(node), location); | |
956 } | |
957 | |
958 template <typename T> | |
959 OwnPromiseNode&& maybeChain(OwnPromiseNode&& node, T*, SourceLocation location) { | |
960 return kj::mv(node); | |
961 } | |
962 | |
963 template <typename T, typename Result = decltype(T::reducePromise(instance<Promise<T>>()))> | |
964 inline Result maybeReduce(Promise<T>&& promise, bool) { | |
965 return T::reducePromise(kj::mv(promise)); | |
966 } | |
967 | |
968 template <typename T> | |
969 inline Promise<T> maybeReduce(Promise<T>&& promise, ...) { | |
970 return kj::mv(promise); | |
971 } | |
972 | |
973 // ------------------------------------------------------------------- | |
974 | |
975 class ExclusiveJoinPromiseNode final: public PromiseNode { | |
976 public: | |
977 ExclusiveJoinPromiseNode(OwnPromiseNode left, OwnPromiseNode right, SourceLocation location); | |
978 ~ExclusiveJoinPromiseNode() noexcept(false); | |
979 void destroy() override; | |
980 | |
981 void onReady(Event* event) noexcept override; | |
982 void get(ExceptionOrValue& output) noexcept override; | |
983 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; | |
984 | |
985 private: | |
986 class Branch: public Event { | |
987 public: | |
988 Branch(ExclusiveJoinPromiseNode& joinNode, OwnPromiseNode dependency, | |
989 SourceLocation location); | |
990 ~Branch() noexcept(false); | |
991 | |
992 bool get(ExceptionOrValue& output); | |
993 // Returns true if this is the side that finished. | |
994 | |
995 Maybe<Own<Event>> fire() override; | |
996 void traceEvent(TraceBuilder& builder) override; | |
997 | |
998 private: | |
999 ExclusiveJoinPromiseNode& joinNode; | |
1000 OwnPromiseNode dependency; | |
1001 | |
1002 friend class ExclusiveJoinPromiseNode; | |
1003 }; | |
1004 | |
1005 Branch left; | |
1006 Branch right; | |
1007 OnReadyEvent onReadyEvent; | |
1008 }; | |
1009 | |
1010 // ------------------------------------------------------------------- | |
1011 | |
1012 enum class ArrayJoinBehavior { | |
1013 LAZY, | |
1014 EAGER, | |
1015 }; | |
1016 | |
1017 class ArrayJoinPromiseNodeBase: public PromiseNode { | |
1018 public: | |
1019 ArrayJoinPromiseNodeBase(Array<OwnPromiseNode> promises, | |
1020 ExceptionOrValue* resultParts, size_t partSize, | |
1021 SourceLocation location, | |
1022 ArrayJoinBehavior joinBehavior); | |
1023 ~ArrayJoinPromiseNodeBase() noexcept(false); | |
1024 | |
1025 void onReady(Event* event) noexcept override final; | |
1026 void get(ExceptionOrValue& output) noexcept override final; | |
1027 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override final; | |
1028 | |
1029 protected: | |
1030 virtual void getNoError(ExceptionOrValue& output) noexcept = 0; | |
1031 // Called to compile the result only in the case where there were no errors. | |
1032 | |
1033 private: | |
1034 const ArrayJoinBehavior joinBehavior; | |
1035 | |
1036 uint countLeft; | |
1037 OnReadyEvent onReadyEvent; | |
1038 bool armed = false; | |
1039 | |
1040 class Branch final: public Event { | |
1041 public: | |
1042 Branch(ArrayJoinPromiseNodeBase& joinNode, OwnPromiseNode dependency, | |
1043 ExceptionOrValue& output, SourceLocation location); | |
1044 ~Branch() noexcept(false); | |
1045 | |
1046 Maybe<Own<Event>> fire() override; | |
1047 void traceEvent(TraceBuilder& builder) override; | |
1048 | |
1049 private: | |
1050 ArrayJoinPromiseNodeBase& joinNode; | |
1051 OwnPromiseNode dependency; | |
1052 ExceptionOrValue& output; | |
1053 | |
1054 friend class ArrayJoinPromiseNodeBase; | |
1055 }; | |
1056 | |
1057 Array<Branch> branches; | |
1058 }; | |
1059 | |
1060 template <typename T> | |
1061 class ArrayJoinPromiseNode final: public ArrayJoinPromiseNodeBase { | |
1062 public: | |
1063 ArrayJoinPromiseNode(Array<OwnPromiseNode> promises, | |
1064 Array<ExceptionOr<T>> resultParts, | |
1065 SourceLocation location, | |
1066 ArrayJoinBehavior joinBehavior) | |
1067 : ArrayJoinPromiseNodeBase(kj::mv(promises), resultParts.begin(), sizeof(ExceptionOr<T>), | |
1068 location, joinBehavior), | |
1069 resultParts(kj::mv(resultParts)) {} | |
1070 void destroy() override { freePromise(this); } | |
1071 | |
1072 protected: | |
1073 void getNoError(ExceptionOrValue& output) noexcept override { | |
1074 auto builder = heapArrayBuilder<T>(resultParts.size()); | |
1075 for (auto& part: resultParts) { | |
1076 KJ_IASSERT(part.value != nullptr, | |
1077 "Bug in KJ promise framework: Promise result had neither value no exception."); | |
1078 builder.add(kj::mv(*_::readMaybe(part.value))); | |
1079 } | |
1080 output.as<Array<T>>() = builder.finish(); | |
1081 } | |
1082 | |
1083 private: | |
1084 Array<ExceptionOr<T>> resultParts; | |
1085 }; | |
1086 | |
1087 template <> | |
1088 class ArrayJoinPromiseNode<void> final: public ArrayJoinPromiseNodeBase { | |
1089 public: | |
1090 ArrayJoinPromiseNode(Array<OwnPromiseNode> promises, | |
1091 Array<ExceptionOr<_::Void>> resultParts, | |
1092 SourceLocation location, | |
1093 ArrayJoinBehavior joinBehavior); | |
1094 ~ArrayJoinPromiseNode(); | |
1095 void destroy() override; | |
1096 | |
1097 protected: | |
1098 void getNoError(ExceptionOrValue& output) noexcept override; | |
1099 | |
1100 private: | |
1101 Array<ExceptionOr<_::Void>> resultParts; | |
1102 }; | |
1103 | |
1104 // ------------------------------------------------------------------- | |
1105 | |
1106 class EagerPromiseNodeBase: public PromiseNode, protected Event { | |
1107 // A PromiseNode that eagerly evaluates its dependency even if its dependent does not eagerly | |
1108 // evaluate it. | |
1109 | |
1110 public: | |
1111 EagerPromiseNodeBase(OwnPromiseNode&& dependency, ExceptionOrValue& resultRef, | |
1112 SourceLocation location); | |
1113 | |
1114 void onReady(Event* event) noexcept override; | |
1115 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; | |
1116 | |
1117 private: | |
1118 OwnPromiseNode dependency; | |
1119 OnReadyEvent onReadyEvent; | |
1120 | |
1121 ExceptionOrValue& resultRef; | |
1122 | |
1123 Maybe<Own<Event>> fire() override; | |
1124 void traceEvent(TraceBuilder& builder) override; | |
1125 }; | |
1126 | |
1127 template <typename T> | |
1128 class EagerPromiseNode final: public EagerPromiseNodeBase { | |
1129 public: | |
1130 EagerPromiseNode(OwnPromiseNode&& dependency, SourceLocation location) | |
1131 : EagerPromiseNodeBase(kj::mv(dependency), result, location) {} | |
1132 void destroy() override { freePromise(this); } | |
1133 | |
1134 void get(ExceptionOrValue& output) noexcept override { | |
1135 output.as<T>() = kj::mv(result); | |
1136 } | |
1137 | |
1138 private: | |
1139 ExceptionOr<T> result; | |
1140 }; | |
1141 | |
1142 template <typename T> | |
1143 OwnPromiseNode spark(OwnPromiseNode&& node, SourceLocation location) { | |
1144 // Forces evaluation of the given node to begin as soon as possible, even if no one is waiting | |
1145 // on it. | |
1146 return appendPromise<EagerPromiseNode<T>>(kj::mv(node), location); | |
1147 } | |
1148 | |
1149 // ------------------------------------------------------------------- | |
1150 | |
1151 class AdapterPromiseNodeBase: public PromiseNode { | |
1152 public: | |
1153 void onReady(Event* event) noexcept override; | |
1154 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; | |
1155 | |
1156 protected: | |
1157 inline void setReady() { | |
1158 onReadyEvent.arm(); | |
1159 } | |
1160 | |
1161 private: | |
1162 OnReadyEvent onReadyEvent; | |
1163 }; | |
1164 | |
1165 template <typename T, typename Adapter> | |
1166 class AdapterPromiseNode final: public AdapterPromiseNodeBase, | |
1167 private PromiseFulfiller<UnfixVoid<T>> { | |
1168 // A PromiseNode that wraps a PromiseAdapter. | |
1169 | |
1170 public: | |
1171 template <typename... Params> | |
1172 AdapterPromiseNode(Params&&... params) | |
1173 : adapter(static_cast<PromiseFulfiller<UnfixVoid<T>>&>(*this), kj::fwd<Params>(params)...) {} | |
1174 void destroy() override { freePromise(this); } | |
1175 | |
1176 void get(ExceptionOrValue& output) noexcept override { | |
1177 KJ_IREQUIRE(!isWaiting()); | |
1178 output.as<T>() = kj::mv(result); | |
1179 } | |
1180 | |
1181 private: | |
1182 ExceptionOr<T> result; | |
1183 bool waiting = true; | |
1184 Adapter adapter; | |
1185 | |
1186 void fulfill(T&& value) override { | |
1187 if (waiting) { | |
1188 waiting = false; | |
1189 result = ExceptionOr<T>(kj::mv(value)); | |
1190 setReady(); | |
1191 } | |
1192 } | |
1193 | |
1194 void reject(Exception&& exception) override { | |
1195 if (waiting) { | |
1196 waiting = false; | |
1197 result = ExceptionOr<T>(false, kj::mv(exception)); | |
1198 setReady(); | |
1199 } | |
1200 } | |
1201 | |
1202 bool isWaiting() override { | |
1203 return waiting; | |
1204 } | |
1205 }; | |
1206 | |
1207 // ------------------------------------------------------------------- | |
1208 | |
1209 class FiberBase: public PromiseNode, private Event { | |
1210 // Base class for the outer PromiseNode representing a fiber. | |
1211 | |
1212 public: | |
1213 explicit FiberBase(size_t stackSize, _::ExceptionOrValue& result, SourceLocation location); | |
1214 explicit FiberBase(const FiberPool& pool, _::ExceptionOrValue& result, SourceLocation location); | |
1215 ~FiberBase() noexcept(false); | |
1216 | |
1217 void start() { armDepthFirst(); } | |
1218 // Call immediately after construction to begin executing the fiber. | |
1219 | |
1220 class WaitDoneEvent; | |
1221 | |
1222 void onReady(_::Event* event) noexcept override; | |
1223 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; | |
1224 | |
1225 protected: | |
1226 bool isFinished() { return state == FINISHED; } | |
1227 void cancel(); | |
1228 | |
1229 private: | |
1230 enum { WAITING, RUNNING, CANCELED, FINISHED } state; | |
1231 | |
1232 _::PromiseNode* currentInner = nullptr; | |
1233 OnReadyEvent onReadyEvent; | |
1234 Own<FiberStack> stack; | |
1235 _::ExceptionOrValue& result; | |
1236 | |
1237 void run(); | |
1238 virtual void runImpl(WaitScope& waitScope) = 0; | |
1239 | |
1240 Maybe<Own<Event>> fire() override; | |
1241 void traceEvent(TraceBuilder& builder) override; | |
1242 // Implements Event. Each time the event is fired, switchToFiber() is called. | |
1243 | |
1244 friend class FiberStack; | |
1245 friend void _::waitImpl(_::OwnPromiseNode&& node, _::ExceptionOrValue& result, | |
1246 WaitScope& waitScope, SourceLocation location); | |
1247 friend bool _::pollImpl(_::PromiseNode& node, WaitScope& waitScope, SourceLocation location); | |
1248 }; | |
1249 | |
1250 template <typename Func> | |
1251 class Fiber final: public FiberBase { | |
1252 public: | |
1253 explicit Fiber(size_t stackSize, Func&& func, SourceLocation location) | |
1254 : FiberBase(stackSize, result, location), func(kj::fwd<Func>(func)) {} | |
1255 explicit Fiber(const FiberPool& pool, Func&& func, SourceLocation location) | |
1256 : FiberBase(pool, result, location), func(kj::fwd<Func>(func)) {} | |
1257 ~Fiber() noexcept(false) { cancel(); } | |
1258 void destroy() override { freePromise(this); } | |
1259 | |
1260 typedef FixVoid<decltype(kj::instance<Func&>()(kj::instance<WaitScope&>()))> ResultType; | |
1261 | |
1262 void get(ExceptionOrValue& output) noexcept override { | |
1263 KJ_IREQUIRE(isFinished()); | |
1264 output.as<ResultType>() = kj::mv(result); | |
1265 } | |
1266 | |
1267 private: | |
1268 Func func; | |
1269 ExceptionOr<ResultType> result; | |
1270 | |
1271 void runImpl(WaitScope& waitScope) override { | |
1272 result.template as<ResultType>() = | |
1273 MaybeVoidCaller<WaitScope&, ResultType>::apply(func, waitScope); | |
1274 } | |
1275 }; | |
1276 | |
1277 } // namespace _ (private) | |
1278 | |
1279 // ======================================================================================= | |
1280 | |
1281 template <typename T> | |
1282 Promise<T>::Promise(_::FixVoid<T> value) | |
1283 : PromiseBase(_::allocPromise<_::ImmediatePromiseNode<_::FixVoid<T>>>(kj::mv(value))) {} | |
1284 | |
1285 template <typename T> | |
1286 Promise<T>::Promise(kj::Exception&& exception) | |
1287 : PromiseBase(_::allocPromise<_::ImmediateBrokenPromiseNode>(kj::mv(exception))) {} | |
1288 | |
1289 template <typename T> | |
1290 template <typename Func, typename ErrorFunc> | |
1291 PromiseForResult<Func, T> Promise<T>::then(Func&& func, ErrorFunc&& errorHandler, | |
1292 SourceLocation location) { | |
1293 typedef _::FixVoid<_::ReturnType<Func, T>> ResultT; | |
1294 | |
1295 void* continuationTracePtr = _::GetFunctorStartAddress<_::FixVoid<T>&&>::apply(func); | |
1296 _::OwnPromiseNode intermediate = | |
1297 _::appendPromise<_::TransformPromiseNode<ResultT, _::FixVoid<T>, Func, ErrorFunc>>( | |
1298 kj::mv(node), kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler), | |
1299 continuationTracePtr); | |
1300 auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType<Func, T>>>( | |
1301 _::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr), location)); | |
1302 return _::maybeReduce(kj::mv(result), false); | |
1303 } | |
1304 | |
1305 namespace _ { // private | |
1306 | |
1307 template <typename T> | |
1308 struct IdentityFunc { | |
1309 inline T operator()(T&& value) const { | |
1310 return kj::mv(value); | |
1311 } | |
1312 }; | |
1313 template <typename T> | |
1314 struct IdentityFunc<Promise<T>> { | |
1315 inline Promise<T> operator()(T&& value) const { | |
1316 return kj::mv(value); | |
1317 } | |
1318 }; | |
1319 template <> | |
1320 struct IdentityFunc<void> { | |
1321 inline void operator()() const {} | |
1322 }; | |
1323 template <> | |
1324 struct IdentityFunc<Promise<void>> { | |
1325 Promise<void> operator()() const; | |
1326 // This can't be inline because it will make the translation unit depend on kj-async. Awkwardly, | |
1327 // Cap'n Proto relies on being able to include this header without creating such a link-time | |
1328 // dependency. | |
1329 }; | |
1330 | |
1331 } // namespace _ (private) | |
1332 | |
1333 template <typename T> | |
1334 template <typename ErrorFunc> | |
1335 Promise<T> Promise<T>::catch_(ErrorFunc&& errorHandler, SourceLocation location) { | |
1336 // then()'s ErrorFunc can only return a Promise if Func also returns a Promise. In this case, | |
1337 // Func is being filled in automatically. We want to make sure ErrorFunc can return a Promise, | |
1338 // but we don't want the extra overhead of promise chaining if ErrorFunc doesn't actually | |
1339 // return a promise. So we make our Func return match ErrorFunc. | |
1340 typedef _::IdentityFunc<decltype(errorHandler(instance<Exception&&>()))> Func; | |
1341 typedef _::FixVoid<_::ReturnType<Func, T>> ResultT; | |
1342 | |
1343 // The reason catch_() isn't simply implemented in terms of then() is because we want the trace | |
1344 // pointer to be based on ErrorFunc rather than Func. | |
1345 void* continuationTracePtr = _::GetFunctorStartAddress<kj::Exception&&>::apply(errorHandler); | |
1346 _::OwnPromiseNode intermediate = | |
1347 _::appendPromise<_::TransformPromiseNode<ResultT, _::FixVoid<T>, Func, ErrorFunc>>( | |
1348 kj::mv(node), Func(), kj::fwd<ErrorFunc>(errorHandler), continuationTracePtr); | |
1349 auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType<Func, T>>>( | |
1350 _::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr), location)); | |
1351 return _::maybeReduce(kj::mv(result), false); | |
1352 } | |
1353 | |
1354 template <typename T> | |
1355 T Promise<T>::wait(WaitScope& waitScope, SourceLocation location) { | |
1356 _::ExceptionOr<_::FixVoid<T>> result; | |
1357 _::waitImpl(kj::mv(node), result, waitScope, location); | |
1358 return convertToReturn(kj::mv(result)); | |
1359 } | |
1360 | |
1361 template <typename T> | |
1362 bool Promise<T>::poll(WaitScope& waitScope, SourceLocation location) { | |
1363 return _::pollImpl(*node, waitScope, location); | |
1364 } | |
1365 | |
1366 template <typename T> | |
1367 ForkedPromise<T> Promise<T>::fork(SourceLocation location) { | |
1368 return ForkedPromise<T>(false, | |
1369 _::PromiseDisposer::alloc<_::ForkHub<_::FixVoid<T>>, _::ForkHubBase>(kj::mv(node), location)); | |
1370 } | |
1371 | |
1372 template <typename T> | |
1373 Promise<T> ForkedPromise<T>::addBranch() { | |
1374 return hub->addBranch(); | |
1375 } | |
1376 | |
1377 template <typename T> | |
1378 bool ForkedPromise<T>::hasBranches() { | |
1379 return hub->isShared(); | |
1380 } | |
1381 | |
1382 template <typename T> | |
1383 _::SplitTuplePromise<T> Promise<T>::split(SourceLocation location) { | |
1384 return _::PromiseDisposer::alloc<_::ForkHub<_::FixVoid<T>>, _::ForkHubBase>( | |
1385 kj::mv(node), location)->split(location); | |
1386 } | |
1387 | |
1388 template <typename T> | |
1389 Promise<T> Promise<T>::exclusiveJoin(Promise<T>&& other, SourceLocation location) { | |
1390 return Promise(false, _::appendPromise<_::ExclusiveJoinPromiseNode>( | |
1391 kj::mv(node), kj::mv(other.node), location)); | |
1392 } | |
1393 | |
1394 template <typename T> | |
1395 template <typename... Attachments> | |
1396 Promise<T> Promise<T>::attach(Attachments&&... attachments) { | |
1397 return Promise(false, _::appendPromise<_::AttachmentPromiseNode<Tuple<Attachments...>>>( | |
1398 kj::mv(node), kj::tuple(kj::fwd<Attachments>(attachments)...))); | |
1399 } | |
1400 | |
1401 template <typename T> | |
1402 template <typename ErrorFunc> | |
1403 Promise<T> Promise<T>::eagerlyEvaluate(ErrorFunc&& errorHandler, SourceLocation location) { | |
1404 // See catch_() for commentary. | |
1405 return Promise(false, _::spark<_::FixVoid<T>>(then( | |
1406 _::IdentityFunc<decltype(errorHandler(instance<Exception&&>()))>(), | |
1407 kj::fwd<ErrorFunc>(errorHandler)).node, location)); | |
1408 } | |
1409 | |
1410 template <typename T> | |
1411 Promise<T> Promise<T>::eagerlyEvaluate(decltype(nullptr), SourceLocation location) { | |
1412 return Promise(false, _::spark<_::FixVoid<T>>(kj::mv(node), location)); | |
1413 } | |
1414 | |
1415 template <typename T> | |
1416 kj::String Promise<T>::trace() { | |
1417 return PromiseBase::trace(); | |
1418 } | |
1419 | |
1420 template <typename T, T value> | |
1421 inline Promise<T> constPromise() { | |
1422 static _::ConstPromiseNode<T, value> NODE; | |
1423 return _::PromiseNode::to<Promise<T>>(_::OwnPromiseNode(&NODE)); | |
1424 } | |
1425 | |
1426 template <typename Func> | |
1427 inline PromiseForResult<Func, void> evalLater(Func&& func) { | |
1428 return _::yield().then(kj::fwd<Func>(func), _::PropagateException()); | |
1429 } | |
1430 | |
1431 template <typename Func> | |
1432 inline PromiseForResult<Func, void> evalLast(Func&& func) { | |
1433 return _::yieldHarder().then(kj::fwd<Func>(func), _::PropagateException()); | |
1434 } | |
1435 | |
1436 template <typename Func> | |
1437 inline PromiseForResult<Func, void> evalNow(Func&& func) { | |
1438 PromiseForResult<Func, void> result = nullptr; | |
1439 KJ_IF_MAYBE(e, kj::runCatchingExceptions([&]() { | |
1440 result = func(); | |
1441 })) { | |
1442 result = kj::mv(*e); | |
1443 } | |
1444 return result; | |
1445 } | |
1446 | |
1447 template <typename Func> | |
1448 struct RetryOnDisconnect_ { | |
1449 static inline PromiseForResult<Func, void> apply(Func&& func) { | |
1450 return evalLater([func = kj::mv(func)]() mutable -> PromiseForResult<Func, void> { | |
1451 auto promise = evalNow(func); | |
1452 return promise.catch_([func = kj::mv(func)](kj::Exception&& e) mutable -> PromiseForResult<Func, void> { | |
1453 if (e.getType() == kj::Exception::Type::DISCONNECTED) { | |
1454 return func(); | |
1455 } else { | |
1456 return kj::mv(e); | |
1457 } | |
1458 }); | |
1459 }); | |
1460 } | |
1461 }; | |
1462 template <typename Func> | |
1463 struct RetryOnDisconnect_<Func&> { | |
1464 // Specialization for references. Needed because the syntax for capturing references in a | |
1465 // lambda is different. :( | |
1466 static inline PromiseForResult<Func, void> apply(Func& func) { | |
1467 auto promise = evalLater(func); | |
1468 return promise.catch_([&func](kj::Exception&& e) -> PromiseForResult<Func, void> { | |
1469 if (e.getType() == kj::Exception::Type::DISCONNECTED) { | |
1470 return func(); | |
1471 } else { | |
1472 return kj::mv(e); | |
1473 } | |
1474 }); | |
1475 } | |
1476 }; | |
1477 | |
1478 template <typename Func> | |
1479 inline PromiseForResult<Func, void> retryOnDisconnect(Func&& func) { | |
1480 return RetryOnDisconnect_<Func>::apply(kj::fwd<Func>(func)); | |
1481 } | |
1482 | |
1483 template <typename Func> | |
1484 inline PromiseForResult<Func, WaitScope&> startFiber( | |
1485 size_t stackSize, Func&& func, SourceLocation location) { | |
1486 typedef _::FixVoid<_::ReturnType<Func, WaitScope&>> ResultT; | |
1487 | |
1488 auto intermediate = _::allocPromise<_::Fiber<Func>>( | |
1489 stackSize, kj::fwd<Func>(func), location); | |
1490 intermediate->start(); | |
1491 auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType<Func, WaitScope&>>>( | |
1492 _::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr), location)); | |
1493 return _::maybeReduce(kj::mv(result), false); | |
1494 } | |
1495 | |
1496 template <typename Func> | |
1497 inline PromiseForResult<Func, WaitScope&> FiberPool::startFiber( | |
1498 Func&& func, SourceLocation location) const { | |
1499 typedef _::FixVoid<_::ReturnType<Func, WaitScope&>> ResultT; | |
1500 | |
1501 auto intermediate = _::allocPromise<_::Fiber<Func>>( | |
1502 *this, kj::fwd<Func>(func), location); | |
1503 intermediate->start(); | |
1504 auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType<Func, WaitScope&>>>( | |
1505 _::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr), location)); | |
1506 return _::maybeReduce(kj::mv(result), false); | |
1507 } | |
1508 | |
1509 template <typename T> | |
1510 template <typename ErrorFunc> | |
1511 void Promise<T>::detach(ErrorFunc&& errorHandler) { | |
1512 return _::detach(then([](T&&) {}, kj::fwd<ErrorFunc>(errorHandler))); | |
1513 } | |
1514 | |
1515 template <> | |
1516 template <typename ErrorFunc> | |
1517 void Promise<void>::detach(ErrorFunc&& errorHandler) { | |
1518 return _::detach(then([]() {}, kj::fwd<ErrorFunc>(errorHandler))); | |
1519 } | |
1520 | |
1521 template <typename T> | |
1522 Promise<Array<T>> joinPromises(Array<Promise<T>>&& promises, SourceLocation location) { | |
1523 return _::PromiseNode::to<Promise<Array<T>>>(_::allocPromise<_::ArrayJoinPromiseNode<T>>( | |
1524 KJ_MAP(p, promises) { return _::PromiseNode::from(kj::mv(p)); }, | |
1525 heapArray<_::ExceptionOr<T>>(promises.size()), location, | |
1526 _::ArrayJoinBehavior::LAZY)); | |
1527 } | |
1528 | |
1529 template <typename T> | |
1530 Promise<Array<T>> joinPromisesFailFast(Array<Promise<T>>&& promises, SourceLocation location) { | |
1531 return _::PromiseNode::to<Promise<Array<T>>>(_::allocPromise<_::ArrayJoinPromiseNode<T>>( | |
1532 KJ_MAP(p, promises) { return _::PromiseNode::from(kj::mv(p)); }, | |
1533 heapArray<_::ExceptionOr<T>>(promises.size()), location, | |
1534 _::ArrayJoinBehavior::EAGER)); | |
1535 } | |
1536 | |
1537 // ======================================================================================= | |
1538 | |
1539 namespace _ { // private | |
1540 | |
1541 class WeakFulfillerBase: protected kj::Disposer { | |
1542 protected: | |
1543 WeakFulfillerBase(): inner(nullptr) {} | |
1544 virtual ~WeakFulfillerBase() noexcept(false) {} | |
1545 | |
1546 template <typename T> | |
1547 inline PromiseFulfiller<T>* getInner() { | |
1548 return static_cast<PromiseFulfiller<T>*>(inner); | |
1549 }; | |
1550 template <typename T> | |
1551 inline void setInner(PromiseFulfiller<T>* ptr) { | |
1552 inner = ptr; | |
1553 }; | |
1554 | |
1555 private: | |
1556 mutable PromiseRejector* inner; | |
1557 | |
1558 void disposeImpl(void* pointer) const override; | |
1559 }; | |
1560 | |
1561 template <typename T> | |
1562 class WeakFulfiller final: public PromiseFulfiller<T>, public WeakFulfillerBase { | |
1563 // A wrapper around PromiseFulfiller which can be detached. | |
1564 // | |
1565 // There are a couple non-trivialities here: | |
1566 // - If the WeakFulfiller is discarded, we want the promise it fulfills to be implicitly | |
1567 // rejected. | |
1568 // - We cannot destroy the WeakFulfiller until the application has discarded it *and* it has been | |
1569 // detached from the underlying fulfiller, because otherwise the later detach() call will go | |
1570 // to a dangling pointer. Essentially, WeakFulfiller is reference counted, although the | |
1571 // refcount never goes over 2 and we manually implement the refcounting because we need to do | |
1572 // other special things when each side detaches anyway. To this end, WeakFulfiller is its own | |
1573 // Disposer -- dispose() is called when the application discards its owned pointer to the | |
1574 // fulfiller and detach() is called when the promise is destroyed. | |
1575 | |
1576 public: | |
1577 KJ_DISALLOW_COPY_AND_MOVE(WeakFulfiller); | |
1578 | |
1579 static kj::Own<WeakFulfiller> make() { | |
1580 WeakFulfiller* ptr = new WeakFulfiller; | |
1581 return Own<WeakFulfiller>(ptr, *ptr); | |
1582 } | |
1583 | |
1584 void fulfill(FixVoid<T>&& value) override { | |
1585 if (getInner<T>() != nullptr) { | |
1586 getInner<T>()->fulfill(kj::mv(value)); | |
1587 } | |
1588 } | |
1589 | |
1590 void reject(Exception&& exception) override { | |
1591 if (getInner<T>() != nullptr) { | |
1592 getInner<T>()->reject(kj::mv(exception)); | |
1593 } | |
1594 } | |
1595 | |
1596 bool isWaiting() override { | |
1597 return getInner<T>() != nullptr && getInner<T>()->isWaiting(); | |
1598 } | |
1599 | |
1600 void attach(PromiseFulfiller<T>& newInner) { | |
1601 setInner<T>(&newInner); | |
1602 } | |
1603 | |
1604 void detach(PromiseFulfiller<T>& from) { | |
1605 if (getInner<T>() == nullptr) { | |
1606 // Already disposed. | |
1607 delete this; | |
1608 } else { | |
1609 KJ_IREQUIRE(getInner<T>() == &from); | |
1610 setInner<T>(nullptr); | |
1611 } | |
1612 } | |
1613 | |
1614 private: | |
1615 WeakFulfiller() {} | |
1616 }; | |
1617 | |
1618 template <typename T> | |
1619 class PromiseAndFulfillerAdapter { | |
1620 public: | |
1621 PromiseAndFulfillerAdapter(PromiseFulfiller<T>& fulfiller, | |
1622 WeakFulfiller<T>& wrapper) | |
1623 : fulfiller(fulfiller), wrapper(wrapper) { | |
1624 wrapper.attach(fulfiller); | |
1625 } | |
1626 | |
1627 ~PromiseAndFulfillerAdapter() noexcept(false) { | |
1628 wrapper.detach(fulfiller); | |
1629 } | |
1630 | |
1631 private: | |
1632 PromiseFulfiller<T>& fulfiller; | |
1633 WeakFulfiller<T>& wrapper; | |
1634 }; | |
1635 | |
1636 } // namespace _ (private) | |
1637 | |
1638 template <typename T> | |
1639 template <typename Func> | |
1640 bool PromiseFulfiller<T>::rejectIfThrows(Func&& func) { | |
1641 KJ_IF_MAYBE(exception, kj::runCatchingExceptions(kj::mv(func))) { | |
1642 reject(kj::mv(*exception)); | |
1643 return false; | |
1644 } else { | |
1645 return true; | |
1646 } | |
1647 } | |
1648 | |
1649 template <typename Func> | |
1650 bool PromiseFulfiller<void>::rejectIfThrows(Func&& func) { | |
1651 KJ_IF_MAYBE(exception, kj::runCatchingExceptions(kj::mv(func))) { | |
1652 reject(kj::mv(*exception)); | |
1653 return false; | |
1654 } else { | |
1655 return true; | |
1656 } | |
1657 } | |
1658 | |
1659 template <typename T, typename Adapter, typename... Params> | |
1660 _::ReducePromises<T> newAdaptedPromise(Params&&... adapterConstructorParams) { | |
1661 _::OwnPromiseNode intermediate( | |
1662 _::allocPromise<_::AdapterPromiseNode<_::FixVoid<T>, Adapter>>( | |
1663 kj::fwd<Params>(adapterConstructorParams)...)); | |
1664 // We can't capture SourceLocation in this function's arguments since it is a vararg template. :( | |
1665 return _::PromiseNode::to<_::ReducePromises<T>>( | |
1666 _::maybeChain(kj::mv(intermediate), implicitCast<T*>(nullptr), SourceLocation())); | |
1667 } | |
1668 | |
1669 template <typename T> | |
1670 PromiseFulfillerPair<T> newPromiseAndFulfiller(SourceLocation location) { | |
1671 auto wrapper = _::WeakFulfiller<T>::make(); | |
1672 | |
1673 _::OwnPromiseNode intermediate( | |
1674 _::allocPromise<_::AdapterPromiseNode< | |
1675 _::FixVoid<T>, _::PromiseAndFulfillerAdapter<T>>>(*wrapper)); | |
1676 auto promise = _::PromiseNode::to<_::ReducePromises<T>>( | |
1677 _::maybeChain(kj::mv(intermediate), implicitCast<T*>(nullptr), location)); | |
1678 | |
1679 return PromiseFulfillerPair<T> { kj::mv(promise), kj::mv(wrapper) }; | |
1680 } | |
1681 | |
1682 // ======================================================================================= | |
1683 // cross-thread stuff | |
1684 | |
1685 namespace _ { // (private) | |
1686 | |
1687 class XThreadEvent: public PromiseNode, // it's a PromiseNode in the requesting thread | |
1688 private Event { // it's an event in the target thread | |
1689 public: | |
1690 XThreadEvent(ExceptionOrValue& result, const Executor& targetExecutor, EventLoop& loop, | |
1691 void* funcTracePtr, SourceLocation location); | |
1692 | |
1693 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; | |
1694 | |
1695 protected: | |
1696 void ensureDoneOrCanceled(); | |
1697 // MUST be called in destructor of subclasses to make sure the object is not destroyed while | |
1698 // still being accessed by the other thread. (This can't be placed in ~XThreadEvent() because | |
1699 // that destructor doesn't run until the subclass has already been destroyed.) | |
1700 | |
1701 virtual kj::Maybe<OwnPromiseNode> execute() = 0; | |
1702 // Run the function. If the function returns a promise, returns the inner PromiseNode, otherwise | |
1703 // returns null. | |
1704 | |
1705 // implements PromiseNode ---------------------------------------------------- | |
1706 void onReady(Event* event) noexcept override; | |
1707 | |
1708 private: | |
1709 ExceptionOrValue& result; | |
1710 void* funcTracePtr; | |
1711 | |
1712 kj::Own<const Executor> targetExecutor; | |
1713 Maybe<const Executor&> replyExecutor; // If executeAsync() was used. | |
1714 | |
1715 kj::Maybe<OwnPromiseNode> promiseNode; | |
1716 // Accessed only in target thread. | |
1717 | |
1718 ListLink<XThreadEvent> targetLink; | |
1719 // Membership in one of the linked lists in the target Executor's work list or cancel list. These | |
1720 // fields are protected by the target Executor's mutex. | |
1721 | |
1722 enum { | |
1723 UNUSED, | |
1724 // Object was never queued on another thread. | |
1725 | |
1726 QUEUED, | |
1727 // Target thread has not yet dequeued the event from the state.start list. The requesting | |
1728 // thread can cancel execution by removing the event from the list. | |
1729 | |
1730 EXECUTING, | |
1731 // Target thread has dequeued the event from state.start and moved it to state.executing. To | |
1732 // cancel, the requesting thread must add the event to the state.cancel list and change the | |
1733 // state to CANCELING. | |
1734 | |
1735 CANCELING, | |
1736 // Requesting thread is trying to cancel this event. The target thread will change the state to | |
1737 // `DONE` once canceled. | |
1738 | |
1739 DONE | |
1740 // Target thread has completed handling this event and will not touch it again. The requesting | |
1741 // thread can safely delete the object. The `state` is updated to `DONE` using an atomic | |
1742 // release operation after ensuring that the event will not be touched again, so that the | |
1743 // requesting can safely skip locking if it observes the state is already DONE. | |
1744 } state = UNUSED; | |
1745 // State, which is also protected by `targetExecutor`'s mutex. | |
1746 | |
1747 ListLink<XThreadEvent> replyLink; | |
1748 // Membership in `replyExecutor`'s reply list. Protected by `replyExecutor`'s mutex. The | |
1749 // executing thread places the event in the reply list near the end of the `EXECUTING` state. | |
1750 // Because the thread cannot lock two mutexes at once, it's possible that the reply executor | |
1751 // will receive the reply while the event is still listed in the EXECUTING state, but it can | |
1752 // ignore the state and proceed with the result. | |
1753 | |
1754 OnReadyEvent onReadyEvent; | |
1755 // Accessed only in requesting thread. | |
1756 | |
1757 friend class kj::Executor; | |
1758 | |
1759 void done(); | |
1760 // Sets the state to `DONE` and notifies the originating thread that this event is done. Do NOT | |
1761 // call under lock. | |
1762 | |
1763 void sendReply(); | |
1764 // Notifies the originating thread that this event is done, but doesn't set the state to DONE | |
1765 // yet. Do NOT call under lock. | |
1766 | |
1767 void setDoneState(); | |
1768 // Assigns `state` to `DONE`, being careful to use an atomic-release-store if needed. This must | |
1769 // only be called in the destination thread, and must either be called under lock, or the thread | |
1770 // must take the lock and release it again shortly after setting the state (because some threads | |
1771 // may be waiting on the DONE state using a conditional wait on the mutex). After calling | |
1772 // setDoneState(), the destination thread MUST NOT touch this object ever again; it now belongs | |
1773 // solely to the requesting thread. | |
1774 | |
1775 void setDisconnected(); | |
1776 // Sets the result to a DISCONNECTED exception indicating that the target event loop exited. | |
1777 | |
1778 class DelayedDoneHack; | |
1779 | |
1780 // implements Event ---------------------------------------------------------- | |
1781 Maybe<Own<Event>> fire() override; | |
1782 // If called with promiseNode == nullptr, it's time to call execute(). If promiseNode != nullptr, | |
1783 // then it just indicated readiness and we need to get its result. | |
1784 | |
1785 void traceEvent(TraceBuilder& builder) override; | |
1786 }; | |
1787 | |
1788 template <typename Func, typename = _::FixVoid<_::ReturnType<Func, void>>> | |
1789 class XThreadEventImpl final: public XThreadEvent { | |
1790 // Implementation for a function that does not return a Promise. | |
1791 public: | |
1792 XThreadEventImpl(Func&& func, const Executor& target, EventLoop& loop, SourceLocation location) | |
1793 : XThreadEvent(result, target, loop, GetFunctorStartAddress<>::apply(func), location), | |
1794 func(kj::fwd<Func>(func)) {} | |
1795 ~XThreadEventImpl() noexcept(false) { ensureDoneOrCanceled(); } | |
1796 void destroy() override { freePromise(this); } | |
1797 | |
1798 typedef _::FixVoid<_::ReturnType<Func, void>> ResultT; | |
1799 | |
1800 kj::Maybe<_::OwnPromiseNode> execute() override { | |
1801 result.value = MaybeVoidCaller<Void, FixVoid<decltype(func())>>::apply(func, Void()); | |
1802 return nullptr; | |
1803 } | |
1804 | |
1805 // implements PromiseNode ---------------------------------------------------- | |
1806 void get(ExceptionOrValue& output) noexcept override { | |
1807 output.as<ResultT>() = kj::mv(result); | |
1808 } | |
1809 | |
1810 private: | |
1811 Func func; | |
1812 ExceptionOr<ResultT> result; | |
1813 friend Executor; | |
1814 }; | |
1815 | |
1816 template <typename Func, typename T> | |
1817 class XThreadEventImpl<Func, Promise<T>> final: public XThreadEvent { | |
1818 // Implementation for a function that DOES return a Promise. | |
1819 public: | |
1820 XThreadEventImpl(Func&& func, const Executor& target, EventLoop& loop, SourceLocation location) | |
1821 : XThreadEvent(result, target, loop, GetFunctorStartAddress<>::apply(func), location), | |
1822 func(kj::fwd<Func>(func)) {} | |
1823 ~XThreadEventImpl() noexcept(false) { ensureDoneOrCanceled(); } | |
1824 void destroy() override { freePromise(this); } | |
1825 | |
1826 typedef _::FixVoid<_::UnwrapPromise<PromiseForResult<Func, void>>> ResultT; | |
1827 | |
1828 kj::Maybe<_::OwnPromiseNode> execute() override { | |
1829 auto result = _::PromiseNode::from(func()); | |
1830 KJ_IREQUIRE(result.get() != nullptr); | |
1831 return kj::mv(result); | |
1832 } | |
1833 | |
1834 // implements PromiseNode ---------------------------------------------------- | |
1835 void get(ExceptionOrValue& output) noexcept override { | |
1836 output.as<ResultT>() = kj::mv(result); | |
1837 } | |
1838 | |
1839 private: | |
1840 Func func; | |
1841 ExceptionOr<ResultT> result; | |
1842 friend Executor; | |
1843 }; | |
1844 | |
1845 } // namespace _ (private) | |
1846 | |
1847 template <typename Func> | |
1848 _::UnwrapPromise<PromiseForResult<Func, void>> Executor::executeSync( | |
1849 Func&& func, SourceLocation location) const { | |
1850 _::XThreadEventImpl<Func> event(kj::fwd<Func>(func), *this, getLoop(), location); | |
1851 send(event, true); | |
1852 return convertToReturn(kj::mv(event.result)); | |
1853 } | |
1854 | |
1855 template <typename Func> | |
1856 PromiseForResult<Func, void> Executor::executeAsync(Func&& func, SourceLocation location) const { | |
1857 // HACK: We call getLoop() here, rather than have XThreadEvent's constructor do it, so that if it | |
1858 // throws we don't crash due to `allocPromise()` being `noexcept`. | |
1859 auto event = _::allocPromise<_::XThreadEventImpl<Func>>( | |
1860 kj::fwd<Func>(func), *this, getLoop(), location); | |
1861 send(*event, false); | |
1862 return _::PromiseNode::to<PromiseForResult<Func, void>>(kj::mv(event)); | |
1863 } | |
1864 | |
1865 // ----------------------------------------------------------------------------- | |
1866 | |
1867 namespace _ { // (private) | |
1868 | |
1869 template <typename T> | |
1870 class XThreadFulfiller; | |
1871 | |
1872 class XThreadPaf: public PromiseNode { | |
1873 public: | |
1874 XThreadPaf(); | |
1875 virtual ~XThreadPaf() noexcept(false); | |
1876 void destroy() override; | |
1877 | |
1878 // implements PromiseNode ---------------------------------------------------- | |
1879 void onReady(Event* event) noexcept override; | |
1880 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; | |
1881 | |
1882 private: | |
1883 enum { | |
1884 WAITING, | |
1885 // Not yet fulfilled, and the waiter is still waiting. | |
1886 // | |
1887 // Starting from this state, the state may transition to either FULFILLING or CANCELED | |
1888 // using an atomic compare-and-swap. | |
1889 | |
1890 FULFILLING, | |
1891 // The fulfiller thread atomically transitions the state from WAITING to FULFILLING when it | |
1892 // wishes to fulfill the promise. By doing so, it guarantees that the `executor` will not | |
1893 // disappear out from under it. It then fills in the result value, locks the executor mutex, | |
1894 // adds the object to the executor's list of fulfilled XThreadPafs, changes the state to | |
1895 // FULFILLED, and finally unlocks the mutex. | |
1896 // | |
1897 // If the waiting thread tries to cancel but discovers the object in this state, then it | |
1898 // must perform a conditional wait on the executor mutex to await the state becoming FULFILLED. | |
1899 // It can then delete the object. | |
1900 | |
1901 FULFILLED, | |
1902 // The fulfilling thread has completed filling in the result value and inserting the object | |
1903 // into the waiting thread's executor event queue. Moreover, the fulfilling thread no longer | |
1904 // holds any pointers to this object. The waiting thread is responsible for deleting it. | |
1905 | |
1906 DISPATCHED, | |
1907 // The object reached FULFILLED state, and then was dispatched from the waiting thread's | |
1908 // executor's event queue. Therefore, the object is completely owned by the waiting thread with | |
1909 // no need to lock anything. | |
1910 | |
1911 CANCELED | |
1912 // The waiting thread atomically transitions the state from WAITING to CANCELED if it is no | |
1913 // longer listening. In this state, it is the fulfiller thread's responsibility to destroy the | |
1914 // object. | |
1915 } state; | |
1916 | |
1917 const Executor& executor; | |
1918 // Executor of the waiting thread. Only guaranteed to be valid when state is `WAITING` or | |
1919 // `FULFILLING`. After any other state has been reached, this reference may be invalidated. | |
1920 | |
1921 ListLink<XThreadPaf> link; | |
1922 // In the FULFILLING/FULFILLED states, the object is placed in a linked list within the waiting | |
1923 // thread's executor. In those states, these pointers are guarded by said executor's mutex. | |
1924 | |
1925 OnReadyEvent onReadyEvent; | |
1926 | |
1927 class FulfillScope; | |
1928 | |
1929 static kj::Exception unfulfilledException(); | |
1930 // Construct appropriate exception to use to reject an unfulfilled XThreadPaf. | |
1931 | |
1932 template <typename T> | |
1933 friend class XThreadFulfiller; | |
1934 friend Executor; | |
1935 }; | |
1936 | |
1937 template <typename T> | |
1938 class XThreadPafImpl final: public XThreadPaf { | |
1939 public: | |
1940 // implements PromiseNode ---------------------------------------------------- | |
1941 void get(ExceptionOrValue& output) noexcept override { | |
1942 output.as<FixVoid<T>>() = kj::mv(result); | |
1943 } | |
1944 | |
1945 private: | |
1946 ExceptionOr<FixVoid<T>> result; | |
1947 | |
1948 friend class XThreadFulfiller<T>; | |
1949 }; | |
1950 | |
1951 class XThreadPaf::FulfillScope { | |
1952 // Create on stack while setting `XThreadPafImpl<T>::result`. | |
1953 // | |
1954 // This ensures that: | |
1955 // - Only one call is carried out, even if multiple threads try to fulfill concurrently. | |
1956 // - The waiting thread is correctly signaled. | |
1957 public: | |
1958 FulfillScope(XThreadPaf** pointer); | |
1959 // Atomically nulls out *pointer and takes ownership of the pointer. | |
1960 | |
1961 ~FulfillScope() noexcept(false); | |
1962 | |
1963 KJ_DISALLOW_COPY_AND_MOVE(FulfillScope); | |
1964 | |
1965 bool shouldFulfill() { return obj != nullptr; } | |
1966 | |
1967 template <typename T> | |
1968 XThreadPafImpl<T>* getTarget() { return static_cast<XThreadPafImpl<T>*>(obj); } | |
1969 | |
1970 private: | |
1971 XThreadPaf* obj; | |
1972 }; | |
1973 | |
1974 template <typename T> | |
1975 class XThreadFulfiller final: public CrossThreadPromiseFulfiller<T> { | |
1976 public: | |
1977 XThreadFulfiller(XThreadPafImpl<T>* target): target(target) {} | |
1978 | |
1979 ~XThreadFulfiller() noexcept(false) { | |
1980 if (target != nullptr) { | |
1981 reject(XThreadPaf::unfulfilledException()); | |
1982 } | |
1983 } | |
1984 void fulfill(FixVoid<T>&& value) const override { | |
1985 XThreadPaf::FulfillScope scope(&target); | |
1986 if (scope.shouldFulfill()) { | |
1987 scope.getTarget<T>()->result = kj::mv(value); | |
1988 } | |
1989 } | |
1990 void reject(Exception&& exception) const override { | |
1991 XThreadPaf::FulfillScope scope(&target); | |
1992 if (scope.shouldFulfill()) { | |
1993 scope.getTarget<T>()->result.addException(kj::mv(exception)); | |
1994 } | |
1995 } | |
1996 bool isWaiting() const override { | |
1997 KJ_IF_MAYBE(t, target) { | |
1998 #if _MSC_VER && !__clang__ | |
1999 // Just assume 1-byte loads are atomic... on what kind of absurd platform would they not be? | |
2000 return t->state == XThreadPaf::WAITING; | |
2001 #else | |
2002 return __atomic_load_n(&t->state, __ATOMIC_RELAXED) == XThreadPaf::WAITING; | |
2003 #endif | |
2004 } else { | |
2005 return false; | |
2006 } | |
2007 } | |
2008 | |
2009 private: | |
2010 mutable XThreadPaf* target; // accessed using atomic ops | |
2011 }; | |
2012 | |
2013 template <typename T> | |
2014 class XThreadFulfiller<kj::Promise<T>> { | |
2015 public: | |
2016 static_assert(sizeof(T) < 0, | |
2017 "newCrosssThreadPromiseAndFulfiller<Promise<T>>() is not currently supported"); | |
2018 // TODO(someday): Is this worth supporting? Presumably, when someone calls `fulfill(somePromise)`, | |
2019 // then `somePromise` should be assumed to be a promise owned by the fulfilling thread, not | |
2020 // the waiting thread. | |
2021 }; | |
2022 | |
2023 } // namespace _ (private) | |
2024 | |
2025 template <typename T> | |
2026 PromiseCrossThreadFulfillerPair<T> newPromiseAndCrossThreadFulfiller() { | |
2027 kj::Own<_::XThreadPafImpl<T>, _::PromiseDisposer> node(new _::XThreadPafImpl<T>); | |
2028 auto fulfiller = kj::heap<_::XThreadFulfiller<T>>(node); | |
2029 return { _::PromiseNode::to<_::ReducePromises<T>>(kj::mv(node)), kj::mv(fulfiller) }; | |
2030 } | |
2031 | |
2032 } // namespace kj | |
2033 | |
2034 #if KJ_HAS_COROUTINE | |
2035 | |
2036 // ======================================================================================= | |
2037 // Coroutines TS integration with kj::Promise<T>. | |
2038 // | |
2039 // Here's a simple coroutine: | |
2040 // | |
2041 // Promise<Own<AsyncIoStream>> connectToService(Network& n) { | |
2042 // auto a = co_await n.parseAddress(IP, PORT); | |
2043 // auto c = co_await a->connect(); | |
2044 // co_return kj::mv(c); | |
2045 // } | |
2046 // | |
2047 // The presence of the co_await and co_return keywords tell the compiler it is a coroutine. | |
2048 // Although it looks similar to a function, it has a couple large differences. First, everything | |
2049 // that would normally live in the stack frame lives instead in a heap-based coroutine frame. | |
2050 // Second, the coroutine has the ability to return from its scope without deallocating this frame | |
2051 // (to suspend, in other words), and the ability to resume from its last suspension point. | |
2052 // | |
2053 // In order to know how to suspend, resume, and return from a coroutine, the compiler looks up a | |
2054 // coroutine implementation type via a traits class parameterized by the coroutine return and | |
2055 // parameter types. We'll name our coroutine implementation `kj::_::Coroutine<T>`, | |
2056 | |
2057 namespace kj::_ { template <typename T> class Coroutine; } | |
2058 | |
2059 // Specializing the appropriate traits class tells the compiler about `kj::_::Coroutine<T>`. | |
2060 | |
2061 namespace KJ_COROUTINE_STD_NAMESPACE { | |
2062 | |
2063 template <class T, class... Args> | |
2064 struct coroutine_traits<kj::Promise<T>, Args...> { | |
2065 // `Args...` are the coroutine's parameter types. | |
2066 | |
2067 using promise_type = kj::_::Coroutine<T>; | |
2068 // The Coroutines TS calls this the "promise type". This makes sense when thinking of coroutines | |
2069 // returning `std::future<T>`, since the coroutine implementation would be a wrapper around | |
2070 // a `std::promise<T>`. It's extremely confusing from a KJ perspective, however, so I call it | |
2071 // the "coroutine implementation type" instead. | |
2072 }; | |
2073 | |
2074 } // namespace KJ_COROUTINE_STD_NAMESPACE | |
2075 | |
2076 // Now when the compiler sees our `connectToService()` coroutine above, it default-constructs a | |
2077 // `coroutine_traits<Promise<Own<AsyncIoStream>>, Network&>::promise_type`, or | |
2078 // `kj::_::Coroutine<Own<AsyncIoStream>>`. | |
2079 // | |
2080 // The implementation object lives in the heap-allocated coroutine frame. It gets destroyed and | |
2081 // deallocated when the frame does. | |
2082 | |
2083 namespace kj::_ { | |
2084 | |
2085 namespace stdcoro = KJ_COROUTINE_STD_NAMESPACE; | |
2086 | |
2087 class CoroutineBase: public PromiseNode, | |
2088 public Event { | |
2089 public: | |
2090 CoroutineBase(stdcoro::coroutine_handle<> coroutine, ExceptionOrValue& resultRef, | |
2091 SourceLocation location); | |
2092 ~CoroutineBase() noexcept(false); | |
2093 KJ_DISALLOW_COPY_AND_MOVE(CoroutineBase); | |
2094 void destroy() override; | |
2095 | |
2096 auto initial_suspend() { return stdcoro::suspend_never(); } | |
2097 auto final_suspend() noexcept { | |
2098 #if _MSC_VER && !defined(__clang__) | |
2099 // See comment at `finalSuspendCalled`'s definition. | |
2100 finalSuspendCalled = true; | |
2101 #endif | |
2102 return stdcoro::suspend_always(); | |
2103 } | |
2104 // These adjust the suspension behavior of coroutines immediately upon initiation, and immediately | |
2105 // after completion. | |
2106 // | |
2107 // The initial suspension point could allow us to defer the initial synchronous execution of a | |
2108 // coroutine -- everything before its first co_await, that is. | |
2109 // | |
2110 // The final suspension point is useful to delay deallocation of the coroutine frame to match the | |
2111 // lifetime of the enclosing promise. | |
2112 | |
2113 void unhandled_exception(); | |
2114 | |
2115 protected: | |
2116 class AwaiterBase; | |
2117 | |
2118 bool isWaiting() { return waiting; } | |
2119 void scheduleResumption() { | |
2120 onReadyEvent.arm(); | |
2121 waiting = false; | |
2122 } | |
2123 | |
2124 private: | |
2125 // ------------------------------------------------------- | |
2126 // PromiseNode implementation | |
2127 | |
2128 void onReady(Event* event) noexcept override; | |
2129 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; | |
2130 | |
2131 // ------------------------------------------------------- | |
2132 // Event implementation | |
2133 | |
2134 Maybe<Own<Event>> fire() override; | |
2135 void traceEvent(TraceBuilder& builder) override; | |
2136 | |
2137 stdcoro::coroutine_handle<> coroutine; | |
2138 ExceptionOrValue& resultRef; | |
2139 | |
2140 OnReadyEvent onReadyEvent; | |
2141 bool waiting = true; | |
2142 | |
2143 bool hasSuspendedAtLeastOnce = false; | |
2144 | |
2145 #if _MSC_VER && !defined(__clang__) | |
2146 bool finalSuspendCalled = false; | |
2147 // MSVC erroneously reports the coroutine as done (that is, `coroutine.done()` returns true) | |
2148 // seemingly as soon as `return_value()`/`return_void()` are called. This matters in our | |
2149 // implementation of `unhandled_exception()`, which must arrange to propagate exceptions during | |
2150 // coroutine frame unwind via the returned promise, even if `return_value()`/`return_void()` have | |
2151 // already been called. To prove that our assumptions are correct in that function, we want to be | |
2152 // able to assert that `final_suspend()` has not yet been called. This boolean hack allows us to | |
2153 // preserve that assertion. | |
2154 #endif | |
2155 | |
2156 Maybe<PromiseNode&> promiseNodeForTrace; | |
2157 // Whenever this coroutine is suspended waiting on another promise, we keep a reference to that | |
2158 // promise so tracePromise()/traceEvent() can trace into it. | |
2159 | |
2160 UnwindDetector unwindDetector; | |
2161 | |
2162 struct DisposalResults { | |
2163 bool destructorRan = false; | |
2164 Maybe<Exception> exception; | |
2165 }; | |
2166 Maybe<DisposalResults&> maybeDisposalResults; | |
2167 // Only non-null during destruction. Before calling coroutine.destroy(), our disposer sets this | |
2168 // to point to a DisposalResults on the stack so unhandled_exception() will have some place to | |
2169 // store unwind exceptions. We can't store them in this Coroutine, because we'll be destroyed once | |
2170 // coroutine.destroy() has returned. Our disposer then rethrows as needed. | |
2171 }; | |
2172 | |
2173 template <typename Self, typename T> | |
2174 class CoroutineMixin; | |
2175 // CRTP mixin, covered later. | |
2176 | |
2177 template <typename T> | |
2178 class Coroutine final: public CoroutineBase, | |
2179 public CoroutineMixin<Coroutine<T>, T> { | |
2180 // The standard calls this the `promise_type` object. We can call this the "coroutine | |
2181 // implementation object" since the word promise means different things in KJ and std styles. This | |
2182 // is where we implement how a `kj::Promise<T>` is returned from a coroutine, and how that promise | |
2183 // is later fulfilled. We also fill in a few lifetime-related details. | |
2184 // | |
2185 // The implementation object is also where we can customize memory allocation of coroutine frames, | |
2186 // by implementing a member `operator new(size_t, Args...)` (same `Args...` as in | |
2187 // coroutine_traits). | |
2188 // | |
2189 // We can also customize how await-expressions are transformed within `kj::Promise<T>`-based | |
2190 // coroutines by implementing an `await_transform(P)` member function, where `P` is some type for | |
2191 // which we want to implement co_await support, e.g. `kj::Promise<U>`. This feature allows us to | |
2192 // provide an optimized `kj::EventLoop` integration when the coroutine's return type and the | |
2193 // await-expression's type are both `kj::Promise` instantiations -- see further comments under | |
2194 // `await_transform()`. | |
2195 | |
2196 public: | |
2197 using Handle = stdcoro::coroutine_handle<Coroutine<T>>; | |
2198 | |
2199 Coroutine(SourceLocation location = {}) | |
2200 : CoroutineBase(Handle::from_promise(*this), result, location) {} | |
2201 | |
2202 Promise<T> get_return_object() { | |
2203 // Called after coroutine frame construction and before initial_suspend() to create the | |
2204 // coroutine's return object. `this` itself lives inside the coroutine frame, and we arrange for | |
2205 // the returned Promise<T> to own `this` via a custom Disposer and by always leaving the | |
2206 // coroutine in a suspended state. | |
2207 return PromiseNode::to<Promise<T>>(OwnPromiseNode(this)); | |
2208 } | |
2209 | |
2210 public: | |
2211 template <typename U> | |
2212 class Awaiter; | |
2213 | |
2214 template <typename U> | |
2215 Awaiter<U> await_transform(kj::Promise<U>& promise) { return Awaiter<U>(kj::mv(promise)); } | |
2216 template <typename U> | |
2217 Awaiter<U> await_transform(kj::Promise<U>&& promise) { return Awaiter<U>(kj::mv(promise)); } | |
2218 // Called when someone writes `co_await promise`, where `promise` is a kj::Promise<U>. We return | |
2219 // an Awaiter<U>, which implements coroutine suspension and resumption in terms of the KJ async | |
2220 // event system. | |
2221 // | |
2222 // There is another hook we could implement: an `operator co_await()` free function. However, a | |
2223 // free function would be unaware of the type of the enclosing coroutine. Since Awaiter<U> is a | |
2224 // member class template of Coroutine<T>, it is able to implement an | |
2225 // `await_suspend(Coroutine<T>::Handle)` override, providing it type-safe access to our enclosing | |
2226 // coroutine's PromiseNode. An `operator co_await()` free function would have to implement | |
2227 // a type-erased `await_suspend(stdcoro::coroutine_handle<void>)` override, and implement | |
2228 // suspension and resumption in terms of .then(). Yuck! | |
2229 | |
2230 private: | |
2231 // ------------------------------------------------------- | |
2232 // PromiseNode implementation | |
2233 | |
2234 void get(ExceptionOrValue& output) noexcept override { | |
2235 output.as<FixVoid<T>>() = kj::mv(result); | |
2236 } | |
2237 | |
2238 void fulfill(FixVoid<T>&& value) { | |
2239 // Called by the return_value()/return_void() functions in our mixin class. | |
2240 | |
2241 if (isWaiting()) { | |
2242 result = kj::mv(value); | |
2243 scheduleResumption(); | |
2244 } | |
2245 } | |
2246 | |
2247 ExceptionOr<FixVoid<T>> result; | |
2248 | |
2249 friend class CoroutineMixin<Coroutine<T>, T>; | |
2250 }; | |
2251 | |
2252 template <typename Self, typename T> | |
2253 class CoroutineMixin { | |
2254 public: | |
2255 void return_value(T value) { | |
2256 static_cast<Self*>(this)->fulfill(kj::mv(value)); | |
2257 } | |
2258 }; | |
2259 template <typename Self> | |
2260 class CoroutineMixin<Self, void> { | |
2261 public: | |
2262 void return_void() { | |
2263 static_cast<Self*>(this)->fulfill(_::Void()); | |
2264 } | |
2265 }; | |
2266 // The Coroutines spec has no `_::FixVoid<T>` equivalent to unify valueful and valueless co_return | |
2267 // statements, and programs are ill-formed if the coroutine implementation object (Coroutine<T>) has | |
2268 // both a `return_value()` and `return_void()`. No amount of EnableIffery can get around it, so | |
2269 // these return_* functions live in a CRTP mixin. | |
2270 | |
2271 class CoroutineBase::AwaiterBase { | |
2272 public: | |
2273 explicit AwaiterBase(OwnPromiseNode node); | |
2274 AwaiterBase(AwaiterBase&&); | |
2275 ~AwaiterBase() noexcept(false); | |
2276 KJ_DISALLOW_COPY(AwaiterBase); | |
2277 | |
2278 bool await_ready() const { return false; } | |
2279 // This could return "`node->get()` is safe to call" instead, which would make suspension-less | |
2280 // co_awaits possible for immediately-fulfilled promises. However, we need an Event to figure that | |
2281 // out, and we won't have access to the Coroutine Event until await_suspend() is called. So, we | |
2282 // must return false here. Fortunately, await_suspend() has a trick up its sleeve to enable | |
2283 // suspension-less co_awaits. | |
2284 | |
2285 protected: | |
2286 void getImpl(ExceptionOrValue& result, void* awaitedAt); | |
2287 bool awaitSuspendImpl(CoroutineBase& coroutineEvent); | |
2288 | |
2289 private: | |
2290 UnwindDetector unwindDetector; | |
2291 OwnPromiseNode node; | |
2292 | |
2293 Maybe<CoroutineBase&> maybeCoroutineEvent; | |
2294 // If we do suspend waiting for our wrapped promise, we store a reference to `node` in our | |
2295 // enclosing Coroutine for tracing purposes. To guard against any edge cases where an async stack | |
2296 // trace is generated when an Awaiter was destroyed without Coroutine::fire() having been called, | |
2297 // we need our own reference to the enclosing Coroutine. (I struggle to think up any such | |
2298 // scenarios, but perhaps they could occur when destroying a suspended coroutine.) | |
2299 }; | |
2300 | |
2301 template <typename T> | |
2302 template <typename U> | |
2303 class Coroutine<T>::Awaiter: public AwaiterBase { | |
2304 // Wrapper around a co_await'ed promise and some storage space for the result of that promise. | |
2305 // The compiler arranges to call our await_suspend() to suspend, which arranges to be woken up | |
2306 // when the awaited promise is settled. Once that happens, the enclosing coroutine's Event | |
2307 // implementation resumes the coroutine, which transitively calls await_resume() to unwrap the | |
2308 // awaited promise result. | |
2309 | |
2310 public: | |
2311 explicit Awaiter(Promise<U> promise): AwaiterBase(PromiseNode::from(kj::mv(promise))) {} | |
2312 | |
2313 KJ_NOINLINE U await_resume() { | |
2314 // This is marked noinline in order to ensure __builtin_return_address() is accurate for stack | |
2315 // trace purposes. In my experimentation, this method was not inlined anyway even in opt | |
2316 // builds, but I want to make sure it doesn't suddenly start being inlined later causing stack | |
2317 // traces to break. (I also tried always-inline, but this did not appear to cause the compiler | |
2318 // to inline the method -- perhaps a limitation of coroutines?) | |
2319 #if __GNUC__ | |
2320 getImpl(result, __builtin_return_address(0)); | |
2321 #elif _MSC_VER | |
2322 getImpl(result, _ReturnAddress()); | |
2323 #else | |
2324 #error "please implement for your compiler" | |
2325 #endif | |
2326 auto value = kj::_::readMaybe(result.value); | |
2327 KJ_IASSERT(value != nullptr, "Neither exception nor value present."); | |
2328 return U(kj::mv(*value)); | |
2329 } | |
2330 | |
2331 bool await_suspend(Coroutine::Handle coroutine) { | |
2332 return awaitSuspendImpl(coroutine.promise()); | |
2333 } | |
2334 | |
2335 private: | |
2336 ExceptionOr<FixVoid<U>> result; | |
2337 }; | |
2338 | |
2339 #undef KJ_COROUTINE_STD_NAMESPACE | |
2340 | |
2341 } // namespace kj::_ (private) | |
2342 | |
2343 #endif // KJ_HAS_COROUTINE | |
2344 | |
2345 KJ_END_HEADER |