Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] Update topic admin interface comment, add topic admin test … #1202

Merged
merged 1 commit into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions integration-tests/conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ maxUnackedMessagesPerConsumer=50000
# Set maxMessageSize to 1MB rather than the default value 5MB for testing
maxMessageSize=1048576

# enable topic level policies to test topic admin functions
topicLevelPoliciesEnabled=true

### --- Authentication --- ###

# Enable TLS
Expand Down
179 changes: 154 additions & 25 deletions pulsaradmin/pkg/admin/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,35 @@ import (

// Topics is admin interface for topics management
type Topics interface {
// Create a topic
Create(utils.TopicName, int) error

// Delete a topic
Delete(utils.TopicName, bool, bool) error
// Create a partitioned or non-partitioned topic
//
// @param topic
// topicName struct
// @param partitions
// number of topic partitions,
// when setting to 0, it will create a non-partitioned topic
Create(topic utils.TopicName, partitions int) error

// Delete a topic, this function can delete both partitioned or non-partitioned topic
//
// @param topic
// topicName struct
// @param force
// delete topic forcefully
// @param nonPartitioned
// when set to true, topic will be treated as a non-partitioned topic
// Otherwise it will be treated as a partitioned topic
Delete(topic utils.TopicName, force bool, nonPartitioned bool) error

// Update number of partitions of a non-global partitioned topic
// It requires partitioned-topic to be already exist and number of new partitions must be greater than existing
// number of partitions. Decrementing number of partitions requires deletion of topic which is not supported.
Update(utils.TopicName, int) error
//
// @param topic
// topicName struct
// @param partitions
// number of new partitions of already exist partitioned-topic
Update(topic utils.TopicName, partitions int) error

// GetMetadata returns metadata of a partitioned topic
GetMetadata(utils.TopicName) (utils.PartitionedTopicMetadata, error)
Expand All @@ -52,12 +71,24 @@ type Topics interface {
GetPermissions(utils.TopicName) (map[string][]utils.AuthAction, error)

// GrantPermission grants a new permission to a client role on a single topic
GrantPermission(utils.TopicName, string, []utils.AuthAction) error
//
// @param topic
// topicName struct
// @param role
// client role to which grant permission
// @param action
// auth actions (e.g. produce and consume)
GrantPermission(topic utils.TopicName, role string, action []utils.AuthAction) error

// RevokePermission revokes permissions to a client role on a single topic. If the permission
// was not set at the topic level, but rather at the namespace level, this operation will
// return an error (HTTP status code 412).
RevokePermission(utils.TopicName, string) error
//
// @param topic
// topicName struct
// @param role
// client role to which remove permissions
RevokePermission(topic utils.TopicName, role string) error

// Lookup a topic returns the broker URL that serves the topic
Lookup(utils.TopicName) (utils.LookupData, error)
Expand All @@ -69,24 +100,56 @@ type Topics interface {
GetLastMessageID(utils.TopicName) (utils.MessageID, error)

// GetMessageID returns the message Id by timestamp(ms) of a topic
GetMessageID(utils.TopicName, int64) (utils.MessageID, error)

// GetStats returns the stats for the topic
// All the rates are computed over a 1 minute window and are relative the last completed 1 minute period
//
// @param topic
// topicName struct
// @param timestamp
// absolute timestamp (in ms)
GetMessageID(topic utils.TopicName, timestamp int64) (utils.MessageID, error)

// GetStats returns the stats for the topic.
//
// All the rates are computed over a 1-minute window and are relative the last completed 1-minute period
GetStats(utils.TopicName) (utils.TopicStats, error)

// GetStatsWithOption returns the stats for the topic
GetStatsWithOption(utils.TopicName, utils.GetStatsOptions) (utils.TopicStats, error)
//
// All the rates are computed over a 1-minute window and are relative the last completed 1-minute period
//
// @param topic
// topicName struct
// @param option
// request option, e.g. get_precise_backlog or subscription_backlog_size
GetStatsWithOption(topic utils.TopicName, option utils.GetStatsOptions) (utils.TopicStats, error)

// GetInternalStats returns the internal stats for the topic.
GetInternalStats(utils.TopicName) (utils.PersistentTopicInternalStats, error)

// GetPartitionedStats returns the stats for the partitioned topic
// All the rates are computed over a 1 minute window and are relative the last completed 1 minute period
GetPartitionedStats(utils.TopicName, bool) (utils.PartitionedTopicStats, error)
//
// All the rates are computed over a 1-minute window and are relative the last completed 1-minute period
//
// @param topic
// topicName struct
// @param perPartition
// flag to get stats per partition
GetPartitionedStats(topic utils.TopicName, perPartition bool) (utils.PartitionedTopicStats, error)

// GetPartitionedStatsWithOption returns the stats for the partitioned topic
GetPartitionedStatsWithOption(utils.TopicName, bool, utils.GetStatsOptions) (utils.PartitionedTopicStats, error)
//
// All the rates are computed over a 1-minute window and are relative the last completed 1-minute period
//
// @param topic
// topicName struct
// @param perPartition
// flag to get stats per partition
// @param option
// request option, e.g. get_precise_backlog or subscription_backlog_size
GetPartitionedStatsWithOption(
topic utils.TopicName,
perPartition bool,
option utils.GetStatsOptions,
) (utils.PartitionedTopicStats, error)

// Terminate the topic and prevent any more messages being published on it
Terminate(utils.TopicName) (utils.MessageID, error)
Expand All @@ -111,7 +174,12 @@ type Topics interface {
GetMessageTTL(utils.TopicName) (int, error)

// SetMessageTTL Set the message TTL for a topic
SetMessageTTL(utils.TopicName, int) error
//
// @param topic
// topicName struct
// @param messageTTL
// Message TTL in second
SetMessageTTL(topic utils.TopicName, messageTTL int) error

// RemoveMessageTTL Remove the message TTL for a topic
RemoveMessageTTL(utils.TopicName) error
Expand All @@ -120,7 +188,12 @@ type Topics interface {
GetMaxProducers(utils.TopicName) (int, error)

// SetMaxProducers Set max number of producers for a topic
SetMaxProducers(utils.TopicName, int) error
//
// @param topic
// topicName struct
// @param maxProducers
// max number of producer
SetMaxProducers(topic utils.TopicName, maxProducers int) error

// RemoveMaxProducers Remove max number of producers for a topic
RemoveMaxProducers(utils.TopicName) error
Expand All @@ -129,7 +202,12 @@ type Topics interface {
GetMaxConsumers(utils.TopicName) (int, error)

// SetMaxConsumers Set max number of consumers for a topic
SetMaxConsumers(utils.TopicName, int) error
//
// @param topic
// topicName struct
// @param maxConsumers
// max number of consumer
SetMaxConsumers(topic utils.TopicName, maxConsumers int) error

// RemoveMaxConsumers Remove max number of consumers for a topic
RemoveMaxConsumers(utils.TopicName) error
Expand All @@ -138,7 +216,12 @@ type Topics interface {
GetMaxUnackMessagesPerConsumer(utils.TopicName) (int, error)

// SetMaxUnackMessagesPerConsumer Set max unacked messages policy on consumer for a topic
SetMaxUnackMessagesPerConsumer(utils.TopicName, int) error
//
// @param topic
// topicName struct
// @param maxUnackedNum
// max unAcked messages on each consumer
SetMaxUnackMessagesPerConsumer(topic utils.TopicName, maxUnackedNum int) error

// RemoveMaxUnackMessagesPerConsumer Remove max unacked messages policy on consumer for a topic
RemoveMaxUnackMessagesPerConsumer(utils.TopicName) error
Expand All @@ -147,7 +230,12 @@ type Topics interface {
GetMaxUnackMessagesPerSubscription(utils.TopicName) (int, error)

// SetMaxUnackMessagesPerSubscription Set max unacked messages policy on subscription for a topic
SetMaxUnackMessagesPerSubscription(utils.TopicName, int) error
//
// @param topic
// topicName struct
// @param maxUnackedNum
// max unAcked messages on subscription of a topic
SetMaxUnackMessagesPerSubscription(topic utils.TopicName, maxUnackedNum int) error

// RemoveMaxUnackMessagesPerSubscription Remove max unacked messages policy on subscription for a topic
RemoveMaxUnackMessagesPerSubscription(utils.TopicName) error
Expand Down Expand Up @@ -192,30 +280,60 @@ type Topics interface {
GetDeduplicationStatus(utils.TopicName) (bool, error)

// SetDeduplicationStatus Set the deduplication policy for a topic
SetDeduplicationStatus(utils.TopicName, bool) error
//
// @param topic
// topicName struct
// @param enabled
// set enable or disable deduplication of the topic
SetDeduplicationStatus(topic utils.TopicName, enabled bool) error

// RemoveDeduplicationStatus Remove the deduplication policy for a topic
RemoveDeduplicationStatus(utils.TopicName) error

// GetRetention returns the retention configuration for a topic
GetRetention(utils.TopicName, bool) (*utils.RetentionPolicies, error)
//
// @param topic
// topicName struct
// @param applied
// when set to true, function will try to find policy applied to this topic
// in namespace or broker level, if no policy set in topic level
GetRetention(topic utils.TopicName, applied bool) (*utils.RetentionPolicies, error)

// RemoveRetention removes the retention configuration on a topic
RemoveRetention(utils.TopicName) error

// SetRetention sets the retention policy for a topic
SetRetention(utils.TopicName, utils.RetentionPolicies) error

// Get the compaction threshold for a topic
// GetCompactionThreshold Get the compaction threshold for a topic.
//
// i.e. The maximum number of bytes can have before compaction is triggered.
//
// @param topic
// topicName struct
// @param applied
// when set to true, function will try to find policy applied to this topic
// in namespace or broker level, if no policy set in topic level
GetCompactionThreshold(topic utils.TopicName, applied bool) (int64, error)

// Set the compaction threshold for a topic
// SetCompactionThreshold Set the compaction threshold for a topic
//
// @param topic
// topicName struct
// @param threshold
// maximum number of backlog bytes before compaction is triggered
SetCompactionThreshold(topic utils.TopicName, threshold int64) error

// Remove compaction threshold for a topic
RemoveCompactionThreshold(utils.TopicName) error

// GetBacklogQuotaMap returns backlog quota map for a topic
//
// @param topic
// topicName struct
// @param applied
// when set to true, function will try to find policy applied to this topic
// in namespace or broker level, if no policy set in topic level
GetBacklogQuotaMap(topic utils.TopicName, applied bool) (map[utils.BacklogQuotaType]utils.BacklogQuota, error)

// SetBacklogQuota sets a backlog quota for a topic
Expand All @@ -225,6 +343,12 @@ type Topics interface {
RemoveBacklogQuota(utils.TopicName, utils.BacklogQuotaType) error

// GetInactiveTopicPolicies gets the inactive topic policies on a topic
//
// @param topic
// topicName struct
// @param applied
// when set to true, function will try to find policy applied to this topic
// in namespace or broker level, if no policy set in topic level
GetInactiveTopicPolicies(topic utils.TopicName, applied bool) (utils.InactiveTopicPolicies, error)

// RemoveInactiveTopicPolicies removes inactive topic policies from a topic
Expand All @@ -237,6 +361,11 @@ type Topics interface {
GetReplicationClusters(topic utils.TopicName) ([]string, error)

// SetReplicationClusters sets the replication clusters on a topic
//
// @param topic
// topicName struct
// @param data
// list of replication cluster id
SetReplicationClusters(topic utils.TopicName, data []string) error
}

Expand Down
Loading
Loading