From f26d9546ad4a34e149617ef5849f2c427fd25b5b Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Tue, 10 Dec 2024 18:33:17 +0800 Subject: [PATCH 01/12] pip-254 --- pulsar/client.go | 9 ++++++ pulsar/client_impl.go | 2 +- pulsar/client_impl_test.go | 36 +++++++++++++++++++++ pulsar/consumer_test.go | 50 ++++++++++++++++++++++++++++++ pulsar/internal/connection.go | 19 ++++++++++-- pulsar/internal/connection_pool.go | 8 +++-- 6 files changed, 118 insertions(+), 6 deletions(-) diff --git a/pulsar/client.go b/pulsar/client.go index 368b4f3767..ec3438b365 100644 --- a/pulsar/client.go +++ b/pulsar/client.go @@ -171,6 +171,15 @@ type ClientOptions struct { // The protocol is binary protocol, i.e. the service URL starts with "pulsar://" or "pulsar+ssl://" // The `loadManagerClassName` config in broker is a class that implements the `ExtensibleLoadManager` interface LookupProperties map[string]string + + // Set the description. + // By default, when the client connects to the broker, a version string like "Pulsar Go " will be + // carried and saved by the broker. The client version string could be queried from the topic stats. + // This method provides a way to add more description to a specific PulsarClient instance. If it's configured, + // the description will be appended to the original client version string, with '-' as the separator. + // For example, if the client version is 3.0.0, and the description is "forked", the final client version string + // "Pulsar Go 3.0.0-forked". + Description string } // Client represents a pulsar client diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go index 08c5616870..0295d1ecf7 100644 --- a/pulsar/client_impl.go +++ b/pulsar/client_impl.go @@ -163,7 +163,7 @@ func newClient(options ClientOptions) (Client, error) { c := &client{ cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, keepAliveInterval, - maxConnectionsPerHost, logger, metrics, connectionMaxIdleTime), + maxConnectionsPerHost, logger, metrics, options.Description, connectionMaxIdleTime), log: logger, metrics: metrics, memLimit: internal.NewMemoryLimitController(memLimitBytes, defaultMemoryLimitTriggerThreshold), diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go index 0fdf6be73f..a815a82c51 100644 --- a/pulsar/client_impl_test.go +++ b/pulsar/client_impl_test.go @@ -21,6 +21,9 @@ import ( "context" "crypto/tls" "fmt" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" "log" "net/http" "net/http/httptest" @@ -208,6 +211,39 @@ func TestTokenAuth(t *testing.T) { client.Close() } +func TestTokenAuthWithClientVersion(t *testing.T) { + token, err := os.ReadFile(tokenFilePath) + assert.NoError(t, err) + + client, err := NewClient(ClientOptions{ + URL: serviceURL, + Authentication: NewAuthenticationToken(string(token)), + Description: "test-client", + }) + assert.NoError(t, err) + + topic := newAuthTopicName() + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + }) + + assert.NoError(t, err) + assert.NotNil(t, producer) + + cfg := &config.Config{} + admin, err := admin.New(cfg) + assert.NoError(t, err) + assert.NotNil(t, admin) + + topicName, err := utils.GetTopicName(topic) + assert.Nil(t, err) + topicState, err := admin.Topics().GetStats(*topicName) + assert.Nil(t, err) + publisher := topicState.Publishers[0] + assert.Contains(t, publisher.ClientVersion, "test-client") + + client.Close() +} func TestTokenAuthWithSupplier(t *testing.T) { client, err := NewClient(ClientOptions{ diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index eae8dd3c2e..e16f86a640 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -21,6 +21,7 @@ import ( "context" "errors" "fmt" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin" "log" "net/http" "os" @@ -4985,3 +4986,52 @@ func TestConsumerKeepReconnectingAndThenCallClose(t *testing.T) { return true }, 30*time.Second, 1*time.Second) } + +func TestClientVersion(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: false, + }) + assert.Nil(t, err) + defer producer.Close() + + cfg := &config.Config{} + admin, err := admin.New(cfg) + assert.NoError(t, err) + assert.NotNil(t, admin) + + topicName, err := utils.GetTopicName(topic) + assert.Nil(t, err) + topicState, err := admin.Topics().GetStats(*topicName) + assert.Nil(t, err) + publisher := topicState.Publishers[0] + assert.Contains(t, publisher.ClientVersion, "Pulsar Go version") + + topic = newTopicName() + client, err = NewClient(ClientOptions{ + URL: lookupURL, + Description: "test-client", + }) + assert.Nil(t, err) + producer, err = client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: false, + }) + assert.Nil(t, err) + topicName, err = utils.GetTopicName(topic) + assert.Nil(t, err) + topicState, err = admin.Topics().GetStats(*topicName) + assert.Nil(t, err) + publisher = topicState.Publishers[0] + assert.Contains(t, publisher.ClientVersion, "test-client") + +} diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 04a3cc83f9..84c4323d9c 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -176,7 +176,8 @@ type connection struct { keepAliveInterval time.Duration - lastActive time.Time + lastActive time.Time + description string } // connectionOptions defines configurations for creating connection. @@ -189,6 +190,7 @@ type connectionOptions struct { logger log.Logger metrics *Metrics keepAliveInterval time.Duration + description string } func newConnection(opts connectionOptions) *connection { @@ -218,6 +220,7 @@ func newConnection(opts connectionOptions) *connection { listeners: make(map[uint64]ConnectionListener), consumerHandlers: make(map[uint64]ConsumerHandler), metrics: opts.metrics, + description: opts.description, } cnx.state.Store(int32(connectionInit)) cnx.reader = newConnectionReader(cnx) @@ -305,7 +308,7 @@ func (c *connection) doHandshake() bool { c.cnx.SetDeadline(time.Now().Add(c.keepAliveInterval)) cmdConnect := &pb.CommandConnect{ ProtocolVersion: proto.Int32(PulsarProtocolVersion), - ClientVersion: proto.String(ClientVersionString), + ClientVersion: proto.String(c.getClientVersion()), AuthMethodName: proto.String(c.auth.Name()), AuthData: authData, FeatureFlags: &pb.FeatureFlags{ @@ -346,6 +349,16 @@ func (c *connection) doHandshake() bool { return true } +func (c *connection) getClientVersion() string { + var clientVersion string + if c.description == "" { + clientVersion = ClientVersionString + } else { + clientVersion = fmt.Sprintf("%s-%s", ClientVersionString, c.description) + } + return clientVersion +} + func (c *connection) IsProxied() bool { return c.logicalAddr.Host != c.physicalAddr.Host } @@ -832,7 +845,7 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge) cmdAuthResponse := &pb.CommandAuthResponse{ ProtocolVersion: proto.Int32(PulsarProtocolVersion), - ClientVersion: proto.String(ClientVersionString), + ClientVersion: proto.String(c.getClientVersion()), Response: &pb.AuthData{ AuthMethodName: proto.String(c.auth.Name()), AuthData: authData, diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go index 5f858b1182..ee583825a9 100644 --- a/pulsar/internal/connection_pool.go +++ b/pulsar/internal/connection_pool.go @@ -52,8 +52,9 @@ type connectionPool struct { keepAliveInterval time.Duration closeCh chan struct{} - metrics *Metrics - log log.Logger + metrics *Metrics + log log.Logger + description string } // NewConnectionPool init connection pool. @@ -65,6 +66,7 @@ func NewConnectionPool( maxConnectionsPerHost int, logger log.Logger, metrics *Metrics, + description string, connectionMaxIdleTime time.Duration) ConnectionPool { p := &connectionPool{ connections: make(map[string]*connection), @@ -76,6 +78,7 @@ func NewConnectionPool( log: logger, metrics: metrics, closeCh: make(chan struct{}), + description: description, } go p.checkAndCleanIdleConnections(connectionMaxIdleTime) return p @@ -113,6 +116,7 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U keepAliveInterval: p.keepAliveInterval, logger: p.log, metrics: p.metrics, + description: p.description, }) p.connections[key] = conn p.Unlock() From 6704f8cd069a05c23731117ad9b67c9bd8a093ea Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Tue, 10 Dec 2024 18:53:02 +0800 Subject: [PATCH 02/12] lint --- pulsar/consumer_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index e16f86a640..378ccfefa1 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -21,7 +21,6 @@ import ( "context" "errors" "fmt" - "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin" "log" "net/http" "os" @@ -32,6 +31,8 @@ import ( "testing" "time" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin" + "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" From a6d603e0f5d8167e3c1215616d4125fd4039586d Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Tue, 10 Dec 2024 18:55:47 +0800 Subject: [PATCH 03/12] lint --- pulsar/client_impl_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go index a815a82c51..bcff23aa82 100644 --- a/pulsar/client_impl_test.go +++ b/pulsar/client_impl_test.go @@ -21,9 +21,6 @@ import ( "context" "crypto/tls" "fmt" - "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin" - "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" - "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" "log" "net/http" "net/http/httptest" @@ -32,6 +29,10 @@ import ( "testing" "time" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" + "github.com/apache/pulsar-client-go/pulsar/auth" "github.com/apache/pulsar-client-go/pulsar/internal" "github.com/stretchr/testify/assert" From aa9cdfcd13d76b72088726bf1119d893b0d0f611 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Tue, 10 Dec 2024 23:13:13 +0800 Subject: [PATCH 04/12] admin token --- pulsar/client_impl_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go index bcff23aa82..4e728825b2 100644 --- a/pulsar/client_impl_test.go +++ b/pulsar/client_impl_test.go @@ -231,7 +231,9 @@ func TestTokenAuthWithClientVersion(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, producer) - cfg := &config.Config{} + cfg := &config.Config{ + Token: string(token), + } admin, err := admin.New(cfg) assert.NoError(t, err) assert.NotNil(t, admin) From 3b7297c74c71acbbd1632054802a42765ac36ff8 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Tue, 10 Dec 2024 23:50:28 +0800 Subject: [PATCH 05/12] admin token --- pulsar/client_impl_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go index 4e728825b2..393ca22cf6 100644 --- a/pulsar/client_impl_test.go +++ b/pulsar/client_impl_test.go @@ -231,8 +231,10 @@ func TestTokenAuthWithClientVersion(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, producer) + readFile, err := os.ReadFile("../../../integration-tests/tokens/admin-token") + assert.NoError(t, err) cfg := &config.Config{ - Token: string(token), + Token: string(readFile), } admin, err := admin.New(cfg) assert.NoError(t, err) From f244726fd7b489c561b8c60b0d0f778d855003f6 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Wed, 11 Dec 2024 00:01:25 +0800 Subject: [PATCH 06/12] admin token --- pulsar/client_impl_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go index 393ca22cf6..9faba43712 100644 --- a/pulsar/client_impl_test.go +++ b/pulsar/client_impl_test.go @@ -231,7 +231,7 @@ func TestTokenAuthWithClientVersion(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, producer) - readFile, err := os.ReadFile("../../../integration-tests/tokens/admin-token") + readFile, err := os.ReadFile("../integration-tests/tokens/admin-token") assert.NoError(t, err) cfg := &config.Config{ Token: string(readFile), From d3075a57b4151aa9e301eb7fba420507e54874c0 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Fri, 13 Dec 2024 13:51:25 +0800 Subject: [PATCH 07/12] Update pulsar/client_impl_test.go Co-authored-by: Zixuan Liu --- pulsar/client_impl_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go index 9faba43712..3b43019648 100644 --- a/pulsar/client_impl_test.go +++ b/pulsar/client_impl_test.go @@ -245,7 +245,8 @@ func TestTokenAuthWithClientVersion(t *testing.T) { topicState, err := admin.Topics().GetStats(*topicName) assert.Nil(t, err) publisher := topicState.Publishers[0] - assert.Contains(t, publisher.ClientVersion, "test-client") + assert.True(t, strings.HasPrefix(publisher.ClientVersion, "Pulsar Go version")) + assert.True(t, strings.HasSuffix(publisher.ClientVersion, "-test-client")) client.Close() } From c7bec1af0bde344eebdf315b952c9c588f16a41f Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Fri, 13 Dec 2024 13:51:44 +0800 Subject: [PATCH 08/12] Update pulsar/client_impl_test.go Co-authored-by: Zixuan Liu --- pulsar/client_impl_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go index 3b43019648..1e3cdff84b 100644 --- a/pulsar/client_impl_test.go +++ b/pulsar/client_impl_test.go @@ -222,6 +222,7 @@ func TestTokenAuthWithClientVersion(t *testing.T) { Description: "test-client", }) assert.NoError(t, err) + defer client.Close() topic := newAuthTopicName() producer, err := client.CreateProducer(ProducerOptions{ From bcd0c7c72371e57a007d6e78710be8a1f604c5c5 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Fri, 13 Dec 2024 13:51:57 +0800 Subject: [PATCH 09/12] Update pulsar/consumer_test.go Co-authored-by: Zixuan Liu --- pulsar/consumer_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 378ccfefa1..41697b04c0 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -5033,6 +5033,7 @@ func TestClientVersion(t *testing.T) { topicState, err = admin.Topics().GetStats(*topicName) assert.Nil(t, err) publisher = topicState.Publishers[0] - assert.Contains(t, publisher.ClientVersion, "test-client") + assert.True(t, strings.HasPrefix(publisher.ClientVersion, "Pulsar Go version")) + assert.True(t, strings.HasSuffix(publisher.ClientVersion, "-test-client")) } From 37af84fa23aaa24a4861f454fc81858761ef863c Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Fri, 13 Dec 2024 13:52:09 +0800 Subject: [PATCH 10/12] Update pulsar/consumer_test.go Co-authored-by: Zixuan Liu --- pulsar/consumer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 41697b04c0..905bd91408 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -5015,7 +5015,7 @@ func TestClientVersion(t *testing.T) { topicState, err := admin.Topics().GetStats(*topicName) assert.Nil(t, err) publisher := topicState.Publishers[0] - assert.Contains(t, publisher.ClientVersion, "Pulsar Go version") + assert.True(t, strings.HasPrefix(publisher.ClientVersion, "Pulsar Go version")) topic = newTopicName() client, err = NewClient(ClientOptions{ From aea803cd91404fa948b870cc1be89773e26dfc9a Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Fri, 13 Dec 2024 13:52:37 +0800 Subject: [PATCH 11/12] Update pulsar/client_impl_test.go Co-authored-by: Zixuan Liu --- pulsar/client_impl_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go index 1e3cdff84b..14df8fb2d5 100644 --- a/pulsar/client_impl_test.go +++ b/pulsar/client_impl_test.go @@ -248,8 +248,6 @@ func TestTokenAuthWithClientVersion(t *testing.T) { publisher := topicState.Publishers[0] assert.True(t, strings.HasPrefix(publisher.ClientVersion, "Pulsar Go version")) assert.True(t, strings.HasSuffix(publisher.ClientVersion, "-test-client")) - - client.Close() } func TestTokenAuthWithSupplier(t *testing.T) { From e3e3246f9e039026af200e2a907c7ade4120e465 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Fri, 13 Dec 2024 16:06:55 +0800 Subject: [PATCH 12/12] fix lint --- pulsar/client_impl_test.go | 2 +- pulsar/consumer_test.go | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go index 14df8fb2d5..0896ec4f97 100644 --- a/pulsar/client_impl_test.go +++ b/pulsar/client_impl_test.go @@ -246,7 +246,7 @@ func TestTokenAuthWithClientVersion(t *testing.T) { topicState, err := admin.Topics().GetStats(*topicName) assert.Nil(t, err) publisher := topicState.Publishers[0] - assert.True(t, strings.HasPrefix(publisher.ClientVersion, "Pulsar Go version")) + assert.True(t, strings.HasPrefix(publisher.ClientVersion, "Pulsar Go version")) assert.True(t, strings.HasSuffix(publisher.ClientVersion, "-test-client")) } diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 905bd91408..3445f42543 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -26,6 +26,7 @@ import ( "os" "regexp" "strconv" + "strings" "sync" "sync/atomic" "testing" @@ -5033,7 +5034,7 @@ func TestClientVersion(t *testing.T) { topicState, err = admin.Topics().GetStats(*topicName) assert.Nil(t, err) publisher = topicState.Publishers[0] - assert.True(t, strings.HasPrefix(publisher.ClientVersion, "Pulsar Go version")) - assert.True(t, strings.HasSuffix(publisher.ClientVersion, "-test-client")) + assert.True(t, strings.HasPrefix(publisher.ClientVersion, "Pulsar Go version")) + assert.True(t, strings.HasSuffix(publisher.ClientVersion, "-test-client")) }