diff --git a/pkg/materialize/source_table_kafka.go b/pkg/materialize/source_table_kafka.go index ce933225..aa5ed194 100644 --- a/pkg/materialize/source_table_kafka.go +++ b/pkg/materialize/source_table_kafka.go @@ -63,7 +63,7 @@ func SourceTableKafkaId(conn *sqlx.DB, obj MaterializeObject) (string, error) { } q := NewBaseQuery(sourceTableKafkaQuery).QueryPredicate(p) - var t SourceTableParams + var t SourceTableKafkaParams if err := conn.Get(&t, q); err != nil { return "", err } diff --git a/pkg/testhelpers/mock_scans.go b/pkg/testhelpers/mock_scans.go index 5ed85b9d..0607e431 100644 --- a/pkg/testhelpers/mock_scans.go +++ b/pkg/testhelpers/mock_scans.go @@ -876,6 +876,9 @@ func MockSourceTableKafkaScan(mock sqlmock.Sqlmock, predicate string) { source_schemas.name AS source_schema_name, source_databases.name AS source_database_name, mz_kafka_source_tables.topic AS upstream_table_name, + mz_kafka_source_tables.envelope_type, + mz_kafka_source_tables.key_format, + mz_kafka_source_tables.value_format, mz_sources.type AS source_type, comments.comment AS comment, mz_roles.name AS owner_name,