Skip to content

Commit

Permalink
add support of entity-default kafka quotas
Browse files Browse the repository at this point in the history
  • Loading branch information
azhurbilo committed Jun 13, 2022
1 parent b77f249 commit 846e6a2
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 11 deletions.
2 changes: 1 addition & 1 deletion kafka/kafka_acls.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func (c *Client) ListACLs() ([]*sarama.ResourceAcls, error) {
if err != nil {
return nil, err
}

log.Printf("[TRACE] ThrottleTime: %d", aclsR.ThrottleTime)

if err == nil {
Expand Down
34 changes: 26 additions & 8 deletions kafka/kafka_quotas.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,19 @@ func (c *Client) AlterQuota(quota Quota, validateOnly bool) error {
return err
}

entity := sarama.QuotaEntityComponent{
EntityType: sarama.QuotaEntityType(quota.EntityType),
MatchType: sarama.QuotaMatchExact,
Name: quota.EntityName,
var entity sarama.QuotaEntityComponent

if quota.EntityName == "" {
entity = sarama.QuotaEntityComponent{
EntityType: sarama.QuotaEntityType(quota.EntityType),
MatchType: sarama.QuotaMatchDefault,
}
} else {
entity = sarama.QuotaEntityComponent{
EntityType: sarama.QuotaEntityType(quota.EntityType),
MatchType: sarama.QuotaMatchExact,
Name: quota.EntityName,
}
}

configs := quota.Ops
Expand Down Expand Up @@ -92,10 +101,19 @@ func (c *Client) DescribeQuota(entityType string, entityName string) (*Quota, er
return nil, err
}

entity := sarama.QuotaFilterComponent{
EntityType: sarama.QuotaEntityType(entityType),
MatchType: sarama.QuotaMatchExact,
Match: entityName,
var entity sarama.QuotaFilterComponent

if entityName == "" {
entity = sarama.QuotaFilterComponent{
EntityType: sarama.QuotaEntityType(entityType),
MatchType: sarama.QuotaMatchDefault,
}
} else {
entity = sarama.QuotaFilterComponent{
EntityType: sarama.QuotaEntityType(entityType),
MatchType: sarama.QuotaMatchExact,
Match: entityName,
}
}

request := &sarama.DescribeClientQuotasRequest{
Expand Down
4 changes: 2 additions & 2 deletions kafka/resource_kafka_quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func kafkaQuotaResource() *schema.Resource {
Type: schema.TypeString,
Required: true,
ForceNew: true,
Description: "The name of the entity",
Description: "The name of the entity (if entity_name is empty string, it will create default-entity Kafka quota)",
},
"entity_type": {
Type: schema.TypeString,
Expand Down Expand Up @@ -74,7 +74,7 @@ func quotaDelete(ctx context.Context, d *schema.ResourceData, meta interface{})
func quotaRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
log.Println("[INFO] Reading Quota")
c := meta.(*LazyClient)

entityType := d.Get("entity_type").(string)
entityName := d.Get("entity_name").(string)
log.Printf("[INFO] Reading Quota %s", entityName)
Expand Down
38 changes: 38 additions & 0 deletions kafka/resource_kafka_quota_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,44 @@ func TestAcc_QuotaConfigUpdate(t *testing.T) {
})
}

func TestAcc_DefaultEntityBasicQuota(t *testing.T) {
emptyEntityName := ""
bs := testBootstrapServers[0]

r.Test(t, r.TestCase{
ProviderFactories: overrideProviderFactory(),
PreCheck: func() { testAccPreCheck(t) },
CheckDestroy: testAccCheckQuotaDestroy,
Steps: []r.TestStep{
{
Config: cfgs(t, bs, fmt.Sprintf(testResourceQuota1, emptyEntityName, "4000000")),
Check: testResourceQuota_initialCheck,
},
},
})
}

func TestAcc_DefaultEntityQuotaConfigUpdate(t *testing.T) {
emptyEntityName := ""
bs := testBootstrapServers[0]

r.Test(t, r.TestCase{
ProviderFactories: overrideProviderFactory(),
PreCheck: func() { testAccPreCheck(t) },
CheckDestroy: testAccCheckQuotaDestroy,
Steps: []r.TestStep{
{
Config: cfg(t, bs, fmt.Sprintf(testResourceQuota1, emptyEntityName, "4000000")),
Check: testResourceQuota_initialCheck,
},
{
Config: cfg(t, bs, fmt.Sprintf(testResourceQuota1, emptyEntityName, "3000000")),
Check: testResourceQuota_updateCheck,
},
},
})
}

func testResourceQuota_initialCheck(s *terraform.State) error {
resourceState := s.Modules[0].Resources["kafka_quota.test1"]
if resourceState == nil {
Expand Down

0 comments on commit 846e6a2

Please sign in to comment.