diff --git a/src/counter/sockets.cpp b/src/counter/sockets.cpp index e5cb8a1..7eb24d9 100644 --- a/src/counter/sockets.cpp +++ b/src/counter/sockets.cpp @@ -31,29 +31,14 @@ counter_socket::counter_socket(int fd, } while( true ) { - char* p = buf.prepare(MAX_RECVSIZE); - ssize_t n = ::recv(m_fd, p, MAX_RECVSIZE, 0); - if( n == -1 ) { - if( errno == EAGAIN || errno == EWOULDBLOCK ) - break; - if( errno == EINTR ) - continue; - if( errno == ECONNRESET ) { - buf.reset(); - release_all(); - invalidate_and_close(); - break; - } - cybozu::throw_unix_error(errno, "recv"); - } - if( n == 0 ) { + auto res = recv(buf, MAX_RECVSIZE); + if( res == recv_result::AGAIN ) + break; + if( res == recv_result::RESET || res == recv_result::NONE ) { buf.reset(); release_all(); - invalidate_and_close(); break; } - // if (n != -1) && (n != 0) - buf.consume(n); const char* head = buf.data(); std::size_t len = buf.size(); @@ -70,7 +55,7 @@ counter_socket::counter_socket(int fd, << len << " bytes."; buf.reset(); release_all(); - invalidate_and_close(); + with_fd([](int) -> bool { return false; }); break; } buf.erase(head - buf.data()); @@ -84,8 +69,9 @@ counter_socket::counter_socket(int fd, }; m_sendjob = [this](cybozu::dynbuf& buf) { - if( ! write_pending_data() ) - invalidate_and_close(); + with_fd([=](int fd) -> bool { + return write_pending_data(fd); + }); }; } @@ -94,7 +80,7 @@ counter_socket::~counter_socket() { release_all(); } -bool counter_socket::on_readable() { +bool counter_socket::on_readable(int fd) { if( m_busy.load(std::memory_order_acquire) ) { m_reactor->add_readable(*this); return true; @@ -112,11 +98,11 @@ bool counter_socket::on_readable() { return true; } -bool counter_socket::on_writable() { +bool counter_socket::on_writable(int fd) { cybozu::worker* w = m_finder(); if( w == nullptr ) { // if there is no idle worker, fallback to the default. - return cybozu::tcp_socket::on_writable(); + return cybozu::tcp_socket::on_writable(fd); } w->post_job(m_sendjob); diff --git a/src/counter/sockets.hpp b/src/counter/sockets.hpp index 0c56ae3..116e2fd 100644 --- a/src/counter/sockets.hpp +++ b/src/counter/sockets.hpp @@ -42,13 +42,13 @@ class counter_socket: public cybozu::tcp_socket { std::unordered_map m_acquired_resources; - virtual void on_invalidate() override final { + virtual void on_invalidate(int fd) override final { g_stats.curr_connections.fetch_sub(1); - cybozu::tcp_socket::on_invalidate(); + cybozu::tcp_socket::on_invalidate(fd); } - bool on_readable() override; - bool on_writable() override; + bool on_readable(int) override; + bool on_writable(int) override; void cmd_get(const counter::request& cmd, counter::response& r); void cmd_acquire(const counter::request& cmd, counter::response& r); diff --git a/src/memcache/handler.cpp b/src/memcache/handler.cpp index 5d47d05..76edf97 100644 --- a/src/memcache/handler.cpp +++ b/src/memcache/handler.cpp @@ -114,14 +114,10 @@ void handler::on_master_interval() { continue; } if( slave->timed_out() ) { - std::string addr = "unknown address"; - try { - addr = cybozu::get_peer_ip_address(slave->fileno()).str(); - } catch (...) { - // ignore errors - } - cybozu::logger::info() << "No heartbeats from a slave (" << addr - << "). Close the replication socket."; + cybozu::logger::info() + << "No heartbeats from a slave (" + << slave->peer_ip() + << "). Close the replication socket."; // close the socket and release resources if( ! slave->invalidate() ) m_reactor.remove_resource(*slave); diff --git a/src/memcache/memcache.cpp b/src/memcache/memcache.cpp index cbadd86..0f6774f 100644 --- a/src/memcache/memcache.cpp +++ b/src/memcache/memcache.cpp @@ -58,7 +58,7 @@ inline UInt to_uint(const char* p, bool& result) { errno == ERANGE ) return 0; char c = *end; if( c != CR && c != LF && c != SP ) return 0; - if( i > std::numeric_limits::max() ) return 0; + if( i > static_cast(std::numeric_limits::max()) ) return 0; result = true; return static_cast(i); } diff --git a/src/memcache/sockets.cpp b/src/memcache/sockets.cpp index 142ea0b..67702bd 100644 --- a/src/memcache/sockets.cpp +++ b/src/memcache/sockets.cpp @@ -44,7 +44,10 @@ memcache_socket::memcache_socket(int fd, m_recvjob = [this](cybozu::dynbuf& buf) { // set lock context for objects. - g_context = m_fd; + with_fd([](int fd) -> bool { + g_context = fd; + return true; + }); // load pending data if( ! m_pending.empty() ) { @@ -53,29 +56,14 @@ memcache_socket::memcache_socket(int fd, } while( true ) { - char* p = buf.prepare(MAX_RECVSIZE); - ssize_t n = ::recv(m_fd, p, MAX_RECVSIZE, 0); - if( n == -1 ) { - if( errno == EAGAIN || errno == EWOULDBLOCK ) - break; - if( errno == EINTR ) - continue; - if( errno == ECONNRESET ) { - buf.reset(); - unlock_all(); - invalidate_and_close(); - break; - } - cybozu::throw_unix_error(errno, "recv"); - } - if( n == 0 ) { + auto res = recv(buf, MAX_RECVSIZE); + if( res == recv_result::AGAIN ) + break; + if( res == recv_result::RESET || res == recv_result::NONE ) { buf.reset(); unlock_all(); - invalidate_and_close(); break; } - // if (n != -1) && (n != 0) - buf.consume(n); const char* head = buf.data(); std::size_t len = buf.size(); @@ -101,7 +89,7 @@ memcache_socket::memcache_socket(int fd, << len << " bytes."; buf.reset(); unlock_all(); - invalidate_and_close(); + with_fd([](int) -> bool { return false; }); break; } buf.erase(head - buf.data()); @@ -115,8 +103,9 @@ memcache_socket::memcache_socket(int fd, }; m_sendjob = [this](cybozu::dynbuf&) { - if( ! write_pending_data() ) - invalidate_and_close(); + with_fd([=](int fd) -> bool { + return write_pending_data(fd); + }); }; } @@ -131,7 +120,7 @@ memcache_socket::~memcache_socket() { } } -bool memcache_socket::on_readable() { +bool memcache_socket::on_readable(int) { if( m_busy.load(std::memory_order_acquire) ) { m_reactor->add_readable(*this); return true; @@ -152,11 +141,11 @@ bool memcache_socket::on_readable() { return true; } -bool memcache_socket::on_writable() { +bool memcache_socket::on_writable(int fd) { cybozu::worker* w = m_finder(); if( w == nullptr ) { // if there is no idle worker, fallback to the default. - return cybozu::tcp_socket::on_writable(); + return cybozu::tcp_socket::on_writable(fd); } w->post_job(m_sendjob); @@ -556,7 +545,7 @@ void memcache_socket::cmd_bin(const memcache::binary_request& cmd) { case binary_command::QuitQ: unlock_all(); if( cmd.quiet() ) { - invalidate_and_close(); + with_fd([](int) -> bool { return false; }); } else { r.quit(); } @@ -947,7 +936,7 @@ void memcache_socket::cmd_text(const memcache::text_request& cmd) { break; case text_command::QUIT: unlock_all(); - invalidate_and_close(); + with_fd([](int) -> bool { return false; }); break; default: cybozu::logger::info() << "not implemented"; @@ -955,10 +944,10 @@ void memcache_socket::cmd_text(const memcache::text_request& cmd) { } } -bool repl_socket::on_readable() { +bool repl_socket::on_readable(int fd) { // recv and drop. while( true ) { - ssize_t n = ::recv(m_fd, &m_recvbuf[0], MAX_RECVSIZE, 0); + ssize_t n = ::recv(fd, &m_recvbuf[0], MAX_RECVSIZE, 0); if( n == -1 ) { if( errno == EAGAIN || errno == EWOULDBLOCK ) break; @@ -967,7 +956,7 @@ bool repl_socket::on_readable() { if( errno == ECONNRESET ) { std::string addr = "unknown address"; try { - addr = cybozu::get_peer_ip_address(m_fd).str(); + addr = cybozu::get_peer_ip_address(fd).str(); } catch (...) { // ignore errors } @@ -979,7 +968,7 @@ bool repl_socket::on_readable() { if( n == 0 ) { std::string addr = "unknown address"; try { - addr = cybozu::get_peer_ip_address(m_fd).str(); + addr = cybozu::get_peer_ip_address(fd).str(); } catch (...) { // ignore errors } @@ -991,18 +980,18 @@ bool repl_socket::on_readable() { return true; } -bool repl_socket::on_writable() { +bool repl_socket::on_writable(int fd) { cybozu::worker* w = m_finder(); if( w == nullptr ) { // if there is no idle worker, fallback to the default. - return cybozu::tcp_socket::on_writable(); + return cybozu::tcp_socket::on_writable(fd); } w->post_job(m_sendjob); return true; } -bool repl_client_socket::on_readable() { +bool repl_client_socket::on_readable(int fd) { // This function is executed in the same thread as the function that sends // heartbeats. If this function takes a very long time, no heartbeats will // be sent, and this process will be judged dead by the master. To prevent @@ -1012,7 +1001,7 @@ bool repl_client_socket::on_readable() { size_t n_iter = 0; while( true ) { char* p = m_recvbuf.prepare(MAX_RECVSIZE); - ssize_t n = ::recv(m_fd, p, MAX_RECVSIZE, 0); + ssize_t n = ::recv(fd, p, MAX_RECVSIZE, 0); if( n == -1 ) { if( errno == EAGAIN || errno == EWOULDBLOCK ) break; diff --git a/src/memcache/sockets.hpp b/src/memcache/sockets.hpp index 93329c5..f591074 100644 --- a/src/memcache/sockets.hpp +++ b/src/memcache/sockets.hpp @@ -78,15 +78,15 @@ class memcache_socket: public cybozu::tcp_socket { cybozu::worker::job m_sendjob; std::vector> m_locks; - virtual void on_invalidate() override final { + virtual void on_invalidate(int fd) override final { // In order to avoid races and deadlocks, remaining locks // are not released here. They are released in the destructor // where no other threads have access to this object. g_stats.curr_connections.fetch_sub(1); - cybozu::tcp_socket::on_invalidate(); + cybozu::tcp_socket::on_invalidate(fd); } - virtual bool on_readable() override final; - virtual bool on_writable() override final; + virtual bool on_readable(int) override final; + virtual bool on_writable(int) override final; }; @@ -102,8 +102,9 @@ class repl_socket: public cybozu::tcp_socket { m_last_heartbeat(g_current_time.load(std::memory_order_relaxed)) { m_sendjob = [this](cybozu::dynbuf&) { - if( ! write_pending_data() ) - invalidate_and_close(); + with_fd([=](int fd) -> bool { + return write_pending_data(fd); + }); }; } @@ -112,6 +113,19 @@ class repl_socket: public cybozu::tcp_socket { return m_last_heartbeat + g_config.slave_timeout() <= now; } + std::string peer_ip() { + std::string addr = "unknown address"; + try { + with_fd([&addr](int fd) -> bool { + addr = cybozu::get_peer_ip_address(fd).str(); + return true; + }); + } catch (...) { + // ignore errors + } + return addr; + } + virtual void on_buffer_full() override { cybozu::logger::warning() << "Replication buffer is full. " @@ -124,8 +138,8 @@ class repl_socket: public cybozu::tcp_socket { cybozu::worker::job m_sendjob; std::time_t m_last_heartbeat; - virtual bool on_readable() override final; - virtual bool on_writable() override final; + virtual bool on_readable(int) override final; + virtual bool on_writable(int) override final; }; @@ -138,13 +152,13 @@ class repl_client_socket: public cybozu::tcp_socket { cybozu::hash_map& m_hash; cybozu::dynbuf m_recvbuf; - virtual bool on_readable() override final; - virtual bool on_hangup() override final { + virtual bool on_readable(int) override final; + virtual bool on_hangup(int) override final { cybozu::logger::warning() << "The connection to master has hung up."; m_reactor->quit(); return invalidate(); } - virtual bool on_error() override final { + virtual bool on_error(int) override final { cybozu::logger::warning() << "An error occurred on the connection to master."; m_reactor->quit(); return invalidate();