
Commit ba3680f

csv sink default delimiter

todd5167 committed Mar 30, 2020
1 parent 1cb21db
Showing 2 changed files with 2 additions and 2 deletions.
docs/kafkaSink.md: 1 addition & 1 deletion

@@ -41,7 +41,7 @@ CREATE TABLE tableName(
 |partitionKeys | fields used for partitioning|||
 |updateMode | delivery mode for retraction-stream data: append or upsert. In upsert mode, the retraction flag is emitted as an additional field.||append|
 |sinkdatatype | format of the data written to Kafka: json, avro, or csv||json|
-|fieldDelimiter | csv field delimiter|| \ |
+|fieldDelimiter | csv field delimiter|| , |


**Kafka-specific parameters can be passed through by prefixing them with kafka.**
@@ -49,7 +49,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
}

kafkaSinkTableInfo.setSchemaString(MathUtil.getString(props.get(KafkaSinkTableInfo.SCHEMA_STRING_KEY.toLowerCase())));
-kafkaSinkTableInfo.setFieldDelimiter(MathUtil.getString(props.getOrDefault(KafkaSinkTableInfo.CSV_FIELD_DELIMITER_KEY.toLowerCase(), "|")));
+kafkaSinkTableInfo.setFieldDelimiter(MathUtil.getString(props.getOrDefault(KafkaSinkTableInfo.CSV_FIELD_DELIMITER_KEY.toLowerCase(), ",")));
kafkaSinkTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSinkTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
kafkaSinkTableInfo.setTopic(MathUtil.getString(props.get(KafkaSinkTableInfo.TOPIC_KEY.toLowerCase())));

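The patched line relies on `Map.getOrDefault` to fall back to `","` whenever the lowercased `fieldDelimiter` property is absent. A minimal, self-contained sketch of that fallback pattern is below; the class, method names, and the use of `String.valueOf` in place of the project's `MathUtil.getString` are illustrative assumptions, not the project's actual code.

```java
import java.util.HashMap;
import java.util.Map;

public class DelimiterDefaultDemo {

    // Illustrative stand-in for KafkaSinkTableInfo.CSV_FIELD_DELIMITER_KEY.
    static final String CSV_FIELD_DELIMITER_KEY = "fieldDelimiter";

    // Returns the configured delimiter, falling back to "," when the
    // property is missing — the same getOrDefault pattern as the patch.
    public static String resolveDelimiter(Map<String, Object> props) {
        // String.valueOf stands in for MathUtil.getString (assumption).
        return String.valueOf(
                props.getOrDefault(CSV_FIELD_DELIMITER_KEY.toLowerCase(), ","));
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        System.out.println(resolveDelimiter(props));   // no key set: ","
        props.put("fielddelimiter", "|");
        System.out.println(resolveDelimiter(props));   // explicit value: "|"
    }
}
```

Note that the property key is lowercased before lookup, so a user-supplied `fieldDelimiter` must be stored under its lowercase form for the explicit value to be found.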
