Skip to content

Commit

Permalink
Extend data source to include upstream names
Browse files Browse the repository at this point in the history
  • Loading branch information
bobbyiliev committed Sep 24, 2024
1 parent 9b96939 commit e82ea93
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 68 deletions.
5 changes: 2 additions & 3 deletions pkg/datasources/datasource_source_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ func TestSourceTableDatasource(t *testing.T) {
r.Equal("schema", table["schema_name"])
r.Equal("database", table["database_name"])
r.Equal("KAFKA", table["source_type"])
// TODO: Update once upstream_name and upstream_schema_name are supported
r.Equal("", table["upstream_name"])
r.Equal("", table["upstream_schema_name"])
r.Equal("table", table["upstream_name"])
r.Equal("schema", table["upstream_schema_name"])
r.Equal("comment", table["comment"])
r.Equal("materialize", table["owner_name"])

Expand Down
75 changes: 43 additions & 32 deletions pkg/materialize/source_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,38 +27,49 @@ type SourceTableParams struct {
}

var sourceTableQuery = NewBaseQuery(`
SELECT
mz_tables.id,
mz_tables.name,
mz_schemas.name AS schema_name,
mz_databases.name AS database_name,
mz_sources.name AS source_name,
source_schemas.name AS source_schema_name,
source_databases.name AS source_database_name,
mz_sources.type AS source_type,
comments.comment AS comment,
mz_roles.name AS owner_name,
mz_tables.privileges
FROM mz_tables
JOIN mz_schemas
ON mz_tables.schema_id = mz_schemas.id
JOIN mz_databases
ON mz_schemas.database_id = mz_databases.id
JOIN mz_sources
ON mz_tables.source_id = mz_sources.id
JOIN mz_schemas AS source_schemas
ON mz_sources.schema_id = source_schemas.id
JOIN mz_databases AS source_databases
ON source_schemas.database_id = source_databases.id
JOIN mz_roles
ON mz_tables.owner_id = mz_roles.id
LEFT JOIN (
SELECT id, comment
FROM mz_internal.mz_comments
WHERE object_type = 'table'
AND object_sub_id IS NULL
) comments
ON mz_tables.id = comments.id
SELECT
mz_tables.id,
mz_tables.name,
mz_schemas.name AS schema_name,
mz_databases.name AS database_name,
mz_sources.name AS source_name,
source_schemas.name AS source_schema_name,
source_databases.name AS source_database_name,
mz_sources.type AS source_type,
COALESCE(mz_kafka_source_tables.topic,
mz_mysql_source_tables.table_name,
mz_postgres_source_tables.table_name) AS upstream_table_name,
COALESCE(mz_mysql_source_tables.schema_name,
mz_postgres_source_tables.schema_name) AS upstream_schema_name,
comments.comment AS comment,
mz_roles.name AS owner_name,
mz_tables.privileges
FROM mz_tables
JOIN mz_schemas
ON mz_tables.schema_id = mz_schemas.id
JOIN mz_databases
ON mz_schemas.database_id = mz_databases.id
JOIN mz_sources
ON mz_tables.source_id = mz_sources.id
JOIN mz_schemas AS source_schemas
ON mz_sources.schema_id = source_schemas.id
JOIN mz_databases AS source_databases
ON source_schemas.database_id = source_databases.id
LEFT JOIN mz_internal.mz_kafka_source_tables
ON mz_tables.id = mz_kafka_source_tables.id
LEFT JOIN mz_internal.mz_mysql_source_tables
ON mz_tables.id = mz_mysql_source_tables.id
LEFT JOIN mz_internal.mz_postgres_source_tables
ON mz_tables.id = mz_postgres_source_tables.id
JOIN mz_roles
ON mz_tables.owner_id = mz_roles.id
LEFT JOIN (
SELECT id, comment
FROM mz_internal.mz_comments
WHERE object_type = 'table'
AND object_sub_id IS NULL
) comments
ON mz_tables.id = comments.id
`)

func SourceTableId(conn *sqlx.DB, obj MaterializeObject) (string, error) {
Expand Down
77 changes: 44 additions & 33 deletions pkg/testhelpers/mock_scans.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,41 +912,52 @@ func MockSourceTableKafkaScan(mock sqlmock.Sqlmock, predicate string) {
func MockSourceTableScan(mock sqlmock.Sqlmock, predicate string) {
b := `
SELECT
mz_tables.id,
mz_tables.name,
mz_schemas.name AS schema_name,
mz_databases.name AS database_name,
mz_sources.name AS source_name,
source_schemas.name AS source_schema_name,
source_databases.name AS source_database_name,
mz_sources.type AS source_type,
comments.comment AS comment,
mz_roles.name AS owner_name,
mz_tables.privileges
FROM mz_tables
JOIN mz_schemas
ON mz_tables.schema_id = mz_schemas.id
JOIN mz_databases
ON mz_schemas.database_id = mz_databases.id
JOIN mz_sources
ON mz_tables.source_id = mz_sources.id
JOIN mz_schemas AS source_schemas
ON mz_sources.schema_id = source_schemas.id
JOIN mz_databases AS source_databases
ON source_schemas.database_id = source_databases.id
JOIN mz_roles
ON mz_tables.owner_id = mz_roles.id
LEFT JOIN \(
SELECT id, comment
FROM mz_internal.mz_comments
WHERE object_type = 'table'
AND object_sub_id IS NULL
\) comments
ON mz_tables.id = comments.id`
mz_tables.id,
mz_tables.name,
mz_schemas.name AS schema_name,
mz_databases.name AS database_name,
mz_sources.name AS source_name,
source_schemas.name AS source_schema_name,
source_databases.name AS source_database_name,
mz_sources.type AS source_type,
COALESCE\(mz_kafka_source_tables.topic,
mz_mysql_source_tables.table_name,
mz_postgres_source_tables.table_name\) AS upstream_table_name,
COALESCE\(mz_mysql_source_tables.schema_name,
mz_postgres_source_tables.schema_name\) AS upstream_schema_name,
comments.comment AS comment,
mz_roles.name AS owner_name,
mz_tables.privileges
FROM mz_tables
JOIN mz_schemas
ON mz_tables.schema_id = mz_schemas.id
JOIN mz_databases
ON mz_schemas.database_id = mz_databases.id
JOIN mz_sources
ON mz_tables.source_id = mz_sources.id
JOIN mz_schemas AS source_schemas
ON mz_sources.schema_id = source_schemas.id
JOIN mz_databases AS source_databases
ON source_schemas.database_id = source_databases.id
LEFT JOIN mz_internal.mz_kafka_source_tables
ON mz_tables.id = mz_kafka_source_tables.id
LEFT JOIN mz_internal.mz_mysql_source_tables
ON mz_tables.id = mz_mysql_source_tables.id
LEFT JOIN mz_internal.mz_postgres_source_tables
ON mz_tables.id = mz_postgres_source_tables.id
JOIN mz_roles
ON mz_tables.owner_id = mz_roles.id
LEFT JOIN \(
SELECT id, comment
FROM mz_internal.mz_comments
WHERE object_type = 'table'
AND object_sub_id IS NULL
\) comments
ON mz_tables.id = comments.id`

q := mockQueryBuilder(b, predicate, "")
ir := mock.NewRows([]string{"id", "name", "schema_name", "database_name", "source_name", "source_schema_name", "source_database_name", "source_type", "comment", "owner_name", "privileges"}).
AddRow("u1", "table", "schema", "database", "source", "public", "materialize", "KAFKA", "comment", "materialize", defaultPrivilege)
ir := mock.NewRows([]string{"id", "name", "schema_name", "database_name", "source_name", "source_schema_name", "source_database_name", "upstream_table_name", "upstream_schema_name", "source_type", "comment", "owner_name", "privileges"}).
AddRow("u1", "table", "schema", "database", "source", "public", "materialize", "table", "schema", "KAFKA", "comment", "materialize", defaultPrivilege)
mock.ExpectQuery(q).WillReturnRows(ir)
}

Expand Down

0 comments on commit e82ea93

Please sign in to comment.