diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 734631a..e624163 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -8,8 +8,8 @@ jobs: strategy: fail-fast: false matrix: - os: [ubuntu-20.04, ubuntu-22.04] - compiler: [[gcc, g++], [clang, clang++]] + os: [ubuntu-20.04, ubuntu-22.04, ubuntu-24.04] + compiler: [{cc: gcc, cxx: g++}, {cc: clang, cxx: clang++}] timeout-minutes: 30 steps: @@ -26,8 +26,8 @@ jobs: ./yrmcdsd & sleep 1 env: - CC: ${{ matrix.compiler[0] }} - CXX: ${{ matrix.compiler[1] }} + CC: ${{ matrix.compiler.cc }} + CXX: ${{ matrix.compiler.cxx }} - name: Run tests run: | make YRMCDS_SERVER=localhost tests diff --git a/.gitignore b/.gitignore index 941b59f..9e5ffb5 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ # Editors *~ .*.swp +.vscode/settings.json # Directories html diff --git a/.vscode/c_cpp_properties.json b/.vscode/c_cpp_properties.json new file mode 100644 index 0000000..daff258 --- /dev/null +++ b/.vscode/c_cpp_properties.json @@ -0,0 +1,18 @@ +{ + "configurations": [ + { + "name": "Linux", + "includePath": [ + "${workspaceFolder}/**" + ], + "defines": [ + "CACHELINE_SIZE=32" + ], + "compilerPath": "/usr/bin/gcc", + "cStandard": "c17", + "cppStandard": "gnu++17", + "intelliSenseMode": "linux-gcc-x64" + } + ], + "version": 4 +} diff --git a/Makefile b/Makefile index b5f0361..3b02c9c 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ # Makefile for yrmcds -# Prerequisites: gcc 4.8+ or clang 3.3+ +# Prerequisites: gcc 11.1+ or clang 14+ PREFIX = /usr/local DEFAULT_CONFIG = $(PREFIX)/etc/yrmcds.conf @@ -17,7 +17,7 @@ OPTFLAGS = -O2 #-flto DEBUGFLAGS = -gdwarf-3 #-fsanitize=address WARNFLAGS = -Wall -Wnon-virtual-dtor -Woverloaded-virtual CPUFLAGS = #-march=core2 -mtune=corei7 -CXXFLAGS = -std=gnu++11 $(OPTFLAGS) $(DEBUGFLAGS) $(shell getconf LFS_CFLAGS) $(WARNFLAGS) $(CPUFLAGS) +CXXFLAGS = -std=gnu++17 $(OPTFLAGS) $(DEBUGFLAGS) $(shell getconf LFS_CFLAGS) $(WARNFLAGS) $(CPUFLAGS) 
LDFLAGS = -L. $(shell getconf LFS_LDFLAGS) LDLIBS = $(shell getconf LFS_LIBS) -lyrmcds $(LIBTCMALLOC) -latomic -lpthread diff --git a/cybozu/reactor.cpp b/cybozu/reactor.cpp index 5c7a07b..2ab8f7d 100644 --- a/cybozu/reactor.cpp +++ b/cybozu/reactor.cpp @@ -16,6 +16,17 @@ const int POLLING_TIMEOUT = 100; // milli seconds namespace cybozu { +void resource::invalidate_and_close_() { + bool expected = true; + if( ! m_valid.compare_exchange_strong(expected, false) ) + return; + + // no need to read-lock `m_lock` here because `m_fd` will not be + // closed before this function calls `reactor::request_removal`. + on_invalidate(m_fd); + m_reactor->request_removal(*this); +} + reactor::reactor(): m_fd( epoll_create1(EPOLL_CLOEXEC) ), m_running(true) { @@ -37,7 +48,7 @@ void reactor::add_resource(std::unique_ptr res, int events) { if( res->m_reactor != nullptr ) throw std::logic_error(" already added!"); res->m_reactor = this; - const int fd = res->fileno(); + const int fd = res->m_fd; struct epoll_event ev; ev.events = events | EPOLLET; ev.data.fd = fd; @@ -47,7 +58,7 @@ void reactor::add_resource(std::unique_ptr res, int events) { } void reactor::modify_events(const resource& res, int events) { - const int fd = res.fileno(); + const int fd = res.m_fd; struct epoll_event ev; ev.events = events | EPOLLET; ev.data.fd = fd; @@ -78,12 +89,17 @@ void reactor::remove_resource(int fd) { dump_stack(); throw std::logic_error("bug in remove_resource"); } + auto res = it->second.get(); m_garbage.emplace_back( std::move(it->second) ); m_resources.erase(it); if( epoll_ctl(m_fd, EPOLL_CTL_DEL, fd, NULL) == -1 ) throw_unix_error(errno, "epoll_ctl(EPOLL_CTL_DEL)"); m_readables.erase(std::remove(m_readables.begin(), m_readables.end(), fd), m_readables.end()); + + if( ! 
res->try_close() ) { + logger::debug() << "failed to close fd (will be closed later): " << fd; + } } void reactor::poll() { @@ -93,7 +109,7 @@ void reactor::poll() { std::back_inserter(m_readables_copy)); m_readables.clear(); for( int fd: m_readables_copy ) { - if( ! m_resources[fd]->on_readable() ) + if( ! m_resources[fd]->on_readable(fd) ) remove_resource(fd); } m_readables_copy.clear(); @@ -120,24 +136,24 @@ void reactor::poll() { const int fd = ev.data.fd; resource& r = *(m_resources[fd]); if( ev.events & EPOLLERR ) { - if( ! r.on_error() ) + if( ! r.on_error(fd) ) remove_resource(fd); continue; } if( ev.events & EPOLLHUP ) { - if( ! r.on_hangup() ) { + if( ! r.on_hangup(fd) ) { remove_resource(fd); continue; } } if( ev.events & EPOLLIN ) { - if( ! r.on_readable() ) { + if( ! r.on_readable(fd) ) { remove_resource(fd); continue; } } if( ev.events & EPOLLOUT ) { - if( ! r.on_writable() ) + if( ! r.on_writable(fd) ) remove_resource(fd); } } diff --git a/cybozu/reactor.hpp b/cybozu/reactor.hpp index f5376bb..5872e37 100644 --- a/cybozu/reactor.hpp +++ b/cybozu/reactor.hpp @@ -8,6 +8,7 @@ #include "spinlock.hpp" #include "util.hpp" +#include #include #include #include @@ -16,6 +17,14 @@ #include #include +// hack the pthread_rwlock initializer to avoid writer starvation +// see man pthread_rwlockattr_setkind_np(3) +#ifdef PTHREAD_RWLOCK_WRITER_NONRECURSIVE_INITIALIZER_NP +#undef PTHREAD_RWLOCK_INITIALIZER +#define PTHREAD_RWLOCK_INITIALIZER PTHREAD_RWLOCK_WRITER_NONRECURSIVE_INITIALIZER_NP +#endif +#include + namespace cybozu { class reactor; @@ -25,8 +34,8 @@ class reactor; // An abstraction of a file descriptor for use with . // The file descriptor should be set non-blocking. // -// All member functions except for are for the -// reactor thread. Sub classes can add methods for the other threads. +// All member functions except for are for the reactor thread. +// Sub classes can add methods for non-reactor threads. class resource { public: // Constructor. 
@@ -36,43 +45,46 @@ class resource { resource& operator=(const resource&) = delete; // Close the file descriptor. + // + // It is guaranteed that no other threads are using this resource + // when being destructed. See the design doc for details. + // https://github.com/cybozu/yrmcds/blob/master/docs/design.md#strategy-to-reclaim-shared-sockets + // + // `m_closed` is edited and checked only by the reactor thread, so no memory synchronization + // is necessary here. `m_fd` is a constant, so no synchronization is necessary either. virtual ~resource() { - ::close(m_fd); + if( ! m_closed ) { + ::close(m_fd); + } } - // Return the UNIX file descriptor for this resource. - int fileno() const { return m_fd; } - // `true` if this resource is still valid. bool valid() const { - lock_guard g(m_lock); - return m_valid; + return m_valid.load(); } // Invalidate this resource. // // This is for the reactor thread only. // You may call this from within or . + // + // This returns `false` only if it successfully invalidates the resource. bool invalidate() { - lock_guard g(m_lock); - if( ! m_valid ) return true; - invalidate_( std::move(g) ); + bool expected = true; + if( ! m_valid.compare_exchange_strong(expected, false) ) { + return true; + } + + // no need to read-lock `m_lock` because this is on the reactor thread. + on_invalidate(m_fd); return false; } - // Invalidate this resource then request the reactor to remove this. - // - // This method invalidates this resource then requests the reactor - // thread to remove this resource from the reactor. - // - // **Do not use this in the reactor thread**. - void invalidate_and_close(); - - // A template method called from within invalidate. + // A template method called from within and . // - // A template method called from within invalidate. - // Subclasses can override this to clean up something. - virtual void on_invalidate() {} + // Subclasses can override this to clean up something when the resource + // is invalidated. 
This is called only once. + virtual void on_invalidate(int fd) {} // Called when the reactor finds this resource is readable. // @@ -90,7 +102,7 @@ class resource { // the client may still be waiting for the response. // // @return `true` or return value of . - virtual bool on_readable() = 0; + virtual bool on_readable(int fd) = 0; // Called when the reactor finds this resource is writable. // @@ -101,38 +113,112 @@ class resource { // If some error happened, execute `return invalidate();`. // // @return `true` or return value of . - virtual bool on_writable() = 0; + virtual bool on_writable(int fd) = 0; // Called when the reactor finds this resource has hanged up. // // This method is called when the reactor detects unexpected hangup. - virtual bool on_hangup() { + virtual bool on_hangup(int fd) { return invalidate(); } // Called when the reactor finds an error on this resource. // // This method is called when the reactor detects some error. - virtual bool on_error() { + virtual bool on_error(int fd) { return invalidate(); } protected: - const int m_fd; reactor* m_reactor = nullptr; friend class reactor; + // Call `f` with the file descriptor of this resource. + // `f` should be a function like `bool f(int fd)`. + // This is intended to be called by non-reactor threads. + // + // This calls `f` only if this resource is still valid. + // If it is invalid, this returns `false`. Otherwise, the + // return value will be the return value of `f`. + // + // The template function `f` should return `true` if it wants to keep + // the resource valid. If it returns `false`, then `with_fd` + // invalidates the resource. + template + bool with_fd(Func&& f) { + read_lock g(m_lock); + if( ! valid() ) return false; + // no need to check m_closed because it becomes true only after m_valid is set to false. + + if( f(m_fd) ) { + return true; + } + + invalidate_and_close_(); + return false; + } + + // Invalidates this resource and closes the file descriptor.
+ // This is intended to be called by non-reactor threads. + // + // This is a simple wrapper of just to invalidate + // the resource and close the file descriptor. + void invalidate_and_close() { + with_fd([](int) -> bool { return false; }); + } + private: - bool m_valid = true; - mutable spinlock m_lock; - typedef std::unique_lock lock_guard; - void invalidate_(lock_guard g) { - m_valid = false; - g.unlock(); - on_invalidate(); + // The resource status is represented with the following two flags. + // `m_valid`: New operations (such as read, write) on this resource can be initiated only when `m_valid` is true. + // Note that even if `m_valid` is false, there may still be outstanding operations. + // `m_closed`: This flag represents the open/close status of the file descriptor. + // The file descriptor is closed only after `m_valid` is set to false. + // + // We have to have `m_closed` separately from `m_valid` because we want to + // close the file descriptor as early as possible, but it cannot always be + // done immediately. See `try_close` for details. + // + // Since `m_closed` is used only by the reactor thread, we do not need to + // protect it with a guarding lock. + std::atomic_bool m_valid = true; + bool m_closed = false; + + // `m_fd` is the file descriptor of this resource. + // + // The file descriptor may be closed by the reactor thread earlier than the resource is destructed. + // To avoid closing the file descriptor while other threads are using it, we use a shared mutex `m_lock`. + // + // The reactor thread tries to acquire a write lock when it closes the file descriptor. + // Other threads must acquire a read lock while it uses the file descriptor through . + // + // Since the reactor thread is the only thread that can close `m_fd`, the reactor thread does + // not need to acquire a read lock when it uses `m_fd`. 
+ mutable std::shared_mutex m_lock; + typedef std::shared_lock read_lock; + typedef std::unique_lock write_lock; + const int m_fd; + + // This tries to close the file descriptor if no other threads are using it. + // Called only from the friend class to early close the file descriptor. + // That means that only the reactor thread can use this. + // + // If another thread is using the file descriptor, this does nothing and returns `false`. + // In that case, the file descriptor will be closed when this resource is destructed. + bool try_close() { + write_lock g(m_lock, std::try_to_lock); + if( ! g ) return false; + + if( ! m_closed ) { + ::close(m_fd); + m_closed = true; + } + return true; } + + // A supplementary method for . + void invalidate_and_close_(); }; @@ -188,10 +274,12 @@ class reactor { // Add a resource to the readable resource list. // - // Only may call this when it stops reading - // from a resource before it encounters `EAGAIN` or `EWOULDBLOCK`. + // This can be used only in an `on_readable` hook of a resource when + // it stops reading from the file descriptor before it encounters + // `EAGAIN` or `EWOULDBLOCK`. Note that `on_readable` is executed by + // the reactor thread. void add_readable(const resource& res) { - m_readables.push_back(res.fileno()); + m_readables.push_back(res.m_fd); } // Add a removal request for a resource. @@ -200,9 +288,13 @@ class reactor { // When such a thread successfully invalidates a resource, the thread // need to request the reactor thread to remove the resource by // calling this. + // + // Note that the use of `m_fd` here does not initiate any new syscalls. + // The value is used just as a map key to find the resource to remove + // from the reactor. So, no read-lock for `m_fd` is necessary. 
void request_removal(const resource& res) { lock_guard g(m_lock); - m_drop_req.push_back(res.fileno()); + m_drop_req.push_back(res.m_fd); } bool has_garbage() const noexcept { @@ -238,7 +330,7 @@ class reactor { // Remove a registered resource. // This is only for the reactor thread. void remove_resource(const resource& res) { - remove_resource(res.fileno()); + remove_resource(res.m_fd); } private: @@ -263,14 +355,6 @@ class reactor { void poll(); }; - -inline void resource::invalidate_and_close() { - lock_guard g(m_lock); - if( ! m_valid ) return; - invalidate_( std::move(g) ); - m_reactor->request_removal(*this); -} - } // namespace cybozu #endif // CYBOZU_REACTOR_HPP diff --git a/cybozu/signal.hpp b/cybozu/signal.hpp index eacf964..c45cc2d 100644 --- a/cybozu/signal.hpp +++ b/cybozu/signal.hpp @@ -30,10 +30,15 @@ class signal_reader: public resource { // need to be blocked on all threads. signal_reader(const sigset_t *mask, callback_t callback): resource( signalfd(-1, mask, SFD_NONBLOCK|SFD_CLOEXEC) ), - m_callback(callback) { - if( m_fd == -1 ) - throw_unix_error(errno, "signalfd"); + m_callback(callback) + { + with_fd([](int fd) -> bool { + if( fd == -1 ) + throw_unix_error(errno, "signalfd"); + return true; + }); } + // Constructor. // @mask A set of signals to be handled by this resource. 
// @@ -52,10 +57,10 @@ class signal_reader: public resource { private: callback_t m_callback; - virtual bool on_readable() override final { + virtual bool on_readable(int fd) override final { while( true ) { struct signalfd_siginfo si; - ssize_t n = read(m_fd, &si, sizeof(si)); + ssize_t n = read(fd, &si, sizeof(si)); if( n == -1 ) { if( errno == EINTR ) continue; if( errno == EAGAIN || errno ==EWOULDBLOCK ) return true; @@ -66,7 +71,10 @@ class signal_reader: public resource { if( m_callback ) m_callback(si, *m_reactor); } } - virtual bool on_writable() override final { return true; } + + virtual bool on_writable(int) override final { + return true; + } }; diff --git a/cybozu/tcp.cpp b/cybozu/tcp.cpp index e32d72e..0071325 100644 --- a/cybozu/tcp.cpp +++ b/cybozu/tcp.cpp @@ -159,7 +159,7 @@ void tcp_socket::free_buffers() { m_shutdown = true; } -bool tcp_socket::_send(const char* p, std::size_t len, lock_guard& g) { +bool tcp_socket::_send(int fd, const char* p, std::size_t len, lock_guard& g) { while( ! 
can_send(len) ) { on_buffer_full(); m_cond_write.wait(g); @@ -168,7 +168,7 @@ bool tcp_socket::_send(const char* p, std::size_t len, lock_guard& g) { if( m_pending.empty() ) { while( len > 0 ) { - ssize_t n = ::send(m_fd, p, len, 0); + ssize_t n = ::send(fd, p, len, 0); if( n == -1 ) { if( errno == EAGAIN || errno == EWOULDBLOCK ) break; if( errno == EINTR ) continue; @@ -177,8 +177,6 @@ bool tcp_socket::_send(const char* p, std::size_t len, lock_guard& g) { logger::error() << ": (" << ecnd.value() << ") " << ecnd.message(); - g.unlock(); - invalidate_and_close(); return false; } p += n; @@ -225,7 +223,7 @@ bool tcp_socket::_send(const char* p, std::size_t len, lock_guard& g) { return true; } -bool tcp_socket::_sendv(const iovec* iov, const int iovcnt, lock_guard& g) { +bool tcp_socket::_sendv(int fd, const iovec* iov, const int iovcnt, lock_guard& g) { std::size_t total = 0; for( int i = 0; i < iovcnt; ++i ) { total += iov[i].len; @@ -250,7 +248,7 @@ bool tcp_socket::_sendv(const iovec* iov, const int iovcnt, lock_guard& g) { if( m_pending.empty() ) { while( ind < v_size ) { - ssize_t n = ::writev(m_fd, &(v[ind]), v_size - ind); + ssize_t n = ::writev(fd, &(v[ind]), v_size - ind); if( n == -1 ) { if( errno == EAGAIN || errno == EWOULDBLOCK ) break; if( errno == EINTR ) continue; @@ -259,8 +257,6 @@ bool tcp_socket::_sendv(const iovec* iov, const int iovcnt, lock_guard& g) { logger::error() << ": (" << ecnd.value() << ") " << ecnd.message(); - g.unlock(); - invalidate_and_close(); return false; } while( n > 0 ) { @@ -326,11 +322,11 @@ bool tcp_socket::_sendv(const iovec* iov, const int iovcnt, lock_guard& g) { return true; } -bool tcp_socket::write_pending_data() { +bool tcp_socket::write_pending_data(int fd) { lock_guard g(m_lock); while( ! 
m_tmpbuf.empty() ) { - ssize_t n = ::send(m_fd, m_tmpbuf.data(), m_tmpbuf.size(), 0); + ssize_t n = ::send(fd, m_tmpbuf.data(), m_tmpbuf.size(), 0); if( n == -1 ) { if( errno == EINTR ) continue; if( errno == EAGAIN || errno == EWOULDBLOCK ) return true; @@ -353,7 +349,7 @@ bool tcp_socket::write_pending_data() { std::tie(p, len, sent) = t; while( len != sent ) { - ssize_t n = ::send(m_fd, p+sent, len-sent, 0); + ssize_t n = ::send(fd, p+sent, len-sent, 0); if( n == -1 ) { if( errno == EINTR ) continue; if( errno == EAGAIN || errno == EWOULDBLOCK ) break; @@ -378,7 +374,7 @@ bool tcp_socket::write_pending_data() { } // all data have been sent. - _flush(); + _flush(fd); if( ! m_shutdown ) { g.unlock(); @@ -452,7 +448,7 @@ setup_server_socket(const char* bind_addr, std::uint16_t port, bool freebind) { return s; } -bool tcp_server_socket::on_readable() { +bool tcp_server_socket::on_readable(int fd) { while( true ) { union { struct sockaddr sa; @@ -460,7 +456,7 @@ bool tcp_server_socket::on_readable() { } addr; socklen_t addrlen = sizeof(addr); #ifdef _GNU_SOURCE - int s = ::accept4(m_fd, &(addr.sa), &addrlen, + int s = ::accept4(fd, &(addr.sa), &addrlen, SOCK_NONBLOCK|SOCK_CLOEXEC); #else - int s = ::accept(m_fd, &(addr.sa), &addrlen); + int s = ::accept(fd, &(addr.sa), &addrlen); diff --git a/cybozu/tcp.hpp b/cybozu/tcp.hpp index d54ddca..d6d099d 100644 --- a/cybozu/tcp.hpp +++ b/cybozu/tcp.hpp @@ -4,6 +4,7 @@ #ifndef CYBOZU_TCP_HPP #define CYBOZU_TCP_HPP +#include "dynbuf.hpp" #include "ip_address.hpp" #include "reactor.hpp" #include "util.hpp" @@ -86,12 +87,14 @@ class tcp_socket: public resource { // // @return `true` if this socket is valid, `false` otherwise. bool send(const char* p, std::size_t len, bool flush=false) { - lock_guard g(m_lock); - if( ! _send(p, len, g) ) - return false; - if( flush && empty() ) - _flush(); - return true; + return with_fd([=](int fd) -> bool { + lock_guard g(m_lock); + if( !
_send(fd, p, len, g) ) + return false; + if( flush && empty() ) + _flush(fd); + return true; + }); } // Atomically send multiple data. @@ -104,14 +107,16 @@ class tcp_socket: public resource { // // @return `true` if this socket is valid, `false` otherwise. bool sendv(const iovec* iov, int iovcnt, bool flush=false) { - if( iovcnt >= MAX_IOVCNT ) - throw std::logic_error(" too many iovec."); - lock_guard g(m_lock); - if( ! _sendv(iov, iovcnt, g) ) - return false; - if( flush && empty() ) - _flush(); - return true; + return with_fd([=](int fd) -> bool { + if( iovcnt >= MAX_IOVCNT ) + throw std::logic_error(" too many iovec."); + lock_guard g(m_lock); + if( ! _sendv(fd, iov, iovcnt, g) ) + return false; + if( flush && empty() ) + _flush(fd); + return true; + }); } // Atomically send data, then close the socket. @@ -124,19 +129,19 @@ class tcp_socket: public resource { // // @return `true` if this socket is valid, `false` otherwise. bool send_close(const char* p, std::size_t len) { - lock_guard g(m_lock); - if( ! _send(p, len, g) ) - return false; - m_shutdown = true; - if( empty() ) { - _flush(); + return with_fd([=](int fd) -> bool { + lock_guard g(m_lock); + if( ! _send(fd, p, len, g) ) + return false; + m_shutdown = true; + if( empty() ) { + _flush(fd); + return false; + } g.unlock(); - invalidate_and_close(); + m_cond_write.notify_all(); return true; - } - g.unlock(); - m_cond_write.notify_all(); - return true; + }); } // Atomically send multiple data, then close the socket. @@ -149,21 +154,65 @@ class tcp_socket: public resource { // // @return `true` if this socket is valid, `false` otherwise. bool sendv_close(const iovec* iov, int iovcnt) { - if( iovcnt >= MAX_IOVCNT ) - throw std::logic_error(" too many iov."); - lock_guard g(m_lock); - if( ! 
_sendv(iov, iovcnt, g) ) - return false; - m_shutdown = true; - if( empty() ) { - _flush(); + return with_fd([=](int fd) -> bool { + if( iovcnt >= MAX_IOVCNT ) + throw std::logic_error(" too many iov."); + lock_guard g(m_lock); + if( ! _sendv(fd, iov, iovcnt, g) ) + return false; + m_shutdown = true; + if( empty() ) { + _flush(fd); + return false; + } g.unlock(); - invalidate_and_close(); + m_cond_write.notify_all(); return true; + }); + } + + enum class recv_result { + OK, // Received some data. + AGAIN, // No data was available. + RESET, // Connection reset by peer. + NONE, // Peer half-closed the connection. + }; + + // Receive data from the socket. + // @buf A buffer to store received data. + // @max_recvsize Maximum size of data to be received. + // + // This function receives data from the socket. Since this uses `with_fd` + // internally, the reactor thread should not call this. + // + // @return The result of the operation. On RESET or NONE the socket has been invalidated. + recv_result receive(dynbuf& buf, const std::size_t max_recvsize) { + char* p = buf.prepare(max_recvsize); + ::ssize_t n = -1; + auto ret = with_fd([=, &n](int fd) -> bool { + do { + n = ::recv(fd, p, max_recvsize, 0); + } while( n == -1 && errno == EINTR ); + // Returning false makes with_fd invalidate the socket, so EOF (n == 0) must do so too. + return (n > 0) || (n == -1 && errno != ECONNRESET); + }); + + if( n == 0 ) { + return recv_result::NONE; } - g.unlock(); - m_cond_write.notify_all(); - return true; + + if( ! ret ) { + return recv_result::RESET; + } + + if( n == -1 ) { + if( errno == EAGAIN || errno == EWOULDBLOCK ) + return recv_result::AGAIN; + throw_unix_error(errno, "recv"); + } + + buf.consume(n); + return recv_result::OK; } protected: @@ -172,20 +221,20 @@ class tcp_socket: public resource { // This method tries to send pending data as much as possible. // // @return `false` if some error happened, `true` otherwise. - bool write_pending_data(); + bool write_pending_data(int fd); // Just call . // // The default implementation just invoke . // You may override this to dispatch the job to another thread.
- virtual bool on_writable() override { - if( write_pending_data() ) + virtual bool on_writable(int fd) override { + if( write_pending_data(fd) ) return true; return invalidate(); } - virtual void on_invalidate() override { - ::shutdown(m_fd, SHUT_RDWR); + virtual void on_invalidate(int fd) override { + ::shutdown(fd, SHUT_RDWR); free_buffers(); m_cond_write.notify_all(); } @@ -217,16 +266,16 @@ class tcp_socket: public resource { if( m_pending.empty() ) return true; return capacity() >= len; } - bool _send(const char* p, std::size_t len, lock_guard& g); - bool _sendv(const iovec* iov, const int iovcnt, lock_guard& g); + bool _send(int fd, const char* p, std::size_t len, lock_guard& g); + bool _sendv(int fd, const iovec* iov, const int iovcnt, lock_guard& g); bool empty() const { return m_pending.empty() && m_tmpbuf.empty(); } - void _flush() { + void _flush(int fd) { // with TCP_CORK, setting TCP_NODELAY effectively flushes // the kernel send buffer. int v = 1; - if( setsockopt(m_fd, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v)) == -1 ) + if( setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v)) == -1 ) throw_unix_error(errno, "setsockopt(TCP_NODELAY)"); } void free_buffers(); @@ -276,8 +325,8 @@ class tcp_server_socket: public resource { private: wrapper m_wrapper; - virtual bool on_readable() override final; - virtual bool on_writable() override final { return true; } + virtual bool on_readable(int) override final; + virtual bool on_writable(int) override final { return true; } }; diff --git a/src/counter/sockets.cpp b/src/counter/sockets.cpp index e5cb8a1..675a653 100644 --- a/src/counter/sockets.cpp +++ b/src/counter/sockets.cpp @@ -31,29 +31,14 @@ counter_socket::counter_socket(int fd, } while( true ) { - char* p = buf.prepare(MAX_RECVSIZE); - ssize_t n = ::recv(m_fd, p, MAX_RECVSIZE, 0); - if( n == -1 ) { - if( errno == EAGAIN || errno == EWOULDBLOCK ) - break; - if( errno == EINTR ) - continue; - if( errno == ECONNRESET ) { - buf.reset(); - release_all(); - 
invalidate_and_close(); - break; - } - cybozu::throw_unix_error(errno, "recv"); - } - if( n == 0 ) { + auto res = receive(buf, MAX_RECVSIZE); + if( res == recv_result::AGAIN ) + break; + if( res == recv_result::RESET || res == recv_result::NONE ) { buf.reset(); release_all(); - invalidate_and_close(); break; } - // if (n != -1) && (n != 0) - buf.consume(n); const char* head = buf.data(); std::size_t len = buf.size(); @@ -84,8 +69,9 @@ counter_socket::counter_socket(int fd, }; m_sendjob = [this](cybozu::dynbuf& buf) { - if( ! write_pending_data() ) - invalidate_and_close(); + with_fd([=](int fd) -> bool { + return write_pending_data(fd); + }); }; } @@ -94,7 +80,7 @@ counter_socket::~counter_socket() { release_all(); } -bool counter_socket::on_readable() { +bool counter_socket::on_readable(int fd) { if( m_busy.load(std::memory_order_acquire) ) { m_reactor->add_readable(*this); return true; @@ -112,11 +98,11 @@ bool counter_socket::on_readable() { return true; } -bool counter_socket::on_writable() { +bool counter_socket::on_writable(int fd) { cybozu::worker* w = m_finder(); if( w == nullptr ) { // if there is no idle worker, fallback to the default. 
- return cybozu::tcp_socket::on_writable(); + return cybozu::tcp_socket::on_writable(fd); } w->post_job(m_sendjob); diff --git a/src/counter/sockets.hpp b/src/counter/sockets.hpp index 0c56ae3..116e2fd 100644 --- a/src/counter/sockets.hpp +++ b/src/counter/sockets.hpp @@ -42,13 +42,13 @@ class counter_socket: public cybozu::tcp_socket { std::unordered_map m_acquired_resources; - virtual void on_invalidate() override final { + virtual void on_invalidate(int fd) override final { g_stats.curr_connections.fetch_sub(1); - cybozu::tcp_socket::on_invalidate(); + cybozu::tcp_socket::on_invalidate(fd); } - bool on_readable() override; - bool on_writable() override; + bool on_readable(int) override; + bool on_writable(int) override; void cmd_get(const counter::request& cmd, counter::response& r); void cmd_acquire(const counter::request& cmd, counter::response& r); diff --git a/src/memcache/handler.cpp b/src/memcache/handler.cpp index 5d47d05..76edf97 100644 --- a/src/memcache/handler.cpp +++ b/src/memcache/handler.cpp @@ -114,14 +114,10 @@ void handler::on_master_interval() { continue; } if( slave->timed_out() ) { - std::string addr = "unknown address"; - try { - addr = cybozu::get_peer_ip_address(slave->fileno()).str(); - } catch (...) { - // ignore errors - } - cybozu::logger::info() << "No heartbeats from a slave (" << addr - << "). Close the replication socket."; + cybozu::logger::info() + << "No heartbeats from a slave (" + << slave->peer_ip() + << "). Close the replication socket."; // close the socket and release resources if( ! 
slave->invalidate() ) m_reactor.remove_resource(*slave); diff --git a/src/memcache/memcache.cpp b/src/memcache/memcache.cpp index cbadd86..23ba03a 100644 --- a/src/memcache/memcache.cpp +++ b/src/memcache/memcache.cpp @@ -50,6 +50,8 @@ inline const char* cfind(const char* p, char c, std::size_t len) { template inline UInt to_uint(const char* p, bool& result) { + static_assert( sizeof(UInt) <= sizeof(unsigned long long), + "UInt is larger than unsigned long long" ); result = false; char* end; unsigned long long i = strtoull(p, &end, 10); @@ -58,7 +60,7 @@ inline UInt to_uint(const char* p, bool& result) { errno == ERANGE ) return 0; char c = *end; if( c != CR && c != LF && c != SP ) return 0; - if( i > std::numeric_limits::max() ) return 0; + if( i > static_cast(std::numeric_limits::max()) ) return 0; result = true; return static_cast(i); } diff --git a/src/memcache/sockets.cpp b/src/memcache/sockets.cpp index 142ea0b..5967188 100644 --- a/src/memcache/sockets.cpp +++ b/src/memcache/sockets.cpp @@ -44,7 +44,10 @@ memcache_socket::memcache_socket(int fd, m_recvjob = [this](cybozu::dynbuf& buf) { // set lock context for objects. - g_context = m_fd; + with_fd([](int fd) -> bool { + g_context = fd; + return true; + }); // load pending data if( ! 
m_pending.empty() ) { @@ -53,29 +56,14 @@ memcache_socket::memcache_socket(int fd, } while( true ) { - char* p = buf.prepare(MAX_RECVSIZE); - ssize_t n = ::recv(m_fd, p, MAX_RECVSIZE, 0); - if( n == -1 ) { - if( errno == EAGAIN || errno == EWOULDBLOCK ) - break; - if( errno == EINTR ) - continue; - if( errno == ECONNRESET ) { - buf.reset(); - unlock_all(); - invalidate_and_close(); - break; - } - cybozu::throw_unix_error(errno, "recv"); - } - if( n == 0 ) { + auto res = receive(buf, MAX_RECVSIZE); + if( res == recv_result::AGAIN ) + break; + if( res == recv_result::RESET || res == recv_result::NONE ) { buf.reset(); unlock_all(); - invalidate_and_close(); break; } - // if (n != -1) && (n != 0) - buf.consume(n); const char* head = buf.data(); std::size_t len = buf.size(); @@ -115,8 +103,9 @@ memcache_socket::memcache_socket(int fd, }; m_sendjob = [this](cybozu::dynbuf&) { - if( ! write_pending_data() ) - invalidate_and_close(); + with_fd([=](int fd) -> bool { + return write_pending_data(fd); + }); }; } @@ -131,7 +120,7 @@ memcache_socket::~memcache_socket() { } } -bool memcache_socket::on_readable() { +bool memcache_socket::on_readable(int) { if( m_busy.load(std::memory_order_acquire) ) { m_reactor->add_readable(*this); return true; @@ -152,11 +141,11 @@ bool memcache_socket::on_readable() { return true; } -bool memcache_socket::on_writable() { +bool memcache_socket::on_writable(int fd) { cybozu::worker* w = m_finder(); if( w == nullptr ) { // if there is no idle worker, fallback to the default. - return cybozu::tcp_socket::on_writable(); + return cybozu::tcp_socket::on_writable(fd); } w->post_job(m_sendjob); @@ -955,10 +944,10 @@ void memcache_socket::cmd_text(const memcache::text_request& cmd) { } } -bool repl_socket::on_readable() { +bool repl_socket::on_readable(int fd) { // recv and drop. 
while( true ) { - ssize_t n = ::recv(m_fd, &m_recvbuf[0], MAX_RECVSIZE, 0); + ssize_t n = ::recv(fd, &m_recvbuf[0], MAX_RECVSIZE, 0); if( n == -1 ) { if( errno == EAGAIN || errno == EWOULDBLOCK ) break; @@ -967,7 +956,7 @@ bool repl_socket::on_readable() { if( errno == ECONNRESET ) { std::string addr = "unknown address"; try { - addr = cybozu::get_peer_ip_address(m_fd).str(); + addr = cybozu::get_peer_ip_address(fd).str(); } catch (...) { // ignore errors } @@ -979,7 +968,7 @@ bool repl_socket::on_readable() { if( n == 0 ) { std::string addr = "unknown address"; try { - addr = cybozu::get_peer_ip_address(m_fd).str(); + addr = cybozu::get_peer_ip_address(fd).str(); } catch (...) { // ignore errors } @@ -991,18 +980,18 @@ bool repl_socket::on_readable() { return true; } -bool repl_socket::on_writable() { +bool repl_socket::on_writable(int fd) { cybozu::worker* w = m_finder(); if( w == nullptr ) { // if there is no idle worker, fallback to the default. - return cybozu::tcp_socket::on_writable(); + return cybozu::tcp_socket::on_writable(fd); } w->post_job(m_sendjob); return true; } -bool repl_client_socket::on_readable() { +bool repl_client_socket::on_readable(int fd) { // This function is executed in the same thread as the function that sends // heartbeats. If this function takes a very long time, no heartbeats will // be sent, and this process will be judged dead by the master. 
To prevent @@ -1012,7 +1001,7 @@ bool repl_client_socket::on_readable() { size_t n_iter = 0; while( true ) { char* p = m_recvbuf.prepare(MAX_RECVSIZE); - ssize_t n = ::recv(m_fd, p, MAX_RECVSIZE, 0); + ssize_t n = ::recv(fd, p, MAX_RECVSIZE, 0); if( n == -1 ) { if( errno == EAGAIN || errno == EWOULDBLOCK ) break; diff --git a/src/memcache/sockets.hpp b/src/memcache/sockets.hpp index 93329c5..f591074 100644 --- a/src/memcache/sockets.hpp +++ b/src/memcache/sockets.hpp @@ -78,15 +78,15 @@ class memcache_socket: public cybozu::tcp_socket { cybozu::worker::job m_sendjob; std::vector> m_locks; - virtual void on_invalidate() override final { + virtual void on_invalidate(int fd) override final { // In order to avoid races and deadlocks, remaining locks // are not released here. They are released in the destructor // where no other threads have access to this object. g_stats.curr_connections.fetch_sub(1); - cybozu::tcp_socket::on_invalidate(); + cybozu::tcp_socket::on_invalidate(fd); } - virtual bool on_readable() override final; - virtual bool on_writable() override final; + virtual bool on_readable(int) override final; + virtual bool on_writable(int) override final; }; @@ -102,8 +102,9 @@ class repl_socket: public cybozu::tcp_socket { m_last_heartbeat(g_current_time.load(std::memory_order_relaxed)) { m_sendjob = [this](cybozu::dynbuf&) { - if( ! write_pending_data() ) - invalidate_and_close(); + with_fd([=](int fd) -> bool { + return write_pending_data(fd); + }); }; } @@ -112,6 +113,19 @@ class repl_socket: public cybozu::tcp_socket { return m_last_heartbeat + g_config.slave_timeout() <= now; } + std::string peer_ip() { + std::string addr = "unknown address"; + try { + with_fd([&addr](int fd) -> bool { + addr = cybozu::get_peer_ip_address(fd).str(); + return true; + }); + } catch (...) { + // ignore errors + } + return addr; + } + virtual void on_buffer_full() override { cybozu::logger::warning() << "Replication buffer is full. 
" @@ -124,8 +138,8 @@ class repl_socket: public cybozu::tcp_socket { cybozu::worker::job m_sendjob; std::time_t m_last_heartbeat; - virtual bool on_readable() override final; - virtual bool on_writable() override final; + virtual bool on_readable(int) override final; + virtual bool on_writable(int) override final; }; @@ -138,13 +152,13 @@ class repl_client_socket: public cybozu::tcp_socket { cybozu::hash_map& m_hash; cybozu::dynbuf m_recvbuf; - virtual bool on_readable() override final; - virtual bool on_hangup() override final { + virtual bool on_readable(int) override final; + virtual bool on_hangup(int) override final { cybozu::logger::warning() << "The connection to master has hung up."; m_reactor->quit(); return invalidate(); } - virtual bool on_error() override final { + virtual bool on_error(int) override final { cybozu::logger::warning() << "An error occurred on the connection to master."; m_reactor->quit(); return invalidate(); diff --git a/test/tcp.cpp b/test/tcp.cpp index 41e08b3..ac3acb4 100644 --- a/test/tcp.cpp +++ b/test/tcp.cpp @@ -38,7 +38,7 @@ AUTOTEST(fd_exhausted) { struct dummy_socket : public cybozu::tcp_socket { dummy_socket(int s): cybozu::tcp_socket(s) {} - virtual bool on_readable() override { return true; } + virtual bool on_readable(int) override { return true; } }; auto on_accept = [](int s, const cybozu::ip_address addr) { return std::unique_ptr(new dummy_socket(s));