Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] PIP-188 Support blue-green migration #402

Merged
merged 3 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion lib/BinaryProtoLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include "ConnectionPool.h"
#include "LogUtils.h"
#include "NamespaceName.h"
#include "ServiceNameResolver.h"
#include "TopicName.h"

DECLARE_LOG_OBJECT()
Expand Down
8 changes: 5 additions & 3 deletions lib/BinaryProtoLookupService.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ using GetSchemaPromisePtr = std::shared_ptr<Promise<Result, SchemaInfo>>;

class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
public:
BinaryProtoLookupService(ServiceNameResolver& serviceNameResolver, ConnectionPool& pool,
BinaryProtoLookupService(const std::string& serviceUrl, ConnectionPool& pool,
const ClientConfiguration& clientConfiguration)
: serviceNameResolver_(serviceNameResolver),
: serviceNameResolver_(serviceUrl),
cnxPool_(pool),
listenerName_(clientConfiguration.getListenerName()),
maxLookupRedirects_(clientConfiguration.getMaxLookupRedirects()) {}
Expand All @@ -54,6 +54,8 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {

Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const std::string& version) override;

// Returns a mutable reference to the ServiceNameResolver member held by this lookup service.
ServiceNameResolver& getServiceNameResolver() override { return serviceNameResolver_; }

protected:
// Mark findBroker as protected to make it accessible from test.
LookupResultFuture findBroker(const std::string& address, bool authoritative, const std::string& topic,
Expand All @@ -63,7 +65,7 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
std::mutex mutex_;
uint64_t requestIdGenerator_ = 0;

ServiceNameResolver& serviceNameResolver_;
ServiceNameResolver serviceNameResolver_;
ConnectionPool& cnxPool_;
std::string listenerName_;
const int32_t maxLookupRedirects_;
Expand Down
57 changes: 56 additions & 1 deletion lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::
LOG_INFO(cnxString_ << "Connected to broker");
} else {
LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_
<< ", proxy: " << proxyServiceUrl_);
<< ", proxy: " << proxyServiceUrl_
<< ", physical address:" << physicalAddress_);
}

Lock lock(mutex_);
Expand Down Expand Up @@ -945,6 +946,10 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
handleError(incomingCmd.error());
break;

case BaseCommand::TOPIC_MIGRATED:
handleTopicMigrated(incomingCmd.topicmigrated());
break;

case BaseCommand::CLOSE_PRODUCER:
handleCloseProducer(incomingCmd.close_producer());
break;
Expand Down Expand Up @@ -1761,6 +1766,56 @@ void ClientConnection::handleError(const proto::CommandError& error) {
}
}

// Selects the migrated broker service URL that matches this connection's transport.
//
// When tlsSocket_ is set, only the TLS URL field of the command is considered; otherwise only
// the plain-text URL field is considered. If the relevant field is absent, an empty string is
// returned and the caller is expected to treat that as "no usable URL".
std::string ClientConnection::getMigratedBrokerServiceUrl(
    const proto::CommandTopicMigrated& commandTopicMigrated) {
    const bool hasMatchingUrl = tlsSocket_ ? commandTopicMigrated.has_brokerserviceurltls()
                                           : commandTopicMigrated.has_brokerserviceurl();
    if (!hasMatchingUrl) {
        return "";
    }
    return tlsSocket_ ? commandTopicMigrated.brokerserviceurltls()
                      : commandTopicMigrated.brokerserviceurl();
}

