From fea0cbc92081ee1e4def33b1d0a479b2705b94bf Mon Sep 17 00:00:00 2001 From: twthorn Date: Thu, 19 Dec 2024 16:46:56 -0700 Subject: [PATCH 1/2] DBZ-8541 Handle partially overridden transactions --- .../connector/vitess/VitessConnectorTask.java | 2 +- .../vitess/pipeline/txmetadata/Gtid.java | 13 +--- .../txmetadata/VitessEpochProvider.java | 70 ++++++++++------- .../vitess/pipeline/txmetadata/GtidTest.java | 36 ++++++--- .../txmetadata/VitessEpochProviderTest.java | 75 ++++++++++++++++--- 5 files changed, 138 insertions(+), 58 deletions(-) diff --git a/src/main/java/io/debezium/connector/vitess/VitessConnectorTask.java b/src/main/java/io/debezium/connector/vitess/VitessConnectorTask.java index 64609f91..d0bef4ff 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessConnectorTask.java +++ b/src/main/java/io/debezium/connector/vitess/VitessConnectorTask.java @@ -90,7 +90,7 @@ protected ChangeEventSourceCoordinator sta LOGGER.info("No previous offset found"); } else { - LOGGER.info("Found previous offset {}", previousOffset); + LOGGER.info("Found task {} previous offset {}", config.getString(VitessConnectorConfig.TASK_ID), previousOffset); } replicationConnection = new VitessReplicationConnection(connectorConfig, schema); diff --git a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/Gtid.java b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/Gtid.java index 176f07c3..bf8c822b 100644 --- a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/Gtid.java +++ b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/Gtid.java @@ -6,6 +6,7 @@ package io.debezium.connector.vitess.pipeline.txmetadata; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -77,15 +78,7 @@ private void parseGtid(String transactionId) { } } - public boolean isHostSetEqual(Gtid hosts) { - return this.hosts.equals(hosts.hosts); - } - - public boolean isHostSetSupersetOf(Gtid previousHosts) { - return this.hosts.containsAll(previousHosts.hosts); - } - - public boolean isHostSetSubsetOf(Gtid previousHosts) { - return previousHosts.hosts.containsAll(this.hosts); + public boolean isHostSetSupersetOf(Gtid otherHosts) { + return this.hosts.containsAll(otherHosts.hosts); } } diff --git a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProvider.java b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProvider.java index ddd4682c..1eee3c9d 100644 --- a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProvider.java +++ b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProvider.java @@ -21,7 +21,6 @@ public class VitessEpochProvider { private static final Logger LOGGER = LoggerFactory.getLogger(VitessEpochProvider.class); private ShardEpochMap shardEpochMap; - private boolean isFirstTransaction = true; private boolean isInheritEpochEnabled = false; public VitessEpochProvider() { @@ -33,34 +32,48 @@ public VitessEpochProvider(ShardEpochMap shardToEpoch, boolean isInheritEpochEna this.isInheritEpochEnabled = isInheritEpochEnabled; } - private static boolean isInvalidGtid(String gtid) { + private static boolean isGtidOverridden(String gtid) { return gtid.equals(Vgtid.CURRENT_GTID) || gtid.equals(Vgtid.EMPTY_GTID); } - public static Long getEpochForGtid(Long previousEpoch, String previousGtidString, String gtidString, boolean isFirstTransaction) { - if (isFirstTransaction && isInvalidGtid(previousGtidString)) { + private static boolean isStandardGtid(String gtid) { + return !isGtidOverridden(gtid); + } + + public static Long getEpochForGtid(Long previousEpoch, String previousGtidString, String gtidString) { + if (isGtidOverridden(previousGtidString) && isGtidOverridden(gtidString)) { + // GTID was overridden, and the current GTID is an overridden value, still waiting for first transaction + return previousEpoch; + } else if (isGtidOverridden(previousGtidString) && !isGtidOverridden(gtidString)) { + // GTID was overridden, received first transaction, increment epoch + LOGGER.info("Incrementing epoch: {}", getLogMessageForGtid(previousEpoch, previousGtidString, gtidString)); return previousEpoch + 1; + } else if (isStandardGtid(previousGtidString) && isGtidOverridden(gtidString)) { + // previous GTID is standard, current GTID is overridden, should not be possible, raise exception + String message = String.format("Current GTID cannot be override value if previous is standard: %s", + getLogMessageForGtid(previousEpoch, previousGtidString, gtidString)); + LOGGER.error(message); + throw new DebeziumException(message); + } else { + // Both GTIDs are standard so parse them + return getEpochForStandardGtid(previousEpoch, previousGtidString, gtidString); } - else if (isInvalidGtid(previousGtidString)) { - throw new DebeziumException("Invalid GTID: The previous GTID cannot be one of current or empty after the first transaction " + gtidString); - } - if (isInvalidGtid(gtidString)) { - throw new DebeziumException("Invalid GTID: The current GTID cannot be one of current or empty " + gtidString); - } + } + + private static Long getEpochForStandardGtid(Long previousEpoch, String previousGtidString, String gtidString) { Gtid previousGtid = new Gtid(previousGtidString); Gtid gtid = new Gtid(gtidString); - if (previousGtid.isHostSetEqual(gtid) || gtid.isHostSetSupersetOf(previousGtid)) { + if (gtid.isHostSetSupersetOf(previousGtid)) { return previousEpoch; - } - else if (gtid.isHostSetSubsetOf(previousGtid)) { + } else { + // Any other case (disjoint set, previous is a superset), VStream has interpreted the previous GTID correctly and sent some new GTID + // in a continuous stream, so simply increment the epoch return previousEpoch + 1; } - else { - LOGGER.error( - "Error determining epoch, previous host set: {}, host set: {}", - previousGtid, gtid); - throw new RuntimeException("Can't determine epoch"); - } + } + + private static String getLogMessageForGtid(Long previousEpoch, String previousGtidString, String gtidString) { + return String.format("GTID: %s, previous GTID: %s, previous Epoch: %s", gtidString, previousGtidString, previousEpoch); } public ShardEpochMap getShardEpochMap() { @@ -105,16 +118,19 @@ public Long getEpoch(String shard, String previousVgtidString, String vgtidStrin if (previousVgtidString == null) { throw new DebeziumException(String.format("Previous vgtid string cannot be null shard %s current %s", shard, vgtidString)); } - Vgtid vgtid = Vgtid.of(vgtidString); - Vgtid previousVgtid = Vgtid.of(previousVgtidString); - this.shardEpochMap = getNewShardEpochMap(previousVgtid, vgtid); - if (isFirstTransaction) { - isFirstTransaction = false; + try { + Vgtid vgtid = Vgtid.of(vgtidString); + Vgtid previousVgtid = Vgtid.of(previousVgtidString); + this.shardEpochMap = getNewShardEpochMap(previousVgtid, vgtid, shard); + return shardEpochMap.get(shard); + } + catch (Exception e) { + LOGGER.error("Error providing epoch with shard {}, previousVgtid {}, vgtid {}", shard, previousVgtidString, vgtidString, e); + throw e; } - return shardEpochMap.get(shard); } - private ShardEpochMap getNewShardEpochMap(Vgtid previousVgtid, Vgtid vgtid) { + private ShardEpochMap getNewShardEpochMap(Vgtid previousVgtid, Vgtid vgtid, String transactionShard) { ShardEpochMap newShardEpochMap = new ShardEpochMap(); for (Vgtid.ShardGtid shardGtid : vgtid.getShardGtids()) { String shard = shardGtid.getShard(); @@ -129,7 +145,7 @@ private ShardEpochMap getNewShardEpochMap(Vgtid previousVgtid, Vgtid vgtid) { "Previous epoch cannot be null for shard %s when shard present in previous vgtid %s", shard, previousVgtid)); } - Long epoch = getEpochForGtid(previousEpoch, previousGtid, gtid, isFirstTransaction); + Long epoch = getEpochForGtid(previousEpoch, previousGtid, gtid); newShardEpochMap.put(shard, epoch); } else { diff --git a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/GtidTest.java b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/GtidTest.java index cdcef5fe..a027d121 100644 --- a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/GtidTest.java +++ b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/GtidTest.java @@ -17,37 +17,53 @@ public class GtidTest { + private static final String EXPECTED_VERSION = "MySQL56"; + private static final String HOST_SET1 = "/host1:1,host2:2-10"; + private static final String GTID1 = EXPECTED_VERSION + HOST_SET1; + @Test public void shouldInit() { - String expectedVersion = "MySQL56"; - Gtid gtid = new Gtid(expectedVersion + "/host1:1-4,host2:2-10"); - assertThat(gtid.getVersion()).isEqualTo(expectedVersion); + Gtid gtid = new Gtid(EXPECTED_VERSION + "/host1:1-4,host2:2-10"); + assertThat(gtid.getVersion()).isEqualTo(EXPECTED_VERSION); assertThat(gtid.getSequenceValues()).isEqualTo(List.of("4", "10")); assertThat(gtid.getHosts()).isEqualTo(Set.of("host1", "host2")); } @Test public void shouldHandleSingleValue() { - String expectedVersion = "MySQL56"; - Gtid gtid = new Gtid(expectedVersion + "/host1:1,host2:2-10"); - assertThat(gtid.getVersion()).isEqualTo(expectedVersion); + Gtid gtid = new Gtid(GTID1); + assertThat(gtid.getVersion()).isEqualTo(EXPECTED_VERSION); assertThat(gtid.getSequenceValues()).isEqualTo(List.of("1", "10")); assertThat(gtid.getHosts()).isEqualTo(Set.of("host1", "host2")); } + @Test + public void testHostSupersetWithLargerSet() { + Gtid gtid = new Gtid(GTID1); + Gtid gtidSuperset = new Gtid(EXPECTED_VERSION + "/host1:1,host2:2-10,host3:1-5"); + assertThat(gtidSuperset.isHostSetSupersetOf(gtid)).isTrue(); + assertThat(gtid.isHostSetSupersetOf(gtidSuperset)).isFalse(); + } + + @Test + public void testHostSupersetWithEqualSet() { + Gtid gtid = new Gtid(GTID1); + Gtid gtid2 = new Gtid(GTID1); + assertThat(gtid.isHostSetSupersetOf(gtid2)).isTrue(); + assertThat(gtid2.isHostSetSupersetOf(gtid)).isTrue(); + } + @Test public void shouldThrowExceptionOnEmptyStringWithPrefix() { - String expectedVersion = "MySQL56"; assertThatThrownBy(() -> { - Gtid gtid = new Gtid(expectedVersion + "/"); + Gtid gtid = new Gtid(EXPECTED_VERSION + "/"); }).isInstanceOf(DebeziumException.class); } @Test public void shouldThrowExceptionOnVersionOnly() { - String expectedVersion = "MySQL56"; assertThatThrownBy(() -> { - Gtid gtid = new Gtid(expectedVersion); + Gtid gtid = new Gtid(EXPECTED_VERSION); }).isInstanceOf(DebeziumException.class); } diff --git a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProviderTest.java b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProviderTest.java index 12ba6f70..a735d5f9 100644 --- a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProviderTest.java +++ b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProviderTest.java @@ -28,6 +28,7 @@ import io.debezium.connector.vitess.Vgtid; import io.debezium.connector.vitess.VgtidTest; import io.debezium.connector.vitess.VitessConnectorConfig; +import io.debezium.junit.logging.LogInterceptor; public class VitessEpochProviderTest { @@ -44,6 +45,7 @@ public class VitessEpochProviderTest { private String txIdVersion8 = "MySQL82/" + String.join(",", host1Tx2); private List shards = List.of(VgtidTest.TEST_SHARD, VgtidTest.TEST_SHARD2); + private String errorOnCurrentOverrideValue = "Current GTID cannot be override value if previous is standard"; String vgtidJsonCurrent = String.format( VGTID_JSON_TEMPLATE, @@ -56,7 +58,7 @@ public class VitessEpochProviderTest { @Test public void testGetEpochSameHostSet() { - Long epoch = VitessEpochProvider.getEpochForGtid(0L, previousTxId, txId, false); + Long epoch = VitessEpochProvider.getEpochForGtid(0L, previousTxId, txId); assertThat(epoch).isEqualTo(0); } @@ -136,7 +138,9 @@ public void fastForwardVgtidIncrementsEpoch() { public void currentVgtidIncrementsEpochForAllShards() { VitessEpochProvider provider = new VitessEpochProvider(ShardEpochMap.init(shards), false); Long epochShard1 = provider.getEpoch(VgtidTest.TEST_SHARD, vgtidJsonCurrent, VGTID_JSON); + assertThat(provider.getShardEpochMap()).isEqualTo(new ShardEpochMap(Map.of(VgtidTest.TEST_SHARD, 1L, VgtidTest.TEST_SHARD2, 1L))); Long epochShard2 = provider.getEpoch(VgtidTest.TEST_SHARD2, VGTID_JSON, VGTID_JSON); + assertThat(provider.getShardEpochMap()).isEqualTo(new ShardEpochMap(Map.of(VgtidTest.TEST_SHARD, 1L, VgtidTest.TEST_SHARD2, 1L))); assertThat(epochShard1).isEqualTo(1L); assertThat(epochShard2).isEqualTo(1L); } @@ -218,6 +222,57 @@ public void missingEpochWithPreviousVgtidShouldThrowException() { }).isInstanceOf(DebeziumException.class).hasMessageContaining("Previous epoch cannot be null"); } + @Test + public void testGtidPartialCurrent() { + VitessEpochProvider provider = new VitessEpochProvider(); + VitessConnectorConfig config = new VitessConnectorConfig(Configuration.empty()); + provider.load(Map.of(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, new ShardEpochMap(Map.of("f0-f8", 1L, "30-38", 1L, "b0-b8", 1L, "70-78", 1L)).toString()), config); + String shard = "f0-f8"; + String vgtidAllCurrent = "[" + + "{\"keyspace\":\"keyspace1\",\"shard\":\"30-38\",\"gtid\":\"current\",\"table_p_ks\":[]}," + + "{\"keyspace\":\"keyspace1\",\"shard\":\"70-78\",\"gtid\":\"current\",\"table_p_ks\":[]}," + + "{\"keyspace\":\"keyspace1\",\"shard\":\"b0-b8\",\"gtid\":\"current\",\"table_p_ks\":[]}," + + "{\"keyspace\":\"keyspace1\",\"shard\":\"f0-f8\",\"gtid\":\"current\",\"table_p_ks\":[]}]"; + String vgtidOneShardWithGtid = "[" + + "{\"keyspace\":\"keyspace1\",\"shard\":\"30-38\",\"gtid\":\"current\",\"table_p_ks\":[]}," + + "{\"keyspace\":\"keyspace1\",\"shard\":\"70-78\",\"gtid\":\"current\",\"table_p_ks\":[]}," + + "{\"keyspace\":\"keyspace1\",\"shard\":\"b0-b8\",\"gtid\":\"current\",\"table_p_ks\":[]}," + + "{\"keyspace\":\"keyspace1\",\"shard\":\"f0-f8\",\"gtid\":\"MySQL56/host3:1-144525090\",\"table_p_ks\":[]}]"; + String vgtidOneShardCurrent = "[" + + "{\"keyspace\":\"keyspace1\",\"shard\":\"30-38\",\"gtid\":\"MySQL56/host1:1-450588997\",\"table_p_ks\":[]}," + + "{\"keyspace\":\"keyspace1\",\"shard\":\"70-78\",\"gtid\":\"MySQL56/host2:1-368122129\",\"table_p_ks\":[]}," + + "{\"keyspace\":\"keyspace1\",\"shard\":\"b0-b8\",\"gtid\":\"current\",\"table_p_ks\":[]}," + + "{\"keyspace\":\"keyspace1\",\"shard\":\"f0-f8\",\"gtid\":\"MySQL56/host3:1-144525093\",\"table_p_ks\":[]}]"; + String vgtidOneShardCurrentNewGtid = "[" + + "{\"keyspace\":\"keyspace1\",\"shard\":\"30-38\",\"gtid\":\"MySQL56/host1:1-450588998\",\"table_p_ks\":[]}," + + "{\"keyspace\":\"keyspace1\",\"shard\":\"70-78\",\"gtid\":\"MySQL56/host2:1-368122129\",\"table_p_ks\":[]}," + + "{\"keyspace\":\"keyspace1\",\"shard\":\"b0-b8\",\"gtid\":\"current\",\"table_p_ks\":[]}," + + "{\"keyspace\":\"keyspace1\",\"shard\":\"f0-f8\",\"gtid\":\"MySQL56/host3:1-144525093\",\"table_p_ks\":[]}]"; + String vgtidNoShardCurrent = "[" + + "{\"keyspace\":\"keyspace1\",\"shard\":\"30-38\",\"gtid\":\"MySQL56/host1:1-450588997\",\"table_p_ks\":[]}," + + "{\"keyspace\":\"keyspace1\",\"shard\":\"70-78\",\"gtid\":\"MySQL56/host2:1-368122129\",\"table_p_ks\":[]}," + + "{\"keyspace\":\"keyspace1\",\"shard\":\"b0-b8\",\"gtid\":\"host4:1-3\",\"table_p_ks\":[]}," + + "{\"keyspace\":\"keyspace1\",\"shard\":\"f0-f8\",\"gtid\":\"MySQL56/host3:1-144525093\",\"table_p_ks\":[]}]"; + // The first transaction will have at least one shard with an actual GTID + provider.getEpoch(shard, vgtidAllCurrent, vgtidOneShardWithGtid); + // Eventually all but one shard will have a GTID + provider.getEpoch(shard, vgtidOneShardWithGtid, vgtidOneShardCurrent); + // We have received a legit GTID for all shards except one, that one can still be current + provider.getEpoch(shard, vgtidOneShardCurrent, vgtidOneShardCurrentNewGtid); + // We can continue to receive current for that GTID indefinitely + provider.getEpoch(shard, vgtidOneShardCurrentNewGtid, vgtidOneShardCurrentNewGtid); + // Eventually, we receive a GTID for that shard + provider.getEpoch("b0-b8", vgtidOneShardCurrentNewGtid, vgtidNoShardCurrent); + // After that if we received current again that would be an error + assertThatThrownBy(() -> { + provider.getEpoch("b0-b8", vgtidNoShardCurrent, vgtidOneShardCurrent); + }).isInstanceOf(DebeziumException.class).hasMessageContaining(errorOnCurrentOverrideValue); + // Assert that if we received current again even for a non-transaction shard, that would be an error + assertThatThrownBy(() -> { + provider.getEpoch(shard, vgtidNoShardCurrent, vgtidOneShardCurrent); + }).isInstanceOf(DebeziumException.class).hasMessageContaining(errorOnCurrentOverrideValue); + } + @Test public void matchingGtidReturnsInitialEpoch() { VitessEpochProvider provider = new VitessEpochProvider(ShardEpochMap.init(shards), false); @@ -242,7 +297,7 @@ public void testInvalidCurrentGtid() { Vgtid.EMPTY_GTID); assertThatThrownBy(() -> { provider.getEpoch("-80", VGTID_JSON, vgtidJsonCurrent); - }).isInstanceOf(DebeziumException.class).hasMessageContaining("Invalid"); + }).isInstanceOf(DebeziumException.class).hasMessageContaining(errorOnCurrentOverrideValue); } @Test @@ -261,31 +316,31 @@ public void testInvalidEmptyGtid() { Vgtid.EMPTY_GTID); assertThatThrownBy(() -> { provider.getEpoch("-80", VGTID_JSON, vgtidJsonEmpty); - }).isInstanceOf(DebeziumException.class).hasMessageContaining("Invalid"); + }).isInstanceOf(DebeziumException.class).hasMessageContaining(errorOnCurrentOverrideValue); } @Test public void testGetEpochShrunkHostSet() { - Long epoch = VitessEpochProvider.getEpochForGtid(0L, previousTxId, txIdShrunk, false); + Long epoch = VitessEpochProvider.getEpochForGtid(0L, previousTxId, txIdShrunk); assertThat(epoch).isEqualTo(1); } @Test public void testGetEpochExpandHostSet() { - Long epoch = VitessEpochProvider.getEpochForGtid(0L, previousTxId, txId, false); + Long epoch = VitessEpochProvider.getEpochForGtid(0L, previousTxId, txId); assertThat(epoch).isEqualTo(0); } @Test - public void testGetEpochDisjointThrowsException() { - Assertions.assertThatThrownBy(() -> { - VitessEpochProvider.getEpochForGtid(0L, previousTxId, "foo:1-2,bar:2-4", false); - }).isInstanceOf(RuntimeException.class); + public void testGetEpochDisjointIncrementsEpoch() { + long previousEpoch = 0L; + long epoch = VitessEpochProvider.getEpochForGtid(0L, previousTxId, "foo:1-2,bar:2-4"); + assertThat(epoch).isEqualTo(previousEpoch + 1); } @Test public void testVersionUpgradeDoesNotAffectEpoch() { - Long epoch = VitessEpochProvider.getEpochForGtid(0L, txIdVersion5, txIdVersion8, false); + Long epoch = VitessEpochProvider.getEpochForGtid(0L, txIdVersion5, txIdVersion8); assertThat(epoch).isEqualTo(0L); } } From b7a6bcf5c31e6359643f27607854cfd8ca312cc8 Mon Sep 17 00:00:00 2001 From: twthorn Date: Fri, 20 Dec 2024 11:53:42 -0800 Subject: [PATCH 2/2] DBZ-8541 Fix format --- .../connector/vitess/pipeline/txmetadata/Gtid.java | 1 - .../pipeline/txmetadata/VitessEpochProvider.java | 12 ++++++++---- .../pipeline/txmetadata/VitessEpochProviderTest.java | 5 ++--- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/Gtid.java b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/Gtid.java index bf8c822b..808b710e 100644 --- a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/Gtid.java +++ b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/Gtid.java @@ -6,7 +6,6 @@ package io.debezium.connector.vitess.pipeline.txmetadata; import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; diff --git a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProvider.java b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProvider.java index 1eee3c9d..6c8a039c 100644 --- a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProvider.java +++ b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProvider.java @@ -44,17 +44,20 @@ public static Long getEpochForGtid(Long previousEpoch, String previousGtidString if (isGtidOverridden(previousGtidString) && isGtidOverridden(gtidString)) { // GTID was overridden, and the current GTID is an overridden value, still waiting for first transaction return previousEpoch; - } else if (isGtidOverridden(previousGtidString) && !isGtidOverridden(gtidString)) { + } + else if (isGtidOverridden(previousGtidString) && !isGtidOverridden(gtidString)) { // GTID was overridden, received first transaction, increment epoch LOGGER.info("Incrementing epoch: {}", getLogMessageForGtid(previousEpoch, previousGtidString, gtidString)); return previousEpoch + 1; - } else if (isStandardGtid(previousGtidString) && isGtidOverridden(gtidString)) { + } + else if (isStandardGtid(previousGtidString) && isGtidOverridden(gtidString)) { // previous GTID is standard, current GTID is overridden, should not be possible, raise exception String message = String.format("Current GTID cannot be override value if previous is standard: %s", getLogMessageForGtid(previousEpoch, previousGtidString, gtidString)); LOGGER.error(message); throw new DebeziumException(message); - } else { + } + else { // Both GTIDs are standard so parse them return getEpochForStandardGtid(previousEpoch, previousGtidString, gtidString); } @@ -65,7 +68,8 @@ private static Long getEpochForStandardGtid(Long previousEpoch, String previousG Gtid gtid = new Gtid(gtidString); if (gtid.isHostSetSupersetOf(previousGtid)) { return previousEpoch; - } else { + } + else { // Any other case (disjoint set, previous is a superset), VStream has interpreted the previous GTID correctly and sent some new GTID // in a continuous stream, so simply increment the epoch return previousEpoch + 1; diff --git a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProviderTest.java b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProviderTest.java index a735d5f9..967132de 100644 --- a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProviderTest.java +++ b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProviderTest.java @@ -18,7 +18,6 @@ import java.util.List; import java.util.Map; -import org.assertj.core.api.Assertions; import org.junit.Test; import io.debezium.DebeziumException; @@ -28,7 +27,6 @@ import io.debezium.connector.vitess.Vgtid; import io.debezium.connector.vitess.VgtidTest; import io.debezium.connector.vitess.VitessConnectorConfig; -import io.debezium.junit.logging.LogInterceptor; public class VitessEpochProviderTest { @@ -226,7 +224,8 @@ public void missingEpochWithPreviousVgtidShouldThrowException() { public void testGtidPartialCurrent() { VitessEpochProvider provider = new VitessEpochProvider(); VitessConnectorConfig config = new VitessConnectorConfig(Configuration.empty()); - provider.load(Map.of(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, new ShardEpochMap(Map.of("f0-f8", 1L, "30-38", 1L, "b0-b8", 1L, "70-78", 1L)).toString()), config); + provider.load(Map.of(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, + new ShardEpochMap(Map.of("f0-f8", 1L, "30-38", 1L, "b0-b8", 1L, "70-78", 1L)).toString()), config); String shard = "f0-f8"; String vgtidAllCurrent = "[" + "{\"keyspace\":\"keyspace1\",\"shard\":\"30-38\",\"gtid\":\"current\",\"table_p_ks\":[]}," +