Skip to content

Commit

Permalink
raft learner support.
Browse files Browse the repository at this point in the history
Signed-off-by: Yichao Li <[email protected]>
  • Loading branch information
Li Yichao committed Feb 13, 2023
1 parent bc527db commit 4ae3a76
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 19 deletions.
8 changes: 6 additions & 2 deletions src/braft/ballot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ int Ballot::init(const Configuration& conf, const Configuration* old_conf) {
_peers.reserve(conf.size());
for (Configuration::const_iterator
iter = conf.begin(); iter != conf.end(); ++iter) {
_peers.push_back(*iter);
if (!iter->learner) {
_peers.push_back(*iter);
}
}
_quorum = _peers.size() / 2 + 1;
if (!old_conf) {
Expand All @@ -39,7 +41,9 @@ int Ballot::init(const Configuration& conf, const Configuration* old_conf) {
_old_peers.reserve(old_conf->size());
for (Configuration::const_iterator
iter = old_conf->begin(); iter != old_conf->end(); ++iter) {
_old_peers.push_back(*iter);
if (!iter->learner) {
_old_peers.push_back(*iter);
}
}
_old_quorum = _old_peers.size() / 2 + 1;
return 0;
Expand Down
3 changes: 3 additions & 0 deletions src/braft/ballot_box.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ int BallotBox::init(const BallotBoxOptions &options) {

int BallotBox::commit_at(
int64_t first_log_index, int64_t last_log_index, const PeerId& peer) {
if (peer.learner) {
return 0;
}
// FIXME(chenzhangyi01): The cricital section is unacceptable because it
// blocks all the other Replicators and LogManagers
std::unique_lock<raft_mutex_t> lck(_mutex);
Expand Down
16 changes: 12 additions & 4 deletions src/braft/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,20 @@ typedef std::string VersionedGroupId;
struct PeerId {
butil::EndPoint addr; // ip+port.
int idx; // idx in same addr, default 0
bool learner;

PeerId() : idx(0) {}
explicit PeerId(butil::EndPoint addr_) : addr(addr_), idx(0) {}
PeerId(butil::EndPoint addr_, int idx_) : addr(addr_), idx(idx_) {}
PeerId(butil::EndPoint addr_, int idx_, bool learner_=false) : addr(addr_), idx(idx_), learner(learner_) {}
PeerId() : PeerId(butil::EndPoint(), 0) {}
explicit PeerId(butil::EndPoint addr_) : PeerId(addr_, 0) {}
/*intended implicit*/PeerId(const std::string& str)
{ CHECK_EQ(0, parse(str)); }
PeerId(const PeerId& id) : addr(id.addr), idx(id.idx) {}
PeerId(const PeerId& id) : PeerId(id.addr, id.idx, id.learner) {}

void reset() {
addr.ip = butil::IP_ANY;
addr.port = 0;
idx = 0;
learner = false;
}

bool is_empty() const {
Expand All @@ -59,6 +61,9 @@ struct PeerId {
int parse(const std::string& str) {
reset();
char ip_str[64];
if (!str.empty() && str.back() == 'l') {
learner = true;
}
if (2 > sscanf(str.c_str(), "%[^:]%*[:]%d%*[:]%d", ip_str, &addr.port, &idx)) {
reset();
return -1;
Expand All @@ -85,6 +90,9 @@ inline bool operator<(const PeerId& id1, const PeerId& id2) {
}
}

// intentionally leave behind `learner` field.
// So when we change a peer from learner to normal or normal to learner
// raft will do nothing
inline bool operator==(const PeerId& id1, const PeerId& id2) {
return (id1.addr == id2.addr && id1.idx == id2.idx);
}
Expand Down
40 changes: 28 additions & 12 deletions src/braft/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -499,9 +499,11 @@ int NodeImpl::init(const NodeOptions& options) {
return -1;
}

CHECK_EQ(0, _vote_timer.init(this, options.election_timeout_ms + options.max_clock_drift_ms));
CHECK_EQ(0, _election_timer.init(this, options.election_timeout_ms));
CHECK_EQ(0, _stepdown_timer.init(this, options.election_timeout_ms));
if (!options.learner) {
CHECK_EQ(0, _vote_timer.init(this, options.election_timeout_ms + options.max_clock_drift_ms));
CHECK_EQ(0, _election_timer.init(this, options.election_timeout_ms));
CHECK_EQ(0, _stepdown_timer.init(this, options.election_timeout_ms));
}
CHECK_EQ(0, _snapshot_timer.init(this, options.snapshot_interval_s * 1000));

_config_manager = new ConfigurationManager();
Expand Down Expand Up @@ -633,7 +635,7 @@ int NodeImpl::init(const NodeOptions& options) {
// conditions
std::unique_lock<raft_mutex_t> lck(_mutex);
if (_conf.stable() && _conf.conf.size() == 1u
&& _conf.conf.contains(_server_id)) {
&& _conf.conf.contains(_server_id) && !options.learner) {
// The group contains only this server which must be the LEADER, trigger
// the timer immediately.
elect_self(&lck);
Expand Down Expand Up @@ -1189,18 +1191,32 @@ int NodeImpl::transfer_leadership_to(const PeerId& peer) {
if (_replicator_group.find_the_next_candidate(&peer_id, _conf) != 0) {
return -1;
}
} else {
bool found = false;
for (Configuration::const_iterator iter = _conf.begin(); iter != _conf.end(); ++iter) {
if (*iter == peer_id) {
if (iter->learner) {
LOG(WARNING) << "node " << _group_id << ":" << _server_id
<< " refused to transfer leadership to peer " << peer_id
<< " which is a learner";
return EINVAL;
}
found = true;
break;
}
}
if (!found) {
LOG(WARNING) << "node " << _group_id << ":" << _server_id
<< " refused to transfer leadership to peer " << peer_id
<< " which doesn't belong to " << _conf.conf;
return EINVAL;
}
}
if (peer_id == _server_id) {
LOG(INFO) << "node " << _group_id << ":" << _server_id
<< " transfering leadership to self";
return 0;
}
if (!_conf.contains(peer_id)) {
LOG(WARNING) << "node " << _group_id << ":" << _server_id
<< " refused to transfer leadership to peer " << peer_id
<< " which doesn't belong to " << _conf.conf;
return EINVAL;
}
const int64_t last_log_index = _log_manager->last_log_index();
const int rc = _replicator_group.transfer_leadership_to(peer_id, last_log_index);
if (rc != 0) {
Expand Down Expand Up @@ -1609,7 +1625,7 @@ void NodeImpl::pre_vote(std::unique_lock<raft_mutex_t>* lck, bool triggered) {

for (std::set<PeerId>::const_iterator
iter = peers.begin(); iter != peers.end(); ++iter) {
if (*iter == _server_id) {
if (*iter == _server_id || iter->learner) {
continue;
}
brpc::ChannelOptions options;
Expand Down Expand Up @@ -1715,7 +1731,7 @@ void NodeImpl::request_peers_to_vote(const std::set<PeerId>& peers,
const DisruptedLeader& disrupted_leader) {
for (std::set<PeerId>::const_iterator
iter = peers.begin(); iter != peers.end(); ++iter) {
if (*iter == _server_id) {
if (*iter == _server_id || iter->learner) {
continue;
}
brpc::ChannelOptions options;
Expand Down
6 changes: 6 additions & 0 deletions src/braft/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,11 @@ struct NodeOptions {
// Default: false
bool disable_cli;

// If true, this node will neither participate in elections nor be inclued in quorum
// learner will not trigger election_timeout or vote
// Default: false
bool learner;

// Construct a default instance
NodeOptions();

Expand All @@ -609,6 +614,7 @@ inline NodeOptions::NodeOptions()
, snapshot_file_system_adaptor(NULL)
, snapshot_throttle(NULL)
, disable_cli(false)
, learner(false)
{}

inline int NodeOptions::get_catchup_timeout_ms() {
Expand Down
2 changes: 1 addition & 1 deletion src/braft/replicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1514,7 +1514,7 @@ int ReplicatorGroup::find_the_next_candidate(
int64_t max_index = 0;
for (std::map<PeerId, ReplicatorIdAndStatus>::const_iterator
iter = _rmap.begin(); iter != _rmap.end(); ++iter) {
if (!conf.contains(iter->first)) {
if (!conf.contains(iter->first) || iter->first.learner) {
continue;
}
const int64_t next_index = Replicator::get_next_index(iter->second.id);
Expand Down

0 comments on commit 4ae3a76

Please sign in to comment.