diff CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/include/kj/async-queue.h @ 69:33d812a61356

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 17:55:14 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/include/kj/async-queue.h	Tue Mar 18 17:55:14 2025 -0400
@@ -0,0 +1,156 @@
+// Copyright (c) 2021 Cloudflare, Inc. and contributors
+// Licensed under the MIT License:
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+#pragma once
+
+#include "async.h"
+#include <kj/common.h>
+#include <kj/debug.h>
+#include <kj/list.h>
+#include <kj/memory.h>
+
+#include <list>
+
+KJ_BEGIN_HEADER
+
+namespace kj {
+
+template <typename T>
+class WaiterQueue {
+public:
+  // A WaiterQueue creates Nodes that blend newAdaptedPromise<T, Adaptor> and List<Node>.
+
+  WaiterQueue() = default;
+  KJ_DISALLOW_COPY_AND_MOVE(WaiterQueue);
+
+  Promise<T> wait() {
+    return newAdaptedPromise<T, Node>(queue);
+  }
+
+  void fulfill(T&& value) {
+    KJ_IREQUIRE(!empty());
+
+    auto& node = static_cast<Node&>(queue.front());
+    node.fulfiller.fulfill(kj::mv(value));
+    node.remove();
+  }
+
+  void reject(Exception&& exception) {
+    KJ_IREQUIRE(!empty());
+
+    auto& node = static_cast<Node&>(queue.front());
+    node.fulfiller.reject(kj::mv(exception));
+    node.remove();
+  }
+
+  bool empty() const {
+    return queue.empty();
+  }
+
+private:
+  struct BaseNode {
+    // This is a separate structure because List requires a predefined memory layout but
+    // newAdaptedPromise() only provides access to the Adaptor type in the ctor.
+
+    BaseNode(PromiseFulfiller<T>& fulfiller): fulfiller(fulfiller) {}
+
+    PromiseFulfiller<T>& fulfiller;
+    ListLink<BaseNode> link;
+  };
+
+  using Queue = List<BaseNode, &BaseNode::link>;
+
+  struct Node: public BaseNode {
+    Node(PromiseFulfiller<T>& fulfiller, Queue& queue): BaseNode(fulfiller), queue(queue) {
+      queue.add(*this);
+    }
+
+    ~Node() noexcept(false) {
+      // When the associated Promise is destructed, so is the Node thus we should leave the queue.
+      remove();
+    }
+
+    void remove() {
+      if(BaseNode::link.isLinked()){
+        queue.remove(*this);
+      }
+    }
+
+    Queue& queue;
+  };
+
+  Queue queue;
+};
+
+template <typename T>
+class ProducerConsumerQueue {
+  // ProducerConsumerQueue is an async FIFO queue.
+
+public:
+  void push(T v) {
+    // Push an existing value onto the queue.
+
+    if (!waiters.empty()) {
+      // We have at least one waiter, give the value to the oldest.
+      KJ_IASSERT(values.empty());
+
+      // Fulfill the first waiter and return without store our value.
+      waiters.fulfill(kj::mv(v));
+    } else {
+      // We don't have any waiters, store the value.
+      values.push_front(kj::mv(v));
+    }
+  }
+
+  void rejectAll(Exception e) {
+    // Reject all waiters with a given exception.
+
+    while (!waiters.empty()) {
+      auto newE = Exception(e);
+      waiters.reject(kj::mv(newE));
+    }
+  }
+
+  Promise<T> pop() {
+    // Eventually pop a value from the queue.
+    // Note that if your sinks lag your sources, the promise will always be ready.
+
+    if (!values.empty()) {
+      // We have at least one value, get the oldest.
+      KJ_IASSERT(waiters.empty());
+
+      auto value = kj::mv(values.back());
+      values.pop_back();
+      return kj::mv(value);
+    } else {
+      // We don't have any values, add ourselves to the waiting queue.
+      return waiters.wait();
+    }
+  }
+
+private:
+  std::list<T> values;
+  WaiterQueue<T> waiters;
+};
+
+}  // namespace kj
+
+KJ_END_HEADER