Initial refactoring of the SQL statement parsing logic;
field replacement and renaming are now handled during the parsing phase;
goal: after SQL parsing, every generated SQL statement is fully determined.
xuchao committed Mar 23, 2020
1 parent f3ee3ce commit 14bc810
Showing 7 changed files with 397 additions and 156 deletions.
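The commit's goal can be illustrated with a minimal, hypothetical sketch (not code from this repository): field replacement and renaming are recorded in a bidirectional map while the SQL is parsed, so each reference is rewritten immediately and the SQL handed to the execution queue is already fully determined. The sketch uses Guava's HashBiMap, the same map type JoinNodeDealer uses; the class, table, and field names are invented for illustration.

import com.google.common.collect.HashBiMap;

// Sketch only: map an original "table.field" reference to the name it receives
// in the rewritten query, and apply the mapping during parsing rather than in a
// later execution step.
public class FieldRenameSketch {
    public static void main(String[] args) {
        HashBiMap<String, String> fieldReplaceRef = HashBiMap.create();
        fieldReplaceRef.put("orders.id", "t_join_0.id");
        fieldReplaceRef.put("orders.amount", "t_join_0.amount");

        // rewrite a select item as soon as it is parsed
        String selectItem = "orders.amount";
        System.out.println(fieldReplaceRef.getOrDefault(selectItem, selectItem)); // t_join_0.amount

        // the inverse view maps a generated name back to the original field
        System.out.println(fieldReplaceRef.inverse().get("t_join_0.id"));         // orders.id
    }
}

Because the mapping is applied at parse time, later stages only consume already-rewritten, deterministic SQL.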
8 changes: 7 additions & 1 deletion core/src/main/java/com/dtstack/flink/sql/Main.java
@@ -149,7 +149,13 @@ public static void main(String[] args) throws Exception {
env.execute(name);
}

private static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironment tableEnv,SqlTree sqlTree,Map<String, SideTableInfo> sideTableMap,Map<String, Table> registerTableCache, StreamQueryConfig queryConfig) throws Exception {
private static void sqlTranslation(String localSqlPluginPath,
StreamTableEnvironment tableEnv,
SqlTree sqlTree,
Map<String, SideTableInfo> sideTableMap,
Map<String, Table> registerTableCache,
StreamQueryConfig queryConfig) throws Exception {

SideSqlExec sideSqlExec = new SideSqlExec();
sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath);
for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) {
9 changes: 9 additions & 0 deletions core/src/main/java/com/dtstack/flink/sql/side/FieldInfo.java
@@ -85,4 +85,13 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(table, fieldName);
}

@Override
public String toString() {
return "FieldInfo{" +
"table='" + table + '\'' +
", fieldName='" + fieldName + '\'' +
", typeInformation=" + typeInformation +
'}';
}
}
core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java
@@ -41,7 +41,7 @@ public class JoinInfo implements Serializable {
private static final long serialVersionUID = -1L;

//whether the left table is a side (dimension) table
private boolean leftIsSideTable;
private boolean leftIsSideTable = false;

//whether the right table is a side (dimension) table
private boolean rightIsSideTable;
54 changes: 23 additions & 31 deletions core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java
@@ -74,7 +74,7 @@ public JoinNodeDealer(SideSQLParser sideSQLParser){
* Parse the join operation
* @param joinNode
* @param sideTableSet indicates which table names are side (dimension) tables
* @param queueInfo
* @param queueInfo the SQL execution queue
* @param parentWhere the top-level WHERE node associated with the join
* @param parentSelectList the top-level SELECT list associated with the join
* @param joinFieldSet
@@ -99,18 +99,13 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
String rightTableName = "";
String rightTableAlias = "";

//TODO the meaning needs to be made clearer
HashBiMap<String, String> fieldReplaceRef = HashBiMap.create();

//for consecutive joins, check whether it has already been processed and added to the execution queue
//extract the condition fields from the join
extractJoinField(joinNode.getCondition(), joinFieldSet);

if(leftNode.getKind() == IDENTIFIER){
leftTbName = leftNode.toString();
} else if (leftNode.getKind() == JOIN) {
if (leftNode.getKind() == JOIN) {
//handle consecutive joins
dealNestJoin(joinNode, sideTableSet,
queueInfo, parentWhere, parentSelectList, joinFieldSet, tableRef, fieldRef, parentSelectList, fieldReplaceRef);
queueInfo, parentWhere, parentSelectList, joinFieldSet, tableRef, fieldRef, parentSelectList);
leftNode = joinNode.getLeft();
}

@@ -135,24 +130,16 @@ JoinInfo tableInfo = new JoinInfo();
JoinInfo tableInfo = new JoinInfo();
tableInfo.setLeftTableName(leftTbName);
tableInfo.setRightTableName(rightTableName);
if (StringUtils.isEmpty(leftTbAlias)){
tableInfo.setLeftTableAlias(leftTbName);
} else {
tableInfo.setLeftTableAlias(leftTbAlias);
}

if (StringUtils.isEmpty(rightTableAlias)){
tableInfo.setRightTableAlias(rightTableName);
} else {
tableInfo.setRightTableAlias(rightTableAlias);
}
leftTbAlias = StringUtils.isEmpty(leftTbAlias) ? leftTbName : leftTbAlias;
rightTableAlias = StringUtils.isEmpty(rightTableAlias) ? rightTableName : rightTableAlias;

tableInfo.setLeftIsSideTable(leftIsSide);
tableInfo.setLeftTableAlias(leftTbAlias);
tableInfo.setRightTableAlias(rightTableAlias);
tableInfo.setRightIsSideTable(rightIsSide);
tableInfo.setLeftNode(leftNode);
tableInfo.setRightNode(rightNode);
tableInfo.setJoinType(joinType);

tableInfo.setCondition(joinNode.getCondition());
TableUtils.replaceJoinFieldRefTableName(joinNode.getCondition(), fieldRef);

@@ -216,6 +203,15 @@ public void extractJoinNeedSelectField(SqlNode leftNode,
tableInfo.setRightSelectFieldInfo(rightTbSelectField);
}

/**
* Fields in the SELECT list and the WHERE clause that the given node references
* @param sqlNode
* @param parentWhere
* @param parentSelectList
* @param tableRef
* @param joinFieldSet
* @return
*/
public Set<String> extractField(SqlNode sqlNode,
SqlNode parentWhere,
SqlNodeList parentSelectList,
@@ -250,8 +246,7 @@ private JoinInfo dealNestJoin(SqlJoin joinNode,
Set<Tuple2<String, String>> joinFieldSet,
Map<String, String> tableRef,
Map<String, String> fieldRef,
SqlNodeList parentSelectList,
HashBiMap<String, String> fieldReplaceRef){
SqlNodeList parentSelectList){

SqlJoin leftJoinNode = (SqlJoin) joinNode.getLeft();
SqlNode parentRightJoinNode = joinNode.getRight();
@@ -267,7 +262,7 @@ private JoinInfo dealNestJoin(SqlJoin joinNode,
SqlBasicCall buildAs = TableUtils.buildAsNodeByJoinInfo(joinInfo, null, null);

if(rightIsSide){
addSideInfoToExeQueue(queueInfo, joinInfo, joinNode, parentSelectList, parentWhere, fieldReplaceRef, tableRef);
addSideInfoToExeQueue(queueInfo, joinInfo, joinNode, parentSelectList, parentWhere, tableRef);
}

SqlNode newLeftNode = joinNode.getLeft();
@@ -280,7 +275,7 @@ private JoinInfo dealNestJoin(SqlJoin joinNode,

//replace leftNode with the new sub-query
joinNode.setLeft(buildAs);
replaceSelectAndWhereField(buildAs, leftJoinNode, tableRef, parentSelectList, parentWhere, fieldReplaceRef);
replaceSelectAndWhereField(buildAs, leftJoinNode, tableRef, parentSelectList, parentWhere);
}

return joinInfo;
@@ -294,15 +289,13 @@ private JoinInfo dealNestJoin(SqlJoin joinNode,
* @param joinNode
* @param parentSelectList
* @param parentWhere
* @param fieldReplaceRef
* @param tableRef
*/
public void addSideInfoToExeQueue(Queue<Object> queueInfo,
JoinInfo joinInfo,
SqlJoin joinNode,
SqlNodeList parentSelectList,
SqlNode parentWhere,
HashBiMap<String, String> fieldReplaceRef,
Map<String, String> tableRef){
//only handle side (dimension) tables
if(!joinInfo.isRightIsSideTable()){
@@ -315,7 +308,7 @@ public void addSideInfoToExeQueue(Queue<Object> queueInfo,
//replace the left table with the new table name
joinNode.setLeft(buildAs);

replaceSelectAndWhereField(buildAs, leftJoinNode, tableRef, parentSelectList, parentWhere, fieldReplaceRef);
replaceSelectAndWhereField(buildAs, leftJoinNode, tableRef, parentSelectList, parentWhere);
}

/**
@@ -325,14 +318,12 @@ public void addSideInfoToExeQueue(Queue<Object> queueInfo,
* @param tableRef
* @param parentSelectList
* @param parentWhere
* @param fieldReplaceRef
*/
public void replaceSelectAndWhereField(SqlBasicCall buildAs,
SqlNode leftJoinNode,
Map<String, String> tableRef,
SqlNodeList parentSelectList,
SqlNode parentWhere,
HashBiMap<String, String> fieldReplaceRef){
SqlNode parentWhere){

String newLeftTableName = buildAs.getOperands()[1].toString();
Set<String> fromTableNameSet = Sets.newHashSet();
@@ -343,6 +334,7 @@ public void replaceSelectAndWhereField(SqlBasicCall buildAs,
}

//replace the corresponding fields in the select list
HashBiMap<String, String> fieldReplaceRef = HashBiMap.create();
for(SqlNode sqlNode : parentSelectList.getList()){
for(String tbTmp : fromTableNameSet) {
TableUtils.replaceSelectFieldTable(sqlNode, tbTmp, newLeftTableName, fieldReplaceRef);
73 changes: 12 additions & 61 deletions core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java
@@ -90,48 +90,16 @@ public Queue<Object> getExeQueue(String exeSql, Set<String> sideTableSet) throws
return queueInfo;
}

private void checkAndReplaceMultiJoin(SqlNode sqlNode, Set<String> sideTableSet) {
SqlKind sqlKind = sqlNode.getKind();
switch (sqlKind) {
case WITH: {
SqlWith sqlWith = (SqlWith) sqlNode;
SqlNodeList sqlNodeList = sqlWith.withList;
for (SqlNode withAsTable : sqlNodeList) {
SqlWithItem sqlWithItem = (SqlWithItem) withAsTable;
checkAndReplaceMultiJoin(sqlWithItem.query, sideTableSet);
}
checkAndReplaceMultiJoin(sqlWith.body, sideTableSet);
break;
}
case INSERT:
SqlNode sqlSource = ((SqlInsert) sqlNode).getSource();
checkAndReplaceMultiJoin(sqlSource, sideTableSet);
break;
case SELECT:
SqlNode sqlFrom = ((SqlSelect) sqlNode).getFrom();
if (sqlFrom.getKind() != IDENTIFIER) {
checkAndReplaceMultiJoin(sqlFrom, sideTableSet);
}
break;
case JOIN:
convertSideJoinToNewQuery((SqlJoin) sqlNode, sideTableSet);
break;
case AS:
SqlNode info = ((SqlBasicCall) sqlNode).getOperands()[0];
if (info.getKind() != IDENTIFIER) {
checkAndReplaceMultiJoin(info, sideTableSet);
}
break;
case UNION:
SqlNode unionLeft = ((SqlBasicCall) sqlNode).getOperands()[0];
SqlNode unionRight = ((SqlBasicCall) sqlNode).getOperands()[1];
checkAndReplaceMultiJoin(unionLeft, sideTableSet);
checkAndReplaceMultiJoin(unionRight, sideTableSet);
break;
}
}


/**
* Parse the SQL and reassemble a new SQL statement according to the side-table join relationships
* @param sqlNode
* @param sideTableSet
* @param queueInfo
* @param parentWhere
* @param parentSelectList
* @return
*/
public Object parseSql(SqlNode sqlNode, Set<String> sideTableSet, Queue<Object> queueInfo, SqlNode parentWhere, SqlNodeList parentSelectList){
SqlKind sqlKind = sqlNode.getKind();
switch (sqlKind){
@@ -175,7 +143,9 @@ public Object parseSql(SqlNode sqlNode, Set<String> sideTableSet, Queue<Object>
JoinNodeDealer joinNodeDealer = new JoinNodeDealer(this);
Set<Tuple2<String, String>> joinFieldSet = Sets.newHashSet();
Map<String, String> tableRef = Maps.newHashMap();
return joinNodeDealer.dealJoinNode((SqlJoin) sqlNode, sideTableSet, queueInfo, parentWhere, parentSelectList, joinFieldSet, tableRef);
Map<String, String> fieldRef = Maps.newHashMap();
return joinNodeDealer.dealJoinNode((SqlJoin) sqlNode, sideTableSet, queueInfo,
parentWhere, parentSelectList, joinFieldSet, tableRef, fieldRef);
case AS:
SqlNode info = ((SqlBasicCall)sqlNode).getOperands()[0];
SqlNode alias = ((SqlBasicCall) sqlNode).getOperands()[1];
@@ -207,26 +177,7 @@ public Object parseSql(SqlNode sqlNode, Set<String> sideTableSet, Queue<Object>
return "";
}

private AliasInfo getSqlNodeAliasInfo(SqlNode sqlNode) {
SqlNode info = ((SqlBasicCall) sqlNode).getOperands()[0];
SqlNode alias = ((SqlBasicCall) sqlNode).getOperands()[1];
String infoStr = info.getKind() == IDENTIFIER ? info.toString() : null;

AliasInfo aliasInfo = new AliasInfo();
aliasInfo.setName(infoStr);
aliasInfo.setAlias(alias.toString());
return aliasInfo;
}

/**
* Replace a join involving a side table with a new query
* @param sqlNode
* @param sideTableSet
*/
private void convertSideJoinToNewQuery(SqlJoin sqlNode, Set<String> sideTableSet) {
checkAndReplaceMultiJoin(sqlNode.getLeft(), sideTableSet);
checkAndReplaceMultiJoin(sqlNode.getRight(), sideTableSet);
}


public void setLocalTableCache(Map<String, Table> localTableCache) {
68 changes: 6 additions & 62 deletions core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java
@@ -84,8 +84,12 @@ public class SideSqlExec {

private Map<String, Table> localTableCache = Maps.newHashMap();

public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTableEnvironment tableEnv,
Map<String, Table> tableCache, StreamQueryConfig queryConfig, CreateTmpTableParser.SqlParserResult createView) throws Exception {
public void exec(String sql,
Map<String, SideTableInfo> sideTableMap,
StreamTableEnvironment tableEnv,
Map<String, Table> tableCache,
StreamQueryConfig queryConfig,
CreateTmpTableParser.SqlParserResult createView) throws Exception {
if(localSqlPluginPath == null){
throw new RuntimeException("need to set localSqlPluginPath");
}
@@ -192,66 +196,6 @@ private FieldReplaceInfo parseAsQuery(SqlBasicCall asSqlNode, Map<String, Table>
}


/**
* Add aliases for fields
* @param pollSqlNode
* @param fieldList
* @param mappingTable
*/
private void addAliasForFieldNode(SqlNode pollSqlNode, List<String> fieldList, HashBasedTable<String, String, String> mappingTable) {
SqlKind sqlKind = pollSqlNode.getKind();
switch (sqlKind) {
case INSERT:
SqlNode source = ((SqlInsert) pollSqlNode).getSource();
addAliasForFieldNode(source, fieldList, mappingTable);
break;
case AS:
addAliasForFieldNode(((SqlBasicCall) pollSqlNode).getOperands()[0], fieldList, mappingTable);
break;
case SELECT:
SqlNodeList selectList = ((SqlSelect) pollSqlNode).getSelectList();
selectList.getList().forEach(node -> {
if (node.getKind() == IDENTIFIER) {
SqlIdentifier sqlIdentifier = (SqlIdentifier) node;
if (sqlIdentifier.names.size() == 1) {
return;
}
// save real field
String fieldName = sqlIdentifier.names.get(1);
if (!fieldName.endsWith("0") || fieldName.endsWith("0") && mappingTable.columnMap().containsKey(fieldName)) {
fieldList.add(fieldName);
}

}
});
for (int i = 0; i < selectList.getList().size(); i++) {
SqlNode node = selectList.get(i);
if (node.getKind() == IDENTIFIER) {
SqlIdentifier sqlIdentifier = (SqlIdentifier) node;
if (sqlIdentifier.names.size() == 1) {
return;
}
String name = sqlIdentifier.names.get(1);
// avoid real field pv0 convert pv
if (name.endsWith("0") && !fieldList.contains(name) && !fieldList.contains(name.substring(0, name.length() - 1))) {
SqlOperator operator = new SqlAsOperator();
SqlParserPos sqlParserPos = new SqlParserPos(0, 0);

SqlIdentifier sqlIdentifierAlias = new SqlIdentifier(name.substring(0, name.length() - 1), null, sqlParserPos);
SqlNode[] sqlNodes = new SqlNode[2];
sqlNodes[0] = sqlIdentifier;
sqlNodes[1] = sqlIdentifierAlias;
SqlBasicCall sqlBasicCall = new SqlBasicCall(operator, sqlNodes, sqlParserPos);

selectList.set(i, sqlBasicCall);
}
}
}
break;
}
}


public AliasInfo parseASNode(SqlNode sqlNode) throws SqlParseException {
SqlKind sqlKind = sqlNode.getKind();
if(sqlKind != AS){