diff --git a/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java b/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java index 8da599cf..6437cee9 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java +++ b/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java @@ -32,6 +32,7 @@ import io.debezium.connector.SourceInfoStructMaker; import io.debezium.connector.vitess.connection.VitessTabletType; import io.debezium.connector.vitess.pipeline.txmetadata.ShardEpochMap; +import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionMetadataFactory; import io.debezium.heartbeat.Heartbeat; import io.debezium.heartbeat.HeartbeatConnectionProvider; import io.debezium.heartbeat.HeartbeatErrorHandler; @@ -301,6 +302,15 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue .withDescription("Control StopOnReshard VStream flag." + " If set true, the old VStream will be stopped after a reshard operation."); + public static final Field INHERIT_EPOCH = Field.create(VITESS_CONFIG_GROUP_PREFIX + "inherit.epoch") + .withDisplayName("Inherit epoch") + .withType(Type.BOOLEAN) + .withDefault(false) + .withWidth(Width.SHORT) + .withImportance(ConfigDef.Importance.LOW) + .withValidation(VitessConnectorConfig::validateInheritEpoch) + .withDescription("Controls whether the epochs of a new shard after a re-shard operation inherits epochs from its parent shards"); + public static final Field KEEPALIVE_INTERVAL_MS = Field.create(VITESS_CONFIG_GROUP_PREFIX + "keepalive.interval.ms") .withDisplayName("VStream gRPC keepalive interval (ms)") .withType(Type.LONG) @@ -622,6 +632,15 @@ private static int validateShardEpochMap(Configuration config, Field field, Vali return 0; } + private static int validateInheritEpoch(Configuration config, Field field, ValidationOutput problems) { + Boolean inheritEpoch = config.getBoolean(field); + String factory = config.getString(CommonConnectorConfig.TRANSACTION_METADATA_FACTORY); + if (inheritEpoch && !factory.equals(VitessOrderedTransactionMetadataFactory.class.getName())) { + problems.accept(field, inheritEpoch, "Inherit epoch cannot be enabled without VitessOrderedTransactionMetadataFactory"); + } + return 0; + } + public String getVtgateHost() { return getConfig().getString(VTGATE_HOST); } @@ -646,6 +665,10 @@ public boolean getStopOnReshard() { return getConfig().getBoolean(STOP_ON_RESHARD_FLAG); } + public boolean getInheritEpoch() { + return getConfig().getBoolean(INHERIT_EPOCH); + } + public Duration getKeepaliveInterval() { return getConfig().getDuration(KEEPALIVE_INTERVAL_MS, ChronoUnit.MILLIS); } diff --git a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/ShardLineage.java b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/ShardLineage.java new file mode 100644 index 00000000..697eadff --- /dev/null +++ b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/ShardLineage.java @@ -0,0 +1,95 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.connector.vitess.pipeline.txmetadata; + +import java.util.Map; + +/** + * Class used to determine which parents a shard range descended from. Used to set the epoch to the succeediing + * epoch of its parents. + */ +public class ShardLineage { + + /** + * Return the epoch value of the shard, based on its parents epochs. + * If there are parents present, return the max of the parent epochs plus one. + * If there are no parents present, it returns zero. + * + * @param shardString The descendant shard to find parents of + * @param shardEpochMap The map to search for parents + * @return The epoch value of the descendant shard + */ + public static Long getInheritedEpoch(String shardString, ShardEpochMap shardEpochMap) { + Shard shard = new Shard(shardString); + + Long maxParentEpoch = -1L; + for (Map.Entry shardEpoch : shardEpochMap.getMap().entrySet()) { + String currentShardString = shardEpoch.getKey(); + Long currentEpoch = shardEpoch.getValue(); + Shard currentShard = new Shard(currentShardString); + if (shard.overlaps(currentShard)) { + maxParentEpoch = Math.max(maxParentEpoch, currentEpoch); + } + } + + return maxParentEpoch + 1; + } + + private static class Shard { + + // A string of a single char that is lexicographically less than all other chars + public static final String NEGATIVE_INFINITY = String.valueOf(Character.MIN_VALUE); + // A string of a single char that is lexicographically greater than all other chars + public static final String POSITIVE_INFINITY = String.valueOf(Character.MAX_VALUE); + + private final String lowerBound; + private final String upperBound; + + Shard(String shard) { + String[] shardInterval = getShardInterval(shard.toLowerCase()); + this.lowerBound = getLowerBound(shardInterval); + this.upperBound = getUpperBound(shardInterval); + validateBounds(); + } + + private void validateBounds() { + if (this.lowerBound.compareTo(this.upperBound) >= 0) { + throw new IllegalArgumentException("Invalid shard range " + this); + } + } + + public boolean overlaps(Shard shard) { + return this.lowerBound.compareTo(shard.upperBound) < 0 && this.upperBound.compareTo(shard.lowerBound) > 0; + } + + private static String getLowerBound(String[] shardInterval) { + if (shardInterval.length < 1 || shardInterval[0].isEmpty()) { + return NEGATIVE_INFINITY; + } + return shardInterval[0]; + } + + private static String getUpperBound(String[] shardInterval) { + if (shardInterval.length != 2 || shardInterval[1].isEmpty()) { + return POSITIVE_INFINITY; + } + return shardInterval[1]; + } + + private static String[] getShardInterval(String shard) { + return shard.split("-"); + } + + @Override + public String toString() { + return "Shard{" + + "lowerBound=" + lowerBound + + ", upperBound=" + upperBound + + "}"; + } + } +} diff --git a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProvider.java b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProvider.java index 12c0de20..ddd4682c 100644 --- a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProvider.java +++ b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProvider.java @@ -22,13 +22,15 @@ public class VitessEpochProvider { private static final Logger LOGGER = LoggerFactory.getLogger(VitessEpochProvider.class); private ShardEpochMap shardEpochMap; private boolean isFirstTransaction = true; + private boolean isInheritEpochEnabled = false; public VitessEpochProvider() { shardEpochMap = new ShardEpochMap(); } - public VitessEpochProvider(ShardEpochMap shardToEpoch) { + public VitessEpochProvider(ShardEpochMap shardToEpoch, boolean isInheritEpochEnabled) { this.shardEpochMap = shardToEpoch; + this.isInheritEpochEnabled = isInheritEpochEnabled; } private static boolean isInvalidGtid(String gtid) { @@ -76,7 +78,8 @@ public ShardEpochMap getShardEpochMap() { */ public static VitessEpochProvider initialize(VitessConnectorConfig config) { ShardEpochMap shardEpochMap = VitessReplicationConnection.defaultShardEpochMap(config); - return new VitessEpochProvider(shardEpochMap); + boolean isInheritEpochEnabled = config.getInheritEpoch(); + return new VitessEpochProvider(shardEpochMap, isInheritEpochEnabled); } public Map store(Map offset) { @@ -90,11 +93,12 @@ public Map store(Map offset) { * * @param offsets Offsets to load */ - public void load(Map offsets) { + public void load(Map offsets, VitessConnectorConfig config) { String shardToEpochString = (String) offsets.get(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH); if (!Strings.isNullOrEmpty(shardToEpochString)) { shardEpochMap = ShardEpochMap.of(shardToEpochString); } + isInheritEpochEnabled = config.getInheritEpoch(); } public Long getEpoch(String shard, String previousVgtidString, String vgtidString) { @@ -103,14 +107,15 @@ public Long getEpoch(String shard, String previousVgtidString, String vgtidStrin } Vgtid vgtid = Vgtid.of(vgtidString); Vgtid previousVgtid = Vgtid.of(previousVgtidString); - processVgtid(previousVgtid, vgtid); + this.shardEpochMap = getNewShardEpochMap(previousVgtid, vgtid); if (isFirstTransaction) { isFirstTransaction = false; } return shardEpochMap.get(shard); } - private void processVgtid(Vgtid previousVgtid, Vgtid vgtid) { + private ShardEpochMap getNewShardEpochMap(Vgtid previousVgtid, Vgtid vgtid) { + ShardEpochMap newShardEpochMap = new ShardEpochMap(); for (Vgtid.ShardGtid shardGtid : vgtid.getShardGtids()) { String shard = shardGtid.getShard(); String gtid = shardGtid.getGtid(); @@ -125,17 +130,24 @@ private void processVgtid(Vgtid previousVgtid, Vgtid vgtid) { shard, previousVgtid)); } Long epoch = getEpochForGtid(previousEpoch, previousGtid, gtid, isFirstTransaction); - shardEpochMap.put(shard, epoch); + newShardEpochMap.put(shard, epoch); } else { - // A re-shard happened while we are streaming set the new value to zero - // TODO: Add support to inherit epoch from ancestor shard - shardEpochMap.put(shard, 0L); + // A re-shard happened while we are streaming + Long epoch; + if (isInheritEpochEnabled) { + epoch = ShardLineage.getInheritedEpoch(shard, shardEpochMap); + } + else { + epoch = 0L; + } + newShardEpochMap.put(shard, epoch); } } - // Note: we could purge all shards from the shard epoch map that are not present in the current vgtid. - // However, this poses some risk of losing epoch values, so we leave them as is. There may be dormant shards - // that we still have epoch values for, but that should be fine. Once we allow for epochs to be inherited from other shards - // we could reconsider purging them to ensure the epoch shard map does not grow too large. + return newShardEpochMap; + } + + public boolean isInheritEpochEnabled() { + return isInheritEpochEnabled; } } diff --git a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionContext.java b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionContext.java index 17e833eb..227eeefd 100644 --- a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionContext.java +++ b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionContext.java @@ -8,6 +8,7 @@ import java.math.BigDecimal; import java.util.Map; +import io.debezium.annotation.VisibleForTesting; import io.debezium.connector.vitess.SourceInfo; import io.debezium.connector.vitess.Vgtid; import io.debezium.connector.vitess.VitessConnectorConfig; @@ -22,6 +23,7 @@ public class VitessOrderedTransactionContext extends TransactionContext { protected Long transactionEpoch = 0L; protected BigDecimal transactionRank = null; private VitessEpochProvider epochProvider = new VitessEpochProvider(); + private VitessConnectorConfig config = null; public VitessOrderedTransactionContext() { } @@ -76,14 +78,14 @@ public Map store(Map offset) { */ @Override public TransactionContext newTransactionContextFromOffsets(Map offsets) { - return VitessOrderedTransactionContext.load(offsets); + return VitessOrderedTransactionContext.load(offsets, this.config); } - public static VitessOrderedTransactionContext load(Map offsets) { + public static VitessOrderedTransactionContext load(Map offsets, VitessConnectorConfig config) { TransactionContext transactionContext = TransactionContext.load(offsets); VitessOrderedTransactionContext vitessOrderedTransactionContext = new VitessOrderedTransactionContext(transactionContext); vitessOrderedTransactionContext.previousVgtid = (String) offsets.get(SourceInfo.VGTID_KEY); - vitessOrderedTransactionContext.epochProvider.load(offsets); + vitessOrderedTransactionContext.epochProvider.load(offsets, config); return vitessOrderedTransactionContext; } @@ -99,6 +101,7 @@ public static VitessOrderedTransactionContext initialize(VitessConnectorConfig c VitessOrderedTransactionContext vitessOrderedTransactionContext = new VitessOrderedTransactionContext(); vitessOrderedTransactionContext.epochProvider = VitessEpochProvider.initialize(config); vitessOrderedTransactionContext.previousVgtid = VitessReplicationConnection.defaultVgtid(config).toString(); + vitessOrderedTransactionContext.config = config; return vitessOrderedTransactionContext; } @@ -141,4 +144,9 @@ public BigDecimal getTransactionRank() { return transactionRank; } + @VisibleForTesting + public VitessEpochProvider getEpochProvider() { + return epochProvider; + } + } diff --git a/src/test/java/io/debezium/connector/vitess/TestHelper.java b/src/test/java/io/debezium/connector/vitess/TestHelper.java index c8487dce..ca0b77fa 100644 --- a/src/test/java/io/debezium/connector/vitess/TestHelper.java +++ b/src/test/java/io/debezium/connector/vitess/TestHelper.java @@ -69,6 +69,10 @@ public class TestHelper { "{\"keyspace\":\"%s\",\"shard\":\"%s\",\"gtid\":\"%s\"}" + "]"; + public static final String VGTID_SINGLE_SHARD_JSON_TEMPLATE = "[" + + "{\"keyspace\":\"%s\",\"shard\":\"%s\",\"gtid\":\"%s\",\"table_p_ks\":[]}" + + "]"; + public static final String VGTID_JSON_TEMPLATE = "[" + "{\"keyspace\":\"%s\",\"shard\":\"%s\",\"gtid\":\"%s\",\"table_p_ks\":[]}," + "{\"keyspace\":\"%s\",\"shard\":\"%s\",\"gtid\":\"%s\",\"table_p_ks\":[]}" + diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorConfigTest.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorConfigTest.java index 744c8053..ab583331 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessConnectorConfigTest.java +++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorConfigTest.java @@ -15,7 +15,9 @@ import org.junit.Test; +import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; +import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionMetadataFactory; import io.debezium.heartbeat.Heartbeat; import io.debezium.schema.DefaultTopicNamingStrategy; import io.debezium.schema.SchemaNameAdjuster; @@ -116,4 +118,38 @@ public void shouldImproperShardEpochMapFailValidation() { assertThat(inputs.size()).isEqualTo(1); } + @Test + public void shouldEnableInheritEpoch() { + Configuration configuration = TestHelper.defaultConfig().with(VitessConnectorConfig.INHERIT_EPOCH, true).build(); + VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration); + assertThat(connectorConfig.getInheritEpoch()).isTrue(); + } + + @Test + public void shouldValidateInheritEpochWithoutOrderedTransactionMetadata() { + Configuration configuration = TestHelper.defaultConfig().with(VitessConnectorConfig.INHERIT_EPOCH, true).build(); + VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration); + List inputs = new ArrayList<>(); + Consumer printConsumer = (input) -> { + inputs.add(input); + }; + connectorConfig.validateAndRecord(List.of(VitessConnectorConfig.INHERIT_EPOCH), printConsumer); + assertThat(inputs.size()).isEqualTo(1); + } + + @Test + public void shouldValidateInheritEpochWithOrderedTransactionMetadata() { + Configuration configuration = TestHelper.defaultConfig() + .with(VitessConnectorConfig.INHERIT_EPOCH, true) + .with(CommonConnectorConfig.TRANSACTION_METADATA_FACTORY, VitessOrderedTransactionMetadataFactory.class) + .build(); + VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration); + List inputs = new ArrayList<>(); + Consumer printConsumer = (input) -> { + inputs.add(input); + }; + connectorConfig.validateAndRecord(List.of(VitessConnectorConfig.INHERIT_EPOCH), printConsumer); + assertThat(inputs.size()).isEqualTo(0); + } + } diff --git a/src/test/java/io/debezium/connector/vitess/VitessOffsetContextTest.java b/src/test/java/io/debezium/connector/vitess/VitessOffsetContextTest.java index 005434bc..398ca8bd 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessOffsetContextTest.java +++ b/src/test/java/io/debezium/connector/vitess/VitessOffsetContextTest.java @@ -133,6 +133,32 @@ public void shouldLoadVitessOrderedTransactionContext() throws JsonProcessingExc assertThat(orderedTransactionContext.getTransactionEpoch()).isEqualTo(expectedEpoch2); } + @Test + public void shouldLoadVitessOrderedTransactionContextWithInheritEpoch() throws JsonProcessingException { + VitessConnectorConfig config = new VitessConnectorConfig( + TestHelper.defaultConfig() + .with(VitessConnectorConfig.TRANSACTION_METADATA_FACTORY, VitessOrderedTransactionMetadataFactory.class) + .with(VitessConnectorConfig.INHERIT_EPOCH, true) + .build()); + VitessOffsetContext.Loader loader = new VitessOffsetContext.Loader(config); + ObjectMapper objectMapper = new ObjectMapper(); + Long expectedEpoch1 = 2L; + Long expectedEpoch2 = 3L; + String shard1 = "-80"; + String shard2 = "80-"; + Map offsets = Map.of( + SourceInfo.VGTID_KEY, VGTID_JSON, + TransactionContext.OFFSET_TRANSACTION_ID, VGTID_JSON, + VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, objectMapper.writeValueAsString(Map.of( + shard1, expectedEpoch1, + shard2, expectedEpoch2))); + VitessOffsetContext context = loader.load(offsets); + TransactionContext transactionContext = context.getTransactionContext(); + assertThat(transactionContext).isInstanceOf(VitessOrderedTransactionContext.class); + VitessOrderedTransactionContext orderedTransactionContext = (VitessOrderedTransactionContext) transactionContext; + assertThat(orderedTransactionContext.getEpochProvider().isInheritEpochEnabled()).isTrue(); + } + @Test public void shouldGetInitialVitessOrderedTransactionContext() { VitessConnectorConfig config = new VitessConnectorConfig( @@ -146,4 +172,18 @@ public void shouldGetInitialVitessOrderedTransactionContext() { assertThat(orderedTransactionContext.getPreviousVgtid()).isEqualTo( "[{\"keyspace\":\"test_unsharded_keyspace\",\"shard\":\"0\",\"gtid\":\"current\",\"table_p_ks\":[]}]"); } + + @Test + public void shouldGetInitialVitessOrderedTransactionContextWithInheritEpoch() { + VitessConnectorConfig config = new VitessConnectorConfig( + TestHelper.defaultConfig() + .with(VitessConnectorConfig.TRANSACTION_METADATA_FACTORY, VitessOrderedTransactionMetadataFactory.class) + .with(VitessConnectorConfig.INHERIT_EPOCH, true) + .build()); + VitessOffsetContext context = VitessOffsetContext.initialContext(config, Clock.system()); + TransactionContext transactionContext = context.getTransactionContext(); + assertThat(transactionContext).isInstanceOf(VitessOrderedTransactionContext.class); + VitessOrderedTransactionContext orderedTransactionContext = (VitessOrderedTransactionContext) transactionContext; + assertThat(orderedTransactionContext.getEpochProvider().isInheritEpochEnabled()).isTrue(); + } } diff --git a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/ShardLineageTest.java b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/ShardLineageTest.java new file mode 100644 index 00000000..09cd2d4c --- /dev/null +++ b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/ShardLineageTest.java @@ -0,0 +1,110 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.connector.vitess.pipeline.txmetadata; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.Map; + +import org.junit.Test; + +public class ShardLineageTest { + + @Test + public void shouldGetInheritedEpoch_SingleShard_SplitOneShard() { + Long parentEpoch = 1L; + ShardEpochMap shardEpochMap = new ShardEpochMap(Map.of("0", parentEpoch)); + Long epoch = ShardLineage.getInheritedEpoch("-80", shardEpochMap); + assertThat(epoch).isEqualTo(parentEpoch + 1); + Long epoch2 = ShardLineage.getInheritedEpoch("80-", shardEpochMap); + assertThat(epoch2).isEqualTo(parentEpoch + 1); + } + + @Test + public void shouldGetInheritedEpoch_IllegalShardRange() { + Long parentEpoch = 1L; + ShardEpochMap shardEpochMap = new ShardEpochMap(Map.of("0", parentEpoch)); + assertThatThrownBy(() -> ShardLineage.getInheritedEpoch("80-30", shardEpochMap)) + .isInstanceOf(IllegalArgumentException.class).hasMessageContaining("Invalid shard range"); + } + + @Test + public void shouldGetInheritedEpoch_MultiShard_SplitOneShard() { + Long parentEpoch = 1L; + Long parentEpoch2 = 3L; + ShardEpochMap shardEpochMap = new ShardEpochMap(Map.of("-80", parentEpoch, "80-", parentEpoch2)); + Long epoch = ShardLineage.getInheritedEpoch("-40", shardEpochMap); + assertThat(epoch).isEqualTo(parentEpoch + 1); + Long epoch2 = ShardLineage.getInheritedEpoch("80-c0", shardEpochMap); + assertThat(epoch2).isEqualTo(parentEpoch2 + 1); + Long epoch3 = ShardLineage.getInheritedEpoch("c0-", shardEpochMap); + assertThat(epoch3).isEqualTo(parentEpoch2 + 1); + } + + @Test + public void shouldGetInheritedEpoch_MultiShard_SplitOneShard_UpperEqualsLowerBound() { + Long parentEpoch = 3L; + Long parentEpoch2 = 1L; + ShardEpochMap shardEpochMap = new ShardEpochMap(Map.of("-80", parentEpoch, "80-", parentEpoch2)); + // The lower bound (80) is equal to a previous shard's upper bound (-80) + Long epoch2 = ShardLineage.getInheritedEpoch("80-c0", shardEpochMap); + // This is not a descendant so assert the epoch is from the actual parent (which is less than the other parent) + assertThat(epoch2).isEqualTo(parentEpoch2 + 1); + } + + @Test + public void shouldGetInheritedEpoch_TwoToFourShards() { + Long parentEpoch = 1L; + Long parentEpoch2 = 3L; + ShardEpochMap shardEpochMap = new ShardEpochMap(Map.of("-80", parentEpoch, "80-", parentEpoch2)); + Long epoch = ShardLineage.getInheritedEpoch("-4000", shardEpochMap); + assertThat(epoch).isEqualTo(parentEpoch + 1); + Long epoch2 = ShardLineage.getInheritedEpoch("8000-c000", shardEpochMap); + assertThat(epoch2).isEqualTo(parentEpoch2 + 1); + } + + @Test + public void shouldGetInheritedEpoch_FourShards() { + Long parentEpoch1 = 1L; + Long parentEpoch2 = 2L; + Long parentEpoch3 = 3L; + Long parentEpoch4 = 4L; + ShardEpochMap shardEpochMap = new ShardEpochMap(Map.of("-4000", parentEpoch1, + "4000-8000", parentEpoch2, "8000-c000", parentEpoch3, "c000-", parentEpoch4)); + Long epoch = ShardLineage.getInheritedEpoch("4000-6000", shardEpochMap); + assertThat(epoch).isEqualTo(parentEpoch2 + 1); + Long epoch2 = ShardLineage.getInheritedEpoch("5000-9000", shardEpochMap); + assertThat(epoch2).isEqualTo(Math.max(parentEpoch2, parentEpoch3) + 1); + } + + @Test + public void shouldGetInheritedEpoch_OneToTwoShardsHigherHexRange() { + Long parentEpoch1 = 1L; + ShardEpochMap shardEpochMap = new ShardEpochMap(Map.of("b7-b8", parentEpoch1)); + Long epoch = ShardLineage.getInheritedEpoch("b720-b750", shardEpochMap); + assertThat(epoch).isEqualTo(parentEpoch1 + 1); + Long epoch2 = ShardLineage.getInheritedEpoch("b750-b820", shardEpochMap); + assertThat(epoch2).isEqualTo(parentEpoch1 + 1); + } + + @Test + public void shouldGetInheritedEpoch_OneToTwoShardsMixedCase() { + Long parentEpoch1 = 4L; + ShardEpochMap shardEpochMap = new ShardEpochMap(Map.of("B7-B8", parentEpoch1)); + Long epoch = ShardLineage.getInheritedEpoch("b720-b750", shardEpochMap); + assertThat(epoch).isEqualTo(parentEpoch1 + 1); + Long epoch2 = ShardLineage.getInheritedEpoch("b750-b820", shardEpochMap); + assertThat(epoch2).isEqualTo(parentEpoch1 + 1); + + ShardEpochMap shardEpochMap2 = new ShardEpochMap(Map.of("b7-b8", parentEpoch1)); + Long epoch3 = ShardLineage.getInheritedEpoch("B720-B750", shardEpochMap); + assertThat(epoch3).isEqualTo(parentEpoch1 + 1); + Long epoch4 = ShardLineage.getInheritedEpoch("B750-B820", shardEpochMap); + assertThat(epoch4).isEqualTo(parentEpoch1 + 1); + } +} diff --git a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProviderTest.java b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProviderTest.java index 79e2b910..12ba6f70 100644 --- a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProviderTest.java +++ b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProviderTest.java @@ -9,6 +9,7 @@ import static io.debezium.connector.vitess.TestHelper.TEST_SHARD1_EPOCH; import static io.debezium.connector.vitess.TestHelper.TEST_SHARD_TO_EPOCH; import static io.debezium.connector.vitess.TestHelper.VGTID_JSON_TEMPLATE; +import static io.debezium.connector.vitess.TestHelper.VGTID_SINGLE_SHARD_JSON_TEMPLATE; import static io.debezium.connector.vitess.VgtidTest.VGTID_BOTH_CURRENT; import static io.debezium.connector.vitess.VgtidTest.VGTID_JSON; import static org.assertj.core.api.Assertions.assertThat; @@ -61,8 +62,9 @@ public void testGetEpochSameHostSet() { @Test public void testLoadsEpochFromOffsets() { - VitessEpochProvider provider = new VitessEpochProvider(ShardEpochMap.init(shards)); - provider.load(Map.of(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, TEST_SHARD_TO_EPOCH.toString())); + VitessEpochProvider provider = new VitessEpochProvider(ShardEpochMap.init(shards), false); + VitessConnectorConfig config = new VitessConnectorConfig(Configuration.empty()); + provider.load(Map.of(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, TEST_SHARD_TO_EPOCH.toString()), config); Long epoch = provider.getEpoch(TEST_SHARD1, VGTID_JSON, VGTID_JSON); assertThat(epoch).isEqualTo(TEST_SHARD1_EPOCH); } @@ -89,9 +91,20 @@ public void testInitializeConfigEpochWithShardList() { assertThat(provider.getShardEpochMap().get(TEST_SHARD1)).isEqualTo(0); } + @Test + public void testInitializeConfigEpochWithInheritEpoch() { + Configuration config = Configuration.create() + .with(VitessConnectorConfig.SHARD, TestHelper.TEST_SHARD1) + .with(VitessConnectorConfig.INHERIT_EPOCH, true) + .build(); + VitessConnectorConfig connectorConfig = new VitessConnectorConfig(config); + VitessEpochProvider provider = VitessEpochProvider.initialize(connectorConfig); + assertThat(provider.isInheritEpochEnabled()).isEqualTo(true); + } + @Test public void snapshotIncrementsEpoch() { - VitessEpochProvider provider = new VitessEpochProvider(ShardEpochMap.init(shards)); + VitessEpochProvider provider = new VitessEpochProvider(ShardEpochMap.init(shards), false); String vgtidJsonEmpty = String.format( VGTID_JSON_TEMPLATE, VgtidTest.TEST_KEYSPACE, @@ -106,7 +119,7 @@ public void snapshotIncrementsEpoch() { @Test public void fastForwardVgtidIncrementsEpoch() { - VitessEpochProvider provider = new VitessEpochProvider(ShardEpochMap.init(shards)); + VitessEpochProvider provider = new VitessEpochProvider(ShardEpochMap.init(shards), false); String vgtidJsonCurrent = String.format( VGTID_JSON_TEMPLATE, VgtidTest.TEST_KEYSPACE, @@ -121,7 +134,7 @@ public void fastForwardVgtidIncrementsEpoch() { @Test public void currentVgtidIncrementsEpochForAllShards() { - VitessEpochProvider provider = new VitessEpochProvider(ShardEpochMap.init(shards)); + VitessEpochProvider provider = new VitessEpochProvider(ShardEpochMap.init(shards), false); Long epochShard1 = provider.getEpoch(VgtidTest.TEST_SHARD, vgtidJsonCurrent, VGTID_JSON); Long epochShard2 = provider.getEpoch(VgtidTest.TEST_SHARD2, VGTID_JSON, VGTID_JSON); assertThat(epochShard1).isEqualTo(1L); @@ -154,12 +167,40 @@ public void splitShard() { assertThat(epochShard2).isEqualTo(0L); } + @Test + public void splitShardInheritsEpoch() { + VitessEpochProvider provider = new VitessEpochProvider(new ShardEpochMap(Map.of("0", 0L)), true); + String vgtidSingleCurrent = String.format( + VGTID_SINGLE_SHARD_JSON_TEMPLATE, + TestHelper.TEST_SHARDED_KEYSPACE, + TestHelper.TEST_SHARD, + Vgtid.CURRENT_GTID); + String vgtid1 = String.format( + VGTID_SINGLE_SHARD_JSON_TEMPLATE, + TestHelper.TEST_SHARDED_KEYSPACE, + TestHelper.TEST_SHARD, + txId); + String vgtid2 = String.format( + VGTID_JSON_TEMPLATE, + TestHelper.TEST_SHARDED_KEYSPACE, + TestHelper.TEST_SHARD1, + txId, + TestHelper.TEST_SHARDED_KEYSPACE, + TestHelper.TEST_SHARD2, + txId); + Long epochShard1 = provider.getEpoch(TestHelper.TEST_SHARD, vgtidSingleCurrent, vgtid1); + assertThat(epochShard1).isEqualTo(1L); + Long epochShard2 = provider.getEpoch(TestHelper.TEST_SHARD1, vgtid1, vgtid2); + assertThat(epochShard2).isEqualTo(2L); + } + @Test public void nullPreviousVgtidWithStoredEpochShouldThrowException() { - VitessEpochProvider provider = new VitessEpochProvider(ShardEpochMap.init(shards)); + VitessEpochProvider provider = new VitessEpochProvider(ShardEpochMap.init(shards), false); int expectedEpoch = 1; String shardToEpoch = String.format("{\"%s\": %d}", TEST_SHARD1, expectedEpoch); - provider.load(Map.of(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, shardToEpoch)); + VitessConnectorConfig config = new VitessConnectorConfig(Configuration.empty()); + provider.load(Map.of(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, shardToEpoch), config); assertThatThrownBy(() -> { provider.getEpoch(VgtidTest.TEST_SHARD, null, VGTID_JSON); }).isInstanceOf(DebeziumException.class).hasMessageContaining("Previous vgtid string cannot be null"); @@ -170,7 +211,8 @@ public void missingEpochWithPreviousVgtidShouldThrowException() { VitessEpochProvider provider = new VitessEpochProvider(); int expectedEpoch = 1; String shardToEpoch = String.format("{\"%s\": %d}", TEST_SHARD1, expectedEpoch); - provider.load(Map.of(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, shardToEpoch)); + VitessConnectorConfig config = new VitessConnectorConfig(Configuration.empty()); + provider.load(Map.of(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, shardToEpoch), config); assertThatThrownBy(() -> { provider.getEpoch(VgtidTest.TEST_SHARD, VGTID_JSON, VGTID_JSON); }).isInstanceOf(DebeziumException.class).hasMessageContaining("Previous epoch cannot be null"); @@ -178,7 +220,7 @@ public void missingEpochWithPreviousVgtidShouldThrowException() { @Test public void matchingGtidReturnsInitialEpoch() { - VitessEpochProvider provider = new VitessEpochProvider(ShardEpochMap.init(shards)); + VitessEpochProvider provider = new VitessEpochProvider(ShardEpochMap.init(shards), false); int expectedEpoch = 0; Long epoch = provider.getEpoch(VgtidTest.TEST_SHARD, VGTID_JSON, VGTID_JSON); assertThat(epoch).isEqualTo(expectedEpoch); @@ -187,7 +229,7 @@ public void matchingGtidReturnsInitialEpoch() { @Test public void testInvalidCurrentGtid() { Long expectedEpoch = 0L; - VitessEpochProvider provider = new VitessEpochProvider(ShardEpochMap.init(shards)); + VitessEpochProvider provider = new VitessEpochProvider(ShardEpochMap.init(shards), false); Long epoch = provider.getEpoch("-80", VGTID_JSON, VGTID_JSON); assertThat(epoch).isEqualTo(expectedEpoch); String vgtidJsonCurrent = String.format( @@ -206,7 +248,7 @@ public void testInvalidCurrentGtid() { @Test public void testInvalidEmptyGtid() { Long expectedEpoch = 0L; - VitessEpochProvider provider = new VitessEpochProvider(ShardEpochMap.init(shards)); + VitessEpochProvider provider = new VitessEpochProvider(ShardEpochMap.init(shards), false); Long epoch = provider.getEpoch("-80", VGTID_JSON, VGTID_JSON); assertThat(epoch).isEqualTo(expectedEpoch); String vgtidJsonEmpty = String.format( diff --git a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionContextTest.java b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionContextTest.java index f7baec7d..f59ed540 100644 --- a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionContextTest.java +++ b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionContextTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.connect.data.SchemaBuilder; import org.junit.Test; +import io.debezium.config.Configuration; import io.debezium.connector.vitess.SourceInfo; import io.debezium.connector.vitess.TestHelper; import io.debezium.connector.vitess.VgtidTest; @@ -39,7 +40,8 @@ public void shouldLoad() { Map offsets = Map.of( VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, TEST_SHARD_TO_EPOCH.toString(), SourceInfo.VGTID_KEY, expectedId); - VitessOrderedTransactionContext context = VitessOrderedTransactionContext.load(offsets); + VitessConnectorConfig config = new VitessConnectorConfig(Configuration.empty()); + VitessOrderedTransactionContext context = VitessOrderedTransactionContext.load(offsets, config); assertThat(context.previousVgtid).isEqualTo(expectedId); context.beginTransaction(new VitessTransactionInfo(VgtidTest.VGTID_JSON, TEST_SHARD1)); assertThat(context.transactionEpoch).isEqualTo(TEST_SHARD1_EPOCH); @@ -78,7 +80,7 @@ public void shouldUpdateEpoch() { VitessTransactionInfo transactionInfo = new VitessTransactionInfo(expectedTxId, expectedShard); metadata.beginTransaction(transactionInfo); assertThat(metadata.transactionRank).isEqualTo(expectedRank); - assertThat(metadata.transactionEpoch).isEqualTo(expectedEpoch); + assertThat(metadata.getTransactionEpoch()).isEqualTo(expectedEpoch); String expectedTxId2 = "[{\"keyspace\": \"foo\", \"gtid\": \"host1:1-3\", \"shard\": \"-80\"}]"; BigDecimal expectedRank2 = new BigDecimal("3"); @@ -90,6 +92,43 @@ public void shouldUpdateEpoch() { assertThat(metadata.transactionEpoch).isEqualTo(expectedEpoch2); } + @Test + public void shouldInheritEpoch() { + String expectedTxId = "[{\"keyspace\": \"foo\", \"gtid\": \"host1:1-3,host2:3-4\", \"shard\": \"0\"}]"; + VitessConnectorConfig config = new VitessConnectorConfig(TestHelper.defaultConfig(true, + false, + 0, + 0, + 0, + null, + VitessConnectorConfig.SnapshotMode.NEVER) + .with(VitessConnectorConfig.VGTID, expectedTxId) + .with(VitessConnectorConfig.SHARD, "0") + .with(VitessConnectorConfig.INHERIT_EPOCH, true) + .build()); + VitessOrderedTransactionContext metadata = VitessOrderedTransactionContext.initialize(config); + + BigDecimal expectedRank = new BigDecimal("7"); + long expectedEpoch = 0; + String expectedShard = "0"; + + VitessTransactionInfo transactionInfo = new VitessTransactionInfo(expectedTxId, expectedShard); + metadata.beginTransaction(transactionInfo); + assertThat(metadata.getTransactionRank()).isEqualTo(expectedRank); + assertThat(metadata.getTransactionEpoch()).isEqualTo(expectedEpoch); + + String expectedTxId2 = "[{\"keyspace\": \"foo\", \"gtid\": \"host1:1-3\", \"shard\": \"-80\"}," + + "{\"keyspace\": \"foo\", \"gtid\": \"host1:1-3\", \"shard\": \"80-\"}]"; + BigDecimal expectedRank2 = new BigDecimal("3"); + long expectedEpoch2 = 1; + + String expectedShard2 = "-80"; + VitessTransactionInfo transactionInfo2 = new VitessTransactionInfo(expectedTxId2, expectedShard2); + metadata.beginTransaction(transactionInfo2); + assertThat(metadata.getTransactionRank()).isEqualTo(expectedRank2); + assertThat(metadata.getTransactionEpoch()).isEqualTo(expectedEpoch2); + } + @Test public void shouldUpdateRank() { VitessOrderedTransactionContext metadata = VitessOrderedTransactionContext.initialize(