Skip to content

Commit

Permalink
Merge branch '1.8_release_3.9.x' into 1.8_release_3.10.x
Browse files Browse the repository at this point in the history
# Conflicts:
#	core/src/main/java/com/dtstack/flink/sql/Main.java
#	core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java
  • Loading branch information
xuchao committed Apr 3, 2020
2 parents 738a565 + 911637a commit f389ba1
Show file tree
Hide file tree
Showing 17 changed files with 1,215 additions and 740 deletions.
35 changes: 0 additions & 35 deletions core/src/main/java/com/dtstack/flink/sql/config/CalciteConfig.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@
package com.dtstack.flink.sql.parser;

import com.dtstack.flink.sql.util.DtStringUtil;
import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.*;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import com.google.common.collect.Lists;
import org.apache.flink.table.calcite.FlinkPlannerImpl;

import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -71,17 +70,12 @@ public void parseSql(String sql, SqlTree sqlTree) {
tableName = matcher.group(1);
selectSql = "select " + matcher.group(2);
}

SqlParser.Config config = SqlParser
.configBuilder()
.setLex(Lex.MYSQL)
.build();
SqlParser sqlParser = SqlParser.create(selectSql,config);
FlinkPlannerImpl flinkPlanner = FlinkPlanner.getFlinkPlanner();

SqlNode sqlNode = null;
try {
sqlNode = sqlParser.parseStmt();
} catch (SqlParseException e) {
sqlNode = flinkPlanner.parse(selectSql);
} catch (Exception e) {
throw new RuntimeException("", e);
}

Expand Down
52 changes: 52 additions & 0 deletions core/src/main/java/com/dtstack/flink/sql/parser/FlinkPlanner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dtstack.flink.sql.parser;

import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.flink.table.calcite.FlinkPlannerImpl;
import org.apache.flink.table.calcite.FlinkTypeFactory;

/**
* Date: 2020/3/31
* Company: www.dtstack.com
* @author maqi
*/
public class FlinkPlanner {

public static volatile FlinkPlannerImpl flinkPlanner;

private FlinkPlanner() {
}

public static FlinkPlannerImpl createFlinkPlanner(FrameworkConfig frameworkConfig, RelOptPlanner relOptPlanner, FlinkTypeFactory typeFactory) {
if (flinkPlanner == null) {
synchronized (FlinkPlanner.class) {
if (flinkPlanner == null) {
flinkPlanner = new FlinkPlannerImpl(frameworkConfig, relOptPlanner, typeFactory);
}
}
}
return flinkPlanner;
}

public static FlinkPlannerImpl getFlinkPlanner() {
return flinkPlanner;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public void parseSql(String sql, SqlTree sqlTree) {
.configBuilder()
.setLex(Lex.MYSQL)
.build();

SqlParser sqlParser = SqlParser.create(sql,config);
SqlNode sqlNode = null;
try {
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/java/com/dtstack/flink/sql/side/FieldInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,13 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(table, fieldName);
}

@Override
public String toString() {
return "FieldInfo{" +
"table='" + table + '\'' +
", fieldName='" + fieldName + '\'' +
", typeInformation=" + typeInformation +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.HashBasedTable;
import org.apache.commons.lang3.StringUtils;


/**
* 用于记录转换之后的表和原来表直接字段的关联关系
* Date: 2018/8/30
Expand Down Expand Up @@ -78,7 +79,7 @@ public void setTargetTableAlias(String targetTableAlias) {
* @param fieldName
* @return
*/
public String getTargetFieldName(String tableName, String fieldName){
public String getTargetFieldName(String tableName, String fieldName) {
String targetFieldName = mappingTable.get(tableName, fieldName);
if(StringUtils.isNotBlank(targetFieldName)){
return targetFieldName;
Expand Down
47 changes: 38 additions & 9 deletions core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

package com.dtstack.flink.sql.side;

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Maps;
import org.apache.calcite.sql.JoinType;
import org.apache.calcite.sql.SqlNode;
import com.google.common.base.Strings;
Expand All @@ -31,7 +33,6 @@
* Join信息
* Date: 2018/7/24
* Company: www.dtstack.com
*
* @author xuchao
*/

Expand All @@ -40,9 +41,7 @@ public class JoinInfo implements Serializable {
private static final long serialVersionUID = -1L;

//左表是否是维表
private boolean leftIsSideTable;

private boolean leftIsTmpTable = false;
private boolean leftIsSideTable = false;

//右表是否是维表
private boolean rightIsSideTable;
Expand All @@ -67,6 +66,16 @@ public class JoinInfo implements Serializable {

private JoinType joinType;

/**
* 左表需要查询的字段信息和output的时候对应的列名称
*/
private Map<String, String> leftSelectFieldInfo = Maps.newHashMap();

/**
* 右表需要查询的字段信息和output的时候对应的列名称
*/
private Map<String, String> rightSelectFieldInfo = Maps.newHashMap();

public String getSideTableName(){
if(leftIsSideTable){
return leftTableAlias;
Expand Down Expand Up @@ -195,19 +204,39 @@ public void setJoinType(JoinType joinType) {
this.joinType = joinType;
}

public boolean isLeftIsTmpTable() {
return leftIsTmpTable;
public Map<String, String> getLeftSelectFieldInfo() {
return leftSelectFieldInfo;
}

public void setLeftSelectFieldInfo(Map<String, String> leftSelectFieldInfo) {
this.leftSelectFieldInfo = leftSelectFieldInfo;
}

public void setLeftIsTmpTable(boolean leftIsTmpTable) {
this.leftIsTmpTable = leftIsTmpTable;
public Map<String, String> getRightSelectFieldInfo() {
return rightSelectFieldInfo;
}

public void setRightSelectFieldInfo(Map<String, String> rightSelectFieldInfo) {
this.rightSelectFieldInfo = rightSelectFieldInfo;
}

public HashBasedTable<String, String, String> getTableFieldRef(){
HashBasedTable<String, String, String> mappingTable = HashBasedTable.create();
getLeftSelectFieldInfo().forEach((key, value) -> {
mappingTable.put(getLeftTableAlias(), key, value);
});

getRightSelectFieldInfo().forEach((key, value) -> {
mappingTable.put(getRightTableAlias(), key, value);
});

return mappingTable;
}

@Override
public String toString() {
return "JoinInfo{" +
"leftIsSideTable=" + leftIsSideTable +
", leftIsTmpTable=" + leftIsTmpTable +
", rightIsSideTable=" + rightIsSideTable +
", leftTableName='" + leftTableName + '\'' +
", leftTableAlias='" + leftTableAlias + '\'' +
Expand Down
Loading

0 comments on commit f389ba1

Please sign in to comment.