diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java index 61e0faac8..5c7c151ec 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java @@ -19,6 +19,7 @@ import org.apache.doris.flink.container.instance.ContainerService; import org.apache.doris.flink.container.instance.DorisContainer; +import org.apache.doris.flink.container.instance.DorisCustomerContainer; import org.junit.BeforeClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +49,8 @@ private static void initDorisContainer() { LOG.info("The doris container has been started and is running status."); return; } - dorisContainerService = new DorisContainer(); + Boolean customerEnv = Boolean.valueOf(System.getProperty("customer_env", "false")); + dorisContainerService = customerEnv ? new DorisCustomerContainer() : new DorisContainer(); dorisContainerService.startContainer(); LOG.info("Doris container was started."); } @@ -74,9 +76,7 @@ protected String getDorisPassword() { } protected String getDorisQueryUrl() { - return String.format( - "jdbc:mysql://%s:%s", - getDorisInstanceHost(), dorisContainerService.getMappedPort(9030)); + return dorisContainerService.getJdbcUrl(); } protected String getDorisInstanceHost() { diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java index 6ad1e3cd0..684de5a0a 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java @@ -29,6 +29,8 @@ public interface ContainerService { Connection getQueryConnection(); + String getJdbcUrl(); + String getInstanceHost(); Integer getMappedPort(int originalPort); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java index 6af827b8d..ef399d0d4 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java @@ -115,6 +115,11 @@ public Connection getQueryConnection() { } } + @Override + public String getJdbcUrl() { + return String.format(JDBC_URL, dorisContainer.getHost()); + } + @Override public String getInstanceHost() { return dorisContainer.getHost(); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisCustomerContainer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisCustomerContainer.java new file mode 100644 index 000000000..3d4173035 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisCustomerContainer.java @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.container.instance; + +import org.apache.flink.util.Preconditions; + +import org.apache.doris.flink.exception.DorisRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +/** Using a custom Doris environment */ +public class DorisCustomerContainer implements ContainerService { + private static final Logger LOG = LoggerFactory.getLogger(DorisCustomerContainer.class); + private static final String JDBC_URL = "jdbc:mysql://%s:%s"; + + @Override + public void startContainer() { + LOG.info("Using doris customer containers env."); + checkParams(); + if (!isRunning()) { + throw new DorisRuntimeException( + "Backend is not alive. Please check the doris cluster."); + } + } + + private void checkParams() { + Preconditions.checkArgument( + System.getProperty("doris_host") != null, "doris_host is required."); + Preconditions.checkArgument( + System.getProperty("doris_query_port") != null, "doris_query_port is required."); + Preconditions.checkArgument( + System.getProperty("doris_http_port") != null, "doris_http_port is required."); + Preconditions.checkArgument( + System.getProperty("doris_user") != null, "doris_user is required."); + Preconditions.checkArgument( + System.getProperty("doris_passwd") != null, "doris_passwd is required."); + } + + @Override + public boolean isRunning() { + try (Connection conn = getQueryConnection(); + Statement stmt = conn.createStatement()) { + ResultSet showBackends = stmt.executeQuery("show backends"); + while (showBackends.next()) { + String isAlive = showBackends.getString("Alive").trim(); + if (Boolean.toString(true).equalsIgnoreCase(isAlive)) { + return true; + } + } + } catch (SQLException e) { + LOG.error("Failed to connect doris cluster.", e); + return false; + } + return false; + } + + @Override + public Connection getQueryConnection() { + LOG.info("Try to get query connection from doris."); + String jdbcUrl = + String.format( + JDBC_URL, + System.getProperty("doris_host"), + System.getProperty("doris_query_port")); + try { + return DriverManager.getConnection(jdbcUrl, getUsername(), getPassword()); + } catch (SQLException e) { + LOG.info("Failed to get doris query connection. jdbcUrl={}", jdbcUrl, e); + throw new DorisRuntimeException(e); + } + } + + @Override + public String getJdbcUrl() { + return String.format( + JDBC_URL, System.getProperty("doris_host"), System.getProperty("doris_query_port")); + } + + @Override + public String getInstanceHost() { + return System.getProperty("doris_host"); + } + + @Override + public Integer getMappedPort(int originalPort) { + return originalPort; + } + + @Override + public String getUsername() { + return System.getProperty("doris_user"); + } + + @Override + public String getPassword() { + return System.getProperty("doris_passwd"); + } + + @Override + public String getFenodes() { + return System.getProperty("doris_host") + ":" + System.getProperty("doris_http_port"); + } + + @Override + public String getBenodes() { + return null; + } + + @Override + public void close() {} +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/MySQLContainer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/MySQLContainer.java index 21b30e814..4e50ac64a 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/MySQLContainer.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/MySQLContainer.java @@ -92,6 +92,11 @@ public Connection getQueryConnection() { } } + @Override + public String getJdbcUrl() { + return mysqlcontainer.getJdbcUrl(); + } + @Override public void close() { LOG.info("Stopping MySQL container."); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java index 877074edf..80986ea3c 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java @@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.StringUtils; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.doris.flink.cfg.DorisExecutionOptions; @@ -63,6 +64,7 @@ public class DorisSinkITCase extends AbstractITCaseService { static final String TABLE_CSV = "tbl_csv"; static final String TABLE_JSON = "tbl_json"; static final String TABLE_JSON_TBL = "tbl_json_tbl"; + static final String TABLE_TBL_AUTO_REDIRECT = "tbl_tbl_auto_redirect"; static final String TABLE_CSV_BATCH_TBL = "tbl_csv_batch_tbl"; static final String TABLE_CSV_BATCH_DS = "tbl_csv_batch_DS"; static final String TABLE_GROUP_COMMIT = "tbl_group_commit"; @@ -177,8 +179,6 @@ public void testTableSinkJsonFormat() throws Exception { + DorisConfigOptions.IDENTIFIER + "'," + " 'fenodes' = '%s'," - + " 'benodes' = '%s'," - + " 'auto-redirect' = 'false'," + " 'table.identifier' = '%s'," + " 'username' = '%s'," + " 'password' = '%s'," @@ -196,7 +196,6 @@ public void testTableSinkJsonFormat() throws Exception { + "'" + ")", getFenodes(), - getBenodes(), DATABASE + "." + TABLE_JSON_TBL, getDorisUsername(), getDorisPassword()); @@ -210,6 +209,52 @@ public void testTableSinkJsonFormat() throws Exception { ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2); } + @Test + public void testTableSinkAutoRedirectFalse() throws Exception { + if (StringUtils.isNullOrWhitespaceOnly(getBenodes())) { + LOG.info("benodes is empty, skip the test."); + return; + } + initializeTable(TABLE_TBL_AUTO_REDIRECT); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + String sinkDDL = + String.format( + "CREATE TABLE doris_sink (" + + " name STRING," + + " age INT" + + ") WITH (" + + " 'connector' = '" + + DorisConfigOptions.IDENTIFIER + + "'," + + " 'fenodes' = '%s'," + + " 'benodes' = '%s'," + + " 'auto-redirect' = 'false'," + + " 'table.identifier' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'sink.label-prefix' = 'doris_sink" + + UUID.randomUUID() + + "'" + + ")", + getFenodes(), + getBenodes(), + DATABASE + "." + TABLE_TBL_AUTO_REDIRECT, + getDorisUsername(), + getDorisPassword()); + tEnv.executeSql(sinkDDL); + tEnv.executeSql("INSERT INTO doris_sink SELECT 'doris',1 union all SELECT 'flink',2"); + + Thread.sleep(10000); + List expected = Arrays.asList("doris,1", "flink,2"); + String query = + String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_JSON_TBL); + ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2); + } + @Test public void testTableBatch() throws Exception { initializeTable(TABLE_CSV_BATCH_TBL);