Skip to content

Commit

Permalink
fix conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
todd5167 committed Apr 8, 2020
1 parent b43aec0 commit 8e4196a
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@

import com.aiweiergou.tool.logger.api.ChangeLogLevelProcess;
import com.dtstack.flink.sql.parser.CreateFuncParser;
import com.dtstack.flink.sql.parser.CreateTmpTableParser;
import com.dtstack.flink.sql.parser.FlinkPlanner;
import com.dtstack.flink.sql.parser.InsertSqlParser;
import com.dtstack.flink.sql.parser.SqlParser;
import com.dtstack.flink.sql.parser.SqlTree;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
Expand Down Expand Up @@ -63,17 +68,6 @@
import org.apache.calcite.sql.SqlNode;
import org.apache.commons.io.Charsets;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.Set;

import static org.apache.calcite.sql.SqlKind.IDENTIFIER;
import static org.apache.calcite.sql.SqlKind.LITERAL;

/**
* Parses SQL and extracts execution information for dimension (side) tables.
Expand Down
30 changes: 9 additions & 21 deletions core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,31 +47,20 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.calcite.sql.SqlAsOperator;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlJoin;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlOrderBy;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.SqlWithItem;
import org.apache.calcite.sql.fun.SqlCase;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -168,8 +157,7 @@ public void exec(String sql,

}else if (pollObj instanceof JoinInfo){
LOG.info("----------exec join info----------\n{}", pollObj.toString());
preIsSideJoin = true;
joinFun(pollObj, localTableCache, sideTableMap, tableEnv, replaceInfoList);
joinFun(pollObj, localTableCache, sideTableMap, tableEnv);
}
}

Expand Down Expand Up @@ -295,23 +283,23 @@ private Table getTableFromCache(Map<String, Table> localTableCache, String table
*
* @return
*/
private boolean checkJoinCondition(SqlNode conditionNode, String sideTableAlias, SideTableInfo sideTableInfo) {
/**
 * Checks whether the join ON condition covers exactly the side (dimension)
 * table's primary key fields.
 *
 * @param conditionNode  parsed ON condition of the join
 * @param sideTableAlias alias of the side table as used in the query
 * @param sideTableInfo  side table metadata, including its primary keys
 * @return true if the equality fields extracted from the condition are the same
 *         collection as the side table's primary keys (mapped to physical names)
 */
private boolean checkJoinCondition(SqlNode conditionNode, String sideTableAlias, AbstractSideTableInfo sideTableInfo) {
    List<String> conditionFields = getConditionFields(conditionNode, sideTableAlias, sideTableInfo);
    // Return the comparison result directly instead of the redundant
    // if (...) { return true; } return false; pattern.
    return CollectionUtils.isEqualCollection(conditionFields, convertPrimaryAlias(sideTableInfo));
}

private List<String> convertPrimaryAlias(SideTableInfo sideTableInfo) {
/**
 * Maps each primary key of the side table to its physical column name.
 * Keys with no entry in the physical-fields mapping are kept unchanged.
 *
 * @param sideTableInfo side table metadata providing primary keys and the
 *                      logical-to-physical field mapping
 * @return physical names of the side table's primary key fields
 */
private List<String> convertPrimaryAlias(AbstractSideTableInfo sideTableInfo) {
    List<String> physicalKeys = Lists.newArrayList();
    for (String primaryKey : sideTableInfo.getPrimaryKeys()) {
        physicalKeys.add(sideTableInfo.getPhysicalFields().getOrDefault(primaryKey, primaryKey));
    }
    return physicalKeys;
}

public List<String> getConditionFields(SqlNode conditionNode, String specifyTableName, SideTableInfo sideTableInfo){
public List<String> getConditionFields(SqlNode conditionNode, String specifyTableName, AbstractSideTableInfo sideTableInfo){
List<SqlNode> sqlNodeList = Lists.newArrayList();
ParseUtils.parseAnd(conditionNode, sqlNodeList);
List<String> conditionFields = Lists.newArrayList();
Expand Down Expand Up @@ -370,7 +358,7 @@ protected void dealAsSourceTable(StreamTableEnvironment tableEnv,

private void joinFun(Object pollObj,
Map<String, Table> localTableCache,
Map<String, SideTableInfo> sideTableMap,
Map<String, AbstractSideTableInfo> sideTableMap,
StreamTableEnvironment tableEnv) throws Exception{
JoinInfo joinInfo = (JoinInfo) pollObj;

Expand All @@ -386,7 +374,7 @@ private void joinFun(Object pollObj,
JoinScope.ScopeChild rightScopeChild = new JoinScope.ScopeChild();
rightScopeChild.setAlias(joinInfo.getRightTableAlias());
rightScopeChild.setTableName(joinInfo.getRightTableName());
SideTableInfo sideTableInfo = sideTableMap.get(joinInfo.getRightTableName());
AbstractSideTableInfo sideTableInfo = sideTableMap.get(joinInfo.getRightTableName());
if(sideTableInfo == null){
sideTableInfo = sideTableMap.get(joinInfo.getRightTableAlias());
}
Expand All @@ -395,9 +383,9 @@ private void joinFun(Object pollObj,
throw new RuntimeException("can't not find side table:" + joinInfo.getRightTableName());
}

// if(!checkJoinCondition(joinInfo.getCondition(), joinInfo.getRightTableAlias(), sideTableInfo)){
// throw new RuntimeException("ON condition must contain all equal fields!!!");
// }
if(!checkJoinCondition(joinInfo.getCondition(), joinInfo.getRightTableAlias(), sideTableInfo)){
throw new RuntimeException("ON condition must contain all equal fields!!!");
}

rightScopeChild.setRowTypeInfo(sideTableInfo.getRowTypeInfo());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,12 @@ public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo){


Class fieldClass = null;
TableInfo.FieldExtraInfo fieldExtraInfo = null;
AbstractTableInfo.FieldExtraInfo fieldExtraInfo = null;

Matcher matcher = charTypePattern.matcher(fieldType);
if (matcher.find()) {
fieldClass = dbTypeConvertToJavaType(CHAR_TYPE_NO_LENGTH);
fieldExtraInfo = new TableInfo.FieldExtraInfo();
fieldExtraInfo = new AbstractTableInfo.FieldExtraInfo();
fieldExtraInfo.setLength(Integer.valueOf(matcher.group(1)));
} else {
fieldClass = dbTypeConvertToJavaType(fieldType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncSideInfo;
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
import com.dtstack.flink.sql.table.TableInfo;
import com.dtstack.flink.sql.table.AbstractTableInfo;
import com.dtstack.flink.sql.util.DtStringUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
Expand Down Expand Up @@ -56,7 +56,7 @@ public String wrapperPlaceholder(String fieldName) {
String rpadFormat = "rpad(?, %d, ' ')";

if (StringUtils.contains(type.toLowerCase(), "char")) {
TableInfo.FieldExtraInfo fieldExtraInfo = sideTableInfo.getFieldExtraInfoList().get(pos);
AbstractTableInfo.FieldExtraInfo fieldExtraInfo = sideTableInfo.getFieldExtraInfoList().get(pos);
int charLength = fieldExtraInfo == null ? 0 : fieldExtraInfo.getLength();
if (charLength > 0) {
return String.format(rpadFormat, charLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package com.dtstack.flink.sql.sink.oracle;

import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
import com.dtstack.flink.sql.table.TableInfo;
import com.dtstack.flink.sql.table.AbstractTableInfo;
import com.dtstack.flink.sql.util.DtStringUtil;
import org.apache.commons.lang3.StringUtils;

Expand All @@ -41,7 +41,7 @@ public class OracleDialect implements JDBCDialect {

private List<String> fieldList;
private List<String> fieldTypeList;
private List<TableInfo.FieldExtraInfo> fieldExtraInfoList;
private List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfoList;

@Override
public boolean canHandle(String url) {
Expand Down Expand Up @@ -133,7 +133,7 @@ public String wrapperPlaceholder(String fieldName) {
String type = fieldTypeList.get(pos);

if (StringUtils.contains(type.toLowerCase(), DEAL_CHAR_KEY)) {
TableInfo.FieldExtraInfo fieldExtraInfo = fieldExtraInfoList.get(pos);
AbstractTableInfo.FieldExtraInfo fieldExtraInfo = fieldExtraInfoList.get(pos);
int charLength = fieldExtraInfo == null ? 0 : fieldExtraInfo.getLength();
if (charLength > 0) {
return String.format(RPAD_FORMAT, charLength);
Expand All @@ -151,7 +151,7 @@ public void setFieldTypeList(List<String> fieldTypeList) {
this.fieldTypeList = fieldTypeList;
}

public void setFieldExtraInfoList(List<TableInfo.FieldExtraInfo> fieldExtraInfoList) {
public void setFieldExtraInfoList(List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfoList) {
this.fieldExtraInfoList = fieldExtraInfoList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@
import com.dtstack.flink.sql.sink.IStreamSinkGener;
import com.dtstack.flink.sql.sink.rdb.format.JDBCUpsertOutputFormat;
import com.dtstack.flink.sql.sink.rdb.table.RdbTableInfo;
import com.dtstack.flink.sql.table.AbstractTableInfo;
import com.dtstack.flink.sql.table.AbstractTargetTableInfo;
import com.dtstack.flink.sql.table.TableInfo;
import com.dtstack.flink.sql.table.TargetTableInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
Expand Down Expand Up @@ -84,20 +83,20 @@ public abstract class AbstractRdbSink implements RetractStreamTableSink<Row>, Se

public List<String> fieldList;
public List<String> fieldTypeList;
public List<TableInfo.FieldExtraInfo> fieldExtraInfoList;
public List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfoList;

public AbstractRdbSink(JDBCDialect jdbcDialect) {
this.jdbcDialect = jdbcDialect;
}

@Override
public RdbSink genStreamSink(TargetTableInfo targetTableInfo) {
public AbstractRdbSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
RdbTableInfo rdbTableInfo = (RdbTableInfo) targetTableInfo;
this.batchNum = rdbTableInfo.getBatchSize() == null ? batchNum : rdbTableInfo.getBatchSize();
this.batchWaitInterval = rdbTableInfo.getBatchWaitInterval() == null ?
batchWaitInterval : rdbTableInfo.getBatchWaitInterval();
this.parallelism = rdbTableInfo.getParallelism() == null ? parallelism : rdbTableInfo.getParallelism();
this.dbURL = rdbTableInfo.getUrl();
this.dbUrl = rdbTableInfo.getUrl();
this.userName = rdbTableInfo.getUserName();
this.password = rdbTableInfo.getPassword();
this.tableName = rdbTableInfo.getTableName();
Expand Down

0 comments on commit 8e4196a

Please sign in to comment.