Skip to content

Commit

Permalink
Merge pull request #470 from datastax/CPP-913
Browse files Browse the repository at this point in the history
CPP-913 Fix: Ensure no duplicates in token map replica sets
  • Loading branch information
mpenick authored Mar 17, 2020
2 parents bd32509 + 239075c commit 7a539df
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 19 deletions.
28 changes: 28 additions & 0 deletions src/request_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,24 @@ RequestHandler::RequestHandler(const Request::ConstPtr& request, const ResponseF
, manager_(NULL)
, metrics_(metrics) {}

RequestHandler::~RequestHandler() {
if (Logger::log_level() >= CASS_LOG_TRACE) {
OStringStream ss;
for (RequestTryVec::const_iterator it = request_tries_.begin(), end = request_tries_.end();
it != end; ++it) {
if (it != request_tries_.begin()) ss << ", ";
ss << "(" << it->address << ", ";
if (it->error != CASS_OK) {
ss << cass_error_desc(it->error);
} else {
ss << it->latency;
}
ss << ")";
}
LOG_TRACE("Speculative execution attempts: [%s]", ss.str().c_str());
}
}

void RequestHandler::set_prepared_metadata(const PreparedMetadata::Entry::Ptr& entry) {
wrapper_.set_prepared_metadata(entry);
}
Expand Down Expand Up @@ -269,6 +287,10 @@ void RequestHandler::set_response(const Host::Ptr& host, const Response::Ptr& re
metrics_->record_speculative_request(uv_hrtime() - start_time_ns_);
}
}

if (Logger::log_level() >= CASS_LOG_TRACE) {
request_tries_.push_back(RequestTry(host->address(), uv_hrtime() - start_time_ns_));
}
}

void RequestHandler::set_error(CassError code, const String& message) {
Expand All @@ -289,6 +311,9 @@ void RequestHandler::set_error(const Host::Ptr& host, CassError code, const Stri
set_error(code, message);
}
}
if (Logger::log_level() >= CASS_LOG_TRACE) {
request_tries_.push_back(RequestTry(host->address(), code));
}
}

