Skip to content

Commit

Permalink
deal oracle char type
Browse files Browse the repository at this point in the history
  • Loading branch information
todd5167 committed Mar 25, 2020
1 parent 0572368 commit 04028d5
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@
*/
public class OracleDialect implements JDBCDialect {

private final String SQL_DEFAULT_PLACEHOLDER = " ? ";
private final String DEAL_CHAR_KEY = "char";
private String RPAD_FORMAT = " rpad(?, %d, ' ') ";

private List<String> fieldList;
private List<String> fieldTypeList;
private List<TableInfo.FieldExtraInfo> fieldExtraInfoList;

@Override
public boolean canHandle(String url) {
return url.startsWith("jdbc:oracle:");
Expand All @@ -48,39 +56,50 @@ public Optional<String> defaultDriverName() {
@Override
public Optional<String> getUpsertStatement(String schema, String tableName, String[] fieldNames, String[] uniqueKeyFields, boolean allReplace) {
tableName = DtStringUtil.getTableFullPath(schema, tableName);
StringBuilder sb = new StringBuilder();
sb.append("MERGE INTO " + tableName + " T1 USING "
+ "(" + buildDualQueryStatement(fieldNames) + ") T2 ON ("
+ buildConnectionConditions(uniqueKeyFields) + ") ");
StringBuilder mergeIntoSql = new StringBuilder();
mergeIntoSql.append("MERGE INTO " + tableName + " T1 USING (")
.append(buildDualQueryStatement(fieldNames))
.append(") T2 ON (")
.append(buildConnectionConditions(uniqueKeyFields) + ") ");

String updateSql = buildUpdateConnection(fieldNames, uniqueKeyFields, allReplace);

if (StringUtils.isNotEmpty(updateSql)) {
sb.append(" WHEN MATCHED THEN UPDATE SET ");
sb.append(updateSql);
mergeIntoSql.append(" WHEN MATCHED THEN UPDATE SET ");
mergeIntoSql.append(updateSql);
}

sb.append(" WHEN NOT MATCHED THEN "
+ "INSERT (" + Arrays.stream(fieldNames).map(this::quoteIdentifier).collect(Collectors.joining(",")) + ") VALUES ("
+ Arrays.stream(fieldNames).map(col -> "T2." + quoteIdentifier(col)).collect(Collectors.joining(",")) + ")");
mergeIntoSql.append(" WHEN NOT MATCHED THEN ")
.append("INSERT (")
.append(Arrays.stream(fieldNames).map(this::quoteIdentifier).collect(Collectors.joining(",")))
.append(") VALUES (")
.append(Arrays.stream(fieldNames).map(col -> "T2." + quoteIdentifier(col)).collect(Collectors.joining(",")))
.append(")");

return Optional.of(sb.toString());
return Optional.of(mergeIntoSql.toString());
}

/**
* build T1."A"=T2."A" or T1."A"=nvl(T2."A",T1."A")
* build T1."A"=T2."A" or T1."A"=nvl(T2."A",T1."A")
* @param fieldNames
* @param uniqueKeyFields
* @param allReplace
* @return
*/
private String buildUpdateConnection(String[] fieldNames, String[] uniqueKeyFields, boolean allReplace) {
List<String> uniqueKeyList = Arrays.asList(uniqueKeyFields);
return Arrays.stream(fieldNames).filter(col -> !uniqueKeyList.contains(col)).map(col -> {
return allReplace ? quoteIdentifier("T1") + "." + quoteIdentifier(col) + " = " + quoteIdentifier("T2") + "." + quoteIdentifier(col) :
quoteIdentifier("T1") + "." + quoteIdentifier(col) + " =nvl(" + quoteIdentifier("T2") + "." + quoteIdentifier(col) + ","
+ quoteIdentifier("T1") + "." + quoteIdentifier(col) + ")";
}).collect(Collectors.joining(","));
String updateConnectionSql = Arrays.stream(fieldNames).
filter(col -> !uniqueKeyList.contains(col))
.map(col -> buildConnectionByAllReplace(allReplace, col))
.collect(Collectors.joining(","));
return updateConnectionSql;
}

private String buildConnectionByAllReplace(boolean allReplace, String col) {
String conncetionSql = allReplace ? quoteIdentifier("T1") + "." + quoteIdentifier(col) + " = " + quoteIdentifier("T2") + "." + quoteIdentifier(col) :
quoteIdentifier("T1") + "." + quoteIdentifier(col) + " =nvl(" + quoteIdentifier("T2") + "." + quoteIdentifier(col) + ","
+ quoteIdentifier("T1") + "." + quoteIdentifier(col) + ")";
return conncetionSql;
}


Expand All @@ -96,8 +115,43 @@ private String buildConnectionConditions(String[] uniqueKeyFields) {
*/
public String buildDualQueryStatement(String[] column) {
StringBuilder sb = new StringBuilder("SELECT ");
String collect = Arrays.stream(column).map(col -> " ? " + quoteIdentifier(col)).collect(Collectors.joining(", "));
String collect = Arrays.stream(column)
.map(col -> wrapperPlaceholder(col) + quoteIdentifier(col))
.collect(Collectors.joining(", "));
sb.append(collect).append(" FROM DUAL");
return sb.toString();
}


/**
* char type is wrapped with rpad
* @param fieldName
* @return
*/
public String wrapperPlaceholder(String fieldName) {
int pos = fieldList.indexOf(fieldName);
String type = fieldTypeList.get(pos);

if (StringUtils.contains(type.toLowerCase(), DEAL_CHAR_KEY)) {
TableInfo.FieldExtraInfo fieldExtraInfo = fieldExtraInfoList.get(pos);
int charLength = fieldExtraInfo == null ? 0 : fieldExtraInfo.getLength();
if (charLength > 0) {
return String.format(RPAD_FORMAT, charLength);
}
}
return SQL_DEFAULT_PLACEHOLDER;
}


public void setFieldList(List<String> fieldList) {
this.fieldList = fieldList;
}

public void setFieldTypeList(List<String> fieldTypeList) {
this.fieldTypeList = fieldTypeList;
}

public void setFieldExtraInfoList(List<TableInfo.FieldExtraInfo> fieldExtraInfoList) {
this.fieldExtraInfoList = fieldExtraInfoList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public OracleSink() {

@Override
public JDBCUpsertOutputFormat getOutputFormat() {
((OracleDialect) jdbcDialect).setFieldList(fieldList);
((OracleDialect) jdbcDialect).setFieldTypeList(fieldTypeList);
((OracleDialect) jdbcDialect).setFieldExtraInfoList(fieldExtraInfoList);

JDBCOptions jdbcOptions = JDBCOptions.builder()
.setDBUrl(dbURL)
.setDialect(jdbcDialect)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.dtstack.flink.sql.sink.IStreamSinkGener;
import com.dtstack.flink.sql.sink.rdb.format.JDBCUpsertOutputFormat;
import com.dtstack.flink.sql.sink.rdb.table.RdbTableInfo;
import com.dtstack.flink.sql.table.TableInfo;
import com.dtstack.flink.sql.table.TargetTableInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
Expand Down Expand Up @@ -83,6 +84,10 @@ public abstract class RdbSink implements RetractStreamTableSink<Row>, Serializab

protected String updateMode;

public List<String> fieldList;
public List<String> fieldTypeList;
public List<TableInfo.FieldExtraInfo> fieldExtraInfoList;

public RdbSink(JDBCDialect jdbcDialect) {
this.jdbcDialect = jdbcDialect;
}
Expand All @@ -106,7 +111,9 @@ public RdbSink genStreamSink(TargetTableInfo targetTableInfo) {
this.sqlTypes = JDBCTypeConvertUtils.buildSqlTypes(fieldTypeArray);
this.allReplace = rdbTableInfo.isAllReplace();
this.updateMode = rdbTableInfo.getUpdateMode();
rdbTableInfo.getFieldList();
this.fieldList = rdbTableInfo.getFieldList();
this.fieldTypeList = rdbTableInfo.getFieldTypeList();
this.fieldExtraInfoList = rdbTableInfo.getFieldExtraInfoList();
return this;
}

Expand Down

0 comments on commit 04028d5

Please sign in to comment.