From 6538b40f5bf0861b09cbc548d4a914a8758313af Mon Sep 17 00:00:00 2001 From: Roc Marshal Date: Sun, 16 Jul 2023 22:22:39 +0800 Subject: [PATCH] Add testcontainer utils for test. --- pom.xml | 1 + streampark-test-utils/pom.xml | 40 ++++ .../streampark-testcontainer/pom.xml | 104 ++++++++++ .../testcontainer/flink/FlinkComponent.java | 36 ++++ .../testcontainer/flink/FlinkContainer.java | 69 +++++++ .../flink/FlinkStandaloneSessionCluster.java | 184 ++++++++++++++++++ .../FlinkStandaloneSessionClusterITest.java | 39 ++++ 7 files changed, 473 insertions(+) create mode 100644 streampark-test-utils/pom.xml create mode 100644 streampark-test-utils/streampark-testcontainer/pom.xml create mode 100644 streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkComponent.java create mode 100644 streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java create mode 100644 streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java create mode 100644 streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java diff --git a/pom.xml b/pom.xml index f5165c7809..2e5b3cce7e 100644 --- a/pom.xml +++ b/pom.xml @@ -81,6 +81,7 @@ streampark-spark streampark-storage streampark-console + streampark-test-utils diff --git a/streampark-test-utils/pom.xml b/streampark-test-utils/pom.xml new file mode 100644 index 0000000000..06566d14bf --- /dev/null +++ b/streampark-test-utils/pom.xml @@ -0,0 +1,40 @@ + + + + 4.0.0 + + org.apache.streampark + streampark + 2.2.0-SNAPSHOT + + + streampark-test-utils + pom + + streampark-test-utils + http://maven.apache.org + + streampark-testcontainer + + + + UTF-8 + + + diff --git a/streampark-test-utils/streampark-testcontainer/pom.xml b/streampark-test-utils/streampark-testcontainer/pom.xml new file mode 100644 index 0000000000..b53ae209db --- /dev/null +++ b/streampark-test-utils/streampark-testcontainer/pom.xml @@ -0,0 +1,104 @@ + + + + 4.0.0 + + org.apache.streampark + streampark-test-utils + 2.2.0-SNAPSHOT + + + streampark-testcontainer + jar + + streampark-testcontainer + http://maven.apache.org + + + UTF-8 + 1.6.21 + 3.0.2 + + + + + + org.testcontainers + testcontainers-bom + 1.16.2 + pom + import + + + + + + + org.testcontainers + testcontainers + + + org.apache.httpcomponents.client5 + httpclient5 + ${httpclient5.version} + test + + + org.jetbrains.kotlin + kotlin-reflect + ${kotlin-reflect.version} + + + org.assertj + assertj-core + test + + + org.junit.jupiter + junit-jupiter + ${jupiter.version} + test + + + org.apache.streampark + streampark-common_2.12 + ${project.version} + + + com.google.code.findbugs + jsr305 + ${findbugs.version} + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + + diff --git a/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkComponent.java b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkComponent.java new file mode 100644 index 0000000000..13a62e01fc --- /dev/null +++ b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkComponent.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.testcontainer.flink; + +import javax.annotation.Nonnull; + +enum FlinkComponent { + JOBMANAGER("jobmanager"), + TASKMANAGER("taskmanager"); + + private final String name; + + FlinkComponent(@Nonnull String name) { + this.name = name; + } + + @Nonnull + public String getName() { + return name; + } +} diff --git a/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java new file mode 100644 index 0000000000..c4e9af0d15 --- /dev/null +++ b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.testcontainer.flink; + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.utility.DockerImageName; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.streampark.testcontainer.flink.FlinkComponent.JOBMANAGER; +import static org.apache.streampark.testcontainer.flink.FlinkComponent.TASKMANAGER; + +class FlinkContainer extends GenericContainer { + + public static final AtomicInteger TM_COUNT = new AtomicInteger(0); + + public static final String FLINK_PROPS_KEY = "FLINK_PROPERTIES"; + + private final @Nonnull FlinkComponent component; + + FlinkContainer( + @Nonnull DockerImageName dockerImageName, + @Nonnull FlinkComponent component, + @Nonnull Network network, + @Nonnull String yamlPropStr, + @Nullable Slf4jLogConsumer slf4jLogConsumer) { + super(dockerImageName); + this.component = component; + this.withCommand("/docker-entrypoint.sh", component.getName()); + this.withCreateContainerCmdModifier( + createContainerCmd -> createContainerCmd.withName(getFlinkContainerName())); + this.withNetwork(network); + this.withEnv(FLINK_PROPS_KEY, yamlPropStr); + Optional.ofNullable(slf4jLogConsumer).ifPresent(this::withLogConsumer); + } + + protected String getFlinkContainerName() { + if (component == JOBMANAGER) { + return JOBMANAGER.getName(); + } + return String.format("%s_%s", TASKMANAGER.getName(), TM_COUNT.incrementAndGet()); + } + + public void addRequiredFixedExposedPorts(int hostPort, int containerPort) { + + this.addFixedExposedPort(hostPort, containerPort); + } +} diff --git a/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java new file mode 100644 index 0000000000..97f4a137ec --- /dev/null +++ b/streampark-test-utils/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.testcontainer.flink; + +import org.apache.streampark.common.util.Utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.lifecycle.Startable; +import org.testcontainers.utility.DockerImageName; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.streampark.testcontainer.flink.FlinkComponent.JOBMANAGER; +import static org.apache.streampark.testcontainer.flink.FlinkComponent.TASKMANAGER; + +/** + * Class to start a couple of flink 1-jobmanager & n-taskmanagers. The priority of flinkYamlConfStr + * is the highest. But: The 'jobmanager.rpc.address' is always 'jobmanager'. The 'rest.port' always + * is 8081. + */ +public class FlinkStandaloneSessionCluster implements Startable { + + public static final Logger LOG = LoggerFactory.getLogger(FlinkStandaloneSessionCluster.class); + + public static final Network NETWORK = Network.newNetwork(); + + public static final String JM_RPC_ADDR_KEY = "jobmanager.rpc.address"; + public static final String SLOT_CONF_KEY = "taskmanager.numberOfTaskSlots"; + public static final String SLOT_CONF_FORMAT = String.format("%s: %%s", SLOT_CONF_KEY); + + public static final int BLOB_SERVER_PORT = 6123; + public static final int WEB_PORT = 8081; + + private String yamlConfStr = String.format("%s: %s", JM_RPC_ADDR_KEY, JOBMANAGER.getName()); + + private final FlinkContainer jobManagerContainer; + + private final List taskManagerContainers = new ArrayList<>(); + + private FlinkStandaloneSessionCluster( + DockerImageName dockerImageName, + int taskManagerNum, + int slotsNumPerTm, + @Nullable String yamlConfStr, + Slf4jLogConsumer slf4jLogConsumer) { + + renderJmRpcConfIfNeeded(yamlConfStr); + + renderSlotNumIfNeeded(slotsNumPerTm); + + // Set for JM. + this.jobManagerContainer = + new FlinkContainer( + dockerImageName, JOBMANAGER, NETWORK, this.yamlConfStr, slf4jLogConsumer); + this.jobManagerContainer.addRequiredFixedExposedPorts(BLOB_SERVER_PORT, BLOB_SERVER_PORT); + this.jobManagerContainer.addRequiredFixedExposedPorts(WEB_PORT, WEB_PORT); + + this.jobManagerContainer.setWaitStrategy( + Wait.forHttp("/config") + .forStatusCode(200) + .forPort(WEB_PORT) + .withStartupTimeout(Duration.ofMinutes(3))); + + // Set for TMs. + for (int i = 0; i < taskManagerNum; i++) { + FlinkContainer taskManager = + new FlinkContainer( + dockerImageName, TASKMANAGER, NETWORK, this.yamlConfStr, slf4jLogConsumer); + this.taskManagerContainers.add(taskManager); + } + } + + public String getFlinkJobManagerUrl() { + return String.format("http://%s:%s", jobManagerContainer.getHost(), WEB_PORT); + } + + @Override + public void start() { + Utils.notNull(jobManagerContainer); + jobManagerContainer.start(); + Utils.notNull(taskManagerContainers); + for (FlinkContainer taskManagerContainer : taskManagerContainers) { + taskManagerContainer.start(); + } + } + + @Override + public void stop() { + Utils.notNull(taskManagerContainers); + for (FlinkContainer taskManagerContainer : taskManagerContainers) { + taskManagerContainer.stop(); + } + Utils.notNull(jobManagerContainer); + jobManagerContainer.stop(); + } + + private void renderSlotNumIfNeeded(int slotsNumPerTm) { + if (!this.yamlConfStr.contains(SLOT_CONF_KEY)) { + this.yamlConfStr = + String.format( + "%s\n%s\n", this.yamlConfStr, String.format(SLOT_CONF_FORMAT, slotsNumPerTm)); + } + } + + private void renderJmRpcConfIfNeeded(@Nullable String yamlConfStr) { + this.yamlConfStr = + yamlConfStr == null + ? this.yamlConfStr + : (yamlConfStr.contains(JM_RPC_ADDR_KEY) + ? yamlConfStr + : String.format("%s\n%s\n", this.yamlConfStr, yamlConfStr)); + } + + public static class Builder { + + private DockerImageName dockerImageName = + DockerImageName.parse("apache/flink:1.17.1-scala_2.12"); + private int taskManagerNum = 1; + private int slotsNumPerTm = 1; + private @Nullable String yamlConfStr; + private Slf4jLogConsumer slf4jLogConsumer = new Slf4jLogConsumer(LOG, false); + + private Builder() {} + + public Builder dockerImageName(DockerImageName dockerImageName) { + this.dockerImageName = dockerImageName; + return this; + } + + public Builder taskManagerNum(int taskManagerNum) { + Utils.required(taskManagerNum >= 0, "taskManagerNum must be greater than -1."); + this.taskManagerNum = taskManagerNum; + return this; + } + + public Builder slotsNumPerTm(int slotsNumPerTm) { + Utils.required(slotsNumPerTm > 0, "slotsNumPerTm must be greater than 0."); + this.slotsNumPerTm = slotsNumPerTm; + return this; + } + + public Builder yamlConfStr(@Nullable String yamlConfStr) { + this.yamlConfStr = yamlConfStr; + return this; + } + + public Builder slf4jLogConsumer(Slf4jLogConsumer slf4jLogConsumer) { + this.slf4jLogConsumer = slf4jLogConsumer; + return this; + } + + public FlinkStandaloneSessionCluster build() { + return new FlinkStandaloneSessionCluster( + dockerImageName, taskManagerNum, slotsNumPerTm, yamlConfStr, slf4jLogConsumer); + } + } + + public static Builder builder() { + return new Builder(); + } +} diff --git a/streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java b/streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java new file mode 100644 index 0000000000..a1efa0c104 --- /dev/null +++ b/streampark-test-utils/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java @@ -0,0 +1,39 @@ +package org.apache.streampark.testcontainer.flink; + +import org.apache.hc.client5.http.classic.methods.HttpGet; +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse; +import org.apache.hc.client5.http.impl.classic.HttpClients; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for flink standalone session cluster env available. */ +class FlinkStandaloneSessionClusterITest { + + private final FlinkStandaloneSessionCluster cluster = + FlinkStandaloneSessionCluster.builder().build(); + + @BeforeEach + void up() { + cluster.start(); + } + + @AfterEach + void down() { + cluster.stop(); + } + + @Test + void testRestApiAvailable() throws IOException { + String url = String.format("%s/%s", cluster.getFlinkJobManagerUrl(), "config"); + CloseableHttpClient httpClient = HttpClients.createDefault(); + CloseableHttpResponse response = httpClient.execute(new HttpGet(url)); + assertThat(response.getCode()).isEqualTo(200); + } +}