From 6f8c9aa6c560c47b42bfd51684481e1054abfc20 Mon Sep 17 00:00:00 2001 From: tiezhu Date: Tue, 21 Apr 2020 19:53:54 +0800 Subject: [PATCH] =?UTF-8?q?fix=20=E5=88=AB=E5=90=8Djoin=E5=BC=82=E5=B8=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/com/dtstack/flink/sql/side/SideInfo.java | 2 +- .../flink/sql/side/rdb/async/RdbAsyncSideInfo.java | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/com/dtstack/flink/sql/side/SideInfo.java b/core/src/main/java/com/dtstack/flink/sql/side/SideInfo.java index 029c86e25..4b53f77f7 100644 --- a/core/src/main/java/com/dtstack/flink/sql/side/SideInfo.java +++ b/core/src/main/java/com/dtstack/flink/sql/side/SideInfo.java @@ -93,7 +93,7 @@ public void parseSelectFields(JoinInfo joinInfo){ if(fieldInfo.getTable().equalsIgnoreCase(sideTableName)){ String sideFieldName = sideTableInfo.getPhysicalFields().getOrDefault(fieldInfo.getFieldName(), fieldInfo.getFieldName()); fields.add(sideFieldName); - sideSelectFieldsType.put(sideTableFieldIndex, getTargetFieldType(sideFieldName)); + sideSelectFieldsType.put(sideTableFieldIndex, getTargetFieldType(fieldInfo.getFieldName())); sideFieldIndex.put(i, sideTableFieldIndex); sideFieldNameIndex.put(i, sideFieldName); sideTableFieldIndex++; diff --git a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncSideInfo.java b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncSideInfo.java index 9e8c13080..f87b27ae4 100644 --- a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncSideInfo.java +++ b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncSideInfo.java @@ -18,8 +18,6 @@ package com.dtstack.flink.sql.side.rdb.async; -import org.apache.flink.api.java.typeutils.RowTypeInfo; - import com.dtstack.flink.sql.side.FieldInfo; import com.dtstack.flink.sql.side.JoinInfo; import com.dtstack.flink.sql.side.PredicateInfo; @@ -33,9 +31,11 @@ import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlNode; import org.apache.commons.lang3.StringUtils; +import org.apache.flink.api.java.typeutils.RowTypeInfo; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; @@ -86,6 +86,7 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) { SqlIdentifier left = (SqlIdentifier) ((SqlBasicCall) sqlNode).getOperands()[0]; SqlIdentifier right = (SqlIdentifier) ((SqlBasicCall) sqlNode).getOperands()[1]; + Map physicalFields = sideTableInfo.getPhysicalFields(); String leftTableName = left.getComponent(0).getSimple(); String leftField = left.getComponent(1).getSimple(); @@ -94,7 +95,7 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) { String rightField = right.getComponent(1).getSimple(); if (leftTableName.equalsIgnoreCase(sideTableName)) { - equalFieldList.add(leftField); + equalFieldList.add(physicalFields.get(leftField)); int equalFieldIndex = -1; for (int i = 0; i < rowTypeInfo.getFieldNames().length; i++) { String fieldName = rowTypeInfo.getFieldNames()[i]; @@ -110,7 +111,7 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) { } else if (rightTableName.equalsIgnoreCase(sideTableName)) { - equalFieldList.add(rightField); + equalFieldList.add(physicalFields.get(rightField)); int equalFieldIndex = -1; for (int i = 0; i < rowTypeInfo.getFieldNames().length; i++) { String fieldName = rowTypeInfo.getFieldNames()[i];