From 93f9d79e6b22124128f603a76338aaf66b976142 Mon Sep 17 00:00:00 2001 From: maqi Date: Fri, 13 Mar 2020 20:15:15 +0800 Subject: [PATCH 1/5] deal char length --- .../flink/sql/table/AbsTableParser.java | 18 ++++++++++++-- .../dtstack/flink/sql/table/TableInfo.java | 20 ++++++++++++++++ .../sql/side/oracle/OracleAsyncSideInfo.java | 24 +++++++++++++++++++ .../sql/side/rdb/async/RdbAsyncSideInfo.java | 22 +++++++++++++---- 4 files changed, 77 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java b/core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java index 3f4d1217d..dfaf5decc 100644 --- a/core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java +++ b/core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java @@ -42,9 +42,11 @@ public abstract class AbsTableParser { private static final String PRIMARY_KEY = "primaryKey"; private static final String NEST_JSON_FIELD_KEY = "nestFieldKey"; + private static final String CHAR_TYPE_NO_LENGTH = "CHAR"; private static Pattern primaryKeyPattern = Pattern.compile("(?i)PRIMARY\\s+KEY\\s*\\((.*)\\)"); private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$"); + private static Pattern charTypePattern = Pattern.compile("(?i)CHAR\\((\\d*)\\)$"); private Map patternMap = Maps.newHashMap(); @@ -105,13 +107,25 @@ public void parseFieldsInfo(String fieldsInfo, TableInfo tableInfo){ System.arraycopy(filedInfoArr, 0, filedNameArr, 0, filedInfoArr.length - 1); String fieldName = String.join(" ", filedNameArr); String fieldType = filedInfoArr[filedInfoArr.length - 1 ].trim(); - Class fieldClass = dbTypeConvertToJavaType(fieldType); + + + Class fieldClass = null; + TableInfo.FieldExtraInfo fieldExtraInfo = null; + + Matcher matcher = charTypePattern.matcher(fieldType); + if (matcher.find()) { + fieldClass = dbTypeConvertToJavaType(CHAR_TYPE_NO_LENGTH); + fieldExtraInfo = new TableInfo.FieldExtraInfo(); + fieldExtraInfo.setLength(Integer.valueOf(matcher.group(1))); + } else { + fieldClass = dbTypeConvertToJavaType(fieldType); + } tableInfo.addPhysicalMappings(filedInfoArr[0],filedInfoArr[0]); tableInfo.addField(fieldName); tableInfo.addFieldClass(fieldClass); tableInfo.addFieldType(fieldType); - tableInfo.addFieldExtraInfo(null); + tableInfo.addFieldExtraInfo(fieldExtraInfo); } tableInfo.finish(); diff --git a/core/src/main/java/com/dtstack/flink/sql/table/TableInfo.java b/core/src/main/java/com/dtstack/flink/sql/table/TableInfo.java index 59e4fdd39..2fdc297a2 100644 --- a/core/src/main/java/com/dtstack/flink/sql/table/TableInfo.java +++ b/core/src/main/java/com/dtstack/flink/sql/table/TableInfo.java @@ -194,6 +194,18 @@ public static class FieldExtraInfo implements Serializable { * default false:allow field is null */ boolean notNull = false; + /** + * field length,eg.char(4) + */ + int length; + + public int getLength() { + return length; + } + + public void setLength(int length) { + this.length = length; + } public boolean getNotNull() { return notNull; @@ -202,5 +214,13 @@ public boolean getNotNull() { public void setNotNull(boolean notNull) { this.notNull = notNull; } + + @Override + public String toString() { + return "FieldExtraInfo{" + + "notNull=" + notNull + + ", length=" + length + + '}'; + } } } diff --git a/oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java b/oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java index b811cf783..82d5640ce 100644 --- a/oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java +++ b/oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java @@ -23,9 +23,12 @@ import com.dtstack.flink.sql.side.SideTableInfo; import com.dtstack.flink.sql.side.rdb.async.RdbAsyncSideInfo; import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo; +import com.dtstack.flink.sql.table.TableInfo; import com.dtstack.flink.sql.util.DtStringUtil; import com.dtstack.flink.sql.util.ParseUtils; +import com.mchange.lang.CharUtils; import org.apache.calcite.sql.SqlNode; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.java.typeutils.RowTypeInfo; import com.google.common.collect.Lists; @@ -35,6 +38,13 @@ public class OracleAsyncSideInfo extends RdbAsyncSideInfo { + private final static String SQL_DEFAULT_PLACEHOLDER = " ? "; + private final static String DEAL_CHAR_KEY = "char"; + + private static String rpadFormat = "rpad(?, %d, ' ')"; + + + public OracleAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List outFieldInfoList, SideTableInfo sideTableInfo) { super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo); } @@ -49,4 +59,18 @@ public String quoteIdentifier(String identifier) { return "\"" + identifier + "\""; } + @Override + public String wrapperPlaceholder(String fieldName) { + int pos = sideTableInfo.getFieldList().indexOf(fieldName); + String type = sideTableInfo.getFieldTypeList().get(pos); + + if (StringUtils.contains(type.toLowerCase(), DEAL_CHAR_KEY)) { + TableInfo.FieldExtraInfo fieldExtraInfo = sideTableInfo.getFieldExtraInfoList().get(pos); + int charLength = fieldExtraInfo.getLength(); + if (fieldExtraInfo.getLength() > 0) { + return String.format(rpadFormat, charLength); + } + } + return SQL_DEFAULT_PLACEHOLDER; + } } diff --git a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncSideInfo.java b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncSideInfo.java index 0fe3eb0b9..d853d39fb 100644 --- a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncSideInfo.java +++ b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncSideInfo.java @@ -136,14 +136,26 @@ public String getAdditionalWhereClause() { public String getSelectFromStatement(String tableName, List selectFields, List conditionFields, List sqlJoinCompareOperate, List predicateInfoes) { - String fromClause = selectFields.stream().map(this::quoteIdentifier).collect(Collectors.joining(", ")); - String whereClause = conditionFields.stream().map(f -> quoteIdentifier(f) + sqlJoinCompareOperate.get(conditionFields.indexOf(f)) + " ? ") + String fromClause = selectFields.stream() + .map(this::quoteIdentifier) + .collect(Collectors.joining(", ")); + + String whereClause = conditionFields.stream() + .map(f -> quoteIdentifier(f) + sqlJoinCompareOperate.get(conditionFields.indexOf(f)) + wrapperPlaceholder(f)) + .collect(Collectors.joining(" AND ")); + + String predicateClause = predicateInfoes.stream() + .map(this::buildFilterCondition) .collect(Collectors.joining(" AND ")); - String predicateClause = predicateInfoes.stream().map(this::buildFilterCondition).collect(Collectors.joining(" AND ")); - String sql = "SELECT " + fromClause + " FROM " + tableName + (conditionFields.size() > 0 ? " WHERE " + whereClause : "") + String dimQuerySql = "SELECT " + fromClause + " FROM " + tableName + (conditionFields.size() > 0 ? " WHERE " + whereClause : "") + (predicateInfoes.size() > 0 ? " AND " + predicateClause : "") + getAdditionalWhereClause(); - return sql; + + return dimQuerySql; + } + + public String wrapperPlaceholder(String fieldName) { + return " ? "; } public String buildFilterCondition(PredicateInfo info) { From 31e706862ec103594203084b7b4c1acd289b5f35 Mon Sep 17 00:00:00 2001 From: maqi Date: Mon, 16 Mar 2020 11:42:23 +0800 Subject: [PATCH 2/5] use rpad deal char --- .../sql/side/oracle/OracleAsyncSideInfo.java | 14 +++---- .../flink/sql/sink/oracle/OracleSink.java | 38 ++++++++++++++++--- .../dtstack/flink/sql/sink/rdb/RdbSink.java | 6 +-- 3 files changed, 41 insertions(+), 17 deletions(-) diff --git a/oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java b/oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java index 82d5640ce..5ff27c542 100644 --- a/oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java +++ b/oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java @@ -38,11 +38,9 @@ public class OracleAsyncSideInfo extends RdbAsyncSideInfo { - private final static String SQL_DEFAULT_PLACEHOLDER = " ? "; - private final static String DEAL_CHAR_KEY = "char"; - - private static String rpadFormat = "rpad(?, %d, ' ')"; - + private final String SQL_DEFAULT_PLACEHOLDER = " ? "; + private final String DEAL_CHAR_KEY = "char"; + private String RPAD_FORMAT = "rpad(?, %d, ' ')"; public OracleAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List outFieldInfoList, SideTableInfo sideTableInfo) { @@ -66,9 +64,9 @@ public String wrapperPlaceholder(String fieldName) { if (StringUtils.contains(type.toLowerCase(), DEAL_CHAR_KEY)) { TableInfo.FieldExtraInfo fieldExtraInfo = sideTableInfo.getFieldExtraInfoList().get(pos); - int charLength = fieldExtraInfo.getLength(); - if (fieldExtraInfo.getLength() > 0) { - return String.format(rpadFormat, charLength); + int charLength = fieldExtraInfo == null ? 0 : fieldExtraInfo.getLength(); + if (charLength > 0) { + return String.format(RPAD_FORMAT, charLength); } } return SQL_DEFAULT_PLACEHOLDER; diff --git a/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java b/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java index c30dc9c60..5a82fdf47 100644 --- a/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java +++ b/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java @@ -21,14 +21,17 @@ import com.dtstack.flink.sql.sink.rdb.RdbSink; import com.dtstack.flink.sql.sink.rdb.format.ExtendOutputFormat; import com.dtstack.flink.sql.sink.rdb.format.RetractJDBCOutputFormat; +import com.dtstack.flink.sql.table.TableInfo; import com.dtstack.flink.sql.util.DtStringUtil; import org.apache.commons.lang3.StringUtils; import com.google.common.collect.Lists; import java.util.ArrayList; +import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * Reason: @@ -40,6 +43,10 @@ public class OracleSink extends RdbSink implements IStreamSinkGener { private static final String ORACLE_DRIVER = "oracle.jdbc.driver.OracleDriver"; + private final String SQL_DEFAULT_PLACEHOLDER = " ? "; + private final String DEAL_CHAR_KEY = "char"; + private String RPAD_FORMAT = " rpad(?, %d, ' ') "; + @Override public String getDriverName() { return ORACLE_DRIVER; @@ -193,16 +200,35 @@ public String updateKeySql(Map> updateKey) { */ public String makeValues(List column) { StringBuilder sb = new StringBuilder("SELECT "); - for (int i = 0; i < column.size(); ++i) { - if (i != 0) { - sb.append(","); + String collect = column.stream() + .map(col -> wrapperPlaceholder(col) + DtStringUtil.addQuoteForStr(col)) + .collect(Collectors.joining(", ")); + + sb.append(collect).append(" FROM DUAL"); + return sb.toString(); + } + + /** + * char type is wrapped with rpad + * @param fieldName + * @return + */ + public String wrapperPlaceholder(String fieldName) { + int pos = rdbTableInfo.getFieldList().indexOf(fieldName); + String type = rdbTableInfo.getFieldTypeList().get(pos); + + if (StringUtils.contains(type.toLowerCase(), DEAL_CHAR_KEY)) { + TableInfo.FieldExtraInfo fieldExtraInfo = rdbTableInfo.getFieldExtraInfoList().get(pos); + int charLength = fieldExtraInfo == null ? 0 : fieldExtraInfo.getLength(); + if (charLength > 0) { + return String.format(RPAD_FORMAT, charLength); } - sb.append("? " + DtStringUtil.addQuoteForStr(column.get(i))); } - sb.append(" FROM DUAL"); - return sb.toString(); + return SQL_DEFAULT_PLACEHOLDER; } + + public boolean containsIgnoreCase(List l, String s) { Iterator it = l.iterator(); while (it.hasNext()) { diff --git a/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java b/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java index babff68f9..e689169ea 100644 --- a/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java +++ b/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java @@ -83,6 +83,8 @@ public abstract class RdbSink implements RetractStreamTableSink, Serializab private String schema; + protected RdbTableInfo rdbTableInfo; + public RichSinkFunction createJdbcSinkFunc() { if (driverName == null || dbURL == null || userName == null || password == null || sqlTypes == null || tableName == null) { @@ -111,8 +113,7 @@ public RichSinkFunction createJdbcSinkFunc() { @Override public RdbSink genStreamSink(TargetTableInfo targetTableInfo) { - RdbTableInfo rdbTableInfo = (RdbTableInfo) targetTableInfo; - + this.rdbTableInfo = (RdbTableInfo) targetTableInfo; String tmpDbURL = rdbTableInfo.getUrl(); String tmpUserName = rdbTableInfo.getUserName(); String tmpPassword = rdbTableInfo.getPassword(); @@ -263,7 +264,6 @@ public void setDbType(String dbType) { /** * sqlserver and oracle maybe implement * - * @param tableName * @param fieldNames * @param realIndexes * @return From 7c3f0a268d1fad776168ebb3b721653f7043ca97 Mon Sep 17 00:00:00 2001 From: maqi Date: Wed, 18 Mar 2020 18:21:06 +0800 Subject: [PATCH 3/5] field length check --- .../sql/side/oracle/OracleAsyncSideInfo.java | 20 ++++++------------- .../sql/side/rdb/table/RdbSideTableInfo.java | 2 ++ .../sql/sink/rdb/table/RdbTableInfo.java | 2 ++ 3 files changed, 10 insertions(+), 14 deletions(-) diff --git a/oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java b/oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java index 5ff27c542..8e801970f 100644 --- a/oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java +++ b/oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java @@ -25,24 +25,13 @@ import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo; import com.dtstack.flink.sql.table.TableInfo; import com.dtstack.flink.sql.util.DtStringUtil; -import com.dtstack.flink.sql.util.ParseUtils; -import com.mchange.lang.CharUtils; -import org.apache.calcite.sql.SqlNode; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.java.typeutils.RowTypeInfo; -import com.google.common.collect.Lists; - -import java.util.Arrays; import java.util.List; public class OracleAsyncSideInfo extends RdbAsyncSideInfo { - private final String SQL_DEFAULT_PLACEHOLDER = " ? "; - private final String DEAL_CHAR_KEY = "char"; - private String RPAD_FORMAT = "rpad(?, %d, ' ')"; - - public OracleAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List outFieldInfoList, SideTableInfo sideTableInfo) { super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo); } @@ -62,13 +51,16 @@ public String wrapperPlaceholder(String fieldName) { int pos = sideTableInfo.getFieldList().indexOf(fieldName); String type = sideTableInfo.getFieldTypeList().get(pos); - if (StringUtils.contains(type.toLowerCase(), DEAL_CHAR_KEY)) { + String sqlDefaultPlaceholder = " ? "; + String rpadFormat = "rpad(?, %d, ' ')"; + + if (StringUtils.contains(type.toLowerCase(), "char")) { TableInfo.FieldExtraInfo fieldExtraInfo = sideTableInfo.getFieldExtraInfoList().get(pos); int charLength = fieldExtraInfo == null ? 0 : fieldExtraInfo.getLength(); if (charLength > 0) { - return String.format(RPAD_FORMAT, charLength); + return String.format(rpadFormat, charLength); } } - return SQL_DEFAULT_PLACEHOLDER; + return sqlDefaultPlaceholder; } } diff --git a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/table/RdbSideTableInfo.java b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/table/RdbSideTableInfo.java index 0dfbef325..2c8c18fcd 100644 --- a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/table/RdbSideTableInfo.java +++ b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/table/RdbSideTableInfo.java @@ -47,6 +47,8 @@ public boolean check() { Preconditions.checkNotNull(tableName, "rdb of tableName is required"); Preconditions.checkNotNull(userName, "rdb of userName is required"); Preconditions.checkNotNull(password, "rdb of password is required"); + Preconditions.checkArgument(getFieldList().size() == getFieldExtraInfoList().size(), + "fields and fieldExtraInfoList attributes must be the same length"); return true; } diff --git a/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/table/RdbTableInfo.java b/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/table/RdbTableInfo.java index d43104fcb..c9dd3f5d5 100644 --- a/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/table/RdbTableInfo.java +++ b/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/table/RdbTableInfo.java @@ -144,6 +144,8 @@ public boolean check() { Preconditions.checkNotNull(tableName, "rdb field of tableName is required"); Preconditions.checkNotNull(userName, "rdb field of userName is required"); Preconditions.checkNotNull(password, "rdb field of password is required"); + Preconditions.checkArgument(getFieldList().size() == getFieldExtraInfoList().size(), + "fields and fieldExtraInfoList attributes must be the same length"); return true; } From 05723687299f0c00b1ef33dec7179e9e10512a3a Mon Sep 17 00:00:00 2001 From: maqi Date: Tue, 24 Mar 2020 10:43:32 +0800 Subject: [PATCH 4/5] oracle char type --- .../java/com/dtstack/flink/sql/sink/oracle/OracleDialect.java | 1 + .../java/com/dtstack/flink/sql/sink/oracle/OracleSink.java | 4 ---- .../src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java | 1 + 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleDialect.java b/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleDialect.java index d51e33056..bc248f8b7 100644 --- a/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleDialect.java +++ b/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleDialect.java @@ -19,6 +19,7 @@ package com.dtstack.flink.sql.sink.oracle; import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect; +import com.dtstack.flink.sql.table.TableInfo; import com.dtstack.flink.sql.util.DtStringUtil; import org.apache.commons.lang3.StringUtils; diff --git a/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java b/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java index db526ce79..ee0239a1f 100644 --- a/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java +++ b/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java @@ -31,10 +31,6 @@ */ public class OracleSink extends RdbSink implements IStreamSinkGener { - private final String SQL_DEFAULT_PLACEHOLDER = " ? "; - private final String DEAL_CHAR_KEY = "char"; - private String RPAD_FORMAT = " rpad(?, %d, ' ') "; - public OracleSink() { super(new OracleDialect()); } diff --git a/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java b/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java index a03bc81b6..ddd929d2c 100644 --- a/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java +++ b/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java @@ -106,6 +106,7 @@ public RdbSink genStreamSink(TargetTableInfo targetTableInfo) { this.sqlTypes = JDBCTypeConvertUtils.buildSqlTypes(fieldTypeArray); this.allReplace = rdbTableInfo.isAllReplace(); this.updateMode = rdbTableInfo.getUpdateMode(); + rdbTableInfo.getFieldList(); return this; } From 04028d5914fb245301f4faec51cb4fb4a2131783 Mon Sep 17 00:00:00 2001 From: maqi Date: Wed, 25 Mar 2020 12:38:45 +0800 Subject: [PATCH 5/5] deal oracle char type --- .../flink/sql/sink/oracle/OracleDialect.java | 88 +++++++++++++++---- .../flink/sql/sink/oracle/OracleSink.java | 4 + .../dtstack/flink/sql/sink/rdb/RdbSink.java | 9 +- 3 files changed, 83 insertions(+), 18 deletions(-) diff --git a/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleDialect.java b/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleDialect.java index bc248f8b7..a687aa012 100644 --- a/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleDialect.java +++ b/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleDialect.java @@ -35,6 +35,14 @@ */ public class OracleDialect implements JDBCDialect { + private final String SQL_DEFAULT_PLACEHOLDER = " ? "; + private final String DEAL_CHAR_KEY = "char"; + private String RPAD_FORMAT = " rpad(?, %d, ' ') "; + + private List fieldList; + private List fieldTypeList; + private List fieldExtraInfoList; + @Override public boolean canHandle(String url) { return url.startsWith("jdbc:oracle:"); @@ -48,27 +56,31 @@ public Optional defaultDriverName() { @Override public Optional getUpsertStatement(String schema, String tableName, String[] fieldNames, String[] uniqueKeyFields, boolean allReplace) { tableName = DtStringUtil.getTableFullPath(schema, tableName); - StringBuilder sb = new StringBuilder(); - sb.append("MERGE INTO " + tableName + " T1 USING " - + "(" + buildDualQueryStatement(fieldNames) + ") T2 ON (" - + buildConnectionConditions(uniqueKeyFields) + ") "); + StringBuilder mergeIntoSql = new StringBuilder(); + mergeIntoSql.append("MERGE INTO " + tableName + " T1 USING (") + .append(buildDualQueryStatement(fieldNames)) + .append(") T2 ON (") + .append(buildConnectionConditions(uniqueKeyFields) + ") "); String updateSql = buildUpdateConnection(fieldNames, uniqueKeyFields, allReplace); if (StringUtils.isNotEmpty(updateSql)) { - sb.append(" WHEN MATCHED THEN UPDATE SET "); - sb.append(updateSql); + mergeIntoSql.append(" WHEN MATCHED THEN UPDATE SET "); + mergeIntoSql.append(updateSql); } - sb.append(" WHEN NOT MATCHED THEN " - + "INSERT (" + Arrays.stream(fieldNames).map(this::quoteIdentifier).collect(Collectors.joining(",")) + ") VALUES (" - + Arrays.stream(fieldNames).map(col -> "T2." + quoteIdentifier(col)).collect(Collectors.joining(",")) + ")"); + mergeIntoSql.append(" WHEN NOT MATCHED THEN ") + .append("INSERT (") + .append(Arrays.stream(fieldNames).map(this::quoteIdentifier).collect(Collectors.joining(","))) + .append(") VALUES (") + .append(Arrays.stream(fieldNames).map(col -> "T2." + quoteIdentifier(col)).collect(Collectors.joining(","))) + .append(")"); - return Optional.of(sb.toString()); + return Optional.of(mergeIntoSql.toString()); } /** - * build T1."A"=T2."A" or T1."A"=nvl(T2."A",T1."A") + * build T1."A"=T2."A" or T1."A"=nvl(T2."A",T1."A") * @param fieldNames * @param uniqueKeyFields * @param allReplace @@ -76,11 +88,18 @@ public Optional getUpsertStatement(String schema, String tableName, Stri */ private String buildUpdateConnection(String[] fieldNames, String[] uniqueKeyFields, boolean allReplace) { List uniqueKeyList = Arrays.asList(uniqueKeyFields); - return Arrays.stream(fieldNames).filter(col -> !uniqueKeyList.contains(col)).map(col -> { - return allReplace ? quoteIdentifier("T1") + "." + quoteIdentifier(col) + " = " + quoteIdentifier("T2") + "." + quoteIdentifier(col) : - quoteIdentifier("T1") + "." + quoteIdentifier(col) + " =nvl(" + quoteIdentifier("T2") + "." + quoteIdentifier(col) + "," - + quoteIdentifier("T1") + "." + quoteIdentifier(col) + ")"; - }).collect(Collectors.joining(",")); + String updateConnectionSql = Arrays.stream(fieldNames). + filter(col -> !uniqueKeyList.contains(col)) + .map(col -> buildConnectionByAllReplace(allReplace, col)) + .collect(Collectors.joining(",")); + return updateConnectionSql; + } + + private String buildConnectionByAllReplace(boolean allReplace, String col) { + String conncetionSql = allReplace ? quoteIdentifier("T1") + "." + quoteIdentifier(col) + " = " + quoteIdentifier("T2") + "." + quoteIdentifier(col) : + quoteIdentifier("T1") + "." + quoteIdentifier(col) + " =nvl(" + quoteIdentifier("T2") + "." + quoteIdentifier(col) + "," + + quoteIdentifier("T1") + "." + quoteIdentifier(col) + ")"; + return conncetionSql; } @@ -96,8 +115,43 @@ private String buildConnectionConditions(String[] uniqueKeyFields) { */ public String buildDualQueryStatement(String[] column) { StringBuilder sb = new StringBuilder("SELECT "); - String collect = Arrays.stream(column).map(col -> " ? " + quoteIdentifier(col)).collect(Collectors.joining(", ")); + String collect = Arrays.stream(column) + .map(col -> wrapperPlaceholder(col) + quoteIdentifier(col)) + .collect(Collectors.joining(", ")); sb.append(collect).append(" FROM DUAL"); return sb.toString(); } + + + /** + * char type is wrapped with rpad + * @param fieldName + * @return + */ + public String wrapperPlaceholder(String fieldName) { + int pos = fieldList.indexOf(fieldName); + String type = fieldTypeList.get(pos); + + if (StringUtils.contains(type.toLowerCase(), DEAL_CHAR_KEY)) { + TableInfo.FieldExtraInfo fieldExtraInfo = fieldExtraInfoList.get(pos); + int charLength = fieldExtraInfo == null ? 0 : fieldExtraInfo.getLength(); + if (charLength > 0) { + return String.format(RPAD_FORMAT, charLength); + } + } + return SQL_DEFAULT_PLACEHOLDER; + } + + + public void setFieldList(List fieldList) { + this.fieldList = fieldList; + } + + public void setFieldTypeList(List fieldTypeList) { + this.fieldTypeList = fieldTypeList; + } + + public void setFieldExtraInfoList(List fieldExtraInfoList) { + this.fieldExtraInfoList = fieldExtraInfoList; + } } diff --git a/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java b/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java index ee0239a1f..7a37aa6cd 100644 --- a/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java +++ b/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java @@ -37,6 +37,10 @@ public OracleSink() { @Override public JDBCUpsertOutputFormat getOutputFormat() { + ((OracleDialect) jdbcDialect).setFieldList(fieldList); + ((OracleDialect) jdbcDialect).setFieldTypeList(fieldTypeList); + ((OracleDialect) jdbcDialect).setFieldExtraInfoList(fieldExtraInfoList); + JDBCOptions jdbcOptions = JDBCOptions.builder() .setDBUrl(dbURL) .setDialect(jdbcDialect) diff --git a/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java b/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java index ddd929d2c..84eea68ff 100644 --- a/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java +++ b/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java @@ -20,6 +20,7 @@ import com.dtstack.flink.sql.sink.IStreamSinkGener; import com.dtstack.flink.sql.sink.rdb.format.JDBCUpsertOutputFormat; import com.dtstack.flink.sql.sink.rdb.table.RdbTableInfo; +import com.dtstack.flink.sql.table.TableInfo; import com.dtstack.flink.sql.table.TargetTableInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; @@ -83,6 +84,10 @@ public abstract class RdbSink implements RetractStreamTableSink, Serializab protected String updateMode; + public List fieldList; + public List fieldTypeList; + public List fieldExtraInfoList; + public RdbSink(JDBCDialect jdbcDialect) { this.jdbcDialect = jdbcDialect; } @@ -106,7 +111,9 @@ public RdbSink genStreamSink(TargetTableInfo targetTableInfo) { this.sqlTypes = JDBCTypeConvertUtils.buildSqlTypes(fieldTypeArray); this.allReplace = rdbTableInfo.isAllReplace(); this.updateMode = rdbTableInfo.getUpdateMode(); - rdbTableInfo.getFieldList(); + this.fieldList = rdbTableInfo.getFieldList(); + this.fieldTypeList = rdbTableInfo.getFieldTypeList(); + this.fieldExtraInfoList = rdbTableInfo.getFieldExtraInfoList(); return this; }