Skip to content

Commit

Permalink
[improve] [broker] Do not try to open ML when the topic meta does not…
Browse files Browse the repository at this point in the history
… exist and do not expect to create a new one. apache#21995 (apache#22004)

Co-authored-by: Jiwe Guo <[email protected]>
  • Loading branch information
poorbarcode and Technoboy- authored Feb 23, 2024
1 parent 48c7e32 commit 1c652f5
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -967,43 +967,49 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
}
final boolean isPersistentTopic = topicName.getDomain().equals(TopicDomain.persistent);
if (isPersistentTopic) {
final CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture =
getTopicPoliciesBypassSystemTopic(topicName);
return topicPoliciesFuture.exceptionally(ex -> {
final Throwable rc = FutureUtil.unwrapCompletionException(ex);
final String errorInfo = String.format("Topic creation encountered an exception by initialize"
+ " topic policies service. topic_name=%s error_message=%s", topicName, rc.getMessage());
log.error(errorInfo, rc);
throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo));
}).thenCompose(optionalTopicPolicies -> {
final TopicPolicies topicPolicies = optionalTopicPolicies.orElse(null);
return topics.computeIfAbsent(topicName.toString(), (tpName) -> {
if (topicName.isPartitioned()) {
final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName());
return fetchPartitionedTopicMetadataAsync(topicNameEntity)
.thenCompose((metadata) -> {
// Allow crate non-partitioned persistent topic that name includes `partition`
if (metadata.partitions == 0
|| topicName.getPartitionIndex() < metadata.partitions) {
return loadOrCreatePersistentTopic(tpName, createIfMissing,
properties, topicPolicies);
}
final String errorMsg =
String.format("Illegal topic partition name %s with max allowed "
+ "%d partitions", topicName, metadata.partitions);
log.warn(errorMsg);
return FutureUtil
.failedFuture(new BrokerServiceException.NotAllowedException(errorMsg));
});
}
return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies);
}).thenCompose(optionalTopic -> {
if (!optionalTopic.isPresent() && createIfMissing) {
log.warn("[{}] Try to recreate the topic with createIfMissing=true "
+ "but the returned topic is empty", topicName);
return getTopic(topicName, createIfMissing, properties);
}
return CompletableFuture.completedFuture(optionalTopic);
return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topicName)
.thenCompose(exists -> {
if (!exists && !createIfMissing) {
return CompletableFuture.completedFuture(Optional.empty());
}
return getTopicPoliciesBypassSystemTopic(topicName).exceptionally(ex -> {
final Throwable rc = FutureUtil.unwrapCompletionException(ex);
final String errorInfo = String.format("Topic creation encountered an exception by initialize"
+ " topic policies service. topic_name=%s error_message=%s", topicName,
rc.getMessage());
log.error(errorInfo, rc);
throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo));
}).thenCompose(optionalTopicPolicies -> {
final TopicPolicies topicPolicies = optionalTopicPolicies.orElse(null);
return topics.computeIfAbsent(topicName.toString(), (tpName) -> {
if (topicName.isPartitioned()) {
final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName());
return fetchPartitionedTopicMetadataAsync(topicNameEntity)
.thenCompose((metadata) -> {
// Allow crate non-partitioned persistent topic that name includes
// `partition`
if (metadata.partitions == 0
|| topicName.getPartitionIndex() < metadata.partitions) {
return loadOrCreatePersistentTopic(tpName, createIfMissing,
properties, topicPolicies);
}
final String errorMsg =
String.format("Illegal topic partition name %s with max allowed "
+ "%d partitions", topicName, metadata.partitions);
log.warn(errorMsg);
return FutureUtil.failedFuture(
new BrokerServiceException.NotAllowedException(errorMsg));
});
}
return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies);
}).thenCompose(optionalTopic -> {
if (!optionalTopic.isPresent() && createIfMissing) {
log.warn("[{}] Try to recreate the topic with createIfMissing=true "
+ "but the returned topic is empty", topicName);
return getTopic(topicName, createIfMissing, properties);
}
return CompletableFuture.completedFuture(optionalTopic);
});
});
});
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void testEvents(String topicTypePersistence, String topicTypePartitioned,
boolean forceDelete) throws Exception {
String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID();

createTopicAndVerifyEvents(topicTypePartitioned, topicName);
createTopicAndVerifyEvents(topicTypePersistence, topicTypePartitioned, topicName);

events.clear();
if (topicTypePartitioned.equals("partitioned")) {
Expand All @@ -150,7 +150,7 @@ public void testEventsWithUnload(String topicTypePersistence, String topicTypePa
boolean forceDelete) throws Exception {
String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID();

createTopicAndVerifyEvents(topicTypePartitioned, topicName);
createTopicAndVerifyEvents(topicTypePersistence, topicTypePartitioned, topicName);

events.clear();
admin.topics().unload(topicName);
Expand Down Expand Up @@ -182,7 +182,7 @@ public void testEventsActiveSub(String topicTypePersistence, String topicTypePar
boolean forceDelete) throws Exception {
String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID();

createTopicAndVerifyEvents(topicTypePartitioned, topicName);
createTopicAndVerifyEvents(topicTypePersistence, topicTypePartitioned, topicName);

Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("sub").subscribe();
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
Expand Down Expand Up @@ -238,7 +238,7 @@ public void testEventsActiveSub(String topicTypePersistence, String topicTypePar
public void testTopicAutoGC(String topicTypePersistence, String topicTypePartitioned) throws Exception {
String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID();

createTopicAndVerifyEvents(topicTypePartitioned, topicName);
createTopicAndVerifyEvents(topicTypePersistence, topicTypePartitioned, topicName);

admin.namespaces().setInactiveTopicPolicies(namespace,
new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true));
Expand All @@ -262,33 +262,36 @@ public void testTopicAutoGC(String topicTypePersistence, String topicTypePartiti
);
}

private void createTopicAndVerifyEvents(String topicTypePartitioned, String topicName) throws Exception {
private void createTopicAndVerifyEvents(String topicDomain, String topicTypePartitioned, String topicName) throws Exception {
final String[] expectedEvents;
if (topicTypePartitioned.equals("partitioned")) {
topicNameToWatch = topicName + "-partition-1";
admin.topics().createPartitionedTopic(topicName, 2);
triggerPartitionsCreation(topicName);

if (topicDomain.equalsIgnoreCase("persistent") || topicTypePartitioned.equals("partitioned")) {
expectedEvents = new String[]{
"LOAD__BEFORE",
"CREATE__BEFORE",
"CREATE__SUCCESS",
"LOAD__SUCCESS"
};

} else {
topicNameToWatch = topicName;
admin.topics().createNonPartitionedTopic(topicName);

expectedEvents = new String[]{
// Before https://github.com/apache/pulsar/pull/21995, Pulsar will skip create topic if the topic
// was already exists, and the action "check topic exists" will try to load Managed ledger,
// the check triggers two exrtra events: [LOAD__BEFORE, LOAD__FAILURE].
// #21995 fixed this wrong behavior, so remove these two events.
"LOAD__BEFORE",
"LOAD__FAILURE",
"LOAD__BEFORE",
"CREATE__BEFORE",
"CREATE__SUCCESS",
"LOAD__SUCCESS"
};

}
if (topicTypePartitioned.equals("partitioned")) {
topicNameToWatch = topicName + "-partition-1";
admin.topics().createPartitionedTopic(topicName, 2);
triggerPartitionsCreation(topicName);
} else {
topicNameToWatch = topicName;
admin.topics().createNonPartitionedTopic(topicName);
}

Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3418,4 +3418,32 @@ private void testAnalyzeSubscriptionBacklogNotCauseStuck() throws Exception {
producer.close();
admin.topics().delete(topic);
}

@Test
public void testGetStatsIfPartitionNotExists() throws Exception {
// create topic.
final String partitionedTp = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp");
admin.topics().createPartitionedTopic(partitionedTp, 1);
TopicName partition0 = TopicName.get(partitionedTp).getPartition(0);
boolean topicExists1 = pulsar.getBrokerService().getTopic(partition0.toString(), false).join().isPresent();
assertTrue(topicExists1);
// Verify topics-stats works.
TopicStats topicStats = admin.topics().getStats(partition0.toString());
assertNotNull(topicStats);

// Delete partition and call topic-stats again.
admin.topics().delete(partition0.toString());
boolean topicExists2 = pulsar.getBrokerService().getTopic(partition0.toString(), false).join().isPresent();
assertFalse(topicExists2);
// Verify: respond 404.
try {
admin.topics().getStats(partition0.toString());
fail("Should respond 404 after the partition was deleted");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("Topic partitions were not yet created"));
}

// cleanup.
admin.topics().deletePartitionedTopic(partitionedTp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,21 +149,23 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion()
.sendTimeout(1, TimeUnit.SECONDS)
.topic(topic)
.create()) {
} catch (PulsarClientException.LookupException expected) {
String msg = "Namespace bundle for topic (%s) not served by this instance";
} catch (PulsarClientException.TopicDoesNotExistException expected) {
// Since the "policies.deleted" is "true", the value of "isAllowAutoTopicCreationAsync" will be false,
// so the "TopicDoesNotExistException" is expected.
log.info("Expected error", expected);
assertTrue(expected.getMessage().contains(String.format(msg, topic))
assertTrue(expected.getMessage().contains(topic)
|| expected.getMessage().contains(topicPoliciesServiceInitException));
}

try (Consumer<byte[]> ignored = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("test")
.subscribe()) {
} catch (PulsarClientException.LookupException expected) {
String msg = "Namespace bundle for topic (%s) not served by this instance";
} catch (PulsarClientException.TopicDoesNotExistException expected) {
// Since the "policies.deleted" is "true", the value of "isAllowAutoTopicCreationAsync" will be false,
// so the "TopicDoesNotExistException" is expected.
log.info("Expected error", expected);
assertTrue(expected.getMessage().contains(String.format(msg, topic))
assertTrue(expected.getMessage().contains(topic)
|| expected.getMessage().contains(topicPoliciesServiceInitException));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,8 @@ public void testPersistentPartitionedTopicUnload() throws Exception {

assertFalse(pulsar.getBrokerService().getTopics().containsKey(topicName));
pulsar.getBrokerService().getTopicIfExists(topicName).get();
assertTrue(pulsar.getBrokerService().getTopics().containsKey(topicName));
// The map topics should only contain partitions, does not contain partitioned topic.
assertFalse(pulsar.getBrokerService().getTopics().containsKey(topicName));

// ref of partitioned-topic name should be empty
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
Expand Down

0 comments on commit 1c652f5

Please sign in to comment.