jpayne@69: // Copyright (c) 2016 Sandstorm Development Group, Inc. and contributors jpayne@69: // Licensed under the MIT License: jpayne@69: // jpayne@69: // Permission is hereby granted, free of charge, to any person obtaining a copy jpayne@69: // of this software and associated documentation files (the "Software"), to deal jpayne@69: // in the Software without restriction, including without limitation the rights jpayne@69: // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell jpayne@69: // copies of the Software, and to permit persons to whom the Software is jpayne@69: // furnished to do so, subject to the following conditions: jpayne@69: // jpayne@69: // The above copyright notice and this permission notice shall be included in jpayne@69: // all copies or substantial portions of the Software. jpayne@69: // jpayne@69: // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR jpayne@69: // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, jpayne@69: // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE jpayne@69: // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER jpayne@69: // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, jpayne@69: // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN jpayne@69: // THE SOFTWARE. jpayne@69: jpayne@69: #pragma once jpayne@69: jpayne@69: #include jpayne@69: jpayne@69: KJ_BEGIN_HEADER jpayne@69: jpayne@69: namespace kj { jpayne@69: jpayne@69: class ReadyInputStreamWrapper { jpayne@69: // Provides readiness-based Async I/O as a wrapper around KJ's standard completion-based API, for jpayne@69: // compatibility with libraries that use readiness-based abstractions (e.g. OpenSSL). jpayne@69: // jpayne@69: // Unfortunately this requires buffering, so is not very efficient. jpayne@69: jpayne@69: public: jpayne@69: ReadyInputStreamWrapper(AsyncInputStream& input); jpayne@69: ~ReadyInputStreamWrapper() noexcept(false); jpayne@69: KJ_DISALLOW_COPY_AND_MOVE(ReadyInputStreamWrapper); jpayne@69: jpayne@69: kj::Maybe read(kj::ArrayPtr dst); jpayne@69: // Reads bytes into `dst`, returning the number of bytes read. Returns zero only at EOF. Returns jpayne@69: // nullptr if not ready. jpayne@69: jpayne@69: kj::Promise whenReady(); jpayne@69: // Returns a promise that resolves when read() will return non-null. jpayne@69: jpayne@69: bool isAtEnd() { return eof; } jpayne@69: // Returns true if read() would return zero. jpayne@69: jpayne@69: private: jpayne@69: AsyncInputStream& input; jpayne@69: kj::ForkedPromise pumpTask = nullptr; jpayne@69: bool isPumping = false; jpayne@69: bool eof = false; jpayne@69: jpayne@69: kj::ArrayPtr content = nullptr; // Points to currently-valid part of `buffer`. jpayne@69: byte buffer[8192]; jpayne@69: }; jpayne@69: jpayne@69: class ReadyOutputStreamWrapper { jpayne@69: // Provides readiness-based Async I/O as a wrapper around KJ's standard completion-based API, for jpayne@69: // compatibility with libraries that use readiness-based abstractions (e.g. OpenSSL). jpayne@69: // jpayne@69: // Unfortunately this requires buffering, so is not very efficient. jpayne@69: jpayne@69: public: jpayne@69: ReadyOutputStreamWrapper(AsyncOutputStream& output); jpayne@69: ~ReadyOutputStreamWrapper() noexcept(false); jpayne@69: KJ_DISALLOW_COPY_AND_MOVE(ReadyOutputStreamWrapper); jpayne@69: jpayne@69: kj::Maybe write(kj::ArrayPtr src); jpayne@69: // Writes bytes from `src`, returning the number of bytes written. Never returns zero for jpayne@69: // a non-empty `src`. Returns nullptr if not ready. jpayne@69: jpayne@69: kj::Promise whenReady(); jpayne@69: // Returns a promise that resolves when write() will return non-null. jpayne@69: jpayne@69: class Cork; jpayne@69: // An object that, when destructed, will uncork its parent stream. jpayne@69: jpayne@69: Cork cork(); jpayne@69: // After calling, data won't be pumped until either the internal buffer fills up or the returned jpayne@69: // object is destructed. Use this if you know multiple small write() calls will be happening in jpayne@69: // the near future and want to flush them all at once. jpayne@69: // Once the returned object is destructed, behavior goes back to normal. The returned object jpayne@69: // must be destructed before the ReadyOutputStreamWrapper. jpayne@69: // TODO(perf): This is an ugly hack to avoid sending lots of tiny packets when using TLS, which jpayne@69: // has to work around OpenSSL's readiness-based I/O layer. We could certainly do better here. jpayne@69: jpayne@69: private: jpayne@69: AsyncOutputStream& output; jpayne@69: ArrayPtr segments[2]; jpayne@69: kj::ForkedPromise pumpTask = nullptr; jpayne@69: bool isPumping = false; jpayne@69: bool corked = false; jpayne@69: jpayne@69: uint start = 0; // index of first byte jpayne@69: uint filled = 0; // number of bytes currently in buffer jpayne@69: jpayne@69: byte buffer[8192]; jpayne@69: jpayne@69: void uncork(); jpayne@69: jpayne@69: kj::Promise pump(); jpayne@69: // Asynchronously push the buffer out to the underlying stream. jpayne@69: }; jpayne@69: jpayne@69: class ReadyOutputStreamWrapper::Cork { jpayne@69: // An object that, when destructed, will uncork its parent stream. jpayne@69: public: jpayne@69: ~Cork() { jpayne@69: KJ_IF_MAYBE(p, parent) { jpayne@69: p->uncork(); jpayne@69: } jpayne@69: } jpayne@69: Cork(Cork&& other) : parent(kj::mv(other.parent)) { jpayne@69: other.parent = nullptr; jpayne@69: } jpayne@69: KJ_DISALLOW_COPY(Cork); jpayne@69: jpayne@69: private: jpayne@69: Cork(ReadyOutputStreamWrapper& parent) : parent(parent) {} jpayne@69: jpayne@69: kj::Maybe parent; jpayne@69: friend class ReadyOutputStreamWrapper; jpayne@69: }; jpayne@69: jpayne@69: } // namespace kj jpayne@69: jpayne@69: KJ_END_HEADER