Mercurial > repos > rliterman > csp2
diff CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/include/kj/async-inl.h @ 69:33d812a61356
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 17:55:14 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/include/kj/async-inl.h Tue Mar 18 17:55:14 2025 -0400 @@ -0,0 +1,2345 @@ +// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors +// Licensed under the MIT License: +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// This file contains extended inline implementation details that are required along with async.h. +// We move this all into a separate file to make async.h more readable. +// +// Non-inline declarations here are defined in async.c++. + +#pragma once + +#ifndef KJ_ASYNC_H_INCLUDED +#error "Do not include this directly; include kj/async.h." +#include "async.h" // help IDE parse this file +#endif + +#if _MSC_VER && KJ_HAS_COROUTINE +#include <intrin.h> +#endif + +#include <kj/list.h> + +KJ_BEGIN_HEADER + +namespace kj { +namespace _ { // private + +template <typename T> +class ExceptionOr; + +class ExceptionOrValue { +public: + ExceptionOrValue(bool, Exception&& exception): exception(kj::mv(exception)) {} + KJ_DISALLOW_COPY(ExceptionOrValue); + + void addException(Exception&& exception) { + if (this->exception == nullptr) { + this->exception = kj::mv(exception); + } + } + + template <typename T> + ExceptionOr<T>& as() { return *static_cast<ExceptionOr<T>*>(this); } + template <typename T> + const ExceptionOr<T>& as() const { return *static_cast<const ExceptionOr<T>*>(this); } + + Maybe<Exception> exception; + +protected: + // Allow subclasses to have move constructor / assignment. + ExceptionOrValue() = default; + ExceptionOrValue(ExceptionOrValue&& other) = default; + ExceptionOrValue& operator=(ExceptionOrValue&& other) = default; +}; + +template <typename T> +class ExceptionOr: public ExceptionOrValue { +public: + ExceptionOr() = default; + ExceptionOr(T&& value): value(kj::mv(value)) {} + ExceptionOr(bool, Exception&& exception): ExceptionOrValue(false, kj::mv(exception)) {} + ExceptionOr(ExceptionOr&&) = default; + ExceptionOr& operator=(ExceptionOr&&) = default; + + Maybe<T> value; +}; + +template <typename T> +inline T convertToReturn(ExceptionOr<T>&& result) { + KJ_IF_MAYBE(value, result.value) { + KJ_IF_MAYBE(exception, result.exception) { + throwRecoverableException(kj::mv(*exception)); + } + return _::returnMaybeVoid(kj::mv(*value)); + } else KJ_IF_MAYBE(exception, result.exception) { + throwFatalException(kj::mv(*exception)); + } else { + // Result contained neither a value nor an exception? + KJ_UNREACHABLE; + } +} + +inline void convertToReturn(ExceptionOr<Void>&& result) { + // Override <void> case to use throwRecoverableException(). + + if (result.value != nullptr) { + KJ_IF_MAYBE(exception, result.exception) { + throwRecoverableException(kj::mv(*exception)); + } + } else KJ_IF_MAYBE(exception, result.exception) { + throwRecoverableException(kj::mv(*exception)); + } else { + // Result contained neither a value nor an exception? + KJ_UNREACHABLE; + } +} + +class TraceBuilder { + // Helper for methods that build a call trace. +public: + TraceBuilder(ArrayPtr<void*> space) + : start(space.begin()), current(space.begin()), limit(space.end()) {} + + inline void add(void* addr) { + if (current < limit) { + *current++ = addr; + } + } + + inline bool full() const { return current == limit; } + + ArrayPtr<void*> finish() { + return arrayPtr(start, current); + } + + String toString(); + +private: + void** start; + void** current; + void** limit; +}; + +struct alignas(void*) PromiseArena { + // Space in which a chain of promises may be allocated. See PromiseDisposer. + byte bytes[1024]; +}; + +class Event: private AsyncObject { + // An event waiting to be executed. Not for direct use by applications -- promises use this + // internally. + +public: + Event(SourceLocation location); + Event(kj::EventLoop& loop, SourceLocation location); + ~Event() noexcept(false); + KJ_DISALLOW_COPY_AND_MOVE(Event); + + void armDepthFirst(); + // Enqueue this event so that `fire()` will be called from the event loop soon. + // + // Events scheduled in this way are executed in depth-first order: if an event callback arms + // more events, those events are placed at the front of the queue (in the order in which they + // were armed), so that they run immediately after the first event's callback returns. + // + // Depth-first event scheduling is appropriate for events that represent simple continuations + // of a previous event that should be globbed together for performance. Depth-first scheduling + // can lead to starvation, so any long-running task must occasionally yield with + // `armBreadthFirst()`. (Promise::then() uses depth-first whereas evalLater() uses + // breadth-first.) + // + // To use breadth-first scheduling instead, use `armBreadthFirst()`. + + void armBreadthFirst(); + // Like `armDepthFirst()` except that the event is placed at the end of the queue. + + void armLast(); + // Enqueues this event to happen after all other events have run to completion and there is + // really nothing left to do except wait for I/O. + + bool isNext(); + // True if the Event has been armed and is next in line to be fired. This can be used after + // calling PromiseNode::onReady(event) to determine if a promise being waited is immediately + // ready, in which case continuations may be optimistically run without returning to the event + // loop. Note that this optimization is only valid if we know that we would otherwise immediately + // return to the event loop without running more application code. So this turns out to be useful + // in fairly narrow circumstances, chiefly when a coroutine is about to suspend, but discovers it + // doesn't need to. + // + // Returns false if the event loop is not currently running. This ensures that promise + // continuations don't execute except under a call to .wait(). + + void disarm(); + // If the event is armed but hasn't fired, cancel it. (Destroying the event does this + // implicitly.) + + virtual void traceEvent(TraceBuilder& builder) = 0; + // Build a trace of the callers leading up to this event. `builder` will be populated with + // "return addresses" of the promise chain waiting on this event. The return addresses may + // actually the addresses of lambdas passed to .then(), but in any case, feeding them into + // addr2line should produce useful source code locations. + // + // `traceEvent()` may be called from an async signal handler while `fire()` is executing. It + // must not allocate nor take locks. + + String traceEvent(); + // Helper that builds a trace and stringifies it. + +protected: + virtual Maybe<Own<Event>> fire() = 0; + // Fire the event. Possibly returns a pointer to itself, which will be discarded by the + // caller. This is the only way that an event can delete itself as a result of firing, as + // doing so from within fire() will throw an exception. + +private: + friend class kj::EventLoop; + EventLoop& loop; + Event* next; + Event** prev; + bool firing = false; + + static constexpr uint MAGIC_LIVE_VALUE = 0x1e366381u; + uint live = MAGIC_LIVE_VALUE; + SourceLocation location; +}; + +class PromiseArenaMember { + // An object that is allocated in a PromiseArena. `PromiseNode` inherits this, and most + // arena-allocated objects are `PromiseNode` subclasses, but `TaskSet::Task`, ForkHub, and + // potentially other objects that commonly live on the end of a promise chain can also leverage + // this. + +public: + virtual void destroy() = 0; + // Destroys and frees the node. + // + // If the node was allocated using allocPromise<T>(), then destroy() must call + // freePromise<T>(this). If it was allocated some other way, then it is `destroy()`'s + // responsibility to complete any necessary cleanup of memory, e.g. call `delete this`. + // + // We use this instead of a virtual destructor for two reasons: + // 1. Coroutine nodes are not independent objects, they have to call destroy() on the coroutine + // handle to delete themselves. + // 2. XThreadEvents sometimes leave it up to a different thread to actually delete the object. + +private: + PromiseArena* arena = nullptr; + // If non-null, then this PromiseNode is the last node allocated within the given arena, and + // therefore owns the arena. After this node is destroyed, the arena should be deleted. + // + // PromiseNodes are allocated within the arena starting from the end, and `PromiseNode`s + // allocated this way are required to have `PromiseNode` itself as their leftmost inherited type, + // so that the pointers match. Thus, the space in `arena` from its start to the location of the + // `PromiseNode` is known to be available for subsequent allocations (which should then take + // ownership of the arena). + + friend class PromiseDisposer; +}; + +class PromiseNode: public PromiseArenaMember, private AsyncObject { + // A Promise<T> contains a chain of PromiseNodes tracking the pending transformations. + // + // To reduce generated code bloat, PromiseNode is not a template. Instead, it makes very hacky + // use of pointers to ExceptionOrValue which actually point to ExceptionOr<T>, but are only + // so down-cast in the few places that really need to be templated. Luckily this is all + // internal implementation details. + +public: + virtual void onReady(Event* event) noexcept = 0; + // Arms the given event when ready. + // + // May be called multiple times. If called again before the event was armed, the old event will + // never be armed, only the new one. If called again after the event was armed, the new event + // will be armed immediately. Can be called with nullptr to un-register the existing event. + + virtual void setSelfPointer(OwnPromiseNode* selfPtr) noexcept; + // Tells the node that `selfPtr` is the pointer that owns this node, and will continue to own + // this node until it is destroyed or setSelfPointer() is called again. ChainPromiseNode uses + // this to shorten redundant chains. The default implementation does nothing; only + // ChainPromiseNode should implement this. + + virtual void get(ExceptionOrValue& output) noexcept = 0; + // Get the result. `output` points to an ExceptionOr<T> into which the result will be written. + // Can only be called once, and only after the node is ready. Must be called directly from the + // event loop, with no application code on the stack. + + virtual void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) = 0; + // Build a trace of this promise chain, showing what it is currently waiting on. + // + // Since traces are ordered callee-before-caller, PromiseNode::tracePromise() should typically + // recurse to its child first, then after the child returns, add itself to the trace. + // + // If `stopAtNextEvent` is true, then the trace should stop as soon as it hits a PromiseNode that + // also implements Event, and should not trace that node or its children. This is used in + // conjunction with Event::traceEvent(). The chain of Events is often more sparse than the chain + // of PromiseNodes, because a TransformPromiseNode (which implements .then()) is not itself an + // Event. TransformPromiseNode instead tells its child node to directly notify its *parent* node + // when it is ready, and then TransformPromiseNode applies the .then() transformation during the + // call to .get(). + // + // So, when we trace the chain of Events backwards, we end up hoping over segments of + // TransformPromiseNodes (and other similar types). In order to get those added to the trace, + // each Event must call back down the PromiseNode chain in the opposite direction, using this + // method. + // + // `tracePromise()` may be called from an async signal handler while `get()` is executing. It + // must not allocate nor take locks. + + template <typename T> + static OwnPromiseNode from(T&& promise) { + // Given a Promise, extract the PromiseNode. + return kj::mv(promise.node); + } + template <typename T> + static PromiseNode& from(T& promise) { + // Given a Promise, extract the PromiseNode. + return *promise.node; + } + template <typename T> + static T to(OwnPromiseNode&& node) { + // Construct a Promise from a PromiseNode. (T should be a Promise type.) + return T(false, kj::mv(node)); + } + +protected: + class OnReadyEvent { + // Helper class for implementing onReady(). + + public: + void init(Event* newEvent); + + void arm(); + void armBreadthFirst(); + // Arms the event if init() has already been called and makes future calls to init() + // automatically arm the event. + + inline void traceEvent(TraceBuilder& builder) { + if (event != nullptr && !builder.full()) event->traceEvent(builder); + } + + private: + Event* event = nullptr; + }; +}; + +class PromiseDisposer { +public: + template <typename T> + static constexpr bool canArenaAllocate() { + // We can only use arena allocation for types that fit in an arena and have pointer-size + // alignment. Anything else will need to be allocated as a separate heap object. + return sizeof(T) <= sizeof(PromiseArena) && alignof(T) <= alignof(void*); + } + + static void dispose(PromiseArenaMember* node) { + PromiseArena* arena = node->arena; + node->destroy(); + delete arena; // reminder: `delete` automatically ignores null pointers + } + + template <typename T, typename D = PromiseDisposer, typename... Params> + static kj::Own<T, D> alloc(Params&&... params) noexcept { + // Implements allocPromise(). + T* ptr; + if (!canArenaAllocate<T>()) { + // Node too big (or needs weird alignment), fall back to regular heap allocation. + ptr = new T(kj::fwd<Params>(params)...); + } else { + // Start a new arena. + // + // NOTE: As in append() (below), we don't implement exception-safety because it causes code + // bloat and these constructors probably don't throw. Instead this function is noexcept, so + // if a constructor does throw, it'll crash rather than leak memory. + auto* arena = new PromiseArena; + ptr = reinterpret_cast<T*>(arena + 1) - 1; + ctor(*ptr, kj::fwd<Params>(params)...); + ptr->arena = arena; + KJ_IREQUIRE(reinterpret_cast<void*>(ptr) == + reinterpret_cast<void*>(static_cast<PromiseArenaMember*>(ptr)), + "PromiseArenaMember must be the leftmost inherited type."); + } + return kj::Own<T, D>(ptr); + } + + template <typename T, typename D = PromiseDisposer, typename... Params> + static kj::Own<T, D> append( + OwnPromiseNode&& next, Params&&... params) noexcept { + // Implements appendPromise(). + + PromiseArena* arena = next->arena; + + if (!canArenaAllocate<T>() || arena == nullptr || + reinterpret_cast<byte*>(next.get()) - reinterpret_cast<byte*>(arena) < sizeof(T)) { + // No arena available, or not enough space, or weird alignment needed. Start new arena. + return alloc<T, D>(kj::mv(next), kj::fwd<Params>(params)...); + } else { + // Append to arena. + // + // NOTE: When we call ctor(), it takes ownership of `next`, so we shouldn't assume `next` + // still exists after it returns. So we have to remove ownership of the arena before that. + // In theory if we wanted this to be exception-safe, we'd also have to arrange to delete + // the arena if the constructor throws. However, in practice none of the PromiseNode + // constructors throw, so we just mark the whole method noexcept in order to avoid the + // code bloat to handle this case. + next->arena = nullptr; + T* ptr = reinterpret_cast<T*>(next.get()) - 1; + ctor(*ptr, kj::mv(next), kj::fwd<Params>(params)...); + ptr->arena = arena; + KJ_IREQUIRE(reinterpret_cast<void*>(ptr) == + reinterpret_cast<void*>(static_cast<PromiseArenaMember*>(ptr)), + "PromiseArenaMember must be the leftmost inherited type."); + return kj::Own<T, D>(ptr); + } + } +}; + +template <typename T, typename... Params> +static kj::Own<T, PromiseDisposer> allocPromise(Params&&... params) { + // Allocate a PromiseNode without appending it to any existing promise arena. Space for a new + // arena will be allocated. + return PromiseDisposer::alloc<T>(kj::fwd<Params>(params)...); +} + +template <typename T, bool arena = PromiseDisposer::canArenaAllocate<T>()> +struct FreePromiseNode; +template <typename T> +struct FreePromiseNode<T, true> { + static inline void free(T* ptr) { + // The object will have been allocated in an arena, so we only want to run the destructor. + // The arena's memory will be freed separately. + kj::dtor(*ptr); + } +}; +template <typename T> +struct FreePromiseNode<T, false> { + static inline void free(T* ptr) { + // The object will have been allocated separately on the heap. + return delete ptr; + } +}; + +template <typename T> +static void freePromise(T* ptr) { + // Free a PromiseNode originally allocated using `allocPromise<T>()`. The implementation of + // PromiseNode::destroy() must call this for any type that is allocated using allocPromise(). + FreePromiseNode<T>::free(ptr); +} + +template <typename T, typename... Params> +static kj::Own<T, PromiseDisposer> appendPromise(OwnPromiseNode&& next, Params&&... params) { + // Append a promise to the arena that currently ends with `next`. `next` is also still passed as + // the first parameter to the new object's constructor. + // + // This is semantically the same as `allocPromise()` except that it may avoid the underlying + // memory allocation. `next` must end up being destroyed before the new object (i.e. the new + // object must never transfer away ownership of `next`). + return PromiseDisposer::append<T>(kj::mv(next), kj::fwd<Params>(params)...); +} + +// ------------------------------------------------------------------- + +inline ReadyNow::operator Promise<void>() const { + return PromiseNode::to<Promise<void>>(readyNow()); +} + +template <typename T> +inline NeverDone::operator Promise<T>() const { + return PromiseNode::to<Promise<T>>(neverDone()); +} + +// ------------------------------------------------------------------- + +class ImmediatePromiseNodeBase: public PromiseNode { +public: + ImmediatePromiseNodeBase(); + ~ImmediatePromiseNodeBase() noexcept(false); + + void onReady(Event* event) noexcept override; + void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; +}; + +template <typename T> +class ImmediatePromiseNode final: public ImmediatePromiseNodeBase { + // A promise that has already been resolved to an immediate value or exception. + +public: + ImmediatePromiseNode(ExceptionOr<T>&& result): result(kj::mv(result)) {} + void destroy() override { freePromise(this); } + + void get(ExceptionOrValue& output) noexcept override { + output.as<T>() = kj::mv(result); + } + +private: + ExceptionOr<T> result; +}; + +class ImmediateBrokenPromiseNode final: public ImmediatePromiseNodeBase { +public: + ImmediateBrokenPromiseNode(Exception&& exception); + void destroy() override; + + void get(ExceptionOrValue& output) noexcept override; + +private: + Exception exception; +}; + +template <typename T, T value> +class ConstPromiseNode: public ImmediatePromiseNodeBase { +public: + void destroy() override {} + void get(ExceptionOrValue& output) noexcept override { + output.as<T>() = value; + } +}; + +// ------------------------------------------------------------------- + +class AttachmentPromiseNodeBase: public PromiseNode { +public: + AttachmentPromiseNodeBase(OwnPromiseNode&& dependency); + + void onReady(Event* event) noexcept override; + void get(ExceptionOrValue& output) noexcept override; + void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; + +private: + OwnPromiseNode dependency; + + void dropDependency(); + + template <typename> + friend class AttachmentPromiseNode; +}; + +template <typename Attachment> +class AttachmentPromiseNode final: public AttachmentPromiseNodeBase { + // A PromiseNode that holds on to some object (usually, an Own<T>, but could be any movable + // object) until the promise resolves. + +public: + AttachmentPromiseNode(OwnPromiseNode&& dependency, Attachment&& attachment) + : AttachmentPromiseNodeBase(kj::mv(dependency)), + attachment(kj::mv<Attachment>(attachment)) {} + void destroy() override { freePromise(this); } + + ~AttachmentPromiseNode() noexcept(false) { + // We need to make sure the dependency is deleted before we delete the attachment because the + // dependency may be using the attachment. + dropDependency(); + } + +private: + Attachment attachment; +}; + +// ------------------------------------------------------------------- + +#if __GNUC__ >= 8 && !__clang__ +// GCC 8's class-memaccess warning rightly does not like the memcpy()'s below, but there's no +// "legal" way for us to extract the content of a PTMF so too bad. +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wclass-memaccess" +#if __GNUC__ >= 11 +// GCC 11's array-bounds is similarly upset with us for digging into "private" implementation +// details. But the format is well-defined by the ABI which cannot change so please just let us +// do it kthx. +#pragma GCC diagnostic ignored "-Warray-bounds" +#endif +#endif + +template <typename T, typename ReturnType, typename... ParamTypes> +void* getMethodStartAddress(T& obj, ReturnType (T::*method)(ParamTypes...)); +template <typename T, typename ReturnType, typename... ParamTypes> +void* getMethodStartAddress(const T& obj, ReturnType (T::*method)(ParamTypes...) const); +// Given an object and a pointer-to-method, return the start address of the method's code. The +// intent is that this address can be used in a trace; addr2line should map it to the start of +// the function's definition. For virtual methods, this does a vtable lookup on `obj` to determine +// the address of the specific implementation (otherwise, `obj` wouldn't be needed). +// +// Note that if the method is overloaded or is a template, you will need to explicitly specify +// the param and return types, otherwise the compiler won't know which overload / template +// specialization you are requesting. + +class PtmfHelper { + // This class is a private helper for GetFunctorStartAddress and getMethodStartAddress(). The + // class represents the internal representation of a pointer-to-member-function. + + template <typename... ParamTypes> + friend struct GetFunctorStartAddress; + template <typename T, typename ReturnType, typename... ParamTypes> + friend void* getMethodStartAddress(T& obj, ReturnType (T::*method)(ParamTypes...)); + template <typename T, typename ReturnType, typename... ParamTypes> + friend void* getMethodStartAddress(const T& obj, ReturnType (T::*method)(ParamTypes...) const); + +#if __GNUG__ + + void* ptr; + ptrdiff_t adj; + // Layout of a pointer-to-member-function used by GCC and compatible compilers. + + void* apply(const void* obj) { +#if defined(__arm__) || defined(__mips__) || defined(__aarch64__) + if (adj & 1) { + ptrdiff_t voff = (ptrdiff_t)ptr; +#else + ptrdiff_t voff = (ptrdiff_t)ptr; + if (voff & 1) { + voff &= ~1; +#endif + return *(void**)(*(char**)obj + voff); + } else { + return ptr; + } + } + +#define BODY \ + PtmfHelper result; \ + static_assert(sizeof(p) == sizeof(result), "unknown ptmf layout"); \ + memcpy(&result, &p, sizeof(result)); \ + return result + +#else // __GNUG__ + + void* apply(const void* obj) { return nullptr; } + // TODO(port): PTMF instruction address extraction + +#define BODY return PtmfHelper{} + +#endif // __GNUG__, else + + template <typename R, typename C, typename... P, typename F> + static PtmfHelper from(F p) { BODY; } + // Create a PtmfHelper from some arbitrary pointer-to-member-function which is not + // overloaded nor a template. In this case the compiler is able to deduce the full function + // signature directly given the name since there is only one function with that name. + + template <typename R, typename C, typename... P> + static PtmfHelper from(R (C::*p)(NoInfer<P>...)) { BODY; } + template <typename R, typename C, typename... P> + static PtmfHelper from(R (C::*p)(NoInfer<P>...) const) { BODY; } + // Create a PtmfHelper from some poniter-to-member-function which is a template. In this case + // the function must match exactly the containing type C, return type R, and parameter types P... + // GetFunctorStartAddress normally specifies exactly the correct C and R, but can only make a + // guess at P. Luckily, if the function parameters are template parameters then it's not + // necessary to be precise about P. +#undef BODY +}; + +#if __GNUC__ >= 8 && !__clang__ +#pragma GCC diagnostic pop +#endif + +template <typename T, typename ReturnType, typename... ParamTypes> +void* getMethodStartAddress(T& obj, ReturnType (T::*method)(ParamTypes...)) { + return PtmfHelper::from<ReturnType, T, ParamTypes...>(method).apply(&obj); +} +template <typename T, typename ReturnType, typename... ParamTypes> +void* getMethodStartAddress(const T& obj, ReturnType (T::*method)(ParamTypes...) const) { + return PtmfHelper::from<ReturnType, T, ParamTypes...>(method).apply(&obj); +} + +template <typename... ParamTypes> +struct GetFunctorStartAddress { + // Given a functor (any object defining operator()), return the start address of the function, + // suitable for passing to addr2line to obtain a source file/line for debugging purposes. + // + // This turns out to be incredibly hard to implement in the presence of overloaded or templated + // functors. Therefore, we impose these specific restrictions, specific to our use case: + // - Overloading is not allowed, but templating is. (Generally we only intend to support lambdas + // anyway.) + // - The template parameters to GetFunctorStartAddress specify a hint as to the expected + // parameter types. If the functor is templated, its parameters must match exactly these types. + // (If it's not templated, ParamTypes are ignored.) + + template <typename Func> + static void* apply(Func&& func) { + typedef decltype(func(instance<ParamTypes>()...)) ReturnType; + return PtmfHelper::from<ReturnType, Decay<Func>, ParamTypes...>( + &Decay<Func>::operator()).apply(&func); + } +}; + +template <> +struct GetFunctorStartAddress<Void&&>: public GetFunctorStartAddress<> {}; +// Hack for TransformPromiseNode use case: an input type of `Void` indicates that the function +// actually has no parameters. + +class TransformPromiseNodeBase: public PromiseNode { +public: + TransformPromiseNodeBase(OwnPromiseNode&& dependency, void* continuationTracePtr); + + void onReady(Event* event) noexcept override; + void get(ExceptionOrValue& output) noexcept override; + void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; + +private: + OwnPromiseNode dependency; + void* continuationTracePtr; + + void dropDependency(); + void getDepResult(ExceptionOrValue& output); + + virtual void getImpl(ExceptionOrValue& output) = 0; + + template <typename, typename, typename, typename> + friend class TransformPromiseNode; +}; + +template <typename T, typename DepT, typename Func, typename ErrorFunc> +class TransformPromiseNode final: public TransformPromiseNodeBase { + // A PromiseNode that transforms the result of another PromiseNode through an application-provided + // function (implements `then()`). + +public: + TransformPromiseNode(OwnPromiseNode&& dependency, Func&& func, ErrorFunc&& errorHandler, + void* continuationTracePtr) + : TransformPromiseNodeBase(kj::mv(dependency), continuationTracePtr), + func(kj::fwd<Func>(func)), errorHandler(kj::fwd<ErrorFunc>(errorHandler)) {} + void destroy() override { freePromise(this); } + + ~TransformPromiseNode() noexcept(false) { + // We need to make sure the dependency is deleted before we delete the continuations because it + // is a common pattern for the continuations to hold ownership of objects that might be in-use + // by the dependency. + dropDependency(); + } + +private: + Func func; + ErrorFunc errorHandler; + + void getImpl(ExceptionOrValue& output) override { + ExceptionOr<DepT> depResult; + getDepResult(depResult); + KJ_IF_MAYBE(depException, depResult.exception) { + output.as<T>() = handle( + MaybeVoidCaller<Exception, FixVoid<ReturnType<ErrorFunc, Exception>>>::apply( + errorHandler, kj::mv(*depException))); + } else KJ_IF_MAYBE(depValue, depResult.value) { + output.as<T>() = handle(MaybeVoidCaller<DepT, T>::apply(func, kj::mv(*depValue))); + } + } + + ExceptionOr<T> handle(T&& value) { + return kj::mv(value); + } + ExceptionOr<T> handle(PropagateException::Bottom&& value) { + return ExceptionOr<T>(false, value.asException()); + } +}; + +// ------------------------------------------------------------------- + +class ForkHubBase; +using OwnForkHubBase = Own<ForkHubBase, ForkHubBase>; + +class ForkBranchBase: public PromiseNode { +public: + ForkBranchBase(OwnForkHubBase&& hub); + ~ForkBranchBase() noexcept(false); + + void hubReady() noexcept; + // Called by the hub to indicate that it is ready. + + // implements PromiseNode ------------------------------------------ + void onReady(Event* event) noexcept override; + void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; + +protected: + inline ExceptionOrValue& getHubResultRef(); + + void releaseHub(ExceptionOrValue& output); + // Release the hub. If an exception is thrown, add it to `output`. + +private: + OnReadyEvent onReadyEvent; + + OwnForkHubBase hub; + ForkBranchBase* next = nullptr; + ForkBranchBase** prevPtr = nullptr; + + friend class ForkHubBase; +}; + +template <typename T> T copyOrAddRef(T& t) { return t; } +template <typename T> Own<T> copyOrAddRef(Own<T>& t) { return t->addRef(); } +template <typename T> Maybe<Own<T>> copyOrAddRef(Maybe<Own<T>>& t) { + return t.map([](Own<T>& ptr) { + return ptr->addRef(); + }); +} + +template <typename T> +class ForkBranch final: public ForkBranchBase { + // A PromiseNode that implements one branch of a fork -- i.e. one of the branches that receives + // a const reference. + +public: + ForkBranch(OwnForkHubBase&& hub): ForkBranchBase(kj::mv(hub)) {} + void destroy() override { freePromise(this); } + + void get(ExceptionOrValue& output) noexcept override { + ExceptionOr<T>& hubResult = getHubResultRef().template as<T>(); + KJ_IF_MAYBE(value, hubResult.value) { + output.as<T>().value = copyOrAddRef(*value); + } else { + output.as<T>().value = nullptr; + } + output.exception = hubResult.exception; + releaseHub(output); + } +}; + +template <typename T, size_t index> +class SplitBranch final: public ForkBranchBase { + // A PromiseNode that implements one branch of a fork -- i.e. one of the branches that receives + // a const reference. + +public: + SplitBranch(OwnForkHubBase&& hub): ForkBranchBase(kj::mv(hub)) {} + void destroy() override { freePromise(this); } + + typedef kj::Decay<decltype(kj::get<index>(kj::instance<T>()))> Element; + + void get(ExceptionOrValue& output) noexcept override { + ExceptionOr<T>& hubResult = getHubResultRef().template as<T>(); + KJ_IF_MAYBE(value, hubResult.value) { + output.as<Element>().value = kj::mv(kj::get<index>(*value)); + } else { + output.as<Element>().value = nullptr; + } + output.exception = hubResult.exception; + releaseHub(output); + } +}; + +// ------------------------------------------------------------------- + +class ForkHubBase: public PromiseArenaMember, protected Event { +public: + ForkHubBase(OwnPromiseNode&& inner, ExceptionOrValue& resultRef, SourceLocation location); + + inline ExceptionOrValue& getResultRef() { return resultRef; } + + inline bool isShared() const { return refcount > 1; } + + Own<ForkHubBase, ForkHubBase> addRef() { + ++refcount; + return Own<ForkHubBase, ForkHubBase>(this); + } + + static void dispose(ForkHubBase* obj) { + if (--obj->refcount == 0) { + PromiseDisposer::dispose(obj); + } + } + +private: + uint refcount = 1; + // We manually implement refcounting for ForkHubBase so that we can use it together with + // PromiseDisposer's arena allocation. + + OwnPromiseNode inner; + ExceptionOrValue& resultRef; + + ForkBranchBase* headBranch = nullptr; + ForkBranchBase** tailBranch = &headBranch; + // Tail becomes null once the inner promise is ready and all branches have been notified. + + Maybe<Own<Event>> fire() override; + void traceEvent(TraceBuilder& builder) override; + + friend class ForkBranchBase; +}; + +template <typename T> +class ForkHub final: public ForkHubBase { + // A PromiseNode that implements the hub of a fork. The first call to Promise::fork() replaces + // the promise's outer node with a ForkHub, and subsequent calls add branches to that hub (if + // possible). + +public: + ForkHub(OwnPromiseNode&& inner, SourceLocation location) + : ForkHubBase(kj::mv(inner), result, location) {} + void destroy() override { freePromise(this); } + + Promise<_::UnfixVoid<T>> addBranch() { + return _::PromiseNode::to<Promise<_::UnfixVoid<T>>>( + allocPromise<ForkBranch<T>>(addRef())); + } + + _::SplitTuplePromise<T> split(SourceLocation location) { + return splitImpl(MakeIndexes<tupleSize<T>()>(), location); + } + +private: + ExceptionOr<T> result; + + template <size_t... indexes> + _::SplitTuplePromise<T> splitImpl(Indexes<indexes...>, SourceLocation location) { + return kj::tuple(addSplit<indexes>(location)...); + } + + template <size_t index> + ReducePromises<typename SplitBranch<T, index>::Element> addSplit(SourceLocation location) { + return _::PromiseNode::to<ReducePromises<typename SplitBranch<T, index>::Element>>( + maybeChain(allocPromise<SplitBranch<T, index>>(addRef()), + implicitCast<typename SplitBranch<T, index>::Element*>(nullptr), + location)); + } +}; + +inline ExceptionOrValue& ForkBranchBase::getHubResultRef() { + return hub->getResultRef(); +} + +// ------------------------------------------------------------------- + +class ChainPromiseNode final: public PromiseNode, public Event { + // Promise node which reduces Promise<Promise<T>> to Promise<T>. + // + // `Event` is only a public base class because otherwise we can't cast Own<ChainPromiseNode> to + // Own<Event>. Ugh, templates and private... + +public: + explicit ChainPromiseNode(OwnPromiseNode inner, SourceLocation location); + ~ChainPromiseNode() noexcept(false); + void destroy() override; + + void onReady(Event* event) noexcept override; + void setSelfPointer(OwnPromiseNode* selfPtr) noexcept override; + void get(ExceptionOrValue& output) noexcept override; + void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; + +private: + enum State { + STEP1, + STEP2 + }; + + State state; + + OwnPromiseNode inner; + // In STEP1, a PromiseNode for a Promise<T>. + // In STEP2, a PromiseNode for a T. + + Event* onReadyEvent = nullptr; + OwnPromiseNode* selfPtr = nullptr; + + Maybe<Own<Event>> fire() override; + void traceEvent(TraceBuilder& builder) override; +}; + +template <typename T> +OwnPromiseNode maybeChain(OwnPromiseNode&& node, Promise<T>*, SourceLocation location) { + return appendPromise<ChainPromiseNode>(kj::mv(node), location); +} + +template <typename T> +OwnPromiseNode&& maybeChain(OwnPromiseNode&& node, T*, SourceLocation location) { + return kj::mv(node); +} + +template <typename T, typename Result = decltype(T::reducePromise(instance<Promise<T>>()))> +inline Result maybeReduce(Promise<T>&& promise, bool) { + return T::reducePromise(kj::mv(promise)); +} + +template <typename T> +inline Promise<T> maybeReduce(Promise<T>&& promise, ...) { + return kj::mv(promise); +} + +// ------------------------------------------------------------------- + +class ExclusiveJoinPromiseNode final: public PromiseNode { +public: + ExclusiveJoinPromiseNode(OwnPromiseNode left, OwnPromiseNode right, SourceLocation location); + ~ExclusiveJoinPromiseNode() noexcept(false); + void destroy() override; + + void onReady(Event* event) noexcept override; + void get(ExceptionOrValue& output) noexcept override; + void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; + +private: + class Branch: public Event { + public: + Branch(ExclusiveJoinPromiseNode& joinNode, OwnPromiseNode dependency, + SourceLocation location); + ~Branch() noexcept(false); + + bool get(ExceptionOrValue& output); + // Returns true if this is the side that finished. + + Maybe<Own<Event>> fire() override; + void traceEvent(TraceBuilder& builder) override; + + private: + ExclusiveJoinPromiseNode& joinNode; + OwnPromiseNode dependency; + + friend class ExclusiveJoinPromiseNode; + }; + + Branch left; + Branch right; + OnReadyEvent onReadyEvent; +}; + +// ------------------------------------------------------------------- + +enum class ArrayJoinBehavior { + LAZY, + EAGER, +}; + +class ArrayJoinPromiseNodeBase: public PromiseNode { +public: + ArrayJoinPromiseNodeBase(Array<OwnPromiseNode> promises, + ExceptionOrValue* resultParts, size_t partSize, + SourceLocation location, + ArrayJoinBehavior joinBehavior); + ~ArrayJoinPromiseNodeBase() noexcept(false); + + void onReady(Event* event) noexcept override final; + void get(ExceptionOrValue& output) noexcept override final; + void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override final; + +protected: + virtual void getNoError(ExceptionOrValue& output) noexcept = 0; + // Called to compile the result only in the case where there were no errors. + +private: + const ArrayJoinBehavior joinBehavior; + + uint countLeft; + OnReadyEvent onReadyEvent; + bool armed = false; + + class Branch final: public Event { + public: + Branch(ArrayJoinPromiseNodeBase& joinNode, OwnPromiseNode dependency, + ExceptionOrValue& output, SourceLocation location); + ~Branch() noexcept(false); + + Maybe<Own<Event>> fire() override; + void traceEvent(TraceBuilder& builder) override; + + private: + ArrayJoinPromiseNodeBase& joinNode; + OwnPromiseNode dependency; + ExceptionOrValue& output; + + friend class ArrayJoinPromiseNodeBase; + }; + + Array<Branch> branches; +}; + +template <typename T> +class ArrayJoinPromiseNode final: public ArrayJoinPromiseNodeBase { +public: + ArrayJoinPromiseNode(Array<OwnPromiseNode> promises, + Array<ExceptionOr<T>> resultParts, + SourceLocation location, + ArrayJoinBehavior joinBehavior) + : ArrayJoinPromiseNodeBase(kj::mv(promises), resultParts.begin(), sizeof(ExceptionOr<T>), + location, joinBehavior), + resultParts(kj::mv(resultParts)) {} + void destroy() override { freePromise(this); } + +protected: + void getNoError(ExceptionOrValue& output) noexcept override { + auto builder = heapArrayBuilder<T>(resultParts.size()); + for (auto& part: resultParts) { + KJ_IASSERT(part.value != nullptr, + "Bug in KJ promise framework: Promise result had neither value no exception."); + builder.add(kj::mv(*_::readMaybe(part.value))); + } + output.as<Array<T>>() = builder.finish(); + } + +private: + Array<ExceptionOr<T>> resultParts; +}; + +template <> +class ArrayJoinPromiseNode<void> final: public ArrayJoinPromiseNodeBase { +public: + ArrayJoinPromiseNode(Array<OwnPromiseNode> promises, + Array<ExceptionOr<_::Void>> resultParts, + SourceLocation location, + ArrayJoinBehavior joinBehavior); + ~ArrayJoinPromiseNode(); + void destroy() override; + +protected: + void getNoError(ExceptionOrValue& output) noexcept override; + +private: + Array<ExceptionOr<_::Void>> resultParts; +}; + +// ------------------------------------------------------------------- + +class EagerPromiseNodeBase: public PromiseNode, protected Event { + // A PromiseNode that eagerly evaluates its dependency even if its dependent does not eagerly + // evaluate it. + +public: + EagerPromiseNodeBase(OwnPromiseNode&& dependency, ExceptionOrValue& resultRef, + SourceLocation location); + + void onReady(Event* event) noexcept override; + void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; + +private: + OwnPromiseNode dependency; + OnReadyEvent onReadyEvent; + + ExceptionOrValue& resultRef; + + Maybe<Own<Event>> fire() override; + void traceEvent(TraceBuilder& builder) override; +}; + +template <typename T> +class EagerPromiseNode final: public EagerPromiseNodeBase { +public: + EagerPromiseNode(OwnPromiseNode&& dependency, SourceLocation location) + : EagerPromiseNodeBase(kj::mv(dependency), result, location) {} + void destroy() override { freePromise(this); } + + void get(ExceptionOrValue& output) noexcept override { + output.as<T>() = kj::mv(result); + } + +private: + ExceptionOr<T> result; +}; + +template <typename T> +OwnPromiseNode spark(OwnPromiseNode&& node, SourceLocation location) { + // Forces evaluation of the given node to begin as soon as possible, even if no one is waiting + // on it. + return appendPromise<EagerPromiseNode<T>>(kj::mv(node), location); +} + +// ------------------------------------------------------------------- + +class AdapterPromiseNodeBase: public PromiseNode { +public: + void onReady(Event* event) noexcept override; + void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; + +protected: + inline void setReady() { + onReadyEvent.arm(); + } + +private: + OnReadyEvent onReadyEvent; +}; + +template <typename T, typename Adapter> +class AdapterPromiseNode final: public AdapterPromiseNodeBase, + private PromiseFulfiller<UnfixVoid<T>> { + // A PromiseNode that wraps a PromiseAdapter. + +public: + template <typename... Params> + AdapterPromiseNode(Params&&... params) + : adapter(static_cast<PromiseFulfiller<UnfixVoid<T>>&>(*this), kj::fwd<Params>(params)...) {} + void destroy() override { freePromise(this); } + + void get(ExceptionOrValue& output) noexcept override { + KJ_IREQUIRE(!isWaiting()); + output.as<T>() = kj::mv(result); + } + +private: + ExceptionOr<T> result; + bool waiting = true; + Adapter adapter; + + void fulfill(T&& value) override { + if (waiting) { + waiting = false; + result = ExceptionOr<T>(kj::mv(value)); + setReady(); + } + } + + void reject(Exception&& exception) override { + if (waiting) { + waiting = false; + result = ExceptionOr<T>(false, kj::mv(exception)); + setReady(); + } + } + + bool isWaiting() override { + return waiting; + } +}; + +// ------------------------------------------------------------------- + +class FiberBase: public PromiseNode, private Event { + // Base class for the outer PromiseNode representing a fiber. + +public: + explicit FiberBase(size_t stackSize, _::ExceptionOrValue& result, SourceLocation location); + explicit FiberBase(const FiberPool& pool, _::ExceptionOrValue& result, SourceLocation location); + ~FiberBase() noexcept(false); + + void start() { armDepthFirst(); } + // Call immediately after construction to begin executing the fiber. + + class WaitDoneEvent; + + void onReady(_::Event* event) noexcept override; + void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; + +protected: + bool isFinished() { return state == FINISHED; } + void cancel(); + +private: + enum { WAITING, RUNNING, CANCELED, FINISHED } state; + + _::PromiseNode* currentInner = nullptr; + OnReadyEvent onReadyEvent; + Own<FiberStack> stack; + _::ExceptionOrValue& result; + + void run(); + virtual void runImpl(WaitScope& waitScope) = 0; + + Maybe<Own<Event>> fire() override; + void traceEvent(TraceBuilder& builder) override; + // Implements Event. Each time the event is fired, switchToFiber() is called. + + friend class FiberStack; + friend void _::waitImpl(_::OwnPromiseNode&& node, _::ExceptionOrValue& result, + WaitScope& waitScope, SourceLocation location); + friend bool _::pollImpl(_::PromiseNode& node, WaitScope& waitScope, SourceLocation location); +}; + +template <typename Func> +class Fiber final: public FiberBase { +public: + explicit Fiber(size_t stackSize, Func&& func, SourceLocation location) + : FiberBase(stackSize, result, location), func(kj::fwd<Func>(func)) {} + explicit Fiber(const FiberPool& pool, Func&& func, SourceLocation location) + : FiberBase(pool, result, location), func(kj::fwd<Func>(func)) {} + ~Fiber() noexcept(false) { cancel(); } + void destroy() override { freePromise(this); } + + typedef FixVoid<decltype(kj::instance<Func&>()(kj::instance<WaitScope&>()))> ResultType; + + void get(ExceptionOrValue& output) noexcept override { + KJ_IREQUIRE(isFinished()); + output.as<ResultType>() = kj::mv(result); + } + +private: + Func func; + ExceptionOr<ResultType> result; + + void runImpl(WaitScope& waitScope) override { + result.template as<ResultType>() = + MaybeVoidCaller<WaitScope&, ResultType>::apply(func, waitScope); + } +}; + +} // namespace _ (private) + +// ======================================================================================= + +template <typename T> +Promise<T>::Promise(_::FixVoid<T> value) + : PromiseBase(_::allocPromise<_::ImmediatePromiseNode<_::FixVoid<T>>>(kj::mv(value))) {} + +template <typename T> +Promise<T>::Promise(kj::Exception&& exception) + : PromiseBase(_::allocPromise<_::ImmediateBrokenPromiseNode>(kj::mv(exception))) {} + +template <typename T> +template <typename Func, typename ErrorFunc> +PromiseForResult<Func, T> Promise<T>::then(Func&& func, ErrorFunc&& errorHandler, + SourceLocation location) { + typedef _::FixVoid<_::ReturnType<Func, T>> ResultT; + + void* continuationTracePtr = _::GetFunctorStartAddress<_::FixVoid<T>&&>::apply(func); + _::OwnPromiseNode intermediate = + _::appendPromise<_::TransformPromiseNode<ResultT, _::FixVoid<T>, Func, ErrorFunc>>( + kj::mv(node), kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler), + continuationTracePtr); + auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType<Func, T>>>( + _::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr), location)); + return _::maybeReduce(kj::mv(result), false); +} + +namespace _ { // private + +template <typename T> +struct IdentityFunc { + inline T operator()(T&& value) const { + return kj::mv(value); + } +}; +template <typename T> +struct IdentityFunc<Promise<T>> { + inline Promise<T> operator()(T&& value) const { + return kj::mv(value); + } +}; +template <> +struct IdentityFunc<void> { + inline void operator()() const {} +}; +template <> +struct IdentityFunc<Promise<void>> { + Promise<void> operator()() const; + // This can't be inline because it will make the translation unit depend on kj-async. Awkwardly, + // Cap'n Proto relies on being able to include this header without creating such a link-time + // dependency. +}; + +} // namespace _ (private) + +template <typename T> +template <typename ErrorFunc> +Promise<T> Promise<T>::catch_(ErrorFunc&& errorHandler, SourceLocation location) { + // then()'s ErrorFunc can only return a Promise if Func also returns a Promise. In this case, + // Func is being filled in automatically. We want to make sure ErrorFunc can return a Promise, + // but we don't want the extra overhead of promise chaining if ErrorFunc doesn't actually + // return a promise. So we make our Func return match ErrorFunc. + typedef _::IdentityFunc<decltype(errorHandler(instance<Exception&&>()))> Func; + typedef _::FixVoid<_::ReturnType<Func, T>> ResultT; + + // The reason catch_() isn't simply implemented in terms of then() is because we want the trace + // pointer to be based on ErrorFunc rather than Func. + void* continuationTracePtr = _::GetFunctorStartAddress<kj::Exception&&>::apply(errorHandler); + _::OwnPromiseNode intermediate = + _::appendPromise<_::TransformPromiseNode<ResultT, _::FixVoid<T>, Func, ErrorFunc>>( + kj::mv(node), Func(), kj::fwd<ErrorFunc>(errorHandler), continuationTracePtr); + auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType<Func, T>>>( + _::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr), location)); + return _::maybeReduce(kj::mv(result), false); +} + +template <typename T> +T Promise<T>::wait(WaitScope& waitScope, SourceLocation location) { + _::ExceptionOr<_::FixVoid<T>> result; + _::waitImpl(kj::mv(node), result, waitScope, location); + return convertToReturn(kj::mv(result)); +} + +template <typename T> +bool Promise<T>::poll(WaitScope& waitScope, SourceLocation location) { + return _::pollImpl(*node, waitScope, location); +} + +template <typename T> +ForkedPromise<T> Promise<T>::fork(SourceLocation location) { + return ForkedPromise<T>(false, + _::PromiseDisposer::alloc<_::ForkHub<_::FixVoid<T>>, _::ForkHubBase>(kj::mv(node), location)); +} + +template <typename T> +Promise<T> ForkedPromise<T>::addBranch() { + return hub->addBranch(); +} + +template <typename T> +bool ForkedPromise<T>::hasBranches() { + return hub->isShared(); +} + +template <typename T> +_::SplitTuplePromise<T> Promise<T>::split(SourceLocation location) { + return _::PromiseDisposer::alloc<_::ForkHub<_::FixVoid<T>>, _::ForkHubBase>( + kj::mv(node), location)->split(location); +} + +template <typename T> +Promise<T> Promise<T>::exclusiveJoin(Promise<T>&& other, SourceLocation location) { + return Promise(false, _::appendPromise<_::ExclusiveJoinPromiseNode>( + kj::mv(node), kj::mv(other.node), location)); +} + +template <typename T> +template <typename... Attachments> +Promise<T> Promise<T>::attach(Attachments&&... attachments) { + return Promise(false, _::appendPromise<_::AttachmentPromiseNode<Tuple<Attachments...>>>( + kj::mv(node), kj::tuple(kj::fwd<Attachments>(attachments)...))); +} + +template <typename T> +template <typename ErrorFunc> +Promise<T> Promise<T>::eagerlyEvaluate(ErrorFunc&& errorHandler, SourceLocation location) { + // See catch_() for commentary. + return Promise(false, _::spark<_::FixVoid<T>>(then( + _::IdentityFunc<decltype(errorHandler(instance<Exception&&>()))>(), + kj::fwd<ErrorFunc>(errorHandler)).node, location)); +} + +template <typename T> +Promise<T> Promise<T>::eagerlyEvaluate(decltype(nullptr), SourceLocation location) { + return Promise(false, _::spark<_::FixVoid<T>>(kj::mv(node), location)); +} + +template <typename T> +kj::String Promise<T>::trace() { + return PromiseBase::trace(); +} + +template <typename T, T value> +inline Promise<T> constPromise() { + static _::ConstPromiseNode<T, value> NODE; + return _::PromiseNode::to<Promise<T>>(_::OwnPromiseNode(&NODE)); +} + +template <typename Func> +inline PromiseForResult<Func, void> evalLater(Func&& func) { + return _::yield().then(kj::fwd<Func>(func), _::PropagateException()); +} + +template <typename Func> +inline PromiseForResult<Func, void> evalLast(Func&& func) { + return _::yieldHarder().then(kj::fwd<Func>(func), _::PropagateException()); +} + +template <typename Func> +inline PromiseForResult<Func, void> evalNow(Func&& func) { + PromiseForResult<Func, void> result = nullptr; + KJ_IF_MAYBE(e, kj::runCatchingExceptions([&]() { + result = func(); + })) { + result = kj::mv(*e); + } + return result; +} + +template <typename Func> +struct RetryOnDisconnect_ { + static inline PromiseForResult<Func, void> apply(Func&& func) { + return evalLater([func = kj::mv(func)]() mutable -> PromiseForResult<Func, void> { + auto promise = evalNow(func); + return promise.catch_([func = kj::mv(func)](kj::Exception&& e) mutable -> PromiseForResult<Func, void> { + if (e.getType() == kj::Exception::Type::DISCONNECTED) { + return func(); + } else { + return kj::mv(e); + } + }); + }); + } +}; +template <typename Func> +struct RetryOnDisconnect_<Func&> { + // Specialization for references. Needed because the syntax for capturing references in a + // lambda is different. :( + static inline PromiseForResult<Func, void> apply(Func& func) { + auto promise = evalLater(func); + return promise.catch_([&func](kj::Exception&& e) -> PromiseForResult<Func, void> { + if (e.getType() == kj::Exception::Type::DISCONNECTED) { + return func(); + } else { + return kj::mv(e); + } + }); + } +}; + +template <typename Func> +inline PromiseForResult<Func, void> retryOnDisconnect(Func&& func) { + return RetryOnDisconnect_<Func>::apply(kj::fwd<Func>(func)); +} + +template <typename Func> +inline PromiseForResult<Func, WaitScope&> startFiber( + size_t stackSize, Func&& func, SourceLocation location) { + typedef _::FixVoid<_::ReturnType<Func, WaitScope&>> ResultT; + + auto intermediate = _::allocPromise<_::Fiber<Func>>( + stackSize, kj::fwd<Func>(func), location); + intermediate->start(); + auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType<Func, WaitScope&>>>( + _::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr), location)); + return _::maybeReduce(kj::mv(result), false); +} + +template <typename Func> +inline PromiseForResult<Func, WaitScope&> FiberPool::startFiber( + Func&& func, SourceLocation location) const { + typedef _::FixVoid<_::ReturnType<Func, WaitScope&>> ResultT; + + auto intermediate = _::allocPromise<_::Fiber<Func>>( + *this, kj::fwd<Func>(func), location); + intermediate->start(); + auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType<Func, WaitScope&>>>( + _::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr), location)); + return _::maybeReduce(kj::mv(result), false); +} + +template <typename T> +template <typename ErrorFunc> +void Promise<T>::detach(ErrorFunc&& errorHandler) { + return _::detach(then([](T&&) {}, kj::fwd<ErrorFunc>(errorHandler))); +} + +template <> +template <typename ErrorFunc> +void Promise<void>::detach(ErrorFunc&& errorHandler) { + return _::detach(then([]() {}, kj::fwd<ErrorFunc>(errorHandler))); +} + +template <typename T> +Promise<Array<T>> joinPromises(Array<Promise<T>>&& promises, SourceLocation location) { + return _::PromiseNode::to<Promise<Array<T>>>(_::allocPromise<_::ArrayJoinPromiseNode<T>>( + KJ_MAP(p, promises) { return _::PromiseNode::from(kj::mv(p)); }, + heapArray<_::ExceptionOr<T>>(promises.size()), location, + _::ArrayJoinBehavior::LAZY)); +} + +template <typename T> +Promise<Array<T>> joinPromisesFailFast(Array<Promise<T>>&& promises, SourceLocation location) { + return _::PromiseNode::to<Promise<Array<T>>>(_::allocPromise<_::ArrayJoinPromiseNode<T>>( + KJ_MAP(p, promises) { return _::PromiseNode::from(kj::mv(p)); }, + heapArray<_::ExceptionOr<T>>(promises.size()), location, + _::ArrayJoinBehavior::EAGER)); +} + +// ======================================================================================= + +namespace _ { // private + +class WeakFulfillerBase: protected kj::Disposer { +protected: + WeakFulfillerBase(): inner(nullptr) {} + virtual ~WeakFulfillerBase() noexcept(false) {} + + template <typename T> + inline PromiseFulfiller<T>* getInner() { + return static_cast<PromiseFulfiller<T>*>(inner); + }; + template <typename T> + inline void setInner(PromiseFulfiller<T>* ptr) { + inner = ptr; + }; + +private: + mutable PromiseRejector* inner; + + void disposeImpl(void* pointer) const override; +}; + +template <typename T> +class WeakFulfiller final: public PromiseFulfiller<T>, public WeakFulfillerBase { + // A wrapper around PromiseFulfiller which can be detached. + // + // There are a couple non-trivialities here: + // - If the WeakFulfiller is discarded, we want the promise it fulfills to be implicitly + // rejected. + // - We cannot destroy the WeakFulfiller until the application has discarded it *and* it has been + // detached from the underlying fulfiller, because otherwise the later detach() call will go + // to a dangling pointer. Essentially, WeakFulfiller is reference counted, although the + // refcount never goes over 2 and we manually implement the refcounting because we need to do + // other special things when each side detaches anyway. To this end, WeakFulfiller is its own + // Disposer -- dispose() is called when the application discards its owned pointer to the + // fulfiller and detach() is called when the promise is destroyed. + +public: + KJ_DISALLOW_COPY_AND_MOVE(WeakFulfiller); + + static kj::Own<WeakFulfiller> make() { + WeakFulfiller* ptr = new WeakFulfiller; + return Own<WeakFulfiller>(ptr, *ptr); + } + + void fulfill(FixVoid<T>&& value) override { + if (getInner<T>() != nullptr) { + getInner<T>()->fulfill(kj::mv(value)); + } + } + + void reject(Exception&& exception) override { + if (getInner<T>() != nullptr) { + getInner<T>()->reject(kj::mv(exception)); + } + } + + bool isWaiting() override { + return getInner<T>() != nullptr && getInner<T>()->isWaiting(); + } + + void attach(PromiseFulfiller<T>& newInner) { + setInner<T>(&newInner); + } + + void detach(PromiseFulfiller<T>& from) { + if (getInner<T>() == nullptr) { + // Already disposed. + delete this; + } else { + KJ_IREQUIRE(getInner<T>() == &from); + setInner<T>(nullptr); + } + } + +private: + WeakFulfiller() {} +}; + +template <typename T> +class PromiseAndFulfillerAdapter { +public: + PromiseAndFulfillerAdapter(PromiseFulfiller<T>& fulfiller, + WeakFulfiller<T>& wrapper) + : fulfiller(fulfiller), wrapper(wrapper) { + wrapper.attach(fulfiller); + } + + ~PromiseAndFulfillerAdapter() noexcept(false) { + wrapper.detach(fulfiller); + } + +private: + PromiseFulfiller<T>& fulfiller; + WeakFulfiller<T>& wrapper; +}; + +} // namespace _ (private) + +template <typename T> +template <typename Func> +bool PromiseFulfiller<T>::rejectIfThrows(Func&& func) { + KJ_IF_MAYBE(exception, kj::runCatchingExceptions(kj::mv(func))) { + reject(kj::mv(*exception)); + return false; + } else { + return true; + } +} + +template <typename Func> +bool PromiseFulfiller<void>::rejectIfThrows(Func&& func) { + KJ_IF_MAYBE(exception, kj::runCatchingExceptions(kj::mv(func))) { + reject(kj::mv(*exception)); + return false; + } else { + return true; + } +} + +template <typename T, typename Adapter, typename... Params> +_::ReducePromises<T> newAdaptedPromise(Params&&... adapterConstructorParams) { + _::OwnPromiseNode intermediate( + _::allocPromise<_::AdapterPromiseNode<_::FixVoid<T>, Adapter>>( + kj::fwd<Params>(adapterConstructorParams)...)); + // We can't capture SourceLocation in this function's arguments since it is a vararg template. :( + return _::PromiseNode::to<_::ReducePromises<T>>( + _::maybeChain(kj::mv(intermediate), implicitCast<T*>(nullptr), SourceLocation())); +} + +template <typename T> +PromiseFulfillerPair<T> newPromiseAndFulfiller(SourceLocation location) { + auto wrapper = _::WeakFulfiller<T>::make(); + + _::OwnPromiseNode intermediate( + _::allocPromise<_::AdapterPromiseNode< + _::FixVoid<T>, _::PromiseAndFulfillerAdapter<T>>>(*wrapper)); + auto promise = _::PromiseNode::to<_::ReducePromises<T>>( + _::maybeChain(kj::mv(intermediate), implicitCast<T*>(nullptr), location)); + + return PromiseFulfillerPair<T> { kj::mv(promise), kj::mv(wrapper) }; +} + +// ======================================================================================= +// cross-thread stuff + +namespace _ { // (private) + +class XThreadEvent: public PromiseNode, // it's a PromiseNode in the requesting thread + private Event { // it's an event in the target thread +public: + XThreadEvent(ExceptionOrValue& result, const Executor& targetExecutor, EventLoop& loop, + void* funcTracePtr, SourceLocation location); + + void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; + +protected: + void ensureDoneOrCanceled(); + // MUST be called in destructor of subclasses to make sure the object is not destroyed while + // still being accessed by the other thread. (This can't be placed in ~XThreadEvent() because + // that destructor doesn't run until the subclass has already been destroyed.) + + virtual kj::Maybe<OwnPromiseNode> execute() = 0; + // Run the function. If the function returns a promise, returns the inner PromiseNode, otherwise + // returns null. + + // implements PromiseNode ---------------------------------------------------- + void onReady(Event* event) noexcept override; + +private: + ExceptionOrValue& result; + void* funcTracePtr; + + kj::Own<const Executor> targetExecutor; + Maybe<const Executor&> replyExecutor; // If executeAsync() was used. + + kj::Maybe<OwnPromiseNode> promiseNode; + // Accessed only in target thread. + + ListLink<XThreadEvent> targetLink; + // Membership in one of the linked lists in the target Executor's work list or cancel list. These + // fields are protected by the target Executor's mutex. + + enum { + UNUSED, + // Object was never queued on another thread. + + QUEUED, + // Target thread has not yet dequeued the event from the state.start list. The requesting + // thread can cancel execution by removing the event from the list. + + EXECUTING, + // Target thread has dequeued the event from state.start and moved it to state.executing. To + // cancel, the requesting thread must add the event to the state.cancel list and change the + // state to CANCELING. + + CANCELING, + // Requesting thread is trying to cancel this event. The target thread will change the state to + // `DONE` once canceled. + + DONE + // Target thread has completed handling this event and will not touch it again. The requesting + // thread can safely delete the object. The `state` is updated to `DONE` using an atomic + // release operation after ensuring that the event will not be touched again, so that the + // requesting can safely skip locking if it observes the state is already DONE. + } state = UNUSED; + // State, which is also protected by `targetExecutor`'s mutex. + + ListLink<XThreadEvent> replyLink; + // Membership in `replyExecutor`'s reply list. Protected by `replyExecutor`'s mutex. The + // executing thread places the event in the reply list near the end of the `EXECUTING` state. + // Because the thread cannot lock two mutexes at once, it's possible that the reply executor + // will receive the reply while the event is still listed in the EXECUTING state, but it can + // ignore the state and proceed with the result. + + OnReadyEvent onReadyEvent; + // Accessed only in requesting thread. + + friend class kj::Executor; + + void done(); + // Sets the state to `DONE` and notifies the originating thread that this event is done. Do NOT + // call under lock. + + void sendReply(); + // Notifies the originating thread that this event is done, but doesn't set the state to DONE + // yet. Do NOT call under lock. + + void setDoneState(); + // Assigns `state` to `DONE`, being careful to use an atomic-release-store if needed. This must + // only be called in the destination thread, and must either be called under lock, or the thread + // must take the lock and release it again shortly after setting the state (because some threads + // may be waiting on the DONE state using a conditional wait on the mutex). After calling + // setDoneState(), the destination thread MUST NOT touch this object ever again; it now belongs + // solely to the requesting thread. + + void setDisconnected(); + // Sets the result to a DISCONNECTED exception indicating that the target event loop exited. + + class DelayedDoneHack; + + // implements Event ---------------------------------------------------------- + Maybe<Own<Event>> fire() override; + // If called with promiseNode == nullptr, it's time to call execute(). If promiseNode != nullptr, + // then it just indicated readiness and we need to get its result. + + void traceEvent(TraceBuilder& builder) override; +}; + +template <typename Func, typename = _::FixVoid<_::ReturnType<Func, void>>> +class XThreadEventImpl final: public XThreadEvent { + // Implementation for a function that does not return a Promise. +public: + XThreadEventImpl(Func&& func, const Executor& target, EventLoop& loop, SourceLocation location) + : XThreadEvent(result, target, loop, GetFunctorStartAddress<>::apply(func), location), + func(kj::fwd<Func>(func)) {} + ~XThreadEventImpl() noexcept(false) { ensureDoneOrCanceled(); } + void destroy() override { freePromise(this); } + + typedef _::FixVoid<_::ReturnType<Func, void>> ResultT; + + kj::Maybe<_::OwnPromiseNode> execute() override { + result.value = MaybeVoidCaller<Void, FixVoid<decltype(func())>>::apply(func, Void()); + return nullptr; + } + + // implements PromiseNode ---------------------------------------------------- + void get(ExceptionOrValue& output) noexcept override { + output.as<ResultT>() = kj::mv(result); + } + +private: + Func func; + ExceptionOr<ResultT> result; + friend Executor; +}; + +template <typename Func, typename T> +class XThreadEventImpl<Func, Promise<T>> final: public XThreadEvent { + // Implementation for a function that DOES return a Promise. +public: + XThreadEventImpl(Func&& func, const Executor& target, EventLoop& loop, SourceLocation location) + : XThreadEvent(result, target, loop, GetFunctorStartAddress<>::apply(func), location), + func(kj::fwd<Func>(func)) {} + ~XThreadEventImpl() noexcept(false) { ensureDoneOrCanceled(); } + void destroy() override { freePromise(this); } + + typedef _::FixVoid<_::UnwrapPromise<PromiseForResult<Func, void>>> ResultT; + + kj::Maybe<_::OwnPromiseNode> execute() override { + auto result = _::PromiseNode::from(func()); + KJ_IREQUIRE(result.get() != nullptr); + return kj::mv(result); + } + + // implements PromiseNode ---------------------------------------------------- + void get(ExceptionOrValue& output) noexcept override { + output.as<ResultT>() = kj::mv(result); + } + +private: + Func func; + ExceptionOr<ResultT> result; + friend Executor; +}; + +} // namespace _ (private) + +template <typename Func> +_::UnwrapPromise<PromiseForResult<Func, void>> Executor::executeSync( + Func&& func, SourceLocation location) const { + _::XThreadEventImpl<Func> event(kj::fwd<Func>(func), *this, getLoop(), location); + send(event, true); + return convertToReturn(kj::mv(event.result)); +} + +template <typename Func> +PromiseForResult<Func, void> Executor::executeAsync(Func&& func, SourceLocation location) const { + // HACK: We call getLoop() here, rather than have XThreadEvent's constructor do it, so that if it + // throws we don't crash due to `allocPromise()` being `noexcept`. + auto event = _::allocPromise<_::XThreadEventImpl<Func>>( + kj::fwd<Func>(func), *this, getLoop(), location); + send(*event, false); + return _::PromiseNode::to<PromiseForResult<Func, void>>(kj::mv(event)); +} + +// ----------------------------------------------------------------------------- + +namespace _ { // (private) + +template <typename T> +class XThreadFulfiller; + +class XThreadPaf: public PromiseNode { +public: + XThreadPaf(); + virtual ~XThreadPaf() noexcept(false); + void destroy() override; + + // implements PromiseNode ---------------------------------------------------- + void onReady(Event* event) noexcept override; + void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; + +private: + enum { + WAITING, + // Not yet fulfilled, and the waiter is still waiting. + // + // Starting from this state, the state may transition to either FULFILLING or CANCELED + // using an atomic compare-and-swap. + + FULFILLING, + // The fulfiller thread atomically transitions the state from WAITING to FULFILLING when it + // wishes to fulfill the promise. By doing so, it guarantees that the `executor` will not + // disappear out from under it. It then fills in the result value, locks the executor mutex, + // adds the object to the executor's list of fulfilled XThreadPafs, changes the state to + // FULFILLED, and finally unlocks the mutex. + // + // If the waiting thread tries to cancel but discovers the object in this state, then it + // must perform a conditional wait on the executor mutex to await the state becoming FULFILLED. + // It can then delete the object. + + FULFILLED, + // The fulfilling thread has completed filling in the result value and inserting the object + // into the waiting thread's executor event queue. Moreover, the fulfilling thread no longer + // holds any pointers to this object. The waiting thread is responsible for deleting it. + + DISPATCHED, + // The object reached FULFILLED state, and then was dispatched from the waiting thread's + // executor's event queue. Therefore, the object is completely owned by the waiting thread with + // no need to lock anything. + + CANCELED + // The waiting thread atomically transitions the state from WAITING to CANCELED if it is no + // longer listening. In this state, it is the fulfiller thread's responsibility to destroy the + // object. + } state; + + const Executor& executor; + // Executor of the waiting thread. Only guaranteed to be valid when state is `WAITING` or + // `FULFILLING`. After any other state has been reached, this reference may be invalidated. + + ListLink<XThreadPaf> link; + // In the FULFILLING/FULFILLED states, the object is placed in a linked list within the waiting + // thread's executor. In those states, these pointers are guarded by said executor's mutex. + + OnReadyEvent onReadyEvent; + + class FulfillScope; + + static kj::Exception unfulfilledException(); + // Construct appropriate exception to use to reject an unfulfilled XThreadPaf. + + template <typename T> + friend class XThreadFulfiller; + friend Executor; +}; + +template <typename T> +class XThreadPafImpl final: public XThreadPaf { +public: + // implements PromiseNode ---------------------------------------------------- + void get(ExceptionOrValue& output) noexcept override { + output.as<FixVoid<T>>() = kj::mv(result); + } + +private: + ExceptionOr<FixVoid<T>> result; + + friend class XThreadFulfiller<T>; +}; + +class XThreadPaf::FulfillScope { + // Create on stack while setting `XThreadPafImpl<T>::result`. + // + // This ensures that: + // - Only one call is carried out, even if multiple threads try to fulfill concurrently. + // - The waiting thread is correctly signaled. +public: + FulfillScope(XThreadPaf** pointer); + // Atomically nulls out *pointer and takes ownership of the pointer. + + ~FulfillScope() noexcept(false); + + KJ_DISALLOW_COPY_AND_MOVE(FulfillScope); + + bool shouldFulfill() { return obj != nullptr; } + + template <typename T> + XThreadPafImpl<T>* getTarget() { return static_cast<XThreadPafImpl<T>*>(obj); } + +private: + XThreadPaf* obj; +}; + +template <typename T> +class XThreadFulfiller final: public CrossThreadPromiseFulfiller<T> { +public: + XThreadFulfiller(XThreadPafImpl<T>* target): target(target) {} + + ~XThreadFulfiller() noexcept(false) { + if (target != nullptr) { + reject(XThreadPaf::unfulfilledException()); + } + } + void fulfill(FixVoid<T>&& value) const override { + XThreadPaf::FulfillScope scope(&target); + if (scope.shouldFulfill()) { + scope.getTarget<T>()->result = kj::mv(value); + } + } + void reject(Exception&& exception) const override { + XThreadPaf::FulfillScope scope(&target); + if (scope.shouldFulfill()) { + scope.getTarget<T>()->result.addException(kj::mv(exception)); + } + } + bool isWaiting() const override { + KJ_IF_MAYBE(t, target) { +#if _MSC_VER && !__clang__ + // Just assume 1-byte loads are atomic... on what kind of absurd platform would they not be? + return t->state == XThreadPaf::WAITING; +#else + return __atomic_load_n(&t->state, __ATOMIC_RELAXED) == XThreadPaf::WAITING; +#endif + } else { + return false; + } + } + +private: + mutable XThreadPaf* target; // accessed using atomic ops +}; + +template <typename T> +class XThreadFulfiller<kj::Promise<T>> { +public: + static_assert(sizeof(T) < 0, + "newCrosssThreadPromiseAndFulfiller<Promise<T>>() is not currently supported"); + // TODO(someday): Is this worth supporting? Presumably, when someone calls `fulfill(somePromise)`, + // then `somePromise` should be assumed to be a promise owned by the fulfilling thread, not + // the waiting thread. +}; + +} // namespace _ (private) + +template <typename T> +PromiseCrossThreadFulfillerPair<T> newPromiseAndCrossThreadFulfiller() { + kj::Own<_::XThreadPafImpl<T>, _::PromiseDisposer> node(new _::XThreadPafImpl<T>); + auto fulfiller = kj::heap<_::XThreadFulfiller<T>>(node); + return { _::PromiseNode::to<_::ReducePromises<T>>(kj::mv(node)), kj::mv(fulfiller) }; +} + +} // namespace kj + +#if KJ_HAS_COROUTINE + +// ======================================================================================= +// Coroutines TS integration with kj::Promise<T>. +// +// Here's a simple coroutine: +// +// Promise<Own<AsyncIoStream>> connectToService(Network& n) { +// auto a = co_await n.parseAddress(IP, PORT); +// auto c = co_await a->connect(); +// co_return kj::mv(c); +// } +// +// The presence of the co_await and co_return keywords tell the compiler it is a coroutine. +// Although it looks similar to a function, it has a couple large differences. First, everything +// that would normally live in the stack frame lives instead in a heap-based coroutine frame. +// Second, the coroutine has the ability to return from its scope without deallocating this frame +// (to suspend, in other words), and the ability to resume from its last suspension point. +// +// In order to know how to suspend, resume, and return from a coroutine, the compiler looks up a +// coroutine implementation type via a traits class parameterized by the coroutine return and +// parameter types. We'll name our coroutine implementation `kj::_::Coroutine<T>`, + +namespace kj::_ { template <typename T> class Coroutine; } + +// Specializing the appropriate traits class tells the compiler about `kj::_::Coroutine<T>`. + +namespace KJ_COROUTINE_STD_NAMESPACE { + +template <class T, class... Args> +struct coroutine_traits<kj::Promise<T>, Args...> { + // `Args...` are the coroutine's parameter types. + + using promise_type = kj::_::Coroutine<T>; + // The Coroutines TS calls this the "promise type". This makes sense when thinking of coroutines + // returning `std::future<T>`, since the coroutine implementation would be a wrapper around + // a `std::promise<T>`. It's extremely confusing from a KJ perspective, however, so I call it + // the "coroutine implementation type" instead. +}; + +} // namespace KJ_COROUTINE_STD_NAMESPACE + +// Now when the compiler sees our `connectToService()` coroutine above, it default-constructs a +// `coroutine_traits<Promise<Own<AsyncIoStream>>, Network&>::promise_type`, or +// `kj::_::Coroutine<Own<AsyncIoStream>>`. +// +// The implementation object lives in the heap-allocated coroutine frame. It gets destroyed and +// deallocated when the frame does. + +namespace kj::_ { + +namespace stdcoro = KJ_COROUTINE_STD_NAMESPACE; + +class CoroutineBase: public PromiseNode, + public Event { +public: + CoroutineBase(stdcoro::coroutine_handle<> coroutine, ExceptionOrValue& resultRef, + SourceLocation location); + ~CoroutineBase() noexcept(false); + KJ_DISALLOW_COPY_AND_MOVE(CoroutineBase); + void destroy() override; + + auto initial_suspend() { return stdcoro::suspend_never(); } + auto final_suspend() noexcept { +#if _MSC_VER && !defined(__clang__) + // See comment at `finalSuspendCalled`'s definition. + finalSuspendCalled = true; +#endif + return stdcoro::suspend_always(); + } + // These adjust the suspension behavior of coroutines immediately upon initiation, and immediately + // after completion. + // + // The initial suspension point could allow us to defer the initial synchronous execution of a + // coroutine -- everything before its first co_await, that is. + // + // The final suspension point is useful to delay deallocation of the coroutine frame to match the + // lifetime of the enclosing promise. + + void unhandled_exception(); + +protected: + class AwaiterBase; + + bool isWaiting() { return waiting; } + void scheduleResumption() { + onReadyEvent.arm(); + waiting = false; + } + +private: + // ------------------------------------------------------- + // PromiseNode implementation + + void onReady(Event* event) noexcept override; + void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override; + + // ------------------------------------------------------- + // Event implementation + + Maybe<Own<Event>> fire() override; + void traceEvent(TraceBuilder& builder) override; + + stdcoro::coroutine_handle<> coroutine; + ExceptionOrValue& resultRef; + + OnReadyEvent onReadyEvent; + bool waiting = true; + + bool hasSuspendedAtLeastOnce = false; + +#if _MSC_VER && !defined(__clang__) + bool finalSuspendCalled = false; + // MSVC erroneously reports the coroutine as done (that is, `coroutine.done()` returns true) + // seemingly as soon as `return_value()`/`return_void()` are called. This matters in our + // implementation of `unhandled_exception()`, which must arrange to propagate exceptions during + // coroutine frame unwind via the returned promise, even if `return_value()`/`return_void()` have + // already been called. To prove that our assumptions are correct in that function, we want to be + // able to assert that `final_suspend()` has not yet been called. This boolean hack allows us to + // preserve that assertion. +#endif + + Maybe<PromiseNode&> promiseNodeForTrace; + // Whenever this coroutine is suspended waiting on another promise, we keep a reference to that + // promise so tracePromise()/traceEvent() can trace into it. + + UnwindDetector unwindDetector; + + struct DisposalResults { + bool destructorRan = false; + Maybe<Exception> exception; + }; + Maybe<DisposalResults&> maybeDisposalResults; + // Only non-null during destruction. Before calling coroutine.destroy(), our disposer sets this + // to point to a DisposalResults on the stack so unhandled_exception() will have some place to + // store unwind exceptions. We can't store them in this Coroutine, because we'll be destroyed once + // coroutine.destroy() has returned. Our disposer then rethrows as needed. +}; + +template <typename Self, typename T> +class CoroutineMixin; +// CRTP mixin, covered later. + +template <typename T> +class Coroutine final: public CoroutineBase, + public CoroutineMixin<Coroutine<T>, T> { + // The standard calls this the `promise_type` object. We can call this the "coroutine + // implementation object" since the word promise means different things in KJ and std styles. This + // is where we implement how a `kj::Promise<T>` is returned from a coroutine, and how that promise + // is later fulfilled. We also fill in a few lifetime-related details. + // + // The implementation object is also where we can customize memory allocation of coroutine frames, + // by implementing a member `operator new(size_t, Args...)` (same `Args...` as in + // coroutine_traits). + // + // We can also customize how await-expressions are transformed within `kj::Promise<T>`-based + // coroutines by implementing an `await_transform(P)` member function, where `P` is some type for + // which we want to implement co_await support, e.g. `kj::Promise<U>`. This feature allows us to + // provide an optimized `kj::EventLoop` integration when the coroutine's return type and the + // await-expression's type are both `kj::Promise` instantiations -- see further comments under + // `await_transform()`. + +public: + using Handle = stdcoro::coroutine_handle<Coroutine<T>>; + + Coroutine(SourceLocation location = {}) + : CoroutineBase(Handle::from_promise(*this), result, location) {} + + Promise<T> get_return_object() { + // Called after coroutine frame construction and before initial_suspend() to create the + // coroutine's return object. `this` itself lives inside the coroutine frame, and we arrange for + // the returned Promise<T> to own `this` via a custom Disposer and by always leaving the + // coroutine in a suspended state. + return PromiseNode::to<Promise<T>>(OwnPromiseNode(this)); + } + +public: + template <typename U> + class Awaiter; + + template <typename U> + Awaiter<U> await_transform(kj::Promise<U>& promise) { return Awaiter<U>(kj::mv(promise)); } + template <typename U> + Awaiter<U> await_transform(kj::Promise<U>&& promise) { return Awaiter<U>(kj::mv(promise)); } + // Called when someone writes `co_await promise`, where `promise` is a kj::Promise<U>. We return + // an Awaiter<U>, which implements coroutine suspension and resumption in terms of the KJ async + // event system. + // + // There is another hook we could implement: an `operator co_await()` free function. However, a + // free function would be unaware of the type of the enclosing coroutine. Since Awaiter<U> is a + // member class template of Coroutine<T>, it is able to implement an + // `await_suspend(Coroutine<T>::Handle)` override, providing it type-safe access to our enclosing + // coroutine's PromiseNode. An `operator co_await()` free function would have to implement + // a type-erased `await_suspend(stdcoro::coroutine_handle<void>)` override, and implement + // suspension and resumption in terms of .then(). Yuck! + +private: + // ------------------------------------------------------- + // PromiseNode implementation + + void get(ExceptionOrValue& output) noexcept override { + output.as<FixVoid<T>>() = kj::mv(result); + } + + void fulfill(FixVoid<T>&& value) { + // Called by the return_value()/return_void() functions in our mixin class. + + if (isWaiting()) { + result = kj::mv(value); + scheduleResumption(); + } + } + + ExceptionOr<FixVoid<T>> result; + + friend class CoroutineMixin<Coroutine<T>, T>; +}; + +template <typename Self, typename T> +class CoroutineMixin { +public: + void return_value(T value) { + static_cast<Self*>(this)->fulfill(kj::mv(value)); + } +}; +template <typename Self> +class CoroutineMixin<Self, void> { +public: + void return_void() { + static_cast<Self*>(this)->fulfill(_::Void()); + } +}; +// The Coroutines spec has no `_::FixVoid<T>` equivalent to unify valueful and valueless co_return +// statements, and programs are ill-formed if the coroutine implementation object (Coroutine<T>) has +// both a `return_value()` and `return_void()`. No amount of EnableIffery can get around it, so +// these return_* functions live in a CRTP mixin. + +class CoroutineBase::AwaiterBase { +public: + explicit AwaiterBase(OwnPromiseNode node); + AwaiterBase(AwaiterBase&&); + ~AwaiterBase() noexcept(false); + KJ_DISALLOW_COPY(AwaiterBase); + + bool await_ready() const { return false; } + // This could return "`node->get()` is safe to call" instead, which would make suspension-less + // co_awaits possible for immediately-fulfilled promises. However, we need an Event to figure that + // out, and we won't have access to the Coroutine Event until await_suspend() is called. So, we + // must return false here. Fortunately, await_suspend() has a trick up its sleeve to enable + // suspension-less co_awaits. + +protected: + void getImpl(ExceptionOrValue& result, void* awaitedAt); + bool awaitSuspendImpl(CoroutineBase& coroutineEvent); + +private: + UnwindDetector unwindDetector; + OwnPromiseNode node; + + Maybe<CoroutineBase&> maybeCoroutineEvent; + // If we do suspend waiting for our wrapped promise, we store a reference to `node` in our + // enclosing Coroutine for tracing purposes. To guard against any edge cases where an async stack + // trace is generated when an Awaiter was destroyed without Coroutine::fire() having been called, + // we need our own reference to the enclosing Coroutine. (I struggle to think up any such + // scenarios, but perhaps they could occur when destroying a suspended coroutine.) +}; + +template <typename T> +template <typename U> +class Coroutine<T>::Awaiter: public AwaiterBase { + // Wrapper around a co_await'ed promise and some storage space for the result of that promise. + // The compiler arranges to call our await_suspend() to suspend, which arranges to be woken up + // when the awaited promise is settled. Once that happens, the enclosing coroutine's Event + // implementation resumes the coroutine, which transitively calls await_resume() to unwrap the + // awaited promise result. + +public: + explicit Awaiter(Promise<U> promise): AwaiterBase(PromiseNode::from(kj::mv(promise))) {} + + KJ_NOINLINE U await_resume() { + // This is marked noinline in order to ensure __builtin_return_address() is accurate for stack + // trace purposes. In my experimentation, this method was not inlined anyway even in opt + // builds, but I want to make sure it doesn't suddenly start being inlined later causing stack + // traces to break. (I also tried always-inline, but this did not appear to cause the compiler + // to inline the method -- perhaps a limitation of coroutines?) +#if __GNUC__ + getImpl(result, __builtin_return_address(0)); +#elif _MSC_VER + getImpl(result, _ReturnAddress()); +#else + #error "please implement for your compiler" +#endif + auto value = kj::_::readMaybe(result.value); + KJ_IASSERT(value != nullptr, "Neither exception nor value present."); + return U(kj::mv(*value)); + } + + bool await_suspend(Coroutine::Handle coroutine) { + return awaitSuspendImpl(coroutine.promise()); + } + +private: + ExceptionOr<FixVoid<U>> result; +}; + +#undef KJ_COROUTINE_STD_NAMESPACE + +} // namespace kj::_ (private) + +#endif // KJ_HAS_COROUTINE + +KJ_END_HEADER