diff --git a/pkg/datasources/datasource_source_table_test.go b/pkg/datasources/datasource_source_table_test.go index 7b084a08..59413f73 100644 --- a/pkg/datasources/datasource_source_table_test.go +++ b/pkg/datasources/datasource_source_table_test.go @@ -40,9 +40,8 @@ func TestSourceTableDatasource(t *testing.T) { r.Equal("schema", table["schema_name"]) r.Equal("database", table["database_name"]) r.Equal("KAFKA", table["source_type"]) - // TODO: Update once upstream_name and upstream_schema_name are supported - r.Equal("", table["upstream_name"]) - r.Equal("", table["upstream_schema_name"]) + r.Equal("table", table["upstream_name"]) + r.Equal("schema", table["upstream_schema_name"]) r.Equal("comment", table["comment"]) r.Equal("materialize", table["owner_name"]) diff --git a/pkg/materialize/source_table.go b/pkg/materialize/source_table.go index 1b6fdc34..e73ba519 100644 --- a/pkg/materialize/source_table.go +++ b/pkg/materialize/source_table.go @@ -27,38 +27,49 @@ type SourceTableParams struct { } var sourceTableQuery = NewBaseQuery(` - SELECT - mz_tables.id, - mz_tables.name, - mz_schemas.name AS schema_name, - mz_databases.name AS database_name, - mz_sources.name AS source_name, - source_schemas.name AS source_schema_name, - source_databases.name AS source_database_name, - mz_sources.type AS source_type, - comments.comment AS comment, - mz_roles.name AS owner_name, - mz_tables.privileges - FROM mz_tables - JOIN mz_schemas - ON mz_tables.schema_id = mz_schemas.id - JOIN mz_databases - ON mz_schemas.database_id = mz_databases.id - JOIN mz_sources - ON mz_tables.source_id = mz_sources.id - JOIN mz_schemas AS source_schemas - ON mz_sources.schema_id = source_schemas.id - JOIN mz_databases AS source_databases - ON source_schemas.database_id = source_databases.id - JOIN mz_roles - ON mz_tables.owner_id = mz_roles.id - LEFT JOIN ( - SELECT id, comment - FROM mz_internal.mz_comments - WHERE object_type = 'table' - AND object_sub_id IS NULL - ) comments - ON mz_tables.id = comments.id + SELECT + mz_tables.id, + mz_tables.name, + mz_schemas.name AS schema_name, + mz_databases.name AS database_name, + mz_sources.name AS source_name, + source_schemas.name AS source_schema_name, + source_databases.name AS source_database_name, + mz_sources.type AS source_type, + COALESCE(mz_kafka_source_tables.topic, + mz_mysql_source_tables.table_name, + mz_postgres_source_tables.table_name) AS upstream_table_name, + COALESCE(mz_mysql_source_tables.schema_name, + mz_postgres_source_tables.schema_name) AS upstream_schema_name, + comments.comment AS comment, + mz_roles.name AS owner_name, + mz_tables.privileges + FROM mz_tables + JOIN mz_schemas + ON mz_tables.schema_id = mz_schemas.id + JOIN mz_databases + ON mz_schemas.database_id = mz_databases.id + JOIN mz_sources + ON mz_tables.source_id = mz_sources.id + JOIN mz_schemas AS source_schemas + ON mz_sources.schema_id = source_schemas.id + JOIN mz_databases AS source_databases + ON source_schemas.database_id = source_databases.id + LEFT JOIN mz_internal.mz_kafka_source_tables + ON mz_tables.id = mz_kafka_source_tables.id + LEFT JOIN mz_internal.mz_mysql_source_tables + ON mz_tables.id = mz_mysql_source_tables.id + LEFT JOIN mz_internal.mz_postgres_source_tables + ON mz_tables.id = mz_postgres_source_tables.id + JOIN mz_roles + ON mz_tables.owner_id = mz_roles.id + LEFT JOIN ( + SELECT id, comment + FROM mz_internal.mz_comments + WHERE object_type = 'table' + AND object_sub_id IS NULL + ) comments + ON mz_tables.id = comments.id `) func SourceTableId(conn *sqlx.DB, obj MaterializeObject) (string, error) { diff --git a/pkg/testhelpers/mock_scans.go b/pkg/testhelpers/mock_scans.go index 968fd60e..0ce09e64 100644 --- a/pkg/testhelpers/mock_scans.go +++ b/pkg/testhelpers/mock_scans.go @@ -912,41 +912,52 @@ func MockSourceTableKafkaScan(mock sqlmock.Sqlmock, predicate string) { func MockSourceTableScan(mock sqlmock.Sqlmock, predicate string) { b := ` SELECT - mz_tables.id, - mz_tables.name, - mz_schemas.name AS schema_name, - mz_databases.name AS database_name, - mz_sources.name AS source_name, - source_schemas.name AS source_schema_name, - source_databases.name AS source_database_name, - mz_sources.type AS source_type, - comments.comment AS comment, - mz_roles.name AS owner_name, - mz_tables.privileges - FROM mz_tables - JOIN mz_schemas - ON mz_tables.schema_id = mz_schemas.id - JOIN mz_databases - ON mz_schemas.database_id = mz_databases.id - JOIN mz_sources - ON mz_tables.source_id = mz_sources.id - JOIN mz_schemas AS source_schemas - ON mz_sources.schema_id = source_schemas.id - JOIN mz_databases AS source_databases - ON source_schemas.database_id = source_databases.id - JOIN mz_roles - ON mz_tables.owner_id = mz_roles.id - LEFT JOIN \( - SELECT id, comment - FROM mz_internal.mz_comments - WHERE object_type = 'table' - AND object_sub_id IS NULL - \) comments - ON mz_tables.id = comments.id` + mz_tables.id, + mz_tables.name, + mz_schemas.name AS schema_name, + mz_databases.name AS database_name, + mz_sources.name AS source_name, + source_schemas.name AS source_schema_name, + source_databases.name AS source_database_name, + mz_sources.type AS source_type, + COALESCE\(mz_kafka_source_tables.topic, + mz_mysql_source_tables.table_name, + mz_postgres_source_tables.table_name\) AS upstream_table_name, + COALESCE\(mz_mysql_source_tables.schema_name, + mz_postgres_source_tables.schema_name\) AS upstream_schema_name, + comments.comment AS comment, + mz_roles.name AS owner_name, + mz_tables.privileges + FROM mz_tables + JOIN mz_schemas + ON mz_tables.schema_id = mz_schemas.id + JOIN mz_databases + ON mz_schemas.database_id = mz_databases.id + JOIN mz_sources + ON mz_tables.source_id = mz_sources.id + JOIN mz_schemas AS source_schemas + ON mz_sources.schema_id = source_schemas.id + JOIN mz_databases AS source_databases + ON source_schemas.database_id = source_databases.id + LEFT JOIN mz_internal.mz_kafka_source_tables + ON mz_tables.id = mz_kafka_source_tables.id + LEFT JOIN mz_internal.mz_mysql_source_tables + ON mz_tables.id = mz_mysql_source_tables.id + LEFT JOIN mz_internal.mz_postgres_source_tables + ON mz_tables.id = mz_postgres_source_tables.id + JOIN mz_roles + ON mz_tables.owner_id = mz_roles.id + LEFT JOIN \( + SELECT id, comment + FROM mz_internal.mz_comments + WHERE object_type = 'table' + AND object_sub_id IS NULL + \) comments + ON mz_tables.id = comments.id` q := mockQueryBuilder(b, predicate, "") - ir := mock.NewRows([]string{"id", "name", "schema_name", "database_name", "source_name", "source_schema_name", "source_database_name", "source_type", "comment", "owner_name", "privileges"}). - AddRow("u1", "table", "schema", "database", "source", "public", "materialize", "KAFKA", "comment", "materialize", defaultPrivilege) + ir := mock.NewRows([]string{"id", "name", "schema_name", "database_name", "source_name", "source_schema_name", "source_database_name", "upstream_table_name", "upstream_schema_name", "source_type", "comment", "owner_name", "privileges"}). + AddRow("u1", "table", "schema", "database", "source", "public", "materialize", "table", "schema", "KAFKA", "comment", "materialize", defaultPrivilege) mock.ExpectQuery(q).WillReturnRows(ir) }