diff --git a/Jenkinsfile b/Jenkinsfile index ccce7253b..d045db369 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -1,6 +1,11 @@ #!groovy import org.jenkinsci.plugins.workflow.steps.FlowInterruptedException +def get_os_distro() { + return sh(label: 'Assign env.OS_DISTRO based on OS env', script: '''#!/bin/bash -le + echo ${OS_DISTRO}''', returnStdout: true).trim() +} + def initializeEnvironment() { env.DRIVER_DISPLAY_NAME = 'Cassandra C/C++ Driver' env.DRIVER_TYPE = 'CASS' @@ -545,7 +550,10 @@ pipeline { post { success { // Allow empty results for 'osx/high-sierra' which doesn't produce packages - archiveArtifacts artifacts: "${env.OS_DISTRO}/**/libuv*", allowEmptyArchive: true + script { + def distro = get_os_distro() + archiveArtifacts artifacts: "${distro}/**/libuv*", allowEmptyArchive: true + } } } } @@ -555,12 +563,18 @@ pipeline { } post { success { - archiveArtifacts artifacts: "${env.OS_DISTRO}/**/cassandra-*-tests" - archiveArtifacts artifacts: "${env.OS_DISTRO}/**/dse-*-tests", allowEmptyArchive: true + script { + def distro = get_os_distro() + archiveArtifacts artifacts: "${distro}/**/cassandra-*-tests" + archiveArtifacts artifacts: "${distro}/**/dse-*-tests", allowEmptyArchive: true + } } failure { - archiveArtifacts artifacts: "${env.OS_DISTRO}/**/CMakeOutput.log" - archiveArtifacts artifacts: "${env.OS_DISTRO}/**/CMakeError.log" + script { + def distro = get_os_distro() + archiveArtifacts artifacts: "${distro}/**/CMakeOutput.log" + archiveArtifacts artifacts: "${distro}/**/CMakeError.log" + } } } } @@ -590,7 +604,10 @@ pipeline { } post { success { - archiveArtifacts artifacts: "${env.OS_DISTRO}/**/*-cpp-driver*" + script { + def distro = get_os_distro() + archiveArtifacts artifacts: "${distro}/**/*-cpp-driver*" + } } } } @@ -748,8 +765,11 @@ pipeline { } post { failure { - archiveArtifacts artifacts: "${env.OS_DISTRO}/**/CMakeOutput.log" - archiveArtifacts artifacts: "${env.OS_DISTRO}/**/CMakeError.log" + script { + def distro = get_os_distro() + archiveArtifacts artifacts: "${distro}/**/CMakeOutput.log" + archiveArtifacts artifacts: "${distro}/**/CMakeError.log" + } } } } @@ -764,7 +784,10 @@ pipeline { junit testResults: '*integration-tests-*-results.xml', allowEmptyResults: true } failure { - archiveArtifacts artifacts: "${env.OS_DISTRO}/**/*-integration-tests-driver-logs.tgz" + script { + def distro = get_os_distro() + archiveArtifacts artifacts: "${distro}/**/*-integration-tests-driver-logs.tgz" + } } cleanup { cleanWs() diff --git a/src/host.hpp b/src/host.hpp index 28665aeed..bbae4610f 100644 --- a/src/host.hpp +++ b/src/host.hpp @@ -304,4 +304,5 @@ bool remove_host(CopyOnWriteHostVec& hosts, const Address& address); }}} // namespace datastax::internal::core + #endif diff --git a/src/token_map_impl.hpp b/src/token_map_impl.hpp index 8596278d1..b404bb2b0 100644 --- a/src/token_map_impl.hpp +++ b/src/token_map_impl.hpp @@ -435,6 +435,9 @@ void ReplicationStrategy::build_replicas_network_topology( CopyOnWriteHostVec replicas(new HostVec()); replicas->reserve(num_replicas); + AddressSet replicas_set; + replicas_set.resize(num_replicas); + // Clear datacenter and rack information for the next token for (typename DatacenterRackInfoMap::iterator j = dc_racks.begin(), end = dc_racks.end(); j != end; ++j) { @@ -444,7 +447,7 @@ void ReplicationStrategy::build_replicas_network_topology( } for (typename TokenHostVec::const_iterator j = tokens.begin(), end = tokens.end(); - j != end && replicas->size() < num_replicas; ++j) { + j != end && replicas_set.size() < num_replicas; ++j) { typename TokenHostVec::const_iterator curr_token_it = token_it; Host* host = curr_token_it->second; uint32_t dc = host->dc_id(); @@ -476,7 +479,8 @@ void ReplicationStrategy::build_replicas_network_topology( // datacenter only then consider hosts in the same rack if (rack == 0 || racks_observed_this_dc.size() == rack_count_this_dc) { - if (add_replica(replicas, Host::Ptr(host))) { + if (replicas_set.insert(host->address()).second) { + replicas->push_back(Host::Ptr(host)); ++replica_count_this_dc; } } else { @@ -484,7 +488,8 @@ void ReplicationStrategy::build_replicas_network_topology( if (racks_observed_this_dc.count(rack) > 0) { skipped_endpoints_this_dc.push_back(curr_token_it); } else { - if (add_replica(replicas, Host::Ptr(host))) { + if (replicas_set.insert(host->address()).second) { + replicas->push_back(Host::Ptr(host)); ++replica_count_this_dc; racks_observed_this_dc.insert(rack); } @@ -494,7 +499,8 @@ void ReplicationStrategy::build_replicas_network_topology( if (racks_observed_this_dc.size() == rack_count_this_dc) { while (!skipped_endpoints_this_dc.empty() && replica_count_this_dc < replication_factor) { - if (add_replica(replicas, Host::Ptr(skipped_endpoints_this_dc.front()->second))) { + if (replicas_set.insert(skipped_endpoints_this_dc.front()->second->address()).second) { + replicas->push_back(Host::Ptr(skipped_endpoints_this_dc.front()->second)); ++replica_count_this_dc; } skipped_endpoints_this_dc.pop_front(); diff --git a/tests/src/unit/test_token_map_utils.hpp b/tests/src/unit/test_token_map_utils.hpp index 9a492f0a0..bee3ba491 100644 --- a/tests/src/unit/test_token_map_utils.hpp +++ b/tests/src/unit/test_token_map_utils.hpp @@ -79,6 +79,8 @@ class BufferBuilder { static size_t size_of(const String& value) { return value.size(); } + static size_t size_of(const Address& value) { char buf[16]; return value.to_inet(buf); } + static void encode(char* buf, uint16_t value) { datastax::internal::encode_uint16(buf, value); } static void encode(char* buf, int32_t value) { datastax::internal::encode_int32(buf, value); } @@ -87,6 +89,8 @@ class BufferBuilder { static void encode(char* buf, const String& value) { memcpy(buf, value.data(), value.size()); } + static void encode(char* buf, const Address& value) { value.to_inet(buf); } + private: String buffer_; }; @@ -154,9 +158,11 @@ class RowResultResponseBuilder : protected BufferBuilder { ++row_count_; } - void append_local_peers_row_v3(const TokenVec& tokens, const String& partitioner, + void append_local_peers_row_v3(const Address& rpc_address, + const TokenVec& tokens, const String& partitioner, const String& dc, const String& rack, const String& release_version) { + append_value
(rpc_address); append_value(rack); append_value(dc); append_value(release_version); @@ -306,8 +312,10 @@ inline Host::Ptr create_host(const Address& address, const TokenVec& tokens, Host::Ptr host(new Host(address)); DataType::ConstPtr varchar_data_type(new DataType(CASS_VALUE_TYPE_VARCHAR)); + DataType::ConstPtr inet_data_type(new DataType(CASS_VALUE_TYPE_INET)); ColumnMetadataVec column_metadata; + column_metadata.push_back(ColumnMetadata("rpc_address", inet_data_type)); column_metadata.push_back(ColumnMetadata("data_center", varchar_data_type)); column_metadata.push_back(ColumnMetadata("rack", varchar_data_type)); column_metadata.push_back(ColumnMetadata("release_version", varchar_data_type)); @@ -318,7 +326,7 @@ inline Host::Ptr create_host(const Address& address, const TokenVec& tokens, ColumnMetadata("tokens", CollectionType::list(varchar_data_type, true))); RowResultResponseBuilder builder(column_metadata); - builder.append_local_peers_row_v3(tokens, partitioner, dc, rack, release_version); + builder.append_local_peers_row_v3(address, tokens, partitioner, dc, rack, release_version); host->set(&builder.finish()->first_row(), true); diff --git a/tests/src/unit/tests/test_token_map.cpp b/tests/src/unit/tests/test_token_map.cpp index 3487cb544..9aa4717de 100644 --- a/tests/src/unit/tests/test_token_map.cpp +++ b/tests/src/unit/tests/test_token_map.cpp @@ -129,12 +129,12 @@ TEST(TokenMapUnitTest, Murmur3MultipleTokensPerHost) { TEST(TokenMapUnitTest, Murmur3LargeNumberOfVnodes) { TestTokenMap test_murmur3; - size_t num_dcs = 3; - size_t num_racks = 3; - size_t num_hosts = 4; + size_t num_dcs = 2; + size_t num_racks = 1; + size_t num_hosts = 27; size_t num_vnodes = 256; - size_t replication_factor = 3; - size_t total_replicas = std::min(num_hosts, replication_factor) * num_dcs; + size_t replication_factor = 54; + size_t total_replicas = replication_factor; ReplicationMap replication; MT19937_64 rng;