Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[improve] Update topic admin interface comment, add topic admin test …
Browse files Browse the repository at this point in the history
…cases
geniusjoe authored and ninjazhou committed Apr 11, 2024
1 parent 393f80b commit 5aaf9cc
Showing 3 changed files with 384 additions and 25 deletions.
3 changes: 3 additions & 0 deletions integration-tests/conf/standalone.conf
Original file line number Diff line number Diff line change
@@ -83,6 +83,9 @@ maxUnackedMessagesPerConsumer=50000
# Set maxMessageSize to 1MB rather than the default value 5MB for testing
maxMessageSize=1048576

# enable topic level policies to test topic admin functions
topicLevelPoliciesEnabled=true

### --- Authentication --- ###

# Enable TLS
179 changes: 154 additions & 25 deletions pulsaradmin/pkg/admin/topic.go
Original file line number Diff line number Diff line change
@@ -26,16 +26,35 @@ import (

// Topics is admin interface for topics management
type Topics interface {
// Create a topic
Create(utils.TopicName, int) error

// Delete a topic
Delete(utils.TopicName, bool, bool) error
// Create a partitioned or non-partitioned topic
//
// @param topic
// topicName struct
// @param partitions
// number of topic partitions,
// when setting to 0, it will create a non-partitioned topic
Create(topic utils.TopicName, partitions int) error

// Delete a topic, this function can delete both partitioned or non-partitioned topic
//
// @param topic
// topicName struct
// @param force
// delete topic forcefully
// @param nonPartitioned
// when set to true, topic will be treated as a non-partitioned topic
// Otherwise it will be treated as a partitioned topic
Delete(topic utils.TopicName, force bool, nonPartitioned bool) error

// Update number of partitions of a non-global partitioned topic
// It requires partitioned-topic to be already exist and number of new partitions must be greater than existing
// number of partitions. Decrementing number of partitions requires deletion of topic which is not supported.
Update(utils.TopicName, int) error
//
// @param topic
// topicName struct
// @param partitions
// number of new partitions of already exist partitioned-topic
Update(topic utils.TopicName, partitions int) error

// GetMetadata returns metadata of a partitioned topic
GetMetadata(utils.TopicName) (utils.PartitionedTopicMetadata, error)
@@ -52,12 +71,24 @@ type Topics interface {
GetPermissions(utils.TopicName) (map[string][]utils.AuthAction, error)

// GrantPermission grants a new permission to a client role on a single topic
GrantPermission(utils.TopicName, string, []utils.AuthAction) error
//
// @param topic
// topicName struct
// @param role
// client role to which grant permission
// @param action
// auth actions (e.g. produce and consume)
GrantPermission(topic utils.TopicName, role string, action []utils.AuthAction) error

// RevokePermission revokes permissions to a client role on a single topic. If the permission
// was not set at the topic level, but rather at the namespace level, this operation will
// return an error (HTTP status code 412).
RevokePermission(utils.TopicName, string) error
//
// @param topic
// topicName struct
// @param role
// client role to which remove permissions
RevokePermission(topic utils.TopicName, role string) error

// Lookup a topic returns the broker URL that serves the topic
Lookup(utils.TopicName) (utils.LookupData, error)
@@ -69,24 +100,56 @@ type Topics interface {
GetLastMessageID(utils.TopicName) (utils.MessageID, error)

// GetMessageID returns the message Id by timestamp(ms) of a topic
GetMessageID(utils.TopicName, int64) (utils.MessageID, error)

// GetStats returns the stats for the topic
// All the rates are computed over a 1 minute window and are relative the last completed 1 minute period
//
// @param topic
// topicName struct
// @param timestamp
// absolute timestamp (in ms)
GetMessageID(topic utils.TopicName, timestamp int64) (utils.MessageID, error)

// GetStats returns the stats for the topic.
//
// All the rates are computed over a 1-minute window and are relative the last completed 1-minute period
GetStats(utils.TopicName) (utils.TopicStats, error)

// GetStatsWithOption returns the stats for the topic
GetStatsWithOption(utils.TopicName, utils.GetStatsOptions) (utils.TopicStats, error)
//
// All the rates are computed over a 1-minute window and are relative the last completed 1-minute period
//
// @param topic
// topicName struct
// @param option
// request option, e.g. get_precise_backlog or subscription_backlog_size
GetStatsWithOption(topic utils.TopicName, option utils.GetStatsOptions) (utils.TopicStats, error)

// GetInternalStats returns the internal stats for the topic.
GetInternalStats(utils.TopicName) (utils.PersistentTopicInternalStats, error)

// GetPartitionedStats returns the stats for the partitioned topic
// All the rates are computed over a 1 minute window and are relative the last completed 1 minute period
GetPartitionedStats(utils.TopicName, bool) (utils.PartitionedTopicStats, error)
//
// All the rates are computed over a 1-minute window and are relative the last completed 1-minute period
//
// @param topic
// topicName struct
// @param perPartition
// flag to get stats per partition
GetPartitionedStats(topic utils.TopicName, perPartition bool) (utils.PartitionedTopicStats, error)

// GetPartitionedStatsWithOption returns the stats for the partitioned topic
GetPartitionedStatsWithOption(utils.TopicName, bool, utils.GetStatsOptions) (utils.PartitionedTopicStats, error)
//
// All the rates are computed over a 1-minute window and are relative the last completed 1-minute period
//
// @param topic
// topicName struct
// @param perPartition
// flag to get stats per partition
// @param option
// request option, e.g. get_precise_backlog or subscription_backlog_size
GetPartitionedStatsWithOption(
topic utils.TopicName,
perPartition bool,
option utils.GetStatsOptions,
) (utils.PartitionedTopicStats, error)

// Terminate the topic and prevent any more messages being published on it
Terminate(utils.TopicName) (utils.MessageID, error)
@@ -111,7 +174,12 @@ type Topics interface {
GetMessageTTL(utils.TopicName) (int, error)

// SetMessageTTL Set the message TTL for a topic
SetMessageTTL(utils.TopicName, int) error
//
// @param topic
// topicName struct
// @param messageTTL
// Message TTL in second
SetMessageTTL(topic utils.TopicName, messageTTL int) error

// RemoveMessageTTL Remove the message TTL for a topic
RemoveMessageTTL(utils.TopicName) error
@@ -120,7 +188,12 @@ type Topics interface {
GetMaxProducers(utils.TopicName) (int, error)

// SetMaxProducers Set max number of producers for a topic
SetMaxProducers(utils.TopicName, int) error
//
// @param topic
// topicName struct
// @param maxProducers
// max number of producer
SetMaxProducers(topic utils.TopicName, maxProducers int) error

// RemoveMaxProducers Remove max number of producers for a topic
RemoveMaxProducers(utils.TopicName) error
@@ -129,7 +202,12 @@ type Topics interface {
GetMaxConsumers(utils.TopicName) (int, error)

// SetMaxConsumers Set max number of consumers for a topic
SetMaxConsumers(utils.TopicName, int) error
//
// @param topic
// topicName struct
// @param maxConsumers
// max number of consumer
SetMaxConsumers(topic utils.TopicName, maxConsumers int) error

// RemoveMaxConsumers Remove max number of consumers for a topic
RemoveMaxConsumers(utils.TopicName) error
@@ -138,7 +216,12 @@ type Topics interface {
GetMaxUnackMessagesPerConsumer(utils.TopicName) (int, error)

// SetMaxUnackMessagesPerConsumer Set max unacked messages policy on consumer for a topic
SetMaxUnackMessagesPerConsumer(utils.TopicName, int) error
//
// @param topic
// topicName struct
// @param maxUnackedNum
// max unAcked messages on each consumer
SetMaxUnackMessagesPerConsumer(topic utils.TopicName, maxUnackedNum int) error

// RemoveMaxUnackMessagesPerConsumer Remove max unacked messages policy on consumer for a topic
RemoveMaxUnackMessagesPerConsumer(utils.TopicName) error
@@ -147,7 +230,12 @@ type Topics interface {
GetMaxUnackMessagesPerSubscription(utils.TopicName) (int, error)

// SetMaxUnackMessagesPerSubscription Set max unacked messages policy on subscription for a topic
SetMaxUnackMessagesPerSubscription(utils.TopicName, int) error
//
// @param topic
// topicName struct
// @param maxUnackedNum
// max unAcked messages on subscription of a topic
SetMaxUnackMessagesPerSubscription(topic utils.TopicName, maxUnackedNum int) error

// RemoveMaxUnackMessagesPerSubscription Remove max unacked messages policy on subscription for a topic
RemoveMaxUnackMessagesPerSubscription(utils.TopicName) error
@@ -192,30 +280,60 @@ type Topics interface {
GetDeduplicationStatus(utils.TopicName) (bool, error)

// SetDeduplicationStatus Set the deduplication policy for a topic
SetDeduplicationStatus(utils.TopicName, bool) error
//
// @param topic
// topicName struct
// @param enabled
// set enable or disable deduplication of the topic
SetDeduplicationStatus(topic utils.TopicName, enabled bool) error

// RemoveDeduplicationStatus Remove the deduplication policy for a topic
RemoveDeduplicationStatus(utils.TopicName) error

// GetRetention returns the retention configuration for a topic
GetRetention(utils.TopicName, bool) (*utils.RetentionPolicies, error)
//
// @param topic
// topicName struct
// @param applied
// when set to true, function will try to find policy applied to this topic
// in namespace or broker level, if no policy set in topic level
GetRetention(topic utils.TopicName, applied bool) (*utils.RetentionPolicies, error)

// RemoveRetention removes the retention configuration on a topic
RemoveRetention(utils.TopicName) error

// SetRetention sets the retention policy for a topic
SetRetention(utils.TopicName, utils.RetentionPolicies) error

// Get the compaction threshold for a topic
// GetCompactionThreshold Get the compaction threshold for a topic.
//
// i.e. The maximum number of bytes can have before compaction is triggered.
//
// @param topic
// topicName struct
// @param applied
// when set to true, function will try to find policy applied to this topic
// in namespace or broker level, if no policy set in topic level
GetCompactionThreshold(topic utils.TopicName, applied bool) (int64, error)

// Set the compaction threshold for a topic
// SetCompactionThreshold Set the compaction threshold for a topic
//
// @param topic
// topicName struct
// @param threshold
// maximum number of backlog bytes before compaction is triggered
SetCompactionThreshold(topic utils.TopicName, threshold int64) error

// Remove compaction threshold for a topic
RemoveCompactionThreshold(utils.TopicName) error

// GetBacklogQuotaMap returns backlog quota map for a topic
//
// @param topic
// topicName struct
// @param applied
// when set to true, function will try to find policy applied to this topic
// in namespace or broker level, if no policy set in topic level
GetBacklogQuotaMap(topic utils.TopicName, applied bool) (map[utils.BacklogQuotaType]utils.BacklogQuota, error)

// SetBacklogQuota sets a backlog quota for a topic
@@ -225,6 +343,12 @@ type Topics interface {
RemoveBacklogQuota(utils.TopicName, utils.BacklogQuotaType) error

// GetInactiveTopicPolicies gets the inactive topic policies on a topic
//
// @param topic
// topicName struct
// @param applied
// when set to true, function will try to find policy applied to this topic
// in namespace or broker level, if no policy set in topic level
GetInactiveTopicPolicies(topic utils.TopicName, applied bool) (utils.InactiveTopicPolicies, error)

// RemoveInactiveTopicPolicies removes inactive topic policies from a topic
@@ -237,6 +361,11 @@ type Topics interface {
GetReplicationClusters(topic utils.TopicName) ([]string, error)

// SetReplicationClusters sets the replication clusters on a topic
//
// @param topic
// topicName struct
// @param data
// list of replication cluster id
SetReplicationClusters(topic utils.TopicName, data []string) error
}

227 changes: 227 additions & 0 deletions pulsaradmin/pkg/admin/topic_test.go
Original file line number Diff line number Diff line change
@@ -213,3 +213,230 @@ func TestNonPartitionState(t *testing.T) {
func newTopicName() string {
return fmt.Sprintf("my-topic-%v", time.Now().Nanosecond())
}

func TestDeleteNonPartitionedTopic(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName

cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 0)
assert.NoError(t, err)
err = admin.Topics().Delete(*topicName, false, true)
assert.NoError(t, err)
topicList, err := admin.Namespaces().GetTopics("public/default")
assert.NoError(t, err)
isTopicExist := false
for _, topicIterator := range topicList {
if topicIterator == topic {
isTopicExist = true
}
}
assert.Equal(t, false, isTopicExist)
}

func TestDeletePartitionedTopic(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName

cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 3)
assert.NoError(t, err)
err = admin.Topics().Delete(*topicName, false, false)
assert.NoError(t, err)
topicList, err := admin.Namespaces().GetTopics("public/default")
assert.NoError(t, err)
isTopicExist := false
for _, topicIterator := range topicList {
if topicIterator == topic {
isTopicExist = true
}
}
assert.Equal(t, false, isTopicExist)
}

func TestUpdateTopicPartitions(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName

cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 3)
assert.NoError(t, err)
topicMetadata, err := admin.Topics().GetMetadata(*topicName)
assert.NoError(t, err)
assert.Equal(t, 3, topicMetadata.Partitions)

err = admin.Topics().Update(*topicName, 4)
assert.NoError(t, err)
topicMetadata, err = admin.Topics().GetMetadata(*topicName)
assert.NoError(t, err)
assert.Equal(t, 4, topicMetadata.Partitions)
}

func TestGetMessageID(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
topicPartitionZero := topic + "-partition-0"
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
topicPartitionZeroName, err := utils.GetTopicName(topicPartitionZero)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 1)
assert.NoError(t, err)
ctx := context.Background()

// create consumer
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: lookupURL,
})
assert.NoError(t, err)
defer client.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
Type: pulsar.Exclusive,
})
assert.NoError(t, err)
defer consumer.Close()

// create producer
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: topic,
DisableBatching: false,
})
assert.NoError(t, err)
defer producer.Close()
_, err = producer.Send(ctx, &pulsar.ProducerMessage{
Payload: []byte("hello"),
Key: "pulsar",
Properties: map[string]string{
"key-1": "pulsar-1",
},
})
assert.NoError(t, err)

// ack message
msg, err := consumer.Receive(ctx)
assert.NoError(t, err)
assert.Equal(t, []byte("hello"), msg.Payload())
assert.Equal(t, "pulsar", msg.Key())
err = consumer.Ack(msg)
assert.NoError(t, err)

messageID, err := admin.Topics().GetMessageID(
*topicPartitionZeroName,
msg.PublishTime().Unix()*1000-1000,
)
assert.NoError(t, err)
assert.Equal(t, msg.ID().EntryID(), messageID.EntryID)
assert.Equal(t, msg.ID().LedgerID(), messageID.LedgerID)
assert.Equal(t, int(msg.ID().PartitionIdx()), messageID.PartitionIndex)
}

func TestMessageTTL(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)

messageTTL, err := admin.Topics().GetMessageTTL(*topicName)
assert.NoError(t, err)
assert.Equal(t, 0, messageTTL)
err = admin.Topics().SetMessageTTL(*topicName, 600)
assert.NoError(t, err)
// topic policy is an async operation,
// so we need to wait for a while to get current value
assert.Eventually(
t,
func() bool {
messageTTL, err = admin.Topics().GetMessageTTL(*topicName)
return err == nil && messageTTL == 600
},
10*time.Second,
100*time.Millisecond,
)
err = admin.Topics().RemoveMessageTTL(*topicName)
assert.NoError(t, err)
assert.Eventually(
t,
func() bool {
messageTTL, err = admin.Topics().GetMessageTTL(*topicName)
return err == nil && messageTTL == 0
},
10*time.Second,
100*time.Millisecond,
)
}

func TestRetention(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
err = admin.Topics().Create(*topicName, 4)
assert.NoError(t, err)

topicRetentionPolicy, err := admin.Topics().GetRetention(*topicName, false)
assert.NoError(t, err)
assert.Equal(t, int64(0), topicRetentionPolicy.RetentionSizeInMB)
assert.Equal(t, 0, topicRetentionPolicy.RetentionTimeInMinutes)
err = admin.Topics().SetRetention(*topicName, utils.RetentionPolicies{
RetentionSizeInMB: 20480,
RetentionTimeInMinutes: 1440,
})
assert.NoError(t, err)
// topic policy is an async operation,
// so we need to wait for a while to get current value
assert.Eventually(
t,
func() bool {
topicRetentionPolicy, err = admin.Topics().GetRetention(*topicName, false)
return err == nil &&
topicRetentionPolicy.RetentionSizeInMB == int64(20480) &&
topicRetentionPolicy.RetentionTimeInMinutes == 1440
},
10*time.Second,
100*time.Millisecond,
)
err = admin.Topics().RemoveRetention(*topicName)
assert.NoError(t, err)
assert.Eventually(
t,
func() bool {
topicRetentionPolicy, err = admin.Topics().GetRetention(*topicName, false)
return err == nil &&
topicRetentionPolicy.RetentionSizeInMB == int64(0) &&
topicRetentionPolicy.RetentionTimeInMinutes == 0
},
10*time.Second,
100*time.Millisecond,
)
}

0 comments on commit 5aaf9cc

Please sign in to comment.