jpayne@69: // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors jpayne@69: // Licensed under the MIT License: jpayne@69: // jpayne@69: // Permission is hereby granted, free of charge, to any person obtaining a copy jpayne@69: // of this software and associated documentation files (the "Software"), to deal jpayne@69: // in the Software without restriction, including without limitation the rights jpayne@69: // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell jpayne@69: // copies of the Software, and to permit persons to whom the Software is jpayne@69: // furnished to do so, subject to the following conditions: jpayne@69: // jpayne@69: // The above copyright notice and this permission notice shall be included in jpayne@69: // all copies or substantial portions of the Software. jpayne@69: // jpayne@69: // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR jpayne@69: // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, jpayne@69: // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE jpayne@69: // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER jpayne@69: // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, jpayne@69: // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN jpayne@69: // THE SOFTWARE. jpayne@69: jpayne@69: #pragma once jpayne@69: jpayne@69: #if _WIN32 jpayne@69: #error "This file is Unix-specific. On Windows, include async-win32.h instead." jpayne@69: #endif jpayne@69: jpayne@69: #include "async.h" jpayne@69: #include "timer.h" jpayne@69: #include jpayne@69: #include jpayne@69: #include jpayne@69: jpayne@69: KJ_BEGIN_HEADER jpayne@69: jpayne@69: #if !defined(KJ_USE_EPOLL) && !defined(KJ_USE_KQUEUE) jpayne@69: #if __linux__ jpayne@69: // Default to epoll on Linux. jpayne@69: #define KJ_USE_EPOLL 1 jpayne@69: #elif __APPLE__ || __FreeBSD__ || __OpenBSD__ || __NetBSD__ || __DragonFly__ jpayne@69: // MacOS and BSDs prefer kqueue() for event notification. jpayne@69: #define KJ_USE_KQUEUE 1 jpayne@69: #endif jpayne@69: #endif jpayne@69: jpayne@69: #if KJ_USE_EPOLL && KJ_USE_KQUEUE jpayne@69: #error "Both KJ_USE_EPOLL and KJ_USE_KQUEUE are set. Please choose only one of these." jpayne@69: #endif jpayne@69: jpayne@69: #if __CYGWIN__ && !defined(KJ_USE_PIPE_FOR_WAKEUP) jpayne@69: // Cygwin has serious issues with the intersection of signals and threads, reported here: jpayne@69: // https://cygwin.com/ml/cygwin/2019-07/msg00052.html jpayne@69: // On Cygwin, therefore, we do not use signals to wake threads. Instead, each thread allocates a jpayne@69: // pipe, and we write a byte to the pipe to wake the thread... ick. jpayne@69: #define KJ_USE_PIPE_FOR_WAKEUP 1 jpayne@69: #endif jpayne@69: jpayne@69: #if KJ_USE_EPOLL jpayne@69: struct epoll_event; jpayne@69: #elif KJ_USE_KQUEUE jpayne@69: struct kevent; jpayne@69: struct timespec; jpayne@69: #endif jpayne@69: jpayne@69: namespace kj { jpayne@69: jpayne@69: class UnixEventPort: public EventPort { jpayne@69: // An EventPort implementation which can wait for events on file descriptors as well as signals. jpayne@69: // This API only makes sense on Unix. jpayne@69: // jpayne@69: // The implementation uses `poll()` or possibly a platform-specific API (e.g. epoll, kqueue). jpayne@69: // To also wait on signals without race conditions, the implementation may block signals until jpayne@69: // just before `poll()` while using a signal handler which `siglongjmp()`s back to just before jpayne@69: // the signal was unblocked, or it may use a nicer platform-specific API. jpayne@69: // jpayne@69: // The implementation reserves a signal for internal use. By default, it uses SIGUSR1. If you jpayne@69: // need to use SIGUSR1 for something else, you must offer a different signal by calling jpayne@69: // setReservedSignal() at startup. (On Linux, no signal is reserved; eventfd is used instead.) jpayne@69: // jpayne@69: // WARNING: A UnixEventPort can only be used in the thread and process that created it. In jpayne@69: // particular, note that after a fork(), a UnixEventPort created in the parent process will jpayne@69: // not work correctly in the child, even if the parent ceases to use its copy. In particular jpayne@69: // note that this means that server processes which daemonize themselves at startup must wait jpayne@69: // until after daemonization to create a UnixEventPort. jpayne@69: // jpayne@69: // TODO(cleanup): The above warning is no longer accurate -- daemonizing after creating a jpayne@69: // UnixEventPort should now work since we no longer use signalfd. But do we want to commit to jpayne@69: // keeping it that way? Note it's still unsafe to fork() and then use UnixEventPort from both jpayne@69: // processes! jpayne@69: jpayne@69: public: jpayne@69: UnixEventPort(); jpayne@69: jpayne@69: ~UnixEventPort() noexcept(false); jpayne@69: jpayne@69: class FdObserver; jpayne@69: // Class that watches an fd for readability or writability. See definition below. jpayne@69: jpayne@69: Promise onSignal(int signum); jpayne@69: // When the given signal is delivered to this thread, return the corresponding siginfo_t. jpayne@69: // The signal must have been captured using `captureSignal()`. jpayne@69: // jpayne@69: // If `onSignal()` has not been called, the signal will remain blocked in this thread. jpayne@69: // Therefore, a signal which arrives before `onSignal()` was called will not be "missed" -- the jpayne@69: // next call to 'onSignal()' will receive it. Also, you can control which thread receives a jpayne@69: // process-wide signal by only calling `onSignal()` on that thread's event loop. jpayne@69: // jpayne@69: // The result of waiting on the same signal twice at once is undefined. jpayne@69: // jpayne@69: // WARNING: On MacOS and iOS, `onSignal()` will only see process-level signals, NOT jpayne@69: // thread-specific signals (i.e. not those sent with pthread_kill()). This is a limitation of jpayne@69: // Apple's implemnetation of kqueue() introduced in MacOS 10.14 which Apple says is not a bug. jpayne@69: // See: https://github.com/libevent/libevent/issues/765 Consider using kj::Executor or jpayne@69: // kj::newPromiseAndCrossThreadFulfiller() for cross-thread communications instead of signals. jpayne@69: // If you must have signals, build KJ and your app with `-DKJ_USE_KQUEUE=0`, which will cause jpayne@69: // KJ to fall back to a generic poll()-based implementation that is less efficient but handles jpayne@69: // thread-specific signals. jpayne@69: jpayne@69: static void captureSignal(int signum); jpayne@69: // Arranges for the given signal to be captured and handled via UnixEventPort, so that you may jpayne@69: // then pass it to `onSignal()`. This method is static because it registers a signal handler jpayne@69: // which applies process-wide. If any other threads exist in the process when `captureSignal()` jpayne@69: // is called, you *must* set the signal mask in those threads to block this signal, otherwise jpayne@69: // terrible things will happen if the signal happens to be delivered to those threads. If at jpayne@69: // all possible, call `captureSignal()` *before* creating threads, so that threads you create in jpayne@69: // the future will inherit the proper signal mask. jpayne@69: // jpayne@69: // To un-capture a signal, simply install a different signal handler and then un-block it from jpayne@69: // the signal mask. jpayne@69: jpayne@69: static void setReservedSignal(int signum); jpayne@69: // Sets the signal number which `UnixEventPort` reserves for internal use. If your application jpayne@69: // needs to use SIGUSR1, call this at startup (before any calls to `captureSignal()` and before jpayne@69: // constructing an `UnixEventPort`) to offer a different signal. jpayne@69: jpayne@69: Timer& getTimer() { return timerImpl; } jpayne@69: jpayne@69: Promise onChildExit(Maybe& pid); jpayne@69: // When the given child process exits, resolves to its wait status, as returned by wait(2). You jpayne@69: // will need to use the WIFEXITED() etc. macros to interpret the status code. jpayne@69: // jpayne@69: // You must call onChildExit() immediately after the child is created, before returning to the jpayne@69: // event loop. Otherwise, you may miss the child exit event. jpayne@69: // jpayne@69: // `pid` is a reference to a Maybe which must be non-null at the time of the call. When jpayne@69: // wait() is invoked (and indicates this pid has finished), `pid` will be nulled out. This is jpayne@69: // necessary to avoid a race condition: as soon as the child has been wait()ed, the PID table jpayne@69: // entry is freed and can then be reused. So, if you ever want safely to call `kill()` on the jpayne@69: // PID, it's necessary to know whether it has been wait()ed already. Since the promise's jpayne@69: // .then() continuation may not run immediately, we need a more precise way, hence we null out jpayne@69: // the Maybe. jpayne@69: // jpayne@69: // The caller must NOT null out `pid` on its own unless it cancels the Promise first. If the jpayne@69: // caller decides to cancel the Promise, and `pid` is still non-null after this cancellation, jpayne@69: // then the caller is expected to `waitpid()` on it BEFORE returning to the event loop again. jpayne@69: // Probably, the caller should kill() the child before waiting to avoid a hang. If the caller jpayne@69: // fails to do its own waitpid() before returning to the event loop, the child may become a jpayne@69: // zombie, or may be reaped automatically, depending on the platform -- since the caller does not jpayne@69: // know, the caller cannot try to reap the zombie later. jpayne@69: // jpayne@69: // You must call `kj::UnixEventPort::captureChildExit()` early in your program if you want to use jpayne@69: // `onChildExit()`. jpayne@69: // jpayne@69: // WARNING: Only one UnixEventPort per process is allowed to use onChildExit(). This is because jpayne@69: // child exit is signaled to the process via SIGCHLD, and Unix does not allow the program to jpayne@69: // control which thread receives the signal. (We may fix this in the future by automatically jpayne@69: // coordinating between threads when multiple threads are expecting child exits.) jpayne@69: // WARNING 2: If any UnixEventPort in the process is currently waiting for onChildExit(), then jpayne@69: // *only* that port's thread can safely wait on child processes, even synchronously. This is jpayne@69: // because the thread which used onChildExit() uses wait() to reap children, without specifying jpayne@69: // which child, and therefore it may inadvertently reap children created by other threads. jpayne@69: jpayne@69: static void captureChildExit(); jpayne@69: // Arranges for child process exit to be captured and handled via UnixEventPort, so that you may jpayne@69: // call `onChildExit()`. Much like `captureSignal()`, this static method must be called early on jpayne@69: // in program startup. jpayne@69: // jpayne@69: // This method may capture the `SIGCHLD` signal. You must not use `captureSignal(SIGCHLD)` nor jpayne@69: // `onSignal(SIGCHLD)` in your own code if you use `captureChildExit()`. jpayne@69: jpayne@69: // implements EventPort ------------------------------------------------------ jpayne@69: bool wait() override; jpayne@69: bool poll() override; jpayne@69: void wake() const override; jpayne@69: jpayne@69: private: jpayne@69: class SignalPromiseAdapter; jpayne@69: class ChildExitPromiseAdapter; jpayne@69: jpayne@69: const MonotonicClock& clock; jpayne@69: TimerImpl timerImpl; jpayne@69: jpayne@69: #if !KJ_USE_KQUEUE jpayne@69: SignalPromiseAdapter* signalHead = nullptr; jpayne@69: SignalPromiseAdapter** signalTail = &signalHead; jpayne@69: jpayne@69: void gotSignal(const siginfo_t& siginfo); jpayne@69: #endif jpayne@69: jpayne@69: friend class TimerPromiseAdapter; jpayne@69: jpayne@69: #if KJ_USE_EPOLL jpayne@69: sigset_t originalMask; jpayne@69: AutoCloseFd epollFd; jpayne@69: AutoCloseFd eventFd; // Used for cross-thread wakeups. jpayne@69: jpayne@69: bool processEpollEvents(struct epoll_event events[], int n); jpayne@69: #elif KJ_USE_KQUEUE jpayne@69: AutoCloseFd kqueueFd; jpayne@69: jpayne@69: bool doKqueueWait(struct timespec* timeout); jpayne@69: #else jpayne@69: class PollContext; jpayne@69: jpayne@69: FdObserver* observersHead = nullptr; jpayne@69: FdObserver** observersTail = &observersHead; jpayne@69: jpayne@69: #if KJ_USE_PIPE_FOR_WAKEUP jpayne@69: AutoCloseFd wakePipeIn; jpayne@69: AutoCloseFd wakePipeOut; jpayne@69: #else jpayne@69: unsigned long long threadId; // actually pthread_t jpayne@69: #endif jpayne@69: #endif jpayne@69: jpayne@69: #if !KJ_USE_KQUEUE jpayne@69: struct ChildSet; jpayne@69: Maybe> childSet; jpayne@69: #endif jpayne@69: jpayne@69: static void signalHandler(int, siginfo_t* siginfo, void*) noexcept; jpayne@69: static void registerSignalHandler(int signum); jpayne@69: #if !KJ_USE_EPOLL && !KJ_USE_KQUEUE && !KJ_USE_PIPE_FOR_WAKEUP jpayne@69: static void registerReservedSignal(); jpayne@69: #endif jpayne@69: static void ignoreSigpipe(); jpayne@69: }; jpayne@69: jpayne@69: class UnixEventPort::FdObserver: private AsyncObject { jpayne@69: // Object which watches a file descriptor to determine when it is readable or writable. jpayne@69: // jpayne@69: // For listen sockets, "readable" means that there is a connection to accept(). For everything jpayne@69: // else, it means that read() (or recv()) will return data. jpayne@69: // jpayne@69: // The presence of out-of-band data should NOT fire this event. However, the event may jpayne@69: // occasionally fire spuriously (when there is actually no data to read), and one thing that can jpayne@69: // cause such spurious events is the arrival of OOB data on certain platforms whose event jpayne@69: // interfaces fail to distinguish between regular and OOB data (e.g. Mac OSX). jpayne@69: // jpayne@69: // WARNING: The exact behavior of this class differs across systems, since event interfaces jpayne@69: // vary wildly. Be sure to read the documentation carefully and avoid depending on unspecified jpayne@69: // behavior. If at all possible, use the higher-level AsyncInputStream interface instead. jpayne@69: jpayne@69: public: jpayne@69: enum Flags { jpayne@69: OBSERVE_READ = 1, jpayne@69: OBSERVE_WRITE = 2, jpayne@69: OBSERVE_URGENT = 4, jpayne@69: OBSERVE_READ_WRITE = OBSERVE_READ | OBSERVE_WRITE jpayne@69: }; jpayne@69: jpayne@69: FdObserver(UnixEventPort& eventPort, int fd, uint flags); jpayne@69: // Begin watching the given file descriptor for readability. Only one ReadObserver may exist jpayne@69: // for a given file descriptor at a time. jpayne@69: jpayne@69: ~FdObserver() noexcept(false); jpayne@69: jpayne@69: KJ_DISALLOW_COPY_AND_MOVE(FdObserver); jpayne@69: jpayne@69: Promise whenBecomesReadable(); jpayne@69: // Resolves the next time the file descriptor transitions from having no data to read to having jpayne@69: // some data to read. jpayne@69: // jpayne@69: // KJ uses "edge-triggered" event notification whenever possible. As a result, it is an error jpayne@69: // to call this method when there is already data in the read buffer which has been there since jpayne@69: // prior to the last turn of the event loop or prior to creation FdWatcher. In this case, it is jpayne@69: // unspecified whether the promise will ever resolve -- it depends on the underlying event jpayne@69: // mechanism being used. jpayne@69: // jpayne@69: // In order to avoid this problem, make sure that you only call `whenBecomesReadable()` jpayne@69: // only at times when you know the buffer is empty. You know this for sure when one of the jpayne@69: // following happens: jpayne@69: // * read() or recv() fails with EAGAIN or EWOULDBLOCK. (You MUST have non-blocking mode jpayne@69: // enabled on the fd!) jpayne@69: // * The file descriptor is a regular byte-oriented object (like a socket or pipe), jpayne@69: // read() or recv() returns fewer than the number of bytes requested, and `atEndHint()` jpayne@69: // returns false. This can only happen if the buffer is empty but EOF is not reached. (Note, jpayne@69: // though, that for record-oriented file descriptors like Linux's inotify interface, this jpayne@69: // rule does not hold, because it could simply be that the next record did not fit into the jpayne@69: // space available.) jpayne@69: // jpayne@69: // It is an error to call `whenBecomesReadable()` again when the promise returned previously jpayne@69: // has not yet resolved. If you do this, the previous promise may throw an exception. jpayne@69: jpayne@69: inline Maybe atEndHint() { return atEnd; } jpayne@69: // Returns true if the event system has indicated that EOF has been received. There may still jpayne@69: // be data in the read buffer, but once that is gone, there's nothing left. jpayne@69: // jpayne@69: // Returns false if the event system has indicated that EOF had NOT been received as of the jpayne@69: // last turn of the event loop. jpayne@69: // jpayne@69: // Returns nullptr if the event system does not know whether EOF has been reached. In this jpayne@69: // case, the only way to know for sure is to call read() or recv() and check if it returns jpayne@69: // zero. jpayne@69: // jpayne@69: // This hint may be useful as an optimization to avoid an unnecessary system call. jpayne@69: jpayne@69: Promise whenBecomesWritable(); jpayne@69: // Resolves the next time the file descriptor transitions from having no space available in the jpayne@69: // write buffer to having some space available. jpayne@69: // jpayne@69: // KJ uses "edge-triggered" event notification whenever possible. As a result, it is an error jpayne@69: // to call this method when there is already space in the write buffer which has been there jpayne@69: // since prior to the last turn of the event loop or prior to creation FdWatcher. In this case, jpayne@69: // it is unspecified whether the promise will ever resolve -- it depends on the underlying jpayne@69: // event mechanism being used. jpayne@69: // jpayne@69: // In order to avoid this problem, make sure that you only call `whenBecomesWritable()` jpayne@69: // only at times when you know the buffer is full. You know this for sure when one of the jpayne@69: // following happens: jpayne@69: // * write() or send() fails with EAGAIN or EWOULDBLOCK. (You MUST have non-blocking mode jpayne@69: // enabled on the fd!) jpayne@69: // * write() or send() succeeds but accepts fewer than the number of bytes provided. This can jpayne@69: // only happen if the buffer is full. jpayne@69: // jpayne@69: // It is an error to call `whenBecomesWritable()` again when the promise returned previously jpayne@69: // has not yet resolved. If you do this, the previous promise may throw an exception. jpayne@69: jpayne@69: Promise whenUrgentDataAvailable(); jpayne@69: // Resolves the next time the file descriptor's read buffer contains "urgent" data. jpayne@69: // jpayne@69: // The conditions for availability of urgent data are specific to the file descriptor's jpayne@69: // underlying implementation. jpayne@69: // jpayne@69: // It is an error to call `whenUrgentDataAvailable()` again when the promise returned previously jpayne@69: // has not yet resolved. If you do this, the previous promise may throw an exception. jpayne@69: // jpayne@69: // WARNING: This has some known weird behavior on macOS. See jpayne@69: // https://github.com/capnproto/capnproto/issues/374. jpayne@69: jpayne@69: Promise whenWriteDisconnected(); jpayne@69: // Resolves when poll() on the file descriptor reports POLLHUP or POLLERR. jpayne@69: jpayne@69: private: jpayne@69: UnixEventPort& eventPort; jpayne@69: int fd; jpayne@69: uint flags; jpayne@69: jpayne@69: kj::Maybe>> readFulfiller; jpayne@69: kj::Maybe>> writeFulfiller; jpayne@69: kj::Maybe>> urgentFulfiller; jpayne@69: kj::Maybe>> hupFulfiller; jpayne@69: // Replaced each time `whenBecomesReadable()` or `whenBecomesWritable()` is called. Reverted to jpayne@69: // null every time an event is fired. jpayne@69: jpayne@69: Maybe atEnd; jpayne@69: jpayne@69: #if KJ_USE_KQUEUE jpayne@69: void fire(struct kevent event); jpayne@69: #else jpayne@69: void fire(short events); jpayne@69: #endif jpayne@69: jpayne@69: #if !KJ_USE_EPOLL jpayne@69: FdObserver* next; jpayne@69: FdObserver** prev; jpayne@69: // Linked list of observers which currently have a non-null readFulfiller or writeFulfiller. jpayne@69: // If `prev` is null then the observer is not currently in the list. jpayne@69: jpayne@69: short getEventMask(); jpayne@69: #endif jpayne@69: jpayne@69: friend class UnixEventPort; jpayne@69: }; jpayne@69: jpayne@69: } // namespace kj jpayne@69: jpayne@69: KJ_END_HEADER