jpayne@69
|
1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
|
jpayne@69
|
2 // Licensed under the MIT License:
|
jpayne@69
|
3 //
|
jpayne@69
|
4 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
jpayne@69
|
5 // of this software and associated documentation files (the "Software"), to deal
|
jpayne@69
|
6 // in the Software without restriction, including without limitation the rights
|
jpayne@69
|
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
jpayne@69
|
8 // copies of the Software, and to permit persons to whom the Software is
|
jpayne@69
|
9 // furnished to do so, subject to the following conditions:
|
jpayne@69
|
10 //
|
jpayne@69
|
11 // The above copyright notice and this permission notice shall be included in
|
jpayne@69
|
12 // all copies or substantial portions of the Software.
|
jpayne@69
|
13 //
|
jpayne@69
|
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
jpayne@69
|
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
jpayne@69
|
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
jpayne@69
|
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
jpayne@69
|
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
jpayne@69
|
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
jpayne@69
|
20 // THE SOFTWARE.
|
jpayne@69
|
21
|
jpayne@69
|
22 // This file contains extended inline implementation details that are required along with async.h.
|
jpayne@69
|
23 // We move this all into a separate file to make async.h more readable.
|
jpayne@69
|
24 //
|
jpayne@69
|
25 // Non-inline declarations here are defined in async.c++.
|
jpayne@69
|
26
|
jpayne@69
|
27 #pragma once
|
jpayne@69
|
28
|
jpayne@69
|
29 #ifndef KJ_ASYNC_H_INCLUDED
|
jpayne@69
|
30 #error "Do not include this directly; include kj/async.h."
|
jpayne@69
|
31 #include "async.h" // help IDE parse this file
|
jpayne@69
|
32 #endif
|
jpayne@69
|
33
|
jpayne@69
|
34 #if _MSC_VER && KJ_HAS_COROUTINE
|
jpayne@69
|
35 #include <intrin.h>
|
jpayne@69
|
36 #endif
|
jpayne@69
|
37
|
jpayne@69
|
38 #include <kj/list.h>
|
jpayne@69
|
39
|
jpayne@69
|
40 KJ_BEGIN_HEADER
|
jpayne@69
|
41
|
jpayne@69
|
42 namespace kj {
|
jpayne@69
|
43 namespace _ { // private
|
jpayne@69
|
44
|
jpayne@69
|
45 template <typename T>
|
jpayne@69
|
46 class ExceptionOr;
|
jpayne@69
|
47
|
jpayne@69
|
48 class ExceptionOrValue {
|
jpayne@69
|
49 public:
|
jpayne@69
|
50 ExceptionOrValue(bool, Exception&& exception): exception(kj::mv(exception)) {}
|
jpayne@69
|
51 KJ_DISALLOW_COPY(ExceptionOrValue);
|
jpayne@69
|
52
|
jpayne@69
|
53 void addException(Exception&& exception) {
|
jpayne@69
|
54 if (this->exception == nullptr) {
|
jpayne@69
|
55 this->exception = kj::mv(exception);
|
jpayne@69
|
56 }
|
jpayne@69
|
57 }
|
jpayne@69
|
58
|
jpayne@69
|
59 template <typename T>
|
jpayne@69
|
60 ExceptionOr<T>& as() { return *static_cast<ExceptionOr<T>*>(this); }
|
jpayne@69
|
61 template <typename T>
|
jpayne@69
|
62 const ExceptionOr<T>& as() const { return *static_cast<const ExceptionOr<T>*>(this); }
|
jpayne@69
|
63
|
jpayne@69
|
64 Maybe<Exception> exception;
|
jpayne@69
|
65
|
jpayne@69
|
66 protected:
|
jpayne@69
|
67 // Allow subclasses to have move constructor / assignment.
|
jpayne@69
|
68 ExceptionOrValue() = default;
|
jpayne@69
|
69 ExceptionOrValue(ExceptionOrValue&& other) = default;
|
jpayne@69
|
70 ExceptionOrValue& operator=(ExceptionOrValue&& other) = default;
|
jpayne@69
|
71 };
|
jpayne@69
|
72
|
jpayne@69
|
73 template <typename T>
|
jpayne@69
|
74 class ExceptionOr: public ExceptionOrValue {
|
jpayne@69
|
75 public:
|
jpayne@69
|
76 ExceptionOr() = default;
|
jpayne@69
|
77 ExceptionOr(T&& value): value(kj::mv(value)) {}
|
jpayne@69
|
78 ExceptionOr(bool, Exception&& exception): ExceptionOrValue(false, kj::mv(exception)) {}
|
jpayne@69
|
79 ExceptionOr(ExceptionOr&&) = default;
|
jpayne@69
|
80 ExceptionOr& operator=(ExceptionOr&&) = default;
|
jpayne@69
|
81
|
jpayne@69
|
82 Maybe<T> value;
|
jpayne@69
|
83 };
|
jpayne@69
|
84
|
jpayne@69
|
85 template <typename T>
|
jpayne@69
|
86 inline T convertToReturn(ExceptionOr<T>&& result) {
|
jpayne@69
|
87 KJ_IF_MAYBE(value, result.value) {
|
jpayne@69
|
88 KJ_IF_MAYBE(exception, result.exception) {
|
jpayne@69
|
89 throwRecoverableException(kj::mv(*exception));
|
jpayne@69
|
90 }
|
jpayne@69
|
91 return _::returnMaybeVoid(kj::mv(*value));
|
jpayne@69
|
92 } else KJ_IF_MAYBE(exception, result.exception) {
|
jpayne@69
|
93 throwFatalException(kj::mv(*exception));
|
jpayne@69
|
94 } else {
|
jpayne@69
|
95 // Result contained neither a value nor an exception?
|
jpayne@69
|
96 KJ_UNREACHABLE;
|
jpayne@69
|
97 }
|
jpayne@69
|
98 }
|
jpayne@69
|
99
|
jpayne@69
|
100 inline void convertToReturn(ExceptionOr<Void>&& result) {
|
jpayne@69
|
101 // Override <void> case to use throwRecoverableException().
|
jpayne@69
|
102
|
jpayne@69
|
103 if (result.value != nullptr) {
|
jpayne@69
|
104 KJ_IF_MAYBE(exception, result.exception) {
|
jpayne@69
|
105 throwRecoverableException(kj::mv(*exception));
|
jpayne@69
|
106 }
|
jpayne@69
|
107 } else KJ_IF_MAYBE(exception, result.exception) {
|
jpayne@69
|
108 throwRecoverableException(kj::mv(*exception));
|
jpayne@69
|
109 } else {
|
jpayne@69
|
110 // Result contained neither a value nor an exception?
|
jpayne@69
|
111 KJ_UNREACHABLE;
|
jpayne@69
|
112 }
|
jpayne@69
|
113 }
|
jpayne@69
|
114
|
jpayne@69
|
115 class TraceBuilder {
|
jpayne@69
|
116 // Helper for methods that build a call trace.
|
jpayne@69
|
117 public:
|
jpayne@69
|
118 TraceBuilder(ArrayPtr<void*> space)
|
jpayne@69
|
119 : start(space.begin()), current(space.begin()), limit(space.end()) {}
|
jpayne@69
|
120
|
jpayne@69
|
121 inline void add(void* addr) {
|
jpayne@69
|
122 if (current < limit) {
|
jpayne@69
|
123 *current++ = addr;
|
jpayne@69
|
124 }
|
jpayne@69
|
125 }
|
jpayne@69
|
126
|
jpayne@69
|
127 inline bool full() const { return current == limit; }
|
jpayne@69
|
128
|
jpayne@69
|
129 ArrayPtr<void*> finish() {
|
jpayne@69
|
130 return arrayPtr(start, current);
|
jpayne@69
|
131 }
|
jpayne@69
|
132
|
jpayne@69
|
133 String toString();
|
jpayne@69
|
134
|
jpayne@69
|
135 private:
|
jpayne@69
|
136 void** start;
|
jpayne@69
|
137 void** current;
|
jpayne@69
|
138 void** limit;
|
jpayne@69
|
139 };
|
jpayne@69
|
140
|
jpayne@69
|
141 struct alignas(void*) PromiseArena {
|
jpayne@69
|
142 // Space in which a chain of promises may be allocated. See PromiseDisposer.
|
jpayne@69
|
143 byte bytes[1024];
|
jpayne@69
|
144 };
|
jpayne@69
|
145
|
jpayne@69
|
146 class Event: private AsyncObject {
|
jpayne@69
|
147 // An event waiting to be executed. Not for direct use by applications -- promises use this
|
jpayne@69
|
148 // internally.
|
jpayne@69
|
149
|
jpayne@69
|
150 public:
|
jpayne@69
|
151 Event(SourceLocation location);
|
jpayne@69
|
152 Event(kj::EventLoop& loop, SourceLocation location);
|
jpayne@69
|
153 ~Event() noexcept(false);
|
jpayne@69
|
154 KJ_DISALLOW_COPY_AND_MOVE(Event);
|
jpayne@69
|
155
|
jpayne@69
|
156 void armDepthFirst();
|
jpayne@69
|
157 // Enqueue this event so that `fire()` will be called from the event loop soon.
|
jpayne@69
|
158 //
|
jpayne@69
|
159 // Events scheduled in this way are executed in depth-first order: if an event callback arms
|
jpayne@69
|
160 // more events, those events are placed at the front of the queue (in the order in which they
|
jpayne@69
|
161 // were armed), so that they run immediately after the first event's callback returns.
|
jpayne@69
|
162 //
|
jpayne@69
|
163 // Depth-first event scheduling is appropriate for events that represent simple continuations
|
jpayne@69
|
164 // of a previous event that should be globbed together for performance. Depth-first scheduling
|
jpayne@69
|
165 // can lead to starvation, so any long-running task must occasionally yield with
|
jpayne@69
|
166 // `armBreadthFirst()`. (Promise::then() uses depth-first whereas evalLater() uses
|
jpayne@69
|
167 // breadth-first.)
|
jpayne@69
|
168 //
|
jpayne@69
|
169 // To use breadth-first scheduling instead, use `armBreadthFirst()`.
|
jpayne@69
|
170
|
jpayne@69
|
171 void armBreadthFirst();
|
jpayne@69
|
172 // Like `armDepthFirst()` except that the event is placed at the end of the queue.
|
jpayne@69
|
173
|
jpayne@69
|
174 void armLast();
|
jpayne@69
|
175 // Enqueues this event to happen after all other events have run to completion and there is
|
jpayne@69
|
176 // really nothing left to do except wait for I/O.
|
jpayne@69
|
177
|
jpayne@69
|
178 bool isNext();
|
jpayne@69
|
179 // True if the Event has been armed and is next in line to be fired. This can be used after
|
jpayne@69
|
180 // calling PromiseNode::onReady(event) to determine if a promise being waited is immediately
|
jpayne@69
|
181 // ready, in which case continuations may be optimistically run without returning to the event
|
jpayne@69
|
182 // loop. Note that this optimization is only valid if we know that we would otherwise immediately
|
jpayne@69
|
183 // return to the event loop without running more application code. So this turns out to be useful
|
jpayne@69
|
184 // in fairly narrow circumstances, chiefly when a coroutine is about to suspend, but discovers it
|
jpayne@69
|
185 // doesn't need to.
|
jpayne@69
|
186 //
|
jpayne@69
|
187 // Returns false if the event loop is not currently running. This ensures that promise
|
jpayne@69
|
188 // continuations don't execute except under a call to .wait().
|
jpayne@69
|
189
|
jpayne@69
|
190 void disarm();
|
jpayne@69
|
191 // If the event is armed but hasn't fired, cancel it. (Destroying the event does this
|
jpayne@69
|
192 // implicitly.)
|
jpayne@69
|
193
|
jpayne@69
|
194 virtual void traceEvent(TraceBuilder& builder) = 0;
|
jpayne@69
|
195 // Build a trace of the callers leading up to this event. `builder` will be populated with
|
jpayne@69
|
196 // "return addresses" of the promise chain waiting on this event. The return addresses may
|
jpayne@69
|
197 // actually the addresses of lambdas passed to .then(), but in any case, feeding them into
|
jpayne@69
|
198 // addr2line should produce useful source code locations.
|
jpayne@69
|
199 //
|
jpayne@69
|
200 // `traceEvent()` may be called from an async signal handler while `fire()` is executing. It
|
jpayne@69
|
201 // must not allocate nor take locks.
|
jpayne@69
|
202
|
jpayne@69
|
203 String traceEvent();
|
jpayne@69
|
204 // Helper that builds a trace and stringifies it.
|
jpayne@69
|
205
|
jpayne@69
|
206 protected:
|
jpayne@69
|
207 virtual Maybe<Own<Event>> fire() = 0;
|
jpayne@69
|
208 // Fire the event. Possibly returns a pointer to itself, which will be discarded by the
|
jpayne@69
|
209 // caller. This is the only way that an event can delete itself as a result of firing, as
|
jpayne@69
|
210 // doing so from within fire() will throw an exception.
|
jpayne@69
|
211
|
jpayne@69
|
212 private:
|
jpayne@69
|
213 friend class kj::EventLoop;
|
jpayne@69
|
214 EventLoop& loop;
|
jpayne@69
|
215 Event* next;
|
jpayne@69
|
216 Event** prev;
|
jpayne@69
|
217 bool firing = false;
|
jpayne@69
|
218
|
jpayne@69
|
219 static constexpr uint MAGIC_LIVE_VALUE = 0x1e366381u;
|
jpayne@69
|
220 uint live = MAGIC_LIVE_VALUE;
|
jpayne@69
|
221 SourceLocation location;
|
jpayne@69
|
222 };
|
jpayne@69
|
223
|
jpayne@69
|
224 class PromiseArenaMember {
|
jpayne@69
|
225 // An object that is allocated in a PromiseArena. `PromiseNode` inherits this, and most
|
jpayne@69
|
226 // arena-allocated objects are `PromiseNode` subclasses, but `TaskSet::Task`, ForkHub, and
|
jpayne@69
|
227 // potentially other objects that commonly live on the end of a promise chain can also leverage
|
jpayne@69
|
228 // this.
|
jpayne@69
|
229
|
jpayne@69
|
230 public:
|
jpayne@69
|
231 virtual void destroy() = 0;
|
jpayne@69
|
232 // Destroys and frees the node.
|
jpayne@69
|
233 //
|
jpayne@69
|
234 // If the node was allocated using allocPromise<T>(), then destroy() must call
|
jpayne@69
|
235 // freePromise<T>(this). If it was allocated some other way, then it is `destroy()`'s
|
jpayne@69
|
236 // responsibility to complete any necessary cleanup of memory, e.g. call `delete this`.
|
jpayne@69
|
237 //
|
jpayne@69
|
238 // We use this instead of a virtual destructor for two reasons:
|
jpayne@69
|
239 // 1. Coroutine nodes are not independent objects, they have to call destroy() on the coroutine
|
jpayne@69
|
240 // handle to delete themselves.
|
jpayne@69
|
241 // 2. XThreadEvents sometimes leave it up to a different thread to actually delete the object.
|
jpayne@69
|
242
|
jpayne@69
|
243 private:
|
jpayne@69
|
244 PromiseArena* arena = nullptr;
|
jpayne@69
|
245 // If non-null, then this PromiseNode is the last node allocated within the given arena, and
|
jpayne@69
|
246 // therefore owns the arena. After this node is destroyed, the arena should be deleted.
|
jpayne@69
|
247 //
|
jpayne@69
|
248 // PromiseNodes are allocated within the arena starting from the end, and `PromiseNode`s
|
jpayne@69
|
249 // allocated this way are required to have `PromiseNode` itself as their leftmost inherited type,
|
jpayne@69
|
250 // so that the pointers match. Thus, the space in `arena` from its start to the location of the
|
jpayne@69
|
251 // `PromiseNode` is known to be available for subsequent allocations (which should then take
|
jpayne@69
|
252 // ownership of the arena).
|
jpayne@69
|
253
|
jpayne@69
|
254 friend class PromiseDisposer;
|
jpayne@69
|
255 };
|
jpayne@69
|
256
|
jpayne@69
|
257 class PromiseNode: public PromiseArenaMember, private AsyncObject {
|
jpayne@69
|
258 // A Promise<T> contains a chain of PromiseNodes tracking the pending transformations.
|
jpayne@69
|
259 //
|
jpayne@69
|
260 // To reduce generated code bloat, PromiseNode is not a template. Instead, it makes very hacky
|
jpayne@69
|
261 // use of pointers to ExceptionOrValue which actually point to ExceptionOr<T>, but are only
|
jpayne@69
|
262 // so down-cast in the few places that really need to be templated. Luckily this is all
|
jpayne@69
|
263 // internal implementation details.
|
jpayne@69
|
264
|
jpayne@69
|
265 public:
|
jpayne@69
|
266 virtual void onReady(Event* event) noexcept = 0;
|
jpayne@69
|
267 // Arms the given event when ready.
|
jpayne@69
|
268 //
|
jpayne@69
|
269 // May be called multiple times. If called again before the event was armed, the old event will
|
jpayne@69
|
270 // never be armed, only the new one. If called again after the event was armed, the new event
|
jpayne@69
|
271 // will be armed immediately. Can be called with nullptr to un-register the existing event.
|
jpayne@69
|
272
|
jpayne@69
|
273 virtual void setSelfPointer(OwnPromiseNode* selfPtr) noexcept;
|
jpayne@69
|
274 // Tells the node that `selfPtr` is the pointer that owns this node, and will continue to own
|
jpayne@69
|
275 // this node until it is destroyed or setSelfPointer() is called again. ChainPromiseNode uses
|
jpayne@69
|
276 // this to shorten redundant chains. The default implementation does nothing; only
|
jpayne@69
|
277 // ChainPromiseNode should implement this.
|
jpayne@69
|
278
|
jpayne@69
|
279 virtual void get(ExceptionOrValue& output) noexcept = 0;
|
jpayne@69
|
280 // Get the result. `output` points to an ExceptionOr<T> into which the result will be written.
|
jpayne@69
|
281 // Can only be called once, and only after the node is ready. Must be called directly from the
|
jpayne@69
|
282 // event loop, with no application code on the stack.
|
jpayne@69
|
283
|
jpayne@69
|
284 virtual void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) = 0;
|
jpayne@69
|
285 // Build a trace of this promise chain, showing what it is currently waiting on.
|
jpayne@69
|
286 //
|
jpayne@69
|
287 // Since traces are ordered callee-before-caller, PromiseNode::tracePromise() should typically
|
jpayne@69
|
288 // recurse to its child first, then after the child returns, add itself to the trace.
|
jpayne@69
|
289 //
|
jpayne@69
|
290 // If `stopAtNextEvent` is true, then the trace should stop as soon as it hits a PromiseNode that
|
jpayne@69
|
291 // also implements Event, and should not trace that node or its children. This is used in
|
jpayne@69
|
292 // conjunction with Event::traceEvent(). The chain of Events is often more sparse than the chain
|
jpayne@69
|
293 // of PromiseNodes, because a TransformPromiseNode (which implements .then()) is not itself an
|
jpayne@69
|
294 // Event. TransformPromiseNode instead tells its child node to directly notify its *parent* node
|
jpayne@69
|
295 // when it is ready, and then TransformPromiseNode applies the .then() transformation during the
|
jpayne@69
|
296 // call to .get().
|
jpayne@69
|
297 //
|
jpayne@69
|
298 // So, when we trace the chain of Events backwards, we end up hoping over segments of
|
jpayne@69
|
299 // TransformPromiseNodes (and other similar types). In order to get those added to the trace,
|
jpayne@69
|
300 // each Event must call back down the PromiseNode chain in the opposite direction, using this
|
jpayne@69
|
301 // method.
|
jpayne@69
|
302 //
|
jpayne@69
|
303 // `tracePromise()` may be called from an async signal handler while `get()` is executing. It
|
jpayne@69
|
304 // must not allocate nor take locks.
|
jpayne@69
|
305
|
jpayne@69
|
306 template <typename T>
|
jpayne@69
|
307 static OwnPromiseNode from(T&& promise) {
|
jpayne@69
|
308 // Given a Promise, extract the PromiseNode.
|
jpayne@69
|
309 return kj::mv(promise.node);
|
jpayne@69
|
310 }
|
jpayne@69
|
311 template <typename T>
|
jpayne@69
|
312 static PromiseNode& from(T& promise) {
|
jpayne@69
|
313 // Given a Promise, extract the PromiseNode.
|
jpayne@69
|
314 return *promise.node;
|
jpayne@69
|
315 }
|
jpayne@69
|
316 template <typename T>
|
jpayne@69
|
317 static T to(OwnPromiseNode&& node) {
|
jpayne@69
|
318 // Construct a Promise from a PromiseNode. (T should be a Promise type.)
|
jpayne@69
|
319 return T(false, kj::mv(node));
|
jpayne@69
|
320 }
|
jpayne@69
|
321
|
jpayne@69
|
322 protected:
|
jpayne@69
|
323 class OnReadyEvent {
|
jpayne@69
|
324 // Helper class for implementing onReady().
|
jpayne@69
|
325
|
jpayne@69
|
326 public:
|
jpayne@69
|
327 void init(Event* newEvent);
|
jpayne@69
|
328
|
jpayne@69
|
329 void arm();
|
jpayne@69
|
330 void armBreadthFirst();
|
jpayne@69
|
331 // Arms the event if init() has already been called and makes future calls to init()
|
jpayne@69
|
332 // automatically arm the event.
|
jpayne@69
|
333
|
jpayne@69
|
334 inline void traceEvent(TraceBuilder& builder) {
|
jpayne@69
|
335 if (event != nullptr && !builder.full()) event->traceEvent(builder);
|
jpayne@69
|
336 }
|
jpayne@69
|
337
|
jpayne@69
|
338 private:
|
jpayne@69
|
339 Event* event = nullptr;
|
jpayne@69
|
340 };
|
jpayne@69
|
341 };
|
jpayne@69
|
342
|
jpayne@69
|
343 class PromiseDisposer {
|
jpayne@69
|
344 public:
|
jpayne@69
|
345 template <typename T>
|
jpayne@69
|
346 static constexpr bool canArenaAllocate() {
|
jpayne@69
|
347 // We can only use arena allocation for types that fit in an arena and have pointer-size
|
jpayne@69
|
348 // alignment. Anything else will need to be allocated as a separate heap object.
|
jpayne@69
|
349 return sizeof(T) <= sizeof(PromiseArena) && alignof(T) <= alignof(void*);
|
jpayne@69
|
350 }
|
jpayne@69
|
351
|
jpayne@69
|
352 static void dispose(PromiseArenaMember* node) {
|
jpayne@69
|
353 PromiseArena* arena = node->arena;
|
jpayne@69
|
354 node->destroy();
|
jpayne@69
|
355 delete arena; // reminder: `delete` automatically ignores null pointers
|
jpayne@69
|
356 }
|
jpayne@69
|
357
|
jpayne@69
|
358 template <typename T, typename D = PromiseDisposer, typename... Params>
|
jpayne@69
|
359 static kj::Own<T, D> alloc(Params&&... params) noexcept {
|
jpayne@69
|
360 // Implements allocPromise().
|
jpayne@69
|
361 T* ptr;
|
jpayne@69
|
362 if (!canArenaAllocate<T>()) {
|
jpayne@69
|
363 // Node too big (or needs weird alignment), fall back to regular heap allocation.
|
jpayne@69
|
364 ptr = new T(kj::fwd<Params>(params)...);
|
jpayne@69
|
365 } else {
|
jpayne@69
|
366 // Start a new arena.
|
jpayne@69
|
367 //
|
jpayne@69
|
368 // NOTE: As in append() (below), we don't implement exception-safety because it causes code
|
jpayne@69
|
369 // bloat and these constructors probably don't throw. Instead this function is noexcept, so
|
jpayne@69
|
370 // if a constructor does throw, it'll crash rather than leak memory.
|
jpayne@69
|
371 auto* arena = new PromiseArena;
|
jpayne@69
|
372 ptr = reinterpret_cast<T*>(arena + 1) - 1;
|
jpayne@69
|
373 ctor(*ptr, kj::fwd<Params>(params)...);
|
jpayne@69
|
374 ptr->arena = arena;
|
jpayne@69
|
375 KJ_IREQUIRE(reinterpret_cast<void*>(ptr) ==
|
jpayne@69
|
376 reinterpret_cast<void*>(static_cast<PromiseArenaMember*>(ptr)),
|
jpayne@69
|
377 "PromiseArenaMember must be the leftmost inherited type.");
|
jpayne@69
|
378 }
|
jpayne@69
|
379 return kj::Own<T, D>(ptr);
|
jpayne@69
|
380 }
|
jpayne@69
|
381
|
jpayne@69
|
382 template <typename T, typename D = PromiseDisposer, typename... Params>
|
jpayne@69
|
383 static kj::Own<T, D> append(
|
jpayne@69
|
384 OwnPromiseNode&& next, Params&&... params) noexcept {
|
jpayne@69
|
385 // Implements appendPromise().
|
jpayne@69
|
386
|
jpayne@69
|
387 PromiseArena* arena = next->arena;
|
jpayne@69
|
388
|
jpayne@69
|
389 if (!canArenaAllocate<T>() || arena == nullptr ||
|
jpayne@69
|
390 reinterpret_cast<byte*>(next.get()) - reinterpret_cast<byte*>(arena) < sizeof(T)) {
|
jpayne@69
|
391 // No arena available, or not enough space, or weird alignment needed. Start new arena.
|
jpayne@69
|
392 return alloc<T, D>(kj::mv(next), kj::fwd<Params>(params)...);
|
jpayne@69
|
393 } else {
|
jpayne@69
|
394 // Append to arena.
|
jpayne@69
|
395 //
|
jpayne@69
|
396 // NOTE: When we call ctor(), it takes ownership of `next`, so we shouldn't assume `next`
|
jpayne@69
|
397 // still exists after it returns. So we have to remove ownership of the arena before that.
|
jpayne@69
|
398 // In theory if we wanted this to be exception-safe, we'd also have to arrange to delete
|
jpayne@69
|
399 // the arena if the constructor throws. However, in practice none of the PromiseNode
|
jpayne@69
|
400 // constructors throw, so we just mark the whole method noexcept in order to avoid the
|
jpayne@69
|
401 // code bloat to handle this case.
|
jpayne@69
|
402 next->arena = nullptr;
|
jpayne@69
|
403 T* ptr = reinterpret_cast<T*>(next.get()) - 1;
|
jpayne@69
|
404 ctor(*ptr, kj::mv(next), kj::fwd<Params>(params)...);
|
jpayne@69
|
405 ptr->arena = arena;
|
jpayne@69
|
406 KJ_IREQUIRE(reinterpret_cast<void*>(ptr) ==
|
jpayne@69
|
407 reinterpret_cast<void*>(static_cast<PromiseArenaMember*>(ptr)),
|
jpayne@69
|
408 "PromiseArenaMember must be the leftmost inherited type.");
|
jpayne@69
|
409 return kj::Own<T, D>(ptr);
|
jpayne@69
|
410 }
|
jpayne@69
|
411 }
|
jpayne@69
|
412 };
|
jpayne@69
|
413
|
jpayne@69
|
414 template <typename T, typename... Params>
|
jpayne@69
|
415 static kj::Own<T, PromiseDisposer> allocPromise(Params&&... params) {
|
jpayne@69
|
416 // Allocate a PromiseNode without appending it to any existing promise arena. Space for a new
|
jpayne@69
|
417 // arena will be allocated.
|
jpayne@69
|
418 return PromiseDisposer::alloc<T>(kj::fwd<Params>(params)...);
|
jpayne@69
|
419 }
|
jpayne@69
|
420
|
jpayne@69
|
421 template <typename T, bool arena = PromiseDisposer::canArenaAllocate<T>()>
|
jpayne@69
|
422 struct FreePromiseNode;
|
jpayne@69
|
423 template <typename T>
|
jpayne@69
|
424 struct FreePromiseNode<T, true> {
|
jpayne@69
|
425 static inline void free(T* ptr) {
|
jpayne@69
|
426 // The object will have been allocated in an arena, so we only want to run the destructor.
|
jpayne@69
|
427 // The arena's memory will be freed separately.
|
jpayne@69
|
428 kj::dtor(*ptr);
|
jpayne@69
|
429 }
|
jpayne@69
|
430 };
|
jpayne@69
|
431 template <typename T>
|
jpayne@69
|
432 struct FreePromiseNode<T, false> {
|
jpayne@69
|
433 static inline void free(T* ptr) {
|
jpayne@69
|
434 // The object will have been allocated separately on the heap.
|
jpayne@69
|
435 return delete ptr;
|
jpayne@69
|
436 }
|
jpayne@69
|
437 };
|
jpayne@69
|
438
|
jpayne@69
|
439 template <typename T>
|
jpayne@69
|
440 static void freePromise(T* ptr) {
|
jpayne@69
|
441 // Free a PromiseNode originally allocated using `allocPromise<T>()`. The implementation of
|
jpayne@69
|
442 // PromiseNode::destroy() must call this for any type that is allocated using allocPromise().
|
jpayne@69
|
443 FreePromiseNode<T>::free(ptr);
|
jpayne@69
|
444 }
|
jpayne@69
|
445
|
jpayne@69
|
446 template <typename T, typename... Params>
|
jpayne@69
|
447 static kj::Own<T, PromiseDisposer> appendPromise(OwnPromiseNode&& next, Params&&... params) {
|
jpayne@69
|
448 // Append a promise to the arena that currently ends with `next`. `next` is also still passed as
|
jpayne@69
|
449 // the first parameter to the new object's constructor.
|
jpayne@69
|
450 //
|
jpayne@69
|
451 // This is semantically the same as `allocPromise()` except that it may avoid the underlying
|
jpayne@69
|
452 // memory allocation. `next` must end up being destroyed before the new object (i.e. the new
|
jpayne@69
|
453 // object must never transfer away ownership of `next`).
|
jpayne@69
|
454 return PromiseDisposer::append<T>(kj::mv(next), kj::fwd<Params>(params)...);
|
jpayne@69
|
455 }
|
jpayne@69
|
456
|
jpayne@69
|
457 // -------------------------------------------------------------------
|
jpayne@69
|
458
|
jpayne@69
|
459 inline ReadyNow::operator Promise<void>() const {
|
jpayne@69
|
460 return PromiseNode::to<Promise<void>>(readyNow());
|
jpayne@69
|
461 }
|
jpayne@69
|
462
|
jpayne@69
|
463 template <typename T>
|
jpayne@69
|
464 inline NeverDone::operator Promise<T>() const {
|
jpayne@69
|
465 return PromiseNode::to<Promise<T>>(neverDone());
|
jpayne@69
|
466 }
|
jpayne@69
|
467
|
jpayne@69
|
468 // -------------------------------------------------------------------
|
jpayne@69
|
469
|
jpayne@69
|
470 class ImmediatePromiseNodeBase: public PromiseNode {
|
jpayne@69
|
471 public:
|
jpayne@69
|
472 ImmediatePromiseNodeBase();
|
jpayne@69
|
473 ~ImmediatePromiseNodeBase() noexcept(false);
|
jpayne@69
|
474
|
jpayne@69
|
475 void onReady(Event* event) noexcept override;
|
jpayne@69
|
476 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
|
jpayne@69
|
477 };
|
jpayne@69
|
478
|
jpayne@69
|
479 template <typename T>
|
jpayne@69
|
480 class ImmediatePromiseNode final: public ImmediatePromiseNodeBase {
|
jpayne@69
|
481 // A promise that has already been resolved to an immediate value or exception.
|
jpayne@69
|
482
|
jpayne@69
|
483 public:
|
jpayne@69
|
484 ImmediatePromiseNode(ExceptionOr<T>&& result): result(kj::mv(result)) {}
|
jpayne@69
|
485 void destroy() override { freePromise(this); }
|
jpayne@69
|
486
|
jpayne@69
|
487 void get(ExceptionOrValue& output) noexcept override {
|
jpayne@69
|
488 output.as<T>() = kj::mv(result);
|
jpayne@69
|
489 }
|
jpayne@69
|
490
|
jpayne@69
|
491 private:
|
jpayne@69
|
492 ExceptionOr<T> result;
|
jpayne@69
|
493 };
|
jpayne@69
|
494
|
jpayne@69
|
495 class ImmediateBrokenPromiseNode final: public ImmediatePromiseNodeBase {
|
jpayne@69
|
496 public:
|
jpayne@69
|
497 ImmediateBrokenPromiseNode(Exception&& exception);
|
jpayne@69
|
498 void destroy() override;
|
jpayne@69
|
499
|
jpayne@69
|
500 void get(ExceptionOrValue& output) noexcept override;
|
jpayne@69
|
501
|
jpayne@69
|
502 private:
|
jpayne@69
|
503 Exception exception;
|
jpayne@69
|
504 };
|
jpayne@69
|
505
|
jpayne@69
|
506 template <typename T, T value>
|
jpayne@69
|
507 class ConstPromiseNode: public ImmediatePromiseNodeBase {
|
jpayne@69
|
508 public:
|
jpayne@69
|
509 void destroy() override {}
|
jpayne@69
|
510 void get(ExceptionOrValue& output) noexcept override {
|
jpayne@69
|
511 output.as<T>() = value;
|
jpayne@69
|
512 }
|
jpayne@69
|
513 };
|
jpayne@69
|
514
|
jpayne@69
|
515 // -------------------------------------------------------------------
|
jpayne@69
|
516
|
jpayne@69
|
517 class AttachmentPromiseNodeBase: public PromiseNode {
|
jpayne@69
|
518 public:
|
jpayne@69
|
519 AttachmentPromiseNodeBase(OwnPromiseNode&& dependency);
|
jpayne@69
|
520
|
jpayne@69
|
521 void onReady(Event* event) noexcept override;
|
jpayne@69
|
522 void get(ExceptionOrValue& output) noexcept override;
|
jpayne@69
|
523 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
|
jpayne@69
|
524
|
jpayne@69
|
525 private:
|
jpayne@69
|
526 OwnPromiseNode dependency;
|
jpayne@69
|
527
|
jpayne@69
|
528 void dropDependency();
|
jpayne@69
|
529
|
jpayne@69
|
530 template <typename>
|
jpayne@69
|
531 friend class AttachmentPromiseNode;
|
jpayne@69
|
532 };
|
jpayne@69
|
533
|
jpayne@69
|
534 template <typename Attachment>
|
jpayne@69
|
535 class AttachmentPromiseNode final: public AttachmentPromiseNodeBase {
|
jpayne@69
|
536 // A PromiseNode that holds on to some object (usually, an Own<T>, but could be any movable
|
jpayne@69
|
537 // object) until the promise resolves.
|
jpayne@69
|
538
|
jpayne@69
|
539 public:
|
jpayne@69
|
540 AttachmentPromiseNode(OwnPromiseNode&& dependency, Attachment&& attachment)
|
jpayne@69
|
541 : AttachmentPromiseNodeBase(kj::mv(dependency)),
|
jpayne@69
|
542 attachment(kj::mv<Attachment>(attachment)) {}
|
jpayne@69
|
543 void destroy() override { freePromise(this); }
|
jpayne@69
|
544
|
jpayne@69
|
545 ~AttachmentPromiseNode() noexcept(false) {
|
jpayne@69
|
546 // We need to make sure the dependency is deleted before we delete the attachment because the
|
jpayne@69
|
547 // dependency may be using the attachment.
|
jpayne@69
|
548 dropDependency();
|
jpayne@69
|
549 }
|
jpayne@69
|
550
|
jpayne@69
|
551 private:
|
jpayne@69
|
552 Attachment attachment;
|
jpayne@69
|
553 };
|
jpayne@69
|
554
|
jpayne@69
|
555 // -------------------------------------------------------------------
|
jpayne@69
|
556
|
jpayne@69
|
557 #if __GNUC__ >= 8 && !__clang__
|
jpayne@69
|
558 // GCC 8's class-memaccess warning rightly does not like the memcpy()'s below, but there's no
|
jpayne@69
|
559 // "legal" way for us to extract the content of a PTMF so too bad.
|
jpayne@69
|
560 #pragma GCC diagnostic push
|
jpayne@69
|
561 #pragma GCC diagnostic ignored "-Wclass-memaccess"
|
jpayne@69
|
562 #if __GNUC__ >= 11
|
jpayne@69
|
563 // GCC 11's array-bounds is similarly upset with us for digging into "private" implementation
|
jpayne@69
|
564 // details. But the format is well-defined by the ABI which cannot change so please just let us
|
jpayne@69
|
565 // do it kthx.
|
jpayne@69
|
566 #pragma GCC diagnostic ignored "-Warray-bounds"
|
jpayne@69
|
567 #endif
|
jpayne@69
|
568 #endif
|
jpayne@69
|
569
|
jpayne@69
|
570 template <typename T, typename ReturnType, typename... ParamTypes>
|
jpayne@69
|
571 void* getMethodStartAddress(T& obj, ReturnType (T::*method)(ParamTypes...));
|
jpayne@69
|
572 template <typename T, typename ReturnType, typename... ParamTypes>
|
jpayne@69
|
573 void* getMethodStartAddress(const T& obj, ReturnType (T::*method)(ParamTypes...) const);
|
jpayne@69
|
574 // Given an object and a pointer-to-method, return the start address of the method's code. The
|
jpayne@69
|
575 // intent is that this address can be used in a trace; addr2line should map it to the start of
|
jpayne@69
|
576 // the function's definition. For virtual methods, this does a vtable lookup on `obj` to determine
|
jpayne@69
|
577 // the address of the specific implementation (otherwise, `obj` wouldn't be needed).
|
jpayne@69
|
578 //
|
jpayne@69
|
579 // Note that if the method is overloaded or is a template, you will need to explicitly specify
|
jpayne@69
|
580 // the param and return types, otherwise the compiler won't know which overload / template
|
jpayne@69
|
581 // specialization you are requesting.
|
jpayne@69
|
582
|
jpayne@69
|
583 class PtmfHelper {
|
jpayne@69
|
584 // This class is a private helper for GetFunctorStartAddress and getMethodStartAddress(). The
|
jpayne@69
|
585 // class represents the internal representation of a pointer-to-member-function.
|
jpayne@69
|
586
|
jpayne@69
|
587 template <typename... ParamTypes>
|
jpayne@69
|
588 friend struct GetFunctorStartAddress;
|
jpayne@69
|
589 template <typename T, typename ReturnType, typename... ParamTypes>
|
jpayne@69
|
590 friend void* getMethodStartAddress(T& obj, ReturnType (T::*method)(ParamTypes...));
|
jpayne@69
|
591 template <typename T, typename ReturnType, typename... ParamTypes>
|
jpayne@69
|
592 friend void* getMethodStartAddress(const T& obj, ReturnType (T::*method)(ParamTypes...) const);
|
jpayne@69
|
593
|
jpayne@69
|
594 #if __GNUG__
|
jpayne@69
|
595
|
jpayne@69
|
596 void* ptr;
|
jpayne@69
|
597 ptrdiff_t adj;
|
jpayne@69
|
598 // Layout of a pointer-to-member-function used by GCC and compatible compilers.
|
jpayne@69
|
599
|
jpayne@69
|
600 void* apply(const void* obj) {
|
jpayne@69
|
601 #if defined(__arm__) || defined(__mips__) || defined(__aarch64__)
|
jpayne@69
|
602 if (adj & 1) {
|
jpayne@69
|
603 ptrdiff_t voff = (ptrdiff_t)ptr;
|
jpayne@69
|
604 #else
|
jpayne@69
|
605 ptrdiff_t voff = (ptrdiff_t)ptr;
|
jpayne@69
|
606 if (voff & 1) {
|
jpayne@69
|
607 voff &= ~1;
|
jpayne@69
|
608 #endif
|
jpayne@69
|
609 return *(void**)(*(char**)obj + voff);
|
jpayne@69
|
610 } else {
|
jpayne@69
|
611 return ptr;
|
jpayne@69
|
612 }
|
jpayne@69
|
613 }
|
jpayne@69
|
614
|
jpayne@69
|
615 #define BODY \
|
jpayne@69
|
616 PtmfHelper result; \
|
jpayne@69
|
617 static_assert(sizeof(p) == sizeof(result), "unknown ptmf layout"); \
|
jpayne@69
|
618 memcpy(&result, &p, sizeof(result)); \
|
jpayne@69
|
619 return result
|
jpayne@69
|
620
|
jpayne@69
|
621 #else // __GNUG__
|
jpayne@69
|
622
|
jpayne@69
|
623 void* apply(const void* obj) { return nullptr; }
|
jpayne@69
|
624 // TODO(port): PTMF instruction address extraction
|
jpayne@69
|
625
|
jpayne@69
|
626 #define BODY return PtmfHelper{}
|
jpayne@69
|
627
|
jpayne@69
|
628 #endif // __GNUG__, else
|
jpayne@69
|
629
|
jpayne@69
|
630 template <typename R, typename C, typename... P, typename F>
|
jpayne@69
|
631 static PtmfHelper from(F p) { BODY; }
|
jpayne@69
|
632 // Create a PtmfHelper from some arbitrary pointer-to-member-function which is not
|
jpayne@69
|
633 // overloaded nor a template. In this case the compiler is able to deduce the full function
|
jpayne@69
|
634 // signature directly given the name since there is only one function with that name.
|
jpayne@69
|
635
|
jpayne@69
|
636 template <typename R, typename C, typename... P>
|
jpayne@69
|
637 static PtmfHelper from(R (C::*p)(NoInfer<P>...)) { BODY; }
|
jpayne@69
|
638 template <typename R, typename C, typename... P>
|
jpayne@69
|
639 static PtmfHelper from(R (C::*p)(NoInfer<P>...) const) { BODY; }
|
jpayne@69
|
640 // Create a PtmfHelper from some poniter-to-member-function which is a template. In this case
|
jpayne@69
|
641 // the function must match exactly the containing type C, return type R, and parameter types P...
|
jpayne@69
|
642 // GetFunctorStartAddress normally specifies exactly the correct C and R, but can only make a
|
jpayne@69
|
643 // guess at P. Luckily, if the function parameters are template parameters then it's not
|
jpayne@69
|
644 // necessary to be precise about P.
|
jpayne@69
|
645 #undef BODY
|
jpayne@69
|
646 };
|
jpayne@69
|
647
|
jpayne@69
|
648 #if __GNUC__ >= 8 && !__clang__
|
jpayne@69
|
649 #pragma GCC diagnostic pop
|
jpayne@69
|
650 #endif
|
jpayne@69
|
651
|
jpayne@69
|
652 template <typename T, typename ReturnType, typename... ParamTypes>
|
jpayne@69
|
653 void* getMethodStartAddress(T& obj, ReturnType (T::*method)(ParamTypes...)) {
|
jpayne@69
|
654 return PtmfHelper::from<ReturnType, T, ParamTypes...>(method).apply(&obj);
|
jpayne@69
|
655 }
|
jpayne@69
|
656 template <typename T, typename ReturnType, typename... ParamTypes>
|
jpayne@69
|
657 void* getMethodStartAddress(const T& obj, ReturnType (T::*method)(ParamTypes...) const) {
|
jpayne@69
|
658 return PtmfHelper::from<ReturnType, T, ParamTypes...>(method).apply(&obj);
|
jpayne@69
|
659 }
|
jpayne@69
|
660
|
jpayne@69
|
661 template <typename... ParamTypes>
|
jpayne@69
|
662 struct GetFunctorStartAddress {
|
jpayne@69
|
663 // Given a functor (any object defining operator()), return the start address of the function,
|
jpayne@69
|
664 // suitable for passing to addr2line to obtain a source file/line for debugging purposes.
|
jpayne@69
|
665 //
|
jpayne@69
|
666 // This turns out to be incredibly hard to implement in the presence of overloaded or templated
|
jpayne@69
|
667 // functors. Therefore, we impose these specific restrictions, specific to our use case:
|
jpayne@69
|
668 // - Overloading is not allowed, but templating is. (Generally we only intend to support lambdas
|
jpayne@69
|
669 // anyway.)
|
jpayne@69
|
670 // - The template parameters to GetFunctorStartAddress specify a hint as to the expected
|
jpayne@69
|
671 // parameter types. If the functor is templated, its parameters must match exactly these types.
|
jpayne@69
|
672 // (If it's not templated, ParamTypes are ignored.)
|
jpayne@69
|
673
|
jpayne@69
|
674 template <typename Func>
|
jpayne@69
|
675 static void* apply(Func&& func) {
|
jpayne@69
|
676 typedef decltype(func(instance<ParamTypes>()...)) ReturnType;
|
jpayne@69
|
677 return PtmfHelper::from<ReturnType, Decay<Func>, ParamTypes...>(
|
jpayne@69
|
678 &Decay<Func>::operator()).apply(&func);
|
jpayne@69
|
679 }
|
jpayne@69
|
680 };
|
jpayne@69
|
681
|
jpayne@69
|
682 template <>
|
jpayne@69
|
683 struct GetFunctorStartAddress<Void&&>: public GetFunctorStartAddress<> {};
|
jpayne@69
|
684 // Hack for TransformPromiseNode use case: an input type of `Void` indicates that the function
|
jpayne@69
|
685 // actually has no parameters.
|
jpayne@69
|
686
|
jpayne@69
|
687 class TransformPromiseNodeBase: public PromiseNode {
|
jpayne@69
|
688 public:
|
jpayne@69
|
689 TransformPromiseNodeBase(OwnPromiseNode&& dependency, void* continuationTracePtr);
|
jpayne@69
|
690
|
jpayne@69
|
691 void onReady(Event* event) noexcept override;
|
jpayne@69
|
692 void get(ExceptionOrValue& output) noexcept override;
|
jpayne@69
|
693 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
|
jpayne@69
|
694
|
jpayne@69
|
695 private:
|
jpayne@69
|
696 OwnPromiseNode dependency;
|
jpayne@69
|
697 void* continuationTracePtr;
|
jpayne@69
|
698
|
jpayne@69
|
699 void dropDependency();
|
jpayne@69
|
700 void getDepResult(ExceptionOrValue& output);
|
jpayne@69
|
701
|
jpayne@69
|
702 virtual void getImpl(ExceptionOrValue& output) = 0;
|
jpayne@69
|
703
|
jpayne@69
|
704 template <typename, typename, typename, typename>
|
jpayne@69
|
705 friend class TransformPromiseNode;
|
jpayne@69
|
706 };
|
jpayne@69
|
707
|
jpayne@69
|
708 template <typename T, typename DepT, typename Func, typename ErrorFunc>
|
jpayne@69
|
709 class TransformPromiseNode final: public TransformPromiseNodeBase {
|
jpayne@69
|
710 // A PromiseNode that transforms the result of another PromiseNode through an application-provided
|
jpayne@69
|
711 // function (implements `then()`).
|
jpayne@69
|
712
|
jpayne@69
|
713 public:
|
jpayne@69
|
714 TransformPromiseNode(OwnPromiseNode&& dependency, Func&& func, ErrorFunc&& errorHandler,
|
jpayne@69
|
715 void* continuationTracePtr)
|
jpayne@69
|
716 : TransformPromiseNodeBase(kj::mv(dependency), continuationTracePtr),
|
jpayne@69
|
717 func(kj::fwd<Func>(func)), errorHandler(kj::fwd<ErrorFunc>(errorHandler)) {}
|
jpayne@69
|
718 void destroy() override { freePromise(this); }
|
jpayne@69
|
719
|
jpayne@69
|
720 ~TransformPromiseNode() noexcept(false) {
|
jpayne@69
|
721 // We need to make sure the dependency is deleted before we delete the continuations because it
|
jpayne@69
|
722 // is a common pattern for the continuations to hold ownership of objects that might be in-use
|
jpayne@69
|
723 // by the dependency.
|
jpayne@69
|
724 dropDependency();
|
jpayne@69
|
725 }
|
jpayne@69
|
726
|
jpayne@69
|
727 private:
|
jpayne@69
|
728 Func func;
|
jpayne@69
|
729 ErrorFunc errorHandler;
|
jpayne@69
|
730
|
jpayne@69
|
731 void getImpl(ExceptionOrValue& output) override {
|
jpayne@69
|
732 ExceptionOr<DepT> depResult;
|
jpayne@69
|
733 getDepResult(depResult);
|
jpayne@69
|
734 KJ_IF_MAYBE(depException, depResult.exception) {
|
jpayne@69
|
735 output.as<T>() = handle(
|
jpayne@69
|
736 MaybeVoidCaller<Exception, FixVoid<ReturnType<ErrorFunc, Exception>>>::apply(
|
jpayne@69
|
737 errorHandler, kj::mv(*depException)));
|
jpayne@69
|
738 } else KJ_IF_MAYBE(depValue, depResult.value) {
|
jpayne@69
|
739 output.as<T>() = handle(MaybeVoidCaller<DepT, T>::apply(func, kj::mv(*depValue)));
|
jpayne@69
|
740 }
|
jpayne@69
|
741 }
|
jpayne@69
|
742
|
jpayne@69
|
743 ExceptionOr<T> handle(T&& value) {
|
jpayne@69
|
744 return kj::mv(value);
|
jpayne@69
|
745 }
|
jpayne@69
|
746 ExceptionOr<T> handle(PropagateException::Bottom&& value) {
|
jpayne@69
|
747 return ExceptionOr<T>(false, value.asException());
|
jpayne@69
|
748 }
|
jpayne@69
|
749 };
|
jpayne@69
|
750
|
jpayne@69
|
751 // -------------------------------------------------------------------
|
jpayne@69
|
752
|
jpayne@69
|
753 class ForkHubBase;
|
jpayne@69
|
754 using OwnForkHubBase = Own<ForkHubBase, ForkHubBase>;
|
jpayne@69
|
755
|
jpayne@69
|
756 class ForkBranchBase: public PromiseNode {
|
jpayne@69
|
757 public:
|
jpayne@69
|
758 ForkBranchBase(OwnForkHubBase&& hub);
|
jpayne@69
|
759 ~ForkBranchBase() noexcept(false);
|
jpayne@69
|
760
|
jpayne@69
|
761 void hubReady() noexcept;
|
jpayne@69
|
762 // Called by the hub to indicate that it is ready.
|
jpayne@69
|
763
|
jpayne@69
|
764 // implements PromiseNode ------------------------------------------
|
jpayne@69
|
765 void onReady(Event* event) noexcept override;
|
jpayne@69
|
766 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
|
jpayne@69
|
767
|
jpayne@69
|
768 protected:
|
jpayne@69
|
769 inline ExceptionOrValue& getHubResultRef();
|
jpayne@69
|
770
|
jpayne@69
|
771 void releaseHub(ExceptionOrValue& output);
|
jpayne@69
|
772 // Release the hub. If an exception is thrown, add it to `output`.
|
jpayne@69
|
773
|
jpayne@69
|
774 private:
|
jpayne@69
|
775 OnReadyEvent onReadyEvent;
|
jpayne@69
|
776
|
jpayne@69
|
777 OwnForkHubBase hub;
|
jpayne@69
|
778 ForkBranchBase* next = nullptr;
|
jpayne@69
|
779 ForkBranchBase** prevPtr = nullptr;
|
jpayne@69
|
780
|
jpayne@69
|
781 friend class ForkHubBase;
|
jpayne@69
|
782 };
|
jpayne@69
|
783
|
jpayne@69
|
784 template <typename T> T copyOrAddRef(T& t) { return t; }
|
jpayne@69
|
785 template <typename T> Own<T> copyOrAddRef(Own<T>& t) { return t->addRef(); }
|
jpayne@69
|
786 template <typename T> Maybe<Own<T>> copyOrAddRef(Maybe<Own<T>>& t) {
|
jpayne@69
|
787 return t.map([](Own<T>& ptr) {
|
jpayne@69
|
788 return ptr->addRef();
|
jpayne@69
|
789 });
|
jpayne@69
|
790 }
|
jpayne@69
|
791
|
jpayne@69
|
792 template <typename T>
|
jpayne@69
|
793 class ForkBranch final: public ForkBranchBase {
|
jpayne@69
|
794 // A PromiseNode that implements one branch of a fork -- i.e. one of the branches that receives
|
jpayne@69
|
795 // a const reference.
|
jpayne@69
|
796
|
jpayne@69
|
797 public:
|
jpayne@69
|
798 ForkBranch(OwnForkHubBase&& hub): ForkBranchBase(kj::mv(hub)) {}
|
jpayne@69
|
799 void destroy() override { freePromise(this); }
|
jpayne@69
|
800
|
jpayne@69
|
801 void get(ExceptionOrValue& output) noexcept override {
|
jpayne@69
|
802 ExceptionOr<T>& hubResult = getHubResultRef().template as<T>();
|
jpayne@69
|
803 KJ_IF_MAYBE(value, hubResult.value) {
|
jpayne@69
|
804 output.as<T>().value = copyOrAddRef(*value);
|
jpayne@69
|
805 } else {
|
jpayne@69
|
806 output.as<T>().value = nullptr;
|
jpayne@69
|
807 }
|
jpayne@69
|
808 output.exception = hubResult.exception;
|
jpayne@69
|
809 releaseHub(output);
|
jpayne@69
|
810 }
|
jpayne@69
|
811 };
|
jpayne@69
|
812
|
jpayne@69
|
813 template <typename T, size_t index>
|
jpayne@69
|
814 class SplitBranch final: public ForkBranchBase {
|
jpayne@69
|
815 // A PromiseNode that implements one branch of a fork -- i.e. one of the branches that receives
|
jpayne@69
|
816 // a const reference.
|
jpayne@69
|
817
|
jpayne@69
|
818 public:
|
jpayne@69
|
819 SplitBranch(OwnForkHubBase&& hub): ForkBranchBase(kj::mv(hub)) {}
|
jpayne@69
|
820 void destroy() override { freePromise(this); }
|
jpayne@69
|
821
|
jpayne@69
|
822 typedef kj::Decay<decltype(kj::get<index>(kj::instance<T>()))> Element;
|
jpayne@69
|
823
|
jpayne@69
|
824 void get(ExceptionOrValue& output) noexcept override {
|
jpayne@69
|
825 ExceptionOr<T>& hubResult = getHubResultRef().template as<T>();
|
jpayne@69
|
826 KJ_IF_MAYBE(value, hubResult.value) {
|
jpayne@69
|
827 output.as<Element>().value = kj::mv(kj::get<index>(*value));
|
jpayne@69
|
828 } else {
|
jpayne@69
|
829 output.as<Element>().value = nullptr;
|
jpayne@69
|
830 }
|
jpayne@69
|
831 output.exception = hubResult.exception;
|
jpayne@69
|
832 releaseHub(output);
|
jpayne@69
|
833 }
|
jpayne@69
|
834 };
|
jpayne@69
|
835
|
jpayne@69
|
836 // -------------------------------------------------------------------
|
jpayne@69
|
837
|
jpayne@69
|
838 class ForkHubBase: public PromiseArenaMember, protected Event {
|
jpayne@69
|
839 public:
|
jpayne@69
|
840 ForkHubBase(OwnPromiseNode&& inner, ExceptionOrValue& resultRef, SourceLocation location);
|
jpayne@69
|
841
|
jpayne@69
|
842 inline ExceptionOrValue& getResultRef() { return resultRef; }
|
jpayne@69
|
843
|
jpayne@69
|
844 inline bool isShared() const { return refcount > 1; }
|
jpayne@69
|
845
|
jpayne@69
|
846 Own<ForkHubBase, ForkHubBase> addRef() {
|
jpayne@69
|
847 ++refcount;
|
jpayne@69
|
848 return Own<ForkHubBase, ForkHubBase>(this);
|
jpayne@69
|
849 }
|
jpayne@69
|
850
|
jpayne@69
|
851 static void dispose(ForkHubBase* obj) {
|
jpayne@69
|
852 if (--obj->refcount == 0) {
|
jpayne@69
|
853 PromiseDisposer::dispose(obj);
|
jpayne@69
|
854 }
|
jpayne@69
|
855 }
|
jpayne@69
|
856
|
jpayne@69
|
857 private:
|
jpayne@69
|
858 uint refcount = 1;
|
jpayne@69
|
859 // We manually implement refcounting for ForkHubBase so that we can use it together with
|
jpayne@69
|
860 // PromiseDisposer's arena allocation.
|
jpayne@69
|
861
|
jpayne@69
|
862 OwnPromiseNode inner;
|
jpayne@69
|
863 ExceptionOrValue& resultRef;
|
jpayne@69
|
864
|
jpayne@69
|
865 ForkBranchBase* headBranch = nullptr;
|
jpayne@69
|
866 ForkBranchBase** tailBranch = &headBranch;
|
jpayne@69
|
867 // Tail becomes null once the inner promise is ready and all branches have been notified.
|
jpayne@69
|
868
|
jpayne@69
|
869 Maybe<Own<Event>> fire() override;
|
jpayne@69
|
870 void traceEvent(TraceBuilder& builder) override;
|
jpayne@69
|
871
|
jpayne@69
|
872 friend class ForkBranchBase;
|
jpayne@69
|
873 };
|
jpayne@69
|
874
|
jpayne@69
|
875 template <typename T>
|
jpayne@69
|
876 class ForkHub final: public ForkHubBase {
|
jpayne@69
|
877 // A PromiseNode that implements the hub of a fork. The first call to Promise::fork() replaces
|
jpayne@69
|
878 // the promise's outer node with a ForkHub, and subsequent calls add branches to that hub (if
|
jpayne@69
|
879 // possible).
|
jpayne@69
|
880
|
jpayne@69
|
881 public:
|
jpayne@69
|
882 ForkHub(OwnPromiseNode&& inner, SourceLocation location)
|
jpayne@69
|
883 : ForkHubBase(kj::mv(inner), result, location) {}
|
jpayne@69
|
884 void destroy() override { freePromise(this); }
|
jpayne@69
|
885
|
jpayne@69
|
886 Promise<_::UnfixVoid<T>> addBranch() {
|
jpayne@69
|
887 return _::PromiseNode::to<Promise<_::UnfixVoid<T>>>(
|
jpayne@69
|
888 allocPromise<ForkBranch<T>>(addRef()));
|
jpayne@69
|
889 }
|
jpayne@69
|
890
|
jpayne@69
|
891 _::SplitTuplePromise<T> split(SourceLocation location) {
|
jpayne@69
|
892 return splitImpl(MakeIndexes<tupleSize<T>()>(), location);
|
jpayne@69
|
893 }
|
jpayne@69
|
894
|
jpayne@69
|
895 private:
|
jpayne@69
|
896 ExceptionOr<T> result;
|
jpayne@69
|
897
|
jpayne@69
|
898 template <size_t... indexes>
|
jpayne@69
|
899 _::SplitTuplePromise<T> splitImpl(Indexes<indexes...>, SourceLocation location) {
|
jpayne@69
|
900 return kj::tuple(addSplit<indexes>(location)...);
|
jpayne@69
|
901 }
|
jpayne@69
|
902
|
jpayne@69
|
903 template <size_t index>
|
jpayne@69
|
904 ReducePromises<typename SplitBranch<T, index>::Element> addSplit(SourceLocation location) {
|
jpayne@69
|
905 return _::PromiseNode::to<ReducePromises<typename SplitBranch<T, index>::Element>>(
|
jpayne@69
|
906 maybeChain(allocPromise<SplitBranch<T, index>>(addRef()),
|
jpayne@69
|
907 implicitCast<typename SplitBranch<T, index>::Element*>(nullptr),
|
jpayne@69
|
908 location));
|
jpayne@69
|
909 }
|
jpayne@69
|
910 };
|
jpayne@69
|
911
|
jpayne@69
|
912 inline ExceptionOrValue& ForkBranchBase::getHubResultRef() {
|
jpayne@69
|
913 return hub->getResultRef();
|
jpayne@69
|
914 }
|
jpayne@69
|
915
|
jpayne@69
|
916 // -------------------------------------------------------------------
|
jpayne@69
|
917
|
jpayne@69
|
918 class ChainPromiseNode final: public PromiseNode, public Event {
|
jpayne@69
|
919 // Promise node which reduces Promise<Promise<T>> to Promise<T>.
|
jpayne@69
|
920 //
|
jpayne@69
|
921 // `Event` is only a public base class because otherwise we can't cast Own<ChainPromiseNode> to
|
jpayne@69
|
922 // Own<Event>. Ugh, templates and private...
|
jpayne@69
|
923
|
jpayne@69
|
924 public:
|
jpayne@69
|
925 explicit ChainPromiseNode(OwnPromiseNode inner, SourceLocation location);
|
jpayne@69
|
926 ~ChainPromiseNode() noexcept(false);
|
jpayne@69
|
927 void destroy() override;
|
jpayne@69
|
928
|
jpayne@69
|
929 void onReady(Event* event) noexcept override;
|
jpayne@69
|
930 void setSelfPointer(OwnPromiseNode* selfPtr) noexcept override;
|
jpayne@69
|
931 void get(ExceptionOrValue& output) noexcept override;
|
jpayne@69
|
932 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
|
jpayne@69
|
933
|
jpayne@69
|
934 private:
|
jpayne@69
|
935 enum State {
|
jpayne@69
|
936 STEP1,
|
jpayne@69
|
937 STEP2
|
jpayne@69
|
938 };
|
jpayne@69
|
939
|
jpayne@69
|
940 State state;
|
jpayne@69
|
941
|
jpayne@69
|
942 OwnPromiseNode inner;
|
jpayne@69
|
943 // In STEP1, a PromiseNode for a Promise<T>.
|
jpayne@69
|
944 // In STEP2, a PromiseNode for a T.
|
jpayne@69
|
945
|
jpayne@69
|
946 Event* onReadyEvent = nullptr;
|
jpayne@69
|
947 OwnPromiseNode* selfPtr = nullptr;
|
jpayne@69
|
948
|
jpayne@69
|
949 Maybe<Own<Event>> fire() override;
|
jpayne@69
|
950 void traceEvent(TraceBuilder& builder) override;
|
jpayne@69
|
951 };
|
jpayne@69
|
952
|
jpayne@69
|
953 template <typename T>
|
jpayne@69
|
954 OwnPromiseNode maybeChain(OwnPromiseNode&& node, Promise<T>*, SourceLocation location) {
|
jpayne@69
|
955 return appendPromise<ChainPromiseNode>(kj::mv(node), location);
|
jpayne@69
|
956 }
|
jpayne@69
|
957
|
jpayne@69
|
958 template <typename T>
|
jpayne@69
|
959 OwnPromiseNode&& maybeChain(OwnPromiseNode&& node, T*, SourceLocation location) {
|
jpayne@69
|
960 return kj::mv(node);
|
jpayne@69
|
961 }
|
jpayne@69
|
962
|
jpayne@69
|
963 template <typename T, typename Result = decltype(T::reducePromise(instance<Promise<T>>()))>
|
jpayne@69
|
964 inline Result maybeReduce(Promise<T>&& promise, bool) {
|
jpayne@69
|
965 return T::reducePromise(kj::mv(promise));
|
jpayne@69
|
966 }
|
jpayne@69
|
967
|
jpayne@69
|
968 template <typename T>
|
jpayne@69
|
969 inline Promise<T> maybeReduce(Promise<T>&& promise, ...) {
|
jpayne@69
|
970 return kj::mv(promise);
|
jpayne@69
|
971 }
|
jpayne@69
|
972
|
jpayne@69
|
973 // -------------------------------------------------------------------
|
jpayne@69
|
974
|
jpayne@69
|
975 class ExclusiveJoinPromiseNode final: public PromiseNode {
|
jpayne@69
|
976 public:
|
jpayne@69
|
977 ExclusiveJoinPromiseNode(OwnPromiseNode left, OwnPromiseNode right, SourceLocation location);
|
jpayne@69
|
978 ~ExclusiveJoinPromiseNode() noexcept(false);
|
jpayne@69
|
979 void destroy() override;
|
jpayne@69
|
980
|
jpayne@69
|
981 void onReady(Event* event) noexcept override;
|
jpayne@69
|
982 void get(ExceptionOrValue& output) noexcept override;
|
jpayne@69
|
983 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
|
jpayne@69
|
984
|
jpayne@69
|
985 private:
|
jpayne@69
|
986 class Branch: public Event {
|
jpayne@69
|
987 public:
|
jpayne@69
|
988 Branch(ExclusiveJoinPromiseNode& joinNode, OwnPromiseNode dependency,
|
jpayne@69
|
989 SourceLocation location);
|
jpayne@69
|
990 ~Branch() noexcept(false);
|
jpayne@69
|
991
|
jpayne@69
|
992 bool get(ExceptionOrValue& output);
|
jpayne@69
|
993 // Returns true if this is the side that finished.
|
jpayne@69
|
994
|
jpayne@69
|
995 Maybe<Own<Event>> fire() override;
|
jpayne@69
|
996 void traceEvent(TraceBuilder& builder) override;
|
jpayne@69
|
997
|
jpayne@69
|
998 private:
|
jpayne@69
|
999 ExclusiveJoinPromiseNode& joinNode;
|
jpayne@69
|
1000 OwnPromiseNode dependency;
|
jpayne@69
|
1001
|
jpayne@69
|
1002 friend class ExclusiveJoinPromiseNode;
|
jpayne@69
|
1003 };
|
jpayne@69
|
1004
|
jpayne@69
|
1005 Branch left;
|
jpayne@69
|
1006 Branch right;
|
jpayne@69
|
1007 OnReadyEvent onReadyEvent;
|
jpayne@69
|
1008 };
|
jpayne@69
|
1009
|
jpayne@69
|
1010 // -------------------------------------------------------------------
|
jpayne@69
|
1011
|
jpayne@69
|
1012 enum class ArrayJoinBehavior {
|
jpayne@69
|
1013 LAZY,
|
jpayne@69
|
1014 EAGER,
|
jpayne@69
|
1015 };
|
jpayne@69
|
1016
|
jpayne@69
|
1017 class ArrayJoinPromiseNodeBase: public PromiseNode {
|
jpayne@69
|
1018 public:
|
jpayne@69
|
1019 ArrayJoinPromiseNodeBase(Array<OwnPromiseNode> promises,
|
jpayne@69
|
1020 ExceptionOrValue* resultParts, size_t partSize,
|
jpayne@69
|
1021 SourceLocation location,
|
jpayne@69
|
1022 ArrayJoinBehavior joinBehavior);
|
jpayne@69
|
1023 ~ArrayJoinPromiseNodeBase() noexcept(false);
|
jpayne@69
|
1024
|
jpayne@69
|
1025 void onReady(Event* event) noexcept override final;
|
jpayne@69
|
1026 void get(ExceptionOrValue& output) noexcept override final;
|
jpayne@69
|
1027 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override final;
|
jpayne@69
|
1028
|
jpayne@69
|
1029 protected:
|
jpayne@69
|
1030 virtual void getNoError(ExceptionOrValue& output) noexcept = 0;
|
jpayne@69
|
1031 // Called to compile the result only in the case where there were no errors.
|
jpayne@69
|
1032
|
jpayne@69
|
1033 private:
|
jpayne@69
|
1034 const ArrayJoinBehavior joinBehavior;
|
jpayne@69
|
1035
|
jpayne@69
|
1036 uint countLeft;
|
jpayne@69
|
1037 OnReadyEvent onReadyEvent;
|
jpayne@69
|
1038 bool armed = false;
|
jpayne@69
|
1039
|
jpayne@69
|
1040 class Branch final: public Event {
|
jpayne@69
|
1041 public:
|
jpayne@69
|
1042 Branch(ArrayJoinPromiseNodeBase& joinNode, OwnPromiseNode dependency,
|
jpayne@69
|
1043 ExceptionOrValue& output, SourceLocation location);
|
jpayne@69
|
1044 ~Branch() noexcept(false);
|
jpayne@69
|
1045
|
jpayne@69
|
1046 Maybe<Own<Event>> fire() override;
|
jpayne@69
|
1047 void traceEvent(TraceBuilder& builder) override;
|
jpayne@69
|
1048
|
jpayne@69
|
1049 private:
|
jpayne@69
|
1050 ArrayJoinPromiseNodeBase& joinNode;
|
jpayne@69
|
1051 OwnPromiseNode dependency;
|
jpayne@69
|
1052 ExceptionOrValue& output;
|
jpayne@69
|
1053
|
jpayne@69
|
1054 friend class ArrayJoinPromiseNodeBase;
|
jpayne@69
|
1055 };
|
jpayne@69
|
1056
|
jpayne@69
|
1057 Array<Branch> branches;
|
jpayne@69
|
1058 };
|
jpayne@69
|
1059
|
jpayne@69
|
1060 template <typename T>
|
jpayne@69
|
1061 class ArrayJoinPromiseNode final: public ArrayJoinPromiseNodeBase {
|
jpayne@69
|
1062 public:
|
jpayne@69
|
1063 ArrayJoinPromiseNode(Array<OwnPromiseNode> promises,
|
jpayne@69
|
1064 Array<ExceptionOr<T>> resultParts,
|
jpayne@69
|
1065 SourceLocation location,
|
jpayne@69
|
1066 ArrayJoinBehavior joinBehavior)
|
jpayne@69
|
1067 : ArrayJoinPromiseNodeBase(kj::mv(promises), resultParts.begin(), sizeof(ExceptionOr<T>),
|
jpayne@69
|
1068 location, joinBehavior),
|
jpayne@69
|
1069 resultParts(kj::mv(resultParts)) {}
|
jpayne@69
|
1070 void destroy() override { freePromise(this); }
|
jpayne@69
|
1071
|
jpayne@69
|
1072 protected:
|
jpayne@69
|
1073 void getNoError(ExceptionOrValue& output) noexcept override {
|
jpayne@69
|
1074 auto builder = heapArrayBuilder<T>(resultParts.size());
|
jpayne@69
|
1075 for (auto& part: resultParts) {
|
jpayne@69
|
1076 KJ_IASSERT(part.value != nullptr,
|
jpayne@69
|
1077 "Bug in KJ promise framework: Promise result had neither value no exception.");
|
jpayne@69
|
1078 builder.add(kj::mv(*_::readMaybe(part.value)));
|
jpayne@69
|
1079 }
|
jpayne@69
|
1080 output.as<Array<T>>() = builder.finish();
|
jpayne@69
|
1081 }
|
jpayne@69
|
1082
|
jpayne@69
|
1083 private:
|
jpayne@69
|
1084 Array<ExceptionOr<T>> resultParts;
|
jpayne@69
|
1085 };
|
jpayne@69
|
1086
|
jpayne@69
|
1087 template <>
|
jpayne@69
|
1088 class ArrayJoinPromiseNode<void> final: public ArrayJoinPromiseNodeBase {
|
jpayne@69
|
1089 public:
|
jpayne@69
|
1090 ArrayJoinPromiseNode(Array<OwnPromiseNode> promises,
|
jpayne@69
|
1091 Array<ExceptionOr<_::Void>> resultParts,
|
jpayne@69
|
1092 SourceLocation location,
|
jpayne@69
|
1093 ArrayJoinBehavior joinBehavior);
|
jpayne@69
|
1094 ~ArrayJoinPromiseNode();
|
jpayne@69
|
1095 void destroy() override;
|
jpayne@69
|
1096
|
jpayne@69
|
1097 protected:
|
jpayne@69
|
1098 void getNoError(ExceptionOrValue& output) noexcept override;
|
jpayne@69
|
1099
|
jpayne@69
|
1100 private:
|
jpayne@69
|
1101 Array<ExceptionOr<_::Void>> resultParts;
|
jpayne@69
|
1102 };
|
jpayne@69
|
1103
|
jpayne@69
|
1104 // -------------------------------------------------------------------
|
jpayne@69
|
1105
|
jpayne@69
|
1106 class EagerPromiseNodeBase: public PromiseNode, protected Event {
|
jpayne@69
|
1107 // A PromiseNode that eagerly evaluates its dependency even if its dependent does not eagerly
|
jpayne@69
|
1108 // evaluate it.
|
jpayne@69
|
1109
|
jpayne@69
|
1110 public:
|
jpayne@69
|
1111 EagerPromiseNodeBase(OwnPromiseNode&& dependency, ExceptionOrValue& resultRef,
|
jpayne@69
|
1112 SourceLocation location);
|
jpayne@69
|
1113
|
jpayne@69
|
1114 void onReady(Event* event) noexcept override;
|
jpayne@69
|
1115 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
|
jpayne@69
|
1116
|
jpayne@69
|
1117 private:
|
jpayne@69
|
1118 OwnPromiseNode dependency;
|
jpayne@69
|
1119 OnReadyEvent onReadyEvent;
|
jpayne@69
|
1120
|
jpayne@69
|
1121 ExceptionOrValue& resultRef;
|
jpayne@69
|
1122
|
jpayne@69
|
1123 Maybe<Own<Event>> fire() override;
|
jpayne@69
|
1124 void traceEvent(TraceBuilder& builder) override;
|
jpayne@69
|
1125 };
|
jpayne@69
|
1126
|
jpayne@69
|
1127 template <typename T>
|
jpayne@69
|
1128 class EagerPromiseNode final: public EagerPromiseNodeBase {
|
jpayne@69
|
1129 public:
|
jpayne@69
|
1130 EagerPromiseNode(OwnPromiseNode&& dependency, SourceLocation location)
|
jpayne@69
|
1131 : EagerPromiseNodeBase(kj::mv(dependency), result, location) {}
|
jpayne@69
|
1132 void destroy() override { freePromise(this); }
|
jpayne@69
|
1133
|
jpayne@69
|
1134 void get(ExceptionOrValue& output) noexcept override {
|
jpayne@69
|
1135 output.as<T>() = kj::mv(result);
|
jpayne@69
|
1136 }
|
jpayne@69
|
1137
|
jpayne@69
|
1138 private:
|
jpayne@69
|
1139 ExceptionOr<T> result;
|
jpayne@69
|
1140 };
|
jpayne@69
|
1141
|
jpayne@69
|
1142 template <typename T>
|
jpayne@69
|
1143 OwnPromiseNode spark(OwnPromiseNode&& node, SourceLocation location) {
|
jpayne@69
|
1144 // Forces evaluation of the given node to begin as soon as possible, even if no one is waiting
|
jpayne@69
|
1145 // on it.
|
jpayne@69
|
1146 return appendPromise<EagerPromiseNode<T>>(kj::mv(node), location);
|
jpayne@69
|
1147 }
|
jpayne@69
|
1148
|
jpayne@69
|
1149 // -------------------------------------------------------------------
|
jpayne@69
|
1150
|
jpayne@69
|
1151 class AdapterPromiseNodeBase: public PromiseNode {
|
jpayne@69
|
1152 public:
|
jpayne@69
|
1153 void onReady(Event* event) noexcept override;
|
jpayne@69
|
1154 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
|
jpayne@69
|
1155
|
jpayne@69
|
1156 protected:
|
jpayne@69
|
1157 inline void setReady() {
|
jpayne@69
|
1158 onReadyEvent.arm();
|
jpayne@69
|
1159 }
|
jpayne@69
|
1160
|
jpayne@69
|
1161 private:
|
jpayne@69
|
1162 OnReadyEvent onReadyEvent;
|
jpayne@69
|
1163 };
|
jpayne@69
|
1164
|
jpayne@69
|
1165 template <typename T, typename Adapter>
|
jpayne@69
|
1166 class AdapterPromiseNode final: public AdapterPromiseNodeBase,
|
jpayne@69
|
1167 private PromiseFulfiller<UnfixVoid<T>> {
|
jpayne@69
|
1168 // A PromiseNode that wraps a PromiseAdapter.
|
jpayne@69
|
1169
|
jpayne@69
|
1170 public:
|
jpayne@69
|
1171 template <typename... Params>
|
jpayne@69
|
1172 AdapterPromiseNode(Params&&... params)
|
jpayne@69
|
1173 : adapter(static_cast<PromiseFulfiller<UnfixVoid<T>>&>(*this), kj::fwd<Params>(params)...) {}
|
jpayne@69
|
1174 void destroy() override { freePromise(this); }
|
jpayne@69
|
1175
|
jpayne@69
|
1176 void get(ExceptionOrValue& output) noexcept override {
|
jpayne@69
|
1177 KJ_IREQUIRE(!isWaiting());
|
jpayne@69
|
1178 output.as<T>() = kj::mv(result);
|
jpayne@69
|
1179 }
|
jpayne@69
|
1180
|
jpayne@69
|
1181 private:
|
jpayne@69
|
1182 ExceptionOr<T> result;
|
jpayne@69
|
1183 bool waiting = true;
|
jpayne@69
|
1184 Adapter adapter;
|
jpayne@69
|
1185
|
jpayne@69
|
1186 void fulfill(T&& value) override {
|
jpayne@69
|
1187 if (waiting) {
|
jpayne@69
|
1188 waiting = false;
|
jpayne@69
|
1189 result = ExceptionOr<T>(kj::mv(value));
|
jpayne@69
|
1190 setReady();
|
jpayne@69
|
1191 }
|
jpayne@69
|
1192 }
|
jpayne@69
|
1193
|
jpayne@69
|
1194 void reject(Exception&& exception) override {
|
jpayne@69
|
1195 if (waiting) {
|
jpayne@69
|
1196 waiting = false;
|
jpayne@69
|
1197 result = ExceptionOr<T>(false, kj::mv(exception));
|
jpayne@69
|
1198 setReady();
|
jpayne@69
|
1199 }
|
jpayne@69
|
1200 }
|
jpayne@69
|
1201
|
jpayne@69
|
1202 bool isWaiting() override {
|
jpayne@69
|
1203 return waiting;
|
jpayne@69
|
1204 }
|
jpayne@69
|
1205 };
|
jpayne@69
|
1206
|
jpayne@69
|
1207 // -------------------------------------------------------------------
|
jpayne@69
|
1208
|
jpayne@69
|
1209 class FiberBase: public PromiseNode, private Event {
|
jpayne@69
|
1210 // Base class for the outer PromiseNode representing a fiber.
|
jpayne@69
|
1211
|
jpayne@69
|
1212 public:
|
jpayne@69
|
1213 explicit FiberBase(size_t stackSize, _::ExceptionOrValue& result, SourceLocation location);
|
jpayne@69
|
1214 explicit FiberBase(const FiberPool& pool, _::ExceptionOrValue& result, SourceLocation location);
|
jpayne@69
|
1215 ~FiberBase() noexcept(false);
|
jpayne@69
|
1216
|
jpayne@69
|
1217 void start() { armDepthFirst(); }
|
jpayne@69
|
1218 // Call immediately after construction to begin executing the fiber.
|
jpayne@69
|
1219
|
jpayne@69
|
1220 class WaitDoneEvent;
|
jpayne@69
|
1221
|
jpayne@69
|
1222 void onReady(_::Event* event) noexcept override;
|
jpayne@69
|
1223 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
|
jpayne@69
|
1224
|
jpayne@69
|
1225 protected:
|
jpayne@69
|
1226 bool isFinished() { return state == FINISHED; }
|
jpayne@69
|
1227 void cancel();
|
jpayne@69
|
1228
|
jpayne@69
|
1229 private:
|
jpayne@69
|
1230 enum { WAITING, RUNNING, CANCELED, FINISHED } state;
|
jpayne@69
|
1231
|
jpayne@69
|
1232 _::PromiseNode* currentInner = nullptr;
|
jpayne@69
|
1233 OnReadyEvent onReadyEvent;
|
jpayne@69
|
1234 Own<FiberStack> stack;
|
jpayne@69
|
1235 _::ExceptionOrValue& result;
|
jpayne@69
|
1236
|
jpayne@69
|
1237 void run();
|
jpayne@69
|
1238 virtual void runImpl(WaitScope& waitScope) = 0;
|
jpayne@69
|
1239
|
jpayne@69
|
1240 Maybe<Own<Event>> fire() override;
|
jpayne@69
|
1241 void traceEvent(TraceBuilder& builder) override;
|
jpayne@69
|
1242 // Implements Event. Each time the event is fired, switchToFiber() is called.
|
jpayne@69
|
1243
|
jpayne@69
|
1244 friend class FiberStack;
|
jpayne@69
|
1245 friend void _::waitImpl(_::OwnPromiseNode&& node, _::ExceptionOrValue& result,
|
jpayne@69
|
1246 WaitScope& waitScope, SourceLocation location);
|
jpayne@69
|
1247 friend bool _::pollImpl(_::PromiseNode& node, WaitScope& waitScope, SourceLocation location);
|
jpayne@69
|
1248 };
|
jpayne@69
|
1249
|
jpayne@69
|
1250 template <typename Func>
|
jpayne@69
|
1251 class Fiber final: public FiberBase {
|
jpayne@69
|
1252 public:
|
jpayne@69
|
1253 explicit Fiber(size_t stackSize, Func&& func, SourceLocation location)
|
jpayne@69
|
1254 : FiberBase(stackSize, result, location), func(kj::fwd<Func>(func)) {}
|
jpayne@69
|
1255 explicit Fiber(const FiberPool& pool, Func&& func, SourceLocation location)
|
jpayne@69
|
1256 : FiberBase(pool, result, location), func(kj::fwd<Func>(func)) {}
|
jpayne@69
|
1257 ~Fiber() noexcept(false) { cancel(); }
|
jpayne@69
|
1258 void destroy() override { freePromise(this); }
|
jpayne@69
|
1259
|
jpayne@69
|
1260 typedef FixVoid<decltype(kj::instance<Func&>()(kj::instance<WaitScope&>()))> ResultType;
|
jpayne@69
|
1261
|
jpayne@69
|
1262 void get(ExceptionOrValue& output) noexcept override {
|
jpayne@69
|
1263 KJ_IREQUIRE(isFinished());
|
jpayne@69
|
1264 output.as<ResultType>() = kj::mv(result);
|
jpayne@69
|
1265 }
|
jpayne@69
|
1266
|
jpayne@69
|
1267 private:
|
jpayne@69
|
1268 Func func;
|
jpayne@69
|
1269 ExceptionOr<ResultType> result;
|
jpayne@69
|
1270
|
jpayne@69
|
1271 void runImpl(WaitScope& waitScope) override {
|
jpayne@69
|
1272 result.template as<ResultType>() =
|
jpayne@69
|
1273 MaybeVoidCaller<WaitScope&, ResultType>::apply(func, waitScope);
|
jpayne@69
|
1274 }
|
jpayne@69
|
1275 };
|
jpayne@69
|
1276
|
jpayne@69
|
1277 } // namespace _ (private)
|
jpayne@69
|
1278
|
jpayne@69
|
1279 // =======================================================================================
|
jpayne@69
|
1280
|
jpayne@69
|
1281 template <typename T>
|
jpayne@69
|
1282 Promise<T>::Promise(_::FixVoid<T> value)
|
jpayne@69
|
1283 : PromiseBase(_::allocPromise<_::ImmediatePromiseNode<_::FixVoid<T>>>(kj::mv(value))) {}
|
jpayne@69
|
1284
|
jpayne@69
|
1285 template <typename T>
|
jpayne@69
|
1286 Promise<T>::Promise(kj::Exception&& exception)
|
jpayne@69
|
1287 : PromiseBase(_::allocPromise<_::ImmediateBrokenPromiseNode>(kj::mv(exception))) {}
|
jpayne@69
|
1288
|
jpayne@69
|
1289 template <typename T>
|
jpayne@69
|
1290 template <typename Func, typename ErrorFunc>
|
jpayne@69
|
1291 PromiseForResult<Func, T> Promise<T>::then(Func&& func, ErrorFunc&& errorHandler,
|
jpayne@69
|
1292 SourceLocation location) {
|
jpayne@69
|
1293 typedef _::FixVoid<_::ReturnType<Func, T>> ResultT;
|
jpayne@69
|
1294
|
jpayne@69
|
1295 void* continuationTracePtr = _::GetFunctorStartAddress<_::FixVoid<T>&&>::apply(func);
|
jpayne@69
|
1296 _::OwnPromiseNode intermediate =
|
jpayne@69
|
1297 _::appendPromise<_::TransformPromiseNode<ResultT, _::FixVoid<T>, Func, ErrorFunc>>(
|
jpayne@69
|
1298 kj::mv(node), kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler),
|
jpayne@69
|
1299 continuationTracePtr);
|
jpayne@69
|
1300 auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType<Func, T>>>(
|
jpayne@69
|
1301 _::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr), location));
|
jpayne@69
|
1302 return _::maybeReduce(kj::mv(result), false);
|
jpayne@69
|
1303 }
|
jpayne@69
|
1304
|
jpayne@69
|
1305 namespace _ { // private
|
jpayne@69
|
1306
|
jpayne@69
|
1307 template <typename T>
|
jpayne@69
|
1308 struct IdentityFunc {
|
jpayne@69
|
1309 inline T operator()(T&& value) const {
|
jpayne@69
|
1310 return kj::mv(value);
|
jpayne@69
|
1311 }
|
jpayne@69
|
1312 };
|
jpayne@69
|
1313 template <typename T>
|
jpayne@69
|
1314 struct IdentityFunc<Promise<T>> {
|
jpayne@69
|
1315 inline Promise<T> operator()(T&& value) const {
|
jpayne@69
|
1316 return kj::mv(value);
|
jpayne@69
|
1317 }
|
jpayne@69
|
1318 };
|
jpayne@69
|
1319 template <>
|
jpayne@69
|
1320 struct IdentityFunc<void> {
|
jpayne@69
|
1321 inline void operator()() const {}
|
jpayne@69
|
1322 };
|
jpayne@69
|
1323 template <>
|
jpayne@69
|
1324 struct IdentityFunc<Promise<void>> {
|
jpayne@69
|
1325 Promise<void> operator()() const;
|
jpayne@69
|
1326 // This can't be inline because it will make the translation unit depend on kj-async. Awkwardly,
|
jpayne@69
|
1327 // Cap'n Proto relies on being able to include this header without creating such a link-time
|
jpayne@69
|
1328 // dependency.
|
jpayne@69
|
1329 };
|
jpayne@69
|
1330
|
jpayne@69
|
1331 } // namespace _ (private)
|
jpayne@69
|
1332
|
jpayne@69
|
1333 template <typename T>
|
jpayne@69
|
1334 template <typename ErrorFunc>
|
jpayne@69
|
1335 Promise<T> Promise<T>::catch_(ErrorFunc&& errorHandler, SourceLocation location) {
|
jpayne@69
|
1336 // then()'s ErrorFunc can only return a Promise if Func also returns a Promise. In this case,
|
jpayne@69
|
1337 // Func is being filled in automatically. We want to make sure ErrorFunc can return a Promise,
|
jpayne@69
|
1338 // but we don't want the extra overhead of promise chaining if ErrorFunc doesn't actually
|
jpayne@69
|
1339 // return a promise. So we make our Func return match ErrorFunc.
|
jpayne@69
|
1340 typedef _::IdentityFunc<decltype(errorHandler(instance<Exception&&>()))> Func;
|
jpayne@69
|
1341 typedef _::FixVoid<_::ReturnType<Func, T>> ResultT;
|
jpayne@69
|
1342
|
jpayne@69
|
1343 // The reason catch_() isn't simply implemented in terms of then() is because we want the trace
|
jpayne@69
|
1344 // pointer to be based on ErrorFunc rather than Func.
|
jpayne@69
|
1345 void* continuationTracePtr = _::GetFunctorStartAddress<kj::Exception&&>::apply(errorHandler);
|
jpayne@69
|
1346 _::OwnPromiseNode intermediate =
|
jpayne@69
|
1347 _::appendPromise<_::TransformPromiseNode<ResultT, _::FixVoid<T>, Func, ErrorFunc>>(
|
jpayne@69
|
1348 kj::mv(node), Func(), kj::fwd<ErrorFunc>(errorHandler), continuationTracePtr);
|
jpayne@69
|
1349 auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType<Func, T>>>(
|
jpayne@69
|
1350 _::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr), location));
|
jpayne@69
|
1351 return _::maybeReduce(kj::mv(result), false);
|
jpayne@69
|
1352 }
|
jpayne@69
|
1353
|
jpayne@69
|
1354 template <typename T>
|
jpayne@69
|
1355 T Promise<T>::wait(WaitScope& waitScope, SourceLocation location) {
|
jpayne@69
|
1356 _::ExceptionOr<_::FixVoid<T>> result;
|
jpayne@69
|
1357 _::waitImpl(kj::mv(node), result, waitScope, location);
|
jpayne@69
|
1358 return convertToReturn(kj::mv(result));
|
jpayne@69
|
1359 }
|
jpayne@69
|
1360
|
jpayne@69
|
1361 template <typename T>
|
jpayne@69
|
1362 bool Promise<T>::poll(WaitScope& waitScope, SourceLocation location) {
|
jpayne@69
|
1363 return _::pollImpl(*node, waitScope, location);
|
jpayne@69
|
1364 }
|
jpayne@69
|
1365
|
jpayne@69
|
1366 template <typename T>
|
jpayne@69
|
1367 ForkedPromise<T> Promise<T>::fork(SourceLocation location) {
|
jpayne@69
|
1368 return ForkedPromise<T>(false,
|
jpayne@69
|
1369 _::PromiseDisposer::alloc<_::ForkHub<_::FixVoid<T>>, _::ForkHubBase>(kj::mv(node), location));
|
jpayne@69
|
1370 }
|
jpayne@69
|
1371
|
jpayne@69
|
1372 template <typename T>
|
jpayne@69
|
1373 Promise<T> ForkedPromise<T>::addBranch() {
|
jpayne@69
|
1374 return hub->addBranch();
|
jpayne@69
|
1375 }
|
jpayne@69
|
1376
|
jpayne@69
|
1377 template <typename T>
|
jpayne@69
|
1378 bool ForkedPromise<T>::hasBranches() {
|
jpayne@69
|
1379 return hub->isShared();
|
jpayne@69
|
1380 }
|
jpayne@69
|
1381
|
jpayne@69
|
1382 template <typename T>
|
jpayne@69
|
1383 _::SplitTuplePromise<T> Promise<T>::split(SourceLocation location) {
|
jpayne@69
|
1384 return _::PromiseDisposer::alloc<_::ForkHub<_::FixVoid<T>>, _::ForkHubBase>(
|
jpayne@69
|
1385 kj::mv(node), location)->split(location);
|
jpayne@69
|
1386 }
|
jpayne@69
|
1387
|
jpayne@69
|
1388 template <typename T>
|
jpayne@69
|
1389 Promise<T> Promise<T>::exclusiveJoin(Promise<T>&& other, SourceLocation location) {
|
jpayne@69
|
1390 return Promise(false, _::appendPromise<_::ExclusiveJoinPromiseNode>(
|
jpayne@69
|
1391 kj::mv(node), kj::mv(other.node), location));
|
jpayne@69
|
1392 }
|
jpayne@69
|
1393
|
jpayne@69
|
1394 template <typename T>
|
jpayne@69
|
1395 template <typename... Attachments>
|
jpayne@69
|
1396 Promise<T> Promise<T>::attach(Attachments&&... attachments) {
|
jpayne@69
|
1397 return Promise(false, _::appendPromise<_::AttachmentPromiseNode<Tuple<Attachments...>>>(
|
jpayne@69
|
1398 kj::mv(node), kj::tuple(kj::fwd<Attachments>(attachments)...)));
|
jpayne@69
|
1399 }
|
jpayne@69
|
1400
|
jpayne@69
|
1401 template <typename T>
|
jpayne@69
|
1402 template <typename ErrorFunc>
|
jpayne@69
|
1403 Promise<T> Promise<T>::eagerlyEvaluate(ErrorFunc&& errorHandler, SourceLocation location) {
|
jpayne@69
|
1404 // See catch_() for commentary.
|
jpayne@69
|
1405 return Promise(false, _::spark<_::FixVoid<T>>(then(
|
jpayne@69
|
1406 _::IdentityFunc<decltype(errorHandler(instance<Exception&&>()))>(),
|
jpayne@69
|
1407 kj::fwd<ErrorFunc>(errorHandler)).node, location));
|
jpayne@69
|
1408 }
|
jpayne@69
|
1409
|
jpayne@69
|
1410 template <typename T>
|
jpayne@69
|
1411 Promise<T> Promise<T>::eagerlyEvaluate(decltype(nullptr), SourceLocation location) {
|
jpayne@69
|
1412 return Promise(false, _::spark<_::FixVoid<T>>(kj::mv(node), location));
|
jpayne@69
|
1413 }
|
jpayne@69
|
1414
|
jpayne@69
|
1415 template <typename T>
|
jpayne@69
|
1416 kj::String Promise<T>::trace() {
|
jpayne@69
|
1417 return PromiseBase::trace();
|
jpayne@69
|
1418 }
|
jpayne@69
|
1419
|
jpayne@69
|
1420 template <typename T, T value>
|
jpayne@69
|
1421 inline Promise<T> constPromise() {
|
jpayne@69
|
1422 static _::ConstPromiseNode<T, value> NODE;
|
jpayne@69
|
1423 return _::PromiseNode::to<Promise<T>>(_::OwnPromiseNode(&NODE));
|
jpayne@69
|
1424 }
|
jpayne@69
|
1425
|
jpayne@69
|
1426 template <typename Func>
|
jpayne@69
|
1427 inline PromiseForResult<Func, void> evalLater(Func&& func) {
|
jpayne@69
|
1428 return _::yield().then(kj::fwd<Func>(func), _::PropagateException());
|
jpayne@69
|
1429 }
|
jpayne@69
|
1430
|
jpayne@69
|
1431 template <typename Func>
|
jpayne@69
|
1432 inline PromiseForResult<Func, void> evalLast(Func&& func) {
|
jpayne@69
|
1433 return _::yieldHarder().then(kj::fwd<Func>(func), _::PropagateException());
|
jpayne@69
|
1434 }
|
jpayne@69
|
1435
|
jpayne@69
|
1436 template <typename Func>
|
jpayne@69
|
1437 inline PromiseForResult<Func, void> evalNow(Func&& func) {
|
jpayne@69
|
1438 PromiseForResult<Func, void> result = nullptr;
|
jpayne@69
|
1439 KJ_IF_MAYBE(e, kj::runCatchingExceptions([&]() {
|
jpayne@69
|
1440 result = func();
|
jpayne@69
|
1441 })) {
|
jpayne@69
|
1442 result = kj::mv(*e);
|
jpayne@69
|
1443 }
|
jpayne@69
|
1444 return result;
|
jpayne@69
|
1445 }
|
jpayne@69
|
1446
|
jpayne@69
|
1447 template <typename Func>
|
jpayne@69
|
1448 struct RetryOnDisconnect_ {
|
jpayne@69
|
1449 static inline PromiseForResult<Func, void> apply(Func&& func) {
|
jpayne@69
|
1450 return evalLater([func = kj::mv(func)]() mutable -> PromiseForResult<Func, void> {
|
jpayne@69
|
1451 auto promise = evalNow(func);
|
jpayne@69
|
1452 return promise.catch_([func = kj::mv(func)](kj::Exception&& e) mutable -> PromiseForResult<Func, void> {
|
jpayne@69
|
1453 if (e.getType() == kj::Exception::Type::DISCONNECTED) {
|
jpayne@69
|
1454 return func();
|
jpayne@69
|
1455 } else {
|
jpayne@69
|
1456 return kj::mv(e);
|
jpayne@69
|
1457 }
|
jpayne@69
|
1458 });
|
jpayne@69
|
1459 });
|
jpayne@69
|
1460 }
|
jpayne@69
|
1461 };
|
jpayne@69
|
1462 template <typename Func>
|
jpayne@69
|
1463 struct RetryOnDisconnect_<Func&> {
|
jpayne@69
|
1464 // Specialization for references. Needed because the syntax for capturing references in a
|
jpayne@69
|
1465 // lambda is different. :(
|
jpayne@69
|
1466 static inline PromiseForResult<Func, void> apply(Func& func) {
|
jpayne@69
|
1467 auto promise = evalLater(func);
|
jpayne@69
|
1468 return promise.catch_([&func](kj::Exception&& e) -> PromiseForResult<Func, void> {
|
jpayne@69
|
1469 if (e.getType() == kj::Exception::Type::DISCONNECTED) {
|
jpayne@69
|
1470 return func();
|
jpayne@69
|
1471 } else {
|
jpayne@69
|
1472 return kj::mv(e);
|
jpayne@69
|
1473 }
|
jpayne@69
|
1474 });
|
jpayne@69
|
1475 }
|
jpayne@69
|
1476 };
|
jpayne@69
|
1477
|
jpayne@69
|
1478 template <typename Func>
|
jpayne@69
|
1479 inline PromiseForResult<Func, void> retryOnDisconnect(Func&& func) {
|
jpayne@69
|
1480 return RetryOnDisconnect_<Func>::apply(kj::fwd<Func>(func));
|
jpayne@69
|
1481 }
|
jpayne@69
|
1482
|
jpayne@69
|
1483 template <typename Func>
|
jpayne@69
|
1484 inline PromiseForResult<Func, WaitScope&> startFiber(
|
jpayne@69
|
1485 size_t stackSize, Func&& func, SourceLocation location) {
|
jpayne@69
|
1486 typedef _::FixVoid<_::ReturnType<Func, WaitScope&>> ResultT;
|
jpayne@69
|
1487
|
jpayne@69
|
1488 auto intermediate = _::allocPromise<_::Fiber<Func>>(
|
jpayne@69
|
1489 stackSize, kj::fwd<Func>(func), location);
|
jpayne@69
|
1490 intermediate->start();
|
jpayne@69
|
1491 auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType<Func, WaitScope&>>>(
|
jpayne@69
|
1492 _::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr), location));
|
jpayne@69
|
1493 return _::maybeReduce(kj::mv(result), false);
|
jpayne@69
|
1494 }
|
jpayne@69
|
1495
|
jpayne@69
|
1496 template <typename Func>
|
jpayne@69
|
1497 inline PromiseForResult<Func, WaitScope&> FiberPool::startFiber(
|
jpayne@69
|
1498 Func&& func, SourceLocation location) const {
|
jpayne@69
|
1499 typedef _::FixVoid<_::ReturnType<Func, WaitScope&>> ResultT;
|
jpayne@69
|
1500
|
jpayne@69
|
1501 auto intermediate = _::allocPromise<_::Fiber<Func>>(
|
jpayne@69
|
1502 *this, kj::fwd<Func>(func), location);
|
jpayne@69
|
1503 intermediate->start();
|
jpayne@69
|
1504 auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType<Func, WaitScope&>>>(
|
jpayne@69
|
1505 _::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr), location));
|
jpayne@69
|
1506 return _::maybeReduce(kj::mv(result), false);
|
jpayne@69
|
1507 }
|
jpayne@69
|
1508
|
jpayne@69
|
1509 template <typename T>
|
jpayne@69
|
1510 template <typename ErrorFunc>
|
jpayne@69
|
1511 void Promise<T>::detach(ErrorFunc&& errorHandler) {
|
jpayne@69
|
1512 return _::detach(then([](T&&) {}, kj::fwd<ErrorFunc>(errorHandler)));
|
jpayne@69
|
1513 }
|
jpayne@69
|
1514
|
jpayne@69
|
1515 template <>
|
jpayne@69
|
1516 template <typename ErrorFunc>
|
jpayne@69
|
1517 void Promise<void>::detach(ErrorFunc&& errorHandler) {
|
jpayne@69
|
1518 return _::detach(then([]() {}, kj::fwd<ErrorFunc>(errorHandler)));
|
jpayne@69
|
1519 }
|
jpayne@69
|
1520
|
jpayne@69
|
1521 template <typename T>
|
jpayne@69
|
1522 Promise<Array<T>> joinPromises(Array<Promise<T>>&& promises, SourceLocation location) {
|
jpayne@69
|
1523 return _::PromiseNode::to<Promise<Array<T>>>(_::allocPromise<_::ArrayJoinPromiseNode<T>>(
|
jpayne@69
|
1524 KJ_MAP(p, promises) { return _::PromiseNode::from(kj::mv(p)); },
|
jpayne@69
|
1525 heapArray<_::ExceptionOr<T>>(promises.size()), location,
|
jpayne@69
|
1526 _::ArrayJoinBehavior::LAZY));
|
jpayne@69
|
1527 }
|
jpayne@69
|
1528
|
jpayne@69
|
1529 template <typename T>
|
jpayne@69
|
1530 Promise<Array<T>> joinPromisesFailFast(Array<Promise<T>>&& promises, SourceLocation location) {
|
jpayne@69
|
1531 return _::PromiseNode::to<Promise<Array<T>>>(_::allocPromise<_::ArrayJoinPromiseNode<T>>(
|
jpayne@69
|
1532 KJ_MAP(p, promises) { return _::PromiseNode::from(kj::mv(p)); },
|
jpayne@69
|
1533 heapArray<_::ExceptionOr<T>>(promises.size()), location,
|
jpayne@69
|
1534 _::ArrayJoinBehavior::EAGER));
|
jpayne@69
|
1535 }
|
jpayne@69
|
1536
|
jpayne@69
|
1537 // =======================================================================================
|
jpayne@69
|
1538
|
jpayne@69
|
1539 namespace _ { // private
|
jpayne@69
|
1540
|
jpayne@69
|
1541 class WeakFulfillerBase: protected kj::Disposer {
|
jpayne@69
|
1542 protected:
|
jpayne@69
|
1543 WeakFulfillerBase(): inner(nullptr) {}
|
jpayne@69
|
1544 virtual ~WeakFulfillerBase() noexcept(false) {}
|
jpayne@69
|
1545
|
jpayne@69
|
1546 template <typename T>
|
jpayne@69
|
1547 inline PromiseFulfiller<T>* getInner() {
|
jpayne@69
|
1548 return static_cast<PromiseFulfiller<T>*>(inner);
|
jpayne@69
|
1549 };
|
jpayne@69
|
1550 template <typename T>
|
jpayne@69
|
1551 inline void setInner(PromiseFulfiller<T>* ptr) {
|
jpayne@69
|
1552 inner = ptr;
|
jpayne@69
|
1553 };
|
jpayne@69
|
1554
|
jpayne@69
|
1555 private:
|
jpayne@69
|
1556 mutable PromiseRejector* inner;
|
jpayne@69
|
1557
|
jpayne@69
|
1558 void disposeImpl(void* pointer) const override;
|
jpayne@69
|
1559 };
|
jpayne@69
|
1560
|
jpayne@69
|
1561 template <typename T>
|
jpayne@69
|
1562 class WeakFulfiller final: public PromiseFulfiller<T>, public WeakFulfillerBase {
|
jpayne@69
|
1563 // A wrapper around PromiseFulfiller which can be detached.
|
jpayne@69
|
1564 //
|
jpayne@69
|
1565 // There are a couple non-trivialities here:
|
jpayne@69
|
1566 // - If the WeakFulfiller is discarded, we want the promise it fulfills to be implicitly
|
jpayne@69
|
1567 // rejected.
|
jpayne@69
|
1568 // - We cannot destroy the WeakFulfiller until the application has discarded it *and* it has been
|
jpayne@69
|
1569 // detached from the underlying fulfiller, because otherwise the later detach() call will go
|
jpayne@69
|
1570 // to a dangling pointer. Essentially, WeakFulfiller is reference counted, although the
|
jpayne@69
|
1571 // refcount never goes over 2 and we manually implement the refcounting because we need to do
|
jpayne@69
|
1572 // other special things when each side detaches anyway. To this end, WeakFulfiller is its own
|
jpayne@69
|
1573 // Disposer -- dispose() is called when the application discards its owned pointer to the
|
jpayne@69
|
1574 // fulfiller and detach() is called when the promise is destroyed.
|
jpayne@69
|
1575
|
jpayne@69
|
1576 public:
|
jpayne@69
|
1577 KJ_DISALLOW_COPY_AND_MOVE(WeakFulfiller);
|
jpayne@69
|
1578
|
jpayne@69
|
1579 static kj::Own<WeakFulfiller> make() {
|
jpayne@69
|
1580 WeakFulfiller* ptr = new WeakFulfiller;
|
jpayne@69
|
1581 return Own<WeakFulfiller>(ptr, *ptr);
|
jpayne@69
|
1582 }
|
jpayne@69
|
1583
|
jpayne@69
|
1584 void fulfill(FixVoid<T>&& value) override {
|
jpayne@69
|
1585 if (getInner<T>() != nullptr) {
|
jpayne@69
|
1586 getInner<T>()->fulfill(kj::mv(value));
|
jpayne@69
|
1587 }
|
jpayne@69
|
1588 }
|
jpayne@69
|
1589
|
jpayne@69
|
1590 void reject(Exception&& exception) override {
|
jpayne@69
|
1591 if (getInner<T>() != nullptr) {
|
jpayne@69
|
1592 getInner<T>()->reject(kj::mv(exception));
|
jpayne@69
|
1593 }
|
jpayne@69
|
1594 }
|
jpayne@69
|
1595
|
jpayne@69
|
1596 bool isWaiting() override {
|
jpayne@69
|
1597 return getInner<T>() != nullptr && getInner<T>()->isWaiting();
|
jpayne@69
|
1598 }
|
jpayne@69
|
1599
|
jpayne@69
|
1600 void attach(PromiseFulfiller<T>& newInner) {
|
jpayne@69
|
1601 setInner<T>(&newInner);
|
jpayne@69
|
1602 }
|
jpayne@69
|
1603
|
jpayne@69
|
1604 void detach(PromiseFulfiller<T>& from) {
|
jpayne@69
|
1605 if (getInner<T>() == nullptr) {
|
jpayne@69
|
1606 // Already disposed.
|
jpayne@69
|
1607 delete this;
|
jpayne@69
|
1608 } else {
|
jpayne@69
|
1609 KJ_IREQUIRE(getInner<T>() == &from);
|
jpayne@69
|
1610 setInner<T>(nullptr);
|
jpayne@69
|
1611 }
|
jpayne@69
|
1612 }
|
jpayne@69
|
1613
|
jpayne@69
|
1614 private:
|
jpayne@69
|
1615 WeakFulfiller() {}
|
jpayne@69
|
1616 };
|
jpayne@69
|
1617
|
jpayne@69
|
1618 template <typename T>
|
jpayne@69
|
1619 class PromiseAndFulfillerAdapter {
|
jpayne@69
|
1620 public:
|
jpayne@69
|
1621 PromiseAndFulfillerAdapter(PromiseFulfiller<T>& fulfiller,
|
jpayne@69
|
1622 WeakFulfiller<T>& wrapper)
|
jpayne@69
|
1623 : fulfiller(fulfiller), wrapper(wrapper) {
|
jpayne@69
|
1624 wrapper.attach(fulfiller);
|
jpayne@69
|
1625 }
|
jpayne@69
|
1626
|
jpayne@69
|
1627 ~PromiseAndFulfillerAdapter() noexcept(false) {
|
jpayne@69
|
1628 wrapper.detach(fulfiller);
|
jpayne@69
|
1629 }
|
jpayne@69
|
1630
|
jpayne@69
|
1631 private:
|
jpayne@69
|
1632 PromiseFulfiller<T>& fulfiller;
|
jpayne@69
|
1633 WeakFulfiller<T>& wrapper;
|
jpayne@69
|
1634 };
|
jpayne@69
|
1635
|
jpayne@69
|
1636 } // namespace _ (private)
|
jpayne@69
|
1637
|
jpayne@69
|
1638 template <typename T>
|
jpayne@69
|
1639 template <typename Func>
|
jpayne@69
|
1640 bool PromiseFulfiller<T>::rejectIfThrows(Func&& func) {
|
jpayne@69
|
1641 KJ_IF_MAYBE(exception, kj::runCatchingExceptions(kj::mv(func))) {
|
jpayne@69
|
1642 reject(kj::mv(*exception));
|
jpayne@69
|
1643 return false;
|
jpayne@69
|
1644 } else {
|
jpayne@69
|
1645 return true;
|
jpayne@69
|
1646 }
|
jpayne@69
|
1647 }
|
jpayne@69
|
1648
|
jpayne@69
|
1649 template <typename Func>
|
jpayne@69
|
1650 bool PromiseFulfiller<void>::rejectIfThrows(Func&& func) {
|
jpayne@69
|
1651 KJ_IF_MAYBE(exception, kj::runCatchingExceptions(kj::mv(func))) {
|
jpayne@69
|
1652 reject(kj::mv(*exception));
|
jpayne@69
|
1653 return false;
|
jpayne@69
|
1654 } else {
|
jpayne@69
|
1655 return true;
|
jpayne@69
|
1656 }
|
jpayne@69
|
1657 }
|
jpayne@69
|
1658
|
jpayne@69
|
1659 template <typename T, typename Adapter, typename... Params>
|
jpayne@69
|
1660 _::ReducePromises<T> newAdaptedPromise(Params&&... adapterConstructorParams) {
|
jpayne@69
|
1661 _::OwnPromiseNode intermediate(
|
jpayne@69
|
1662 _::allocPromise<_::AdapterPromiseNode<_::FixVoid<T>, Adapter>>(
|
jpayne@69
|
1663 kj::fwd<Params>(adapterConstructorParams)...));
|
jpayne@69
|
1664 // We can't capture SourceLocation in this function's arguments since it is a vararg template. :(
|
jpayne@69
|
1665 return _::PromiseNode::to<_::ReducePromises<T>>(
|
jpayne@69
|
1666 _::maybeChain(kj::mv(intermediate), implicitCast<T*>(nullptr), SourceLocation()));
|
jpayne@69
|
1667 }
|
jpayne@69
|
1668
|
jpayne@69
|
1669 template <typename T>
|
jpayne@69
|
1670 PromiseFulfillerPair<T> newPromiseAndFulfiller(SourceLocation location) {
|
jpayne@69
|
1671 auto wrapper = _::WeakFulfiller<T>::make();
|
jpayne@69
|
1672
|
jpayne@69
|
1673 _::OwnPromiseNode intermediate(
|
jpayne@69
|
1674 _::allocPromise<_::AdapterPromiseNode<
|
jpayne@69
|
1675 _::FixVoid<T>, _::PromiseAndFulfillerAdapter<T>>>(*wrapper));
|
jpayne@69
|
1676 auto promise = _::PromiseNode::to<_::ReducePromises<T>>(
|
jpayne@69
|
1677 _::maybeChain(kj::mv(intermediate), implicitCast<T*>(nullptr), location));
|
jpayne@69
|
1678
|
jpayne@69
|
1679 return PromiseFulfillerPair<T> { kj::mv(promise), kj::mv(wrapper) };
|
jpayne@69
|
1680 }
|
jpayne@69
|
1681
|
jpayne@69
|
1682 // =======================================================================================
|
jpayne@69
|
1683 // cross-thread stuff
|
jpayne@69
|
1684
|
jpayne@69
|
1685 namespace _ { // (private)
|
jpayne@69
|
1686
|
jpayne@69
|
1687 class XThreadEvent: public PromiseNode, // it's a PromiseNode in the requesting thread
|
jpayne@69
|
1688 private Event { // it's an event in the target thread
|
jpayne@69
|
1689 public:
|
jpayne@69
|
1690 XThreadEvent(ExceptionOrValue& result, const Executor& targetExecutor, EventLoop& loop,
|
jpayne@69
|
1691 void* funcTracePtr, SourceLocation location);
|
jpayne@69
|
1692
|
jpayne@69
|
1693 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
|
jpayne@69
|
1694
|
jpayne@69
|
1695 protected:
|
jpayne@69
|
1696 void ensureDoneOrCanceled();
|
jpayne@69
|
1697 // MUST be called in destructor of subclasses to make sure the object is not destroyed while
|
jpayne@69
|
1698 // still being accessed by the other thread. (This can't be placed in ~XThreadEvent() because
|
jpayne@69
|
1699 // that destructor doesn't run until the subclass has already been destroyed.)
|
jpayne@69
|
1700
|
jpayne@69
|
1701 virtual kj::Maybe<OwnPromiseNode> execute() = 0;
|
jpayne@69
|
1702 // Run the function. If the function returns a promise, returns the inner PromiseNode, otherwise
|
jpayne@69
|
1703 // returns null.
|
jpayne@69
|
1704
|
jpayne@69
|
1705 // implements PromiseNode ----------------------------------------------------
|
jpayne@69
|
1706 void onReady(Event* event) noexcept override;
|
jpayne@69
|
1707
|
jpayne@69
|
1708 private:
|
jpayne@69
|
1709 ExceptionOrValue& result;
|
jpayne@69
|
1710 void* funcTracePtr;
|
jpayne@69
|
1711
|
jpayne@69
|
1712 kj::Own<const Executor> targetExecutor;
|
jpayne@69
|
1713 Maybe<const Executor&> replyExecutor; // If executeAsync() was used.
|
jpayne@69
|
1714
|
jpayne@69
|
1715 kj::Maybe<OwnPromiseNode> promiseNode;
|
jpayne@69
|
1716 // Accessed only in target thread.
|
jpayne@69
|
1717
|
jpayne@69
|
1718 ListLink<XThreadEvent> targetLink;
|
jpayne@69
|
1719 // Membership in one of the linked lists in the target Executor's work list or cancel list. These
|
jpayne@69
|
1720 // fields are protected by the target Executor's mutex.
|
jpayne@69
|
1721
|
jpayne@69
|
1722 enum {
|
jpayne@69
|
1723 UNUSED,
|
jpayne@69
|
1724 // Object was never queued on another thread.
|
jpayne@69
|
1725
|
jpayne@69
|
1726 QUEUED,
|
jpayne@69
|
1727 // Target thread has not yet dequeued the event from the state.start list. The requesting
|
jpayne@69
|
1728 // thread can cancel execution by removing the event from the list.
|
jpayne@69
|
1729
|
jpayne@69
|
1730 EXECUTING,
|
jpayne@69
|
1731 // Target thread has dequeued the event from state.start and moved it to state.executing. To
|
jpayne@69
|
1732 // cancel, the requesting thread must add the event to the state.cancel list and change the
|
jpayne@69
|
1733 // state to CANCELING.
|
jpayne@69
|
1734
|
jpayne@69
|
1735 CANCELING,
|
jpayne@69
|
1736 // Requesting thread is trying to cancel this event. The target thread will change the state to
|
jpayne@69
|
1737 // `DONE` once canceled.
|
jpayne@69
|
1738
|
jpayne@69
|
1739 DONE
|
jpayne@69
|
1740 // Target thread has completed handling this event and will not touch it again. The requesting
|
jpayne@69
|
1741 // thread can safely delete the object. The `state` is updated to `DONE` using an atomic
|
jpayne@69
|
1742 // release operation after ensuring that the event will not be touched again, so that the
|
jpayne@69
|
1743 // requesting can safely skip locking if it observes the state is already DONE.
|
jpayne@69
|
1744 } state = UNUSED;
|
jpayne@69
|
1745 // State, which is also protected by `targetExecutor`'s mutex.
|
jpayne@69
|
1746
|
jpayne@69
|
1747 ListLink<XThreadEvent> replyLink;
|
jpayne@69
|
1748 // Membership in `replyExecutor`'s reply list. Protected by `replyExecutor`'s mutex. The
|
jpayne@69
|
1749 // executing thread places the event in the reply list near the end of the `EXECUTING` state.
|
jpayne@69
|
1750 // Because the thread cannot lock two mutexes at once, it's possible that the reply executor
|
jpayne@69
|
1751 // will receive the reply while the event is still listed in the EXECUTING state, but it can
|
jpayne@69
|
1752 // ignore the state and proceed with the result.
|
jpayne@69
|
1753
|
jpayne@69
|
1754 OnReadyEvent onReadyEvent;
|
jpayne@69
|
1755 // Accessed only in requesting thread.
|
jpayne@69
|
1756
|
jpayne@69
|
1757 friend class kj::Executor;
|
jpayne@69
|
1758
|
jpayne@69
|
1759 void done();
|
jpayne@69
|
1760 // Sets the state to `DONE` and notifies the originating thread that this event is done. Do NOT
|
jpayne@69
|
1761 // call under lock.
|
jpayne@69
|
1762
|
jpayne@69
|
1763 void sendReply();
|
jpayne@69
|
1764 // Notifies the originating thread that this event is done, but doesn't set the state to DONE
|
jpayne@69
|
1765 // yet. Do NOT call under lock.
|
jpayne@69
|
1766
|
jpayne@69
|
1767 void setDoneState();
|
jpayne@69
|
1768 // Assigns `state` to `DONE`, being careful to use an atomic-release-store if needed. This must
|
jpayne@69
|
1769 // only be called in the destination thread, and must either be called under lock, or the thread
|
jpayne@69
|
1770 // must take the lock and release it again shortly after setting the state (because some threads
|
jpayne@69
|
1771 // may be waiting on the DONE state using a conditional wait on the mutex). After calling
|
jpayne@69
|
1772 // setDoneState(), the destination thread MUST NOT touch this object ever again; it now belongs
|
jpayne@69
|
1773 // solely to the requesting thread.
|
jpayne@69
|
1774
|
jpayne@69
|
1775 void setDisconnected();
|
jpayne@69
|
1776 // Sets the result to a DISCONNECTED exception indicating that the target event loop exited.
|
jpayne@69
|
1777
|
jpayne@69
|
1778 class DelayedDoneHack;
|
jpayne@69
|
1779
|
jpayne@69
|
1780 // implements Event ----------------------------------------------------------
|
jpayne@69
|
1781 Maybe<Own<Event>> fire() override;
|
jpayne@69
|
1782 // If called with promiseNode == nullptr, it's time to call execute(). If promiseNode != nullptr,
|
jpayne@69
|
1783 // then it just indicated readiness and we need to get its result.
|
jpayne@69
|
1784
|
jpayne@69
|
1785 void traceEvent(TraceBuilder& builder) override;
|
jpayne@69
|
1786 };
|
jpayne@69
|
1787
|
jpayne@69
|
1788 template <typename Func, typename = _::FixVoid<_::ReturnType<Func, void>>>
|
jpayne@69
|
1789 class XThreadEventImpl final: public XThreadEvent {
|
jpayne@69
|
1790 // Implementation for a function that does not return a Promise.
|
jpayne@69
|
1791 public:
|
jpayne@69
|
1792 XThreadEventImpl(Func&& func, const Executor& target, EventLoop& loop, SourceLocation location)
|
jpayne@69
|
1793 : XThreadEvent(result, target, loop, GetFunctorStartAddress<>::apply(func), location),
|
jpayne@69
|
1794 func(kj::fwd<Func>(func)) {}
|
jpayne@69
|
1795 ~XThreadEventImpl() noexcept(false) { ensureDoneOrCanceled(); }
|
jpayne@69
|
1796 void destroy() override { freePromise(this); }
|
jpayne@69
|
1797
|
jpayne@69
|
1798 typedef _::FixVoid<_::ReturnType<Func, void>> ResultT;
|
jpayne@69
|
1799
|
jpayne@69
|
1800 kj::Maybe<_::OwnPromiseNode> execute() override {
|
jpayne@69
|
1801 result.value = MaybeVoidCaller<Void, FixVoid<decltype(func())>>::apply(func, Void());
|
jpayne@69
|
1802 return nullptr;
|
jpayne@69
|
1803 }
|
jpayne@69
|
1804
|
jpayne@69
|
1805 // implements PromiseNode ----------------------------------------------------
|
jpayne@69
|
1806 void get(ExceptionOrValue& output) noexcept override {
|
jpayne@69
|
1807 output.as<ResultT>() = kj::mv(result);
|
jpayne@69
|
1808 }
|
jpayne@69
|
1809
|
jpayne@69
|
1810 private:
|
jpayne@69
|
1811 Func func;
|
jpayne@69
|
1812 ExceptionOr<ResultT> result;
|
jpayne@69
|
1813 friend Executor;
|
jpayne@69
|
1814 };
|
jpayne@69
|
1815
|
jpayne@69
|
1816 template <typename Func, typename T>
|
jpayne@69
|
1817 class XThreadEventImpl<Func, Promise<T>> final: public XThreadEvent {
|
jpayne@69
|
1818 // Implementation for a function that DOES return a Promise.
|
jpayne@69
|
1819 public:
|
jpayne@69
|
1820 XThreadEventImpl(Func&& func, const Executor& target, EventLoop& loop, SourceLocation location)
|
jpayne@69
|
1821 : XThreadEvent(result, target, loop, GetFunctorStartAddress<>::apply(func), location),
|
jpayne@69
|
1822 func(kj::fwd<Func>(func)) {}
|
jpayne@69
|
1823 ~XThreadEventImpl() noexcept(false) { ensureDoneOrCanceled(); }
|
jpayne@69
|
1824 void destroy() override { freePromise(this); }
|
jpayne@69
|
1825
|
jpayne@69
|
1826 typedef _::FixVoid<_::UnwrapPromise<PromiseForResult<Func, void>>> ResultT;
|
jpayne@69
|
1827
|
jpayne@69
|
1828 kj::Maybe<_::OwnPromiseNode> execute() override {
|
jpayne@69
|
1829 auto result = _::PromiseNode::from(func());
|
jpayne@69
|
1830 KJ_IREQUIRE(result.get() != nullptr);
|
jpayne@69
|
1831 return kj::mv(result);
|
jpayne@69
|
1832 }
|
jpayne@69
|
1833
|
jpayne@69
|
1834 // implements PromiseNode ----------------------------------------------------
|
jpayne@69
|
1835 void get(ExceptionOrValue& output) noexcept override {
|
jpayne@69
|
1836 output.as<ResultT>() = kj::mv(result);
|
jpayne@69
|
1837 }
|
jpayne@69
|
1838
|
jpayne@69
|
1839 private:
|
jpayne@69
|
1840 Func func;
|
jpayne@69
|
1841 ExceptionOr<ResultT> result;
|
jpayne@69
|
1842 friend Executor;
|
jpayne@69
|
1843 };
|
jpayne@69
|
1844
|
jpayne@69
|
1845 } // namespace _ (private)
|
jpayne@69
|
1846
|
jpayne@69
|
1847 template <typename Func>
|
jpayne@69
|
1848 _::UnwrapPromise<PromiseForResult<Func, void>> Executor::executeSync(
|
jpayne@69
|
1849 Func&& func, SourceLocation location) const {
|
jpayne@69
|
1850 _::XThreadEventImpl<Func> event(kj::fwd<Func>(func), *this, getLoop(), location);
|
jpayne@69
|
1851 send(event, true);
|
jpayne@69
|
1852 return convertToReturn(kj::mv(event.result));
|
jpayne@69
|
1853 }
|
jpayne@69
|
1854
|
jpayne@69
|
1855 template <typename Func>
|
jpayne@69
|
1856 PromiseForResult<Func, void> Executor::executeAsync(Func&& func, SourceLocation location) const {
|
jpayne@69
|
1857 // HACK: We call getLoop() here, rather than have XThreadEvent's constructor do it, so that if it
|
jpayne@69
|
1858 // throws we don't crash due to `allocPromise()` being `noexcept`.
|
jpayne@69
|
1859 auto event = _::allocPromise<_::XThreadEventImpl<Func>>(
|
jpayne@69
|
1860 kj::fwd<Func>(func), *this, getLoop(), location);
|
jpayne@69
|
1861 send(*event, false);
|
jpayne@69
|
1862 return _::PromiseNode::to<PromiseForResult<Func, void>>(kj::mv(event));
|
jpayne@69
|
1863 }
|
jpayne@69
|
1864
|
jpayne@69
|
1865 // -----------------------------------------------------------------------------
|
jpayne@69
|
1866
|
jpayne@69
|
1867 namespace _ { // (private)
|
jpayne@69
|
1868
|
jpayne@69
|
1869 template <typename T>
|
jpayne@69
|
1870 class XThreadFulfiller;
|
jpayne@69
|
1871
|
jpayne@69
|
1872 class XThreadPaf: public PromiseNode {
|
jpayne@69
|
1873 public:
|
jpayne@69
|
1874 XThreadPaf();
|
jpayne@69
|
1875 virtual ~XThreadPaf() noexcept(false);
|
jpayne@69
|
1876 void destroy() override;
|
jpayne@69
|
1877
|
jpayne@69
|
1878 // implements PromiseNode ----------------------------------------------------
|
jpayne@69
|
1879 void onReady(Event* event) noexcept override;
|
jpayne@69
|
1880 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
|
jpayne@69
|
1881
|
jpayne@69
|
1882 private:
|
jpayne@69
|
1883 enum {
|
jpayne@69
|
1884 WAITING,
|
jpayne@69
|
1885 // Not yet fulfilled, and the waiter is still waiting.
|
jpayne@69
|
1886 //
|
jpayne@69
|
1887 // Starting from this state, the state may transition to either FULFILLING or CANCELED
|
jpayne@69
|
1888 // using an atomic compare-and-swap.
|
jpayne@69
|
1889
|
jpayne@69
|
1890 FULFILLING,
|
jpayne@69
|
1891 // The fulfiller thread atomically transitions the state from WAITING to FULFILLING when it
|
jpayne@69
|
1892 // wishes to fulfill the promise. By doing so, it guarantees that the `executor` will not
|
jpayne@69
|
1893 // disappear out from under it. It then fills in the result value, locks the executor mutex,
|
jpayne@69
|
1894 // adds the object to the executor's list of fulfilled XThreadPafs, changes the state to
|
jpayne@69
|
1895 // FULFILLED, and finally unlocks the mutex.
|
jpayne@69
|
1896 //
|
jpayne@69
|
1897 // If the waiting thread tries to cancel but discovers the object in this state, then it
|
jpayne@69
|
1898 // must perform a conditional wait on the executor mutex to await the state becoming FULFILLED.
|
jpayne@69
|
1899 // It can then delete the object.
|
jpayne@69
|
1900
|
jpayne@69
|
1901 FULFILLED,
|
jpayne@69
|
1902 // The fulfilling thread has completed filling in the result value and inserting the object
|
jpayne@69
|
1903 // into the waiting thread's executor event queue. Moreover, the fulfilling thread no longer
|
jpayne@69
|
1904 // holds any pointers to this object. The waiting thread is responsible for deleting it.
|
jpayne@69
|
1905
|
jpayne@69
|
1906 DISPATCHED,
|
jpayne@69
|
1907 // The object reached FULFILLED state, and then was dispatched from the waiting thread's
|
jpayne@69
|
1908 // executor's event queue. Therefore, the object is completely owned by the waiting thread with
|
jpayne@69
|
1909 // no need to lock anything.
|
jpayne@69
|
1910
|
jpayne@69
|
1911 CANCELED
|
jpayne@69
|
1912 // The waiting thread atomically transitions the state from WAITING to CANCELED if it is no
|
jpayne@69
|
1913 // longer listening. In this state, it is the fulfiller thread's responsibility to destroy the
|
jpayne@69
|
1914 // object.
|
jpayne@69
|
1915 } state;
|
jpayne@69
|
1916
|
jpayne@69
|
1917 const Executor& executor;
|
jpayne@69
|
1918 // Executor of the waiting thread. Only guaranteed to be valid when state is `WAITING` or
|
jpayne@69
|
1919 // `FULFILLING`. After any other state has been reached, this reference may be invalidated.
|
jpayne@69
|
1920
|
jpayne@69
|
1921 ListLink<XThreadPaf> link;
|
jpayne@69
|
1922 // In the FULFILLING/FULFILLED states, the object is placed in a linked list within the waiting
|
jpayne@69
|
1923 // thread's executor. In those states, these pointers are guarded by said executor's mutex.
|
jpayne@69
|
1924
|
jpayne@69
|
1925 OnReadyEvent onReadyEvent;
|
jpayne@69
|
1926
|
jpayne@69
|
1927 class FulfillScope;
|
jpayne@69
|
1928
|
jpayne@69
|
1929 static kj::Exception unfulfilledException();
|
jpayne@69
|
1930 // Construct appropriate exception to use to reject an unfulfilled XThreadPaf.
|
jpayne@69
|
1931
|
jpayne@69
|
1932 template <typename T>
|
jpayne@69
|
1933 friend class XThreadFulfiller;
|
jpayne@69
|
1934 friend Executor;
|
jpayne@69
|
1935 };
|
jpayne@69
|
1936
|
jpayne@69
|
1937 template <typename T>
|
jpayne@69
|
1938 class XThreadPafImpl final: public XThreadPaf {
|
jpayne@69
|
1939 public:
|
jpayne@69
|
1940 // implements PromiseNode ----------------------------------------------------
|
jpayne@69
|
1941 void get(ExceptionOrValue& output) noexcept override {
|
jpayne@69
|
1942 output.as<FixVoid<T>>() = kj::mv(result);
|
jpayne@69
|
1943 }
|
jpayne@69
|
1944
|
jpayne@69
|
1945 private:
|
jpayne@69
|
1946 ExceptionOr<FixVoid<T>> result;
|
jpayne@69
|
1947
|
jpayne@69
|
1948 friend class XThreadFulfiller<T>;
|
jpayne@69
|
1949 };
|
jpayne@69
|
1950
|
jpayne@69
|
1951 class XThreadPaf::FulfillScope {
|
jpayne@69
|
1952 // Create on stack while setting `XThreadPafImpl<T>::result`.
|
jpayne@69
|
1953 //
|
jpayne@69
|
1954 // This ensures that:
|
jpayne@69
|
1955 // - Only one call is carried out, even if multiple threads try to fulfill concurrently.
|
jpayne@69
|
1956 // - The waiting thread is correctly signaled.
|
jpayne@69
|
1957 public:
|
jpayne@69
|
1958 FulfillScope(XThreadPaf** pointer);
|
jpayne@69
|
1959 // Atomically nulls out *pointer and takes ownership of the pointer.
|
jpayne@69
|
1960
|
jpayne@69
|
1961 ~FulfillScope() noexcept(false);
|
jpayne@69
|
1962
|
jpayne@69
|
1963 KJ_DISALLOW_COPY_AND_MOVE(FulfillScope);
|
jpayne@69
|
1964
|
jpayne@69
|
1965 bool shouldFulfill() { return obj != nullptr; }
|
jpayne@69
|
1966
|
jpayne@69
|
1967 template <typename T>
|
jpayne@69
|
1968 XThreadPafImpl<T>* getTarget() { return static_cast<XThreadPafImpl<T>*>(obj); }
|
jpayne@69
|
1969
|
jpayne@69
|
1970 private:
|
jpayne@69
|
1971 XThreadPaf* obj;
|
jpayne@69
|
1972 };
|
jpayne@69
|
1973
|
jpayne@69
|
1974 template <typename T>
|
jpayne@69
|
1975 class XThreadFulfiller final: public CrossThreadPromiseFulfiller<T> {
|
jpayne@69
|
1976 public:
|
jpayne@69
|
1977 XThreadFulfiller(XThreadPafImpl<T>* target): target(target) {}
|
jpayne@69
|
1978
|
jpayne@69
|
1979 ~XThreadFulfiller() noexcept(false) {
|
jpayne@69
|
1980 if (target != nullptr) {
|
jpayne@69
|
1981 reject(XThreadPaf::unfulfilledException());
|
jpayne@69
|
1982 }
|
jpayne@69
|
1983 }
|
jpayne@69
|
1984 void fulfill(FixVoid<T>&& value) const override {
|
jpayne@69
|
1985 XThreadPaf::FulfillScope scope(&target);
|
jpayne@69
|
1986 if (scope.shouldFulfill()) {
|
jpayne@69
|
1987 scope.getTarget<T>()->result = kj::mv(value);
|
jpayne@69
|
1988 }
|
jpayne@69
|
1989 }
|
jpayne@69
|
1990 void reject(Exception&& exception) const override {
|
jpayne@69
|
1991 XThreadPaf::FulfillScope scope(&target);
|
jpayne@69
|
1992 if (scope.shouldFulfill()) {
|
jpayne@69
|
1993 scope.getTarget<T>()->result.addException(kj::mv(exception));
|
jpayne@69
|
1994 }
|
jpayne@69
|
1995 }
|
jpayne@69
|
1996 bool isWaiting() const override {
|
jpayne@69
|
1997 KJ_IF_MAYBE(t, target) {
|
jpayne@69
|
1998 #if _MSC_VER && !__clang__
|
jpayne@69
|
1999 // Just assume 1-byte loads are atomic... on what kind of absurd platform would they not be?
|
jpayne@69
|
2000 return t->state == XThreadPaf::WAITING;
|
jpayne@69
|
2001 #else
|
jpayne@69
|
2002 return __atomic_load_n(&t->state, __ATOMIC_RELAXED) == XThreadPaf::WAITING;
|
jpayne@69
|
2003 #endif
|
jpayne@69
|
2004 } else {
|
jpayne@69
|
2005 return false;
|
jpayne@69
|
2006 }
|
jpayne@69
|
2007 }
|
jpayne@69
|
2008
|
jpayne@69
|
2009 private:
|
jpayne@69
|
2010 mutable XThreadPaf* target; // accessed using atomic ops
|
jpayne@69
|
2011 };
|
jpayne@69
|
2012
|
jpayne@69
|
2013 template <typename T>
|
jpayne@69
|
2014 class XThreadFulfiller<kj::Promise<T>> {
|
jpayne@69
|
2015 public:
|
jpayne@69
|
2016 static_assert(sizeof(T) < 0,
|
jpayne@69
|
2017 "newCrosssThreadPromiseAndFulfiller<Promise<T>>() is not currently supported");
|
jpayne@69
|
2018 // TODO(someday): Is this worth supporting? Presumably, when someone calls `fulfill(somePromise)`,
|
jpayne@69
|
2019 // then `somePromise` should be assumed to be a promise owned by the fulfilling thread, not
|
jpayne@69
|
2020 // the waiting thread.
|
jpayne@69
|
2021 };
|
jpayne@69
|
2022
|
jpayne@69
|
2023 } // namespace _ (private)
|
jpayne@69
|
2024
|
jpayne@69
|
2025 template <typename T>
|
jpayne@69
|
2026 PromiseCrossThreadFulfillerPair<T> newPromiseAndCrossThreadFulfiller() {
|
jpayne@69
|
2027 kj::Own<_::XThreadPafImpl<T>, _::PromiseDisposer> node(new _::XThreadPafImpl<T>);
|
jpayne@69
|
2028 auto fulfiller = kj::heap<_::XThreadFulfiller<T>>(node);
|
jpayne@69
|
2029 return { _::PromiseNode::to<_::ReducePromises<T>>(kj::mv(node)), kj::mv(fulfiller) };
|
jpayne@69
|
2030 }
|
jpayne@69
|
2031
|
jpayne@69
|
2032 } // namespace kj
|
jpayne@69
|
2033
|
jpayne@69
|
2034 #if KJ_HAS_COROUTINE
|
jpayne@69
|
2035
|
jpayne@69
|
2036 // =======================================================================================
|
jpayne@69
|
2037 // Coroutines TS integration with kj::Promise<T>.
|
jpayne@69
|
2038 //
|
jpayne@69
|
2039 // Here's a simple coroutine:
|
jpayne@69
|
2040 //
|
jpayne@69
|
2041 // Promise<Own<AsyncIoStream>> connectToService(Network& n) {
|
jpayne@69
|
2042 // auto a = co_await n.parseAddress(IP, PORT);
|
jpayne@69
|
2043 // auto c = co_await a->connect();
|
jpayne@69
|
2044 // co_return kj::mv(c);
|
jpayne@69
|
2045 // }
|
jpayne@69
|
2046 //
|
jpayne@69
|
2047 // The presence of the co_await and co_return keywords tell the compiler it is a coroutine.
|
jpayne@69
|
2048 // Although it looks similar to a function, it has a couple large differences. First, everything
|
jpayne@69
|
2049 // that would normally live in the stack frame lives instead in a heap-based coroutine frame.
|
jpayne@69
|
2050 // Second, the coroutine has the ability to return from its scope without deallocating this frame
|
jpayne@69
|
2051 // (to suspend, in other words), and the ability to resume from its last suspension point.
|
jpayne@69
|
2052 //
|
jpayne@69
|
2053 // In order to know how to suspend, resume, and return from a coroutine, the compiler looks up a
|
jpayne@69
|
2054 // coroutine implementation type via a traits class parameterized by the coroutine return and
|
jpayne@69
|
2055 // parameter types. We'll name our coroutine implementation `kj::_::Coroutine<T>`,
|
jpayne@69
|
2056
|
jpayne@69
|
2057 namespace kj::_ { template <typename T> class Coroutine; }
|
jpayne@69
|
2058
|
jpayne@69
|
2059 // Specializing the appropriate traits class tells the compiler about `kj::_::Coroutine<T>`.
|
jpayne@69
|
2060
|
jpayne@69
|
2061 namespace KJ_COROUTINE_STD_NAMESPACE {
|
jpayne@69
|
2062
|
jpayne@69
|
2063 template <class T, class... Args>
|
jpayne@69
|
2064 struct coroutine_traits<kj::Promise<T>, Args...> {
|
jpayne@69
|
2065 // `Args...` are the coroutine's parameter types.
|
jpayne@69
|
2066
|
jpayne@69
|
2067 using promise_type = kj::_::Coroutine<T>;
|
jpayne@69
|
2068 // The Coroutines TS calls this the "promise type". This makes sense when thinking of coroutines
|
jpayne@69
|
2069 // returning `std::future<T>`, since the coroutine implementation would be a wrapper around
|
jpayne@69
|
2070 // a `std::promise<T>`. It's extremely confusing from a KJ perspective, however, so I call it
|
jpayne@69
|
2071 // the "coroutine implementation type" instead.
|
jpayne@69
|
2072 };
|
jpayne@69
|
2073
|
jpayne@69
|
2074 } // namespace KJ_COROUTINE_STD_NAMESPACE
|
jpayne@69
|
2075
|
jpayne@69
|
2076 // Now when the compiler sees our `connectToService()` coroutine above, it default-constructs a
|
jpayne@69
|
2077 // `coroutine_traits<Promise<Own<AsyncIoStream>>, Network&>::promise_type`, or
|
jpayne@69
|
2078 // `kj::_::Coroutine<Own<AsyncIoStream>>`.
|
jpayne@69
|
2079 //
|
jpayne@69
|
2080 // The implementation object lives in the heap-allocated coroutine frame. It gets destroyed and
|
jpayne@69
|
2081 // deallocated when the frame does.
|
jpayne@69
|
2082
|
jpayne@69
|
2083 namespace kj::_ {
|
jpayne@69
|
2084
|
jpayne@69
|
2085 namespace stdcoro = KJ_COROUTINE_STD_NAMESPACE;
|
jpayne@69
|
2086
|
jpayne@69
|
2087 class CoroutineBase: public PromiseNode,
|
jpayne@69
|
2088 public Event {
|
jpayne@69
|
2089 public:
|
jpayne@69
|
2090 CoroutineBase(stdcoro::coroutine_handle<> coroutine, ExceptionOrValue& resultRef,
|
jpayne@69
|
2091 SourceLocation location);
|
jpayne@69
|
2092 ~CoroutineBase() noexcept(false);
|
jpayne@69
|
2093 KJ_DISALLOW_COPY_AND_MOVE(CoroutineBase);
|
jpayne@69
|
2094 void destroy() override;
|
jpayne@69
|
2095
|
jpayne@69
|
2096 auto initial_suspend() { return stdcoro::suspend_never(); }
|
jpayne@69
|
2097 auto final_suspend() noexcept {
|
jpayne@69
|
2098 #if _MSC_VER && !defined(__clang__)
|
jpayne@69
|
2099 // See comment at `finalSuspendCalled`'s definition.
|
jpayne@69
|
2100 finalSuspendCalled = true;
|
jpayne@69
|
2101 #endif
|
jpayne@69
|
2102 return stdcoro::suspend_always();
|
jpayne@69
|
2103 }
|
jpayne@69
|
2104 // These adjust the suspension behavior of coroutines immediately upon initiation, and immediately
|
jpayne@69
|
2105 // after completion.
|
jpayne@69
|
2106 //
|
jpayne@69
|
2107 // The initial suspension point could allow us to defer the initial synchronous execution of a
|
jpayne@69
|
2108 // coroutine -- everything before its first co_await, that is.
|
jpayne@69
|
2109 //
|
jpayne@69
|
2110 // The final suspension point is useful to delay deallocation of the coroutine frame to match the
|
jpayne@69
|
2111 // lifetime of the enclosing promise.
|
jpayne@69
|
2112
|
jpayne@69
|
2113 void unhandled_exception();
|
jpayne@69
|
2114
|
jpayne@69
|
2115 protected:
|
jpayne@69
|
2116 class AwaiterBase;
|
jpayne@69
|
2117
|
jpayne@69
|
2118 bool isWaiting() { return waiting; }
|
jpayne@69
|
2119 void scheduleResumption() {
|
jpayne@69
|
2120 onReadyEvent.arm();
|
jpayne@69
|
2121 waiting = false;
|
jpayne@69
|
2122 }
|
jpayne@69
|
2123
|
jpayne@69
|
2124 private:
|
jpayne@69
|
2125 // -------------------------------------------------------
|
jpayne@69
|
2126 // PromiseNode implementation
|
jpayne@69
|
2127
|
jpayne@69
|
2128 void onReady(Event* event) noexcept override;
|
jpayne@69
|
2129 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
|
jpayne@69
|
2130
|
jpayne@69
|
2131 // -------------------------------------------------------
|
jpayne@69
|
2132 // Event implementation
|
jpayne@69
|
2133
|
jpayne@69
|
2134 Maybe<Own<Event>> fire() override;
|
jpayne@69
|
2135 void traceEvent(TraceBuilder& builder) override;
|
jpayne@69
|
2136
|
jpayne@69
|
2137 stdcoro::coroutine_handle<> coroutine;
|
jpayne@69
|
2138 ExceptionOrValue& resultRef;
|
jpayne@69
|
2139
|
jpayne@69
|
2140 OnReadyEvent onReadyEvent;
|
jpayne@69
|
2141 bool waiting = true;
|
jpayne@69
|
2142
|
jpayne@69
|
2143 bool hasSuspendedAtLeastOnce = false;
|
jpayne@69
|
2144
|
jpayne@69
|
2145 #if _MSC_VER && !defined(__clang__)
|
jpayne@69
|
2146 bool finalSuspendCalled = false;
|
jpayne@69
|
2147 // MSVC erroneously reports the coroutine as done (that is, `coroutine.done()` returns true)
|
jpayne@69
|
2148 // seemingly as soon as `return_value()`/`return_void()` are called. This matters in our
|
jpayne@69
|
2149 // implementation of `unhandled_exception()`, which must arrange to propagate exceptions during
|
jpayne@69
|
2150 // coroutine frame unwind via the returned promise, even if `return_value()`/`return_void()` have
|
jpayne@69
|
2151 // already been called. To prove that our assumptions are correct in that function, we want to be
|
jpayne@69
|
2152 // able to assert that `final_suspend()` has not yet been called. This boolean hack allows us to
|
jpayne@69
|
2153 // preserve that assertion.
|
jpayne@69
|
2154 #endif
|
jpayne@69
|
2155
|
jpayne@69
|
2156 Maybe<PromiseNode&> promiseNodeForTrace;
|
jpayne@69
|
2157 // Whenever this coroutine is suspended waiting on another promise, we keep a reference to that
|
jpayne@69
|
2158 // promise so tracePromise()/traceEvent() can trace into it.
|
jpayne@69
|
2159
|
jpayne@69
|
2160 UnwindDetector unwindDetector;
|
jpayne@69
|
2161
|
jpayne@69
|
2162 struct DisposalResults {
|
jpayne@69
|
2163 bool destructorRan = false;
|
jpayne@69
|
2164 Maybe<Exception> exception;
|
jpayne@69
|
2165 };
|
jpayne@69
|
2166 Maybe<DisposalResults&> maybeDisposalResults;
|
jpayne@69
|
2167 // Only non-null during destruction. Before calling coroutine.destroy(), our disposer sets this
|
jpayne@69
|
2168 // to point to a DisposalResults on the stack so unhandled_exception() will have some place to
|
jpayne@69
|
2169 // store unwind exceptions. We can't store them in this Coroutine, because we'll be destroyed once
|
jpayne@69
|
2170 // coroutine.destroy() has returned. Our disposer then rethrows as needed.
|
jpayne@69
|
2171 };
|
jpayne@69
|
2172
|
jpayne@69
|
2173 template <typename Self, typename T>
|
jpayne@69
|
2174 class CoroutineMixin;
|
jpayne@69
|
2175 // CRTP mixin, covered later.
|
jpayne@69
|
2176
|
jpayne@69
|
2177 template <typename T>
|
jpayne@69
|
2178 class Coroutine final: public CoroutineBase,
|
jpayne@69
|
2179 public CoroutineMixin<Coroutine<T>, T> {
|
jpayne@69
|
2180 // The standard calls this the `promise_type` object. We can call this the "coroutine
|
jpayne@69
|
2181 // implementation object" since the word promise means different things in KJ and std styles. This
|
jpayne@69
|
2182 // is where we implement how a `kj::Promise<T>` is returned from a coroutine, and how that promise
|
jpayne@69
|
2183 // is later fulfilled. We also fill in a few lifetime-related details.
|
jpayne@69
|
2184 //
|
jpayne@69
|
2185 // The implementation object is also where we can customize memory allocation of coroutine frames,
|
jpayne@69
|
2186 // by implementing a member `operator new(size_t, Args...)` (same `Args...` as in
|
jpayne@69
|
2187 // coroutine_traits).
|
jpayne@69
|
2188 //
|
jpayne@69
|
2189 // We can also customize how await-expressions are transformed within `kj::Promise<T>`-based
|
jpayne@69
|
2190 // coroutines by implementing an `await_transform(P)` member function, where `P` is some type for
|
jpayne@69
|
2191 // which we want to implement co_await support, e.g. `kj::Promise<U>`. This feature allows us to
|
jpayne@69
|
2192 // provide an optimized `kj::EventLoop` integration when the coroutine's return type and the
|
jpayne@69
|
2193 // await-expression's type are both `kj::Promise` instantiations -- see further comments under
|
jpayne@69
|
2194 // `await_transform()`.
|
jpayne@69
|
2195
|
jpayne@69
|
2196 public:
|
jpayne@69
|
2197 using Handle = stdcoro::coroutine_handle<Coroutine<T>>;
|
jpayne@69
|
2198
|
jpayne@69
|
2199 Coroutine(SourceLocation location = {})
|
jpayne@69
|
2200 : CoroutineBase(Handle::from_promise(*this), result, location) {}
|
jpayne@69
|
2201
|
jpayne@69
|
2202 Promise<T> get_return_object() {
|
jpayne@69
|
2203 // Called after coroutine frame construction and before initial_suspend() to create the
|
jpayne@69
|
2204 // coroutine's return object. `this` itself lives inside the coroutine frame, and we arrange for
|
jpayne@69
|
2205 // the returned Promise<T> to own `this` via a custom Disposer and by always leaving the
|
jpayne@69
|
2206 // coroutine in a suspended state.
|
jpayne@69
|
2207 return PromiseNode::to<Promise<T>>(OwnPromiseNode(this));
|
jpayne@69
|
2208 }
|
jpayne@69
|
2209
|
jpayne@69
|
2210 public:
|
jpayne@69
|
2211 template <typename U>
|
jpayne@69
|
2212 class Awaiter;
|
jpayne@69
|
2213
|
jpayne@69
|
2214 template <typename U>
|
jpayne@69
|
2215 Awaiter<U> await_transform(kj::Promise<U>& promise) { return Awaiter<U>(kj::mv(promise)); }
|
jpayne@69
|
2216 template <typename U>
|
jpayne@69
|
2217 Awaiter<U> await_transform(kj::Promise<U>&& promise) { return Awaiter<U>(kj::mv(promise)); }
|
jpayne@69
|
2218 // Called when someone writes `co_await promise`, where `promise` is a kj::Promise<U>. We return
|
jpayne@69
|
2219 // an Awaiter<U>, which implements coroutine suspension and resumption in terms of the KJ async
|
jpayne@69
|
2220 // event system.
|
jpayne@69
|
2221 //
|
jpayne@69
|
2222 // There is another hook we could implement: an `operator co_await()` free function. However, a
|
jpayne@69
|
2223 // free function would be unaware of the type of the enclosing coroutine. Since Awaiter<U> is a
|
jpayne@69
|
2224 // member class template of Coroutine<T>, it is able to implement an
|
jpayne@69
|
2225 // `await_suspend(Coroutine<T>::Handle)` override, providing it type-safe access to our enclosing
|
jpayne@69
|
2226 // coroutine's PromiseNode. An `operator co_await()` free function would have to implement
|
jpayne@69
|
2227 // a type-erased `await_suspend(stdcoro::coroutine_handle<void>)` override, and implement
|
jpayne@69
|
2228 // suspension and resumption in terms of .then(). Yuck!
|
jpayne@69
|
2229
|
jpayne@69
|
2230 private:
|
jpayne@69
|
2231 // -------------------------------------------------------
|
jpayne@69
|
2232 // PromiseNode implementation
|
jpayne@69
|
2233
|
jpayne@69
|
2234 void get(ExceptionOrValue& output) noexcept override {
|
jpayne@69
|
2235 output.as<FixVoid<T>>() = kj::mv(result);
|
jpayne@69
|
2236 }
|
jpayne@69
|
2237
|
jpayne@69
|
2238 void fulfill(FixVoid<T>&& value) {
|
jpayne@69
|
2239 // Called by the return_value()/return_void() functions in our mixin class.
|
jpayne@69
|
2240
|
jpayne@69
|
2241 if (isWaiting()) {
|
jpayne@69
|
2242 result = kj::mv(value);
|
jpayne@69
|
2243 scheduleResumption();
|
jpayne@69
|
2244 }
|
jpayne@69
|
2245 }
|
jpayne@69
|
2246
|
jpayne@69
|
2247 ExceptionOr<FixVoid<T>> result;
|
jpayne@69
|
2248
|
jpayne@69
|
2249 friend class CoroutineMixin<Coroutine<T>, T>;
|
jpayne@69
|
2250 };
|
jpayne@69
|
2251
|
jpayne@69
|
2252 template <typename Self, typename T>
|
jpayne@69
|
2253 class CoroutineMixin {
|
jpayne@69
|
2254 public:
|
jpayne@69
|
2255 void return_value(T value) {
|
jpayne@69
|
2256 static_cast<Self*>(this)->fulfill(kj::mv(value));
|
jpayne@69
|
2257 }
|
jpayne@69
|
2258 };
|
jpayne@69
|
2259 template <typename Self>
|
jpayne@69
|
2260 class CoroutineMixin<Self, void> {
|
jpayne@69
|
2261 public:
|
jpayne@69
|
2262 void return_void() {
|
jpayne@69
|
2263 static_cast<Self*>(this)->fulfill(_::Void());
|
jpayne@69
|
2264 }
|
jpayne@69
|
2265 };
|
jpayne@69
|
2266 // The Coroutines spec has no `_::FixVoid<T>` equivalent to unify valueful and valueless co_return
|
jpayne@69
|
2267 // statements, and programs are ill-formed if the coroutine implementation object (Coroutine<T>) has
|
jpayne@69
|
2268 // both a `return_value()` and `return_void()`. No amount of EnableIffery can get around it, so
|
jpayne@69
|
2269 // these return_* functions live in a CRTP mixin.
|
jpayne@69
|
2270
|
jpayne@69
|
2271 class CoroutineBase::AwaiterBase {
|
jpayne@69
|
2272 public:
|
jpayne@69
|
2273 explicit AwaiterBase(OwnPromiseNode node);
|
jpayne@69
|
2274 AwaiterBase(AwaiterBase&&);
|
jpayne@69
|
2275 ~AwaiterBase() noexcept(false);
|
jpayne@69
|
2276 KJ_DISALLOW_COPY(AwaiterBase);
|
jpayne@69
|
2277
|
jpayne@69
|
2278 bool await_ready() const { return false; }
|
jpayne@69
|
2279 // This could return "`node->get()` is safe to call" instead, which would make suspension-less
|
jpayne@69
|
2280 // co_awaits possible for immediately-fulfilled promises. However, we need an Event to figure that
|
jpayne@69
|
2281 // out, and we won't have access to the Coroutine Event until await_suspend() is called. So, we
|
jpayne@69
|
2282 // must return false here. Fortunately, await_suspend() has a trick up its sleeve to enable
|
jpayne@69
|
2283 // suspension-less co_awaits.
|
jpayne@69
|
2284
|
jpayne@69
|
2285 protected:
|
jpayne@69
|
2286 void getImpl(ExceptionOrValue& result, void* awaitedAt);
|
jpayne@69
|
2287 bool awaitSuspendImpl(CoroutineBase& coroutineEvent);
|
jpayne@69
|
2288
|
jpayne@69
|
2289 private:
|
jpayne@69
|
2290 UnwindDetector unwindDetector;
|
jpayne@69
|
2291 OwnPromiseNode node;
|
jpayne@69
|
2292
|
jpayne@69
|
2293 Maybe<CoroutineBase&> maybeCoroutineEvent;
|
jpayne@69
|
2294 // If we do suspend waiting for our wrapped promise, we store a reference to `node` in our
|
jpayne@69
|
2295 // enclosing Coroutine for tracing purposes. To guard against any edge cases where an async stack
|
jpayne@69
|
2296 // trace is generated when an Awaiter was destroyed without Coroutine::fire() having been called,
|
jpayne@69
|
2297 // we need our own reference to the enclosing Coroutine. (I struggle to think up any such
|
jpayne@69
|
2298 // scenarios, but perhaps they could occur when destroying a suspended coroutine.)
|
jpayne@69
|
2299 };
|
jpayne@69
|
2300
|
jpayne@69
|
2301 template <typename T>
|
jpayne@69
|
2302 template <typename U>
|
jpayne@69
|
2303 class Coroutine<T>::Awaiter: public AwaiterBase {
|
jpayne@69
|
2304 // Wrapper around a co_await'ed promise and some storage space for the result of that promise.
|
jpayne@69
|
2305 // The compiler arranges to call our await_suspend() to suspend, which arranges to be woken up
|
jpayne@69
|
2306 // when the awaited promise is settled. Once that happens, the enclosing coroutine's Event
|
jpayne@69
|
2307 // implementation resumes the coroutine, which transitively calls await_resume() to unwrap the
|
jpayne@69
|
2308 // awaited promise result.
|
jpayne@69
|
2309
|
jpayne@69
|
2310 public:
|
jpayne@69
|
2311 explicit Awaiter(Promise<U> promise): AwaiterBase(PromiseNode::from(kj::mv(promise))) {}
|
jpayne@69
|
2312
|
jpayne@69
|
2313 KJ_NOINLINE U await_resume() {
|
jpayne@69
|
2314 // This is marked noinline in order to ensure __builtin_return_address() is accurate for stack
|
jpayne@69
|
2315 // trace purposes. In my experimentation, this method was not inlined anyway even in opt
|
jpayne@69
|
2316 // builds, but I want to make sure it doesn't suddenly start being inlined later causing stack
|
jpayne@69
|
2317 // traces to break. (I also tried always-inline, but this did not appear to cause the compiler
|
jpayne@69
|
2318 // to inline the method -- perhaps a limitation of coroutines?)
|
jpayne@69
|
2319 #if __GNUC__
|
jpayne@69
|
2320 getImpl(result, __builtin_return_address(0));
|
jpayne@69
|
2321 #elif _MSC_VER
|
jpayne@69
|
2322 getImpl(result, _ReturnAddress());
|
jpayne@69
|
2323 #else
|
jpayne@69
|
2324 #error "please implement for your compiler"
|
jpayne@69
|
2325 #endif
|
jpayne@69
|
2326 auto value = kj::_::readMaybe(result.value);
|
jpayne@69
|
2327 KJ_IASSERT(value != nullptr, "Neither exception nor value present.");
|
jpayne@69
|
2328 return U(kj::mv(*value));
|
jpayne@69
|
2329 }
|
jpayne@69
|
2330
|
jpayne@69
|
2331 bool await_suspend(Coroutine::Handle coroutine) {
|
jpayne@69
|
2332 return awaitSuspendImpl(coroutine.promise());
|
jpayne@69
|
2333 }
|
jpayne@69
|
2334
|
jpayne@69
|
2335 private:
|
jpayne@69
|
2336 ExceptionOr<FixVoid<U>> result;
|
jpayne@69
|
2337 };
|
jpayne@69
|
2338
|
jpayne@69
|
2339 #undef KJ_COROUTINE_STD_NAMESPACE
|
jpayne@69
|
2340
|
jpayne@69
|
2341 } // namespace kj::_ (private)
|
jpayne@69
|
2342
|
jpayne@69
|
2343 #endif // KJ_HAS_COROUTINE
|
jpayne@69
|
2344
|
jpayne@69
|
2345 KJ_END_HEADER
|