diff --git a/src/main/java/io/debezium/connector/vitess/VitessDatabaseSchema.java b/src/main/java/io/debezium/connector/vitess/VitessDatabaseSchema.java index b34996ff..a2083523 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessDatabaseSchema.java +++ b/src/main/java/io/debezium/connector/vitess/VitessDatabaseSchema.java @@ -17,7 +17,6 @@ import io.debezium.connector.binlog.charset.BinlogCharsetRegistry; import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; import io.debezium.connector.vitess.connection.DdlMessage; -import io.debezium.connector.vitess.jdbc.VitessBinlogValueConverter; import io.debezium.connector.vitess.jdbc.VitessDefaultValueConverter; import io.debezium.relational.HistorizedRelationalDatabaseSchema; import io.debezium.relational.Table; @@ -50,7 +49,7 @@ public VitessDatabaseSchema( config.getColumnFilter(), new TableSchemaBuilder( VitessValueConverter.getInstance(config), - new VitessDefaultValueConverter(VitessBinlogValueConverter.getInstance(config)), + new VitessDefaultValueConverter(VitessValueConverter.getInstance(config)), schemaNameAdjuster, config.customConverterRegistry(), config.getSourceInfoStructMaker().schema(), diff --git a/src/main/java/io/debezium/connector/vitess/VitessSnapshotChangeEventSource.java b/src/main/java/io/debezium/connector/vitess/VitessSnapshotChangeEventSource.java index be529074..4ff8343a 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessSnapshotChangeEventSource.java +++ b/src/main/java/io/debezium/connector/vitess/VitessSnapshotChangeEventSource.java @@ -59,7 +59,12 @@ public VitessSnapshotChangeEventSource( this.connectorConfig = connectorConfig; this.connection = connectionFactory.mainConnection(); this.schema = schema; - this.shards = new VitessMetadata(connectorConfig).getShards(); + if (connectorConfig.getVitessTaskKeyShards() == null) { + this.shards = new VitessMetadata(connectorConfig).getShards(); + } + else { + this.shards = connectorConfig.getVitessTaskKeyShards(); + } } @Override @@ -101,6 +106,7 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext, for (TableId tableId : capturedSchemaTables) { String sql = "SHOW CREATE TABLE " + quote(tableId); + // TODO: We need retries for this connection.query(sql, rs -> { if (rs.next()) { String ddlStatement = rs.getString(2); diff --git a/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java b/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java index 68905b23..ca1decf3 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java +++ b/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java @@ -124,9 +124,11 @@ else if (message.getOperation() == ReplicationMessage.Operation.DDL) { partition, offsetContext, ddlMessage, connectorConfig.getKeyspace()); for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) { - final TableId tableId = schemaChangeEvent.getTables().isEmpty() ? null : schemaChangeEvent.getTables().iterator().next().id(); - TableId tableIdWithShard = VitessDatabaseSchema.buildTableId(ddlMessage.getShard(), connectorConfig.getKeyspace(), tableId.table()); - dispatcher.dispatchSchemaChangeEvent(partition, offsetContext, tableIdWithShard, (receiver) -> { + TableId tableId = schemaChangeEvent.getTables().isEmpty() ? null : schemaChangeEvent.getTables().iterator().next().id(); + if (tableId != null) { + tableId = VitessDatabaseSchema.buildTableId(ddlMessage.getShard(), connectorConfig.getKeyspace(), tableId.table()); + } + dispatcher.dispatchSchemaChangeEvent(partition, offsetContext, tableId, (receiver) -> { try { receiver.schemaChangeEvent(schemaChangeEvent); } diff --git a/src/main/java/io/debezium/connector/vitess/jdbc/VitessDefaultValueConverter.java b/src/main/java/io/debezium/connector/vitess/jdbc/VitessDefaultValueConverter.java index a275e21c..2c609808 100644 --- a/src/main/java/io/debezium/connector/vitess/jdbc/VitessDefaultValueConverter.java +++ b/src/main/java/io/debezium/connector/vitess/jdbc/VitessDefaultValueConverter.java @@ -6,17 +6,36 @@ package io.debezium.connector.vitess.jdbc; -import io.debezium.connector.binlog.jdbc.BinlogDefaultValueConverter; -import io.debezium.connector.binlog.jdbc.BinlogValueConverters; +import java.util.Optional; + +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.SchemaBuilder; + +import io.debezium.connector.vitess.VitessValueConverter; +import io.debezium.relational.Column; +import io.debezium.relational.DefaultValueConverter; +import io.debezium.relational.ValueConverter; /** * Create a binlog default value converter to be passed into the {@link io.debezium.relational.TableSchemaBuilder} * in {@link io.debezium.connector.vitess.VitessDatabaseSchema} * @author Thomas Thornton */ -public class VitessDefaultValueConverter extends BinlogDefaultValueConverter { +public class VitessDefaultValueConverter implements DefaultValueConverter { + VitessValueConverter converter; + + public VitessDefaultValueConverter(VitessValueConverter converter) { + this.converter = converter; + } - public VitessDefaultValueConverter(BinlogValueConverters converters) { - super(converters); + @Override + public Optional parseDefaultValue(Column column, String defaultValueExpression) { + final SchemaBuilder schemaBuilder = converter.schemaBuilder(column); + if (schemaBuilder == null) { + return Optional.of(defaultValueExpression); + } + final Field field = new Field(column.name(), -1, schemaBuilder.build()); + final ValueConverter valueConverter = converter.converter(column, field); + return Optional.ofNullable(valueConverter.convert(defaultValueExpression)); } } diff --git a/src/test/java/io/debezium/connector/vitess/TestHelper.java b/src/test/java/io/debezium/connector/vitess/TestHelper.java index 8a83a82b..00afa4a1 100644 --- a/src/test/java/io/debezium/connector/vitess/TestHelper.java +++ b/src/test/java/io/debezium/connector/vitess/TestHelper.java @@ -93,17 +93,6 @@ public static Configuration.Builder defaultConfig() { return defaultConfig(false, false, 1, -1, -1, null, VitessConnectorConfig.SnapshotMode.NEVER, TEST_SHARD, null, null); } - public static String getKeyspaceTopicPrefix(boolean hasMultipleShards) { - String keyspace; - if (hasMultipleShards) { - keyspace = TEST_SHARDED_KEYSPACE; - } - else { - keyspace = TEST_UNSHARDED_KEYSPACE; - } - return String.join(".", TEST_SERVER, keyspace); - } - /** * Get the default configuration of the connector * @@ -166,6 +155,7 @@ public static Configuration.Builder defaultConfig(boolean hasMultipleShards, .with(VitessConnectorConfig.VTGATE_PORT, VTGATE_PORT) .with(VitessConnectorConfig.VTGATE_USER, USERNAME) .with(VitessConnectorConfig.VTGATE_PASSWORD, PASSWORD) + .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, false) .with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class) .with(EmbeddedEngineConfig.WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_MS, 5000) .with(VitessConnectorConfig.POLL_INTERVAL_MS, 100); diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java index d2a392a2..fddd40a1 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java +++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java @@ -17,7 +17,6 @@ import static io.debezium.connector.vitess.TestHelper.TEST_SHARD_TO_EPOCH; import static io.debezium.connector.vitess.TestHelper.TEST_UNSHARDED_KEYSPACE; import static io.debezium.connector.vitess.TestHelper.VGTID_JSON_TEMPLATE; -import static io.debezium.connector.vitess.TestHelper.getKeyspaceTopicPrefix; import static junit.framework.TestCase.assertEquals; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertFalse; @@ -728,7 +727,7 @@ public void shouldOffsetIncrementAfterDDL() throws Exception { // insert 1 row to get the initial vgtid int expectedRecordsCount = 1; - consumer = testConsumer(expectedRecordsCount, getKeyspaceTopicPrefix(false)); + consumer = testConsumer(expectedRecordsCount); SourceRecord sourceRecord = assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericTypes(), TestHelper.PK_FIELD); // apply DDL @@ -756,7 +755,7 @@ public void shouldSchemaUpdatedAfterOnlineDdl() throws Exception { startConnector(); assertConnectorIsRunning(); int expectedRecordsCount = 1; - consumer = testConsumer(expectedRecordsCount, getKeyspaceTopicPrefix(false)); + consumer = testConsumer(expectedRecordsCount); assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericTypes(), TestHelper.PK_FIELD); // Add a column using online ddl and wait until it is finished String ddlId = TestHelper.applyOnlineDdl("ALTER TABLE numeric_table ADD COLUMN foo INT", TEST_UNSHARDED_KEYSPACE); @@ -1942,7 +1941,7 @@ public void testCopyAndReplicateTable() throws Exception { // We should receive a record written before starting the connector. int expectedRecordsCount = 1; - consumer = testConsumer(expectedRecordsCount, TestHelper.getKeyspaceTopicPrefix(false)); + consumer = testConsumer(expectedRecordsCount); consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS); SourceRecord record = assertRecordInserted(topicNameFromInsertStmt(INSERT_NUMERIC_TYPES_STMT), TestHelper.PK_FIELD); assertSourceInfo(record, TEST_SERVER, TEST_UNSHARDED_KEYSPACE, "numeric_table"); @@ -1972,7 +1971,7 @@ public void testSnapshotForTableWithEnums() throws Exception { } // We should receive a record written before starting the connector. - consumer = testConsumer(totalRecordsCount, getKeyspaceTopicPrefix(false)); + consumer = testConsumer(totalRecordsCount); consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS); for (int i = 1; i <= totalRecordsCount; i++) { SourceRecord record = assertRecordInserted(topicNameFromInsertStmt(INSERT_ENUM_TYPE_STMT), TestHelper.PK_FIELD); @@ -2017,7 +2016,7 @@ public void testSnapshotForTableWithEnumsAmbiguous() throws Exception { } // We should receive a record written before starting the connector. - consumer = testConsumer(totalRecordsCount, getKeyspaceTopicPrefix(false)); + consumer = testConsumer(totalRecordsCount); consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS); for (int i = 1; i <= totalRecordsCount; i++) { SourceRecord record = assertRecordInserted(topicNameFromInsertStmt(INSERT_ENUM_AMBIGUOUS_TYPE_STMT), TestHelper.PK_FIELD); @@ -2056,7 +2055,7 @@ public void testVgtidIncludesLastPkDuringTableCopy() throws Exception { -1, -1, tableInclude, VitessConnectorConfig.SnapshotMode.INITIAL, TestHelper.TEST_SHARD); // We should receive a record written before starting the connector. - consumer = testConsumer(expectedSnapshotRecordsCount, getKeyspaceTopicPrefix(false)); + consumer = testConsumer(expectedSnapshotRecordsCount); consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS); for (int i = 1; i <= expectedSnapshotRecordsCount; i++) { SourceRecord record = assertRecordInserted(topicNameFromInsertStmt(INSERT_NUMERIC_TYPES_STMT), TestHelper.PK_FIELD); @@ -2182,7 +2181,7 @@ public void testCopyNoRecordsAndReplicateTable() throws Exception { startConnector(Function.identity(), false, false, 1, -1, -1, tableInclude, null, null); int expectedRecordsCount = 1; - consumer = testConsumer(expectedRecordsCount, getKeyspaceTopicPrefix(false)); + consumer = testConsumer(expectedRecordsCount); // We should receive record from numeric_table assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericTypes(), TestHelper.PK_FIELD); @@ -2200,7 +2199,7 @@ public void testInitialSnapshotModeHaveMultiShard() throws Exception { // We should receive a record written before starting the connector. int expectedRecordsCount = 1; - consumer = testConsumer(expectedRecordsCount, getKeyspaceTopicPrefix(hasMultipleShards)); + consumer = testConsumer(expectedRecordsCount); consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS); SourceRecord record = assertRecordInserted(topicNameFromInsertStmt(INSERT_NUMERIC_TYPES_STMT, TEST_SHARDED_KEYSPACE), TestHelper.PK_FIELD); assertSourceInfo(record, TEST_SERVER, TEST_SHARDED_KEYSPACE, "numeric_table"); @@ -2223,7 +2222,7 @@ public void testCopyTableAndRestart() throws Exception { // We should receive a record written before starting the connector. int expectedRecordsCount = 1; - consumer = testConsumer(expectedRecordsCount, getKeyspaceTopicPrefix(false)); + consumer = testConsumer(expectedRecordsCount); consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS); SourceRecord record = assertRecordInserted(topicNameFromInsertStmt(INSERT_NUMERIC_TYPES_STMT), TestHelper.PK_FIELD); assertSourceInfo(record, TEST_SERVER, TEST_UNSHARDED_KEYSPACE, "numeric_table"); @@ -2247,7 +2246,7 @@ public void testCopyAndReplicatePerTaskOffsetStorage() throws Exception { // We should receive a record written before starting the connector. int expectedRecordsCount = 1; - consumer = testConsumer(expectedRecordsCount, getKeyspaceTopicPrefix(false)); + consumer = testConsumer(expectedRecordsCount); consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS); SourceRecord record = assertRecordInserted(topicNameFromInsertStmt(INSERT_NUMERIC_TYPES_STMT), TestHelper.PK_FIELD); assertSourceInfo(record, TEST_SERVER, TEST_UNSHARDED_KEYSPACE, "numeric_table"); diff --git a/src/test/resources/vitess_create_tables.ddl b/src/test/resources/vitess_create_tables.ddl index 7411f043..84b48cd3 100644 --- a/src/test/resources/vitess_create_tables.ddl +++ b/src/test/resources/vitess_create_tables.ddl @@ -24,6 +24,7 @@ DROP TABLE IF EXISTS ddl_table; CREATE TABLE ddl_table ( id BIGINT NOT NULL AUTO_INCREMENT, + int_unsigned_col INT UNSIGNED DEFAULT 0, PRIMARY KEY (id) ) PARTITION BY RANGE (id) (