From e9234f3fab0ca111cb0750e718b01b3e564e9780 Mon Sep 17 00:00:00 2001
From: wudi <676366545@qq.com>
Date: Mon, 9 Sep 2024 16:21:50 +0800
Subject: [PATCH 01/12] improve test
---
flink-doris-connector/pom.xml | 6 +
.../flink/catalog/DorisCatalogITCase.java | 2 +-
.../container/AbstractContainerTestBase.java | 23 ++++
.../flink/container/AbstractE2EService.java | 1 +
.../container/AbstractITCaseService.java | 7 +
.../container/e2e/Doris2DorisE2ECase.java | 2 +-
.../doris/flink/sink/DorisSinkITCase.java | 21 ++-
.../doris/flink/source/DorisSourceITCase.java | 122 +++++++++++++++++-
.../enumerator/DorisSourceEnumeratorTest.java | 111 ++++++++++++++++
9 files changed, 273 insertions(+), 22 deletions(-)
create mode 100644 flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumeratorTest.java
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 64ca73920..d773339b3 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -423,6 +423,12 @@ under the License.
${flink.version}
test
+
+ org.apache.flink
+ flink-connector-test-utils
+ ${flink.version}
+ test
+
com.github.jsqlparser
jsqlparser
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisCatalogITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisCatalogITCase.java
index b3a3ce04f..099f6ebd6 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisCatalogITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisCatalogITCase.java
@@ -146,7 +146,7 @@ public void setup()
props.put("sink.enable-2pc", "false");
catalog = new DorisCatalog(TEST_CATALOG_NAME, connectionOptions, TEST_DB, props);
this.tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
- tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, DEFAULT_PARALLELISM);
// Use doris catalog.
tEnv.registerCatalog(TEST_CATALOG_NAME, catalog);
tEnv.useCatalog(TEST_CATALOG_NAME);
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java
index 967e6f363..4731d4776 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java
@@ -24,11 +24,18 @@
import org.slf4j.LoggerFactory;
import java.sql.Connection;
+import java.util.List;
import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public abstract class AbstractContainerTestBase {
private static final Logger LOG = LoggerFactory.getLogger(AbstractContainerTestBase.class);
private static ContainerService dorisContainerService;
+ public static final int DEFAULT_PARALLELISM = 2;
@BeforeClass
public static void initContainers() {
@@ -88,4 +95,20 @@ private static void closeDorisContainer() {
dorisContainerService.close();
LOG.info("Doris container was closed.");
}
+
+ // ------------------------------------------------------------------------
+ // test utilities
+ // ------------------------------------------------------------------------
+ public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
+ assertTrue(expected != null && actual != null);
+ assertEqualsInOrder(
+ expected.stream().sorted().collect(Collectors.toList()),
+ actual.stream().sorted().collect(Collectors.toList()));
+ }
+
+ public static void assertEqualsInOrder(List<String> expected, List<String> actual) {
+ assertTrue(expected != null && actual != null);
+ assertEquals(expected.size(), actual.size());
+ assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0]));
+ }
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java
index 527f82cc5..ec536ee68 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java
@@ -113,6 +113,7 @@ protected void cancelE2EJob(String jobName) {
private StreamExecutionEnvironment configFlinkEnvironment() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
Map flinkMap = new HashMap<>();
flinkMap.put("execution.checkpointing.interval", "10s");
flinkMap.put("pipeline.operator-chaining", "false");
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java
index 956b8be65..fd19329a5 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java
@@ -138,4 +138,11 @@ protected static void triggerJobManagerFailover(
LOG.info("flink cluster will grant job master leadership. jobId={}", jobId);
haLeadershipControl.grantJobMasterLeadership(jobId).get();
}
+
+ protected void sleepMs(long millis) {
+ try {
+ Thread.sleep(millis);
+ } catch (InterruptedException ignored) {
+ }
+ }
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java
index fcb4858a8..4b4e3b26a 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java
@@ -55,7 +55,7 @@ public void testDoris2Doris() throws Exception {
LOG.info("Start executing the test case of doris to doris.");
initializeDorisTable();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(DEFAULT_PARALLELISM);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
index 50bcf6be1..18488229e 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
@@ -131,6 +131,7 @@ private void submitJob(
throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ env.setParallelism(DEFAULT_PARALLELISM);
Builder builder = DorisSink.builder();
final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
@@ -147,7 +148,7 @@ private void submitJob(
public void testTableSinkJsonFormat() throws Exception {
initializeTable(TABLE_JSON_TBL);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(DEFAULT_PARALLELISM);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -196,7 +197,7 @@ public void testTableSinkJsonFormat() throws Exception {
public void testTableBatch() throws Exception {
initializeTable(TABLE_CSV_BATCH_TBL);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(DEFAULT_PARALLELISM);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -244,6 +245,7 @@ public void testDataStreamBatch() throws Exception {
initializeTable(TABLE_CSV_BATCH_DS);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ env.setParallelism(DEFAULT_PARALLELISM);
DorisBatchSink.Builder builder = DorisBatchSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
@@ -283,7 +285,7 @@ public void testDataStreamBatch() throws Exception {
public void testTableGroupCommit() throws Exception {
initializeTable(TABLE_GROUP_COMMIT);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(DEFAULT_PARALLELISM);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -332,7 +334,7 @@ public void testTableGroupCommit() throws Exception {
public void testTableGzFormat() throws Exception {
initializeTable(TABLE_GZ_FORMAT);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(DEFAULT_PARALLELISM);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -374,7 +376,7 @@ public void testJobManagerFailoverSink() throws Exception {
LOG.info("start to test JobManagerFailoverSink.");
initializeFailoverTable(TABLE_CSV_JM);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
+ env.setParallelism(DEFAULT_PARALLELISM);
env.enableCheckpointing(10000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
@@ -434,7 +436,7 @@ public void testTaskManagerFailoverSink() throws Exception {
LOG.info("start to test TaskManagerFailoverSink.");
initializeFailoverTable(TABLE_CSV_TM);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
+ env.setParallelism(DEFAULT_PARALLELISM);
env.enableCheckpointing(10000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
@@ -486,13 +488,6 @@ public void testTaskManagerFailoverSink() throws Exception {
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
}
- private void sleepMs(long millis) {
- try {
- Thread.sleep(millis);
- } catch (InterruptedException ignored) {
- }
- }
-
private void initializeTable(String table) {
ContainerUtils.executeSQLStatement(
getDorisQueryConnection(),
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
index 783e6bda2..4eb9621fe 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
@@ -17,8 +17,10 @@
package org.apache.doris.flink.source;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -40,6 +42,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
@@ -56,13 +59,15 @@ public class DorisSourceITCase extends AbstractITCaseService {
private static final String TABLE_READ_TBL_PUSH_DOWN = "tbl_read_tbl_push_down";
private static final String TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL =
"tbl_read_tbl_push_down_with_union_all";
+ static final String TABLE_CSV_JM = "tbl_csv_jm_source";
+ static final String TABLE_CSV_TM = "tbl_csv_tm_source";
@Test
public void testSource() throws Exception {
initializeTable(TABLE_READ);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
-
+ env.setParallelism(DEFAULT_PARALLELISM);
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder
.setFenodes(getFenodes())
@@ -91,6 +96,7 @@ public void testSource() throws Exception {
public void testOldSourceApi() throws Exception {
initializeTable(TABLE_READ_OLD_API);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
Properties properties = new Properties();
properties.put("fenodes", getFenodes());
properties.put("username", getDorisUsername());
@@ -116,7 +122,7 @@ options, new SimpleListDeserializationSchema()))
public void testTableSource() throws Exception {
initializeTable(TABLE_READ_TBL);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(DEFAULT_PARALLELISM);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -165,7 +171,7 @@ public void testTableSource() throws Exception {
public void testTableSourceOldApi() throws Exception {
initializeTable(TABLE_READ_TBL_OLD_API);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(DEFAULT_PARALLELISM);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String sourceDDL =
@@ -202,7 +208,7 @@ public void testTableSourceOldApi() throws Exception {
public void testTableSourceAllOptions() throws Exception {
initializeTable(TABLE_READ_TBL_ALL_OPTIONS);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(DEFAULT_PARALLELISM);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String sourceDDL =
@@ -248,7 +254,7 @@ public void testTableSourceAllOptions() throws Exception {
public void testTableSourceFilterAndProjectionPushDown() throws Exception {
initializeTable(TABLE_READ_TBL_PUSH_DOWN);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(DEFAULT_PARALLELISM);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String sourceDDL =
@@ -287,7 +293,7 @@ public void testTableSourceFilterWithUnionAll() {
LOG.info("starting to execute testTableSourceFilterWithUnionAll case.");
initializeTable(TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
+ env.setParallelism(DEFAULT_PARALLELISM);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String sourceDDL =
@@ -328,6 +334,98 @@ public void testTableSourceFilterWithUnionAll() {
}
}
+ @Test
+ public void testJobManagerFailoverSource() throws Exception {
+ LOG.info("start to test JobManagerFailoverSource.");
+ initializeTable(TABLE_CSV_JM);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ env.enableCheckpointing(10000);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+ String sourceDDL =
+ String.format(
+ "CREATE TABLE doris_source_jm ("
+ + " name STRING,"
+ + " age INT"
+ + ") WITH ("
+ + " 'connector' = 'doris',"
+ + " 'fenodes' = '%s',"
+ + " 'table.identifier' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s'"
+ + ")",
+ getFenodes(),
+ DATABASE + "." + TABLE_CSV_JM,
+ getDorisUsername(),
+ getDorisPassword());
+ tEnv.executeSql(sourceDDL);
+ TableResult tableResult = tEnv.executeSql("select * from doris_source_jm");
+ CloseableIterator<Row> iterator = tableResult.collect();
+ JobID jobId = tableResult.getJobClient().get().getJobID();
+ List<String> expectedSnapshotData = new ArrayList<>();
+ expectedSnapshotData.addAll(
+ Arrays.asList("+I[doris, 18]", "+I[flink, 10]", "+I[apache, 12]"));
+
+ if (iterator.hasNext()) {
+ LOG.info("trigger jobmanager failover...");
+ triggerFailover(
+ FailoverType.JM,
+ jobId,
+ miniClusterResource.getMiniCluster(),
+ () -> sleepMs(100));
+ }
+
+ assertEqualsInAnyOrder(
+ expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
+ }
+
+ @Test
+ public void testTaskManagerFailoverSource() throws Exception {
+ LOG.info("start to test TaskManagerFailoverSource.");
+ initializeTable(TABLE_CSV_TM);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ env.enableCheckpointing(10000);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+ String sourceDDL =
+ String.format(
+ "CREATE TABLE doris_source_tm ("
+ + " name STRING,"
+ + " age INT"
+ + ") WITH ("
+ + " 'connector' = 'doris',"
+ + " 'fenodes' = '%s',"
+ + " 'table.identifier' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s'"
+ + ")",
+ getFenodes(),
+ DATABASE + "." + TABLE_CSV_TM,
+ getDorisUsername(),
+ getDorisPassword());
+ tEnv.executeSql(sourceDDL);
+ TableResult tableResult = tEnv.executeSql("select * from doris_source_tm");
+ CloseableIterator<Row> iterator = tableResult.collect();
+ JobID jobId = tableResult.getJobClient().get().getJobID();
+ List<String> expectedSnapshotData = new ArrayList<>();
+ expectedSnapshotData.addAll(
+ Arrays.asList("+I[doris, 18]", "+I[flink, 10]", "+I[apache, 12]"));
+
+ if (iterator.hasNext()) {
+ LOG.info("trigger taskmanager failover...");
+ triggerFailover(
+ FailoverType.TM,
+ jobId,
+ miniClusterResource.getMiniCluster(),
+ () -> sleepMs(100));
+ }
+
+ assertEqualsInAnyOrder(
+ expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
+ }
+
private void checkResult(String testName, Object[] expected, Object[] actual) {
LOG.info(
"Checking DorisSourceITCase result. testName={}, actual={}, expected={}",
@@ -347,7 +445,7 @@ private void initializeTable(String table) {
"CREATE TABLE %s.%s ( \n"
+ "`name` varchar(256),\n"
+ "`age` int\n"
- + ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n"
+ + ") DISTRIBUTED BY HASH(`name`) BUCKETS 10\n"
+ "PROPERTIES (\n"
+ "\"replication_num\" = \"1\"\n"
+ ")\n",
@@ -356,4 +454,14 @@ private void initializeTable(String table) {
String.format("insert into %s.%s values ('flink',10)", DATABASE, table),
String.format("insert into %s.%s values ('apache',12)", DATABASE, table));
}
+
+ private static List<String> fetchRows(Iterator<Row> iter, int size) {
+ List<String> rows = new ArrayList<>(size);
+ while (size > 0 && iter.hasNext()) {
+ Row row = iter.next();
+ rows.add(row.toString());
+ size--;
+ }
+ return rows;
+ }
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumeratorTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumeratorTest.java
new file mode 100644
index 000000000..687890154
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumeratorTest.java
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.source.enumerator;
+
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
+
+import org.apache.doris.flink.rest.PartitionDefinition;
+import org.apache.doris.flink.source.DorisSource;
+import org.apache.doris.flink.source.assigners.SimpleSplitAssigner;
+import org.apache.doris.flink.source.split.DorisSourceSplit;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for the {@link DorisSourceEnumerator}. */
+public class DorisSourceEnumeratorTest {
+ private static long splitId = 1L;
+ private TestingSplitEnumeratorContext<DorisSourceSplit> context;
+ private DorisSourceSplit split;
+ private DorisSourceEnumerator enumerator;
+
+ @BeforeEach
+ void setup() {
+ this.context = new TestingSplitEnumeratorContext<>(2);
+ this.split = createRandomSplit();
+ this.enumerator = createEnumerator(context, split);
+ }
+
+ @Test
+ void testCheckpointNoSplitRequested() throws Exception {
+ PendingSplitsCheckpoint state = enumerator.snapshotState(1L);
+ assertThat(state.getSplits()).contains(split);
+ }
+
+ @Test
+ void testRestoreEnumerator() throws Exception {
+ PendingSplitsCheckpoint state = enumerator.snapshotState(1L);
+
+ DorisSource source = DorisSource.builder().build();
+ SplitEnumerator restoreEnumerator =
+ source.restoreEnumerator(context, state);
+ PendingSplitsCheckpoint pendingSplitsCheckpoint = restoreEnumerator.snapshotState(1L);
+ assertThat(pendingSplitsCheckpoint.getSplits()).contains(split);
+ }
+
+ @Test
+ void testSplitRequestForRegisteredReader() throws Exception {
+ context.registerReader(1, "somehost");
+ enumerator.addReader(1);
+ enumerator.handleSplitRequest(1, "somehost");
+ assertThat(enumerator.snapshotState(1L).getSplits()).isEmpty();
+ assertThat(context.getSplitAssignments().get(1).getAssignedSplits()).contains(split);
+ }
+
+ @Test
+ void testSplitRequestForNonRegisteredReader() throws Exception {
+ enumerator.handleSplitRequest(1, "somehost");
+ assertThat(context.getSplitAssignments()).doesNotContainKey(1);
+ assertThat(enumerator.snapshotState(1L).getSplits()).contains(split);
+ }
+
+ @Test
+ void testNoMoreSplits() {
+ // first split assignment
+ context.registerReader(1, "somehost");
+ enumerator.addReader(1);
+ enumerator.handleSplitRequest(1, "somehost");
+
+ // second request has no more split
+ enumerator.handleSplitRequest(1, "somehost");
+
+ assertThat(context.getSplitAssignments().get(1).getAssignedSplits()).contains(split);
+ assertThat(context.getSplitAssignments().get(1).hasReceivedNoMoreSplitsSignal()).isTrue();
+ }
+
+ private static DorisSourceSplit createRandomSplit() {
+ Set<Long> tabletIds = new HashSet<>();
+ tabletIds.add(1001L);
+ return new DorisSourceSplit(
+ String.valueOf(splitId),
+ new PartitionDefinition("db", "tbl", "127.0.0.1", tabletIds, "queryPlan"));
+ }
+
+ private static DorisSourceEnumerator createEnumerator(
+ final SplitEnumeratorContext<DorisSourceSplit> context,
+ final DorisSourceSplit... splits) {
+ return new DorisSourceEnumerator(context, new SimpleSplitAssigner(Arrays.asList(splits)));
+ }
+}
From 81c41ac6fed88fda45b1d3bc5a10b1ebd6f1a341 Mon Sep 17 00:00:00 2001
From: wudi <676366545@qq.com>
Date: Mon, 9 Sep 2024 17:08:50 +0800
Subject: [PATCH 02/12] update
---
.../container/AbstractContainerTestBase.java | 6 +--
.../doris/flink/source/DorisSourceITCase.java | 42 ++++++++++++-------
2 files changed, 29 insertions(+), 19 deletions(-)
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java
index 4731d4776..61e0faac8 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java
@@ -99,16 +99,16 @@ private static void closeDorisContainer() {
// ------------------------------------------------------------------------
// test utilities
// ------------------------------------------------------------------------
- public static void assertEqualsInAnyOrder(List expected, List actual) {
+ public static void assertEqualsInAnyOrder(List