From 5ec21ee9fc70bab948e59191c2eb878b5131bc8e Mon Sep 17 00:00:00 2001 From: coderzc Date: Wed, 27 Dec 2023 14:23:13 +0800 Subject: [PATCH] Fix incompatibility with new lookup&ServerCnx API --- .../mqtt/proxy/PulsarServiceLookupHandler.java | 18 ++++++++++-------- .../handlers/mqtt/support/MQTTServerCnx.java | 4 +++- pom.xml | 2 +- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/PulsarServiceLookupHandler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/PulsarServiceLookupHandler.java index f0de08985..74ecebf2e 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/PulsarServiceLookupHandler.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/PulsarServiceLookupHandler.java @@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.service.BrokerServiceException; @@ -34,6 +33,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.client.impl.BackoffBuilder; +import org.apache.pulsar.client.impl.LookupTopicResult; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.util.ExecutorProvider; @@ -70,7 +70,7 @@ private void findBroker(TopicName topicName, AtomicLong remainingTime, CompletableFuture future) { pulsarClient.getLookup().getBroker(topicName) - .thenCompose(lookupPair -> + .thenCompose(lookupResult -> localBrokerDataCache.getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).thenCompose(brokers -> { // Get all broker data by metadata List>> brokerDataFutures = @@ -85,7 +85,7 @@ private void findBroker(TopicName topicName, Optional specificBrokerData = brokerDataFutures.stream().map(CompletableFuture::join) .filter(brokerData -> brokerData.isPresent() - && isLookupMQTTBroker(lookupPair, brokerData.get())) + && isLookupMQTTBroker(lookupResult, brokerData.get())) .map(Optional::get) .findAny(); if (!specificBrokerData.isPresent()) { @@ -110,7 +110,7 @@ && isLookupMQTTBroker(lookupPair, brokerData.get())) String port = splits[splits.length - 1]; int mqttBrokerPort = Integer.parseInt(port); return CompletableFuture.completedFuture(new InetSocketAddress( - lookupPair.getLeft().getHostName(), mqttBrokerPort)); + lookupResult.getLogicalAddress().getHostName(), mqttBrokerPort)); }); })) .thenAccept(future::complete) @@ -149,14 +149,16 @@ public CompletableFuture findBroker(TopicName topicName) { return lookupResult; } - private boolean isLookupMQTTBroker(Pair pair, + private boolean isLookupMQTTBroker(LookupTopicResult result, LocalBrokerData localBrokerData) { - String plain = String.format("pulsar://%s:%s", pair.getLeft().getHostName(), pair.getLeft().getPort()); - String ssl = String.format("pulsar+ssl://%s:%s", pair.getLeft().getHostName(), pair.getLeft().getPort()); + String plain = String.format("pulsar://%s:%s", result.getLogicalAddress().getHostName(), + result.getLogicalAddress().getPort()); + String ssl = String.format("pulsar+ssl://%s:%s", result.getLogicalAddress().getHostName(), + result.getLogicalAddress().getPort()); return localBrokerData.getProtocol(protocolHandlerName).isPresent() && (localBrokerData.getPulsarServiceUrl().equals(plain) - || localBrokerData.getPulsarServiceUrlTls().equals(ssl)); + || localBrokerData.getPulsarServiceUrlTls().equals(ssl)); } @Override diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTServerCnx.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTServerCnx.java index 3ccb74bd5..348e391e3 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTServerCnx.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTServerCnx.java @@ -14,9 +14,11 @@ package io.streamnative.pulsar.handlers.mqtt.support; import io.netty.channel.ChannelHandlerContext; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.ServerCnx; @@ -42,7 +44,7 @@ protected void close() { } @Override - public void closeConsumer(Consumer consumer) { + public void closeConsumer(Consumer consumer, Optional assignedBrokerLookupData) { safelyRemoveConsumer(consumer); MQTTConsumer mqttConsumer = (MQTTConsumer) consumer; mqttConsumer.getConnection().disconnect(); diff --git a/pom.xml b/pom.xml index 782dad770..538cecb90 100644 --- a/pom.xml +++ b/pom.xml @@ -49,7 +49,7 @@ 2.22.0 6.14.3 4.0.2 - 3.0.0.1-SNAPSHOT + 3.2.0-SNAPSHOT 4.1.94.Final 2.18.0 1.16