
Commit 94c2784

Merge remote-tracking branch 'origin/feat_1.8_kafkaSourceAvro' into v1.8.0_dev

# Conflicts:
#	kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java
todd5167 committed Mar 28, 2020
2 parents 59b996b + 0b22f9c commit 94c2784
Showing 10 changed files with 328 additions and 586 deletions.
@@ -219,14 +219,14 @@ public static String addJdbcParam(String dbUrl, Map<String, String> addParams, b
         return preStr + "?" + sb.toString();
     }

-    public static boolean isJosn(String str){
+    public static boolean isJson(String str) {
         boolean flag = false;
-        if(StringUtils.isNotBlank(str)){
+        if (StringUtils.isNotBlank(str)) {
             try {
-                objectMapper.readValue(str,Map.class);
+                objectMapper.readValue(str, Map.class);
                 flag = true;
             } catch (Throwable e) {
-                flag=false;
+                flag = false;
             }
         }
         return flag;
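A quick usage sketch of the renamed helper (the `IsJsonDemo` wrapper class is made up; `DtStringUtil.isJson` is the method changed above, and it is what `AbstractKafkaSource` further down uses to decide whether `offsetReset` carries a per-partition offset map):

```
import com.dtstack.flink.sql.util.DtStringUtil;

public class IsJsonDemo {
    public static void main(String[] args) {
        // a JSON map of partition -> offset: the Kafka source treats this as specific start offsets
        System.out.println(DtStringUtil.isJson("{\"0\":12312,\"1\":12321,\"2\":12312}")); // true
        // a plain keyword: passed through as the auto.offset.reset value
        System.out.println(DtStringUtil.isJson("latest"));                                // false
    }
}
```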
docs/kafkaSource.md: 224 changes (26 additions, 198 deletions)
@@ -1,6 +1,5 @@
## 1. Format:
```
Data currently supports the JSON format, e.g. {"xx":"bb","cc":"dd"}
CREATE TABLE tableName(
colName colType,
@@ -15,9 +14,8 @@ CREATE TABLE tableName(
topic ='topicName',
groupId='test',
parallelism ='parllNum',
--timezone='America/Los_Angeles',
timezone='Asia/Shanghai',
sourcedatatype ='json' #optional
sourcedatatype ='dt_nest' #optional
);
```

@@ -47,7 +45,9 @@ CREATE TABLE tableName(
|topicIsPattern | whether the topic is a regular-expression pattern (true&#124;false) |no| false
|offsetReset | initial offset position to read from [latest&#124;earliest&#124;a specific offset value ({"0":12312,"1":12321,"2":12312}, i.e. {"partition_no":offset_value})]||latest|
|parallelism | parallelism setting||1|
|sourcedatatype | data type||json|
|sourcedatatype | data format: avro, csv, json or dt_nest. dt_nest is the default JSON parser and can parse nested JSON data; the other formats only support non-nested data||dt_nest|
|schemaInfo | schema used by the avro format|||
|fieldDelimiter |field delimiter used by the csv format|| | |
|timezone|time zone setting ([supported timezone values](timeZone.md))|no|'Asia/Shanghai'
**Kafka client parameters can be passed through by prefixing them with `kafka.`**
```
@@ -169,24 +169,10 @@ CREATE TABLE MyTable(
parallelism ='1'
);
```
# 2. csv format data source
Data is split on the field delimiter and matched, in order, against the columns configured in the SQL. If the number of split fields equals the number of configured columns, they are matched directly; otherwise the row is handled according to the lengthcheckpolicy strategy (see the sketch after the parameter table below).
## 1. Parameters:

|Parameter|Meaning|Required|Default|
|----|---|---|---|
|type | kafka09 |||
|bootstrapServers | kafka bootstrap-server addresses (comma-separated for multiple)|||
|zookeeperQuorum | kafka ZooKeeper addresses (comma-separated for multiple)|||
|topic | name of the topic to read|||
|offsetReset | initial offset position to read from [latest&#124;earliest]||latest|
|parallelism | parallelism setting ||1|
|sourcedatatype | data type||csv|
|fielddelimiter | field delimiter|||
|lengthcheckpolicy | per-row field-count check policy ||optional, defaults to SKIP; other values are EXCEPTION and PAD. SKIP: skip rows whose field count does not match. EXCEPTION: throw an exception when the field count does not match. PAD: fill the columns in order and set missing ones to null.|
**Kafka client parameters can be passed through by prefixing them with `kafka.`**
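The three lengthcheckpolicy values can be pictured with a small sketch. This is only an illustration of the documented semantics, not the connector's actual implementation; the class and method names are invented:

```
import java.util.Arrays;

public class LengthCheckPolicyDemo {
    enum Policy { SKIP, EXCEPTION, PAD }

    // splits a row on '|' and aligns it with the number of columns declared in the SQL;
    // returns null when the row should be dropped
    static String[] align(String row, int columnCount, Policy policy) {
        String[] parts = row.split("\\|", -1);   // -1 keeps trailing empty fields
        if (parts.length == columnCount) {
            return parts;                        // field count matches: use the row directly
        }
        switch (policy) {
            case SKIP:
                return null;                     // drop rows whose field count differs
            case EXCEPTION:
                throw new IllegalArgumentException(
                        "field count " + parts.length + " != column count " + columnCount);
            case PAD:
            default:
                // fill in order; missing columns become null, surplus fields are truncated
                return Arrays.copyOf(parts, columnCount);
        }
    }

    public static void main(String[] args) {
        // SQL declares two columns (name, channel), but the row carries only one field
        System.out.println(Arrays.toString(align("roc", 2, Policy.PAD)));  // [roc, null]
        System.out.println(align("roc", 2, Policy.SKIP));                  // null -> row skipped
    }
}
```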

## 2. Example:
## 7. csv format data source


```
CREATE TABLE MyTable(
name varchar,
@@ -203,186 +189,28 @@ CREATE TABLE MyTable(
--topic ='mqTest.*',
--topicIsPattern='true'
parallelism ='1',
sourcedatatype ='csv',
fielddelimiter ='\|',
lengthcheckpolicy = 'PAD'
sourceDatatype ='csv'
);
```
# 3. text format data source with custom UDF splitting
Kafka source-table parsing flow: Kafka Source Table -> UDTF -> Realtime Compute -> SINK. Every record read from Kafka arrives in VARBINARY (binary) form, and each record has to be parsed into structured data by a UDTF.
Unlike the other formats, the DDL for this format must be exactly the same as the SQL below, and the order of the five fields in the table must be kept unchanged:

## 1. Define the source table. Note: the fields of the Kafka source-table DDL must be exactly the same as in the example below; the parameters in WITH may be changed.
```
create table kafka_stream(
_topic STRING,
_messageKey STRING,
_message STRING,
_partition INT,
_offset BIGINT
) with (
type ='kafka09',
bootstrapServers ='172.16.8.198:9092',
zookeeperQuorum ='172.16.8.198:2181/kafka',
offsetReset ='latest',
topic ='nbTest1',
parallelism ='1',
sourcedatatype='text'
);
```
## 2. Parameters:

|Parameter|Meaning|Required|Default|
|----|---|---|---|
|type | kafka09 |||
|bootstrapServers | kafka bootstrap-server addresses (comma-separated for multiple)|||
|zookeeperQuorum | kafka ZooKeeper addresses (comma-separated for multiple)|||
|topic | name of the topic to read|||
|offsetReset | initial offset position to read from [latest&#124;earliest]||latest|
|parallelism | parallelism setting||1|
|sourcedatatype | data type||text|
**Kafka client parameters can be passed through by prefixing them with `kafka.`**
## 8. avro format data source

## 3. Custom parsing:
Data read from Kafka needs window computation. In the current Realtime Compute design, tumbling/sliding window operations require (and must have) a watermark defined in the source-table DDL. Kafka source tables are special: to run window operations on the event time carried inside the Kafka message field, the event time must first be parsed out of the message with a UDX before a watermark can be defined, so the Kafka source-table scenario needs a computed column. Suppose the data written to Kafka looks like
2018-11-11 00:00:00|1|Anna|female. The overall flow is Kafka SOURCE -> UDTF -> Realtime Compute -> RDS SINK. (Data with a single delimiter can reuse the csv-style template directly; custom parsing is intended for more complex data types. This description is for reference only.)

**SQL**
```
-- define the UDTF and UDF that parse the Kafka message
CREATE FUNCTION kafkapaser AS 'com.XXXX.kafkaUDTF';
CREATE FUNCTION kafkaUDF AS 'com.XXXX.kafkaUDF';
-- define the source table. Note: the Kafka source-table DDL fields must be exactly the same as in this example; the parameters in WITH may be changed.
create table kafka_src (
_topic STRING,
_messageKey STRING,
_message STRING,
_partition INT,
_offset BIGINT,
ctime AS TO_TIMESTAMP(kafkaUDF(_message)), -- computed column; a computed column is a placeholder that does not exist in the source table, and its value is derived by the downstream computation. Its type must be timestamp for a watermark to be defined on it.
watermark for ctime as withoffset(ctime,0) -- define the watermark on the computed column
) WITH (
type = 'kafka010', -- Kafka source type; strongly tied to the Kafka version, see this document for the currently supported Kafka versions
topic = 'test_kafka_topic',
...
);
create table rds_sink (
name VARCHAR,
age INT,
grade VARCHAR,
updateTime TIMESTAMP
) WITH(
type='mysql',
url='jdbc:mysql://localhost:3306/test',
tableName='test4',
userName='test',
password='XXXXXX'
);
CREATE TABLE MyTable(
channel varchar,
pv varchar
--xctime bigint
)WITH(
type='kafka',
bootstrapServers='172.16.8.107:9092',
groupId='mqTest01',
offsetReset='latest',
topic='mqTest01',
parallelism ='1',
topicIsPattern ='false',
kafka.group.id='mqTest',
sourceDataType ='avro',
schemaInfo = '{"type":"record","name":"MyResult","fields":[{"name":"channel","type":"string"},{"name":"pv","type":"string"}]}'
);
-- use the UDTF to parse the binary data into structured data
CREATE VIEW input_view (
name,
age,
grade,
updateTime
) AS
SELECT
COUNT(*) as cnt,
T.ctime,
T.order,
T.name,
T.sex
from
kafka_src as S,
LATERAL TABLE (kafkapaser _message)) as T (
ctime,
order,
name,
sex
)
Group BY T.sex,
TUMBLE(ctime, INTERVAL '1' MINUTE);
-- run computation on the data emitted by input_view
CREATE VIEW view2 (
cnt,
sex
) AS
SELECT
COUNT(*) as cnt,
T.sex
from
input_view
Group BY sex, TUMBLE(ctime, INTERVAL '1' MINUTE);
-- run computation on the parsed, structured data and write the result to RDS
insert into rds_sink
SELECT
cnt,sex
from view2;
```
**UDF&UDTF**
```
package com.XXXX;

import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.DataTypes;
import org.apache.flink.types.Row;

import java.io.UnsupportedEncodingException;
import java.sql.Timestamp;

/**
 * The following example parses the delimited string read from Kafka and emits it as structured rows.
 **/
public class kafkaUDTF extends TableFunction<Row> {

    public void eval(byte[] message) {
        try {
            // read the binary payload and convert it to a String
            String msg = new String(message, "UTF-8");
            // extract the individual fields
            String[] fields = msg.split("\\|");
            Timestamp ctime = Timestamp.valueOf(fields[0]);
            String order = fields[1];
            String name = fields[2];
            String sex = fields[3];
            // put the parsed fields into the output Row()
            Row row = new Row(4);
            row.setField(0, ctime);
            row.setField(1, order);
            row.setField(2, name);
            row.setField(3, sex);
            System.out.println("Kafka message str ==>" + row.toString());
            // emit one row
            collect(row);
        } catch (IllegalArgumentException | ArrayIndexOutOfBoundsException e) {
            System.out.println("Input data format error. Input data is not a valid delimited string");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    @Override
    // if the return type is Row, this method must be overridden to tell the system the field types explicitly;
    // it defines the field types of the output Row()
    public DataType getResultType(Object[] arguments, Class[] argTypes) {
        return DataTypes.createRowType(DataTypes.TIMESTAMP, DataTypes.STRING, DataTypes.STRING, DataTypes.STRING);
    }
}

package com.dp58.sql.udx;

import org.apache.flink.table.functions.ScalarFunction;

import java.nio.charset.StandardCharsets;

public class KafkaUDF extends ScalarFunction {
    // optional: an open(FunctionContext) method may be added
    // (it would need import org.apache.flink.table.functions.FunctionContext;)

    public String eval(byte[] message) {
        // read the binary payload, convert it to a String and return the first delimited field
        String msg = new String(message, StandardCharsets.UTF_8);
        return msg.split("\\|")[0];
    }

    public String eval(String b, String c) {
        // overloaded eval methods are supported as well
        return b + c;
    }

    // optional: the close method may be omitted
    @Override
    public void close() {
    }
}
```
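For the avro source in section 8 above, the messages on the topic must be Avro records that match the configured `schemaInfo`. Below is a minimal producer-side sketch of building such a payload, assuming the source expects plain binary-encoded Avro without a schema registry; the field values and the surrounding class are made up:

```
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;

public class AvroPayloadDemo {
    public static void main(String[] args) throws Exception {
        // the same schema string as the schemaInfo property in the DDL above
        String schemaJson = "{\"type\":\"record\",\"name\":\"MyResult\",\"fields\":["
                + "{\"name\":\"channel\",\"type\":\"string\"},"
                + "{\"name\":\"pv\",\"type\":\"string\"}]}";
        Schema schema = new Schema.Parser().parse(schemaJson);

        // build one record with the two fields declared in the table
        GenericRecord record = new GenericData.Record(schema);
        record.put("channel", "web");
        record.put("pv", "100");

        // binary-encode it; the resulting byte[] is what gets written to the Kafka topic
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();

        byte[] payload = out.toByteArray();  // send with a KafkaProducer<String, byte[]>
        System.out.println("encoded " + payload.length + " bytes");
    }
}
```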

@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dtstack.flink.sql.source.kafka;

import com.dtstack.flink.sql.source.IStreamSourceGener;
import com.dtstack.flink.sql.source.kafka.enums.EKafkaOffset;
import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
import com.dtstack.flink.sql.util.DtStringUtil;
import com.dtstack.flink.sql.util.PluginUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* Date: 2020/3/20
* Company: www.dtstack.com
* @author maqi
*/
public abstract class AbstractKafkaSource implements IStreamSourceGener<Table> {

private static final String SOURCE_OPERATOR_NAME_TPL = "${topic}_${table}";

protected Properties getKafkaProperties(KafkaSourceTableInfo kafkaSourceTableInfo) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSourceTableInfo.getBootstrapServers());

if (DtStringUtil.isJson(kafkaSourceTableInfo.getOffsetReset())) {
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, EKafkaOffset.NONE.name().toLowerCase());
} else {
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaSourceTableInfo.getOffsetReset());
}

if (StringUtils.isNotBlank(kafkaSourceTableInfo.getGroupId())) {
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaSourceTableInfo.getGroupId());
}

for (String key : kafkaSourceTableInfo.getKafkaParamKeys()) {
props.setProperty(key, kafkaSourceTableInfo.getKafkaParam(key));
}
return props;
}

protected String generateOperatorName(String tabName, String topicName) {
return SOURCE_OPERATOR_NAME_TPL.replace("${topic}", topicName).replace("${table}", tabName);
}

protected TypeInformation<Row> getRowTypeInformation(KafkaSourceTableInfo kafkaSourceTableInfo) {
Class<?>[] fieldClasses = kafkaSourceTableInfo.getFieldClasses();
TypeInformation[] types = IntStream.range(0, fieldClasses.length)
.mapToObj(i -> TypeInformation.of(fieldClasses[i]))
.toArray(TypeInformation[]::new);

return new RowTypeInfo(types, kafkaSourceTableInfo.getFields());
}

protected void setStartPosition(String offset, String topicName, FlinkKafkaConsumerBase<Row> kafkaSrc) {
if (StringUtils.equalsIgnoreCase(offset, EKafkaOffset.EARLIEST.name())) {
kafkaSrc.setStartFromEarliest();
} else if (DtStringUtil.isJson(offset)) {
Map<KafkaTopicPartition, Long> specificStartupOffsets = buildOffsetMap(offset, topicName);
kafkaSrc.setStartFromSpecificOffsets(specificStartupOffsets);
} else {
kafkaSrc.setStartFromLatest();
}
}

/**
 * Builds partition start offsets from a kafka offset json string, e.g. {"0":12312,"1":12321,"2":12312}
 * @param offsetJson json string mapping partition number to start offset
 * @param topicName  name of the topic the offsets belong to
 * @return map from KafkaTopicPartition to the startup offset
 */
protected Map<KafkaTopicPartition, Long> buildOffsetMap(String offsetJson, String topicName) {
try {
Properties properties = PluginUtil.jsonStrToObject(offsetJson, Properties.class);
Map<String, Object> offsetMap = PluginUtil.objectToMap(properties);
Map<KafkaTopicPartition, Long> specificStartupOffsets = offsetMap
.entrySet()
.stream()
.collect(Collectors.toMap(
(Map.Entry<String, Object> entry) -> new KafkaTopicPartition(topicName, Integer.valueOf(entry.getKey())),
(Map.Entry<String, Object> entry) -> Long.valueOf(entry.getValue().toString()))
);

return specificStartupOffsets;
} catch (Exception e) {
throw new RuntimeException("not support offsetReset type:" + offsetJson);
}
}

}
