diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java index bc8a4f08f5dd..091dfc57945a 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java @@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.UdfFuncParameters; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; +import org.apache.dolphinscheduler.plugin.task.sql.utils.SqlSplitUtils; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.enums.DbType; @@ -60,8 +61,6 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; -import org.slf4j.Logger; - import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -98,13 +97,8 @@ public class SqlTask extends AbstractTask { public static final int TEST_FLAG_YES = 1; - private static final String SQL_SEPARATOR = ";\n"; + private final DbType dbType; - /** - * Abstract Yarn Task - * - * @param taskRequest taskRequest - */ public SqlTask(TaskExecutionContext taskRequest) { super(taskRequest); this.taskExecutionContext = taskRequest; @@ -119,6 +113,7 @@ public SqlTask(TaskExecutionContext taskRequest) { sqlTaskExecutionContext = sqlParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper()); + dbType = DbType.valueOf(sqlParameters.getType()); } @Override @@ -140,18 +135,17 @@ public void handle(TaskCallBack taskCallBack) throws TaskException { sqlParameters.getConnParams(), sqlParameters.getVarPool(), sqlParameters.getLimit()); - String separator = SQL_SEPARATOR; try { // get datasource - baseConnectionParam = (BaseConnectionParam) DataSourceUtils.buildConnectionParams( - DbType.valueOf(sqlParameters.getType()), + baseConnectionParam = (BaseConnectionParam) DataSourceUtils.buildConnectionParams(dbType, sqlTaskExecutionContext.getConnectionParams()); - if (DbType.valueOf(sqlParameters.getType()).isSupportMultipleStatement()) { - separator = ""; - } + List subSqls = + dbType.isSupportMultipleStatement() ? Collections.singletonList(sqlParameters.getSql()) + : SqlSplitUtils.splitSql(sqlParameters.getSql()); + // ready to execute SQL and parameter entity Map - List mainStatementSqlBinds = split(sqlParameters.getSql(), separator) + List mainStatementSqlBinds = subSqls .stream() .map(this::getSqlAndSqlParamsMap) .collect(Collectors.toList()); @@ -167,7 +161,7 @@ public void handle(TaskCallBack taskCallBack) throws TaskException { .map(this::getSqlAndSqlParamsMap) .collect(Collectors.toList()); - List createFuncs = createFuncs(sqlTaskExecutionContext.getUdfFuncParametersList(), log); + List createFuncs = createFuncs(sqlTaskExecutionContext.getUdfFuncParametersList()); // execute sql task executeFuncAndSql(mainStatementSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs); @@ -186,31 +180,6 @@ public void cancel() throws TaskException { } - /** - * split sql by segment separator - *

The segment separator is used - * when the data source does not support multi-segment SQL execution, - * and the client needs to split the SQL and execute it multiple times.

- * @param sql - * @param segmentSeparator - * @return - */ - public static List split(String sql, String segmentSeparator) { - if (StringUtils.isEmpty(segmentSeparator)) { - return Collections.singletonList(sql); - } - - String[] lines = sql.split(segmentSeparator); - List segments = new ArrayList<>(); - for (String line : lines) { - if (line.trim().isEmpty() || line.startsWith("--")) { - continue; - } - segments.add(line); - } - return segments; - } - /** * execute function and sql * @@ -526,10 +495,9 @@ private String replaceOriginalValue(String content, String rgex, Map createFuncs(List udfFuncParameters, Logger log) { + private List createFuncs(List udfFuncParameters) { if (CollectionUtils.isEmpty(udfFuncParameters)) { log.info("can't find udf function resource"); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/utils/SqlSplitUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/utils/SqlSplitUtils.java new file mode 100644 index 000000000000..e4b3306c375a --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/utils/SqlSplitUtils.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.sql.utils; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class SqlSplitUtils { + + private static final String UNIX_SQL_SEPARATOR = ";\n"; + private static final String WINDOWS_SQL_SEPARATOR = ";\r\n"; + + /** + * split sql to submit sql. + * e.g. + *
+     *     select * from table1\n;select * from table2\n;select * from table2\r\n;
+     * 
+ * will be split to + *
+     *     select * from table1
+     *     select * from table2
+     * 
+ */ + public static List splitSql(String sql) { + + return Arrays.stream(sql.replaceAll(WINDOWS_SQL_SEPARATOR, UNIX_SQL_SEPARATOR).split(UNIX_SQL_SEPARATOR)) + .filter(subSql -> { + String trim = subSql.trim(); + return !trim.isEmpty() && !trim.startsWith("--"); + }).collect(Collectors.toList()); + } + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/test/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/test/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskTest.java index a46670f2274d..268e28db763f 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/test/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/test/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskTest.java @@ -138,4 +138,8 @@ void testReplacingSqlNonGreedy() { sqlTask.setSqlParamsMap(querySql, sqlTask.rgex, sqlParamsMap, paramsMap, 1); Assertions.assertEquals(sqlParamsMap, expectedSQLParamsMap); } + + @Test + void splitSql() { + } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/test/java/org/apache/dolphinscheduler/plugin/task/sql/utils/SqlSplitUtilsTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/test/java/org/apache/dolphinscheduler/plugin/task/sql/utils/SqlSplitUtilsTest.java new file mode 100644 index 000000000000..3f6de718095d --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/test/java/org/apache/dolphinscheduler/plugin/task/sql/utils/SqlSplitUtilsTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.sql.utils; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import com.google.common.collect.Lists; + +class SqlSplitUtilsTest { + + @Test + void splitSql() { + String sql = "select * from table1;\nselect * from table2;\nselect * from table3;\r\n"; + Assertions.assertEquals( + Lists.newArrayList("select * from table1", "select * from table2", "select * from table3"), + SqlSplitUtils.splitSql(sql)); + } +}