Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Reduce address comparisons for network topology replica calculation #532

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
41 changes: 32 additions & 9 deletions Jenkinsfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
#!groovy
import org.jenkinsci.plugins.workflow.steps.FlowInterruptedException

// Echoes OS_DISTRO from a bash script (shebang flags -le) and returns the
// captured standard output with leading/trailing whitespace trimmed.
def get_os_distro() {
  def distroOutput = sh(
    label: 'Assign env.OS_DISTRO based on OS env',
    returnStdout: true,
    script: '''#!/bin/bash -le
echo ${OS_DISTRO}''')
  return distroOutput.trim()
}

def initializeEnvironment() {
env.DRIVER_DISPLAY_NAME = 'Cassandra C/C++ Driver'
env.DRIVER_TYPE = 'CASS'
Expand Down Expand Up @@ -545,7 +550,10 @@ pipeline {
post {
success {
// Allow empty results for 'osx/high-sierra' which doesn't produce packages
archiveArtifacts artifacts: "${env.OS_DISTRO}/**/libuv*", allowEmptyArchive: true
script {
def distro = get_os_distro()
archiveArtifacts artifacts: "${distro}/**/libuv*", allowEmptyArchive: true
}
}
}
}
Expand All @@ -555,12 +563,18 @@ pipeline {
}
post {
success {
archiveArtifacts artifacts: "${env.OS_DISTRO}/**/cassandra-*-tests"
archiveArtifacts artifacts: "${env.OS_DISTRO}/**/dse-*-tests", allowEmptyArchive: true
script {
def distro = get_os_distro()
archiveArtifacts artifacts: "${distro}/**/cassandra-*-tests"
archiveArtifacts artifacts: "${distro}/**/dse-*-tests", allowEmptyArchive: true
}
}
failure {
archiveArtifacts artifacts: "${env.OS_DISTRO}/**/CMakeOutput.log"
archiveArtifacts artifacts: "${env.OS_DISTRO}/**/CMakeError.log"
script {
def distro = get_os_distro()
archiveArtifacts artifacts: "${distro}/**/CMakeOutput.log"
archiveArtifacts artifacts: "${distro}/**/CMakeError.log"
}
}
}
}
Expand Down Expand Up @@ -590,7 +604,10 @@ pipeline {
}
post {
success {
archiveArtifacts artifacts: "${env.OS_DISTRO}/**/*-cpp-driver*"
script {
def distro = get_os_distro()
archiveArtifacts artifacts: "${distro}/**/*-cpp-driver*"
}
}
}
}
Expand Down Expand Up @@ -748,8 +765,11 @@ pipeline {
}
post {
failure {
archiveArtifacts artifacts: "${env.OS_DISTRO}/**/CMakeOutput.log"
archiveArtifacts artifacts: "${env.OS_DISTRO}/**/CMakeError.log"
script {
def distro = get_os_distro()
archiveArtifacts artifacts: "${distro}/**/CMakeOutput.log"
archiveArtifacts artifacts: "${distro}/**/CMakeError.log"
}
}
}
}
Expand All @@ -764,7 +784,10 @@ pipeline {
junit testResults: '*integration-tests-*-results.xml', allowEmptyResults: true
}
failure {
archiveArtifacts artifacts: "${env.OS_DISTRO}/**/*-integration-tests-driver-logs.tgz"
script {
def distro = get_os_distro()
archiveArtifacts artifacts: "${distro}/**/*-integration-tests-driver-logs.tgz"
}
}
cleanup {
cleanWs()
Expand Down
1 change: 1 addition & 0 deletions src/host.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,4 +304,5 @@ bool remove_host(CopyOnWriteHostVec& hosts, const Address& address);

}}} // namespace datastax::internal::core


