Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improvement] Add testcontainer utils for test. #2862

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ jobs:
distribution: "adopt"
cache: "maven"
- name: Build with Maven
run: ./mvnw -B clean install -Pshaded -DskipTests
run: ./mvnw -B clean install -Pshaded,dist -DskipTests
- name: Test with Maven
run: ./mvnw -B test

1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
<module>streampark-spark</module>
<module>streampark-storage</module>
<module>streampark-console</module>
<module>streampark-test-utils</module>
</modules>

<properties>
Expand Down
14 changes: 14 additions & 0 deletions streampark-console/streampark-console-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,20 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.rauschig</groupId>
<artifactId>jarchivelib</artifactId>
<version>0.7.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.streampark</groupId>
<artifactId>streampark-testcontainer</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<!--Test dependencies end.-->

<!--log4j -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
<assembly>
<id>bin</id>
<formats>
<format>dir</format>
<format>tar.gz</format>
</formats>
<includeBaseDirectory>true</includeBaseDirectory>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1649,7 +1649,7 @@ public void start(Application appParam, boolean auto) throws Exception {
}

private Map<String, Object> getProperties(Application application) {
Map<String, Object> properties = application.getOptionMap();
Map<String, Object> properties = new HashMap<>(application.getOptionMap());
if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
ApiAlertException.throwIfNull(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ public ResponseResult check(FlinkCluster cluster) {
@Override
public Boolean create(FlinkCluster flinkCluster) {
flinkCluster.setUserId(commonService.getUserId());
return internalCreate(flinkCluster);
}

@VisibleForTesting
public boolean internalCreate(FlinkCluster flinkCluster) {
boolean successful = validateQueueIfNeeded(flinkCluster);
ApiAlertException.throwIfFalse(
successful, String.format(ERROR_CLUSTER_QUEUE_HINT, flinkCluster.getYarnQueue()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.streampark.console.core.service.alert.AlertService;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.hc.client5.http.config.RequestConfig;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
Expand All @@ -49,6 +50,8 @@
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

Expand Down Expand Up @@ -84,7 +87,7 @@ public class FlinkHttpWatcher {
@Autowired private FlinkClusterWatcher flinkClusterWatcher;

// track interval every 5 seconds
private static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5);
public static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5);

// option interval within 10 seconds
private static final Duration OPTION_INTERVAL = Duration.ofSeconds(10);
Expand Down Expand Up @@ -197,6 +200,12 @@ public void start() {
}
}

@VisibleForTesting
public @Nullable FlinkAppState tryQueryFlinkAppState(@Nonnull Long appId) {
Application app = WATCHING_APPS.get(appId);
return (app == null || app.getState() == null) ? null : FlinkAppState.of(app.getState());
}

private void watch(Long id, Application application) {
EXECUTOR.execute(
() -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.console;

import org.apache.streampark.common.conf.CommonConfig;
import org.apache.streampark.common.conf.ConfigConst;
import org.apache.streampark.common.util.SystemPropertyUtils;

import org.apache.commons.io.FileUtils;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.io.IOUtils;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.junit.jupiter.MockitoExtension;
import org.rauschig.jarchivelib.Archiver;
import org.rauschig.jarchivelib.ArchiverFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureTestEntityManager;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import javax.annotation.Nonnull;

import java.io.File;
import java.io.FileFilter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLConnection;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

/**
* Integration base tester. Note: The all children classes of the base must run after the
* project-level package phrase.
*/
@Slf4j
@EnableScheduling
@ActiveProfiles("integration-test")
@AutoConfigureTestEntityManager
@AutoConfigureWebTestClient(timeout = "60000")
@TestPropertySource(locations = {"classpath:application-integration-test.yml"})
@ExtendWith({MockitoExtension.class, SpringExtension.class})
@SpringBootTest(
classes = StreamParkConsoleBootstrap.class,
webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public abstract class SpringIntegrationTestBase {
protected static final Logger LOG = LoggerFactory.getLogger(SpringIntegrationTestBase.class);

protected static final String RUN_PKG_SCRIPT_HINT =
"Please run package script before running the test case.";

protected static final String DEFAULT_APP_HOME_DIR_NAME = "apache-streampark";
protected static final String DEFAULT_FLINK_OFFICIAL_RELEASES_DIR_NAME =
"flink-official-releases";
protected static final String DEFAULT_LOCAL_WORKSPACE_DIR_NAME = "localWorkspace";
protected static final String DEFAULT_FLINK_VERSION = "1.17.1";
protected static final String DEFAULT_FLINK_DOWNLOAD_URL =
"https://dlcdn.apache.org/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz";
protected static final FileFilter PKG_NAME_FILTER =
file -> file.getName().startsWith(DEFAULT_APP_HOME_DIR_NAME) && file.isDirectory();

protected static String defaultFlinkHome;
protected static String appHome;

@BeforeAll
public static void init(@TempDir File tempPath) throws IOException {

LOG.info("Start prepare the real running env.");
String tempAbsPath = tempPath.getAbsolutePath();
LOG.info("Integration test base tmp dir: {}", tempAbsPath);

FileUtils.copyDirectory(
tryFindStreamParkPackagedDirFile(), new File(tempAbsPath, DEFAULT_APP_HOME_DIR_NAME));

prepareDefaultFlinkHome(tempAbsPath, 3);
Path localWorkspace =
Files.createDirectories(new File(tempAbsPath, DEFAULT_LOCAL_WORKSPACE_DIR_NAME).toPath());

appHome = new File(tempAbsPath, DEFAULT_APP_HOME_DIR_NAME).getAbsolutePath();
System.setProperty(ConfigConst.KEY_APP_HOME(), appHome);
System.setProperty(
CommonConfig.STREAMPARK_WORKSPACE_LOCAL().key(),
localWorkspace.toAbsolutePath().toString());

LOG.info(
"Complete mock EnvInitializer init, app home: {}, {}: {}",
appHome,
CommonConfig.STREAMPARK_WORKSPACE_LOCAL().key(),
localWorkspace.toAbsolutePath());
}

private static void prepareDefaultFlinkHome(String tempAbsPath, int reties) throws IOException {
reties--;
if (reties < 0) {
LOG.error("Failed to prepareDefaultFlinkHome with multiple-retires.");
return;
}
try {
final File flinkOfficialReleases =
new File(tempAbsPath, DEFAULT_FLINK_OFFICIAL_RELEASES_DIR_NAME);
Files.deleteIfExists(flinkOfficialReleases.toPath());
Files.createDirectories(flinkOfficialReleases.toPath());

defaultFlinkHome =
prepareFlinkOfficialRelease(
DEFAULT_FLINK_DOWNLOAD_URL,
DEFAULT_FLINK_VERSION,
flinkOfficialReleases.getAbsolutePath());
} catch (Throwable t) {
LOG.error("Error in prepareDefaultFlinkHome with exception", t);
LOG.warn("Start the next retry.");
prepareDefaultFlinkHome(tempAbsPath, reties);
}
}

/**
* @param httpUrl flink official release download url.
* @return return the target un-packed flink home absolute dir.
*/
private static String prepareFlinkOfficialRelease(
@Nonnull String httpUrl, @Nonnull String flinkVersion, String workDirAbsolutePath)
throws IOException {
String downloadedFilePath =
new File(workDirAbsolutePath, flinkVersion).getAbsolutePath() + ".tgz";
httpDownload(httpUrl, downloadedFilePath);
File archive = new File(downloadedFilePath);
File destination = new File(archive.getParentFile().getAbsolutePath());
Files.createDirectories(destination.toPath());

Archiver archiver = ArchiverFactory.createArchiver("tar", "gz");
archiver.extract(archive, destination);
Optional<File> first =
Arrays.stream(
requireNonNull(
destination.listFiles(
file -> file.getName().contains(flinkVersion) && file.isDirectory())))
.findFirst();
File file =
first.orElseThrow(
() ->
new RuntimeException(
String.format(
"Error in prepareFlinkOfficialRelease for httpUrl: %s, flinkVersion: %s",
httpUrl, flinkVersion)));

LOG.info("Prepared flink release: {}.", file.getAbsolutePath());
return file.getAbsolutePath();
}

private static void httpDownload(String httpUrl, String saveFile) {

try {
URL url = new URL(httpUrl);
URLConnection conn = url.openConnection();
InputStream inStream = conn.getInputStream();
FileOutputStream fs = new FileOutputStream(saveFile);
IOUtils.copyBytes(inStream, fs, 2048);
inStream.close();
fs.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private static File tryFindStreamParkPackagedDirFile() {
String userDir = Preconditions.checkNotNull(SystemPropertyUtils.get("user.dir"));
File pkgTargetDirFile = new File(userDir, "target");
Preconditions.checkState(
pkgTargetDirFile.exists(),
"The target directory of %s doesn't exist. %s",
userDir,
RUN_PKG_SCRIPT_HINT);
Optional<File> availablePkgParentFileOpt =
Arrays.stream(requireNonNull(pkgTargetDirFile.listFiles(PKG_NAME_FILTER))).findFirst();
final File availablePkgParentFile =
availablePkgParentFileOpt.orElseThrow(() -> new RuntimeException(RUN_PKG_SCRIPT_HINT));
Optional<File> targetDirFile =
Arrays.stream(requireNonNull(availablePkgParentFile.listFiles(PKG_NAME_FILTER)))
.findFirst();
return targetDirFile.orElseThrow(() -> new RuntimeException(RUN_PKG_SCRIPT_HINT));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.entity.YarnQueue;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
Expand All @@ -33,18 +34,19 @@
import org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureTestEntityManager;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.transaction.annotation.Transactional;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** base tester. */
@Transactional
@Slf4j
@EnableScheduling
@ActiveProfiles("test")
@AutoConfigureTestEntityManager
@AutoConfigureWebTestClient(timeout = "60000")
Expand All @@ -53,9 +55,9 @@
@SpringBootTest(
classes = StreamParkConsoleBootstrap.class,
webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public abstract class SpringTestBase {
public abstract class SpringUnitTestBase {

protected static final Logger LOG = LoggerFactory.getLogger(SpringTestBase.class);
protected static final Logger LOG = LoggerFactory.getLogger(SpringUnitTestBase.class);

@BeforeAll
public static void init(@TempDir File tempPath) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.streampark.console.core.service;

import org.apache.streampark.console.SpringTestBase;
import org.apache.streampark.console.SpringUnitTestBase;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.base.util.WebUtils;
Expand All @@ -33,7 +33,7 @@
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;

public class AccessTokenServiceTest extends SpringTestBase {
public class AccessTokenServiceTest extends SpringUnitTestBase {

@Autowired private AccessTokenService accessTokenService;

Expand Down
Loading
Loading