Skip to content

Commit

Permalink
fix namespace config drift (#137)
Browse files Browse the repository at this point in the history
* fix namespace config drift

* fix ci

* fix ci

* fix ci

* stash

* revert changes

* fix lint

* fix test cases

* fix sink test case
  • Loading branch information
freeznet authored Jan 10, 2025
1 parent 369c1d6 commit 8dac191
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 53 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,5 @@ require (
google.golang.org/grpc v1.56.3 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/utils v0.0.0-20241210054802-24370beab758 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -324,3 +324,5 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
k8s.io/utils v0.0.0-20241210054802-24370beab758 h1:sdbE21q2nlQtFh65saZY+rRM6x6aJJI8IUa1AmH/qa0=
k8s.io/utils v0.0.0-20241210054802-24370beab758/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
95 changes: 44 additions & 51 deletions pulsar/resource_pulsar_namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,9 @@ func resourcePulsarNamespace() *schema.Resource {
Set: hashBacklogQuotaSubset(),
},
"namespace_config": {
Type: schema.TypeSet,
Type: schema.TypeList,
Optional: true,
Description: descriptions["namespace_config"],
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"anti_affinity": {
Expand All @@ -156,30 +155,31 @@ func resourcePulsarNamespace() *schema.Resource {
"max_consumers_per_subscription": {
Type: schema.TypeInt,
Optional: true,
Default: -1,
Default: 0,
ValidateFunc: validateGtEq0,
},
"max_consumers_per_topic": {
Type: schema.TypeInt,
Optional: true,
Default: -1,
Default: 0,
ValidateFunc: validateGtEq0,
},
"max_producers_per_topic": {
Type: schema.TypeInt,
Optional: true,
Default: -1,
Default: 0,
ValidateFunc: validateGtEq0,
},
"message_ttl_seconds": {
Type: schema.TypeInt,
Optional: true,
Default: -1,
Default: 0,
ValidateFunc: validateGtEq0,
},
"replication_clusters": {
Type: schema.TypeList,
Optional: true,
Computed: true,
MinItems: 1,
Elem: &schema.Schema{
Type: schema.TypeString,
Expand All @@ -204,12 +204,11 @@ func resourcePulsarNamespace() *schema.Resource {
"offload_threshold_size_in_mb": {
Type: schema.TypeInt,
Optional: true,
Default: -1,
Default: 0,
ValidateFunc: validateGtEq0,
},
},
},
Set: namespaceConfigToHash,
},
"persistence_policies": {
Type: schema.TypeSet,
Expand Down Expand Up @@ -332,76 +331,87 @@ func resourcePulsarNamespaceRead(ctx context.Context, d *schema.ResourceData, me
_ = d.Set("namespace", namespace)
_ = d.Set("tenant", tenant)

if namespaceConfig, ok := d.GetOk("namespace_config"); ok && namespaceConfig.(*schema.Set).Len() > 0 {
if _, ok := d.GetOk("namespace_config"); ok {
var namespaceConfig = make(map[string]interface{})
afgrp, err := client.GetNamespaceAntiAffinityGroup(ns.String())
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetNamespaceAntiAffinityGroup: %w", err))
} else {
namespaceConfig["anti_affinity"] = strings.Trim(strings.TrimSpace(afgrp), "\"")
}

maxConsPerSub, err := client.GetMaxConsumersPerSubscription(*ns)
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxConsumersPerSubscription: %w", err))
} else {
namespaceConfig["max_consumers_per_subscription"] = maxConsPerSub
}

maxConsPerTopic, err := client.GetMaxConsumersPerTopic(*ns)
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxConsumersPerTopic: %w", err))
} else {
namespaceConfig["max_consumers_per_topic"] = maxConsPerTopic
}

maxProdPerTopic, err := client.GetMaxProducersPerTopic(*ns)
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxProducersPerTopic: %w", err))
} else {
namespaceConfig["max_producers_per_topic"] = maxProdPerTopic
}

messageTTL, err := client.GetNamespaceMessageTTL(ns.String())
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetNamespaceMessageTTL: %w", err))
} else {
namespaceConfig["message_ttl_seconds"] = messageTTL
}

schemaValidationEnforce, err := client.GetSchemaValidationEnforced(*ns)
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSchemaValidationEnforced: %w", err))
} else {
namespaceConfig["schema_validation_enforce"] = schemaValidationEnforce
}

schemaCompatibilityStrategy, err := client.GetSchemaAutoUpdateCompatibilityStrategy(*ns)
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSchemaAutoUpdateCompatibilityStrategy: %w", err))
if !strings.Contains(err.Error(), "Invalid auth strategy") {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSchemaAutoUpdateCompatibilityStrategy: %w", err))
}
} else {
namespaceConfig["schema_compatibility_strategy"] = schemaCompatibilityStrategy.String()
}

replClustersRaw, err := client.GetNamespaceReplicationClusters(ns.String())
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxProducersPerTopic: %w", err))
}

replClusters := make([]interface{}, len(replClustersRaw))
for i, cl := range replClustersRaw {
replClusters[i] = cl
} else {
replClusters := make([]interface{}, len(replClustersRaw))
for i, cl := range replClustersRaw {
replClusters[i] = cl
}
namespaceConfig["replication_clusters"] = replClusters
}

isAllowAutoUpdateSchema, err := client.GetIsAllowAutoUpdateSchema(*ns)
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetIsAllowAutoUpdateSchema: %w", err))
} else {
namespaceConfig["is_allow_auto_update_schema"] = isAllowAutoUpdateSchema
}

