diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index 23b6cfce53..162334e539 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -88,7 +88,7 @@ jobs:
distribution: "adopt"
cache: "maven"
- name: Build with Maven
- run: ./mvnw -B clean install -Pshaded -DskipTests
+ run: ./mvnw -B clean install -Pshaded,dist -DskipTests
- name: Test with Maven
run: ./mvnw -B test
diff --git a/pom.xml b/pom.xml
index 2706cabc13..a57b3e6594 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,6 +81,7 @@
streampark-spark
streampark-storage
streampark-console
+ streampark-test-utils
diff --git a/streampark-console/streampark-console-service/pom.xml b/streampark-console/streampark-console-service/pom.xml
index 4202d7c968..adbddafcc1 100644
--- a/streampark-console/streampark-console-service/pom.xml
+++ b/streampark-console/streampark-console-service/pom.xml
@@ -412,6 +412,20 @@
test
+
+ org.rauschig
+ jarchivelib
+ 0.7.1
+ test
+
+
+
+ org.apache.streampark
+ streampark-testcontainer
+ ${project.version}
+ test
+
+
diff --git a/streampark-console/streampark-console-service/src/main/assembly/assembly.xml b/streampark-console/streampark-console-service/src/main/assembly/assembly.xml
index 92c73dac48..aa4677c4a9 100644
--- a/streampark-console/streampark-console-service/src/main/assembly/assembly.xml
+++ b/streampark-console/streampark-console-service/src/main/assembly/assembly.xml
@@ -17,6 +17,7 @@
bin
+ dir
tar.gz
true
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index ee2d996d50..7001eaa29c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1649,7 +1649,7 @@ public void start(Application appParam, boolean auto) throws Exception {
}
private Map getProperties(Application application) {
- Map properties = application.getOptionMap();
+ Map properties = new HashMap<>(application.getOptionMap());
if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
ApiAlertException.throwIfNull(
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 7111e2758d..517779c462 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -139,6 +139,11 @@ public ResponseResult check(FlinkCluster cluster) {
@Override
public Boolean create(FlinkCluster flinkCluster) {
flinkCluster.setUserId(commonService.getUserId());
+ return internalCreate(flinkCluster);
+ }
+
+ @VisibleForTesting
+ public boolean internalCreate(FlinkCluster flinkCluster) {
boolean successful = validateQueueIfNeeded(flinkCluster);
ApiAlertException.throwIfFalse(
successful, String.format(ERROR_CLUSTER_QUEUE_HINT, flinkCluster.getYarnQueue()));
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
index 26302387ad..ce271da594 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
@@ -38,6 +38,7 @@
import org.apache.streampark.console.core.service.alert.AlertService;
import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.hc.client5.http.config.RequestConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@@ -49,6 +50,8 @@
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@@ -84,7 +87,7 @@ public class FlinkHttpWatcher {
@Autowired private FlinkClusterWatcher flinkClusterWatcher;
// track interval every 5 seconds
- private static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5);
+ public static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5);
// option interval within 10 seconds
private static final Duration OPTION_INTERVAL = Duration.ofSeconds(10);
@@ -197,6 +200,12 @@ public void start() {
}
}
+ @VisibleForTesting
+ public @Nullable FlinkAppState tryQueryFlinkAppState(@Nonnull Long appId) {
+ Application app = WATCHING_APPS.get(appId);
+ return (app == null || app.getState() == null) ? null : FlinkAppState.of(app.getState());
+ }
+
private void watch(Long id, Application application) {
EXECUTOR.execute(
() -> {
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java
new file mode 100644
index 0000000000..eca83226d6
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console;
+
+import org.apache.streampark.common.conf.CommonConfig;
+import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.util.SystemPropertyUtils;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.io.IOUtils;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.rauschig.jarchivelib.Archiver;
+import org.rauschig.jarchivelib.ArchiverFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureTestEntityManager;
+import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Integration base tester. Note: all child classes of this base must run after the
+ * project-level package phase.
+ */
+@Slf4j
+@EnableScheduling
+@ActiveProfiles("integration-test")
+@AutoConfigureTestEntityManager
+@AutoConfigureWebTestClient(timeout = "60000")
+@TestPropertySource(locations = {"classpath:application-integration-test.yml"})
+@ExtendWith({MockitoExtension.class, SpringExtension.class})
+@SpringBootTest(
+ classes = StreamParkConsoleBootstrap.class,
+ webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
+public abstract class SpringIntegrationTestBase {
+ protected static final Logger LOG = LoggerFactory.getLogger(SpringIntegrationTestBase.class);
+
+ protected static final String RUN_PKG_SCRIPT_HINT =
+ "Please run package script before running the test case.";
+
+ protected static final String DEFAULT_APP_HOME_DIR_NAME = "apache-streampark";
+ protected static final String DEFAULT_FLINK_OFFICIAL_RELEASES_DIR_NAME =
+ "flink-official-releases";
+ protected static final String DEFAULT_LOCAL_WORKSPACE_DIR_NAME = "localWorkspace";
+ protected static final String DEFAULT_FLINK_VERSION = "1.17.1";
+ protected static final String DEFAULT_FLINK_DOWNLOAD_URL =
+ "https://dlcdn.apache.org/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz";
+ protected static final FileFilter PKG_NAME_FILTER =
+ file -> file.getName().startsWith(DEFAULT_APP_HOME_DIR_NAME) && file.isDirectory();
+
+ protected static String defaultFlinkHome;
+ protected static String appHome;
+
+ @BeforeAll
+ public static void init(@TempDir File tempPath) throws IOException {
+
+ LOG.info("Start prepare the real running env.");
+ String tempAbsPath = tempPath.getAbsolutePath();
+ LOG.info("Integration test base tmp dir: {}", tempAbsPath);
+
+ FileUtils.copyDirectory(
+ tryFindStreamParkPackagedDirFile(), new File(tempAbsPath, DEFAULT_APP_HOME_DIR_NAME));
+
+ prepareDefaultFlinkHome(tempAbsPath, 3);
+ Path localWorkspace =
+ Files.createDirectories(new File(tempAbsPath, DEFAULT_LOCAL_WORKSPACE_DIR_NAME).toPath());
+
+ appHome = new File(tempAbsPath, DEFAULT_APP_HOME_DIR_NAME).getAbsolutePath();
+ System.setProperty(ConfigConst.KEY_APP_HOME(), appHome);
+ System.setProperty(
+ CommonConfig.STREAMPARK_WORKSPACE_LOCAL().key(),
+ localWorkspace.toAbsolutePath().toString());
+
+ LOG.info(
+ "Complete mock EnvInitializer init, app home: {}, {}: {}",
+ appHome,
+ CommonConfig.STREAMPARK_WORKSPACE_LOCAL().key(),
+ localWorkspace.toAbsolutePath());
+ }
+
+ private static void prepareDefaultFlinkHome(String tempAbsPath, int reties) throws IOException {
+ reties--;
+ if (reties < 0) {
+ LOG.error("Failed to prepareDefaultFlinkHome after multiple retries.");
+ return;
+ }
+ try {
+ final File flinkOfficialReleases =
+ new File(tempAbsPath, DEFAULT_FLINK_OFFICIAL_RELEASES_DIR_NAME);
+ Files.deleteIfExists(flinkOfficialReleases.toPath());
+ Files.createDirectories(flinkOfficialReleases.toPath());
+
+ defaultFlinkHome =
+ prepareFlinkOfficialRelease(
+ DEFAULT_FLINK_DOWNLOAD_URL,
+ DEFAULT_FLINK_VERSION,
+ flinkOfficialReleases.getAbsolutePath());
+ } catch (Throwable t) {
+ LOG.error("Error in prepareDefaultFlinkHome with exception", t);
+ LOG.warn("Start the next retry.");
+ prepareDefaultFlinkHome(tempAbsPath, reties);
+ }
+ }
+
+ /**
+ * @param httpUrl flink official release download url.
+ * @return return the target un-packed flink home absolute dir.
+ */
+ private static String prepareFlinkOfficialRelease(
+ @Nonnull String httpUrl, @Nonnull String flinkVersion, String workDirAbsolutePath)
+ throws IOException {
+ String downloadedFilePath =
+ new File(workDirAbsolutePath, flinkVersion).getAbsolutePath() + ".tgz";
+ httpDownload(httpUrl, downloadedFilePath);
+ File archive = new File(downloadedFilePath);
+ File destination = new File(archive.getParentFile().getAbsolutePath());
+ Files.createDirectories(destination.toPath());
+
+ Archiver archiver = ArchiverFactory.createArchiver("tar", "gz");
+ archiver.extract(archive, destination);
+ Optional first =
+ Arrays.stream(
+ requireNonNull(
+ destination.listFiles(
+ file -> file.getName().contains(flinkVersion) && file.isDirectory())))
+ .findFirst();
+ File file =
+ first.orElseThrow(
+ () ->
+ new RuntimeException(
+ String.format(
+ "Error in prepareFlinkOfficialRelease for httpUrl: %s, flinkVersion: %s",
+ httpUrl, flinkVersion)));
+
+ LOG.info("Prepared flink release: {}.", file.getAbsolutePath());
+ return file.getAbsolutePath();
+ }
+
+ private static void httpDownload(String httpUrl, String saveFile) {
+
+ try {
+ URL url = new URL(httpUrl);
+ URLConnection conn = url.openConnection();
+ InputStream inStream = conn.getInputStream();
+ FileOutputStream fs = new FileOutputStream(saveFile);
+ IOUtils.copyBytes(inStream, fs, 2048);
+ inStream.close();
+ fs.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static File tryFindStreamParkPackagedDirFile() {
+ String userDir = Preconditions.checkNotNull(SystemPropertyUtils.get("user.dir"));
+ File pkgTargetDirFile = new File(userDir, "target");
+ Preconditions.checkState(
+ pkgTargetDirFile.exists(),
+ "The target directory of %s doesn't exist. %s",
+ userDir,
+ RUN_PKG_SCRIPT_HINT);
+ Optional availablePkgParentFileOpt =
+ Arrays.stream(requireNonNull(pkgTargetDirFile.listFiles(PKG_NAME_FILTER))).findFirst();
+ final File availablePkgParentFile =
+ availablePkgParentFileOpt.orElseThrow(() -> new RuntimeException(RUN_PKG_SCRIPT_HINT));
+ Optional targetDirFile =
+ Arrays.stream(requireNonNull(availablePkgParentFile.listFiles(PKG_NAME_FILTER)))
+ .findFirst();
+ return targetDirFile.orElseThrow(() -> new RuntimeException(RUN_PKG_SCRIPT_HINT));
+ }
+}
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringTestBase.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringUnitTestBase.java
similarity index 95%
rename from streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringTestBase.java
rename to streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringUnitTestBase.java
index 417a117cee..490df51c00 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringTestBase.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringUnitTestBase.java
@@ -24,6 +24,7 @@
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.entity.YarnQueue;
+import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
@@ -33,10 +34,10 @@
import org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureTestEntityManager;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
-import org.springframework.transaction.annotation.Transactional;
import java.io.File;
import java.io.IOException;
@@ -44,7 +45,8 @@
import java.nio.file.Path;
/** base tester. */
-@Transactional
+@Slf4j
+@EnableScheduling
@ActiveProfiles("test")
@AutoConfigureTestEntityManager
@AutoConfigureWebTestClient(timeout = "60000")
@@ -53,9 +55,9 @@
@SpringBootTest(
classes = StreamParkConsoleBootstrap.class,
webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
-public abstract class SpringTestBase {
+public abstract class SpringUnitTestBase {
- protected static final Logger LOG = LoggerFactory.getLogger(SpringTestBase.class);
+ protected static final Logger LOG = LoggerFactory.getLogger(SpringUnitTestBase.class);
@BeforeAll
public static void init(@TempDir File tempPath) throws IOException {
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/AccessTokenServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/AccessTokenServiceTest.java
index f97823ce5f..f20d7fb801 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/AccessTokenServiceTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/AccessTokenServiceTest.java
@@ -17,7 +17,7 @@
package org.apache.streampark.console.core.service;
-import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.SpringUnitTestBase;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.base.util.WebUtils;
@@ -33,7 +33,7 @@
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
-public class AccessTokenServiceTest extends SpringTestBase {
+public class AccessTokenServiceTest extends SpringUnitTestBase {
@Autowired private AccessTokenService accessTokenService;
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceITest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceITest.java
new file mode 100644
index 0000000000..9effabca0e
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceITest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.DeflaterUtils;
+import org.apache.streampark.console.SpringIntegrationTestBase;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.entity.FlinkEnv;
+import org.apache.streampark.console.core.entity.FlinkSql;
+import org.apache.streampark.console.core.enums.FlinkAppState;
+import org.apache.streampark.console.core.enums.ReleaseState;
+import org.apache.streampark.console.core.service.impl.FlinkClusterServiceImpl;
+import org.apache.streampark.console.core.task.FlinkHttpWatcher;
+import org.apache.streampark.testcontainer.flink.FlinkStandaloneSessionCluster;
+
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.Base64;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.streampark.console.core.task.FlinkHttpWatcher.WATCHING_INTERVAL;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration test for {@link
+ * org.apache.streampark.console.core.service.impl.ApplicationServiceImpl}.
+ */
+class ApplicationServiceITest extends SpringIntegrationTestBase {
+
+ static FlinkStandaloneSessionCluster cluster =
+ FlinkStandaloneSessionCluster.builder().slotsNumPerTm(4).slf4jLogConsumer(null).build();
+
+ @Autowired private ApplicationService appService;
+
+ @Autowired private FlinkClusterService clusterService;
+
+ @Autowired private FlinkEnvService envService;
+
+ @Autowired private AppBuildPipeService appBuildPipeService;
+
+ @Autowired private FlinkSqlService sqlService;
+
+ @Autowired private FlinkHttpWatcher flinkHttpWatcher;
+
+ @BeforeAll
+ static void setup() {
+ cluster.start();
+ }
+
+ @AfterAll
+ static void teardown() {
+ cluster.stop();
+ }
+
+ @AfterEach
+ void clear() {
+ appService.getBaseMapper().delete(new QueryWrapper<>());
+ clusterService.getBaseMapper().delete(new QueryWrapper<>());
+ envService.getBaseMapper().delete(new QueryWrapper<>());
+ appBuildPipeService.getBaseMapper().delete(new QueryWrapper<>());
+ sqlService.getBaseMapper().delete(new QueryWrapper<>());
+ }
+
+ @Test
+ @Timeout(value = 180)
+ void testStartAppOnRemoteSessionMode() throws Exception {
+ FlinkEnv flinkEnv = new FlinkEnv();
+ flinkEnv.setFlinkHome(defaultFlinkHome);
+ flinkEnv.setFlinkName(DEFAULT_FLINK_VERSION);
+ flinkEnv.setId(1L);
+ envService.create(flinkEnv);
+ FlinkCluster flinkCluster = new FlinkCluster();
+ flinkCluster.setId(1L);
+ flinkCluster.setAddress(cluster.getFlinkJobManagerUrl());
+ flinkCluster.setExecutionMode(ExecutionMode.REMOTE.getMode());
+ flinkCluster.setClusterName("docker-Cluster-1.17.1");
+ flinkCluster.setVersionId(1L);
+ flinkCluster.setUserId(100000L);
+ ((FlinkClusterServiceImpl) clusterService).internalCreate(flinkCluster);
+ Application appParam = new Application();
+ appParam.setId(100000L);
+ appParam.setTeamId(100000L);
+ Application application = appService.getApp(appParam);
+ application.setFlinkClusterId(1L);
+ application.setSqlId(100000L);
+ application.setVersionId(1L);
+ application.setExecutionMode(ExecutionMode.REMOTE.getMode());
+
+ // Avoid exceptional error.
+ application.setFlinkSql(
+ new String(Base64.getDecoder().decode(application.getFlinkSql().getBytes())));
+ FlinkSql flinkSql = sqlService.getEffective(application.getId(), false);
+ flinkSql.setSql(DeflaterUtils.zipString(flinkSql.getSql()));
+ sqlService.getBaseMapper().updateById(flinkSql);
+
+ // Continue with the follow-up operations.
+ appService.update(application);
+ appBuildPipeService.buildApplication(100000L, false);
+
+ CompletableFuture buildCompletableFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ while (true) {
+ Application app = appService.getById(100000L);
+ if (app != null && app.getReleaseState() == ReleaseState.DONE) {
+ break;
+ }
+ }
+ return true;
+ });
+ buildCompletableFuture.get();
+
+ appService.start(appService.getById(100000L), false);
+ CompletableFuture completableFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ while (true) {
+ if (flinkHttpWatcher.tryQueryFlinkAppState(application.getId())
+ == FlinkAppState.RUNNING) {
+ break;
+ }
+ }
+ return true;
+ });
+
+ assertThat(completableFuture.get(WATCHING_INTERVAL.toMillis() * 8, TimeUnit.MILLISECONDS))
+ .isEqualTo(true);
+ }
+}
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java
index ea5c345602..f6ce39a54a 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java
@@ -18,7 +18,7 @@
package org.apache.streampark.console.core.service;
import org.apache.streampark.common.enums.ExecutionMode;
-import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.SpringUnitTestBase;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.YarnQueue;
import org.apache.streampark.console.core.service.impl.ApplicationServiceImpl;
@@ -34,8 +34,8 @@
import static org.assertj.core.api.Assertions.assertThat;
-/** org.apache.streampark.console.core.service.ApplicationServiceTest. */
-class ApplicationServiceTest extends SpringTestBase {
+/** Unit test for {@link org.apache.streampark.console.core.service.ApplicationService}. */
+class ApplicationServiceTest extends SpringUnitTestBase {
@Autowired private ApplicationService applicationService;
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkClusterServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkClusterServiceTest.java
index 6073556e81..c1fef933cf 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkClusterServiceTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkClusterServiceTest.java
@@ -18,7 +18,7 @@
package org.apache.streampark.console.core.service;
import org.apache.streampark.common.enums.ExecutionMode;
-import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.SpringUnitTestBase;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.entity.YarnQueue;
import org.apache.streampark.console.core.service.impl.FlinkClusterServiceImpl;
@@ -31,7 +31,7 @@
import static org.assertj.core.api.Assertions.assertThat;
/** The unit test class for {@link FlinkClusterService}. */
-class FlinkClusterServiceTest extends SpringTestBase {
+class FlinkClusterServiceTest extends SpringUnitTestBase {
@Autowired private FlinkClusterService flinkClusterService;
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ResourceServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ResourceServiceTest.java
index e6c00ec717..0426153687 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ResourceServiceTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ResourceServiceTest.java
@@ -17,7 +17,7 @@
package org.apache.streampark.console.core.service;
-import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.SpringUnitTestBase;
import org.apache.hc.core5.http.ContentType;
@@ -35,7 +35,7 @@
import static org.assertj.core.api.Assertions.assertThat;
/** org.apache.streampark.console.core.service.ResourceServiceTest. */
-class ResourceServiceTest extends SpringTestBase {
+class ResourceServiceTest extends SpringUnitTestBase {
@Autowired private ResourceService resourceService;
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
index 53133143b7..93968dd6f1 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
@@ -21,7 +21,7 @@
import org.apache.streampark.common.enums.DevelopmentMode;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.util.DeflaterUtils;
-import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.SpringUnitTestBase;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.ApplicationConfig;
import org.apache.streampark.console.core.entity.Effective;
@@ -46,7 +46,7 @@
* org.apache.streampark.console.core.service.impl.SavePointServiceImpl} of {@link
* SavePointService}.
*/
-class SavePointServiceTest extends SpringTestBase {
+class SavePointServiceTest extends SpringUnitTestBase {
@Autowired private SavePointService savePointService;
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/UserServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/UserServiceTest.java
index b3157193ed..1c05924639 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/UserServiceTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/UserServiceTest.java
@@ -17,7 +17,7 @@
package org.apache.streampark.console.core.service;
-import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.SpringUnitTestBase;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.Resource;
@@ -27,16 +27,17 @@
import org.apache.streampark.console.system.entity.User;
import org.apache.streampark.console.system.service.UserService;
-import com.baomidou.mybatisplus.extension.toolkit.Db;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.transaction.annotation.Transactional;
import java.util.Collections;
import java.util.Map;
/** org.apache.streampark.console.core.service.UserServiceTest. */
-class UserServiceTest extends SpringTestBase {
+@Transactional
+class UserServiceTest extends SpringUnitTestBase {
@Autowired private UserService userService;
@Autowired private ApplicationService applicationService;
@Autowired private ResourceService resourceService;
@@ -50,7 +51,7 @@ void testLockUser() throws Exception {
user.setPassword("test");
user.setUserType(UserType.USER);
user.setStatus(User.STATUS_VALID);
- Db.save(user);
+ userService.createUser(user);
// lock user
user.setStatus(User.STATUS_LOCK);
Map data =
@@ -74,7 +75,7 @@ void testLockUser() throws Exception {
resource.setEngineType(EngineType.FLINK);
resource.setTeamId(1L);
resource.setCreatorId(user.getUserId());
- Db.save(resource);
+ resourceService.save(resource);
// lock user when has resource
user.setStatus(User.STATUS_LOCK);
Map data2 =
@@ -93,7 +94,7 @@ void testTransferResource() {
user.setPassword("test");
user.setUserType(UserType.USER);
user.setStatus(User.STATUS_VALID);
- Db.save(user);
+ userService.save(user);
Resource resource = new Resource();
resource.setResourceName("test");
@@ -101,12 +102,12 @@ void testTransferResource() {
resource.setEngineType(EngineType.FLINK);
resource.setTeamId(1L);
resource.setCreatorId(user.getUserId());
- Db.save(resource);
+ resourceService.save(resource);
Application app = new Application();
app.setUserId(user.getUserId());
app.setTeamId(1L);
- Db.save(app);
+ applicationService.save(app);
User targetUser = new User();
targetUser.setUsername("test0");
@@ -114,7 +115,7 @@ void testTransferResource() {
targetUser.setPassword("test0");
targetUser.setUserType(UserType.USER);
targetUser.setStatus(User.STATUS_VALID);
- Db.save(targetUser);
+ userService.save(targetUser);
Assertions.assertTrue(applicationService.existsByUserId(user.getUserId()));
Assertions.assertTrue(resourceService.existsByUserId(user.getUserId()));
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/VariableServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/VariableServiceTest.java
index d1c0436d48..8ac9859486 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/VariableServiceTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/VariableServiceTest.java
@@ -17,7 +17,7 @@
package org.apache.streampark.console.core.service;
-import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.SpringUnitTestBase;
import org.apache.streampark.console.core.entity.Variable;
import org.junit.jupiter.api.Assertions;
@@ -25,7 +25,7 @@
import org.springframework.beans.factory.annotation.Autowired;
/** org.apache.streampark.console.core.service.VariableServiceTest */
-class VariableServiceTest extends SpringTestBase {
+class VariableServiceTest extends SpringUnitTestBase {
@Autowired private VariableService variableService;
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/YarnQueueServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/YarnQueueServiceTest.java
index 317ff4b2af..9a91d7b68a 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/YarnQueueServiceTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/YarnQueueServiceTest.java
@@ -18,7 +18,7 @@
package org.apache.streampark.console.core.service;
import org.apache.streampark.common.enums.ExecutionMode;
-import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.SpringUnitTestBase;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.core.bean.ResponseResult;
@@ -47,7 +47,7 @@
* avoid noisy data form h2 database.
*/
@Execution(SAME_THREAD)
-class YarnQueueServiceTest extends SpringTestBase {
+class YarnQueueServiceTest extends SpringUnitTestBase {
@Autowired private FlinkClusterService flinkClusterService;
@@ -91,7 +91,7 @@ void testFindYarnQueues() {
yarnQueues.getRecords().stream()
.map(YarnQueue::getQueueLabel)
.collect(Collectors.toList()))
- .containsExactly(q3AtL3, q3AtL1);
+ .containsExactlyInAnyOrder(q3AtL3, q3AtL1);
// Test for 1st page, size = 2, order by create time with queue_label
queryParams.setQueueLabel("q3");
@@ -101,7 +101,7 @@ void testFindYarnQueues() {
yarnQueuesWithQueueLabelLikeQuery.getRecords().stream()
.map(YarnQueue::getQueueLabel)
.collect(Collectors.toList()))
- .containsExactly(q3AtL3, q3AtL1);
+ .containsExactlyInAnyOrder(q3AtL3, q3AtL1);
}
@Test
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/system/authentication/JWTTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/system/authentication/JWTTest.java
index 4acc63e8bd..e3501841f5 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/system/authentication/JWTTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/system/authentication/JWTTest.java
@@ -18,7 +18,7 @@
package org.apache.streampark.console.system.authentication;
import org.apache.streampark.common.util.DateUtils;
-import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.SpringUnitTestBase;
import org.apache.streampark.console.system.entity.AccessToken;
import com.auth0.jwt.JWT;
@@ -28,7 +28,7 @@
import java.util.Date;
import java.util.TimeZone;
-class JWTTest extends SpringTestBase {
+class JWTTest extends SpringUnitTestBase {
@Test
void testExpireTime() {
diff --git a/streampark-console/streampark-console-service/src/test/resources/application-integration-test.yml b/streampark-console/streampark-console-service/src/test/resources/application-integration-test.yml
new file mode 100644
index 0000000000..aa6f18449e
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/test/resources/application-integration-test.yml
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+logging:
+ level:
+ root: info
+
+spring:
+ datasource:
+ driver-class-name: org.h2.Driver
+ url: jdbc:h2:mem:streampark;MODE=MySQL;DB_CLOSE_DELAY=-1;DATABASE_TO_LOWER=true;INIT=runscript from 'classpath:db/schema-h2.sql'
+ username: sa
+ password: sa
+ sql:
+ init:
+ data-locations: classpath:db/data-h2.sql
+ continue-on-error: true
+ username: sa
+ password: sa
+ mode: always
+server:
+ port: 6666
+
+streampark:
+ workspace:
+ local: /tmp
+ # remote: hdfs://hdfscluster/streampark
+ # hadoop-user-name: root
diff --git a/streampark-test-utils/pom.xml b/streampark-test-utils/pom.xml
new file mode 100644
index 0000000000..90c3b252fb
--- /dev/null
+++ b/streampark-test-utils/pom.xml
@@ -0,0 +1,40 @@
+
+
+
+ 4.0.0
+
+ org.apache.streampark
+ streampark
+ 2.2.0-SNAPSHOT
+
+
+ streampark-test-utils
+ pom
+
+ StreamPark : Test Utils
+ http://maven.apache.org
+
+ streampark-testcontainer
+
+
+
+ UTF-8
+
+
+
diff --git a/streampark-test-utils/streampark-testcontainer/pom.xml b/streampark-test-utils/streampark-testcontainer/pom.xml
new file mode 100644
index 0000000000..668f826a9f
--- /dev/null
+++ b/streampark-test-utils/streampark-testcontainer/pom.xml
@@ -0,0 +1,105 @@
+
+
+
+ 4.0.0
+
+ org.apache.streampark
+ streampark-test-utils
+ 2.2.0-SNAPSHOT
+
+
+ streampark-testcontainer
+ jar
+
+ StreamPark : Test Container
+ http://maven.apache.org
+
+
+ UTF-8
+ 1.6.21
+ 3.0.2
+ 1.16.2
+
+
+
+
+
+ org.testcontainers
+ testcontainers-bom
+ ${testcontainer.version}
+ pom
+ import
+
+
+
+
+
+
+ org.testcontainers
+ testcontainers
+
+
+ org.apache.httpcomponents.client5
+ httpclient5
+ ${httpclient5.version}
+ test
+
+
+ org.jetbrains.kotlin
+ kotlin-reflect
+ ${kotlin-reflect.version}
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter
+ ${jupiter.version}
+ test
+
+
+ org.apache.streampark
+ streampark-common_2.12
+ ${project.version}
+
+
+ com.google.code.findbugs
+ jsr305
+ ${findbugs.version}
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ test-jar
+
+
+
+
+
+
+
diff --git a/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkComponent.java b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkComponent.java
new file mode 100644
index 0000000000..13a62e01fc
--- /dev/null
+++ b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkComponent.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.testcontainer.flink;
+
+import javax.annotation.Nonnull;
+
+enum FlinkComponent {
+ JOBMANAGER("jobmanager"),
+ TASKMANAGER("taskmanager");
+
+ private final String name;
+
+ FlinkComponent(@Nonnull String name) {
+ this.name = name;
+ }
+
+ @Nonnull
+ public String getName() {
+ return name;
+ }
+}
diff --git a/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java
new file mode 100644
index 0000000000..d9814f0d22
--- /dev/null
+++ b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.testcontainer.flink;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerImageName;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.streampark.testcontainer.flink.FlinkComponent.JOBMANAGER;
+import static org.apache.streampark.testcontainer.flink.FlinkComponent.TASKMANAGER;
+
+class FlinkContainer extends GenericContainer {
+
+ public static final AtomicInteger TM_COUNT = new AtomicInteger(0);
+
+ public static final String FLINK_PROPS_KEY = "FLINK_PROPERTIES";
+
+ private final @Nonnull FlinkComponent component;
+
+ FlinkContainer(
+ @Nonnull DockerImageName dockerImageName,
+ @Nonnull FlinkComponent component,
+ @Nonnull Network network,
+ @Nonnull String yamlPropStr,
+ @Nullable Slf4jLogConsumer slf4jLogConsumer) {
+ super(dockerImageName);
+ this.component = component;
+ this.withCommand("/docker-entrypoint.sh", component.getName());
+ this.withCreateContainerCmdModifier(
+ createContainerCmd -> createContainerCmd.withName(getFlinkContainerName()));
+ this.withNetwork(network);
+ this.withEnv(FLINK_PROPS_KEY, yamlPropStr);
+ Optional.ofNullable(slf4jLogConsumer).ifPresent(this::withLogConsumer);
+ }
+
+ protected String getFlinkContainerName() {
+ if (component == JOBMANAGER) {
+ return JOBMANAGER.getName();
+ }
+ return String.format("%s_%s", TASKMANAGER.getName(), TM_COUNT.incrementAndGet());
+ }
+}
diff --git a/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
new file mode 100644
index 0000000000..e5f01dea6f
--- /dev/null
+++ b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.testcontainer.flink;
+
+import org.apache.streampark.common.util.Utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.lifecycle.Startable;
+import org.testcontainers.utility.DockerImageName;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.streampark.testcontainer.flink.FlinkComponent.JOBMANAGER;
+import static org.apache.streampark.testcontainer.flink.FlinkComponent.TASKMANAGER;
+
+/**
+ * Starts a Flink standalone session cluster made up of one jobmanager and n taskmanagers. A
+ * user-supplied flinkYamlConfStr takes the highest priority, except that 'jobmanager.rpc.address'
+ * is always 'jobmanager' and 'rest.port' is always 8081.
+ */
+public class FlinkStandaloneSessionCluster implements Startable {
+
+ public static final Logger LOG = LoggerFactory.getLogger(FlinkStandaloneSessionCluster.class);
+
+ public static final Network NETWORK = Network.newNetwork();
+
+ public static final String JM_RPC_ADDR_KEY = "jobmanager.rpc.address";
+ public static final String SLOT_CONF_KEY = "taskmanager.numberOfTaskSlots";
+ public static final String SLOT_CONF_FORMAT = String.format("%s: %%s", SLOT_CONF_KEY);
+
+ public static final int BLOB_SERVER_PORT = 6123;
+ public static final int WEB_PORT = 8081;
+
+ private String yamlConfStr = String.format("%s: %s", JM_RPC_ADDR_KEY, JOBMANAGER.getName());
+
+ private final FlinkContainer jobManagerContainer;
+
+ private final List taskManagerContainers = new ArrayList<>();
+
+ private FlinkStandaloneSessionCluster(
+ DockerImageName dockerImageName,
+ int taskManagerNum,
+ int slotsNumPerTm,
+ @Nullable String yamlConfStr,
+ Slf4jLogConsumer slf4jLogConsumer) {
+
+ renderJmRpcConfIfNeeded(yamlConfStr);
+
+ renderSlotNumIfNeeded(slotsNumPerTm);
+
+ // Set for JM.
+ this.jobManagerContainer =
+ new FlinkContainer(
+ dockerImageName, JOBMANAGER, NETWORK, this.yamlConfStr, slf4jLogConsumer);
+ this.jobManagerContainer.addExposedPort(BLOB_SERVER_PORT);
+ this.jobManagerContainer.addExposedPort(WEB_PORT);
+
+ this.jobManagerContainer.setWaitStrategy(
+ Wait.forHttp("/config")
+ .forStatusCode(200)
+ .forPort(WEB_PORT)
+ .withStartupTimeout(Duration.ofMinutes(8)));
+
+ // Set for TMs.
+ for (int i = 0; i < taskManagerNum; i++) {
+ FlinkContainer taskManager =
+ new FlinkContainer(
+ dockerImageName, TASKMANAGER, NETWORK, this.yamlConfStr, slf4jLogConsumer);
+ this.taskManagerContainers.add(taskManager);
+ }
+ }
+
+ public String getFlinkJobManagerUrl() {
+ return String.format(
+ "http://%s:%s", jobManagerContainer.getHost(), jobManagerContainer.getMappedPort(WEB_PORT));
+ }
+
+ @Override
+ public void start() {
+ Utils.notNull(jobManagerContainer);
+ jobManagerContainer.start();
+ Utils.notNull(taskManagerContainers);
+ for (FlinkContainer taskManagerContainer : taskManagerContainers) {
+ taskManagerContainer.start();
+ }
+ }
+
+ @Override
+ public void stop() {
+ Utils.notNull(taskManagerContainers);
+ for (FlinkContainer taskManagerContainer : taskManagerContainers) {
+ taskManagerContainer.stop();
+ }
+ Utils.notNull(jobManagerContainer);
+ jobManagerContainer.stop();
+ }
+
+ private void renderSlotNumIfNeeded(int slotsNumPerTm) {
+ if (!this.yamlConfStr.contains(SLOT_CONF_KEY)) {
+ this.yamlConfStr =
+ String.format(
+ "%s\n%s\n", this.yamlConfStr, String.format(SLOT_CONF_FORMAT, slotsNumPerTm));
+ }
+ }
+
+ private void renderJmRpcConfIfNeeded(@Nullable String yamlConfStr) {
+ this.yamlConfStr =
+ yamlConfStr == null
+ ? this.yamlConfStr
+ : (yamlConfStr.contains(JM_RPC_ADDR_KEY)
+ ? yamlConfStr
+ : String.format("%s\n%s\n", this.yamlConfStr, yamlConfStr));
+ }
+
+ public static class Builder {
+
+ private DockerImageName dockerImageName =
+ DockerImageName.parse("apache/flink:1.17.1-scala_2.12");
+ private int taskManagerNum = 1;
+ private int slotsNumPerTm = 8;
+ private @Nullable String yamlConfStr;
+ private Slf4jLogConsumer slf4jLogConsumer = new Slf4jLogConsumer(LOG, false);
+
+ private Builder() {}
+
+ public Builder dockerImageName(DockerImageName dockerImageName) {
+ this.dockerImageName = dockerImageName;
+ return this;
+ }
+
+ public Builder taskManagerNum(int taskManagerNum) {
+ Utils.required(taskManagerNum >= 0, "taskManagerNum must be greater than -1.");
+ this.taskManagerNum = taskManagerNum;
+ return this;
+ }
+
+ public Builder slotsNumPerTm(int slotsNumPerTm) {
+ Utils.required(slotsNumPerTm > 0, "slotsNumPerTm must be greater than 0.");
+ this.slotsNumPerTm = slotsNumPerTm;
+ return this;
+ }
+
+ public Builder yamlConfStr(@Nullable String yamlConfStr) {
+ this.yamlConfStr = yamlConfStr;
+ return this;
+ }
+
+ public Builder slf4jLogConsumer(Slf4jLogConsumer slf4jLogConsumer) {
+ this.slf4jLogConsumer = slf4jLogConsumer;
+ return this;
+ }
+
+ public FlinkStandaloneSessionCluster build() {
+ return new FlinkStandaloneSessionCluster(
+ dockerImageName, taskManagerNum, slotsNumPerTm, yamlConfStr, slf4jLogConsumer);
+ }
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+}
diff --git a/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/hadoop/HadoopContainer.java b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/hadoop/HadoopContainer.java
new file mode 100644
index 0000000000..93e34cd43b
--- /dev/null
+++ b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/hadoop/HadoopContainer.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.testcontainer.hadoop;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
+import org.testcontainers.utility.DockerImageName;
+
+import javax.annotation.Nonnull;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Hadoop container for integration tests. Note: this support is experimental for now. */
+public class HadoopContainer extends GenericContainer {
+
+ public static final Logger LOG = LoggerFactory.getLogger(HadoopContainer.class);
+
+ // Uses the 'latest' tag of sequenceiq/hadoop-docker, which ships Hadoop 2.7.x.
+ public static final DockerImageName DOCKER_IMAGE_NAME =
+ DockerImageName.parse("sequenceiq/hadoop-docker:latest");
+
+ public static final Map MAPPED_PORTS =
+ new HashMap() {
+ {
+ put(50070, 50070);
+ put(8088, 8088);
+ put(9000, 9000);
+ }
+ };
+
+ public HadoopContainer() {
+ this(DOCKER_IMAGE_NAME);
+ }
+
+ public HadoopContainer(@Nonnull DockerImageName dockerImageName) {
+ super(dockerImageName);
+ MAPPED_PORTS.forEach(this::addFixedExposedPort);
+ this.setPrivilegedMode(true);
+ this.withCreateContainerCmdModifier(
+ createContainerCmd -> createContainerCmd.withName("one-container-hadoop-cluster"));
+ WaitAllStrategy waitAllStrategy =
+ new WaitAllStrategy()
+ .withStrategy(
+ Wait.forHttp("/ws/v1/cluster/info")
+ .forPort(8088)
+ .forStatusCode(200)
+ .withStartupTimeout(Duration.ofMinutes(8)))
+ .withStrategy(
+ Wait.forHttp("")
+ .forPort(50070)
+ .forStatusCode(200)
+ .withStartupTimeout(Duration.ofMinutes(8)))
+ .withStrategy(Wait.defaultWaitStrategy().withStartupTimeout(Duration.ofMinutes(8)))
+ .withStartupTimeout(Duration.ofMinutes(8));
+ this.waitingFor(waitAllStrategy);
+ this.withLogConsumer(new Slf4jLogConsumer(LOG));
+ this.withCommand("/etc/bootstrap.sh", "-d");
+ }
+
+ @Override
+ public void start() {
+ super.start();
+ this.waitUntilContainerStarted();
+ }
+}
diff --git a/streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java b/streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java
new file mode 100644
index 0000000000..6f34e720f7
--- /dev/null
+++ b/streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.testcontainer.flink;
+
+import org.apache.hc.client5.http.classic.methods.HttpGet;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
+import org.apache.hc.client5.http.impl.classic.HttpClients;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test verifying that the Flink standalone session cluster environment is available. */
+class FlinkStandaloneSessionClusterITest {
+
+ private final FlinkStandaloneSessionCluster cluster =
+ FlinkStandaloneSessionCluster.builder().build();
+
+ @BeforeEach
+ void up() {
+ cluster.start();
+ }
+
+ @AfterEach
+ void down() {
+ cluster.stop();
+ }
+
+ @Test
+ void testRestApiAvailable() throws IOException {
+ String url = String.format("%s/%s", cluster.getFlinkJobManagerUrl(), "config");
+ CloseableHttpClient httpClient = HttpClients.createDefault();
+ CloseableHttpResponse response = httpClient.execute(new HttpGet(url));
+ assertThat(response.getCode()).isEqualTo(200);
+ }
+}
diff --git a/streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/hadoop/HadoopContainerTest.java b/streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/hadoop/HadoopContainerTest.java
new file mode 100644
index 0000000000..961d76c433
--- /dev/null
+++ b/streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/hadoop/HadoopContainerTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.testcontainer.hadoop;
+
+import org.apache.hc.client5.http.classic.methods.HttpGet;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
+import org.apache.hc.client5.http.impl.classic.HttpClients;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link HadoopContainer}. */
+class HadoopContainerTest {
+ static final HadoopContainer HADOOP_CONTAINER = new HadoopContainer();
+
+ @BeforeAll
+ static void setup() {
+ HADOOP_CONTAINER.start();
+ }
+
+ @AfterAll
+ static void teardown() {
+ HADOOP_CONTAINER.stop();
+ }
+
+ @Test
+ @Timeout(value = 8, unit = TimeUnit.MINUTES)
+ void testOverview() throws IOException {
+
+ String url = String.format("http://%s:%s/ws/v1/cluster/info", HADOOP_CONTAINER.getHost(), 8088);
+ CloseableHttpClient httpClient = HttpClients.createDefault();
+ CloseableHttpResponse response = httpClient.execute(new HttpGet(url));
+ assertThat(response.getCode()).isEqualTo(200);
+
+ url = String.format("http://%s:%s", HADOOP_CONTAINER.getHost(), 50070);
+ httpClient = HttpClients.createDefault();
+ response = httpClient.execute(new HttpGet(url));
+ assertThat(response.getCode()).isEqualTo(200);
+ }
+}