diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/PulsarServiceLookupHandler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/PulsarServiceLookupHandler.java index f0de08985..74ecebf2e 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/PulsarServiceLookupHandler.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/PulsarServiceLookupHandler.java @@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.service.BrokerServiceException; @@ -34,6 +33,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.client.impl.BackoffBuilder; +import org.apache.pulsar.client.impl.LookupTopicResult; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.util.ExecutorProvider; @@ -70,7 +70,7 @@ private void findBroker(TopicName topicName, AtomicLong remainingTime, CompletableFuture future) { pulsarClient.getLookup().getBroker(topicName) - .thenCompose(lookupPair -> + .thenCompose(lookupResult -> localBrokerDataCache.getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).thenCompose(brokers -> { // Get all broker data by metadata List>> brokerDataFutures = @@ -85,7 +85,7 @@ private void findBroker(TopicName topicName, Optional specificBrokerData = brokerDataFutures.stream().map(CompletableFuture::join) .filter(brokerData -> brokerData.isPresent() - && isLookupMQTTBroker(lookupPair, brokerData.get())) + && isLookupMQTTBroker(lookupResult, brokerData.get())) .map(Optional::get) .findAny(); if (!specificBrokerData.isPresent()) { @@ -110,7 +110,7 @@ && isLookupMQTTBroker(lookupPair, brokerData.get())) String port = splits[splits.length - 1]; int mqttBrokerPort = Integer.parseInt(port); return CompletableFuture.completedFuture(new InetSocketAddress( - lookupPair.getLeft().getHostName(), mqttBrokerPort)); + lookupResult.getLogicalAddress().getHostName(), mqttBrokerPort)); }); })) .thenAccept(future::complete) @@ -149,14 +149,16 @@ public CompletableFuture findBroker(TopicName topicName) { return lookupResult; } - private boolean isLookupMQTTBroker(Pair pair, + private boolean isLookupMQTTBroker(LookupTopicResult result, LocalBrokerData localBrokerData) { - String plain = String.format("pulsar://%s:%s", pair.getLeft().getHostName(), pair.getLeft().getPort()); - String ssl = String.format("pulsar+ssl://%s:%s", pair.getLeft().getHostName(), pair.getLeft().getPort()); + String plain = String.format("pulsar://%s:%s", result.getLogicalAddress().getHostName(), + result.getLogicalAddress().getPort()); + String ssl = String.format("pulsar+ssl://%s:%s", result.getLogicalAddress().getHostName(), + result.getLogicalAddress().getPort()); return localBrokerData.getProtocol(protocolHandlerName).isPresent() && (localBrokerData.getPulsarServiceUrl().equals(plain) - || localBrokerData.getPulsarServiceUrlTls().equals(ssl)); + || localBrokerData.getPulsarServiceUrlTls().equals(ssl)); } @Override diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTServerCnx.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTServerCnx.java index 3ccb74bd5..348e391e3 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTServerCnx.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTServerCnx.java @@ -14,9 +14,11 @@ package io.streamnative.pulsar.handlers.mqtt.support; import io.netty.channel.ChannelHandlerContext; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.ServerCnx; @@ -42,7 +44,7 @@ protected void close() { } @Override - public void closeConsumer(Consumer consumer) { + public void closeConsumer(Consumer consumer, Optional assignedBrokerLookupData) { safelyRemoveConsumer(consumer); MQTTConsumer mqttConsumer = (MQTTConsumer) consumer; mqttConsumer.getConnection().disconnect(); diff --git a/pom.xml b/pom.xml index 782dad770..538cecb90 100644 --- a/pom.xml +++ b/pom.xml @@ -49,7 +49,7 @@ 2.22.0 6.14.3 4.0.2 - 3.0.0.1-SNAPSHOT + 3.2.0-SNAPSHOT 4.1.94.Final 2.18.0 1.16 diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java index dc94fbc4b..d279aea89 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java @@ -56,6 +56,7 @@ import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.message.BasicNameValuePair; import org.apache.http.protocol.HTTP; +import org.apache.pulsar.client.impl.LookupTopicResult; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.naming.NamespaceName; @@ -285,7 +286,7 @@ public void testProxyProcessPingReq() { Thread.sleep(4000); // Sleep 2 times of setKeepAlive. Assert.assertTrue(producer.isConnected()); // Check for broker - CompletableFuture> broker = + CompletableFuture broker = ((PulsarClientImpl) pulsarClient).getLookup().getBroker(TopicName.get(topic)); AtomicDouble active = new AtomicDouble(0); AtomicDouble total = new AtomicDouble(0); @@ -293,7 +294,8 @@ public void testProxyProcessPingReq() { broker.thenAccept(pair -> { try { HttpClient httpClient = HttpClientBuilder.create().build(); - final String mopEndPoint = "http://localhost:" + (pair.getLeft().getPort() + 2) + "/mop/stats"; + final String mopEndPoint = + "http://localhost:" + (pair.getLogicalAddress().getPort() + 2) + "/mop/stats"; HttpResponse response = httpClient.execute(new HttpGet(mopEndPoint)); InputStream inputStream = response.getEntity().getContent(); InputStreamReader isReader = new InputStreamReader(inputStream);