Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIP-254: Support configuring client version #1316

Merged
merged 13 commits into from
Dec 13, 2024
9 changes: 9 additions & 0 deletions pulsar/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,15 @@ type ClientOptions struct {
// The protocol is binary protocol, i.e. the service URL starts with "pulsar://" or "pulsar+ssl://"
// The `loadManagerClassName` config in broker is a class that implements the `ExtensibleLoadManager` interface
LookupProperties map[string]string

// Set the description.
// By default, when the client connects to the broker, a version string like "Pulsar Go <version>" will be
// carried and saved by the broker. The client version string could be queried from the topic stats.
// This method provides a way to add more description to a specific PulsarClient instance. If it's configured,
// the description will be appended to the original client version string, with '-' as the separator.
// For example, if the client version is 3.0.0, and the description is "forked", the final client version string
// "Pulsar Go 3.0.0-forked".
Description string
}

// Client represents a pulsar client
Expand Down
2 changes: 1 addition & 1 deletion pulsar/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func newClient(options ClientOptions) (Client, error) {

c := &client{
cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, keepAliveInterval,
maxConnectionsPerHost, logger, metrics, connectionMaxIdleTime),
maxConnectionsPerHost, logger, metrics, options.Description, connectionMaxIdleTime),
log: logger,
metrics: metrics,
memLimit: internal.NewMemoryLimitController(memLimitBytes, defaultMemoryLimitTriggerThreshold),
Expand Down
41 changes: 41 additions & 0 deletions pulsar/client_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ import (
"testing"
"time"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"

"github.com/apache/pulsar-client-go/pulsar/auth"
"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -208,6 +212,43 @@ func TestTokenAuth(t *testing.T) {

client.Close()
}
func TestTokenAuthWithClientVersion(t *testing.T) {
token, err := os.ReadFile(tokenFilePath)
assert.NoError(t, err)

client, err := NewClient(ClientOptions{
URL: serviceURL,
Authentication: NewAuthenticationToken(string(token)),
Description: "test-client",
})
assert.NoError(t, err)
crossoverJie marked this conversation as resolved.
Show resolved Hide resolved
defer client.Close()

topic := newAuthTopicName()
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
})

assert.NoError(t, err)
assert.NotNil(t, producer)

readFile, err := os.ReadFile("../integration-tests/tokens/admin-token")
assert.NoError(t, err)
cfg := &config.Config{
Token: string(readFile),
}
admin, err := admin.New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)

topicName, err := utils.GetTopicName(topic)
assert.Nil(t, err)
topicState, err := admin.Topics().GetStats(*topicName)
assert.Nil(t, err)
publisher := topicState.Publishers[0]
assert.True(t, strings.HasPrefix(publisher.ClientVersion, "Pulsar Go version"))
assert.True(t, strings.HasSuffix(publisher.ClientVersion, "-test-client"))
}

func TestTokenAuthWithSupplier(t *testing.T) {
client, err := NewClient(ClientOptions{
Expand Down
53 changes: 53 additions & 0 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@ import (
"os"
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin"

"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
Expand Down Expand Up @@ -4985,3 +4988,53 @@ func TestConsumerKeepReconnectingAndThenCallClose(t *testing.T) {
return true
}, 30*time.Second, 1*time.Second)
}

func TestClientVersion(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})

assert.Nil(t, err)
defer client.Close()

topic := newTopicName()
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: false,
})
assert.Nil(t, err)
defer producer.Close()

cfg := &config.Config{}
admin, err := admin.New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)

topicName, err := utils.GetTopicName(topic)
assert.Nil(t, err)
topicState, err := admin.Topics().GetStats(*topicName)
assert.Nil(t, err)
publisher := topicState.Publishers[0]
assert.True(t, strings.HasPrefix(publisher.ClientVersion, "Pulsar Go version"))