offloadTresholdSizeInMb, err := client.GetOffloadThreshold(*ns)
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetOffloadThreshold: %w", err))
} else {
namespaceConfig["offload_threshold_size_in_mb"] = int(offloadTresholdSizeInMb)
}

_ = d.Set("namespace_config", schema.NewSet(namespaceConfigToHash, []interface{}{
map[string]interface{}{
"anti_affinity": strings.Trim(strings.TrimSpace(afgrp), "\""),
"max_consumers_per_subscription": maxConsPerSub,
"max_consumers_per_topic": maxConsPerTopic,
"max_producers_per_topic": maxProdPerTopic,
"message_ttl_seconds": messageTTL,
"replication_clusters": replClusters,
"schema_validation_enforce": schemaValidationEnforce,
"schema_compatibility_strategy": schemaCompatibilityStrategy.String(),
"is_allow_auto_update_schema": isAllowAutoUpdateSchema,
"offload_threshold_size_in_mb": int(offloadTresholdSizeInMb),
},
}))
_ = d.Set("namespace_config", []interface{}{
namespaceConfig,
})
}

if persPoliciesCfg, ok := d.GetOk("persistence_policies"); ok && persPoliciesCfg.(*schema.Set).Len() > 0 {
Expand Down Expand Up @@ -518,7 +528,7 @@ func resourcePulsarNamespaceUpdate(ctx context.Context, d *schema.ResourceData,
namespace := d.Get("namespace").(string)
tenant := d.Get("tenant").(string)
enableDeduplication, deduplicationDefined := d.GetOk("enable_deduplication")
namespaceConfig := d.Get("namespace_config").(*schema.Set)
namespaceConfig := d.Get("namespace_config").([]interface{})
retentionPoliciesConfig := d.Get("retention_policies").(*schema.Set)
backlogQuotaConfig := d.Get("backlog_quota").(*schema.Set)
dispatchRateConfig := d.Get("dispatch_rate").(*schema.Set)
Expand All @@ -534,8 +544,8 @@ func resourcePulsarNamespaceUpdate(ctx context.Context, d *schema.ResourceData,

var errs error

if namespaceConfig.Len() > 0 {
nsCfg := unmarshalNamespaceConfig(namespaceConfig)
if len(namespaceConfig) > 0 {
nsCfg := unmarshalNamespaceConfigList(namespaceConfig)

if len(nsCfg.AntiAffinity) > 0 {
if err = client.SetNamespaceAntiAffinityGroup(nsName.String(), nsCfg.AntiAffinity); err != nil {
Expand Down Expand Up @@ -746,23 +756,6 @@ func retentionPoliciesToHash(v interface{}) int {
return hashcode.String(buf.String())
}

func namespaceConfigToHash(v interface{}) int {
var buf bytes.Buffer
m := v.(map[string]interface{})

buf.WriteString(fmt.Sprintf("%s-", m["anti_affinity"].(string)))
buf.WriteString(fmt.Sprintf("%d-", m["max_consumers_per_subscription"].(int)))
buf.WriteString(fmt.Sprintf("%d-", m["max_consumers_per_topic"].(int)))
buf.WriteString(fmt.Sprintf("%d-", m["max_producers_per_topic"].(int)))
buf.WriteString(fmt.Sprintf("%d-", m["message_ttl_seconds"].(int)))
buf.WriteString(fmt.Sprintf("%s-", m["replication_clusters"].([]interface{})))
buf.WriteString(fmt.Sprintf("%t-", m["schema_validation_enforce"].(bool)))
buf.WriteString(fmt.Sprintf("%s-", m["schema_compatibility_strategy"].(string)))
buf.WriteString(fmt.Sprintf("%d-", m["offload_threshold_size_in_mb"].(int)))

return hashcode.String(buf.String())
}

func persistencePoliciesToHash(v interface{}) int {
var buf bytes.Buffer
m := v.(map[string]interface{})
Expand Down Expand Up @@ -819,10 +812,10 @@ func unmarshalRetentionPolicies(v *schema.Set) *utils.RetentionPolicies {
return &rtnPolicies
}

func unmarshalNamespaceConfig(v *schema.Set) *types.NamespaceConfig {
func unmarshalNamespaceConfigList(v []interface{}) *types.NamespaceConfig {
var nsConfig types.NamespaceConfig

for _, ns := range v.List() {
for _, ns := range v {
data := ns.(map[string]interface{})
rplClusters := data["replication_clusters"].([]interface{})

Expand Down
44 changes: 44 additions & 0 deletions pulsar/resource_pulsar_namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,15 @@ func TestNamespaceWithUndefinedOptionalsUpdate(t *testing.T) {
resource.TestCheckNoResourceAttr(resourceName, "enable_deduplication"),
resource.TestCheckNoResourceAttr(resourceName, "permission_grant.#"),
),
},
{
Config: testPulsarNamespaceWithUndefinedOptionalsInNsConf(testWebServiceURL, cName, tName, nsName),
PlanOnly: true,
ExpectNonEmptyPlan: false,
},
{
Config: testPulsarNamespaceWithoutOptionals(testWebServiceURL, cName, tName, nsName),
PlanOnly: true,
ExpectNonEmptyPlan: true,
},
},
Expand Down Expand Up @@ -391,6 +400,41 @@ func TestNamespaceExternallyRemoved(t *testing.T) {
})
}

func TestNamespaceWithUndefinedOptionalsDrift(t *testing.T) {

resourceName := "pulsar_namespace.test"
cName := acctest.RandString(10)
tName := acctest.RandString(10)
nsName := acctest.RandString(10)

resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
ProviderFactories: testAccProviderFactories,
IDRefreshName: resourceName,
CheckDestroy: testPulsarNamespaceDestroy,
Steps: []resource.TestStep{
{
Config: testPulsarNamespaceWithUndefinedOptionalsInNsConf(testWebServiceURL, cName, tName, nsName),
Check: resource.ComposeTestCheckFunc(
testPulsarNamespaceExists(resourceName),
),
},
{
Config: testPulsarNamespaceWithUndefinedOptionalsInNsConf(testWebServiceURL, cName, tName, nsName),
Check: resource.ComposeTestCheckFunc(
testPulsarNamespaceExists(resourceName),
),
ExpectNonEmptyPlan: false,
},
{
Config: testPulsarNamespaceWithUndefinedOptionalsInNsConf(testWebServiceURL, cName, tName, nsName),
PlanOnly: true,
ExpectNonEmptyPlan: false,
},
},
})
}

func createNamespace(t *testing.T, id string) {
client, err := sharedClient(testWebServiceURL)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions pulsar/resource_pulsar_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ func TestSinkUpdate(t *testing.T) {
t.Fatal(err)
}
configString := string(configBytes)
configString = strings.ReplaceAll(configString, "sink-1", "update-sink-test-1")
newName := "sink" + acctest.RandString(10)
configString = strings.ReplaceAll(configString, "sink-1", newName)

resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Expand All @@ -265,7 +266,7 @@ func TestSinkUpdate(t *testing.T) {
{
Config: configString,
Check: resource.ComposeTestCheckFunc(func(s *terraform.State) error {
name := "pulsar_sink.update-sink-test-1"
name := "pulsar_sink." + newName
rs, ok := s.RootModule().Resources[name]
if !ok {
return fmt.Errorf("%s not be found", name)
Expand Down

0 comments on commit 8dac191

Please sign in to comment.