Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[fix][broker]Support to migrate topics from blue to green cluster per…
Browse files Browse the repository at this point in the history
… namespace (apache#21367)

Co-authored-by: Vishwadeepsinh Raulji <[email protected]>
  • Loading branch information
vraulji567 and Vishwadeepsinh Raulji authored Oct 16, 2023
1 parent d09642c commit a10564d
Show file tree
Hide file tree
Showing 14 changed files with 579 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2642,4 +2642,18 @@ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse, BacklogQu
return null;
});
}

protected void internalEnableMigration(boolean migrated) {
validateSuperUserAccess();
try {
updatePolicies(namespaceName, policies -> {
policies.isMigrated = migrated;
return policies;
});
log.info("Successfully updated migration on namespace {}", namespaceName);
} catch (Exception e) {
log.error("Failed to update migration on namespace {}", namespaceName, e);
throw new RestException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1709,5 +1709,18 @@ public void setSchemaAutoUpdateCompatibilityStrategy(@PathParam("tenant") String
internalSetSchemaAutoUpdateCompatibilityStrategy(strategy);
}

@POST
@Path("/{property}/{cluster}/{namespace}/migration")
@ApiOperation(hidden = true, value = "Update migration for all topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
public void enableMigration(@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
boolean migrated) {
validateNamespaceName(property, cluster, namespace);
internalEnableMigration(migrated);
}

private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2779,7 +2779,17 @@ public void removeNamespaceEntryFilters(@Suspended AsyncResponse asyncResponse,
});
}


@POST
@Path("/{tenant}/{namespace}/migration")
@ApiOperation(hidden = true, value = "Update migration for all topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
public void enableMigration(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
boolean migrated) {
validateNamespaceName(tenant, namespace);
internalEnableMigration(migrated);
}

private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1349,19 +1349,29 @@ public void updateBrokerSubscribeRate() {
}

public Optional<ClusterUrl> getMigratedClusterUrl() {
return getMigratedClusterUrl(brokerService.getPulsar());
return getMigratedClusterUrl(brokerService.getPulsar(), topic);
}

public static CompletableFuture<Optional<ClusterUrl>> getMigratedClusterUrlAsync(PulsarService pulsar) {
public static CompletableFuture<Optional<ClusterUrl>> getMigratedClusterUrlAsync(PulsarService pulsar,
String topic) {
return pulsar.getPulsarResources().getClusterResources().getClusterAsync(pulsar.getConfig().getClusterName())
.thenApply(clusterData -> (clusterData.isPresent() && clusterData.get().isMigrated())
.thenCombine(isNamespaceMigrationEnabledAsync(pulsar, topic),
((clusterData, isNamespaceMigrationEnabled)
-> ((clusterData.isPresent() && clusterData.get().isMigrated())
|| isNamespaceMigrationEnabled)
? Optional.ofNullable(clusterData.get().getMigratedClusterUrl())
: Optional.empty());
: Optional.empty()));
}

public static Optional<ClusterUrl> getMigratedClusterUrl(PulsarService pulsar) {
private static CompletableFuture<Boolean> isNamespaceMigrationEnabledAsync(PulsarService pulsar, String topic) {
return pulsar.getPulsarResources().getNamespaceResources().
getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
.thenApply(policies -> policies.isPresent() && policies.get().isMigrated);
}

public static Optional<ClusterUrl> getMigratedClusterUrl(PulsarService pulsar, String topic) {
try {
return getMigratedClusterUrlAsync(pulsar)
return getMigratedClusterUrlAsync(pulsar, topic)
.get(pulsar.getPulsarResources().getClusterResources().getOperationTimeoutSec(), TimeUnit.SECONDS);
} catch (Exception e) {
log.warn("Failed to get migration cluster URL", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,8 @@ public void topicMigrated(Optional<ClusterUrl> clusterUrl) {

public boolean checkAndApplyTopicMigration() {
if (subscription.isSubsciptionMigrated()) {
Optional<ClusterUrl> clusterUrl = AbstractTopic.getMigratedClusterUrl(cnx.getBrokerService().getPulsar());
Optional<ClusterUrl> clusterUrl = AbstractTopic.getMigratedClusterUrl(cnx.getBrokerService().getPulsar(),
topicName);
if (clusterUrl.isPresent()) {
ClusterUrl url = clusterUrl.get();
cnx.getCommandSender().sendTopicMigrated(ResourceType.Consumer, consumerId, url.getBrokerServiceUrl(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1644,7 +1644,7 @@ private void buildProducerAndAddTopic(Topic topic, long producerId, String produ
producers.remove(producerId, producerFuture);
}).exceptionallyAsync(ex -> {
if (ex.getCause() instanceof BrokerServiceException.TopicMigratedException) {
Optional<ClusterUrl> clusterURL = getMigratedClusterUrl(service.getPulsar());
Optional<ClusterUrl> clusterURL = getMigratedClusterUrl(service.getPulsar(), topic.getName());
if (clusterURL.isPresent()) {
if (topic.isReplicationBacklogExist()) {
log.info("Topic {} is migrated but replication backlog exist: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,8 @@ public NonPersistentTopic(String topic, BrokerService brokerService) {
}

private CompletableFuture<Void> updateClusterMigrated() {
return getMigratedClusterUrlAsync(brokerService.getPulsar()).thenAccept(url -> migrated = url.isPresent());
}

private Optional<ClusterUrl> getClusterMigrationUrl() {
return getMigratedClusterUrl(brokerService.getPulsar());
return getMigratedClusterUrlAsync(brokerService.getPulsar(), topic)
.thenAccept(url -> migrated = url.isPresent());
}

public CompletableFuture<Void> initialize() {
Expand Down Expand Up @@ -332,7 +329,7 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
false, cnx, cnx.getAuthRole(), metadata, readCompacted, keySharedMeta, MessageId.latest,
DEFAULT_CONSUMER_EPOCH, schemaType);
if (isMigrated()) {
consumer.topicMigrated(getClusterMigrationUrl());
consumer.topicMigrated(getMigratedClusterUrl());
}

addConsumerToSubscription(subscription, consumer).thenRun(() -> {
Expand Down Expand Up @@ -949,7 +946,7 @@ public boolean isActive() {

@Override
public CompletableFuture<Void> checkClusterMigration() {
Optional<ClusterUrl> url = getClusterMigrationUrl();
Optional<ClusterUrl> url = getMigratedClusterUrl();
if (url.isPresent()) {
this.migrated = true;
producers.forEach((__, producer) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2588,6 +2588,7 @@ private boolean hasBacklogs() {
@Override
public CompletableFuture<Void> checkClusterMigration() {
Optional<ClusterUrl> clusterUrl = getMigratedClusterUrl();

if (!clusterUrl.isPresent()) {
return CompletableFuture.completedFuture(null);
}
Expand All @@ -2614,13 +2615,13 @@ private CompletableFuture<Void> initMigration() {
return CompletableFuture.completedFuture(null);
}
log.info("{} initializing subscription created at migration cluster", topic);
return getMigratedClusterUrlAsync(getBrokerService().getPulsar()).thenCompose(clusterUrl -> {
return getMigratedClusterUrlAsync(getBrokerService().getPulsar(), topic).thenCompose(clusterUrl -> {
if (!brokerService.getPulsar().getConfig().isClusterMigrationAutoResourceCreation()) {
return CompletableFuture.completedFuture(null);
}
if (!clusterUrl.isPresent()) {
return FutureUtil
.failedFuture(new TopicMigratedException("cluster migration service-url is not configired"));
.failedFuture(new TopicMigratedException("cluster migration service-url is not configured"));
}
ClusterUrl url = clusterUrl.get();
ClusterData clusterData = ClusterData.builder().serviceUrl(url.getServiceUrl())
Expand Down
Loading

0 comments on commit a10564d

Please sign in to comment.