// Handles a TOPIC_MIGRATED command received from the broker.
//
// Resolves the migrated broker service URL that matches this connection's transport and stores
// it on the producer or consumer identified by the command's resource id, so that the handler's
// next connection attempt is routed through the redirected cluster.
//
// The command is ignored (with a warning) when:
//   - no URL usable by this connection is present in the command,
//   - the resource id is not registered on this connection, or
//   - the registered handler has already been destroyed.
void ClientConnection::handleTopicMigrated(const proto::CommandTopicMigrated& commandTopicMigrated) {
    const long resourceId = commandTopicMigrated.resource_id();
    const std::string migratedBrokerServiceUrl = getMigratedBrokerServiceUrl(commandTopicMigrated);

    if (migratedBrokerServiceUrl.empty()) {
        LOG_WARN("Failed to find the migrated broker url for resource:"
                 << resourceId
                 << (commandTopicMigrated.has_brokerserviceurl()
                         ? ", migratedBrokerUrl: " + commandTopicMigrated.brokerserviceurl()
                         : "")
                 << (commandTopicMigrated.has_brokerserviceurltls()
                         ? ", migratedBrokerUrlTls: " + commandTopicMigrated.brokerserviceurltls()
                         : ""));
        return;
    }

    Lock lock(mutex_);
    if (commandTopicMigrated.resource_type() == proto::CommandTopicMigrated_ResourceType_Producer) {
        auto it = producers_.find(resourceId);
        if (it == producers_.end()) {
            LOG_WARN("Got invalid producer Id in topicMigrated command: " << resourceId);
            return;
        }
        // lock() on the stored reference yields an empty pointer if the producer has already
        // been destroyed but not yet unregistered; dereferencing it would crash the I/O thread.
        auto producer = it->second.lock();
        if (!producer) {
            LOG_WARN("Producer id:" << resourceId
                                    << " was already destroyed, ignoring topicMigrated command");
            return;
        }
        producer->setRedirectedClusterURI(migratedBrokerServiceUrl);
        LOG_INFO("Producer id:" << resourceId << " is migrated to " << migratedBrokerServiceUrl);
    } else {
        auto it = consumers_.find(resourceId);
        if (it == consumers_.end()) {
            LOG_WARN("Got invalid consumer Id in topicMigrated command: " << resourceId);
            return;
        }
        // Same expiry guard as for producers above.
        auto consumer = it->second.lock();
        if (!consumer) {
            LOG_WARN("Consumer id:" << resourceId
                                    << " was already destroyed, ignoring topicMigrated command");
            return;
        }
        consumer->setRedirectedClusterURI(migratedBrokerServiceUrl);
        LOG_INFO("Consumer id:" << resourceId << " is migrated to " << migratedBrokerServiceUrl);
    }
}

boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
const proto::CommandCloseProducer& closeProducer) {
if (tlsSocket_) {
Expand Down
9 changes: 5 additions & 4 deletions lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class BrokerEntryMetadata;
class CommandActiveConsumerChange;
class CommandAckResponse;
class CommandMessage;
class CommandTopicMigrated;
class CommandCloseConsumer;
class CommandCloseProducer;
class CommandConnected;
Expand Down Expand Up @@ -414,17 +415,17 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
void handleLookupTopicRespose(const proto::CommandLookupTopicResponse&);
void handleProducerSuccess(const proto::CommandProducerSuccess&);
void handleError(const proto::CommandError&);
void handleTopicMigrated(const proto::CommandTopicMigrated&);
void handleCloseProducer(const proto::CommandCloseProducer&);
void handleCloseConsumer(const proto::CommandCloseConsumer&);
void handleAuthChallenge();
void handleGetLastMessageIdResponse(const proto::CommandGetLastMessageIdResponse&);
void handleGetTopicOfNamespaceResponse(const proto::CommandGetTopicsOfNamespaceResponse&);
void handleGetSchemaResponse(const proto::CommandGetSchemaResponse&);
void handleAckResponse(const proto::CommandAckResponse&);
boost::optional<std::string> getAssignedBrokerServiceUrl(
const proto::CommandCloseProducer& closeProducer);
boost::optional<std::string> getAssignedBrokerServiceUrl(
const proto::CommandCloseConsumer& closeConsumer);
boost::optional<std::string> getAssignedBrokerServiceUrl(const proto::CommandCloseProducer&);
boost::optional<std::string> getAssignedBrokerServiceUrl(const proto::CommandCloseConsumer&);
std::string getMigratedBrokerServiceUrl(const proto::CommandTopicMigrated&);
};
} // namespace pulsar

