jpayne@69
|
1 // Copyright (c) 2016 Sandstorm Development Group, Inc. and contributors
|
jpayne@69
|
2 // Licensed under the MIT License:
|
jpayne@69
|
3 //
|
jpayne@69
|
4 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
jpayne@69
|
5 // of this software and associated documentation files (the "Software"), to deal
|
jpayne@69
|
6 // in the Software without restriction, including without limitation the rights
|
jpayne@69
|
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
jpayne@69
|
8 // copies of the Software, and to permit persons to whom the Software is
|
jpayne@69
|
9 // furnished to do so, subject to the following conditions:
|
jpayne@69
|
10 //
|
jpayne@69
|
11 // The above copyright notice and this permission notice shall be included in
|
jpayne@69
|
12 // all copies or substantial portions of the Software.
|
jpayne@69
|
13 //
|
jpayne@69
|
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
jpayne@69
|
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
jpayne@69
|
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
jpayne@69
|
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
jpayne@69
|
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
jpayne@69
|
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
jpayne@69
|
20 // THE SOFTWARE.
|
jpayne@69
|
21
|
jpayne@69
|
22 #pragma once
|
jpayne@69
|
23
|
jpayne@69
|
24 #include <kj/async-io.h>
|
jpayne@69
|
25
|
jpayne@69
|
26 KJ_BEGIN_HEADER
|
jpayne@69
|
27
|
jpayne@69
|
28 namespace kj {
|
jpayne@69
|
29
|
jpayne@69
|
30 class ReadyInputStreamWrapper {
|
jpayne@69
|
31 // Provides readiness-based Async I/O as a wrapper around KJ's standard completion-based API, for
|
jpayne@69
|
32 // compatibility with libraries that use readiness-based abstractions (e.g. OpenSSL).
|
jpayne@69
|
33 //
|
jpayne@69
|
34 // Unfortunately this requires buffering, so is not very efficient.
|
jpayne@69
|
35
|
jpayne@69
|
36 public:
|
jpayne@69
|
37 ReadyInputStreamWrapper(AsyncInputStream& input);
|
jpayne@69
|
38 ~ReadyInputStreamWrapper() noexcept(false);
|
jpayne@69
|
39 KJ_DISALLOW_COPY_AND_MOVE(ReadyInputStreamWrapper);
|
jpayne@69
|
40
|
jpayne@69
|
41 kj::Maybe<size_t> read(kj::ArrayPtr<byte> dst);
|
jpayne@69
|
42 // Reads bytes into `dst`, returning the number of bytes read. Returns zero only at EOF. Returns
|
jpayne@69
|
43 // nullptr if not ready.
|
jpayne@69
|
44
|
jpayne@69
|
45 kj::Promise<void> whenReady();
|
jpayne@69
|
46 // Returns a promise that resolves when read() will return non-null.
|
jpayne@69
|
47
|
jpayne@69
|
48 bool isAtEnd() { return eof; }
|
jpayne@69
|
49 // Returns true if read() would return zero.
|
jpayne@69
|
50
|
jpayne@69
|
51 private:
|
jpayne@69
|
52 AsyncInputStream& input;
|
jpayne@69
|
53 kj::ForkedPromise<void> pumpTask = nullptr;
|
jpayne@69
|
54 bool isPumping = false;
|
jpayne@69
|
55 bool eof = false;
|
jpayne@69
|
56
|
jpayne@69
|
57 kj::ArrayPtr<const byte> content = nullptr; // Points to currently-valid part of `buffer`.
|
jpayne@69
|
58 byte buffer[8192];
|
jpayne@69
|
59 };
|
jpayne@69
|
60
|
jpayne@69
|
61 class ReadyOutputStreamWrapper {
|
jpayne@69
|
62 // Provides readiness-based Async I/O as a wrapper around KJ's standard completion-based API, for
|
jpayne@69
|
63 // compatibility with libraries that use readiness-based abstractions (e.g. OpenSSL).
|
jpayne@69
|
64 //
|
jpayne@69
|
65 // Unfortunately this requires buffering, so is not very efficient.
|
jpayne@69
|
66
|
jpayne@69
|
67 public:
|
jpayne@69
|
68 ReadyOutputStreamWrapper(AsyncOutputStream& output);
|
jpayne@69
|
69 ~ReadyOutputStreamWrapper() noexcept(false);
|
jpayne@69
|
70 KJ_DISALLOW_COPY_AND_MOVE(ReadyOutputStreamWrapper);
|
jpayne@69
|
71
|
jpayne@69
|
72 kj::Maybe<size_t> write(kj::ArrayPtr<const byte> src);
|
jpayne@69
|
73 // Writes bytes from `src`, returning the number of bytes written. Never returns zero for
|
jpayne@69
|
74 // a non-empty `src`. Returns nullptr if not ready.
|
jpayne@69
|
75
|
jpayne@69
|
76 kj::Promise<void> whenReady();
|
jpayne@69
|
77 // Returns a promise that resolves when write() will return non-null.
|
jpayne@69
|
78
|
jpayne@69
|
79 class Cork;
|
jpayne@69
|
80 // An object that, when destructed, will uncork its parent stream.
|
jpayne@69
|
81
|
jpayne@69
|
82 Cork cork();
|
jpayne@69
|
83 // After calling, data won't be pumped until either the internal buffer fills up or the returned
|
jpayne@69
|
84 // object is destructed. Use this if you know multiple small write() calls will be happening in
|
jpayne@69
|
85 // the near future and want to flush them all at once.
|
jpayne@69
|
86 // Once the returned object is destructed, behavior goes back to normal. The returned object
|
jpayne@69
|
87 // must be destructed before the ReadyOutputStreamWrapper.
|
jpayne@69
|
88 // TODO(perf): This is an ugly hack to avoid sending lots of tiny packets when using TLS, which
|
jpayne@69
|
89 // has to work around OpenSSL's readiness-based I/O layer. We could certainly do better here.
|
jpayne@69
|
90
|
jpayne@69
|
91 private:
|
jpayne@69
|
92 AsyncOutputStream& output;
|
jpayne@69
|
93 ArrayPtr<const byte> segments[2];
|
jpayne@69
|
94 kj::ForkedPromise<void> pumpTask = nullptr;
|
jpayne@69
|
95 bool isPumping = false;
|
jpayne@69
|
96 bool corked = false;
|
jpayne@69
|
97
|
jpayne@69
|
98 uint start = 0; // index of first byte
|
jpayne@69
|
99 uint filled = 0; // number of bytes currently in buffer
|
jpayne@69
|
100
|
jpayne@69
|
101 byte buffer[8192];
|
jpayne@69
|
102
|
jpayne@69
|
103 void uncork();
|
jpayne@69
|
104
|
jpayne@69
|
105 kj::Promise<void> pump();
|
jpayne@69
|
106 // Asynchronously push the buffer out to the underlying stream.
|
jpayne@69
|
107 };
|
jpayne@69
|
108
|
jpayne@69
|
109 class ReadyOutputStreamWrapper::Cork {
|
jpayne@69
|
110 // An object that, when destructed, will uncork its parent stream.
|
jpayne@69
|
111 public:
|
jpayne@69
|
112 ~Cork() {
|
jpayne@69
|
113 KJ_IF_MAYBE(p, parent) {
|
jpayne@69
|
114 p->uncork();
|
jpayne@69
|
115 }
|
jpayne@69
|
116 }
|
jpayne@69
|
117 Cork(Cork&& other) : parent(kj::mv(other.parent)) {
|
jpayne@69
|
118 other.parent = nullptr;
|
jpayne@69
|
119 }
|
jpayne@69
|
120 KJ_DISALLOW_COPY(Cork);
|
jpayne@69
|
121
|
jpayne@69
|
122 private:
|
jpayne@69
|
123 Cork(ReadyOutputStreamWrapper& parent) : parent(parent) {}
|
jpayne@69
|
124
|
jpayne@69
|
125 kj::Maybe<ReadyOutputStreamWrapper&> parent;
|
jpayne@69
|
126 friend class ReadyOutputStreamWrapper;
|
jpayne@69
|
127 };
|
jpayne@69
|
128
|
jpayne@69
|
129 } // namespace kj
|
jpayne@69
|
130
|
jpayne@69
|
131 KJ_END_HEADER
|