Mercurial > repos > rliterman > csp2
comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/include/kj/async-queue.h @ 69:33d812a61356
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 17:55:14 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
67:0e9998148a16 | 69:33d812a61356 |
---|---|
1 // Copyright (c) 2021 Cloudflare, Inc. and contributors | |
2 // Licensed under the MIT License: | |
3 // | |
4 // Permission is hereby granted, free of charge, to any person obtaining a copy | |
5 // of this software and associated documentation files (the "Software"), to deal | |
6 // in the Software without restriction, including without limitation the rights | |
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
8 // copies of the Software, and to permit persons to whom the Software is | |
9 // furnished to do so, subject to the following conditions: | |
10 // | |
11 // The above copyright notice and this permission notice shall be included in | |
12 // all copies or substantial portions of the Software. | |
13 // | |
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
20 // THE SOFTWARE. | |
21 | |
22 #pragma once | |
23 | |
24 #include "async.h" | |
25 #include <kj/common.h> | |
26 #include <kj/debug.h> | |
27 #include <kj/list.h> | |
28 #include <kj/memory.h> | |
29 | |
30 #include <list> | |
31 | |
32 KJ_BEGIN_HEADER | |
33 | |
34 namespace kj { | |
35 | |
36 template <typename T> | |
37 class WaiterQueue { | |
38 public: | |
39 // A WaiterQueue creates Nodes that blend newAdaptedPromise<T, Adaptor> and List<Node>. | |
40 | |
41 WaiterQueue() = default; | |
42 KJ_DISALLOW_COPY_AND_MOVE(WaiterQueue); | |
43 | |
44 Promise<T> wait() { | |
45 return newAdaptedPromise<T, Node>(queue); | |
46 } | |
47 | |
48 void fulfill(T&& value) { | |
49 KJ_IREQUIRE(!empty()); | |
50 | |
51 auto& node = static_cast<Node&>(queue.front()); | |
52 node.fulfiller.fulfill(kj::mv(value)); | |
53 node.remove(); | |
54 } | |
55 | |
56 void reject(Exception&& exception) { | |
57 KJ_IREQUIRE(!empty()); | |
58 | |
59 auto& node = static_cast<Node&>(queue.front()); | |
60 node.fulfiller.reject(kj::mv(exception)); | |
61 node.remove(); | |
62 } | |
63 | |
64 bool empty() const { | |
65 return queue.empty(); | |
66 } | |
67 | |
68 private: | |
69 struct BaseNode { | |
70 // This is a separate structure because List requires a predefined memory layout but | |
71 // newAdaptedPromise() only provides access to the Adaptor type in the ctor. | |
72 | |
73 BaseNode(PromiseFulfiller<T>& fulfiller): fulfiller(fulfiller) {} | |
74 | |
75 PromiseFulfiller<T>& fulfiller; | |
76 ListLink<BaseNode> link; | |
77 }; | |
78 | |
79 using Queue = List<BaseNode, &BaseNode::link>; | |
80 | |
81 struct Node: public BaseNode { | |
82 Node(PromiseFulfiller<T>& fulfiller, Queue& queue): BaseNode(fulfiller), queue(queue) { | |
83 queue.add(*this); | |
84 } | |
85 | |
86 ~Node() noexcept(false) { | |
87 // When the associated Promise is destructed, so is the Node thus we should leave the queue. | |
88 remove(); | |
89 } | |
90 | |
91 void remove() { | |
92 if(BaseNode::link.isLinked()){ | |
93 queue.remove(*this); | |
94 } | |
95 } | |
96 | |
97 Queue& queue; | |
98 }; | |
99 | |
100 Queue queue; | |
101 }; | |
102 | |
103 template <typename T> | |
104 class ProducerConsumerQueue { | |
105 // ProducerConsumerQueue is an async FIFO queue. | |
106 | |
107 public: | |
108 void push(T v) { | |
109 // Push an existing value onto the queue. | |
110 | |
111 if (!waiters.empty()) { | |
112 // We have at least one waiter, give the value to the oldest. | |
113 KJ_IASSERT(values.empty()); | |
114 | |
115 // Fulfill the first waiter and return without store our value. | |
116 waiters.fulfill(kj::mv(v)); | |
117 } else { | |
118 // We don't have any waiters, store the value. | |
119 values.push_front(kj::mv(v)); | |
120 } | |
121 } | |
122 | |
123 void rejectAll(Exception e) { | |
124 // Reject all waiters with a given exception. | |
125 | |
126 while (!waiters.empty()) { | |
127 auto newE = Exception(e); | |
128 waiters.reject(kj::mv(newE)); | |
129 } | |
130 } | |
131 | |
132 Promise<T> pop() { | |
133 // Eventually pop a value from the queue. | |
134 // Note that if your sinks lag your sources, the promise will always be ready. | |
135 | |
136 if (!values.empty()) { | |
137 // We have at least one value, get the oldest. | |
138 KJ_IASSERT(waiters.empty()); | |
139 | |
140 auto value = kj::mv(values.back()); | |
141 values.pop_back(); | |
142 return kj::mv(value); | |
143 } else { | |
144 // We don't have any values, add ourselves to the waiting queue. | |
145 return waiters.wait(); | |
146 } | |
147 } | |
148 | |
149 private: | |
150 std::list<T> values; | |
151 WaiterQueue<T> waiters; | |
152 }; | |
153 | |
154 } // namespace kj | |
155 | |
156 KJ_END_HEADER |