jpayne@69: // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors jpayne@69: // Licensed under the MIT License: jpayne@69: // jpayne@69: // Permission is hereby granted, free of charge, to any person obtaining a copy jpayne@69: // of this software and associated documentation files (the "Software"), to deal jpayne@69: // in the Software without restriction, including without limitation the rights jpayne@69: // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell jpayne@69: // copies of the Software, and to permit persons to whom the Software is jpayne@69: // furnished to do so, subject to the following conditions: jpayne@69: // jpayne@69: // The above copyright notice and this permission notice shall be included in jpayne@69: // all copies or substantial portions of the Software. jpayne@69: // jpayne@69: // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR jpayne@69: // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, jpayne@69: // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE jpayne@69: // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER jpayne@69: // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, jpayne@69: // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN jpayne@69: // THE SOFTWARE. jpayne@69: jpayne@69: // This file contains extended inline implementation details that are required along with async.h. jpayne@69: // We move this all into a separate file to make async.h more readable. jpayne@69: // jpayne@69: // Non-inline declarations here are defined in async.c++. jpayne@69: jpayne@69: #pragma once jpayne@69: jpayne@69: #ifndef KJ_ASYNC_H_INCLUDED jpayne@69: #error "Do not include this directly; include kj/async.h." jpayne@69: #include "async.h" // help IDE parse this file jpayne@69: #endif jpayne@69: jpayne@69: #if _MSC_VER && KJ_HAS_COROUTINE jpayne@69: #include jpayne@69: #endif jpayne@69: jpayne@69: #include jpayne@69: jpayne@69: KJ_BEGIN_HEADER jpayne@69: jpayne@69: namespace kj { jpayne@69: namespace _ { // private jpayne@69: jpayne@69: template jpayne@69: class ExceptionOr; jpayne@69: jpayne@69: class ExceptionOrValue { jpayne@69: public: jpayne@69: ExceptionOrValue(bool, Exception&& exception): exception(kj::mv(exception)) {} jpayne@69: KJ_DISALLOW_COPY(ExceptionOrValue); jpayne@69: jpayne@69: void addException(Exception&& exception) { jpayne@69: if (this->exception == nullptr) { jpayne@69: this->exception = kj::mv(exception); jpayne@69: } jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: ExceptionOr& as() { return *static_cast*>(this); } jpayne@69: template jpayne@69: const ExceptionOr& as() const { return *static_cast*>(this); } jpayne@69: jpayne@69: Maybe exception; jpayne@69: jpayne@69: protected: jpayne@69: // Allow subclasses to have move constructor / assignment. jpayne@69: ExceptionOrValue() = default; jpayne@69: ExceptionOrValue(ExceptionOrValue&& other) = default; jpayne@69: ExceptionOrValue& operator=(ExceptionOrValue&& other) = default; jpayne@69: }; jpayne@69: jpayne@69: template jpayne@69: class ExceptionOr: public ExceptionOrValue { jpayne@69: public: jpayne@69: ExceptionOr() = default; jpayne@69: ExceptionOr(T&& value): value(kj::mv(value)) {} jpayne@69: ExceptionOr(bool, Exception&& exception): ExceptionOrValue(false, kj::mv(exception)) {} jpayne@69: ExceptionOr(ExceptionOr&&) = default; jpayne@69: ExceptionOr& operator=(ExceptionOr&&) = default; jpayne@69: jpayne@69: Maybe value; jpayne@69: }; jpayne@69: jpayne@69: template jpayne@69: inline T convertToReturn(ExceptionOr&& result) { jpayne@69: KJ_IF_MAYBE(value, result.value) { jpayne@69: KJ_IF_MAYBE(exception, result.exception) { jpayne@69: throwRecoverableException(kj::mv(*exception)); jpayne@69: } jpayne@69: return _::returnMaybeVoid(kj::mv(*value)); jpayne@69: } else KJ_IF_MAYBE(exception, result.exception) { jpayne@69: throwFatalException(kj::mv(*exception)); jpayne@69: } else { jpayne@69: // Result contained neither a value nor an exception? jpayne@69: KJ_UNREACHABLE; jpayne@69: } jpayne@69: } jpayne@69: jpayne@69: inline void convertToReturn(ExceptionOr&& result) { jpayne@69: // Override case to use throwRecoverableException(). jpayne@69: jpayne@69: if (result.value != nullptr) { jpayne@69: KJ_IF_MAYBE(exception, result.exception) { jpayne@69: throwRecoverableException(kj::mv(*exception)); jpayne@69: } jpayne@69: } else KJ_IF_MAYBE(exception, result.exception) { jpayne@69: throwRecoverableException(kj::mv(*exception)); jpayne@69: } else { jpayne@69: // Result contained neither a value nor an exception? jpayne@69: KJ_UNREACHABLE; jpayne@69: } jpayne@69: } jpayne@69: jpayne@69: class TraceBuilder { jpayne@69: // Helper for methods that build a call trace. jpayne@69: public: jpayne@69: TraceBuilder(ArrayPtr space) jpayne@69: : start(space.begin()), current(space.begin()), limit(space.end()) {} jpayne@69: jpayne@69: inline void add(void* addr) { jpayne@69: if (current < limit) { jpayne@69: *current++ = addr; jpayne@69: } jpayne@69: } jpayne@69: jpayne@69: inline bool full() const { return current == limit; } jpayne@69: jpayne@69: ArrayPtr finish() { jpayne@69: return arrayPtr(start, current); jpayne@69: } jpayne@69: jpayne@69: String toString(); jpayne@69: jpayne@69: private: jpayne@69: void** start; jpayne@69: void** current; jpayne@69: void** limit; jpayne@69: }; jpayne@69: jpayne@69: struct alignas(void*) PromiseArena { jpayne@69: // Space in which a chain of promises may be allocated. See PromiseDisposer. jpayne@69: byte bytes[1024]; jpayne@69: }; jpayne@69: jpayne@69: class Event: private AsyncObject { jpayne@69: // An event waiting to be executed. Not for direct use by applications -- promises use this jpayne@69: // internally. jpayne@69: jpayne@69: public: jpayne@69: Event(SourceLocation location); jpayne@69: Event(kj::EventLoop& loop, SourceLocation location); jpayne@69: ~Event() noexcept(false); jpayne@69: KJ_DISALLOW_COPY_AND_MOVE(Event); jpayne@69: jpayne@69: void armDepthFirst(); jpayne@69: // Enqueue this event so that `fire()` will be called from the event loop soon. jpayne@69: // jpayne@69: // Events scheduled in this way are executed in depth-first order: if an event callback arms jpayne@69: // more events, those events are placed at the front of the queue (in the order in which they jpayne@69: // were armed), so that they run immediately after the first event's callback returns. jpayne@69: // jpayne@69: // Depth-first event scheduling is appropriate for events that represent simple continuations jpayne@69: // of a previous event that should be globbed together for performance. Depth-first scheduling jpayne@69: // can lead to starvation, so any long-running task must occasionally yield with jpayne@69: // `armBreadthFirst()`. (Promise::then() uses depth-first whereas evalLater() uses jpayne@69: // breadth-first.) jpayne@69: // jpayne@69: // To use breadth-first scheduling instead, use `armBreadthFirst()`. jpayne@69: jpayne@69: void armBreadthFirst(); jpayne@69: // Like `armDepthFirst()` except that the event is placed at the end of the queue. jpayne@69: jpayne@69: void armLast(); jpayne@69: // Enqueues this event to happen after all other events have run to completion and there is jpayne@69: // really nothing left to do except wait for I/O. jpayne@69: jpayne@69: bool isNext(); jpayne@69: // True if the Event has been armed and is next in line to be fired. This can be used after jpayne@69: // calling PromiseNode::onReady(event) to determine if a promise being waited is immediately jpayne@69: // ready, in which case continuations may be optimistically run without returning to the event jpayne@69: // loop. Note that this optimization is only valid if we know that we would otherwise immediately jpayne@69: // return to the event loop without running more application code. So this turns out to be useful jpayne@69: // in fairly narrow circumstances, chiefly when a coroutine is about to suspend, but discovers it jpayne@69: // doesn't need to. jpayne@69: // jpayne@69: // Returns false if the event loop is not currently running. This ensures that promise jpayne@69: // continuations don't execute except under a call to .wait(). jpayne@69: jpayne@69: void disarm(); jpayne@69: // If the event is armed but hasn't fired, cancel it. (Destroying the event does this jpayne@69: // implicitly.) jpayne@69: jpayne@69: virtual void traceEvent(TraceBuilder& builder) = 0; jpayne@69: // Build a trace of the callers leading up to this event. `builder` will be populated with jpayne@69: // "return addresses" of the promise chain waiting on this event. The return addresses may jpayne@69: // actually the addresses of lambdas passed to .then(), but in any case, feeding them into jpayne@69: // addr2line should produce useful source code locations. jpayne@69: // jpayne@69: // `traceEvent()` may be called from an async signal handler while `fire()` is executing. It jpayne@69: // must not allocate nor take locks. jpayne@69: jpayne@69: String traceEvent(); jpayne@69: // Helper that builds a trace and stringifies it. jpayne@69: jpayne@69: protected: jpayne@69: virtual Maybe> fire() = 0; jpayne@69: // Fire the event. Possibly returns a pointer to itself, which will be discarded by the jpayne@69: // caller. This is the only way that an event can delete itself as a result of firing, as jpayne@69: // doing so from within fire() will throw an exception. jpayne@69: jpayne@69: private: jpayne@69: friend class kj::EventLoop; jpayne@69: EventLoop& loop; jpayne@69: Event* next; jpayne@69: Event** prev; jpayne@69: bool firing = false; jpayne@69: jpayne@69: static constexpr uint MAGIC_LIVE_VALUE = 0x1e366381u; jpayne@69: uint live = MAGIC_LIVE_VALUE; jpayne@69: SourceLocation location; jpayne@69: }; jpayne@69: jpayne@69: class PromiseArenaMember { jpayne@69: // An object that is allocated in a PromiseArena. `PromiseNode` inherits this, and most jpayne@69: // arena-allocated objects are `PromiseNode` subclasses, but `TaskSet::Task`, ForkHub, and jpayne@69: // potentially other objects that commonly live on the end of a promise chain can also leverage jpayne@69: // this. jpayne@69: jpayne@69: public: jpayne@69: virtual void destroy() = 0; jpayne@69: // Destroys and frees the node. jpayne@69: // jpayne@69: // If the node was allocated using allocPromise(), then destroy() must call jpayne@69: // freePromise(this). If it was allocated some other way, then it is `destroy()`'s jpayne@69: // responsibility to complete any necessary cleanup of memory, e.g. call `delete this`. jpayne@69: // jpayne@69: // We use this instead of a virtual destructor for two reasons: jpayne@69: // 1. Coroutine nodes are not independent objects, they have to call destroy() on the coroutine jpayne@69: // handle to delete themselves. jpayne@69: // 2. XThreadEvents sometimes leave it up to a different thread to actually delete the object. jpayne@69: jpayne@69: private: jpayne@69: PromiseArena* arena = nullptr; jpayne@69: // If non-null, then this PromiseNode is the last node allocated within the given arena, and jpayne@69: // therefore owns the arena. After this node is destroyed, the arena should be deleted. jpayne@69: // jpayne@69: // PromiseNodes are allocated within the arena starting from the end, and `PromiseNode`s jpayne@69: // allocated this way are required to have `PromiseNode` itself as their leftmost inherited type, jpayne@69: // so that the pointers match. Thus, the space in `arena` from its start to the location of the jpayne@69: // `PromiseNode` is known to be available for subsequent allocations (which should then take jpayne@69: // ownership of the arena). jpayne@69: jpayne@69: friend class PromiseDisposer; jpayne@69: }; jpayne@69: jpayne@69: class PromiseNode: public PromiseArenaMember, private AsyncObject { jpayne@69: // A Promise contains a chain of PromiseNodes tracking the pending transformations. jpayne@69: // jpayne@69: // To reduce generated code bloat, PromiseNode is not a template. Instead, it makes very hacky jpayne@69: // use of pointers to ExceptionOrValue which actually point to ExceptionOr, but are only jpayne@69: // so down-cast in the few places that really need to be templated. Luckily this is all jpayne@69: // internal implementation details. jpayne@69: jpayne@69: public: jpayne@69: virtual void onReady(Event* event) noexcept = 0; jpayne@69: // Arms the given event when ready. jpayne@69: // jpayne@69: // May be called multiple times. If called again before the event was armed, the old event will jpayne@69: // never be armed, only the new one. If called again after the event was armed, the new event jpayne@69: // will be armed immediately. Can be called with nullptr to un-register the existing event. jpayne@69: jpayne@69: virtual void setSelfPointer(OwnPromiseNode* selfPtr) noexcept; jpayne@69: // Tells the node that `selfPtr` is the pointer that owns this node, and will continue to own jpayne@69: // this node until it is destroyed or setSelfPointer() is called again. ChainPromiseNode uses jpayne@69: // this to shorten redundant chains. The default implementation does nothing; only jpayne@69: // ChainPromiseNode should implement this. jpayne@69: jpayne@69: virtual void get(ExceptionOrValue& output) noexcept = 0; jpayne@69: // Get the result. `output` points to an ExceptionOr into which the result will be written. jpayne@69: // Can only be called once, and only after the node is ready. Must be called directly from the jpayne@69: // event loop, with no application code on the stack. jpayne@69: jpayne@69: virtual void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) = 0; jpayne@69: // Build a trace of this promise chain, showing what it is currently waiting on. jpayne@69: // jpayne@69: // Since traces are ordered callee-before-caller, PromiseNode::tracePromise() should typically jpayne@69: // recurse to its child first, then after the child returns, add itself to the trace. jpayne@69: // jpayne@69: // If `stopAtNextEvent` is true, then the trace should stop as soon as it hits a PromiseNode that jpayne@69: // also implements Event, and should not trace that node or its children. This is used in jpayne@69: // conjunction with Event::traceEvent(). The chain of Events is often more sparse than the chain jpayne@69: // of PromiseNodes, because a TransformPromiseNode (which implements .then()) is not itself an jpayne@69: // Event. TransformPromiseNode instead tells its child node to directly notify its *parent* node jpayne@69: // when it is ready, and then TransformPromiseNode applies the .then() transformation during the jpayne@69: // call to .get(). jpayne@69: // jpayne@69: // So, when we trace the chain of Events backwards, we end up hoping over segments of jpayne@69: // TransformPromiseNodes (and other similar types). In order to get those added to the trace, jpayne@69: // each Event must call back down the PromiseNode chain in the opposite direction, using this jpayne@69: // method. jpayne@69: // jpayne@69: // `tracePromise()` may be called from an async signal handler while `get()` is executing. It jpayne@69: // must not allocate nor take locks. jpayne@69: jpayne@69: template jpayne@69: static OwnPromiseNode from(T&& promise) { jpayne@69: // Given a Promise, extract the PromiseNode. jpayne@69: return kj::mv(promise.node); jpayne@69: } jpayne@69: template jpayne@69: static PromiseNode& from(T& promise) { jpayne@69: // Given a Promise, extract the PromiseNode. jpayne@69: return *promise.node; jpayne@69: } jpayne@69: template jpayne@69: static T to(OwnPromiseNode&& node) { jpayne@69: // Construct a Promise from a PromiseNode. (T should be a Promise type.) jpayne@69: return T(false, kj::mv(node)); jpayne@69: } jpayne@69: jpayne@69: protected: jpayne@69: class OnReadyEvent { jpayne@69: // Helper class for implementing onReady(). jpayne@69: jpayne@69: public: jpayne@69: void init(Event* newEvent); jpayne@69: jpayne@69: void arm(); jpayne@69: void armBreadthFirst(); jpayne@69: // Arms the event if init() has already been called and makes future calls to init() jpayne@69: // automatically arm the event. jpayne@69: jpayne@69: inline void traceEvent(TraceBuilder& builder) { jpayne@69: if (event != nullptr && !builder.full()) event->traceEvent(builder); jpayne@69: } jpayne@69: jpayne@69: private: jpayne@69: Event* event = nullptr; jpayne@69: }; jpayne@69: }; jpayne@69: jpayne@69: class PromiseDisposer { jpayne@69: public: jpayne@69: template jpayne@69: static constexpr bool canArenaAllocate() { jpayne@69: // We can only use arena allocation for types that fit in an arena and have pointer-size jpayne@69: // alignment. Anything else will need to be allocated as a separate heap object. jpayne@69: return sizeof(T) <= sizeof(PromiseArena) && alignof(T) <= alignof(void*); jpayne@69: } jpayne@69: jpayne@69: static void dispose(PromiseArenaMember* node) { jpayne@69: PromiseArena* arena = node->arena; jpayne@69: node->destroy(); jpayne@69: delete arena; // reminder: `delete` automatically ignores null pointers jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: static kj::Own alloc(Params&&... params) noexcept { jpayne@69: // Implements allocPromise(). jpayne@69: T* ptr; jpayne@69: if (!canArenaAllocate()) { jpayne@69: // Node too big (or needs weird alignment), fall back to regular heap allocation. jpayne@69: ptr = new T(kj::fwd(params)...); jpayne@69: } else { jpayne@69: // Start a new arena. jpayne@69: // jpayne@69: // NOTE: As in append() (below), we don't implement exception-safety because it causes code jpayne@69: // bloat and these constructors probably don't throw. Instead this function is noexcept, so jpayne@69: // if a constructor does throw, it'll crash rather than leak memory. jpayne@69: auto* arena = new PromiseArena; jpayne@69: ptr = reinterpret_cast(arena + 1) - 1; jpayne@69: ctor(*ptr, kj::fwd(params)...); jpayne@69: ptr->arena = arena; jpayne@69: KJ_IREQUIRE(reinterpret_cast(ptr) == jpayne@69: reinterpret_cast(static_cast(ptr)), jpayne@69: "PromiseArenaMember must be the leftmost inherited type."); jpayne@69: } jpayne@69: return kj::Own(ptr); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: static kj::Own append( jpayne@69: OwnPromiseNode&& next, Params&&... params) noexcept { jpayne@69: // Implements appendPromise(). jpayne@69: jpayne@69: PromiseArena* arena = next->arena; jpayne@69: jpayne@69: if (!canArenaAllocate() || arena == nullptr || jpayne@69: reinterpret_cast(next.get()) - reinterpret_cast(arena) < sizeof(T)) { jpayne@69: // No arena available, or not enough space, or weird alignment needed. Start new arena. jpayne@69: return alloc(kj::mv(next), kj::fwd(params)...); jpayne@69: } else { jpayne@69: // Append to arena. jpayne@69: // jpayne@69: // NOTE: When we call ctor(), it takes ownership of `next`, so we shouldn't assume `next` jpayne@69: // still exists after it returns. So we have to remove ownership of the arena before that. jpayne@69: // In theory if we wanted this to be exception-safe, we'd also have to arrange to delete jpayne@69: // the arena if the constructor throws. However, in practice none of the PromiseNode jpayne@69: // constructors throw, so we just mark the whole method noexcept in order to avoid the jpayne@69: // code bloat to handle this case. jpayne@69: next->arena = nullptr; jpayne@69: T* ptr = reinterpret_cast(next.get()) - 1; jpayne@69: ctor(*ptr, kj::mv(next), kj::fwd(params)...); jpayne@69: ptr->arena = arena; jpayne@69: KJ_IREQUIRE(reinterpret_cast(ptr) == jpayne@69: reinterpret_cast(static_cast(ptr)), jpayne@69: "PromiseArenaMember must be the leftmost inherited type."); jpayne@69: return kj::Own(ptr); jpayne@69: } jpayne@69: } jpayne@69: }; jpayne@69: jpayne@69: template jpayne@69: static kj::Own allocPromise(Params&&... params) { jpayne@69: // Allocate a PromiseNode without appending it to any existing promise arena. Space for a new jpayne@69: // arena will be allocated. jpayne@69: return PromiseDisposer::alloc(kj::fwd(params)...); jpayne@69: } jpayne@69: jpayne@69: template ()> jpayne@69: struct FreePromiseNode; jpayne@69: template jpayne@69: struct FreePromiseNode { jpayne@69: static inline void free(T* ptr) { jpayne@69: // The object will have been allocated in an arena, so we only want to run the destructor. jpayne@69: // The arena's memory will be freed separately. jpayne@69: kj::dtor(*ptr); jpayne@69: } jpayne@69: }; jpayne@69: template jpayne@69: struct FreePromiseNode { jpayne@69: static inline void free(T* ptr) { jpayne@69: // The object will have been allocated separately on the heap. jpayne@69: return delete ptr; jpayne@69: } jpayne@69: }; jpayne@69: jpayne@69: template jpayne@69: static void freePromise(T* ptr) { jpayne@69: // Free a PromiseNode originally allocated using `allocPromise()`. The implementation of jpayne@69: // PromiseNode::destroy() must call this for any type that is allocated using allocPromise(). jpayne@69: FreePromiseNode::free(ptr); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: static kj::Own appendPromise(OwnPromiseNode&& next, Params&&... params) { jpayne@69: // Append a promise to the arena that currently ends with `next`. `next` is also still passed as jpayne@69: // the first parameter to the new object's constructor. jpayne@69: // jpayne@69: // This is semantically the same as `allocPromise()` except that it may avoid the underlying jpayne@69: // memory allocation. `next` must end up being destroyed before the new object (i.e. the new jpayne@69: // object must never transfer away ownership of `next`). jpayne@69: return PromiseDisposer::append(kj::mv(next), kj::fwd(params)...); jpayne@69: } jpayne@69: jpayne@69: // ------------------------------------------------------------------- jpayne@69: jpayne@69: inline ReadyNow::operator Promise() const { jpayne@69: return PromiseNode::to>(readyNow()); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: inline NeverDone::operator Promise() const { jpayne@69: return PromiseNode::to>(neverDone()); jpayne@69: } jpayne@69: jpayne@69: // ------------------------------------------------------------------- jpayne@69: jpayne@69: class ImmediatePromiseNodeBase: public PromiseNode { jpayne@69: public: jpayne@69: ImmediatePromiseNodeBase(); jpayne@69: ~ImmediatePromiseNodeBase() noexcept(false); jpayne@69: jpayne@69: void onReady(Event* event) noexcept override; jpayne@69: void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; jpayne@69: }; jpayne@69: jpayne@69: template jpayne@69: class ImmediatePromiseNode final: public ImmediatePromiseNodeBase { jpayne@69: // A promise that has already been resolved to an immediate value or exception. jpayne@69: jpayne@69: public: jpayne@69: ImmediatePromiseNode(ExceptionOr&& result): result(kj::mv(result)) {} jpayne@69: void destroy() override { freePromise(this); } jpayne@69: jpayne@69: void get(ExceptionOrValue& output) noexcept override { jpayne@69: output.as() = kj::mv(result); jpayne@69: } jpayne@69: jpayne@69: private: jpayne@69: ExceptionOr result; jpayne@69: }; jpayne@69: jpayne@69: class ImmediateBrokenPromiseNode final: public ImmediatePromiseNodeBase { jpayne@69: public: jpayne@69: ImmediateBrokenPromiseNode(Exception&& exception); jpayne@69: void destroy() override; jpayne@69: jpayne@69: void get(ExceptionOrValue& output) noexcept override; jpayne@69: jpayne@69: private: jpayne@69: Exception exception; jpayne@69: }; jpayne@69: jpayne@69: template jpayne@69: class ConstPromiseNode: public ImmediatePromiseNodeBase { jpayne@69: public: jpayne@69: void destroy() override {} jpayne@69: void get(ExceptionOrValue& output) noexcept override { jpayne@69: output.as() = value; jpayne@69: } jpayne@69: }; jpayne@69: jpayne@69: // ------------------------------------------------------------------- jpayne@69: jpayne@69: class AttachmentPromiseNodeBase: public PromiseNode { jpayne@69: public: jpayne@69: AttachmentPromiseNodeBase(OwnPromiseNode&& dependency); jpayne@69: jpayne@69: void onReady(Event* event) noexcept override; jpayne@69: void get(ExceptionOrValue& output) noexcept override; jpayne@69: void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; jpayne@69: jpayne@69: private: jpayne@69: OwnPromiseNode dependency; jpayne@69: jpayne@69: void dropDependency(); jpayne@69: jpayne@69: template jpayne@69: friend class AttachmentPromiseNode; jpayne@69: }; jpayne@69: jpayne@69: template jpayne@69: class AttachmentPromiseNode final: public AttachmentPromiseNodeBase { jpayne@69: // A PromiseNode that holds on to some object (usually, an Own, but could be any movable jpayne@69: // object) until the promise resolves. jpayne@69: jpayne@69: public: jpayne@69: AttachmentPromiseNode(OwnPromiseNode&& dependency, Attachment&& attachment) jpayne@69: : AttachmentPromiseNodeBase(kj::mv(dependency)), jpayne@69: attachment(kj::mv(attachment)) {} jpayne@69: void destroy() override { freePromise(this); } jpayne@69: jpayne@69: ~AttachmentPromiseNode() noexcept(false) { jpayne@69: // We need to make sure the dependency is deleted before we delete the attachment because the jpayne@69: // dependency may be using the attachment. jpayne@69: dropDependency(); jpayne@69: } jpayne@69: jpayne@69: private: jpayne@69: Attachment attachment; jpayne@69: }; jpayne@69: jpayne@69: // ------------------------------------------------------------------- jpayne@69: jpayne@69: #if __GNUC__ >= 8 && !__clang__ jpayne@69: // GCC 8's class-memaccess warning rightly does not like the memcpy()'s below, but there's no jpayne@69: // "legal" way for us to extract the content of a PTMF so too bad. jpayne@69: #pragma GCC diagnostic push jpayne@69: #pragma GCC diagnostic ignored "-Wclass-memaccess" jpayne@69: #if __GNUC__ >= 11 jpayne@69: // GCC 11's array-bounds is similarly upset with us for digging into "private" implementation jpayne@69: // details. But the format is well-defined by the ABI which cannot change so please just let us jpayne@69: // do it kthx. jpayne@69: #pragma GCC diagnostic ignored "-Warray-bounds" jpayne@69: #endif jpayne@69: #endif jpayne@69: jpayne@69: template jpayne@69: void* getMethodStartAddress(T& obj, ReturnType (T::*method)(ParamTypes...)); jpayne@69: template jpayne@69: void* getMethodStartAddress(const T& obj, ReturnType (T::*method)(ParamTypes...) const); jpayne@69: // Given an object and a pointer-to-method, return the start address of the method's code. The jpayne@69: // intent is that this address can be used in a trace; addr2line should map it to the start of jpayne@69: // the function's definition. For virtual methods, this does a vtable lookup on `obj` to determine jpayne@69: // the address of the specific implementation (otherwise, `obj` wouldn't be needed). jpayne@69: // jpayne@69: // Note that if the method is overloaded or is a template, you will need to explicitly specify jpayne@69: // the param and return types, otherwise the compiler won't know which overload / template jpayne@69: // specialization you are requesting. jpayne@69: jpayne@69: class PtmfHelper { jpayne@69: // This class is a private helper for GetFunctorStartAddress and getMethodStartAddress(). The jpayne@69: // class represents the internal representation of a pointer-to-member-function. jpayne@69: jpayne@69: template jpayne@69: friend struct GetFunctorStartAddress; jpayne@69: template jpayne@69: friend void* getMethodStartAddress(T& obj, ReturnType (T::*method)(ParamTypes...)); jpayne@69: template jpayne@69: friend void* getMethodStartAddress(const T& obj, ReturnType (T::*method)(ParamTypes...) const); jpayne@69: jpayne@69: #if __GNUG__ jpayne@69: jpayne@69: void* ptr; jpayne@69: ptrdiff_t adj; jpayne@69: // Layout of a pointer-to-member-function used by GCC and compatible compilers. jpayne@69: jpayne@69: void* apply(const void* obj) { jpayne@69: #if defined(__arm__) || defined(__mips__) || defined(__aarch64__) jpayne@69: if (adj & 1) { jpayne@69: ptrdiff_t voff = (ptrdiff_t)ptr; jpayne@69: #else jpayne@69: ptrdiff_t voff = (ptrdiff_t)ptr; jpayne@69: if (voff & 1) { jpayne@69: voff &= ~1; jpayne@69: #endif jpayne@69: return *(void**)(*(char**)obj + voff); jpayne@69: } else { jpayne@69: return ptr; jpayne@69: } jpayne@69: } jpayne@69: jpayne@69: #define BODY \ jpayne@69: PtmfHelper result; \ jpayne@69: static_assert(sizeof(p) == sizeof(result), "unknown ptmf layout"); \ jpayne@69: memcpy(&result, &p, sizeof(result)); \ jpayne@69: return result jpayne@69: jpayne@69: #else // __GNUG__ jpayne@69: jpayne@69: void* apply(const void* obj) { return nullptr; } jpayne@69: // TODO(port): PTMF instruction address extraction jpayne@69: jpayne@69: #define BODY return PtmfHelper{} jpayne@69: jpayne@69: #endif // __GNUG__, else jpayne@69: jpayne@69: template jpayne@69: static PtmfHelper from(F p) { BODY; } jpayne@69: // Create a PtmfHelper from some arbitrary pointer-to-member-function which is not jpayne@69: // overloaded nor a template. In this case the compiler is able to deduce the full function jpayne@69: // signature directly given the name since there is only one function with that name. jpayne@69: jpayne@69: template jpayne@69: static PtmfHelper from(R (C::*p)(NoInfer

