Skip to content

Commit

Permalink
CPP-913 Fix: Ensure no duplicates in token map replica sets
Browse files Browse the repository at this point in the history
  • Loading branch information
mpenick committed Mar 16, 2020
1 parent dfce03c commit 239075c
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 19 deletions.
2 changes: 2 additions & 0 deletions src/token_map.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class TokenMap : public RefCounted<TokenMap> {

virtual const CopyOnWriteHostVec& get_replicas(const String& keyspace_name,
const String& routing_key) const = 0;

virtual String dump(const String& keyspace_name) const = 0;
};

}}} // namespace datastax::internal::core
Expand Down
92 changes: 83 additions & 9 deletions src/token_map_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

#include <algorithm>
#include <assert.h>
#include <iomanip>
#include <ios>
#include <uv.h>

#define CASS_NETWORK_TOPOLOGY_STRATEGY "NetworkTopologyStrategy"
Expand Down Expand Up @@ -143,12 +145,32 @@ class ByteOrderedPartitioner {
static StringRef name() { return "ByteOrderedPartitioner"; }
};

// Streams a RandomPartitioner token as 32 zero-padded hex digits (hi then lo).
// Saves and restores the stream's format flags and fill character so that
// std::hex / std::setfill('0') do not leak into subsequent insertions on the
// same stream (the original implementation left the stream in hex mode).
inline std::ostream& operator<<(std::ostream& os, const RandomPartitioner::Token& token) {
  std::ios::fmtflags saved_flags(os.flags());
  char saved_fill = os.fill('0');
  os << std::setw(16) << std::hex << token.hi << std::setfill('0') << std::setw(16) << token.lo;
  os.flags(saved_flags);
  os.fill(saved_fill);
  return os;
}

// Streams a ByteOrderedPartitioner token as zero-padded two-digit hex pairs.
// NOTE(review): assumes Token elements are byte-sized integers — TODO confirm.
// If they are uint8_t, the previous `os << std::hex << *it` streamed each
// element as a raw character (std::hex has no effect on char-like types), and
// it also leaked std::hex into the caller's stream state. The cast to
// unsigned int forces numeric formatting; flags/fill are restored on exit.
inline std::ostream& operator<<(std::ostream& os, const ByteOrderedPartitioner::Token& token) {
  std::ios::fmtflags saved_flags(os.flags());
  char saved_fill = os.fill('0');
  for (ByteOrderedPartitioner::Token::const_iterator it = token.begin(), end = token.end();
       it != end; ++it) {
    os << std::setw(2) << std::hex << static_cast<unsigned int>(*it);
  }
  os.flags(saved_flags);
  os.fill(saved_fill);
  return os;
}

// Hash set of hosts. DenseHashSet requires distinguished "empty" and
// "deleted" sentinel keys before use; both constructors install sentinel
// hosts built from the reserved EMPTY_KEY / DELETED_KEY addresses.
class HostSet : public DenseHashSet<Host::Ptr> {
public:
  HostSet() {
    const Host::Ptr empty_sentinel(new Host(Address::EMPTY_KEY));
    const Host::Ptr deleted_sentinel(new Host(Address::DELETED_KEY));
    set_empty_key(empty_sentinel);
    set_deleted_key(deleted_sentinel);
  }

  // Range constructor: the empty sentinel must be passed to the base class
  // (it has to be set before any insertion); the deleted sentinel follows.
  template <class InputIterator>
  HostSet(InputIterator first, InputIterator last)
      : DenseHashSet<Host::Ptr>(first, last, Host::Ptr(new Host(Address::EMPTY_KEY))) {
    const Host::Ptr deleted_sentinel(new Host(Address::DELETED_KEY));
    set_deleted_key(deleted_sentinel);
  }
};

class RackSet : public DenseHashSet<uint32_t> {
Expand Down Expand Up @@ -355,6 +377,17 @@ void ReplicationStrategy<Partitioner>::build_replicas(const TokenHostVec& tokens
}
}

// Appends `host` to `hosts` unless a host with the same address is already
// present. Returns true if the host was appended, false if it was a duplicate.
inline bool add_replica(CopyOnWriteHostVec& hosts, const Host::Ptr& host) {
  const Address& address = host->address();
  for (HostVec::const_iterator it = hosts->begin(), end = hosts->end(); it != end; ++it) {
    if ((*it)->address() == address) {
      return false; // Duplicate: already a member of this replica set
    }
  }
  hosts->push_back(host);
  return true;
}

