Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-8719 Test scaling down reads last offsets #230

Merged
merged 1 commit into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
*/
public class VitessOffsetRetriever {

private static final Logger LOGGER = LoggerFactory.getLogger(VitessConnector.class);
private static final Logger LOGGER = LoggerFactory.getLogger(VitessOffsetRetriever.class);

private final int numTasks;
private final int gen;
Expand Down
31 changes: 31 additions & 0 deletions src/test/java/io/debezium/connector/vitess/TestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,33 @@ public class TestHelper {
"{\"keyspace\":\"%s\",\"shard\":\"%s\",\"gtid\":\"%s\",\"table_p_ks\":[]}" +
"]";

public static final String VGTID_JSON_TEMPLATE_SINGLE_SHARD = "[" +
"{\"keyspace\":\"%s\",\"shard\":\"%s\",\"gtid\":\"%s\",\"table_p_ks\":[]}" +
"]";

public static final String TEST_HOST2_GTID = "MySQL56/b653fbaa-b71d-11ef-a099-0afffae24b7d:1-3939";

public static final String VGTID_JSON_DISTINCT_HOSTS = String.format(
VGTID_JSON_TEMPLATE,
TEST_SHARDED_KEYSPACE,
VgtidTest.TEST_SHARD,
VgtidTest.TEST_GTID,
TEST_SHARDED_KEYSPACE,
VgtidTest.TEST_SHARD2,
TEST_HOST2_GTID);

public static final String VGTID_JSON_SHARD2 = String.format(
VGTID_JSON_TEMPLATE_SINGLE_SHARD,
TEST_SHARDED_KEYSPACE,
VgtidTest.TEST_SHARD2,
TEST_HOST2_GTID);

public static final String VGTID_JSON_SHARD1 = String.format(
VGTID_JSON_TEMPLATE_SINGLE_SHARD,
TEST_SHARDED_KEYSPACE,
VgtidTest.TEST_SHARD,
VgtidTest.TEST_GTID);

protected static final String INSERT_STMT = "INSERT INTO t1 (int_col) VALUES (1);";
protected static final List<String> SETUP_TABLES_STMT = Arrays.asList(
"DROP TABLE IF EXISTS t1;",
Expand Down Expand Up @@ -218,6 +245,10 @@ public static void execute(String statement, String database) {
execute(Collections.singletonList(statement), database);
}

public static void execute(String statement, String shard, VitessConnectorConfig config) {
new VitessMetadata(config).executeQuery(statement, shard);
}

protected static void executeDDL(String ddlFile) throws Exception {
executeDDL(ddlFile, TEST_UNSHARDED_KEYSPACE);
}
Expand Down
58 changes: 58 additions & 0 deletions src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -1417,6 +1417,64 @@ public void shouldMultiShardMultiTaskConfigSubscriptionHaveMultiShardGtidsInVgti
assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericTypes(), TEST_SHARDED_KEYSPACE, TestHelper.PK_FIELD, false);
}

@Test
public void shouldScaleDownTasksReadPriorOffsets() throws Exception {
final boolean hasMultipleShards = true;

Configuration.Builder configBuilder = TestHelper.defaultConfig(
true, true, 2, 0, 2, null,
VitessConnectorConfig.SnapshotMode.NEVER);
configBuilder = configBuilder
.with(VitessConnectorConfig.KEYSPACE, TEST_SHARDED_KEYSPACE);
VitessConnectorConfig config = new VitessConnectorConfig(configBuilder.build());

TestHelper.executeDDL("vitess_create_tables.ddl", TEST_SHARDED_KEYSPACE);
TestHelper.applyVSchema("vitess_vschema.json");
startConnector(Function.identity(), hasMultipleShards, true, 2, 0, 2, null,
VitessConnectorConfig.SnapshotMode.NEVER, null);
assertConnectorIsRunning();

int expectedRecordsCount = 2;
consumer = testConsumer(expectedRecordsCount);

TestHelper.execute(INSERT_NUMERIC_TYPES_STMT, "-80", config);
TestHelper.execute(INSERT_NUMERIC_TYPES_STMT, "-80", config);

consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
consumer.remove();
SourceRecord lastRecordShard1 = consumer.remove();

consumer.expects(expectedRecordsCount);

TestHelper.execute(INSERT_NUMERIC_TYPES_STMT, "80-", config);
TestHelper.execute(INSERT_NUMERIC_TYPES_STMT, "80-", config);

consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
consumer.remove();
SourceRecord lastRecordShard2 = consumer.remove();
String shard1Gtid = Vgtid.of((String) lastRecordShard1.sourceOffset().get(SourceInfo.VGTID_KEY)).getShardGtid("-80").getGtid();
String shard2Gtid = Vgtid.of((String) lastRecordShard2.sourceOffset().get(SourceInfo.VGTID_KEY)).getShardGtid("80-").getGtid();

stopConnector();
assertConnectorNotRunning();

consumer.expects(2);

startConnector(Function.identity(), hasMultipleShards, true, 1, 1, 2, null,
VitessConnectorConfig.SnapshotMode.NEVER, null);
assertConnectorIsRunning();

consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);

SourceRecord record1 = consumer.remove();
Vgtid record1Vgtid = Vgtid.of((String) record1.sourceOffset().get(SourceInfo.VGTID_KEY));
String record1Shard1Gtid = record1Vgtid.getShardGtid("-80").getGtid();
String record1Shard2Gtid = record1Vgtid.getShardGtid("80-").getGtid();
// Assert that we resumed from the last saved point with previous task number/generation
assertThat(record1Shard1Gtid).isEqualTo(shard1Gtid);
assertThat(record1Shard2Gtid).isEqualTo(shard2Gtid);
}

