jpayne@69
|
1 // Copyright (c) 2021 Cloudflare, Inc. and contributors
|
jpayne@69
|
2 // Licensed under the MIT License:
|
jpayne@69
|
3 //
|
jpayne@69
|
4 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
jpayne@69
|
5 // of this software and associated documentation files (the "Software"), to deal
|
jpayne@69
|
6 // in the Software without restriction, including without limitation the rights
|
jpayne@69
|
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
jpayne@69
|
8 // copies of the Software, and to permit persons to whom the Software is
|
jpayne@69
|
9 // furnished to do so, subject to the following conditions:
|
jpayne@69
|
10 //
|
jpayne@69
|
11 // The above copyright notice and this permission notice shall be included in
|
jpayne@69
|
12 // all copies or substantial portions of the Software.
|
jpayne@69
|
13 //
|
jpayne@69
|
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
jpayne@69
|
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
jpayne@69
|
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
jpayne@69
|
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
jpayne@69
|
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
jpayne@69
|
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
jpayne@69
|
20 // THE SOFTWARE.
|
jpayne@69
|
21
|
jpayne@69
|
22 #pragma once
|
jpayne@69
|
23
|
jpayne@69
|
24 #include "async.h"
|
jpayne@69
|
25 #include <kj/common.h>
|
jpayne@69
|
26 #include <kj/debug.h>
|
jpayne@69
|
27 #include <kj/list.h>
|
jpayne@69
|
28 #include <kj/memory.h>
|
jpayne@69
|
29
|
jpayne@69
|
30 #include <list>
|
jpayne@69
|
31
|
jpayne@69
|
32 KJ_BEGIN_HEADER
|
jpayne@69
|
33
|
jpayne@69
|
34 namespace kj {
|
jpayne@69
|
35
|
jpayne@69
|
36 template <typename T>
|
jpayne@69
|
37 class WaiterQueue {
|
jpayne@69
|
38 public:
|
jpayne@69
|
39 // A WaiterQueue creates Nodes that blend newAdaptedPromise<T, Adaptor> and List<Node>.
|
jpayne@69
|
40
|
jpayne@69
|
41 WaiterQueue() = default;
|
jpayne@69
|
42 KJ_DISALLOW_COPY_AND_MOVE(WaiterQueue);
|
jpayne@69
|
43
|
jpayne@69
|
44 Promise<T> wait() {
|
jpayne@69
|
45 return newAdaptedPromise<T, Node>(queue);
|
jpayne@69
|
46 }
|
jpayne@69
|
47
|
jpayne@69
|
48 void fulfill(T&& value) {
|
jpayne@69
|
49 KJ_IREQUIRE(!empty());
|
jpayne@69
|
50
|
jpayne@69
|
51 auto& node = static_cast<Node&>(queue.front());
|
jpayne@69
|
52 node.fulfiller.fulfill(kj::mv(value));
|
jpayne@69
|
53 node.remove();
|
jpayne@69
|
54 }
|
jpayne@69
|
55
|
jpayne@69
|
56 void reject(Exception&& exception) {
|
jpayne@69
|
57 KJ_IREQUIRE(!empty());
|
jpayne@69
|
58
|
jpayne@69
|
59 auto& node = static_cast<Node&>(queue.front());
|
jpayne@69
|
60 node.fulfiller.reject(kj::mv(exception));
|
jpayne@69
|
61 node.remove();
|
jpayne@69
|
62 }
|
jpayne@69
|
63
|
jpayne@69
|
64 bool empty() const {
|
jpayne@69
|
65 return queue.empty();
|
jpayne@69
|
66 }
|
jpayne@69
|
67
|
jpayne@69
|
68 private:
|
jpayne@69
|
69 struct BaseNode {
|
jpayne@69
|
70 // This is a separate structure because List requires a predefined memory layout but
|
jpayne@69
|
71 // newAdaptedPromise() only provides access to the Adaptor type in the ctor.
|
jpayne@69
|
72
|
jpayne@69
|
73 BaseNode(PromiseFulfiller<T>& fulfiller): fulfiller(fulfiller) {}
|
jpayne@69
|
74
|
jpayne@69
|
75 PromiseFulfiller<T>& fulfiller;
|
jpayne@69
|
76 ListLink<BaseNode> link;
|
jpayne@69
|
77 };
|
jpayne@69
|
78
|
jpayne@69
|
79 using Queue = List<BaseNode, &BaseNode::link>;
|
jpayne@69
|
80
|
jpayne@69
|
81 struct Node: public BaseNode {
|
jpayne@69
|
82 Node(PromiseFulfiller<T>& fulfiller, Queue& queue): BaseNode(fulfiller), queue(queue) {
|
jpayne@69
|
83 queue.add(*this);
|
jpayne@69
|
84 }
|
jpayne@69
|
85
|
jpayne@69
|
86 ~Node() noexcept(false) {
|
jpayne@69
|
87 // When the associated Promise is destructed, so is the Node thus we should leave the queue.
|
jpayne@69
|
88 remove();
|
jpayne@69
|
89 }
|
jpayne@69
|
90
|
jpayne@69
|
91 void remove() {
|
jpayne@69
|
92 if(BaseNode::link.isLinked()){
|
jpayne@69
|
93 queue.remove(*this);
|
jpayne@69
|
94 }
|
jpayne@69
|
95 }
|
jpayne@69
|
96
|
jpayne@69
|
97 Queue& queue;
|
jpayne@69
|
98 };
|
jpayne@69
|
99
|
jpayne@69
|
100 Queue queue;
|
jpayne@69
|
101 };
|
jpayne@69
|
102
|
jpayne@69
|
103 template <typename T>
|
jpayne@69
|
104 class ProducerConsumerQueue {
|
jpayne@69
|
105 // ProducerConsumerQueue is an async FIFO queue.
|
jpayne@69
|
106
|
jpayne@69
|
107 public:
|
jpayne@69
|
108 void push(T v) {
|
jpayne@69
|
109 // Push an existing value onto the queue.
|
jpayne@69
|
110
|
jpayne@69
|
111 if (!waiters.empty()) {
|
jpayne@69
|
112 // We have at least one waiter, give the value to the oldest.
|
jpayne@69
|
113 KJ_IASSERT(values.empty());
|
jpayne@69
|
114
|
jpayne@69
|
115 // Fulfill the first waiter and return without store our value.
|
jpayne@69
|
116 waiters.fulfill(kj::mv(v));
|
jpayne@69
|
117 } else {
|
jpayne@69
|
118 // We don't have any waiters, store the value.
|
jpayne@69
|
119 values.push_front(kj::mv(v));
|
jpayne@69
|
120 }
|
jpayne@69
|
121 }
|
jpayne@69
|
122
|
jpayne@69
|
123 void rejectAll(Exception e) {
|
jpayne@69
|
124 // Reject all waiters with a given exception.
|
jpayne@69
|
125
|
jpayne@69
|
126 while (!waiters.empty()) {
|
jpayne@69
|
127 auto newE = Exception(e);
|
jpayne@69
|
128 waiters.reject(kj::mv(newE));
|
jpayne@69
|
129 }
|
jpayne@69
|
130 }
|
jpayne@69
|
131
|
jpayne@69
|
132 Promise<T> pop() {
|
jpayne@69
|
133 // Eventually pop a value from the queue.
|
jpayne@69
|
134 // Note that if your sinks lag your sources, the promise will always be ready.
|
jpayne@69
|
135
|
jpayne@69
|
136 if (!values.empty()) {
|
jpayne@69
|
137 // We have at least one value, get the oldest.
|
jpayne@69
|
138 KJ_IASSERT(waiters.empty());
|
jpayne@69
|
139
|
jpayne@69
|
140 auto value = kj::mv(values.back());
|
jpayne@69
|
141 values.pop_back();
|
jpayne@69
|
142 return kj::mv(value);
|
jpayne@69
|
143 } else {
|
jpayne@69
|
144 // We don't have any values, add ourselves to the waiting queue.
|
jpayne@69
|
145 return waiters.wait();
|
jpayne@69
|
146 }
|
jpayne@69
|
147 }
|
jpayne@69
|
148
|
jpayne@69
|
149 private:
|
jpayne@69
|
150 std::list<T> values;
|
jpayne@69
|
151 WaiterQueue<T> waiters;
|
jpayne@69
|
152 };
|
jpayne@69
|
153
|
jpayne@69
|
154 } // namespace kj
|
jpayne@69
|
155
|
jpayne@69
|
156 KJ_END_HEADER
|