Skip to content

Commit

Permalink
Add Tests for INCLUDE KEY AS for kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
bobbyiliev committed Jan 12, 2024
1 parent 029fbc2 commit 5878301
Showing 1 changed file with 92 additions and 0 deletions.
92 changes: 92 additions & 0 deletions pkg/resources/resource_source_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,95 @@ func TestResourceSourceKafkaCreate(t *testing.T) {
}
})
}

func TestResourceSourceKafkaCreateIncludeTrueNoAlias(t *testing.T) {
r := require.New(t)

testInSourceKafka := inSourceKafka
testInSourceKafka["include_key"] = true
delete(testInSourceKafka, "include_key_alias")
testInSourceKafka["include_headers"] = true
delete(testInSourceKafka, "include_headers_alias")
testInSourceKafka["include_partition"] = true
delete(testInSourceKafka, "include_partition_alias")
testInSourceKafka["include_offset"] = true
delete(testInSourceKafka, "include_offset_alias")
testInSourceKafka["include_timestamp"] = true
delete(testInSourceKafka, "include_timestamp_alias")

d := schema.TestResourceDataRaw(t, SourceKafka().Schema, testInSourceKafka)
r.NotNil(d)

testhelpers.WithMockProviderMeta(t, func(db *utils.ProviderMeta, mock sqlmock.Sqlmock) {
// Create
mock.ExpectExec(
`CREATE SOURCE "database"."schema"."source"
IN CLUSTER "cluster" FROM KAFKA CONNECTION "materialize"."public"."kafka_conn" \(TOPIC 'topic', START TIMESTAMP -1000, START OFFSET \(1,2,3\)\)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION "database"."schema"."csr_conn" VALUE STRATEGY avro_key_fullname
INCLUDE KEY,
HEADERS,
PARTITION,
OFFSET,
TIMESTAMP
ENVELOPE UPSERT
WITH \(SIZE = 'small'\);`,
).WillReturnResult(sqlmock.NewResult(1, 1))

// Query Id
ip := `WHERE mz_databases.name = 'database' AND mz_schemas.name = 'schema' AND mz_sources.name = 'source'`
testhelpers.MockSourceScan(mock, ip)

// Query Params
pp := `WHERE mz_sources.id = 'u1'`
testhelpers.MockSourceScan(mock, pp)

// Query Subsources
ps := `WHERE mz_object_dependencies.object_id = 'u1' AND mz_objects.type = 'source'`
testhelpers.MockSubsourceScan(mock, ps)

if err := sourceKafkaCreate(context.TODO(), d, db); err != nil {
t.Fatal(err)
}
})
}

func TestResourceSourceKafkaCreateIncludeFalseWithAlias(t *testing.T) {
r := require.New(t)

testInSourceKafka := inSourceKafka
testInSourceKafka["include_key"] = false
testInSourceKafka["include_headers"] = false
testInSourceKafka["include_partition"] = false
testInSourceKafka["include_offset"] = false
testInSourceKafka["include_timestamp"] = false

d := schema.TestResourceDataRaw(t, SourceKafka().Schema, testInSourceKafka)
r.NotNil(d)

testhelpers.WithMockProviderMeta(t, func(db *utils.ProviderMeta, mock sqlmock.Sqlmock) {
// Create
mock.ExpectExec(
`CREATE SOURCE "database"."schema"."source"
IN CLUSTER "cluster" FROM KAFKA CONNECTION "materialize"."public"."kafka_conn" \(TOPIC 'topic', START TIMESTAMP -1000, START OFFSET \(1,2,3\)\)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION "database"."schema"."csr_conn" VALUE STRATEGY avro_key_fullname
ENVELOPE UPSERT
WITH \(SIZE = 'small'\);`,
).WillReturnResult(sqlmock.NewResult(1, 1))

// Query Id
ip := `WHERE mz_databases.name = 'database' AND mz_schemas.name = 'schema' AND mz_sources.name = 'source'`
testhelpers.MockSourceScan(mock, ip)

// Query Params
pp := `WHERE mz_sources.id = 'u1'`
testhelpers.MockSourceScan(mock, pp)

// Query Subsources
ps := `WHERE mz_object_dependencies.object_id = 'u1' AND mz_objects.type = 'source'`
testhelpers.MockSubsourceScan(mock, ps)

if err := sourceKafkaCreate(context.TODO(), d, db); err != nil {
t.Fatal(err)
}
})
}

0 comments on commit 5878301

Please sign in to comment.