Expand Down
56 changes: 40 additions & 16 deletions lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ typedef std::vector<std::string> StringList;
ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration)
: mutex_(),
state_(Open),
serviceNameResolver_(serviceUrl),
clientConfiguration_(ClientConfiguration(clientConfiguration).setUseTls(serviceNameResolver_.useTls())),
clientConfiguration_(ClientConfiguration(clientConfiguration)
.setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceUrl)))),
memoryLimitController_(clientConfiguration.getMemoryLimit()),
ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(clientConfiguration_.getIOThreads())),
listenerExecutorProvider_(
Expand All @@ -98,25 +98,28 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
if (loggerFactory) {
LogUtils::setLoggerFactory(std::move(loggerFactory));
}
lookupServicePtr_ = createLookup(serviceUrl);
}

ClientImpl::~ClientImpl() { shutdown(); }

LookupServicePtr ClientImpl::createLookup(const std::string& serviceUrl) {
LookupServicePtr underlyingLookupServicePtr;
if (serviceNameResolver_.useHttp()) {
if (ServiceNameResolver::useHttp(ServiceURI(serviceUrl))) {
LOG_DEBUG("Using HTTP Lookup");
underlyingLookupServicePtr = std::make_shared<HTTPLookupService>(
std::ref(serviceNameResolver_), std::cref(clientConfiguration_),
std::cref(clientConfiguration_.getAuthPtr()));
serviceUrl, std::cref(clientConfiguration_), std::cref(clientConfiguration_.getAuthPtr()));
} else {
LOG_DEBUG("Using Binary Lookup");
underlyingLookupServicePtr = std::make_shared<BinaryProtoLookupService>(
std::ref(serviceNameResolver_), std::ref(pool_), std::cref(clientConfiguration_));
serviceUrl, std::ref(pool_), std::cref(clientConfiguration_));
}

lookupServicePtr_ = RetryableLookupService::create(
auto lookupServicePtr = RetryableLookupService::create(
underlyingLookupServicePtr, clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_);
return lookupServicePtr;
}

ClientImpl::~ClientImpl() { shutdown(); }

const ClientConfiguration& ClientImpl::conf() const { return clientConfiguration_; }

MemoryLimitController& ClientImpl::getMemoryLimitController() { return memoryLimitController_; }
Expand All @@ -129,7 +132,21 @@ ExecutorServiceProviderPtr ClientImpl::getPartitionListenerExecutorProvider() {
return partitionListenerExecutorProvider_;
}

LookupServicePtr ClientImpl::getLookup() { return lookupServicePtr_; }
// Returns the lookup service to use for the given redirected cluster.
//
// An empty redirectedClusterURI selects the client's primary lookup service. Otherwise the
// per-cluster cache is consulted under mutex_; on a miss a new lookup service is created for
// that URI, stored in the cache, and returned.
LookupServicePtr ClientImpl::getLookup(const std::string& redirectedClusterURI) {
    if (!redirectedClusterURI.empty()) {
        Lock lock(mutex_);
        const auto cached = redirectedClusterLookupServicePtrs_.find(redirectedClusterURI);
        if (cached != redirectedClusterLookupServicePtrs_.end()) {
            return cached->second;
        }

        auto created = createLookup(redirectedClusterURI);
        redirectedClusterLookupServicePtrs_.emplace(redirectedClusterURI, created);
        return created;
    }

    return lookupServicePtr_;
}

void ClientImpl::createProducerAsync(const std::string& topic, ProducerConfiguration conf,
CreateProducerCallback callback, bool autoDownloadSchema) {
Expand Down Expand Up @@ -517,7 +534,8 @@ void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr co
}
}

GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t key) {
GetConnectionFuture ClientImpl::getConnection(const std::string& redirectedClusterURI,
const std::string& topic, size_t key) {
Promise<Result, ClientConnectionPtr> promise;

const auto topicNamePtr = TopicName::get(topic);
Expand All @@ -528,7 +546,8 @@ GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t k
}

auto self = shared_from_this();
lookupServicePtr_->getBroker(*topicNamePtr)
getLookup(redirectedClusterURI)
->getBroker(*topicNamePtr)
.addListener([this, self, promise, key](Result result, const LookupService::LookupResult& data) {
if (result != ResultOk) {
promise.setFailed(result);
Expand All @@ -554,16 +573,18 @@ GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t k
return promise.getFuture();
}

const std::string& ClientImpl::getPhysicalAddress(const std::string& logicalAddress) {
const std::string& ClientImpl::getPhysicalAddress(const std::string& redirectedClusterURI,
const std::string& logicalAddress) {
if (useProxy_) {
return serviceNameResolver_.resolveHost();
return getLookup(redirectedClusterURI)->getServiceNameResolver().resolveHost();
} else {
return logicalAddress;
}
}

GetConnectionFuture ClientImpl::connect(const std::string& logicalAddress, size_t key) {
const auto& physicalAddress = getPhysicalAddress(logicalAddress);
GetConnectionFuture ClientImpl::connect(const std::string& redirectedClusterURI,
const std::string& logicalAddress, size_t key) {
const auto& physicalAddress = getPhysicalAddress(redirectedClusterURI, logicalAddress);
Promise<Result, ClientConnectionPtr> promise;
pool_.getConnectionAsync(logicalAddress, physicalAddress, key)
.addListener([promise](Result result, const ClientConnectionWeakPtr& weakCnx) {
Expand Down Expand Up @@ -633,6 +654,9 @@ void ClientImpl::closeAsync(CloseCallback callback) {

memoryLimitController_.close();
lookupServicePtr_->close();
for (const auto& it : redirectedClusterLookupServicePtrs_) {
it.second->close();
}

auto producers = producers_.move();
auto consumers = consumers_.move();
Expand Down
15 changes: 10 additions & 5 deletions lib/ClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,11 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback);

// Use virtual method to test
virtual GetConnectionFuture getConnection(const std::string& topic, size_t key);
virtual GetConnectionFuture getConnection(const std::string& redirectedClusterURI,
const std::string& topic, size_t key);

GetConnectionFuture connect(const std::string& logicalAddress, size_t key);
GetConnectionFuture connect(const std::string& redirectedClusterURI, const std::string& logicalAddress,
size_t key);

void closeAsync(CloseCallback callback);
void shutdown();
Expand All @@ -119,7 +121,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
ExecutorServiceProviderPtr getIOExecutorProvider();
ExecutorServiceProviderPtr getListenerExecutorProvider();
ExecutorServiceProviderPtr getPartitionListenerExecutorProvider();
LookupServicePtr getLookup();
LookupServicePtr getLookup(const std::string& redirectedClusterURI = "");

void cleanupProducer(ProducerImplBase* address) { producers_.remove(address); }

Expand Down Expand Up @@ -165,7 +167,10 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
const std::string& consumerName, const ConsumerConfiguration& conf,
SubscribeCallback callback);

const std::string& getPhysicalAddress(const std::string& logicalAddress);
const std::string& getPhysicalAddress(const std::string& redirectedClusterURI,
const std::string& logicalAddress);

LookupServicePtr createLookup(const std::string& serviceUrl);

static std::string getClientVersion(const ClientConfiguration& clientConfiguration);

Expand All @@ -179,7 +184,6 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
std::mutex mutex_;

State state_;
ServiceNameResolver serviceNameResolver_;
ClientConfiguration clientConfiguration_;
MemoryLimitController memoryLimitController_;

Expand All @@ -188,6 +192,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
ExecutorServiceProviderPtr partitionListenerExecutorProvider_;

LookupServicePtr lookupServicePtr_;
std::unordered_map<std::string, LookupServicePtr> redirectedClusterLookupServicePtrs_;
ConnectionPool pool_;

uint64_t producerIdGenerator_;
Expand Down
4 changes: 2 additions & 2 deletions lib/HTTPLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ const static std::string ADMIN_PATH_V2 = "/admin/v2/";
const static std::string PARTITION_METHOD_NAME = "partitions";
const static int NUMBER_OF_LOOKUP_THREADS = 1;

HTTPLookupService::HTTPLookupService(ServiceNameResolver &serviceNameResolver,
HTTPLookupService::HTTPLookupService(const std::string &serviceUrl,
const ClientConfiguration &clientConfiguration,
const AuthenticationPtr &authData)
: executorProvider_(std::make_shared<ExecutorServiceProvider>(NUMBER_OF_LOOKUP_THREADS)),
serviceNameResolver_(serviceNameResolver),
serviceNameResolver_(serviceUrl),
authenticationPtr_(authData),
lookupTimeoutInSeconds_(clientConfiguration.getOperationTimeoutSeconds()),
maxLookupRedirects_(clientConfiguration.getMaxLookupRedirects()),
Expand Down
6 changes: 4 additions & 2 deletions lib/HTTPLookupService.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t
typedef Promise<Result, LookupDataResultPtr> LookupPromise;

ExecutorServiceProviderPtr executorProvider_;
ServiceNameResolver& serviceNameResolver_;
ServiceNameResolver serviceNameResolver_;
AuthenticationPtr authenticationPtr_;
int lookupTimeoutInSeconds_;
const int maxLookupRedirects_;
Expand All @@ -64,7 +64,7 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t
Result sendHTTPRequest(std::string completeUrl, std::string& responseData, long& responseCode);

public:
HTTPLookupService(ServiceNameResolver&, const ClientConfiguration&, const AuthenticationPtr&);
HTTPLookupService(const std::string&, const ClientConfiguration&, const AuthenticationPtr&);

LookupResultFuture getBroker(const TopicName& topicName) override;

Expand All @@ -74,6 +74,8 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t

Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override;

// Returns a mutable reference to the ServiceNameResolver member held by this lookup service.
ServiceNameResolver& getServiceNameResolver() override { return serviceNameResolver_; }
};
} // namespace pulsar

Expand Down
16 changes: 13 additions & 3 deletions lib/HandlerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
epoch_(0),
timer_(executor_->createDeadlineTimer()),
creationTimer_(executor_->createDeadlineTimer()),
reconnectionPending_(false) {}
reconnectionPending_(false),
redirectedClusterURI_("") {}

HandlerBase::~HandlerBase() {
ASIO_ERROR ignored;
Expand Down Expand Up @@ -88,9 +89,9 @@ void HandlerBase::grabCnx() { grabCnx(boost::none); }
Future<Result, ClientConnectionPtr> HandlerBase::getConnection(
const ClientImplPtr& client, const boost::optional<std::string>& assignedBrokerUrl) {
if (assignedBrokerUrl && client->getLookupCount() > 0) {
return client->connect(assignedBrokerUrl.get(), connectionKeySuffix_);
return client->connect(getRedirectedClusterURI(), assignedBrokerUrl.get(), connectionKeySuffix_);
} else {
return client->getConnection(topic(), connectionKeySuffix_);
return client->getConnection(getRedirectedClusterURI(), topic(), connectionKeySuffix_);
}
}

Expand Down Expand Up @@ -209,4 +210,13 @@ Result HandlerBase::convertToTimeoutIfNecessary(Result result, ptime startTimest
}
}

// Stores the service URL of the cluster this handler has been redirected to.
// The assignment is made while holding mutex_.
void HandlerBase::setRedirectedClusterURI(const std::string& serviceUrl) {
Lock lock(mutex_);
redirectedClusterURI_ = serviceUrl;
}
// Returns a reference to the stored redirected cluster service URL.
//
// NOTE(review): mutex_ is held only while the reference is formed; the caller reads the string
// after the lock has been released. If setRedirectedClusterURI() can run concurrently on another
// thread (presumably the connection's I/O thread -- confirm), that read is unsynchronized.
// Returning std::string by value would close the gap but requires changing the declaration in
// the header as well.
const std::string& HandlerBase::getRedirectedClusterURI() {
Lock lock(mutex_);
return redirectedClusterURI_;
}

} // namespace pulsar
Loading
Loading