Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce address comparisons for network topology replica calculation #532

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
20 changes: 20 additions & 0 deletions src/host.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,4 +304,24 @@ bool remove_host(CopyOnWriteHostVec& hosts, const Address& address);

}}} // namespace datastax::internal::core


namespace std {

#if defined(HASH_IN_TR1) && !defined(_WIN32)
namespace tr1 {
#endif

template <>
struct hash<datastax::internal::core::Host*> {
size_t operator()(const datastax::internal::core::Host* host) const {
return host->address().hash_code();
mpenick marked this conversation as resolved.
Show resolved Hide resolved
}
};

#if defined(HASH_IN_TR1) && !defined(_WIN32)
} // namespace tr1
#endif

} // namespace std

#endif
22 changes: 18 additions & 4 deletions src/token_map_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,14 @@ inline bool add_replica(CopyOnWriteHostVec& hosts, const Host::Ptr& host) {
return true;
}

class RawPtrHostSet : public DenseHashSet<Host*> {
public:
RawPtrHostSet() {
set_empty_key(0x0);
set_deleted_key(reinterpret_cast<Host*>(0x1));
}
};

template <class Partitioner>
void ReplicationStrategy<Partitioner>::build_replicas_network_topology(
const TokenHostVec& tokens, const DatacenterMap& datacenters, TokenReplicasVec& result) const {
Expand Down Expand Up @@ -435,6 +443,9 @@ void ReplicationStrategy<Partitioner>::build_replicas_network_topology(
CopyOnWriteHostVec replicas(new HostVec());
replicas->reserve(num_replicas);

RawPtrHostSet replicas_set;
replicas_set.resize(num_replicas);

// Clear datacenter and rack information for the next token
for (typename DatacenterRackInfoMap::iterator j = dc_racks.begin(), end = dc_racks.end();
j != end; ++j) {
Expand All @@ -444,7 +455,7 @@ void ReplicationStrategy<Partitioner>::build_replicas_network_topology(
}

for (typename TokenHostVec::const_iterator j = tokens.begin(), end = tokens.end();
j != end && replicas->size() < num_replicas; ++j) {
j != end && replicas_set.size() < num_replicas; ++j) {
typename TokenHostVec::const_iterator curr_token_it = token_it;
Host* host = curr_token_it->second;
uint32_t dc = host->dc_id();
Expand Down Expand Up @@ -476,15 +487,17 @@ void ReplicationStrategy<Partitioner>::build_replicas_network_topology(
// datacenter only then consider hosts in the same rack

if (rack == 0 || racks_observed_this_dc.size() == rack_count_this_dc) {
if (add_replica(replicas, Host::Ptr(host))) {
if (replicas_set.insert(host).second) {
replicas->push_back(Host::Ptr(host));
++replica_count_this_dc;
}
} else {
TokenHostQueue& skipped_endpoints_this_dc = dc_rack_info.skipped_endpoints;
if (racks_observed_this_dc.count(rack) > 0) {
skipped_endpoints_this_dc.push_back(curr_token_it);
} else {
if (add_replica(replicas, Host::Ptr(host))) {
if (replicas_set.insert(host).second) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just embed the changes in the add_replica()?
another option is to try keep replicas sorted and use binary search? i.e. use
std::sort(), or even std::stable_sort() that should be faster for sorted or, almost sorted data, then just use std::lower_bound() with custom Comparator to check if the host needs to be added.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The token order of replicas matters.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still could embed the logic in to add_replica()

replicas->push_back(Host::Ptr(host));
++replica_count_this_dc;
racks_observed_this_dc.insert(rack);
}
Expand All @@ -494,7 +507,8 @@ void ReplicationStrategy<Partitioner>::build_replicas_network_topology(
if (racks_observed_this_dc.size() == rack_count_this_dc) {
while (!skipped_endpoints_this_dc.empty() &&
replica_count_this_dc < replication_factor) {
if (add_replica(replicas, Host::Ptr(skipped_endpoints_this_dc.front()->second))) {
if (replicas_set.insert(skipped_endpoints_this_dc.front()->second).second) {
replicas->push_back(Host::Ptr(skipped_endpoints_this_dc.front()->second));
++replica_count_this_dc;
}
skipped_endpoints_this_dc.pop_front();
Expand Down
12 changes: 10 additions & 2 deletions tests/src/unit/test_token_map_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class BufferBuilder {

static size_t size_of(const String& value) { return value.size(); }

static size_t size_of(const Address& value) { char buf[16]; return value.to_inet(buf); }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes fix test warnings.


static void encode(char* buf, uint16_t value) { datastax::internal::encode_uint16(buf, value); }

static void encode(char* buf, int32_t value) { datastax::internal::encode_int32(buf, value); }
Expand All @@ -87,6 +89,8 @@ class BufferBuilder {

static void encode(char* buf, const String& value) { memcpy(buf, value.data(), value.size()); }

static void encode(char* buf, const Address& value) { value.to_inet(buf); }

private:
String buffer_;
};
Expand Down Expand Up @@ -154,9 +158,11 @@ class RowResultResponseBuilder : protected BufferBuilder {
++row_count_;
}

void append_local_peers_row_v3(const TokenVec& tokens, const String& partitioner,
void append_local_peers_row_v3(const Address& rpc_address,
const TokenVec& tokens, const String& partitioner,
const String& dc, const String& rack,
const String& release_version) {
append_value<Address>(rpc_address);
append_value<String>(rack);
append_value<String>(dc);
append_value<String>(release_version);
Expand Down Expand Up @@ -306,8 +312,10 @@ inline Host::Ptr create_host(const Address& address, const TokenVec& tokens,
Host::Ptr host(new Host(address));

DataType::ConstPtr varchar_data_type(new DataType(CASS_VALUE_TYPE_VARCHAR));
DataType::ConstPtr inet_data_type(new DataType(CASS_VALUE_TYPE_INET));

ColumnMetadataVec column_metadata;
column_metadata.push_back(ColumnMetadata("rpc_address", inet_data_type));
column_metadata.push_back(ColumnMetadata("data_center", varchar_data_type));
column_metadata.push_back(ColumnMetadata("rack", varchar_data_type));
column_metadata.push_back(ColumnMetadata("release_version", varchar_data_type));
Expand All @@ -318,7 +326,7 @@ inline Host::Ptr create_host(const Address& address, const TokenVec& tokens,
ColumnMetadata("tokens", CollectionType::list(varchar_data_type, true)));

RowResultResponseBuilder builder(column_metadata);
builder.append_local_peers_row_v3(tokens, partitioner, dc, rack, release_version);
builder.append_local_peers_row_v3(address, tokens, partitioner, dc, rack, release_version);

host->set(&builder.finish()->first_row(), true);

Expand Down
10 changes: 5 additions & 5 deletions tests/src/unit/tests/test_token_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@ TEST(TokenMapUnitTest, Murmur3MultipleTokensPerHost) {
TEST(TokenMapUnitTest, Murmur3LargeNumberOfVnodes) {
TestTokenMap<Murmur3Partitioner> test_murmur3;

size_t num_dcs = 3;
size_t num_racks = 3;
size_t num_hosts = 4;
size_t num_dcs = 2;
size_t num_racks = 1;
size_t num_hosts = 27;
size_t num_vnodes = 256;
size_t replication_factor = 3;
size_t total_replicas = std::min(num_hosts, replication_factor) * num_dcs;
size_t replication_factor = 54;
size_t total_replicas = replication_factor;
Copy link
Contributor Author

@mpenick mpenick Aug 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should likely be reverted. This is a pathological use case though.


ReplicationMap replication;
MT19937_64 rng;
Expand Down