topic = newTopicName()
client, err = NewClient(ClientOptions{
URL: lookupURL,
Description: "test-client",
})
assert.Nil(t, err)
producer, err = client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: false,
})
assert.Nil(t, err)
topicName, err = utils.GetTopicName(topic)
assert.Nil(t, err)
topicState, err = admin.Topics().GetStats(*topicName)
assert.Nil(t, err)
publisher = topicState.Publishers[0]
assert.True(t, strings.HasPrefix(publisher.ClientVersion, "Pulsar Go version"))
assert.True(t, strings.HasSuffix(publisher.ClientVersion, "-test-client"))

}
19 changes: 16 additions & 3 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ type connection struct {

keepAliveInterval time.Duration

lastActive time.Time
lastActive time.Time
description string
}

// connectionOptions defines configurations for creating connection.
Expand All @@ -189,6 +190,7 @@ type connectionOptions struct {
logger log.Logger
metrics *Metrics
keepAliveInterval time.Duration
description string
}

func newConnection(opts connectionOptions) *connection {
Expand Down Expand Up @@ -218,6 +220,7 @@ func newConnection(opts connectionOptions) *connection {
listeners: make(map[uint64]ConnectionListener),
consumerHandlers: make(map[uint64]ConsumerHandler),
metrics: opts.metrics,
description: opts.description,
}
cnx.state.Store(int32(connectionInit))
cnx.reader = newConnectionReader(cnx)
Expand Down Expand Up @@ -305,7 +308,7 @@ func (c *connection) doHandshake() bool {
c.cnx.SetDeadline(time.Now().Add(c.keepAliveInterval))
cmdConnect := &pb.CommandConnect{
ProtocolVersion: proto.Int32(PulsarProtocolVersion),
ClientVersion: proto.String(ClientVersionString),
ClientVersion: proto.String(c.getClientVersion()),
AuthMethodName: proto.String(c.auth.Name()),
AuthData: authData,
FeatureFlags: &pb.FeatureFlags{
Expand Down Expand Up @@ -346,6 +349,16 @@ func (c *connection) doHandshake() bool {
return true
}

func (c *connection) getClientVersion() string {
var clientVersion string
if c.description == "" {
clientVersion = ClientVersionString
} else {
clientVersion = fmt.Sprintf("%s-%s", ClientVersionString, c.description)
}
return clientVersion
}

func (c *connection) IsProxied() bool {
return c.logicalAddr.Host != c.physicalAddr.Host
}
Expand Down Expand Up @@ -832,7 +845,7 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)

cmdAuthResponse := &pb.CommandAuthResponse{
ProtocolVersion: proto.Int32(PulsarProtocolVersion),
ClientVersion: proto.String(ClientVersionString),
ClientVersion: proto.String(c.getClientVersion()),
Response: &pb.AuthData{
AuthMethodName: proto.String(c.auth.Name()),
AuthData: authData,
Expand Down
8 changes: 6 additions & 2 deletions pulsar/internal/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ type connectionPool struct {
keepAliveInterval time.Duration
closeCh chan struct{}

metrics *Metrics
log log.Logger
metrics *Metrics
log log.Logger
description string
}

// NewConnectionPool init connection pool.
Expand All @@ -65,6 +66,7 @@ func NewConnectionPool(
maxConnectionsPerHost int,
logger log.Logger,
metrics *Metrics,
description string,
connectionMaxIdleTime time.Duration) ConnectionPool {
p := &connectionPool{
connections: make(map[string]*connection),
Expand All @@ -76,6 +78,7 @@ func NewConnectionPool(
log: logger,
metrics: metrics,
closeCh: make(chan struct{}),
description: description,
}
go p.checkAndCleanIdleConnections(connectionMaxIdleTime)
return p
Expand Down Expand Up @@ -113,6 +116,7 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U
keepAliveInterval: p.keepAliveInterval,
logger: p.log,
metrics: p.metrics,
description: p.description,
})
p.connections[key] = conn
p.Unlock()
Expand Down
Loading