Skip to content

Commit

Permalink
Fix SqlTask cannot split the given sql when browser in windows (#15062)
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun authored Oct 26, 2023
1 parent a5284e4 commit 99214d0
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.UdfFuncParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.dolphinscheduler.plugin.task.sql.utils.SqlSplitUtils;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;

Expand All @@ -60,8 +61,6 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.slf4j.Logger;

import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

Expand Down Expand Up @@ -98,13 +97,8 @@ public class SqlTask extends AbstractTask {

public static final int TEST_FLAG_YES = 1;

private static final String SQL_SEPARATOR = ";\n";
private final DbType dbType;

/**
* Abstract Yarn Task
*
* @param taskRequest taskRequest
*/
public SqlTask(TaskExecutionContext taskRequest) {
super(taskRequest);
this.taskExecutionContext = taskRequest;
Expand All @@ -119,6 +113,7 @@ public SqlTask(TaskExecutionContext taskRequest) {

sqlTaskExecutionContext =
sqlParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
dbType = DbType.valueOf(sqlParameters.getType());
}

@Override
Expand All @@ -140,18 +135,17 @@ public void handle(TaskCallBack taskCallBack) throws TaskException {
sqlParameters.getConnParams(),
sqlParameters.getVarPool(),
sqlParameters.getLimit());
String separator = SQL_SEPARATOR;
try {

// get datasource
baseConnectionParam = (BaseConnectionParam) DataSourceUtils.buildConnectionParams(
DbType.valueOf(sqlParameters.getType()),
baseConnectionParam = (BaseConnectionParam) DataSourceUtils.buildConnectionParams(dbType,
sqlTaskExecutionContext.getConnectionParams());
if (DbType.valueOf(sqlParameters.getType()).isSupportMultipleStatement()) {
separator = "";
}
List<String> subSqls =
dbType.isSupportMultipleStatement() ? Collections.singletonList(sqlParameters.getSql())
: SqlSplitUtils.splitSql(sqlParameters.getSql());

// ready to execute SQL and parameter entity Map
List<SqlBinds> mainStatementSqlBinds = split(sqlParameters.getSql(), separator)
List<SqlBinds> mainStatementSqlBinds = subSqls
.stream()
.map(this::getSqlAndSqlParamsMap)
.collect(Collectors.toList());
Expand All @@ -167,7 +161,7 @@ public void handle(TaskCallBack taskCallBack) throws TaskException {
.map(this::getSqlAndSqlParamsMap)
.collect(Collectors.toList());

List<String> createFuncs = createFuncs(sqlTaskExecutionContext.getUdfFuncParametersList(), log);
List<String> createFuncs = createFuncs(sqlTaskExecutionContext.getUdfFuncParametersList());

// execute sql task
executeFuncAndSql(mainStatementSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);
Expand All @@ -186,31 +180,6 @@ public void cancel() throws TaskException {

}

/**
* split sql by segment separator
* <p>The segment separator is used
* when the data source does not support multi-segment SQL execution,
* and the client needs to split the SQL and execute it multiple times.</p>
* @param sql
* @param segmentSeparator
* @return
*/
public static List<String> split(String sql, String segmentSeparator) {
if (StringUtils.isEmpty(segmentSeparator)) {
return Collections.singletonList(sql);
}

String[] lines = sql.split(segmentSeparator);
List<String> segments = new ArrayList<>();
for (String line : lines) {
if (line.trim().isEmpty() || line.startsWith("--")) {
continue;
}
segments.add(line);
}
return segments;
}

/**
* execute function and sql
*
Expand Down Expand Up @@ -526,10 +495,9 @@ private String replaceOriginalValue(String content, String rgex, Map<String, Pro
* create function list
*
* @param udfFuncParameters udfFuncParameters
* @param log log
* @return
*/
private List<String> createFuncs(List<UdfFuncParameters> udfFuncParameters, Logger log) {
private List<String> createFuncs(List<UdfFuncParameters> udfFuncParameters) {

if (CollectionUtils.isEmpty(udfFuncParameters)) {
log.info("can't find udf function resource");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.plugin.task.sql.utils;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SqlSplitUtils {

private static final String UNIX_SQL_SEPARATOR = ";\n";
private static final String WINDOWS_SQL_SEPARATOR = ";\r\n";

/**
* split sql to submit sql.
* e.g.
* <pre>
* select * from table1\n;select * from table2\n;select * from table2\r\n;
* </pre>
* will be split to
* <pre>
* select * from table1
* select * from table2
* </pre>
*/
public static List<String> splitSql(String sql) {

return Arrays.stream(sql.replaceAll(WINDOWS_SQL_SEPARATOR, UNIX_SQL_SEPARATOR).split(UNIX_SQL_SEPARATOR))
.filter(subSql -> {
String trim = subSql.trim();
return !trim.isEmpty() && !trim.startsWith("--");
}).collect(Collectors.toList());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,8 @@ void testReplacingSqlNonGreedy() {
sqlTask.setSqlParamsMap(querySql, sqlTask.rgex, sqlParamsMap, paramsMap, 1);
Assertions.assertEquals(sqlParamsMap, expectedSQLParamsMap);
}

@Test
void splitSql() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.plugin.task.sql.utils;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import com.google.common.collect.Lists;

class SqlSplitUtilsTest {

@Test
void splitSql() {
String sql = "select * from table1;\nselect * from table2;\nselect * from table3;\r\n";
Assertions.assertEquals(
Lists.newArrayList("select * from table1", "select * from table2", "select * from table3"),
SqlSplitUtils.splitSql(sql));
}
}

0 comments on commit 99214d0

Please sign in to comment.