Skip to content

Commit

Permalink
add synchronized
Browse files Browse the repository at this point in the history
  • Loading branch information
DongLiang-0 committed Aug 27, 2024
1 parent c4ceef3 commit 26613bf
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ protected void submitE2EJob(String jobName, String[] args) {
});
}

protected boolean e2eJobIsRunning() {
return singleThreadExecutor.isRunning();
}

protected void cancelCurrentE2EJob(String jobName) {
LOG.info("{} e2e job will cancel", jobName);
singleThreadExecutor.cancelCurrentJob(jobName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ public synchronized Future<?> submitJob(String jobName, Runnable job) {
return currentJob;
}

public boolean isRunning() {
return currentJob != null || !currentJob.isDone() || !currentJob.isCancelled();
}

/**
* Cancels the currently running job if its name matches the provided job name. The job is
* interrupted if it is currently running.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.doris.flink.container.AbstractE2EService;
import org.apache.doris.flink.container.ContainerUtils;
import org.apache.doris.flink.tools.cdc.DatabaseSyncConfig;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -32,6 +33,7 @@ public class Mysql2DorisE2ECase extends AbstractE2EService {
private static final String DATABASE = "test_e2e_mysql";
private static final String CREATE_DATABASE = "CREATE DATABASE IF NOT EXISTS " + DATABASE;
private static final String MYSQL_CONF = "--" + DatabaseSyncConfig.MYSQL_CONF;
private final Object lock = new Object();

private List<String> setMysql2DorisDefaultConfig(List<String> argList) {
// set default mysql config
Expand Down Expand Up @@ -79,16 +81,29 @@ private void initDorisEnvironment() {
"DROP TABLE IF EXISTS test_e2e_mysql.tbl5");
}

private void initEnvironment(String mysqlSourcePath) {
initMysqlEnvironment(mysqlSourcePath);
initDorisEnvironment();
private synchronized void initEnvironment(String jobName, String mysqlSourcePath) {
synchronized (lock) {
while (e2eJobIsRunning()) {
try {
lock.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
LOG.info(
"start to init mysql to doris environment. jobName={}, mysqlSourcePath={}",
jobName,
mysqlSourcePath);
initMysqlEnvironment(mysqlSourcePath);
initDorisEnvironment();
}
}

@Test
public void testMySQL2Doris() throws Exception {
initEnvironment("container/e2e/mysql2doris/testMySQL2Doris_init.sql");
String jobName = "testMySQL2Doris";
String resourcePath = "container/e2e/mysql2doris/testMySQL2Doris.txt";
initEnvironment(jobName, "container/e2e/mysql2doris/testMySQL2Doris_init.sql");
startMysql2DorisJob(jobName, resourcePath);

// wait 2 times checkpoint
Expand Down Expand Up @@ -142,8 +157,8 @@ public void testMySQL2Doris() throws Exception {

@Test
public void testAutoAddTable() throws InterruptedException {
initEnvironment("container/e2e/mysql2doris/testAutoAddTable_init.sql");
String jobName = "testAutoAddTable";
initEnvironment(jobName, "container/e2e/mysql2doris/testAutoAddTable_init.sql");
startMysql2DorisJob(jobName, "container/e2e/mysql2doris/testAutoAddTable.txt");

// wait 2 times checkpoint
Expand Down Expand Up @@ -217,9 +232,9 @@ public void testAutoAddTable() throws InterruptedException {

@Test
public void testMySQL2DorisSQLParse() throws Exception {
initEnvironment("container/e2e/mysql2doris/testMySQL2DorisSQLParse_init.sql");
String jobName = "testMySQL2DorisSQLParse";
String resourcePath = "container/e2e/mysql2doris/testMySQL2DorisSQLParse.txt";
initEnvironment(jobName, "container/e2e/mysql2doris/testMySQL2DorisSQLParse_init.sql");
startMysql2DorisJob(jobName, resourcePath);

// wait 2 times checkpoint
Expand Down Expand Up @@ -290,8 +305,8 @@ public void testMySQL2DorisSQLParse() throws Exception {

@Test
public void testMySQL2DorisByDefault() throws Exception {
initEnvironment("container/e2e/mysql2doris/testMySQL2DorisByDefault_init.sql");
String jobName = "testMySQL2DorisByDefault";
initEnvironment(jobName, "container/e2e/mysql2doris/testMySQL2DorisByDefault_init.sql");
startMysql2DorisJob(jobName, "container/e2e/mysql2doris/testMySQL2DorisByDefault.txt");

// wait 2 times checkpoint
Expand Down Expand Up @@ -325,8 +340,8 @@ public void testMySQL2DorisByDefault() throws Exception {

@Test
public void testMySQL2DorisEnableDelete() throws Exception {
initEnvironment("container/e2e/mysql2doris/testMySQL2DorisEnableDelete_init.sql");
String jobName = "testMySQL2DorisEnableDelete";
initEnvironment(jobName, "container/e2e/mysql2doris/testMySQL2DorisEnableDelete_init.sql");
startMysql2DorisJob(jobName, "container/e2e/mysql2doris/testMySQL2DorisEnableDelete.txt");

// wait 2 times checkpoint
Expand Down Expand Up @@ -365,4 +380,15 @@ public void testMySQL2DorisEnableDelete() throws Exception {
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, sql, 2);
cancelCurrentE2EJob(jobName);
}

@After
public void close() {
notifyJobCompletion();
}

private void notifyJobCompletion() {
synchronized (lock) {
lock.notifyAll();
}
}
}

0 comments on commit 26613bf

Please sign in to comment.