#endif
14 changes: 10 additions & 4 deletions src/token_map_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,9 @@ void ReplicationStrategy<Partitioner>::build_replicas_network_topology(
CopyOnWriteHostVec replicas(new HostVec());
replicas->reserve(num_replicas);

AddressSet replicas_set;
replicas_set.resize(num_replicas);

// Clear datacenter and rack information for the next token
for (typename DatacenterRackInfoMap::iterator j = dc_racks.begin(), end = dc_racks.end();
j != end; ++j) {
Expand All @@ -444,7 +447,7 @@ void ReplicationStrategy<Partitioner>::build_replicas_network_topology(
}

for (typename TokenHostVec::const_iterator j = tokens.begin(), end = tokens.end();
j != end && replicas->size() < num_replicas; ++j) {
j != end && replicas_set.size() < num_replicas; ++j) {
typename TokenHostVec::const_iterator curr_token_it = token_it;
Host* host = curr_token_it->second;
uint32_t dc = host->dc_id();
Expand Down Expand Up @@ -476,15 +479,17 @@ void ReplicationStrategy<Partitioner>::build_replicas_network_topology(
// datacenter only then consider hosts in the same rack

if (rack == 0 || racks_observed_this_dc.size() == rack_count_this_dc) {
if (add_replica(replicas, Host::Ptr(host))) {
if (replicas_set.insert(host->address()).second) {
replicas->push_back(Host::Ptr(host));
++replica_count_this_dc;
}
} else {
TokenHostQueue& skipped_endpoints_this_dc = dc_rack_info.skipped_endpoints;
if (racks_observed_this_dc.count(rack) > 0) {
skipped_endpoints_this_dc.push_back(curr_token_it);
} else {
if (add_replica(replicas, Host::Ptr(host))) {
if (replicas_set.insert(host->address()).second) {
replicas->push_back(Host::Ptr(host));
++replica_count_this_dc;
racks_observed_this_dc.insert(rack);
}
Expand All @@ -494,7 +499,8 @@ void ReplicationStrategy<Partitioner>::build_replicas_network_topology(
if (racks_observed_this_dc.size() == rack_count_this_dc) {
while (!skipped_endpoints_this_dc.empty() &&
replica_count_this_dc < replication_factor) {
if (add_replica(replicas, Host::Ptr(skipped_endpoints_this_dc.front()->second))) {
if (replicas_set.insert(skipped_endpoints_this_dc.front()->second->address()).second) {
replicas->push_back(Host::Ptr(skipped_endpoints_this_dc.front()->second));
++replica_count_this_dc;
}
skipped_endpoints_this_dc.pop_front();
Expand Down
12 changes: 10 additions & 2 deletions tests/src/unit/test_token_map_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class BufferBuilder {

// Size contribution of a string value: the value's own size().
static size_t size_of(const String& value) {
  const size_t length = value.size();
  return length;
}

// Size contribution of an address value: whatever Address::to_inet() returns
// when writing into a 16-byte scratch buffer (presumably the encoded length in
// bytes -- confirm against Address::to_inet).
static size_t size_of(const Address& value) {
  char scratch[16];
  const size_t length = value.to_inet(scratch);
  return length;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes fix test warnings.


// Writes a uint16_t into buf by delegating to datastax::internal::encode_uint16.
static void encode(char* buf, uint16_t value) {
  datastax::internal::encode_uint16(buf, value);
}

// Writes an int32_t into buf by delegating to datastax::internal::encode_int32.
static void encode(char* buf, int32_t value) {
  datastax::internal::encode_int32(buf, value);
}
Expand All @@ -87,6 +89,8 @@ class BufferBuilder {

// Copies the string's bytes (value.size() of them, no terminator) into buf.
static void encode(char* buf, const String& value) {
  const size_t length = value.size();
  memcpy(buf, value.data(), length);
}

// Writes the address into buf via Address::to_inet(); the call's return value
// is intentionally not used here.
static void encode(char* buf, const Address& value) {
  value.to_inet(buf);
}

private:
String buffer_;
};
Expand Down Expand Up @@ -154,9 +158,11 @@ class RowResultResponseBuilder : protected BufferBuilder {
++row_count_;
}

void append_local_peers_row_v3(const TokenVec& tokens, const String& partitioner,
void append_local_peers_row_v3(const Address& rpc_address,
const TokenVec& tokens, const String& partitioner,
const String& dc, const String& rack,
const String& release_version) {
append_value<Address>(rpc_address);
append_value<String>(rack);
append_value<String>(dc);
append_value<String>(release_version);
Expand Down Expand Up @@ -306,8 +312,10 @@ inline Host::Ptr create_host(const Address& address, const TokenVec& tokens,
Host::Ptr host(new Host(address));

DataType::ConstPtr varchar_data_type(new DataType(CASS_VALUE_TYPE_VARCHAR));
DataType::ConstPtr inet_data_type(new DataType(CASS_VALUE_TYPE_INET));

ColumnMetadataVec column_metadata;
column_metadata.push_back(ColumnMetadata("rpc_address", inet_data_type));
column_metadata.push_back(ColumnMetadata("data_center", varchar_data_type));
column_metadata.push_back(ColumnMetadata("rack", varchar_data_type));
column_metadata.push_back(ColumnMetadata("release_version", varchar_data_type));
Expand All @@ -318,7 +326,7 @@ inline Host::Ptr create_host(const Address& address, const TokenVec& tokens,
ColumnMetadata("tokens", CollectionType::list(varchar_data_type, true)));

RowResultResponseBuilder builder(column_metadata);
builder.append_local_peers_row_v3(tokens, partitioner, dc, rack, release_version);
builder.append_local_peers_row_v3(address, tokens, partitioner, dc, rack, release_version);

host->set(&builder.finish()->first_row(), true);

Expand Down
10 changes: 5 additions & 5 deletions tests/src/unit/tests/test_token_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@ TEST(TokenMapUnitTest, Murmur3MultipleTokensPerHost) {
TEST(TokenMapUnitTest, Murmur3LargeNumberOfVnodes) {
TestTokenMap<Murmur3Partitioner> test_murmur3;

size_t num_dcs = 3;
size_t num_racks = 3;
size_t num_hosts = 4;
size_t num_dcs = 2;
size_t num_racks = 1;
size_t num_hosts = 27;
size_t num_vnodes = 256;
size_t replication_factor = 3;
size_t total_replicas = std::min(num_hosts, replication_factor) * num_dcs;
size_t replication_factor = 54;
size_t total_replicas = replication_factor;
Copy link
Contributor Author

@mpenick mpenick Aug 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should likely be reverted. This is a pathological use case though.


ReplicationMap replication;
MT19937_64 rng;
Expand Down