storage: Disable broken Kafka source tests (#31159)
Kafka sources do not work without a primary export; see
https://github.com/MaterializeInc/database-issues/issues/8909. This
commit therefore comments out the test that exercises Kafka sources
without a primary export.
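
For context, the disabled pattern creates a Kafka source with no FORMAT or
ENVELOPE clause on the source itself (so it has no primary export) and reads
the data through CREATE TABLE ... FROM SOURCE instead. A minimal sketch of
that shape, mirroring the statements in the diff below but with placeholder
connection, cluster, and topic names:

    -- Source declared without FORMAT/ENVELOPE, i.e. without a primary export.
    CREATE SOURCE my_kafka_source
      IN CLUSTER my_cluster
      FROM KAFKA CONNECTION kafka_conn (TOPIC 'my-topic');

    -- The data is exposed through a source-fed table instead.
    CREATE TABLE my_table FROM SOURCE my_kafka_source (REFERENCE "my-topic")
      FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
      ENVELOPE UPSERT;

It is this combination that currently fails per database-issues#8909, so the
corresponding testdrive block is commented out rather than deleted.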
jkosh44 authored Jan 23, 2025
1 parent b6c1b0a commit e0b652b
1 changed file: test/testdrive/force-source-tables.td (100 additions, 97 deletions)
@@ -368,97 +368,99 @@ contains:unknown catalog item 'mysql_table_3'
# Kafka source using source-fed tables
#

$ set keyschema={
"type": "record",
"name": "Key",
"fields": [
{"name": "key", "type": "string"}
]
}

$ set schema={
"type" : "record",
"name" : "test",
"fields" : [
{"name":"f1", "type":"string"},
{"name":"f2", "type":"long"}
]
}

> CREATE CONNECTION kafka_conn
TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
URL '${testdrive.schema-registry-url}'
);

$ kafka-create-topic topic=avroavro

$ kafka-ingest format=avro topic=avroavro key-format=avro key-schema=${keyschema} schema=${schema}
{"key": "fish"} {"f1": "fish", "f2": 1000}
{"key": "bird1"} {"f1":"goose", "f2": 1}
{"key": "birdmore"} {"f1":"geese", "f2": 2}
{"key": "mammal1"} {"f1": "moose", "f2": 1}
{"key": "bird1"}
{"key": "birdmore"} {"f1":"geese", "f2": 56}
{"key": "mammalmore"} {"f1": "moose", "f2": 42}
{"key": "mammal1"}
{"key": "mammalmore"} {"f1":"moose", "f2": 2}

> CREATE SOURCE avro_source
IN CLUSTER ${arg.single-replica-cluster}
FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avroavro-${testdrive.seed}');

> CREATE TABLE avro_table_upsert FROM SOURCE avro_source (REFERENCE "testdrive-avroavro-${testdrive.seed}")
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE UPSERT

> CREATE TABLE avro_table_append FROM SOURCE avro_source (REFERENCE "testdrive-avroavro-${testdrive.seed}")
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE NONE

> CREATE TABLE avro_table_append_cols (a, b) FROM SOURCE avro_source (REFERENCE "testdrive-avroavro-${testdrive.seed}")
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE NONE

> SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'avro_table_upsert';
running

> SELECT * from avro_table_upsert
key f1 f2
---------------------------
fish fish 1000
birdmore geese 56
mammalmore moose 2

> SELECT * from avro_table_append
f1 f2
---------------
fish 1000
geese 2
geese 56
goose 1
moose 1
moose 2
moose 42

> SELECT * from avro_table_append_cols
a b
---------------
fish 1000
geese 2
geese 56
goose 1
moose 1
moose 2
moose 42

> SHOW TABLES ON avro_source;
avro_table_append ""
avro_table_append_cols ""
avro_table_upsert ""

> DROP SOURCE avro_source CASCADE
# TODO(database-issues#8909): Re-enable when kafka sources work with force_source_table_syntax.

# $ set keyschema={
# "type": "record",
# "name": "Key",
# "fields": [
# {"name": "key", "type": "string"}
# ]
# }
#
# $ set schema={
# "type" : "record",
# "name" : "test",
# "fields" : [
# {"name":"f1", "type":"string"},
# {"name":"f2", "type":"long"}
# ]
# }
#
# > CREATE CONNECTION kafka_conn
# TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
#
# > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
# URL '${testdrive.schema-registry-url}'
# );
#
# $ kafka-create-topic topic=avroavro
#
# $ kafka-ingest format=avro topic=avroavro key-format=avro key-schema=${keyschema} schema=${schema}
# {"key": "fish"} {"f1": "fish", "f2": 1000}
# {"key": "bird1"} {"f1":"goose", "f2": 1}
# {"key": "birdmore"} {"f1":"geese", "f2": 2}
# {"key": "mammal1"} {"f1": "moose", "f2": 1}
# {"key": "bird1"}
# {"key": "birdmore"} {"f1":"geese", "f2": 56}
# {"key": "mammalmore"} {"f1": "moose", "f2": 42}
# {"key": "mammal1"}
# {"key": "mammalmore"} {"f1":"moose", "f2": 2}
#
# > CREATE SOURCE avro_source
# IN CLUSTER ${arg.single-replica-cluster}
# FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avroavro-${testdrive.seed}');
#
# > CREATE TABLE avro_table_upsert FROM SOURCE avro_source (REFERENCE "testdrive-avroavro-${testdrive.seed}")
# FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
# ENVELOPE UPSERT
#
# > CREATE TABLE avro_table_append FROM SOURCE avro_source (REFERENCE "testdrive-avroavro-${testdrive.seed}")
# FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
# ENVELOPE NONE
#
# > CREATE TABLE avro_table_append_cols (a, b) FROM SOURCE avro_source (REFERENCE "testdrive-avroavro-${testdrive.seed}")
# FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
# ENVELOPE NONE
#
# > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'avro_table_upsert';
# running
#
# > SELECT * from avro_table_upsert
# key f1 f2
# ---------------------------
# fish fish 1000
# birdmore geese 56
# mammalmore moose 2
#
# > SELECT * from avro_table_append
# f1 f2
# ---------------
# fish 1000
# geese 2
# geese 56
# goose 1
# moose 1
# moose 2
# moose 42
#
# > SELECT * from avro_table_append_cols
# a b
# ---------------
# fish 1000
# geese 2
# geese 56
# goose 1
# moose 1
# moose 2
# moose 42
#
# > SHOW TABLES ON avro_source;
# avro_table_append ""
# avro_table_append_cols ""
# avro_table_upsert ""
#
# > DROP SOURCE avro_source CASCADE

#
# Key-value load generator source using source-fed tables
@@ -533,9 +535,10 @@ contains:not supported; use CREATE TABLE .. FROM SOURCE instead
FOR SCHEMAS (public);
contains:not supported; use CREATE TABLE .. FROM SOURCE instead

! CREATE SOURCE avro_source_2
IN CLUSTER ${arg.single-replica-cluster}
FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avroavro-${testdrive.seed}')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE UPSERT;
contains:not supported; use CREATE TABLE .. FROM SOURCE instead
# TODO(database-issues#8909): Re-enable when kafka sources work with force_source_table_syntax.
#! CREATE SOURCE avro_source_2
# IN CLUSTER ${arg.single-replica-cluster}
# FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avroavro-${testdrive.seed}')
# FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
# ENVELOPE UPSERT;
#contains:not supported; use CREATE TABLE .. FROM SOURCE instead
