Skip to content

Commit

Permalink
peer: support ipv6 and uds endpoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
ehds committed Apr 29, 2024
1 parent f2cda9e commit ba5d4a7
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 14 deletions.
47 changes: 36 additions & 11 deletions src/braft/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@

#include <map>
#include <ostream>
#include <regex>
#include <set>
#include <string>
#include <vector>

namespace braft {

typedef std::string GroupId;
Expand Down Expand Up @@ -78,22 +78,47 @@ struct PeerId {

int parse(const std::string& str) {
reset();
char ip_str[64];
int value = REPLICA;
if (2 > sscanf(str.c_str(), "%[^:]%*[:]%d%*[:]%d%*[:]%d", ip_str,
&addr.port, &idx, &value)) {
reset();
// peer_id format:
// endpoint:idx:role
// ^
// |- host:port
// |- ip:port
// |- [ipv6]:port
// |- unix:path/to/sock

std::regex peerid_reg(
"((([^:]+)|(\\[.*\\])):[^:]+)(:(\\d)?)?(:(\\d+)?)?");
// ^ ^ ^ ^
// | | | |
// unix,host,ipv4 | idx(6)(opt) |
// ipv6 role(8)(opt)
std::cmatch m;
auto ret = std::regex_match(str.c_str(), m, peerid_reg);
if (!ret || m.size() != 9) {
return -1;
}
role = (Role)value;
if (role > WITNESS) {

// Using str2endpoint to check endpoint is valid.
std::string enpoint_str = m[1];
if (butil::str2endpoint(enpoint_str.c_str(), &addr) != 0) {
reset();
return -1;
}
if (0 != butil::str2ip(ip_str, &addr.ip)) {
reset();
return -1;

if (m[6].matched) {
// Check endpoint.
idx = std::stoi(m[6]);
}

// Check role if it existed.
if (m[8].matched) {
role = static_cast<Role>(std::stoi(m[8]));
if (role > WITNESS) {
reset();
return -1;
}
}

return 0;
}

Expand Down
4 changes: 3 additions & 1 deletion src/braft/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include <optional>

#include "_deps/brpc/src/src/butil/endpoint.h"
#include "braft/builtin_service_impl.h"
#include "braft/configuration.h"
#include "braft/errno.pb.h"
Expand Down Expand Up @@ -506,7 +507,8 @@ int NodeImpl::init(const NodeOptions& options) {
_options = options;

// check _server_id
if (butil::IP_ANY == _server_id.addr.ip) {
if (!butil::is_endpoint_extended(_server_id.addr) &&
butil::IP_ANY == _server_id.addr.ip) {
LOG(ERROR) << "Group " << _group_id
<< " Node can't started from IP_ANY";
return -1;
Expand Down
3 changes: 2 additions & 1 deletion src/braft/node_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "braft/node_manager.h"

#include "_deps/brpc/src/src/butil/details/extended_endpoint.hpp"
#include "braft/builtin_service_impl.h"
#include "braft/cli_service.h"
#include "braft/file_service.h"
Expand All @@ -29,7 +30,7 @@ NodeManager::~NodeManager() {}

bool NodeManager::server_exists(butil::EndPoint addr) {
BAIDU_SCOPED_LOCK(_mutex);
if (addr.ip != butil::IP_ANY) {
if (!is_endpoint_extended(addr) && addr.ip != butil::IP_ANY) {
butil::EndPoint any_addr(butil::IP_ANY, addr.port);
if (_addr_set.find(any_addr) != _addr_set.end()) {
return true;
Expand Down
27 changes: 27 additions & 0 deletions test/test_configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,33 @@ TEST_F(TestUsageSuits, PeerId) {

PeerId id3("1.2.3.4:1000:0");
LOG(INFO) << "id:" << id3;

// UDS format
PeerId id4;
id4.parse("unix:/path/to/sock:1:1");
LOG(INFO) << "id:" << id4;
ASSERT_EQ(id4.idx, 1);
ASSERT_EQ(id4.role, 1);

PeerId id5;
ASSERT_EQ(id5.parse("invalid:1:1"), -1);

// ipv6 format
PeerId id6;
id6.parse("[::1]:1");
ASSERT_EQ(id6.idx, 0);
ASSERT_EQ(id6.role, 0);
LOG(INFO) << "id:" << id6;

PeerId id7;
id7.parse("[::1]:1:1:1");
ASSERT_EQ(id7.idx, 1);
ASSERT_EQ(id7.role, 1);
LOG(INFO) << "id:" << id7;

PeerId id8;
ASSERT_EQ(id8.parse("[:::1:1"), -1);
ASSERT_EQ(id8.parse("::]:1:1"), -1);
}

TEST_F(TestUsageSuits, Configuration) {
Expand Down
30 changes: 30 additions & 0 deletions test/test_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,36 @@ TEST_P(NodeTest, Server) {
server2.Start("0.0.0.0:5007", NULL);
}

TEST_P(NodeTest, UDSNode) {
braft::PeerId peer0("unix:data/0.sock");
braft::PeerId peer1("unix:data/1.sock");
braft::PeerId peer2("unix:data/2.sock");
std::vector<braft::PeerId> peers = {peer0, peer1, peer2};
Cluster cluster("unittest", peers);

for (int i = 0; i < 3; i++) {
cluster.start(peers[i].addr);
}
cluster.wait_leader();
LOG(INFO) << "leader:" << cluster.leader()->leader_id().to_string();
}

TEST_P(NodeTest, IPV6Node) {
std::vector<braft::PeerId> peers;
braft::PeerId peer0("[::1]:5006");
braft::PeerId peer1("[::1]:5007");
braft::PeerId peer2("[::1]:5008");
peers = {peer0, peer1, peer2};

Cluster cluster("unittest", peers);
for (int i = 0; i < 3; i++) {
cluster.start(peers[i].addr);
}

cluster.wait_leader();
LOG(INFO) << "leader:" << cluster.leader()->leader_id().to_string();
}

TEST_P(NodeTest, SingleNode) {
brpc::Server server;
int ret = braft::add_service(&server, 5006);
Expand Down
4 changes: 3 additions & 1 deletion test/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,11 @@ class Cluster {
int snapshot_interval_s = 30,
braft::Closure* leader_start_closure = NULL, bool witness = false) {
if (_server_map[listen_addr] == NULL) {
brpc::ServerOptions server_options;
server_options.server_info_name = butil::endpoint2str(listen_addr).c_str();
brpc::Server* server = new brpc::Server();
if (braft::add_service(server, listen_addr) != 0
|| server->Start(listen_addr, NULL) != 0) {
|| server->Start(listen_addr, &server_options) != 0) {
LOG(ERROR) << "Fail to start raft service";
delete server;
return -1;
Expand Down

0 comments on commit ba5d4a7

Please sign in to comment.