Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] PIP-307: Use assigned broker URL hints during broker reconnection #1208

Merged
merged 71 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
cb1e2ed
Update CommandCloseProducer and CommandCloseConsumer proto definitions
dragosvictor Nov 7, 2023
bb31f14
Generate proto bindings
dragosvictor Nov 7, 2023
77a7ae5
Update producer close
dragosvictor Nov 7, 2023
7111dca
Use assigned broker URLs during reconnection
dragosvictor Nov 8, 2023
b13063f
Update close consumer
dragosvictor Nov 8, 2023
ab79909
Fix build
dragosvictor Nov 8, 2023
8f8d2f1
Lint
dragosvictor Nov 9, 2023
169f2c6
Fix condition in consumer partition
dragosvictor Nov 9, 2023
c9a5150
Merge remote-tracking branch 'origin/master' into pip-307
dragosvictor Jan 18, 2024
858adcb
Add log info
dragosvictor Mar 4, 2024
06d2dee
Merge remote-tracking branch 'origin/master' into pip-307
dragosvictor Mar 4, 2024
947df48
Merge remote-tracking branch 'origin/master' into pip-307
dragosvictor Mar 14, 2024
425133d
Add cluster test
dragosvictor Mar 17, 2024
aa6bf6d
Revert changes to helper_for_test.go
dragosvictor Mar 19, 2024
cab1284
Simplify test
dragosvictor Mar 20, 2024
4b297f3
Support specifying container platform
dragosvictor Mar 20, 2024
ed648b1
Add proxy test
dragosvictor Mar 21, 2024
c185298
Add TestExtensibleLoadManagerTestSuite
dragosvictor Mar 21, 2024
fef37e6
Debug proxy issues
dragosvictor Mar 21, 2024
8762632
Debug proxy test
dragosvictor Mar 21, 2024
2240fe1
Test clean
dragosvictor Mar 21, 2024
a28750d
Refactor lookup service changes
dragosvictor Mar 21, 2024
14f3d87
Add method Connection.IsProxied
dragosvictor Mar 22, 2024
d409f3f
Include physical address in connection pool key
dragosvictor Mar 22, 2024
6dde5a7
Use proxy to connect producer if enabled
dragosvictor Mar 22, 2024
dcad4a0
Test cleanup
dragosvictor Mar 22, 2024
9849b7c
Update message counts in proxy test
dragosvictor Mar 22, 2024
32410d5
Allow proxy test to run within docker-compose environment
dragosvictor Mar 22, 2024
1d237cc
Test refactor
dragosvictor Mar 22, 2024
fd556c7
Run extensible load manager tests in make test target
dragosvictor Mar 28, 2024
e136b68
Factor out common image build steps
dragosvictor Mar 28, 2024
ce4ee6c
Merge remote-tracking branch 'origin/master' into pip-307
dragosvictor Mar 28, 2024
7582e84
Fix TestRetryWithMultipleHosts
dragosvictor Apr 1, 2024
b45ee51
Add clustered test suites
dragosvictor Apr 8, 2024
fac5301
Add missing copyright notice
dragosvictor Apr 8, 2024
2474077
Revert changes to pulsar-test-service-start.sh
dragosvictor Apr 8, 2024
e06f5bb
Fix build error
dragosvictor Apr 8, 2024
a5e2670
Update clustered test setup
dragosvictor Apr 12, 2024
c79ec00
Add copyright notices
dragosvictor Apr 12, 2024
0e4282e
Lint fixes
dragosvictor Apr 12, 2024
03fed2f
Fix arch name mismatch on x86_64
dragosvictor Apr 12, 2024
df8e929
Copyright fixes
dragosvictor Apr 12, 2024
b3c910d
Run all tests during 'make test' target
dragosvictor Apr 12, 2024
b6da985
Merge branch 'master' into pip-307
dragosvictor Apr 16, 2024
3dbc6c7
Include physical address in connection pool key
dragosvictor Mar 22, 2024
06a4596
Support specifying container platform
dragosvictor Mar 20, 2024
9420cd4
Fix arch name mismatch on x86_64
dragosvictor Apr 12, 2024
0188fc8
Add clustered test suites
dragosvictor Apr 8, 2024
bc76f42
Fix build error
dragosvictor Apr 8, 2024
abef20c
Update clustered test setup
dragosvictor Apr 12, 2024
bec8fc1
Factor out common image build steps
dragosvictor Mar 28, 2024
ef3e376
Run all tests during 'make test' target
dragosvictor Apr 12, 2024
7f66b9d
Merge remote-tracking branch 'origin/master' into fix-connection-pool…
dragosvictor Apr 17, 2024
73e0472
Cosmetic fix
dragosvictor Apr 17, 2024
1122ea3
Fix typo in TestReaderWithMultipleHosts name
dragosvictor Apr 17, 2024
ba0582a
Add missing copyright information
dragosvictor Apr 17, 2024
b33e7bd
Merge branch 'fix-connection-pool-key' into pip-307
dragosvictor Apr 17, 2024
62ddc83
Revert changes to pulsar-test-service-start.sh
dragosvictor Apr 17, 2024
c448cfb
Revert debug changes to connection_pool.go
dragosvictor Apr 17, 2024
05dca55
Use proper delayConnectTime when the assigned broker URL is empty
dragosvictor Apr 17, 2024
67a863b
Merge remote-tracking branch 'origin/master' into pip-307
dragosvictor Apr 18, 2024
89e51fd
Fix merge issue
dragosvictor Apr 18, 2024
b2a39d6
Reuse lookupService.GetBrokerAddress
dragosvictor Apr 18, 2024
700511b
Reuse httpLookupService.GetBrokerAddress
dragosvictor Apr 18, 2024
0e50497
Pass around only one broker URL
dragosvictor Apr 18, 2024
7a92def
Fix linter
dragosvictor Apr 18, 2024
925b98a
Cleanup Makefile debug
dragosvictor Apr 19, 2024
f88da1b
Merge remote-tracking branch 'origin/master' into pip-307
dragosvictor Apr 22, 2024
c10512e
Null check ConnectionClosed parameter in consumer/producer partition
dragosvictor Apr 22, 2024
880d732
Revert to using old protobuf compiler version
dragosvictor Apr 22, 2024
16deb1b
Remove unused field LookupResult.IsProxyThroughServiceURL
dragosvictor Apr 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,23 @@ container:
--build-arg PULSAR_IMAGE="${PULSAR_IMAGE}" \
--build-arg ARCH="${CONTAINER_ARCH}" .

