Skip to content

Commit

Permalink
Adapt source code to the new API
Browse files Browse the repository at this point in the history
  • Loading branch information
ymmt2005 committed Aug 19, 2024
1 parent c6aa51e commit 0f52eb5
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 85 deletions.
36 changes: 11 additions & 25 deletions src/counter/sockets.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,14 @@ counter_socket::counter_socket(int fd,
}

while( true ) {
char* p = buf.prepare(MAX_RECVSIZE);
ssize_t n = ::recv(m_fd, p, MAX_RECVSIZE, 0);
if( n == -1 ) {
if( errno == EAGAIN || errno == EWOULDBLOCK )
break;
if( errno == EINTR )
continue;
if( errno == ECONNRESET ) {
buf.reset();
release_all();
invalidate_and_close();
break;
}
cybozu::throw_unix_error(errno, "recv");
}
if( n == 0 ) {
auto res = recv(buf, MAX_RECVSIZE);
if( res == recv_result::AGAIN )
break;
if( res == recv_result::RESET || res == recv_result::NONE ) {
buf.reset();
release_all();
invalidate_and_close();
break;
}
// if (n != -1) && (n != 0)
buf.consume(n);

const char* head = buf.data();
std::size_t len = buf.size();
Expand All @@ -70,7 +55,7 @@ counter_socket::counter_socket(int fd,
<< len << " bytes.";
buf.reset();
release_all();
invalidate_and_close();
with_fd([](int) -> bool { return false; });
break;
}
buf.erase(head - buf.data());
Expand All @@ -84,8 +69,9 @@ counter_socket::counter_socket(int fd,
};

m_sendjob = [this](cybozu::dynbuf& buf) {
if( ! write_pending_data() )
invalidate_and_close();
with_fd([=](int fd) -> bool {
return write_pending_data(fd);
});
};
}

Expand All @@ -94,7 +80,7 @@ counter_socket::~counter_socket() {
release_all();
}

bool counter_socket::on_readable() {
bool counter_socket::on_readable(int fd) {
if( m_busy.load(std::memory_order_acquire) ) {
m_reactor->add_readable(*this);
return true;
Expand All @@ -112,11 +98,11 @@ bool counter_socket::on_readable() {
return true;
}

bool counter_socket::on_writable() {
bool counter_socket::on_writable(int fd) {
cybozu::worker* w = m_finder();
if( w == nullptr ) {
// if there is no idle worker, fallback to the default.
return cybozu::tcp_socket::on_writable();
return cybozu::tcp_socket::on_writable(fd);
}

w->post_job(m_sendjob);
Expand Down
8 changes: 4 additions & 4 deletions src/counter/sockets.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ class counter_socket: public cybozu::tcp_socket {
std::unordered_map<const cybozu::hash_key*, std::uint32_t>
m_acquired_resources;

virtual void on_invalidate() override final {
virtual void on_invalidate(int fd) override final {
g_stats.curr_connections.fetch_sub(1);
cybozu::tcp_socket::on_invalidate();
cybozu::tcp_socket::on_invalidate(fd);
}

bool on_readable() override;
bool on_writable() override;
bool on_readable(int) override;
bool on_writable(int) override;

void cmd_get(const counter::request& cmd, counter::response& r);
void cmd_acquire(const counter::request& cmd, counter::response& r);
Expand Down
12 changes: 4 additions & 8 deletions src/memcache/handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,10 @@ void handler::on_master_interval() {
continue;
}
if( slave->timed_out() ) {
std::string addr = "unknown address";
try {
addr = cybozu::get_peer_ip_address(slave->fileno()).str();
} catch (...) {
// ignore errors
}
cybozu::logger::info() << "No heartbeats from a slave (" << addr
<< "). Close the replication socket.";
cybozu::logger::info()
<< "No heartbeats from a slave ("
<< slave->peer_ip()
<< "). Close the replication socket.";
// close the socket and release resources
if( ! slave->invalidate() )
m_reactor.remove_resource(*slave);
Expand Down
2 changes: 1 addition & 1 deletion src/memcache/memcache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ inline UInt to_uint(const char* p, bool& result) {
errno == ERANGE ) return 0;
char c = *end;
if( c != CR && c != LF && c != SP ) return 0;
if( i > std::numeric_limits<UInt>::max() ) return 0;
if( i > static_cast<unsigned long long>(std::numeric_limits<UInt>::max()) ) return 0;
result = true;
return static_cast<UInt>(i);
}
Expand Down
61 changes: 25 additions & 36 deletions src/memcache/sockets.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ memcache_socket::memcache_socket(int fd,

m_recvjob = [this](cybozu::dynbuf& buf) {
// set lock context for objects.
g_context = m_fd;
with_fd([](int fd) -> bool {
g_context = fd;
return true;
});

// load pending data
if( ! m_pending.empty() ) {
Expand All @@ -53,29 +56,14 @@ memcache_socket::memcache_socket(int fd,
}

while( true ) {
char* p = buf.prepare(MAX_RECVSIZE);
ssize_t n = ::recv(m_fd, p, MAX_RECVSIZE, 0);
if( n == -1 ) {
if( errno == EAGAIN || errno == EWOULDBLOCK )
break;
if( errno == EINTR )
continue;
if( errno == ECONNRESET ) {
buf.reset();
unlock_all();
invalidate_and_close();
break;
}
cybozu::throw_unix_error(errno, "recv");
}
if( n == 0 ) {
auto res = recv(buf, MAX_RECVSIZE);
if( res == recv_result::AGAIN )
break;
if( res == recv_result::RESET || res == recv_result::NONE ) {
buf.reset();
unlock_all();
invalidate_and_close();
break;
}
// if (n != -1) && (n != 0)
buf.consume(n);

const char* head = buf.data();
std::size_t len = buf.size();
Expand All @@ -101,7 +89,7 @@ memcache_socket::memcache_socket(int fd,
<< len << " bytes.";
buf.reset();
unlock_all();
invalidate_and_close();
with_fd([](int) -> bool { return false; });
break;
}
buf.erase(head - buf.data());
Expand All @@ -115,8 +103,9 @@ memcache_socket::memcache_socket(int fd,
};

m_sendjob = [this](cybozu::dynbuf&) {
if( ! write_pending_data() )
invalidate_and_close();
with_fd([=](int fd) -> bool {
return write_pending_data(fd);
});
};
}

Expand All @@ -131,7 +120,7 @@ memcache_socket::~memcache_socket() {
}
}

bool memcache_socket::on_readable() {
bool memcache_socket::on_readable(int) {
if( m_busy.load(std::memory_order_acquire) ) {
m_reactor->add_readable(*this);
return true;
Expand All @@ -152,11 +141,11 @@ bool memcache_socket::on_readable() {
return true;
}

bool memcache_socket::on_writable() {
bool memcache_socket::on_writable(int fd) {
cybozu::worker* w = m_finder();
if( w == nullptr ) {
// if there is no idle worker, fallback to the default.
return cybozu::tcp_socket::on_writable();
return cybozu::tcp_socket::on_writable(fd);
}

w->post_job(m_sendjob);
Expand Down Expand Up @@ -556,7 +545,7 @@ void memcache_socket::cmd_bin(const memcache::binary_request& cmd) {
case binary_command::QuitQ:
unlock_all();
if( cmd.quiet() ) {
invalidate_and_close();
with_fd([](int) -> bool { return false; });
} else {
r.quit();
}
Expand Down Expand Up @@ -947,18 +936,18 @@ void memcache_socket::cmd_text(const memcache::text_request& cmd) {
break;
case text_command::QUIT:
unlock_all();
invalidate_and_close();
with_fd([](int) -> bool { return false; });
break;
default:
cybozu::logger::info() << "not implemented";
r.error();
}
}

bool repl_socket::on_readable() {
bool repl_socket::on_readable(int fd) {
// recv and drop.
while( true ) {
ssize_t n = ::recv(m_fd, &m_recvbuf[0], MAX_RECVSIZE, 0);
ssize_t n = ::recv(fd, &m_recvbuf[0], MAX_RECVSIZE, 0);
if( n == -1 ) {
if( errno == EAGAIN || errno == EWOULDBLOCK )
break;
Expand All @@ -967,7 +956,7 @@ bool repl_socket::on_readable() {
if( errno == ECONNRESET ) {
std::string addr = "unknown address";
try {
addr = cybozu::get_peer_ip_address(m_fd).str();
addr = cybozu::get_peer_ip_address(fd).str();
} catch (...) {
// ignore errors
}
Expand All @@ -979,7 +968,7 @@ bool repl_socket::on_readable() {
if( n == 0 ) {
std::string addr = "unknown address";
try {
addr = cybozu::get_peer_ip_address(m_fd).str();
addr = cybozu::get_peer_ip_address(fd).str();
} catch (...) {
// ignore errors
}
Expand All @@ -991,18 +980,18 @@ bool repl_socket::on_readable() {
return true;
}

bool repl_socket::on_writable() {
bool repl_socket::on_writable(int fd) {
cybozu::worker* w = m_finder();
if( w == nullptr ) {
// if there is no idle worker, fallback to the default.
return cybozu::tcp_socket::on_writable();
return cybozu::tcp_socket::on_writable(fd);
}

w->post_job(m_sendjob);
return true;
}

bool repl_client_socket::on_readable() {
bool repl_client_socket::on_readable(int fd) {
// This function is executed in the same thread as the function that sends
// heartbeats. If this function takes a very long time, no heartbeats will
// be sent, and this process will be judged dead by the master. To prevent
Expand All @@ -1012,7 +1001,7 @@ bool repl_client_socket::on_readable() {
size_t n_iter = 0;
while( true ) {
char* p = m_recvbuf.prepare(MAX_RECVSIZE);
ssize_t n = ::recv(m_fd, p, MAX_RECVSIZE, 0);
ssize_t n = ::recv(fd, p, MAX_RECVSIZE, 0);
if( n == -1 ) {
if( errno == EAGAIN || errno == EWOULDBLOCK )
break;
Expand Down
36 changes: 25 additions & 11 deletions src/memcache/sockets.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,15 @@ class memcache_socket: public cybozu::tcp_socket {
cybozu::worker::job m_sendjob;
std::vector<std::reference_wrapper<const cybozu::hash_key>> m_locks;

virtual void on_invalidate() override final {
virtual void on_invalidate(int fd) override final {
// In order to avoid races and deadlocks, remaining locks
// are not released here. They are released in the destructor
// where no other threads have access to this object.
g_stats.curr_connections.fetch_sub(1);
cybozu::tcp_socket::on_invalidate();
cybozu::tcp_socket::on_invalidate(fd);
}
virtual bool on_readable() override final;
virtual bool on_writable() override final;
virtual bool on_readable(int) override final;
virtual bool on_writable(int) override final;
};


Expand All @@ -102,8 +102,9 @@ class repl_socket: public cybozu::tcp_socket {
m_last_heartbeat(g_current_time.load(std::memory_order_relaxed))
{
m_sendjob = [this](cybozu::dynbuf&) {
if( ! write_pending_data() )
invalidate_and_close();
with_fd([=](int fd) -> bool {
return write_pending_data(fd);
});
};
}

Expand All @@ -112,6 +113,19 @@ class repl_socket: public cybozu::tcp_socket {
return m_last_heartbeat + g_config.slave_timeout() <= now;
}

std::string peer_ip() {
std::string addr = "unknown address";
try {
with_fd([&addr](int fd) -> bool {
addr = cybozu::get_peer_ip_address(fd).str();
return true;
});
} catch (...) {
// ignore errors
}
return addr;
}

virtual void on_buffer_full() override {
cybozu::logger::warning()
<< "Replication buffer is full. "
Expand All @@ -124,8 +138,8 @@ class repl_socket: public cybozu::tcp_socket {
cybozu::worker::job m_sendjob;
std::time_t m_last_heartbeat;

virtual bool on_readable() override final;
virtual bool on_writable() override final;
virtual bool on_readable(int) override final;
virtual bool on_writable(int) override final;
};


Expand All @@ -138,13 +152,13 @@ class repl_client_socket: public cybozu::tcp_socket {
cybozu::hash_map<object>& m_hash;
cybozu::dynbuf m_recvbuf;

virtual bool on_readable() override final;
virtual bool on_hangup() override final {
virtual bool on_readable(int) override final;
virtual bool on_hangup(int) override final {
cybozu::logger::warning() << "The connection to master has hung up.";
m_reactor->quit();
return invalidate();
}
virtual bool on_error() override final {
virtual bool on_error(int) override final {
cybozu::logger::warning() << "An error occurred on the connection to master.";
m_reactor->quit();
return invalidate();
Expand Down

0 comments on commit 0f52eb5

Please sign in to comment.