Skip to content

Commit

Permalink
Fix incompatibility with new lookup&ServerCnx API
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc committed Dec 27, 2023
1 parent 0e749ee commit 253ae76
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
import org.apache.pulsar.client.impl.LookupTopicResult;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
Expand Down Expand Up @@ -70,7 +70,7 @@ private void findBroker(TopicName topicName,
AtomicLong remainingTime,
CompletableFuture<InetSocketAddress> future) {
pulsarClient.getLookup().getBroker(topicName)
.thenCompose(lookupPair ->
.thenCompose(lookupResult ->
localBrokerDataCache.getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).thenCompose(brokers -> {
// Get all broker data by metadata
List<CompletableFuture<Optional<LocalBrokerData>>> brokerDataFutures =
Expand All @@ -85,7 +85,7 @@ private void findBroker(TopicName topicName,
Optional<LocalBrokerData> specificBrokerData =
brokerDataFutures.stream().map(CompletableFuture::join)
.filter(brokerData -> brokerData.isPresent()
&& isLookupMQTTBroker(lookupPair, brokerData.get()))
&& isLookupMQTTBroker(lookupResult, brokerData.get()))
.map(Optional::get)
.findAny();
if (!specificBrokerData.isPresent()) {
Expand All @@ -110,7 +110,7 @@ && isLookupMQTTBroker(lookupPair, brokerData.get()))
String port = splits[splits.length - 1];
int mqttBrokerPort = Integer.parseInt(port);
return CompletableFuture.completedFuture(new InetSocketAddress(
lookupPair.getLeft().getHostName(), mqttBrokerPort));
lookupResult.getLogicalAddress().getHostName(), mqttBrokerPort));
});
}))
.thenAccept(future::complete)
Expand Down Expand Up @@ -149,14 +149,16 @@ public CompletableFuture<InetSocketAddress> findBroker(TopicName topicName) {
return lookupResult;
}

private boolean isLookupMQTTBroker(Pair<InetSocketAddress, InetSocketAddress> pair,
private boolean isLookupMQTTBroker(LookupTopicResult result,
LocalBrokerData localBrokerData) {

String plain = String.format("pulsar://%s:%s", pair.getLeft().getHostName(), pair.getLeft().getPort());
String ssl = String.format("pulsar+ssl://%s:%s", pair.getLeft().getHostName(), pair.getLeft().getPort());
String plain = String.format("pulsar://%s:%s", result.getLogicalAddress().getHostName(),
result.getLogicalAddress().getPort());
String ssl = String.format("pulsar+ssl://%s:%s", result.getLogicalAddress().getHostName(),
result.getLogicalAddress().getPort());
return localBrokerData.getProtocol(protocolHandlerName).isPresent()
&& (localBrokerData.getPulsarServiceUrl().equals(plain)
|| localBrokerData.getPulsarServiceUrlTls().equals(ssl));
|| localBrokerData.getPulsarServiceUrlTls().equals(ssl));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
package io.streamnative.pulsar.handlers.mqtt.support;

import io.netty.channel.ChannelHandlerContext;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.ServerCnx;

Expand All @@ -42,7 +44,7 @@ protected void close() {
}

@Override
public void closeConsumer(Consumer consumer) {
public void closeConsumer(Consumer consumer, Optional<BrokerLookupData> assignedBrokerLookupData) {
safelyRemoveConsumer(consumer);
MQTTConsumer mqttConsumer = (MQTTConsumer) consumer;
mqttConsumer.getConnection().disconnect();
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
<mockito.version>2.22.0</mockito.version>
<testng.version>6.14.3</testng.version>
<awaitility.version>4.0.2</awaitility.version>
<pulsar.version>3.0.0.1-SNAPSHOT</pulsar.version>
<pulsar.version>3.2.0-SNAPSHOT</pulsar.version>
<mqtt.codec.version>4.1.94.Final</mqtt.codec.version>
<log4j2.version>2.18.0</log4j2.version>
<fusesource.client.version>1.16</fusesource.client.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.protocol.HTTP;
import org.apache.pulsar.client.impl.LookupTopicResult;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -285,15 +286,16 @@ public void testProxyProcessPingReq() {
Thread.sleep(4000); // Sleep 2 times of setKeepAlive.
Assert.assertTrue(producer.isConnected());
// Check for broker
CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> broker =
CompletableFuture<LookupTopicResult> broker =
((PulsarClientImpl) pulsarClient).getLookup().getBroker(TopicName.get(topic));
AtomicDouble active = new AtomicDouble(0);
AtomicDouble total = new AtomicDouble(0);
CompletableFuture<Void> result = new CompletableFuture<>();
broker.thenAccept(pair -> {
try {
HttpClient httpClient = HttpClientBuilder.create().build();
final String mopEndPoint = "http://localhost:" + (pair.getLeft().getPort() + 2) + "/mop/stats";
final String mopEndPoint =
"http://localhost:" + (pair.getLogicalAddress().getPort() + 2) + "/mop/stats";
HttpResponse response = httpClient.execute(new HttpGet(mopEndPoint));
InputStream inputStream = response.getEntity().getContent();
InputStreamReader isReader = new InputStreamReader(inputStream);
Expand Down

0 comments on commit 253ae76

Please sign in to comment.