diff --git a/README.md b/README.md index bf691db..4e0815f 100644 --- a/README.md +++ b/README.md @@ -270,18 +270,19 @@ resource "pulsar_namespace" "test" { namespace_config nested schema -| Property | Description | Required | -| -------------------------------- | ---------------------------------------------- | -------- | -| `anti_affinity` | Anti-affinity group name | No | -| `is_allow_auto_update_schema` | Is schema auto-update allowed | No | -| `max_consumers_per_subscription` | Sets the max consumers per subscription | No | -| `max_consumers_per_topic` | Sets the max consumers per topic | No | -| `max_producers_per_topic` | Sets the max producers per topic | No | -| `message_ttl_seconds` | Sets the message TTL in seconds | No | -| `replication_clusters` | List of replication clusters for the namespace | No | -| `schema_compatibility_strategy` | Set schema compatibility strategy | No | -| `schema_validation_enforce` | Enable or disable schema validation | No | -| `offload_threshold_size_in_mb` | Set topic offload threshold size in MB | No | +| Property | Description | Required | +|----------------------------------------|--------------------------------------------------|----------| +| `anti_affinity` | Anti-affinity group name | No | +| `is_allow_auto_update_schema` | Is schema auto-update allowed | No | +| `max_consumers_per_subscription` | Sets the max consumers per subscription | No | +| `max_consumers_per_topic` | Sets the max consumers per topic | No | +| `max_producers_per_topic` | Sets the max producers per topic | No | +| `message_ttl_seconds` | Sets the message TTL in seconds | No | +| `offload_threshold_size_in_mb` | Set topic offload threshold size in MB | No | +| `replication_clusters` | List of replication clusters for the namespace | No | +| `schema_compatibility_strategy` | Set schema compatibility strategy | No | +| `schema_validation_enforce` | Enable or disable schema validation | No | +| `subscription_expiration_time_minutes` | Sets the subscription expiration time in minutes | No | The `schema_compatibility_strategy` can take the following values: diff --git a/docs/resources/namespace.md b/docs/resources/namespace.md index 36de9bd..800221e 100644 --- a/docs/resources/namespace.md +++ b/docs/resources/namespace.md @@ -83,6 +83,7 @@ Optional: - `replication_clusters` (List of String) - `schema_compatibility_strategy` (String) - `schema_validation_enforce` (Boolean) +- `subscription_expiration_time_minutes` (Number) diff --git a/examples/namespaces/main.tf b/examples/namespaces/main.tf index 4d45735..56db108 100644 --- a/examples/namespaces/main.tf +++ b/examples/namespaces/main.tf @@ -49,13 +49,13 @@ resource "pulsar_namespace" "test" { namespace = "eternals" namespace_config { - anti_affinity = "anti-aff" - max_consumers_per_subscription = "50" - max_consumers_per_topic = "50" - max_producers_per_topic = "50" - message_ttl_seconds = "86400" - replication_clusters = [ - "standalone"] + anti_affinity = "anti-aff" + max_consumers_per_subscription = "50" + max_consumers_per_topic = "50" + max_producers_per_topic = "50" + message_ttl_seconds = "86400" + replication_clusters = ["standalone"] + subscription_expiration_time_minutes = 90 } dispatch_rate { diff --git a/pulsar/resource_pulsar_namespace.go b/pulsar/resource_pulsar_namespace.go index fd07e29..a098658 100644 --- a/pulsar/resource_pulsar_namespace.go +++ b/pulsar/resource_pulsar_namespace.go @@ -152,6 +152,11 @@ func resourcePulsarNamespace() *schema.Resource { Optional: true, ValidateFunc: validateNotBlank, }, + "is_allow_auto_update_schema": { + Type: schema.TypeBool, + Optional: true, + Default: true, + }, "max_consumers_per_subscription": { Type: schema.TypeInt, Optional: true, @@ -176,6 +181,12 @@ func resourcePulsarNamespace() *schema.Resource { Default: 0, ValidateFunc: validateGtEq0, }, + "offload_threshold_size_in_mb": { + Type: schema.TypeInt, + Optional: true, + Default: -1, + ValidateFunc: validateGtEq0, + }, "replication_clusters": { Type: schema.TypeList, Optional: true, @@ -185,23 +196,18 @@ func resourcePulsarNamespace() *schema.Resource { Type: schema.TypeString, }, }, - "schema_validation_enforce": { - Type: schema.TypeBool, - Optional: true, - Default: false, - }, "schema_compatibility_strategy": { Type: schema.TypeString, Optional: true, Default: "Full", ValidateFunc: validateNotBlank, }, - "is_allow_auto_update_schema": { + "schema_validation_enforce": { Type: schema.TypeBool, Optional: true, - Default: true, + Default: false, }, - "offload_threshold_size_in_mb": { + "subscription_expiration_time_minutes": { Type: schema.TypeInt, Optional: true, Default: 0, @@ -340,6 +346,13 @@ func resourcePulsarNamespaceRead(ctx context.Context, d *schema.ResourceData, me namespaceConfig["anti_affinity"] = strings.Trim(strings.TrimSpace(afgrp), "\"") } + isAllowAutoUpdateSchema, err := client.GetIsAllowAutoUpdateSchema(*ns) + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetIsAllowAutoUpdateSchema: %w", err)) + } else { + namespaceConfig["is_allow_auto_update_schema"] = isAllowAutoUpdateSchema + } + maxConsPerSub, err := client.GetMaxConsumersPerSubscription(*ns) if err != nil { return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxConsumersPerSubscription: %w", err)) @@ -368,20 +381,11 @@ func resourcePulsarNamespaceRead(ctx context.Context, d *schema.ResourceData, me namespaceConfig["message_ttl_seconds"] = messageTTL } - schemaValidationEnforce, err := client.GetSchemaValidationEnforced(*ns) - if err != nil { - return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSchemaValidationEnforced: %w", err)) - } else { - namespaceConfig["schema_validation_enforce"] = schemaValidationEnforce - } - - schemaCompatibilityStrategy, err := client.GetSchemaAutoUpdateCompatibilityStrategy(*ns) + offloadTresholdSizeInMb, err := client.GetOffloadThreshold(*ns) if err != nil { - if !strings.Contains(err.Error(), "Invalid auth strategy") { - return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSchemaAutoUpdateCompatibilityStrategy: %w", err)) - } + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetOffloadThreshold: %w", err)) } else { - namespaceConfig["schema_compatibility_strategy"] = schemaCompatibilityStrategy.String() + namespaceConfig["offload_threshold_size_in_mb"] = int(offloadTresholdSizeInMb) } replClustersRaw, err := client.GetNamespaceReplicationClusters(ns.String()) @@ -395,18 +399,27 @@ func resourcePulsarNamespaceRead(ctx context.Context, d *schema.ResourceData, me namespaceConfig["replication_clusters"] = replClusters } - isAllowAutoUpdateSchema, err := client.GetIsAllowAutoUpdateSchema(*ns) + schemaValidationEnforce, err := client.GetSchemaValidationEnforced(*ns) if err != nil { - return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetIsAllowAutoUpdateSchema: %w", err)) + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSchemaValidationEnforced: %w", err)) } else { - namespaceConfig["is_allow_auto_update_schema"] = isAllowAutoUpdateSchema + namespaceConfig["schema_validation_enforce"] = schemaValidationEnforce } - offloadTresholdSizeInMb, err := client.GetOffloadThreshold(*ns) + schemaCompatibilityStrategy, err := client.GetSchemaAutoUpdateCompatibilityStrategy(*ns) if err != nil { - return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetOffloadThreshold: %w", err)) + if !strings.Contains(err.Error(), "Invalid auth strategy") { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSchemaAutoUpdateCompatibilityStrategy: %w", err)) + } } else { - namespaceConfig["offload_threshold_size_in_mb"] = int(offloadTresholdSizeInMb) + namespaceConfig["schema_compatibility_strategy"] = schemaCompatibilityStrategy.String() + } + + subscriptionExpirationTimeMinutes, err := client.GetSubscriptionExpirationTime(*ns) + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSubscriptionExpirationTime: %w", err)) + } else { + namespaceConfig["subscription_expiration_time_minutes"] = subscriptionExpirationTimeMinutes } _ = d.Set("namespace_config", []interface{}{ @@ -553,10 +566,8 @@ func resourcePulsarNamespaceUpdate(ctx context.Context, d *schema.ResourceData, } } - if len(nsCfg.ReplicationClusters) > 0 { - if err = client.SetNamespaceReplicationClusters(nsName.String(), nsCfg.ReplicationClusters); err != nil { - errs = multierror.Append(errs, fmt.Errorf("SetNamespaceReplicationClusters: %w", err)) - } + if err = client.SetIsAllowAutoUpdateSchema(*nsName, nsCfg.IsAllowAutoUpdateSchema); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetIsAllowAutoUpdateSchema: %w", err)) } if nsCfg.MaxConsumersPerTopic >= 0 { @@ -589,8 +600,10 @@ func resourcePulsarNamespaceUpdate(ctx context.Context, d *schema.ResourceData, } } - if err = client.SetSchemaValidationEnforced(*nsName, nsCfg.SchemaValidationEnforce); err != nil { - errs = multierror.Append(errs, fmt.Errorf("SetSchemaValidationEnforced: %w", err)) + if len(nsCfg.ReplicationClusters) > 0 { + if err = client.SetNamespaceReplicationClusters(nsName.String(), nsCfg.ReplicationClusters); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetNamespaceReplicationClusters: %w", err)) + } } if len(nsCfg.SchemaCompatibilityStrategy) > 0 { @@ -601,8 +614,19 @@ func resourcePulsarNamespaceUpdate(ctx context.Context, d *schema.ResourceData, errs = multierror.Append(errs, fmt.Errorf("SetSchemaCompatibilityStrategy: %w", err)) } } - if err = client.SetIsAllowAutoUpdateSchema(*nsName, nsCfg.IsAllowAutoUpdateSchema); err != nil { - errs = multierror.Append(errs, fmt.Errorf("SetIsAllowAutoUpdateSchema: %w", err)) + + if err = client.SetSchemaValidationEnforced(*nsName, nsCfg.SchemaValidationEnforce); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetSchemaValidationEnforced: %w", err)) + } + + if nsCfg.SubscriptionExpirationTimeMinutes >= 0 { + if err = client.SetSubscriptionExpirationTime(*nsName, nsCfg.SubscriptionExpirationTimeMinutes); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetSubscriptionExpirationTime: %w", err)) + } + } else { // remove the subscription expiration time + if err = client.RemoveSubscriptionExpirationTime(*nsName); err != nil { + errs = multierror.Append(errs, fmt.Errorf("RemoveSubscriptionExpirationTime: %w", err)) + } } } @@ -819,16 +843,17 @@ func unmarshalNamespaceConfigList(v []interface{}) *types.NamespaceConfig { data := ns.(map[string]interface{}) rplClusters := data["replication_clusters"].([]interface{}) - nsConfig.ReplicationClusters = handleHCLArrayV2(rplClusters) + nsConfig.AntiAffinity = data["anti_affinity"].(string) + nsConfig.IsAllowAutoUpdateSchema = data["is_allow_auto_update_schema"].(bool) nsConfig.MaxProducersPerTopic = data["max_producers_per_topic"].(int) nsConfig.MaxConsumersPerTopic = data["max_consumers_per_topic"].(int) nsConfig.MaxConsumersPerSubscription = data["max_consumers_per_subscription"].(int) nsConfig.MessageTTLInSeconds = data["message_ttl_seconds"].(int) - nsConfig.AntiAffinity = data["anti_affinity"].(string) - nsConfig.SchemaValidationEnforce = data["schema_validation_enforce"].(bool) - nsConfig.SchemaCompatibilityStrategy = data["schema_compatibility_strategy"].(string) - nsConfig.IsAllowAutoUpdateSchema = data["is_allow_auto_update_schema"].(bool) nsConfig.OffloadThresholdSizeInMb = data["offload_threshold_size_in_mb"].(int) + nsConfig.ReplicationClusters = handleHCLArrayV2(rplClusters) + nsConfig.SchemaCompatibilityStrategy = data["schema_compatibility_strategy"].(string) + nsConfig.SchemaValidationEnforce = data["schema_validation_enforce"].(bool) + nsConfig.SubscriptionExpirationTimeMinutes = data["subscription_expiration_time_minutes"].(int) } return &nsConfig diff --git a/pulsar/resource_pulsar_namespace_test.go b/pulsar/resource_pulsar_namespace_test.go index b792833..54032f0 100644 --- a/pulsar/resource_pulsar_namespace_test.go +++ b/pulsar/resource_pulsar_namespace_test.go @@ -584,14 +584,15 @@ resource "pulsar_namespace" "test" { enable_deduplication = true namespace_config { - anti_affinity = "anti-aff" - max_consumers_per_subscription = "50" - max_consumers_per_topic = "50" - max_producers_per_topic = "50" - message_ttl_seconds = "86400" - replication_clusters = ["standalone"] - is_allow_auto_update_schema = false - offload_threshold_size_in_mb = "100" + anti_affinity = "anti-aff" + is_allow_auto_update_schema = false + max_consumers_per_subscription = "50" + max_consumers_per_topic = "50" + max_producers_per_topic = "50" + message_ttl_seconds = "86400" + offload_threshold_size_in_mb = "100" + replication_clusters = ["standalone"] + subscription_expiration_time_minutes = 90 } dispatch_rate { diff --git a/types/types.go b/types/types.go index 18a3296..530c3f4 100644 --- a/types/types.go +++ b/types/types.go @@ -24,16 +24,17 @@ import ( type ( // configurable features of the Pulsar Namespace Entity via Terraform NamespaceConfig struct { - AntiAffinity string - ReplicationClusters []string - MaxConsumersPerTopic int - MaxProducersPerTopic int - MaxConsumersPerSubscription int - MessageTTLInSeconds int - SchemaValidationEnforce bool - SchemaCompatibilityStrategy string - IsAllowAutoUpdateSchema bool - OffloadThresholdSizeInMb int + AntiAffinity string + IsAllowAutoUpdateSchema bool + MaxConsumersPerTopic int + MaxProducersPerTopic int + MaxConsumersPerSubscription int + MessageTTLInSeconds int + OffloadThresholdSizeInMb int + ReplicationClusters []string + SchemaCompatibilityStrategy string + SchemaValidationEnforce bool + SubscriptionExpirationTimeMinutes int } SplitNS struct {