Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve](test) improve some ut and itcase #483

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions flink-doris-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,12 @@ under the License.
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.jsqlparser</groupId>
<artifactId>jsqlparser</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void setup()
props.put("sink.enable-2pc", "false");
catalog = new DorisCatalog(TEST_CATALOG_NAME, connectionOptions, TEST_DB, props);
this.tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, DEFAULT_PARALLELISM);
// Use doris catalog.
tEnv.registerCatalog(TEST_CATALOG_NAME, catalog);
tEnv.useCatalog(TEST_CATALOG_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,18 @@
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public abstract class AbstractContainerTestBase {
private static final Logger LOG = LoggerFactory.getLogger(AbstractContainerTestBase.class);
private static ContainerService dorisContainerService;
public static final int DEFAULT_PARALLELISM = 2;

@BeforeClass
public static void initContainers() {
Expand Down Expand Up @@ -88,4 +95,20 @@ private static void closeDorisContainer() {
dorisContainerService.close();
LOG.info("Doris container was closed.");
}

// ------------------------------------------------------------------------
// test utilities
// ------------------------------------------------------------------------
public static void assertEqualsInAnyOrder(List<Object> expected, List<Object> actual) {
assertTrue(expected != null && actual != null);
assertEqualsInOrder(
expected.stream().sorted().collect(Collectors.toList()),
actual.stream().sorted().collect(Collectors.toList()));
}

public static void assertEqualsInOrder(List<Object> expected, List<Object> actual) {
assertTrue(expected != null && actual != null);
assertEquals(expected.size(), actual.size());
assertArrayEquals(expected.toArray(new Object[0]), actual.toArray(new Object[0]));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ protected void cancelE2EJob(String jobName) {

private StreamExecutionEnvironment configFlinkEnvironment() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
Map<String, String> flinkMap = new HashMap<>();
flinkMap.put("execution.checkpointing.interval", "10s");
flinkMap.put("pipeline.operator-chaining", "false");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,8 @@
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.function.SupplierWithException;

import org.junit.Rule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -85,16 +81,6 @@ protected static void waitUntilCondition(
}
}

@Rule
public final MiniClusterWithClientResource miniClusterResource =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(2)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.withHaLeadershipControl()
.build());

/** The type of failover. */
protected enum FailoverType {
TM,
Expand Down Expand Up @@ -138,4 +124,11 @@ protected static void triggerJobManagerFailover(
LOG.info("flink cluster will grant job master leadership. jobId={}", jobId);
haLeadershipControl.grantJobMasterLeadership(jobId).get();
}

protected void sleepMs(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException ignored) {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void testDoris2Doris() throws Exception {
LOG.info("Start executing the test case of doris to doris.");
initializeDorisTable();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setParallelism(2);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
Expand All @@ -35,6 +38,7 @@
import org.apache.doris.flink.sink.batch.DorisBatchSink;
import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
import org.apache.doris.flink.utils.MockSource;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -65,6 +69,16 @@ public class DorisSinkITCase extends AbstractITCaseService {
static final String TABLE_CSV_JM = "tbl_csv_jm";
static final String TABLE_CSV_TM = "tbl_csv_tm";

@Rule
public final MiniClusterWithClientResource miniClusterResource =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(2)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.withHaLeadershipControl()
.build());

@Test
public void testSinkCsvFormat() throws Exception {
initializeTable(TABLE_CSV);
Expand Down Expand Up @@ -131,6 +145,7 @@ private void submitJob(
throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.setParallelism(DEFAULT_PARALLELISM);
Builder<String> builder = DorisSink.builder();
final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();

Expand All @@ -147,7 +162,7 @@ private void submitJob(
public void testTableSinkJsonFormat() throws Exception {
initializeTable(TABLE_JSON_TBL);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setParallelism(DEFAULT_PARALLELISM);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

Expand Down Expand Up @@ -196,7 +211,7 @@ public void testTableSinkJsonFormat() throws Exception {
public void testTableBatch() throws Exception {
initializeTable(TABLE_CSV_BATCH_TBL);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setParallelism(DEFAULT_PARALLELISM);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

Expand Down Expand Up @@ -244,6 +259,7 @@ public void testDataStreamBatch() throws Exception {
initializeTable(TABLE_CSV_BATCH_DS);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.setParallelism(DEFAULT_PARALLELISM);
DorisBatchSink.Builder<String> builder = DorisBatchSink.builder();

DorisOptions.Builder dorisBuilder = DorisOptions.builder();
Expand Down Expand Up @@ -283,7 +299,7 @@ public void testDataStreamBatch() throws Exception {
public void testTableGroupCommit() throws Exception {
initializeTable(TABLE_GROUP_COMMIT);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setParallelism(DEFAULT_PARALLELISM);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

Expand Down Expand Up @@ -332,7 +348,7 @@ public void testTableGroupCommit() throws Exception {
public void testTableGzFormat() throws Exception {
initializeTable(TABLE_GZ_FORMAT);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setParallelism(DEFAULT_PARALLELISM);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

Expand Down Expand Up @@ -374,7 +390,7 @@ public void testJobManagerFailoverSink() throws Exception {
LOG.info("start to test JobManagerFailoverSink.");
initializeFailoverTable(TABLE_CSV_JM);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
env.setParallelism(DEFAULT_PARALLELISM);
env.enableCheckpointing(10000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));

Expand Down Expand Up @@ -434,7 +450,7 @@ public void testTaskManagerFailoverSink() throws Exception {
LOG.info("start to test TaskManagerFailoverSink.");
initializeFailoverTable(TABLE_CSV_TM);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
env.setParallelism(DEFAULT_PARALLELISM);
env.enableCheckpointing(10000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));

Expand Down Expand Up @@ -486,13 +502,6 @@ public void testTaskManagerFailoverSink() throws Exception {
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
}

private void sleepMs(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException ignored) {
}
}

private void initializeTable(String table) {
ContainerUtils.executeSQLStatement(
getDorisQueryConnection(),
Expand Down
Loading
Loading