Skip to content

Commit

Permalink
Merge branch 'hotfix_1.8_3.10.x_merged23442' into '1.8_release_3.10.x'
Browse files Browse the repository at this point in the history
oracle char type

See merge request dt-insight-engine/flinkStreamSQL!8
  • Loading branch information
todd5167 committed Mar 25, 2020
2 parents 6159bb9 + 04028d5 commit 0462e66
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 34 deletions.
18 changes: 16 additions & 2 deletions core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ public abstract class AbsTableParser {

private static final String PRIMARY_KEY = "primaryKey";
private static final String NEST_JSON_FIELD_KEY = "nestFieldKey";
private static final String CHAR_TYPE_NO_LENGTH = "CHAR";

private static Pattern primaryKeyPattern = Pattern.compile("(?i)PRIMARY\\s+KEY\\s*\\((.*)\\)");
private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
private static Pattern physicalFieldFunPattern = Pattern.compile("\\w+\\((\\w+)\\)$");
// Anchored CHAR(n) matcher. The "^" matters because this pattern is used with
// find(): without it, "VARCHAR(10)" / "NCHAR(10)" also match the trailing
// "CHAR(10)" and get mis-parsed as fixed-length CHAR. "\\d+" (was "\\d*")
// rejects "CHAR()" whose empty group would make Integer.valueOf("") throw
// NumberFormatException downstream.
private static Pattern charTypePattern = Pattern.compile("(?i)^CHAR\\((\\d+)\\)$");

private Map<String, Pattern> patternMap = Maps.newHashMap();

Expand Down Expand Up @@ -107,13 +109,25 @@ public void parseFieldsInfo(String fieldsInfo, TableInfo tableInfo){
System.arraycopy(filedInfoArr, 0, filedNameArr, 0, filedInfoArr.length - 1);
String fieldName = String.join(" ", filedNameArr);
String fieldType = filedInfoArr[filedInfoArr.length - 1 ].trim();
Class fieldClass = dbTypeConvertToJavaType(fieldType);


Class fieldClass = null;
TableInfo.FieldExtraInfo fieldExtraInfo = null;

Matcher matcher = charTypePattern.matcher(fieldType);
if (matcher.find()) {
fieldClass = dbTypeConvertToJavaType(CHAR_TYPE_NO_LENGTH);
fieldExtraInfo = new TableInfo.FieldExtraInfo();
fieldExtraInfo.setLength(Integer.valueOf(matcher.group(1)));
} else {
fieldClass = dbTypeConvertToJavaType(fieldType);
}

tableInfo.addPhysicalMappings(filedInfoArr[0],filedInfoArr[0]);
tableInfo.addField(fieldName);
tableInfo.addFieldClass(fieldClass);
tableInfo.addFieldType(fieldType);
tableInfo.addFieldExtraInfo(null);
tableInfo.addFieldExtraInfo(fieldExtraInfo);
}

tableInfo.finish();
Expand Down
20 changes: 20 additions & 0 deletions core/src/main/java/com/dtstack/flink/sql/table/TableInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,18 @@ public static class FieldExtraInfo implements Serializable {
* default false:allow field is null
*/
boolean notNull = false;
/**
* declared field length, e.g. 4 for CHAR(4)
*/
int length;

/** @return the declared field length (e.g. 4 for CHAR(4)); 0 when no length was parsed */
public int getLength() {
return length;
}

/** Sets the declared field length, e.g. the 4 in CHAR(4). */
public void setLength(int length) {
this.length = length;
}

public boolean getNotNull() {
return notNull;
Expand All @@ -202,5 +214,13 @@ public boolean getNotNull() {
/** Sets whether the field disallows null values (default false: null is allowed). */
public void setNotNull(boolean notNull) {
this.notNull = notNull;
}

/** Debug representation listing both extra-info attributes. */
@Override
public String toString() {
    return String.format("FieldExtraInfo{notNull=%s, length=%d}", notNull, length);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,10 @@
import com.dtstack.flink.sql.side.SideTableInfo;
import com.dtstack.flink.sql.side.rdb.async.RdbAsyncSideInfo;
import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
import com.dtstack.flink.sql.table.TableInfo;
import com.dtstack.flink.sql.util.DtStringUtil;
import com.dtstack.flink.sql.util.ParseUtils;
import org.apache.calcite.sql.SqlNode;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import com.google.common.collect.Lists;

import java.util.Arrays;
import java.util.List;


Expand All @@ -49,4 +46,21 @@ public String quoteIdentifier(String identifier) {
return "\"" + identifier + "\"";
}

/**
 * Returns the bind-variable fragment for one lookup-condition field. CHAR
 * columns with a known length are wrapped in rpad(?, n, ' ') so the bound
 * value is space-padded to the declared width (Oracle CHAR comparison
 * semantics); everything else uses the plain placeholder.
 *
 * @param fieldName side-table field name
 * @return SQL placeholder fragment
 */
@Override
public String wrapperPlaceholder(String fieldName) {
    String sqlDefaultPlaceholder = " ? ";
    String rpadFormat = "rpad(?, %d, ' ')";

    int pos = sideTableInfo.getFieldList().indexOf(fieldName);
    // Unknown field: fall back to the default placeholder instead of
    // letting get(-1) throw IndexOutOfBoundsException.
    if (pos < 0) {
        return sqlDefaultPlaceholder;
    }
    String type = sideTableInfo.getFieldTypeList().get(pos);

    // startsWith, not contains: "VARCHAR(10)"/"VARCHAR2(10)" also contain
    // "char" but must never be space-padded.
    if (StringUtils.startsWith(type.toLowerCase(), "char")) {
        TableInfo.FieldExtraInfo fieldExtraInfo = sideTableInfo.getFieldExtraInfoList().get(pos);
        int charLength = fieldExtraInfo == null ? 0 : fieldExtraInfo.getLength();
        if (charLength > 0) {
            return String.format(rpadFormat, charLength);
        }
    }
    return sqlDefaultPlaceholder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.dtstack.flink.sql.sink.oracle;

import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
import com.dtstack.flink.sql.table.TableInfo;
import com.dtstack.flink.sql.util.DtStringUtil;
import org.apache.commons.lang3.StringUtils;

Expand All @@ -34,6 +35,14 @@
*/
public class OracleDialect implements JDBCDialect {

private final String SQL_DEFAULT_PLACEHOLDER = " ? ";
private final String DEAL_CHAR_KEY = "char";
private String RPAD_FORMAT = " rpad(?, %d, ' ') ";

private List<String> fieldList;
private List<String> fieldTypeList;
private List<TableInfo.FieldExtraInfo> fieldExtraInfoList;

@Override
public boolean canHandle(String url) {
return url.startsWith("jdbc:oracle:");
Expand All @@ -47,39 +56,50 @@ public Optional<String> defaultDriverName() {
/**
 * Builds an Oracle MERGE (upsert) statement:
 * MERGE INTO t T1 USING (SELECT ? "a", ... FROM DUAL) T2 ON (keys)
 *   WHEN MATCHED THEN UPDATE SET ...  (omitted when every column is a key)
 *   WHEN NOT MATCHED THEN INSERT (...) VALUES (T2....)
 *
 * @param schema          schema name, joined with tableName
 * @param tableName       target table
 * @param fieldNames      all sink columns
 * @param uniqueKeyFields merge-key columns
 * @param allReplace      true to overwrite unconditionally on match
 * @return the MERGE statement
 */
@Override
public Optional<String> getUpsertStatement(String schema, String tableName, String[] fieldNames, String[] uniqueKeyFields, boolean allReplace) {
    tableName = DtStringUtil.getTableFullPath(schema, tableName);

    String insertColumns = Arrays.stream(fieldNames)
            .map(this::quoteIdentifier)
            .collect(Collectors.joining(","));
    String insertValues = Arrays.stream(fieldNames)
            .map(col -> "T2." + quoteIdentifier(col))
            .collect(Collectors.joining(","));

    StringBuilder mergeIntoSql = new StringBuilder()
            .append("MERGE INTO " + tableName + " T1 USING (")
            .append(buildDualQueryStatement(fieldNames))
            .append(") T2 ON (")
            .append(buildConnectionConditions(uniqueKeyFields) + ") ");

    String updateSql = buildUpdateConnection(fieldNames, uniqueKeyFields, allReplace);
    if (StringUtils.isNotEmpty(updateSql)) {
        mergeIntoSql.append(" WHEN MATCHED THEN UPDATE SET ");
        mergeIntoSql.append(updateSql);
    }

    mergeIntoSql.append(" WHEN NOT MATCHED THEN ")
            .append("INSERT (")
            .append(insertColumns)
            .append(") VALUES (")
            .append(insertValues)
            .append(")");

    return Optional.of(mergeIntoSql.toString());
}

/**
 * Builds the UPDATE SET assignment list: T1."A"=T2."A" when allReplace,
 * otherwise T1."A"=nvl(T2."A",T1."A") so a null incoming value keeps the
 * existing one. Merge-key columns are excluded.
 *
 * @param fieldNames      all sink columns
 * @param uniqueKeyFields merge-key columns to skip
 * @param allReplace      true to overwrite unconditionally
 * @return comma-joined assignments (empty when every column is a key)
 */
private String buildUpdateConnection(String[] fieldNames, String[] uniqueKeyFields, boolean allReplace) {
    List<String> uniqueKeyList = Arrays.asList(uniqueKeyFields);
    return Arrays.stream(fieldNames)
            .filter(col -> !uniqueKeyList.contains(col))
            .map(col -> buildConnectionByAllReplace(allReplace, col))
            .collect(Collectors.joining(","));
}

/**
 * One column's UPDATE assignment: plain overwrite when allReplace, otherwise
 * nvl(...) so a null incoming value preserves the current one.
 */
private String buildConnectionByAllReplace(boolean allReplace, String col) {
    String target = quoteIdentifier("T1") + "." + quoteIdentifier(col);
    String source = quoteIdentifier("T2") + "." + quoteIdentifier(col);
    if (allReplace) {
        return target + " = " + source;
    }
    return target + " =nvl(" + source + "," + target + ")";
}


Expand All @@ -95,8 +115,43 @@ private String buildConnectionConditions(String[] uniqueKeyFields) {
*/
public String buildDualQueryStatement(String[] column) {
    // SELECT <placeholder> "col", ... FROM DUAL — the placeholder may be
    // rpad(?, n, ' ') for CHAR columns (see wrapperPlaceholder).
    String selectList = Arrays.stream(column)
            .map(col -> wrapperPlaceholder(col) + quoteIdentifier(col))
            .collect(Collectors.joining(", "));
    return "SELECT " + selectList + " FROM DUAL";
}


/**
 * Returns the bind-variable fragment for one sink column. CHAR columns with a
 * known declared length are wrapped in rpad(?, n, ' ') so the bound value is
 * space-padded to the column width (Oracle CHAR comparison semantics); every
 * other column uses the plain placeholder.
 *
 * @param fieldName sink column name
 * @return SQL placeholder fragment
 */
public String wrapperPlaceholder(String fieldName) {
    int pos = fieldList.indexOf(fieldName);
    // Unknown column: use the default placeholder rather than letting
    // fieldTypeList.get(-1) throw IndexOutOfBoundsException.
    if (pos < 0) {
        return SQL_DEFAULT_PLACEHOLDER;
    }
    String type = fieldTypeList.get(pos);

    // startsWith, not contains: "VARCHAR(10)"/"VARCHAR2(10)" also contain
    // "char" but must never be space-padded.
    if (StringUtils.startsWith(type.toLowerCase(), DEAL_CHAR_KEY)) {
        TableInfo.FieldExtraInfo fieldExtraInfo = fieldExtraInfoList.get(pos);
        int charLength = fieldExtraInfo == null ? 0 : fieldExtraInfo.getLength();
        if (charLength > 0) {
            return String.format(RPAD_FORMAT, charLength);
        }
    }
    return SQL_DEFAULT_PLACEHOLDER;
}


/** Sink column names; injected by OracleSink before the output format is built. */
public void setFieldList(List<String> fieldList) {
this.fieldList = fieldList;
}

/** Sink column type strings (e.g. "CHAR(4)"), parallel to fieldList. */
public void setFieldTypeList(List<String> fieldTypeList) {
this.fieldTypeList = fieldTypeList;
}

/** Per-column extra info (length etc.), parallel to fieldList; entries may be null. */
public void setFieldExtraInfoList(List<TableInfo.FieldExtraInfo> fieldExtraInfoList) {
this.fieldExtraInfoList = fieldExtraInfoList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public OracleSink() {

@Override
public JDBCUpsertOutputFormat getOutputFormat() {
((OracleDialect) jdbcDialect).setFieldList(fieldList);
((OracleDialect) jdbcDialect).setFieldTypeList(fieldTypeList);
((OracleDialect) jdbcDialect).setFieldExtraInfoList(fieldExtraInfoList);

JDBCOptions jdbcOptions = JDBCOptions.builder()
.setDBUrl(dbURL)
.setDialect(jdbcDialect)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,26 @@ public String getAdditionalWhereClause() {

public String getSelectFromStatement(String tableName, List<String> selectFields, List<String> conditionFields, List<String> sqlJoinCompareOperate,
List<PredicateInfo> predicateInfoes) {
String fromClause = selectFields.stream().map(this::quoteIdentifier).collect(Collectors.joining(", "));
String whereClause = conditionFields.stream().map(f -> quoteIdentifier(f) + sqlJoinCompareOperate.get(conditionFields.indexOf(f)) + " ? ")
String fromClause = selectFields.stream()
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));

String whereClause = conditionFields.stream()
.map(f -> quoteIdentifier(f) + sqlJoinCompareOperate.get(conditionFields.indexOf(f)) + wrapperPlaceholder(f))
.collect(Collectors.joining(" AND "));

String predicateClause = predicateInfoes.stream()
.map(this::buildFilterCondition)
.collect(Collectors.joining(" AND "));
String predicateClause = predicateInfoes.stream().map(this::buildFilterCondition).collect(Collectors.joining(" AND "));

String sql = "SELECT " + fromClause + " FROM " + tableName + (conditionFields.size() > 0 ? " WHERE " + whereClause : "")
String dimQuerySql = "SELECT " + fromClause + " FROM " + tableName + (conditionFields.size() > 0 ? " WHERE " + whereClause : "")
+ (predicateInfoes.size() > 0 ? " AND " + predicateClause : "") + getAdditionalWhereClause();
return sql;

return dimQuerySql;
}

/**
 * Placeholder for one lookup-condition bind variable; the base implementation
 * is a plain " ? ". Dialect-specific subclasses may override it (e.g. the
 * Oracle side info wraps CHAR columns with rpad).
 */
public String wrapperPlaceholder(String fieldName) {
return " ? ";
}

public String buildFilterCondition(PredicateInfo info) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public boolean check() {
Preconditions.checkNotNull(tableName, "rdb of tableName is required");
Preconditions.checkNotNull(userName, "rdb of userName is required");
Preconditions.checkNotNull(password, "rdb of password is required");
Preconditions.checkArgument(getFieldList().size() == getFieldExtraInfoList().size(),
"fields and fieldExtraInfoList attributes must be the same length");
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import com.dtstack.flink.sql.sink.IStreamSinkGener;
import com.dtstack.flink.sql.sink.rdb.format.JDBCUpsertOutputFormat;
import com.dtstack.flink.sql.sink.rdb.table.RdbTableInfo;
import com.dtstack.flink.sql.table.TableInfo;
import com.dtstack.flink.sql.table.TargetTableInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
Expand All @@ -34,10 +34,6 @@
import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;

import java.io.Serializable;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.Arrays;
import java.util.List;

Expand Down Expand Up @@ -88,6 +84,10 @@ public abstract class RdbSink implements RetractStreamTableSink<Row>, Serializab

protected String updateMode;

public List<String> fieldList;
public List<String> fieldTypeList;
public List<TableInfo.FieldExtraInfo> fieldExtraInfoList;

public RdbSink(JDBCDialect jdbcDialect) {
this.jdbcDialect = jdbcDialect;
}
Expand All @@ -111,6 +111,9 @@ public RdbSink genStreamSink(TargetTableInfo targetTableInfo) {
this.sqlTypes = JDBCTypeConvertUtils.buildSqlTypes(fieldTypeArray);
this.allReplace = rdbTableInfo.isAllReplace();
this.updateMode = rdbTableInfo.getUpdateMode();
this.fieldList = rdbTableInfo.getFieldList();
this.fieldTypeList = rdbTableInfo.getFieldTypeList();
this.fieldExtraInfoList = rdbTableInfo.getFieldExtraInfoList();
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ public boolean check() {
}


Preconditions.checkArgument(getFieldList().size() == getFieldExtraInfoList().size(),
"fields and fieldExtraInfoList attributes must be the same length");
return true;
}

Expand Down

0 comments on commit 0462e66

Please sign in to comment.