Commit

Merge branch 'feat_1.8_kafkaUpdateMode_mergedDev' into 'v1.8.0_dev'
Feat 1.8 kafka update mode merged dev

See merge request dt-insight-engine/flinkStreamSQL!12
zoudaokoulife committed Mar 30, 2020
2 parents cf35435 + 8050cae commit 19d27b4
Showing 24 changed files with 1,367 additions and 111 deletions.
24 changes: 1 addition & 23 deletions README.md
@@ -7,34 +7,11 @@
> > * Supports all native Flink SQL syntax
> > * Extends input and output performance metrics to Prometheus
## New features:
* 1. Kafka source tables support the NOT NULL syntax and time conversion from string types.
* 2. RDB side tables reconnect to the database periodically to keep connections alive; RDB sinks check the connection before writing.
* 3. Async side tables support non-equi joins, e.g. <>, <, >.
* 4. Added Kafka array parsing.
* 5. Added support for Kafka 1.0 and above.
* 6. Added PostgreSQL, Kudu, and ClickHouse side-table and result-table support.
* 7. Plugins can be loaded as dependencies; see the pluginLoadMode parameter.
* 8. Supports CEP processing.
* 9. Supports UDAFs.
* 10. Supports predicate pushdown.
* 11. Supports state TTL.

## Bug fixes:
* 1. Fixed parsing failures for ORDER BY and UNION syntax in SQL.
* 2. Fixed a submission failure in yarnPer mode.
* 3. Assorted bug fixes.

# Supported
* Source tables: Kafka 0.9, 0.10, 0.11, 1.x
* Side tables: MySQL, SQLServer, Oracle, HBase, MongoDB, Redis, Cassandra, ServerSocket, Kudu, PostgreSQL, ClickHouse, Impala, DB2
* Result tables: MySQL, SQLServer, Oracle, HBase, Elasticsearch 5.x, MongoDB, Redis, Cassandra, Console, Kudu, PostgreSQL, ClickHouse, Impala, DB2

# Roadmap
* Side-table snapshots
* Kafka Avro format
* TopN

## 1 Quick start
### 1.1 Run modes

@@ -205,6 +182,7 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
* [impala result table plugin](docs/impalaSink.md)
* [db2 result table plugin](docs/db2Sink.md)
* [sqlserver result table plugin](docs/sqlserverSink.md)
* [kafka result table plugin](docs/kafkaSink.md)

### 2.3 Side table plugins
* [hbase side table plugin](docs/hbaseSide.md)
SerializationMetricWrapper.java
@@ -24,6 +24,7 @@
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.table.runtime.types.CRow;
import org.apache.flink.types.Row;


@@ -34,11 +35,11 @@
* author: toutian
* create: 2019/12/24
*/
public class SerializationMetricWrapper implements SerializationSchema<Row> {
public class SerializationMetricWrapper implements SerializationSchema<CRow> {

private static final long serialVersionUID = 1L;

private SerializationSchema<Row> serializationSchema;
private SerializationSchema<CRow> serializationSchema;

private transient RuntimeContext runtimeContext;

@@ -47,7 +48,7 @@ public class SerializationMetricWrapper implements SerializationSchema<Row> {
protected transient Meter dtNumRecordsOutRate;


public SerializationMetricWrapper(SerializationSchema<Row> serializationSchema) {
public SerializationMetricWrapper(SerializationSchema<CRow> serializationSchema) {
this.serializationSchema = serializationSchema;
}

@@ -57,7 +58,7 @@ public void initMetric() {
}

@Override
public byte[] serialize(Row element) {
public byte[] serialize(CRow element) {
beforeSerialize();
byte[] row = serializationSchema.serialize(element);
afterSerialize();
@@ -79,7 +80,7 @@ public void setRuntimeContext(RuntimeContext runtimeContext) {
this.runtimeContext = runtimeContext;
}

public SerializationSchema<Row> getSerializationSchema() {
public SerializationSchema<CRow> getSerializationSchema() {
return serializationSchema;
}

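The change above swaps `Row` for `CRow`, so the metric wrapper can pass the retract/accumulate flag through to the serializer. As a rough sketch of how a sink function is expected to drive this wrapper (the `MetricReportingSink` class below is illustrative and not part of this commit; only the wrapper's `setRuntimeContext`, `initMetric`, and `serialize` calls come from the diff):

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.runtime.types.CRow;

import com.dtstack.flink.sql.format.SerializationMetricWrapper;

// Illustrative sink: register metrics in open(), serialize through the wrapper in invoke().
public class MetricReportingSink extends RichSinkFunction<CRow> {

    private final SerializationMetricWrapper metricWrapper;

    public MetricReportingSink(SerializationMetricWrapper metricWrapper) {
        this.metricWrapper = metricWrapper;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // give the wrapper access to the metric group before any record is serialized
        metricWrapper.setRuntimeContext(getRuntimeContext());
        metricWrapper.initMetric();
    }

    @Override
    public void invoke(CRow value, Context context) throws Exception {
        byte[] payload = metricWrapper.serialize(value);
        // hand `payload` to the actual Kafka producer here (omitted in this sketch)
    }
}
```

The wrapper can only register its counter and meter once the runtime context is available, which is why `setRuntimeContext` and `initMetric` are called from `open()`.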
223 changes: 223 additions & 0 deletions docs/kafkaSink.md
@@ -0,0 +1,223 @@
## 1. Format
```
CREATE TABLE tableName(
colName colType,
...
function(colNameX) AS aliasName,
WATERMARK FOR colName AS withOffset( colName , delayTime )
)WITH(
type ='kafka11',
bootstrapServers ='ip:port,ip:port...',
zookeeperQuorum ='ip:port,ip:port/zkparent',
offsetReset ='latest',
topic ='topicName',
groupId='test',
parallelism ='parllNum'
);
```

## 2.支持的版本
kafka09,kafka10,kafka11及以上版本
**kafka读取和写入的版本必须一致,否则会有兼容性错误。**

## 3. Table definition

|Parameter|Description|
|----|---|
| tableName | Name used in SQL, i.e. the name registered with the Flink table environment|
| colName | Column name|
| colType | Column type; see [supported column types](colType.md)|

## 4. Parameters

|Parameter|Description|Required|Default|
|----|---|---|---|
|type | Kafka sink type: kafka09, kafka10, kafka11, or kafka (for Kafka 1.0 and above)|Yes||
|groupId | Kafka consumer group id|||
|bootstrapServers | Kafka bootstrap-server addresses (comma-separated)|||
|zookeeperQuorum | Kafka ZooKeeper addresses (comma-separated)|||
|topic | Target topic name|||
|parallelism | Parallelism of the sink||1|
|partitionKeys | Fields used for partitioning|||
|updateMode | How retract-stream data is emitted: append or upsert. In upsert mode, a field indicating whether the record is a retraction is emitted with each message.||append|
|sinkdatatype | Serialization format written to Kafka: json, avro, or csv||json|
|fieldDelimiter | Field delimiter for csv data|| , |


**Additional Kafka client properties can be set by prefixing them with `kafka.`, for example:**
```
kafka.consumer.id
kafka.socket.timeout.ms
kafka.fetch.message.max.bytes
kafka.num.consumer.fetchers
kafka.auto.commit.enable
kafka.auto.commit.interval.ms
kafka.queued.max.message.chunks
kafka.rebalance.max.retries
kafka.fetch.min.bytes
kafka.fetch.wait.max.ms
kafka.rebalance.backoff.ms
kafka.refresh.leader.backoff.ms
kafka.consumer.timeout.ms
kafka.exclude.internal.topics
kafka.partition.assignment.strategy
kafka.client.id
kafka.zookeeper.session.timeout.ms
kafka.zookeeper.connection.timeout.ms
kafka.zookeeper.sync.time.ms
kafka.offsets.storage
kafka.offsets.channel.backoff.ms
kafka.offsets.channel.socket.timeout.ms
kafka.offsets.commit.max.retries
kafka.dual.commit.enabled
kafka.socket.receive.buffer.bytes
```
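These `kafka.`-prefixed options are intended for the Kafka client itself. A minimal sketch of that pass-through convention, assuming the `kafka.` prefix is stripped before the values reach the producer (the `KafkaOptionPassThrough` helper below is illustrative, not the plugin's actual code):

```
import java.util.Map;
import java.util.Properties;

public final class KafkaOptionPassThrough {

    private static final String KAFKA_PREFIX = "kafka.";

    // Copies every "kafka."-prefixed table option into Kafka client Properties,
    // dropping the prefix, e.g. "kafka.client.id" -> "client.id".
    public static Properties toKafkaProperties(Map<String, String> tableOptions) {
        Properties props = new Properties();
        for (Map.Entry<String, String> entry : tableOptions.entrySet()) {
            if (entry.getKey().startsWith(KAFKA_PREFIX)) {
                props.put(entry.getKey().substring(KAFKA_PREFIX.length()), entry.getValue());
            }
        }
        return props;
    }
}
```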

## 5. Examples

### JSON format
```
CREATE TABLE MyResult(
channel varchar,
pv varchar
)WITH(
type='kafka',
bootstrapServers='172.16.8.107:9092',
topic='mqTest02',
parallelism ='2',
partitionKeys = 'channel,pv',
updateMode='upsert'
);
Data format emitted in upsert mode: {"channel":"zs","pv":"330","retract":true}
Data format emitted in append mode: {"channel":"zs","pv":"330"}
```
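The `retract` flag shown above reflects the change marker carried by each `CRow`. A simplified sketch of how such an upsert-mode JSON payload can be produced with Jackson (the `UpsertJsonPayload` class is illustrative, not the plugin's `JsonCRowSerializationSchema`, and mapping the flag as `retract = !change` is an assumption):

```
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.table.runtime.types.CRow;
import org.apache.flink.types.Row;

// Illustrative upsert-mode JSON payload builder.
public class UpsertJsonPayload {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static byte[] serialize(CRow cRow, String[] fieldNames, boolean upsert) throws Exception {
        Row row = cRow.row();
        ObjectNode node = MAPPER.createObjectNode();
        for (int i = 0; i < fieldNames.length; i++) {
            node.put(fieldNames[i], String.valueOf(row.getField(i)));
        }
        if (upsert) {
            // assumption: CRow#change() is false for retraction messages
            node.put("retract", !cRow.change());
        }
        return MAPPER.writeValueAsBytes(node);
    }
}
```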

### Avro format

If updateMode='upsert', schemaInfo must include the retract attribute.

```
CREATE TABLE MyTable(
channel varchar,
pv varchar
--xctime bigint
)WITH(
type='kafka',
bootstrapServers='172.16.8.107:9092',
groupId='mqTest01',
offsetReset='latest',
topic='mqTest01',
parallelism ='1',
topicIsPattern ='false'
);
create table sideTable(
channel varchar,
xccount int,
PRIMARY KEY(channel),
PERIOD FOR SYSTEM_TIME
)WITH(
type='mysql',
url='jdbc:mysql://172.16.8.109:3306/test?charset=utf8',
userName='dtstack',
password='abc123',
tableName='sidetest',
cache = 'LRU',
cacheTTLMs='10000',
parallelism ='1'
);
CREATE TABLE MyResult(
channel varchar,
pv varchar
)WITH(
--type='console'
type='kafka',
bootstrapServers='172.16.8.107:9092',
topic='mqTest02',
parallelism ='1',
updateMode='upsert',
sinkdatatype = 'avro',
schemaInfo = '{"type":"record","name":"MyResult","fields":[{"name":"channel","type":"string"},
{"name":"pv","type":"string"},{"name":"retract","type":"boolean"}]}'
);
insert into MyResult
select
    a.channel as channel,
    a.pv as pv
from
    MyTable a
```
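Because upsert mode requires the `retract` boolean in `schemaInfo`, it can be worth validating the schema string before submitting a job. A small illustrative check using the standard Avro parser (`SchemaInfoCheck` is not part of the plugin):

```
import org.apache.avro.Schema;

public class SchemaInfoCheck {

    public static void main(String[] args) {
        String schemaInfo = "{\"type\":\"record\",\"name\":\"MyResult\",\"fields\":["
                + "{\"name\":\"channel\",\"type\":\"string\"},"
                + "{\"name\":\"pv\",\"type\":\"string\"},"
                + "{\"name\":\"retract\",\"type\":\"boolean\"}]}";

        Schema schema = new Schema.Parser().parse(schemaInfo);
        // upsert mode expects a boolean "retract" field alongside the result columns
        if (schema.getField("retract") == null) {
            throw new IllegalStateException("updateMode='upsert' requires a 'retract' field in schemaInfo");
        }
        System.out.println("schemaInfo is valid: " + schema.getFields());
    }
}
```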
### CSV format

```
CREATE TABLE MyTable(
channel varchar,
pv varchar
--xctime bigint
)WITH(
type='kafka',
bootstrapServers='172.16.8.107:9092',
groupId='mqTest01',
offsetReset='latest',
topic='mqTest01',
parallelism ='2',
topicIsPattern ='false'
);
create table sideTable(
channel varchar,
xccount int,
PRIMARY KEY(channel),
PERIOD FOR SYSTEM_TIME
)WITH(
type='mysql',
url='jdbc:mysql://172.16.8.109:3306/test?charset=utf8',
userName='dtstack',
password='abc123',
tableName='sidetest',
cache = 'LRU',
cacheTTLMs='10000',
parallelism ='1'
);
CREATE TABLE MyResult(
channel varchar,
pv varchar
)WITH(
type='kafka',
bootstrapServers='172.16.8.107:9092',
topic='mqTest02',
parallelism ='2',
updateMode='upsert',
sinkdatatype = 'csv',
fieldDelimiter='*'
);
insert into MyResult
select
    a.channel as channel,
    a.pv as pv
from
    MyTable a
```
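With `sinkdatatype = 'csv'` the columns are joined with `fieldDelimiter` ('*' in the example above), and in upsert mode the retract flag travels with each record as well. A rough sketch of that layout (the `CsvUpsertPayload` class, the field order, and the trailing flag position are assumptions, not the exact behaviour of `CsvCRowSerializationSchema`):

```
import org.apache.flink.table.runtime.types.CRow;
import org.apache.flink.types.Row;

// Illustrative: joins row fields with the configured delimiter and
// appends the retract flag as a trailing column in upsert mode.
public class CsvUpsertPayload {

    public static String toLine(CRow cRow, char fieldDelimiter, boolean upsert) {
        Row row = cRow.row();
        StringBuilder line = new StringBuilder();
        for (int i = 0; i < row.getArity(); i++) {
            if (i > 0) {
                line.append(fieldDelimiter);
            }
            line.append(row.getField(i));
        }
        if (upsert) {
            // assumption: retract flag appended last, derived from !CRow#change()
            line.append(fieldDelimiter).append(!cRow.change());
        }
        return line.toString();
    }
}
```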
AbstractKafkaProducerFactory.java
@@ -19,16 +19,16 @@

import com.dtstack.flink.sql.format.FormatType;
import com.dtstack.flink.sql.format.SerializationMetricWrapper;
import com.dtstack.flink.sql.sink.kafka.serialization.AvroCRowSerializationSchema;
import com.dtstack.flink.sql.sink.kafka.serialization.CsvCRowSerializationSchema;
import com.dtstack.flink.sql.sink.kafka.serialization.JsonCRowSerializationSchema;
import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.AvroRowSerializationSchema;
import org.apache.flink.formats.csv.CsvRowSerializationSchema;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;
import org.apache.flink.table.runtime.types.CRow;

import java.util.Optional;
import java.util.Properties;
@@ -51,42 +51,37 @@ public abstract class AbstractKafkaProducerFactory {
* @param partitioner
* @return
*/
public abstract RichSinkFunction<Row> createKafkaProducer(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<Row> typeInformation, Properties properties, Optional<FlinkKafkaPartitioner<Row>> partitioner, String[] partitionKeys);
public abstract RichSinkFunction<CRow> createKafkaProducer(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<CRow> typeInformation, Properties properties, Optional<FlinkKafkaPartitioner<CRow>> partitioner, String[] partitionKeys);

protected SerializationMetricWrapper createSerializationMetricWrapper(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<Row> typeInformation) {
return new SerializationMetricWrapper(createSerializationSchema(kafkaSinkTableInfo, typeInformation));
protected SerializationMetricWrapper createSerializationMetricWrapper(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<CRow> typeInformation) {
SerializationSchema<CRow> serializationSchema = createSerializationSchema(kafkaSinkTableInfo, typeInformation);
return new SerializationMetricWrapper(serializationSchema);
}

private SerializationSchema<Row> createSerializationSchema(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<Row> typeInformation) {
SerializationSchema<Row> serializationSchema = null;
private SerializationSchema<CRow> createSerializationSchema(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<CRow> typeInformation) {
SerializationSchema<CRow> serializationSchema = null;
if (FormatType.JSON.name().equalsIgnoreCase(kafkaSinkTableInfo.getSinkDataType())) {

if (StringUtils.isNotBlank(kafkaSinkTableInfo.getSchemaString())) {
serializationSchema = new JsonRowSerializationSchema(kafkaSinkTableInfo.getSchemaString());
serializationSchema = new JsonCRowSerializationSchema(kafkaSinkTableInfo.getSchemaString(), kafkaSinkTableInfo.getUpdateMode());
} else if (typeInformation != null && typeInformation.getArity() != 0) {
serializationSchema = new JsonRowSerializationSchema(typeInformation);
serializationSchema = new JsonCRowSerializationSchema(typeInformation, kafkaSinkTableInfo.getUpdateMode());
} else {
throw new IllegalArgumentException("sinkDataType:" + FormatType.JSON.name() + " must set schemaString(JSON Schema)or TypeInformation<Row>");
}

} else if (FormatType.CSV.name().equalsIgnoreCase(kafkaSinkTableInfo.getSinkDataType())) {

if (StringUtils.isBlank(kafkaSinkTableInfo.getFieldDelimiter())) {
throw new IllegalArgumentException("sinkDataType:" + FormatType.CSV.name() + " must set fieldDelimiter");
}

final CsvRowSerializationSchema.Builder serSchemaBuilder = new CsvRowSerializationSchema.Builder(typeInformation);
final CsvCRowSerializationSchema.Builder serSchemaBuilder = new CsvCRowSerializationSchema.Builder(typeInformation);
serSchemaBuilder.setFieldDelimiter(kafkaSinkTableInfo.getFieldDelimiter().toCharArray()[0]);
serializationSchema = serSchemaBuilder.build();
serSchemaBuilder.setUpdateMode(kafkaSinkTableInfo.getUpdateMode());

serializationSchema = serSchemaBuilder.build();
} else if (FormatType.AVRO.name().equalsIgnoreCase(kafkaSinkTableInfo.getSinkDataType())) {

if (StringUtils.isBlank(kafkaSinkTableInfo.getSchemaString())) {
throw new IllegalArgumentException("sinkDataType:" + FormatType.AVRO.name() + " must set schemaString");
}

serializationSchema = new AvroRowSerializationSchema(kafkaSinkTableInfo.getSchemaString());

serializationSchema = new AvroCRowSerializationSchema(kafkaSinkTableInfo.getSchemaString(), kafkaSinkTableInfo.getUpdateMode());
}

if (null == serializationSchema) {