Mercurial > repos > rliterman > csp2
diff CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/include/kj/async-queue.h @ 69:33d812a61356
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 17:55:14 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/include/kj/async-queue.h Tue Mar 18 17:55:14 2025 -0400 @@ -0,0 +1,156 @@ +// Copyright (c) 2021 Cloudflare, Inc. and contributors +// Licensed under the MIT License: +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +#pragma once + +#include "async.h" +#include <kj/common.h> +#include <kj/debug.h> +#include <kj/list.h> +#include <kj/memory.h> + +#include <list> + +KJ_BEGIN_HEADER + +namespace kj { + +template <typename T> +class WaiterQueue { +public: + // A WaiterQueue creates Nodes that blend newAdaptedPromise<T, Adaptor> and List<Node>. + + WaiterQueue() = default; + KJ_DISALLOW_COPY_AND_MOVE(WaiterQueue); + + Promise<T> wait() { + return newAdaptedPromise<T, Node>(queue); + } + + void fulfill(T&& value) { + KJ_IREQUIRE(!empty()); + + auto& node = static_cast<Node&>(queue.front()); + node.fulfiller.fulfill(kj::mv(value)); + node.remove(); + } + + void reject(Exception&& exception) { + KJ_IREQUIRE(!empty()); + + auto& node = static_cast<Node&>(queue.front()); + node.fulfiller.reject(kj::mv(exception)); + node.remove(); + } + + bool empty() const { + return queue.empty(); + } + +private: + struct BaseNode { + // This is a separate structure because List requires a predefined memory layout but + // newAdaptedPromise() only provides access to the Adaptor type in the ctor. + + BaseNode(PromiseFulfiller<T>& fulfiller): fulfiller(fulfiller) {} + + PromiseFulfiller<T>& fulfiller; + ListLink<BaseNode> link; + }; + + using Queue = List<BaseNode, &BaseNode::link>; + + struct Node: public BaseNode { + Node(PromiseFulfiller<T>& fulfiller, Queue& queue): BaseNode(fulfiller), queue(queue) { + queue.add(*this); + } + + ~Node() noexcept(false) { + // When the associated Promise is destructed, so is the Node thus we should leave the queue. + remove(); + } + + void remove() { + if(BaseNode::link.isLinked()){ + queue.remove(*this); + } + } + + Queue& queue; + }; + + Queue queue; +}; + +template <typename T> +class ProducerConsumerQueue { + // ProducerConsumerQueue is an async FIFO queue. + +public: + void push(T v) { + // Push an existing value onto the queue. + + if (!waiters.empty()) { + // We have at least one waiter, give the value to the oldest. + KJ_IASSERT(values.empty()); + + // Fulfill the first waiter and return without store our value. + waiters.fulfill(kj::mv(v)); + } else { + // We don't have any waiters, store the value. + values.push_front(kj::mv(v)); + } + } + + void rejectAll(Exception e) { + // Reject all waiters with a given exception. + + while (!waiters.empty()) { + auto newE = Exception(e); + waiters.reject(kj::mv(newE)); + } + } + + Promise<T> pop() { + // Eventually pop a value from the queue. + // Note that if your sinks lag your sources, the promise will always be ready. + + if (!values.empty()) { + // We have at least one value, get the oldest. + KJ_IASSERT(waiters.empty()); + + auto value = kj::mv(values.back()); + values.pop_back(); + return kj::mv(value); + } else { + // We don't have any values, add ourselves to the waiting queue. + return waiters.wait(); + } + } + +private: + std::list<T> values; + WaiterQueue<T> waiters; +}; + +} // namespace kj + +KJ_END_HEADER