diff --git a/core/src/main/java/com/dtstack/flink/sql/Main.java b/core/src/main/java/com/dtstack/flink/sql/Main.java index fed359c2f..443033b3b 100644 --- a/core/src/main/java/com/dtstack/flink/sql/Main.java +++ b/core/src/main/java/com/dtstack/flink/sql/Main.java @@ -149,7 +149,13 @@ public static void main(String[] args) throws Exception { env.execute(name); } - private static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironment tableEnv,SqlTree sqlTree,Map sideTableMap,Map registerTableCache, StreamQueryConfig queryConfig) throws Exception { + private static void sqlTranslation(String localSqlPluginPath, + StreamTableEnvironment tableEnv, + SqlTree sqlTree, + Map sideTableMap, + Map registerTableCache, + StreamQueryConfig queryConfig) throws Exception { + SideSqlExec sideSqlExec = new SideSqlExec(); sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath); for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) { diff --git a/core/src/main/java/com/dtstack/flink/sql/side/FieldInfo.java b/core/src/main/java/com/dtstack/flink/sql/side/FieldInfo.java index 85bad8c2c..1259ddecf 100644 --- a/core/src/main/java/com/dtstack/flink/sql/side/FieldInfo.java +++ b/core/src/main/java/com/dtstack/flink/sql/side/FieldInfo.java @@ -85,4 +85,13 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(table, fieldName); } + + @Override + public String toString() { + return "FieldInfo{" + + "table='" + table + '\'' + + ", fieldName='" + fieldName + '\'' + + ", typeInformation=" + typeInformation + + '}'; + } } diff --git a/core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java b/core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java index 8254b763e..8854ff4ec 100644 --- a/core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java +++ b/core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java @@ -41,7 +41,7 @@ public class JoinInfo implements Serializable { private static final long serialVersionUID = -1L; //左表是否是维表 - private boolean leftIsSideTable; + private boolean leftIsSideTable = false; //右表是否是维表 private boolean rightIsSideTable; diff --git a/core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java b/core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java index 33bd2b293..17bc91b40 100644 --- a/core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java +++ b/core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java @@ -74,7 +74,7 @@ public JoinNodeDealer(SideSQLParser sideSQLParser){ * 解析 join 操作 * @param joinNode * @param sideTableSet 标明哪些表名是维表 - * @param queueInfo + * @param queueInfo sql执行队列 * @param parentWhere join 关联的最上层的where 节点 * @param parentSelectList join 关联的最上层的select 节点 * @param joinFieldSet @@ -99,18 +99,13 @@ public JoinInfo dealJoinNode(SqlJoin joinNode, String rightTableName = ""; String rightTableAlias = ""; - //TODO 含义需要更明确 - HashBiMap fieldReplaceRef = HashBiMap.create(); - - //如果是连续join 判断是否已经处理过添加到执行队列 + //抽取join中的的条件 extractJoinField(joinNode.getCondition(), joinFieldSet); - if(leftNode.getKind() == IDENTIFIER){ - leftTbName = leftNode.toString(); - } else if (leftNode.getKind() == JOIN) { + if (leftNode.getKind() == JOIN) { //处理连续join dealNestJoin(joinNode, sideTableSet, - queueInfo, parentWhere, parentSelectList, joinFieldSet, tableRef, fieldRef, parentSelectList, fieldReplaceRef); + queueInfo, parentWhere, parentSelectList, joinFieldSet, tableRef, fieldRef, parentSelectList); leftNode = joinNode.getLeft(); } @@ -135,24 +130,16 @@ public JoinInfo dealJoinNode(SqlJoin joinNode, JoinInfo tableInfo = new JoinInfo(); tableInfo.setLeftTableName(leftTbName); tableInfo.setRightTableName(rightTableName); - if (StringUtils.isEmpty(leftTbAlias)){ - tableInfo.setLeftTableAlias(leftTbName); - } else { - tableInfo.setLeftTableAlias(leftTbAlias); - } - if (StringUtils.isEmpty(rightTableAlias)){ - tableInfo.setRightTableAlias(rightTableName); - } else { - tableInfo.setRightTableAlias(rightTableAlias); - } + leftTbAlias = StringUtils.isEmpty(leftTbAlias) ? leftTbName : leftTbAlias; + rightTableAlias = StringUtils.isEmpty(rightTableAlias) ? rightTableName : rightTableAlias; - tableInfo.setLeftIsSideTable(leftIsSide); + tableInfo.setLeftTableAlias(leftTbAlias); + tableInfo.setRightTableAlias(rightTableAlias); tableInfo.setRightIsSideTable(rightIsSide); tableInfo.setLeftNode(leftNode); tableInfo.setRightNode(rightNode); tableInfo.setJoinType(joinType); - tableInfo.setCondition(joinNode.getCondition()); TableUtils.replaceJoinFieldRefTableName(joinNode.getCondition(), fieldRef); @@ -216,6 +203,15 @@ public void extractJoinNeedSelectField(SqlNode leftNode, tableInfo.setRightSelectFieldInfo(rightTbSelectField); } + /** + * 指定的节点关联到的 select 中的字段和 where中的字段 + * @param sqlNode + * @param parentWhere + * @param parentSelectList + * @param tableRef + * @param joinFieldSet + * @return + */ public Set extractField(SqlNode sqlNode, SqlNode parentWhere, SqlNodeList parentSelectList, @@ -250,8 +246,7 @@ private JoinInfo dealNestJoin(SqlJoin joinNode, Set> joinFieldSet, Map tableRef, Map fieldRef, - SqlNodeList parentSelectList, - HashBiMap fieldReplaceRef){ + SqlNodeList parentSelectList){ SqlJoin leftJoinNode = (SqlJoin) joinNode.getLeft(); SqlNode parentRightJoinNode = joinNode.getRight(); @@ -267,7 +262,7 @@ private JoinInfo dealNestJoin(SqlJoin joinNode, SqlBasicCall buildAs = TableUtils.buildAsNodeByJoinInfo(joinInfo, null, null); if(rightIsSide){ - addSideInfoToExeQueue(queueInfo, joinInfo, joinNode, parentSelectList, parentWhere, fieldReplaceRef, tableRef); + addSideInfoToExeQueue(queueInfo, joinInfo, joinNode, parentSelectList, parentWhere, tableRef); } SqlNode newLeftNode = joinNode.getLeft(); @@ -280,7 +275,7 @@ private JoinInfo dealNestJoin(SqlJoin joinNode, //替换leftNode 为新的查询 joinNode.setLeft(buildAs); - replaceSelectAndWhereField(buildAs, leftJoinNode, tableRef, parentSelectList, parentWhere, fieldReplaceRef); + replaceSelectAndWhereField(buildAs, leftJoinNode, tableRef, parentSelectList, parentWhere); } return joinInfo; @@ -294,7 +289,6 @@ private JoinInfo dealNestJoin(SqlJoin joinNode, * @param joinNode * @param parentSelectList * @param parentWhere - * @param fieldReplaceRef * @param tableRef */ public void addSideInfoToExeQueue(Queue queueInfo, @@ -302,7 +296,6 @@ public void addSideInfoToExeQueue(Queue queueInfo, SqlJoin joinNode, SqlNodeList parentSelectList, SqlNode parentWhere, - HashBiMap fieldReplaceRef, Map tableRef){ //只处理维表 if(!joinInfo.isRightIsSideTable()){ @@ -315,7 +308,7 @@ public void addSideInfoToExeQueue(Queue queueInfo, //替换左表为新的表名称 joinNode.setLeft(buildAs); - replaceSelectAndWhereField(buildAs, leftJoinNode, tableRef, parentSelectList, parentWhere, fieldReplaceRef); + replaceSelectAndWhereField(buildAs, leftJoinNode, tableRef, parentSelectList, parentWhere); } /** @@ -325,14 +318,12 @@ public void addSideInfoToExeQueue(Queue queueInfo, * @param tableRef * @param parentSelectList * @param parentWhere - * @param fieldReplaceRef */ public void replaceSelectAndWhereField(SqlBasicCall buildAs, SqlNode leftJoinNode, Map tableRef, SqlNodeList parentSelectList, - SqlNode parentWhere, - HashBiMap fieldReplaceRef){ + SqlNode parentWhere){ String newLeftTableName = buildAs.getOperands()[1].toString(); Set fromTableNameSet = Sets.newHashSet(); @@ -343,6 +334,7 @@ public void replaceSelectAndWhereField(SqlBasicCall buildAs, } //替换select field 中的对应字段 + HashBiMap fieldReplaceRef = HashBiMap.create(); for(SqlNode sqlNode : parentSelectList.getList()){ for(String tbTmp : fromTableNameSet) { TableUtils.replaceSelectFieldTable(sqlNode, tbTmp, newLeftTableName, fieldReplaceRef); diff --git a/core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java b/core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java index 061fe52a2..a94d218a2 100644 --- a/core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java +++ b/core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java @@ -90,48 +90,16 @@ public Queue getExeQueue(String exeSql, Set sideTableSet) throws return queueInfo; } - private void checkAndReplaceMultiJoin(SqlNode sqlNode, Set sideTableSet) { - SqlKind sqlKind = sqlNode.getKind(); - switch (sqlKind) { - case WITH: { - SqlWith sqlWith = (SqlWith) sqlNode; - SqlNodeList sqlNodeList = sqlWith.withList; - for (SqlNode withAsTable : sqlNodeList) { - SqlWithItem sqlWithItem = (SqlWithItem) withAsTable; - checkAndReplaceMultiJoin(sqlWithItem.query, sideTableSet); - } - checkAndReplaceMultiJoin(sqlWith.body, sideTableSet); - break; - } - case INSERT: - SqlNode sqlSource = ((SqlInsert) sqlNode).getSource(); - checkAndReplaceMultiJoin(sqlSource, sideTableSet); - break; - case SELECT: - SqlNode sqlFrom = ((SqlSelect) sqlNode).getFrom(); - if (sqlFrom.getKind() != IDENTIFIER) { - checkAndReplaceMultiJoin(sqlFrom, sideTableSet); - } - break; - case JOIN: - convertSideJoinToNewQuery((SqlJoin) sqlNode, sideTableSet); - break; - case AS: - SqlNode info = ((SqlBasicCall) sqlNode).getOperands()[0]; - if (info.getKind() != IDENTIFIER) { - checkAndReplaceMultiJoin(info, sideTableSet); - } - break; - case UNION: - SqlNode unionLeft = ((SqlBasicCall) sqlNode).getOperands()[0]; - SqlNode unionRight = ((SqlBasicCall) sqlNode).getOperands()[1]; - checkAndReplaceMultiJoin(unionLeft, sideTableSet); - checkAndReplaceMultiJoin(unionRight, sideTableSet); - break; - } - } - + /** + * 解析 sql 根据维表 join关系重新组装新的sql + * @param sqlNode + * @param sideTableSet + * @param queueInfo + * @param parentWhere + * @param parentSelectList + * @return + */ public Object parseSql(SqlNode sqlNode, Set sideTableSet, Queue queueInfo, SqlNode parentWhere, SqlNodeList parentSelectList){ SqlKind sqlKind = sqlNode.getKind(); switch (sqlKind){ @@ -175,7 +143,9 @@ public Object parseSql(SqlNode sqlNode, Set sideTableSet, Queue JoinNodeDealer joinNodeDealer = new JoinNodeDealer(this); Set> joinFieldSet = Sets.newHashSet(); Map tableRef = Maps.newHashMap(); - return joinNodeDealer.dealJoinNode((SqlJoin) sqlNode, sideTableSet, queueInfo, parentWhere, parentSelectList, joinFieldSet, tableRef); + Map fieldRef = Maps.newHashMap(); + return joinNodeDealer.dealJoinNode((SqlJoin) sqlNode, sideTableSet, queueInfo, + parentWhere, parentSelectList, joinFieldSet, tableRef, fieldRef); case AS: SqlNode info = ((SqlBasicCall)sqlNode).getOperands()[0]; SqlNode alias = ((SqlBasicCall) sqlNode).getOperands()[1]; @@ -207,26 +177,7 @@ public Object parseSql(SqlNode sqlNode, Set sideTableSet, Queue return ""; } - private AliasInfo getSqlNodeAliasInfo(SqlNode sqlNode) { - SqlNode info = ((SqlBasicCall) sqlNode).getOperands()[0]; - SqlNode alias = ((SqlBasicCall) sqlNode).getOperands()[1]; - String infoStr = info.getKind() == IDENTIFIER ? info.toString() : null; - - AliasInfo aliasInfo = new AliasInfo(); - aliasInfo.setName(infoStr); - aliasInfo.setAlias(alias.toString()); - return aliasInfo; - } - /** - * 将和维表关联的join 替换为一个新的查询 - * @param sqlNode - * @param sideTableSet - */ - private void convertSideJoinToNewQuery(SqlJoin sqlNode, Set sideTableSet) { - checkAndReplaceMultiJoin(sqlNode.getLeft(), sideTableSet); - checkAndReplaceMultiJoin(sqlNode.getRight(), sideTableSet); - } public void setLocalTableCache(Map localTableCache) { diff --git a/core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java b/core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java index 0a9668916..920dec07c 100644 --- a/core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java +++ b/core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java @@ -84,8 +84,12 @@ public class SideSqlExec { private Map localTableCache = Maps.newHashMap(); - public void exec(String sql, Map sideTableMap, StreamTableEnvironment tableEnv, - Map tableCache, StreamQueryConfig queryConfig, CreateTmpTableParser.SqlParserResult createView) throws Exception { + public void exec(String sql, + Map sideTableMap, + StreamTableEnvironment tableEnv, + Map tableCache, + StreamQueryConfig queryConfig, + CreateTmpTableParser.SqlParserResult createView) throws Exception { if(localSqlPluginPath == null){ throw new RuntimeException("need to set localSqlPluginPath"); } @@ -192,66 +196,6 @@ private FieldReplaceInfo parseAsQuery(SqlBasicCall asSqlNode, Map } - /** - * 添加字段别名 - * @param pollSqlNode - * @param fieldList - * @param mappingTable - */ - private void addAliasForFieldNode(SqlNode pollSqlNode, List fieldList, HashBasedTable mappingTable) { - SqlKind sqlKind = pollSqlNode.getKind(); - switch (sqlKind) { - case INSERT: - SqlNode source = ((SqlInsert) pollSqlNode).getSource(); - addAliasForFieldNode(source, fieldList, mappingTable); - break; - case AS: - addAliasForFieldNode(((SqlBasicCall) pollSqlNode).getOperands()[0], fieldList, mappingTable); - break; - case SELECT: - SqlNodeList selectList = ((SqlSelect) pollSqlNode).getSelectList(); - selectList.getList().forEach(node -> { - if (node.getKind() == IDENTIFIER) { - SqlIdentifier sqlIdentifier = (SqlIdentifier) node; - if (sqlIdentifier.names.size() == 1) { - return; - } - // save real field - String fieldName = sqlIdentifier.names.get(1); - if (!fieldName.endsWith("0") || fieldName.endsWith("0") && mappingTable.columnMap().containsKey(fieldName)) { - fieldList.add(fieldName); - } - - } - }); - for (int i = 0; i < selectList.getList().size(); i++) { - SqlNode node = selectList.get(i); - if (node.getKind() == IDENTIFIER) { - SqlIdentifier sqlIdentifier = (SqlIdentifier) node; - if (sqlIdentifier.names.size() == 1) { - return; - } - String name = sqlIdentifier.names.get(1); - // avoid real field pv0 convert pv - if (name.endsWith("0") && !fieldList.contains(name) && !fieldList.contains(name.substring(0, name.length() - 1))) { - SqlOperator operator = new SqlAsOperator(); - SqlParserPos sqlParserPos = new SqlParserPos(0, 0); - - SqlIdentifier sqlIdentifierAlias = new SqlIdentifier(name.substring(0, name.length() - 1), null, sqlParserPos); - SqlNode[] sqlNodes = new SqlNode[2]; - sqlNodes[0] = sqlIdentifier; - sqlNodes[1] = sqlIdentifierAlias; - SqlBasicCall sqlBasicCall = new SqlBasicCall(operator, sqlNodes, sqlParserPos); - - selectList.set(i, sqlBasicCall); - } - } - } - break; - } - } - - public AliasInfo parseASNode(SqlNode sqlNode) throws SqlParseException { SqlKind sqlKind = sqlNode.getKind(); if(sqlKind != AS){ diff --git a/core/src/main/java/com/dtstack/flink/sql/util/FieldReplaceUtil.java b/core/src/main/java/com/dtstack/flink/sql/util/FieldReplaceUtil.java new file mode 100644 index 000000000..10919ca5b --- /dev/null +++ b/core/src/main/java/com/dtstack/flink/sql/util/FieldReplaceUtil.java @@ -0,0 +1,339 @@ +package com.dtstack.flink.sql.util; + +import com.dtstack.flink.sql.side.FieldReplaceInfo; +import com.google.common.collect.Lists; +import org.apache.calcite.sql.*; +import org.apache.calcite.sql.fun.SqlCase; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.commons.collections.CollectionUtils; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import static org.apache.calcite.sql.SqlKind.*; + +/** + * 替换 字段 + */ +public class FieldReplaceUtil { + + /** + * 需要考虑更多的情况 + */ + public static void replaceFieldName(SqlNode sqlNode, + String oldTbName, + String newTbName, + Map mappingField) { + SqlKind sqlKind = sqlNode.getKind(); + switch (sqlKind) { + case INSERT: + SqlNode sqlSource = ((SqlInsert) sqlNode).getSource(); + replaceFieldName(sqlSource, oldTbName, newTbName, mappingField); + break; + case AS: + SqlNode asNode = ((SqlBasicCall) sqlNode).getOperands()[0]; + replaceFieldName(asNode, oldTbName, newTbName, mappingField); + break; + case SELECT: + SqlSelect sqlSelect = (SqlSelect) sqlNode; + SqlNodeList sqlSelectList = sqlSelect.getSelectList(); + SqlNode whereNode = sqlSelect.getWhere(); + SqlNodeList sqlGroup = sqlSelect.getGroup(); + + //TODO 抽取,暂时使用使用单个join条件作为测试 + if(sqlSelect.getFrom().getKind().equals(JOIN)){ + SqlJoin joinNode = (SqlJoin) sqlSelect.getFrom(); + SqlNode joinCondition = joinNode.getCondition(); + replaceFieldName(((SqlBasicCall)joinCondition).operands[0], oldTbName, newTbName, mappingField); + replaceFieldName(((SqlBasicCall)joinCondition).operands[1], oldTbName, newTbName, mappingField); + } + + //TODO 暂时不处理having + SqlNode sqlHaving = sqlSelect.getHaving(); + + List newSelectNodeList = Lists.newArrayList(); + for( int i=0; i replaceNodeList = replaceSelectStarFieldName(selectNode, replaceInfo); + //newSelectNodeList.addAll(replaceNodeList); + throw new RuntimeException("not support table.* now"); + } + + SqlNode replaceNode = replaceSelectFieldName(selectNode, oldTbName, newTbName, mappingField); + if(replaceNode == null){ + continue; + } + + newSelectNodeList.add(replaceNode); + } + + SqlNodeList newSelectList = new SqlNodeList(newSelectNodeList, sqlSelectList.getParserPosition()); + sqlSelect.setSelectList(newSelectList); + + //where + if(whereNode != null){ + SqlNode[] sqlNodeList = ((SqlBasicCall)whereNode).getOperands(); + for(int i =0; i mappingField) { + if(orderNode.getKind() == IDENTIFIER){ + return createNewIdentify((SqlIdentifier) orderNode, oldTbName, newTbName, mappingField); + } else if (orderNode instanceof SqlBasicCall) { + SqlBasicCall sqlBasicCall = (SqlBasicCall) orderNode; + for(int i=0; i mappingField){ + if(groupNode.getKind() == IDENTIFIER){ + return createNewIdentify((SqlIdentifier) groupNode, oldTbName, newTbName, mappingField); + }else if(groupNode instanceof SqlBasicCall){ + SqlBasicCall sqlBasicCall = (SqlBasicCall) groupNode; + for(int i=0; i mappingField){ + + if (sqlIdentifier.names.size() == 1) { + return sqlIdentifier; + } + + String tableName = sqlIdentifier.names.get(0); + String fieldName = sqlIdentifier.names.get(1); + if(!tableName.equalsIgnoreCase(oldTbName)){ + return sqlIdentifier; + } + + String mappingFieldName = mappingField.get(fieldName); + if(mappingFieldName == null){ + return sqlIdentifier; + } + + sqlIdentifier = sqlIdentifier.setName(0, newTbName); + sqlIdentifier = sqlIdentifier.setName(1, mappingFieldName); + return sqlIdentifier; + } + + public static boolean filterNodeWithTargetName(SqlNode sqlNode, String targetTableName) { + + SqlKind sqlKind = sqlNode.getKind(); + switch (sqlKind){ + case SELECT: + SqlNode fromNode = ((SqlSelect)sqlNode).getFrom(); + if(fromNode.getKind() == AS && ((SqlBasicCall)fromNode).getOperands()[0].getKind() == IDENTIFIER){ + if(((SqlBasicCall)fromNode).getOperands()[0].toString().equalsIgnoreCase(targetTableName)){ + return true; + }else{ + return false; + } + }else{ + return filterNodeWithTargetName(fromNode, targetTableName); + } + case AS: + SqlNode aliasName = ((SqlBasicCall)sqlNode).getOperands()[1]; + return aliasName.toString().equalsIgnoreCase(targetTableName); + case JOIN: + SqlNode leftNode = ((SqlJoin)sqlNode).getLeft(); + SqlNode rightNode = ((SqlJoin)sqlNode).getRight(); + boolean leftReturn = filterNodeWithTargetName(leftNode, targetTableName); + boolean rightReturn = filterNodeWithTargetName(rightNode, targetTableName); + + return leftReturn || rightReturn; + + default: + return false; + } + } + + public static SqlNode replaceSelectFieldName(SqlNode selectNode, + String oldTbName, + String newTbName, + Map mappingField) { + + if (selectNode.getKind() == AS) { + SqlNode leftNode = ((SqlBasicCall) selectNode).getOperands()[0]; + SqlNode replaceNode = replaceSelectFieldName(leftNode, oldTbName, newTbName, mappingField); + if (replaceNode != null) { + ((SqlBasicCall) selectNode).getOperands()[0] = replaceNode; + } + + return selectNode; + }else if(selectNode.getKind() == IDENTIFIER){ + return createNewIdentify((SqlIdentifier) selectNode, oldTbName, newTbName, mappingField); + }else if(selectNode.getKind() == LITERAL || selectNode.getKind() == LITERAL_CHAIN){//字面含义 + return selectNode; + }else if( AGGREGATE.contains(selectNode.getKind()) + || AVG_AGG_FUNCTIONS.contains(selectNode.getKind()) + || COMPARISON.contains(selectNode.getKind()) + || selectNode.getKind() == OTHER_FUNCTION + || selectNode.getKind() == DIVIDE + || selectNode.getKind() == CAST + || selectNode.getKind() == TRIM + || selectNode.getKind() == TIMES + || selectNode.getKind() == PLUS + || selectNode.getKind() == NOT_IN + || selectNode.getKind() == OR + || selectNode.getKind() == AND + || selectNode.getKind() == MINUS + || selectNode.getKind() == TUMBLE + || selectNode.getKind() == TUMBLE_START + || selectNode.getKind() == TUMBLE_END + || selectNode.getKind() == SESSION + || selectNode.getKind() == SESSION_START + || selectNode.getKind() == SESSION_END + || selectNode.getKind() == HOP + || selectNode.getKind() == HOP_START + || selectNode.getKind() == HOP_END + || selectNode.getKind() == BETWEEN + || selectNode.getKind() == IS_NULL + || selectNode.getKind() == IS_NOT_NULL + || selectNode.getKind() == CONTAINS + || selectNode.getKind() == TIMESTAMP_ADD + || selectNode.getKind() == TIMESTAMP_DIFF + || selectNode.getKind() == LIKE + + ){ + SqlBasicCall sqlBasicCall = (SqlBasicCall) selectNode; + for(int i=0; i replaceSelectStarFieldName(SqlNode selectNode, FieldReplaceInfo replaceInfo){ + SqlIdentifier sqlIdentifier = (SqlIdentifier) selectNode; + List sqlNodes = Lists.newArrayList(); + if(sqlIdentifier.isStar()){//处理 [* or table.*] + int identifierSize = sqlIdentifier.names.size(); + Collection columns = null; + if(identifierSize == 1){ + columns = replaceInfo.getMappingTable().values(); + }else{ + columns = replaceInfo.getMappingTable().row(sqlIdentifier.names.get(0)).values(); + } + + for(String colAlias : columns){ + SqlParserPos sqlParserPos = new SqlParserPos(0, 0); + List columnInfo = Lists.newArrayList(); + columnInfo.add(replaceInfo.getTargetTableAlias()); + columnInfo.add(colAlias); + SqlIdentifier sqlIdentifierAlias = new SqlIdentifier(columnInfo, sqlParserPos); + sqlNodes.add(sqlIdentifierAlias); + } + + return sqlNodes; + }else{ + throw new RuntimeException("is not a star select field." + selectNode); + } + } + +}