view CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/include/kj/async-queue.h @ 69:33d812a61356

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 17:55:14 -0400
parents
children
line wrap: on
line source
// Copyright (c) 2021 Cloudflare, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

#pragma once

#include "async.h"
#include <kj/common.h>
#include <kj/debug.h>
#include <kj/list.h>
#include <kj/memory.h>

#include <list>

KJ_BEGIN_HEADER

namespace kj {

template <typename T>
class WaiterQueue {
public:
  // A WaiterQueue creates Nodes that blend newAdaptedPromise<T, Adaptor> and List<Node>.

  WaiterQueue() = default;
  KJ_DISALLOW_COPY_AND_MOVE(WaiterQueue);

  Promise<T> wait() {
    return newAdaptedPromise<T, Node>(queue);
  }

  void fulfill(T&& value) {
    KJ_IREQUIRE(!empty());

    auto& node = static_cast<Node&>(queue.front());
    node.fulfiller.fulfill(kj::mv(value));
    node.remove();
  }

  void reject(Exception&& exception) {
    KJ_IREQUIRE(!empty());

    auto& node = static_cast<Node&>(queue.front());
    node.fulfiller.reject(kj::mv(exception));
    node.remove();
  }

  bool empty() const {
    return queue.empty();
  }

private:
  struct BaseNode {
    // This is a separate structure because List requires a predefined memory layout but
    // newAdaptedPromise() only provides access to the Adaptor type in the ctor.

    BaseNode(PromiseFulfiller<T>& fulfiller): fulfiller(fulfiller) {}

    PromiseFulfiller<T>& fulfiller;
    ListLink<BaseNode> link;
  };

  using Queue = List<BaseNode, &BaseNode::link>;

  struct Node: public BaseNode {
    Node(PromiseFulfiller<T>& fulfiller, Queue& queue): BaseNode(fulfiller), queue(queue) {
      queue.add(*this);
    }

    ~Node() noexcept(false) {
      // When the associated Promise is destructed, so is the Node thus we should leave the queue.
      remove();
    }

    void remove() {
      if(BaseNode::link.isLinked()){
        queue.remove(*this);
      }
    }

    Queue& queue;
  };

  Queue queue;
};

template <typename T>
class ProducerConsumerQueue {
  // ProducerConsumerQueue is an async FIFO queue.

public:
  void push(T v) {
    // Push an existing value onto the queue.

    if (!waiters.empty()) {
      // We have at least one waiter, give the value to the oldest.
      KJ_IASSERT(values.empty());

      // Fulfill the first waiter and return without store our value.
      waiters.fulfill(kj::mv(v));
    } else {
      // We don't have any waiters, store the value.
      values.push_front(kj::mv(v));
    }
  }

  void rejectAll(Exception e) {
    // Reject all waiters with a given exception.

    while (!waiters.empty()) {
      auto newE = Exception(e);
      waiters.reject(kj::mv(newE));
    }
  }

  Promise<T> pop() {
    // Eventually pop a value from the queue.
    // Note that if your sinks lag your sources, the promise will always be ready.

    if (!values.empty()) {
      // We have at least one value, get the oldest.
      KJ_IASSERT(waiters.empty());

      auto value = kj::mv(values.back());
      values.pop_back();
      return kj::mv(value);
    } else {
      // We don't have any values, add ourselves to the waiting queue.
      return waiters.wait();
    }
  }

private:
  std::list<T> values;
  WaiterQueue<T> waiters;
};

}  // namespace kj

KJ_END_HEADER