Skip to content

Commit

Permalink
[improve]Transform the End-To-End(E2E) tasks on the assembly line (#466)
Browse files Browse the repository at this point in the history
  • Loading branch information
DongLiang-0 authored Sep 4, 2024
1 parent b42bc6b commit 175f0ff
Show file tree
Hide file tree
Showing 30 changed files with 1,945 additions and 1,472 deletions.
1 change: 1 addition & 0 deletions .licenserc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ header:
- '.github/PULL_REQUEST_TEMPLATE.md'
- '.licenserc.yaml'
- 'custom_env.sh.tpl'
- 'flink-doris-connector/src/test/resources/container/'

comment: on-failure
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
Expand All @@ -36,11 +37,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** cdc sync tools. */
public class CdcTools {
private static final List<String> EMPTY_KEYS =
Collections.singletonList(DatabaseSyncConfig.PASSWORD);
private static StreamExecutionEnvironment flinkEnvironmentForTesting;
private static JobClient jobClient;

public static void main(String[] args) throws Exception {
System.out.println("Input args: " + Arrays.asList(args) + ".\n");
Expand Down Expand Up @@ -146,7 +150,10 @@ private static void syncDatabase(
new DorisTableConfig(getConfigMap(params, DatabaseSyncConfig.TABLE_CONF));
Configuration sinkConfig = Configuration.fromMap(sinkMap);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env =
Objects.nonNull(flinkEnvironmentForTesting)
? flinkEnvironmentForTesting
: StreamExecutionEnvironment.getExecutionEnvironment();
databaseSync
.setEnv(env)
.setDatabase(database)
Expand Down Expand Up @@ -174,7 +181,23 @@ private static void syncDatabase(
config.getString(
DatabaseSyncConfig.DATABASE_NAME, DatabaseSyncConfig.DB));
}
env.execute(jobName);
if (Objects.nonNull(flinkEnvironmentForTesting)) {
jobClient = env.executeAsync();
} else {
env.execute(jobName);
}
}

@VisibleForTesting
public static JobClient getJobClient() {
return jobClient;
}

// Only for testing, please do not use it in actual environment
@VisibleForTesting
public static void setStreamExecutionEnvironmentForTesting(
StreamExecutionEnvironment environment) {
flinkEnvironmentForTesting = environment;
}

@VisibleForTesting
Expand Down

This file was deleted.

Loading

0 comments on commit 175f0ff

Please sign in to comment.