Skip to content

Commit

Permalink
fix topic not found when namespace non exists (#131)
Browse files Browse the repository at this point in the history
* fix topic not found namespace

* fix ci

* fix ci

* fix ci

* fix ci

* fix ci

* fix ci

* fix ci

* fix

* fix ci

* fix ci
  • Loading branch information
freeznet authored Jan 6, 2025
1 parent fccccad commit e5b897d
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 15 deletions.
10 changes: 10 additions & 0 deletions pulsar/resource_pulsar_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ func TestHandleExistingCluster(t *testing.T) {
PreCheck: func() {
testAccPreCheck(t)
createCluster(t, cName)
t.Cleanup(func() {
if err := getClientFromMeta(testAccProvider.Meta()).Clusters().Delete(cName); err != nil {
t.Fatalf("ERROR_DELETING_TEST_CLUSTER: %v", err)
}
})
},
CheckDestroy: testPulsarClusterDestroy,
ProviderFactories: testAccProviderFactories,
Expand All @@ -75,6 +80,11 @@ func TestImportExistingCluster(t *testing.T) {
PreCheck: func() {
testAccPreCheck(t)
createCluster(t, cName)
t.Cleanup(func() {
if err := getClientFromMeta(testAccProvider.Meta()).Clusters().Delete(cName); err != nil {
t.Fatalf("ERROR_DELETING_TEST_CLUSTER: %v", err)
}
})
},
CheckDestroy: testPulsarClusterDestroy,
ProviderFactories: testAccProviderFactories,
Expand Down
5 changes: 5 additions & 0 deletions pulsar/resource_pulsar_namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,11 @@ func TestImportExistingNamespace(t *testing.T) {
PreCheck: func() {
testAccPreCheck(t)
createNamespace(t, id)
t.Cleanup(func() {
if err := getClientFromMeta(testAccProvider.Meta()).Namespaces().DeleteNamespace(id); err != nil {
t.Fatalf("ERROR_DELETING_TEST_NS: %v", err)
}
})
},
CheckDestroy: testPulsarNamespaceDestroy,
ProviderFactories: testAccProviderFactories,
Expand Down
20 changes: 16 additions & 4 deletions pulsar/resource_pulsar_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,24 @@ func testPulsarSinkDestroy(s *terraform.State) error {

func TestImportExistingSink(t *testing.T) {
sinkName := acctest.RandString(6)
err := createSampleSink(sinkName)
if err != nil {
t.Fatal(err)
}

resource.Test(t, resource.TestCase{
PreCheck: func() {
testAccPreCheck(t)
createSampleSink(sinkName)
t.Cleanup(func() {
if err := getClientFromMeta(testAccProvider.Meta()).Sinks().DeleteSink(
"public",
"default",
sinkName,
); err != nil {
if cliErr, ok := err.(rest.Error); ok && cliErr.Code == 404 {
return
}
t.Fatalf("ERROR_DELETING_TEST_SINK: %v", err)
}
})
},
ProviderFactories: testAccProviderFactories,
CheckDestroy: testPulsarSinkDestroy,
Steps: []resource.TestStep{
Expand Down
19 changes: 15 additions & 4 deletions pulsar/resource_pulsar_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,23 @@ func getPulsarSourceByResourceID(id string) (*utils.SourceConfig, error) {

func TestImportExistingSource(t *testing.T) {
sourceName := acctest.RandString(6)
err := createSampleSource(sourceName)
if err != nil {
t.Fatal(err)
}

resource.Test(t, resource.TestCase{
PreCheck: func() {
testAccPreCheck(t)
createSampleSource(sourceName)
t.Cleanup(func() {
if err := getClientFromMeta(testAccProvider.Meta()).Sources().DeleteSource(
"public",
"default",
sourceName); err != nil {
if cliErr, ok := err.(rest.Error); ok && cliErr.Code == 404 {
return
}
t.Fatalf("ERROR_DELETING_TEST_SOURCE: %v", err)
}
})
},
ProviderFactories: testAccProviderFactories,
CheckDestroy: testPulsarSourceDestroy,
Steps: []resource.TestStep{
Expand Down
10 changes: 10 additions & 0 deletions pulsar/resource_pulsar_tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ func TestHandleExistingTenant(t *testing.T) {
PreCheck: func() {
testAccPreCheck(t)
createTenant(t, tName)
t.Cleanup(func() {
if err := getClientFromMeta(testAccProvider.Meta()).Tenants().Delete(tName); err != nil {
t.Fatalf("ERROR_DELETING_TEST_TENANT: %v", err)
}
})
},
ProviderFactories: testAccProviderFactories,
PreventPostDestroyRefresh: false,
Expand All @@ -77,6 +82,11 @@ func TestImportExistingTenant(t *testing.T) {
PreCheck: func() {
testAccPreCheck(t)
createTenant(t, tName)
t.Cleanup(func() {
if err := getClientFromMeta(testAccProvider.Meta()).Tenants().Delete(tName); err != nil {
t.Fatalf("ERROR_DELETING_TEST_TENANT: %v", err)
}
})
},
CheckDestroy: testPulsarTenantDestroy,
ProviderFactories: testAccProviderFactories,
Expand Down
5 changes: 5 additions & 0 deletions pulsar/resource_pulsar_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"fmt"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
"github.com/cenkalti/backoff/v4"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
Expand Down Expand Up @@ -162,6 +163,10 @@ func resourcePulsarTopicRead(ctx context.Context, d *schema.ResourceData, meta i

topicName, found, err := getTopic(d, meta)
if err != nil {
if cliErr, ok := err.(rest.Error); ok && cliErr.Code == 404 {
d.SetId("")
return nil
}
return diag.Errorf("%v", err)
}
if !found {
Expand Down
117 changes: 110 additions & 7 deletions pulsar/resource_pulsar_topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ func TestTopic(t *testing.T) {
{
Config: testPulsarPartitionTopic,
Check: resource.ComposeTestCheckFunc(
testPulsarTopicExists("pulsar_topic.sample-topic-1"),
testPulsarTopicExists("pulsar_topic.sample-topic-2"),
testPulsarTopicExists("pulsar_topic.sample-topic-1", t),
testPulsarTopicExists("pulsar_topic.sample-topic-2", t),
),
},
{
Config: testPulsarNonPartitionTopic,
Check: resource.ComposeTestCheckFunc(
testPulsarTopicExists("pulsar_topic.sample-topic-3"),
testPulsarTopicExists("pulsar_topic.sample-topic-4"),
testPulsarTopicExists("pulsar_topic.sample-topic-3", t),
testPulsarTopicExists("pulsar_topic.sample-topic-4", t),
),
},
},
Expand All @@ -65,11 +65,20 @@ func TestImportExistingTopic(t *testing.T) {
pnum := 10

fullID := strings.Join([]string{ttype + ":/", "public", "default", tname}, "/")
topicName, err := utils.GetTopicName(fullID)
if err != nil {
t.Fatalf("ERROR_GETTING_TOPIC_NAME: %v", err)
}

resource.Test(t, resource.TestCase{
PreCheck: func() {
testAccPreCheck(t)
createTopic(t, fullID, pnum)
t.Cleanup(func() {
if err := getClientFromMeta(testAccProvider.Meta()).Topics().Delete(*topicName, true, pnum == 0); err != nil {
t.Fatalf("ERROR_DELETING_TEST_TOPIC: %v", err)
}
})
},
ProviderFactories: testAccProviderFactories,
CheckDestroy: testPulsarTopicDestroy,
Expand All @@ -93,6 +102,61 @@ func TestPartionedTopicWithPermissionGrantUpdate(t *testing.T) {
testTopicWithPermissionGrantUpdate(t, 10)
}

func TestTopicNamespaceExternallyRemoved(t *testing.T) {

resourceName := "pulsar_topic.test"
tName := acctest.RandString(10)
nsName := acctest.RandString(10)
topicName := acctest.RandString(10)

resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
ProviderFactories: testAccProviderFactories,
CheckDestroy: testPulsarTopicDestroy,
Steps: []resource.TestStep{
{
Config: testPulsarNamespaceWithTopic(testWebServiceURL, tName, nsName, topicName),
Check: resource.ComposeTestCheckFunc(
testPulsarTopicExists(resourceName, t),
),
ExpectError: nil,
},
{
PreConfig: func() {
client := getClientFromMeta(testAccProvider.Meta())
topicName, err := utils.GetTopicName(fmt.Sprintf("persistent://%s/%s/%s", tName, nsName, topicName))
if err != nil {
t.Fatalf("ERROR_GETTING_TOPIC_NAME: %v", err)
}
namespace, err := utils.GetNameSpaceName(topicName.GetTenant(), topicName.GetNamespace())
if err != nil {
t.Fatalf("ERROR_READ_NAMESPACE: %v", err)
}
partitionedTopics, nonPartitionedTopics, err := client.Topics().List(*namespace)
if err != nil {
t.Fatalf("ERROR_READ_TOPIC_DATA: %v", err)
}

for _, topic := range append(partitionedTopics, nonPartitionedTopics...) {
if topicName.String() == topic {
if err = client.Topics().Delete(*topicName, true, true); err != nil {
t.Fatalf("ERROR_DELETING_TEST_TOPIC: %v", err)
}
}
}
if err = client.Namespaces().DeleteNamespace(tName + "/" + nsName); err != nil {
t.Fatalf("ERROR_DELETING_TEST_NS: %v", err)
}
},
Config: testPulsarNamespaceWithTopic(testWebServiceURL, tName, nsName, topicName),
PlanOnly: true,
ExpectNonEmptyPlan: true,
ExpectError: nil,
},
},
})
}

func testTopicWithPermissionGrantUpdate(t *testing.T, pnum int) {
resourceName := "pulsar_topic.test"
tname := acctest.RandString(10)
Expand All @@ -114,7 +178,7 @@ func testTopicWithPermissionGrantUpdate(t *testing.T, pnum int) {
actions = ["produce", "consume"]
}`),
Check: resource.ComposeTestCheckFunc(
testPulsarTopicExists(resourceName),
testPulsarTopicExists(resourceName, t),
resource.TestCheckResourceAttr(resourceName, "permission_grant.#", "2"),
resource.TestCheckResourceAttr(resourceName, "permission_grant.0.role", "some-role-1"),
resource.TestCheckResourceAttr(resourceName, "permission_grant.0.actions.#", "3"),
Expand All @@ -134,7 +198,7 @@ func testTopicWithPermissionGrantUpdate(t *testing.T, pnum int) {
actions = ["produce"]
}`),
Check: resource.ComposeTestCheckFunc(
testPulsarTopicExists(resourceName),
testPulsarTopicExists(resourceName, t),
resource.TestCheckResourceAttr(resourceName, "permission_grant.#", "1"),
resource.TestCheckResourceAttr(resourceName, "permission_grant.0.role", "some-role-2"),
resource.TestCheckResourceAttr(resourceName, "permission_grant.0.actions.#", "1"),
Expand Down Expand Up @@ -177,7 +241,7 @@ func testPulsarTopicDestroy(s *terraform.State) error {
return nil
}

func testPulsarTopicExists(topic string) resource.TestCheckFunc {
func testPulsarTopicExists(topic string, t *testing.T) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[topic]
if !ok {
Expand All @@ -188,6 +252,7 @@ func testPulsarTopicExists(topic string) resource.TestCheckFunc {
if err != nil {
return fmt.Errorf("ERROR_READ_TOPIC: %w", err)
}
t.Logf("topicName: %v", topicName)
namespace, err := utils.GetNameSpaceName(topicName.GetTenant(), topicName.GetNamespace())
if err != nil {
return fmt.Errorf("ERROR_READ_NAMESPACE: %w", err)
Expand Down Expand Up @@ -338,3 +403,41 @@ resource "pulsar_topic" "test" {
}
`, url, ttype, tname, pnum, permissionGrants)
}

func testPulsarNamespaceWithTopic(wsURL, tenant, ns, topicName string) string {
return fmt.Sprintf(`
provider "pulsar" {
web_service_url = "%s"
}
resource "pulsar_tenant" "test_tenant" {
tenant = "%s"
allowed_clusters = ["standalone"]
}
resource "pulsar_namespace" "test" {
tenant = pulsar_tenant.test_tenant.tenant
namespace = "%s"
topic_auto_creation {
enable = false
}
depends_on = [
pulsar_tenant.test_tenant
]
}
resource "pulsar_topic" "test" {
tenant = "%s"
namespace = "%s"
topic_type = "persistent"
topic_name = "%s"
partitions = 0
depends_on = [
pulsar_namespace.test
]
}
`, wsURL, tenant, ns, tenant, ns, topicName)
}

0 comments on commit e5b897d

Please sign in to comment.