diff --git a/core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java b/core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java index 31e70caa2..3e4027b2c 100644 --- a/core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java +++ b/core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java @@ -43,10 +43,12 @@ public abstract class AbsTableParser { private static final String PRIMARY_KEY = "primaryKey"; private static final String NEST_JSON_FIELD_KEY = "nestFieldKey"; + private static final String CHAR_TYPE_NO_LENGTH = "CHAR"; private static Pattern primaryKeyPattern = Pattern.compile("(?i)PRIMARY\\s+KEY\\s*\\((.*)\\)"); private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$"); private static Pattern physicalFieldFunPattern = Pattern.compile("\\w+\\((\\w+)\\)$"); + private static Pattern charTypePattern = Pattern.compile("(?i)CHAR\\((\\d*)\\)$"); private Map patternMap = Maps.newHashMap(); @@ -107,13 +109,25 @@ public void parseFieldsInfo(String fieldsInfo, TableInfo tableInfo){ System.arraycopy(filedInfoArr, 0, filedNameArr, 0, filedInfoArr.length - 1); String fieldName = String.join(" ", filedNameArr); String fieldType = filedInfoArr[filedInfoArr.length - 1 ].trim(); - Class fieldClass = dbTypeConvertToJavaType(fieldType); + + + Class fieldClass = null; + TableInfo.FieldExtraInfo fieldExtraInfo = null; + + Matcher matcher = charTypePattern.matcher(fieldType); + if (matcher.find()) { + fieldClass = dbTypeConvertToJavaType(CHAR_TYPE_NO_LENGTH); + fieldExtraInfo = new TableInfo.FieldExtraInfo(); + fieldExtraInfo.setLength(Integer.valueOf(matcher.group(1))); + } else { + fieldClass = dbTypeConvertToJavaType(fieldType); + } tableInfo.addPhysicalMappings(filedInfoArr[0],filedInfoArr[0]); tableInfo.addField(fieldName); tableInfo.addFieldClass(fieldClass); tableInfo.addFieldType(fieldType); - tableInfo.addFieldExtraInfo(null); + tableInfo.addFieldExtraInfo(fieldExtraInfo); } tableInfo.finish(); diff --git a/core/src/main/java/com/dtstack/flink/sql/table/TableInfo.java b/core/src/main/java/com/dtstack/flink/sql/table/TableInfo.java index 59e4fdd39..2fdc297a2 100644 --- a/core/src/main/java/com/dtstack/flink/sql/table/TableInfo.java +++ b/core/src/main/java/com/dtstack/flink/sql/table/TableInfo.java @@ -194,6 +194,18 @@ public static class FieldExtraInfo implements Serializable { * default false:allow field is null */ boolean notNull = false; + /** + * field length,eg.char(4) + */ + int length; + + public int getLength() { + return length; + } + + public void setLength(int length) { + this.length = length; + } public boolean getNotNull() { return notNull; @@ -202,5 +214,13 @@ public boolean getNotNull() { public void setNotNull(boolean notNull) { this.notNull = notNull; } + + @Override + public String toString() { + return "FieldExtraInfo{" + + "notNull=" + notNull + + ", length=" + length + + '}'; + } } } diff --git a/oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java b/oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java index b811cf783..8e801970f 100644 --- a/oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java +++ b/oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java @@ -23,13 +23,10 @@ import com.dtstack.flink.sql.side.SideTableInfo; import com.dtstack.flink.sql.side.rdb.async.RdbAsyncSideInfo; import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo; +import com.dtstack.flink.sql.table.TableInfo; import com.dtstack.flink.sql.util.DtStringUtil; -import com.dtstack.flink.sql.util.ParseUtils; -import org.apache.calcite.sql.SqlNode; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.java.typeutils.RowTypeInfo; -import com.google.common.collect.Lists; - -import java.util.Arrays; import java.util.List; @@ -49,4 +46,21 @@ public String quoteIdentifier(String identifier) { return "\"" + identifier + "\""; } + @Override + public String wrapperPlaceholder(String fieldName) { + int pos = sideTableInfo.getFieldList().indexOf(fieldName); + String type = sideTableInfo.getFieldTypeList().get(pos); + + String sqlDefaultPlaceholder = " ? "; + String rpadFormat = "rpad(?, %d, ' ')"; + + if (StringUtils.contains(type.toLowerCase(), "char")) { + TableInfo.FieldExtraInfo fieldExtraInfo = sideTableInfo.getFieldExtraInfoList().get(pos); + int charLength = fieldExtraInfo == null ? 0 : fieldExtraInfo.getLength(); + if (charLength > 0) { + return String.format(rpadFormat, charLength); + } + } + return sqlDefaultPlaceholder; + } } diff --git a/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleDialect.java b/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleDialect.java index d51e33056..a687aa012 100644 --- a/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleDialect.java +++ b/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleDialect.java @@ -19,6 +19,7 @@ package com.dtstack.flink.sql.sink.oracle; import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect; +import com.dtstack.flink.sql.table.TableInfo; import com.dtstack.flink.sql.util.DtStringUtil; import org.apache.commons.lang3.StringUtils; @@ -34,6 +35,14 @@ */ public class OracleDialect implements JDBCDialect { + private final String SQL_DEFAULT_PLACEHOLDER = " ? "; + private final String DEAL_CHAR_KEY = "char"; + private String RPAD_FORMAT = " rpad(?, %d, ' ') "; + + private List fieldList; + private List fieldTypeList; + private List fieldExtraInfoList; + @Override public boolean canHandle(String url) { return url.startsWith("jdbc:oracle:"); @@ -47,27 +56,31 @@ public Optional defaultDriverName() { @Override public Optional getUpsertStatement(String schema, String tableName, String[] fieldNames, String[] uniqueKeyFields, boolean allReplace) { tableName = DtStringUtil.getTableFullPath(schema, tableName); - StringBuilder sb = new StringBuilder(); - sb.append("MERGE INTO " + tableName + " T1 USING " - + "(" + buildDualQueryStatement(fieldNames) + ") T2 ON (" - + buildConnectionConditions(uniqueKeyFields) + ") "); + StringBuilder mergeIntoSql = new StringBuilder(); + mergeIntoSql.append("MERGE INTO " + tableName + " T1 USING (") + .append(buildDualQueryStatement(fieldNames)) + .append(") T2 ON (") + .append(buildConnectionConditions(uniqueKeyFields) + ") "); String updateSql = buildUpdateConnection(fieldNames, uniqueKeyFields, allReplace); if (StringUtils.isNotEmpty(updateSql)) { - sb.append(" WHEN MATCHED THEN UPDATE SET "); - sb.append(updateSql); + mergeIntoSql.append(" WHEN MATCHED THEN UPDATE SET "); + mergeIntoSql.append(updateSql); } - sb.append(" WHEN NOT MATCHED THEN " - + "INSERT (" + Arrays.stream(fieldNames).map(this::quoteIdentifier).collect(Collectors.joining(",")) + ") VALUES (" - + Arrays.stream(fieldNames).map(col -> "T2." + quoteIdentifier(col)).collect(Collectors.joining(",")) + ")"); + mergeIntoSql.append(" WHEN NOT MATCHED THEN ") + .append("INSERT (") + .append(Arrays.stream(fieldNames).map(this::quoteIdentifier).collect(Collectors.joining(","))) + .append(") VALUES (") + .append(Arrays.stream(fieldNames).map(col -> "T2." + quoteIdentifier(col)).collect(Collectors.joining(","))) + .append(")"); - return Optional.of(sb.toString()); + return Optional.of(mergeIntoSql.toString()); } /** - * build T1."A"=T2."A" or T1."A"=nvl(T2."A",T1."A") + * build T1."A"=T2."A" or T1."A"=nvl(T2."A",T1."A") * @param fieldNames * @param uniqueKeyFields * @param allReplace @@ -75,11 +88,18 @@ public Optional getUpsertStatement(String schema, String tableName, Stri */ private String buildUpdateConnection(String[] fieldNames, String[] uniqueKeyFields, boolean allReplace) { List uniqueKeyList = Arrays.asList(uniqueKeyFields); - return Arrays.stream(fieldNames).filter(col -> !uniqueKeyList.contains(col)).map(col -> { - return allReplace ? quoteIdentifier("T1") + "." + quoteIdentifier(col) + " = " + quoteIdentifier("T2") + "." + quoteIdentifier(col) : - quoteIdentifier("T1") + "." + quoteIdentifier(col) + " =nvl(" + quoteIdentifier("T2") + "." + quoteIdentifier(col) + "," - + quoteIdentifier("T1") + "." + quoteIdentifier(col) + ")"; - }).collect(Collectors.joining(",")); + String updateConnectionSql = Arrays.stream(fieldNames). + filter(col -> !uniqueKeyList.contains(col)) + .map(col -> buildConnectionByAllReplace(allReplace, col)) + .collect(Collectors.joining(",")); + return updateConnectionSql; + } + + private String buildConnectionByAllReplace(boolean allReplace, String col) { + String conncetionSql = allReplace ? quoteIdentifier("T1") + "." + quoteIdentifier(col) + " = " + quoteIdentifier("T2") + "." + quoteIdentifier(col) : + quoteIdentifier("T1") + "." + quoteIdentifier(col) + " =nvl(" + quoteIdentifier("T2") + "." + quoteIdentifier(col) + "," + + quoteIdentifier("T1") + "." + quoteIdentifier(col) + ")"; + return conncetionSql; } @@ -95,8 +115,43 @@ private String buildConnectionConditions(String[] uniqueKeyFields) { */ public String buildDualQueryStatement(String[] column) { StringBuilder sb = new StringBuilder("SELECT "); - String collect = Arrays.stream(column).map(col -> " ? " + quoteIdentifier(col)).collect(Collectors.joining(", ")); + String collect = Arrays.stream(column) + .map(col -> wrapperPlaceholder(col) + quoteIdentifier(col)) + .collect(Collectors.joining(", ")); sb.append(collect).append(" FROM DUAL"); return sb.toString(); } + + + /** + * char type is wrapped with rpad + * @param fieldName + * @return + */ + public String wrapperPlaceholder(String fieldName) { + int pos = fieldList.indexOf(fieldName); + String type = fieldTypeList.get(pos); + + if (StringUtils.contains(type.toLowerCase(), DEAL_CHAR_KEY)) { + TableInfo.FieldExtraInfo fieldExtraInfo = fieldExtraInfoList.get(pos); + int charLength = fieldExtraInfo == null ? 0 : fieldExtraInfo.getLength(); + if (charLength > 0) { + return String.format(RPAD_FORMAT, charLength); + } + } + return SQL_DEFAULT_PLACEHOLDER; + } + + + public void setFieldList(List fieldList) { + this.fieldList = fieldList; + } + + public void setFieldTypeList(List fieldTypeList) { + this.fieldTypeList = fieldTypeList; + } + + public void setFieldExtraInfoList(List fieldExtraInfoList) { + this.fieldExtraInfoList = fieldExtraInfoList; + } } diff --git a/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java b/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java index ee0239a1f..7a37aa6cd 100644 --- a/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java +++ b/oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java @@ -37,6 +37,10 @@ public OracleSink() { @Override public JDBCUpsertOutputFormat getOutputFormat() { + ((OracleDialect) jdbcDialect).setFieldList(fieldList); + ((OracleDialect) jdbcDialect).setFieldTypeList(fieldTypeList); + ((OracleDialect) jdbcDialect).setFieldExtraInfoList(fieldExtraInfoList); + JDBCOptions jdbcOptions = JDBCOptions.builder() .setDBUrl(dbURL) .setDialect(jdbcDialect) diff --git a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncSideInfo.java b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncSideInfo.java index 37a77ee84..9e8c13080 100644 --- a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncSideInfo.java +++ b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncSideInfo.java @@ -137,14 +137,26 @@ public String getAdditionalWhereClause() { public String getSelectFromStatement(String tableName, List selectFields, List conditionFields, List sqlJoinCompareOperate, List predicateInfoes) { - String fromClause = selectFields.stream().map(this::quoteIdentifier).collect(Collectors.joining(", ")); - String whereClause = conditionFields.stream().map(f -> quoteIdentifier(f) + sqlJoinCompareOperate.get(conditionFields.indexOf(f)) + " ? ") + String fromClause = selectFields.stream() + .map(this::quoteIdentifier) + .collect(Collectors.joining(", ")); + + String whereClause = conditionFields.stream() + .map(f -> quoteIdentifier(f) + sqlJoinCompareOperate.get(conditionFields.indexOf(f)) + wrapperPlaceholder(f)) + .collect(Collectors.joining(" AND ")); + + String predicateClause = predicateInfoes.stream() + .map(this::buildFilterCondition) .collect(Collectors.joining(" AND ")); - String predicateClause = predicateInfoes.stream().map(this::buildFilterCondition).collect(Collectors.joining(" AND ")); - String sql = "SELECT " + fromClause + " FROM " + tableName + (conditionFields.size() > 0 ? " WHERE " + whereClause : "") + String dimQuerySql = "SELECT " + fromClause + " FROM " + tableName + (conditionFields.size() > 0 ? " WHERE " + whereClause : "") + (predicateInfoes.size() > 0 ? " AND " + predicateClause : "") + getAdditionalWhereClause(); - return sql; + + return dimQuerySql; + } + + public String wrapperPlaceholder(String fieldName) { + return " ? "; } public String buildFilterCondition(PredicateInfo info) { diff --git a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/table/RdbSideTableInfo.java b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/table/RdbSideTableInfo.java index 0dfbef325..2c8c18fcd 100644 --- a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/table/RdbSideTableInfo.java +++ b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/table/RdbSideTableInfo.java @@ -47,6 +47,8 @@ public boolean check() { Preconditions.checkNotNull(tableName, "rdb of tableName is required"); Preconditions.checkNotNull(userName, "rdb of userName is required"); Preconditions.checkNotNull(password, "rdb of password is required"); + Preconditions.checkArgument(getFieldList().size() == getFieldExtraInfoList().size(), + "fields and fieldExtraInfoList attributes must be the same length"); return true; } diff --git a/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java b/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java index c65696903..84eea68ff 100644 --- a/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java +++ b/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java @@ -20,8 +20,8 @@ import com.dtstack.flink.sql.sink.IStreamSinkGener; import com.dtstack.flink.sql.sink.rdb.format.JDBCUpsertOutputFormat; import com.dtstack.flink.sql.sink.rdb.table.RdbTableInfo; +import com.dtstack.flink.sql.table.TableInfo; import com.dtstack.flink.sql.table.TargetTableInfo; -import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.RowTypeInfo; @@ -34,10 +34,6 @@ import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect; import java.io.Serializable; -import java.math.BigDecimal; -import java.sql.Date; -import java.sql.Timestamp; -import java.sql.Types; import java.util.Arrays; import java.util.List; @@ -88,6 +84,10 @@ public abstract class RdbSink implements RetractStreamTableSink, Serializab protected String updateMode; + public List fieldList; + public List fieldTypeList; + public List fieldExtraInfoList; + public RdbSink(JDBCDialect jdbcDialect) { this.jdbcDialect = jdbcDialect; } @@ -111,6 +111,9 @@ public RdbSink genStreamSink(TargetTableInfo targetTableInfo) { this.sqlTypes = JDBCTypeConvertUtils.buildSqlTypes(fieldTypeArray); this.allReplace = rdbTableInfo.isAllReplace(); this.updateMode = rdbTableInfo.getUpdateMode(); + this.fieldList = rdbTableInfo.getFieldList(); + this.fieldTypeList = rdbTableInfo.getFieldTypeList(); + this.fieldExtraInfoList = rdbTableInfo.getFieldExtraInfoList(); return this; } diff --git a/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/table/RdbTableInfo.java b/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/table/RdbTableInfo.java index d5ad4eab6..e490978e4 100644 --- a/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/table/RdbTableInfo.java +++ b/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/table/RdbTableInfo.java @@ -183,6 +183,8 @@ public boolean check() { } + Preconditions.checkArgument(getFieldList().size() == getFieldExtraInfoList().size(), + "fields and fieldExtraInfoList attributes must be the same length"); return true; }