Skip to content

Commit

Permalink
DBZ-8325 Add vitess default value converter
Browse files Browse the repository at this point in the history
  • Loading branch information
twthorn committed Oct 29, 2024
1 parent 57c9c39 commit 0792f05
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.connector.vitess.connection.DdlMessage;
import io.debezium.connector.vitess.jdbc.VitessBinlogValueConverter;
import io.debezium.connector.vitess.jdbc.VitessDefaultValueConverter;
import io.debezium.relational.HistorizedRelationalDatabaseSchema;
import io.debezium.relational.Table;
Expand Down Expand Up @@ -50,7 +49,7 @@ public VitessDatabaseSchema(
config.getColumnFilter(),
new TableSchemaBuilder(
VitessValueConverter.getInstance(config),
new VitessDefaultValueConverter(VitessBinlogValueConverter.getInstance(config)),
new VitessDefaultValueConverter(VitessValueConverter.getInstance(config)),
schemaNameAdjuster,
config.customConverterRegistry(),
config.getSourceInfoStructMaker().schema(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ public VitessSnapshotChangeEventSource(
this.connectorConfig = connectorConfig;
this.connection = connectionFactory.mainConnection();
this.schema = schema;
this.shards = new VitessMetadata(connectorConfig).getShards();
if (connectorConfig.getVitessTaskKeyShards() == null) {
this.shards = new VitessMetadata(connectorConfig).getShards();
}
else {
this.shards = connectorConfig.getVitessTaskKeyShards();
}
}

@Override
Expand Down Expand Up @@ -101,6 +106,7 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext,

for (TableId tableId : capturedSchemaTables) {
String sql = "SHOW CREATE TABLE " + quote(tableId);
// TODO: We need retries for this
connection.query(sql, rs -> {
if (rs.next()) {
String ddlStatement = rs.getString(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,11 @@ else if (message.getOperation() == ReplicationMessage.Operation.DDL) {
partition, offsetContext, ddlMessage,
connectorConfig.getKeyspace());
for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) {
final TableId tableId = schemaChangeEvent.getTables().isEmpty() ? null : schemaChangeEvent.getTables().iterator().next().id();
TableId tableIdWithShard = VitessDatabaseSchema.buildTableId(ddlMessage.getShard(), connectorConfig.getKeyspace(), tableId.table());
dispatcher.dispatchSchemaChangeEvent(partition, offsetContext, tableIdWithShard, (receiver) -> {
TableId tableId = schemaChangeEvent.getTables().isEmpty() ? null : schemaChangeEvent.getTables().iterator().next().id();
if (tableId != null) {
tableId = VitessDatabaseSchema.buildTableId(ddlMessage.getShard(), connectorConfig.getKeyspace(), tableId.table());
}
dispatcher.dispatchSchemaChangeEvent(partition, offsetContext, tableId, (receiver) -> {
try {
receiver.schemaChangeEvent(schemaChangeEvent);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,36 @@

package io.debezium.connector.vitess.jdbc;

import io.debezium.connector.binlog.jdbc.BinlogDefaultValueConverter;
import io.debezium.connector.binlog.jdbc.BinlogValueConverters;
import java.util.Optional;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.SchemaBuilder;

import io.debezium.connector.vitess.VitessValueConverter;
import io.debezium.relational.Column;
import io.debezium.relational.DefaultValueConverter;
import io.debezium.relational.ValueConverter;

/**
* Create a binlog default value converter to be passed into the {@link io.debezium.relational.TableSchemaBuilder}
* in {@link io.debezium.connector.vitess.VitessDatabaseSchema}
* @author Thomas Thornton
*/
public class VitessDefaultValueConverter extends BinlogDefaultValueConverter {
public class VitessDefaultValueConverter implements DefaultValueConverter {
VitessValueConverter converter;

public VitessDefaultValueConverter(VitessValueConverter converter) {
this.converter = converter;
}

public VitessDefaultValueConverter(BinlogValueConverters converters) {
super(converters);
@Override
public Optional<Object> parseDefaultValue(Column column, String defaultValueExpression) {
final SchemaBuilder schemaBuilder = converter.schemaBuilder(column);
if (schemaBuilder == null) {
return Optional.of(defaultValueExpression);
}
final Field field = new Field(column.name(), -1, schemaBuilder.build());
final ValueConverter valueConverter = converter.converter(column, field);
return Optional.ofNullable(valueConverter.convert(defaultValueExpression));
}
}
12 changes: 1 addition & 11 deletions src/test/java/io/debezium/connector/vitess/TestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,6 @@ public static Configuration.Builder defaultConfig() {
return defaultConfig(false, false, 1, -1, -1, null, VitessConnectorConfig.SnapshotMode.NEVER, TEST_SHARD, null, null);
}

public static String getKeyspaceTopicPrefix(boolean hasMultipleShards) {
String keyspace;
if (hasMultipleShards) {
keyspace = TEST_SHARDED_KEYSPACE;
}
else {
keyspace = TEST_UNSHARDED_KEYSPACE;
}
return String.join(".", TEST_SERVER, keyspace);
}

/**
* Get the default configuration of the connector
*
Expand Down Expand Up @@ -166,6 +155,7 @@ public static Configuration.Builder defaultConfig(boolean hasMultipleShards,
.with(VitessConnectorConfig.VTGATE_PORT, VTGATE_PORT)
.with(VitessConnectorConfig.VTGATE_USER, USERNAME)
.with(VitessConnectorConfig.VTGATE_PASSWORD, PASSWORD)
.with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class)
.with(EmbeddedEngineConfig.WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_MS, 5000)
.with(VitessConnectorConfig.POLL_INTERVAL_MS, 100);
Expand Down
21 changes: 10 additions & 11 deletions src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import static io.debezium.connector.vitess.TestHelper.TEST_SHARD_TO_EPOCH;
import static io.debezium.connector.vitess.TestHelper.TEST_UNSHARDED_KEYSPACE;
import static io.debezium.connector.vitess.TestHelper.VGTID_JSON_TEMPLATE;
import static io.debezium.connector.vitess.TestHelper.getKeyspaceTopicPrefix;
import static junit.framework.TestCase.assertEquals;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -728,7 +727,7 @@ public void shouldOffsetIncrementAfterDDL() throws Exception {

// insert 1 row to get the initial vgtid
int expectedRecordsCount = 1;
consumer = testConsumer(expectedRecordsCount, getKeyspaceTopicPrefix(false));
consumer = testConsumer(expectedRecordsCount);
SourceRecord sourceRecord = assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericTypes(), TestHelper.PK_FIELD);

// apply DDL
Expand Down Expand Up @@ -756,7 +755,7 @@ public void shouldSchemaUpdatedAfterOnlineDdl() throws Exception {
startConnector();
assertConnectorIsRunning();
int expectedRecordsCount = 1;
consumer = testConsumer(expectedRecordsCount, getKeyspaceTopicPrefix(false));
consumer = testConsumer(expectedRecordsCount);
assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericTypes(), TestHelper.PK_FIELD);
// Add a column using online ddl and wait until it is finished
String ddlId = TestHelper.applyOnlineDdl("ALTER TABLE numeric_table ADD COLUMN foo INT", TEST_UNSHARDED_KEYSPACE);
Expand Down Expand Up @@ -1942,7 +1941,7 @@ public void testCopyAndReplicateTable() throws Exception {

// We should receive a record written before starting the connector.
int expectedRecordsCount = 1;
consumer = testConsumer(expectedRecordsCount, TestHelper.getKeyspaceTopicPrefix(false));
consumer = testConsumer(expectedRecordsCount);
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
SourceRecord record = assertRecordInserted(topicNameFromInsertStmt(INSERT_NUMERIC_TYPES_STMT), TestHelper.PK_FIELD);
assertSourceInfo(record, TEST_SERVER, TEST_UNSHARDED_KEYSPACE, "numeric_table");
Expand Down Expand Up @@ -1972,7 +1971,7 @@ public void testSnapshotForTableWithEnums() throws Exception {
}

// We should receive a record written before starting the connector.
consumer = testConsumer(totalRecordsCount, getKeyspaceTopicPrefix(false));
consumer = testConsumer(totalRecordsCount);
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
for (int i = 1; i <= totalRecordsCount; i++) {
SourceRecord record = assertRecordInserted(topicNameFromInsertStmt(INSERT_ENUM_TYPE_STMT), TestHelper.PK_FIELD);
Expand Down Expand Up @@ -2017,7 +2016,7 @@ public void testSnapshotForTableWithEnumsAmbiguous() throws Exception {
}

// We should receive a record written before starting the connector.
consumer = testConsumer(totalRecordsCount, getKeyspaceTopicPrefix(false));
consumer = testConsumer(totalRecordsCount);
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
for (int i = 1; i <= totalRecordsCount; i++) {
SourceRecord record = assertRecordInserted(topicNameFromInsertStmt(INSERT_ENUM_AMBIGUOUS_TYPE_STMT), TestHelper.PK_FIELD);
Expand Down Expand Up @@ -2056,7 +2055,7 @@ public void testVgtidIncludesLastPkDuringTableCopy() throws Exception {
-1, -1, tableInclude, VitessConnectorConfig.SnapshotMode.INITIAL, TestHelper.TEST_SHARD);

// We should receive a record written before starting the connector.
consumer = testConsumer(expectedSnapshotRecordsCount, getKeyspaceTopicPrefix(false));
consumer = testConsumer(expectedSnapshotRecordsCount);
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
for (int i = 1; i <= expectedSnapshotRecordsCount; i++) {
SourceRecord record = assertRecordInserted(topicNameFromInsertStmt(INSERT_NUMERIC_TYPES_STMT), TestHelper.PK_FIELD);
Expand Down Expand Up @@ -2182,7 +2181,7 @@ public void testCopyNoRecordsAndReplicateTable() throws Exception {
startConnector(Function.identity(), false, false, 1, -1, -1, tableInclude, null, null);

int expectedRecordsCount = 1;
consumer = testConsumer(expectedRecordsCount, getKeyspaceTopicPrefix(false));
consumer = testConsumer(expectedRecordsCount);

// We should receive record from numeric_table
assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericTypes(), TestHelper.PK_FIELD);
Expand All @@ -2200,7 +2199,7 @@ public void testInitialSnapshotModeHaveMultiShard() throws Exception {

// We should receive a record written before starting the connector.
int expectedRecordsCount = 1;
consumer = testConsumer(expectedRecordsCount, getKeyspaceTopicPrefix(hasMultipleShards));
consumer = testConsumer(expectedRecordsCount);
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
SourceRecord record = assertRecordInserted(topicNameFromInsertStmt(INSERT_NUMERIC_TYPES_STMT, TEST_SHARDED_KEYSPACE), TestHelper.PK_FIELD);
assertSourceInfo(record, TEST_SERVER, TEST_SHARDED_KEYSPACE, "numeric_table");
Expand All @@ -2223,7 +2222,7 @@ public void testCopyTableAndRestart() throws Exception {

// We should receive a record written before starting the connector.
int expectedRecordsCount = 1;
consumer = testConsumer(expectedRecordsCount, getKeyspaceTopicPrefix(false));
consumer = testConsumer(expectedRecordsCount);
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
SourceRecord record = assertRecordInserted(topicNameFromInsertStmt(INSERT_NUMERIC_TYPES_STMT), TestHelper.PK_FIELD);
assertSourceInfo(record, TEST_SERVER, TEST_UNSHARDED_KEYSPACE, "numeric_table");
Expand All @@ -2247,7 +2246,7 @@ public void testCopyAndReplicatePerTaskOffsetStorage() throws Exception {

// We should receive a record written before starting the connector.
int expectedRecordsCount = 1;
consumer = testConsumer(expectedRecordsCount, getKeyspaceTopicPrefix(false));
consumer = testConsumer(expectedRecordsCount);
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
SourceRecord record = assertRecordInserted(topicNameFromInsertStmt(INSERT_NUMERIC_TYPES_STMT), TestHelper.PK_FIELD);
assertSourceInfo(record, TEST_SERVER, TEST_UNSHARDED_KEYSPACE, "numeric_table");
Expand Down
1 change: 1 addition & 0 deletions src/test/resources/vitess_create_tables.ddl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ DROP TABLE IF EXISTS ddl_table;
CREATE TABLE ddl_table
(
id BIGINT NOT NULL AUTO_INCREMENT,
int_unsigned_col INT UNSIGNED DEFAULT 0,
PRIMARY KEY (id)
)
PARTITION BY RANGE (id) (
Expand Down

0 comments on commit 0792f05

Please sign in to comment.