Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve]Transform the End-To-End(E2E) tasks on the assembly line #466

Merged
merged 48 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
c31b582
improve e2e case
DongLiang-0 Aug 15, 2024
ebb21da
fix
DongLiang-0 Aug 15, 2024
a812ae5
fix
DongLiang-0 Aug 15, 2024
3b0d92e
fix
DongLiang-0 Aug 15, 2024
1788ae3
fix
DongLiang-0 Aug 15, 2024
b3592a9
fix
DongLiang-0 Aug 15, 2024
4932b65
fix
DongLiang-0 Aug 15, 2024
d33add3
mini fix
DongLiang-0 Aug 15, 2024
704c355
mini fix
DongLiang-0 Aug 15, 2024
8007a92
fix comment
DongLiang-0 Aug 15, 2024
d5f7a4a
fix
DongLiang-0 Aug 15, 2024
fe596b3
fix
DongLiang-0 Aug 22, 2024
ad98652
fix
DongLiang-0 Aug 26, 2024
d7b29d4
fix
DongLiang-0 Aug 26, 2024
e51a436
fix
DongLiang-0 Aug 26, 2024
257431a
fix
DongLiang-0 Aug 26, 2024
3a03df2
fix
DongLiang-0 Aug 26, 2024
edbda2a
fix
DongLiang-0 Aug 26, 2024
91e0515
fix
DongLiang-0 Aug 26, 2024
b17b6cd
fix
DongLiang-0 Aug 26, 2024
4bafebd
fix
DongLiang-0 Aug 26, 2024
732a1a6
fix
DongLiang-0 Aug 26, 2024
46820c8
fix
DongLiang-0 Aug 26, 2024
ff3ba64
fix
DongLiang-0 Aug 26, 2024
9f35458
fix
DongLiang-0 Aug 27, 2024
2f677dc
add synchronized
DongLiang-0 Aug 27, 2024
848d9db
fix synchronized
DongLiang-0 Aug 27, 2024
4f84984
fix synchronized
DongLiang-0 Aug 27, 2024
42b0a00
use CountDownLatch
DongLiang-0 Aug 28, 2024
3a0bfb3
fix latch
DongLiang-0 Aug 28, 2024
c9d0e19
use semaphore
DongLiang-0 Aug 28, 2024
d7d5c33
fix result
DongLiang-0 Aug 28, 2024
fd730f1
fix doris to doris
DongLiang-0 Aug 28, 2024
aded507
fix
DongLiang-0 Aug 28, 2024
043e608
fix thread
DongLiang-0 Aug 28, 2024
232f1c2
fix pom
DongLiang-0 Aug 28, 2024
ac65c6a
fix pom
DongLiang-0 Aug 28, 2024
993291d
fix thread pool
DongLiang-0 Aug 28, 2024
58cea54
fix thread pool
DongLiang-0 Aug 29, 2024
ebd9e02
fix
DongLiang-0 Aug 30, 2024
2e76bc4
rebase master
DongLiang-0 Sep 2, 2024
d368827
remove customer single thread
DongLiang-0 Sep 3, 2024
cc0d07d
fix DorisSourceITCase
DongLiang-0 Sep 3, 2024
b897ff0
fix DorisSinkITCase name
DongLiang-0 Sep 3, 2024
b15884d
fix sourceIT
DongLiang-0 Sep 3, 2024
6f366d0
fix sourceIT
DongLiang-0 Sep 3, 2024
a369925
fix sourceIT
DongLiang-0 Sep 3, 2024
71f37d7
fix sourceIT
DongLiang-0 Sep 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .licenserc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ header:
- '.github/PULL_REQUEST_TEMPLATE.md'
- '.licenserc.yaml'
- 'custom_env.sh.tpl'
- 'flink-doris-connector/src/test/resources/container/'

comment: on-failure
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
Expand All @@ -36,11 +37,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** cdc sync tools. */
public class CdcTools {
private static final List<String> EMPTY_KEYS =
Collections.singletonList(DatabaseSyncConfig.PASSWORD);
private static StreamExecutionEnvironment flinkEnvironmentForTesting;
private static JobClient jobClient;

public static void main(String[] args) throws Exception {
System.out.println("Input args: " + Arrays.asList(args) + ".\n");
Expand Down Expand Up @@ -146,7 +150,10 @@ private static void syncDatabase(
new DorisTableConfig(getConfigMap(params, DatabaseSyncConfig.TABLE_CONF));
Configuration sinkConfig = Configuration.fromMap(sinkMap);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env =
Objects.nonNull(flinkEnvironmentForTesting)
? flinkEnvironmentForTesting
: StreamExecutionEnvironment.getExecutionEnvironment();
databaseSync
.setEnv(env)
.setDatabase(database)
Expand Down Expand Up @@ -174,7 +181,23 @@ private static void syncDatabase(
config.getString(
DatabaseSyncConfig.DATABASE_NAME, DatabaseSyncConfig.DB));
}
env.execute(jobName);
if (Objects.nonNull(flinkEnvironmentForTesting)) {
jobClient = env.executeAsync();
} else {
env.execute(jobName);
}
}

@VisibleForTesting
public static JobClient getJobClient() {
return jobClient;
}

// Only for testing, please do not use it in actual environment
@VisibleForTesting
public static void setStreamExecutionEnvironmentForTesting(
StreamExecutionEnvironment environment) {
flinkEnvironmentForTesting = environment;
}

@VisibleForTesting
Expand Down

This file was deleted.

Loading
Loading