diff --git a/core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java b/core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java index 1c0c2a702..8b0c9ec10 100644 --- a/core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java +++ b/core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java @@ -20,6 +20,11 @@ import com.aiweiergou.tool.logger.api.ChangeLogLevelProcess; import com.dtstack.flink.sql.parser.CreateFuncParser; +import com.dtstack.flink.sql.parser.CreateTmpTableParser; +import com.dtstack.flink.sql.parser.FlinkPlanner; +import com.dtstack.flink.sql.parser.InsertSqlParser; +import com.dtstack.flink.sql.parser.SqlParser; +import com.dtstack.flink.sql.parser.SqlTree; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.RowTypeInfo; @@ -63,17 +68,6 @@ import org.apache.calcite.sql.SqlNode; import org.apache.commons.io.Charsets; import org.apache.commons.lang3.StringUtils; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.StreamQueryConfig; -import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.java.StreamTableEnvironment; -import org.apache.flink.table.sinks.TableSink; -import org.apache.flink.types.Row; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java b/core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java index 61546bb60..2296f64c5 100644 --- a/core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java +++ b/core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java @@ -48,6 +48,7 @@ import java.util.Set; import static org.apache.calcite.sql.SqlKind.IDENTIFIER; +import static org.apache.calcite.sql.SqlKind.LITERAL; /** * Parsing sql, obtain execution information dimension table diff --git a/core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java b/core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java index 7a657263c..c41e2264a 100644 --- a/core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java +++ b/core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java @@ -47,23 +47,13 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import org.apache.calcite.sql.SqlAsOperator; import org.apache.calcite.sql.SqlBasicCall; -import org.apache.calcite.sql.SqlDataTypeSpec; import org.apache.calcite.sql.SqlIdentifier; -import org.apache.calcite.sql.SqlInsert; -import org.apache.calcite.sql.SqlJoin; import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.SqlLiteral; import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql.SqlNodeList; -import org.apache.calcite.sql.SqlOperator; -import org.apache.calcite.sql.SqlOrderBy; import org.apache.calcite.sql.SqlSelect; import org.apache.calcite.sql.SqlWithItem; -import org.apache.calcite.sql.fun.SqlCase; import org.apache.calcite.sql.parser.SqlParseException; -import org.apache.calcite.sql.parser.SqlParserPos; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -71,7 +61,6 @@ import java.sql.Timestamp; import java.util.Arrays; -import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -168,8 +157,7 @@ public void exec(String sql, }else if (pollObj instanceof JoinInfo){ LOG.info("----------exec join info----------\n{}", pollObj.toString()); - preIsSideJoin = true; - joinFun(pollObj, localTableCache, sideTableMap, tableEnv, replaceInfoList); + joinFun(pollObj, localTableCache, sideTableMap, tableEnv); } } @@ -295,7 +283,7 @@ private Table getTableFromCache(Map localTableCache, String table * * @return */ - private boolean checkJoinCondition(SqlNode conditionNode, String sideTableAlias, SideTableInfo sideTableInfo) { + private boolean checkJoinCondition(SqlNode conditionNode, String sideTableAlias, AbstractSideTableInfo sideTableInfo) { List conditionFields = getConditionFields(conditionNode, sideTableAlias, sideTableInfo); if(CollectionUtils.isEqualCollection(conditionFields, convertPrimaryAlias(sideTableInfo))){ return true; @@ -303,7 +291,7 @@ private boolean checkJoinCondition(SqlNode conditionNode, String sideTableAlias, return false; } - private List convertPrimaryAlias(SideTableInfo sideTableInfo) { + private List convertPrimaryAlias(AbstractSideTableInfo sideTableInfo) { List res = Lists.newArrayList(); sideTableInfo.getPrimaryKeys().forEach(field -> { res.add(sideTableInfo.getPhysicalFields().getOrDefault(field, field)); @@ -311,7 +299,7 @@ private List convertPrimaryAlias(SideTableInfo sideTableInfo) { return res; } - public List getConditionFields(SqlNode conditionNode, String specifyTableName, SideTableInfo sideTableInfo){ + public List getConditionFields(SqlNode conditionNode, String specifyTableName, AbstractSideTableInfo sideTableInfo){ List sqlNodeList = Lists.newArrayList(); ParseUtils.parseAnd(conditionNode, sqlNodeList); List conditionFields = Lists.newArrayList(); @@ -370,7 +358,7 @@ protected void dealAsSourceTable(StreamTableEnvironment tableEnv, private void joinFun(Object pollObj, Map localTableCache, - Map sideTableMap, + Map sideTableMap, StreamTableEnvironment tableEnv) throws Exception{ JoinInfo joinInfo = (JoinInfo) pollObj; @@ -386,7 +374,7 @@ private void joinFun(Object pollObj, JoinScope.ScopeChild rightScopeChild = new JoinScope.ScopeChild(); rightScopeChild.setAlias(joinInfo.getRightTableAlias()); rightScopeChild.setTableName(joinInfo.getRightTableName()); - SideTableInfo sideTableInfo = sideTableMap.get(joinInfo.getRightTableName()); + AbstractSideTableInfo sideTableInfo = sideTableMap.get(joinInfo.getRightTableName()); if(sideTableInfo == null){ sideTableInfo = sideTableMap.get(joinInfo.getRightTableAlias()); } @@ -395,9 +383,9 @@ private void joinFun(Object pollObj, throw new RuntimeException("can't not find side table:" + joinInfo.getRightTableName()); } -// if(!checkJoinCondition(joinInfo.getCondition(), joinInfo.getRightTableAlias(), sideTableInfo)){ -// throw new RuntimeException("ON condition must contain all equal fields!!!"); -// } + if(!checkJoinCondition(joinInfo.getCondition(), joinInfo.getRightTableAlias(), sideTableInfo)){ + throw new RuntimeException("ON condition must contain all equal fields!!!"); + } rightScopeChild.setRowTypeInfo(sideTableInfo.getRowTypeInfo()); diff --git a/core/src/main/java/com/dtstack/flink/sql/table/AbstractTableParser.java b/core/src/main/java/com/dtstack/flink/sql/table/AbstractTableParser.java index 136779cda..1f210cbf2 100644 --- a/core/src/main/java/com/dtstack/flink/sql/table/AbstractTableParser.java +++ b/core/src/main/java/com/dtstack/flink/sql/table/AbstractTableParser.java @@ -112,12 +112,12 @@ public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo){ Class fieldClass = null; - TableInfo.FieldExtraInfo fieldExtraInfo = null; + AbstractTableInfo.FieldExtraInfo fieldExtraInfo = null; Matcher matcher = charTypePattern.matcher(fieldType); if (matcher.find()) { fieldClass = dbTypeConvertToJavaType(CHAR_TYPE_NO_LENGTH); - fieldExtraInfo = new TableInfo.FieldExtraInfo(); + fieldExtraInfo = new AbstractTableInfo.FieldExtraInfo(); fieldExtraInfo.setLength(Integer.valueOf(matcher.group(1))); } else { fieldClass = dbTypeConvertToJavaType(fieldType); diff --git a/oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java b/oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java index d2c75c716..658020b1c 100644 --- a/oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java +++ b/oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java @@ -23,7 +23,7 @@ import com.dtstack.flink.sql.side.AbstractSideTableInfo; import com.dtstack.flink.sql.side.rdb.async.RdbAsyncSideInfo; import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo; -import com.dtstack.flink.sql.table.TableInfo; +import com.dtstack.flink.sql.table.AbstractTableInfo; import com.dtstack.flink.sql.util.DtStringUtil; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.java.typeutils.RowTypeInfo; @@ -56,7 +56,7 @@ public String wrapperPlaceholder(String fieldName) { String rpadFormat = "rpad(?, %d, ' ')"; if (StringUtils.contains(type.toLowerCase(), "char")) { - TableInfo.FieldExtraInfo fieldExtraInfo = sideTableInfo.getFieldExtraInfoList().get(pos); + AbstractTableInfo.FieldExtraInfo fieldExtraInfo = sideTableInfo.getFieldExtraInfoList().get(pos); int charLength = fieldExtraInfo == null ? 0 : fieldExtraInfo.getLength(); if (charLength > 0) { return String.format(rpadFormat, charLength); diff --git a/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleDialect.java b/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleDialect.java index a687aa012..3a320658c 100644 --- a/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleDialect.java +++ b/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleDialect.java @@ -19,7 +19,7 @@ package com.dtstack.flink.sql.sink.oracle; import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect; -import com.dtstack.flink.sql.table.TableInfo; +import com.dtstack.flink.sql.table.AbstractTableInfo; import com.dtstack.flink.sql.util.DtStringUtil; import org.apache.commons.lang3.StringUtils; @@ -41,7 +41,7 @@ public class OracleDialect implements JDBCDialect { private List fieldList; private List fieldTypeList; - private List fieldExtraInfoList; + private List fieldExtraInfoList; @Override public boolean canHandle(String url) { @@ -133,7 +133,7 @@ public String wrapperPlaceholder(String fieldName) { String type = fieldTypeList.get(pos); if (StringUtils.contains(type.toLowerCase(), DEAL_CHAR_KEY)) { - TableInfo.FieldExtraInfo fieldExtraInfo = fieldExtraInfoList.get(pos); + AbstractTableInfo.FieldExtraInfo fieldExtraInfo = fieldExtraInfoList.get(pos); int charLength = fieldExtraInfo == null ? 0 : fieldExtraInfo.getLength(); if (charLength > 0) { return String.format(RPAD_FORMAT, charLength); @@ -151,7 +151,7 @@ public void setFieldTypeList(List fieldTypeList) { this.fieldTypeList = fieldTypeList; } - public void setFieldExtraInfoList(List fieldExtraInfoList) { + public void setFieldExtraInfoList(List fieldExtraInfoList) { this.fieldExtraInfoList = fieldExtraInfoList; } } diff --git a/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/AbstractRdbSink.java b/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/AbstractRdbSink.java index e71d4f427..ee51c22d1 100644 --- a/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/AbstractRdbSink.java +++ b/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/AbstractRdbSink.java @@ -20,9 +20,8 @@ import com.dtstack.flink.sql.sink.IStreamSinkGener; import com.dtstack.flink.sql.sink.rdb.format.JDBCUpsertOutputFormat; import com.dtstack.flink.sql.sink.rdb.table.RdbTableInfo; +import com.dtstack.flink.sql.table.AbstractTableInfo; import com.dtstack.flink.sql.table.AbstractTargetTableInfo; -import com.dtstack.flink.sql.table.TableInfo; -import com.dtstack.flink.sql.table.TargetTableInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.RowTypeInfo; @@ -84,20 +83,20 @@ public abstract class AbstractRdbSink implements RetractStreamTableSink, Se public List fieldList; public List fieldTypeList; - public List fieldExtraInfoList; + public List fieldExtraInfoList; public AbstractRdbSink(JDBCDialect jdbcDialect) { this.jdbcDialect = jdbcDialect; } @Override - public RdbSink genStreamSink(TargetTableInfo targetTableInfo) { + public AbstractRdbSink genStreamSink(AbstractTargetTableInfo targetTableInfo) { RdbTableInfo rdbTableInfo = (RdbTableInfo) targetTableInfo; this.batchNum = rdbTableInfo.getBatchSize() == null ? batchNum : rdbTableInfo.getBatchSize(); this.batchWaitInterval = rdbTableInfo.getBatchWaitInterval() == null ? batchWaitInterval : rdbTableInfo.getBatchWaitInterval(); this.parallelism = rdbTableInfo.getParallelism() == null ? parallelism : rdbTableInfo.getParallelism(); - this.dbURL = rdbTableInfo.getUrl(); + this.dbUrl = rdbTableInfo.getUrl(); this.userName = rdbTableInfo.getUserName(); this.password = rdbTableInfo.getPassword(); this.tableName = rdbTableInfo.getTableName();