jpayne@69: // Copyright (c) 2017 Cloudflare, Inc. and contributors jpayne@69: // Licensed under the MIT License: jpayne@69: // jpayne@69: // Permission is hereby granted, free of charge, to any person obtaining a copy jpayne@69: // of this software and associated documentation files (the "Software"), to deal jpayne@69: // in the Software without restriction, including without limitation the rights jpayne@69: // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell jpayne@69: // copies of the Software, and to permit persons to whom the Software is jpayne@69: // furnished to do so, subject to the following conditions: jpayne@69: // jpayne@69: // The above copyright notice and this permission notice shall be included in jpayne@69: // all copies or substantial portions of the Software. jpayne@69: // jpayne@69: // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR jpayne@69: // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, jpayne@69: // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE jpayne@69: // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER jpayne@69: // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, jpayne@69: // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN jpayne@69: // THE SOFTWARE. jpayne@69: jpayne@69: #pragma once jpayne@69: jpayne@69: #include jpayne@69: #include jpayne@69: #include jpayne@69: jpayne@69: KJ_BEGIN_HEADER jpayne@69: jpayne@69: namespace kj { jpayne@69: jpayne@69: namespace _ { // private jpayne@69: jpayne@69: constexpr size_t KJ_GZ_BUF_SIZE = 4096; jpayne@69: jpayne@69: class GzipOutputContext final { jpayne@69: public: jpayne@69: GzipOutputContext(kj::Maybe compressionLevel); jpayne@69: ~GzipOutputContext() noexcept(false); jpayne@69: KJ_DISALLOW_COPY_AND_MOVE(GzipOutputContext); jpayne@69: jpayne@69: void setInput(const void* in, size_t size); jpayne@69: kj::Tuple> pumpOnce(int flush); jpayne@69: jpayne@69: private: jpayne@69: bool compressing; jpayne@69: z_stream ctx = {}; jpayne@69: byte buffer[_::KJ_GZ_BUF_SIZE]; jpayne@69: jpayne@69: [[noreturn]] void fail(int result); jpayne@69: }; jpayne@69: jpayne@69: } // namespace _ (private) jpayne@69: jpayne@69: class GzipInputStream final: public InputStream { jpayne@69: public: jpayne@69: GzipInputStream(InputStream& inner); jpayne@69: ~GzipInputStream() noexcept(false); jpayne@69: KJ_DISALLOW_COPY_AND_MOVE(GzipInputStream); jpayne@69: jpayne@69: size_t tryRead(void* buffer, size_t minBytes, size_t maxBytes) override; jpayne@69: jpayne@69: private: jpayne@69: InputStream& inner; jpayne@69: z_stream ctx = {}; jpayne@69: bool atValidEndpoint = false; jpayne@69: jpayne@69: byte buffer[_::KJ_GZ_BUF_SIZE]; jpayne@69: jpayne@69: size_t readImpl(byte* buffer, size_t minBytes, size_t maxBytes, size_t alreadyRead); jpayne@69: }; jpayne@69: jpayne@69: class GzipOutputStream final: public OutputStream { jpayne@69: public: jpayne@69: enum { DECOMPRESS }; jpayne@69: jpayne@69: GzipOutputStream(OutputStream& inner, int compressionLevel = Z_DEFAULT_COMPRESSION); jpayne@69: GzipOutputStream(OutputStream& inner, decltype(DECOMPRESS)); jpayne@69: ~GzipOutputStream() noexcept(false); jpayne@69: KJ_DISALLOW_COPY_AND_MOVE(GzipOutputStream); jpayne@69: jpayne@69: void write(const void* buffer, size_t size) override; jpayne@69: using OutputStream::write; jpayne@69: jpayne@69: inline void flush() { jpayne@69: pump(Z_SYNC_FLUSH); jpayne@69: } jpayne@69: jpayne@69: private: jpayne@69: OutputStream& inner; jpayne@69: _::GzipOutputContext ctx; jpayne@69: jpayne@69: void pump(int flush); jpayne@69: }; jpayne@69: jpayne@69: class GzipAsyncInputStream final: public AsyncInputStream { jpayne@69: public: jpayne@69: GzipAsyncInputStream(AsyncInputStream& inner); jpayne@69: ~GzipAsyncInputStream() noexcept(false); jpayne@69: KJ_DISALLOW_COPY_AND_MOVE(GzipAsyncInputStream); jpayne@69: jpayne@69: Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override; jpayne@69: jpayne@69: private: jpayne@69: AsyncInputStream& inner; jpayne@69: z_stream ctx = {}; jpayne@69: bool atValidEndpoint = false; jpayne@69: jpayne@69: byte buffer[_::KJ_GZ_BUF_SIZE]; jpayne@69: jpayne@69: Promise readImpl(byte* buffer, size_t minBytes, size_t maxBytes, size_t alreadyRead); jpayne@69: }; jpayne@69: jpayne@69: class GzipAsyncOutputStream final: public AsyncOutputStream { jpayne@69: public: jpayne@69: enum { DECOMPRESS }; jpayne@69: jpayne@69: GzipAsyncOutputStream(AsyncOutputStream& inner, int compressionLevel = Z_DEFAULT_COMPRESSION); jpayne@69: GzipAsyncOutputStream(AsyncOutputStream& inner, decltype(DECOMPRESS)); jpayne@69: KJ_DISALLOW_COPY_AND_MOVE(GzipAsyncOutputStream); jpayne@69: jpayne@69: Promise write(const void* buffer, size_t size) override; jpayne@69: Promise write(ArrayPtr> pieces) override; jpayne@69: jpayne@69: Promise whenWriteDisconnected() override { return inner.whenWriteDisconnected(); } jpayne@69: jpayne@69: inline Promise flush() { jpayne@69: return pump(Z_SYNC_FLUSH); jpayne@69: } jpayne@69: // Call if you need to flush a stream at an arbitrary data point. jpayne@69: jpayne@69: Promise end() { jpayne@69: return pump(Z_FINISH); jpayne@69: } jpayne@69: // Must call to flush and finish the stream, since some data may be buffered. jpayne@69: // jpayne@69: // TODO(cleanup): This should be a virtual method on AsyncOutputStream. jpayne@69: jpayne@69: private: jpayne@69: AsyncOutputStream& inner; jpayne@69: _::GzipOutputContext ctx; jpayne@69: jpayne@69: kj::Promise pump(int flush); jpayne@69: }; jpayne@69: jpayne@69: } // namespace kj jpayne@69: jpayne@69: KJ_END_HEADER