diff --git a/docs/kafkaSource.md b/docs/kafkaSource.md index 76096aba6..00c252c99 100644 --- a/docs/kafkaSource.md +++ b/docs/kafkaSource.md @@ -1,6 +1,5 @@ ## 1.格式: ``` -数据现在支持json格式{"xx":"bb","cc":"dd"} CREATE TABLE tableName( colName colType, @@ -15,9 +14,8 @@ CREATE TABLE tableName( topic ='topicName', groupId='test', parallelism ='parllNum', - --timezone='America/Los_Angeles', timezone='Asia/Shanghai', - sourcedatatype ='json' #可不设置 + sourcedatatype ='dt_nest' #可不设置 ); ``` @@ -47,7 +45,9 @@ CREATE TABLE tableName( |topicIsPattern | topic是否是正则表达式格式(true|false) |否| false |offsetReset | 读取的topic 的offset初始位置[latest|earliest|指定offset值({"0":12312,"1":12321,"2":12312},{"partition_no":offset_value})]|否|latest| |parallelism | 并行度设置|否|1| -|sourcedatatype | 数据类型|否|json| +|sourcedatatype | 数据类型,avro,csv,json,dt_nest。dt_nest为默认JSON解析器,能够解析嵌套JSON数据类型,其他仅支持非嵌套格式|否|dt_nest| +|schemaInfo | avro类型使用的schema信息|否|| +|fieldDelimiter |csv类型使用的数据分隔符|否| | | |timezone|时区设置[timezone支持的参数](timeZone.md)|否|'Asia/Shanghai' **kafka相关参数可以自定义,使用kafka.开头即可。** ``` @@ -169,24 +169,10 @@ CREATE TABLE MyTable( parallelism ='1' ); ``` -# 二、csv格式数据源 -根据字段分隔符进行数据分隔,按顺序匹配sql中配置的列。如数据分隔列数和sql中配置的列数相等直接匹配;如不同参照lengthcheckpolicy策略处理。 -## 1.参数: - -|参数名称|含义|是否必填|默认值| -|----|---|---|---| -|type | kafka09 | 是|| -|bootstrapServers | kafka bootstrap-server 地址信息(多个用逗号隔开)|是|| -|zookeeperQuorum | kafka zk地址信息(多个之间用逗号分隔)|是|| -|topic | 需要读取的 topic 名称|是|| -|offsetReset | 读取的topic 的offset初始位置[latest|earliest]|否|latest| -|parallelism | 并行度设置 |否|1| -|sourcedatatype | 数据类型|是 |csv| -|fielddelimiter | 字段分隔符|是 || -|lengthcheckpolicy | 单行字段条数检查策略 |否|可选,默认为SKIP,其它可选值为EXCEPTION、PAD。SKIP:字段数目不符合时跳过 。EXCEPTION:字段数目不符合时抛出异常。PAD:按顺序填充,不存在的置为null。| -**kafka相关参数可以自定义,使用kafka.开头即可。** -## 2.样例: +## 7.csv格式数据源 + + ``` CREATE TABLE MyTable( name varchar, @@ -203,186 +189,28 @@ CREATE TABLE MyTable( --topic ='mqTest.*', --topicIsPattern='true' parallelism ='1', - sourcedatatype ='csv', - fielddelimiter ='\|', - lengthcheckpolicy = 'PAD' + sourceDatatype ='csv' ); ``` -# 三、text格式数据源UDF自定义拆分 -Kafka源表数据解析流程:Kafka Source Table -> UDTF ->Realtime Compute -> SINK。从Kakfa读入的数据,都是VARBINARY(二进制)格式,对读入的每条数据,都需要用UDTF将其解析成格式化数据。 - 与其他格式不同,本格式定义DDL必须与以下SQL一摸一样,表中的五个字段顺序务必保持一致: - -## 1. 定义源表,注意:kafka源表DDL字段必须与以下例子一模一样。WITH中参数可改。 -``` -create table kafka_stream( - _topic STRING, - _messageKey STRING, - _message STRING, - _partition INT, - _offset BIGINT, -) with ( - type ='kafka09', - bootstrapServers ='172.16.8.198:9092', - zookeeperQuorum ='172.16.8.198:2181/kafka', - offsetReset ='latest', - topic ='nbTest1', - parallelism ='1', - sourcedatatype='text' - ) -``` -## 2.参数: - -|参数名称|含义|是否必填|默认值| -|----|---|---|---| -|type | kafka09 | 是|| -|bootstrapServers | kafka bootstrap-server 地址信息(多个用逗号隔开)|是|| -|zookeeperQuorum | kafka zk地址信息(多个之间用逗号分隔)|是|| -|topic | 需要读取的 topic 名称|是|| -|offsetReset | 读取的topic 的offset初始位置[latest|earliest]|否|latest| -|parallelism | 并行度设置|否|1| -|sourcedatatype | 数据类型|否|text| -**kafka相关参数可以自定义,使用kafka.开头即可。** +## 8.avro格式数据源 -## 2.自定义: -从kafka读出的数据,需要进行窗口计算。 按照实时计算目前的设计,滚窗/滑窗等窗口操作,需要(且必须)在源表DDL上定义Watermark。Kafka源表比较特殊。如果要以kafka中message字段中的的Event Time进行窗口操作, -需要先从message字段,使用UDX解析出event time,才能定义watermark。 在kafka源表场景中,需要使用计算列。 假设,kafka中写入的数据如下: -2018-11-11 00:00:00|1|Anna|female整个计算流程为:Kafka SOURCE->UDTF->Realtime Compute->RDS SINK(单一分隔符可直接使用类csv格式模板,自定义适用于更复杂的数据类型,本说明只做参考) - -**SQL** ``` --- 定义解析Kakfa message的UDTF - CREATE FUNCTION kafkapaser AS 'com.XXXX.kafkaUDTF'; - CREATE FUNCTION kafkaUDF AS 'com.XXXX.kafkaUDF'; - -- 定义源表,注意:kafka源表DDL字段必须与以下例子一模一样。WITH中参数可改。 - create table kafka_src ( - _topic STRING, - _messageKey STRING, - _message STRING, - _partition INT, - _offset BIGINT, - ctime AS TO_TIMESTAMP(kafkaUDF(_message)), -- 定义计算列,计算列可理解为占位符,源表中并没有这一列,其中的数据可经过下游计算得出。注意计算里的类型必须为timestamp才能在做watermark。 - watermark for ctime as withoffset(ctime,0) -- 在计算列上定义watermark - ) WITH ( - type = 'kafka010', -- Kafka Source类型,与Kafka版本强相关,目前支持的Kafka版本请参考本文档 - topic = 'test_kafka_topic', - ... - ); - create table rds_sink ( - name VARCHAR, - age INT, - grade VARCHAR, - updateTime TIMESTAMP - ) WITH( - type='mysql', - url='jdbc:mysql://localhost:3306/test', - tableName='test4', - userName='test', - password='XXXXXX' +CREATE TABLE MyTable( + channel varchar, + pv varchar + --xctime bigint + )WITH( + type='kafka', + bootstrapServers='172.16.8.107:9092', + groupId='mqTest01', + offsetReset='latest', + topic='mqTest01', + parallelism ='1', + topicIsPattern ='false', + kafka.group.id='mqTest', + sourceDataType ='avro', + schemaInfo = '{"type":"record","name":"MyResult","fields":[{"name":"channel","type":"string"},{"name":"pv","type":"string"}]}' ); - -- 使用UDTF,将二进制数据解析成格式化数据 - CREATE VIEW input_view ( - name, - age, - grade, - updateTime - ) AS - SELECT - COUNT(*) as cnt, - T.ctime, - T.order, - T.name, - T.sex - from - kafka_src as S, - LATERAL TABLE (kafkapaser _message)) as T ( - ctime, - order, - name, - sex - ) - Group BY T.sex, - TUMBLE(ctime, INTERVAL '1' MINUTE); - -- 对input_view中输出的数据做计算 - CREATE VIEW view2 ( - cnt, - sex - ) AS - SELECT - COUNT(*) as cnt, - T.sex - from - input_view - Group BY sex, TUMBLE(ctime, INTERVAL '1' MINUTE); - -- 使用解析出的格式化数据进行计算,并将结果输出到RDS中 - insert into rds_sink - SELECT - cnt,sex - from view2; - ``` -**UDF&UDTF** + ``` -package com.XXXX; - import com.XXXX.fastjson.JSONObject; - import org.apache.flink.table.functions.TableFunction; - import org.apache.flink.table.types.DataType; - import org.apache.flink.table.types.DataTypes; - import org.apache.flink.types.Row; - import java.io.UnsupportedEncodingException; - /** - 以下例子解析输入Kafka中的JSON字符串,并将其格式化输出 - **/ - public class kafkaUDTF extends TableFunction { - public void eval(byte[] message) { - try { - // 读入一个二进制数据,并将其转换为String格式 - String msg = new String(message, "UTF-8"); - // 提取JSON Object中各字段 - String ctime = Timestamp.valueOf(data.split('\\|')[0]); - String order = data.split('\\|')[1]; - String name = data.split('\\|')[2]; - String sex = data.split('\\|')[3]; - // 将解析出的字段放到要输出的Row()对象 - Row row = new Row(4); - row.setField(0, ctime); - row.setField(1, age); - row.setField(2, grade); - row.setField(3, updateTime); - System.out.println("Kafka message str ==>" + row.toString()); - // 输出一行 - collect(row); - } catch (ClassCastException e) { - System.out.println("Input data format error. Input data " + msg + "is not json string"); - } - } catch (UnsupportedEncodingException e) { - e.printStackTrace(); - } - } - @Override - // 如果返回值是Row,就必须重载实现这个方法,显式地告诉系统返回的字段类型 - // 定义输出Row()对象的字段类型 - public DataType getResultType(Object[] arguments, Class[] argTypes) { - return DataTypes.createRowType(DataTypes.TIMESTAMP,DataTypes.STRING, DataTypes.Integer, DataTypes.STRING,DataTypes.STRING); - } - } - - package com.dp58; - package com.dp58.sql.udx; - import org.apache.flink.table.functions.FunctionContext; - import org.apache.flink.table.functions.ScalarFunction; - public class KafkaUDF extends ScalarFunction { - // 可选,open方法可以不写 - // 需要import org.apache.flink.table.functions.FunctionContext; - public String eval(byte[] message) { - // 读入一个二进制数据,并将其转换为String格式 - String msg = new String(message, "UTF-8"); - return msg.split('\\|')[0]; - } - public long eval(String b, String c) { - return eval(b) + eval(c); - } - //可选,close方法可以不写 - @Override - public void close() { - } - } - ``` + diff --git a/kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java b/kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java index 867f48d6a..96ccd1783 100644 --- a/kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java +++ b/kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java @@ -19,6 +19,7 @@ package com.dtstack.flink.sql.source.kafka.table; +import com.dtstack.flink.sql.format.FormatType; import com.dtstack.flink.sql.source.kafka.enums.EKafkaOffset; import com.dtstack.flink.sql.table.AbstractSourceParser; import com.dtstack.flink.sql.table.AbstractTableInfo; @@ -51,6 +52,10 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map kafkaParams = props.keySet().stream() .filter(key -> !key.isEmpty() && key.startsWith("kafka.")) .collect(Collectors.toMap( diff --git a/kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java b/kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java index e1c014a69..c27eee376 100644 --- a/kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java +++ b/kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java @@ -48,6 +48,12 @@ public class KafkaSourceTableInfo extends AbstractSourceTableInfo { public static final String TOPICISPATTERN_KEY = "topicIsPattern"; + public static final String SCHEMA_STRING_KEY = "schemaInfo"; + + public static final String CSV_FIELD_DELIMITER_KEY = "fieldDelimiter"; + + public static final String SOURCE_DATA_TYPE_KEY = "sourceDataType"; + private String bootstrapServers; private String topic; @@ -58,7 +64,7 @@ public class KafkaSourceTableInfo extends AbstractSourceTableInfo { private Boolean topicIsPattern = false; - private String sourceDataType = FormatType.DT_NEST.name(); + private String sourceDataType; private String schemaString;