test: container test_standalone test_clustered
test: container test_standalone test_clustered test_extensible_load_manager

test_standalone: container
docker run -i ${IMAGE_NAME} bash -c "cd /pulsar/pulsar-client-go && ./scripts/run-ci.sh"

test_clustered: container
PULSAR_VERSION=${PULSAR_VERSION} docker compose -f integration-tests/clustered/docker-compose.yml up -d || true
PULSAR_VERSION=${PULSAR_VERSION} docker compose -f integration-tests/clustered/docker-compose.yml up -d
until curl http://localhost:8080/metrics > /dev/null 2>&1; do sleep 1; done
docker run --network "clustered_pulsar" -i ${IMAGE_NAME} bash -c "cd /pulsar/pulsar-client-go && ./scripts/run-ci-clustered.sh"
PULSAR_VERSION=${PULSAR_VERSION} docker compose -f integration-tests/clustered/docker-compose.yml down

test_extensible_load_manager: container
PULSAR_VERSION=${PULSAR_VERSION} docker compose -f integration-tests/extensible-load-manager/docker-compose.yml up -d
until curl http://localhost:8080/metrics > /dev/null 2>&1; do sleep 1; done
docker run --network "extensible-load-manager_pulsar" -i ${IMAGE_NAME} bash -c "cd /pulsar/pulsar-client-go && ./scripts/run-ci-extensible-load-manager.sh"
PULSAR_VERSION=${PULSAR_VERSION} docker compose -f integration-tests/extensible-load-manager/docker-compose.yml down

