Skip to content

Commit

Permalink
add avro params
Browse files Browse the repository at this point in the history
  • Loading branch information
todd5167 committed Mar 20, 2020
1 parent baabfd4 commit 0b22f9c
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 199 deletions.
224 changes: 26 additions & 198 deletions docs/kafkaSource.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
## 1.格式:
```
数据现在支持json格式{"xx":"bb","cc":"dd"}
CREATE TABLE tableName(
colName colType,
Expand All @@ -15,9 +14,8 @@ CREATE TABLE tableName(
topic ='topicName',
groupId='test',
parallelism ='parllNum',
--timezone='America/Los_Angeles',
timezone='Asia/Shanghai',
sourcedatatype ='json' #可不设置
sourcedatatype ='dt_nest' #可不设置
);
```

Expand Down Expand Up @@ -47,7 +45,9 @@ CREATE TABLE tableName(
|topicIsPattern | topic是否是正则表达式格式(true|false) |否| false
|offsetReset | 读取的topic 的offset初始位置[latest|earliest|指定offset值({"0":12312,"1":12321,"2":12312},{"partition_no":offset_value})]||latest|
|parallelism | 并行度设置||1|
|sourcedatatype | 数据类型||json|
|sourcedatatype | 数据类型,avro,csv,json,dt_nest。dt_nest为默认JSON解析器,能够解析嵌套JSON数据类型,其他仅支持非嵌套格式||dt_nest|
|schemaInfo | avro类型使用的schema信息|||
|fieldDelimiter |csv类型使用的数据分隔符|| | |
|timezone|时区设置[timezone支持的参数](timeZone.md)|否|'Asia/Shanghai'
**kafka相关参数可以自定义,使用kafka.开头即可。**
```
Expand Down Expand Up @@ -169,24 +169,10 @@ CREATE TABLE MyTable(
parallelism ='1'
);
```
# 二、csv格式数据源
根据字段分隔符进行数据分隔,按顺序匹配sql中配置的列。如数据分隔列数和sql中配置的列数相等直接匹配;如不同参照lengthcheckpolicy策略处理。
## 1.参数:

|参数名称|含义|是否必填|默认值|
|----|---|---|---|
|type | kafka09 |||
|bootstrapServers | kafka bootstrap-server 地址信息(多个用逗号隔开)|||
|zookeeperQuorum | kafka zk地址信息(多个之间用逗号分隔)|||
|topic | 需要读取的 topic 名称|||
|offsetReset | 读取的topic 的offset初始位置[latest|earliest]||latest|
|parallelism | 并行度设置 ||1|
|sourcedatatype | 数据类型||csv|
|fielddelimiter | 字段分隔符|||
|lengthcheckpolicy | 单行字段条数检查策略 ||可选,默认为SKIP,其它可选值为EXCEPTION、PAD。SKIP:字段数目不符合时跳过 。EXCEPTION:字段数目不符合时抛出异常。PAD:按顺序填充,不存在的置为null。|
**kafka相关参数可以自定义,使用kafka.开头即可。**

## 2.样例:
## 7.csv格式数据源


```
CREATE TABLE MyTable(
name varchar,
Expand All @@ -203,186 +189,28 @@ CREATE TABLE MyTable(
--topic ='mqTest.*',
--topicIsPattern='true'
parallelism ='1',
sourcedatatype ='csv',
fielddelimiter ='\|',
lengthcheckpolicy = 'PAD'
sourceDatatype ='csv'
);
```
# 三、text格式数据源UDF自定义拆分
Kafka源表数据解析流程:Kafka Source Table -> UDTF ->Realtime Compute -> SINK。从Kakfa读入的数据,都是VARBINARY(二进制)格式,对读入的每条数据,都需要用UDTF将其解析成格式化数据。
与其他格式不同,本格式定义DDL必须与以下SQL一摸一样,表中的五个字段顺序务必保持一致:

## 1. 定义源表,注意:kafka源表DDL字段必须与以下例子一模一样。WITH中参数可改。
```
create table kafka_stream(
_topic STRING,
_messageKey STRING,
_message STRING,
_partition INT,
_offset BIGINT,
) with (
type ='kafka09',
bootstrapServers ='172.16.8.198:9092',
zookeeperQuorum ='172.16.8.198:2181/kafka',
offsetReset ='latest',
topic ='nbTest1',
parallelism ='1',
sourcedatatype='text'
```
## 2.参数:

|参数名称|含义|是否必填|默认值|
|----|---|---|---|
|type | kafka09 |||
|bootstrapServers | kafka bootstrap-server 地址信息(多个用逗号隔开)|||
|zookeeperQuorum | kafka zk地址信息(多个之间用逗号分隔)|||
|topic | 需要读取的 topic 名称|||
|offsetReset | 读取的topic 的offset初始位置[latest|earliest]||latest|
|parallelism | 并行度设置||1|
|sourcedatatype | 数据类型||text|
**kafka相关参数可以自定义,使用kafka.开头即可。**
## 8.avro格式数据源

## 2.自定义:
从kafka读出的数据,需要进行窗口计算。 按照实时计算目前的设计,滚窗/滑窗等窗口操作,需要(且必须)在源表DDL上定义Watermark。Kafka源表比较特殊。如果要以kafka中message字段中的的Event Time进行窗口操作,
需要先从message字段,使用UDX解析出event time,才能定义watermark。 在kafka源表场景中,需要使用计算列。 假设,kafka中写入的数据如下:
2018-11-11 00:00:00|1|Anna|female整个计算流程为:Kafka SOURCE->UDTF->Realtime Compute->RDS SINK(单一分隔符可直接使用类csv格式模板,自定义适用于更复杂的数据类型,本说明只做参考)

**SQL**
```
-- 定义解析Kakfa message的UDTF
CREATE FUNCTION kafkapaser AS 'com.XXXX.kafkaUDTF';
CREATE FUNCTION kafkaUDF AS 'com.XXXX.kafkaUDF';
-- 定义源表,注意:kafka源表DDL字段必须与以下例子一模一样。WITH中参数可改。
create table kafka_src (
_topic STRING,
_messageKey STRING,
_message STRING,
_partition INT,
_offset BIGINT,
ctime AS TO_TIMESTAMP(kafkaUDF(_message)), -- 定义计算列,计算列可理解为占位符,源表中并没有这一列,其中的数据可经过下游计算得出。注意计算里的类型必须为timestamp才能在做watermark。
watermark for ctime as withoffset(ctime,0) -- 在计算列上定义watermark
) WITH (
type = 'kafka010', -- Kafka Source类型,与Kafka版本强相关,目前支持的Kafka版本请参考本文档
topic = 'test_kafka_topic',
...
);
create table rds_sink (
name VARCHAR,
age INT,
grade VARCHAR,
updateTime TIMESTAMP
) WITH(
type='mysql',
url='jdbc:mysql://localhost:3306/test',
tableName='test4',
userName='test',
password='XXXXXX'
CREATE TABLE MyTable(
channel varchar,
pv varchar
--xctime bigint
)WITH(
type='kafka',
bootstrapServers='172.16.8.107:9092',
groupId='mqTest01',
offsetReset='latest',
topic='mqTest01',
parallelism ='1',
topicIsPattern ='false',
kafka.group.id='mqTest',
sourceDataType ='avro',
schemaInfo = '{"type":"record","name":"MyResult","fields":[{"name":"channel","type":"string"},{"name":"pv","type":"string"}]}'
);
-- 使用UDTF,将二进制数据解析成格式化数据
CREATE VIEW input_view (
name,
age,
grade,
updateTime
) AS
SELECT
COUNT(*) as cnt,
T.ctime,
T.order,
T.name,
T.sex
from
kafka_src as S,
LATERAL TABLE (kafkapaser _message)) as T (
ctime,
order,
name,
sex
)
Group BY T.sex,
TUMBLE(ctime, INTERVAL '1' MINUTE);
-- 对input_view中输出的数据做计算
CREATE VIEW view2 (
cnt,
sex
) AS
SELECT
COUNT(*) as cnt,
T.sex
from
input_view
Group BY sex, TUMBLE(ctime, INTERVAL '1' MINUTE);
-- 使用解析出的格式化数据进行计算,并将结果输出到RDS中
insert into rds_sink
SELECT
cnt,sex
from view2;
```
**UDF&UDTF**
```
package com.XXXX;
import com.XXXX.fastjson.JSONObject;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.DataTypes;
import org.apache.flink.types.Row;
import java.io.UnsupportedEncodingException;
/**
以下例子解析输入Kafka中的JSON字符串,并将其格式化输出
**/
public class kafkaUDTF extends TableFunction<Row> {
public void eval(byte[] message) {
try {
// 读入一个二进制数据,并将其转换为String格式
String msg = new String(message, "UTF-8");
// 提取JSON Object中各字段
String ctime = Timestamp.valueOf(data.split('\\|')[0]);
String order = data.split('\\|')[1];
String name = data.split('\\|')[2];
String sex = data.split('\\|')[3];
// 将解析出的字段放到要输出的Row()对象
Row row = new Row(4);
row.setField(0, ctime);
row.setField(1, age);
row.setField(2, grade);
row.setField(3, updateTime);
System.out.println("Kafka message str ==>" + row.toString());
// 输出一行
collect(row);
} catch (ClassCastException e) {
System.out.println("Input data format error. Input data " + msg + "is not json string");
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
// 如果返回值是Row,就必须重载实现这个方法,显式地告诉系统返回的字段类型
// 定义输出Row()对象的字段类型
public DataType getResultType(Object[] arguments, Class[] argTypes) {
return DataTypes.createRowType(DataTypes.TIMESTAMP,DataTypes.STRING, DataTypes.Integer, DataTypes.STRING,DataTypes.STRING);
}
}
package com.dp58;
package com.dp58.sql.udx;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
public class KafkaUDF extends ScalarFunction {
// 可选,open方法可以不写
// 需要import org.apache.flink.table.functions.FunctionContext;
public String eval(byte[] message) {
// 读入一个二进制数据,并将其转换为String格式
String msg = new String(message, "UTF-8");
return msg.split('\\|')[0];
}
public long eval(String b, String c) {
return eval(b) + eval(c);
}
//可选,close方法可以不写
@Override
public void close() {
}
}
```

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package com.dtstack.flink.sql.source.kafka.table;

import com.dtstack.flink.sql.format.FormatType;
import com.dtstack.flink.sql.source.kafka.enums.EKafkaOffset;
import com.dtstack.flink.sql.table.AbstractSourceParser;
import com.dtstack.flink.sql.table.AbstractTableInfo;
Expand Down Expand Up @@ -51,6 +52,10 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
kafkaSourceTableInfo.setTopicIsPattern(MathUtil.getBoolean(props.get(KafkaSourceTableInfo.TOPICISPATTERN_KEY.toLowerCase())));
kafkaSourceTableInfo.setTimeZone(MathUtil.getString(props.get(KafkaSourceTableInfo.TIME_ZONE_KEY.toLowerCase())));

kafkaSourceTableInfo.setSchemaString(MathUtil.getString(props.get(KafkaSourceTableInfo.SCHEMA_STRING_KEY.toLowerCase())));
kafkaSourceTableInfo.setFieldDelimiter(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.CSV_FIELD_DELIMITER_KEY.toLowerCase(), "|")));
kafkaSourceTableInfo.setSourceDataType(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.SOURCE_DATA_TYPE_KEY.toLowerCase(), FormatType.DT_NEST.name())));

Map<String, String> kafkaParams = props.keySet().stream()
.filter(key -> !key.isEmpty() && key.startsWith("kafka."))
.collect(Collectors.toMap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ public class KafkaSourceTableInfo extends AbstractSourceTableInfo {

public static final String TOPICISPATTERN_KEY = "topicIsPattern";

public static final String SCHEMA_STRING_KEY = "schemaInfo";

public static final String CSV_FIELD_DELIMITER_KEY = "fieldDelimiter";

public static final String SOURCE_DATA_TYPE_KEY = "sourceDataType";

private String bootstrapServers;

private String topic;
Expand All @@ -58,7 +64,7 @@ public class KafkaSourceTableInfo extends AbstractSourceTableInfo {

private Boolean topicIsPattern = false;

private String sourceDataType = FormatType.DT_NEST.name();
private String sourceDataType;

private String schemaString;

Expand Down

0 comments on commit 0b22f9c

Please sign in to comment.