diff --git a/lib/BinaryProtoLookupService.cc b/lib/BinaryProtoLookupService.cc index 2d9ffc42..489d8a27 100644 --- a/lib/BinaryProtoLookupService.cc +++ b/lib/BinaryProtoLookupService.cc @@ -22,7 +22,6 @@ #include "ConnectionPool.h" #include "LogUtils.h" #include "NamespaceName.h" -#include "ServiceNameResolver.h" #include "TopicName.h" DECLARE_LOG_OBJECT() diff --git a/lib/BinaryProtoLookupService.h b/lib/BinaryProtoLookupService.h index a3c059e4..6132825d 100644 --- a/lib/BinaryProtoLookupService.h +++ b/lib/BinaryProtoLookupService.h @@ -38,9 +38,9 @@ using GetSchemaPromisePtr = std::shared_ptr>; class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { public: - BinaryProtoLookupService(ServiceNameResolver& serviceNameResolver, ConnectionPool& pool, + BinaryProtoLookupService(const std::string& serviceUrl, ConnectionPool& pool, const ClientConfiguration& clientConfiguration) - : serviceNameResolver_(serviceNameResolver), + : serviceNameResolver_(serviceUrl), cnxPool_(pool), listenerName_(clientConfiguration.getListenerName()), maxLookupRedirects_(clientConfiguration.getMaxLookupRedirects()) {} @@ -54,6 +54,8 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { Future getSchema(const TopicNamePtr& topicName, const std::string& version) override; + ServiceNameResolver& getServiceNameResolver() override { return serviceNameResolver_; } + protected: // Mark findBroker as protected to make it accessible from test. LookupResultFuture findBroker(const std::string& address, bool authoritative, const std::string& topic, @@ -63,7 +65,7 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { std::mutex mutex_; uint64_t requestIdGenerator_ = 0; - ServiceNameResolver& serviceNameResolver_; + ServiceNameResolver serviceNameResolver_; ConnectionPool& cnxPool_; std::string listenerName_; const int32_t maxLookupRedirects_; diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 00041b2a..0beb739c 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -403,7 +403,8 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver:: LOG_INFO(cnxString_ << "Connected to broker"); } else { LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_ - << ", proxy: " << proxyServiceUrl_); + << ", proxy: " << proxyServiceUrl_ + << ", physical address:" << physicalAddress_); } Lock lock(mutex_); @@ -945,6 +946,10 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) { handleError(incomingCmd.error()); break; + case BaseCommand::TOPIC_MIGRATED: + handleTopicMigrated(incomingCmd.topicmigrated()); + break; + case BaseCommand::CLOSE_PRODUCER: handleCloseProducer(incomingCmd.close_producer()); break; @@ -1761,6 +1766,56 @@ void ClientConnection::handleError(const proto::CommandError& error) { } } +std::string ClientConnection::getMigratedBrokerServiceUrl( + const proto::CommandTopicMigrated& commandTopicMigrated) { + if (tlsSocket_) { + if (commandTopicMigrated.has_brokerserviceurltls()) { + return commandTopicMigrated.brokerserviceurltls(); + } + } else if (commandTopicMigrated.has_brokerserviceurl()) { + return commandTopicMigrated.brokerserviceurl(); + } + return ""; +} + +void ClientConnection::handleTopicMigrated(const proto::CommandTopicMigrated& commandTopicMigrated) { + const long resourceId = commandTopicMigrated.resource_id(); + const std::string migratedBrokerServiceUrl = getMigratedBrokerServiceUrl(commandTopicMigrated); + + if (migratedBrokerServiceUrl.empty()) { + LOG_WARN("Failed to find the migrated broker url for resource:" + << resourceId + << (commandTopicMigrated.has_brokerserviceurl() + ? ", migratedBrokerUrl: " + commandTopicMigrated.brokerserviceurl() + : "") + << (commandTopicMigrated.has_brokerserviceurltls() + ? ", migratedBrokerUrlTls: " + commandTopicMigrated.brokerserviceurltls() + : "")); + return; + } + + Lock lock(mutex_); + if (commandTopicMigrated.resource_type() == proto::CommandTopicMigrated_ResourceType_Producer) { + auto it = producers_.find(resourceId); + if (it != producers_.end()) { + auto producer = it->second.lock(); + producer->setRedirectedClusterURI(migratedBrokerServiceUrl); + LOG_INFO("Producer id:" << resourceId << " is migrated to " << migratedBrokerServiceUrl); + } else { + LOG_WARN("Got invalid producer Id in topicMigrated command: " << resourceId); + } + } else { + auto it = consumers_.find(resourceId); + if (it != consumers_.end()) { + auto consumer = it->second.lock(); + consumer->setRedirectedClusterURI(migratedBrokerServiceUrl); + LOG_INFO("Consumer id:" << resourceId << " is migrated to " << migratedBrokerServiceUrl); + } else { + LOG_WARN("Got invalid consumer Id in topicMigrated command: " << resourceId); + } + } +} + boost::optional ClientConnection::getAssignedBrokerServiceUrl( const proto::CommandCloseProducer& closeProducer) { if (tlsSocket_) { diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index b16fc694..3c83b4db 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -85,6 +85,7 @@ class BrokerEntryMetadata; class CommandActiveConsumerChange; class CommandAckResponse; class CommandMessage; +class CommandTopicMigrated; class CommandCloseConsumer; class CommandCloseProducer; class CommandConnected; @@ -414,6 +415,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this getAssignedBrokerServiceUrl( - const proto::CommandCloseProducer& closeProducer); - boost::optional getAssignedBrokerServiceUrl( - const proto::CommandCloseConsumer& closeConsumer); + boost::optional getAssignedBrokerServiceUrl(const proto::CommandCloseProducer&); + boost::optional getAssignedBrokerServiceUrl(const proto::CommandCloseConsumer&); + std::string getMigratedBrokerServiceUrl(const proto::CommandTopicMigrated&); }; } // namespace pulsar diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index ae339731..3d19c426 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -79,8 +79,8 @@ typedef std::vector StringList; ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration) : mutex_(), state_(Open), - serviceNameResolver_(serviceUrl), - clientConfiguration_(ClientConfiguration(clientConfiguration).setUseTls(serviceNameResolver_.useTls())), + clientConfiguration_(ClientConfiguration(clientConfiguration) + .setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceUrl)))), memoryLimitController_(clientConfiguration.getMemoryLimit()), ioExecutorProvider_(std::make_shared(clientConfiguration_.getIOThreads())), listenerExecutorProvider_( @@ -98,25 +98,28 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& if (loggerFactory) { LogUtils::setLoggerFactory(std::move(loggerFactory)); } + lookupServicePtr_ = createLookup(serviceUrl); +} + +ClientImpl::~ClientImpl() { shutdown(); } +LookupServicePtr ClientImpl::createLookup(const std::string& serviceUrl) { LookupServicePtr underlyingLookupServicePtr; - if (serviceNameResolver_.useHttp()) { + if (ServiceNameResolver::useHttp(ServiceURI(serviceUrl))) { LOG_DEBUG("Using HTTP Lookup"); underlyingLookupServicePtr = std::make_shared( - std::ref(serviceNameResolver_), std::cref(clientConfiguration_), - std::cref(clientConfiguration_.getAuthPtr())); + serviceUrl, std::cref(clientConfiguration_), std::cref(clientConfiguration_.getAuthPtr())); } else { LOG_DEBUG("Using Binary Lookup"); underlyingLookupServicePtr = std::make_shared( - std::ref(serviceNameResolver_), std::ref(pool_), std::cref(clientConfiguration_)); + serviceUrl, std::ref(pool_), std::cref(clientConfiguration_)); } - lookupServicePtr_ = RetryableLookupService::create( + auto lookupServicePtr = RetryableLookupService::create( underlyingLookupServicePtr, clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_); + return lookupServicePtr; } -ClientImpl::~ClientImpl() { shutdown(); } - const ClientConfiguration& ClientImpl::conf() const { return clientConfiguration_; } MemoryLimitController& ClientImpl::getMemoryLimitController() { return memoryLimitController_; } @@ -129,7 +132,21 @@ ExecutorServiceProviderPtr ClientImpl::getPartitionListenerExecutorProvider() { return partitionListenerExecutorProvider_; } -LookupServicePtr ClientImpl::getLookup() { return lookupServicePtr_; } +LookupServicePtr ClientImpl::getLookup(const std::string& redirectedClusterURI) { + if (redirectedClusterURI.empty()) { + return lookupServicePtr_; + } + + Lock lock(mutex_); + auto it = redirectedClusterLookupServicePtrs_.find(redirectedClusterURI); + if (it == redirectedClusterLookupServicePtrs_.end()) { + auto lookup = createLookup(redirectedClusterURI); + redirectedClusterLookupServicePtrs_.emplace(redirectedClusterURI, lookup); + return lookup; + } + + return it->second; +} void ClientImpl::createProducerAsync(const std::string& topic, ProducerConfiguration conf, CreateProducerCallback callback, bool autoDownloadSchema) { @@ -517,7 +534,8 @@ void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr co } } -GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t key) { +GetConnectionFuture ClientImpl::getConnection(const std::string& redirectedClusterURI, + const std::string& topic, size_t key) { Promise promise; const auto topicNamePtr = TopicName::get(topic); @@ -528,7 +546,8 @@ GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t k } auto self = shared_from_this(); - lookupServicePtr_->getBroker(*topicNamePtr) + getLookup(redirectedClusterURI) + ->getBroker(*topicNamePtr) .addListener([this, self, promise, key](Result result, const LookupService::LookupResult& data) { if (result != ResultOk) { promise.setFailed(result); @@ -554,16 +573,18 @@ GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t k return promise.getFuture(); } -const std::string& ClientImpl::getPhysicalAddress(const std::string& logicalAddress) { +const std::string& ClientImpl::getPhysicalAddress(const std::string& redirectedClusterURI, + const std::string& logicalAddress) { if (useProxy_) { - return serviceNameResolver_.resolveHost(); + return getLookup(redirectedClusterURI)->getServiceNameResolver().resolveHost(); } else { return logicalAddress; } } -GetConnectionFuture ClientImpl::connect(const std::string& logicalAddress, size_t key) { - const auto& physicalAddress = getPhysicalAddress(logicalAddress); +GetConnectionFuture ClientImpl::connect(const std::string& redirectedClusterURI, + const std::string& logicalAddress, size_t key) { + const auto& physicalAddress = getPhysicalAddress(redirectedClusterURI, logicalAddress); Promise promise; pool_.getConnectionAsync(logicalAddress, physicalAddress, key) .addListener([promise](Result result, const ClientConnectionWeakPtr& weakCnx) { @@ -633,6 +654,9 @@ void ClientImpl::closeAsync(CloseCallback callback) { memoryLimitController_.close(); lookupServicePtr_->close(); + for (const auto& it : redirectedClusterLookupServicePtrs_) { + it.second->close(); + } auto producers = producers_.move(); auto consumers = consumers_.move(); diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index 7126542b..27cde3a1 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -97,9 +97,11 @@ class ClientImpl : public std::enable_shared_from_this { void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback); // Use virtual method to test - virtual GetConnectionFuture getConnection(const std::string& topic, size_t key); + virtual GetConnectionFuture getConnection(const std::string& redirectedClusterURI, + const std::string& topic, size_t key); - GetConnectionFuture connect(const std::string& logicalAddress, size_t key); + GetConnectionFuture connect(const std::string& redirectedClusterURI, const std::string& logicalAddress, + size_t key); void closeAsync(CloseCallback callback); void shutdown(); @@ -119,7 +121,7 @@ class ClientImpl : public std::enable_shared_from_this { ExecutorServiceProviderPtr getIOExecutorProvider(); ExecutorServiceProviderPtr getListenerExecutorProvider(); ExecutorServiceProviderPtr getPartitionListenerExecutorProvider(); - LookupServicePtr getLookup(); + LookupServicePtr getLookup(const std::string& redirectedClusterURI = ""); void cleanupProducer(ProducerImplBase* address) { producers_.remove(address); } @@ -165,7 +167,10 @@ class ClientImpl : public std::enable_shared_from_this { const std::string& consumerName, const ConsumerConfiguration& conf, SubscribeCallback callback); - const std::string& getPhysicalAddress(const std::string& logicalAddress); + const std::string& getPhysicalAddress(const std::string& redirectedClusterURI, + const std::string& logicalAddress); + + LookupServicePtr createLookup(const std::string& serviceUrl); static std::string getClientVersion(const ClientConfiguration& clientConfiguration); @@ -179,7 +184,6 @@ class ClientImpl : public std::enable_shared_from_this { std::mutex mutex_; State state_; - ServiceNameResolver serviceNameResolver_; ClientConfiguration clientConfiguration_; MemoryLimitController memoryLimitController_; @@ -188,6 +192,7 @@ class ClientImpl : public std::enable_shared_from_this { ExecutorServiceProviderPtr partitionListenerExecutorProvider_; LookupServicePtr lookupServicePtr_; + std::unordered_map redirectedClusterLookupServicePtrs_; ConnectionPool pool_; uint64_t producerIdGenerator_; diff --git a/lib/HTTPLookupService.cc b/lib/HTTPLookupService.cc index 0959af2a..93b9db44 100644 --- a/lib/HTTPLookupService.cc +++ b/lib/HTTPLookupService.cc @@ -46,11 +46,11 @@ const static std::string ADMIN_PATH_V2 = "/admin/v2/"; const static std::string PARTITION_METHOD_NAME = "partitions"; const static int NUMBER_OF_LOOKUP_THREADS = 1; -HTTPLookupService::HTTPLookupService(ServiceNameResolver &serviceNameResolver, +HTTPLookupService::HTTPLookupService(const std::string &serviceUrl, const ClientConfiguration &clientConfiguration, const AuthenticationPtr &authData) : executorProvider_(std::make_shared(NUMBER_OF_LOOKUP_THREADS)), - serviceNameResolver_(serviceNameResolver), + serviceNameResolver_(serviceUrl), authenticationPtr_(authData), lookupTimeoutInSeconds_(clientConfiguration.getOperationTimeoutSeconds()), maxLookupRedirects_(clientConfiguration.getMaxLookupRedirects()), diff --git a/lib/HTTPLookupService.h b/lib/HTTPLookupService.h index cf0b0ad1..17dd110e 100644 --- a/lib/HTTPLookupService.h +++ b/lib/HTTPLookupService.h @@ -40,7 +40,7 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t typedef Promise LookupPromise; ExecutorServiceProviderPtr executorProvider_; - ServiceNameResolver& serviceNameResolver_; + ServiceNameResolver serviceNameResolver_; AuthenticationPtr authenticationPtr_; int lookupTimeoutInSeconds_; const int maxLookupRedirects_; @@ -64,7 +64,7 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t Result sendHTTPRequest(std::string completeUrl, std::string& responseData, long& responseCode); public: - HTTPLookupService(ServiceNameResolver&, const ClientConfiguration&, const AuthenticationPtr&); + HTTPLookupService(const std::string&, const ClientConfiguration&, const AuthenticationPtr&); LookupResultFuture getBroker(const TopicName& topicName) override; @@ -74,6 +74,8 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t Future getTopicsOfNamespaceAsync( const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override; + + ServiceNameResolver& getServiceNameResolver() override { return serviceNameResolver_; } }; } // namespace pulsar diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc index 52e20d2d..46b918f6 100644 --- a/lib/HandlerBase.cc +++ b/lib/HandlerBase.cc @@ -43,7 +43,8 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, epoch_(0), timer_(executor_->createDeadlineTimer()), creationTimer_(executor_->createDeadlineTimer()), - reconnectionPending_(false) {} + reconnectionPending_(false), + redirectedClusterURI_("") {} HandlerBase::~HandlerBase() { ASIO_ERROR ignored; @@ -88,9 +89,9 @@ void HandlerBase::grabCnx() { grabCnx(boost::none); } Future HandlerBase::getConnection( const ClientImplPtr& client, const boost::optional& assignedBrokerUrl) { if (assignedBrokerUrl && client->getLookupCount() > 0) { - return client->connect(assignedBrokerUrl.get(), connectionKeySuffix_); + return client->connect(getRedirectedClusterURI(), assignedBrokerUrl.get(), connectionKeySuffix_); } else { - return client->getConnection(topic(), connectionKeySuffix_); + return client->getConnection(getRedirectedClusterURI(), topic(), connectionKeySuffix_); } } @@ -209,4 +210,13 @@ Result HandlerBase::convertToTimeoutIfNecessary(Result result, ptime startTimest } } +void HandlerBase::setRedirectedClusterURI(const std::string& serviceUrl) { + Lock lock(mutex_); + redirectedClusterURI_ = serviceUrl; +} +const std::string& HandlerBase::getRedirectedClusterURI() { + Lock lock(mutex_); + return redirectedClusterURI_; +} + } // namespace pulsar diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h index 32e124a2..415e234c 100644 --- a/lib/HandlerBase.h +++ b/lib/HandlerBase.h @@ -52,6 +52,8 @@ class HandlerBase : public std::enable_shared_from_this { ClientConnectionWeakPtr getCnx() const; void setCnx(const ClientConnectionPtr& cnx); void resetCnx() { setCnx(nullptr); } + void setRedirectedClusterURI(const std::string& serviceUrl); + const std::string& getRedirectedClusterURI(); protected: /* @@ -145,6 +147,8 @@ class HandlerBase : public std::enable_shared_from_this { mutable std::mutex connectionMutex_; std::atomic reconnectionPending_; ClientConnectionWeakPtr connection_; + std::string redirectedClusterURI_; + friend class ClientConnection; friend class PulsarFriend; }; diff --git a/lib/LookupService.h b/lib/LookupService.h index b50d1f82..684984fc 100644 --- a/lib/LookupService.h +++ b/lib/LookupService.h @@ -29,6 +29,7 @@ #include "Future.h" #include "LookupDataResult.h" #include "ProtoApiEnums.h" +#include "ServiceNameResolver.h" namespace pulsar { using NamespaceTopicsPtr = std::shared_ptr>; @@ -86,6 +87,8 @@ class LookupService { virtual Future getSchema(const TopicNamePtr& topicName, const std::string& version = "") = 0; + virtual ServiceNameResolver& getServiceNameResolver() = 0; + virtual ~LookupService() {} virtual void close() {} diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 890d476c..f84c255e 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -144,7 +144,11 @@ Future ProducerImpl::connectionOpened(const ClientConnectionPtr& c return promise.getFuture(); } + LOG_INFO("Creating producer for topic:" << topic() << ", producerName:" << producerName_ << " on " + << cnx->cnxString()); ClientImplPtr client = client_.lock(); + cnx->registerProducer(producerId_, shared_from_this()); + int requestId = client->newRequestId(); SharedBuffer cmd = Commands::newProducer(topic(), producerId_, producerName_, requestId, @@ -214,7 +218,6 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result // set the cnx pointer so that new messages will be sent immediately LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString()); - cnx->registerProducer(producerId_, shared_from_this()); producerName_ = responseData.producerName; schemaVersion_ = responseData.schemaVersion; producerStr_ = "[" + topic() + ", " + producerName_ + "] "; diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h index 561855f9..8bc40bf3 100644 --- a/lib/RetryableLookupService.h +++ b/lib/RetryableLookupService.h @@ -76,6 +76,10 @@ class RetryableLookupService : public LookupService { }); } + ServiceNameResolver& getServiceNameResolver() override { + return lookupService_->getServiceNameResolver(); + } + private: const std::shared_ptr lookupService_; RetryableOperationCachePtr lookupCache_; diff --git a/lib/ServiceNameResolver.h b/lib/ServiceNameResolver.h index 8457d0e1..e6b2523b 100644 --- a/lib/ServiceNameResolver.h +++ b/lib/ServiceNameResolver.h @@ -36,14 +36,17 @@ class ServiceNameResolver { ServiceNameResolver(const ServiceNameResolver&) = delete; ServiceNameResolver& operator=(const ServiceNameResolver&) = delete; - bool useTls() const noexcept { - return serviceUri_.getScheme() == PulsarScheme::PULSAR_SSL || - serviceUri_.getScheme() == PulsarScheme::HTTPS; + bool useTls() const noexcept { return useTls(serviceUri_); } + + static bool useTls(const ServiceURI& serviceUri) noexcept { + return serviceUri.getScheme() == PulsarScheme::PULSAR_SSL || + serviceUri.getScheme() == PulsarScheme::HTTPS; } - bool useHttp() const noexcept { - return serviceUri_.getScheme() == PulsarScheme::HTTP || - serviceUri_.getScheme() == PulsarScheme::HTTPS; + bool useHttp() const noexcept { return useTls(serviceUri_); } + + static bool useHttp(const ServiceURI& serviceUri) noexcept { + return serviceUri.getScheme() == PulsarScheme::HTTP || serviceUri.getScheme() == PulsarScheme::HTTPS; } const std::string& resolveHost() { diff --git a/proto/PulsarApi.proto b/proto/PulsarApi.proto index a2548f3a..4e207913 100644 --- a/proto/PulsarApi.proto +++ b/proto/PulsarApi.proto @@ -262,6 +262,7 @@ enum ProtocolVersion { v17 = 17; // Added support ack receipt v18 = 18; // Add client support for broker entry metadata v19 = 19; // Add CommandTcClientConnectRequest and CommandTcClientConnectResponse + v20 = 20; // Add client support for topic migration redirection CommandTopicMigrated } message CommandConnect { @@ -620,6 +621,18 @@ message CommandReachedEndOfTopic { required uint64 consumer_id = 1; } +message CommandTopicMigrated { + enum ResourceType { + Producer = 0; + Consumer = 1; + } + required uint64 resource_id = 1; + required ResourceType resource_type = 2; + optional string brokerServiceUrl = 3; + optional string brokerServiceUrlTls = 4; + +} + message CommandCloseProducer { required uint64 producer_id = 1; required uint64 request_id = 2; @@ -1029,6 +1042,7 @@ message BaseCommand { WATCH_TOPIC_UPDATE = 66; WATCH_TOPIC_LIST_CLOSE = 67; + TOPIC_MIGRATED = 68; } @@ -1110,4 +1124,6 @@ message BaseCommand { optional CommandWatchTopicListSuccess watchTopicListSuccess = 65; optional CommandWatchTopicUpdate watchTopicUpdate = 66; optional CommandWatchTopicListClose watchTopicListClose = 67; + + optional CommandTopicMigrated topicMigrated = 68; } diff --git a/run-unit-tests.sh b/run-unit-tests.sh index 226789c5..698ca62c 100755 --- a/run-unit-tests.sh +++ b/run-unit-tests.sh @@ -33,9 +33,12 @@ export https_proxy= # Run ExtensibleLoadManager tests docker compose -f tests/extensibleLM/docker-compose.yml up -d +docker compose -f tests/blue-green/docker-compose.yml up -d until curl http://localhost:8080/metrics > /dev/null 2>&1 ; do sleep 1; done +until curl http://localhost:8081/metrics > /dev/null 2>&1 ; do sleep 1; done sleep 5 $CMAKE_BUILD_DIRECTORY/tests/ExtensibleLoadManagerTest +docker compose -f tests/blue-green/docker-compose.yml down docker compose -f tests/extensibleLM/docker-compose.yml down # Run OAuth2 tests diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc index 1e8e67d5..b5142860 100644 --- a/tests/ClientTest.cc +++ b/tests/ClientTest.cc @@ -443,7 +443,8 @@ TEST(ClientTest, testRetryUntilSucceed) { EXPECT_CALL(*clientImpl, getConnection).Times((kFailCount + 1) * 2); std::atomic_int count{0}; ON_CALL(*clientImpl, getConnection) - .WillByDefault([&clientImpl, &count](const std::string &topic, size_t index) { + .WillByDefault([&clientImpl, &count](const std::string &redirectedClusterURI, + const std::string &topic, size_t index) { if (count++ < kFailCount) { return GetConnectionFuture::failed(ResultRetryable); } @@ -461,9 +462,10 @@ TEST(ClientTest, testRetryTimeout) { auto clientImpl = std::make_shared(lookupUrl, ClientConfiguration().setOperationTimeoutSeconds(2)); EXPECT_CALL(*clientImpl, getConnection).Times(AtLeast(2 * 2)); - ON_CALL(*clientImpl, getConnection).WillByDefault([](const std::string &topic, size_t index) { - return GetConnectionFuture::failed(ResultRetryable); - }); + ON_CALL(*clientImpl, getConnection) + .WillByDefault([](const std::string &redirectedClusterURI, const std::string &topic, size_t index) { + return GetConnectionFuture::failed(ResultRetryable); + }); auto topic = "client-test-retry-timeout"; { @@ -484,9 +486,10 @@ TEST(ClientTest, testNoRetry) { auto clientImpl = std::make_shared(lookupUrl, ClientConfiguration().setOperationTimeoutSeconds(100)); EXPECT_CALL(*clientImpl, getConnection).Times(2); - ON_CALL(*clientImpl, getConnection).WillByDefault([](const std::string &, size_t) { - return GetConnectionFuture::failed(ResultAuthenticationError); - }); + ON_CALL(*clientImpl, getConnection) + .WillByDefault([](const std::string &redirectedClusterURI, const std::string &, size_t) { + return GetConnectionFuture::failed(ResultAuthenticationError); + }); auto topic = "client-test-no-retry"; { diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc index 0fe13851..924acd43 100644 --- a/tests/LookupServiceTest.cc +++ b/tests/LookupServiceTest.cc @@ -84,8 +84,7 @@ TEST(LookupServiceTest, basicLookup) { ClientConfiguration conf; ExecutorServiceProviderPtr ioExecutorProvider_(std::make_shared(1)); ConnectionPool pool_(conf, ioExecutorProvider_, authData, ""); - ServiceNameResolver serviceNameResolver(url); - BinaryProtoLookupService lookupService(serviceNameResolver, pool_, conf); + BinaryProtoLookupService lookupService(url, pool_, conf); TopicNamePtr topicName = TopicName::get("topic"); @@ -148,26 +147,24 @@ static void testMultiAddresses(LookupService& lookupService) { TEST(LookupServiceTest, testMultiAddresses) { ConnectionPool pool({}, std::make_shared(1), AuthFactory::Disabled(), ""); - ServiceNameResolver serviceNameResolver("pulsar://localhost,localhost:9999"); ClientConfiguration conf; - BinaryProtoLookupService binaryLookupService(serviceNameResolver, pool, conf); + BinaryProtoLookupService binaryLookupService("pulsar://localhost,localhost:9999", pool, conf); testMultiAddresses(binaryLookupService); // HTTPLookupService calls shared_from_this() internally, we must create a shared pointer to test - ServiceNameResolver serviceNameResolverForHttp("http://localhost,localhost:9999"); auto httpLookupServicePtr = std::make_shared( - std::ref(serviceNameResolverForHttp), ClientConfiguration{}, AuthFactory::Disabled()); + "http://localhost,localhost:9999", ClientConfiguration{}, AuthFactory::Disabled()); testMultiAddresses(*httpLookupServicePtr); } TEST(LookupServiceTest, testRetry) { auto executorProvider = std::make_shared(1); ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), ""); - ServiceNameResolver serviceNameResolver("pulsar://localhost:9999,localhost"); ClientConfiguration conf; auto lookupService = RetryableLookupService::create( - std::make_shared(serviceNameResolver, pool, conf), std::chrono::seconds(30), - executorProvider); + std::make_shared("pulsar://localhost:9999,localhost", pool, conf), + std::chrono::seconds(30), executorProvider); + ServiceNameResolver& serviceNameResolver = lookupService->getServiceNameResolver(); PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0); auto topicNamePtr = TopicName::get("lookup-service-test-retry"); @@ -196,12 +193,12 @@ TEST(LookupServiceTest, testRetry) { TEST(LookupServiceTest, testTimeout) { auto executorProvider = std::make_shared(1); ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), ""); - ServiceNameResolver serviceNameResolver("pulsar://localhost:9990,localhost:9902,localhost:9904"); ClientConfiguration conf; constexpr int timeoutInSeconds = 2; auto lookupService = RetryableLookupService::create( - std::make_shared(serviceNameResolver, pool, conf), + std::make_shared("pulsar://localhost:9990,localhost:9902,localhost:9904", + pool, conf), std::chrono::seconds(timeoutInSeconds), executorProvider); auto topicNamePtr = TopicName::get("lookup-service-test-retry"); @@ -467,9 +464,9 @@ INSTANTIATE_TEST_SUITE_P(Pulsar, LookupServiceTest, ::testing::Values(binaryLook class BinaryProtoLookupServiceRedirectTestHelper : public BinaryProtoLookupService { public: - BinaryProtoLookupServiceRedirectTestHelper(ServiceNameResolver& serviceNameResolver, ConnectionPool& pool, + BinaryProtoLookupServiceRedirectTestHelper(const std::string& serviceUrl, ConnectionPool& pool, const ClientConfiguration& clientConfiguration) - : BinaryProtoLookupService(serviceNameResolver, pool, clientConfiguration) {} + : BinaryProtoLookupService(serviceUrl, pool, clientConfiguration) {} LookupResultFuture findBroker(const std::string& address, bool authoritative, const std::string& topic, size_t redirectCount) { @@ -484,14 +481,13 @@ TEST(LookupServiceTest, testRedirectionLimit) { conf.setMaxLookupRedirects(redirect_limit); ExecutorServiceProviderPtr ioExecutorProvider_(std::make_shared(1)); ConnectionPool pool_(conf, ioExecutorProvider_, authData, ""); - std::string url = "pulsar://localhost:6650"; - ServiceNameResolver serviceNameResolver(url); - BinaryProtoLookupServiceRedirectTestHelper lookupService(serviceNameResolver, pool_, conf); + string url = "pulsar://localhost:6650"; + BinaryProtoLookupServiceRedirectTestHelper lookupService(url, pool_, conf); const auto topicNamePtr = TopicName::get("topic"); for (auto idx = 0; idx < redirect_limit + 5; ++idx) { - auto future = - lookupService.findBroker(serviceNameResolver.resolveHost(), false, topicNamePtr->toString(), idx); + auto future = lookupService.findBroker(lookupService.getServiceNameResolver().resolveHost(), false, + topicNamePtr->toString(), idx); LookupService::LookupResult lookupResult; auto result = future.get(lookupResult); diff --git a/tests/MockClientImpl.h b/tests/MockClientImpl.h index aa4208a9..074808fc 100644 --- a/tests/MockClientImpl.h +++ b/tests/MockClientImpl.h @@ -39,8 +39,8 @@ class MockClientImpl : public ClientImpl { MockClientImpl(const std::string& serviceUrl, ClientConfiguration conf = {}) : ClientImpl(serviceUrl, conf) {} - MOCK_METHOD((Future), getConnection, (const std::string&, size_t), - (override)); + MOCK_METHOD((Future), getConnection, + (const std::string&, const std::string&, size_t), (override)); SyncOpResult createProducer(const std::string& topic) { using namespace std::chrono; @@ -65,7 +65,7 @@ class MockClientImpl : public ClientImpl { } GetConnectionFuture getConnectionReal(const std::string& topic, size_t key) { - return ClientImpl::getConnection(topic, key); + return ClientImpl::getConnection("", topic, key); } Result close() { diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h index 73778842..bfa11ef1 100644 --- a/tests/PulsarFriend.h +++ b/tests/PulsarFriend.h @@ -26,6 +26,7 @@ #include "lib/ClientImpl.h" #include "lib/ConsumerConfigurationImpl.h" #include "lib/ConsumerImpl.h" +#include "lib/LookupService.h" #include "lib/MessageImpl.h" #include "lib/MultiTopicsConsumerImpl.h" #include "lib/NamespaceName.h" @@ -38,6 +39,7 @@ using std::string; namespace pulsar { +using ClientConnectionWeakPtr = std::weak_ptr; class PulsarFriend { public: static ProducerStatsImplPtr getProducerStatsPtr(Producer producer) { @@ -166,6 +168,14 @@ class PulsarFriend { static ClientConnectionWeakPtr getClientConnection(HandlerBase& handler) { return handler.connection_; } + static std::string getConnectionPhysicalAddress(HandlerBase& handler) { + auto cnx = handler.connection_.lock(); + if (cnx) { + return cnx->physicalAddress_; + } + return ""; + } + static void setClientConnection(HandlerBase& handler, ClientConnectionWeakPtr conn) { handler.connection_ = conn; } @@ -177,7 +187,7 @@ class PulsarFriend { static void setServiceUrlIndex(ServiceNameResolver& resolver, size_t index) { resolver.index_ = index; } static void setServiceUrlIndex(const Client& client, size_t index) { - setServiceUrlIndex(client.impl_->serviceNameResolver_, index); + setServiceUrlIndex(client.impl_->lookupServicePtr_->getServiceNameResolver(), index); } static proto::MessageMetadata& getMessageMetadata(Message& message) { return message.impl_->metadata; } diff --git a/tests/blue-green/docker-compose.yml b/tests/blue-green/docker-compose.yml new file mode 100644 index 00000000..b2d22ebf --- /dev/null +++ b/tests/blue-green/docker-compose.yml @@ -0,0 +1,152 @@ +version: '3' +networks: + green-pulsar: + driver: bridge +services: + # Start ZooKeeper + zookeeper: + image: apachepulsar/pulsar:latest + container_name: green-zookeeper + restart: on-failure + networks: + - green-pulsar + environment: + - metadataStoreUrl=zk:zookeeper:2181 + - PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m + command: > + bash -c "bin/apply-config-from-env.py conf/zookeeper.conf && \ + bin/generate-zookeeper-config.sh conf/zookeeper.conf && \ + exec bin/pulsar zookeeper" + healthcheck: + test: ["CMD", "bin/pulsar-zookeeper-ruok.sh"] + interval: 10s + timeout: 5s + retries: 30 + + # Initialize cluster metadata + pulsar-init: + container_name: green-pulsar-init + hostname: pulsar-init + image: apachepulsar/pulsar:latest + networks: + - green-pulsar + environment: + - PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m + command: > + bin/pulsar initialize-cluster-metadata \ + --cluster cluster-a \ + --zookeeper zookeeper:2181 \ + --configuration-store zookeeper:2181 \ + --web-service-url http://broker-1:8080 \ + --broker-service-url pulsar://broker-1:6650 + depends_on: + zookeeper: + condition: service_healthy + + # Start bookie + bookie: + image: apachepulsar/pulsar:latest + container_name: green-bookie + restart: on-failure + networks: + - green-pulsar + environment: + - clusterName=cluster-a + - zkServers=zookeeper:2181 + - metadataServiceUri=metadata-store:zk:zookeeper:2181 + - advertisedAddress=bookie + - BOOKIE_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m + depends_on: + zookeeper: + condition: service_healthy + pulsar-init: + condition: service_completed_successfully + command: bash -c "bin/apply-config-from-env.py conf/bookkeeper.conf && exec bin/pulsar bookie" + + proxy: + image: apachepulsar/pulsar:latest + container_name: green-proxy + hostname: proxy + restart: on-failure + networks: + - green-pulsar + environment: + - metadataStoreUrl=zk:zookeeper:2181 + - zookeeperServers=zookeeper:2181 + - clusterName=cluster-a + - PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m + ports: + - "8081:8080" + - "6651:6650" + depends_on: + broker-1: + condition: service_started + broker-2: + condition: service_started + command: bash -c "bin/apply-config-from-env.py conf/proxy.conf && exec bin/pulsar proxy" + + # Start broker 1 + broker-1: + image: apachepulsar/pulsar:latest + container_name: green-broker-1 + hostname: broker-1 + restart: on-failure + networks: + - green-pulsar + environment: + - metadataStoreUrl=zk:zookeeper:2181 + - zookeeperServers=zookeeper:2181 + - clusterName=cluster-a + - managedLedgerDefaultEnsembleSize=1 + - managedLedgerDefaultWriteQuorum=1 + - managedLedgerDefaultAckQuorum=1 + - advertisedAddress=green-broker-1 + - internalListenerName=internal + - advertisedListeners=internal:pulsar://green-broker-1:6650 + - PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m + # Load Manager. Here uses the extensible load balancer, sets the unloading strategy to TransferShedder, and enables debug mode. + - loadManagerClassName=org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl + - loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder + - loadBalancerSheddingEnabled=false + - loadBalancerDebugModeEnabled=true + - brokerServiceCompactionThresholdInBytes=1000000 + - PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1 + depends_on: + zookeeper: + condition: service_healthy + bookie: + condition: service_started + command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker" + + # Start broker 2 + broker-2: + image: apachepulsar/pulsar:latest + container_name: green-broker-2 + hostname: broker-2 + restart: on-failure + networks: + - green-pulsar + environment: + - metadataStoreUrl=zk:zookeeper:2181 + - zookeeperServers=zookeeper:2181 + - clusterName=cluster-a + - managedLedgerDefaultEnsembleSize=1 + - managedLedgerDefaultWriteQuorum=1 + - managedLedgerDefaultAckQuorum=1 + - advertisedAddress=green-broker-2 + - internalListenerName=internal + - advertisedListeners=internal:pulsar://green-broker-2:6650 + - PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m + # Load Manager. Here uses the extensible load balancer, sets the unloading strategy to TransferShedder, and enables debug mode. + - loadManagerClassName=org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl + - loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder + - loadBalancerSheddingEnabled=false + - loadBalancerDebugModeEnabled=true + - brokerServiceCompactionThresholdInBytes=1000000 + - PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1 + depends_on: + zookeeper: + condition: service_healthy + bookie: + condition: service_started + command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker" \ No newline at end of file diff --git a/tests/extensibleLM/ExtensibleLoadManagerTest.cc b/tests/extensibleLM/ExtensibleLoadManagerTest.cc index a6379f6e..f4e2c81c 100644 --- a/tests/extensibleLM/ExtensibleLoadManagerTest.cc +++ b/tests/extensibleLM/ExtensibleLoadManagerTest.cc @@ -20,6 +20,7 @@ #include #include +#include #include "include/pulsar/Client.h" #include "lib/LogUtils.h" @@ -35,21 +36,43 @@ bool checkTime() { const static auto start = std::chrono::high_resolution_clock::now(); auto end = std::chrono::high_resolution_clock::now(); auto duration = std::chrono::duration_cast(end - start).count(); - return duration < 180 * 1000; + return duration < 300 * 1000; } TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) { - const static std::string adminUrl = "http://localhost:8080/"; - const static std::string topicName = - "persistent://public/unload-test/topic-1" + std::to_string(time(NULL)); + const static std::string blueAdminUrl = "http://localhost:8080/"; + const static std::string greenAdminUrl = "http://localhost:8081/"; + const static std::string topicNameSuffix = std::to_string(time(NULL)); + const static std::string topicName = "persistent://public/unload-test/topic-" + topicNameSuffix; ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] { - std::string url = adminUrl + "admin/v2/namespaces/public/unload-test?bundles=1"; + std::string url = blueAdminUrl + "admin/v2/clusters/cluster-a/migrate?migrated=false"; + int res = makePostRequest(url, R"( + { + "serviceUrl": "http://localhost:8081", + "serviceUrlTls":"https://localhost:8085", + "brokerServiceUrl": "pulsar://localhost:6651", + "brokerServiceUrlTls": "pulsar+ssl://localhost:6655" + })"); + LOG_INFO("res:" << res); + return res == 200; + })); + + ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] { + std::string url = blueAdminUrl + "admin/v2/namespaces/public/unload-test?bundles=1"; int res = makePutRequest(url, ""); return res == 204 || res == 409; })); - Client client{"pulsar://localhost:6650"}; + ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] { + std::string url = greenAdminUrl + "admin/v2/namespaces/public/unload-test?bundles=1"; + int res = makePutRequest(url, ""); + return res == 204 || res == 409; + })); + + ClientConfiguration conf; + conf.setIOThreads(8); + Client client{"pulsar://localhost:6650", conf}; Producer producer; ProducerConfiguration producerConfiguration; Result producerResult = client.createProducer(topicName, producerConfiguration, producer); @@ -58,24 +81,25 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) { Result consumerResult = client.subscribe(topicName, "sub", consumer); ASSERT_EQ(consumerResult, ResultOk); - Semaphore firstUnloadSemaphore(0); - Semaphore secondUnloadSemaphore(0); - Semaphore halfPubWaitSemaphore(0); - const int msgCount = 10; - int produced = 0; + Semaphore unloadSemaphore(0); + Semaphore pubWaitSemaphore(0); + Semaphore migrationSemaphore(0); + + const int msgCount = 20; + SynchronizedHashMap producedMsgs; auto produce = [&]() { int i = 0; while (i < msgCount && checkTime()) { - if (i == 3) { - firstUnloadSemaphore.acquire(); + if (i == 3 || i == 8 || i == 17) { + unloadSemaphore.acquire(); } - if (i == 5) { - halfPubWaitSemaphore.release(); + if (i == 5 || i == 15) { + pubWaitSemaphore.release(); } - if (i == 8) { - secondUnloadSemaphore.acquire(); + if (i == 12) { + migrationSemaphore.acquire(); } std::string content = std::to_string(i); @@ -86,37 +110,33 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) { return sendResult == ResultOk; })); - LOG_INFO("produced index:" << i); - produced++; + LOG_INFO("produced i:" << i); + producedMsgs.emplace(i, i); i++; } LOG_INFO("producer finished"); }; - - int consumed = 0; + std::atomic stopConsumer(false); + SynchronizedHashMap consumedMsgs; auto consume = [&]() { Message receivedMsg; - int i = 0; - while (i < msgCount && checkTime()) { - ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] { - Result receiveResult = - consumer.receive(receivedMsg, 1000); // Assumed that we wait 1000 ms for each message - return receiveResult == ResultOk; - })); - LOG_INFO("received index:" << i); - - int id = std::stoi(receivedMsg.getDataAsString()); - if (id < i) { + while (checkTime()) { + if (stopConsumer && producedMsgs.size() == msgCount && consumedMsgs.size() == msgCount) { + break; + } + Result receiveResult = + consumer.receive(receivedMsg, 1000); // Assumed that we wait 1000 ms for each message + if (receiveResult != ResultOk) { continue; } + int i = std::stoi(receivedMsg.getDataAsString()); + LOG_INFO("received i:" << i); ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] { Result ackResult = consumer.acknowledge(receivedMsg); return ackResult == ResultOk; })); - LOG_INFO("acked index:" << i); - - consumed++; - i++; + LOG_INFO("acked i:" << i); + consumedMsgs.emplace(i, i); } LOG_INFO("consumer finished"); }; @@ -124,7 +144,8 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) { std::thread produceThread(produce); std::thread consumeThread(consume); - auto unload = [&] { + auto unload = [&](bool migrated) { + const std::string &adminUrl = migrated ? greenAdminUrl : blueAdminUrl; auto clientImplPtr = PulsarFriend::getClientImplPtr(client); auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer); auto &producerImpl = PulsarFriend::getProducerImpl(producer); @@ -135,15 +156,17 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) { ASSERT_TRUE(waitUntil(std::chrono::seconds(30), [&] { return consumerImpl.isConnected() && producerImpl.isConnected(); })); - std::string url = adminUrl + "lookup/v2/topic/persistent/public/unload-test/topic-1"; + std::string url = + adminUrl + "lookup/v2/topic/persistent/public/unload-test/topic" + topicNameSuffix; std::string responseDataBeforeUnload; int res = makeGetRequest(url, responseDataBeforeUnload); if (res != 200) { continue; } - destinationBroker = responseDataBeforeUnload.find("broker-2") == std::string::npos - ? "broker-2:8080" - : "broker-1:8080"; + std::string prefix = migrated ? "green-" : ""; + destinationBroker = + prefix + (responseDataBeforeUnload.find("broker-2") == std::string::npos ? "broker-2:8080" + : "broker-1:8080"); lookupCountBeforeUnload = clientImplPtr->getLookupCount(); ASSERT_TRUE(lookupCountBeforeUnload > 0); @@ -163,31 +186,69 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) { [&] { return consumerImpl.isConnected() && producerImpl.isConnected(); })); std::string responseDataAfterUnload; ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] { - url = adminUrl + "lookup/v2/topic/persistent/public/unload-test/topic-1"; + url = adminUrl + "lookup/v2/topic/persistent/public/unload-test/topic" + topicNameSuffix; res = makeGetRequest(url, responseDataAfterUnload); return res == 200 && responseDataAfterUnload.find(destinationBroker) != std::string::npos; })); LOG_INFO("after lookup responseData:" << responseDataAfterUnload << ",res:" << res); auto lookupCountAfterUnload = clientImplPtr->getLookupCount(); - ASSERT_EQ(lookupCountBeforeUnload, lookupCountAfterUnload); + if (lookupCountBeforeUnload != lookupCountAfterUnload) { + continue; + } break; } }; - LOG_INFO("starting first unload"); - unload(); - firstUnloadSemaphore.release(); - halfPubWaitSemaphore.acquire(); - LOG_INFO("starting second unload"); - unload(); - secondUnloadSemaphore.release(); + LOG_INFO("#### starting first unload ####"); + unload(false); + unloadSemaphore.release(); + pubWaitSemaphore.acquire(); + LOG_INFO("#### starting second unload ####"); + unload(false); + unloadSemaphore.release(); - produceThread.join(); + LOG_INFO("#### migrating the cluster ####"); + migrationSemaphore.release(); + ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] { + std::string url = blueAdminUrl + "admin/v2/clusters/cluster-a/migrate?migrated=true"; + int res = makePostRequest(url, R"({ + "serviceUrl": "http://localhost:8081", + "serviceUrlTls":"https://localhost:8085", + "brokerServiceUrl": "pulsar://localhost:6651", + "brokerServiceUrlTls": "pulsar+ssl://localhost:6655" + })"); + LOG_INFO("res:" << res); + return res == 200; + })); + ASSERT_TRUE(waitUntil(std::chrono::seconds(130), [&] { + auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer); + auto &producerImpl = PulsarFriend::getProducerImpl(producer); + auto consumerConnAddress = PulsarFriend::getConnectionPhysicalAddress(consumerImpl); + auto producerConnAddress = PulsarFriend::getConnectionPhysicalAddress(producerImpl); + return consumerImpl.isConnected() && producerImpl.isConnected() && + consumerConnAddress.find("6651") != std::string::npos && + producerConnAddress.find("6651") != std::string::npos; + })); + pubWaitSemaphore.acquire(); + LOG_INFO("#### starting third unload after migration ####"); + unload(true); + unloadSemaphore.release(); + + stopConsumer = true; consumeThread.join(); - ASSERT_EQ(consumed, msgCount); - ASSERT_EQ(produced, msgCount); - ASSERT_TRUE(checkTime()) << "timed out"; + produceThread.join(); + ASSERT_EQ(producedMsgs.size(), msgCount); + ASSERT_EQ(consumedMsgs.size(), msgCount); + for (int i = 0; i < msgCount; i++) { + producedMsgs.remove(i); + consumedMsgs.remove(i); + } + ASSERT_EQ(producedMsgs.size(), 0); + ASSERT_EQ(consumedMsgs.size(), 0); + client.close(); + + ASSERT_TRUE(checkTime()) << "timed out"; } int main(int argc, char *argv[]) { diff --git a/tests/extensibleLM/docker-compose.yml b/tests/extensibleLM/docker-compose.yml index 8d3c33a3..14876395 100644 --- a/tests/extensibleLM/docker-compose.yml +++ b/tests/extensibleLM/docker-compose.yml @@ -109,6 +109,8 @@ services: - loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder - loadBalancerSheddingEnabled=false - loadBalancerDebugModeEnabled=true + - clusterMigrationCheckDurationSeconds=1 + - brokerServiceCompactionThresholdInBytes=1000000 - PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1 depends_on: zookeeper: @@ -141,6 +143,8 @@ services: - loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder - loadBalancerSheddingEnabled=false - loadBalancerDebugModeEnabled=true + - clusterMigrationCheckDurationSeconds=1 + - brokerServiceCompactionThresholdInBytes=1000000 - PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1 depends_on: zookeeper: