comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/include/kj/async-queue.h @ 69:33d812a61356

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 17:55:14 -0400
parents
children
comparison
equal deleted inserted replaced
67:0e9998148a16 69:33d812a61356
1 // Copyright (c) 2021 Cloudflare, Inc. and contributors
2 // Licensed under the MIT License:
3 //
4 // Permission is hereby granted, free of charge, to any person obtaining a copy
5 // of this software and associated documentation files (the "Software"), to deal
6 // in the Software without restriction, including without limitation the rights
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 // copies of the Software, and to permit persons to whom the Software is
9 // furnished to do so, subject to the following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included in
12 // all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20 // THE SOFTWARE.
21
22 #pragma once
23
24 #include "async.h"
25 #include <kj/common.h>
26 #include <kj/debug.h>
27 #include <kj/list.h>
28 #include <kj/memory.h>
29
30 #include <list>
31
32 KJ_BEGIN_HEADER
33
34 namespace kj {
35
36 template <typename T>
37 class WaiterQueue {
38 public:
39 // A WaiterQueue creates Nodes that blend newAdaptedPromise<T, Adaptor> and List<Node>.
40
41 WaiterQueue() = default;
42 KJ_DISALLOW_COPY_AND_MOVE(WaiterQueue);
43
44 Promise<T> wait() {
45 return newAdaptedPromise<T, Node>(queue);
46 }
47
48 void fulfill(T&& value) {
49 KJ_IREQUIRE(!empty());
50
51 auto& node = static_cast<Node&>(queue.front());
52 node.fulfiller.fulfill(kj::mv(value));
53 node.remove();
54 }
55
56 void reject(Exception&& exception) {
57 KJ_IREQUIRE(!empty());
58
59 auto& node = static_cast<Node&>(queue.front());
60 node.fulfiller.reject(kj::mv(exception));
61 node.remove();
62 }
63
64 bool empty() const {
65 return queue.empty();
66 }
67
68 private:
69 struct BaseNode {
70 // This is a separate structure because List requires a predefined memory layout but
71 // newAdaptedPromise() only provides access to the Adaptor type in the ctor.
72
73 BaseNode(PromiseFulfiller<T>& fulfiller): fulfiller(fulfiller) {}
74
75 PromiseFulfiller<T>& fulfiller;
76 ListLink<BaseNode> link;
77 };
78
79 using Queue = List<BaseNode, &BaseNode::link>;
80
81 struct Node: public BaseNode {
82 Node(PromiseFulfiller<T>& fulfiller, Queue& queue): BaseNode(fulfiller), queue(queue) {
83 queue.add(*this);
84 }
85
86 ~Node() noexcept(false) {
87 // When the associated Promise is destructed, so is the Node thus we should leave the queue.
88 remove();
89 }
90
91 void remove() {
92 if(BaseNode::link.isLinked()){
93 queue.remove(*this);
94 }
95 }
96
97 Queue& queue;
98 };
99
100 Queue queue;
101 };
102
103 template <typename T>
104 class ProducerConsumerQueue {
105 // ProducerConsumerQueue is an async FIFO queue.
106
107 public:
108 void push(T v) {
109 // Push an existing value onto the queue.
110
111 if (!waiters.empty()) {
112 // We have at least one waiter, give the value to the oldest.
113 KJ_IASSERT(values.empty());
114
115 // Fulfill the first waiter and return without store our value.
116 waiters.fulfill(kj::mv(v));
117 } else {
118 // We don't have any waiters, store the value.
119 values.push_front(kj::mv(v));
120 }
121 }
122
123 void rejectAll(Exception e) {
124 // Reject all waiters with a given exception.
125
126 while (!waiters.empty()) {
127 auto newE = Exception(e);
128 waiters.reject(kj::mv(newE));
129 }
130 }
131
132 Promise<T> pop() {
133 // Eventually pop a value from the queue.
134 // Note that if your sinks lag your sources, the promise will always be ready.
135
136 if (!values.empty()) {
137 // We have at least one value, get the oldest.
138 KJ_IASSERT(waiters.empty());
139
140 auto value = kj::mv(values.back());
141 values.pop_back();
142 return kj::mv(value);
143 } else {
144 // We don't have any values, add ourselves to the waiting queue.
145 return waiters.wait();
146 }
147 }
148
149 private:
150 std::list<T> values;
151 WaiterQueue<T> waiters;
152 };
153
154 } // namespace kj
155
156 KJ_END_HEADER