Skip to content

Commit

Permalink
DBZ-6721 Add local vgtid transformation
Browse files Browse the repository at this point in the history
  • Loading branch information
twthorn committed May 28, 2024
1 parent 1caeb61 commit dd79a7c
Show file tree
Hide file tree
Showing 5 changed files with 325 additions and 6 deletions.
23 changes: 17 additions & 6 deletions src/main/java/io/debezium/connector/vitess/Vgtid.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.debezium.connector.vitess;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;

Expand Down Expand Up @@ -36,17 +37,24 @@ public class Vgtid {
@JsonIgnore
private final Binlogdata.VGtid rawVgtid;
private final List<ShardGtid> shardGtids = new ArrayList<>();
private final HashMap<String, ShardGtid> shardNameToShardGtid = new HashMap<>();

/**
 * Builds a {@code Vgtid} from its raw protobuf representation.
 * <p>
 * Every shard GTID found in {@code rawVgtid} is converted once and recorded both in the ordered
 * {@code shardGtids} list and in the {@code shardNameToShardGtid} lookup map.
 *
 * @param rawVgtid the raw VGTID whose shard GTIDs are copied into this instance
 */
private Vgtid(Binlogdata.VGtid rawVgtid) {
    this.rawVgtid = rawVgtid;
    for (Binlogdata.ShardGtid shardGtid : rawVgtid.getShardGtidsList()) {
        TablePrimaryKeys tablePrimaryKeys = new TablePrimaryKeys(shardGtid.getTablePKsList());
        ShardGtid currentShardGtid = new ShardGtid(
                shardGtid.getKeyspace(), shardGtid.getShard(), shardGtid.getGtid(), tablePrimaryKeys.getTableLastPrimaryKeys());
        // Add each shard exactly once so the list and the map always describe the same set of shards.
        shardGtids.add(currentShardGtid);
        // NOTE(review): the map is keyed by shard name only; this assumes shard names are unique
        // within one VGTID (i.e. a single keyspace) -- confirm before using with several keyspaces.
        shardNameToShardGtid.put(shardGtid.getShard(), currentShardGtid);
    }
}

private Vgtid(List<ShardGtid> shardGtids) {
this.shardGtids.addAll(shardGtids);
for (ShardGtid shardGtid : shardGtids) {
this.shardNameToShardGtid.put(shardGtid.shard, shardGtid);
}

Binlogdata.VGtid.Builder builder = Binlogdata.VGtid.newBuilder();
for (ShardGtid shardGtid : shardGtids) {
Expand Down Expand Up @@ -89,13 +97,16 @@ public List<ShardGtid> getShardGtids() {
return shardGtids;
}

/**
 * Creates a VGTID that contains only the GTID of a single shard of this VGTID.
 *
 * @param shard the name of the shard whose GTID should be kept
 * @return a new {@code Vgtid} holding exactly the shard GTID returned by {@link #getShardGtid(String)}
 */
public Vgtid getLocalVgtid(String shard) {
    ShardGtid localShardGtid = getShardGtid(shard);
    return Vgtid.of(List.of(localShardGtid));
}

public ShardGtid getShardGtid(String shard) {
for (ShardGtid shardGtid : shardGtids) {
if (shardGtid.shard.equals(shard)) {
return shardGtid;
}
ShardGtid shardGtid = shardNameToShardGtid.get(shard);
if (shardGtid == null) {
throw new DebeziumException("Gtid for shard missing, shard: " + shard + "vgtid: " + this.rawVgtid.toString());
}
throw new DebeziumException("Gtid for shard missing, shard: " + shard + "vgtid: " + this.rawVgtid.toString());
return shardGtid;
}

public boolean isSingleShard() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.debezium.connector.vitess.transforms;

import static io.debezium.connector.vitess.SourceInfo.SHARD_KEY;
import static io.debezium.connector.vitess.SourceInfo.VGTID_KEY;

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

import io.debezium.config.Configuration;
import io.debezium.connector.vitess.Module;
import io.debezium.connector.vitess.Vgtid;
import io.debezium.transforms.SmtManager;

/**
 * Single message transform that rewrites the {@code source} block of a record so that its VGTID
 * field only contains the GTID of the shard named in the same {@code source} block.
 * <p>
 * Records are returned unchanged when their value is not a {@link Struct}, or when no local VGTID
 * can be derived (no {@code source} struct, or no shard / VGTID field or value in it).
 *
 * @param <R> the concrete record type handled by the transformation
 */
public class UseLocalVgtid<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {

    /** Name of the field in the record value that carries the source metadata struct. */
    private static final String SOURCE_FIELD = "source";

    /** Full path of the only field whose value is replaced. */
    private static final String SOURCE_VGTID_PATH = SOURCE_FIELD + "." + VGTID_KEY;

    private SmtManager<R> smtManager;

    /**
     * Replaces {@code source.vgtid} with the local (single-shard) VGTID when one can be derived.
     *
     * @param record the record to transform
     * @return a new record carrying the rewritten value, or {@code record} itself when nothing
     *         had to be changed
     */
    @Override
    public R apply(R record) {
        if (!(record.value() instanceof Struct)) {
            return record;
        }
        Struct value = (Struct) record.value();

        String localVgtid = getLocalVgtid(value);
        if (localVgtid == null) {
            return record;
        }

        Schema schema = record.valueSchema();
        Struct updatedValue = modifyStruct("", value, schema, localVgtid);
        return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
                schema, updatedValue, record.timestamp());
    }

    /**
     * Derives the local VGTID from the {@code source} block of the given value.
     *
     * @param value the record value to inspect
     * @return the string form of the single-shard VGTID, or {@code null} when the value has no
     *         {@code source} struct or that struct has no shard / VGTID field or value
     */
    private String getLocalVgtid(Struct value) {
        Field sourceField = value.schema().field(SOURCE_FIELD);
        if (sourceField == null) {
            return null;
        }

        // The source block may be absent (null) or, for foreign records, not a struct at all.
        Object source = value.get(sourceField);
        if (!(source instanceof Struct)) {
            return null;
        }
        Struct sourceStruct = (Struct) source;

        // Struct#getString fails on unknown field names, so check the schema before reading.
        Schema sourceSchema = sourceStruct.schema();
        if (sourceSchema.field(SHARD_KEY) == null || sourceSchema.field(VGTID_KEY) == null) {
            return null;
        }

        String shard = sourceStruct.getString(SHARD_KEY);
        String vgtid = sourceStruct.getString(VGTID_KEY);
        if (shard == null || vgtid == null) {
            return null;
        }
        return Vgtid.of(vgtid).getLocalVgtid(shard).toString();
    }

    /**
     * Recursively copies {@code struct}, substituting {@code localVgtid} for the value of the
     * field whose full dotted path is exactly {@code source.<vgtid key>}. Fields with the same
     * name at any other path (for example a table column in {@code before} / {@code after}) are
     * copied as they are.
     *
     * @param previousPath dotted path of the enclosing struct, empty for the top-level value
     * @param struct the struct to copy
     * @param schema the schema used to create the copy
     * @param localVgtid the replacement VGTID value
     * @return the copied struct
     */
    private Struct modifyStruct(String previousPath, Struct struct, Schema schema, String localVgtid) {
        Struct updatedStruct = new Struct(schema);
        for (Field field : schema.fields()) {
            String fullName = previousPath.isEmpty() ? field.name() : previousPath + "." + field.name();
            Object fieldValue = struct.get(field);
            if (fieldValue instanceof Struct) {
                updatedStruct.put(field, modifyStruct(fullName, (Struct) fieldValue, field.schema(), localVgtid));
            }
            else if (SOURCE_VGTID_PATH.equals(fullName)) {
                updatedStruct.put(field, localVgtid);
            }
            else {
                updatedStruct.put(field, fieldValue);
            }
        }
        return updatedStruct;
    }

    /**
     * @return an empty definition; this transformation declares no configuration options
     */
    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    /** Nothing to release. */
    @Override
    public void close() {
    }

    /**
     * Creates the {@link SmtManager} from the supplied properties.
     *
     * @param props the transformation properties
     */
    @Override
    public void configure(Map<String, ?> props) {
        final Configuration config = Configuration.from(props);
        smtManager = new SmtManager<>(config);
    }

    /**
     * @return the version reported by {@link Module#version()}
     */
    @Override
    public String version() {
        return Module.version();
    }
}
7 changes: 7 additions & 0 deletions src/test/java/io/debezium/connector/vitess/VgtidTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,13 @@ public void shouldGetShardGtid() {
assertThat(shardGtid).isEqualTo(new Vgtid.ShardGtid(TEST_KEYSPACE, TEST_SHARD, TEST_GTID));
}

// A local VGTID must contain only the GTID of the requested shard.
@Test
public void shouldGetLocalVgtid() {
    Vgtid.ShardGtid expectedShardGtid = new Vgtid.ShardGtid(TEST_KEYSPACE, TEST_SHARD, TEST_GTID);
    Vgtid expectedLocalVgtid = Vgtid.of(List.of(expectedShardGtid));

    Vgtid localVgtid = Vgtid.of(VGTID_JSON).getLocalVgtid(TEST_SHARD);

    assertThat(localVgtid).isEqualTo(expectedLocalVgtid);
}

@Test
public void shouldGetMissingShardGtidThrowsDebeziumException() {
Vgtid vgtid1 = Vgtid.of(VGTID_JSON);
Expand Down
59 changes: 59 additions & 0 deletions src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.TableId;
import io.debezium.transforms.ExcludeTransactionComponents;
import io.debezium.util.Collect;
import io.debezium.util.Testing;

Expand Down Expand Up @@ -541,6 +542,64 @@ public void shouldProvideOrderedTransactionMetadata() throws Exception {
assertRecordEnd(expectedTxId1, expectedRecordsCount);
}

// Verifies that, with the UseLocalVgtid transform configured, every emitted change event carries
// a source VGTID that contains only the shard the event originated from.
@Test
public void shouldUseLocalVgtid() throws Exception {
    TestHelper.executeDDL("vitess_create_tables.ddl", TEST_SHARDED_KEYSPACE);
    TestHelper.applyVSchema("vitess_vschema.json");
    startConnector(config -> config
            .with(CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)
            .with("transforms", "useLocalVgtid")
            .with("transforms.useLocalVgtid.type", "io.debezium.connector.vitess.transforms.UseLocalVgtid"),
            true,
            "-80,80-");
    assertConnectorIsRunning();

    // NOTE(review): baseVgtid is never read in this test; it looks like a copy-paste leftover and
    // can probably be removed -- confirm that TestHelper.getCurrentVgtid() has no needed side effect.
    Vgtid baseVgtid = TestHelper.getCurrentVgtid();
    int expectedRecordsCount = 1;
    // Two extra records are consumed besides the inserts (the begin and end records asserted below).
    consumer = testConsumer(expectedRecordsCount + 2);

    String columns = String.join(",",
            "tinyint_col",
            "tinyint_unsigned_col",
            "smallint_col",
            "smallint_unsigned_col",
            "mediumint_col",
            "mediumint_unsigned_col",
            "int_col",
            "int_unsigned_col",
            "bigint_col",
            "bigint_unsigned_col",
            "bigint_unsigned_overflow_col",
            "float_col",
            "double_col",
            "decimal_col",
            "boolean_col");
    String rowValue = "(1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true)";

    // One multi-row INSERT so that all rows belong to a single transaction.
    StringBuilder insertRows = new StringBuilder()
            .append("INSERT INTO numeric_table (")
            .append(columns)
            .append(") VALUES ")
            .append(rowValue);
    for (int i = 1; i < expectedRecordsCount; i++) {
        insertRows.append(", ").append(rowValue);
    }

    // exercise SUT
    executeAndWait(insertRows.toString(), TEST_SHARDED_KEYSPACE);

    // First transaction.
    SourceRecord beginRecord = assertRecordBeginSourceRecord();
    String expectedTxId1 = ((Struct) beginRecord.value()).getString("id");
    for (int i = 1; i <= expectedRecordsCount; i++) {
        SourceRecord record = assertRecordInserted(TEST_SHARDED_KEYSPACE + ".numeric_table", TestHelper.PK_FIELD);
        Struct source = (Struct) ((Struct) record.value()).get("source");
        Vgtid sourceVgtid = Vgtid.of(source.getString("vgtid"));
        // We have two shards for multi-shard keyspace, a local vgtid should only have one shard
        assertThat(sourceVgtid.getShardGtids()).hasSize(1);
        assertThat(sourceVgtid.getShardGtids().get(0).getShard()).isEqualTo(source.getString("shard"));
    }
    assertRecordEnd(expectedTxId1, expectedRecordsCount);
}

@Test
public void shouldIncrementEpochWhenFastForwardVgtidWithOrderedTransactionMetadata() throws Exception {
TestHelper.executeDDL("vitess_create_tables.ddl", TEST_SHARDED_KEYSPACE);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.debezium.connector.vitess.transforms;

import static io.debezium.connector.vitess.SourceInfo.SHARD_KEY;
import static io.debezium.connector.vitess.SourceInfo.VGTID_KEY;
import static io.debezium.connector.vitess.VgtidTest.TEST_GTID;
import static io.debezium.connector.vitess.VgtidTest.TEST_KEYSPACE;
import static io.debezium.connector.vitess.VgtidTest.TEST_SHARD;
import static io.debezium.connector.vitess.VgtidTest.VGTID_JSON;
import static org.assertj.core.api.Assertions.assertThat;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Test;

import io.debezium.data.Envelope;
import io.debezium.schema.SchemaFactory;

/**
 * Unit tests for {@link UseLocalVgtid}: only the VGTID inside the {@code source} block may be
 * replaced by its single-shard form; everything else in the record value must stay as it was.
 */
public class UseLocalVgtidTest {

    public static final String LOCAL_VGTID_JSON_WITH_LAST_PK_TEMPLATE = "[" +
            "{\"keyspace\":\"%s\",\"shard\":\"%s\",\"gtid\":\"%s\",\"table_p_ks\":[]}" +
            "]";

    public static final String LOCAL_VGTID_JSON_WITH_LAST_PK = String.format(
            LOCAL_VGTID_JSON_WITH_LAST_PK_TEMPLATE,
            TEST_KEYSPACE,
            TEST_SHARD,
            TEST_GTID);

    // A table column that happens to share the VGTID field name must be left untouched.
    @Test
    public void shouldNotReplaceATableWithVgtidColumn() {
        assertOnlySourceVgtidIsLocalized(VGTID_KEY);
    }

    @Test
    public void shouldReplaceWithLocalVgtid() {
        assertOnlySourceVgtidIsLocalized("id");
    }

    // A value without a source block gives the transform nothing to rewrite.
    @Test
    public void shouldLeaveRecordWithoutSourceUnchanged() {
        Schema valueSchema = SchemaBuilder.struct()
                .field("id", Schema.STRING_SCHEMA)
                .build();
        Struct valueStruct = new Struct(valueSchema).put("id", "foo");
        SourceRecord sourceRecord = newSourceRecord(valueSchema, valueStruct);

        UseLocalVgtid<SourceRecord> useLocalVgtid = new UseLocalVgtid<>();
        SourceRecord result = useLocalVgtid.apply(sourceRecord);

        assertThat(result).isSameAs(sourceRecord);
    }

    /**
     * Applies the transform to a create event whose row has a single string column named
     * {@code rowColumnName} and checks that the result equals the same event with only the
     * source VGTID replaced by the local VGTID.
     */
    private static void assertOnlySourceVgtidIsLocalized(String rowColumnName) {
        Schema recordSchema = SchemaBuilder.struct()
                .field(rowColumnName, Schema.STRING_SCHEMA)
                .build();
        Schema sourceSchema = SchemaBuilder.struct()
                .field(VGTID_KEY, Schema.STRING_SCHEMA)
                .field(SHARD_KEY, Schema.STRING_SCHEMA)
                .build();

        Struct valueStruct = newValueStruct(recordSchema, sourceSchema, rowColumnName, VGTID_JSON);
        SourceRecord sourceRecord = newSourceRecord(valueStruct.schema(), valueStruct);

        UseLocalVgtid<SourceRecord> useLocalVgtid = new UseLocalVgtid<>();
        SourceRecord result = useLocalVgtid.apply(sourceRecord);

        // The expected value is built on a freshly built envelope schema, so the comparison
        // relies on value equality rather than on shared schema instances.
        Struct expectedValueStruct = newValueStruct(recordSchema, sourceSchema, rowColumnName, LOCAL_VGTID_JSON_WITH_LAST_PK);
        assertThat(result.value()).isEqualTo(expectedValueStruct);
    }

    /** Builds the value of a create event with the given source VGTID. */
    private static Struct newValueStruct(Schema recordSchema, Schema sourceSchema, String rowColumnName, String sourceVgtid) {
        Envelope envelope = SchemaFactory.get().datatypeEnvelopeSchema()
                .withRecord(recordSchema)
                .withSource(sourceSchema)
                .build();
        return new Struct(envelope.schema())
                .put("before", new Struct(recordSchema).put(rowColumnName, "foo"))
                .put("after", new Struct(recordSchema).put(rowColumnName, "foo"))
                .put("op", "c")
                .put("source", new Struct(sourceSchema)
                        .put(VGTID_KEY, sourceVgtid)
                        .put(SHARD_KEY, TEST_SHARD));
    }

    /** Wraps a value in a key-less source record on a fixed topic and partition. */
    private static SourceRecord newSourceRecord(Schema valueSchema, Struct valueStruct) {
        return new SourceRecord(
                null,
                null,
                "topic",
                0,
                null,
                null,
                valueSchema,
                valueStruct,
                null);
    }
}

0 comments on commit dd79a7c

Please sign in to comment.