From 6cf143995c15d56d63c30516e43b6a3b7a4654c4 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 12 Nov 2024 23:47:04 +0800 Subject: [PATCH] [fix] [broker] No longer allow creating subscription that contains slash --- .../broker/admin/v2/PersistentTopics.java | 9 ++++++++ .../pulsar/broker/service/ServerCnx.java | 15 +++++++++++- .../api/SimpleProducerConsumerTest.java | 23 +++++++++++++++++++ 3 files changed, 46 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index a8e5e7a3ce77b..ecd4524d1d419 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -1708,6 +1708,15 @@ public void createSubscription( ) { try { validateTopicName(tenant, namespace, topic); + String decodedSubName = decode(encodedSubName); + // If subscription is as "a/b". The url of HTTP API that defined as + // "{tenant}/{namespace}/{topic}/{subscription}" will be like below: + // "public/default/tp/a/b", then the broker will assume it is a topic that + // using the old rule "{tenant}/{cluster}/{namespace}/{topic}/{subscription}". + // So denied to create a subscription that contains "/". + if (decodedSubName.contains("/")) { + throw new RestException(Response.Status.BAD_REQUEST, "Subscription does not allow containing '/'"); + } if (!topicName.isPersistent()) { throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic " + "can only be done through client"); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index f9e593345d85f..ed7b2a718a152 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1348,9 +1348,22 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { } return service.isAllowAutoSubscriptionCreationAsync(topicName) .thenCompose(isAllowedAutoSubscriptionCreation -> { + boolean subscriptionExists = + topic.getSubscriptions().containsKey(subscriptionName); + // If subscription is as "a/b". The url of HTTP API that defined as + // "{tenant}/{namespace}/{topic}/{subscription}" will be like below: + // "public/default/tp/a/b", then the broker will assume it is a topic that + // using the old rule "{tenant}/{cluster}/{namespace}/{topic}/{subscription}". + // So denied to create a subscription that contains "/". + if (!subscriptionExists && subscriptionName.contains("/")) { + return FutureUtil.failedFuture( + new BrokerServiceException.NamingException( + "Subscription does not allow containing '/'")); + } + boolean rejectSubscriptionIfDoesNotExist = isDurable && !isAllowedAutoSubscriptionCreation - && !topic.getSubscriptions().containsKey(subscriptionName) + && !subscriptionExists && topic.isPersistent(); if (rejectSubscriptionIfDoesNotExist) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 78d28e4b22834..c0552f1c1a3eb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -283,6 +283,29 @@ public long millis() { } } + @Test(timeOut = 30000) + public void testSlashSubscriptionName() throws Exception { + final String topic = BrokerTestUtil.newUniqueName("my-property/my-ns/tp"); + admin.topics().createNonPartitionedTopic(topic); + try { + admin.topics().createSubscription(topic, "a/b", MessageId.earliest); + fail("The creation for the subscription that contains '/' should fail"); + } catch (PulsarAdminException ex) { + assertTrue(ex.getMessage().contains("Subscription does not allow containing")); + // Expected. + } + try { + pulsarClient.newConsumer().topic(topic).subscriptionName("b/c").subscribe(); + fail("The creation for the subscription that contains '/' should fail"); + } catch (PulsarClientException ex) { + assertTrue(ex.getMessage().contains("Subscription does not allow containing")); + // Expected. + } + assertEquals(admin.topics().getStats(topic).getSubscriptions().size(), 0); + // cleanup. + admin.topics().delete(topic); + } + @Test(timeOut = 100000) public void testPublishTimestampBatchEnabled() throws Exception {