Skip to content

Commit

Permalink
[improve] Update topic admin interface comment, add topic admin test …
Browse files Browse the repository at this point in the history
…cases
  • Loading branch information
geniusjoe authored and ninjazhou committed Apr 11, 2024
1 parent 393f80b commit 9c19e44
Showing 3 changed files with 376 additions and 25 deletions.
3 changes: 3 additions & 0 deletions integration-tests/conf/standalone.conf
Original file line number Diff line number Diff line change
@@ -83,6 +83,9 @@ maxUnackedMessagesPerConsumer=50000
# Set maxMessageSize to 1MB rather than the default value 5MB for testing
maxMessageSize=1048576

# enable topic level policies to test topic admin functions
topicLevelPoliciesEnabled=true

### --- Authentication --- ###

# Enable TLS
175 changes: 150 additions & 25 deletions pulsaradmin/pkg/admin/topic.go
Original file line number Diff line number Diff line change
@@ -26,16 +26,35 @@ import (

// Topics is admin interface for topics management
type Topics interface {
// Create a topic
Create(utils.TopicName, int) error

// Delete a topic
Delete(utils.TopicName, bool, bool) error
// Create a partitioned or non-partitioned topic
//
// @param topic
// topicName struct
// @param partitions
// number of topic partitions,
// when setting to 0, it will create a non-partitioned topic
Create(topic utils.TopicName, partitions int) error

// Delete a topic, this function can delete both partitioned or non-partitioned topic
//
// @param topic
// topicName struct
// @param force
// delete topic forcefully
// @param nonPartitioned
// when set to true, topic will be treated as a non-partitioned topic
// Otherwise it will be treated as a partitioned topic
Delete(topic utils.TopicName, force bool, nonPartitioned bool) error

// Update number of partitions of a non-global partitioned topic
// It requires partitioned-topic to be already exist and number of new partitions must be greater than existing
// number of partitions. Decrementing number of partitions requires deletion of topic which is not supported.
Update(utils.TopicName, int) error
//
// @param topic
// topicName struct
// @param partitions
// number of new partitions of already exist partitioned-topic
Update(topic utils.TopicName, partitions int) error

// GetMetadata returns metadata of a partitioned topic
GetMetadata(utils.TopicName) (utils.PartitionedTopicMetadata, error)
@@ -52,12 +71,24 @@ type Topics interface {
GetPermissions(utils.TopicName) (map[string][]utils.AuthAction, error)

// GrantPermission grants a new permission to a client role on a single topic
GrantPermission(utils.TopicName, string, []utils.AuthAction) error
//
// @param topic
// topicName struct
// @param role
// client role to which grant permission
// @param action
// auth actions (e.g. produce and consume)
GrantPermission(topic utils.TopicName, role string, action []utils.AuthAction) error

// RevokePermission revokes permissions to a client role on a single topic. If the permission
// was not set at the topic level, but rather at the namespace level, this operation will
// return an error (HTTP status code 412).
RevokePermission(utils.TopicName, string) error
//
// @param topic
// topicName struct
// @param role
// client role to which remove permissions
RevokePermission(topic utils.TopicName, role string) error

// Lookup a topic returns the broker URL that serves the topic
Lookup(utils.TopicName) (utils.LookupData, error)
@@ -69,24 +100,52 @@ type Topics interface {
GetLastMessageID(utils.TopicName) (utils.MessageID, error)

// GetMessageID returns the message Id by timestamp(ms) of a topic
GetMessageID(utils.TopicName, int64) (utils.MessageID, error)

// GetStats returns the stats for the topic
// All the rates are computed over a 1 minute window and are relative the last completed 1 minute period
//
// @param topic
// topicName struct
// @param timestamp
// absolute timestamp (in ms)
GetMessageID(topic utils.TopicName, timestamp int64) (utils.MessageID, error)

// GetStats returns the stats for the topic.
//
// All the rates are computed over a 1-minute window and are relative the last completed 1-minute period
GetStats(utils.TopicName) (utils.TopicStats, error)

// GetStatsWithOption returns the stats for the topic
GetStatsWithOption(utils.TopicName, utils.GetStatsOptions) (utils.TopicStats, error)
//
// All the rates are computed over a 1-minute window and are relative the last completed 1-minute period
//
// @param topic
// topicName struct
// @param option
// request option, e.g. get_precise_backlog or subscription_backlog_size
GetStatsWithOption(topic utils.TopicName, option utils.GetStatsOptions) (utils.TopicStats, error)

// GetInternalStats returns the internal stats for the topic.
GetInternalStats(utils.TopicName) (utils.PersistentTopicInternalStats, error)

// GetPartitionedStats returns the stats for the partitioned topic
// All the rates are computed over a 1 minute window and are relative the last completed 1 minute period
GetPartitionedStats(utils.TopicName, bool) (utils.PartitionedTopicStats, error)
//
// All the rates are computed over a 1-minute window and are relative the last completed 1-minute period
//
// @param topic
// topicName struct
// @param perPartition
// flag to get stats per partition
GetPartitionedStats(topic utils.TopicName, perPartition bool) (utils.PartitionedTopicStats, error)

// GetPartitionedStatsWithOption returns the stats for the partitioned topic
GetPartitionedStatsWithOption(utils.TopicName, bool, utils.GetStatsOptions) (utils.PartitionedTopicStats, error)
//
// All the rates are computed over a 1-minute window and are relative the last completed 1-minute period
//
// @param topic
// topicName struct
// @param perPartition
// flag to get stats per partition
// @param option
// request option, e.g. get_precise_backlog or subscription_backlog_size
GetPartitionedStatsWithOption(topic utils.TopicName, perPartition bool, option utils.GetStatsOptions) (utils.PartitionedTopicStats, error)

// Terminate the topic and prevent any more messages being published on it
Terminate(utils.TopicName) (utils.MessageID, error)
@@ -111,7 +170,12 @@ type Topics interface {
GetMessageTTL(utils.TopicName) (int, error)

// SetMessageTTL Set the message TTL for a topic
SetMessageTTL(utils.TopicName, int) error
//
// @param topic
// topicName struct
// @param messageTTL
// Message TTL in second
SetMessageTTL(topic utils.TopicName, messageTTL int) error

// RemoveMessageTTL Remove the message TTL for a topic
RemoveMessageTTL(utils.TopicName) error
@@ -120,7 +184,12 @@ type Topics interface {
GetMaxProducers(utils.TopicName) (int, error)

// SetMaxProducers Set max number of producers for a topic
SetMaxProducers(utils.TopicName, int) error
//
// @param topic
// topicName struct
// @param maxProducers
// max number of producer
SetMaxProducers(topic utils.TopicName, maxProducers int) error

// RemoveMaxProducers Remove max number of producers for a topic
RemoveMaxProducers(utils.TopicName) error
@@ -129,7 +198,12 @@ type Topics interface {
GetMaxConsumers(utils.TopicName) (int, error)

// SetMaxConsumers Set max number of consumers for a topic
SetMaxConsumers(utils.TopicName, int) error
//
// @param topic
// topicName struct
// @param maxConsumers
// max number of consumer
SetMaxConsumers(topic utils.TopicName, maxConsumers int) error

// RemoveMaxConsumers Remove max number of consumers for a topic
RemoveMaxConsumers(utils.TopicName) error
@@ -138,7 +212,12 @@ type Topics interface {
GetMaxUnackMessagesPerConsumer(utils.TopicName) (int, error)

// SetMaxUnackMessagesPerConsumer Set max unacked messages policy on consumer for a topic
SetMaxUnackMessagesPerConsumer(utils.TopicName, int) error
//
// @param topic
// topicName struct
// @param maxUnackedNum
// max unAcked messages on each consumer
SetMaxUnackMessagesPerConsumer(topic utils.TopicName, maxUnackedNum int) error

// RemoveMaxUnackMessagesPerConsumer Remove max unacked messages policy on consumer for a topic
RemoveMaxUnackMessagesPerConsumer(utils.TopicName) error
@@ -147,7 +226,12 @@ type Topics interface {
GetMaxUnackMessagesPerSubscription(utils.TopicName) (int, error)

// SetMaxUnackMessagesPerSubscription Set max unacked messages policy on subscription for a topic
SetMaxUnackMessagesPerSubscription(utils.TopicName, int) error
//
// @param topic
// topicName struct
// @param maxUnackedNum
// max unAcked messages on subscription of a topic
SetMaxUnackMessagesPerSubscription(topic utils.TopicName, maxUnackedNum int) error

// RemoveMaxUnackMessagesPerSubscription Remove max unacked messages policy on subscription for a topic
RemoveMaxUnackMessagesPerSubscription(utils.TopicName) error
@@ -192,30 +276,60 @@ type Topics interface {
GetDeduplicationStatus(utils.TopicName) (bool, error)

// SetDeduplicationStatus Set the deduplication policy for a topic
SetDeduplicationStatus(utils.TopicName, bool) error
//
// @param topic
// topicName struct
// @param enabled
// set enable or disable deduplication of the topic
SetDeduplicationStatus(topic utils.TopicName, enabled bool) error

// RemoveDeduplicationStatus Remove the deduplication policy for a topic
RemoveDeduplicationStatus(utils.TopicName) error

// GetRetention returns the retention configuration for a topic
GetRetention(utils.TopicName, bool) (*utils.RetentionPolicies, error)
//
// @param topic
// topicName struct
// @param applied
// when set to true, function will try to find policy applied to this topic
// in namespace or broker level, if no policy set in topic level
GetRetention(topic utils.TopicName, applied bool) (*utils.RetentionPolicies, error)

// RemoveRetention removes the retention configuration on a topic
RemoveRetention(utils.TopicName) error

// SetRetention sets the retention policy for a topic
SetRetention(utils.TopicName, utils.RetentionPolicies) error

// Get the compaction threshold for a topic
// GetCompactionThreshold Get the compaction threshold for a topic.
//
// i.e. The maximum number of bytes can have before compaction is triggered.
//
// @param topic
// topicName struct
// @param applied
// when set to true, function will try to find policy applied to this topic
// in namespace or broker level, if no policy set in topic level
GetCompactionThreshold(topic utils.TopicName, applied bool) (int64, error)

// Set the compaction threshold for a topic
// SetCompactionThreshold Set the compaction threshold for a topic
//
// @param topic
// topicName struct
// @param threshold
// maximum number of backlog bytes before compaction is triggered
SetCompactionThreshold(topic utils.TopicName, threshold int64) error

// Remove compaction threshold for a topic
RemoveCompactionThreshold(utils.TopicName) error

// GetBacklogQuotaMap returns backlog quota map for a topic
//
// @param topic
// topicName struct
// @param applied
// when set to true, function will try to find policy applied to this topic
// in namespace or broker level, if no policy set in topic level
GetBacklogQuotaMap(topic utils.TopicName, applied bool) (map[utils.BacklogQuotaType]utils.BacklogQuota, error)

// SetBacklogQuota sets a backlog quota for a topic
@@ -225,6 +339,12 @@ type Topics interface {
RemoveBacklogQuota(utils.TopicName, utils.BacklogQuotaType) error

// GetInactiveTopicPolicies gets the inactive topic policies on a topic
//
// @param topic
// topicName struct
// @param applied
// when set to true, function will try to find policy applied to this topic
// in namespace or broker level, if no policy set in topic level
GetInactiveTopicPolicies(topic utils.TopicName, applied bool) (utils.InactiveTopicPolicies, error)

// RemoveInactiveTopicPolicies removes inactive topic policies from a topic
@@ -237,6 +357,11 @@ type Topics interface {
GetReplicationClusters(topic utils.TopicName) ([]string, error)

// SetReplicationClusters sets the replication clusters on a topic
//
// @param topic
// topicName struct
// @param data
// list of replication cluster id
SetReplicationClusters(topic utils.TopicName, data []string) error
}

Loading

0 comments on commit 9c19e44

Please sign in to comment.