diff --git a/docs/data-sources/source_reference.md b/docs/data-sources/source_reference.md index 8535ace1..eefa72c9 100644 --- a/docs/data-sources/source_reference.md +++ b/docs/data-sources/source_reference.md @@ -3,12 +3,12 @@ page_title: "materialize_source_reference Data Source - terraform-provider-materialize" subcategory: "" description: |- - The materialize_source_reference data source retrieves information about a Materialize source's references, including details about namespaces, columns, and last update times. + The materialize_source_reference data source retrieves a list of available upstream references for a given Materialize source. These references represent potential tables that can be created based on the source, but they do not necessarily indicate references the source is already ingesting. This allows users to see all upstream data that could be materialized into tables. --- # materialize_source_reference (Data Source) -The `materialize_source_reference` data source retrieves information about a Materialize source's references, including details about namespaces, columns, and last update times. +The `materialize_source_reference` data source retrieves a list of *available* upstream references for a given Materialize source. These references represent potential tables that can be created based on the source, but they do not necessarily indicate references the source is already ingesting. This allows users to see all upstream data that could be materialized into tables. ## Example Usage diff --git a/pkg/datasources/datasource_source_reference.go b/pkg/datasources/datasource_source_reference.go index 17e16750..2b6e4572 100644 --- a/pkg/datasources/datasource_source_reference.go +++ b/pkg/datasources/datasource_source_reference.go @@ -13,7 +13,7 @@ import ( func SourceReference() *schema.Resource { return &schema.Resource{ ReadContext: sourceReferenceRead, - Description: "The `materialize_source_reference` data source retrieves information about a Materialize source's references, including details about namespaces, columns, and last update times.", + Description: "The `materialize_source_reference` data source retrieves a list of *available* upstream references for a given Materialize source. These references represent potential tables that can be created based on the source, but they do not necessarily indicate references the source is already ingesting. This allows users to see all upstream data that could be materialized into tables.", Schema: map[string]*schema.Schema{ "source_id": { Type: schema.TypeString, @@ -79,6 +79,7 @@ func SourceReference() *schema.Resource { func sourceReferenceRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { sourceID := d.Get("source_id").(string) + sourceID = utils.ExtractId(sourceID) var diags diag.Diagnostics diff --git a/pkg/materialize/source_postgres.go b/pkg/materialize/source_postgres.go index d744af9b..2c18633f 100644 --- a/pkg/materialize/source_postgres.go +++ b/pkg/materialize/source_postgres.go @@ -80,26 +80,28 @@ func (b *SourcePostgresBuilder) Create() error { q.WriteString(fmt.Sprintf(` (%s)`, p)) - q.WriteString(` FOR TABLES (`) - for i, t := range b.table { - if t.UpstreamSchemaName == "" { - t.UpstreamSchemaName = b.SchemaName - } - if t.Name == "" { - t.Name = t.UpstreamName - } - if t.SchemaName == "" { - t.SchemaName = b.SchemaName - } - if t.DatabaseName == "" { - t.DatabaseName = b.DatabaseName - } - q.WriteString(fmt.Sprintf(`%s.%s AS %s.%s.%s`, QuoteIdentifier(t.UpstreamSchemaName), QuoteIdentifier(t.UpstreamName), QuoteIdentifier(t.DatabaseName), QuoteIdentifier(t.SchemaName), QuoteIdentifier(t.Name))) - if i < len(b.table)-1 { - q.WriteString(`, `) + if b.table != nil && len(b.table) > 0 { + q.WriteString(` FOR TABLES (`) + for i, t := range b.table { + if t.UpstreamSchemaName == "" { + t.UpstreamSchemaName = b.SchemaName + } + if t.Name == "" { + t.Name = t.UpstreamName + } + if t.SchemaName == "" { + t.SchemaName = b.SchemaName + } + if t.DatabaseName == "" { + t.DatabaseName = b.DatabaseName + } + q.WriteString(fmt.Sprintf(`%s.%s AS %s.%s.%s`, QuoteIdentifier(t.UpstreamSchemaName), QuoteIdentifier(t.UpstreamName), QuoteIdentifier(t.DatabaseName), QuoteIdentifier(t.SchemaName), QuoteIdentifier(t.Name))) + if i < len(b.table)-1 { + q.WriteString(`, `) + } } + q.WriteString(`)`) } - q.WriteString(`)`) if b.exposeProgress.Name != "" { q.WriteString(fmt.Sprintf(` EXPOSE PROGRESS AS %s`, b.exposeProgress.QualifiedName())) diff --git a/pkg/materialize/source_reference.go b/pkg/materialize/source_reference.go index 49eadf9e..408d657e 100644 --- a/pkg/materialize/source_reference.go +++ b/pkg/materialize/source_reference.go @@ -2,6 +2,7 @@ package materialize import ( "database/sql" + "fmt" "github.com/jmoiron/sqlx" "github.com/lib/pq" @@ -61,9 +62,22 @@ func ScanSourceReference(conn *sqlx.DB, id string) (SourceReferenceParams, error return s, nil } -func ListSourceReferences(conn *sqlx.DB, sourceId string) ([]SourceReferenceParams, error) { +func refreshSourceReferences(conn *sqlx.DB, sourceName, schemaName, databaseName string) error { + query := fmt.Sprintf(`ALTER SOURCE %s REFRESH REFERENCES`, QualifiedName(databaseName, schemaName, sourceName)) + _, err := conn.Exec(query) + return err +} + +func ListSourceReferences(conn *sqlx.DB, id string) ([]SourceReferenceParams, error) { + source, err := ScanSource(conn, id) + if err == nil { + if err := refreshSourceReferences(conn, source.SourceName.String, source.SchemaName.String, source.DatabaseName.String); err != nil { + return nil, fmt.Errorf("error refreshing source references: %v", err) + } + } + p := map[string]string{ - "sr.source_id": sourceId, + "sr.source_id": id, } q := sourceReferenceQuery.QueryPredicate(p) diff --git a/pkg/materialize/source_reference_test.go b/pkg/materialize/source_reference_test.go new file mode 100644 index 00000000..b34bfbe4 --- /dev/null +++ b/pkg/materialize/source_reference_test.go @@ -0,0 +1,95 @@ +package materialize + +import ( + "testing" + + sqlmock "github.com/DATA-DOG/go-sqlmock" + "github.com/MaterializeInc/terraform-provider-materialize/pkg/testhelpers" + "github.com/jmoiron/sqlx" + "github.com/lib/pq" +) + +func TestSourceReferenceId(t *testing.T) { + testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { + mock.ExpectQuery( + `SELECT sr\.source_id, sr\.namespace, sr\.name, sr\.updated_at, sr\.columns, s\.name AS source_name, ss\.name AS source_schema_name, sd\.name AS source_database_name, s\.type AS source_type + FROM mz_internal\.mz_source_references sr + JOIN mz_sources s ON sr\.source_id = s\.id + JOIN mz_schemas ss ON s\.schema_id = ss\.id + JOIN mz_databases sd ON ss\.database_id = sd\.id + WHERE sr\.source_id = 'test-source-id'`, + ). + WillReturnRows(sqlmock.NewRows([]string{"source_id"}).AddRow("test-source-id")) + + result, err := SourceReferenceId(db, "test-source-id") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result != "test-source-id" { + t.Errorf("expected source id to be 'test-source-id', got %v", result) + } + }) +} + +func TestScanSourceReference(t *testing.T) { + testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { + mock.ExpectQuery( + `SELECT sr\.source_id, sr\.namespace, sr\.name, sr\.updated_at, sr\.columns, s\.name AS source_name, ss\.name AS source_schema_name, sd\.name AS source_database_name, s\.type AS source_type + FROM mz_internal\.mz_source_references sr + JOIN mz_sources s ON sr\.source_id = s\.id + JOIN mz_schemas ss ON s\.schema_id = ss\.id + JOIN mz_databases sd ON ss\.database_id = sd\.id + WHERE sr\.source_id = 'test-source-id'`, + ). + WillReturnRows(sqlmock.NewRows([]string{"source_id", "namespace", "name", "updated_at", "columns", "source_name", "source_schema_name", "source_database_name", "source_type"}). + AddRow("test-source-id", "test-namespace", "test-name", "2024-10-28", pq.StringArray{"col1", "col2"}, "source-name", "source-schema-name", "source-database-name", "source-type")) + + result, err := ScanSourceReference(db, "test-source-id") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result.SourceId.String != "test-source-id" { + t.Errorf("expected source id to be 'test-source-id', got %v", result.SourceId.String) + } + }) +} + +func TestRefreshSourceReferences(t *testing.T) { + testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { + mock.ExpectExec( + `ALTER SOURCE "test-database"\."test-schema"\."test-source" REFRESH REFERENCES`, + ). + WillReturnResult(sqlmock.NewResult(1, 1)) + + err := refreshSourceReferences(db, "test-source", "test-schema", "test-database") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) +} + +func TestListSourceReferences(t *testing.T) { + testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { + mock.ExpectQuery( + `SELECT sr\.source_id, sr\.namespace, sr\.name, sr\.updated_at, sr\.columns, s\.name AS source_name, ss\.name AS source_schema_name, sd\.name AS source_database_name, s\.type AS source_type + FROM mz_internal\.mz_source_references sr + JOIN mz_sources s ON sr\.source_id = s\.id + JOIN mz_schemas ss ON s\.schema_id = ss\.id + JOIN mz_databases sd ON ss\.database_id = sd\.id + WHERE sr\.source_id = 'test-source-id'`, + ). + WillReturnRows(sqlmock.NewRows([]string{"source_id", "namespace", "name", "updated_at", "columns", "source_name", "source_schema_name", "source_database_name", "source_type"}). + AddRow("test-source-id", "test-namespace", "test-name", "2024-10-28", pq.StringArray{"col1", "col2"}, "source-name", "source-schema-name", "source-database-name", "source-type")) + + result, err := ListSourceReferences(db, "test-source-id") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(result) != 1 { + t.Errorf("expected 1 result, got %d", len(result)) + } + if result[0].SourceId.String != "test-source-id" { + t.Errorf("expected source id to be 'test-source-id', got %v", result[0].SourceId.String) + } + }) +} diff --git a/pkg/provider/acceptance_cluster_test.go b/pkg/provider/acceptance_cluster_test.go index 0ab70632..83182933 100644 --- a/pkg/provider/acceptance_cluster_test.go +++ b/pkg/provider/acceptance_cluster_test.go @@ -459,7 +459,6 @@ func testAccManagedClusterResourceAlterGraceful(clusterName, clusterSize string, enabled = true timeout = "10m" on_timeout = "%[4]s" - } } `, diff --git a/pkg/provider/acceptance_datasource_source_reference_test.go b/pkg/provider/acceptance_datasource_source_reference_test.go new file mode 100644 index 00000000..c20a44f2 --- /dev/null +++ b/pkg/provider/acceptance_datasource_source_reference_test.go @@ -0,0 +1,172 @@ +package provider + +import ( + "fmt" + "testing" + + "github.com/hashicorp/terraform-plugin-testing/helper/acctest" + "github.com/hashicorp/terraform-plugin-testing/helper/resource" +) + +func TestAccDataSourceSourceReference_basic(t *testing.T) { + addTestTopic() + nameSpace := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha) + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + ProviderFactories: testAccProviderFactories, + Steps: []resource.TestStep{ + { + Config: testAccDataSourceSourceReferenceConfig(nameSpace), + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttrSet("data.materialize_source_reference.kafka", "source_id"), + resource.TestCheckResourceAttrSet("data.materialize_source_reference.postgres", "source_id"), + resource.TestCheckResourceAttrSet("data.materialize_source_reference.mysql", "source_id"), + + // Check total references + resource.TestCheckResourceAttr("data.materialize_source_reference.kafka", "references.#", "1"), + resource.TestCheckResourceAttr("data.materialize_source_reference.postgres", "references.#", "3"), + resource.TestCheckResourceAttr("data.materialize_source_reference.mysql", "references.#", "4"), + + // Check Postgres reference attributes + resource.TestCheckResourceAttr("data.materialize_source_reference.postgres", "references.0.namespace", "public"), + resource.TestCheckResourceAttrSet("data.materialize_source_reference.postgres", "references.0.name"), + resource.TestCheckResourceAttr("data.materialize_source_reference.postgres", "references.0.source_name", fmt.Sprintf("%s_source_postgres", nameSpace)), + resource.TestCheckResourceAttr("data.materialize_source_reference.postgres", "references.0.source_type", "postgres"), + resource.TestCheckResourceAttrSet("data.materialize_source_reference.postgres", "references.0.updated_at"), + + // Check MySQL reference attributes + resource.TestCheckResourceAttr("data.materialize_source_reference.mysql", "references.0.namespace", "shop"), + resource.TestCheckResourceAttrSet("data.materialize_source_reference.mysql", "references.0.name"), + resource.TestCheckResourceAttr("data.materialize_source_reference.mysql", "references.0.source_name", fmt.Sprintf("%s_source_mysql", nameSpace)), + resource.TestCheckResourceAttr("data.materialize_source_reference.mysql", "references.1.source_type", "mysql"), + resource.TestCheckResourceAttrSet("data.materialize_source_reference.mysql", "references.1.updated_at"), + + // Check Kafka reference attributes + resource.TestCheckResourceAttr("data.materialize_source_reference.kafka", "references.0.name", "terraform"), + resource.TestCheckResourceAttr("data.materialize_source_reference.kafka", "references.0.source_name", fmt.Sprintf("%s_source_kafka", nameSpace)), + resource.TestCheckResourceAttr("data.materialize_source_reference.kafka", "references.0.source_type", "kafka"), + resource.TestCheckResourceAttrSet("data.materialize_source_reference.kafka", "references.0.updated_at"), + ), + }, + }, + }) +} + +func testAccDataSourceSourceReferenceConfig(nameSpace string) string { + return fmt.Sprintf(` + // Postgres setup + resource "materialize_secret" "postgres_password" { + name = "%[1]s_secret_postgres" + value = "c2VjcmV0Cg==" + } + + resource "materialize_connection_postgres" "postgres_connection" { + name = "%[1]s_connection_postgres" + host = "postgres" + port = 5432 + user { + text = "postgres" + } + password { + name = materialize_secret.postgres_password.name + } + database = "postgres" + } + + resource "materialize_source_postgres" "test_source_postgres" { + name = "%[1]s_source_postgres" + cluster_name = "quickstart" + + postgres_connection { + name = materialize_connection_postgres.postgres_connection.name + } + publication = "mz_source" + } + + resource "materialize_source_table_postgres" "table_from_source_pg" { + name = "%[1]s_table" + schema_name = "public" + database_name = "materialize" + + source { + name = materialize_source_postgres.test_source_postgres.name + } + + upstream_name = "table2" + upstream_schema_name = "public" + } + + // MySQL setup + resource "materialize_secret" "mysql_password" { + name = "%[1]s_secret_mysql" + value = "c2VjcmV0Cg==" + } + + resource "materialize_connection_mysql" "mysql_connection" { + name = "%[1]s_connection_mysql" + host = "mysql" + port = 3306 + user { + text = "repluser" + } + password { + name = materialize_secret.mysql_password.name + } + } + + resource "materialize_source_mysql" "test_source_mysql" { + name = "%[1]s_source_mysql" + cluster_name = "quickstart" + + mysql_connection { + name = materialize_connection_mysql.mysql_connection.name + } + } + + // Kafka setup + resource "materialize_connection_kafka" "kafka_connection" { + name = "%[1]s_connection_kafka" + kafka_broker { + broker = "redpanda:9092" + } + security_protocol = "PLAINTEXT" + } + + resource "materialize_source_kafka" "test_source_kafka" { + name = "%[1]s_source_kafka" + cluster_name = "quickstart" + topic = "terraform" + + kafka_connection { + name = materialize_connection_kafka.kafka_connection.name + } + value_format { + json = true + } + key_format { + json = true + } + } + + data "materialize_source_reference" "kafka" { + source_id = materialize_source_kafka.test_source_kafka.id + depends_on = [ + materialize_source_kafka.test_source_kafka + ] + } + + data "materialize_source_reference" "postgres" { + source_id = materialize_source_postgres.test_source_postgres.id + depends_on = [ + materialize_source_postgres.test_source_postgres + ] + } + + data "materialize_source_reference" "mysql" { + source_id = materialize_source_mysql.test_source_mysql.id + depends_on = [ + materialize_source_mysql.test_source_mysql + ] + } + `, nameSpace) +}