From 90a518a5f0fbcdd9aad0e25e9254f5870a132b9f Mon Sep 17 00:00:00 2001 From: Roc Marshal Date: Sun, 16 Jul 2023 22:22:39 +0800 Subject: [PATCH] Add test-container utils for integration test. --- .github/workflows/maven.yml | 2 +- pom.xml | 1 + .../streampark-console-service/pom.xml | 14 ++ .../src/main/assembly/assembly.xml | 1 + .../service/impl/ApplicationServiceImpl.java | 2 +- .../service/impl/FlinkClusterServiceImpl.java | 5 + .../console/core/task/FlinkHttpWatcher.java | 11 +- .../console/SpringIntegrationTestBase.java | 211 ++++++++++++++++++ ...gTestBase.java => SpringUnitTestBase.java} | 10 +- .../core/service/AccessTokenServiceTest.java | 4 +- .../core/service/ApplicationServiceITest.java | 153 +++++++++++++ .../core/service/ApplicationServiceTest.java | 6 +- .../core/service/FlinkClusterServiceTest.java | 4 +- .../core/service/ResourceServiceTest.java | 4 +- .../core/service/SavePointServiceTest.java | 4 +- .../console/core/service/UserServiceTest.java | 19 +- .../core/service/VariableServiceTest.java | 4 +- .../core/service/YarnQueueServiceTest.java | 8 +- .../system/authentication/JWTTest.java | 4 +- .../application-integration-test.yml | 41 ++++ streampark-test-utils/pom.xml | 40 ++++ .../streampark-testcontainer/pom.xml | 105 +++++++++ .../testcontainer/flink/FlinkComponent.java | 36 +++ .../testcontainer/flink/FlinkContainer.java | 64 ++++++ .../flink/FlinkStandaloneSessionCluster.java | 185 +++++++++++++++ .../testcontainer/hadoop/HadoopContainer.java | 86 +++++++ .../FlinkStandaloneSessionClusterITest.java | 56 +++++ .../hadoop/HadoopContainerTest.java | 63 ++++++ 28 files changed, 1108 insertions(+), 35 deletions(-) create mode 100644 streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java rename streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/{SpringTestBase.java => SpringUnitTestBase.java} (95%) create mode 100644 streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceITest.java create mode 100644 streampark-console/streampark-console-service/src/test/resources/application-integration-test.yml create mode 100644 streampark-test-utils/pom.xml create mode 100644 streampark-test-utils/streampark-testcontainer/pom.xml create mode 100644 streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkComponent.java create mode 100644 streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java create mode 100644 streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java create mode 100644 streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/hadoop/HadoopContainer.java create mode 100644 streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java create mode 100644 streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/hadoop/HadoopContainerTest.java diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 23b6cfce53..162334e539 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -88,7 +88,7 @@ jobs: distribution: "adopt" cache: "maven" - name: Build with Maven - run: ./mvnw -B clean install -Pshaded -DskipTests + run: ./mvnw -B clean install -Pshaded,dist -DskipTests - name: Test with Maven run: ./mvnw -B test diff --git a/pom.xml b/pom.xml index 2706cabc13..a57b3e6594 100644 --- a/pom.xml +++ b/pom.xml @@ -81,6 +81,7 @@ streampark-spark streampark-storage streampark-console + streampark-test-utils diff --git a/streampark-console/streampark-console-service/pom.xml b/streampark-console/streampark-console-service/pom.xml index 4202d7c968..adbddafcc1 100644 --- a/streampark-console/streampark-console-service/pom.xml +++ b/streampark-console/streampark-console-service/pom.xml @@ -412,6 +412,20 @@ test + + org.rauschig + jarchivelib + 0.7.1 + test + + + + org.apache.streampark + streampark-testcontainer + ${project.version} + test + + diff --git a/streampark-console/streampark-console-service/src/main/assembly/assembly.xml b/streampark-console/streampark-console-service/src/main/assembly/assembly.xml index 92c73dac48..aa4677c4a9 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/assembly.xml +++ b/streampark-console/streampark-console-service/src/main/assembly/assembly.xml @@ -17,6 +17,7 @@ bin + dir tar.gz true diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index ee2d996d50..7001eaa29c 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -1649,7 +1649,7 @@ public void start(Application appParam, boolean auto) throws Exception { } private Map getProperties(Application application) { - Map properties = application.getOptionMap(); + Map properties = new HashMap<>(application.getOptionMap()); if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) { FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId()); ApiAlertException.throwIfNull( diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java index 7111e2758d..517779c462 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java @@ -139,6 +139,11 @@ public ResponseResult check(FlinkCluster cluster) { @Override public Boolean create(FlinkCluster flinkCluster) { flinkCluster.setUserId(commonService.getUserId()); + return internalCreate(flinkCluster); + } + + @VisibleForTesting + public boolean internalCreate(FlinkCluster flinkCluster) { boolean successful = validateQueueIfNeeded(flinkCluster); ApiAlertException.throwIfFalse( successful, String.format(ERROR_CLUSTER_QUEUE_HINT, flinkCluster.getYarnQueue())); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java index 26302387ad..ce271da594 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java @@ -38,6 +38,7 @@ import org.apache.streampark.console.core.service.alert.AlertService; import org.apache.commons.lang3.StringUtils; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.hc.client5.http.config.RequestConfig; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; @@ -49,6 +50,8 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -84,7 +87,7 @@ public class FlinkHttpWatcher { @Autowired private FlinkClusterWatcher flinkClusterWatcher; // track interval every 5 seconds - private static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5); + public static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5); // option interval within 10 seconds private static final Duration OPTION_INTERVAL = Duration.ofSeconds(10); @@ -197,6 +200,12 @@ public void start() { } } + @VisibleForTesting + public @Nullable FlinkAppState tryQueryFlinkAppState(@Nonnull Long appId) { + Application app = WATCHING_APPS.get(appId); + return (app == null || app.getState() == null) ? null : FlinkAppState.of(app.getState()); + } + private void watch(Long id, Application application) { EXECUTOR.execute( () -> { diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java new file mode 100644 index 0000000000..eca83226d6 --- /dev/null +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console; + +import org.apache.streampark.common.conf.CommonConfig; +import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.util.SystemPropertyUtils; + +import org.apache.commons.io.FileUtils; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.io.IOUtils; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.junit.jupiter.MockitoExtension; +import org.rauschig.jarchivelib.Archiver; +import org.rauschig.jarchivelib.ArchiverFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureTestEntityManager; +import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import javax.annotation.Nonnull; + +import java.io.File; +import java.io.FileFilter; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.net.URLConnection; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +/** + * Integration base tester. Note: The all children classes of the base must run after the + * project-level package phrase. + */ +@Slf4j +@EnableScheduling +@ActiveProfiles("integration-test") +@AutoConfigureTestEntityManager +@AutoConfigureWebTestClient(timeout = "60000") +@TestPropertySource(locations = {"classpath:application-integration-test.yml"}) +@ExtendWith({MockitoExtension.class, SpringExtension.class}) +@SpringBootTest( + classes = StreamParkConsoleBootstrap.class, + webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT) +public abstract class SpringIntegrationTestBase { + protected static final Logger LOG = LoggerFactory.getLogger(SpringIntegrationTestBase.class); + + protected static final String RUN_PKG_SCRIPT_HINT = + "Please run package script before running the test case."; + + protected static final String DEFAULT_APP_HOME_DIR_NAME = "apache-streampark"; + protected static final String DEFAULT_FLINK_OFFICIAL_RELEASES_DIR_NAME = + "flink-official-releases"; + protected static final String DEFAULT_LOCAL_WORKSPACE_DIR_NAME = "localWorkspace"; + protected static final String DEFAULT_FLINK_VERSION = "1.17.1"; + protected static final String DEFAULT_FLINK_DOWNLOAD_URL = + "https://dlcdn.apache.org/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz"; + protected static final FileFilter PKG_NAME_FILTER = + file -> file.getName().startsWith(DEFAULT_APP_HOME_DIR_NAME) && file.isDirectory(); + + protected static String defaultFlinkHome; + protected static String appHome; + + @BeforeAll + public static void init(@TempDir File tempPath) throws IOException { + + LOG.info("Start prepare the real running env."); + String tempAbsPath = tempPath.getAbsolutePath(); + LOG.info("Integration test base tmp dir: {}", tempAbsPath); + + FileUtils.copyDirectory( + tryFindStreamParkPackagedDirFile(), new File(tempAbsPath, DEFAULT_APP_HOME_DIR_NAME)); + + prepareDefaultFlinkHome(tempAbsPath, 3); + Path localWorkspace = + Files.createDirectories(new File(tempAbsPath, DEFAULT_LOCAL_WORKSPACE_DIR_NAME).toPath()); + + appHome = new File(tempAbsPath, DEFAULT_APP_HOME_DIR_NAME).getAbsolutePath(); + System.setProperty(ConfigConst.KEY_APP_HOME(), appHome); + System.setProperty( + CommonConfig.STREAMPARK_WORKSPACE_LOCAL().key(), + localWorkspace.toAbsolutePath().toString()); + + LOG.info( + "Complete mock EnvInitializer init, app home: {}, {}: {}", + appHome, + CommonConfig.STREAMPARK_WORKSPACE_LOCAL().key(), + localWorkspace.toAbsolutePath()); + } + + private static void prepareDefaultFlinkHome(String tempAbsPath, int reties) throws IOException { + reties--; + if (reties < 0) { + LOG.error("Failed to prepareDefaultFlinkHome with multiple-retires."); + return; + } + try { + final File flinkOfficialReleases = + new File(tempAbsPath, DEFAULT_FLINK_OFFICIAL_RELEASES_DIR_NAME); + Files.deleteIfExists(flinkOfficialReleases.toPath()); + Files.createDirectories(flinkOfficialReleases.toPath()); + + defaultFlinkHome = + prepareFlinkOfficialRelease( + DEFAULT_FLINK_DOWNLOAD_URL, + DEFAULT_FLINK_VERSION, + flinkOfficialReleases.getAbsolutePath()); + } catch (Throwable t) { + LOG.error("Error in prepareDefaultFlinkHome with exception", t); + LOG.warn("Start the next retry."); + prepareDefaultFlinkHome(tempAbsPath, reties); + } + } + + /** + * @param httpUrl flink official release download url. + * @return return the target un-packed flink home absolute dir. + */ + private static String prepareFlinkOfficialRelease( + @Nonnull String httpUrl, @Nonnull String flinkVersion, String workDirAbsolutePath) + throws IOException { + String downloadedFilePath = + new File(workDirAbsolutePath, flinkVersion).getAbsolutePath() + ".tgz"; + httpDownload(httpUrl, downloadedFilePath); + File archive = new File(downloadedFilePath); + File destination = new File(archive.getParentFile().getAbsolutePath()); + Files.createDirectories(destination.toPath()); + + Archiver archiver = ArchiverFactory.createArchiver("tar", "gz"); + archiver.extract(archive, destination); + Optional first = + Arrays.stream( + requireNonNull( + destination.listFiles( + file -> file.getName().contains(flinkVersion) && file.isDirectory()))) + .findFirst(); + File file = + first.orElseThrow( + () -> + new RuntimeException( + String.format( + "Error in prepareFlinkOfficialRelease for httpUrl: %s, flinkVersion: %s", + httpUrl, flinkVersion))); + + LOG.info("Prepared flink release: {}.", file.getAbsolutePath()); + return file.getAbsolutePath(); + } + + private static void httpDownload(String httpUrl, String saveFile) { + + try { + URL url = new URL(httpUrl); + URLConnection conn = url.openConnection(); + InputStream inStream = conn.getInputStream(); + FileOutputStream fs = new FileOutputStream(saveFile); + IOUtils.copyBytes(inStream, fs, 2048); + inStream.close(); + fs.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static File tryFindStreamParkPackagedDirFile() { + String userDir = Preconditions.checkNotNull(SystemPropertyUtils.get("user.dir")); + File pkgTargetDirFile = new File(userDir, "target"); + Preconditions.checkState( + pkgTargetDirFile.exists(), + "The target directory of %s doesn't exist. %s", + userDir, + RUN_PKG_SCRIPT_HINT); + Optional availablePkgParentFileOpt = + Arrays.stream(requireNonNull(pkgTargetDirFile.listFiles(PKG_NAME_FILTER))).findFirst(); + final File availablePkgParentFile = + availablePkgParentFileOpt.orElseThrow(() -> new RuntimeException(RUN_PKG_SCRIPT_HINT)); + Optional targetDirFile = + Arrays.stream(requireNonNull(availablePkgParentFile.listFiles(PKG_NAME_FILTER))) + .findFirst(); + return targetDirFile.orElseThrow(() -> new RuntimeException(RUN_PKG_SCRIPT_HINT)); + } +} diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringTestBase.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringUnitTestBase.java similarity index 95% rename from streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringTestBase.java rename to streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringUnitTestBase.java index 417a117cee..490df51c00 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringTestBase.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringUnitTestBase.java @@ -24,6 +24,7 @@ import org.apache.streampark.console.core.entity.FlinkCluster; import org.apache.streampark.console.core.entity.YarnQueue; +import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; @@ -33,10 +34,10 @@ import org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureTestEntityManager; import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringExtension; -import org.springframework.transaction.annotation.Transactional; import java.io.File; import java.io.IOException; @@ -44,7 +45,8 @@ import java.nio.file.Path; /** base tester. */ -@Transactional +@Slf4j +@EnableScheduling @ActiveProfiles("test") @AutoConfigureTestEntityManager @AutoConfigureWebTestClient(timeout = "60000") @@ -53,9 +55,9 @@ @SpringBootTest( classes = StreamParkConsoleBootstrap.class, webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT) -public abstract class SpringTestBase { +public abstract class SpringUnitTestBase { - protected static final Logger LOG = LoggerFactory.getLogger(SpringTestBase.class); + protected static final Logger LOG = LoggerFactory.getLogger(SpringUnitTestBase.class); @BeforeAll public static void init(@TempDir File tempPath) throws IOException { diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/AccessTokenServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/AccessTokenServiceTest.java index f97823ce5f..f20d7fb801 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/AccessTokenServiceTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/AccessTokenServiceTest.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.core.service; -import org.apache.streampark.console.SpringTestBase; +import org.apache.streampark.console.SpringUnitTestBase; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.domain.RestResponse; import org.apache.streampark.console.base.util.WebUtils; @@ -33,7 +33,7 @@ import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; -public class AccessTokenServiceTest extends SpringTestBase { +public class AccessTokenServiceTest extends SpringUnitTestBase { @Autowired private AccessTokenService accessTokenService; diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceITest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceITest.java new file mode 100644 index 0000000000..9effabca0e --- /dev/null +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceITest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.service; + +import org.apache.streampark.common.enums.ExecutionMode; +import org.apache.streampark.common.util.DeflaterUtils; +import org.apache.streampark.console.SpringIntegrationTestBase; +import org.apache.streampark.console.core.entity.Application; +import org.apache.streampark.console.core.entity.FlinkCluster; +import org.apache.streampark.console.core.entity.FlinkEnv; +import org.apache.streampark.console.core.entity.FlinkSql; +import org.apache.streampark.console.core.enums.FlinkAppState; +import org.apache.streampark.console.core.enums.ReleaseState; +import org.apache.streampark.console.core.service.impl.FlinkClusterServiceImpl; +import org.apache.streampark.console.core.task.FlinkHttpWatcher; +import org.apache.streampark.testcontainer.flink.FlinkStandaloneSessionCluster; + +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.Base64; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static org.apache.streampark.console.core.task.FlinkHttpWatcher.WATCHING_INTERVAL; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Integration test for {@link + * org.apache.streampark.console.core.service.impl.ApplicationServiceImpl}. + */ +class ApplicationServiceITest extends SpringIntegrationTestBase { + + static FlinkStandaloneSessionCluster cluster = + FlinkStandaloneSessionCluster.builder().slotsNumPerTm(4).slf4jLogConsumer(null).build(); + + @Autowired private ApplicationService appService; + + @Autowired private FlinkClusterService clusterService; + + @Autowired private FlinkEnvService envService; + + @Autowired private AppBuildPipeService appBuildPipeService; + + @Autowired private FlinkSqlService sqlService; + + @Autowired private FlinkHttpWatcher flinkHttpWatcher; + + @BeforeAll + static void setup() { + cluster.start(); + } + + @AfterAll + static void teardown() { + cluster.stop(); + } + + @AfterEach + void clear() { + appService.getBaseMapper().delete(new QueryWrapper<>()); + clusterService.getBaseMapper().delete(new QueryWrapper<>()); + envService.getBaseMapper().delete(new QueryWrapper<>()); + appBuildPipeService.getBaseMapper().delete(new QueryWrapper<>()); + sqlService.getBaseMapper().delete(new QueryWrapper<>()); + } + + @Test + @Timeout(value = 180) + void testStartAppOnRemoteSessionMode() throws Exception { + FlinkEnv flinkEnv = new FlinkEnv(); + flinkEnv.setFlinkHome(defaultFlinkHome); + flinkEnv.setFlinkName(DEFAULT_FLINK_VERSION); + flinkEnv.setId(1L); + envService.create(flinkEnv); + FlinkCluster flinkCluster = new FlinkCluster(); + flinkCluster.setId(1L); + flinkCluster.setAddress(cluster.getFlinkJobManagerUrl()); + flinkCluster.setExecutionMode(ExecutionMode.REMOTE.getMode()); + flinkCluster.setClusterName("docker-Cluster-1.17.1"); + flinkCluster.setVersionId(1L); + flinkCluster.setUserId(100000L); + ((FlinkClusterServiceImpl) clusterService).internalCreate(flinkCluster); + Application appParam = new Application(); + appParam.setId(100000L); + appParam.setTeamId(100000L); + Application application = appService.getApp(appParam); + application.setFlinkClusterId(1L); + application.setSqlId(100000L); + application.setVersionId(1L); + application.setExecutionMode(ExecutionMode.REMOTE.getMode()); + + // Avoid exceptional error. + application.setFlinkSql( + new String(Base64.getDecoder().decode(application.getFlinkSql().getBytes()))); + FlinkSql flinkSql = sqlService.getEffective(application.getId(), false); + flinkSql.setSql(DeflaterUtils.zipString(flinkSql.getSql())); + sqlService.getBaseMapper().updateById(flinkSql); + + // Continue operations link. + appService.update(application); + appBuildPipeService.buildApplication(100000L, false); + + CompletableFuture buildCompletableFuture = + CompletableFuture.supplyAsync( + () -> { + while (true) { + Application app = appService.getById(100000L); + if (app != null && app.getReleaseState() == ReleaseState.DONE) { + break; + } + } + return true; + }); + buildCompletableFuture.get(); + + appService.start(appService.getById(100000L), false); + CompletableFuture completableFuture = + CompletableFuture.supplyAsync( + () -> { + while (true) { + if (flinkHttpWatcher.tryQueryFlinkAppState(application.getId()) + == FlinkAppState.RUNNING) { + break; + } + } + return true; + }); + + assertThat(completableFuture.get(WATCHING_INTERVAL.toMillis() * 8, TimeUnit.MILLISECONDS)) + .isEqualTo(true); + } +} diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java index ea5c345602..f6ce39a54a 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java @@ -18,7 +18,7 @@ package org.apache.streampark.console.core.service; import org.apache.streampark.common.enums.ExecutionMode; -import org.apache.streampark.console.SpringTestBase; +import org.apache.streampark.console.SpringUnitTestBase; import org.apache.streampark.console.core.entity.Application; import org.apache.streampark.console.core.entity.YarnQueue; import org.apache.streampark.console.core.service.impl.ApplicationServiceImpl; @@ -34,8 +34,8 @@ import static org.assertj.core.api.Assertions.assertThat; -/** org.apache.streampark.console.core.service.ApplicationServiceTest. */ -class ApplicationServiceTest extends SpringTestBase { +/** org.apache.streampark.console.core.service.ApplicationServiceUnitTest. */ +class ApplicationServiceTest extends SpringUnitTestBase { @Autowired private ApplicationService applicationService; diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkClusterServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkClusterServiceTest.java index 6073556e81..c1fef933cf 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkClusterServiceTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkClusterServiceTest.java @@ -18,7 +18,7 @@ package org.apache.streampark.console.core.service; import org.apache.streampark.common.enums.ExecutionMode; -import org.apache.streampark.console.SpringTestBase; +import org.apache.streampark.console.SpringUnitTestBase; import org.apache.streampark.console.core.entity.FlinkCluster; import org.apache.streampark.console.core.entity.YarnQueue; import org.apache.streampark.console.core.service.impl.FlinkClusterServiceImpl; @@ -31,7 +31,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** The unit test class for {@link FlinkClusterService}. */ -class FlinkClusterServiceTest extends SpringTestBase { +class FlinkClusterServiceTest extends SpringUnitTestBase { @Autowired private FlinkClusterService flinkClusterService; diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ResourceServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ResourceServiceTest.java index e6c00ec717..0426153687 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ResourceServiceTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ResourceServiceTest.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.core.service; -import org.apache.streampark.console.SpringTestBase; +import org.apache.streampark.console.SpringUnitTestBase; import org.apache.hc.core5.http.ContentType; @@ -35,7 +35,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** org.apache.streampark.console.core.service.ResourceServiceTest. */ -class ResourceServiceTest extends SpringTestBase { +class ResourceServiceTest extends SpringUnitTestBase { @Autowired private ResourceService resourceService; diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java index 53133143b7..93968dd6f1 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java @@ -21,7 +21,7 @@ import org.apache.streampark.common.enums.DevelopmentMode; import org.apache.streampark.common.enums.ExecutionMode; import org.apache.streampark.common.util.DeflaterUtils; -import org.apache.streampark.console.SpringTestBase; +import org.apache.streampark.console.SpringUnitTestBase; import org.apache.streampark.console.core.entity.Application; import org.apache.streampark.console.core.entity.ApplicationConfig; import org.apache.streampark.console.core.entity.Effective; @@ -46,7 +46,7 @@ * org.apache.streampark.console.core.service.impl.SavePointServiceImpl} of {@link * SavePointService}. */ -class SavePointServiceTest extends SpringTestBase { +class SavePointServiceTest extends SpringUnitTestBase { @Autowired private SavePointService savePointService; diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/UserServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/UserServiceTest.java index b3157193ed..1c05924639 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/UserServiceTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/UserServiceTest.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.core.service; -import org.apache.streampark.console.SpringTestBase; +import org.apache.streampark.console.SpringUnitTestBase; import org.apache.streampark.console.base.domain.RestResponse; import org.apache.streampark.console.core.entity.Application; import org.apache.streampark.console.core.entity.Resource; @@ -27,16 +27,17 @@ import org.apache.streampark.console.system.entity.User; import org.apache.streampark.console.system.service.UserService; -import com.baomidou.mybatisplus.extension.toolkit.Db; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.transaction.annotation.Transactional; import java.util.Collections; import java.util.Map; /** org.apache.streampark.console.core.service.UserServiceTest. */ -class UserServiceTest extends SpringTestBase { +@Transactional +class UserServiceTest extends SpringUnitTestBase { @Autowired private UserService userService; @Autowired private ApplicationService applicationService; @Autowired private ResourceService resourceService; @@ -50,7 +51,7 @@ void testLockUser() throws Exception { user.setPassword("test"); user.setUserType(UserType.USER); user.setStatus(User.STATUS_VALID); - Db.save(user); + userService.createUser(user); // lock user user.setStatus(User.STATUS_LOCK); Map data = @@ -74,7 +75,7 @@ void testLockUser() throws Exception { resource.setEngineType(EngineType.FLINK); resource.setTeamId(1L); resource.setCreatorId(user.getUserId()); - Db.save(resource); + resourceService.save(resource); // lock user when has resource user.setStatus(User.STATUS_LOCK); Map data2 = @@ -93,7 +94,7 @@ void testTransferResource() { user.setPassword("test"); user.setUserType(UserType.USER); user.setStatus(User.STATUS_VALID); - Db.save(user); + userService.save(user); Resource resource = new Resource(); resource.setResourceName("test"); @@ -101,12 +102,12 @@ void testTransferResource() { resource.setEngineType(EngineType.FLINK); resource.setTeamId(1L); resource.setCreatorId(user.getUserId()); - Db.save(resource); + resourceService.save(resource); Application app = new Application(); app.setUserId(user.getUserId()); app.setTeamId(1L); - Db.save(app); + applicationService.save(app); User targetUser = new User(); targetUser.setUsername("test0"); @@ -114,7 +115,7 @@ void testTransferResource() { targetUser.setPassword("test0"); targetUser.setUserType(UserType.USER); targetUser.setStatus(User.STATUS_VALID); - Db.save(targetUser); + userService.save(targetUser); Assertions.assertTrue(applicationService.existsByUserId(user.getUserId())); Assertions.assertTrue(resourceService.existsByUserId(user.getUserId())); diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/VariableServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/VariableServiceTest.java index d1c0436d48..8ac9859486 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/VariableServiceTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/VariableServiceTest.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.core.service; -import org.apache.streampark.console.SpringTestBase; +import org.apache.streampark.console.SpringUnitTestBase; import org.apache.streampark.console.core.entity.Variable; import org.junit.jupiter.api.Assertions; @@ -25,7 +25,7 @@ import org.springframework.beans.factory.annotation.Autowired; /** org.apache.streampark.console.core.service.VariableServiceTest */ -class VariableServiceTest extends SpringTestBase { +class VariableServiceTest extends SpringUnitTestBase { @Autowired private VariableService variableService; diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/YarnQueueServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/YarnQueueServiceTest.java index 317ff4b2af..9a91d7b68a 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/YarnQueueServiceTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/YarnQueueServiceTest.java @@ -18,7 +18,7 @@ package org.apache.streampark.console.core.service; import org.apache.streampark.common.enums.ExecutionMode; -import org.apache.streampark.console.SpringTestBase; +import org.apache.streampark.console.SpringUnitTestBase; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.core.bean.ResponseResult; @@ -47,7 +47,7 @@ * avoid noisy data form h2 database. */ @Execution(SAME_THREAD) -class YarnQueueServiceTest extends SpringTestBase { +class YarnQueueServiceTest extends SpringUnitTestBase { @Autowired private FlinkClusterService flinkClusterService; @@ -91,7 +91,7 @@ void testFindYarnQueues() { yarnQueues.getRecords().stream() .map(YarnQueue::getQueueLabel) .collect(Collectors.toList())) - .containsExactly(q3AtL3, q3AtL1); + .containsExactlyInAnyOrder(q3AtL3, q3AtL1); // Test for 1st page, size = 2, order by create time with queue_label queryParams.setQueueLabel("q3"); @@ -101,7 +101,7 @@ void testFindYarnQueues() { yarnQueuesWithQueueLabelLikeQuery.getRecords().stream() .map(YarnQueue::getQueueLabel) .collect(Collectors.toList())) - .containsExactly(q3AtL3, q3AtL1); + .containsExactlyInAnyOrder(q3AtL3, q3AtL1); } @Test diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/system/authentication/JWTTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/system/authentication/JWTTest.java index 4acc63e8bd..e3501841f5 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/system/authentication/JWTTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/system/authentication/JWTTest.java @@ -18,7 +18,7 @@ package org.apache.streampark.console.system.authentication; import org.apache.streampark.common.util.DateUtils; -import org.apache.streampark.console.SpringTestBase; +import org.apache.streampark.console.SpringUnitTestBase; import org.apache.streampark.console.system.entity.AccessToken; import com.auth0.jwt.JWT; @@ -28,7 +28,7 @@ import java.util.Date; import java.util.TimeZone; -class JWTTest extends SpringTestBase { +class JWTTest extends SpringUnitTestBase { @Test void testExpireTime() { diff --git a/streampark-console/streampark-console-service/src/test/resources/application-integration-test.yml b/streampark-console/streampark-console-service/src/test/resources/application-integration-test.yml new file mode 100644 index 0000000000..aa6f18449e --- /dev/null +++ b/streampark-console/streampark-console-service/src/test/resources/application-integration-test.yml @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +logging: + level: + root: info + +spring: + datasource: + driver-class-name: org.h2.Driver + url: jdbc:h2:mem:streampark;MODE=MySQL;DB_CLOSE_DELAY=-1;DATABASE_TO_LOWER=true;INIT=runscript from 'classpath:db/schema-h2.sql' + username: sa + password: sa + sql: + init: + data-locations: classpath:db/data-h2.sql + continue-on-error: true + username: sa + password: sa + mode: always +server: + port: 6666 + +streampark: + workspace: + local: /tmp + # remote: hdfs://hdfscluster/streampark + # hadoop-user-name: root diff --git a/streampark-test-utils/pom.xml b/streampark-test-utils/pom.xml new file mode 100644 index 0000000000..90c3b252fb --- /dev/null +++ b/streampark-test-utils/pom.xml @@ -0,0 +1,40 @@ + + + + 4.0.0 + + org.apache.streampark + streampark + 2.2.0-SNAPSHOT + + + streampark-test-utils + pom + + StreamPark : Test Utils + http://maven.apache.org + + streampark-testcontainer + + + + UTF-8 + + + diff --git a/streampark-test-utils/streampark-testcontainer/pom.xml b/streampark-test-utils/streampark-testcontainer/pom.xml new file mode 100644 index 0000000000..668f826a9f --- /dev/null +++ b/streampark-test-utils/streampark-testcontainer/pom.xml @@ -0,0 +1,105 @@ + + + + 4.0.0 + + org.apache.streampark + streampark-test-utils + 2.2.0-SNAPSHOT + + + streampark-testcontainer + jar + + StreamPark : Test Container + http://maven.apache.org + + + UTF-8 + 1.6.21 + 3.0.2 + 1.16.2 + + + + + + org.testcontainers + testcontainers-bom + ${testcontainer.version} + pom + import + + + + + + + org.testcontainers + testcontainers + + + org.apache.httpcomponents.client5 + httpclient5 + ${httpclient5.version} + test + + + org.jetbrains.kotlin + kotlin-reflect + ${kotlin-reflect.version} + + + org.assertj + assertj-core + test + + + org.junit.jupiter + junit-jupiter + ${jupiter.version} + test + + + org.apache.streampark + streampark-common_2.12 + ${project.version} + + + com.google.code.findbugs + jsr305 + ${findbugs.version} + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + + diff --git a/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkComponent.java b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkComponent.java new file mode 100644 index 0000000000..13a62e01fc --- /dev/null +++ b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkComponent.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.testcontainer.flink; + +import javax.annotation.Nonnull; + +enum FlinkComponent { + JOBMANAGER("jobmanager"), + TASKMANAGER("taskmanager"); + + private final String name; + + FlinkComponent(@Nonnull String name) { + this.name = name; + } + + @Nonnull + public String getName() { + return name; + } +} diff --git a/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java new file mode 100644 index 0000000000..d9814f0d22 --- /dev/null +++ b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.testcontainer.flink; + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.utility.DockerImageName; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.streampark.testcontainer.flink.FlinkComponent.JOBMANAGER; +import static org.apache.streampark.testcontainer.flink.FlinkComponent.TASKMANAGER; + +class FlinkContainer extends GenericContainer { + + public static final AtomicInteger TM_COUNT = new AtomicInteger(0); + + public static final String FLINK_PROPS_KEY = "FLINK_PROPERTIES"; + + private final @Nonnull FlinkComponent component; + + FlinkContainer( + @Nonnull DockerImageName dockerImageName, + @Nonnull FlinkComponent component, + @Nonnull Network network, + @Nonnull String yamlPropStr, + @Nullable Slf4jLogConsumer slf4jLogConsumer) { + super(dockerImageName); + this.component = component; + this.withCommand("/docker-entrypoint.sh", component.getName()); + this.withCreateContainerCmdModifier( + createContainerCmd -> createContainerCmd.withName(getFlinkContainerName())); + this.withNetwork(network); + this.withEnv(FLINK_PROPS_KEY, yamlPropStr); + Optional.ofNullable(slf4jLogConsumer).ifPresent(this::withLogConsumer); + } + + protected String getFlinkContainerName() { + if (component == JOBMANAGER) { + return JOBMANAGER.getName(); + } + return String.format("%s_%s", TASKMANAGER.getName(), TM_COUNT.incrementAndGet()); + } +} diff --git a/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java new file mode 100644 index 0000000000..e5f01dea6f --- /dev/null +++ b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.testcontainer.flink; + +import org.apache.streampark.common.util.Utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.lifecycle.Startable; +import org.testcontainers.utility.DockerImageName; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.streampark.testcontainer.flink.FlinkComponent.JOBMANAGER; +import static org.apache.streampark.testcontainer.flink.FlinkComponent.TASKMANAGER; + +/** + * Class to start a couple of flink 1-jobmanager & n-taskmanagers. The priority of flinkYamlConfStr + * is the highest. But: The 'jobmanager.rpc.address' is always 'jobmanager'. The 'rest.port' always + * is 8081. + */ +public class FlinkStandaloneSessionCluster implements Startable { + + public static final Logger LOG = LoggerFactory.getLogger(FlinkStandaloneSessionCluster.class); + + public static final Network NETWORK = Network.newNetwork(); + + public static final String JM_RPC_ADDR_KEY = "jobmanager.rpc.address"; + public static final String SLOT_CONF_KEY = "taskmanager.numberOfTaskSlots"; + public static final String SLOT_CONF_FORMAT = String.format("%s: %%s", SLOT_CONF_KEY); + + public static final int BLOB_SERVER_PORT = 6123; + public static final int WEB_PORT = 8081; + + private String yamlConfStr = String.format("%s: %s", JM_RPC_ADDR_KEY, JOBMANAGER.getName()); + + private final FlinkContainer jobManagerContainer; + + private final List taskManagerContainers = new ArrayList<>(); + + private FlinkStandaloneSessionCluster( + DockerImageName dockerImageName, + int taskManagerNum, + int slotsNumPerTm, + @Nullable String yamlConfStr, + Slf4jLogConsumer slf4jLogConsumer) { + + renderJmRpcConfIfNeeded(yamlConfStr); + + renderSlotNumIfNeeded(slotsNumPerTm); + + // Set for JM. + this.jobManagerContainer = + new FlinkContainer( + dockerImageName, JOBMANAGER, NETWORK, this.yamlConfStr, slf4jLogConsumer); + this.jobManagerContainer.addExposedPort(BLOB_SERVER_PORT); + this.jobManagerContainer.addExposedPort(WEB_PORT); + + this.jobManagerContainer.setWaitStrategy( + Wait.forHttp("/config") + .forStatusCode(200) + .forPort(WEB_PORT) + .withStartupTimeout(Duration.ofMinutes(8))); + + // Set for TMs. + for (int i = 0; i < taskManagerNum; i++) { + FlinkContainer taskManager = + new FlinkContainer( + dockerImageName, TASKMANAGER, NETWORK, this.yamlConfStr, slf4jLogConsumer); + this.taskManagerContainers.add(taskManager); + } + } + + public String getFlinkJobManagerUrl() { + return String.format( + "http://%s:%s", jobManagerContainer.getHost(), jobManagerContainer.getMappedPort(WEB_PORT)); + } + + @Override + public void start() { + Utils.notNull(jobManagerContainer); + jobManagerContainer.start(); + Utils.notNull(taskManagerContainers); + for (FlinkContainer taskManagerContainer : taskManagerContainers) { + taskManagerContainer.start(); + } + } + + @Override + public void stop() { + Utils.notNull(taskManagerContainers); + for (FlinkContainer taskManagerContainer : taskManagerContainers) { + taskManagerContainer.stop(); + } + Utils.notNull(jobManagerContainer); + jobManagerContainer.stop(); + } + + private void renderSlotNumIfNeeded(int slotsNumPerTm) { + if (!this.yamlConfStr.contains(SLOT_CONF_KEY)) { + this.yamlConfStr = + String.format( + "%s\n%s\n", this.yamlConfStr, String.format(SLOT_CONF_FORMAT, slotsNumPerTm)); + } + } + + private void renderJmRpcConfIfNeeded(@Nullable String yamlConfStr) { + this.yamlConfStr = + yamlConfStr == null + ? this.yamlConfStr + : (yamlConfStr.contains(JM_RPC_ADDR_KEY) + ? yamlConfStr + : String.format("%s\n%s\n", this.yamlConfStr, yamlConfStr)); + } + + public static class Builder { + + private DockerImageName dockerImageName = + DockerImageName.parse("apache/flink:1.17.1-scala_2.12"); + private int taskManagerNum = 1; + private int slotsNumPerTm = 8; + private @Nullable String yamlConfStr; + private Slf4jLogConsumer slf4jLogConsumer = new Slf4jLogConsumer(LOG, false); + + private Builder() {} + + public Builder dockerImageName(DockerImageName dockerImageName) { + this.dockerImageName = dockerImageName; + return this; + } + + public Builder taskManagerNum(int taskManagerNum) { + Utils.required(taskManagerNum >= 0, "taskManagerNum must be greater than -1."); + this.taskManagerNum = taskManagerNum; + return this; + } + + public Builder slotsNumPerTm(int slotsNumPerTm) { + Utils.required(slotsNumPerTm > 0, "slotsNumPerTm must be greater than 0."); + this.slotsNumPerTm = slotsNumPerTm; + return this; + } + + public Builder yamlConfStr(@Nullable String yamlConfStr) { + this.yamlConfStr = yamlConfStr; + return this; + } + + public Builder slf4jLogConsumer(Slf4jLogConsumer slf4jLogConsumer) { + this.slf4jLogConsumer = slf4jLogConsumer; + return this; + } + + public FlinkStandaloneSessionCluster build() { + return new FlinkStandaloneSessionCluster( + dockerImageName, taskManagerNum, slotsNumPerTm, yamlConfStr, slf4jLogConsumer); + } + } + + public static Builder builder() { + return new Builder(); + } +} diff --git a/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/hadoop/HadoopContainer.java b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/hadoop/HadoopContainer.java new file mode 100644 index 0000000000..93e34cd43b --- /dev/null +++ b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/hadoop/HadoopContainer.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.testcontainer.hadoop; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.containers.wait.strategy.WaitAllStrategy; +import org.testcontainers.utility.DockerImageName; + +import javax.annotation.Nonnull; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; + +/** Hadoop Container for integration test. Note: It's experimental now. */ +public class HadoopContainer extends GenericContainer { + + public static final Logger LOG = LoggerFactory.getLogger(HadoopContainer.class); + + // Hadoop version is 2.7.0 + public static final DockerImageName DOCKER_IMAGE_NAME = + DockerImageName.parse("sequenceiq/hadoop-docker:latest"); + + public static final Map MAPPED_PORTS = + new HashMap() { + { + put(50070, 50070); + put(8088, 8088); + put(9000, 9000); + } + }; + + public HadoopContainer() { + this(DOCKER_IMAGE_NAME); + } + + public HadoopContainer(@Nonnull DockerImageName dockerImageName) { + super(dockerImageName); + MAPPED_PORTS.forEach(this::addFixedExposedPort); + this.setPrivilegedMode(true); + this.withCreateContainerCmdModifier( + createContainerCmd -> createContainerCmd.withName("one-container-hadoop-cluster")); + WaitAllStrategy waitAllStrategy = + new WaitAllStrategy() + .withStrategy( + Wait.forHttp("/ws/v1/cluster/info") + .forPort(8088) + .forStatusCode(200) + .withStartupTimeout(Duration.ofMinutes(8))) + .withStrategy( + Wait.forHttp("") + .forPort(50070) + .forStatusCode(200) + .withStartupTimeout(Duration.ofMinutes(8))) + .withStrategy(Wait.defaultWaitStrategy().withStartupTimeout(Duration.ofMinutes(8))) + .withStartupTimeout(Duration.ofMinutes(8)); + this.waitingFor(waitAllStrategy); + this.withLogConsumer(new Slf4jLogConsumer(LOG)); + this.withCommand("/etc/bootstrap.sh", "-d"); + } + + @Override + public void start() { + super.start(); + this.waitUntilContainerStarted(); + } +} diff --git a/streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java b/streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java new file mode 100644 index 0000000000..6f34e720f7 --- /dev/null +++ b/streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.testcontainer.flink; + +import org.apache.hc.client5.http.classic.methods.HttpGet; +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse; +import org.apache.hc.client5.http.impl.classic.HttpClients; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for flink standalone session cluster env available. */ +class FlinkStandaloneSessionClusterITest { + + private final FlinkStandaloneSessionCluster cluster = + FlinkStandaloneSessionCluster.builder().build(); + + @BeforeEach + void up() { + cluster.start(); + } + + @AfterEach + void down() { + cluster.stop(); + } + + @Test + void testRestApiAvailable() throws IOException { + String url = String.format("%s/%s", cluster.getFlinkJobManagerUrl(), "config"); + CloseableHttpClient httpClient = HttpClients.createDefault(); + CloseableHttpResponse response = httpClient.execute(new HttpGet(url)); + assertThat(response.getCode()).isEqualTo(200); + } +} diff --git a/streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/hadoop/HadoopContainerTest.java b/streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/hadoop/HadoopContainerTest.java new file mode 100644 index 0000000000..961d76c433 --- /dev/null +++ b/streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/hadoop/HadoopContainerTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.testcontainer.hadoop; + +import org.apache.hc.client5.http.classic.methods.HttpGet; +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse; +import org.apache.hc.client5.http.impl.classic.HttpClients; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link HadoopContainer}. */ +class HadoopContainerTest { + static final HadoopContainer HADOOP_CONTAINER = new HadoopContainer(); + + @BeforeAll + static void setup() { + HADOOP_CONTAINER.start(); + } + + @AfterAll + static void teardown() { + HADOOP_CONTAINER.stop(); + } + + @Test + @Timeout(value = 8, unit = TimeUnit.MINUTES) + void testOverview() throws IOException { + + String url = String.format("http://%s:%s/ws/v1/cluster/info", HADOOP_CONTAINER.getHost(), 8088); + CloseableHttpClient httpClient = HttpClients.createDefault(); + CloseableHttpResponse response = httpClient.execute(new HttpGet(url)); + assertThat(response.getCode()).isEqualTo(200); + + url = String.format("http://%s:%s", HADOOP_CONTAINER.getHost(), 50070); + httpClient = HttpClients.createDefault(); + response = httpClient.execute(new HttpGet(url)); + assertThat(response.getCode()).isEqualTo(200); + } +}