From a4b4b7533d27336c3ab6c11d0d6be3ff094083c8 Mon Sep 17 00:00:00 2001 From: Dennis Hume Date: Thu, 9 Nov 2023 11:20:05 -0600 Subject: [PATCH] Prevent Name Leaking --- pkg/materialize/identifiers.go | 8 +--- ...nnection_confluent_schema_registry_test.go | 22 ++++++--- .../resource_connection_kafka_test.go | 2 +- .../resource_connection_postgres_test.go | 43 +++++++++++------ pkg/resources/resource_sink_kafka_test.go | 42 +++++++++++++---- pkg/resources/resource_source_kafka_test.go | 25 ++++++++-- .../resource_source_postgres_test.go | 46 +++++++++++-------- pkg/resources/schema.go | 6 ++- 8 files changed, 130 insertions(+), 64 deletions(-) diff --git a/pkg/materialize/identifiers.go b/pkg/materialize/identifiers.go index a4c69bfe..055e05ba 100644 --- a/pkg/materialize/identifiers.go +++ b/pkg/materialize/identifiers.go @@ -14,15 +14,11 @@ func GetIdentifierSchemaStruct(databaseName string, schemaName string, v interfa if v, ok := u["name"]; ok { conn.Name = v.(string) } - if v, ok := u["schema_name"]; ok && v.(string) != "" { + if v, ok := u["schema_name"]; ok { conn.SchemaName = v.(string) - } else { - conn.SchemaName = schemaName } - if v, ok := u["database_name"]; ok && v.(string) != "" { + if v, ok := u["database_name"]; ok { conn.DatabaseName = v.(string) - } else { - conn.DatabaseName = databaseName } return conn } diff --git a/pkg/resources/resource_connection_confluent_schema_registry_test.go b/pkg/resources/resource_connection_confluent_schema_registry_test.go index 1487868d..a1aed02d 100644 --- a/pkg/resources/resource_connection_confluent_schema_registry_test.go +++ b/pkg/resources/resource_connection_confluent_schema_registry_test.go @@ -20,11 +20,21 @@ var inConfluentSchemaRegistry = map[string]interface{}{ "url": "http://localhost:8081", "ssl_certificate_authority": []interface{}{map[string]interface{}{"secret": []interface{}{map[string]interface{}{"name": "ssl"}}}}, "ssl_certificate": []interface{}{map[string]interface{}{"secret": []interface{}{map[string]interface{}{"name": "ssl"}}}}, - "ssl_key": []interface{}{map[string]interface{}{"name": "ssl"}}, - "password": []interface{}{map[string]interface{}{"name": "password"}}, - "username": []interface{}{map[string]interface{}{"text": "user"}}, - "ssh_tunnel": []interface{}{map[string]interface{}{"name": "tunnel"}}, - "aws_privatelink": []interface{}{map[string]interface{}{"name": "privatelink"}}, + "ssl_key": []interface{}{ + map[string]interface{}{ + "name": "ssl", + "database_name": "ssl_key", + }, + }, + "password": []interface{}{map[string]interface{}{"name": "password"}}, + "username": []interface{}{map[string]interface{}{"text": "user"}}, + "ssh_tunnel": []interface{}{ + map[string]interface{}{ + "name": "tunnel", + "schema_name": "tunnel_schema", + }, + }, + "aws_privatelink": []interface{}{map[string]interface{}{"name": "privatelink"}}, } func TestResourceConnectionConfluentSchemaRegistryCreate(t *testing.T) { @@ -35,7 +45,7 @@ func TestResourceConnectionConfluentSchemaRegistryCreate(t *testing.T) { testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { // Create mock.ExpectExec( - `CREATE CONNECTION "database"."schema"."conn" TO CONFLUENT SCHEMA REGISTRY \(URL 'http://localhost:8081', USERNAME = 'user', PASSWORD = SECRET "database"."schema"."password", SSL CERTIFICATE AUTHORITY = SECRET "database"."schema"."ssl", SSL CERTIFICATE = SECRET "database"."schema"."ssl", SSL KEY = SECRET "database"."schema"."ssl", AWS PRIVATELINK "database"."schema"."privatelink", SSH TUNNEL "database"."schema"."tunnel"\)`, + `CREATE CONNECTION "database"."schema"."conn" TO CONFLUENT SCHEMA REGISTRY \(URL 'http://localhost:8081', USERNAME = 'user', PASSWORD = SECRET "materialize"."public"."password", SSL CERTIFICATE AUTHORITY = SECRET "materialize"."public"."ssl", SSL CERTIFICATE = SECRET "materialize"."public"."ssl", SSL KEY = SECRET "ssl_key"."public"."ssl", AWS PRIVATELINK "materialize"."public"."privatelink", SSH TUNNEL "materialize"."tunnel_schema"."tunnel"\)`, ).WillReturnResult(sqlmock.NewResult(1, 1)) // Query Id diff --git a/pkg/resources/resource_connection_kafka_test.go b/pkg/resources/resource_connection_kafka_test.go index 3eb06ee3..1d9c94e0 100644 --- a/pkg/resources/resource_connection_kafka_test.go +++ b/pkg/resources/resource_connection_kafka_test.go @@ -37,7 +37,7 @@ func TestResourceConnectionKafkaCreate(t *testing.T) { testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { // Create mock.ExpectExec( - `CREATE CONNECTION "database"."schema"."conn" TO KAFKA \(BROKERS \('b-1.hostname-1:9096' USING SSH TUNNEL "database"."schema"."tunnel"\), PROGRESS TOPIC 'topic', SSL CERTIFICATE AUTHORITY = 'key', SSL CERTIFICATE = SECRET "database"."schema"."cert", SSL KEY = SECRET "database"."schema"."key", SASL MECHANISMS = 'PLAIN', SASL USERNAME = 'username', SASL PASSWORD = SECRET "database"."schema"."password"\);`, + `CREATE CONNECTION "database"."schema"."conn" TO KAFKA \(BROKERS \('b-1.hostname-1:9096' USING SSH TUNNEL "materialize"."public"."tunnel"\), PROGRESS TOPIC 'topic', SSL CERTIFICATE AUTHORITY = 'key', SSL CERTIFICATE = SECRET "materialize"."public"."cert", SSL KEY = SECRET "materialize"."public"."key", SASL MECHANISMS = 'PLAIN', SASL USERNAME = 'username', SASL PASSWORD = SECRET "materialize"."public"."password"\);`, ).WillReturnResult(sqlmock.NewResult(1, 1)) // Query Id diff --git a/pkg/resources/resource_connection_postgres_test.go b/pkg/resources/resource_connection_postgres_test.go index ab085ff2..e2efd80c 100644 --- a/pkg/resources/resource_connection_postgres_test.go +++ b/pkg/resources/resource_connection_postgres_test.go @@ -13,20 +13,33 @@ import ( ) var inPostgres = map[string]interface{}{ - "name": "conn", - "schema_name": "schema", - "database_name": "database", - "database": "default", - "host": "postgres_host", - "port": 5432, - "user": []interface{}{map[string]interface{}{"secret": []interface{}{map[string]interface{}{"name": "user"}}}}, - "password": []interface{}{map[string]interface{}{"name": "password"}}, - "ssh_tunnel": []interface{}{map[string]interface{}{"name": "ssh_conn"}}, - "ssl_certificate_authority": []interface{}{map[string]interface{}{"secret": []interface{}{map[string]interface{}{"name": "root"}}}}, - "ssl_certificate": []interface{}{map[string]interface{}{"secret": []interface{}{map[string]interface{}{"name": "cert"}}}}, - "ssl_key": []interface{}{map[string]interface{}{"name": "key"}}, - "ssl_mode": "verify-full", - "aws_privatelink": []interface{}{map[string]interface{}{"name": "link"}}, + "name": "conn", + "schema_name": "schema", + "database_name": "database", + "database": "default", + "host": "postgres_host", + "port": 5432, + "user": []interface{}{map[string]interface{}{"secret": []interface{}{map[string]interface{}{"name": "user"}}}}, + "password": []interface{}{map[string]interface{}{"name": "password"}}, + "ssh_tunnel": []interface{}{ + map[string]interface{}{ + "name": "ssh_conn", + "schema_name": "tunnel_schema", + "database_name": "tunnel_database", + }, + }, + "ssl_certificate_authority": []interface{}{ + map[string]interface{}{ + "secret": []interface{}{map[string]interface{}{ + "name": "root", + "database_name": "ssl_database", + }}, + }, + }, + "ssl_certificate": []interface{}{map[string]interface{}{"secret": []interface{}{map[string]interface{}{"name": "cert"}}}}, + "ssl_key": []interface{}{map[string]interface{}{"name": "key"}}, + "ssl_mode": "verify-full", + "aws_privatelink": []interface{}{map[string]interface{}{"name": "link"}}, } func TestResourceConnectionPostgresCreate(t *testing.T) { @@ -37,7 +50,7 @@ func TestResourceConnectionPostgresCreate(t *testing.T) { testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { // Create mock.ExpectExec( - `CREATE CONNECTION "database"."schema"."conn" TO POSTGRES \(HOST 'postgres_host', PORT 5432, USER SECRET "database"."schema"."user", PASSWORD SECRET "database"."schema"."password", SSL MODE 'verify-full', SSH TUNNEL "database"."schema"."ssh_conn", SSL CERTIFICATE AUTHORITY SECRET "database"."schema"."root", SSL CERTIFICATE SECRET "database"."schema"."cert", SSL KEY SECRET "database"."schema"."key", AWS PRIVATELINK "database"."schema"."link", DATABASE 'default'\);`, + `CREATE CONNECTION "database"."schema"."conn" TO POSTGRES \(HOST 'postgres_host', PORT 5432, USER SECRET "materialize"."public"."user", PASSWORD SECRET "materialize"."public"."password", SSL MODE 'verify-full', SSH TUNNEL "tunnel_database"."tunnel_schema"."ssh_conn", SSL CERTIFICATE AUTHORITY SECRET "ssl_database"."public"."root", SSL CERTIFICATE SECRET "materialize"."public"."cert", SSL KEY SECRET "materialize"."public"."key", AWS PRIVATELINK "materialize"."public"."link", DATABASE 'default'\);`, ).WillReturnResult(sqlmock.NewResult(1, 1)) // Query Id diff --git a/pkg/resources/resource_sink_kafka_test.go b/pkg/resources/resource_sink_kafka_test.go index 2ee76f24..c26dee1f 100644 --- a/pkg/resources/resource_sink_kafka_test.go +++ b/pkg/resources/resource_sink_kafka_test.go @@ -13,18 +13,40 @@ import ( ) var inSinkKafka = map[string]interface{}{ - "name": "sink", - "schema_name": "schema", - "database_name": "database", - "cluster_name": "cluster", - "size": "small", - "from": []interface{}{map[string]interface{}{"name": "item", "schema_name": "public", "database_name": "database"}}, + "name": "sink", + "schema_name": "schema", + "database_name": "database", + "cluster_name": "cluster", + "size": "small", + "from": []interface{}{ + map[string]interface{}{ + "name": "item", + "schema_name": "public", + "database_name": "database", + }, + }, "kafka_connection": []interface{}{map[string]interface{}{"name": "kafka_conn"}}, "topic": "topic", "key": []interface{}{"key_1", "key_2"}, - "format": []interface{}{map[string]interface{}{"avro": []interface{}{map[string]interface{}{"avro_key_fullname": "avro_key_fullname", "avro_value_fullname": "avro_value_fullname", "schema_registry_connection": []interface{}{map[string]interface{}{"name": "csr_conn", "database_name": "database", "schema_name": "schema"}}}}}}, - "envelope": []interface{}{map[string]interface{}{"upsert": true}}, - "snapshot": false, + "format": []interface{}{ + map[string]interface{}{ + "avro": []interface{}{ + map[string]interface{}{ + "avro_key_fullname": "avro_key_fullname", + "avro_value_fullname": "avro_value_fullname", + "schema_registry_connection": []interface{}{ + map[string]interface{}{ + "name": "csr_conn", + "database_name": "database", + "schema_name": "schema", + }, + }, + }, + }, + }, + }, + "envelope": []interface{}{map[string]interface{}{"upsert": true}}, + "snapshot": false, } func TestResourceSinkKafkaCreate(t *testing.T) { @@ -35,7 +57,7 @@ func TestResourceSinkKafkaCreate(t *testing.T) { testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { // Create mock.ExpectExec( - `CREATE SINK "database"."schema"."sink" IN CLUSTER "cluster" FROM "database"."public"."item" INTO KAFKA CONNECTION "database"."schema"."kafka_conn" KEY \(key_1, key_2\) \(TOPIC 'topic'\) FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION "database"."schema"."csr_conn" WITH \(AVRO KEY FULLNAME 'avro_key_fullname' AVRO VALUE FULLNAME 'avro_value_fullname'\) ENVELOPE UPSERT WITH \( SIZE = 'small' SNAPSHOT = false\);`, + `CREATE SINK "database"."schema"."sink" IN CLUSTER "cluster" FROM "database"."public"."item" INTO KAFKA CONNECTION "materialize"."public"."kafka_conn" KEY \(key_1, key_2\) \(TOPIC 'topic'\) FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION "database"."schema"."csr_conn" WITH \(AVRO KEY FULLNAME 'avro_key_fullname' AVRO VALUE FULLNAME 'avro_value_fullname'\) ENVELOPE UPSERT WITH \( SIZE = 'small' SNAPSHOT = false\);`, ).WillReturnResult(sqlmock.NewResult(1, 1)) // Query Id diff --git a/pkg/resources/resource_source_kafka_test.go b/pkg/resources/resource_source_kafka_test.go index 0d836b31..7d43d04b 100644 --- a/pkg/resources/resource_source_kafka_test.go +++ b/pkg/resources/resource_source_kafka_test.go @@ -31,10 +31,25 @@ var inSourceKafka = map[string]interface{}{ "include_offset_alias": "offset", "include_timestamp": true, "include_timestamp_alias": "timestamp", - "format": []interface{}{map[string]interface{}{"avro": []interface{}{map[string]interface{}{"value_strategy": "avro_key_fullname", "schema_registry_connection": []interface{}{map[string]interface{}{"name": "csr_conn", "database_name": "database", "schema_name": "schema"}}}}}}, - "envelope": []interface{}{map[string]interface{}{"upsert": true}}, - "start_offset": []interface{}{1, 2, 3}, - "start_timestamp": -1000, + "format": []interface{}{ + map[string]interface{}{ + "avro": []interface{}{ + map[string]interface{}{ + "value_strategy": "avro_key_fullname", + "schema_registry_connection": []interface{}{ + map[string]interface{}{ + "name": "csr_conn", + "database_name": "database", + "schema_name": "schema", + }, + }, + }, + }, + }, + }, + "envelope": []interface{}{map[string]interface{}{"upsert": true}}, + "start_offset": []interface{}{1, 2, 3}, + "start_timestamp": -1000, } func TestResourceSourceKafkaCreate(t *testing.T) { @@ -45,7 +60,7 @@ func TestResourceSourceKafkaCreate(t *testing.T) { testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { // Create mock.ExpectExec( - `CREATE SOURCE "database"."schema"."source" IN CLUSTER "cluster" FROM KAFKA CONNECTION "database"."schema"."kafka_conn" \(TOPIC 'topic', START TIMESTAMP -1000\) FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION "database"."schema"."csr_conn" VALUE STRATEGY avro_key_fullname START OFFSET \[1, 2, 3\] INCLUDE KEY AS key, HEADERS AS headers, PARTITION AS partition, OFFSET AS offset, TIMESTAMP AS timestamp ENVELOPE UPSERT WITH \(SIZE = 'small'\);`, + `CREATE SOURCE "database"."schema"."source" IN CLUSTER "cluster" FROM KAFKA CONNECTION "materialize"."public"."kafka_conn" \(TOPIC 'topic', START TIMESTAMP -1000\) FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION "database"."schema"."csr_conn" VALUE STRATEGY avro_key_fullname START OFFSET \[1, 2, 3\] INCLUDE KEY AS key, HEADERS AS headers, PARTITION AS partition, OFFSET AS offset, TIMESTAMP AS timestamp ENVELOPE UPSERT WITH \(SIZE = 'small'\);`, ).WillReturnResult(sqlmock.NewResult(1, 1)) // Query Id diff --git a/pkg/resources/resource_source_postgres_test.go b/pkg/resources/resource_source_postgres_test.go index ab6a1955..246caa59 100644 --- a/pkg/resources/resource_source_postgres_test.go +++ b/pkg/resources/resource_source_postgres_test.go @@ -14,14 +14,18 @@ import ( ) var inSourcePostgresTable = map[string]interface{}{ - "name": "source", - "schema_name": "schema", - "database_name": "database", - "cluster_name": "cluster", - "size": "small", - "postgres_connection": []interface{}{map[string]interface{}{"name": "pg_connection"}}, - "publication": "mz_source", - "text_columns": []interface{}{"table.unsupported_type_1"}, + "name": "source", + "schema_name": "schema", + "database_name": "database", + "cluster_name": "cluster", + "size": "small", + "postgres_connection": []interface{}{ + map[string]interface{}{ + "name": "pg_connection", + }, + }, + "publication": "mz_source", + "text_columns": []interface{}{"table.unsupported_type_1"}, "table": []interface{}{ map[string]interface{}{"name": "name1", "alias": "alias"}, map[string]interface{}{"name": "name2"}, @@ -37,7 +41,7 @@ func TestResourceSourcePostgresCreateTable(t *testing.T) { testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { // Create mock.ExpectExec( - `CREATE SOURCE "database"."schema"."source" IN CLUSTER "cluster" FROM POSTGRES CONNECTION "database"."schema"."pg_connection" \(PUBLICATION 'mz_source', TEXT COLUMNS \(table.unsupported_type_1\)\) FOR TABLES \(name1 AS alias, name2 AS name2\) WITH \(SIZE = 'small'\);`, + `CREATE SOURCE "database"."schema"."source" IN CLUSTER "cluster" FROM POSTGRES CONNECTION "materialize"."public"."pg_connection" \(PUBLICATION 'mz_source', TEXT COLUMNS \(table.unsupported_type_1\)\) FOR TABLES \(name1 AS alias, name2 AS name2\) WITH \(SIZE = 'small'\);`, ).WillReturnResult(sqlmock.NewResult(1, 1)) // Query Id @@ -59,15 +63,19 @@ func TestResourceSourcePostgresCreateTable(t *testing.T) { } var inSourcePostgresSchema = map[string]interface{}{ - "name": "source", - "schema_name": "schema", - "database_name": "database", - "cluster_name": "cluster", - "size": "small", - "postgres_connection": []interface{}{map[string]interface{}{"name": "pg_connection"}}, - "publication": "mz_source", - "text_columns": []interface{}{"table.unsupported_type_1"}, - "schema": []interface{}{"schema1", "schema2"}, + "name": "source", + "schema_name": "schema", + "database_name": "database", + "cluster_name": "cluster", + "size": "small", + "postgres_connection": []interface{}{ + map[string]interface{}{ + "name": "pg_connection", + }, + }, + "publication": "mz_source", + "text_columns": []interface{}{"table.unsupported_type_1"}, + "schema": []interface{}{"schema1", "schema2"}, } func TestResourceSourcePostgresCreateSchema(t *testing.T) { @@ -78,7 +86,7 @@ func TestResourceSourcePostgresCreateSchema(t *testing.T) { testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) { // Create mock.ExpectExec( - `CREATE SOURCE "database"."schema"."source" IN CLUSTER "cluster" FROM POSTGRES CONNECTION "database"."schema"."pg_connection" \(PUBLICATION 'mz_source', TEXT COLUMNS \(table.unsupported_type_1\)\) FOR SCHEMAS \(schema1, schema2\) WITH \(SIZE = 'small'\);`, + `CREATE SOURCE "database"."schema"."source" IN CLUSTER "cluster" FROM POSTGRES CONNECTION "materialize"."public"."pg_connection" \(PUBLICATION 'mz_source', TEXT COLUMNS \(table.unsupported_type_1\)\) FOR SCHEMAS \(schema1, schema2\) WITH \(SIZE = 'small'\);`, ).WillReturnResult(sqlmock.NewResult(1, 1)) // Query Id diff --git a/pkg/resources/schema.go b/pkg/resources/schema.go index 74e3bc5b..0259ccbe 100644 --- a/pkg/resources/schema.go +++ b/pkg/resources/schema.go @@ -101,14 +101,16 @@ func IdentifierSchema(elem, description string, required bool) *schema.Schema { Required: true, }, "schema_name": { - Description: fmt.Sprintf("The %s schema name.", elem), + Description: fmt.Sprintf("The %s schema name. Defaults to `public`", elem), Type: schema.TypeString, Optional: true, + Default: defaultSchema, }, "database_name": { - Description: fmt.Sprintf("The %s database name.", elem), + Description: fmt.Sprintf("The %s database name. Defaults to `MZ_DATABASE` environment variable if set or `materialize` if environment variable is not set.", elem), Type: schema.TypeString, Optional: true, + DefaultFunc: schema.EnvDefaultFunc("MZ_DATABASE", defaultDatabase), }, }, },