clean:
docker rmi --force $(IMAGE_NAME) || true
rm bin/*
181 changes: 181 additions & 0 deletions integration-tests/extensible-load-manager/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

version: '3'
networks:
pulsar:
driver: bridge
services:
# Start ZooKeeper
zookeeper:
image: apachepulsar/pulsar:${PULSAR_VERSION}
container_name: zookeeper
restart: on-failure
networks:
- pulsar
environment:
- metadataStoreUrl=zk:zookeeper:2181
- PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
command: >
bash -c "bin/apply-config-from-env.py conf/zookeeper.conf && \
bin/generate-zookeeper-config.sh conf/zookeeper.conf && \
exec bin/pulsar zookeeper"
healthcheck:
test: ["CMD", "bin/pulsar-zookeeper-ruok.sh"]
interval: 10s
timeout: 5s
retries: 30

# Initialize cluster metadata
pulsar-init:
container_name: pulsar-init
hostname: pulsar-init
image: apachepulsar/pulsar:${PULSAR_VERSION}
networks:
- pulsar
environment:
- PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
command: >
bin/pulsar initialize-cluster-metadata \
--cluster cluster-a \
--zookeeper zookeeper:2181 \
--configuration-store zookeeper:2181 \
--web-service-url http://broker-1:8080 \
--broker-service-url pulsar://broker-1:6650
depends_on:
zookeeper:
condition: service_healthy

# Start bookie
bookie:
image: apachepulsar/pulsar:${PULSAR_VERSION}
container_name: bookie
restart: on-failure
networks:
- pulsar
environment:
- clusterName=cluster-a
- zkServers=zookeeper:2181
- metadataServiceUri=metadata-store:zk:zookeeper:2181
- advertisedAddress=bookie
- BOOKIE_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
depends_on:
zookeeper:
condition: service_healthy
pulsar-init:
condition: service_completed_successfully
command: bash -c "bin/apply-config-from-env.py conf/bookkeeper.conf && exec bin/pulsar bookie"

proxy:
image: apachepulsar/pulsar:${PULSAR_VERSION}
container_name: proxy
hostname: proxy
restart: on-failure
networks:
- pulsar
environment:
- metadataStoreUrl=zk:zookeeper:2181
- zookeeperServers=zookeeper:2181
- clusterName=cluster-a
- PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
ports:
- "8080:8080"
- "6650:6650"
depends_on:
broker-1:
condition: service_healthy
broker-2:
condition: service_healthy
command: bash -c "bin/apply-config-from-env.py conf/proxy.conf && exec bin/pulsar proxy"

# Start broker 1
broker-1:
image: apachepulsar/pulsar:${PULSAR_VERSION}
container_name: broker-1
hostname: broker-1
restart: on-failure
networks:
- pulsar
environment:
- metadataStoreUrl=zk:zookeeper:2181
- zookeeperServers=zookeeper:2181
- clusterName=cluster-a
- managedLedgerDefaultEnsembleSize=1
- managedLedgerDefaultWriteQuorum=1
- managedLedgerDefaultAckQuorum=1
- advertisedAddress=broker-1
- internalListenerName=internal
- advertisedListeners=internal:pulsar://broker-1:6650
- PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m
# Load Manager. Here uses the extensible load balancer, sets the unloading strategy to TransferShedder, and enables debug mode.
- loadManagerClassName=org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl
- loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder
- loadBalancerSheddingEnabled=false
- loadBalancerDebugModeEnabled=true
- clusterMigrationCheckDurationSeconds=1
- brokerServiceCompactionThresholdInBytes=1000000
- PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
depends_on:
zookeeper:
condition: service_healthy
bookie:
condition: service_started
command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker"
healthcheck:
test: ["CMD-SHELL", "curl -f http://localhost:8080/metrics || exit 1"]
interval: 10s
timeout: 5s
retries: 30

# Start broker 2
broker-2:
image: apachepulsar/pulsar:${PULSAR_VERSION}
container_name: broker-2
hostname: broker-2
restart: on-failure
networks:
- pulsar
environment:
- metadataStoreUrl=zk:zookeeper:2181
- zookeeperServers=zookeeper:2181
- clusterName=cluster-a
- managedLedgerDefaultEnsembleSize=1
- managedLedgerDefaultWriteQuorum=1
- managedLedgerDefaultAckQuorum=1
- advertisedAddress=broker-2
- internalListenerName=internal
- advertisedListeners=internal:pulsar://broker-2:6650
- PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m
# Load Manager. Here uses the extensible load balancer, sets the unloading strategy to TransferShedder, and enables debug mode.
- loadManagerClassName=org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl
- loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder
- loadBalancerSheddingEnabled=false
- loadBalancerDebugModeEnabled=true
- clusterMigrationCheckDurationSeconds=1
- brokerServiceCompactionThresholdInBytes=1000000
- PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
depends_on:
zookeeper:
condition: service_healthy
bookie:
condition: service_started
command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker"
healthcheck:
test: ["CMD-SHELL", "curl -f http://localhost:8080/metrics || exit 1"]
interval: 10s
timeout: 5s
retries: 30
13 changes: 11 additions & 2 deletions pulsar/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type client struct {
memLimit internal.MemoryLimitController
closeOnce sync.Once
operationTimeout time.Duration
tlsEnabled bool

log log.Logger
}
Expand Down Expand Up @@ -166,6 +167,7 @@ func newClient(options ClientOptions) (Client, error) {
metrics: metrics,
memLimit: internal.NewMemoryLimitController(memLimitBytes, defaultMemoryLimitTriggerThreshold),
operationTimeout: operationTimeout,
tlsEnabled: tlsConfig != nil,
}
serviceNameResolver := internal.NewPulsarServiceNameResolver(url)

Expand All @@ -174,7 +176,7 @@ func newClient(options ClientOptions) (Client, error) {
switch url.Scheme {
case "pulsar", "pulsar+ssl":
c.lookupService = internal.NewLookupService(c.rpcClient, url, serviceNameResolver,
tlsConfig != nil, options.ListenerName, logger, metrics)
c.tlsEnabled, options.ListenerName, logger, metrics)
case "http", "https":
httpClient, err := internal.NewHTTPClient(url, serviceNameResolver, tlsConfig,
operationTimeout, logger, metrics, authProvider)
Expand All @@ -183,7 +185,7 @@ func newClient(options ClientOptions) (Client, error) {
err.Error()))
}
c.lookupService = internal.NewHTTPLookupService(httpClient, url, serviceNameResolver,
tlsConfig != nil, logger, metrics)
c.tlsEnabled, logger, metrics)
default:
return nil, newError(InvalidConfiguration, fmt.Sprintf("Invalid URL scheme '%s'", url.Scheme))
}
Expand Down Expand Up @@ -275,3 +277,10 @@ func (c *client) Close() {
c.lookupService.Close()
})
}

func (c *client) selectServiceURL(brokerServiceURL, brokerServiceURLTLS string) string {
if c.tlsEnabled {
return brokerServiceURLTLS
}
return brokerServiceURL
}
58 changes: 43 additions & 15 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ type partitionConsumer struct {

eventsCh chan interface{}
connectedCh chan struct{}
connectClosedCh chan connectionClosed
connectClosedCh chan *connectionClosed
closeCh chan struct{}
clearQueueCh chan func(id *trackingMessageID)

Expand Down Expand Up @@ -326,7 +326,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
startMessageID: atomicMessageID{msgID: options.startMessageID},
connectedCh: make(chan struct{}),
messageCh: messageCh,
connectClosedCh: make(chan connectionClosed, 10),
connectClosedCh: make(chan *connectionClosed, 10),
closeCh: make(chan struct{}),
clearQueueCh: make(chan func(id *trackingMessageID)),
compressionProviders: sync.Map{},
Expand Down Expand Up @@ -370,7 +370,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon

pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, options.nackBackoffPolicy, pc.log)

err := pc.grabConn()
err := pc.grabConn("")
if err != nil {
pc.log.WithError(err).Error("Failed to create consumer")
pc.nackTracker.Close()
Expand Down Expand Up @@ -1358,10 +1358,17 @@ func createEncryptionContext(msgMeta *pb.MessageMetadata) *EncryptionContext {
return &encCtx
}

func (pc *partitionConsumer) ConnectionClosed() {
func (pc *partitionConsumer) ConnectionClosed(closeConsumer *pb.CommandCloseConsumer) {
// Trigger reconnection in the consumer goroutine
pc.log.Debug("connection closed and send to connectClosedCh")
pc.connectClosedCh <- connectionClosed{}
var assignedBrokerURL string
if closeConsumer != nil {
assignedBrokerURL = pc.client.selectServiceURL(
closeConsumer.GetAssignedBrokerServiceUrl(), closeConsumer.GetAssignedBrokerServiceUrlTls())
}
pc.connectClosedCh <- &connectionClosed{
dragosvictor marked this conversation as resolved.
Show resolved Hide resolved
assignedBrokerURL: assignedBrokerURL,
}
}

// Flow command gives additional permits to send messages to the consumer.
Expand Down Expand Up @@ -1566,9 +1573,9 @@ func (pc *partitionConsumer) runEventsLoop() {
case <-pc.closeCh:
pc.log.Info("close consumer, exit reconnect")
return
case <-pc.connectClosedCh:
case connectionClosed := <-pc.connectClosedCh:
pc.log.Debug("runEventsLoop will reconnect")
pc.reconnectToBroker()
pc.reconnectToBroker(connectionClosed)
}
}
}()
Expand Down Expand Up @@ -1652,7 +1659,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
close(pc.closeCh)
}

func (pc *partitionConsumer) reconnectToBroker() {
func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClosed) {
var maxRetry int

if pc.options.maxReconnectToBroker == nil {
Expand All @@ -1673,13 +1680,22 @@ func (pc *partitionConsumer) reconnectToBroker() {
return
}

if pc.options.backoffPolicy == nil {
var assignedBrokerURL string

if connectionClosed != nil && connectionClosed.HasURL() {
delayReconnectTime = 0
assignedBrokerURL = connectionClosed.assignedBrokerURL
connectionClosed = nil // Attempt connecting to the assigned broker just once
} else if pc.options.backoffPolicy == nil {
delayReconnectTime = defaultBackoff.Next()
} else {
delayReconnectTime = pc.options.backoffPolicy.Next()
}

pc.log.Info("Reconnecting to broker in ", delayReconnectTime)
pc.log.WithFields(log.Fields{
"assignedBrokerURL": assignedBrokerURL,
"delayReconnectTime": delayReconnectTime,
}).Info("Reconnecting to broker")
time.Sleep(delayReconnectTime)

// double check
Expand All @@ -1689,7 +1705,7 @@ func (pc *partitionConsumer) reconnectToBroker() {
return
}

err := pc.grabConn()
err := pc.grabConn(assignedBrokerURL)
if err == nil {
// Successfully reconnected
pc.log.Info("Reconnected consumer to broker")
Expand All @@ -1713,13 +1729,25 @@ func (pc *partitionConsumer) reconnectToBroker() {
}
}

func (pc *partitionConsumer) grabConn() error {
lr, err := pc.client.lookupService.Lookup(pc.topic)
func (pc *partitionConsumer) lookupTopic(brokerServiceURL string) (*internal.LookupResult, error) {
if len(brokerServiceURL) == 0 {
lr, err := pc.client.lookupService.Lookup(pc.topic)
if err != nil {
pc.log.WithError(err).Warn("Failed to lookup topic")
return nil, err
}

pc.log.Debug("Lookup result: ", lr)
return lr, err
}
return pc.client.lookupService.GetBrokerAddress(brokerServiceURL, pc._getConn().IsProxied())
}

func (pc *partitionConsumer) grabConn(assignedBrokerURL string) error {
lr, err := pc.lookupTopic(assignedBrokerURL)
if err != nil {
pc.log.WithError(err).Warn("Failed to lookup topic")
return err
}
pc.log.Debugf("Lookup result: %+v", lr)

subType := toProtoSubType(pc.options.subscriptionType)
initialPosition := toProtoInitialPosition(pc.options.subscriptionInitPos)
Expand Down
Loading
Loading