[improve] PIP-307: Use assigned broker URL hints during broker reconnection (#1208)
dragosvictor authored Apr 23, 2024
1 parent 458defe commit 86054c5
Showing 14 changed files with 1,504 additions and 949 deletions.
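
In brief: this commit implements the client side of PIP-307. When a broker closes a consumer connection during a topic unload, the close command may carry the service URL of the newly assigned broker; the client now reconnects straight to that broker, skipping the usual lookup round-trip and backoff delay, and falls back to both if the single direct attempt fails. The same treatment is presumably mirrored for producers in the changed files not shown below. The commit also adds a docker-compose integration-test stack that runs two brokers under the extensible load manager so the transfer path can be exercised end to end.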
10 changes: 8 additions & 2 deletions Makefile
@@ -50,17 +50,23 @@ container:
--build-arg PULSAR_IMAGE="${PULSAR_IMAGE}" \
--build-arg ARCH="${CONTAINER_ARCH}" .

test: container test_standalone test_clustered
test: container test_standalone test_clustered test_extensible_load_manager

test_standalone: container
docker run -i ${IMAGE_NAME} bash -c "cd /pulsar/pulsar-client-go && ./scripts/run-ci.sh"

test_clustered: container
PULSAR_VERSION=${PULSAR_VERSION} docker compose -f integration-tests/clustered/docker-compose.yml up -d || true
PULSAR_VERSION=${PULSAR_VERSION} docker compose -f integration-tests/clustered/docker-compose.yml up -d
until curl http://localhost:8080/metrics > /dev/null 2>&1; do sleep 1; done
docker run --network "clustered_pulsar" -i ${IMAGE_NAME} bash -c "cd /pulsar/pulsar-client-go && ./scripts/run-ci-clustered.sh"
PULSAR_VERSION=${PULSAR_VERSION} docker compose -f integration-tests/clustered/docker-compose.yml down

test_extensible_load_manager: container
PULSAR_VERSION=${PULSAR_VERSION} docker compose -f integration-tests/extensible-load-manager/docker-compose.yml up -d
until curl http://localhost:8080/metrics > /dev/null 2>&1; do sleep 1; done
docker run --network "extensible-load-manager_pulsar" -i ${IMAGE_NAME} bash -c "cd /pulsar/pulsar-client-go && ./scripts/run-ci-extensible-load-manager.sh"
PULSAR_VERSION=${PULSAR_VERSION} docker compose -f integration-tests/extensible-load-manager/docker-compose.yml down

clean:
docker rmi --force $(IMAGE_NAME) || true
rm bin/*
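
For reference, the new suite can also be run on its own with "make test_extensible_load_manager"; like test_clustered, it expects PULSAR_VERSION to be set in the environment, waits for the proxy's metrics endpoint at http://localhost:8080/metrics before running, and tears the compose stack down when the tests finish.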
181 changes: 181 additions & 0 deletions integration-tests/extensible-load-manager/docker-compose.yml
@@ -0,0 +1,181 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

version: '3'
networks:
pulsar:
driver: bridge
services:
# Start ZooKeeper
zookeeper:
image: apachepulsar/pulsar:${PULSAR_VERSION}
container_name: zookeeper
restart: on-failure
networks:
- pulsar
environment:
- metadataStoreUrl=zk:zookeeper:2181
- PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
command: >
bash -c "bin/apply-config-from-env.py conf/zookeeper.conf && \
bin/generate-zookeeper-config.sh conf/zookeeper.conf && \
exec bin/pulsar zookeeper"
healthcheck:
test: ["CMD", "bin/pulsar-zookeeper-ruok.sh"]
interval: 10s
timeout: 5s
retries: 30

# Initialize cluster metadata
pulsar-init:
container_name: pulsar-init
hostname: pulsar-init
image: apachepulsar/pulsar:${PULSAR_VERSION}
networks:
- pulsar
environment:
- PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
command: >
bin/pulsar initialize-cluster-metadata \
--cluster cluster-a \
--zookeeper zookeeper:2181 \
--configuration-store zookeeper:2181 \
--web-service-url http://broker-1:8080 \
--broker-service-url pulsar://broker-1:6650
depends_on:
zookeeper:
condition: service_healthy

# Start bookie
bookie:
image: apachepulsar/pulsar:${PULSAR_VERSION}
container_name: bookie
restart: on-failure
networks:
- pulsar
environment:
- clusterName=cluster-a
- zkServers=zookeeper:2181
- metadataServiceUri=metadata-store:zk:zookeeper:2181
- advertisedAddress=bookie
- BOOKIE_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
depends_on:
zookeeper:
condition: service_healthy
pulsar-init:
condition: service_completed_successfully
command: bash -c "bin/apply-config-from-env.py conf/bookkeeper.conf && exec bin/pulsar bookie"

proxy:
image: apachepulsar/pulsar:${PULSAR_VERSION}
container_name: proxy
hostname: proxy
restart: on-failure
networks:
- pulsar
environment:
- metadataStoreUrl=zk:zookeeper:2181
- zookeeperServers=zookeeper:2181
- clusterName=cluster-a
- PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
ports:
- "8080:8080"
- "6650:6650"
depends_on:
broker-1:
condition: service_healthy
broker-2:
condition: service_healthy
command: bash -c "bin/apply-config-from-env.py conf/proxy.conf && exec bin/pulsar proxy"

# Start broker 1
broker-1:
image: apachepulsar/pulsar:${PULSAR_VERSION}
container_name: broker-1
hostname: broker-1
restart: on-failure
networks:
- pulsar
environment:
- metadataStoreUrl=zk:zookeeper:2181
- zookeeperServers=zookeeper:2181
- clusterName=cluster-a
- managedLedgerDefaultEnsembleSize=1
- managedLedgerDefaultWriteQuorum=1
- managedLedgerDefaultAckQuorum=1
- advertisedAddress=broker-1
- internalListenerName=internal
- advertisedListeners=internal:pulsar://broker-1:6650
- PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m
# Load manager settings: use the extensible load manager (ExtensibleLoadManagerImpl) with the TransferShedder unloading strategy, disable automatic shedding, and enable debug mode.
- loadManagerClassName=org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl
- loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder
- loadBalancerSheddingEnabled=false
- loadBalancerDebugModeEnabled=true
- clusterMigrationCheckDurationSeconds=1
- brokerServiceCompactionThresholdInBytes=1000000
- PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
depends_on:
zookeeper:
condition: service_healthy
bookie:
condition: service_started
command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker"
healthcheck:
test: ["CMD-SHELL", "curl -f http://localhost:8080/metrics || exit 1"]
interval: 10s
timeout: 5s
retries: 30

# Start broker 2
broker-2:
image: apachepulsar/pulsar:${PULSAR_VERSION}
container_name: broker-2
hostname: broker-2
restart: on-failure
networks:
- pulsar
environment:
- metadataStoreUrl=zk:zookeeper:2181
- zookeeperServers=zookeeper:2181
- clusterName=cluster-a
- managedLedgerDefaultEnsembleSize=1
- managedLedgerDefaultWriteQuorum=1
- managedLedgerDefaultAckQuorum=1
- advertisedAddress=broker-2
- internalListenerName=internal
- advertisedListeners=internal:pulsar://broker-2:6650
- PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m
# Load manager settings: use the extensible load manager (ExtensibleLoadManagerImpl) with the TransferShedder unloading strategy, disable automatic shedding, and enable debug mode.
- loadManagerClassName=org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl
- loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder
- loadBalancerSheddingEnabled=false
- loadBalancerDebugModeEnabled=true
- clusterMigrationCheckDurationSeconds=1
- brokerServiceCompactionThresholdInBytes=1000000
- PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
depends_on:
zookeeper:
condition: service_healthy
bookie:
condition: service_started
command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker"
healthcheck:
test: ["CMD-SHELL", "curl -f http://localhost:8080/metrics || exit 1"]
interval: 10s
timeout: 5s
retries: 30
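
Compared to the clustered setup, the notable choices here are two brokers behind one proxy, the extensible load manager with automatic shedding disabled, and a single bundle per namespace (PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1), presumably so the test script can move a topic's bundle between broker-1 and broker-2 deterministically and verify that the client follows the assigned-broker hint.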
13 changes: 11 additions & 2 deletions pulsar/client_impl.go
@@ -49,6 +49,7 @@ type client struct {
memLimit internal.MemoryLimitController
closeOnce sync.Once
operationTimeout time.Duration
tlsEnabled bool

log log.Logger
}
@@ -166,6 +167,7 @@ func newClient(options ClientOptions) (Client, error) {
metrics: metrics,
memLimit: internal.NewMemoryLimitController(memLimitBytes, defaultMemoryLimitTriggerThreshold),
operationTimeout: operationTimeout,
tlsEnabled: tlsConfig != nil,
}
serviceNameResolver := internal.NewPulsarServiceNameResolver(url)

@@ -174,7 +176,7 @@ func newClient(options ClientOptions) (Client, error) {
switch url.Scheme {
case "pulsar", "pulsar+ssl":
c.lookupService = internal.NewLookupService(c.rpcClient, url, serviceNameResolver,
tlsConfig != nil, options.ListenerName, logger, metrics)
c.tlsEnabled, options.ListenerName, logger, metrics)
case "http", "https":
httpClient, err := internal.NewHTTPClient(url, serviceNameResolver, tlsConfig,
operationTimeout, logger, metrics, authProvider)
@@ -183,7 +185,7 @@ func newClient(options ClientOptions) (Client, error) {
err.Error()))
}
c.lookupService = internal.NewHTTPLookupService(httpClient, url, serviceNameResolver,
tlsConfig != nil, logger, metrics)
c.tlsEnabled, logger, metrics)
default:
return nil, newError(InvalidConfiguration, fmt.Sprintf("Invalid URL scheme '%s'", url.Scheme))
}
@@ -275,3 +277,10 @@ func (c *client) Close() {
c.lookupService.Close()
})
}

func (c *client) selectServiceURL(brokerServiceURL, brokerServiceURLTLS string) string {
if c.tlsEnabled {
return brokerServiceURLTLS
}
return brokerServiceURL
}
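
The selection rule is small enough to demonstrate standalone. A minimal sketch (the URLs are made-up values, not from the commit): a client built with a TLS config takes the TLS member of the advertised URL pair, a plaintext client the other.

package main

import "fmt"

// Mirrors client.selectServiceURL above: a client constructed with a TLS
// config prefers the TLS broker URL, otherwise the plaintext one.
func selectServiceURL(tlsEnabled bool, brokerServiceURL, brokerServiceURLTLS string) string {
	if tlsEnabled {
		return brokerServiceURLTLS
	}
	return brokerServiceURL
}

func main() {
	fmt.Println(selectServiceURL(false, "pulsar://broker-2:6650", "pulsar+ssl://broker-2:6651"))
	// -> pulsar://broker-2:6650
	fmt.Println(selectServiceURL(true, "pulsar://broker-2:6650", "pulsar+ssl://broker-2:6651"))
	// -> pulsar+ssl://broker-2:6651
}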
58 changes: 43 additions & 15 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
@@ -158,7 +158,7 @@ type partitionConsumer struct {

eventsCh chan interface{}
connectedCh chan struct{}
connectClosedCh chan connectionClosed
connectClosedCh chan *connectionClosed
closeCh chan struct{}
clearQueueCh chan func(id *trackingMessageID)

@@ -326,7 +326,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
startMessageID: atomicMessageID{msgID: options.startMessageID},
connectedCh: make(chan struct{}),
messageCh: messageCh,
connectClosedCh: make(chan connectionClosed, 10),
connectClosedCh: make(chan *connectionClosed, 10),
closeCh: make(chan struct{}),
clearQueueCh: make(chan func(id *trackingMessageID)),
compressionProviders: sync.Map{},
@@ -370,7 +370,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon

pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, options.nackBackoffPolicy, pc.log)

err := pc.grabConn()
err := pc.grabConn("")
if err != nil {
pc.log.WithError(err).Error("Failed to create consumer")
pc.nackTracker.Close()
@@ -1358,10 +1358,17 @@ func createEncryptionContext(msgMeta *pb.MessageMetadata) *EncryptionContext {
return &encCtx
}

func (pc *partitionConsumer) ConnectionClosed() {
func (pc *partitionConsumer) ConnectionClosed(closeConsumer *pb.CommandCloseConsumer) {
// Trigger reconnection in the consumer goroutine
pc.log.Debug("connection closed and send to connectClosedCh")
pc.connectClosedCh <- connectionClosed{}
var assignedBrokerURL string
if closeConsumer != nil {
assignedBrokerURL = pc.client.selectServiceURL(
closeConsumer.GetAssignedBrokerServiceUrl(), closeConsumer.GetAssignedBrokerServiceUrlTls())
}
pc.connectClosedCh <- &connectionClosed{
assignedBrokerURL: assignedBrokerURL,
}
}

// Flow command gives additional permits to send messages to the consumer.
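
The connectionClosed event type itself is not part of this file's diff. Reconstructed from its usage (the channel now carries a pointer, and reconnectToBroker below calls HasURL), it plausibly looks like the following sketch; the actual definition lives elsewhere in the commit.

// Sketch of the event sent on connectClosedCh, inferred from its usage in
// this file; the real definition may differ.
type connectionClosed struct {
	// assignedBrokerURL is empty when the broker sent no redirect hint.
	assignedBrokerURL string
}

// HasURL reports whether the CommandCloseConsumer carried an assigned-broker
// service URL for the client to reconnect to directly.
func (cc *connectionClosed) HasURL() bool {
	return len(cc.assignedBrokerURL) > 0
}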
@@ -1566,9 +1573,9 @@ func (pc *partitionConsumer) runEventsLoop() {
case <-pc.closeCh:
pc.log.Info("close consumer, exit reconnect")
return
case <-pc.connectClosedCh:
case connectionClosed := <-pc.connectClosedCh:
pc.log.Debug("runEventsLoop will reconnect")
pc.reconnectToBroker()
pc.reconnectToBroker(connectionClosed)
}
}
}()
@@ -1652,7 +1659,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
close(pc.closeCh)
}

func (pc *partitionConsumer) reconnectToBroker() {
func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClosed) {
var maxRetry int

if pc.options.maxReconnectToBroker == nil {
@@ -1673,13 +1680,22 @@
return
}

if pc.options.backoffPolicy == nil {
var assignedBrokerURL string

if connectionClosed != nil && connectionClosed.HasURL() {
delayReconnectTime = 0
assignedBrokerURL = connectionClosed.assignedBrokerURL
connectionClosed = nil // Attempt connecting to the assigned broker just once
} else if pc.options.backoffPolicy == nil {
delayReconnectTime = defaultBackoff.Next()
} else {
delayReconnectTime = pc.options.backoffPolicy.Next()
}

pc.log.Info("Reconnecting to broker in ", delayReconnectTime)
pc.log.WithFields(log.Fields{
"assignedBrokerURL": assignedBrokerURL,
"delayReconnectTime": delayReconnectTime,
}).Info("Reconnecting to broker")
time.Sleep(delayReconnectTime)

// double check
@@ -1689,7 +1705,7 @@
return
}

err := pc.grabConn()
err := pc.grabConn(assignedBrokerURL)
if err == nil {
// Successfully reconnected
pc.log.Info("Reconnected consumer to broker")
Expand All @@ -1713,13 +1729,25 @@ func (pc *partitionConsumer) reconnectToBroker() {
}
}

func (pc *partitionConsumer) grabConn() error {
lr, err := pc.client.lookupService.Lookup(pc.topic)
func (pc *partitionConsumer) lookupTopic(brokerServiceURL string) (*internal.LookupResult, error) {
if len(brokerServiceURL) == 0 {
lr, err := pc.client.lookupService.Lookup(pc.topic)
if err != nil {
pc.log.WithError(err).Warn("Failed to lookup topic")
return nil, err
}

pc.log.Debug("Lookup result: ", lr)
return lr, err
}
return pc.client.lookupService.GetBrokerAddress(brokerServiceURL, pc._getConn().IsProxied())
}

func (pc *partitionConsumer) grabConn(assignedBrokerURL string) error {
lr, err := pc.lookupTopic(assignedBrokerURL)
if err != nil {
pc.log.WithError(err).Warn("Failed to lookup topic")
return err
}
pc.log.Debugf("Lookup result: %+v", lr)

subType := toProtoSubType(pc.options.subscriptionType)
initialPosition := toProtoInitialPosition(pc.options.subscriptionInitPos)
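Taken together, the reconnect path now makes one decision up front: if the close command carried an assigned-broker URL, try that broker immediately and exactly once; otherwise fall back to lookup with the usual backoff. A self-contained distillation of that branch, with simplified types (the real logic is in reconnectToBroker and lookupTopic above):

package main

import (
	"fmt"
	"time"
)

type connectionClosed struct {
	assignedBrokerURL string
}

func (cc *connectionClosed) HasURL() bool { return len(cc.assignedBrokerURL) > 0 }

// nextAttempt distills the branch added to reconnectToBroker: an assigned
// broker hint suppresses the backoff delay and is consumed after one use,
// so a failed direct attempt falls back to lookup with normal backoff.
func nextAttempt(cc *connectionClosed, backoff func() time.Duration) (time.Duration, string, *connectionClosed) {
	if cc != nil && cc.HasURL() {
		return 0, cc.assignedBrokerURL, nil // hinted broker: no delay, hint used up
	}
	return backoff(), "", cc // no hint: back off, then do a regular lookup
}

func main() {
	backoff := func() time.Duration { return 100 * time.Millisecond }
	cc := &connectionClosed{assignedBrokerURL: "pulsar://broker-2:6650"}

	delay, url, cc := nextAttempt(cc, backoff)
	fmt.Println(delay, url) // 0s pulsar://broker-2:6650

	delay, url, _ = nextAttempt(cc, backoff) // hint consumed: normal backoff path
	fmt.Println(delay, url) // 100ms
}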
(diffs for the remaining 10 changed files not shown)