@Test
public void shouldMaintainEpochMapWithChangeInOffsetStoragePerTask() throws Exception {
TestHelper.executeDDL("vitess_create_tables.ddl", TEST_SHARDED_KEYSPACE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
package io.debezium.connector.vitess;

import static io.debezium.connector.vitess.TestHelper.TEST_SERVER;
import static io.debezium.connector.vitess.TestHelper.VGTID_JSON_DISTINCT_HOSTS;
import static io.debezium.connector.vitess.TestHelper.VGTID_JSON_SHARD1;
import static io.debezium.connector.vitess.TestHelper.VGTID_JSON_SHARD2;
import static io.debezium.connector.vitess.VgtidTest.VGTID_JSON;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -135,6 +138,42 @@ public void shouldReadPreviousGenOffsets() {
assertThat(vitessLogInterceptor.containsMessage(expectedMessage)).isTrue();
}

@Test
public void shouldReadPreviousGenOffsetsWhenTasksDecrease() {
int prevGen = 0;
int gen = 1;
int prevNumTasks = 2;
int numTasks = 1;
int taskId0 = 0;
int taskId1 = 1;
String taskKeyPrevGen0 = VitessConnector.getTaskKeyName(taskId0, prevNumTasks, prevGen);
String taskKeyPrevGen1 = VitessConnector.getTaskKeyName(taskId1, prevNumTasks, prevGen);
ContextHelper helper = new ContextHelper();
helper.storeOffsets(null, Map.of(taskKeyPrevGen0, VGTID_JSON_SHARD1, taskKeyPrevGen1, VGTID_JSON_SHARD2));

String taskKey = VitessConnector.getTaskKeyName(taskId0, numTasks, gen);
String shards = "-80,80-";
Configuration config = TestHelper.defaultConfig(
true,
true,
numTasks,
gen,
prevNumTasks,
null,
VitessConnectorConfig.SnapshotMode.NEVER,
shards)
.with(VitessConnectorConfig.VITESS_TASK_KEY_CONFIG, taskKey)
.with(VitessConnectorConfig.VITESS_TASK_SHARDS_CONFIG, shards)
.with(VitessConnectorConfig.VITESS_TOTAL_TASKS_CONFIG, numTasks)
.build();
VitessConnectorTask task = new VitessConnectorTask();
task.initialize(helper.getSourceTaskContext());
task.start(config);
Configuration configWithOffsets = task.getConfigWithOffsets(config);
Vgtid loadedVgtid = Vgtid.of(configWithOffsets.getString(VitessConnectorConfig.VITESS_TASK_VGTID_CONFIG));
assertThat(loadedVgtid).isEqualTo(Vgtid.of(VGTID_JSON_DISTINCT_HOSTS));
}

@Test
public void shouldReadConfiguredOffsets() {
int gen = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@
package io.debezium.connector.vitess;

import static io.debezium.connector.vitess.TestHelper.TEST_SERVER;
import static io.debezium.connector.vitess.TestHelper.TEST_SHARD1;
import static io.debezium.connector.vitess.TestHelper.TEST_SHARD2;
import static io.debezium.connector.vitess.TestHelper.TEST_SHARDED_KEYSPACE;
import static io.debezium.connector.vitess.TestHelper.TEST_UNSHARDED_KEYSPACE;
import static io.debezium.connector.vitess.TestHelper.VGTID_JSON_DISTINCT_HOSTS;
import static io.debezium.connector.vitess.TestHelper.VGTID_JSON_SHARD1;
import static io.debezium.connector.vitess.TestHelper.VGTID_JSON_SHARD2;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertArrayEquals;
Expand Down Expand Up @@ -291,6 +296,24 @@ public void testTaskConfigsSingleTaskMultipleShardsMultipleTasks() {
assertEquals("value", firstConfig.get("key"));
}

@Test
public void testScaleDownTasks() {
List<String> shards = Arrays.asList(TEST_SHARD1, TEST_SHARD2);
int prevNumTasks = 2;
int prevGen = 0;
final Map<String, Map<String, ?>> prevVgtids = Collect.hashMapOf(
VitessConnector.getTaskKeyName(0, prevNumTasks, prevGen), getVgtidOffset(VGTID_JSON_SHARD1),
VitessConnector.getTaskKeyName(1, prevNumTasks, prevGen), getVgtidOffset(VGTID_JSON_SHARD2));

int numTasks = 1;
int gen = 1;
Map<String, Map<String, String>> offsets = getOffsetFromStorage(numTasks, shards, gen, prevNumTasks, null, prevVgtids,
(config) -> config.with(VitessConnectorConfig.KEYSPACE.name(), TEST_SHARDED_KEYSPACE));
String taskKeyName = VitessConnector.getTaskKeyName(0, numTasks, gen);
Map<String, String> taskOffsets = offsets.get(taskKeyName);
assertThat(Vgtid.of(taskOffsets.get(SourceInfo.VGTID_KEY))).isEqualTo(Vgtid.of(VGTID_JSON_DISTINCT_HOSTS));
}

@Test
public void testTaskConfigsSingleTaskMultipleShardsMultipleTasksOrderMetadata() {
VitessConnector connector = new VitessConnector();
Expand Down