diff --git a/core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java b/core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java index a7c6db9eb..e2940c4f7 100644 --- a/core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java +++ b/core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java @@ -57,6 +57,7 @@ public void parseSql(String sql, SqlTree sqlTree) { .configBuilder() .setLex(Lex.MYSQL) .build(); + SqlParser sqlParser = SqlParser.create(sql,config); SqlNode sqlNode = null; try { diff --git a/core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java b/core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java index 17bc91b40..d321130cc 100644 --- a/core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java +++ b/core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java @@ -731,9 +731,9 @@ private boolean checkAndRemoveWhereCondition(Set fromTableNameSet, Set conditionRefTableNameSet = Sets.newHashSet(); fieldInfos.forEach(fieldInfo -> { - String[] splitInfo = StringUtils.split(fieldInfo, ","); + String[] splitInfo = StringUtils.split(fieldInfo, "."); if(splitInfo.length == 2){ - conditionRefTableNameSet.add(splitInfo[1]); + conditionRefTableNameSet.add(splitInfo[0]); } }); diff --git a/core/src/main/java/com/dtstack/flink/sql/side/SideInfo.java b/core/src/main/java/com/dtstack/flink/sql/side/SideInfo.java index df41e1663..029c86e25 100644 --- a/core/src/main/java/com/dtstack/flink/sql/side/SideInfo.java +++ b/core/src/main/java/com/dtstack/flink/sql/side/SideInfo.java @@ -55,6 +55,8 @@ public abstract class SideInfo implements Serializable{ protected String sideSelectFields = ""; + protected Map sideSelectFieldsType = Maps.newHashMap(); + protected JoinType joinType; //key:Returns the value of the position, value: the ref field index​in the input table @@ -84,15 +86,17 @@ public void parseSelectFields(JoinInfo joinInfo){ String sideTableName = joinInfo.getSideTableName(); String nonSideTableName = joinInfo.getNonSideTable(); List fields = Lists.newArrayList(); + int sideTableFieldIndex = 0; - int sideIndex = 0; for( int i=0; i getSideFieldNameIndex() { public void setSideFieldNameIndex(Map sideFieldNameIndex) { this.sideFieldNameIndex = sideFieldNameIndex; } + + public Map getSideSelectFieldsType() { + return sideSelectFieldsType; + } + + public void setSideSelectFieldsType(Map sideSelectFieldsType) { + this.sideSelectFieldsType = sideSelectFieldsType; + } + + public String getSelectSideFieldType(int index){ + return sideSelectFieldsType.get(index); + } } diff --git a/core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java b/core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java index 920dec07c..906149a96 100644 --- a/core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java +++ b/core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java @@ -394,7 +394,6 @@ private void joinFun(Object pollObj, HashBasedTable mappingTable = ((JoinInfo) pollObj).getTableFieldRef(); //获取两个表的所有字段 - //TODO 抽取 List sideJoinFieldInfo = ParserJoinField.getRowTypeInfo(joinInfo.getSelectNode(), joinScope, true); //通过join的查询字段信息过滤出需要的字段信息 sideJoinFieldInfo.removeIf(tmpFieldInfo -> mappingTable.get(tmpFieldInfo.getTable(), tmpFieldInfo.getFieldName()) == null); diff --git a/core/src/main/java/com/dtstack/flink/sql/util/TableUtils.java b/core/src/main/java/com/dtstack/flink/sql/util/TableUtils.java index 39d79c065..e62f3fcfa 100644 --- a/core/src/main/java/com/dtstack/flink/sql/util/TableUtils.java +++ b/core/src/main/java/com/dtstack/flink/sql/util/TableUtils.java @@ -26,6 +26,7 @@ import com.google.common.collect.HashBasedTable; import com.google.common.collect.HashBiMap; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.typesafe.config.ConfigException; import org.apache.calcite.sql.SqlAsOperator; import org.apache.calcite.sql.SqlBasicCall; @@ -200,7 +201,14 @@ public static SqlBasicCall buildAsNodeByJoinInfo(JoinInfo joinInfo, SqlNode sqlN String joinLeftTableAlias = joinInfo.getLeftTableAlias(); joinLeftTableName = Strings.isNullOrEmpty(joinLeftTableName) ? joinLeftTableAlias : joinLeftTableName; String newTableName = buildInternalTableName(joinLeftTableName, SPLIT, joinInfo.getRightTableName()); - String newTableAlias = !StringUtils.isEmpty(tableAlias) ? tableAlias : buildInternalTableName(joinInfo.getLeftTableAlias(), SPLIT, joinInfo.getRightTableAlias()); + String lefTbAlias = joinInfo.getLeftTableAlias(); + if(Strings.isNullOrEmpty(lefTbAlias)){ + Set fromTableSet = Sets.newHashSet(); + TableUtils.getFromTableInfo(joinInfo.getLeftNode(), fromTableSet); + lefTbAlias = StringUtils.join(fromTableSet, "_"); + } + + String newTableAlias = !StringUtils.isEmpty(tableAlias) ? tableAlias : buildInternalTableName(lefTbAlias, SPLIT, joinInfo.getRightTableAlias()); if (null == sqlNode0) { sqlNode0 = new SqlIdentifier(newTableName, null, sqlParserPos); diff --git a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java index a48e5eb0f..dd89d1671 100644 --- a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java +++ b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java @@ -201,7 +201,6 @@ protected List getRows(Row inputRow, List cacheContent, List entry : sideInfo.getInFieldIndex().entrySet()) { Object obj = input.getField(entry.getValue()); boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass()); @@ -216,7 +215,8 @@ public Row fillData(Row input, Object line) { if (jsonArray == null) { row.setField(entry.getKey(), null); } else { - Object object = SwitchUtil.getTarget(jsonArray.getValue(entry.getValue()), fields[entry.getValue()]); + String fieldType = sideInfo.getSelectSideFieldType(entry.getValue()); + Object object = SwitchUtil.getTarget(jsonArray.getValue(entry.getValue()), fieldType); row.setField(entry.getKey(), object); } }