From e9234f3fab0ca111cb0750e718b01b3e564e9780 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Mon, 9 Sep 2024 16:21:50 +0800 Subject: [PATCH 01/12] improve test --- flink-doris-connector/pom.xml | 6 + .../flink/catalog/DorisCatalogITCase.java | 2 +- .../container/AbstractContainerTestBase.java | 23 ++++ .../flink/container/AbstractE2EService.java | 1 + .../container/AbstractITCaseService.java | 7 + .../container/e2e/Doris2DorisE2ECase.java | 2 +- .../doris/flink/sink/DorisSinkITCase.java | 21 ++- .../doris/flink/source/DorisSourceITCase.java | 122 +++++++++++++++++- .../enumerator/DorisSourceEnumeratorTest.java | 111 ++++++++++++++++ 9 files changed, 273 insertions(+), 22 deletions(-) create mode 100644 flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumeratorTest.java diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index 64ca73920..d773339b3 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -423,6 +423,12 @@ under the License. ${flink.version} test + + org.apache.flink + flink-connector-test-utils + ${flink.version} + test + com.github.jsqlparser jsqlparser diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisCatalogITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisCatalogITCase.java index b3a3ce04f..099f6ebd6 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisCatalogITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/DorisCatalogITCase.java @@ -146,7 +146,7 @@ public void setup() props.put("sink.enable-2pc", "false"); catalog = new DorisCatalog(TEST_CATALOG_NAME, connectionOptions, TEST_DB, props); this.tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode()); - tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); + tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, DEFAULT_PARALLELISM); // Use doris catalog. tEnv.registerCatalog(TEST_CATALOG_NAME, catalog); tEnv.useCatalog(TEST_CATALOG_NAME); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java index 967e6f363..4731d4776 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java @@ -24,11 +24,18 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; +import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public abstract class AbstractContainerTestBase { private static final Logger LOG = LoggerFactory.getLogger(AbstractContainerTestBase.class); private static ContainerService dorisContainerService; + public static final int DEFAULT_PARALLELISM = 2; @BeforeClass public static void initContainers() { @@ -88,4 +95,20 @@ private static void closeDorisContainer() { dorisContainerService.close(); LOG.info("Doris container was closed."); } + + // ------------------------------------------------------------------------ + // test utilities + // ------------------------------------------------------------------------ + public static void assertEqualsInAnyOrder(List expected, List actual) { + assertTrue(expected != null && actual != null); + assertEqualsInOrder( + expected.stream().sorted().collect(Collectors.toList()), + actual.stream().sorted().collect(Collectors.toList())); + } + + public static void assertEqualsInOrder(List expected, List actual) { + assertTrue(expected != null && actual != null); + assertEquals(expected.size(), actual.size()); + assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0])); + } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java index 527f82cc5..ec536ee68 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java @@ -113,6 +113,7 @@ protected void cancelE2EJob(String jobName) { private StreamExecutionEnvironment configFlinkEnvironment() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); Map flinkMap = new HashMap<>(); flinkMap.put("execution.checkpointing.interval", "10s"); flinkMap.put("pipeline.operator-chaining", "false"); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java index 956b8be65..fd19329a5 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java @@ -138,4 +138,11 @@ protected static void triggerJobManagerFailover( LOG.info("flink cluster will grant job master leadership. jobId={}", jobId); haLeadershipControl.grantJobMasterLeadership(jobId).get(); } + + protected void sleepMs(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException ignored) { + } + } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java index fcb4858a8..4b4e3b26a 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Doris2DorisE2ECase.java @@ -55,7 +55,7 @@ public void testDoris2Doris() throws Exception { LOG.info("Start executing the test case of doris to doris."); initializeDorisTable(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); + env.setParallelism(2); env.setRuntimeMode(RuntimeExecutionMode.BATCH); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java index 50bcf6be1..18488229e 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java @@ -131,6 +131,7 @@ private void submitJob( throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH); + env.setParallelism(DEFAULT_PARALLELISM); Builder builder = DorisSink.builder(); final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder(); @@ -147,7 +148,7 @@ private void submitJob( public void testTableSinkJsonFormat() throws Exception { initializeTable(TABLE_JSON_TBL); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); + env.setParallelism(DEFAULT_PARALLELISM); env.setRuntimeMode(RuntimeExecutionMode.BATCH); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); @@ -196,7 +197,7 @@ public void testTableSinkJsonFormat() throws Exception { public void testTableBatch() throws Exception { initializeTable(TABLE_CSV_BATCH_TBL); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); + env.setParallelism(DEFAULT_PARALLELISM); env.setRuntimeMode(RuntimeExecutionMode.BATCH); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); @@ -244,6 +245,7 @@ public void testDataStreamBatch() throws Exception { initializeTable(TABLE_CSV_BATCH_DS); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH); + env.setParallelism(DEFAULT_PARALLELISM); DorisBatchSink.Builder builder = DorisBatchSink.builder(); DorisOptions.Builder dorisBuilder = DorisOptions.builder(); @@ -283,7 +285,7 @@ public void testDataStreamBatch() throws Exception { public void testTableGroupCommit() throws Exception { initializeTable(TABLE_GROUP_COMMIT); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); + env.setParallelism(DEFAULT_PARALLELISM); env.setRuntimeMode(RuntimeExecutionMode.BATCH); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); @@ -332,7 +334,7 @@ public void testTableGroupCommit() throws Exception { public void testTableGzFormat() throws Exception { initializeTable(TABLE_GZ_FORMAT); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); + env.setParallelism(DEFAULT_PARALLELISM); env.setRuntimeMode(RuntimeExecutionMode.BATCH); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); @@ -374,7 +376,7 @@ public void testJobManagerFailoverSink() throws Exception { LOG.info("start to test JobManagerFailoverSink."); initializeFailoverTable(TABLE_CSV_JM); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(2); + env.setParallelism(DEFAULT_PARALLELISM); env.enableCheckpointing(10000); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); @@ -434,7 +436,7 @@ public void testTaskManagerFailoverSink() throws Exception { LOG.info("start to test TaskManagerFailoverSink."); initializeFailoverTable(TABLE_CSV_TM); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(2); + env.setParallelism(DEFAULT_PARALLELISM); env.enableCheckpointing(10000); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); @@ -486,13 +488,6 @@ public void testTaskManagerFailoverSink() throws Exception { ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2); } - private void sleepMs(long millis) { - try { - Thread.sleep(millis); - } catch (InterruptedException ignored) { - } - } - private void initializeTable(String table) { ContainerUtils.executeSQLStatement( getDorisQueryConnection(), diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java index 783e6bda2..4eb9621fe 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java @@ -17,8 +17,10 @@ package org.apache.doris.flink.source; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; @@ -40,6 +42,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Properties; import java.util.Set; @@ -56,13 +59,15 @@ public class DorisSourceITCase extends AbstractITCaseService { private static final String TABLE_READ_TBL_PUSH_DOWN = "tbl_read_tbl_push_down"; private static final String TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL = "tbl_read_tbl_push_down_with_union_all"; + static final String TABLE_CSV_JM = "tbl_csv_jm_source"; + static final String TABLE_CSV_TM = "tbl_csv_tm_source"; @Test public void testSource() throws Exception { initializeTable(TABLE_READ); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH); - + env.setParallelism(DEFAULT_PARALLELISM); DorisOptions.Builder dorisBuilder = DorisOptions.builder(); dorisBuilder .setFenodes(getFenodes()) @@ -91,6 +96,7 @@ public void testSource() throws Exception { public void testOldSourceApi() throws Exception { initializeTable(TABLE_READ_OLD_API); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); Properties properties = new Properties(); properties.put("fenodes", getFenodes()); properties.put("username", getDorisUsername()); @@ -116,7 +122,7 @@ options, new SimpleListDeserializationSchema())) public void testTableSource() throws Exception { initializeTable(TABLE_READ_TBL); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); + env.setParallelism(DEFAULT_PARALLELISM); env.setRuntimeMode(RuntimeExecutionMode.BATCH); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); @@ -165,7 +171,7 @@ public void testTableSource() throws Exception { public void testTableSourceOldApi() throws Exception { initializeTable(TABLE_READ_TBL_OLD_API); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); + env.setParallelism(DEFAULT_PARALLELISM); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); String sourceDDL = @@ -202,7 +208,7 @@ public void testTableSourceOldApi() throws Exception { public void testTableSourceAllOptions() throws Exception { initializeTable(TABLE_READ_TBL_ALL_OPTIONS); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); + env.setParallelism(DEFAULT_PARALLELISM); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); String sourceDDL = @@ -248,7 +254,7 @@ public void testTableSourceAllOptions() throws Exception { public void testTableSourceFilterAndProjectionPushDown() throws Exception { initializeTable(TABLE_READ_TBL_PUSH_DOWN); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); + env.setParallelism(DEFAULT_PARALLELISM); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); String sourceDDL = @@ -287,7 +293,7 @@ public void testTableSourceFilterWithUnionAll() { LOG.info("starting to execute testTableSourceFilterWithUnionAll case."); initializeTable(TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); + env.setParallelism(DEFAULT_PARALLELISM); final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); String sourceDDL = @@ -328,6 +334,98 @@ public void testTableSourceFilterWithUnionAll() { } } + @Test + public void testJobManagerFailoverSource() throws Exception { + LOG.info("start to test JobManagerFailoverSource."); + initializeTable(TABLE_CSV_JM); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + env.enableCheckpointing(10000); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + String sourceDDL = + String.format( + "CREATE TABLE doris_source_jm (" + + " name STRING," + + " age INT" + + ") WITH (" + + " 'connector' = 'doris'," + + " 'fenodes' = '%s'," + + " 'table.identifier' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'" + + ")", + getFenodes(), + DATABASE + "." + TABLE_CSV_JM, + getDorisUsername(), + getDorisPassword()); + tEnv.executeSql(sourceDDL); + TableResult tableResult = tEnv.executeSql("select * from doris_source_jm"); + CloseableIterator iterator = tableResult.collect(); + JobID jobId = tableResult.getJobClient().get().getJobID(); + List expectedSnapshotData = new ArrayList<>(); + expectedSnapshotData.addAll( + Arrays.asList("+I[doris, 18]", "+I[flink, 10]", "+I[apache, 12]")); + + if (iterator.hasNext()) { + LOG.info("trigger jobmanager failover..."); + triggerFailover( + FailoverType.JM, + jobId, + miniClusterResource.getMiniCluster(), + () -> sleepMs(100)); + } + + assertEqualsInAnyOrder( + expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size())); + } + + @Test + public void testTaskManagerFailoverSink() throws Exception { + LOG.info("start to test TaskManagerFailoverSink."); + initializeTable(TABLE_CSV_TM); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + env.enableCheckpointing(10000); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + String sourceDDL = + String.format( + "CREATE TABLE doris_source_tm (" + + " name STRING," + + " age INT" + + ") WITH (" + + " 'connector' = 'doris'," + + " 'fenodes' = '%s'," + + " 'table.identifier' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'" + + ")", + getFenodes(), + DATABASE + "." + TABLE_CSV_TM, + getDorisUsername(), + getDorisPassword()); + tEnv.executeSql(sourceDDL); + TableResult tableResult = tEnv.executeSql("select * from doris_source_tm"); + CloseableIterator iterator = tableResult.collect(); + JobID jobId = tableResult.getJobClient().get().getJobID(); + List expectedSnapshotData = new ArrayList<>(); + expectedSnapshotData.addAll( + Arrays.asList("+I[doris, 18]", "+I[flink, 10]", "+I[apache, 12]")); + + if (iterator.hasNext()) { + LOG.info("trigger taskmanager failover..."); + triggerFailover( + FailoverType.TM, + jobId, + miniClusterResource.getMiniCluster(), + () -> sleepMs(100)); + } + + assertEqualsInAnyOrder( + expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size())); + } + private void checkResult(String testName, Object[] expected, Object[] actual) { LOG.info( "Checking DorisSourceITCase result. testName={}, actual={}, expected={}", @@ -347,7 +445,7 @@ private void initializeTable(String table) { "CREATE TABLE %s.%s ( \n" + "`name` varchar(256),\n" + "`age` int\n" - + ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" + + ") DISTRIBUTED BY HASH(`name`) BUCKETS 10\n" + "PROPERTIES (\n" + "\"replication_num\" = \"1\"\n" + ")\n", @@ -356,4 +454,14 @@ private void initializeTable(String table) { String.format("insert into %s.%s values ('flink',10)", DATABASE, table), String.format("insert into %s.%s values ('apache',12)", DATABASE, table)); } + + private static List fetchRows(Iterator iter, int size) { + List rows = new ArrayList<>(size); + while (size > 0 && iter.hasNext()) { + Row row = iter.next(); + rows.add(row.toString()); + size--; + } + return rows; + } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumeratorTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumeratorTest.java new file mode 100644 index 000000000..687890154 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumeratorTest.java @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.source.enumerator; + +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext; + +import org.apache.doris.flink.rest.PartitionDefinition; +import org.apache.doris.flink.source.DorisSource; +import org.apache.doris.flink.source.assigners.SimpleSplitAssigner; +import org.apache.doris.flink.source.split.DorisSourceSplit; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit tests for the {@link DorisSourceEnumerator}. */ +public class DorisSourceEnumeratorTest { + private static long splitId = 1L; + private TestingSplitEnumeratorContext context; + private DorisSourceSplit split; + private DorisSourceEnumerator enumerator; + + @BeforeEach + void setup() { + this.context = new TestingSplitEnumeratorContext<>(2); + this.split = createRandomSplit(); + this.enumerator = createEnumerator(context, split); + } + + @Test + void testCheckpointNoSplitRequested() throws Exception { + PendingSplitsCheckpoint state = enumerator.snapshotState(1L); + assertThat(state.getSplits()).contains(split); + } + + @Test + void testRestoreEnumerator() throws Exception { + PendingSplitsCheckpoint state = enumerator.snapshotState(1L); + + DorisSource source = DorisSource.builder().build(); + SplitEnumerator restoreEnumerator = + source.restoreEnumerator(context, state); + PendingSplitsCheckpoint pendingSplitsCheckpoint = restoreEnumerator.snapshotState(1L); + assertThat(pendingSplitsCheckpoint.getSplits()).contains(split); + } + + @Test + void testSplitRequestForRegisteredReader() throws Exception { + context.registerReader(1, "somehost"); + enumerator.addReader(1); + enumerator.handleSplitRequest(1, "somehost"); + assertThat(enumerator.snapshotState(1L).getSplits()).isEmpty(); + assertThat(context.getSplitAssignments().get(1).getAssignedSplits()).contains(split); + } + + @Test + void testSplitRequestForNonRegisteredReader() throws Exception { + enumerator.handleSplitRequest(1, "somehost"); + assertThat(context.getSplitAssignments()).doesNotContainKey(1); + assertThat(enumerator.snapshotState(1L).getSplits()).contains(split); + } + + @Test + void testNoMoreSplits() { + // first split assignment + context.registerReader(1, "somehost"); + enumerator.addReader(1); + enumerator.handleSplitRequest(1, "somehost"); + + // second request has no more split + enumerator.handleSplitRequest(1, "somehost"); + + assertThat(context.getSplitAssignments().get(1).getAssignedSplits()).contains(split); + assertThat(context.getSplitAssignments().get(1).hasReceivedNoMoreSplitsSignal()).isTrue(); + } + + private static DorisSourceSplit createRandomSplit() { + Set tabletIds = new HashSet<>(); + tabletIds.add(1001L); + return new DorisSourceSplit( + String.valueOf(splitId), + new PartitionDefinition("db", "tbl", "127.0.0.1", tabletIds, "queryPlan")); + } + + private static DorisSourceEnumerator createEnumerator( + final SplitEnumeratorContext context, + final DorisSourceSplit... splits) { + return new DorisSourceEnumerator(context, new SimpleSplitAssigner(Arrays.asList(splits))); + } +} From 81c41ac6fed88fda45b1d3bc5a10b1ebd6f1a341 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Mon, 9 Sep 2024 17:08:50 +0800 Subject: [PATCH 02/12] update --- .../container/AbstractContainerTestBase.java | 6 +-- .../doris/flink/source/DorisSourceITCase.java | 42 ++++++++++++------- 2 files changed, 29 insertions(+), 19 deletions(-) diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java index 4731d4776..61e0faac8 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java @@ -99,16 +99,16 @@ private static void closeDorisContainer() { // ------------------------------------------------------------------------ // test utilities // ------------------------------------------------------------------------ - public static void assertEqualsInAnyOrder(List expected, List actual) { + public static void assertEqualsInAnyOrder(List expected, List actual) { assertTrue(expected != null && actual != null); assertEqualsInOrder( expected.stream().sorted().collect(Collectors.toList()), actual.stream().sorted().collect(Collectors.toList())); } - public static void assertEqualsInOrder(List expected, List actual) { + public static void assertEqualsInOrder(List expected, List actual) { assertTrue(expected != null && actual != null); assertEquals(expected.size(), actual.size()); - assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0])); + assertArrayEquals(expected.toArray(new Object[0]), actual.toArray(new Object[0])); } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java index 4eb9621fe..500e96d41 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java @@ -41,11 +41,9 @@ import java.util.ArrayList; import java.util.Arrays; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Properties; -import java.util.Set; /** DorisSource ITCase. */ public class DorisSourceITCase extends AbstractITCaseService { @@ -89,7 +87,7 @@ public void testSource() throws Exception { } } List expected = Arrays.asList("[doris, 18]", "[flink, 10]", "[apache, 12]"); - checkResult("testSource", expected.toArray(), actual.toArray()); + checkResultInAnyOrder("testSource", expected.toArray(), actual.toArray()); } @Test @@ -115,7 +113,7 @@ options, new SimpleListDeserializationSchema())) } } List expected = Arrays.asList("[doris, 18]", "[flink, 10]", "[apache, 12]"); - checkResult("testOldSourceApi", expected.toArray(), actual.toArray()); + checkResultInAnyOrder("testOldSourceApi", expected.toArray(), actual.toArray()); } @Test @@ -164,7 +162,7 @@ public void testTableSource() throws Exception { } } String[] expectedFilter = new String[] {"+I[doris, 18]"}; - checkResult("testTableSource", expectedFilter, actualFilter.toArray()); + checkResultInAnyOrder("testTableSource", expectedFilter, actualFilter.toArray()); } @Test @@ -201,7 +199,7 @@ public void testTableSourceOldApi() throws Exception { } } String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]", "+I[apache, 12]"}; - checkResult("testTableSourceOldApi", expected, actual.toArray()); + checkResultInAnyOrder("testTableSourceOldApi", expected, actual.toArray()); } @Test @@ -247,7 +245,7 @@ public void testTableSourceAllOptions() throws Exception { } } String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]", "+I[apache, 12]"}; - checkResult("testTableSourceAllOptions", expected, actual.toArray()); + checkResultInAnyOrder("testTableSourceAllOptions", expected, actual.toArray()); } @Test @@ -285,7 +283,8 @@ public void testTableSourceFilterAndProjectionPushDown() throws Exception { } } String[] expected = new String[] {"+I[18]"}; - checkResult("testTableSourceFilterAndProjectionPushDown", expected, actual.toArray()); + checkResultInAnyOrder( + "testTableSourceFilterAndProjectionPushDown", expected, actual.toArray()); } @Test @@ -328,10 +327,8 @@ public void testTableSourceFilterWithUnionAll() { LOG.error("Failed to execute sql. sql={}", querySql, e); throw new DorisRuntimeException(e); } - Set expected = new HashSet<>(Arrays.asList("+I[flink, 10]", "+I[doris, 18]")); - for (String a : actual) { - Assert.assertTrue(expected.contains(a)); - } + String[] expected = new String[] {"+I[flink, 10]", "+I[doris, 18]"}; + checkResultInAnyOrder("testTableSourceFilterWithUnionAll", expected, actual.toArray()); } @Test @@ -376,8 +373,10 @@ public void testJobManagerFailoverSource() throws Exception { () -> sleepMs(100)); } - assertEqualsInAnyOrder( - expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size())); + checkResultInAnyOrder( + "testJobManagerFailoverSource", + expectedSnapshotData.toArray(), + fetchRows(iterator, expectedSnapshotData.size()).toArray()); } @Test @@ -422,8 +421,10 @@ public void testTaskManagerFailoverSink() throws Exception { () -> sleepMs(100)); } - assertEqualsInAnyOrder( - expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size())); + checkResultInAnyOrder( + "testTaskManagerFailoverSink", + expectedSnapshotData.toArray(), + fetchRows(iterator, expectedSnapshotData.size()).toArray()); } private void checkResult(String testName, Object[] expected, Object[] actual) { @@ -435,6 +436,15 @@ private void checkResult(String testName, Object[] expected, Object[] actual) { Assert.assertArrayEquals(expected, actual); } + private void checkResultInAnyOrder(String testName, Object[] expected, Object[] actual) { + LOG.info( + "Checking DorisSourceITCase result. testName={}, actual={}, expected={}", + testName, + actual, + expected); + assertEqualsInAnyOrder(Arrays.asList(expected), Arrays.asList(actual)); + } + private void initializeTable(String table) { ContainerUtils.executeSQLStatement( getDorisQueryConnection(), From ebe674ffd800c07d16763f922a8ad265c48edf19 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Mon, 9 Sep 2024 18:31:38 +0800 Subject: [PATCH 03/12] update --- .../flink/source/reader/DorisSourceSplitReader.java | 2 ++ .../org/apache/doris/flink/source/DorisSourceITCase.java | 9 +++------ 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java index c9ed6f9ce..54879067c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java @@ -80,8 +80,10 @@ private void checkSplitOrStartNext() throws IOException, DorisException { } private DorisSplitRecords finishSplit() { + LOG.info("Finished split {}", currentSplitId); final DorisSplitRecords finishRecords = DorisSplitRecords.finishedSplit(currentSplitId); currentSplitId = null; + LOG.info("After Finished split {}, {} ", currentSplitId, finishRecords.finishedSplits()); return finishRecords; } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java index 500e96d41..7cb025fe3 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java @@ -33,7 +33,6 @@ import org.apache.doris.flink.container.ContainerUtils; import org.apache.doris.flink.datastream.DorisSourceFunction; import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema; -import org.apache.doris.flink.exception.DorisRuntimeException; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; @@ -150,7 +149,7 @@ public void testTableSource() throws Exception { } } String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]", "+I[apache, 12]"}; - Assert.assertArrayEquals(expected, actual.toArray()); + assertEqualsInAnyOrder(Arrays.asList(expected), Arrays.asList(actual.toArray())); // fitler query List actualFilter = new ArrayList<>(); @@ -288,7 +287,7 @@ public void testTableSourceFilterAndProjectionPushDown() throws Exception { } @Test - public void testTableSourceFilterWithUnionAll() { + public void testTableSourceFilterWithUnionAll() throws Exception { LOG.info("starting to execute testTableSourceFilterWithUnionAll case."); initializeTable(TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -323,10 +322,8 @@ public void testTableSourceFilterWithUnionAll() { while (iterator.hasNext()) { actual.add(iterator.next().toString()); } - } catch (Exception e) { - LOG.error("Failed to execute sql. sql={}", querySql, e); - throw new DorisRuntimeException(e); } + String[] expected = new String[] {"+I[flink, 10]", "+I[doris, 18]"}; checkResultInAnyOrder("testTableSourceFilterWithUnionAll", expected, actual.toArray()); } From 9c05ed526b0e11e72939bb25cb04f2f70bbae54c Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Mon, 9 Sep 2024 20:51:28 +0800 Subject: [PATCH 04/12] update --- .../doris/flink/source/reader/DorisSourceSplitReader.java | 2 +- .../java/org/apache/doris/flink/source/DorisSourceITCase.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java index 54879067c..d29dc0db8 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java @@ -89,7 +89,7 @@ private DorisSplitRecords finishSplit() { @Override public void handleSplitsChanges(SplitsChange splitsChange) { - LOG.debug("Handling split change {}", splitsChange); + LOG.info("Handling split change {}", splitsChange); splits.addAll(splitsChange.splits()); } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java index 7cb025fe3..c6be738af 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java @@ -334,7 +334,7 @@ public void testJobManagerFailoverSource() throws Exception { initializeTable(TABLE_CSV_JM); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM); - env.enableCheckpointing(10000); + env.enableCheckpointing(200L); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); String sourceDDL = @@ -382,7 +382,7 @@ public void testTaskManagerFailoverSink() throws Exception { initializeTable(TABLE_CSV_TM); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM); - env.enableCheckpointing(10000); + env.enableCheckpointing(200L); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); String sourceDDL = From a1c84f8c611fee7ce731e10fd039bedda0aceb09 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Mon, 9 Sep 2024 21:08:28 +0800 Subject: [PATCH 05/12] update --- .../main/java/org/apache/doris/flink/source/DorisSource.java | 1 + .../doris/flink/source/assigners/SimpleSplitAssigner.java | 1 + .../doris/flink/source/enumerator/DorisSourceEnumerator.java | 2 +- 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java index 6faed87df..2b1b31671 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java @@ -125,6 +125,7 @@ public SplitEnumerator restoreEnumera SplitEnumeratorContext context, PendingSplitsCheckpoint checkpoint) throws Exception { Collection splits = checkpoint.getSplits(); + LOG.info("Restore {} splits from checkpoint, detail {}", splits.size(), splits); DorisSplitAssigner splitAssigner = new SimpleSplitAssigner(splits); return new DorisSourceEnumerator(context, splitAssigner); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java index ee96f6873..52afa89b0 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java @@ -52,6 +52,7 @@ public void addSplits(Collection splits) { @Override public PendingSplitsCheckpoint snapshotState(long checkpointId) { + LOG.info("Checkpointing {} splits: {}", checkpointId, splits); return new PendingSplitsCheckpoint(splits); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumerator.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumerator.java index 65fcc6fa7..e0600787b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumerator.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumerator.java @@ -85,7 +85,7 @@ public void addSplitsBack(List splits, int subtaskId) { @Override public void addReader(int subtaskId) { - // do nothing + LOG.info("Doris Source Enumerator adds reader: {}", subtaskId); } @Override From 19786a9cb26fc1e2368825727a08eaa01daecc9e Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Mon, 9 Sep 2024 21:52:43 +0800 Subject: [PATCH 06/12] update --- .../container/AbstractITCaseService.java | 14 ------------- .../doris/flink/sink/DorisSinkITCase.java | 14 +++++++++++++ .../doris/flink/source/DorisSourceITCase.java | 20 ++++++++++++++++--- 3 files changed, 31 insertions(+), 17 deletions(-) diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java index fd19329a5..6628933c5 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractITCaseService.java @@ -23,12 +23,8 @@ import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl; import org.apache.flink.runtime.minicluster.MiniCluster; -import org.apache.flink.runtime.minicluster.RpcServiceSharing; -import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; -import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.function.SupplierWithException; -import org.junit.Rule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,16 +81,6 @@ protected static void waitUntilCondition( } } - @Rule - public final MiniClusterWithClientResource miniClusterResource = - new MiniClusterWithClientResource( - new MiniClusterResourceConfiguration.Builder() - .setNumberTaskManagers(1) - .setNumberSlotsPerTaskManager(2) - .setRpcServiceSharing(RpcServiceSharing.DEDICATED) - .withHaLeadershipControl() - .build()); - /** The type of failover. */ protected enum FailoverType { TM, diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java index 18488229e..96562fa40 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java @@ -22,8 +22,11 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Deadline; import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.test.util.MiniClusterWithClientResource; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.doris.flink.cfg.DorisExecutionOptions; @@ -35,6 +38,7 @@ import org.apache.doris.flink.sink.batch.DorisBatchSink; import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer; import org.apache.doris.flink.utils.MockSource; +import org.junit.Rule; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,6 +69,16 @@ public class DorisSinkITCase extends AbstractITCaseService { static final String TABLE_CSV_JM = "tbl_csv_jm"; static final String TABLE_CSV_TM = "tbl_csv_tm"; + @Rule + public final MiniClusterWithClientResource miniClusterResource = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(2) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) + .withHaLeadershipControl() + .build()); + @Test public void testSinkCsvFormat() throws Exception { initializeTable(TABLE_CSV); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java index c6be738af..98b5d459b 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java @@ -21,9 +21,12 @@ import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; @@ -34,6 +37,7 @@ import org.apache.doris.flink.datastream.DorisSourceFunction; import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,6 +63,16 @@ public class DorisSourceITCase extends AbstractITCaseService { static final String TABLE_CSV_JM = "tbl_csv_jm_source"; static final String TABLE_CSV_TM = "tbl_csv_tm_source"; + @Rule + public final MiniClusterWithClientResource miniClusterResource = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(2) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) + .withHaLeadershipControl() + .build()); + @Test public void testSource() throws Exception { initializeTable(TABLE_READ); @@ -377,8 +391,8 @@ public void testJobManagerFailoverSource() throws Exception { } @Test - public void testTaskManagerFailoverSink() throws Exception { - LOG.info("start to test TaskManagerFailoverSink."); + public void testTaskManagerFailoverSounce() throws Exception { + LOG.info("start to test TaskManagerFailoverSource."); initializeTable(TABLE_CSV_TM); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM); @@ -419,7 +433,7 @@ public void testTaskManagerFailoverSink() throws Exception { } checkResultInAnyOrder( - "testTaskManagerFailoverSink", + "testTaskManagerFailoverSource", expectedSnapshotData.toArray(), fetchRows(iterator, expectedSnapshotData.size()).toArray()); } From e7a679bb1de79f5421c1539c8a0f5ccebac4e482 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Mon, 9 Sep 2024 22:20:34 +0800 Subject: [PATCH 07/12] update --- .../doris/flink/source/DorisSourceITCase.java | 47 +++++++++++++++++-- 1 file changed, 42 insertions(+), 5 deletions(-) diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java index 98b5d459b..3af5dad0e 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java @@ -345,10 +345,10 @@ public void testTableSourceFilterWithUnionAll() throws Exception { @Test public void testJobManagerFailoverSource() throws Exception { LOG.info("start to test JobManagerFailoverSource."); - initializeTable(TABLE_CSV_JM); + initializeTableWithData(TABLE_CSV_JM); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM); - env.enableCheckpointing(200L); + env.enableCheckpointing(100L); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); String sourceDDL = @@ -391,12 +391,12 @@ public void testJobManagerFailoverSource() throws Exception { } @Test - public void testTaskManagerFailoverSounce() throws Exception { + public void testTaskManagerFailoverSource() throws Exception { LOG.info("start to test TaskManagerFailoverSource."); - initializeTable(TABLE_CSV_TM); + initializeTableWithData(TABLE_CSV_TM); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM); - env.enableCheckpointing(200L); + env.enableCheckpointing(100L); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); String sourceDDL = @@ -476,6 +476,43 @@ private void initializeTable(String table) { String.format("insert into %s.%s values ('apache',12)", DATABASE, table)); } + private void initializeTableWithData(String table) { + ContainerUtils.executeSQLStatement( + getDorisQueryConnection(), + LOG, + String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE), + String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table), + String.format( + "CREATE TABLE %s.%s ( \n" + + "`name` varchar(256),\n" + + "`age` int\n" + + ") DISTRIBUTED BY HASH(`name`) BUCKETS 10\n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\"\n" + + ")\n", + DATABASE, table), + String.format("insert into %s.%s values ('1',1),('2',2),('3',3)", DATABASE, table), + String.format( + "insert into %s.%s values ('101',1),('102',1),('103',1)", DATABASE, table), + String.format( + "insert into %s.%s values ('201',2),('202',2),('203',2)", DATABASE, table), + String.format( + "insert into %s.%s values ('301',3),('302',3),('303',3)", DATABASE, table), + String.format( + "insert into %s.%s values ('401',4),('402',4),('403',4)", DATABASE, table), + String.format( + "insert into %s.%s values ('501',5),('502',5),('503',5)", DATABASE, table), + String.format( + "insert into %s.%s values ('601',6),('602',6),('603',6)", DATABASE, table), + String.format( + "insert into %s.%s values ('701',7),('702',7),('703',7)", DATABASE, table), + String.format( + "insert into %s.%s values ('801',8),('802',8),('803',8)", DATABASE, table), + String.format( + "insert into %s.%s values ('901',9),('902',9),('903',9)", + DATABASE, table)); + } + private static List fetchRows(Iterator iter, int size) { List rows = new ArrayList<>(size); while (size > 0 && iter.hasNext()) { From 5f7ad3f2e44652993c69de4a7c4b86620b838223 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Tue, 10 Sep 2024 14:44:58 +0800 Subject: [PATCH 08/12] update --- .../source/reader/DorisSourceReader.java | 5 ++ .../doris/flink/source/DorisSourceITCase.java | 56 ++++++++++++++----- 2 files changed, 46 insertions(+), 15 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceReader.java index 7e2a72087..287f51370 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceReader.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceReader.java @@ -27,6 +27,8 @@ import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.source.split.DorisSourceSplit; import org.apache.doris.flink.source.split.DorisSourceSplitState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.List; import java.util.Map; @@ -36,6 +38,8 @@ public class DorisSourceReader extends SingleThreadMultiplexSourceReaderBase< List, T, DorisSourceSplit, DorisSourceSplitState> { + private static final Logger LOG = LoggerFactory.getLogger(DorisSourceReader.class); + public DorisSourceReader( DorisOptions options, DorisReadOptions readOptions, @@ -64,6 +68,7 @@ protected void onSplitFinished(Map finishedSplitI @Override protected DorisSourceSplitState initializedState(DorisSourceSplit split) { + LOG.info("Initialized state for split: {}", split); return new DorisSourceSplitState(split); } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java index 3af5dad0e..aaf973e43 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java @@ -371,10 +371,8 @@ public void testJobManagerFailoverSource() throws Exception { TableResult tableResult = tEnv.executeSql("select * from doris_source_jm"); CloseableIterator iterator = tableResult.collect(); JobID jobId = tableResult.getJobClient().get().getJobID(); - List expectedSnapshotData = new ArrayList<>(); - expectedSnapshotData.addAll( - Arrays.asList("+I[doris, 18]", "+I[flink, 10]", "+I[apache, 12]")); + List expectedSnapshotData = getExpectedData(); if (iterator.hasNext()) { LOG.info("trigger jobmanager failover..."); triggerFailover( @@ -383,11 +381,44 @@ public void testJobManagerFailoverSource() throws Exception { miniClusterResource.getMiniCluster(), () -> sleepMs(100)); } - checkResultInAnyOrder( "testJobManagerFailoverSource", expectedSnapshotData.toArray(), - fetchRows(iterator, expectedSnapshotData.size()).toArray()); + fetchRows(iterator).toArray()); + } + + private static List getExpectedData() { + String[] expected = + new String[] { + "+I[101, 1]", + "+I[102, 1]", + "+I[103, 1]", + "+I[201, 2]", + "+I[202, 2]", + "+I[203, 2]", + "+I[301, 3]", + "+I[302, 3]", + "+I[303, 3]", + "+I[401, 4]", + "+I[402, 4]", + "+I[403, 4]", + "+I[501, 5]", + "+I[502, 5]", + "+I[503, 5]", + "+I[601, 6]", + "+I[602, 6]", + "+I[603, 6]", + "+I[701, 7]", + "+I[702, 7]", + "+I[703, 7]", + "+I[801, 8]", + "+I[802, 8]", + "+I[803, 8]", + "+I[901, 9]", + "+I[902, 9]", + "+I[903, 9]" + }; + return Arrays.asList(expected); } @Test @@ -419,10 +450,7 @@ public void testTaskManagerFailoverSource() throws Exception { TableResult tableResult = tEnv.executeSql("select * from doris_source_tm"); CloseableIterator iterator = tableResult.collect(); JobID jobId = tableResult.getJobClient().get().getJobID(); - List expectedSnapshotData = new ArrayList<>(); - expectedSnapshotData.addAll( - Arrays.asList("+I[doris, 18]", "+I[flink, 10]", "+I[apache, 12]")); - + List expectedSnapshotData = getExpectedData(); if (iterator.hasNext()) { LOG.info("trigger taskmanager failover..."); triggerFailover( @@ -435,7 +463,7 @@ public void testTaskManagerFailoverSource() throws Exception { checkResultInAnyOrder( "testTaskManagerFailoverSource", expectedSnapshotData.toArray(), - fetchRows(iterator, expectedSnapshotData.size()).toArray()); + fetchRows(iterator).toArray()); } private void checkResult(String testName, Object[] expected, Object[] actual) { @@ -491,7 +519,6 @@ private void initializeTableWithData(String table) { + "\"replication_num\" = \"1\"\n" + ")\n", DATABASE, table), - String.format("insert into %s.%s values ('1',1),('2',2),('3',3)", DATABASE, table), String.format( "insert into %s.%s values ('101',1),('102',1),('103',1)", DATABASE, table), String.format( @@ -513,12 +540,11 @@ private void initializeTableWithData(String table) { DATABASE, table)); } - private static List fetchRows(Iterator iter, int size) { - List rows = new ArrayList<>(size); - while (size > 0 && iter.hasNext()) { + private static List fetchRows(Iterator iter) { + List rows = new ArrayList<>(); + while (iter.hasNext()) { Row row = iter.next(); rows.add(row.toString()); - size--; } return rows; } From d3143f87aa8d78fef378eb0e92ba95a7da9b5407 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Tue, 10 Sep 2024 16:04:50 +0800 Subject: [PATCH 09/12] update --- .../source/assigners/SimpleSplitAssigner.java | 2 +- .../doris/flink/source/DorisSourceITCase.java | 22 +++++++++---------- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java index 52afa89b0..3409f6038 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java @@ -52,7 +52,7 @@ public void addSplits(Collection splits) { @Override public PendingSplitsCheckpoint snapshotState(long checkpointId) { - LOG.info("Checkpointing {} splits: {}", checkpointId, splits); + LOG.info("Checkpointing splits: {}, id {}", splits, checkpointId); return new PendingSplitsCheckpoint(splits); } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java index aaf973e43..5a9c200f8 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java @@ -348,7 +348,7 @@ public void testJobManagerFailoverSource() throws Exception { initializeTableWithData(TABLE_CSV_JM); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM); - env.enableCheckpointing(100L); + env.enableCheckpointing(200L); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); String sourceDDL = @@ -372,7 +372,7 @@ public void testJobManagerFailoverSource() throws Exception { CloseableIterator iterator = tableResult.collect(); JobID jobId = tableResult.getJobClient().get().getJobID(); - List expectedSnapshotData = getExpectedData(); + List expectedData = getExpectedData(); if (iterator.hasNext()) { LOG.info("trigger jobmanager failover..."); triggerFailover( @@ -381,10 +381,9 @@ public void testJobManagerFailoverSource() throws Exception { miniClusterResource.getMiniCluster(), () -> sleepMs(100)); } - checkResultInAnyOrder( - "testJobManagerFailoverSource", - expectedSnapshotData.toArray(), - fetchRows(iterator).toArray()); + List actual = fetchRows(iterator); + LOG.info("actual data: {}, expected: {}", actual, expectedData); + Assert.assertTrue(actual.containsAll(expectedData)); } private static List getExpectedData() { @@ -427,7 +426,7 @@ public void testTaskManagerFailoverSource() throws Exception { initializeTableWithData(TABLE_CSV_TM); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM); - env.enableCheckpointing(100L); + env.enableCheckpointing(200L); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); String sourceDDL = @@ -450,7 +449,7 @@ public void testTaskManagerFailoverSource() throws Exception { TableResult tableResult = tEnv.executeSql("select * from doris_source_tm"); CloseableIterator iterator = tableResult.collect(); JobID jobId = tableResult.getJobClient().get().getJobID(); - List expectedSnapshotData = getExpectedData(); + List expectedData = getExpectedData(); if (iterator.hasNext()) { LOG.info("trigger taskmanager failover..."); triggerFailover( @@ -460,10 +459,9 @@ public void testTaskManagerFailoverSource() throws Exception { () -> sleepMs(100)); } - checkResultInAnyOrder( - "testTaskManagerFailoverSource", - expectedSnapshotData.toArray(), - fetchRows(iterator).toArray()); + List actual = fetchRows(iterator); + LOG.info("actual data: {}, expected: {}", actual, expectedData); + Assert.assertTrue(actual.containsAll(expectedData)); } private void checkResult(String testName, Object[] expected, Object[] actual) { From fadae59decda28d3815b6bd71a557b47aa91e27b Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Tue, 10 Sep 2024 16:45:51 +0800 Subject: [PATCH 10/12] update --- .../doris/flink/source/reader/DorisSourceSplitReader.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java index d29dc0db8..ce863377b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java @@ -66,7 +66,7 @@ public RecordsWithSplitIds fetch() throws IOException { } private void checkSplitOrStartNext() throws IOException, DorisException { - if (valueReader != null) { + if (valueReader != null && valueReader.hasNext()) { return; } final DorisSourceSplit nextSplit = splits.poll(); @@ -74,6 +74,7 @@ private void checkSplitOrStartNext() throws IOException, DorisException { throw new IOException("Cannot fetch from another split - no split remaining"); } currentSplitId = nextSplit.splitId(); + LOG.info("currentSplitId {}, split {}", currentSplitId, nextSplit); valueReader = ValueReader.createReader( nextSplit.getPartitionDefinition(), options, readOptions, LOG); From 000f2b57d4245df722d0efcd1947523a0ee3b2b0 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Tue, 10 Sep 2024 17:06:51 +0800 Subject: [PATCH 11/12] update --- .../flink/source/reader/DorisSourceSplitReader.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java index ce863377b..a0337b9f5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java @@ -66,7 +66,7 @@ public RecordsWithSplitIds fetch() throws IOException { } private void checkSplitOrStartNext() throws IOException, DorisException { - if (valueReader != null && valueReader.hasNext()) { + if (valueReader != null) { return; } final DorisSourceSplit nextSplit = splits.poll(); @@ -74,14 +74,23 @@ private void checkSplitOrStartNext() throws IOException, DorisException { throw new IOException("Cannot fetch from another split - no split remaining"); } currentSplitId = nextSplit.splitId(); - LOG.info("currentSplitId {}, split {}", currentSplitId, nextSplit); + LOG.info("Fetch a new split {}", nextSplit); valueReader = ValueReader.createReader( nextSplit.getPartitionDefinition(), options, readOptions, LOG); } private DorisSplitRecords finishSplit() { + if (valueReader != null) { + try { + valueReader.close(); + } catch (Exception e) { + LOG.warn("Error while closing value reader: {}", e.getMessage()); + } + valueReader = null; + } LOG.info("Finished split {}", currentSplitId); + final DorisSplitRecords finishRecords = DorisSplitRecords.finishedSplit(currentSplitId); currentSplitId = null; LOG.info("After Finished split {}, {} ", currentSplitId, finishRecords.finishedSplits()); From 75185686d7fc0ca32c3ff642f019a66677fb213a Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Tue, 10 Sep 2024 17:17:14 +0800 Subject: [PATCH 12/12] fix --- .../java/org/apache/doris/flink/source/DorisSourceITCase.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java index 5a9c200f8..96a08d1cd 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java @@ -383,6 +383,7 @@ public void testJobManagerFailoverSource() throws Exception { } List actual = fetchRows(iterator); LOG.info("actual data: {}, expected: {}", actual, expectedData); + Assert.assertTrue(actual.size() >= expectedData.size()); Assert.assertTrue(actual.containsAll(expectedData)); } @@ -461,6 +462,7 @@ public void testTaskManagerFailoverSource() throws Exception { List actual = fetchRows(iterator); LOG.info("actual data: {}, expected: {}", actual, expectedData); + Assert.assertTrue(actual.size() >= expectedData.size()); Assert.assertTrue(actual.containsAll(expectedData)); }