jpayne@69: // Copyright (c) 2021 Cloudflare, Inc. and contributors jpayne@69: // Licensed under the MIT License: jpayne@69: // jpayne@69: // Permission is hereby granted, free of charge, to any person obtaining a copy jpayne@69: // of this software and associated documentation files (the "Software"), to deal jpayne@69: // in the Software without restriction, including without limitation the rights jpayne@69: // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell jpayne@69: // copies of the Software, and to permit persons to whom the Software is jpayne@69: // furnished to do so, subject to the following conditions: jpayne@69: // jpayne@69: // The above copyright notice and this permission notice shall be included in jpayne@69: // all copies or substantial portions of the Software. jpayne@69: // jpayne@69: // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR jpayne@69: // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, jpayne@69: // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE jpayne@69: // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER jpayne@69: // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, jpayne@69: // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN jpayne@69: // THE SOFTWARE. jpayne@69: jpayne@69: #pragma once jpayne@69: jpayne@69: #include "async.h" jpayne@69: #include jpayne@69: #include jpayne@69: #include jpayne@69: #include jpayne@69: jpayne@69: #include jpayne@69: jpayne@69: KJ_BEGIN_HEADER jpayne@69: jpayne@69: namespace kj { jpayne@69: jpayne@69: template jpayne@69: class WaiterQueue { jpayne@69: public: jpayne@69: // A WaiterQueue creates Nodes that blend newAdaptedPromise and List. jpayne@69: jpayne@69: WaiterQueue() = default; jpayne@69: KJ_DISALLOW_COPY_AND_MOVE(WaiterQueue); jpayne@69: jpayne@69: Promise wait() { jpayne@69: return newAdaptedPromise(queue); jpayne@69: } jpayne@69: jpayne@69: void fulfill(T&& value) { jpayne@69: KJ_IREQUIRE(!empty()); jpayne@69: jpayne@69: auto& node = static_cast(queue.front()); jpayne@69: node.fulfiller.fulfill(kj::mv(value)); jpayne@69: node.remove(); jpayne@69: } jpayne@69: jpayne@69: void reject(Exception&& exception) { jpayne@69: KJ_IREQUIRE(!empty()); jpayne@69: jpayne@69: auto& node = static_cast(queue.front()); jpayne@69: node.fulfiller.reject(kj::mv(exception)); jpayne@69: node.remove(); jpayne@69: } jpayne@69: jpayne@69: bool empty() const { jpayne@69: return queue.empty(); jpayne@69: } jpayne@69: jpayne@69: private: jpayne@69: struct BaseNode { jpayne@69: // This is a separate structure because List requires a predefined memory layout but jpayne@69: // newAdaptedPromise() only provides access to the Adaptor type in the ctor. jpayne@69: jpayne@69: BaseNode(PromiseFulfiller& fulfiller): fulfiller(fulfiller) {} jpayne@69: jpayne@69: PromiseFulfiller& fulfiller; jpayne@69: ListLink link; jpayne@69: }; jpayne@69: jpayne@69: using Queue = List; jpayne@69: jpayne@69: struct Node: public BaseNode { jpayne@69: Node(PromiseFulfiller& fulfiller, Queue& queue): BaseNode(fulfiller), queue(queue) { jpayne@69: queue.add(*this); jpayne@69: } jpayne@69: jpayne@69: ~Node() noexcept(false) { jpayne@69: // When the associated Promise is destructed, so is the Node thus we should leave the queue. jpayne@69: remove(); jpayne@69: } jpayne@69: jpayne@69: void remove() { jpayne@69: if(BaseNode::link.isLinked()){ jpayne@69: queue.remove(*this); jpayne@69: } jpayne@69: } jpayne@69: jpayne@69: Queue& queue; jpayne@69: }; jpayne@69: jpayne@69: Queue queue; jpayne@69: }; jpayne@69: jpayne@69: template jpayne@69: class ProducerConsumerQueue { jpayne@69: // ProducerConsumerQueue is an async FIFO queue. jpayne@69: jpayne@69: public: jpayne@69: void push(T v) { jpayne@69: // Push an existing value onto the queue. jpayne@69: jpayne@69: if (!waiters.empty()) { jpayne@69: // We have at least one waiter, give the value to the oldest. jpayne@69: KJ_IASSERT(values.empty()); jpayne@69: jpayne@69: // Fulfill the first waiter and return without store our value. jpayne@69: waiters.fulfill(kj::mv(v)); jpayne@69: } else { jpayne@69: // We don't have any waiters, store the value. jpayne@69: values.push_front(kj::mv(v)); jpayne@69: } jpayne@69: } jpayne@69: jpayne@69: void rejectAll(Exception e) { jpayne@69: // Reject all waiters with a given exception. jpayne@69: jpayne@69: while (!waiters.empty()) { jpayne@69: auto newE = Exception(e); jpayne@69: waiters.reject(kj::mv(newE)); jpayne@69: } jpayne@69: } jpayne@69: jpayne@69: Promise pop() { jpayne@69: // Eventually pop a value from the queue. jpayne@69: // Note that if your sinks lag your sources, the promise will always be ready. jpayne@69: jpayne@69: if (!values.empty()) { jpayne@69: // We have at least one value, get the oldest. jpayne@69: KJ_IASSERT(waiters.empty()); jpayne@69: jpayne@69: auto value = kj::mv(values.back()); jpayne@69: values.pop_back(); jpayne@69: return kj::mv(value); jpayne@69: } else { jpayne@69: // We don't have any values, add ourselves to the waiting queue. jpayne@69: return waiters.wait(); jpayne@69: } jpayne@69: } jpayne@69: jpayne@69: private: jpayne@69: std::list values; jpayne@69: WaiterQueue waiters; jpayne@69: }; jpayne@69: jpayne@69: } // namespace kj jpayne@69: jpayne@69: KJ_END_HEADER