Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-8541 Handle partially overridden transactions #220

Merged
merged 2 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ protected ChangeEventSourceCoordinator<VitessPartition, VitessOffsetContext> sta
LOGGER.info("No previous offset found");
}
else {
LOGGER.info("Found previous offset {}", previousOffset);
LOGGER.info("Found task {} previous offset {}", config.getString(VitessConnectorConfig.TASK_ID), previousOffset);
}

replicationConnection = new VitessReplicationConnection(connectorConfig, schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,7 @@ private void parseGtid(String transactionId) {
}
}

public boolean isHostSetEqual(Gtid hosts) {
return this.hosts.equals(hosts.hosts);
}

public boolean isHostSetSupersetOf(Gtid previousHosts) {
return this.hosts.containsAll(previousHosts.hosts);
}

public boolean isHostSetSubsetOf(Gtid previousHosts) {
return previousHosts.hosts.containsAll(this.hosts);
public boolean isHostSetSupersetOf(Gtid otherHosts) {
return this.hosts.containsAll(otherHosts.hosts);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ public class VitessEpochProvider {

private static final Logger LOGGER = LoggerFactory.getLogger(VitessEpochProvider.class);
private ShardEpochMap shardEpochMap;
private boolean isFirstTransaction = true;
private boolean isInheritEpochEnabled = false;

public VitessEpochProvider() {
Expand All @@ -33,36 +32,54 @@ public VitessEpochProvider(ShardEpochMap shardToEpoch, boolean isInheritEpochEna
this.isInheritEpochEnabled = isInheritEpochEnabled;
}

private static boolean isInvalidGtid(String gtid) {
private static boolean isGtidOverridden(String gtid) {
return gtid.equals(Vgtid.CURRENT_GTID) || gtid.equals(Vgtid.EMPTY_GTID);
}

public static Long getEpochForGtid(Long previousEpoch, String previousGtidString, String gtidString, boolean isFirstTransaction) {
if (isFirstTransaction && isInvalidGtid(previousGtidString)) {
private static boolean isStandardGtid(String gtid) {
return !isGtidOverridden(gtid);
}

public static Long getEpochForGtid(Long previousEpoch, String previousGtidString, String gtidString) {
if (isGtidOverridden(previousGtidString) && isGtidOverridden(gtidString)) {
// GTID was overridden, and the current GTID is an overridden value, still waiting for first transaction
return previousEpoch;
}
else if (isGtidOverridden(previousGtidString) && !isGtidOverridden(gtidString)) {
// GTID was overridden, received first transaction, increment epoch
LOGGER.info("Incrementing epoch: {}", getLogMessageForGtid(previousEpoch, previousGtidString, gtidString));
return previousEpoch + 1;
}
else if (isInvalidGtid(previousGtidString)) {
throw new DebeziumException("Invalid GTID: The previous GTID cannot be one of current or empty after the first transaction " + gtidString);
else if (isStandardGtid(previousGtidString) && isGtidOverridden(gtidString)) {
// previous GTID is standard, current GTID is overridden, should not be possible, raise exception
String message = String.format("Current GTID cannot be override value if previous is standard: %s",
getLogMessageForGtid(previousEpoch, previousGtidString, gtidString));
LOGGER.error(message);
throw new DebeziumException(message);
}
if (isInvalidGtid(gtidString)) {
throw new DebeziumException("Invalid GTID: The current GTID cannot be one of current or empty " + gtidString);
else {
// Both GTIDs are standard so parse them
return getEpochForStandardGtid(previousEpoch, previousGtidString, gtidString);
}
}

private static Long getEpochForStandardGtid(Long previousEpoch, String previousGtidString, String gtidString) {
Gtid previousGtid = new Gtid(previousGtidString);
Gtid gtid = new Gtid(gtidString);
if (previousGtid.isHostSetEqual(gtid) || gtid.isHostSetSupersetOf(previousGtid)) {
if (gtid.isHostSetSupersetOf(previousGtid)) {
return previousEpoch;
}
else if (gtid.isHostSetSubsetOf(previousGtid)) {
return previousEpoch + 1;
}
else {
LOGGER.error(
"Error determining epoch, previous host set: {}, host set: {}",
previousGtid, gtid);
throw new RuntimeException("Can't determine epoch");
// Any other case (disjoint set, previous is a superset), VStream has interpreted the previous GTID correctly and sent some new GTID
// in a continuous stream, so simply increment the epoch
return previousEpoch + 1;
}
}

private static String getLogMessageForGtid(Long previousEpoch, String previousGtidString, String gtidString) {
return String.format("GTID: %s, previous GTID: %s, previous Epoch: %s", gtidString, previousGtidString, previousEpoch);
}

public ShardEpochMap getShardEpochMap() {
return shardEpochMap;
}
Expand Down Expand Up @@ -105,16 +122,19 @@ public Long getEpoch(String shard, String previousVgtidString, String vgtidStrin
if (previousVgtidString == null) {
throw new DebeziumException(String.format("Previous vgtid string cannot be null shard %s current %s", shard, vgtidString));
}
Vgtid vgtid = Vgtid.of(vgtidString);
Vgtid previousVgtid = Vgtid.of(previousVgtidString);
this.shardEpochMap = getNewShardEpochMap(previousVgtid, vgtid);
if (isFirstTransaction) {
isFirstTransaction = false;
try {
Vgtid vgtid = Vgtid.of(vgtidString);
Vgtid previousVgtid = Vgtid.of(previousVgtidString);
this.shardEpochMap = getNewShardEpochMap(previousVgtid, vgtid, shard);
return shardEpochMap.get(shard);
}
catch (Exception e) {
LOGGER.error("Error providing epoch with shard {}, previousVgtid {}, vgtid {}", shard, previousVgtidString, vgtidString, e);
throw e;
}
return shardEpochMap.get(shard);
}

private ShardEpochMap getNewShardEpochMap(Vgtid previousVgtid, Vgtid vgtid) {
private ShardEpochMap getNewShardEpochMap(Vgtid previousVgtid, Vgtid vgtid, String transactionShard) {
ShardEpochMap newShardEpochMap = new ShardEpochMap();
for (Vgtid.ShardGtid shardGtid : vgtid.getShardGtids()) {
String shard = shardGtid.getShard();
Expand All @@ -129,7 +149,7 @@ private ShardEpochMap getNewShardEpochMap(Vgtid previousVgtid, Vgtid vgtid) {
"Previous epoch cannot be null for shard %s when shard present in previous vgtid %s",
shard, previousVgtid));
}
Long epoch = getEpochForGtid(previousEpoch, previousGtid, gtid, isFirstTransaction);
Long epoch = getEpochForGtid(previousEpoch, previousGtid, gtid);
newShardEpochMap.put(shard, epoch);
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,53 @@

public class GtidTest {

private static final String EXPECTED_VERSION = "MySQL56";
private static final String HOST_SET1 = "/host1:1,host2:2-10";
private static final String GTID1 = EXPECTED_VERSION + HOST_SET1;

@Test
public void shouldInit() {
String expectedVersion = "MySQL56";
Gtid gtid = new Gtid(expectedVersion + "/host1:1-4,host2:2-10");
assertThat(gtid.getVersion()).isEqualTo(expectedVersion);
Gtid gtid = new Gtid(EXPECTED_VERSION + "/host1:1-4,host2:2-10");
assertThat(gtid.getVersion()).isEqualTo(EXPECTED_VERSION);
assertThat(gtid.getSequenceValues()).isEqualTo(List.of("4", "10"));
assertThat(gtid.getHosts()).isEqualTo(Set.of("host1", "host2"));
}

@Test
public void shouldHandleSingleValue() {
String expectedVersion = "MySQL56";
Gtid gtid = new Gtid(expectedVersion + "/host1:1,host2:2-10");
assertThat(gtid.getVersion()).isEqualTo(expectedVersion);
Gtid gtid = new Gtid(GTID1);
assertThat(gtid.getVersion()).isEqualTo(EXPECTED_VERSION);
assertThat(gtid.getSequenceValues()).isEqualTo(List.of("1", "10"));
assertThat(gtid.getHosts()).isEqualTo(Set.of("host1", "host2"));
}

@Test
public void testHostSupersetWithLargerSet() {
Gtid gtid = new Gtid(GTID1);
Gtid gtidSuperset = new Gtid(EXPECTED_VERSION + "/host1:1,host2:2-10,host3:1-5");
assertThat(gtidSuperset.isHostSetSupersetOf(gtid)).isTrue();
assertThat(gtid.isHostSetSupersetOf(gtidSuperset)).isFalse();
}

@Test
public void testHostSupersetWithEqualSet() {
Gtid gtid = new Gtid(GTID1);
Gtid gtid2 = new Gtid(GTID1);
assertThat(gtid.isHostSetSupersetOf(gtid2)).isTrue();
assertThat(gtid2.isHostSetSupersetOf(gtid)).isTrue();
}

@Test
public void shouldThrowExceptionOnEmptyStringWithPrefix() {
String expectedVersion = "MySQL56";
assertThatThrownBy(() -> {
Gtid gtid = new Gtid(expectedVersion + "/");
Gtid gtid = new Gtid(EXPECTED_VERSION + "/");
}).isInstanceOf(DebeziumException.class);
}

@Test
public void shouldThrowExceptionOnVersionOnly() {
String expectedVersion = "MySQL56";
assertThatThrownBy(() -> {
Gtid gtid = new Gtid(expectedVersion);
Gtid gtid = new Gtid(EXPECTED_VERSION);
}).isInstanceOf(DebeziumException.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.List;
import java.util.Map;

import org.assertj.core.api.Assertions;
import org.junit.Test;

import io.debezium.DebeziumException;
Expand All @@ -44,6 +43,7 @@ public class VitessEpochProviderTest {
private String txIdVersion8 = "MySQL82/" + String.join(",", host1Tx2);

private List<String> shards = List.of(VgtidTest.TEST_SHARD, VgtidTest.TEST_SHARD2);
private String errorOnCurrentOverrideValue = "Current GTID cannot be override value if previous is standard";

String vgtidJsonCurrent = String.format(
VGTID_JSON_TEMPLATE,
Expand All @@ -56,7 +56,7 @@ public class VitessEpochProviderTest {

@Test
public void testGetEpochSameHostSet() {
Long epoch = VitessEpochProvider.getEpochForGtid(0L, previousTxId, txId, false);
Long epoch = VitessEpochProvider.getEpochForGtid(0L, previousTxId, txId);
assertThat(epoch).isEqualTo(0);
}

Expand Down Expand Up @@ -136,7 +136,9 @@ public void fastForwardVgtidIncrementsEpoch() {
public void currentVgtidIncrementsEpochForAllShards() {
VitessEpochProvider provider = new VitessEpochProvider(ShardEpochMap.init(shards), false);
Long epochShard1 = provider.getEpoch(VgtidTest.TEST_SHARD, vgtidJsonCurrent, VGTID_JSON);
assertThat(provider.getShardEpochMap()).isEqualTo(new ShardEpochMap(Map.of(VgtidTest.TEST_SHARD, 1L, VgtidTest.TEST_SHARD2, 1L)));
Long epochShard2 = provider.getEpoch(VgtidTest.TEST_SHARD2, VGTID_JSON, VGTID_JSON);
assertThat(provider.getShardEpochMap()).isEqualTo(new ShardEpochMap(Map.of(VgtidTest.TEST_SHARD, 1L, VgtidTest.TEST_SHARD2, 1L)));
assertThat(epochShard1).isEqualTo(1L);
assertThat(epochShard2).isEqualTo(1L);
}
Expand Down Expand Up @@ -218,6 +220,58 @@ public void missingEpochWithPreviousVgtidShouldThrowException() {
}).isInstanceOf(DebeziumException.class).hasMessageContaining("Previous epoch cannot be null");
}

@Test
public void testGtidPartialCurrent() {
VitessEpochProvider provider = new VitessEpochProvider();
VitessConnectorConfig config = new VitessConnectorConfig(Configuration.empty());
provider.load(Map.of(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH,
new ShardEpochMap(Map.of("f0-f8", 1L, "30-38", 1L, "b0-b8", 1L, "70-78", 1L)).toString()), config);
String shard = "f0-f8";
String vgtidAllCurrent = "[" +
"{\"keyspace\":\"keyspace1\",\"shard\":\"30-38\",\"gtid\":\"current\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"70-78\",\"gtid\":\"current\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"b0-b8\",\"gtid\":\"current\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"f0-f8\",\"gtid\":\"current\",\"table_p_ks\":[]}]";
String vgtidOneShardWithGtid = "[" +
"{\"keyspace\":\"keyspace1\",\"shard\":\"30-38\",\"gtid\":\"current\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"70-78\",\"gtid\":\"current\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"b0-b8\",\"gtid\":\"current\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"f0-f8\",\"gtid\":\"MySQL56/host3:1-144525090\",\"table_p_ks\":[]}]";
String vgtidOneShardCurrent = "[" +
"{\"keyspace\":\"keyspace1\",\"shard\":\"30-38\",\"gtid\":\"MySQL56/host1:1-450588997\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"70-78\",\"gtid\":\"MySQL56/host2:1-368122129\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"b0-b8\",\"gtid\":\"current\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"f0-f8\",\"gtid\":\"MySQL56/host3:1-144525093\",\"table_p_ks\":[]}]";
String vgtidOneShardCurrentNewGtid = "[" +
"{\"keyspace\":\"keyspace1\",\"shard\":\"30-38\",\"gtid\":\"MySQL56/host1:1-450588998\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"70-78\",\"gtid\":\"MySQL56/host2:1-368122129\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"b0-b8\",\"gtid\":\"current\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"f0-f8\",\"gtid\":\"MySQL56/host3:1-144525093\",\"table_p_ks\":[]}]";
String vgtidNoShardCurrent = "[" +
"{\"keyspace\":\"keyspace1\",\"shard\":\"30-38\",\"gtid\":\"MySQL56/host1:1-450588997\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"70-78\",\"gtid\":\"MySQL56/host2:1-368122129\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"b0-b8\",\"gtid\":\"host4:1-3\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"f0-f8\",\"gtid\":\"MySQL56/host3:1-144525093\",\"table_p_ks\":[]}]";
// The first transaction will have at least one shard with an actual GTID
provider.getEpoch(shard, vgtidAllCurrent, vgtidOneShardWithGtid);
// Eventually all but one shard will have a GTID
provider.getEpoch(shard, vgtidOneShardWithGtid, vgtidOneShardCurrent);
// We have received a legit GTID for all shards except one, that one can still be current
provider.getEpoch(shard, vgtidOneShardCurrent, vgtidOneShardCurrentNewGtid);
// We can continue to receive current for that GTID indefinitely
provider.getEpoch(shard, vgtidOneShardCurrentNewGtid, vgtidOneShardCurrentNewGtid);
// Eventually, we receive a GTID for that shard
provider.getEpoch("b0-b8", vgtidOneShardCurrentNewGtid, vgtidNoShardCurrent);
// After that if we received current again that would be an error
assertThatThrownBy(() -> {
provider.getEpoch("b0-b8", vgtidNoShardCurrent, vgtidOneShardCurrent);
}).isInstanceOf(DebeziumException.class).hasMessageContaining(errorOnCurrentOverrideValue);
// Assert that if we received current again even for a non-transaction shard, that would be an error
assertThatThrownBy(() -> {
provider.getEpoch(shard, vgtidNoShardCurrent, vgtidOneShardCurrent);
}).isInstanceOf(DebeziumException.class).hasMessageContaining(errorOnCurrentOverrideValue);
}

@Test
public void matchingGtidReturnsInitialEpoch() {
VitessEpochProvider provider = new VitessEpochProvider(ShardEpochMap.init(shards), false);
Expand All @@ -242,7 +296,7 @@ public void testInvalidCurrentGtid() {
Vgtid.EMPTY_GTID);
assertThatThrownBy(() -> {
provider.getEpoch("-80", VGTID_JSON, vgtidJsonCurrent);
}).isInstanceOf(DebeziumException.class).hasMessageContaining("Invalid");
}).isInstanceOf(DebeziumException.class).hasMessageContaining(errorOnCurrentOverrideValue);
}

@Test
Expand All @@ -261,31 +315,31 @@ public void testInvalidEmptyGtid() {
Vgtid.EMPTY_GTID);
assertThatThrownBy(() -> {
provider.getEpoch("-80", VGTID_JSON, vgtidJsonEmpty);
}).isInstanceOf(DebeziumException.class).hasMessageContaining("Invalid");
}).isInstanceOf(DebeziumException.class).hasMessageContaining(errorOnCurrentOverrideValue);
}

@Test
public void testGetEpochShrunkHostSet() {
Long epoch = VitessEpochProvider.getEpochForGtid(0L, previousTxId, txIdShrunk, false);
Long epoch = VitessEpochProvider.getEpochForGtid(0L, previousTxId, txIdShrunk);
assertThat(epoch).isEqualTo(1);
}

@Test
public void testGetEpochExpandHostSet() {
Long epoch = VitessEpochProvider.getEpochForGtid(0L, previousTxId, txId, false);
Long epoch = VitessEpochProvider.getEpochForGtid(0L, previousTxId, txId);
assertThat(epoch).isEqualTo(0);
}

@Test
public void testGetEpochDisjointThrowsException() {
Assertions.assertThatThrownBy(() -> {
VitessEpochProvider.getEpochForGtid(0L, previousTxId, "foo:1-2,bar:2-4", false);
}).isInstanceOf(RuntimeException.class);
public void testGetEpochDisjointIncrementsEpoch() {
long previousEpoch = 0L;
long epoch = VitessEpochProvider.getEpochForGtid(0L, previousTxId, "foo:1-2,bar:2-4");
assertThat(epoch).isEqualTo(previousEpoch + 1);
}

@Test
public void testVersionUpgradeDoesNotAffectEpoch() {
Long epoch = VitessEpochProvider.getEpochForGtid(0L, txIdVersion5, txIdVersion8, false);
Long epoch = VitessEpochProvider.getEpochForGtid(0L, txIdVersion5, txIdVersion8);
assertThat(epoch).isEqualTo(0L);
}
}
Loading