annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/include/kj/async-queue.h @ 69:33d812a61356

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 17:55:14 -0400
parents
children
rev   line source
jpayne@69 1 // Copyright (c) 2021 Cloudflare, Inc. and contributors
jpayne@69 2 // Licensed under the MIT License:
jpayne@69 3 //
jpayne@69 4 // Permission is hereby granted, free of charge, to any person obtaining a copy
jpayne@69 5 // of this software and associated documentation files (the "Software"), to deal
jpayne@69 6 // in the Software without restriction, including without limitation the rights
jpayne@69 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
jpayne@69 8 // copies of the Software, and to permit persons to whom the Software is
jpayne@69 9 // furnished to do so, subject to the following conditions:
jpayne@69 10 //
jpayne@69 11 // The above copyright notice and this permission notice shall be included in
jpayne@69 12 // all copies or substantial portions of the Software.
jpayne@69 13 //
jpayne@69 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
jpayne@69 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
jpayne@69 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
jpayne@69 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
jpayne@69 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
jpayne@69 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
jpayne@69 20 // THE SOFTWARE.
jpayne@69 21
jpayne@69 22 #pragma once
jpayne@69 23
jpayne@69 24 #include "async.h"
jpayne@69 25 #include <kj/common.h>
jpayne@69 26 #include <kj/debug.h>
jpayne@69 27 #include <kj/list.h>
jpayne@69 28 #include <kj/memory.h>
jpayne@69 29
jpayne@69 30 #include <list>
jpayne@69 31
jpayne@69 32 KJ_BEGIN_HEADER
jpayne@69 33
jpayne@69 34 namespace kj {
jpayne@69 35
jpayne@69 36 template <typename T>
jpayne@69 37 class WaiterQueue {
jpayne@69 38 public:
jpayne@69 39 // A WaiterQueue creates Nodes that blend newAdaptedPromise<T, Adaptor> and List<Node>.
jpayne@69 40
jpayne@69 41 WaiterQueue() = default;
jpayne@69 42 KJ_DISALLOW_COPY_AND_MOVE(WaiterQueue);
jpayne@69 43
jpayne@69 44 Promise<T> wait() {
jpayne@69 45 return newAdaptedPromise<T, Node>(queue);
jpayne@69 46 }
jpayne@69 47
jpayne@69 48 void fulfill(T&& value) {
jpayne@69 49 KJ_IREQUIRE(!empty());
jpayne@69 50
jpayne@69 51 auto& node = static_cast<Node&>(queue.front());
jpayne@69 52 node.fulfiller.fulfill(kj::mv(value));
jpayne@69 53 node.remove();
jpayne@69 54 }
jpayne@69 55
jpayne@69 56 void reject(Exception&& exception) {
jpayne@69 57 KJ_IREQUIRE(!empty());
jpayne@69 58
jpayne@69 59 auto& node = static_cast<Node&>(queue.front());
jpayne@69 60 node.fulfiller.reject(kj::mv(exception));
jpayne@69 61 node.remove();
jpayne@69 62 }
jpayne@69 63
jpayne@69 64 bool empty() const {
jpayne@69 65 return queue.empty();
jpayne@69 66 }
jpayne@69 67
jpayne@69 68 private:
jpayne@69 69 struct BaseNode {
jpayne@69 70 // This is a separate structure because List requires a predefined memory layout but
jpayne@69 71 // newAdaptedPromise() only provides access to the Adaptor type in the ctor.
jpayne@69 72
jpayne@69 73 BaseNode(PromiseFulfiller<T>& fulfiller): fulfiller(fulfiller) {}
jpayne@69 74
jpayne@69 75 PromiseFulfiller<T>& fulfiller;
jpayne@69 76 ListLink<BaseNode> link;
jpayne@69 77 };
jpayne@69 78
jpayne@69 79 using Queue = List<BaseNode, &BaseNode::link>;
jpayne@69 80
jpayne@69 81 struct Node: public BaseNode {
jpayne@69 82 Node(PromiseFulfiller<T>& fulfiller, Queue& queue): BaseNode(fulfiller), queue(queue) {
jpayne@69 83 queue.add(*this);
jpayne@69 84 }
jpayne@69 85
jpayne@69 86 ~Node() noexcept(false) {
jpayne@69 87 // When the associated Promise is destructed, so is the Node thus we should leave the queue.
jpayne@69 88 remove();
jpayne@69 89 }
jpayne@69 90
jpayne@69 91 void remove() {
jpayne@69 92 if(BaseNode::link.isLinked()){
jpayne@69 93 queue.remove(*this);
jpayne@69 94 }
jpayne@69 95 }
jpayne@69 96
jpayne@69 97 Queue& queue;
jpayne@69 98 };
jpayne@69 99
jpayne@69 100 Queue queue;
jpayne@69 101 };
jpayne@69 102
jpayne@69 103 template <typename T>
jpayne@69 104 class ProducerConsumerQueue {
jpayne@69 105 // ProducerConsumerQueue is an async FIFO queue.
jpayne@69 106
jpayne@69 107 public:
jpayne@69 108 void push(T v) {
jpayne@69 109 // Push an existing value onto the queue.
jpayne@69 110
jpayne@69 111 if (!waiters.empty()) {
jpayne@69 112 // We have at least one waiter, give the value to the oldest.
jpayne@69 113 KJ_IASSERT(values.empty());
jpayne@69 114
jpayne@69 115 // Fulfill the first waiter and return without store our value.
jpayne@69 116 waiters.fulfill(kj::mv(v));
jpayne@69 117 } else {
jpayne@69 118 // We don't have any waiters, store the value.
jpayne@69 119 values.push_front(kj::mv(v));
jpayne@69 120 }
jpayne@69 121 }
jpayne@69 122
jpayne@69 123 void rejectAll(Exception e) {
jpayne@69 124 // Reject all waiters with a given exception.
jpayne@69 125
jpayne@69 126 while (!waiters.empty()) {
jpayne@69 127 auto newE = Exception(e);
jpayne@69 128 waiters.reject(kj::mv(newE));
jpayne@69 129 }
jpayne@69 130 }
jpayne@69 131
jpayne@69 132 Promise<T> pop() {
jpayne@69 133 // Eventually pop a value from the queue.
jpayne@69 134 // Note that if your sinks lag your sources, the promise will always be ready.
jpayne@69 135
jpayne@69 136 if (!values.empty()) {
jpayne@69 137 // We have at least one value, get the oldest.
jpayne@69 138 KJ_IASSERT(waiters.empty());
jpayne@69 139
jpayne@69 140 auto value = kj::mv(values.back());
jpayne@69 141 values.pop_back();
jpayne@69 142 return kj::mv(value);
jpayne@69 143 } else {
jpayne@69 144 // We don't have any values, add ourselves to the waiting queue.
jpayne@69 145 return waiters.wait();
jpayne@69 146 }
jpayne@69 147 }
jpayne@69 148
jpayne@69 149 private:
jpayne@69 150 std::list<T> values;
jpayne@69 151 WaiterQueue<T> waiters;
jpayne@69 152 };
jpayne@69 153
jpayne@69 154 } // namespace kj
jpayne@69 155
jpayne@69 156 KJ_END_HEADER