annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/include/kj/async-inl.h @ 69:33d812a61356

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 17:55:14 -0400
parents
children
rev   line source
jpayne@69 1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
jpayne@69 2 // Licensed under the MIT License:
jpayne@69 3 //
jpayne@69 4 // Permission is hereby granted, free of charge, to any person obtaining a copy
jpayne@69 5 // of this software and associated documentation files (the "Software"), to deal
jpayne@69 6 // in the Software without restriction, including without limitation the rights
jpayne@69 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
jpayne@69 8 // copies of the Software, and to permit persons to whom the Software is
jpayne@69 9 // furnished to do so, subject to the following conditions:
jpayne@69 10 //
jpayne@69 11 // The above copyright notice and this permission notice shall be included in
jpayne@69 12 // all copies or substantial portions of the Software.
jpayne@69 13 //
jpayne@69 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
jpayne@69 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
jpayne@69 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
jpayne@69 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
jpayne@69 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
jpayne@69 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
jpayne@69 20 // THE SOFTWARE.
jpayne@69 21
jpayne@69 22 // This file contains extended inline implementation details that are required along with async.h.
jpayne@69 23 // We move this all into a separate file to make async.h more readable.
jpayne@69 24 //
jpayne@69 25 // Non-inline declarations here are defined in async.c++.
jpayne@69 26
jpayne@69 27 #pragma once
jpayne@69 28
jpayne@69 29 #ifndef KJ_ASYNC_H_INCLUDED
jpayne@69 30 #error "Do not include this directly; include kj/async.h."
jpayne@69 31 #include "async.h" // help IDE parse this file
jpayne@69 32 #endif
jpayne@69 33
jpayne@69 34 #if _MSC_VER && KJ_HAS_COROUTINE
jpayne@69 35 #include <intrin.h>
jpayne@69 36 #endif
jpayne@69 37
jpayne@69 38 #include <kj/list.h>
jpayne@69 39
jpayne@69 40 KJ_BEGIN_HEADER
jpayne@69 41
jpayne@69 42 namespace kj {
jpayne@69 43 namespace _ { // private
jpayne@69 44
jpayne@69 45 template <typename T>
jpayne@69 46 class ExceptionOr;
jpayne@69 47
jpayne@69 48 class ExceptionOrValue {
jpayne@69 49 public:
jpayne@69 50 ExceptionOrValue(bool, Exception&& exception): exception(kj::mv(exception)) {}
jpayne@69 51 KJ_DISALLOW_COPY(ExceptionOrValue);
jpayne@69 52
jpayne@69 53 void addException(Exception&& exception) {
jpayne@69 54 if (this->exception == nullptr) {
jpayne@69 55 this->exception = kj::mv(exception);
jpayne@69 56 }
jpayne@69 57 }
jpayne@69 58
jpayne@69 59 template <typename T>
jpayne@69 60 ExceptionOr<T>& as() { return *static_cast<ExceptionOr<T>*>(this); }
jpayne@69 61 template <typename T>
jpayne@69 62 const ExceptionOr<T>& as() const { return *static_cast<const ExceptionOr<T>*>(this); }
jpayne@69 63
jpayne@69 64 Maybe<Exception> exception;
jpayne@69 65
jpayne@69 66 protected:
jpayne@69 67 // Allow subclasses to have move constructor / assignment.
jpayne@69 68 ExceptionOrValue() = default;
jpayne@69 69 ExceptionOrValue(ExceptionOrValue&& other) = default;
jpayne@69 70 ExceptionOrValue& operator=(ExceptionOrValue&& other) = default;
jpayne@69 71 };
jpayne@69 72
jpayne@69 73 template <typename T>
jpayne@69 74 class ExceptionOr: public ExceptionOrValue {
jpayne@69 75 public:
jpayne@69 76 ExceptionOr() = default;
jpayne@69 77 ExceptionOr(T&& value): value(kj::mv(value)) {}
jpayne@69 78 ExceptionOr(bool, Exception&& exception): ExceptionOrValue(false, kj::mv(exception)) {}
jpayne@69 79 ExceptionOr(ExceptionOr&&) = default;
jpayne@69 80 ExceptionOr& operator=(ExceptionOr&&) = default;
jpayne@69 81
jpayne@69 82 Maybe<T> value;
jpayne@69 83 };
jpayne@69 84
jpayne@69 85 template <typename T>
jpayne@69 86 inline T convertToReturn(ExceptionOr<T>&& result) {
jpayne@69 87 KJ_IF_MAYBE(value, result.value) {
jpayne@69 88 KJ_IF_MAYBE(exception, result.exception) {
jpayne@69 89 throwRecoverableException(kj::mv(*exception));
jpayne@69 90 }
jpayne@69 91 return _::returnMaybeVoid(kj::mv(*value));
jpayne@69 92 } else KJ_IF_MAYBE(exception, result.exception) {
jpayne@69 93 throwFatalException(kj::mv(*exception));
jpayne@69 94 } else {
jpayne@69 95 // Result contained neither a value nor an exception?
jpayne@69 96 KJ_UNREACHABLE;
jpayne@69 97 }
jpayne@69 98 }
jpayne@69 99
jpayne@69 100 inline void convertToReturn(ExceptionOr<Void>&& result) {
jpayne@69 101 // Override <void> case to use throwRecoverableException().
jpayne@69 102
jpayne@69 103 if (result.value != nullptr) {
jpayne@69 104 KJ_IF_MAYBE(exception, result.exception) {
jpayne@69 105 throwRecoverableException(kj::mv(*exception));
jpayne@69 106 }
jpayne@69 107 } else KJ_IF_MAYBE(exception, result.exception) {
jpayne@69 108 throwRecoverableException(kj::mv(*exception));
jpayne@69 109 } else {
jpayne@69 110 // Result contained neither a value nor an exception?
jpayne@69 111 KJ_UNREACHABLE;
jpayne@69 112 }
jpayne@69 113 }
jpayne@69 114
jpayne@69 115 class TraceBuilder {
jpayne@69 116 // Helper for methods that build a call trace.
jpayne@69 117 public:
jpayne@69 118 TraceBuilder(ArrayPtr<void*> space)
jpayne@69 119 : start(space.begin()), current(space.begin()), limit(space.end()) {}
jpayne@69 120
jpayne@69 121 inline void add(void* addr) {
jpayne@69 122 if (current < limit) {
jpayne@69 123 *current++ = addr;
jpayne@69 124 }
jpayne@69 125 }
jpayne@69 126
jpayne@69 127 inline bool full() const { return current == limit; }
jpayne@69 128
jpayne@69 129 ArrayPtr<void*> finish() {
jpayne@69 130 return arrayPtr(start, current);
jpayne@69 131 }
jpayne@69 132
jpayne@69 133 String toString();
jpayne@69 134
jpayne@69 135 private:
jpayne@69 136 void** start;
jpayne@69 137 void** current;
jpayne@69 138 void** limit;
jpayne@69 139 };
jpayne@69 140
jpayne@69 141 struct alignas(void*) PromiseArena {
jpayne@69 142 // Space in which a chain of promises may be allocated. See PromiseDisposer.
jpayne@69 143 byte bytes[1024];
jpayne@69 144 };
jpayne@69 145
jpayne@69 146 class Event: private AsyncObject {
jpayne@69 147 // An event waiting to be executed. Not for direct use by applications -- promises use this
jpayne@69 148 // internally.
jpayne@69 149
jpayne@69 150 public:
jpayne@69 151 Event(SourceLocation location);
jpayne@69 152 Event(kj::EventLoop& loop, SourceLocation location);
jpayne@69 153 ~Event() noexcept(false);
jpayne@69 154 KJ_DISALLOW_COPY_AND_MOVE(Event);
jpayne@69 155
jpayne@69 156 void armDepthFirst();
jpayne@69 157 // Enqueue this event so that `fire()` will be called from the event loop soon.
jpayne@69 158 //
jpayne@69 159 // Events scheduled in this way are executed in depth-first order: if an event callback arms
jpayne@69 160 // more events, those events are placed at the front of the queue (in the order in which they
jpayne@69 161 // were armed), so that they run immediately after the first event's callback returns.
jpayne@69 162 //
jpayne@69 163 // Depth-first event scheduling is appropriate for events that represent simple continuations
jpayne@69 164 // of a previous event that should be globbed together for performance. Depth-first scheduling
jpayne@69 165 // can lead to starvation, so any long-running task must occasionally yield with
jpayne@69 166 // `armBreadthFirst()`. (Promise::then() uses depth-first whereas evalLater() uses
jpayne@69 167 // breadth-first.)
jpayne@69 168 //
jpayne@69 169 // To use breadth-first scheduling instead, use `armBreadthFirst()`.
jpayne@69 170
jpayne@69 171 void armBreadthFirst();
jpayne@69 172 // Like `armDepthFirst()` except that the event is placed at the end of the queue.
jpayne@69 173
jpayne@69 174 void armLast();
jpayne@69 175 // Enqueues this event to happen after all other events have run to completion and there is
jpayne@69 176 // really nothing left to do except wait for I/O.
jpayne@69 177
jpayne@69 178 bool isNext();
jpayne@69 179 // True if the Event has been armed and is next in line to be fired. This can be used after
jpayne@69 180 // calling PromiseNode::onReady(event) to determine if a promise being waited is immediately
jpayne@69 181 // ready, in which case continuations may be optimistically run without returning to the event
jpayne@69 182 // loop. Note that this optimization is only valid if we know that we would otherwise immediately
jpayne@69 183 // return to the event loop without running more application code. So this turns out to be useful
jpayne@69 184 // in fairly narrow circumstances, chiefly when a coroutine is about to suspend, but discovers it
jpayne@69 185 // doesn't need to.
jpayne@69 186 //
jpayne@69 187 // Returns false if the event loop is not currently running. This ensures that promise
jpayne@69 188 // continuations don't execute except under a call to .wait().
jpayne@69 189
jpayne@69 190 void disarm();
jpayne@69 191 // If the event is armed but hasn't fired, cancel it. (Destroying the event does this
jpayne@69 192 // implicitly.)
jpayne@69 193
jpayne@69 194 virtual void traceEvent(TraceBuilder& builder) = 0;
jpayne@69 195 // Build a trace of the callers leading up to this event. `builder` will be populated with
jpayne@69 196 // "return addresses" of the promise chain waiting on this event. The return addresses may
jpayne@69 197 // actually the addresses of lambdas passed to .then(), but in any case, feeding them into
jpayne@69 198 // addr2line should produce useful source code locations.
jpayne@69 199 //
jpayne@69 200 // `traceEvent()` may be called from an async signal handler while `fire()` is executing. It
jpayne@69 201 // must not allocate nor take locks.
jpayne@69 202
jpayne@69 203 String traceEvent();
jpayne@69 204 // Helper that builds a trace and stringifies it.
jpayne@69 205
jpayne@69 206 protected:
jpayne@69 207 virtual Maybe<Own<Event>> fire() = 0;
jpayne@69 208 // Fire the event. Possibly returns a pointer to itself, which will be discarded by the
jpayne@69 209 // caller. This is the only way that an event can delete itself as a result of firing, as
jpayne@69 210 // doing so from within fire() will throw an exception.
jpayne@69 211
jpayne@69 212 private:
jpayne@69 213 friend class kj::EventLoop;
jpayne@69 214 EventLoop& loop;
jpayne@69 215 Event* next;
jpayne@69 216 Event** prev;
jpayne@69 217 bool firing = false;
jpayne@69 218
jpayne@69 219 static constexpr uint MAGIC_LIVE_VALUE = 0x1e366381u;
jpayne@69 220 uint live = MAGIC_LIVE_VALUE;
jpayne@69 221 SourceLocation location;
jpayne@69 222 };
jpayne@69 223
jpayne@69 224 class PromiseArenaMember {
jpayne@69 225 // An object that is allocated in a PromiseArena. `PromiseNode` inherits this, and most
jpayne@69 226 // arena-allocated objects are `PromiseNode` subclasses, but `TaskSet::Task`, ForkHub, and
jpayne@69 227 // potentially other objects that commonly live on the end of a promise chain can also leverage
jpayne@69 228 // this.
jpayne@69 229
jpayne@69 230 public:
jpayne@69 231 virtual void destroy() = 0;
jpayne@69 232 // Destroys and frees the node.
jpayne@69 233 //
jpayne@69 234 // If the node was allocated using allocPromise<T>(), then destroy() must call
jpayne@69 235 // freePromise<T>(this). If it was allocated some other way, then it is `destroy()`'s
jpayne@69 236 // responsibility to complete any necessary cleanup of memory, e.g. call `delete this`.
jpayne@69 237 //
jpayne@69 238 // We use this instead of a virtual destructor for two reasons:
jpayne@69 239 // 1. Coroutine nodes are not independent objects, they have to call destroy() on the coroutine
jpayne@69 240 // handle to delete themselves.
jpayne@69 241 // 2. XThreadEvents sometimes leave it up to a different thread to actually delete the object.
jpayne@69 242
jpayne@69 243 private:
jpayne@69 244 PromiseArena* arena = nullptr;
jpayne@69 245 // If non-null, then this PromiseNode is the last node allocated within the given arena, and
jpayne@69 246 // therefore owns the arena. After this node is destroyed, the arena should be deleted.
jpayne@69 247 //
jpayne@69 248 // PromiseNodes are allocated within the arena starting from the end, and `PromiseNode`s
jpayne@69 249 // allocated this way are required to have `PromiseNode` itself as their leftmost inherited type,
jpayne@69 250 // so that the pointers match. Thus, the space in `arena` from its start to the location of the
jpayne@69 251 // `PromiseNode` is known to be available for subsequent allocations (which should then take
jpayne@69 252 // ownership of the arena).
jpayne@69 253
jpayne@69 254 friend class PromiseDisposer;
jpayne@69 255 };
jpayne@69 256
jpayne@69 257 class PromiseNode: public PromiseArenaMember, private AsyncObject {
jpayne@69 258 // A Promise<T> contains a chain of PromiseNodes tracking the pending transformations.
jpayne@69 259 //
jpayne@69 260 // To reduce generated code bloat, PromiseNode is not a template. Instead, it makes very hacky
jpayne@69 261 // use of pointers to ExceptionOrValue which actually point to ExceptionOr<T>, but are only
jpayne@69 262 // so down-cast in the few places that really need to be templated. Luckily this is all
jpayne@69 263 // internal implementation details.
jpayne@69 264
jpayne@69 265 public:
jpayne@69 266 virtual void onReady(Event* event) noexcept = 0;
jpayne@69 267 // Arms the given event when ready.
jpayne@69 268 //
jpayne@69 269 // May be called multiple times. If called again before the event was armed, the old event will
jpayne@69 270 // never be armed, only the new one. If called again after the event was armed, the new event
jpayne@69 271 // will be armed immediately. Can be called with nullptr to un-register the existing event.
jpayne@69 272
jpayne@69 273 virtual void setSelfPointer(OwnPromiseNode* selfPtr) noexcept;
jpayne@69 274 // Tells the node that `selfPtr` is the pointer that owns this node, and will continue to own
jpayne@69 275 // this node until it is destroyed or setSelfPointer() is called again. ChainPromiseNode uses
jpayne@69 276 // this to shorten redundant chains. The default implementation does nothing; only
jpayne@69 277 // ChainPromiseNode should implement this.
jpayne@69 278
jpayne@69 279 virtual void get(ExceptionOrValue& output) noexcept = 0;
jpayne@69 280 // Get the result. `output` points to an ExceptionOr<T> into which the result will be written.
jpayne@69 281 // Can only be called once, and only after the node is ready. Must be called directly from the
jpayne@69 282 // event loop, with no application code on the stack.
jpayne@69 283
jpayne@69 284 virtual void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) = 0;
jpayne@69 285 // Build a trace of this promise chain, showing what it is currently waiting on.
jpayne@69 286 //
jpayne@69 287 // Since traces are ordered callee-before-caller, PromiseNode::tracePromise() should typically
jpayne@69 288 // recurse to its child first, then after the child returns, add itself to the trace.
jpayne@69 289 //
jpayne@69 290 // If `stopAtNextEvent` is true, then the trace should stop as soon as it hits a PromiseNode that
jpayne@69 291 // also implements Event, and should not trace that node or its children. This is used in
jpayne@69 292 // conjunction with Event::traceEvent(). The chain of Events is often more sparse than the chain
jpayne@69 293 // of PromiseNodes, because a TransformPromiseNode (which implements .then()) is not itself an
jpayne@69 294 // Event. TransformPromiseNode instead tells its child node to directly notify its *parent* node
jpayne@69 295 // when it is ready, and then TransformPromiseNode applies the .then() transformation during the
jpayne@69 296 // call to .get().
jpayne@69 297 //
jpayne@69 298 // So, when we trace the chain of Events backwards, we end up hoping over segments of
jpayne@69 299 // TransformPromiseNodes (and other similar types). In order to get those added to the trace,
jpayne@69 300 // each Event must call back down the PromiseNode chain in the opposite direction, using this
jpayne@69 301 // method.
jpayne@69 302 //
jpayne@69 303 // `tracePromise()` may be called from an async signal handler while `get()` is executing. It
jpayne@69 304 // must not allocate nor take locks.
jpayne@69 305
jpayne@69 306 template <typename T>
jpayne@69 307 static OwnPromiseNode from(T&& promise) {
jpayne@69 308 // Given a Promise, extract the PromiseNode.
jpayne@69 309 return kj::mv(promise.node);
jpayne@69 310 }
jpayne@69 311 template <typename T>
jpayne@69 312 static PromiseNode& from(T& promise) {
jpayne@69 313 // Given a Promise, extract the PromiseNode.
jpayne@69 314 return *promise.node;
jpayne@69 315 }
jpayne@69 316 template <typename T>
jpayne@69 317 static T to(OwnPromiseNode&& node) {
jpayne@69 318 // Construct a Promise from a PromiseNode. (T should be a Promise type.)
jpayne@69 319 return T(false, kj::mv(node));
jpayne@69 320 }
jpayne@69 321
jpayne@69 322 protected:
jpayne@69 323 class OnReadyEvent {
jpayne@69 324 // Helper class for implementing onReady().
jpayne@69 325
jpayne@69 326 public:
jpayne@69 327 void init(Event* newEvent);
jpayne@69 328
jpayne@69 329 void arm();
jpayne@69 330 void armBreadthFirst();
jpayne@69 331 // Arms the event if init() has already been called and makes future calls to init()
jpayne@69 332 // automatically arm the event.
jpayne@69 333
jpayne@69 334 inline void traceEvent(TraceBuilder& builder) {
jpayne@69 335 if (event != nullptr && !builder.full()) event->traceEvent(builder);
jpayne@69 336 }
jpayne@69 337
jpayne@69 338 private:
jpayne@69 339 Event* event = nullptr;
jpayne@69 340 };
jpayne@69 341 };
jpayne@69 342
jpayne@69 343 class PromiseDisposer {
jpayne@69 344 public:
jpayne@69 345 template <typename T>
jpayne@69 346 static constexpr bool canArenaAllocate() {
jpayne@69 347 // We can only use arena allocation for types that fit in an arena and have pointer-size
jpayne@69 348 // alignment. Anything else will need to be allocated as a separate heap object.
jpayne@69 349 return sizeof(T) <= sizeof(PromiseArena) && alignof(T) <= alignof(void*);
jpayne@69 350 }
jpayne@69 351
jpayne@69 352 static void dispose(PromiseArenaMember* node) {
jpayne@69 353 PromiseArena* arena = node->arena;
jpayne@69 354 node->destroy();
jpayne@69 355 delete arena; // reminder: `delete` automatically ignores null pointers
jpayne@69 356 }
jpayne@69 357
jpayne@69 358 template <typename T, typename D = PromiseDisposer, typename... Params>
jpayne@69 359 static kj::Own<T, D> alloc(Params&&... params) noexcept {
jpayne@69 360 // Implements allocPromise().
jpayne@69 361 T* ptr;
jpayne@69 362 if (!canArenaAllocate<T>()) {
jpayne@69 363 // Node too big (or needs weird alignment), fall back to regular heap allocation.
jpayne@69 364 ptr = new T(kj::fwd<Params>(params)...);
jpayne@69 365 } else {
jpayne@69 366 // Start a new arena.
jpayne@69 367 //
jpayne@69 368 // NOTE: As in append() (below), we don't implement exception-safety because it causes code
jpayne@69 369 // bloat and these constructors probably don't throw. Instead this function is noexcept, so
jpayne@69 370 // if a constructor does throw, it'll crash rather than leak memory.
jpayne@69 371 auto* arena = new PromiseArena;
jpayne@69 372 ptr = reinterpret_cast<T*>(arena + 1) - 1;
jpayne@69 373 ctor(*ptr, kj::fwd<Params>(params)...);
jpayne@69 374 ptr->arena = arena;
jpayne@69 375 KJ_IREQUIRE(reinterpret_cast<void*>(ptr) ==
jpayne@69 376 reinterpret_cast<void*>(static_cast<PromiseArenaMember*>(ptr)),
jpayne@69 377 "PromiseArenaMember must be the leftmost inherited type.");
jpayne@69 378 }
jpayne@69 379 return kj::Own<T, D>(ptr);
jpayne@69 380 }
jpayne@69 381
jpayne@69 382 template <typename T, typename D = PromiseDisposer, typename... Params>
jpayne@69 383 static kj::Own<T, D> append(
jpayne@69 384 OwnPromiseNode&& next, Params&&... params) noexcept {
jpayne@69 385 // Implements appendPromise().
jpayne@69 386
jpayne@69 387 PromiseArena* arena = next->arena;
jpayne@69 388
jpayne@69 389 if (!canArenaAllocate<T>() || arena == nullptr ||
jpayne@69 390 reinterpret_cast<byte*>(next.get()) - reinterpret_cast<byte*>(arena) < sizeof(T)) {
jpayne@69 391 // No arena available, or not enough space, or weird alignment needed. Start new arena.
jpayne@69 392 return alloc<T, D>(kj::mv(next), kj::fwd<Params>(params)...);
jpayne@69 393 } else {
jpayne@69 394 // Append to arena.
jpayne@69 395 //
jpayne@69 396 // NOTE: When we call ctor(), it takes ownership of `next`, so we shouldn't assume `next`
jpayne@69 397 // still exists after it returns. So we have to remove ownership of the arena before that.
jpayne@69 398 // In theory if we wanted this to be exception-safe, we'd also have to arrange to delete
jpayne@69 399 // the arena if the constructor throws. However, in practice none of the PromiseNode
jpayne@69 400 // constructors throw, so we just mark the whole method noexcept in order to avoid the
jpayne@69 401 // code bloat to handle this case.
jpayne@69 402 next->arena = nullptr;
jpayne@69 403 T* ptr = reinterpret_cast<T*>(next.get()) - 1;
jpayne@69 404 ctor(*ptr, kj::mv(next), kj::fwd<Params>(params)...);
jpayne@69 405 ptr->arena = arena;
jpayne@69 406 KJ_IREQUIRE(reinterpret_cast<void*>(ptr) ==
jpayne@69 407 reinterpret_cast<void*>(static_cast<PromiseArenaMember*>(ptr)),
jpayne@69 408 "PromiseArenaMember must be the leftmost inherited type.");
jpayne@69 409 return kj::Own<T, D>(ptr);
jpayne@69 410 }
jpayne@69 411 }
jpayne@69 412 };
jpayne@69 413
jpayne@69 414 template <typename T, typename... Params>
jpayne@69 415 static kj::Own<T, PromiseDisposer> allocPromise(Params&&... params) {
jpayne@69 416 // Allocate a PromiseNode without appending it to any existing promise arena. Space for a new
jpayne@69 417 // arena will be allocated.
jpayne@69 418 return PromiseDisposer::alloc<T>(kj::fwd<Params>(params)...);
jpayne@69 419 }
jpayne@69 420
jpayne@69 421 template <typename T, bool arena = PromiseDisposer::canArenaAllocate<T>()>
jpayne@69 422 struct FreePromiseNode;
jpayne@69 423 template <typename T>
jpayne@69 424 struct FreePromiseNode<T, true> {
jpayne@69 425 static inline void free(T* ptr) {
jpayne@69 426 // The object will have been allocated in an arena, so we only want to run the destructor.
jpayne@69 427 // The arena's memory will be freed separately.
jpayne@69 428 kj::dtor(*ptr);
jpayne@69 429 }
jpayne@69 430 };
jpayne@69 431 template <typename T>
jpayne@69 432 struct FreePromiseNode<T, false> {
jpayne@69 433 static inline void free(T* ptr) {
jpayne@69 434 // The object will have been allocated separately on the heap.
jpayne@69 435 return delete ptr;
jpayne@69 436 }
jpayne@69 437 };
jpayne@69 438
jpayne@69 439 template <typename T>
jpayne@69 440 static void freePromise(T* ptr) {
jpayne@69 441 // Free a PromiseNode originally allocated using `allocPromise<T>()`. The implementation of
jpayne@69 442 // PromiseNode::destroy() must call this for any type that is allocated using allocPromise().
jpayne@69 443 FreePromiseNode<T>::free(ptr);
jpayne@69 444 }
jpayne@69 445
jpayne@69 446 template <typename T, typename... Params>
jpayne@69 447 static kj::Own<T, PromiseDisposer> appendPromise(OwnPromiseNode&& next, Params&&... params) {
jpayne@69 448 // Append a promise to the arena that currently ends with `next`. `next` is also still passed as
jpayne@69 449 // the first parameter to the new object's constructor.
jpayne@69 450 //
jpayne@69 451 // This is semantically the same as `allocPromise()` except that it may avoid the underlying
jpayne@69 452 // memory allocation. `next` must end up being destroyed before the new object (i.e. the new
jpayne@69 453 // object must never transfer away ownership of `next`).
jpayne@69 454 return PromiseDisposer::append<T>(kj::mv(next), kj::fwd<Params>(params)...);
jpayne@69 455 }
jpayne@69 456
jpayne@69 457 // -------------------------------------------------------------------
jpayne@69 458
jpayne@69 459 inline ReadyNow::operator Promise<void>() const {
jpayne@69 460 return PromiseNode::to<Promise<void>>(readyNow());
jpayne@69 461 }
jpayne@69 462
jpayne@69 463 template <typename T>
jpayne@69 464 inline NeverDone::operator Promise<T>() const {
jpayne@69 465 return PromiseNode::to<Promise<T>>(neverDone());
jpayne@69 466 }
jpayne@69 467
jpayne@69 468 // -------------------------------------------------------------------
jpayne@69 469
jpayne@69 470 class ImmediatePromiseNodeBase: public PromiseNode {
jpayne@69 471 public:
jpayne@69 472 ImmediatePromiseNodeBase();
jpayne@69 473 ~ImmediatePromiseNodeBase() noexcept(false);
jpayne@69 474
jpayne@69 475 void onReady(Event* event) noexcept override;
jpayne@69 476 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
jpayne@69 477 };
jpayne@69 478
jpayne@69 479 template <typename T>
jpayne@69 480 class ImmediatePromiseNode final: public ImmediatePromiseNodeBase {
jpayne@69 481 // A promise that has already been resolved to an immediate value or exception.
jpayne@69 482
jpayne@69 483 public:
jpayne@69 484 ImmediatePromiseNode(ExceptionOr<T>&& result): result(kj::mv(result)) {}
jpayne@69 485 void destroy() override { freePromise(this); }
jpayne@69 486
jpayne@69 487 void get(ExceptionOrValue& output) noexcept override {
jpayne@69 488 output.as<T>() = kj::mv(result);
jpayne@69 489 }
jpayne@69 490
jpayne@69 491 private:
jpayne@69 492 ExceptionOr<T> result;
jpayne@69 493 };
jpayne@69 494
jpayne@69 495 class ImmediateBrokenPromiseNode final: public ImmediatePromiseNodeBase {
jpayne@69 496 public:
jpayne@69 497 ImmediateBrokenPromiseNode(Exception&& exception);
jpayne@69 498 void destroy() override;
jpayne@69 499
jpayne@69 500 void get(ExceptionOrValue& output) noexcept override;
jpayne@69 501
jpayne@69 502 private:
jpayne@69 503 Exception exception;
jpayne@69 504 };
jpayne@69 505
jpayne@69 506 template <typename T, T value>
jpayne@69 507 class ConstPromiseNode: public ImmediatePromiseNodeBase {
jpayne@69 508 public:
jpayne@69 509 void destroy() override {}
jpayne@69 510 void get(ExceptionOrValue& output) noexcept override {
jpayne@69 511 output.as<T>() = value;
jpayne@69 512 }
jpayne@69 513 };
jpayne@69 514
jpayne@69 515 // -------------------------------------------------------------------
jpayne@69 516
jpayne@69 517 class AttachmentPromiseNodeBase: public PromiseNode {
jpayne@69 518 public:
jpayne@69 519 AttachmentPromiseNodeBase(OwnPromiseNode&& dependency);
jpayne@69 520
jpayne@69 521 void onReady(Event* event) noexcept override;
jpayne@69 522 void get(ExceptionOrValue& output) noexcept override;
jpayne@69 523 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
jpayne@69 524
jpayne@69 525 private:
jpayne@69 526 OwnPromiseNode dependency;
jpayne@69 527
jpayne@69 528 void dropDependency();
jpayne@69 529
jpayne@69 530 template <typename>
jpayne@69 531 friend class AttachmentPromiseNode;
jpayne@69 532 };
jpayne@69 533
jpayne@69 534 template <typename Attachment>
jpayne@69 535 class AttachmentPromiseNode final: public AttachmentPromiseNodeBase {
jpayne@69 536 // A PromiseNode that holds on to some object (usually, an Own<T>, but could be any movable
jpayne@69 537 // object) until the promise resolves.
jpayne@69 538
jpayne@69 539 public:
jpayne@69 540 AttachmentPromiseNode(OwnPromiseNode&& dependency, Attachment&& attachment)
jpayne@69 541 : AttachmentPromiseNodeBase(kj::mv(dependency)),
jpayne@69 542 attachment(kj::mv<Attachment>(attachment)) {}
jpayne@69 543 void destroy() override { freePromise(this); }
jpayne@69 544
jpayne@69 545 ~AttachmentPromiseNode() noexcept(false) {
jpayne@69 546 // We need to make sure the dependency is deleted before we delete the attachment because the
jpayne@69 547 // dependency may be using the attachment.
jpayne@69 548 dropDependency();
jpayne@69 549 }
jpayne@69 550
jpayne@69 551 private:
jpayne@69 552 Attachment attachment;
jpayne@69 553 };
jpayne@69 554
jpayne@69 555 // -------------------------------------------------------------------
jpayne@69 556
jpayne@69 557 #if __GNUC__ >= 8 && !__clang__
jpayne@69 558 // GCC 8's class-memaccess warning rightly does not like the memcpy()'s below, but there's no
jpayne@69 559 // "legal" way for us to extract the content of a PTMF so too bad.
jpayne@69 560 #pragma GCC diagnostic push
jpayne@69 561 #pragma GCC diagnostic ignored "-Wclass-memaccess"
jpayne@69 562 #if __GNUC__ >= 11
jpayne@69 563 // GCC 11's array-bounds is similarly upset with us for digging into "private" implementation
jpayne@69 564 // details. But the format is well-defined by the ABI which cannot change so please just let us
jpayne@69 565 // do it kthx.
jpayne@69 566 #pragma GCC diagnostic ignored "-Warray-bounds"
jpayne@69 567 #endif
jpayne@69 568 #endif
jpayne@69 569
jpayne@69 570 template <typename T, typename ReturnType, typename... ParamTypes>
jpayne@69 571 void* getMethodStartAddress(T& obj, ReturnType (T::*method)(ParamTypes...));
jpayne@69 572 template <typename T, typename ReturnType, typename... ParamTypes>
jpayne@69 573 void* getMethodStartAddress(const T& obj, ReturnType (T::*method)(ParamTypes...) const);
jpayne@69 574 // Given an object and a pointer-to-method, return the start address of the method's code. The
jpayne@69 575 // intent is that this address can be used in a trace; addr2line should map it to the start of
jpayne@69 576 // the function's definition. For virtual methods, this does a vtable lookup on `obj` to determine
jpayne@69 577 // the address of the specific implementation (otherwise, `obj` wouldn't be needed).
jpayne@69 578 //
jpayne@69 579 // Note that if the method is overloaded or is a template, you will need to explicitly specify
jpayne@69 580 // the param and return types, otherwise the compiler won't know which overload / template
jpayne@69 581 // specialization you are requesting.
jpayne@69 582
jpayne@69 583 class PtmfHelper {
jpayne@69 584 // This class is a private helper for GetFunctorStartAddress and getMethodStartAddress(). The
jpayne@69 585 // class represents the internal representation of a pointer-to-member-function.
jpayne@69 586
jpayne@69 587 template <typename... ParamTypes>
jpayne@69 588 friend struct GetFunctorStartAddress;
jpayne@69 589 template <typename T, typename ReturnType, typename... ParamTypes>
jpayne@69 590 friend void* getMethodStartAddress(T& obj, ReturnType (T::*method)(ParamTypes...));
jpayne@69 591 template <typename T, typename ReturnType, typename... ParamTypes>
jpayne@69 592 friend void* getMethodStartAddress(const T& obj, ReturnType (T::*method)(ParamTypes...) const);
jpayne@69 593
jpayne@69 594 #if __GNUG__
jpayne@69 595
jpayne@69 596 void* ptr;
jpayne@69 597 ptrdiff_t adj;
jpayne@69 598 // Layout of a pointer-to-member-function used by GCC and compatible compilers.
jpayne@69 599
jpayne@69 600 void* apply(const void* obj) {
jpayne@69 601 #if defined(__arm__) || defined(__mips__) || defined(__aarch64__)
jpayne@69 602 if (adj & 1) {
jpayne@69 603 ptrdiff_t voff = (ptrdiff_t)ptr;
jpayne@69 604 #else
jpayne@69 605 ptrdiff_t voff = (ptrdiff_t)ptr;
jpayne@69 606 if (voff & 1) {
jpayne@69 607 voff &= ~1;
jpayne@69 608 #endif
jpayne@69 609 return *(void**)(*(char**)obj + voff);
jpayne@69 610 } else {
jpayne@69 611 return ptr;
jpayne@69 612 }
jpayne@69 613 }
jpayne@69 614
jpayne@69 615 #define BODY \
jpayne@69 616 PtmfHelper result; \
jpayne@69 617 static_assert(sizeof(p) == sizeof(result), "unknown ptmf layout"); \
jpayne@69 618 memcpy(&result, &p, sizeof(result)); \
jpayne@69 619 return result
jpayne@69 620
jpayne@69 621 #else // __GNUG__
jpayne@69 622
jpayne@69 623 void* apply(const void* obj) { return nullptr; }
jpayne@69 624 // TODO(port): PTMF instruction address extraction
jpayne@69 625
jpayne@69 626 #define BODY return PtmfHelper{}
jpayne@69 627
jpayne@69 628 #endif // __GNUG__, else
jpayne@69 629
jpayne@69 630 template <typename R, typename C, typename... P, typename F>
jpayne@69 631 static PtmfHelper from(F p) { BODY; }
jpayne@69 632 // Create a PtmfHelper from some arbitrary pointer-to-member-function which is not
jpayne@69 633 // overloaded nor a template. In this case the compiler is able to deduce the full function
jpayne@69 634 // signature directly given the name since there is only one function with that name.
jpayne@69 635
jpayne@69 636 template <typename R, typename C, typename... P>
jpayne@69 637 static PtmfHelper from(R (C::*p)(NoInfer<P>...)) { BODY; }
jpayne@69 638 template <typename R, typename C, typename... P>
jpayne@69 639 static PtmfHelper from(R (C::*p)(NoInfer<P>...) const) { BODY; }
jpayne@69 640 // Create a PtmfHelper from some poniter-to-member-function which is a template. In this case
jpayne@69 641 // the function must match exactly the containing type C, return type R, and parameter types P...
jpayne@69 642 // GetFunctorStartAddress normally specifies exactly the correct C and R, but can only make a
jpayne@69 643 // guess at P. Luckily, if the function parameters are template parameters then it's not
jpayne@69 644 // necessary to be precise about P.
jpayne@69 645 #undef BODY
jpayne@69 646 };
jpayne@69 647
jpayne@69 648 #if __GNUC__ >= 8 && !__clang__
jpayne@69 649 #pragma GCC diagnostic pop
jpayne@69 650 #endif
jpayne@69 651
jpayne@69 652 template <typename T, typename ReturnType, typename... ParamTypes>
jpayne@69 653 void* getMethodStartAddress(T& obj, ReturnType (T::*method)(ParamTypes...)) {
jpayne@69 654 return PtmfHelper::from<ReturnType, T, ParamTypes...>(method).apply(&obj);
jpayne@69 655 }
jpayne@69 656 template <typename T, typename ReturnType, typename... ParamTypes>
jpayne@69 657 void* getMethodStartAddress(const T& obj, ReturnType (T::*method)(ParamTypes...) const) {
jpayne@69 658 return PtmfHelper::from<ReturnType, T, ParamTypes...>(method).apply(&obj);
jpayne@69 659 }
jpayne@69 660
jpayne@69 661 template <typename... ParamTypes>
jpayne@69 662 struct GetFunctorStartAddress {
jpayne@69 663 // Given a functor (any object defining operator()), return the start address of the function,
jpayne@69 664 // suitable for passing to addr2line to obtain a source file/line for debugging purposes.
jpayne@69 665 //
jpayne@69 666 // This turns out to be incredibly hard to implement in the presence of overloaded or templated
jpayne@69 667 // functors. Therefore, we impose these specific restrictions, specific to our use case:
jpayne@69 668 // - Overloading is not allowed, but templating is. (Generally we only intend to support lambdas
jpayne@69 669 // anyway.)
jpayne@69 670 // - The template parameters to GetFunctorStartAddress specify a hint as to the expected
jpayne@69 671 // parameter types. If the functor is templated, its parameters must match exactly these types.
jpayne@69 672 // (If it's not templated, ParamTypes are ignored.)
jpayne@69 673
jpayne@69 674 template <typename Func>
jpayne@69 675 static void* apply(Func&& func) {
jpayne@69 676 typedef decltype(func(instance<ParamTypes>()...)) ReturnType;
jpayne@69 677 return PtmfHelper::from<ReturnType, Decay<Func>, ParamTypes...>(
jpayne@69 678 &Decay<Func>::operator()).apply(&func);
jpayne@69 679 }
jpayne@69 680 };
jpayne@69 681
jpayne@69 682 template <>
jpayne@69 683 struct GetFunctorStartAddress<Void&&>: public GetFunctorStartAddress<> {};
jpayne@69 684 // Hack for TransformPromiseNode use case: an input type of `Void` indicates that the function
jpayne@69 685 // actually has no parameters.
jpayne@69 686
jpayne@69 687 class TransformPromiseNodeBase: public PromiseNode {
jpayne@69 688 public:
jpayne@69 689 TransformPromiseNodeBase(OwnPromiseNode&& dependency, void* continuationTracePtr);
jpayne@69 690
jpayne@69 691 void onReady(Event* event) noexcept override;
jpayne@69 692 void get(ExceptionOrValue& output) noexcept override;
jpayne@69 693 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
jpayne@69 694
jpayne@69 695 private:
jpayne@69 696 OwnPromiseNode dependency;
jpayne@69 697 void* continuationTracePtr;
jpayne@69 698
jpayne@69 699 void dropDependency();
jpayne@69 700 void getDepResult(ExceptionOrValue& output);
jpayne@69 701
jpayne@69 702 virtual void getImpl(ExceptionOrValue& output) = 0;
jpayne@69 703
jpayne@69 704 template <typename, typename, typename, typename>
jpayne@69 705 friend class TransformPromiseNode;
jpayne@69 706 };
jpayne@69 707
jpayne@69 708 template <typename T, typename DepT, typename Func, typename ErrorFunc>
jpayne@69 709 class TransformPromiseNode final: public TransformPromiseNodeBase {
jpayne@69 710 // A PromiseNode that transforms the result of another PromiseNode through an application-provided
jpayne@69 711 // function (implements `then()`).
jpayne@69 712
jpayne@69 713 public:
jpayne@69 714 TransformPromiseNode(OwnPromiseNode&& dependency, Func&& func, ErrorFunc&& errorHandler,
jpayne@69 715 void* continuationTracePtr)
jpayne@69 716 : TransformPromiseNodeBase(kj::mv(dependency), continuationTracePtr),
jpayne@69 717 func(kj::fwd<Func>(func)), errorHandler(kj::fwd<ErrorFunc>(errorHandler)) {}
jpayne@69 718 void destroy() override { freePromise(this); }
jpayne@69 719
jpayne@69 720 ~TransformPromiseNode() noexcept(false) {
jpayne@69 721 // We need to make sure the dependency is deleted before we delete the continuations because it
jpayne@69 722 // is a common pattern for the continuations to hold ownership of objects that might be in-use
jpayne@69 723 // by the dependency.
jpayne@69 724 dropDependency();
jpayne@69 725 }
jpayne@69 726
jpayne@69 727 private:
jpayne@69 728 Func func;
jpayne@69 729 ErrorFunc errorHandler;
jpayne@69 730
jpayne@69 731 void getImpl(ExceptionOrValue& output) override {
jpayne@69 732 ExceptionOr<DepT> depResult;
jpayne@69 733 getDepResult(depResult);
jpayne@69 734 KJ_IF_MAYBE(depException, depResult.exception) {
jpayne@69 735 output.as<T>() = handle(
jpayne@69 736 MaybeVoidCaller<Exception, FixVoid<ReturnType<ErrorFunc, Exception>>>::apply(
jpayne@69 737 errorHandler, kj::mv(*depException)));
jpayne@69 738 } else KJ_IF_MAYBE(depValue, depResult.value) {
jpayne@69 739 output.as<T>() = handle(MaybeVoidCaller<DepT, T>::apply(func, kj::mv(*depValue)));
jpayne@69 740 }
jpayne@69 741 }
jpayne@69 742
jpayne@69 743 ExceptionOr<T> handle(T&& value) {
jpayne@69 744 return kj::mv(value);
jpayne@69 745 }
jpayne@69 746 ExceptionOr<T> handle(PropagateException::Bottom&& value) {
jpayne@69 747 return ExceptionOr<T>(false, value.asException());
jpayne@69 748 }
jpayne@69 749 };
jpayne@69 750
jpayne@69 751 // -------------------------------------------------------------------
jpayne@69 752
jpayne@69 753 class ForkHubBase;
jpayne@69 754 using OwnForkHubBase = Own<ForkHubBase, ForkHubBase>;
jpayne@69 755
jpayne@69 756 class ForkBranchBase: public PromiseNode {
jpayne@69 757 public:
jpayne@69 758 ForkBranchBase(OwnForkHubBase&& hub);
jpayne@69 759 ~ForkBranchBase() noexcept(false);
jpayne@69 760
jpayne@69 761 void hubReady() noexcept;
jpayne@69 762 // Called by the hub to indicate that it is ready.
jpayne@69 763
jpayne@69 764 // implements PromiseNode ------------------------------------------
jpayne@69 765 void onReady(Event* event) noexcept override;
jpayne@69 766 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
jpayne@69 767
jpayne@69 768 protected:
jpayne@69 769 inline ExceptionOrValue& getHubResultRef();
jpayne@69 770
jpayne@69 771 void releaseHub(ExceptionOrValue& output);
jpayne@69 772 // Release the hub. If an exception is thrown, add it to `output`.
jpayne@69 773
jpayne@69 774 private:
jpayne@69 775 OnReadyEvent onReadyEvent;
jpayne@69 776
jpayne@69 777 OwnForkHubBase hub;
jpayne@69 778 ForkBranchBase* next = nullptr;
jpayne@69 779 ForkBranchBase** prevPtr = nullptr;
jpayne@69 780
jpayne@69 781 friend class ForkHubBase;
jpayne@69 782 };
jpayne@69 783
jpayne@69 784 template <typename T> T copyOrAddRef(T& t) { return t; }
jpayne@69 785 template <typename T> Own<T> copyOrAddRef(Own<T>& t) { return t->addRef(); }
jpayne@69 786 template <typename T> Maybe<Own<T>> copyOrAddRef(Maybe<Own<T>>& t) {
jpayne@69 787 return t.map([](Own<T>& ptr) {
jpayne@69 788 return ptr->addRef();
jpayne@69 789 });
jpayne@69 790 }
jpayne@69 791
jpayne@69 792 template <typename T>
jpayne@69 793 class ForkBranch final: public ForkBranchBase {
jpayne@69 794 // A PromiseNode that implements one branch of a fork -- i.e. one of the branches that receives
jpayne@69 795 // a const reference.
jpayne@69 796
jpayne@69 797 public:
jpayne@69 798 ForkBranch(OwnForkHubBase&& hub): ForkBranchBase(kj::mv(hub)) {}
jpayne@69 799 void destroy() override { freePromise(this); }
jpayne@69 800
jpayne@69 801 void get(ExceptionOrValue& output) noexcept override {
jpayne@69 802 ExceptionOr<T>& hubResult = getHubResultRef().template as<T>();
jpayne@69 803 KJ_IF_MAYBE(value, hubResult.value) {
jpayne@69 804 output.as<T>().value = copyOrAddRef(*value);
jpayne@69 805 } else {
jpayne@69 806 output.as<T>().value = nullptr;
jpayne@69 807 }
jpayne@69 808 output.exception = hubResult.exception;
jpayne@69 809 releaseHub(output);
jpayne@69 810 }
jpayne@69 811 };
jpayne@69 812
jpayne@69 813 template <typename T, size_t index>
jpayne@69 814 class SplitBranch final: public ForkBranchBase {
jpayne@69 815 // A PromiseNode that implements one branch of a fork -- i.e. one of the branches that receives
jpayne@69 816 // a const reference.
jpayne@69 817
jpayne@69 818 public:
jpayne@69 819 SplitBranch(OwnForkHubBase&& hub): ForkBranchBase(kj::mv(hub)) {}
jpayne@69 820 void destroy() override { freePromise(this); }
jpayne@69 821
jpayne@69 822 typedef kj::Decay<decltype(kj::get<index>(kj::instance<T>()))> Element;
jpayne@69 823
jpayne@69 824 void get(ExceptionOrValue& output) noexcept override {
jpayne@69 825 ExceptionOr<T>& hubResult = getHubResultRef().template as<T>();
jpayne@69 826 KJ_IF_MAYBE(value, hubResult.value) {
jpayne@69 827 output.as<Element>().value = kj::mv(kj::get<index>(*value));
jpayne@69 828 } else {
jpayne@69 829 output.as<Element>().value = nullptr;
jpayne@69 830 }
jpayne@69 831 output.exception = hubResult.exception;
jpayne@69 832 releaseHub(output);
jpayne@69 833 }
jpayne@69 834 };
jpayne@69 835
jpayne@69 836 // -------------------------------------------------------------------
jpayne@69 837
jpayne@69 838 class ForkHubBase: public PromiseArenaMember, protected Event {
jpayne@69 839 public:
jpayne@69 840 ForkHubBase(OwnPromiseNode&& inner, ExceptionOrValue& resultRef, SourceLocation location);
jpayne@69 841
jpayne@69 842 inline ExceptionOrValue& getResultRef() { return resultRef; }
jpayne@69 843
jpayne@69 844 inline bool isShared() const { return refcount > 1; }
jpayne@69 845
jpayne@69 846 Own<ForkHubBase, ForkHubBase> addRef() {
jpayne@69 847 ++refcount;
jpayne@69 848 return Own<ForkHubBase, ForkHubBase>(this);
jpayne@69 849 }
jpayne@69 850
jpayne@69 851 static void dispose(ForkHubBase* obj) {
jpayne@69 852 if (--obj->refcount == 0) {
jpayne@69 853 PromiseDisposer::dispose(obj);
jpayne@69 854 }
jpayne@69 855 }
jpayne@69 856
jpayne@69 857 private:
jpayne@69 858 uint refcount = 1;
jpayne@69 859 // We manually implement refcounting for ForkHubBase so that we can use it together with
jpayne@69 860 // PromiseDisposer's arena allocation.
jpayne@69 861
jpayne@69 862 OwnPromiseNode inner;
jpayne@69 863 ExceptionOrValue& resultRef;
jpayne@69 864
jpayne@69 865 ForkBranchBase* headBranch = nullptr;
jpayne@69 866 ForkBranchBase** tailBranch = &headBranch;
jpayne@69 867 // Tail becomes null once the inner promise is ready and all branches have been notified.
jpayne@69 868
jpayne@69 869 Maybe<Own<Event>> fire() override;
jpayne@69 870 void traceEvent(TraceBuilder& builder) override;
jpayne@69 871
jpayne@69 872 friend class ForkBranchBase;
jpayne@69 873 };
jpayne@69 874
jpayne@69 875 template <typename T>
jpayne@69 876 class ForkHub final: public ForkHubBase {
jpayne@69 877 // A PromiseNode that implements the hub of a fork. The first call to Promise::fork() replaces
jpayne@69 878 // the promise's outer node with a ForkHub, and subsequent calls add branches to that hub (if
jpayne@69 879 // possible).
jpayne@69 880
jpayne@69 881 public:
jpayne@69 882 ForkHub(OwnPromiseNode&& inner, SourceLocation location)
jpayne@69 883 : ForkHubBase(kj::mv(inner), result, location) {}
jpayne@69 884 void destroy() override { freePromise(this); }
jpayne@69 885
jpayne@69 886 Promise<_::UnfixVoid<T>> addBranch() {
jpayne@69 887 return _::PromiseNode::to<Promise<_::UnfixVoid<T>>>(
jpayne@69 888 allocPromise<ForkBranch<T>>(addRef()));
jpayne@69 889 }
jpayne@69 890
jpayne@69 891 _::SplitTuplePromise<T> split(SourceLocation location) {
jpayne@69 892 return splitImpl(MakeIndexes<tupleSize<T>()>(), location);
jpayne@69 893 }
jpayne@69 894
jpayne@69 895 private:
jpayne@69 896 ExceptionOr<T> result;
jpayne@69 897
jpayne@69 898 template <size_t... indexes>
jpayne@69 899 _::SplitTuplePromise<T> splitImpl(Indexes<indexes...>, SourceLocation location) {
jpayne@69 900 return kj::tuple(addSplit<indexes>(location)...);
jpayne@69 901 }
jpayne@69 902
jpayne@69 903 template <size_t index>
jpayne@69 904 ReducePromises<typename SplitBranch<T, index>::Element> addSplit(SourceLocation location) {
jpayne@69 905 return _::PromiseNode::to<ReducePromises<typename SplitBranch<T, index>::Element>>(
jpayne@69 906 maybeChain(allocPromise<SplitBranch<T, index>>(addRef()),
jpayne@69 907 implicitCast<typename SplitBranch<T, index>::Element*>(nullptr),
jpayne@69 908 location));
jpayne@69 909 }
jpayne@69 910 };
jpayne@69 911
jpayne@69 912 inline ExceptionOrValue& ForkBranchBase::getHubResultRef() {
jpayne@69 913 return hub->getResultRef();
jpayne@69 914 }
jpayne@69 915
jpayne@69 916 // -------------------------------------------------------------------
jpayne@69 917
jpayne@69 918 class ChainPromiseNode final: public PromiseNode, public Event {
jpayne@69 919 // Promise node which reduces Promise<Promise<T>> to Promise<T>.
jpayne@69 920 //
jpayne@69 921 // `Event` is only a public base class because otherwise we can't cast Own<ChainPromiseNode> to
jpayne@69 922 // Own<Event>. Ugh, templates and private...
jpayne@69 923
jpayne@69 924 public:
jpayne@69 925 explicit ChainPromiseNode(OwnPromiseNode inner, SourceLocation location);
jpayne@69 926 ~ChainPromiseNode() noexcept(false);
jpayne@69 927 void destroy() override;
jpayne@69 928
jpayne@69 929 void onReady(Event* event) noexcept override;
jpayne@69 930 void setSelfPointer(OwnPromiseNode* selfPtr) noexcept override;
jpayne@69 931 void get(ExceptionOrValue& output) noexcept override;
jpayne@69 932 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
jpayne@69 933
jpayne@69 934 private:
jpayne@69 935 enum State {
jpayne@69 936 STEP1,
jpayne@69 937 STEP2
jpayne@69 938 };
jpayne@69 939
jpayne@69 940 State state;
jpayne@69 941
jpayne@69 942 OwnPromiseNode inner;
jpayne@69 943 // In STEP1, a PromiseNode for a Promise<T>.
jpayne@69 944 // In STEP2, a PromiseNode for a T.
jpayne@69 945
jpayne@69 946 Event* onReadyEvent = nullptr;
jpayne@69 947 OwnPromiseNode* selfPtr = nullptr;
jpayne@69 948
jpayne@69 949 Maybe<Own<Event>> fire() override;
jpayne@69 950 void traceEvent(TraceBuilder& builder) override;
jpayne@69 951 };
jpayne@69 952
jpayne@69 953 template <typename T>
jpayne@69 954 OwnPromiseNode maybeChain(OwnPromiseNode&& node, Promise<T>*, SourceLocation location) {
jpayne@69 955 return appendPromise<ChainPromiseNode>(kj::mv(node), location);
jpayne@69 956 }
jpayne@69 957
jpayne@69 958 template <typename T>
jpayne@69 959 OwnPromiseNode&& maybeChain(OwnPromiseNode&& node, T*, SourceLocation location) {
jpayne@69 960 return kj::mv(node);
jpayne@69 961 }
jpayne@69 962
jpayne@69 963 template <typename T, typename Result = decltype(T::reducePromise(instance<Promise<T>>()))>
jpayne@69 964 inline Result maybeReduce(Promise<T>&& promise, bool) {
jpayne@69 965 return T::reducePromise(kj::mv(promise));
jpayne@69 966 }
jpayne@69 967
jpayne@69 968 template <typename T>
jpayne@69 969 inline Promise<T> maybeReduce(Promise<T>&& promise, ...) {
jpayne@69 970 return kj::mv(promise);
jpayne@69 971 }
jpayne@69 972
jpayne@69 973 // -------------------------------------------------------------------
jpayne@69 974
jpayne@69 975 class ExclusiveJoinPromiseNode final: public PromiseNode {
jpayne@69 976 public:
jpayne@69 977 ExclusiveJoinPromiseNode(OwnPromiseNode left, OwnPromiseNode right, SourceLocation location);
jpayne@69 978 ~ExclusiveJoinPromiseNode() noexcept(false);
jpayne@69 979 void destroy() override;
jpayne@69 980
jpayne@69 981 void onReady(Event* event) noexcept override;
jpayne@69 982 void get(ExceptionOrValue& output) noexcept override;
jpayne@69 983 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
jpayne@69 984
jpayne@69 985 private:
jpayne@69 986 class Branch: public Event {
jpayne@69 987 public:
jpayne@69 988 Branch(ExclusiveJoinPromiseNode& joinNode, OwnPromiseNode dependency,
jpayne@69 989 SourceLocation location);
jpayne@69 990 ~Branch() noexcept(false);
jpayne@69 991
jpayne@69 992 bool get(ExceptionOrValue& output);
jpayne@69 993 // Returns true if this is the side that finished.
jpayne@69 994
jpayne@69 995 Maybe<Own<Event>> fire() override;
jpayne@69 996 void traceEvent(TraceBuilder& builder) override;
jpayne@69 997
jpayne@69 998 private:
jpayne@69 999 ExclusiveJoinPromiseNode& joinNode;
jpayne@69 1000 OwnPromiseNode dependency;
jpayne@69 1001
jpayne@69 1002 friend class ExclusiveJoinPromiseNode;
jpayne@69 1003 };
jpayne@69 1004
jpayne@69 1005 Branch left;
jpayne@69 1006 Branch right;
jpayne@69 1007 OnReadyEvent onReadyEvent;
jpayne@69 1008 };
jpayne@69 1009
jpayne@69 1010 // -------------------------------------------------------------------
jpayne@69 1011
jpayne@69 1012 enum class ArrayJoinBehavior {
jpayne@69 1013 LAZY,
jpayne@69 1014 EAGER,
jpayne@69 1015 };
jpayne@69 1016
jpayne@69 1017 class ArrayJoinPromiseNodeBase: public PromiseNode {
jpayne@69 1018 public:
jpayne@69 1019 ArrayJoinPromiseNodeBase(Array<OwnPromiseNode> promises,
jpayne@69 1020 ExceptionOrValue* resultParts, size_t partSize,
jpayne@69 1021 SourceLocation location,
jpayne@69 1022 ArrayJoinBehavior joinBehavior);
jpayne@69 1023 ~ArrayJoinPromiseNodeBase() noexcept(false);
jpayne@69 1024
jpayne@69 1025 void onReady(Event* event) noexcept override final;
jpayne@69 1026 void get(ExceptionOrValue& output) noexcept override final;
jpayne@69 1027 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override final;
jpayne@69 1028
jpayne@69 1029 protected:
jpayne@69 1030 virtual void getNoError(ExceptionOrValue& output) noexcept = 0;
jpayne@69 1031 // Called to compile the result only in the case where there were no errors.
jpayne@69 1032
jpayne@69 1033 private:
jpayne@69 1034 const ArrayJoinBehavior joinBehavior;
jpayne@69 1035
jpayne@69 1036 uint countLeft;
jpayne@69 1037 OnReadyEvent onReadyEvent;
jpayne@69 1038 bool armed = false;
jpayne@69 1039
jpayne@69 1040 class Branch final: public Event {
jpayne@69 1041 public:
jpayne@69 1042 Branch(ArrayJoinPromiseNodeBase& joinNode, OwnPromiseNode dependency,
jpayne@69 1043 ExceptionOrValue& output, SourceLocation location);
jpayne@69 1044 ~Branch() noexcept(false);
jpayne@69 1045
jpayne@69 1046 Maybe<Own<Event>> fire() override;
jpayne@69 1047 void traceEvent(TraceBuilder& builder) override;
jpayne@69 1048
jpayne@69 1049 private:
jpayne@69 1050 ArrayJoinPromiseNodeBase& joinNode;
jpayne@69 1051 OwnPromiseNode dependency;
jpayne@69 1052 ExceptionOrValue& output;
jpayne@69 1053
jpayne@69 1054 friend class ArrayJoinPromiseNodeBase;
jpayne@69 1055 };
jpayne@69 1056
jpayne@69 1057 Array<Branch> branches;
jpayne@69 1058 };
jpayne@69 1059
jpayne@69 1060 template <typename T>
jpayne@69 1061 class ArrayJoinPromiseNode final: public ArrayJoinPromiseNodeBase {
jpayne@69 1062 public:
jpayne@69 1063 ArrayJoinPromiseNode(Array<OwnPromiseNode> promises,
jpayne@69 1064 Array<ExceptionOr<T>> resultParts,
jpayne@69 1065 SourceLocation location,
jpayne@69 1066 ArrayJoinBehavior joinBehavior)
jpayne@69 1067 : ArrayJoinPromiseNodeBase(kj::mv(promises), resultParts.begin(), sizeof(ExceptionOr<T>),
jpayne@69 1068 location, joinBehavior),
jpayne@69 1069 resultParts(kj::mv(resultParts)) {}
jpayne@69 1070 void destroy() override { freePromise(this); }
jpayne@69 1071
jpayne@69 1072 protected:
jpayne@69 1073 void getNoError(ExceptionOrValue& output) noexcept override {
jpayne@69 1074 auto builder = heapArrayBuilder<T>(resultParts.size());
jpayne@69 1075 for (auto& part: resultParts) {
jpayne@69 1076 KJ_IASSERT(part.value != nullptr,
jpayne@69 1077 "Bug in KJ promise framework: Promise result had neither value no exception.");
jpayne@69 1078 builder.add(kj::mv(*_::readMaybe(part.value)));
jpayne@69 1079 }
jpayne@69 1080 output.as<Array<T>>() = builder.finish();
jpayne@69 1081 }
jpayne@69 1082
jpayne@69 1083 private:
jpayne@69 1084 Array<ExceptionOr<T>> resultParts;
jpayne@69 1085 };
jpayne@69 1086
jpayne@69 1087 template <>
jpayne@69 1088 class ArrayJoinPromiseNode<void> final: public ArrayJoinPromiseNodeBase {
jpayne@69 1089 public:
jpayne@69 1090 ArrayJoinPromiseNode(Array<OwnPromiseNode> promises,
jpayne@69 1091 Array<ExceptionOr<_::Void>> resultParts,
jpayne@69 1092 SourceLocation location,
jpayne@69 1093 ArrayJoinBehavior joinBehavior);
jpayne@69 1094 ~ArrayJoinPromiseNode();
jpayne@69 1095 void destroy() override;
jpayne@69 1096
jpayne@69 1097 protected:
jpayne@69 1098 void getNoError(ExceptionOrValue& output) noexcept override;
jpayne@69 1099
jpayne@69 1100 private:
jpayne@69 1101 Array<ExceptionOr<_::Void>> resultParts;
jpayne@69 1102 };
jpayne@69 1103
jpayne@69 1104 // -------------------------------------------------------------------
jpayne@69 1105
jpayne@69 1106 class EagerPromiseNodeBase: public PromiseNode, protected Event {
jpayne@69 1107 // A PromiseNode that eagerly evaluates its dependency even if its dependent does not eagerly
jpayne@69 1108 // evaluate it.
jpayne@69 1109
jpayne@69 1110 public:
jpayne@69 1111 EagerPromiseNodeBase(OwnPromiseNode&& dependency, ExceptionOrValue& resultRef,
jpayne@69 1112 SourceLocation location);
jpayne@69 1113
jpayne@69 1114 void onReady(Event* event) noexcept override;
jpayne@69 1115 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
jpayne@69 1116
jpayne@69 1117 private:
jpayne@69 1118 OwnPromiseNode dependency;
jpayne@69 1119 OnReadyEvent onReadyEvent;
jpayne@69 1120
jpayne@69 1121 ExceptionOrValue& resultRef;
jpayne@69 1122
jpayne@69 1123 Maybe<Own<Event>> fire() override;
jpayne@69 1124 void traceEvent(TraceBuilder& builder) override;
jpayne@69 1125 };
jpayne@69 1126
jpayne@69 1127 template <typename T>
jpayne@69 1128 class EagerPromiseNode final: public EagerPromiseNodeBase {
jpayne@69 1129 public:
jpayne@69 1130 EagerPromiseNode(OwnPromiseNode&& dependency, SourceLocation location)
jpayne@69 1131 : EagerPromiseNodeBase(kj::mv(dependency), result, location) {}
jpayne@69 1132 void destroy() override { freePromise(this); }
jpayne@69 1133
jpayne@69 1134 void get(ExceptionOrValue& output) noexcept override {
jpayne@69 1135 output.as<T>() = kj::mv(result);
jpayne@69 1136 }
jpayne@69 1137
jpayne@69 1138 private:
jpayne@69 1139 ExceptionOr<T> result;
jpayne@69 1140 };
jpayne@69 1141
jpayne@69 1142 template <typename T>
jpayne@69 1143 OwnPromiseNode spark(OwnPromiseNode&& node, SourceLocation location) {
jpayne@69 1144 // Forces evaluation of the given node to begin as soon as possible, even if no one is waiting
jpayne@69 1145 // on it.
jpayne@69 1146 return appendPromise<EagerPromiseNode<T>>(kj::mv(node), location);
jpayne@69 1147 }
jpayne@69 1148
jpayne@69 1149 // -------------------------------------------------------------------
jpayne@69 1150
jpayne@69 1151 class AdapterPromiseNodeBase: public PromiseNode {
jpayne@69 1152 public:
jpayne@69 1153 void onReady(Event* event) noexcept override;
jpayne@69 1154 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
jpayne@69 1155
jpayne@69 1156 protected:
jpayne@69 1157 inline void setReady() {
jpayne@69 1158 onReadyEvent.arm();
jpayne@69 1159 }
jpayne@69 1160
jpayne@69 1161 private:
jpayne@69 1162 OnReadyEvent onReadyEvent;
jpayne@69 1163 };
jpayne@69 1164
jpayne@69 1165 template <typename T, typename Adapter>
jpayne@69 1166 class AdapterPromiseNode final: public AdapterPromiseNodeBase,
jpayne@69 1167 private PromiseFulfiller<UnfixVoid<T>> {
jpayne@69 1168 // A PromiseNode that wraps a PromiseAdapter.
jpayne@69 1169
jpayne@69 1170 public:
jpayne@69 1171 template <typename... Params>
jpayne@69 1172 AdapterPromiseNode(Params&&... params)
jpayne@69 1173 : adapter(static_cast<PromiseFulfiller<UnfixVoid<T>>&>(*this), kj::fwd<Params>(params)...) {}
jpayne@69 1174 void destroy() override { freePromise(this); }
jpayne@69 1175
jpayne@69 1176 void get(ExceptionOrValue& output) noexcept override {
jpayne@69 1177 KJ_IREQUIRE(!isWaiting());
jpayne@69 1178 output.as<T>() = kj::mv(result);
jpayne@69 1179 }
jpayne@69 1180
jpayne@69 1181 private:
jpayne@69 1182 ExceptionOr<T> result;
jpayne@69 1183 bool waiting = true;
jpayne@69 1184 Adapter adapter;
jpayne@69 1185
jpayne@69 1186 void fulfill(T&& value) override {
jpayne@69 1187 if (waiting) {
jpayne@69 1188 waiting = false;
jpayne@69 1189 result = ExceptionOr<T>(kj::mv(value));
jpayne@69 1190 setReady();
jpayne@69 1191 }
jpayne@69 1192 }
jpayne@69 1193
jpayne@69 1194 void reject(Exception&& exception) override {
jpayne@69 1195 if (waiting) {
jpayne@69 1196 waiting = false;
jpayne@69 1197 result = ExceptionOr<T>(false, kj::mv(exception));
jpayne@69 1198 setReady();
jpayne@69 1199 }
jpayne@69 1200 }
jpayne@69 1201
jpayne@69 1202 bool isWaiting() override {
jpayne@69 1203 return waiting;
jpayne@69 1204 }
jpayne@69 1205 };
jpayne@69 1206
jpayne@69 1207 // -------------------------------------------------------------------
jpayne@69 1208
jpayne@69 1209 class FiberBase: public PromiseNode, private Event {
jpayne@69 1210 // Base class for the outer PromiseNode representing a fiber.
jpayne@69 1211
jpayne@69 1212 public:
jpayne@69 1213 explicit FiberBase(size_t stackSize, _::ExceptionOrValue& result, SourceLocation location);
jpayne@69 1214 explicit FiberBase(const FiberPool& pool, _::ExceptionOrValue& result, SourceLocation location);
jpayne@69 1215 ~FiberBase() noexcept(false);
jpayne@69 1216
jpayne@69 1217 void start() { armDepthFirst(); }
jpayne@69 1218 // Call immediately after construction to begin executing the fiber.
jpayne@69 1219
jpayne@69 1220 class WaitDoneEvent;
jpayne@69 1221
jpayne@69 1222 void onReady(_::Event* event) noexcept override;
jpayne@69 1223 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
jpayne@69 1224
jpayne@69 1225 protected:
jpayne@69 1226 bool isFinished() { return state == FINISHED; }
jpayne@69 1227 void cancel();
jpayne@69 1228
jpayne@69 1229 private:
jpayne@69 1230 enum { WAITING, RUNNING, CANCELED, FINISHED } state;
jpayne@69 1231
jpayne@69 1232 _::PromiseNode* currentInner = nullptr;
jpayne@69 1233 OnReadyEvent onReadyEvent;
jpayne@69 1234 Own<FiberStack> stack;
jpayne@69 1235 _::ExceptionOrValue& result;
jpayne@69 1236
jpayne@69 1237 void run();
jpayne@69 1238 virtual void runImpl(WaitScope& waitScope) = 0;
jpayne@69 1239
jpayne@69 1240 Maybe<Own<Event>> fire() override;
jpayne@69 1241 void traceEvent(TraceBuilder& builder) override;
jpayne@69 1242 // Implements Event. Each time the event is fired, switchToFiber() is called.
jpayne@69 1243
jpayne@69 1244 friend class FiberStack;
jpayne@69 1245 friend void _::waitImpl(_::OwnPromiseNode&& node, _::ExceptionOrValue& result,
jpayne@69 1246 WaitScope& waitScope, SourceLocation location);
jpayne@69 1247 friend bool _::pollImpl(_::PromiseNode& node, WaitScope& waitScope, SourceLocation location);
jpayne@69 1248 };
jpayne@69 1249
jpayne@69 1250 template <typename Func>
jpayne@69 1251 class Fiber final: public FiberBase {
jpayne@69 1252 public:
jpayne@69 1253 explicit Fiber(size_t stackSize, Func&& func, SourceLocation location)
jpayne@69 1254 : FiberBase(stackSize, result, location), func(kj::fwd<Func>(func)) {}
jpayne@69 1255 explicit Fiber(const FiberPool& pool, Func&& func, SourceLocation location)
jpayne@69 1256 : FiberBase(pool, result, location), func(kj::fwd<Func>(func)) {}
jpayne@69 1257 ~Fiber() noexcept(false) { cancel(); }
jpayne@69 1258 void destroy() override { freePromise(this); }
jpayne@69 1259
jpayne@69 1260 typedef FixVoid<decltype(kj::instance<Func&>()(kj::instance<WaitScope&>()))> ResultType;
jpayne@69 1261
jpayne@69 1262 void get(ExceptionOrValue& output) noexcept override {
jpayne@69 1263 KJ_IREQUIRE(isFinished());
jpayne@69 1264 output.as<ResultType>() = kj::mv(result);
jpayne@69 1265 }
jpayne@69 1266
jpayne@69 1267 private:
jpayne@69 1268 Func func;
jpayne@69 1269 ExceptionOr<ResultType> result;
jpayne@69 1270
jpayne@69 1271 void runImpl(WaitScope& waitScope) override {
jpayne@69 1272 result.template as<ResultType>() =
jpayne@69 1273 MaybeVoidCaller<WaitScope&, ResultType>::apply(func, waitScope);
jpayne@69 1274 }
jpayne@69 1275 };
jpayne@69 1276
jpayne@69 1277 } // namespace _ (private)
jpayne@69 1278
jpayne@69 1279 // =======================================================================================
jpayne@69 1280
jpayne@69 1281 template <typename T>
jpayne@69 1282 Promise<T>::Promise(_::FixVoid<T> value)
jpayne@69 1283 : PromiseBase(_::allocPromise<_::ImmediatePromiseNode<_::FixVoid<T>>>(kj::mv(value))) {}
jpayne@69 1284
jpayne@69 1285 template <typename T>
jpayne@69 1286 Promise<T>::Promise(kj::Exception&& exception)
jpayne@69 1287 : PromiseBase(_::allocPromise<_::ImmediateBrokenPromiseNode>(kj::mv(exception))) {}
jpayne@69 1288
jpayne@69 1289 template <typename T>
jpayne@69 1290 template <typename Func, typename ErrorFunc>
jpayne@69 1291 PromiseForResult<Func, T> Promise<T>::then(Func&& func, ErrorFunc&& errorHandler,
jpayne@69 1292 SourceLocation location) {
jpayne@69 1293 typedef _::FixVoid<_::ReturnType<Func, T>> ResultT;
jpayne@69 1294
jpayne@69 1295 void* continuationTracePtr = _::GetFunctorStartAddress<_::FixVoid<T>&&>::apply(func);
jpayne@69 1296 _::OwnPromiseNode intermediate =
jpayne@69 1297 _::appendPromise<_::TransformPromiseNode<ResultT, _::FixVoid<T>, Func, ErrorFunc>>(
jpayne@69 1298 kj::mv(node), kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler),
jpayne@69 1299 continuationTracePtr);
jpayne@69 1300 auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType<Func, T>>>(
jpayne@69 1301 _::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr), location));
jpayne@69 1302 return _::maybeReduce(kj::mv(result), false);
jpayne@69 1303 }
jpayne@69 1304
jpayne@69 1305 namespace _ { // private
jpayne@69 1306
jpayne@69 1307 template <typename T>
jpayne@69 1308 struct IdentityFunc {
jpayne@69 1309 inline T operator()(T&& value) const {
jpayne@69 1310 return kj::mv(value);
jpayne@69 1311 }
jpayne@69 1312 };
jpayne@69 1313 template <typename T>
jpayne@69 1314 struct IdentityFunc<Promise<T>> {
jpayne@69 1315 inline Promise<T> operator()(T&& value) const {
jpayne@69 1316 return kj::mv(value);
jpayne@69 1317 }
jpayne@69 1318 };
jpayne@69 1319 template <>
jpayne@69 1320 struct IdentityFunc<void> {
jpayne@69 1321 inline void operator()() const {}
jpayne@69 1322 };
jpayne@69 1323 template <>
jpayne@69 1324 struct IdentityFunc<Promise<void>> {
jpayne@69 1325 Promise<void> operator()() const;
jpayne@69 1326 // This can't be inline because it will make the translation unit depend on kj-async. Awkwardly,
jpayne@69 1327 // Cap'n Proto relies on being able to include this header without creating such a link-time
jpayne@69 1328 // dependency.
jpayne@69 1329 };
jpayne@69 1330
jpayne@69 1331 } // namespace _ (private)
jpayne@69 1332
jpayne@69 1333 template <typename T>
jpayne@69 1334 template <typename ErrorFunc>
jpayne@69 1335 Promise<T> Promise<T>::catch_(ErrorFunc&& errorHandler, SourceLocation location) {
jpayne@69 1336 // then()'s ErrorFunc can only return a Promise if Func also returns a Promise. In this case,
jpayne@69 1337 // Func is being filled in automatically. We want to make sure ErrorFunc can return a Promise,
jpayne@69 1338 // but we don't want the extra overhead of promise chaining if ErrorFunc doesn't actually
jpayne@69 1339 // return a promise. So we make our Func return match ErrorFunc.
jpayne@69 1340 typedef _::IdentityFunc<decltype(errorHandler(instance<Exception&&>()))> Func;
jpayne@69 1341 typedef _::FixVoid<_::ReturnType<Func, T>> ResultT;
jpayne@69 1342
jpayne@69 1343 // The reason catch_() isn't simply implemented in terms of then() is because we want the trace
jpayne@69 1344 // pointer to be based on ErrorFunc rather than Func.
jpayne@69 1345 void* continuationTracePtr = _::GetFunctorStartAddress<kj::Exception&&>::apply(errorHandler);
jpayne@69 1346 _::OwnPromiseNode intermediate =
jpayne@69 1347 _::appendPromise<_::TransformPromiseNode<ResultT, _::FixVoid<T>, Func, ErrorFunc>>(
jpayne@69 1348 kj::mv(node), Func(), kj::fwd<ErrorFunc>(errorHandler), continuationTracePtr);
jpayne@69 1349 auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType<Func, T>>>(
jpayne@69 1350 _::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr), location));
jpayne@69 1351 return _::maybeReduce(kj::mv(result), false);
jpayne@69 1352 }
jpayne@69 1353
jpayne@69 1354 template <typename T>
jpayne@69 1355 T Promise<T>::wait(WaitScope& waitScope, SourceLocation location) {
jpayne@69 1356 _::ExceptionOr<_::FixVoid<T>> result;
jpayne@69 1357 _::waitImpl(kj::mv(node), result, waitScope, location);
jpayne@69 1358 return convertToReturn(kj::mv(result));
jpayne@69 1359 }
jpayne@69 1360
jpayne@69 1361 template <typename T>
jpayne@69 1362 bool Promise<T>::poll(WaitScope& waitScope, SourceLocation location) {
jpayne@69 1363 return _::pollImpl(*node, waitScope, location);
jpayne@69 1364 }
jpayne@69 1365
jpayne@69 1366 template <typename T>
jpayne@69 1367 ForkedPromise<T> Promise<T>::fork(SourceLocation location) {
jpayne@69 1368 return ForkedPromise<T>(false,
jpayne@69 1369 _::PromiseDisposer::alloc<_::ForkHub<_::FixVoid<T>>, _::ForkHubBase>(kj::mv(node), location));
jpayne@69 1370 }
jpayne@69 1371
jpayne@69 1372 template <typename T>
jpayne@69 1373 Promise<T> ForkedPromise<T>::addBranch() {
jpayne@69 1374 return hub->addBranch();
jpayne@69 1375 }
jpayne@69 1376
jpayne@69 1377 template <typename T>
jpayne@69 1378 bool ForkedPromise<T>::hasBranches() {
jpayne@69 1379 return hub->isShared();
jpayne@69 1380 }
jpayne@69 1381
jpayne@69 1382 template <typename T>
jpayne@69 1383 _::SplitTuplePromise<T> Promise<T>::split(SourceLocation location) {
jpayne@69 1384 return _::PromiseDisposer::alloc<_::ForkHub<_::FixVoid<T>>, _::ForkHubBase>(
jpayne@69 1385 kj::mv(node), location)->split(location);
jpayne@69 1386 }
jpayne@69 1387
jpayne@69 1388 template <typename T>
jpayne@69 1389 Promise<T> Promise<T>::exclusiveJoin(Promise<T>&& other, SourceLocation location) {
jpayne@69 1390 return Promise(false, _::appendPromise<_::ExclusiveJoinPromiseNode>(
jpayne@69 1391 kj::mv(node), kj::mv(other.node), location));
jpayne@69 1392 }
jpayne@69 1393
jpayne@69 1394 template <typename T>
jpayne@69 1395 template <typename... Attachments>
jpayne@69 1396 Promise<T> Promise<T>::attach(Attachments&&... attachments) {
jpayne@69 1397 return Promise(false, _::appendPromise<_::AttachmentPromiseNode<Tuple<Attachments...>>>(
jpayne@69 1398 kj::mv(node), kj::tuple(kj::fwd<Attachments>(attachments)...)));
jpayne@69 1399 }
jpayne@69 1400
jpayne@69 1401 template <typename T>
jpayne@69 1402 template <typename ErrorFunc>
jpayne@69 1403 Promise<T> Promise<T>::eagerlyEvaluate(ErrorFunc&& errorHandler, SourceLocation location) {
jpayne@69 1404 // See catch_() for commentary.
jpayne@69 1405 return Promise(false, _::spark<_::FixVoid<T>>(then(
jpayne@69 1406 _::IdentityFunc<decltype(errorHandler(instance<Exception&&>()))>(),
jpayne@69 1407 kj::fwd<ErrorFunc>(errorHandler)).node, location));
jpayne@69 1408 }
jpayne@69 1409
jpayne@69 1410 template <typename T>
jpayne@69 1411 Promise<T> Promise<T>::eagerlyEvaluate(decltype(nullptr), SourceLocation location) {
jpayne@69 1412 return Promise(false, _::spark<_::FixVoid<T>>(kj::mv(node), location));
jpayne@69 1413 }
jpayne@69 1414
jpayne@69 1415 template <typename T>
jpayne@69 1416 kj::String Promise<T>::trace() {
jpayne@69 1417 return PromiseBase::trace();
jpayne@69 1418 }
jpayne@69 1419
jpayne@69 1420 template <typename T, T value>
jpayne@69 1421 inline Promise<T> constPromise() {
jpayne@69 1422 static _::ConstPromiseNode<T, value> NODE;
jpayne@69 1423 return _::PromiseNode::to<Promise<T>>(_::OwnPromiseNode(&NODE));
jpayne@69 1424 }
jpayne@69 1425
jpayne@69 1426 template <typename Func>
jpayne@69 1427 inline PromiseForResult<Func, void> evalLater(Func&& func) {
jpayne@69 1428 return _::yield().then(kj::fwd<Func>(func), _::PropagateException());
jpayne@69 1429 }
jpayne@69 1430
jpayne@69 1431 template <typename Func>
jpayne@69 1432 inline PromiseForResult<Func, void> evalLast(Func&& func) {
jpayne@69 1433 return _::yieldHarder().then(kj::fwd<Func>(func), _::PropagateException());
jpayne@69 1434 }
jpayne@69 1435
jpayne@69 1436 template <typename Func>
jpayne@69 1437 inline PromiseForResult<Func, void> evalNow(Func&& func) {
jpayne@69 1438 PromiseForResult<Func, void> result = nullptr;
jpayne@69 1439 KJ_IF_MAYBE(e, kj::runCatchingExceptions([&]() {
jpayne@69 1440 result = func();
jpayne@69 1441 })) {
jpayne@69 1442 result = kj::mv(*e);
jpayne@69 1443 }
jpayne@69 1444 return result;
jpayne@69 1445 }
jpayne@69 1446
jpayne@69 1447 template <typename Func>
jpayne@69 1448 struct RetryOnDisconnect_ {
jpayne@69 1449 static inline PromiseForResult<Func, void> apply(Func&& func) {
jpayne@69 1450 return evalLater([func = kj::mv(func)]() mutable -> PromiseForResult<Func, void> {
jpayne@69 1451 auto promise = evalNow(func);
jpayne@69 1452 return promise.catch_([func = kj::mv(func)](kj::Exception&& e) mutable -> PromiseForResult<Func, void> {
jpayne@69 1453 if (e.getType() == kj::Exception::Type::DISCONNECTED) {
jpayne@69 1454 return func();
jpayne@69 1455 } else {
jpayne@69 1456 return kj::mv(e);
jpayne@69 1457 }
jpayne@69 1458 });
jpayne@69 1459 });
jpayne@69 1460 }
jpayne@69 1461 };
jpayne@69 1462 template <typename Func>
jpayne@69 1463 struct RetryOnDisconnect_<Func&> {
jpayne@69 1464 // Specialization for references. Needed because the syntax for capturing references in a
jpayne@69 1465 // lambda is different. :(
jpayne@69 1466 static inline PromiseForResult<Func, void> apply(Func& func) {
jpayne@69 1467 auto promise = evalLater(func);
jpayne@69 1468 return promise.catch_([&func](kj::Exception&& e) -> PromiseForResult<Func, void> {
jpayne@69 1469 if (e.getType() == kj::Exception::Type::DISCONNECTED) {
jpayne@69 1470 return func();
jpayne@69 1471 } else {
jpayne@69 1472 return kj::mv(e);
jpayne@69 1473 }
jpayne@69 1474 });
jpayne@69 1475 }
jpayne@69 1476 };
jpayne@69 1477
jpayne@69 1478 template <typename Func>
jpayne@69 1479 inline PromiseForResult<Func, void> retryOnDisconnect(Func&& func) {
jpayne@69 1480 return RetryOnDisconnect_<Func>::apply(kj::fwd<Func>(func));
jpayne@69 1481 }
jpayne@69 1482
jpayne@69 1483 template <typename Func>
jpayne@69 1484 inline PromiseForResult<Func, WaitScope&> startFiber(
jpayne@69 1485 size_t stackSize, Func&& func, SourceLocation location) {
jpayne@69 1486 typedef _::FixVoid<_::ReturnType<Func, WaitScope&>> ResultT;
jpayne@69 1487
jpayne@69 1488 auto intermediate = _::allocPromise<_::Fiber<Func>>(
jpayne@69 1489 stackSize, kj::fwd<Func>(func), location);
jpayne@69 1490 intermediate->start();
jpayne@69 1491 auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType<Func, WaitScope&>>>(
jpayne@69 1492 _::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr), location));
jpayne@69 1493 return _::maybeReduce(kj::mv(result), false);
jpayne@69 1494 }
jpayne@69 1495
jpayne@69 1496 template <typename Func>
jpayne@69 1497 inline PromiseForResult<Func, WaitScope&> FiberPool::startFiber(
jpayne@69 1498 Func&& func, SourceLocation location) const {
jpayne@69 1499 typedef _::FixVoid<_::ReturnType<Func, WaitScope&>> ResultT;
jpayne@69 1500
jpayne@69 1501 auto intermediate = _::allocPromise<_::Fiber<Func>>(
jpayne@69 1502 *this, kj::fwd<Func>(func), location);
jpayne@69 1503 intermediate->start();
jpayne@69 1504 auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType<Func, WaitScope&>>>(
jpayne@69 1505 _::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr), location));
jpayne@69 1506 return _::maybeReduce(kj::mv(result), false);
jpayne@69 1507 }
jpayne@69 1508
jpayne@69 1509 template <typename T>
jpayne@69 1510 template <typename ErrorFunc>
jpayne@69 1511 void Promise<T>::detach(ErrorFunc&& errorHandler) {
jpayne@69 1512 return _::detach(then([](T&&) {}, kj::fwd<ErrorFunc>(errorHandler)));
jpayne@69 1513 }
jpayne@69 1514
jpayne@69 1515 template <>
jpayne@69 1516 template <typename ErrorFunc>
jpayne@69 1517 void Promise<void>::detach(ErrorFunc&& errorHandler) {
jpayne@69 1518 return _::detach(then([]() {}, kj::fwd<ErrorFunc>(errorHandler)));
jpayne@69 1519 }
jpayne@69 1520
jpayne@69 1521 template <typename T>
jpayne@69 1522 Promise<Array<T>> joinPromises(Array<Promise<T>>&& promises, SourceLocation location) {
jpayne@69 1523 return _::PromiseNode::to<Promise<Array<T>>>(_::allocPromise<_::ArrayJoinPromiseNode<T>>(
jpayne@69 1524 KJ_MAP(p, promises) { return _::PromiseNode::from(kj::mv(p)); },
jpayne@69 1525 heapArray<_::ExceptionOr<T>>(promises.size()), location,
jpayne@69 1526 _::ArrayJoinBehavior::LAZY));
jpayne@69 1527 }
jpayne@69 1528
jpayne@69 1529 template <typename T>
jpayne@69 1530 Promise<Array<T>> joinPromisesFailFast(Array<Promise<T>>&& promises, SourceLocation location) {
jpayne@69 1531 return _::PromiseNode::to<Promise<Array<T>>>(_::allocPromise<_::ArrayJoinPromiseNode<T>>(
jpayne@69 1532 KJ_MAP(p, promises) { return _::PromiseNode::from(kj::mv(p)); },
jpayne@69 1533 heapArray<_::ExceptionOr<T>>(promises.size()), location,
jpayne@69 1534 _::ArrayJoinBehavior::EAGER));
jpayne@69 1535 }
jpayne@69 1536
jpayne@69 1537 // =======================================================================================
jpayne@69 1538
jpayne@69 1539 namespace _ { // private
jpayne@69 1540
jpayne@69 1541 class WeakFulfillerBase: protected kj::Disposer {
jpayne@69 1542 protected:
jpayne@69 1543 WeakFulfillerBase(): inner(nullptr) {}
jpayne@69 1544 virtual ~WeakFulfillerBase() noexcept(false) {}
jpayne@69 1545
jpayne@69 1546 template <typename T>
jpayne@69 1547 inline PromiseFulfiller<T>* getInner() {
jpayne@69 1548 return static_cast<PromiseFulfiller<T>*>(inner);
jpayne@69 1549 };
jpayne@69 1550 template <typename T>
jpayne@69 1551 inline void setInner(PromiseFulfiller<T>* ptr) {
jpayne@69 1552 inner = ptr;
jpayne@69 1553 };
jpayne@69 1554
jpayne@69 1555 private:
jpayne@69 1556 mutable PromiseRejector* inner;
jpayne@69 1557
jpayne@69 1558 void disposeImpl(void* pointer) const override;
jpayne@69 1559 };
jpayne@69 1560
jpayne@69 1561 template <typename T>
jpayne@69 1562 class WeakFulfiller final: public PromiseFulfiller<T>, public WeakFulfillerBase {
jpayne@69 1563 // A wrapper around PromiseFulfiller which can be detached.
jpayne@69 1564 //
jpayne@69 1565 // There are a couple non-trivialities here:
jpayne@69 1566 // - If the WeakFulfiller is discarded, we want the promise it fulfills to be implicitly
jpayne@69 1567 // rejected.
jpayne@69 1568 // - We cannot destroy the WeakFulfiller until the application has discarded it *and* it has been
jpayne@69 1569 // detached from the underlying fulfiller, because otherwise the later detach() call will go
jpayne@69 1570 // to a dangling pointer. Essentially, WeakFulfiller is reference counted, although the
jpayne@69 1571 // refcount never goes over 2 and we manually implement the refcounting because we need to do
jpayne@69 1572 // other special things when each side detaches anyway. To this end, WeakFulfiller is its own
jpayne@69 1573 // Disposer -- dispose() is called when the application discards its owned pointer to the
jpayne@69 1574 // fulfiller and detach() is called when the promise is destroyed.
jpayne@69 1575
jpayne@69 1576 public:
jpayne@69 1577 KJ_DISALLOW_COPY_AND_MOVE(WeakFulfiller);
jpayne@69 1578
jpayne@69 1579 static kj::Own<WeakFulfiller> make() {
jpayne@69 1580 WeakFulfiller* ptr = new WeakFulfiller;
jpayne@69 1581 return Own<WeakFulfiller>(ptr, *ptr);
jpayne@69 1582 }
jpayne@69 1583
jpayne@69 1584 void fulfill(FixVoid<T>&& value) override {
jpayne@69 1585 if (getInner<T>() != nullptr) {
jpayne@69 1586 getInner<T>()->fulfill(kj::mv(value));
jpayne@69 1587 }
jpayne@69 1588 }
jpayne@69 1589
jpayne@69 1590 void reject(Exception&& exception) override {
jpayne@69 1591 if (getInner<T>() != nullptr) {
jpayne@69 1592 getInner<T>()->reject(kj::mv(exception));
jpayne@69 1593 }
jpayne@69 1594 }
jpayne@69 1595
jpayne@69 1596 bool isWaiting() override {
jpayne@69 1597 return getInner<T>() != nullptr && getInner<T>()->isWaiting();
jpayne@69 1598 }
jpayne@69 1599
jpayne@69 1600 void attach(PromiseFulfiller<T>& newInner) {
jpayne@69 1601 setInner<T>(&newInner);
jpayne@69 1602 }
jpayne@69 1603
jpayne@69 1604 void detach(PromiseFulfiller<T>& from) {
jpayne@69 1605 if (getInner<T>() == nullptr) {
jpayne@69 1606 // Already disposed.
jpayne@69 1607 delete this;
jpayne@69 1608 } else {
jpayne@69 1609 KJ_IREQUIRE(getInner<T>() == &from);
jpayne@69 1610 setInner<T>(nullptr);
jpayne@69 1611 }
jpayne@69 1612 }
jpayne@69 1613
jpayne@69 1614 private:
jpayne@69 1615 WeakFulfiller() {}
jpayne@69 1616 };
jpayne@69 1617
jpayne@69 1618 template <typename T>
jpayne@69 1619 class PromiseAndFulfillerAdapter {
jpayne@69 1620 public:
jpayne@69 1621 PromiseAndFulfillerAdapter(PromiseFulfiller<T>& fulfiller,
jpayne@69 1622 WeakFulfiller<T>& wrapper)
jpayne@69 1623 : fulfiller(fulfiller), wrapper(wrapper) {
jpayne@69 1624 wrapper.attach(fulfiller);
jpayne@69 1625 }
jpayne@69 1626
jpayne@69 1627 ~PromiseAndFulfillerAdapter() noexcept(false) {
jpayne@69 1628 wrapper.detach(fulfiller);
jpayne@69 1629 }
jpayne@69 1630
jpayne@69 1631 private:
jpayne@69 1632 PromiseFulfiller<T>& fulfiller;
jpayne@69 1633 WeakFulfiller<T>& wrapper;
jpayne@69 1634 };
jpayne@69 1635
jpayne@69 1636 } // namespace _ (private)
jpayne@69 1637
jpayne@69 1638 template <typename T>
jpayne@69 1639 template <typename Func>
jpayne@69 1640 bool PromiseFulfiller<T>::rejectIfThrows(Func&& func) {
jpayne@69 1641 KJ_IF_MAYBE(exception, kj::runCatchingExceptions(kj::mv(func))) {
jpayne@69 1642 reject(kj::mv(*exception));
jpayne@69 1643 return false;
jpayne@69 1644 } else {
jpayne@69 1645 return true;
jpayne@69 1646 }
jpayne@69 1647 }
jpayne@69 1648
jpayne@69 1649 template <typename Func>
jpayne@69 1650 bool PromiseFulfiller<void>::rejectIfThrows(Func&& func) {
jpayne@69 1651 KJ_IF_MAYBE(exception, kj::runCatchingExceptions(kj::mv(func))) {
jpayne@69 1652 reject(kj::mv(*exception));
jpayne@69 1653 return false;
jpayne@69 1654 } else {
jpayne@69 1655 return true;
jpayne@69 1656 }
jpayne@69 1657 }
jpayne@69 1658
jpayne@69 1659 template <typename T, typename Adapter, typename... Params>
jpayne@69 1660 _::ReducePromises<T> newAdaptedPromise(Params&&... adapterConstructorParams) {
jpayne@69 1661 _::OwnPromiseNode intermediate(
jpayne@69 1662 _::allocPromise<_::AdapterPromiseNode<_::FixVoid<T>, Adapter>>(
jpayne@69 1663 kj::fwd<Params>(adapterConstructorParams)...));
jpayne@69 1664 // We can't capture SourceLocation in this function's arguments since it is a vararg template. :(
jpayne@69 1665 return _::PromiseNode::to<_::ReducePromises<T>>(
jpayne@69 1666 _::maybeChain(kj::mv(intermediate), implicitCast<T*>(nullptr), SourceLocation()));
jpayne@69 1667 }
jpayne@69 1668
jpayne@69 1669 template <typename T>
jpayne@69 1670 PromiseFulfillerPair<T> newPromiseAndFulfiller(SourceLocation location) {
jpayne@69 1671 auto wrapper = _::WeakFulfiller<T>::make();
jpayne@69 1672
jpayne@69 1673 _::OwnPromiseNode intermediate(
jpayne@69 1674 _::allocPromise<_::AdapterPromiseNode<
jpayne@69 1675 _::FixVoid<T>, _::PromiseAndFulfillerAdapter<T>>>(*wrapper));
jpayne@69 1676 auto promise = _::PromiseNode::to<_::ReducePromises<T>>(
jpayne@69 1677 _::maybeChain(kj::mv(intermediate), implicitCast<T*>(nullptr), location));
jpayne@69 1678
jpayne@69 1679 return PromiseFulfillerPair<T> { kj::mv(promise), kj::mv(wrapper) };
jpayne@69 1680 }
jpayne@69 1681
jpayne@69 1682 // =======================================================================================
jpayne@69 1683 // cross-thread stuff
jpayne@69 1684
jpayne@69 1685 namespace _ { // (private)
jpayne@69 1686
jpayne@69 1687 class XThreadEvent: public PromiseNode, // it's a PromiseNode in the requesting thread
jpayne@69 1688 private Event { // it's an event in the target thread
jpayne@69 1689 public:
jpayne@69 1690 XThreadEvent(ExceptionOrValue& result, const Executor& targetExecutor, EventLoop& loop,
jpayne@69 1691 void* funcTracePtr, SourceLocation location);
jpayne@69 1692
jpayne@69 1693 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
jpayne@69 1694
jpayne@69 1695 protected:
jpayne@69 1696 void ensureDoneOrCanceled();
jpayne@69 1697 // MUST be called in destructor of subclasses to make sure the object is not destroyed while
jpayne@69 1698 // still being accessed by the other thread. (This can't be placed in ~XThreadEvent() because
jpayne@69 1699 // that destructor doesn't run until the subclass has already been destroyed.)
jpayne@69 1700
jpayne@69 1701 virtual kj::Maybe<OwnPromiseNode> execute() = 0;
jpayne@69 1702 // Run the function. If the function returns a promise, returns the inner PromiseNode, otherwise
jpayne@69 1703 // returns null.
jpayne@69 1704
jpayne@69 1705 // implements PromiseNode ----------------------------------------------------
jpayne@69 1706 void onReady(Event* event) noexcept override;
jpayne@69 1707
jpayne@69 1708 private:
jpayne@69 1709 ExceptionOrValue& result;
jpayne@69 1710 void* funcTracePtr;
jpayne@69 1711
jpayne@69 1712 kj::Own<const Executor> targetExecutor;
jpayne@69 1713 Maybe<const Executor&> replyExecutor; // If executeAsync() was used.
jpayne@69 1714
jpayne@69 1715 kj::Maybe<OwnPromiseNode> promiseNode;
jpayne@69 1716 // Accessed only in target thread.
jpayne@69 1717
jpayne@69 1718 ListLink<XThreadEvent> targetLink;
jpayne@69 1719 // Membership in one of the linked lists in the target Executor's work list or cancel list. These
jpayne@69 1720 // fields are protected by the target Executor's mutex.
jpayne@69 1721
jpayne@69 1722 enum {
jpayne@69 1723 UNUSED,
jpayne@69 1724 // Object was never queued on another thread.
jpayne@69 1725
jpayne@69 1726 QUEUED,
jpayne@69 1727 // Target thread has not yet dequeued the event from the state.start list. The requesting
jpayne@69 1728 // thread can cancel execution by removing the event from the list.
jpayne@69 1729
jpayne@69 1730 EXECUTING,
jpayne@69 1731 // Target thread has dequeued the event from state.start and moved it to state.executing. To
jpayne@69 1732 // cancel, the requesting thread must add the event to the state.cancel list and change the
jpayne@69 1733 // state to CANCELING.
jpayne@69 1734
jpayne@69 1735 CANCELING,
jpayne@69 1736 // Requesting thread is trying to cancel this event. The target thread will change the state to
jpayne@69 1737 // `DONE` once canceled.
jpayne@69 1738
jpayne@69 1739 DONE
jpayne@69 1740 // Target thread has completed handling this event and will not touch it again. The requesting
jpayne@69 1741 // thread can safely delete the object. The `state` is updated to `DONE` using an atomic
jpayne@69 1742 // release operation after ensuring that the event will not be touched again, so that the
jpayne@69 1743 // requesting can safely skip locking if it observes the state is already DONE.
jpayne@69 1744 } state = UNUSED;
jpayne@69 1745 // State, which is also protected by `targetExecutor`'s mutex.
jpayne@69 1746
jpayne@69 1747 ListLink<XThreadEvent> replyLink;
jpayne@69 1748 // Membership in `replyExecutor`'s reply list. Protected by `replyExecutor`'s mutex. The
jpayne@69 1749 // executing thread places the event in the reply list near the end of the `EXECUTING` state.
jpayne@69 1750 // Because the thread cannot lock two mutexes at once, it's possible that the reply executor
jpayne@69 1751 // will receive the reply while the event is still listed in the EXECUTING state, but it can
jpayne@69 1752 // ignore the state and proceed with the result.
jpayne@69 1753
jpayne@69 1754 OnReadyEvent onReadyEvent;
jpayne@69 1755 // Accessed only in requesting thread.
jpayne@69 1756
jpayne@69 1757 friend class kj::Executor;
jpayne@69 1758
jpayne@69 1759 void done();
jpayne@69 1760 // Sets the state to `DONE` and notifies the originating thread that this event is done. Do NOT
jpayne@69 1761 // call under lock.
jpayne@69 1762
jpayne@69 1763 void sendReply();
jpayne@69 1764 // Notifies the originating thread that this event is done, but doesn't set the state to DONE
jpayne@69 1765 // yet. Do NOT call under lock.
jpayne@69 1766
jpayne@69 1767 void setDoneState();
jpayne@69 1768 // Assigns `state` to `DONE`, being careful to use an atomic-release-store if needed. This must
jpayne@69 1769 // only be called in the destination thread, and must either be called under lock, or the thread
jpayne@69 1770 // must take the lock and release it again shortly after setting the state (because some threads
jpayne@69 1771 // may be waiting on the DONE state using a conditional wait on the mutex). After calling
jpayne@69 1772 // setDoneState(), the destination thread MUST NOT touch this object ever again; it now belongs
jpayne@69 1773 // solely to the requesting thread.
jpayne@69 1774
jpayne@69 1775 void setDisconnected();
jpayne@69 1776 // Sets the result to a DISCONNECTED exception indicating that the target event loop exited.
jpayne@69 1777
jpayne@69 1778 class DelayedDoneHack;
jpayne@69 1779
jpayne@69 1780 // implements Event ----------------------------------------------------------
jpayne@69 1781 Maybe<Own<Event>> fire() override;
jpayne@69 1782 // If called with promiseNode == nullptr, it's time to call execute(). If promiseNode != nullptr,
jpayne@69 1783 // then it just indicated readiness and we need to get its result.
jpayne@69 1784
jpayne@69 1785 void traceEvent(TraceBuilder& builder) override;
jpayne@69 1786 };
jpayne@69 1787
jpayne@69 1788 template <typename Func, typename = _::FixVoid<_::ReturnType<Func, void>>>
jpayne@69 1789 class XThreadEventImpl final: public XThreadEvent {
jpayne@69 1790 // Implementation for a function that does not return a Promise.
jpayne@69 1791 public:
jpayne@69 1792 XThreadEventImpl(Func&& func, const Executor& target, EventLoop& loop, SourceLocation location)
jpayne@69 1793 : XThreadEvent(result, target, loop, GetFunctorStartAddress<>::apply(func), location),
jpayne@69 1794 func(kj::fwd<Func>(func)) {}
jpayne@69 1795 ~XThreadEventImpl() noexcept(false) { ensureDoneOrCanceled(); }
jpayne@69 1796 void destroy() override { freePromise(this); }
jpayne@69 1797
jpayne@69 1798 typedef _::FixVoid<_::ReturnType<Func, void>> ResultT;
jpayne@69 1799
jpayne@69 1800 kj::Maybe<_::OwnPromiseNode> execute() override {
jpayne@69 1801 result.value = MaybeVoidCaller<Void, FixVoid<decltype(func())>>::apply(func, Void());
jpayne@69 1802 return nullptr;
jpayne@69 1803 }
jpayne@69 1804
jpayne@69 1805 // implements PromiseNode ----------------------------------------------------
jpayne@69 1806 void get(ExceptionOrValue& output) noexcept override {
jpayne@69 1807 output.as<ResultT>() = kj::mv(result);
jpayne@69 1808 }
jpayne@69 1809
jpayne@69 1810 private:
jpayne@69 1811 Func func;
jpayne@69 1812 ExceptionOr<ResultT> result;
jpayne@69 1813 friend Executor;
jpayne@69 1814 };
jpayne@69 1815
jpayne@69 1816 template <typename Func, typename T>
jpayne@69 1817 class XThreadEventImpl<Func, Promise<T>> final: public XThreadEvent {
jpayne@69 1818 // Implementation for a function that DOES return a Promise.
jpayne@69 1819 public:
jpayne@69 1820 XThreadEventImpl(Func&& func, const Executor& target, EventLoop& loop, SourceLocation location)
jpayne@69 1821 : XThreadEvent(result, target, loop, GetFunctorStartAddress<>::apply(func), location),
jpayne@69 1822 func(kj::fwd<Func>(func)) {}
jpayne@69 1823 ~XThreadEventImpl() noexcept(false) { ensureDoneOrCanceled(); }
jpayne@69 1824 void destroy() override { freePromise(this); }
jpayne@69 1825
jpayne@69 1826 typedef _::FixVoid<_::UnwrapPromise<PromiseForResult<Func, void>>> ResultT;
jpayne@69 1827
jpayne@69 1828 kj::Maybe<_::OwnPromiseNode> execute() override {
jpayne@69 1829 auto result = _::PromiseNode::from(func());
jpayne@69 1830 KJ_IREQUIRE(result.get() != nullptr);
jpayne@69 1831 return kj::mv(result);
jpayne@69 1832 }
jpayne@69 1833
jpayne@69 1834 // implements PromiseNode ----------------------------------------------------
jpayne@69 1835 void get(ExceptionOrValue& output) noexcept override {
jpayne@69 1836 output.as<ResultT>() = kj::mv(result);
jpayne@69 1837 }
jpayne@69 1838
jpayne@69 1839 private:
jpayne@69 1840 Func func;
jpayne@69 1841 ExceptionOr<ResultT> result;
jpayne@69 1842 friend Executor;
jpayne@69 1843 };
jpayne@69 1844
jpayne@69 1845 } // namespace _ (private)
jpayne@69 1846
jpayne@69 1847 template <typename Func>
jpayne@69 1848 _::UnwrapPromise<PromiseForResult<Func, void>> Executor::executeSync(
jpayne@69 1849 Func&& func, SourceLocation location) const {
jpayne@69 1850 _::XThreadEventImpl<Func> event(kj::fwd<Func>(func), *this, getLoop(), location);
jpayne@69 1851 send(event, true);
jpayne@69 1852 return convertToReturn(kj::mv(event.result));
jpayne@69 1853 }
jpayne@69 1854
jpayne@69 1855 template <typename Func>
jpayne@69 1856 PromiseForResult<Func, void> Executor::executeAsync(Func&& func, SourceLocation location) const {
jpayne@69 1857 // HACK: We call getLoop() here, rather than have XThreadEvent's constructor do it, so that if it
jpayne@69 1858 // throws we don't crash due to `allocPromise()` being `noexcept`.
jpayne@69 1859 auto event = _::allocPromise<_::XThreadEventImpl<Func>>(
jpayne@69 1860 kj::fwd<Func>(func), *this, getLoop(), location);
jpayne@69 1861 send(*event, false);
jpayne@69 1862 return _::PromiseNode::to<PromiseForResult<Func, void>>(kj::mv(event));
jpayne@69 1863 }
jpayne@69 1864
jpayne@69 1865 // -----------------------------------------------------------------------------
jpayne@69 1866
jpayne@69 1867 namespace _ { // (private)
jpayne@69 1868
jpayne@69 1869 template <typename T>
jpayne@69 1870 class XThreadFulfiller;
jpayne@69 1871
jpayne@69 1872 class XThreadPaf: public PromiseNode {
jpayne@69 1873 public:
jpayne@69 1874 XThreadPaf();
jpayne@69 1875 virtual ~XThreadPaf() noexcept(false);
jpayne@69 1876 void destroy() override;
jpayne@69 1877
jpayne@69 1878 // implements PromiseNode ----------------------------------------------------
jpayne@69 1879 void onReady(Event* event) noexcept override;
jpayne@69 1880 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
jpayne@69 1881
jpayne@69 1882 private:
jpayne@69 1883 enum {
jpayne@69 1884 WAITING,
jpayne@69 1885 // Not yet fulfilled, and the waiter is still waiting.
jpayne@69 1886 //
jpayne@69 1887 // Starting from this state, the state may transition to either FULFILLING or CANCELED
jpayne@69 1888 // using an atomic compare-and-swap.
jpayne@69 1889
jpayne@69 1890 FULFILLING,
jpayne@69 1891 // The fulfiller thread atomically transitions the state from WAITING to FULFILLING when it
jpayne@69 1892 // wishes to fulfill the promise. By doing so, it guarantees that the `executor` will not
jpayne@69 1893 // disappear out from under it. It then fills in the result value, locks the executor mutex,
jpayne@69 1894 // adds the object to the executor's list of fulfilled XThreadPafs, changes the state to
jpayne@69 1895 // FULFILLED, and finally unlocks the mutex.
jpayne@69 1896 //
jpayne@69 1897 // If the waiting thread tries to cancel but discovers the object in this state, then it
jpayne@69 1898 // must perform a conditional wait on the executor mutex to await the state becoming FULFILLED.
jpayne@69 1899 // It can then delete the object.
jpayne@69 1900
jpayne@69 1901 FULFILLED,
jpayne@69 1902 // The fulfilling thread has completed filling in the result value and inserting the object
jpayne@69 1903 // into the waiting thread's executor event queue. Moreover, the fulfilling thread no longer
jpayne@69 1904 // holds any pointers to this object. The waiting thread is responsible for deleting it.
jpayne@69 1905
jpayne@69 1906 DISPATCHED,
jpayne@69 1907 // The object reached FULFILLED state, and then was dispatched from the waiting thread's
jpayne@69 1908 // executor's event queue. Therefore, the object is completely owned by the waiting thread with
jpayne@69 1909 // no need to lock anything.
jpayne@69 1910
jpayne@69 1911 CANCELED
jpayne@69 1912 // The waiting thread atomically transitions the state from WAITING to CANCELED if it is no
jpayne@69 1913 // longer listening. In this state, it is the fulfiller thread's responsibility to destroy the
jpayne@69 1914 // object.
jpayne@69 1915 } state;
jpayne@69 1916
jpayne@69 1917 const Executor& executor;
jpayne@69 1918 // Executor of the waiting thread. Only guaranteed to be valid when state is `WAITING` or
jpayne@69 1919 // `FULFILLING`. After any other state has been reached, this reference may be invalidated.
jpayne@69 1920
jpayne@69 1921 ListLink<XThreadPaf> link;
jpayne@69 1922 // In the FULFILLING/FULFILLED states, the object is placed in a linked list within the waiting
jpayne@69 1923 // thread's executor. In those states, these pointers are guarded by said executor's mutex.
jpayne@69 1924
jpayne@69 1925 OnReadyEvent onReadyEvent;
jpayne@69 1926
jpayne@69 1927 class FulfillScope;
jpayne@69 1928
jpayne@69 1929 static kj::Exception unfulfilledException();
jpayne@69 1930 // Construct appropriate exception to use to reject an unfulfilled XThreadPaf.
jpayne@69 1931
jpayne@69 1932 template <typename T>
jpayne@69 1933 friend class XThreadFulfiller;
jpayne@69 1934 friend Executor;
jpayne@69 1935 };
jpayne@69 1936
jpayne@69 1937 template <typename T>
jpayne@69 1938 class XThreadPafImpl final: public XThreadPaf {
jpayne@69 1939 public:
jpayne@69 1940 // implements PromiseNode ----------------------------------------------------
jpayne@69 1941 void get(ExceptionOrValue& output) noexcept override {
jpayne@69 1942 output.as<FixVoid<T>>() = kj::mv(result);
jpayne@69 1943 }
jpayne@69 1944
jpayne@69 1945 private:
jpayne@69 1946 ExceptionOr<FixVoid<T>> result;
jpayne@69 1947
jpayne@69 1948 friend class XThreadFulfiller<T>;
jpayne@69 1949 };
jpayne@69 1950
jpayne@69 1951 class XThreadPaf::FulfillScope {
jpayne@69 1952 // Create on stack while setting `XThreadPafImpl<T>::result`.
jpayne@69 1953 //
jpayne@69 1954 // This ensures that:
jpayne@69 1955 // - Only one call is carried out, even if multiple threads try to fulfill concurrently.
jpayne@69 1956 // - The waiting thread is correctly signaled.
jpayne@69 1957 public:
jpayne@69 1958 FulfillScope(XThreadPaf** pointer);
jpayne@69 1959 // Atomically nulls out *pointer and takes ownership of the pointer.
jpayne@69 1960
jpayne@69 1961 ~FulfillScope() noexcept(false);
jpayne@69 1962
jpayne@69 1963 KJ_DISALLOW_COPY_AND_MOVE(FulfillScope);
jpayne@69 1964
jpayne@69 1965 bool shouldFulfill() { return obj != nullptr; }
jpayne@69 1966
jpayne@69 1967 template <typename T>
jpayne@69 1968 XThreadPafImpl<T>* getTarget() { return static_cast<XThreadPafImpl<T>*>(obj); }
jpayne@69 1969
jpayne@69 1970 private:
jpayne@69 1971 XThreadPaf* obj;
jpayne@69 1972 };
jpayne@69 1973
jpayne@69 1974 template <typename T>
jpayne@69 1975 class XThreadFulfiller final: public CrossThreadPromiseFulfiller<T> {
jpayne@69 1976 public:
jpayne@69 1977 XThreadFulfiller(XThreadPafImpl<T>* target): target(target) {}
jpayne@69 1978
jpayne@69 1979 ~XThreadFulfiller() noexcept(false) {
jpayne@69 1980 if (target != nullptr) {
jpayne@69 1981 reject(XThreadPaf::unfulfilledException());
jpayne@69 1982 }
jpayne@69 1983 }
jpayne@69 1984 void fulfill(FixVoid<T>&& value) const override {
jpayne@69 1985 XThreadPaf::FulfillScope scope(&target);
jpayne@69 1986 if (scope.shouldFulfill()) {
jpayne@69 1987 scope.getTarget<T>()->result = kj::mv(value);
jpayne@69 1988 }
jpayne@69 1989 }
jpayne@69 1990 void reject(Exception&& exception) const override {
jpayne@69 1991 XThreadPaf::FulfillScope scope(&target);
jpayne@69 1992 if (scope.shouldFulfill()) {
jpayne@69 1993 scope.getTarget<T>()->result.addException(kj::mv(exception));
jpayne@69 1994 }
jpayne@69 1995 }
jpayne@69 1996 bool isWaiting() const override {
jpayne@69 1997 KJ_IF_MAYBE(t, target) {
jpayne@69 1998 #if _MSC_VER && !__clang__
jpayne@69 1999 // Just assume 1-byte loads are atomic... on what kind of absurd platform would they not be?
jpayne@69 2000 return t->state == XThreadPaf::WAITING;
jpayne@69 2001 #else
jpayne@69 2002 return __atomic_load_n(&t->state, __ATOMIC_RELAXED) == XThreadPaf::WAITING;
jpayne@69 2003 #endif
jpayne@69 2004 } else {
jpayne@69 2005 return false;
jpayne@69 2006 }
jpayne@69 2007 }
jpayne@69 2008
jpayne@69 2009 private:
jpayne@69 2010 mutable XThreadPaf* target; // accessed using atomic ops
jpayne@69 2011 };
jpayne@69 2012
jpayne@69 2013 template <typename T>
jpayne@69 2014 class XThreadFulfiller<kj::Promise<T>> {
jpayne@69 2015 public:
jpayne@69 2016 static_assert(sizeof(T) < 0,
jpayne@69 2017 "newCrosssThreadPromiseAndFulfiller<Promise<T>>() is not currently supported");
jpayne@69 2018 // TODO(someday): Is this worth supporting? Presumably, when someone calls `fulfill(somePromise)`,
jpayne@69 2019 // then `somePromise` should be assumed to be a promise owned by the fulfilling thread, not
jpayne@69 2020 // the waiting thread.
jpayne@69 2021 };
jpayne@69 2022
jpayne@69 2023 } // namespace _ (private)
jpayne@69 2024
jpayne@69 2025 template <typename T>
jpayne@69 2026 PromiseCrossThreadFulfillerPair<T> newPromiseAndCrossThreadFulfiller() {
jpayne@69 2027 kj::Own<_::XThreadPafImpl<T>, _::PromiseDisposer> node(new _::XThreadPafImpl<T>);
jpayne@69 2028 auto fulfiller = kj::heap<_::XThreadFulfiller<T>>(node);
jpayne@69 2029 return { _::PromiseNode::to<_::ReducePromises<T>>(kj::mv(node)), kj::mv(fulfiller) };
jpayne@69 2030 }
jpayne@69 2031
jpayne@69 2032 } // namespace kj
jpayne@69 2033
jpayne@69 2034 #if KJ_HAS_COROUTINE
jpayne@69 2035
jpayne@69 2036 // =======================================================================================
jpayne@69 2037 // Coroutines TS integration with kj::Promise<T>.
jpayne@69 2038 //
jpayne@69 2039 // Here's a simple coroutine:
jpayne@69 2040 //
jpayne@69 2041 // Promise<Own<AsyncIoStream>> connectToService(Network& n) {
jpayne@69 2042 // auto a = co_await n.parseAddress(IP, PORT);
jpayne@69 2043 // auto c = co_await a->connect();
jpayne@69 2044 // co_return kj::mv(c);
jpayne@69 2045 // }
jpayne@69 2046 //
jpayne@69 2047 // The presence of the co_await and co_return keywords tell the compiler it is a coroutine.
jpayne@69 2048 // Although it looks similar to a function, it has a couple large differences. First, everything
jpayne@69 2049 // that would normally live in the stack frame lives instead in a heap-based coroutine frame.
jpayne@69 2050 // Second, the coroutine has the ability to return from its scope without deallocating this frame
jpayne@69 2051 // (to suspend, in other words), and the ability to resume from its last suspension point.
jpayne@69 2052 //
jpayne@69 2053 // In order to know how to suspend, resume, and return from a coroutine, the compiler looks up a
jpayne@69 2054 // coroutine implementation type via a traits class parameterized by the coroutine return and
jpayne@69 2055 // parameter types. We'll name our coroutine implementation `kj::_::Coroutine<T>`,
jpayne@69 2056
jpayne@69 2057 namespace kj::_ { template <typename T> class Coroutine; }
jpayne@69 2058
jpayne@69 2059 // Specializing the appropriate traits class tells the compiler about `kj::_::Coroutine<T>`.
jpayne@69 2060
jpayne@69 2061 namespace KJ_COROUTINE_STD_NAMESPACE {
jpayne@69 2062
jpayne@69 2063 template <class T, class... Args>
jpayne@69 2064 struct coroutine_traits<kj::Promise<T>, Args...> {
jpayne@69 2065 // `Args...` are the coroutine's parameter types.
jpayne@69 2066
jpayne@69 2067 using promise_type = kj::_::Coroutine<T>;
jpayne@69 2068 // The Coroutines TS calls this the "promise type". This makes sense when thinking of coroutines
jpayne@69 2069 // returning `std::future<T>`, since the coroutine implementation would be a wrapper around
jpayne@69 2070 // a `std::promise<T>`. It's extremely confusing from a KJ perspective, however, so I call it
jpayne@69 2071 // the "coroutine implementation type" instead.
jpayne@69 2072 };
jpayne@69 2073
jpayne@69 2074 } // namespace KJ_COROUTINE_STD_NAMESPACE
jpayne@69 2075
jpayne@69 2076 // Now when the compiler sees our `connectToService()` coroutine above, it default-constructs a
jpayne@69 2077 // `coroutine_traits<Promise<Own<AsyncIoStream>>, Network&>::promise_type`, or
jpayne@69 2078 // `kj::_::Coroutine<Own<AsyncIoStream>>`.
jpayne@69 2079 //
jpayne@69 2080 // The implementation object lives in the heap-allocated coroutine frame. It gets destroyed and
jpayne@69 2081 // deallocated when the frame does.
jpayne@69 2082
jpayne@69 2083 namespace kj::_ {
jpayne@69 2084
jpayne@69 2085 namespace stdcoro = KJ_COROUTINE_STD_NAMESPACE;
jpayne@69 2086
jpayne@69 2087 class CoroutineBase: public PromiseNode,
jpayne@69 2088 public Event {
jpayne@69 2089 public:
jpayne@69 2090 CoroutineBase(stdcoro::coroutine_handle<> coroutine, ExceptionOrValue& resultRef,
jpayne@69 2091 SourceLocation location);
jpayne@69 2092 ~CoroutineBase() noexcept(false);
jpayne@69 2093 KJ_DISALLOW_COPY_AND_MOVE(CoroutineBase);
jpayne@69 2094 void destroy() override;
jpayne@69 2095
jpayne@69 2096 auto initial_suspend() { return stdcoro::suspend_never(); }
jpayne@69 2097 auto final_suspend() noexcept {
jpayne@69 2098 #if _MSC_VER && !defined(__clang__)
jpayne@69 2099 // See comment at `finalSuspendCalled`'s definition.
jpayne@69 2100 finalSuspendCalled = true;
jpayne@69 2101 #endif
jpayne@69 2102 return stdcoro::suspend_always();
jpayne@69 2103 }
jpayne@69 2104 // These adjust the suspension behavior of coroutines immediately upon initiation, and immediately
jpayne@69 2105 // after completion.
jpayne@69 2106 //
jpayne@69 2107 // The initial suspension point could allow us to defer the initial synchronous execution of a
jpayne@69 2108 // coroutine -- everything before its first co_await, that is.
jpayne@69 2109 //
jpayne@69 2110 // The final suspension point is useful to delay deallocation of the coroutine frame to match the
jpayne@69 2111 // lifetime of the enclosing promise.
jpayne@69 2112
jpayne@69 2113 void unhandled_exception();
jpayne@69 2114
jpayne@69 2115 protected:
jpayne@69 2116 class AwaiterBase;
jpayne@69 2117
jpayne@69 2118 bool isWaiting() { return waiting; }
jpayne@69 2119 void scheduleResumption() {
jpayne@69 2120 onReadyEvent.arm();
jpayne@69 2121 waiting = false;
jpayne@69 2122 }
jpayne@69 2123
jpayne@69 2124 private:
jpayne@69 2125 // -------------------------------------------------------
jpayne@69 2126 // PromiseNode implementation
jpayne@69 2127
jpayne@69 2128 void onReady(Event* event) noexcept override;
jpayne@69 2129 void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
jpayne@69 2130
jpayne@69 2131 // -------------------------------------------------------
jpayne@69 2132 // Event implementation
jpayne@69 2133
jpayne@69 2134 Maybe<Own<Event>> fire() override;
jpayne@69 2135 void traceEvent(TraceBuilder& builder) override;
jpayne@69 2136
jpayne@69 2137 stdcoro::coroutine_handle<> coroutine;
jpayne@69 2138 ExceptionOrValue& resultRef;
jpayne@69 2139
jpayne@69 2140 OnReadyEvent onReadyEvent;
jpayne@69 2141 bool waiting = true;
jpayne@69 2142
jpayne@69 2143 bool hasSuspendedAtLeastOnce = false;
jpayne@69 2144
jpayne@69 2145 #if _MSC_VER && !defined(__clang__)
jpayne@69 2146 bool finalSuspendCalled = false;
jpayne@69 2147 // MSVC erroneously reports the coroutine as done (that is, `coroutine.done()` returns true)
jpayne@69 2148 // seemingly as soon as `return_value()`/`return_void()` are called. This matters in our
jpayne@69 2149 // implementation of `unhandled_exception()`, which must arrange to propagate exceptions during
jpayne@69 2150 // coroutine frame unwind via the returned promise, even if `return_value()`/`return_void()` have
jpayne@69 2151 // already been called. To prove that our assumptions are correct in that function, we want to be
jpayne@69 2152 // able to assert that `final_suspend()` has not yet been called. This boolean hack allows us to
jpayne@69 2153 // preserve that assertion.
jpayne@69 2154 #endif
jpayne@69 2155
jpayne@69 2156 Maybe<PromiseNode&> promiseNodeForTrace;
jpayne@69 2157 // Whenever this coroutine is suspended waiting on another promise, we keep a reference to that
jpayne@69 2158 // promise so tracePromise()/traceEvent() can trace into it.
jpayne@69 2159
jpayne@69 2160 UnwindDetector unwindDetector;
jpayne@69 2161
jpayne@69 2162 struct DisposalResults {
jpayne@69 2163 bool destructorRan = false;
jpayne@69 2164 Maybe<Exception> exception;
jpayne@69 2165 };
jpayne@69 2166 Maybe<DisposalResults&> maybeDisposalResults;
jpayne@69 2167 // Only non-null during destruction. Before calling coroutine.destroy(), our disposer sets this
jpayne@69 2168 // to point to a DisposalResults on the stack so unhandled_exception() will have some place to
jpayne@69 2169 // store unwind exceptions. We can't store them in this Coroutine, because we'll be destroyed once
jpayne@69 2170 // coroutine.destroy() has returned. Our disposer then rethrows as needed.
jpayne@69 2171 };
jpayne@69 2172
jpayne@69 2173 template <typename Self, typename T>
jpayne@69 2174 class CoroutineMixin;
jpayne@69 2175 // CRTP mixin, covered later.
jpayne@69 2176
jpayne@69 2177 template <typename T>
jpayne@69 2178 class Coroutine final: public CoroutineBase,
jpayne@69 2179 public CoroutineMixin<Coroutine<T>, T> {
jpayne@69 2180 // The standard calls this the `promise_type` object. We can call this the "coroutine
jpayne@69 2181 // implementation object" since the word promise means different things in KJ and std styles. This
jpayne@69 2182 // is where we implement how a `kj::Promise<T>` is returned from a coroutine, and how that promise
jpayne@69 2183 // is later fulfilled. We also fill in a few lifetime-related details.
jpayne@69 2184 //
jpayne@69 2185 // The implementation object is also where we can customize memory allocation of coroutine frames,
jpayne@69 2186 // by implementing a member `operator new(size_t, Args...)` (same `Args...` as in
jpayne@69 2187 // coroutine_traits).
jpayne@69 2188 //
jpayne@69 2189 // We can also customize how await-expressions are transformed within `kj::Promise<T>`-based
jpayne@69 2190 // coroutines by implementing an `await_transform(P)` member function, where `P` is some type for
jpayne@69 2191 // which we want to implement co_await support, e.g. `kj::Promise<U>`. This feature allows us to
jpayne@69 2192 // provide an optimized `kj::EventLoop` integration when the coroutine's return type and the
jpayne@69 2193 // await-expression's type are both `kj::Promise` instantiations -- see further comments under
jpayne@69 2194 // `await_transform()`.
jpayne@69 2195
jpayne@69 2196 public:
jpayne@69 2197 using Handle = stdcoro::coroutine_handle<Coroutine<T>>;
jpayne@69 2198
jpayne@69 2199 Coroutine(SourceLocation location = {})
jpayne@69 2200 : CoroutineBase(Handle::from_promise(*this), result, location) {}
jpayne@69 2201
jpayne@69 2202 Promise<T> get_return_object() {
jpayne@69 2203 // Called after coroutine frame construction and before initial_suspend() to create the
jpayne@69 2204 // coroutine's return object. `this` itself lives inside the coroutine frame, and we arrange for
jpayne@69 2205 // the returned Promise<T> to own `this` via a custom Disposer and by always leaving the
jpayne@69 2206 // coroutine in a suspended state.
jpayne@69 2207 return PromiseNode::to<Promise<T>>(OwnPromiseNode(this));
jpayne@69 2208 }
jpayne@69 2209
jpayne@69 2210 public:
jpayne@69 2211 template <typename U>
jpayne@69 2212 class Awaiter;
jpayne@69 2213
jpayne@69 2214 template <typename U>
jpayne@69 2215 Awaiter<U> await_transform(kj::Promise<U>& promise) { return Awaiter<U>(kj::mv(promise)); }
jpayne@69 2216 template <typename U>
jpayne@69 2217 Awaiter<U> await_transform(kj::Promise<U>&& promise) { return Awaiter<U>(kj::mv(promise)); }
jpayne@69 2218 // Called when someone writes `co_await promise`, where `promise` is a kj::Promise<U>. We return
jpayne@69 2219 // an Awaiter<U>, which implements coroutine suspension and resumption in terms of the KJ async
jpayne@69 2220 // event system.
jpayne@69 2221 //
jpayne@69 2222 // There is another hook we could implement: an `operator co_await()` free function. However, a
jpayne@69 2223 // free function would be unaware of the type of the enclosing coroutine. Since Awaiter<U> is a
jpayne@69 2224 // member class template of Coroutine<T>, it is able to implement an
jpayne@69 2225 // `await_suspend(Coroutine<T>::Handle)` override, providing it type-safe access to our enclosing
jpayne@69 2226 // coroutine's PromiseNode. An `operator co_await()` free function would have to implement
jpayne@69 2227 // a type-erased `await_suspend(stdcoro::coroutine_handle<void>)` override, and implement
jpayne@69 2228 // suspension and resumption in terms of .then(). Yuck!
jpayne@69 2229
jpayne@69 2230 private:
jpayne@69 2231 // -------------------------------------------------------
jpayne@69 2232 // PromiseNode implementation
jpayne@69 2233
jpayne@69 2234 void get(ExceptionOrValue& output) noexcept override {
jpayne@69 2235 output.as<FixVoid<T>>() = kj::mv(result);
jpayne@69 2236 }
jpayne@69 2237
jpayne@69 2238 void fulfill(FixVoid<T>&& value) {
jpayne@69 2239 // Called by the return_value()/return_void() functions in our mixin class.
jpayne@69 2240
jpayne@69 2241 if (isWaiting()) {
jpayne@69 2242 result = kj::mv(value);
jpayne@69 2243 scheduleResumption();
jpayne@69 2244 }
jpayne@69 2245 }
jpayne@69 2246
jpayne@69 2247 ExceptionOr<FixVoid<T>> result;
jpayne@69 2248
jpayne@69 2249 friend class CoroutineMixin<Coroutine<T>, T>;
jpayne@69 2250 };
jpayne@69 2251
jpayne@69 2252 template <typename Self, typename T>
jpayne@69 2253 class CoroutineMixin {
jpayne@69 2254 public:
jpayne@69 2255 void return_value(T value) {
jpayne@69 2256 static_cast<Self*>(this)->fulfill(kj::mv(value));
jpayne@69 2257 }
jpayne@69 2258 };
jpayne@69 2259 template <typename Self>
jpayne@69 2260 class CoroutineMixin<Self, void> {
jpayne@69 2261 public:
jpayne@69 2262 void return_void() {
jpayne@69 2263 static_cast<Self*>(this)->fulfill(_::Void());
jpayne@69 2264 }
jpayne@69 2265 };
jpayne@69 2266 // The Coroutines spec has no `_::FixVoid<T>` equivalent to unify valueful and valueless co_return
jpayne@69 2267 // statements, and programs are ill-formed if the coroutine implementation object (Coroutine<T>) has
jpayne@69 2268 // both a `return_value()` and `return_void()`. No amount of EnableIffery can get around it, so
jpayne@69 2269 // these return_* functions live in a CRTP mixin.
jpayne@69 2270
jpayne@69 2271 class CoroutineBase::AwaiterBase {
jpayne@69 2272 public:
jpayne@69 2273 explicit AwaiterBase(OwnPromiseNode node);
jpayne@69 2274 AwaiterBase(AwaiterBase&&);
jpayne@69 2275 ~AwaiterBase() noexcept(false);
jpayne@69 2276 KJ_DISALLOW_COPY(AwaiterBase);
jpayne@69 2277
jpayne@69 2278 bool await_ready() const { return false; }
jpayne@69 2279 // This could return "`node->get()` is safe to call" instead, which would make suspension-less
jpayne@69 2280 // co_awaits possible for immediately-fulfilled promises. However, we need an Event to figure that
jpayne@69 2281 // out, and we won't have access to the Coroutine Event until await_suspend() is called. So, we
jpayne@69 2282 // must return false here. Fortunately, await_suspend() has a trick up its sleeve to enable
jpayne@69 2283 // suspension-less co_awaits.
jpayne@69 2284
jpayne@69 2285 protected:
jpayne@69 2286 void getImpl(ExceptionOrValue& result, void* awaitedAt);
jpayne@69 2287 bool awaitSuspendImpl(CoroutineBase& coroutineEvent);
jpayne@69 2288
jpayne@69 2289 private:
jpayne@69 2290 UnwindDetector unwindDetector;
jpayne@69 2291 OwnPromiseNode node;
jpayne@69 2292
jpayne@69 2293 Maybe<CoroutineBase&> maybeCoroutineEvent;
jpayne@69 2294 // If we do suspend waiting for our wrapped promise, we store a reference to `node` in our
jpayne@69 2295 // enclosing Coroutine for tracing purposes. To guard against any edge cases where an async stack
jpayne@69 2296 // trace is generated when an Awaiter was destroyed without Coroutine::fire() having been called,
jpayne@69 2297 // we need our own reference to the enclosing Coroutine. (I struggle to think up any such
jpayne@69 2298 // scenarios, but perhaps they could occur when destroying a suspended coroutine.)
jpayne@69 2299 };
jpayne@69 2300
jpayne@69 2301 template <typename T>
jpayne@69 2302 template <typename U>
jpayne@69 2303 class Coroutine<T>::Awaiter: public AwaiterBase {
jpayne@69 2304 // Wrapper around a co_await'ed promise and some storage space for the result of that promise.
jpayne@69 2305 // The compiler arranges to call our await_suspend() to suspend, which arranges to be woken up
jpayne@69 2306 // when the awaited promise is settled. Once that happens, the enclosing coroutine's Event
jpayne@69 2307 // implementation resumes the coroutine, which transitively calls await_resume() to unwrap the
jpayne@69 2308 // awaited promise result.
jpayne@69 2309
jpayne@69 2310 public:
jpayne@69 2311 explicit Awaiter(Promise<U> promise): AwaiterBase(PromiseNode::from(kj::mv(promise))) {}
jpayne@69 2312
jpayne@69 2313 KJ_NOINLINE U await_resume() {
jpayne@69 2314 // This is marked noinline in order to ensure __builtin_return_address() is accurate for stack
jpayne@69 2315 // trace purposes. In my experimentation, this method was not inlined anyway even in opt
jpayne@69 2316 // builds, but I want to make sure it doesn't suddenly start being inlined later causing stack
jpayne@69 2317 // traces to break. (I also tried always-inline, but this did not appear to cause the compiler
jpayne@69 2318 // to inline the method -- perhaps a limitation of coroutines?)
jpayne@69 2319 #if __GNUC__
jpayne@69 2320 getImpl(result, __builtin_return_address(0));
jpayne@69 2321 #elif _MSC_VER
jpayne@69 2322 getImpl(result, _ReturnAddress());
jpayne@69 2323 #else
jpayne@69 2324 #error "please implement for your compiler"
jpayne@69 2325 #endif
jpayne@69 2326 auto value = kj::_::readMaybe(result.value);
jpayne@69 2327 KJ_IASSERT(value != nullptr, "Neither exception nor value present.");
jpayne@69 2328 return U(kj::mv(*value));
jpayne@69 2329 }
jpayne@69 2330
jpayne@69 2331 bool await_suspend(Coroutine::Handle coroutine) {
jpayne@69 2332 return awaitSuspendImpl(coroutine.promise());
jpayne@69 2333 }
jpayne@69 2334
jpayne@69 2335 private:
jpayne@69 2336 ExceptionOr<FixVoid<U>> result;
jpayne@69 2337 };
jpayne@69 2338
jpayne@69 2339 #undef KJ_COROUTINE_STD_NAMESPACE
jpayne@69 2340
jpayne@69 2341 } // namespace kj::_ (private)
jpayne@69 2342
jpayne@69 2343 #endif // KJ_HAS_COROUTINE
jpayne@69 2344
jpayne@69 2345 KJ_END_HEADER