Skip to content

Commit

Permalink
Fix incompatibility with new lookup API
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc committed Dec 27, 2023
1 parent 0e749ee commit 120f655
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
import org.apache.pulsar.client.impl.LookupTopicResult;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
Expand Down Expand Up @@ -70,7 +70,7 @@ private void findBroker(TopicName topicName,
AtomicLong remainingTime,
CompletableFuture<InetSocketAddress> future) {
pulsarClient.getLookup().getBroker(topicName)
.thenCompose(lookupPair ->
.thenCompose(lookupResult ->
localBrokerDataCache.getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).thenCompose(brokers -> {
// Get all broker data by metadata
List<CompletableFuture<Optional<LocalBrokerData>>> brokerDataFutures =
Expand All @@ -85,7 +85,7 @@ private void findBroker(TopicName topicName,
Optional<LocalBrokerData> specificBrokerData =
brokerDataFutures.stream().map(CompletableFuture::join)
.filter(brokerData -> brokerData.isPresent()
&& isLookupMQTTBroker(lookupPair, brokerData.get()))
&& isLookupMQTTBroker(lookupResult, brokerData.get()))
.map(Optional::get)
.findAny();
if (!specificBrokerData.isPresent()) {
Expand All @@ -110,7 +110,7 @@ && isLookupMQTTBroker(lookupPair, brokerData.get()))
String port = splits[splits.length - 1];
int mqttBrokerPort = Integer.parseInt(port);
return CompletableFuture.completedFuture(new InetSocketAddress(
lookupPair.getLeft().getHostName(), mqttBrokerPort));
lookupResult.getLogicalAddress().getHostName(), mqttBrokerPort));
});
}))
.thenAccept(future::complete)
Expand Down Expand Up @@ -149,14 +149,16 @@ public CompletableFuture<InetSocketAddress> findBroker(TopicName topicName) {
return lookupResult;
}

private boolean isLookupMQTTBroker(Pair<InetSocketAddress, InetSocketAddress> pair,
private boolean isLookupMQTTBroker(LookupTopicResult result,
LocalBrokerData localBrokerData) {

String plain = String.format("pulsar://%s:%s", pair.getLeft().getHostName(), pair.getLeft().getPort());
String ssl = String.format("pulsar+ssl://%s:%s", pair.getLeft().getHostName(), pair.getLeft().getPort());
String plain = String.format("pulsar://%s:%s", result.getLogicalAddress().getHostName(),
result.getLogicalAddress().getPort());
String ssl = String.format("pulsar+ssl://%s:%s", result.getLogicalAddress().getHostName(),
result.getLogicalAddress().getPort());
return localBrokerData.getProtocol(protocolHandlerName).isPresent()
&& (localBrokerData.getPulsarServiceUrl().equals(plain)
|| localBrokerData.getPulsarServiceUrlTls().equals(ssl));
|| localBrokerData.getPulsarServiceUrlTls().equals(ssl));
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
<mockito.version>2.22.0</mockito.version>
<testng.version>6.14.3</testng.version>
<awaitility.version>4.0.2</awaitility.version>
<pulsar.version>3.0.0.1-SNAPSHOT</pulsar.version>
<pulsar.version>3.2.0-SNAPSHOT</pulsar.version>
<mqtt.codec.version>4.1.94.Final</mqtt.codec.version>
<log4j2.version>2.18.0</log4j2.version>
<fusesource.client.version>1.16</fusesource.client.version>
Expand Down

0 comments on commit 120f655

Please sign in to comment.