From 239075c5abe1c5f50b2afe3d0d50908e133305f9 Mon Sep 17 00:00:00 2001 From: Michael Penick Date: Mon, 16 Mar 2020 13:23:54 -0400 Subject: [PATCH] CPP-913 Fix: Ensure no duplicates in token map replica sets --- src/token_map.hpp | 2 + src/token_map_impl.hpp | 92 ++++++++++++++++++++++--- tests/src/unit/tests/test_token_map.cpp | 56 ++++++++++++--- 3 files changed, 131 insertions(+), 19 deletions(-) diff --git a/src/token_map.hpp b/src/token_map.hpp index 908a16f7d..d36e3f9fa 100644 --- a/src/token_map.hpp +++ b/src/token_map.hpp @@ -52,6 +52,8 @@ class TokenMap : public RefCounted { virtual const CopyOnWriteHostVec& get_replicas(const String& keyspace_name, const String& routing_key) const = 0; + + virtual String dump(const String& keyspace_name) const = 0; }; }}} // namespace datastax::internal::core diff --git a/src/token_map_impl.hpp b/src/token_map_impl.hpp index b4af34692..b08b37cbf 100644 --- a/src/token_map_impl.hpp +++ b/src/token_map_impl.hpp @@ -34,6 +34,8 @@ #include #include +#include +#include #include #define CASS_NETWORK_TOPOLOGY_STRATEGY "NetworkTopologyStrategy" @@ -143,12 +145,32 @@ class ByteOrderedPartitioner { static StringRef name() { return "ByteOrderedPartitioner"; } }; +inline std::ostream& operator<<(std::ostream& os, const RandomPartitioner::Token& token) { + os << std::setfill('0') << std::setw(16) << std::hex << token.hi << std::setfill('0') + << std::setw(16) << std::hex << token.lo; + return os; +} + +inline std::ostream& operator<<(std::ostream& os, const ByteOrderedPartitioner::Token& token) { + for (ByteOrderedPartitioner::Token::const_iterator it = token.begin(), end = token.end(); + it != end; ++it) { + os << std::hex << *it; + } + return os; +} + class HostSet : public DenseHashSet { public: HostSet() { set_empty_key(Host::Ptr(new Host(Address::EMPTY_KEY))); set_deleted_key(Host::Ptr(new Host(Address::DELETED_KEY))); } + + template + HostSet(InputIterator first, InputIterator last) + : DenseHashSet(first, last, Host::Ptr(new 
Host(Address::EMPTY_KEY))) { + set_deleted_key(Host::Ptr(new Host(Address::DELETED_KEY))); + } }; class RackSet : public DenseHashSet { @@ -355,6 +377,17 @@ void ReplicationStrategy::build_replicas(const TokenHostVec& tokens } } +// Adds unique replica. It returns true if the replica was added. +inline bool add_replica(CopyOnWriteHostVec& hosts, const Host::Ptr& host) { + for (HostVec::const_reverse_iterator it = hosts->rbegin(); it != hosts->rend(); ++it) { + if ((*it)->address() == host->address()) { + return false; // Already in the replica set + } + } + hosts->push_back(host); + return true; +} + template void ReplicationStrategy::build_replicas_network_topology( const TokenHostVec& tokens, const DatacenterMap& datacenters, TokenReplicasVec& result) const { @@ -443,24 +476,27 @@ void ReplicationStrategy::build_replicas_network_topology( // datacenter only then consider hosts in the same rack if (rack == 0 || racks_observed_this_dc.size() == rack_count_this_dc) { - ++replica_count_this_dc; - replicas->push_back(Host::Ptr(host)); + if (add_replica(replicas, Host::Ptr(host))) { + ++replica_count_this_dc; + } } else { TokenHostQueue& skipped_endpoints_this_dc = dc_rack_info.skipped_endpoints; if (racks_observed_this_dc.count(rack) > 0) { skipped_endpoints_this_dc.push_back(curr_token_it); } else { - ++replica_count_this_dc; - replicas->push_back(Host::Ptr(host)); - racks_observed_this_dc.insert(rack); + if (add_replica(replicas, Host::Ptr(host))) { + ++replica_count_this_dc; + racks_observed_this_dc.insert(rack); + } // Once we visited every rack in the current datacenter then starting considering // hosts we've already skipped. 
if (racks_observed_this_dc.size() == rack_count_this_dc) { while (!skipped_endpoints_this_dc.empty() && replica_count_this_dc < replication_factor) { - ++replica_count_this_dc; - replicas->push_back(Host::Ptr(skipped_endpoints_this_dc.front()->second)); + if (add_replica(replicas, Host::Ptr(skipped_endpoints_this_dc.front()->second))) { + ++replica_count_this_dc; + } skipped_endpoints_this_dc.pop_front(); } } @@ -484,9 +520,10 @@ void ReplicationStrategy::build_replicas_simple(const TokenHostVec& for (typename TokenHostVec::const_iterator i = tokens.begin(), end = tokens.end(); i != end; ++i) { CopyOnWriteHostVec replicas(new HostVec()); + replicas->reserve(num_replicas); typename TokenHostVec::const_iterator token_it = i; do { - replicas->push_back(Host::Ptr(token_it->second)); + add_replica(replicas, Host::Ptr(token_it->second)); ++token_it; if (token_it == tokens.end()) { token_it = tokens.begin(); @@ -578,7 +615,11 @@ class TokenMapImpl : public TokenMap { virtual const CopyOnWriteHostVec& get_replicas(const String& keyspace_name, const String& routing_key) const; - // Test only + virtual String dump(const String& keyspace_name) const; + +public: + // Testing only + bool contains(const Token& token) const { for (typename TokenHostVec::const_iterator i = tokens_.begin(), end = tokens_.end(); i != end; ++i) { @@ -587,6 +628,8 @@ class TokenMapImpl : public TokenMap { return false; } + const TokenReplicasVec& token_replicas(const String& keyspace_name) const; + private: void update_keyspace(const VersionNumber& cassandra_version, const ResultResponse* result, bool should_build_replicas); @@ -713,6 +756,35 @@ const CopyOnWriteHostVec& TokenMapImpl::get_replicas(const String& return no_replicas_dummy_; } +template +String TokenMapImpl::dump(const String& keyspace_name) const { + String result; + typename KeyspaceReplicaMap::const_iterator ks_it = replicas_.find(keyspace_name); + const TokenReplicasVec& replicas = ks_it->second; + + for (typename 
TokenReplicasVec::const_iterator it = replicas.begin(), end = replicas.end(); + it != end; ++it) { + OStringStream ss; + ss << std::setw(20) << it->first << " [ "; + const CopyOnWriteHostVec& hosts = it->second; + for (HostVec::const_iterator host_it = hosts->begin(), end = hosts->end(); host_it != end; + ++host_it) { + ss << (*host_it)->address_string() << " "; + } + ss << "]\n"; + result.append(ss.str()); + } + return result; +} + +template +const typename TokenMapImpl::TokenReplicasVec& +TokenMapImpl::token_replicas(const String& keyspace_name) const { + typename KeyspaceReplicaMap::const_iterator ks_it = replicas_.find(keyspace_name); + static TokenReplicasVec not_found; + return ks_it != replicas_.end() ? ks_it->second : not_found; +} + template void TokenMapImpl::update_keyspace(const VersionNumber& cassandra_version, const ResultResponse* result, @@ -773,6 +845,8 @@ void TokenMapImpl::build_replicas() { const String& keyspace_name = i->first; const ReplicationStrategy& strategy = i->second; strategy.build_replicas(tokens_, datacenters_, replicas_[keyspace_name]); + LOG_TRACE("Replicas for keyspace '%s':\n%s", keyspace_name.c_str(), + dump(keyspace_name).c_str()); } } diff --git a/tests/src/unit/tests/test_token_map.cpp b/tests/src/unit/tests/test_token_map.cpp index 6c12268cc..3487cb544 100644 --- a/tests/src/unit/tests/test_token_map.cpp +++ b/tests/src/unit/tests/test_token_map.cpp @@ -30,6 +30,7 @@ template struct TestTokenMap { typedef typename ReplicationStrategy::Token Token; typedef Map TokenHostMap; + typedef typename TokenMapImpl::TokenReplicasVec TokenReplicasVec; TokenHostMap tokens; TokenMap::Ptr token_map; @@ -43,14 +44,11 @@ struct TestTokenMap { const String v(*i); tokens[Partitioner::from_string(*i)] = host; } + token_map->add_host(host); } void build(const String& keyspace_name = "ks", size_t replication_factor = 3) { add_keyspace_simple(keyspace_name, replication_factor, token_map.get()); - for (typename TokenHostMap::const_iterator i = 
tokens.begin(), end = tokens.end(); i != end; - ++i) { - token_map->add_host(i->second); - } token_map->build(); } @@ -63,7 +61,7 @@ struct TestTokenMap { } } - void verify(const String& keyspace_name = "ks") { + void verify(const String& keyspace_name = "ks", size_t replication_factor = 3) { const String keys[] = { "test", "abc", "def", "a", "b", "c", "d" }; for (size_t i = 0; i < sizeof(keys) / sizeof(keys[0]); ++i) { @@ -78,6 +76,25 @@ struct TestTokenMap { EXPECT_EQ(hosts->front()->address(), host->address()); } + + verify_unique_replica_count(keyspace_name, replication_factor); + } + + void verify_unique_replica_count(const String& keyspace_name = "ks", + size_t replication_factor = 3) { + // Verify a unique set of replicas per token + const TokenReplicasVec& token_replicas = + static_cast*>(token_map.get())->token_replicas(keyspace_name); + + ASSERT_EQ(tokens.size(), token_replicas.size()); + + for (typename TokenReplicasVec::const_iterator it = token_replicas.begin(), + end = token_replicas.end(); + it != end; ++it) { + HostSet replicas(it->second->begin(), it->second->end()); + // Using assert here because there can be many, many tokens + ASSERT_EQ(replication_factor, replicas.size()); + } } }; @@ -117,6 +134,7 @@ TEST(TokenMapUnitTest, Murmur3LargeNumberOfVnodes) { size_t num_hosts = 4; size_t num_vnodes = 256; size_t replication_factor = 3; + size_t total_replicas = std::min(num_hosts, replication_factor) * num_dcs; ReplicationMap replication; MT19937_64 rng; @@ -144,7 +162,6 @@ TEST(TokenMapUnitTest, Murmur3LargeNumberOfVnodes) { Murmur3Partitioner::name().to_string(), rack, dc)); test_murmur3.add_host(host); - token_map->add_host(host); } } } @@ -159,7 +176,7 @@ TEST(TokenMapUnitTest, Murmur3LargeNumberOfVnodes) { const String& key = keys[i]; const CopyOnWriteHostVec& hosts = token_map->get_replicas("ks1", key); - ASSERT_TRUE(hosts && hosts->size() == replication_factor * num_dcs); + ASSERT_TRUE(hosts && hosts->size() == total_replicas); typedef Map > 
DcRackMap; @@ -181,6 +198,8 @@ TEST(TokenMapUnitTest, Murmur3LargeNumberOfVnodes) { EXPECT_EQ((*hosts)[0]->address(), host->address()); } + + test_murmur3.verify_unique_replica_count("ks1", total_replicas); } TEST(TokenMapUnitTest, Random) { @@ -223,7 +242,7 @@ TEST(TokenMapUnitTest, RemoveHost) { test_remove_host.add_host(create_host("1.0.0.3", single_token(CASS_INT64_MAX / 2))); test_remove_host.build("ks", 2); - test_remove_host.verify(); + test_remove_host.verify("ks", 2); TokenMap* token_map = test_remove_host.token_map.get(); @@ -275,7 +294,7 @@ TEST(TokenMapUnitTest, UpdateHost) { test_update_host.add_host(create_host("1.0.0.2", single_token(CASS_INT64_MIN / 4))); test_update_host.build("ks", 4); - test_update_host.verify(); + test_update_host.verify("ks", 2); // Only two hosts, so rf = 2 TokenMap* token_map = test_update_host.token_map.get(); @@ -317,6 +336,8 @@ TEST(TokenMapUnitTest, UpdateHost) { EXPECT_EQ((*replicas)[2]->address(), Address("1.0.0.3", 9042)); EXPECT_EQ((*replicas)[3]->address(), Address("1.0.0.4", 9042)); } + + test_update_host.verify("ks", 4); } /** @@ -436,7 +457,7 @@ TEST(TokenMapUnitTest, DropKeyspace) { test_drop_keyspace.add_host(create_host("1.0.0.3", single_token(CASS_INT64_MAX / 2))); test_drop_keyspace.build("ks", 2); - test_drop_keyspace.verify(); + test_drop_keyspace.verify("ks", 2); TokenMap* token_map = test_drop_keyspace.token_map.get(); @@ -456,3 +477,18 @@ TEST(TokenMapUnitTest, DropKeyspace) { EXPECT_FALSE(replicas); } } + +TEST(TokenMapUnitTest, UniqueReplicas) { + TestTokenMap test_murmur3; + + const size_t tokens_per_host = 256; + MT19937_64 rng; + + test_murmur3.add_host(create_host("1.0.0.1", random_murmur3_tokens(rng, tokens_per_host))); + test_murmur3.add_host(create_host("1.0.0.2", random_murmur3_tokens(rng, tokens_per_host))); + test_murmur3.add_host(create_host("1.0.0.3", random_murmur3_tokens(rng, tokens_per_host))); + test_murmur3.add_host(create_host("1.0.0.4", random_murmur3_tokens(rng, tokens_per_host))); 
+ + test_murmur3.build(); + test_murmur3.verify(); +}