Skip to content

Commit

Permalink
fix(replication): slave blocks until keepalive timer is reached when …
Browse files Browse the repository at this point in the history
…master is gone without fin/rst notification (#2662)

Co-authored-by: yxj25245 <[email protected]>
Co-authored-by: hulk <[email protected]>
Co-authored-by: Twice <[email protected]>
Co-authored-by: Twice <[email protected]>
  • Loading branch information
5 people authored Nov 16, 2024
1 parent eb4de5c commit 5e9db79
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 5 deletions.
14 changes: 14 additions & 0 deletions kvrocks.conf
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,20 @@ slave-read-only yes
# By default the priority is 100.
slave-priority 100

# Timeout in milliseconds for the socket connect to the master during replication.
# The default value is 3100, and 0 means no timeout.
#
# If the master is unreachable and no timeout is set, the replication thread can stay
# blocked on connect, which may in turn block subsequent 'clusterx setnodes' commands.
replication-connect-timeout-ms 3100

# Timeout in milliseconds for socket recv during fullsync.
# The default value is 3200, and 0 means no timeout.
#
# If the master becomes unreachable while SST files are being fetched and no timeout is set,
# the replication thread can stay blocked on recv, which may in turn block subsequent
# 'clusterx setnodes' commands.
replication-recv-timeout-ms 3200

# TCP listen() backlog.
#
# In high requests-per-second environments you need a high backlog in order
Expand Down
20 changes: 17 additions & 3 deletions src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -252,15 +252,14 @@ void ReplicationThread::CallbacksStateMachine::Start() {
}

uint64_t last_connect_timestamp = 0;
int connect_timeout_ms = 3100;

while (!repl_->stop_flag_ && bev == nullptr) {
if (util::GetTimeStampMS() - last_connect_timestamp < 1000) {
// prevent frequent re-connect when the master is down with the connection refused error
sleep(1);
}
last_connect_timestamp = util::GetTimeStampMS();
auto cfd = util::SockConnect(repl_->host_, repl_->port_, connect_timeout_ms);
auto cfd = util::SockConnect(repl_->host_, repl_->port_, repl_->srv_->GetConfig()->replication_connect_timeout_ms);
if (!cfd) {
LOG(ERROR) << "[replication] Failed to connect the master, err: " << cfd.Msg();
continue;
Expand Down Expand Up @@ -777,7 +776,10 @@ Status ReplicationThread::parallelFetchFile(const std::string &dir,
}
auto exit = MakeScopeExit([ssl] { SSL_free(ssl); });
#endif
int sock_fd = GET_OR_RET(util::SockConnect(this->host_, this->port_, ssl).Prefixed("connect the server err"));
int sock_fd = GET_OR_RET(util::SockConnect(this->host_, this->port_, ssl,
this->srv_->GetConfig()->replication_connect_timeout_ms,
this->srv_->GetConfig()->replication_recv_timeout_ms)
.Prefixed("connect the server err"));
#ifdef ENABLE_OPENSSL
exit.Disable();
#endif
Expand Down Expand Up @@ -874,6 +876,12 @@ Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf, const std::str
UniqueEvbufReadln line(evbuf, EVBUFFER_EOL_CRLF_STRICT);
if (!line) {
if (auto s = util::EvbufferRead(evbuf, sock_fd, -1, ssl); !s) {
if (s.Is<Status::TryAgain>()) {
if (stop_flag_) {
return {Status::NotOK, "replication thread was stopped"};
}
continue;
}
return std::move(s).Prefixed("read size");
}
continue;
Expand Down Expand Up @@ -907,6 +915,12 @@ Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf, const std::str
remain -= data_len;
} else {
if (auto s = util::EvbufferRead(evbuf, sock_fd, -1, ssl); !s) {
if (s.Is<Status::TryAgain>()) {
if (stop_flag_) {
return {Status::NotOK, "replication thread was stopped"};
}
continue;
}
return std::move(s).Prefixed("read sst file");
}
}
Expand Down
12 changes: 10 additions & 2 deletions src/common/io_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,12 @@ StatusOr<int> EvbufferRead(evbuffer *buf, evutil_socket_t fd, int howmuch, [[may
howmuch = BUFFER_SIZE;
}
if (howmuch = SSL_read(ssl, tmp, howmuch); howmuch <= 0) {
return {Status::NotOK, fmt::format("failed to read from SSL connection: {}", fmt::streamed(SSLError(howmuch)))};
int err = SSL_get_error(ssl, howmuch);
if (err == SSL_ERROR_ZERO_RETURN) {
return {Status::EndOfFile, "EOF encountered while reading from SSL connection"};
}
return {(err == SSL_ERROR_WANT_READ) ? Status::TryAgain : Status::NotOK,
fmt::format("failed to read from SSL connection: {}", fmt::streamed(SSLError(howmuch)))};
}

if (int ret = evbuffer_add(buf, tmp, howmuch); ret == -1) {
Expand All @@ -514,8 +519,11 @@ StatusOr<int> EvbufferRead(evbuffer *buf, evutil_socket_t fd, int howmuch, [[may
#endif
if (int ret = evbuffer_read(buf, fd, howmuch); ret > 0) {
return ret;
} else if (ret == 0) {
return {Status::EndOfFile, "EOF encountered while reading from socket"};
} else {
return {Status::NotOK, fmt::format("failed to read from socket: {}", strerror(errno))};
return {(errno == EWOULDBLOCK || errno == EAGAIN) ? Status::TryAgain : Status::NotOK,
fmt::format("failed to read from socket: {}", strerror(errno))};
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ class [[nodiscard]] Status {
// Search
NoPrefixMatched,
TypeMismatched,

// IO
TryAgain,
EndOfFile,
};

Status() : impl_{nullptr} {}
Expand Down
2 changes: 2 additions & 0 deletions src/config/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ Config::Config() {
{"slave-empty-db-before-fullsync", false, new YesNoField(&slave_empty_db_before_fullsync, false)},
{"slave-priority", false, new IntField(&slave_priority, 100, 0, INT_MAX)},
{"slave-read-only", false, new YesNoField(&slave_readonly, true)},
{"replication-connect-timeout-ms", false, new IntField(&replication_connect_timeout_ms, 3100, 0, INT_MAX)},
{"replication-recv-timeout-ms", false, new IntField(&replication_recv_timeout_ms, 3200, 0, INT_MAX)},
{"use-rsid-psync", true, new YesNoField(&use_rsid_psync, false)},
{"profiling-sample-ratio", false, new IntField(&profiling_sample_ratio, 0, 0, 100)},
{"profiling-sample-record-max-len", false, new IntField(&profiling_sample_record_max_len, 256, 0, INT_MAX)},
Expand Down
2 changes: 2 additions & 0 deletions src/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ struct Config {
bool slave_serve_stale_data = true;
bool slave_empty_db_before_fullsync = false;
int slave_priority = 100;
int replication_connect_timeout_ms = 3100;
int replication_recv_timeout_ms = 3200;
int max_db_size = 0;
int max_replication_mb = 0;
int max_io_mb = 0;
Expand Down

0 comments on commit 5e9db79

Please sign in to comment.