Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] No longer allow creating subscription that contains slash #23594

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1708,6 +1708,15 @@ public void createSubscription(
) {
try {
validateTopicName(tenant, namespace, topic);
String decodedSubName = decode(encodedSubName);
// If subscription is as "a/b". The url of HTTP API that defined as
// "{tenant}/{namespace}/{topic}/{subscription}" will be like below:
// "public/default/tp/a/b", then the broker will assume it is a topic that
// using the old rule "{tenant}/{cluster}/{namespace}/{topic}/{subscription}".
// So denied to create a subscription that contains "/".
if (decodedSubName.contains("/")) {
throw new RestException(Response.Status.BAD_REQUEST, "Subscription does not allow containing '/'");
}
if (!topicName.isPersistent()) {
throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic "
+ "can only be done through client");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1348,9 +1348,22 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
}
return service.isAllowAutoSubscriptionCreationAsync(topicName)
.thenCompose(isAllowedAutoSubscriptionCreation -> {
boolean subscriptionExists =
topic.getSubscriptions().containsKey(subscriptionName);
// If subscription is as "a/b". The url of HTTP API that defined as
// "{tenant}/{namespace}/{topic}/{subscription}" will be like below:
// "public/default/tp/a/b", then the broker will assume it is a topic that
// using the old rule "{tenant}/{cluster}/{namespace}/{topic}/{subscription}".
// So denied to create a subscription that contains "/".
if (!subscriptionExists && subscriptionName.contains("/")) {
return FutureUtil.failedFuture(
new BrokerServiceException.NamingException(
"Subscription does not allow containing '/'"));
}

boolean rejectSubscriptionIfDoesNotExist = isDurable
&& !isAllowedAutoSubscriptionCreation
&& !topic.getSubscriptions().containsKey(subscriptionName)
&& !subscriptionExists
&& topic.isPersistent();

if (rejectSubscriptionIfDoesNotExist) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,29 @@ public long millis() {
}
}

@Test(timeOut = 30000)
public void testSlashSubscriptionName() throws Exception {
final String topic = BrokerTestUtil.newUniqueName("my-property/my-ns/tp");
admin.topics().createNonPartitionedTopic(topic);
try {
admin.topics().createSubscription(topic, "a/b", MessageId.earliest);
fail("The creation for the subscription that contains '/' should fail");
} catch (PulsarAdminException ex) {
assertTrue(ex.getMessage().contains("Subscription does not allow containing"));
// Expected.
}
try {
pulsarClient.newConsumer().topic(topic).subscriptionName("b/c").subscribe();
fail("The creation for the subscription that contains '/' should fail");
} catch (PulsarClientException ex) {
assertTrue(ex.getMessage().contains("Subscription does not allow containing"));
// Expected.
}
assertEquals(admin.topics().getStats(topic).getSubscriptions().size(), 0);
// cleanup.
admin.topics().delete(topic);
}

@Test(timeOut = 100000)
public void testPublishTimestampBatchEnabled() throws Exception {

Expand Down
Loading