...)) { BODY; } jpayne@69: template jpayne@69: static PtmfHelper from(R (C::*p)(NoInfer

...) const) { BODY; } jpayne@69: // Create a PtmfHelper from some poniter-to-member-function which is a template. In this case jpayne@69: // the function must match exactly the containing type C, return type R, and parameter types P... jpayne@69: // GetFunctorStartAddress normally specifies exactly the correct C and R, but can only make a jpayne@69: // guess at P. Luckily, if the function parameters are template parameters then it's not jpayne@69: // necessary to be precise about P. jpayne@69: #undef BODY jpayne@69: }; jpayne@69: jpayne@69: #if __GNUC__ >= 8 && !__clang__ jpayne@69: #pragma GCC diagnostic pop jpayne@69: #endif jpayne@69: jpayne@69: template jpayne@69: void* getMethodStartAddress(T& obj, ReturnType (T::*method)(ParamTypes...)) { jpayne@69: return PtmfHelper::from(method).apply(&obj); jpayne@69: } jpayne@69: template jpayne@69: void* getMethodStartAddress(const T& obj, ReturnType (T::*method)(ParamTypes...) const) { jpayne@69: return PtmfHelper::from(method).apply(&obj); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: struct GetFunctorStartAddress { jpayne@69: // Given a functor (any object defining operator()), return the start address of the function, jpayne@69: // suitable for passing to addr2line to obtain a source file/line for debugging purposes. jpayne@69: // jpayne@69: // This turns out to be incredibly hard to implement in the presence of overloaded or templated jpayne@69: // functors. Therefore, we impose these specific restrictions, specific to our use case: jpayne@69: // - Overloading is not allowed, but templating is. (Generally we only intend to support lambdas jpayne@69: // anyway.) jpayne@69: // - The template parameters to GetFunctorStartAddress specify a hint as to the expected jpayne@69: // parameter types. If the functor is templated, its parameters must match exactly these types. jpayne@69: // (If it's not templated, ParamTypes are ignored.) jpayne@69: jpayne@69: template jpayne@69: static void* apply(Func&& func) { jpayne@69: typedef decltype(func(instance()...)) ReturnType; jpayne@69: return PtmfHelper::from, ParamTypes...>( jpayne@69: &Decay::operator()).apply(&func); jpayne@69: } jpayne@69: }; jpayne@69: jpayne@69: template <> jpayne@69: struct GetFunctorStartAddress: public GetFunctorStartAddress<> {}; jpayne@69: // Hack for TransformPromiseNode use case: an input type of `Void` indicates that the function jpayne@69: // actually has no parameters. jpayne@69: jpayne@69: class TransformPromiseNodeBase: public PromiseNode { jpayne@69: public: jpayne@69: TransformPromiseNodeBase(OwnPromiseNode&& dependency, void* continuationTracePtr); jpayne@69: jpayne@69: void onReady(Event* event) noexcept override; jpayne@69: void get(ExceptionOrValue& output) noexcept override; jpayne@69: void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; jpayne@69: jpayne@69: private: jpayne@69: OwnPromiseNode dependency; jpayne@69: void* continuationTracePtr; jpayne@69: jpayne@69: void dropDependency(); jpayne@69: void getDepResult(ExceptionOrValue& output); jpayne@69: jpayne@69: virtual void getImpl(ExceptionOrValue& output) = 0; jpayne@69: jpayne@69: template jpayne@69: friend class TransformPromiseNode; jpayne@69: }; jpayne@69: jpayne@69: template jpayne@69: class TransformPromiseNode final: public TransformPromiseNodeBase { jpayne@69: // A PromiseNode that transforms the result of another PromiseNode through an application-provided jpayne@69: // function (implements `then()`). jpayne@69: jpayne@69: public: jpayne@69: TransformPromiseNode(OwnPromiseNode&& dependency, Func&& func, ErrorFunc&& errorHandler, jpayne@69: void* continuationTracePtr) jpayne@69: : TransformPromiseNodeBase(kj::mv(dependency), continuationTracePtr), jpayne@69: func(kj::fwd(func)), errorHandler(kj::fwd(errorHandler)) {} jpayne@69: void destroy() override { freePromise(this); } jpayne@69: jpayne@69: ~TransformPromiseNode() noexcept(false) { jpayne@69: // We need to make sure the dependency is deleted before we delete the continuations because it jpayne@69: // is a common pattern for the continuations to hold ownership of objects that might be in-use jpayne@69: // by the dependency. jpayne@69: dropDependency(); jpayne@69: } jpayne@69: jpayne@69: private: jpayne@69: Func func; jpayne@69: ErrorFunc errorHandler; jpayne@69: jpayne@69: void getImpl(ExceptionOrValue& output) override { jpayne@69: ExceptionOr depResult; jpayne@69: getDepResult(depResult); jpayne@69: KJ_IF_MAYBE(depException, depResult.exception) { jpayne@69: output.as() = handle( jpayne@69: MaybeVoidCaller>>::apply( jpayne@69: errorHandler, kj::mv(*depException))); jpayne@69: } else KJ_IF_MAYBE(depValue, depResult.value) { jpayne@69: output.as() = handle(MaybeVoidCaller::apply(func, kj::mv(*depValue))); jpayne@69: } jpayne@69: } jpayne@69: jpayne@69: ExceptionOr handle(T&& value) { jpayne@69: return kj::mv(value); jpayne@69: } jpayne@69: ExceptionOr handle(PropagateException::Bottom&& value) { jpayne@69: return ExceptionOr(false, value.asException()); jpayne@69: } jpayne@69: }; jpayne@69: jpayne@69: // ------------------------------------------------------------------- jpayne@69: jpayne@69: class ForkHubBase; jpayne@69: using OwnForkHubBase = Own; jpayne@69: jpayne@69: class ForkBranchBase: public PromiseNode { jpayne@69: public: jpayne@69: ForkBranchBase(OwnForkHubBase&& hub); jpayne@69: ~ForkBranchBase() noexcept(false); jpayne@69: jpayne@69: void hubReady() noexcept; jpayne@69: // Called by the hub to indicate that it is ready. jpayne@69: jpayne@69: // implements PromiseNode ------------------------------------------ jpayne@69: void onReady(Event* event) noexcept override; jpayne@69: void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; jpayne@69: jpayne@69: protected: jpayne@69: inline ExceptionOrValue& getHubResultRef(); jpayne@69: jpayne@69: void releaseHub(ExceptionOrValue& output); jpayne@69: // Release the hub. If an exception is thrown, add it to `output`. jpayne@69: jpayne@69: private: jpayne@69: OnReadyEvent onReadyEvent; jpayne@69: jpayne@69: OwnForkHubBase hub; jpayne@69: ForkBranchBase* next = nullptr; jpayne@69: ForkBranchBase** prevPtr = nullptr; jpayne@69: jpayne@69: friend class ForkHubBase; jpayne@69: }; jpayne@69: jpayne@69: template T copyOrAddRef(T& t) { return t; } jpayne@69: template Own copyOrAddRef(Own& t) { return t->addRef(); } jpayne@69: template Maybe> copyOrAddRef(Maybe>& t) { jpayne@69: return t.map([](Own& ptr) { jpayne@69: return ptr->addRef(); jpayne@69: }); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: class ForkBranch final: public ForkBranchBase { jpayne@69: // A PromiseNode that implements one branch of a fork -- i.e. one of the branches that receives jpayne@69: // a const reference. jpayne@69: jpayne@69: public: jpayne@69: ForkBranch(OwnForkHubBase&& hub): ForkBranchBase(kj::mv(hub)) {} jpayne@69: void destroy() override { freePromise(this); } jpayne@69: jpayne@69: void get(ExceptionOrValue& output) noexcept override { jpayne@69: ExceptionOr& hubResult = getHubResultRef().template as(); jpayne@69: KJ_IF_MAYBE(value, hubResult.value) { jpayne@69: output.as().value = copyOrAddRef(*value); jpayne@69: } else { jpayne@69: output.as().value = nullptr; jpayne@69: } jpayne@69: output.exception = hubResult.exception; jpayne@69: releaseHub(output); jpayne@69: } jpayne@69: }; jpayne@69: jpayne@69: template jpayne@69: class SplitBranch final: public ForkBranchBase { jpayne@69: // A PromiseNode that implements one branch of a fork -- i.e. one of the branches that receives jpayne@69: // a const reference. jpayne@69: jpayne@69: public: jpayne@69: SplitBranch(OwnForkHubBase&& hub): ForkBranchBase(kj::mv(hub)) {} jpayne@69: void destroy() override { freePromise(this); } jpayne@69: jpayne@69: typedef kj::Decay(kj::instance()))> Element; jpayne@69: jpayne@69: void get(ExceptionOrValue& output) noexcept override { jpayne@69: ExceptionOr& hubResult = getHubResultRef().template as(); jpayne@69: KJ_IF_MAYBE(value, hubResult.value) { jpayne@69: output.as().value = kj::mv(kj::get(*value)); jpayne@69: } else { jpayne@69: output.as().value = nullptr; jpayne@69: } jpayne@69: output.exception = hubResult.exception; jpayne@69: releaseHub(output); jpayne@69: } jpayne@69: }; jpayne@69: jpayne@69: // ------------------------------------------------------------------- jpayne@69: jpayne@69: class ForkHubBase: public PromiseArenaMember, protected Event { jpayne@69: public: jpayne@69: ForkHubBase(OwnPromiseNode&& inner, ExceptionOrValue& resultRef, SourceLocation location); jpayne@69: jpayne@69: inline ExceptionOrValue& getResultRef() { return resultRef; } jpayne@69: jpayne@69: inline bool isShared() const { return refcount > 1; } jpayne@69: jpayne@69: Own addRef() { jpayne@69: ++refcount; jpayne@69: return Own(this); jpayne@69: } jpayne@69: jpayne@69: static void dispose(ForkHubBase* obj) { jpayne@69: if (--obj->refcount == 0) { jpayne@69: PromiseDisposer::dispose(obj); jpayne@69: } jpayne@69: } jpayne@69: jpayne@69: private: jpayne@69: uint refcount = 1; jpayne@69: // We manually implement refcounting for ForkHubBase so that we can use it together with jpayne@69: // PromiseDisposer's arena allocation. jpayne@69: jpayne@69: OwnPromiseNode inner; jpayne@69: ExceptionOrValue& resultRef; jpayne@69: jpayne@69: ForkBranchBase* headBranch = nullptr; jpayne@69: ForkBranchBase** tailBranch = &headBranch; jpayne@69: // Tail becomes null once the inner promise is ready and all branches have been notified. jpayne@69: jpayne@69: Maybe> fire() override; jpayne@69: void traceEvent(TraceBuilder& builder) override; jpayne@69: jpayne@69: friend class ForkBranchBase; jpayne@69: }; jpayne@69: jpayne@69: template jpayne@69: class ForkHub final: public ForkHubBase { jpayne@69: // A PromiseNode that implements the hub of a fork. The first call to Promise::fork() replaces jpayne@69: // the promise's outer node with a ForkHub, and subsequent calls add branches to that hub (if jpayne@69: // possible). jpayne@69: jpayne@69: public: jpayne@69: ForkHub(OwnPromiseNode&& inner, SourceLocation location) jpayne@69: : ForkHubBase(kj::mv(inner), result, location) {} jpayne@69: void destroy() override { freePromise(this); } jpayne@69: jpayne@69: Promise<_::UnfixVoid> addBranch() { jpayne@69: return _::PromiseNode::to>>( jpayne@69: allocPromise>(addRef())); jpayne@69: } jpayne@69: jpayne@69: _::SplitTuplePromise split(SourceLocation location) { jpayne@69: return splitImpl(MakeIndexes()>(), location); jpayne@69: } jpayne@69: jpayne@69: private: jpayne@69: ExceptionOr result; jpayne@69: jpayne@69: template jpayne@69: _::SplitTuplePromise splitImpl(Indexes, SourceLocation location) { jpayne@69: return kj::tuple(addSplit(location)...); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: ReducePromises::Element> addSplit(SourceLocation location) { jpayne@69: return _::PromiseNode::to::Element>>( jpayne@69: maybeChain(allocPromise>(addRef()), jpayne@69: implicitCast::Element*>(nullptr), jpayne@69: location)); jpayne@69: } jpayne@69: }; jpayne@69: jpayne@69: inline ExceptionOrValue& ForkBranchBase::getHubResultRef() { jpayne@69: return hub->getResultRef(); jpayne@69: } jpayne@69: jpayne@69: // ------------------------------------------------------------------- jpayne@69: jpayne@69: class ChainPromiseNode final: public PromiseNode, public Event { jpayne@69: // Promise node which reduces Promise> to Promise. jpayne@69: // jpayne@69: // `Event` is only a public base class because otherwise we can't cast Own to jpayne@69: // Own. Ugh, templates and private... jpayne@69: jpayne@69: public: jpayne@69: explicit ChainPromiseNode(OwnPromiseNode inner, SourceLocation location); jpayne@69: ~ChainPromiseNode() noexcept(false); jpayne@69: void destroy() override; jpayne@69: jpayne@69: void onReady(Event* event) noexcept override; jpayne@69: void setSelfPointer(OwnPromiseNode* selfPtr) noexcept override; jpayne@69: void get(ExceptionOrValue& output) noexcept override; jpayne@69: void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; jpayne@69: jpayne@69: private: jpayne@69: enum State { jpayne@69: STEP1, jpayne@69: STEP2 jpayne@69: }; jpayne@69: jpayne@69: State state; jpayne@69: jpayne@69: OwnPromiseNode inner; jpayne@69: // In STEP1, a PromiseNode for a Promise. jpayne@69: // In STEP2, a PromiseNode for a T. jpayne@69: jpayne@69: Event* onReadyEvent = nullptr; jpayne@69: OwnPromiseNode* selfPtr = nullptr; jpayne@69: jpayne@69: Maybe> fire() override; jpayne@69: void traceEvent(TraceBuilder& builder) override; jpayne@69: }; jpayne@69: jpayne@69: template jpayne@69: OwnPromiseNode maybeChain(OwnPromiseNode&& node, Promise*, SourceLocation location) { jpayne@69: return appendPromise(kj::mv(node), location); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: OwnPromiseNode&& maybeChain(OwnPromiseNode&& node, T*, SourceLocation location) { jpayne@69: return kj::mv(node); jpayne@69: } jpayne@69: jpayne@69: template >()))> jpayne@69: inline Result maybeReduce(Promise&& promise, bool) { jpayne@69: return T::reducePromise(kj::mv(promise)); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: inline Promise maybeReduce(Promise&& promise, ...) { jpayne@69: return kj::mv(promise); jpayne@69: } jpayne@69: jpayne@69: // ------------------------------------------------------------------- jpayne@69: jpayne@69: class ExclusiveJoinPromiseNode final: public PromiseNode { jpayne@69: public: jpayne@69: ExclusiveJoinPromiseNode(OwnPromiseNode left, OwnPromiseNode right, SourceLocation location); jpayne@69: ~ExclusiveJoinPromiseNode() noexcept(false); jpayne@69: void destroy() override; jpayne@69: jpayne@69: void onReady(Event* event) noexcept override; jpayne@69: void get(ExceptionOrValue& output) noexcept override; jpayne@69: void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; jpayne@69: jpayne@69: private: jpayne@69: class Branch: public Event { jpayne@69: public: jpayne@69: Branch(ExclusiveJoinPromiseNode& joinNode, OwnPromiseNode dependency, jpayne@69: SourceLocation location); jpayne@69: ~Branch() noexcept(false); jpayne@69: jpayne@69: bool get(ExceptionOrValue& output); jpayne@69: // Returns true if this is the side that finished. jpayne@69: jpayne@69: Maybe> fire() override; jpayne@69: void traceEvent(TraceBuilder& builder) override; jpayne@69: jpayne@69: private: jpayne@69: ExclusiveJoinPromiseNode& joinNode; jpayne@69: OwnPromiseNode dependency; jpayne@69: jpayne@69: friend class ExclusiveJoinPromiseNode; jpayne@69: }; jpayne@69: jpayne@69: Branch left; jpayne@69: Branch right; jpayne@69: OnReadyEvent onReadyEvent; jpayne@69: }; jpayne@69: jpayne@69: // ------------------------------------------------------------------- jpayne@69: jpayne@69: enum class ArrayJoinBehavior { jpayne@69: LAZY, jpayne@69: EAGER, jpayne@69: }; jpayne@69: jpayne@69: class ArrayJoinPromiseNodeBase: public PromiseNode { jpayne@69: public: jpayne@69: ArrayJoinPromiseNodeBase(Array promises, jpayne@69: ExceptionOrValue* resultParts, size_t partSize, jpayne@69: SourceLocation location, jpayne@69: ArrayJoinBehavior joinBehavior); jpayne@69: ~ArrayJoinPromiseNodeBase() noexcept(false); jpayne@69: jpayne@69: void onReady(Event* event) noexcept override final; jpayne@69: void get(ExceptionOrValue& output) noexcept override final; jpayne@69: void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override final; jpayne@69: jpayne@69: protected: jpayne@69: virtual void getNoError(ExceptionOrValue& output) noexcept = 0; jpayne@69: // Called to compile the result only in the case where there were no errors. jpayne@69: jpayne@69: private: jpayne@69: const ArrayJoinBehavior joinBehavior; jpayne@69: jpayne@69: uint countLeft; jpayne@69: OnReadyEvent onReadyEvent; jpayne@69: bool armed = false; jpayne@69: jpayne@69: class Branch final: public Event { jpayne@69: public: jpayne@69: Branch(ArrayJoinPromiseNodeBase& joinNode, OwnPromiseNode dependency, jpayne@69: ExceptionOrValue& output, SourceLocation location); jpayne@69: ~Branch() noexcept(false); jpayne@69: jpayne@69: Maybe> fire() override; jpayne@69: void traceEvent(TraceBuilder& builder) override; jpayne@69: jpayne@69: private: jpayne@69: ArrayJoinPromiseNodeBase& joinNode; jpayne@69: OwnPromiseNode dependency; jpayne@69: ExceptionOrValue& output; jpayne@69: jpayne@69: friend class ArrayJoinPromiseNodeBase; jpayne@69: }; jpayne@69: jpayne@69: Array branches; jpayne@69: }; jpayne@69: jpayne@69: template jpayne@69: class ArrayJoinPromiseNode final: public ArrayJoinPromiseNodeBase { jpayne@69: public: jpayne@69: ArrayJoinPromiseNode(Array promises, jpayne@69: Array> resultParts, jpayne@69: SourceLocation location, jpayne@69: ArrayJoinBehavior joinBehavior) jpayne@69: : ArrayJoinPromiseNodeBase(kj::mv(promises), resultParts.begin(), sizeof(ExceptionOr), jpayne@69: location, joinBehavior), jpayne@69: resultParts(kj::mv(resultParts)) {} jpayne@69: void destroy() override { freePromise(this); } jpayne@69: jpayne@69: protected: jpayne@69: void getNoError(ExceptionOrValue& output) noexcept override { jpayne@69: auto builder = heapArrayBuilder(resultParts.size()); jpayne@69: for (auto& part: resultParts) { jpayne@69: KJ_IASSERT(part.value != nullptr, jpayne@69: "Bug in KJ promise framework: Promise result had neither value no exception."); jpayne@69: builder.add(kj::mv(*_::readMaybe(part.value))); jpayne@69: } jpayne@69: output.as>() = builder.finish(); jpayne@69: } jpayne@69: jpayne@69: private: jpayne@69: Array> resultParts; jpayne@69: }; jpayne@69: jpayne@69: template <> jpayne@69: class ArrayJoinPromiseNode final: public ArrayJoinPromiseNodeBase { jpayne@69: public: jpayne@69: ArrayJoinPromiseNode(Array promises, jpayne@69: Array> resultParts, jpayne@69: SourceLocation location, jpayne@69: ArrayJoinBehavior joinBehavior); jpayne@69: ~ArrayJoinPromiseNode(); jpayne@69: void destroy() override; jpayne@69: jpayne@69: protected: jpayne@69: void getNoError(ExceptionOrValue& output) noexcept override; jpayne@69: jpayne@69: private: jpayne@69: Array> resultParts; jpayne@69: }; jpayne@69: jpayne@69: // ------------------------------------------------------------------- jpayne@69: jpayne@69: class EagerPromiseNodeBase: public PromiseNode, protected Event { jpayne@69: // A PromiseNode that eagerly evaluates its dependency even if its dependent does not eagerly jpayne@69: // evaluate it. jpayne@69: jpayne@69: public: jpayne@69: EagerPromiseNodeBase(OwnPromiseNode&& dependency, ExceptionOrValue& resultRef, jpayne@69: SourceLocation location); jpayne@69: jpayne@69: void onReady(Event* event) noexcept override; jpayne@69: void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; jpayne@69: jpayne@69: private: jpayne@69: OwnPromiseNode dependency; jpayne@69: OnReadyEvent onReadyEvent; jpayne@69: jpayne@69: ExceptionOrValue& resultRef; jpayne@69: jpayne@69: Maybe> fire() override; jpayne@69: void traceEvent(TraceBuilder& builder) override; jpayne@69: }; jpayne@69: jpayne@69: template jpayne@69: class EagerPromiseNode final: public EagerPromiseNodeBase { jpayne@69: public: jpayne@69: EagerPromiseNode(OwnPromiseNode&& dependency, SourceLocation location) jpayne@69: : EagerPromiseNodeBase(kj::mv(dependency), result, location) {} jpayne@69: void destroy() override { freePromise(this); } jpayne@69: jpayne@69: void get(ExceptionOrValue& output) noexcept override { jpayne@69: output.as() = kj::mv(result); jpayne@69: } jpayne@69: jpayne@69: private: jpayne@69: ExceptionOr result; jpayne@69: }; jpayne@69: jpayne@69: template jpayne@69: OwnPromiseNode spark(OwnPromiseNode&& node, SourceLocation location) { jpayne@69: // Forces evaluation of the given node to begin as soon as possible, even if no one is waiting jpayne@69: // on it. jpayne@69: return appendPromise>(kj::mv(node), location); jpayne@69: } jpayne@69: jpayne@69: // ------------------------------------------------------------------- jpayne@69: jpayne@69: class AdapterPromiseNodeBase: public PromiseNode { jpayne@69: public: jpayne@69: void onReady(Event* event) noexcept override; jpayne@69: void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; jpayne@69: jpayne@69: protected: jpayne@69: inline void setReady() { jpayne@69: onReadyEvent.arm(); jpayne@69: } jpayne@69: jpayne@69: private: jpayne@69: OnReadyEvent onReadyEvent; jpayne@69: }; jpayne@69: jpayne@69: template jpayne@69: class AdapterPromiseNode final: public AdapterPromiseNodeBase, jpayne@69: private PromiseFulfiller> { jpayne@69: // A PromiseNode that wraps a PromiseAdapter. jpayne@69: jpayne@69: public: jpayne@69: template jpayne@69: AdapterPromiseNode(Params&&... params) jpayne@69: : adapter(static_cast>&>(*this), kj::fwd(params)...) {} jpayne@69: void destroy() override { freePromise(this); } jpayne@69: jpayne@69: void get(ExceptionOrValue& output) noexcept override { jpayne@69: KJ_IREQUIRE(!isWaiting()); jpayne@69: output.as() = kj::mv(result); jpayne@69: } jpayne@69: jpayne@69: private: jpayne@69: ExceptionOr result; jpayne@69: bool waiting = true; jpayne@69: Adapter adapter; jpayne@69: jpayne@69: void fulfill(T&& value) override { jpayne@69: if (waiting) { jpayne@69: waiting = false; jpayne@69: result = ExceptionOr(kj::mv(value)); jpayne@69: setReady(); jpayne@69: } jpayne@69: } jpayne@69: jpayne@69: void reject(Exception&& exception) override { jpayne@69: if (waiting) { jpayne@69: waiting = false; jpayne@69: result = ExceptionOr(false, kj::mv(exception)); jpayne@69: setReady(); jpayne@69: } jpayne@69: } jpayne@69: jpayne@69: bool isWaiting() override { jpayne@69: return waiting; jpayne@69: } jpayne@69: }; jpayne@69: jpayne@69: // ------------------------------------------------------------------- jpayne@69: jpayne@69: class FiberBase: public PromiseNode, private Event { jpayne@69: // Base class for the outer PromiseNode representing a fiber. jpayne@69: jpayne@69: public: jpayne@69: explicit FiberBase(size_t stackSize, _::ExceptionOrValue& result, SourceLocation location); jpayne@69: explicit FiberBase(const FiberPool& pool, _::ExceptionOrValue& result, SourceLocation location); jpayne@69: ~FiberBase() noexcept(false); jpayne@69: jpayne@69: void start() { armDepthFirst(); } jpayne@69: // Call immediately after construction to begin executing the fiber. jpayne@69: jpayne@69: class WaitDoneEvent; jpayne@69: jpayne@69: void onReady(_::Event* event) noexcept override; jpayne@69: void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; jpayne@69: jpayne@69: protected: jpayne@69: bool isFinished() { return state == FINISHED; } jpayne@69: void cancel(); jpayne@69: jpayne@69: private: jpayne@69: enum { WAITING, RUNNING, CANCELED, FINISHED } state; jpayne@69: jpayne@69: _::PromiseNode* currentInner = nullptr; jpayne@69: OnReadyEvent onReadyEvent; jpayne@69: Own stack; jpayne@69: _::ExceptionOrValue& result; jpayne@69: jpayne@69: void run(); jpayne@69: virtual void runImpl(WaitScope& waitScope) = 0; jpayne@69: jpayne@69: Maybe> fire() override; jpayne@69: void traceEvent(TraceBuilder& builder) override; jpayne@69: // Implements Event. Each time the event is fired, switchToFiber() is called. jpayne@69: jpayne@69: friend class FiberStack; jpayne@69: friend void _::waitImpl(_::OwnPromiseNode&& node, _::ExceptionOrValue& result, jpayne@69: WaitScope& waitScope, SourceLocation location); jpayne@69: friend bool _::pollImpl(_::PromiseNode& node, WaitScope& waitScope, SourceLocation location); jpayne@69: }; jpayne@69: jpayne@69: template jpayne@69: class Fiber final: public FiberBase { jpayne@69: public: jpayne@69: explicit Fiber(size_t stackSize, Func&& func, SourceLocation location) jpayne@69: : FiberBase(stackSize, result, location), func(kj::fwd(func)) {} jpayne@69: explicit Fiber(const FiberPool& pool, Func&& func, SourceLocation location) jpayne@69: : FiberBase(pool, result, location), func(kj::fwd(func)) {} jpayne@69: ~Fiber() noexcept(false) { cancel(); } jpayne@69: void destroy() override { freePromise(this); } jpayne@69: jpayne@69: typedef FixVoid()(kj::instance()))> ResultType; jpayne@69: jpayne@69: void get(ExceptionOrValue& output) noexcept override { jpayne@69: KJ_IREQUIRE(isFinished()); jpayne@69: output.as() = kj::mv(result); jpayne@69: } jpayne@69: jpayne@69: private: jpayne@69: Func func; jpayne@69: ExceptionOr result; jpayne@69: jpayne@69: void runImpl(WaitScope& waitScope) override { jpayne@69: result.template as() = jpayne@69: MaybeVoidCaller::apply(func, waitScope); jpayne@69: } jpayne@69: }; jpayne@69: jpayne@69: } // namespace _ (private) jpayne@69: jpayne@69: // ======================================================================================= jpayne@69: jpayne@69: template jpayne@69: Promise::Promise(_::FixVoid value) jpayne@69: : PromiseBase(_::allocPromise<_::ImmediatePromiseNode<_::FixVoid>>(kj::mv(value))) {} jpayne@69: jpayne@69: template jpayne@69: Promise::Promise(kj::Exception&& exception) jpayne@69: : PromiseBase(_::allocPromise<_::ImmediateBrokenPromiseNode>(kj::mv(exception))) {} jpayne@69: jpayne@69: template jpayne@69: template jpayne@69: PromiseForResult Promise::then(Func&& func, ErrorFunc&& errorHandler, jpayne@69: SourceLocation location) { jpayne@69: typedef _::FixVoid<_::ReturnType> ResultT; jpayne@69: jpayne@69: void* continuationTracePtr = _::GetFunctorStartAddress<_::FixVoid&&>::apply(func); jpayne@69: _::OwnPromiseNode intermediate = jpayne@69: _::appendPromise<_::TransformPromiseNode, Func, ErrorFunc>>( jpayne@69: kj::mv(node), kj::fwd(func), kj::fwd(errorHandler), jpayne@69: continuationTracePtr); jpayne@69: auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType>>( jpayne@69: _::maybeChain(kj::mv(intermediate), implicitCast(nullptr), location)); jpayne@69: return _::maybeReduce(kj::mv(result), false); jpayne@69: } jpayne@69: jpayne@69: namespace _ { // private jpayne@69: jpayne@69: template jpayne@69: struct IdentityFunc { jpayne@69: inline T operator()(T&& value) const { jpayne@69: return kj::mv(value); jpayne@69: } jpayne@69: }; jpayne@69: template jpayne@69: struct IdentityFunc> { jpayne@69: inline Promise operator()(T&& value) const { jpayne@69: return kj::mv(value); jpayne@69: } jpayne@69: }; jpayne@69: template <> jpayne@69: struct IdentityFunc { jpayne@69: inline void operator()() const {} jpayne@69: }; jpayne@69: template <> jpayne@69: struct IdentityFunc> { jpayne@69: Promise operator()() const; jpayne@69: // This can't be inline because it will make the translation unit depend on kj-async. Awkwardly, jpayne@69: // Cap'n Proto relies on being able to include this header without creating such a link-time jpayne@69: // dependency. jpayne@69: }; jpayne@69: jpayne@69: } // namespace _ (private) jpayne@69: jpayne@69: template jpayne@69: template jpayne@69: Promise Promise::catch_(ErrorFunc&& errorHandler, SourceLocation location) { jpayne@69: // then()'s ErrorFunc can only return a Promise if Func also returns a Promise. In this case, jpayne@69: // Func is being filled in automatically. We want to make sure ErrorFunc can return a Promise, jpayne@69: // but we don't want the extra overhead of promise chaining if ErrorFunc doesn't actually jpayne@69: // return a promise. So we make our Func return match ErrorFunc. jpayne@69: typedef _::IdentityFunc()))> Func; jpayne@69: typedef _::FixVoid<_::ReturnType> ResultT; jpayne@69: jpayne@69: // The reason catch_() isn't simply implemented in terms of then() is because we want the trace jpayne@69: // pointer to be based on ErrorFunc rather than Func. jpayne@69: void* continuationTracePtr = _::GetFunctorStartAddress::apply(errorHandler); jpayne@69: _::OwnPromiseNode intermediate = jpayne@69: _::appendPromise<_::TransformPromiseNode, Func, ErrorFunc>>( jpayne@69: kj::mv(node), Func(), kj::fwd(errorHandler), continuationTracePtr); jpayne@69: auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType>>( jpayne@69: _::maybeChain(kj::mv(intermediate), implicitCast(nullptr), location)); jpayne@69: return _::maybeReduce(kj::mv(result), false); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: T Promise::wait(WaitScope& waitScope, SourceLocation location) { jpayne@69: _::ExceptionOr<_::FixVoid> result; jpayne@69: _::waitImpl(kj::mv(node), result, waitScope, location); jpayne@69: return convertToReturn(kj::mv(result)); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: bool Promise::poll(WaitScope& waitScope, SourceLocation location) { jpayne@69: return _::pollImpl(*node, waitScope, location); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: ForkedPromise Promise::fork(SourceLocation location) { jpayne@69: return ForkedPromise(false, jpayne@69: _::PromiseDisposer::alloc<_::ForkHub<_::FixVoid>, _::ForkHubBase>(kj::mv(node), location)); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: Promise ForkedPromise::addBranch() { jpayne@69: return hub->addBranch(); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: bool ForkedPromise::hasBranches() { jpayne@69: return hub->isShared(); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: _::SplitTuplePromise Promise::split(SourceLocation location) { jpayne@69: return _::PromiseDisposer::alloc<_::ForkHub<_::FixVoid>, _::ForkHubBase>( jpayne@69: kj::mv(node), location)->split(location); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: Promise Promise::exclusiveJoin(Promise&& other, SourceLocation location) { jpayne@69: return Promise(false, _::appendPromise<_::ExclusiveJoinPromiseNode>( jpayne@69: kj::mv(node), kj::mv(other.node), location)); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: template jpayne@69: Promise Promise::attach(Attachments&&... attachments) { jpayne@69: return Promise(false, _::appendPromise<_::AttachmentPromiseNode>>( jpayne@69: kj::mv(node), kj::tuple(kj::fwd(attachments)...))); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: template jpayne@69: Promise Promise::eagerlyEvaluate(ErrorFunc&& errorHandler, SourceLocation location) { jpayne@69: // See catch_() for commentary. jpayne@69: return Promise(false, _::spark<_::FixVoid>(then( jpayne@69: _::IdentityFunc()))>(), jpayne@69: kj::fwd(errorHandler)).node, location)); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: Promise Promise::eagerlyEvaluate(decltype(nullptr), SourceLocation location) { jpayne@69: return Promise(false, _::spark<_::FixVoid>(kj::mv(node), location)); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: kj::String Promise::trace() { jpayne@69: return PromiseBase::trace(); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: inline Promise constPromise() { jpayne@69: static _::ConstPromiseNode NODE; jpayne@69: return _::PromiseNode::to>(_::OwnPromiseNode(&NODE)); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: inline PromiseForResult evalLater(Func&& func) { jpayne@69: return _::yield().then(kj::fwd(func), _::PropagateException()); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: inline PromiseForResult evalLast(Func&& func) { jpayne@69: return _::yieldHarder().then(kj::fwd(func), _::PropagateException()); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: inline PromiseForResult evalNow(Func&& func) { jpayne@69: PromiseForResult result = nullptr; jpayne@69: KJ_IF_MAYBE(e, kj::runCatchingExceptions([&]() { jpayne@69: result = func(); jpayne@69: })) { jpayne@69: result = kj::mv(*e); jpayne@69: } jpayne@69: return result; jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: struct RetryOnDisconnect_ { jpayne@69: static inline PromiseForResult apply(Func&& func) { jpayne@69: return evalLater([func = kj::mv(func)]() mutable -> PromiseForResult { jpayne@69: auto promise = evalNow(func); jpayne@69: return promise.catch_([func = kj::mv(func)](kj::Exception&& e) mutable -> PromiseForResult { jpayne@69: if (e.getType() == kj::Exception::Type::DISCONNECTED) { jpayne@69: return func(); jpayne@69: } else { jpayne@69: return kj::mv(e); jpayne@69: } jpayne@69: }); jpayne@69: }); jpayne@69: } jpayne@69: }; jpayne@69: template jpayne@69: struct RetryOnDisconnect_ { jpayne@69: // Specialization for references. Needed because the syntax for capturing references in a jpayne@69: // lambda is different. :( jpayne@69: static inline PromiseForResult apply(Func& func) { jpayne@69: auto promise = evalLater(func); jpayne@69: return promise.catch_([&func](kj::Exception&& e) -> PromiseForResult { jpayne@69: if (e.getType() == kj::Exception::Type::DISCONNECTED) { jpayne@69: return func(); jpayne@69: } else { jpayne@69: return kj::mv(e); jpayne@69: } jpayne@69: }); jpayne@69: } jpayne@69: }; jpayne@69: jpayne@69: template jpayne@69: inline PromiseForResult retryOnDisconnect(Func&& func) { jpayne@69: return RetryOnDisconnect_::apply(kj::fwd(func)); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: inline PromiseForResult startFiber( jpayne@69: size_t stackSize, Func&& func, SourceLocation location) { jpayne@69: typedef _::FixVoid<_::ReturnType> ResultT; jpayne@69: jpayne@69: auto intermediate = _::allocPromise<_::Fiber>( jpayne@69: stackSize, kj::fwd(func), location); jpayne@69: intermediate->start(); jpayne@69: auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType>>( jpayne@69: _::maybeChain(kj::mv(intermediate), implicitCast(nullptr), location)); jpayne@69: return _::maybeReduce(kj::mv(result), false); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: inline PromiseForResult FiberPool::startFiber( jpayne@69: Func&& func, SourceLocation location) const { jpayne@69: typedef _::FixVoid<_::ReturnType> ResultT; jpayne@69: jpayne@69: auto intermediate = _::allocPromise<_::Fiber>( jpayne@69: *this, kj::fwd(func), location); jpayne@69: intermediate->start(); jpayne@69: auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType>>( jpayne@69: _::maybeChain(kj::mv(intermediate), implicitCast(nullptr), location)); jpayne@69: return _::maybeReduce(kj::mv(result), false); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: template jpayne@69: void Promise::detach(ErrorFunc&& errorHandler) { jpayne@69: return _::detach(then([](T&&) {}, kj::fwd(errorHandler))); jpayne@69: } jpayne@69: jpayne@69: template <> jpayne@69: template jpayne@69: void Promise::detach(ErrorFunc&& errorHandler) { jpayne@69: return _::detach(then([]() {}, kj::fwd(errorHandler))); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: Promise> joinPromises(Array>&& promises, SourceLocation location) { jpayne@69: return _::PromiseNode::to>>(_::allocPromise<_::ArrayJoinPromiseNode>( jpayne@69: KJ_MAP(p, promises) { return _::PromiseNode::from(kj::mv(p)); }, jpayne@69: heapArray<_::ExceptionOr>(promises.size()), location, jpayne@69: _::ArrayJoinBehavior::LAZY)); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: Promise> joinPromisesFailFast(Array>&& promises, SourceLocation location) { jpayne@69: return _::PromiseNode::to>>(_::allocPromise<_::ArrayJoinPromiseNode>( jpayne@69: KJ_MAP(p, promises) { return _::PromiseNode::from(kj::mv(p)); }, jpayne@69: heapArray<_::ExceptionOr>(promises.size()), location, jpayne@69: _::ArrayJoinBehavior::EAGER)); jpayne@69: } jpayne@69: jpayne@69: // ======================================================================================= jpayne@69: jpayne@69: namespace _ { // private jpayne@69: jpayne@69: class WeakFulfillerBase: protected kj::Disposer { jpayne@69: protected: jpayne@69: WeakFulfillerBase(): inner(nullptr) {} jpayne@69: virtual ~WeakFulfillerBase() noexcept(false) {} jpayne@69: jpayne@69: template jpayne@69: inline PromiseFulfiller* getInner() { jpayne@69: return static_cast*>(inner); jpayne@69: }; jpayne@69: template jpayne@69: inline void setInner(PromiseFulfiller* ptr) { jpayne@69: inner = ptr; jpayne@69: }; jpayne@69: jpayne@69: private: jpayne@69: mutable PromiseRejector* inner; jpayne@69: jpayne@69: void disposeImpl(void* pointer) const override; jpayne@69: }; jpayne@69: jpayne@69: template jpayne@69: class WeakFulfiller final: public PromiseFulfiller, public WeakFulfillerBase { jpayne@69: // A wrapper around PromiseFulfiller which can be detached. jpayne@69: // jpayne@69: // There are a couple non-trivialities here: jpayne@69: // - If the WeakFulfiller is discarded, we want the promise it fulfills to be implicitly jpayne@69: // rejected. jpayne@69: // - We cannot destroy the WeakFulfiller until the application has discarded it *and* it has been jpayne@69: // detached from the underlying fulfiller, because otherwise the later detach() call will go jpayne@69: // to a dangling pointer. Essentially, WeakFulfiller is reference counted, although the jpayne@69: // refcount never goes over 2 and we manually implement the refcounting because we need to do jpayne@69: // other special things when each side detaches anyway. To this end, WeakFulfiller is its own jpayne@69: // Disposer -- dispose() is called when the application discards its owned pointer to the jpayne@69: // fulfiller and detach() is called when the promise is destroyed. jpayne@69: jpayne@69: public: jpayne@69: KJ_DISALLOW_COPY_AND_MOVE(WeakFulfiller); jpayne@69: jpayne@69: static kj::Own make() { jpayne@69: WeakFulfiller* ptr = new WeakFulfiller; jpayne@69: return Own(ptr, *ptr); jpayne@69: } jpayne@69: jpayne@69: void fulfill(FixVoid&& value) override { jpayne@69: if (getInner() != nullptr) { jpayne@69: getInner()->fulfill(kj::mv(value)); jpayne@69: } jpayne@69: } jpayne@69: jpayne@69: void reject(Exception&& exception) override { jpayne@69: if (getInner() != nullptr) { jpayne@69: getInner()->reject(kj::mv(exception)); jpayne@69: } jpayne@69: } jpayne@69: jpayne@69: bool isWaiting() override { jpayne@69: return getInner() != nullptr && getInner()->isWaiting(); jpayne@69: } jpayne@69: jpayne@69: void attach(PromiseFulfiller& newInner) { jpayne@69: setInner(&newInner); jpayne@69: } jpayne@69: jpayne@69: void detach(PromiseFulfiller& from) { jpayne@69: if (getInner() == nullptr) { jpayne@69: // Already disposed. jpayne@69: delete this; jpayne@69: } else { jpayne@69: KJ_IREQUIRE(getInner() == &from); jpayne@69: setInner(nullptr); jpayne@69: } jpayne@69: } jpayne@69: jpayne@69: private: jpayne@69: WeakFulfiller() {} jpayne@69: }; jpayne@69: jpayne@69: template jpayne@69: class PromiseAndFulfillerAdapter { jpayne@69: public: jpayne@69: PromiseAndFulfillerAdapter(PromiseFulfiller& fulfiller, jpayne@69: WeakFulfiller& wrapper) jpayne@69: : fulfiller(fulfiller), wrapper(wrapper) { jpayne@69: wrapper.attach(fulfiller); jpayne@69: } jpayne@69: jpayne@69: ~PromiseAndFulfillerAdapter() noexcept(false) { jpayne@69: wrapper.detach(fulfiller); jpayne@69: } jpayne@69: jpayne@69: private: jpayne@69: PromiseFulfiller& fulfiller; jpayne@69: WeakFulfiller& wrapper; jpayne@69: }; jpayne@69: jpayne@69: } // namespace _ (private) jpayne@69: jpayne@69: template jpayne@69: template jpayne@69: bool PromiseFulfiller::rejectIfThrows(Func&& func) { jpayne@69: KJ_IF_MAYBE(exception, kj::runCatchingExceptions(kj::mv(func))) { jpayne@69: reject(kj::mv(*exception)); jpayne@69: return false; jpayne@69: } else { jpayne@69: return true; jpayne@69: } jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: bool PromiseFulfiller::rejectIfThrows(Func&& func) { jpayne@69: KJ_IF_MAYBE(exception, kj::runCatchingExceptions(kj::mv(func))) { jpayne@69: reject(kj::mv(*exception)); jpayne@69: return false; jpayne@69: } else { jpayne@69: return true; jpayne@69: } jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: _::ReducePromises newAdaptedPromise(Params&&... adapterConstructorParams) { jpayne@69: _::OwnPromiseNode intermediate( jpayne@69: _::allocPromise<_::AdapterPromiseNode<_::FixVoid, Adapter>>( jpayne@69: kj::fwd(adapterConstructorParams)...)); jpayne@69: // We can't capture SourceLocation in this function's arguments since it is a vararg template. :( jpayne@69: return _::PromiseNode::to<_::ReducePromises>( jpayne@69: _::maybeChain(kj::mv(intermediate), implicitCast(nullptr), SourceLocation())); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: PromiseFulfillerPair newPromiseAndFulfiller(SourceLocation location) { jpayne@69: auto wrapper = _::WeakFulfiller::make(); jpayne@69: jpayne@69: _::OwnPromiseNode intermediate( jpayne@69: _::allocPromise<_::AdapterPromiseNode< jpayne@69: _::FixVoid, _::PromiseAndFulfillerAdapter>>(*wrapper)); jpayne@69: auto promise = _::PromiseNode::to<_::ReducePromises>( jpayne@69: _::maybeChain(kj::mv(intermediate), implicitCast(nullptr), location)); jpayne@69: jpayne@69: return PromiseFulfillerPair { kj::mv(promise), kj::mv(wrapper) }; jpayne@69: } jpayne@69: jpayne@69: // ======================================================================================= jpayne@69: // cross-thread stuff jpayne@69: jpayne@69: namespace _ { // (private) jpayne@69: jpayne@69: class XThreadEvent: public PromiseNode, // it's a PromiseNode in the requesting thread jpayne@69: private Event { // it's an event in the target thread jpayne@69: public: jpayne@69: XThreadEvent(ExceptionOrValue& result, const Executor& targetExecutor, EventLoop& loop, jpayne@69: void* funcTracePtr, SourceLocation location); jpayne@69: jpayne@69: void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; jpayne@69: jpayne@69: protected: jpayne@69: void ensureDoneOrCanceled(); jpayne@69: // MUST be called in destructor of subclasses to make sure the object is not destroyed while jpayne@69: // still being accessed by the other thread. (This can't be placed in ~XThreadEvent() because jpayne@69: // that destructor doesn't run until the subclass has already been destroyed.) jpayne@69: jpayne@69: virtual kj::Maybe execute() = 0; jpayne@69: // Run the function. If the function returns a promise, returns the inner PromiseNode, otherwise jpayne@69: // returns null. jpayne@69: jpayne@69: // implements PromiseNode ---------------------------------------------------- jpayne@69: void onReady(Event* event) noexcept override; jpayne@69: jpayne@69: private: jpayne@69: ExceptionOrValue& result; jpayne@69: void* funcTracePtr; jpayne@69: jpayne@69: kj::Own targetExecutor; jpayne@69: Maybe replyExecutor; // If executeAsync() was used. jpayne@69: jpayne@69: kj::Maybe promiseNode; jpayne@69: // Accessed only in target thread. jpayne@69: jpayne@69: ListLink targetLink; jpayne@69: // Membership in one of the linked lists in the target Executor's work list or cancel list. These jpayne@69: // fields are protected by the target Executor's mutex. jpayne@69: jpayne@69: enum { jpayne@69: UNUSED, jpayne@69: // Object was never queued on another thread. jpayne@69: jpayne@69: QUEUED, jpayne@69: // Target thread has not yet dequeued the event from the state.start list. The requesting jpayne@69: // thread can cancel execution by removing the event from the list. jpayne@69: jpayne@69: EXECUTING, jpayne@69: // Target thread has dequeued the event from state.start and moved it to state.executing. To jpayne@69: // cancel, the requesting thread must add the event to the state.cancel list and change the jpayne@69: // state to CANCELING. jpayne@69: jpayne@69: CANCELING, jpayne@69: // Requesting thread is trying to cancel this event. The target thread will change the state to jpayne@69: // `DONE` once canceled. jpayne@69: jpayne@69: DONE jpayne@69: // Target thread has completed handling this event and will not touch it again. The requesting jpayne@69: // thread can safely delete the object. The `state` is updated to `DONE` using an atomic jpayne@69: // release operation after ensuring that the event will not be touched again, so that the jpayne@69: // requesting can safely skip locking if it observes the state is already DONE. jpayne@69: } state = UNUSED; jpayne@69: // State, which is also protected by `targetExecutor`'s mutex. jpayne@69: jpayne@69: ListLink replyLink; jpayne@69: // Membership in `replyExecutor`'s reply list. Protected by `replyExecutor`'s mutex. The jpayne@69: // executing thread places the event in the reply list near the end of the `EXECUTING` state. jpayne@69: // Because the thread cannot lock two mutexes at once, it's possible that the reply executor jpayne@69: // will receive the reply while the event is still listed in the EXECUTING state, but it can jpayne@69: // ignore the state and proceed with the result. jpayne@69: jpayne@69: OnReadyEvent onReadyEvent; jpayne@69: // Accessed only in requesting thread. jpayne@69: jpayne@69: friend class kj::Executor; jpayne@69: jpayne@69: void done(); jpayne@69: // Sets the state to `DONE` and notifies the originating thread that this event is done. Do NOT jpayne@69: // call under lock. jpayne@69: jpayne@69: void sendReply(); jpayne@69: // Notifies the originating thread that this event is done, but doesn't set the state to DONE jpayne@69: // yet. Do NOT call under lock. jpayne@69: jpayne@69: void setDoneState(); jpayne@69: // Assigns `state` to `DONE`, being careful to use an atomic-release-store if needed. This must jpayne@69: // only be called in the destination thread, and must either be called under lock, or the thread jpayne@69: // must take the lock and release it again shortly after setting the state (because some threads jpayne@69: // may be waiting on the DONE state using a conditional wait on the mutex). After calling jpayne@69: // setDoneState(), the destination thread MUST NOT touch this object ever again; it now belongs jpayne@69: // solely to the requesting thread. jpayne@69: jpayne@69: void setDisconnected(); jpayne@69: // Sets the result to a DISCONNECTED exception indicating that the target event loop exited. jpayne@69: jpayne@69: class DelayedDoneHack; jpayne@69: jpayne@69: // implements Event ---------------------------------------------------------- jpayne@69: Maybe> fire() override; jpayne@69: // If called with promiseNode == nullptr, it's time to call execute(). If promiseNode != nullptr, jpayne@69: // then it just indicated readiness and we need to get its result. jpayne@69: jpayne@69: void traceEvent(TraceBuilder& builder) override; jpayne@69: }; jpayne@69: jpayne@69: template >> jpayne@69: class XThreadEventImpl final: public XThreadEvent { jpayne@69: // Implementation for a function that does not return a Promise. jpayne@69: public: jpayne@69: XThreadEventImpl(Func&& func, const Executor& target, EventLoop& loop, SourceLocation location) jpayne@69: : XThreadEvent(result, target, loop, GetFunctorStartAddress<>::apply(func), location), jpayne@69: func(kj::fwd(func)) {} jpayne@69: ~XThreadEventImpl() noexcept(false) { ensureDoneOrCanceled(); } jpayne@69: void destroy() override { freePromise(this); } jpayne@69: jpayne@69: typedef _::FixVoid<_::ReturnType> ResultT; jpayne@69: jpayne@69: kj::Maybe<_::OwnPromiseNode> execute() override { jpayne@69: result.value = MaybeVoidCaller>::apply(func, Void()); jpayne@69: return nullptr; jpayne@69: } jpayne@69: jpayne@69: // implements PromiseNode ---------------------------------------------------- jpayne@69: void get(ExceptionOrValue& output) noexcept override { jpayne@69: output.as() = kj::mv(result); jpayne@69: } jpayne@69: jpayne@69: private: jpayne@69: Func func; jpayne@69: ExceptionOr result; jpayne@69: friend Executor; jpayne@69: }; jpayne@69: jpayne@69: template jpayne@69: class XThreadEventImpl> final: public XThreadEvent { jpayne@69: // Implementation for a function that DOES return a Promise. jpayne@69: public: jpayne@69: XThreadEventImpl(Func&& func, const Executor& target, EventLoop& loop, SourceLocation location) jpayne@69: : XThreadEvent(result, target, loop, GetFunctorStartAddress<>::apply(func), location), jpayne@69: func(kj::fwd(func)) {} jpayne@69: ~XThreadEventImpl() noexcept(false) { ensureDoneOrCanceled(); } jpayne@69: void destroy() override { freePromise(this); } jpayne@69: jpayne@69: typedef _::FixVoid<_::UnwrapPromise>> ResultT; jpayne@69: jpayne@69: kj::Maybe<_::OwnPromiseNode> execute() override { jpayne@69: auto result = _::PromiseNode::from(func()); jpayne@69: KJ_IREQUIRE(result.get() != nullptr); jpayne@69: return kj::mv(result); jpayne@69: } jpayne@69: jpayne@69: // implements PromiseNode ---------------------------------------------------- jpayne@69: void get(ExceptionOrValue& output) noexcept override { jpayne@69: output.as() = kj::mv(result); jpayne@69: } jpayne@69: jpayne@69: private: jpayne@69: Func func; jpayne@69: ExceptionOr result; jpayne@69: friend Executor; jpayne@69: }; jpayne@69: jpayne@69: } // namespace _ (private) jpayne@69: jpayne@69: template jpayne@69: _::UnwrapPromise> Executor::executeSync( jpayne@69: Func&& func, SourceLocation location) const { jpayne@69: _::XThreadEventImpl event(kj::fwd(func), *this, getLoop(), location); jpayne@69: send(event, true); jpayne@69: return convertToReturn(kj::mv(event.result)); jpayne@69: } jpayne@69: jpayne@69: template jpayne@69: PromiseForResult Executor::executeAsync(Func&& func, SourceLocation location) const { jpayne@69: // HACK: We call getLoop() here, rather than have XThreadEvent's constructor do it, so that if it jpayne@69: // throws we don't crash due to `allocPromise()` being `noexcept`. jpayne@69: auto event = _::allocPromise<_::XThreadEventImpl>( jpayne@69: kj::fwd(func), *this, getLoop(), location); jpayne@69: send(*event, false); jpayne@69: return _::PromiseNode::to>(kj::mv(event)); jpayne@69: } jpayne@69: jpayne@69: // ----------------------------------------------------------------------------- jpayne@69: jpayne@69: namespace _ { // (private) jpayne@69: jpayne@69: template jpayne@69: class XThreadFulfiller; jpayne@69: jpayne@69: class XThreadPaf: public PromiseNode { jpayne@69: public: jpayne@69: XThreadPaf(); jpayne@69: virtual ~XThreadPaf() noexcept(false); jpayne@69: void destroy() override; jpayne@69: jpayne@69: // implements PromiseNode ---------------------------------------------------- jpayne@69: void onReady(Event* event) noexcept override; jpayne@69: void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; jpayne@69: jpayne@69: private: jpayne@69: enum { jpayne@69: WAITING, jpayne@69: // Not yet fulfilled, and the waiter is still waiting. jpayne@69: // jpayne@69: // Starting from this state, the state may transition to either FULFILLING or CANCELED jpayne@69: // using an atomic compare-and-swap. jpayne@69: jpayne@69: FULFILLING, jpayne@69: // The fulfiller thread atomically transitions the state from WAITING to FULFILLING when it jpayne@69: // wishes to fulfill the promise. By doing so, it guarantees that the `executor` will not jpayne@69: // disappear out from under it. It then fills in the result value, locks the executor mutex, jpayne@69: // adds the object to the executor's list of fulfilled XThreadPafs, changes the state to jpayne@69: // FULFILLED, and finally unlocks the mutex. jpayne@69: // jpayne@69: // If the waiting thread tries to cancel but discovers the object in this state, then it jpayne@69: // must perform a conditional wait on the executor mutex to await the state becoming FULFILLED. jpayne@69: // It can then delete the object. jpayne@69: jpayne@69: FULFILLED, jpayne@69: // The fulfilling thread has completed filling in the result value and inserting the object jpayne@69: // into the waiting thread's executor event queue. Moreover, the fulfilling thread no longer jpayne@69: // holds any pointers to this object. The waiting thread is responsible for deleting it. jpayne@69: jpayne@69: DISPATCHED, jpayne@69: // The object reached FULFILLED state, and then was dispatched from the waiting thread's jpayne@69: // executor's event queue. Therefore, the object is completely owned by the waiting thread with jpayne@69: // no need to lock anything. jpayne@69: jpayne@69: CANCELED jpayne@69: // The waiting thread atomically transitions the state from WAITING to CANCELED if it is no jpayne@69: // longer listening. In this state, it is the fulfiller thread's responsibility to destroy the jpayne@69: // object. jpayne@69: } state; jpayne@69: jpayne@69: const Executor& executor; jpayne@69: // Executor of the waiting thread. Only guaranteed to be valid when state is `WAITING` or jpayne@69: // `FULFILLING`. After any other state has been reached, this reference may be invalidated. jpayne@69: jpayne@69: ListLink link; jpayne@69: // In the FULFILLING/FULFILLED states, the object is placed in a linked list within the waiting jpayne@69: // thread's executor. In those states, these pointers are guarded by said executor's mutex. jpayne@69: jpayne@69: OnReadyEvent onReadyEvent; jpayne@69: jpayne@69: class FulfillScope; jpayne@69: jpayne@69: static kj::Exception unfulfilledException(); jpayne@69: // Construct appropriate exception to use to reject an unfulfilled XThreadPaf. jpayne@69: jpayne@69: template jpayne@69: friend class XThreadFulfiller; jpayne@69: friend Executor; jpayne@69: }; jpayne@69: jpayne@69: template jpayne@69: class XThreadPafImpl final: public XThreadPaf { jpayne@69: public: jpayne@69: // implements PromiseNode ---------------------------------------------------- jpayne@69: void get(ExceptionOrValue& output) noexcept override { jpayne@69: output.as>() = kj::mv(result); jpayne@69: } jpayne@69: jpayne@69: private: jpayne@69: ExceptionOr> result; jpayne@69: jpayne@69: friend class XThreadFulfiller; jpayne@69: }; jpayne@69: jpayne@69: class XThreadPaf::FulfillScope { jpayne@69: // Create on stack while setting `XThreadPafImpl::result`. jpayne@69: // jpayne@69: // This ensures that: jpayne@69: // - Only one call is carried out, even if multiple threads try to fulfill concurrently. jpayne@69: // - The waiting thread is correctly signaled. jpayne@69: public: jpayne@69: FulfillScope(XThreadPaf** pointer); jpayne@69: // Atomically nulls out *pointer and takes ownership of the pointer. jpayne@69: jpayne@69: ~FulfillScope() noexcept(false); jpayne@69: jpayne@69: KJ_DISALLOW_COPY_AND_MOVE(FulfillScope); jpayne@69: jpayne@69: bool shouldFulfill() { return obj != nullptr; } jpayne@69: jpayne@69: template jpayne@69: XThreadPafImpl* getTarget() { return static_cast*>(obj); } jpayne@69: jpayne@69: private: jpayne@69: XThreadPaf* obj; jpayne@69: }; jpayne@69: jpayne@69: template jpayne@69: class XThreadFulfiller final: public CrossThreadPromiseFulfiller { jpayne@69: public: jpayne@69: XThreadFulfiller(XThreadPafImpl* target): target(target) {} jpayne@69: jpayne@69: ~XThreadFulfiller() noexcept(false) { jpayne@69: if (target != nullptr) { jpayne@69: reject(XThreadPaf::unfulfilledException()); jpayne@69: } jpayne@69: } jpayne@69: void fulfill(FixVoid&& value) const override { jpayne@69: XThreadPaf::FulfillScope scope(&target); jpayne@69: if (scope.shouldFulfill()) { jpayne@69: scope.getTarget()->result = kj::mv(value); jpayne@69: } jpayne@69: } jpayne@69: void reject(Exception&& exception) const override { jpayne@69: XThreadPaf::FulfillScope scope(&target); jpayne@69: if (scope.shouldFulfill()) { jpayne@69: scope.getTarget()->result.addException(kj::mv(exception)); jpayne@69: } jpayne@69: } jpayne@69: bool isWaiting() const override { jpayne@69: KJ_IF_MAYBE(t, target) { jpayne@69: #if _MSC_VER && !__clang__ jpayne@69: // Just assume 1-byte loads are atomic... on what kind of absurd platform would they not be? jpayne@69: return t->state == XThreadPaf::WAITING; jpayne@69: #else jpayne@69: return __atomic_load_n(&t->state, __ATOMIC_RELAXED) == XThreadPaf::WAITING; jpayne@69: #endif jpayne@69: } else { jpayne@69: return false; jpayne@69: } jpayne@69: } jpayne@69: jpayne@69: private: jpayne@69: mutable XThreadPaf* target; // accessed using atomic ops jpayne@69: }; jpayne@69: jpayne@69: template jpayne@69: class XThreadFulfiller> { jpayne@69: public: jpayne@69: static_assert(sizeof(T) < 0, jpayne@69: "newCrosssThreadPromiseAndFulfiller>() is not currently supported"); jpayne@69: // TODO(someday): Is this worth supporting? Presumably, when someone calls `fulfill(somePromise)`, jpayne@69: // then `somePromise` should be assumed to be a promise owned by the fulfilling thread, not jpayne@69: // the waiting thread. jpayne@69: }; jpayne@69: jpayne@69: } // namespace _ (private) jpayne@69: jpayne@69: template jpayne@69: PromiseCrossThreadFulfillerPair newPromiseAndCrossThreadFulfiller() { jpayne@69: kj::Own<_::XThreadPafImpl, _::PromiseDisposer> node(new _::XThreadPafImpl); jpayne@69: auto fulfiller = kj::heap<_::XThreadFulfiller>(node); jpayne@69: return { _::PromiseNode::to<_::ReducePromises>(kj::mv(node)), kj::mv(fulfiller) }; jpayne@69: } jpayne@69: jpayne@69: } // namespace kj jpayne@69: jpayne@69: #if KJ_HAS_COROUTINE jpayne@69: jpayne@69: // ======================================================================================= jpayne@69: // Coroutines TS integration with kj::Promise. jpayne@69: // jpayne@69: // Here's a simple coroutine: jpayne@69: // jpayne@69: // Promise> connectToService(Network& n) { jpayne@69: // auto a = co_await n.parseAddress(IP, PORT); jpayne@69: // auto c = co_await a->connect(); jpayne@69: // co_return kj::mv(c); jpayne@69: // } jpayne@69: // jpayne@69: // The presence of the co_await and co_return keywords tell the compiler it is a coroutine. jpayne@69: // Although it looks similar to a function, it has a couple large differences. First, everything jpayne@69: // that would normally live in the stack frame lives instead in a heap-based coroutine frame. jpayne@69: // Second, the coroutine has the ability to return from its scope without deallocating this frame jpayne@69: // (to suspend, in other words), and the ability to resume from its last suspension point. jpayne@69: // jpayne@69: // In order to know how to suspend, resume, and return from a coroutine, the compiler looks up a jpayne@69: // coroutine implementation type via a traits class parameterized by the coroutine return and jpayne@69: // parameter types. We'll name our coroutine implementation `kj::_::Coroutine`, jpayne@69: jpayne@69: namespace kj::_ { template class Coroutine; } jpayne@69: jpayne@69: // Specializing the appropriate traits class tells the compiler about `kj::_::Coroutine`. jpayne@69: jpayne@69: namespace KJ_COROUTINE_STD_NAMESPACE { jpayne@69: jpayne@69: template jpayne@69: struct coroutine_traits, Args...> { jpayne@69: // `Args...` are the coroutine's parameter types. jpayne@69: jpayne@69: using promise_type = kj::_::Coroutine; jpayne@69: // The Coroutines TS calls this the "promise type". This makes sense when thinking of coroutines jpayne@69: // returning `std::future`, since the coroutine implementation would be a wrapper around jpayne@69: // a `std::promise`. It's extremely confusing from a KJ perspective, however, so I call it jpayne@69: // the "coroutine implementation type" instead. jpayne@69: }; jpayne@69: jpayne@69: } // namespace KJ_COROUTINE_STD_NAMESPACE jpayne@69: jpayne@69: // Now when the compiler sees our `connectToService()` coroutine above, it default-constructs a jpayne@69: // `coroutine_traits>, Network&>::promise_type`, or jpayne@69: // `kj::_::Coroutine>`. jpayne@69: // jpayne@69: // The implementation object lives in the heap-allocated coroutine frame. It gets destroyed and jpayne@69: // deallocated when the frame does. jpayne@69: jpayne@69: namespace kj::_ { jpayne@69: jpayne@69: namespace stdcoro = KJ_COROUTINE_STD_NAMESPACE; jpayne@69: jpayne@69: class CoroutineBase: public PromiseNode, jpayne@69: public Event { jpayne@69: public: jpayne@69: CoroutineBase(stdcoro::coroutine_handle<> coroutine, ExceptionOrValue& resultRef, jpayne@69: SourceLocation location); jpayne@69: ~CoroutineBase() noexcept(false); jpayne@69: KJ_DISALLOW_COPY_AND_MOVE(CoroutineBase); jpayne@69: void destroy() override; jpayne@69: jpayne@69: auto initial_suspend() { return stdcoro::suspend_never(); } jpayne@69: auto final_suspend() noexcept { jpayne@69: #if _MSC_VER && !defined(__clang__) jpayne@69: // See comment at `finalSuspendCalled`'s definition. jpayne@69: finalSuspendCalled = true; jpayne@69: #endif jpayne@69: return stdcoro::suspend_always(); jpayne@69: } jpayne@69: // These adjust the suspension behavior of coroutines immediately upon initiation, and immediately jpayne@69: // after completion. jpayne@69: // jpayne@69: // The initial suspension point could allow us to defer the initial synchronous execution of a jpayne@69: // coroutine -- everything before its first co_await, that is. jpayne@69: // jpayne@69: // The final suspension point is useful to delay deallocation of the coroutine frame to match the jpayne@69: // lifetime of the enclosing promise. jpayne@69: jpayne@69: void unhandled_exception(); jpayne@69: jpayne@69: protected: jpayne@69: class AwaiterBase; jpayne@69: jpayne@69: bool isWaiting() { return waiting; } jpayne@69: void scheduleResumption() { jpayne@69: onReadyEvent.arm(); jpayne@69: waiting = false; jpayne@69: } jpayne@69: jpayne@69: private: jpayne@69: // ------------------------------------------------------- jpayne@69: // PromiseNode implementation jpayne@69: jpayne@69: void onReady(Event* event) noexcept override; jpayne@69: void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; jpayne@69: jpayne@69: // ------------------------------------------------------- jpayne@69: // Event implementation jpayne@69: jpayne@69: Maybe> fire() override; jpayne@69: void traceEvent(TraceBuilder& builder) override; jpayne@69: jpayne@69: stdcoro::coroutine_handle<> coroutine; jpayne@69: ExceptionOrValue& resultRef; jpayne@69: jpayne@69: OnReadyEvent onReadyEvent; jpayne@69: bool waiting = true; jpayne@69: jpayne@69: bool hasSuspendedAtLeastOnce = false; jpayne@69: jpayne@69: #if _MSC_VER && !defined(__clang__) jpayne@69: bool finalSuspendCalled = false; jpayne@69: // MSVC erroneously reports the coroutine as done (that is, `coroutine.done()` returns true) jpayne@69: // seemingly as soon as `return_value()`/`return_void()` are called. This matters in our jpayne@69: // implementation of `unhandled_exception()`, which must arrange to propagate exceptions during jpayne@69: // coroutine frame unwind via the returned promise, even if `return_value()`/`return_void()` have jpayne@69: // already been called. To prove that our assumptions are correct in that function, we want to be jpayne@69: // able to assert that `final_suspend()` has not yet been called. This boolean hack allows us to jpayne@69: // preserve that assertion. jpayne@69: #endif jpayne@69: jpayne@69: Maybe promiseNodeForTrace; jpayne@69: // Whenever this coroutine is suspended waiting on another promise, we keep a reference to that jpayne@69: // promise so tracePromise()/traceEvent() can trace into it. jpayne@69: jpayne@69: UnwindDetector unwindDetector; jpayne@69: jpayne@69: struct DisposalResults { jpayne@69: bool destructorRan = false; jpayne@69: Maybe exception; jpayne@69: }; jpayne@69: Maybe maybeDisposalResults; jpayne@69: // Only non-null during destruction. Before calling coroutine.destroy(), our disposer sets this jpayne@69: // to point to a DisposalResults on the stack so unhandled_exception() will have some place to jpayne@69: // store unwind exceptions. We can't store them in this Coroutine, because we'll be destroyed once jpayne@69: // coroutine.destroy() has returned. Our disposer then rethrows as needed. jpayne@69: }; jpayne@69: jpayne@69: template jpayne@69: class CoroutineMixin; jpayne@69: // CRTP mixin, covered later. jpayne@69: jpayne@69: template jpayne@69: class Coroutine final: public CoroutineBase, jpayne@69: public CoroutineMixin, T> { jpayne@69: // The standard calls this the `promise_type` object. We can call this the "coroutine jpayne@69: // implementation object" since the word promise means different things in KJ and std styles. This jpayne@69: // is where we implement how a `kj::Promise` is returned from a coroutine, and how that promise jpayne@69: // is later fulfilled. We also fill in a few lifetime-related details. jpayne@69: // jpayne@69: // The implementation object is also where we can customize memory allocation of coroutine frames, jpayne@69: // by implementing a member `operator new(size_t, Args...)` (same `Args...` as in jpayne@69: // coroutine_traits). jpayne@69: // jpayne@69: // We can also customize how await-expressions are transformed within `kj::Promise`-based jpayne@69: // coroutines by implementing an `await_transform(P)` member function, where `P` is some type for jpayne@69: // which we want to implement co_await support, e.g. `kj::Promise`. This feature allows us to jpayne@69: // provide an optimized `kj::EventLoop` integration when the coroutine's return type and the jpayne@69: // await-expression's type are both `kj::Promise` instantiations -- see further comments under jpayne@69: // `await_transform()`. jpayne@69: jpayne@69: public: jpayne@69: using Handle = stdcoro::coroutine_handle>; jpayne@69: jpayne@69: Coroutine(SourceLocation location = {}) jpayne@69: : CoroutineBase(Handle::from_promise(*this), result, location) {} jpayne@69: jpayne@69: Promise get_return_object() { jpayne@69: // Called after coroutine frame construction and before initial_suspend() to create the jpayne@69: // coroutine's return object. `this` itself lives inside the coroutine frame, and we arrange for jpayne@69: // the returned Promise to own `this` via a custom Disposer and by always leaving the jpayne@69: // coroutine in a suspended state. jpayne@69: return PromiseNode::to>(OwnPromiseNode(this)); jpayne@69: } jpayne@69: jpayne@69: public: jpayne@69: template jpayne@69: class Awaiter; jpayne@69: jpayne@69: template jpayne@69: Awaiter await_transform(kj::Promise& promise) { return Awaiter(kj::mv(promise)); } jpayne@69: template jpayne@69: Awaiter await_transform(kj::Promise&& promise) { return Awaiter(kj::mv(promise)); } jpayne@69: // Called when someone writes `co_await promise`, where `promise` is a kj::Promise. We return jpayne@69: // an Awaiter, which implements coroutine suspension and resumption in terms of the KJ async jpayne@69: // event system. jpayne@69: // jpayne@69: // There is another hook we could implement: an `operator co_await()` free function. However, a jpayne@69: // free function would be unaware of the type of the enclosing coroutine. Since Awaiter is a jpayne@69: // member class template of Coroutine, it is able to implement an jpayne@69: // `await_suspend(Coroutine::Handle)` override, providing it type-safe access to our enclosing jpayne@69: // coroutine's PromiseNode. An `operator co_await()` free function would have to implement jpayne@69: // a type-erased `await_suspend(stdcoro::coroutine_handle)` override, and implement jpayne@69: // suspension and resumption in terms of .then(). Yuck! jpayne@69: jpayne@69: private: jpayne@69: // ------------------------------------------------------- jpayne@69: // PromiseNode implementation jpayne@69: jpayne@69: void get(ExceptionOrValue& output) noexcept override { jpayne@69: output.as>() = kj::mv(result); jpayne@69: } jpayne@69: jpayne@69: void fulfill(FixVoid&& value) { jpayne@69: // Called by the return_value()/return_void() functions in our mixin class. jpayne@69: jpayne@69: if (isWaiting()) { jpayne@69: result = kj::mv(value); jpayne@69: scheduleResumption(); jpayne@69: } jpayne@69: } jpayne@69: jpayne@69: ExceptionOr> result; jpayne@69: jpayne@69: friend class CoroutineMixin, T>; jpayne@69: }; jpayne@69: jpayne@69: template jpayne@69: class CoroutineMixin { jpayne@69: public: jpayne@69: void return_value(T value) { jpayne@69: static_cast(this)->fulfill(kj::mv(value)); jpayne@69: } jpayne@69: }; jpayne@69: template jpayne@69: class CoroutineMixin { jpayne@69: public: jpayne@69: void return_void() { jpayne@69: static_cast(this)->fulfill(_::Void()); jpayne@69: } jpayne@69: }; jpayne@69: // The Coroutines spec has no `_::FixVoid` equivalent to unify valueful and valueless co_return jpayne@69: // statements, and programs are ill-formed if the coroutine implementation object (Coroutine) has jpayne@69: // both a `return_value()` and `return_void()`. No amount of EnableIffery can get around it, so jpayne@69: // these return_* functions live in a CRTP mixin. jpayne@69: jpayne@69: class CoroutineBase::AwaiterBase { jpayne@69: public: jpayne@69: explicit AwaiterBase(OwnPromiseNode node); jpayne@69: AwaiterBase(AwaiterBase&&); jpayne@69: ~AwaiterBase() noexcept(false); jpayne@69: KJ_DISALLOW_COPY(AwaiterBase); jpayne@69: jpayne@69: bool await_ready() const { return false; } jpayne@69: // This could return "`node->get()` is safe to call" instead, which would make suspension-less jpayne@69: // co_awaits possible for immediately-fulfilled promises. However, we need an Event to figure that jpayne@69: // out, and we won't have access to the Coroutine Event until await_suspend() is called. So, we jpayne@69: // must return false here. Fortunately, await_suspend() has a trick up its sleeve to enable jpayne@69: // suspension-less co_awaits. jpayne@69: jpayne@69: protected: jpayne@69: void getImpl(ExceptionOrValue& result, void* awaitedAt); jpayne@69: bool awaitSuspendImpl(CoroutineBase& coroutineEvent); jpayne@69: jpayne@69: private: jpayne@69: UnwindDetector unwindDetector; jpayne@69: OwnPromiseNode node; jpayne@69: jpayne@69: Maybe maybeCoroutineEvent; jpayne@69: // If we do suspend waiting for our wrapped promise, we store a reference to `node` in our jpayne@69: // enclosing Coroutine for tracing purposes. To guard against any edge cases where an async stack jpayne@69: // trace is generated when an Awaiter was destroyed without Coroutine::fire() having been called, jpayne@69: // we need our own reference to the enclosing Coroutine. (I struggle to think up any such jpayne@69: // scenarios, but perhaps they could occur when destroying a suspended coroutine.) jpayne@69: }; jpayne@69: jpayne@69: template jpayne@69: template jpayne@69: class Coroutine::Awaiter: public AwaiterBase { jpayne@69: // Wrapper around a co_await'ed promise and some storage space for the result of that promise. jpayne@69: // The compiler arranges to call our await_suspend() to suspend, which arranges to be woken up jpayne@69: // when the awaited promise is settled. Once that happens, the enclosing coroutine's Event jpayne@69: // implementation resumes the coroutine, which transitively calls await_resume() to unwrap the jpayne@69: // awaited promise result. jpayne@69: jpayne@69: public: jpayne@69: explicit Awaiter(Promise promise): AwaiterBase(PromiseNode::from(kj::mv(promise))) {} jpayne@69: jpayne@69: KJ_NOINLINE U await_resume() { jpayne@69: // This is marked noinline in order to ensure __builtin_return_address() is accurate for stack jpayne@69: // trace purposes. In my experimentation, this method was not inlined anyway even in opt jpayne@69: // builds, but I want to make sure it doesn't suddenly start being inlined later causing stack jpayne@69: // traces to break. (I also tried always-inline, but this did not appear to cause the compiler jpayne@69: // to inline the method -- perhaps a limitation of coroutines?) jpayne@69: #if __GNUC__ jpayne@69: getImpl(result, __builtin_return_address(0)); jpayne@69: #elif _MSC_VER jpayne@69: getImpl(result, _ReturnAddress()); jpayne@69: #else jpayne@69: #error "please implement for your compiler" jpayne@69: #endif jpayne@69: auto value = kj::_::readMaybe(result.value); jpayne@69: KJ_IASSERT(value != nullptr, "Neither exception nor value present."); jpayne@69: return U(kj::mv(*value)); jpayne@69: } jpayne@69: jpayne@69: bool await_suspend(Coroutine::Handle coroutine) { jpayne@69: return awaitSuspendImpl(coroutine.promise()); jpayne@69: } jpayne@69: jpayne@69: private: jpayne@69: ExceptionOr> result; jpayne@69: }; jpayne@69: jpayne@69: #undef KJ_COROUTINE_STD_NAMESPACE jpayne@69: jpayne@69: } // namespace kj::_ (private) jpayne@69: jpayne@69: #endif // KJ_HAS_COROUTINE jpayne@69: jpayne@69: KJ_END_HEADER