From 22ac02a84f0c9dfe61b2635ab6cdd15ec47030cc Mon Sep 17 00:00:00 2001 From: xuchao Date: Fri, 3 Apr 2020 20:30:43 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=90=88=E5=B9=B6=E5=86=B2?= =?UTF-8?q?=E7=AA=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dtstack/flink/sql/exec/ExecuteProcessHelper.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java b/core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java index 058ef6097..823cd5143 100644 --- a/core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java +++ b/core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java @@ -18,6 +18,7 @@ package com.dtstack.flink.sql.exec; +import com.dtstack.flink.sql.parser.*; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.RowTypeInfo; @@ -27,11 +28,11 @@ import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.table.calcite.FlinkPlannerImpl; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.types.Row; import com.dtstack.flink.sql.classloader.ClassLoaderManager; -import com.dtstack.flink.sql.config.CalciteConfig; import com.dtstack.flink.sql.enums.ClusterMode; import com.dtstack.flink.sql.enums.ECacheType; import com.dtstack.flink.sql.enums.EPluginLoadMode; @@ -40,11 +41,6 @@ import com.dtstack.flink.sql.function.FunctionManager; import com.dtstack.flink.sql.option.OptionParser; import com.dtstack.flink.sql.option.Options; -import com.dtstack.flink.sql.parser.CreateFuncParser; -import com.dtstack.flink.sql.parser.CreateTmpTableParser; -import com.dtstack.flink.sql.parser.InsertSqlParser; -import com.dtstack.flink.sql.parser.SqlParser; -import com.dtstack.flink.sql.parser.SqlTree; import com.dtstack.flink.sql.side.SideSqlExec; import com.dtstack.flink.sql.side.SideTableInfo; import com.dtstack.flink.sql.sink.StreamSinkFactory; @@ -210,7 +206,8 @@ private static void sqlTranslation(String localSqlPluginPath, CreateTmpTableParser.SqlParserResult tmp = sqlTree.getTmpTableMap().get(tableName); String realSql = DtStringUtil.replaceIgnoreQuota(result.getExecSql(), "`", ""); - SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(realSql, CalciteConfig.MYSQL_LEX_CONFIG).parseStmt(); + FlinkPlannerImpl flinkPlanner = FlinkPlanner.getFlinkPlanner(); + SqlNode sqlNode = flinkPlanner.parse(realSql); String tmpSql = ((SqlInsert) sqlNode).getSource().toString(); tmp.setExecSql(tmpSql); sideSqlExec.exec(tmp.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, tmp);