Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-307: Add proxy support for Java client #21789

Merged
merged 9 commits into from
Dec 23, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
Expand Down Expand Up @@ -300,7 +301,7 @@ protected OpRequestSend newObject(Handle<OpRequestSend> handle) {
}

public CompletableFuture<ClientCnx> getClientCnxWithLookup(String topic) {
return pulsarClient.getConnection(topic, randomKeyForSelectConnection);
return pulsarClient.getConnection(topic, randomKeyForSelectConnection).thenApply(Pair::getLeft);
}

public CompletableFuture<ClientCnx> getClientCnx(String topic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.LookupTopicResult;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
Expand Down Expand Up @@ -135,10 +135,10 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion()
((PulsarClientImpl) pulsarClient).setLookup(mockLookup);
when(mockLookup.getPartitionedTopicMetadata(any())).thenAnswer(
i -> CompletableFuture.completedFuture(new PartitionedTopicMetadata(0)));
when(mockLookup.getBroker(any())).thenAnswer(i -> {
when(mockLookup.getBroker(any())).thenAnswer(ignored -> {
InetSocketAddress brokerAddress =
new InetSocketAddress(pulsar.getAdvertisedAddress(), pulsar.getBrokerListenPort().get());
return CompletableFuture.completedFuture(Pair.of(brokerAddress, brokerAddress));
return CompletableFuture.completedFuture(new LookupTopicResult(brokerAddress, brokerAddress, false));
});
final String topicPoliciesServiceInitException
= "Topic creation encountered an exception by initialize topic policies service";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
Expand Down Expand Up @@ -270,9 +271,10 @@ public void testTransactionBufferClientTimeout() throws Exception {
CompletableFuture<ClientCnx> completableFuture = new CompletableFuture<>();
ClientCnx clientCnx = mock(ClientCnx.class);
completableFuture.complete(clientCnx);
when(((PulsarClientImpl)mockClient).getConnection(anyString())).thenReturn(completableFuture);
when(((PulsarClientImpl)mockClient).getConnection(anyString(), anyInt())).thenReturn(completableFuture);
when(((PulsarClientImpl)mockClient).getConnection(any(), any(), anyInt())).thenReturn(completableFuture);
when(mockClient.getConnection(anyString())).thenReturn(completableFuture);
when(mockClient.getConnection(anyString(), anyInt())).thenReturn(
CompletableFuture.completedFuture(Pair.of(clientCnx, false)));
when(mockClient.getConnection(any(), any(), anyInt())).thenReturn(completableFuture);
ChannelHandlerContext cnx = mock(ChannelHandlerContext.class);
when(clientCnx.ctx()).thenReturn(cnx);
Channel channel = mock(Channel.class);
Expand Down Expand Up @@ -324,10 +326,9 @@ public void testTransactionBufferChannelUnActive() throws PulsarServerException
PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
ConnectionPool connectionPool = mock(ConnectionPool.class);
when(mockClient.getCnxPool()).thenReturn(connectionPool);
CompletableFuture<ClientCnx> completableFuture = new CompletableFuture<>();
ClientCnx clientCnx = mock(ClientCnx.class);
completableFuture.complete(clientCnx);
when(((PulsarClientImpl)mockClient).getConnection(anyString(), anyInt())).thenReturn(completableFuture);
when(mockClient.getConnection(anyString(), anyInt())).thenReturn(
CompletableFuture.completedFuture(Pair.of(clientCnx, false)));
ChannelHandlerContext cnx = mock(ChannelHandlerContext.class);
when(clientCnx.ctx()).thenReturn(cnx);
Channel channel = mock(Channel.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.transaction.buffer;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
Expand Down Expand Up @@ -57,9 +58,9 @@ public void testRequestCredits() throws PulsarServerException {
when(namespaceService.getBundleAsync(any())).thenReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class)));
Optional<NamespaceEphemeralData> opData = Optional.empty();
when(namespaceService.getOwnerAsync(any())).thenReturn(CompletableFuture.completedFuture(opData));
when(((PulsarClientImpl)pulsarClient).getConnection(anyString(), anyInt()))
.thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
when(((PulsarClientImpl)pulsarClient).getConnection(anyString()))
when(pulsarClient.getConnection(anyString(), anyInt()))
.thenReturn(CompletableFuture.completedFuture(Pair.of(mock(ClientCnx.class), false)));
when(pulsarClient.getConnection(anyString()))
.thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
TransactionBufferHandlerImpl handler = spy(new TransactionBufferHandlerImpl(pulsarService, null, 1000, 3000));
doNothing().when(handler).endTxn(any());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,21 +140,21 @@ private void doFindBrokerWithListenerName(boolean useHttp) throws Exception {
LookupService lookupService = useHttp ? new HttpLookupService(conf, eventExecutors) :
new BinaryProtoLookupService((PulsarClientImpl) this.pulsarClient,
lookupUrl.toString(), "internal", false, this.executorService);
TopicName topicName = TopicName.get("persistent://public/default/test");

// test request 1
{
CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> future =
lookupService.getBroker(TopicName.get("persistent://public/default/test"));
Pair<InetSocketAddress, InetSocketAddress> result = future.get(10, TimeUnit.SECONDS);
Assert.assertEquals(result.getKey(), brokerAddress);
Assert.assertEquals(result.getValue(), brokerAddress);
var result = lookupService.getBroker(topicName).get(10, TimeUnit.SECONDS);
Assert.assertEquals(result.getLogicalAddress(), brokerAddress);
Assert.assertEquals(result.getPhysicalAddress(), brokerAddress);
Assert.assertEquals(result.isUseProxy(), false);
}
// test request 2
{
CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> future =
lookupService.getBroker(TopicName.get("persistent://public/default/test"));
Pair<InetSocketAddress, InetSocketAddress> result = future.get(10, TimeUnit.SECONDS);
Assert.assertEquals(result.getKey(), brokerAddress);
Assert.assertEquals(result.getValue(), brokerAddress);
var result = lookupService.getBroker(topicName).get(10, TimeUnit.SECONDS);
Assert.assertEquals(result.getLogicalAddress(), brokerAddress);
Assert.assertEquals(result.getPhysicalAddress(), brokerAddress);
Assert.assertEquals(result.isUseProxy(), false);
}
}

Expand Down Expand Up @@ -187,12 +187,11 @@ public void testHttpLookupRedirect() throws Exception {
doReturn(CompletableFuture.completedFuture(optional), CompletableFuture.completedFuture(optional2))
.when(namespaceService).getBrokerServiceUrlAsync(any(), any());

CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> future =
lookupService.getBroker(TopicName.get("persistent://public/default/test"));

Pair<InetSocketAddress, InetSocketAddress> result = future.get(10, TimeUnit.SECONDS);
Assert.assertEquals(result.getKey(), address);
Assert.assertEquals(result.getValue(), address);
var result =
lookupService.getBroker(TopicName.get("persistent://public/default/test")).get(10, TimeUnit.SECONDS);
Assert.assertEquals(result.getLogicalAddress(), address);
Assert.assertEquals(result.getPhysicalAddress(), address);
Assert.assertEquals(result.isUseProxy(), false);
}

@AfterMethod(alwaysRun = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public CompletableFuture<ClientCnx> getConnection(String topic) {
result.completeExceptionally(new IOException("New connections are rejected."));
return result;
} else {
return super.getConnection(topic, getCnxPool().genRandomKeyToSelectCon());
return super.getConnection(topic);
heesung-sn marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
Expand All @@ -58,7 +57,7 @@ public class BinaryProtoLookupService implements LookupService {
private final String listenerName;
private final int maxLookupRedirects;

private final ConcurrentHashMap<TopicName, CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>>>
private final ConcurrentHashMap<TopicName, CompletableFuture<LookupTopicResult>>
lookupInProgress = new ConcurrentHashMap<>();

private final ConcurrentHashMap<TopicName, CompletableFuture<PartitionedTopicMetadata>>
Expand Down Expand Up @@ -99,11 +98,11 @@ public void updateServiceUrl(String serviceUrl) throws PulsarClientException {
* topic-name
* @return broker-socket-address that serves given topic
*/
public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(TopicName topicName) {
public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) {
final MutableObject<CompletableFuture> newFutureCreated = new MutableObject<>();
try {
return lookupInProgress.computeIfAbsent(topicName, tpName -> {
CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> newFuture =
CompletableFuture<LookupTopicResult> newFuture =
findBroker(serviceNameResolver.resolveHost(), false, topicName, 0);
newFutureCreated.setValue(newFuture);
return newFuture;
Expand Down Expand Up @@ -139,9 +138,9 @@ public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(T
}
}

private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> findBroker(InetSocketAddress socketAddress,
private CompletableFuture<LookupTopicResult> findBroker(InetSocketAddress socketAddress,
boolean authoritative, TopicName topicName, final int redirectCount) {
CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> addressFuture = new CompletableFuture<>();
CompletableFuture<LookupTopicResult> addressFuture = new CompletableFuture<>();

if (maxLookupRedirects > 0 && redirectCount > maxLookupRedirects) {
addressFuture.completeExceptionally(
Expand All @@ -159,7 +158,6 @@ private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> findBroker
if (log.isDebugEnabled()) {
log.debug("[{}] Lookup response exception: {}", topicName, t);
}

addressFuture.completeExceptionally(t);
} else {
URI uri = null;
Expand Down Expand Up @@ -198,10 +196,12 @@ private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> findBroker
// (3) received correct broker to connect
if (r.proxyThroughServiceUrl) {
// Connect through proxy
addressFuture.complete(Pair.of(responseBrokerAddress, socketAddress));
addressFuture.complete(
new LookupTopicResult(responseBrokerAddress, socketAddress, true));
} else {
// Normal result with direct connection to broker
addressFuture.complete(Pair.of(responseBrokerAddress, responseBrokerAddress));
addressFuture.complete(
new LookupTopicResult(responseBrokerAddress, responseBrokerAddress, false));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class ConnectionHandler {
private final AtomicBoolean duringConnect = new AtomicBoolean(false);
protected final int randomKeyForSelectConnection;

private volatile Boolean useProxy;

interface Connection {

/**
Expand Down Expand Up @@ -93,11 +95,14 @@ protected void grabCnx(Optional<URI> hostURI) {

try {
CompletableFuture<ClientCnx> cnxFuture;
if (hostURI.isPresent()) {
InetSocketAddress address = InetSocketAddress.createUnresolved(
hostURI.get().getHost(),
hostURI.get().getPort());
cnxFuture = state.client.getConnection(address, address, randomKeyForSelectConnection);
if (hostURI.isPresent() && useProxy != null) {
URI uri = hostURI.get();
InetSocketAddress address = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
if (useProxy) {
cnxFuture = state.client.getProxyConnection(address, randomKeyForSelectConnection);
} else {
cnxFuture = state.client.getConnection(address, address, randomKeyForSelectConnection);
dragosvictor marked this conversation as resolved.
Show resolved Hide resolved
}
} else if (state.redirectedClusterURI != null) {
if (state.topic == null) {
InetSocketAddress address = InetSocketAddress.createUnresolved(state.redirectedClusterURI.getHost(),
Expand All @@ -112,7 +117,11 @@ protected void grabCnx(Optional<URI> hostURI) {
} else if (state.topic == null) {
cnxFuture = state.client.getConnectionToServiceUrl();
} else {
cnxFuture = state.client.getConnection(state.topic, randomKeyForSelectConnection);
cnxFuture = state.client.getConnection(state.topic, randomKeyForSelectConnection).thenApply(
connectionResult -> {
useProxy = connectionResult.getRight();
return connectionResult.getLeft();
});
}
cnxFuture.thenCompose(cnx -> connection.connectionOpened(cnx))
.thenAccept(__ -> duringConnect.set(false))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
import org.apache.pulsar.client.api.SchemaSerializationException;
Expand Down Expand Up @@ -81,7 +80,7 @@ public void updateServiceUrl(String serviceUrl) throws PulsarClientException {
*/
@Override
@SuppressWarnings("deprecation")
public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(TopicName topicName) {
public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) {
String basePath = topicName.isV2() ? BasePathV2 : BasePathV1;
String path = basePath + topicName.getLookupName();
path = StringUtils.isBlank(listenerName) ? path : path + "?listenerName=" + Codec.encode(listenerName);
Expand All @@ -101,7 +100,8 @@ public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(T
}

InetSocketAddress brokerAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
return CompletableFuture.completedFuture(Pair.of(brokerAddress, brokerAddress));
return CompletableFuture.completedFuture(new LookupTopicResult(brokerAddress, brokerAddress,
false /* HTTP lookups never use the proxy */));
} catch (Exception e) {
// Failed to parse url
log.warn("[{}] Lookup Failed due to invalid url {}, {}", topicName, uri, e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.net.InetSocketAddress;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.lookup.GetTopicsResult;
Expand Down Expand Up @@ -54,9 +53,10 @@ public interface LookupService extends AutoCloseable {
*
* @param topicName
* topic-name
* @return a pair of addresses, representing the logical and physical address of the broker that serves given topic
* @return a {@link LookupTopicResult} representing the logical and physical address of the broker that serves the
* given topic, as well as proxying information.
*/
CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(TopicName topicName);
CompletableFuture<LookupTopicResult> getBroker(TopicName topicName);

/**
* Returns {@link PartitionedTopicMetadata} for a given topic.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import java.net.InetSocketAddress;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@Getter
@Setter
@AllArgsConstructor
@ToString
public class LookupTopicResult {
private final InetSocketAddress logicalAddress;
private final InetSocketAddress physicalAddress;
private final boolean isUseProxy;
}
Loading
Loading