void RequestHandler::set_error_with_error_response(const Host::Ptr& host,
Expand All @@ -297,6 +322,9 @@ void RequestHandler::set_error_with_error_response(const Host::Ptr& host,
stop_request();
running_executions_--;
future_->set_error_with_response(host->address(), error, code, message);
if (Logger::log_level() >= CASS_LOG_TRACE) {
request_tries_.push_back(RequestTry(host->address(), code));
}
}

void RequestHandler::stop_timer() { timer_.stop(); }
Expand Down
25 changes: 25 additions & 0 deletions src/request_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,28 @@ class ExecutionProfile;
class Timer;
class TokenMap;

struct RequestTry {
RequestTry()
: error(CASS_OK)
, latency(0) {}

RequestTry(const Address& address, uint64_t latency)
: address(address)
, error(CASS_OK)
, latency(latency / (1000 * 1000)) {} // To milliseconds

RequestTry(const Address& address, CassError error)
: address(address)
, error(error)
, latency(0) {}

Address address;
CassError error;
uint64_t latency;
};

typedef SmallVector<RequestTry, 2> RequestTryVec;

class ResponseFuture : public Future {
public:
typedef SharedRefPtr<ResponseFuture> Ptr;
Expand Down Expand Up @@ -138,6 +160,7 @@ class RequestHandler : public RefCounted<RequestHandler> {

RequestHandler(const Request::ConstPtr& request, const ResponseFuture::Ptr& future,
Metrics* metrics = NULL);
~RequestHandler();

void set_prepared_metadata(const PreparedMetadata::Entry::Ptr& entry);

Expand Down Expand Up @@ -210,6 +233,8 @@ class RequestHandler : public RefCounted<RequestHandler> {
ConnectionPoolManager* manager_;

Metrics* const metrics_;

RequestTryVec request_tries_;
};

class KeyspaceChangedResponse {
Expand Down
2 changes: 2 additions & 0 deletions src/token_map.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class TokenMap : public RefCounted<TokenMap> {

virtual const CopyOnWriteHostVec& get_replicas(const String& keyspace_name,
const String& routing_key) const = 0;

virtual String dump(const String& keyspace_name) const = 0;
};

}}} // namespace datastax::internal::core
Expand Down
92 changes: 83 additions & 9 deletions src/token_map_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

#include <algorithm>
#include <assert.h>
#include <iomanip>
#include <ios>
#include <uv.h>

#define CASS_NETWORK_TOPOLOGY_STRATEGY "NetworkTopologyStrategy"
Expand Down Expand Up @@ -143,12 +145,32 @@ class ByteOrderedPartitioner {
static StringRef name() { return "ByteOrderedPartitioner"; }
};

inline std::ostream& operator<<(std::ostream& os, const RandomPartitioner::Token& token) {
os << std::setfill('0') << std::setw(16) << std::hex << token.hi << std::setfill('0')
<< std::setw(16) << std::hex << token.lo;
return os;
}

inline std::ostream& operator<<(std::ostream& os, const ByteOrderedPartitioner::Token& token) {
for (ByteOrderedPartitioner::Token::const_iterator it = token.begin(), end = token.end();
it != end; ++it) {
os << std::hex << *it;
}
return os;
}

class HostSet : public DenseHashSet<Host::Ptr> {
public:
HostSet() {
set_empty_key(Host::Ptr(new Host(Address::EMPTY_KEY)));
set_deleted_key(Host::Ptr(new Host(Address::DELETED_KEY)));
}

template <class InputIterator>
HostSet(InputIterator first, InputIterator last)
: DenseHashSet<Host::Ptr>(first, last, Host::Ptr(new Host(Address::EMPTY_KEY))) {
set_deleted_key(Host::Ptr(new Host(Address::DELETED_KEY)));
}
};

class RackSet : public DenseHashSet<uint32_t> {
Expand Down Expand Up @@ -355,6 +377,17 @@ void ReplicationStrategy<Partitioner>::build_replicas(const TokenHostVec& tokens
}
}

// Adds unique replica. It returns true if the replica was added.
inline bool add_replica(CopyOnWriteHostVec& hosts, const Host::Ptr& host) {
for (HostVec::const_reverse_iterator it = hosts->rbegin(); it != hosts->rend(); ++it) {
if ((*it)->address() == host->address()) {
return false; // Already in the replica set
}
}
hosts->push_back(host);
return true;
}

template <class Partitioner>
void ReplicationStrategy<Partitioner>::build_replicas_network_topology(
const TokenHostVec& tokens, const DatacenterMap& datacenters, TokenReplicasVec& result) const {
Expand Down Expand Up @@ -443,24 +476,27 @@ void ReplicationStrategy<Partitioner>::build_replicas_network_topology(
// datacenter only then consider hosts in the same rack

if (rack == 0 || racks_observed_this_dc.size() == rack_count_this_dc) {
++replica_count_this_dc;
replicas->push_back(Host::Ptr(host));
if (add_replica(replicas, Host::Ptr(host))) {
++replica_count_this_dc;
}
} else {
TokenHostQueue& skipped_endpoints_this_dc = dc_rack_info.skipped_endpoints;
if (racks_observed_this_dc.count(rack) > 0) {
skipped_endpoints_this_dc.push_back(curr_token_it);
} else {
++replica_count_this_dc;
replicas->push_back(Host::Ptr(host));
racks_observed_this_dc.insert(rack);
if (add_replica(replicas, Host::Ptr(host))) {
++replica_count_this_dc;
racks_observed_this_dc.insert(rack);
}

// Once we visited every rack in the current datacenter then starting considering
// hosts we've already skipped.
if (racks_observed_this_dc.size() == rack_count_this_dc) {
while (!skipped_endpoints_this_dc.empty() &&
replica_count_this_dc < replication_factor) {
++replica_count_this_dc;
replicas->push_back(Host::Ptr(skipped_endpoints_this_dc.front()->second));
if (add_replica(replicas, Host::Ptr(skipped_endpoints_this_dc.front()->second))) {
++replica_count_this_dc;
}
skipped_endpoints_this_dc.pop_front();
}
}
Expand All @@ -484,9 +520,10 @@ void ReplicationStrategy<Partitioner>::build_replicas_simple(const TokenHostVec&
for (typename TokenHostVec::const_iterator i = tokens.begin(), end = tokens.end(); i != end;
++i) {
CopyOnWriteHostVec replicas(new HostVec());
replicas->reserve(num_replicas);
typename TokenHostVec::const_iterator token_it = i;
do {
replicas->push_back(Host::Ptr(token_it->second));
add_replica(replicas, Host::Ptr(Host::Ptr(token_it->second)));
++token_it;
if (token_it == tokens.end()) {
token_it = tokens.begin();
Expand Down Expand Up @@ -578,7 +615,11 @@ class TokenMapImpl : public TokenMap {
virtual const CopyOnWriteHostVec& get_replicas(const String& keyspace_name,
const String& routing_key) const;

// Test only
virtual String dump(const String& keyspace_name) const;

public:
// Testing only

bool contains(const Token& token) const {
for (typename TokenHostVec::const_iterator i = tokens_.begin(), end = tokens_.end(); i != end;
++i) {
Expand All @@ -587,6 +628,8 @@ class TokenMapImpl : public TokenMap {
return false;
}

const TokenReplicasVec& token_replicas(const String& keyspace_name) const;

private:
void update_keyspace(const VersionNumber& cassandra_version, const ResultResponse* result,
bool should_build_replicas);
Expand Down Expand Up @@ -713,6 +756,35 @@ const CopyOnWriteHostVec& TokenMapImpl<Partitioner>::get_replicas(const String&
return no_replicas_dummy_;
}

template <class Partitioner>
String TokenMapImpl<Partitioner>::dump(const String& keyspace_name) const {
String result;
typename KeyspaceReplicaMap::const_iterator ks_it = replicas_.find(keyspace_name);
const TokenReplicasVec& replicas = ks_it->second;

for (typename TokenReplicasVec::const_iterator it = replicas.begin(), end = replicas.end();
it != end; ++it) {
OStringStream ss;
ss << std::setw(20) << it->first << " [ ";
const CopyOnWriteHostVec& hosts = it->second;
for (HostVec::const_iterator host_it = hosts->begin(), end = hosts->end(); host_it != end;
++host_it) {
ss << (*host_it)->address_string() << " ";
}
ss << "]\n";
result.append(ss.str());
}
return result;
}

template <class Partitioner>
const typename TokenMapImpl<Partitioner>::TokenReplicasVec&
TokenMapImpl<Partitioner>::token_replicas(const String& keyspace_name) const {
typename KeyspaceReplicaMap::const_iterator ks_it = replicas_.find(keyspace_name);
static TokenReplicasVec not_found;
return ks_it != replicas_.end() ? ks_it->second : not_found;
}

template <class Partitioner>
void TokenMapImpl<Partitioner>::update_keyspace(const VersionNumber& cassandra_version,
const ResultResponse* result,
Expand Down Expand Up @@ -773,6 +845,8 @@ void TokenMapImpl<Partitioner>::build_replicas() {
const String& keyspace_name = i->first;
const ReplicationStrategy<Partitioner>& strategy = i->second;
strategy.build_replicas(tokens_, datacenters_, replicas_[keyspace_name]);
LOG_TRACE("Replicas for keyspace '%s':\n%s", keyspace_name.c_str(),
dump(keyspace_name).c_str());
}
}

Expand Down
Loading

0 comments on commit 7a539df

Please sign in to comment.