template <class Partitioner>
void ReplicationStrategy<Partitioner>::build_replicas_network_topology(
const TokenHostVec& tokens, const DatacenterMap& datacenters, TokenReplicasVec& result) const {
Expand Down Expand Up @@ -443,24 +476,27 @@ void ReplicationStrategy<Partitioner>::build_replicas_network_topology(
// datacenter only then consider hosts in the same rack

if (rack == 0 || racks_observed_this_dc.size() == rack_count_this_dc) {
++replica_count_this_dc;
replicas->push_back(Host::Ptr(host));
if (add_replica(replicas, Host::Ptr(host))) {
++replica_count_this_dc;
}
} else {
TokenHostQueue& skipped_endpoints_this_dc = dc_rack_info.skipped_endpoints;
if (racks_observed_this_dc.count(rack) > 0) {
skipped_endpoints_this_dc.push_back(curr_token_it);
} else {
++replica_count_this_dc;
replicas->push_back(Host::Ptr(host));
racks_observed_this_dc.insert(rack);
if (add_replica(replicas, Host::Ptr(host))) {
++replica_count_this_dc;
racks_observed_this_dc.insert(rack);
}

// Once we visited every rack in the current datacenter then starting considering
// hosts we've already skipped.
if (racks_observed_this_dc.size() == rack_count_this_dc) {
while (!skipped_endpoints_this_dc.empty() &&
replica_count_this_dc < replication_factor) {
++replica_count_this_dc;
replicas->push_back(Host::Ptr(skipped_endpoints_this_dc.front()->second));
if (add_replica(replicas, Host::Ptr(skipped_endpoints_this_dc.front()->second))) {
++replica_count_this_dc;
}
skipped_endpoints_this_dc.pop_front();
}
}
Expand All @@ -484,9 +520,10 @@ void ReplicationStrategy<Partitioner>::build_replicas_simple(const TokenHostVec&
for (typename TokenHostVec::const_iterator i = tokens.begin(), end = tokens.end(); i != end;
++i) {
CopyOnWriteHostVec replicas(new HostVec());
replicas->reserve(num_replicas);
typename TokenHostVec::const_iterator token_it = i;
do {
replicas->push_back(Host::Ptr(token_it->second));
add_replica(replicas, Host::Ptr(Host::Ptr(token_it->second)));
++token_it;
if (token_it == tokens.end()) {
token_it = tokens.begin();
Expand Down Expand Up @@ -578,7 +615,11 @@ class TokenMapImpl : public TokenMap {
virtual const CopyOnWriteHostVec& get_replicas(const String& keyspace_name,
const String& routing_key) const;

// Test only
virtual String dump(const String& keyspace_name) const;

public:
// Testing only

bool contains(const Token& token) const {
for (typename TokenHostVec::const_iterator i = tokens_.begin(), end = tokens_.end(); i != end;
++i) {
Expand All @@ -587,6 +628,8 @@ class TokenMapImpl : public TokenMap {
return false;
}

const TokenReplicasVec& token_replicas(const String& keyspace_name) const;

private:
void update_keyspace(const VersionNumber& cassandra_version, const ResultResponse* result,
bool should_build_replicas);
Expand Down Expand Up @@ -713,6 +756,35 @@ const CopyOnWriteHostVec& TokenMapImpl<Partitioner>::get_replicas(const String&
return no_replicas_dummy_;
}

// Renders the token -> replica-set mapping of a keyspace as a human-readable
// string, one line per token: "<token> [ <addr> <addr> ... ]". Used for
// trace logging and tests. Returns an empty string for an unknown keyspace.
template <class Partitioner>
String TokenMapImpl<Partitioner>::dump(const String& keyspace_name) const {
  String result;
  typename KeyspaceReplicaMap::const_iterator ks_it = replicas_.find(keyspace_name);
  if (ks_it == replicas_.end()) {
    // Original code dereferenced the iterator unconditionally, which is
    // undefined behavior when the keyspace has no replica entry.
    return result;
  }
  const TokenReplicasVec& replicas = ks_it->second;

  for (typename TokenReplicasVec::const_iterator it = replicas.begin(), end = replicas.end();
       it != end; ++it) {
    OStringStream ss;
    ss << std::setw(20) << it->first << " [ ";
    const CopyOnWriteHostVec& hosts = it->second;
    // `host_end` (not `end`) to avoid shadowing the outer loop's sentinel.
    for (HostVec::const_iterator host_it = hosts->begin(), host_end = hosts->end();
         host_it != host_end; ++host_it) {
      ss << (*host_it)->address_string() << " ";
    }
    ss << "]\n";
    result.append(ss.str());
  }
  return result;
}

// Testing helper: returns the per-token replica sets computed for
// `keyspace_name`, or a reference to a shared empty vector when the
// keyspace is unknown.
template <class Partitioner>
const typename TokenMapImpl<Partitioner>::TokenReplicasVec&
TokenMapImpl<Partitioner>::token_replicas(const String& keyspace_name) const {
  static TokenReplicasVec not_found;
  typename KeyspaceReplicaMap::const_iterator it = replicas_.find(keyspace_name);
  if (it == replicas_.end()) {
    return not_found;
  }
  return it->second;
}

template <class Partitioner>
void TokenMapImpl<Partitioner>::update_keyspace(const VersionNumber& cassandra_version,
const ResultResponse* result,
Expand Down Expand Up @@ -773,6 +845,8 @@ void TokenMapImpl<Partitioner>::build_replicas() {
const String& keyspace_name = i->first;
const ReplicationStrategy<Partitioner>& strategy = i->second;
strategy.build_replicas(tokens_, datacenters_, replicas_[keyspace_name]);
LOG_TRACE("Replicas for keyspace '%s':\n%s", keyspace_name.c_str(),
dump(keyspace_name).c_str());
}
}

Expand Down
56 changes: 46 additions & 10 deletions tests/src/unit/tests/test_token_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ template <class Partitioner>
struct TestTokenMap {
typedef typename ReplicationStrategy<Partitioner>::Token Token;
typedef Map<Token, Host::Ptr> TokenHostMap;
typedef typename TokenMapImpl<Partitioner>::TokenReplicasVec TokenReplicasVec;

TokenHostMap tokens;
TokenMap::Ptr token_map;
Expand All @@ -43,14 +44,11 @@ struct TestTokenMap {
const String v(*i);
tokens[Partitioner::from_string(*i)] = host;
}
token_map->add_host(host);
}

void build(const String& keyspace_name = "ks", size_t replication_factor = 3) {
add_keyspace_simple(keyspace_name, replication_factor, token_map.get());
for (typename TokenHostMap::const_iterator i = tokens.begin(), end = tokens.end(); i != end;
++i) {
token_map->add_host(i->second);
}
token_map->build();
}

Expand All @@ -63,7 +61,7 @@ struct TestTokenMap {
}
}

void verify(const String& keyspace_name = "ks") {
void verify(const String& keyspace_name = "ks", size_t replication_factor = 3) {
const String keys[] = { "test", "abc", "def", "a", "b", "c", "d" };

for (size_t i = 0; i < sizeof(keys) / sizeof(keys[0]); ++i) {
Expand All @@ -78,6 +76,25 @@ struct TestTokenMap {

EXPECT_EQ(hosts->front()->address(), host->address());
}

verify_unique_replica_count(keyspace_name, replication_factor);
}

// Asserts that every token's replica set contains exactly
// `replication_factor` distinct hosts (no duplicates) for the given keyspace.
void verify_unique_replica_count(const String& keyspace_name = "ks",
size_t replication_factor = 3) {
// Verify a unique set of replicas per token
const TokenReplicasVec& token_replicas =
static_cast<TokenMapImpl<Partitioner>*>(token_map.get())->token_replicas(keyspace_name);

// One replica-set entry is expected per token.
ASSERT_EQ(tokens.size(), token_replicas.size());

for (typename TokenReplicasVec::const_iterator it = token_replicas.begin(),
end = token_replicas.end();
it != end; ++it) {
// HostSet deduplicates, so its size equals the number of distinct replicas.
HostSet replicas(it->second->begin(), it->second->end());
// Using ASSERT (not EXPECT) because there can be many, many tokens;
// abort on the first failure instead of logging thousands of them.
ASSERT_EQ(replication_factor, replicas.size());
}
}
};

Expand Down Expand Up @@ -117,6 +134,7 @@ TEST(TokenMapUnitTest, Murmur3LargeNumberOfVnodes) {
size_t num_hosts = 4;
size_t num_vnodes = 256;
size_t replication_factor = 3;
size_t total_replicas = std::min(num_hosts, replication_factor) * num_dcs;

ReplicationMap replication;
MT19937_64 rng;
Expand Down Expand Up @@ -144,7 +162,6 @@ TEST(TokenMapUnitTest, Murmur3LargeNumberOfVnodes) {
Murmur3Partitioner::name().to_string(), rack, dc));

test_murmur3.add_host(host);
token_map->add_host(host);
}
}
}
Expand All @@ -159,7 +176,7 @@ TEST(TokenMapUnitTest, Murmur3LargeNumberOfVnodes) {
const String& key = keys[i];

const CopyOnWriteHostVec& hosts = token_map->get_replicas("ks1", key);
ASSERT_TRUE(hosts && hosts->size() == replication_factor * num_dcs);
ASSERT_TRUE(hosts && hosts->size() == total_replicas);

typedef Map<String, Set<String> > DcRackMap;

Expand All @@ -181,6 +198,8 @@ TEST(TokenMapUnitTest, Murmur3LargeNumberOfVnodes) {

EXPECT_EQ((*hosts)[0]->address(), host->address());
}

test_murmur3.verify_unique_replica_count("ks1", total_replicas);
}

TEST(TokenMapUnitTest, Random) {
Expand Down Expand Up @@ -223,7 +242,7 @@ TEST(TokenMapUnitTest, RemoveHost) {
test_remove_host.add_host(create_host("1.0.0.3", single_token(CASS_INT64_MAX / 2)));

test_remove_host.build("ks", 2);
test_remove_host.verify();
test_remove_host.verify("ks", 2);

TokenMap* token_map = test_remove_host.token_map.get();

Expand Down Expand Up @@ -275,7 +294,7 @@ TEST(TokenMapUnitTest, UpdateHost) {
test_update_host.add_host(create_host("1.0.0.2", single_token(CASS_INT64_MIN / 4)));

test_update_host.build("ks", 4);
test_update_host.verify();
test_update_host.verify("ks", 2); // Only two hosts, so rf = 2

TokenMap* token_map = test_update_host.token_map.get();

Expand Down Expand Up @@ -317,6 +336,8 @@ TEST(TokenMapUnitTest, UpdateHost) {
EXPECT_EQ((*replicas)[2]->address(), Address("1.0.0.3", 9042));
EXPECT_EQ((*replicas)[3]->address(), Address("1.0.0.4", 9042));
}

test_update_host.verify("ks", 4);
}

/**
Expand Down Expand Up @@ -436,7 +457,7 @@ TEST(TokenMapUnitTest, DropKeyspace) {
test_drop_keyspace.add_host(create_host("1.0.0.3", single_token(CASS_INT64_MAX / 2)));

test_drop_keyspace.build("ks", 2);
test_drop_keyspace.verify();
test_drop_keyspace.verify("ks", 2);

TokenMap* token_map = test_drop_keyspace.token_map.get();

Expand All @@ -456,3 +477,18 @@ TEST(TokenMapUnitTest, DropKeyspace) {
EXPECT_FALSE(replicas);
}
}

// Regression test for CPP-913: with many random vnodes per host, the
// computed replica set for every token must contain only distinct hosts.
TEST(TokenMapUnitTest, UniqueReplicas) {
  TestTokenMap<Murmur3Partitioner> test_murmur3;

  const size_t tokens_per_host = 256;
  MT19937_64 rng;

  // Hosts are added in a fixed order so the rng token sequence is stable.
  const char* addresses[] = { "1.0.0.1", "1.0.0.2", "1.0.0.3", "1.0.0.4" };
  for (size_t i = 0; i < sizeof(addresses) / sizeof(addresses[0]); ++i) {
    test_murmur3.add_host(create_host(addresses[i], random_murmur3_tokens(rng, tokens_per_host)));
  }

  test_murmur3.build();
  test_murmur3.verify();
}

0 comments on commit 239075c

Please sign in to comment.