Merge remote-tracking branch 'origin/feat_1.8_codereview' into 1.8_test_3.10.x
todd5167 committed Apr 1, 2020
2 parents 9b6d66e + c2b4d8a commit e1736f3
Showing 5 changed files with 186 additions and 455 deletions.
@@ -0,0 +1,148 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.sink.kafka;

import com.dtstack.flink.sql.sink.IStreamSinkGener;
import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.runtime.types.CRow;
import org.apache.flink.table.runtime.types.CRowTypeInfo;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Optional;
import java.util.Properties;
import java.util.stream.IntStream;

/**
 * Date: 2020/4/1
 * Company: www.dtstack.com
 *
 * @author maqi
 */
public abstract class AbstractKafkaSink implements RetractStreamTableSink<Row>, IStreamSinkGener {

    /** Template for the sink operator name, expanded to "<topic>_<tableName>". */
    public static final String SINK_OPERATOR_NAME_TPL = "${topic}_${table}";

    protected String[] fieldNames;
    protected TypeInformation<?>[] fieldTypes;

    protected String[] partitionKeys;
    protected String sinkOperatorName;
    protected Properties properties;
    protected int parallelism;
    protected String topic;
    protected String tableName;

    protected TableSchema schema;
    protected SinkFunction<CRow> kafkaProducer;

    /** Partitioner used to select the Kafka partition for each record. */
    protected Optional<FlinkKafkaPartitioner<CRow>> partitioner;

    protected Properties getKafkaProperties(KafkaSinkTableInfo kafkaSinkTableInfo) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSinkTableInfo.getBootstrapServers());

        for (String key : kafkaSinkTableInfo.getKafkaParamKeys()) {
            props.setProperty(key, kafkaSinkTableInfo.getKafkaParam(key));
        }
        return props;
    }

    protected TypeInformation[] getTypeInformations(KafkaSinkTableInfo kafkaSinkTableInfo) {
        Class<?>[] fieldClasses = kafkaSinkTableInfo.getFieldClasses();
        return IntStream.range(0, fieldClasses.length)
                .mapToObj(i -> TypeInformation.of(fieldClasses[i]))
                .toArray(TypeInformation[]::new);
    }

    protected TableSchema buildTableSchema(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        Preconditions.checkArgument(fieldNames.length == fieldTypes.length, "fieldNames length must equal fieldTypes length!");

        TableSchema.Builder builder = TableSchema.builder();
        IntStream.range(0, fieldTypes.length)
                .forEach(i -> builder.field(fieldNames[i], fieldTypes[i]));

        return builder.build();
    }

    @Override
    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        // Wrap each retract record into a CRow: f1 carries the payload Row,
        // f0 is the change flag (true = accumulate, false = retract).
        DataStream<CRow> mapDataStream = dataStream
                .map((Tuple2<Boolean, Row> record) -> new CRow(record.f1, record.f0))
                .returns(getRowTypeInfo())
                .setParallelism(parallelism);

        mapDataStream.addSink(kafkaProducer).name(sinkOperatorName);
    }

    public CRowTypeInfo getRowTypeInfo() {
        return new CRowTypeInfo(new RowTypeInfo(fieldTypes, fieldNames));
    }

    protected String[] getPartitionKeys(KafkaSinkTableInfo kafkaSinkTableInfo) {
        if (StringUtils.isNotBlank(kafkaSinkTableInfo.getPartitionKeys())) {
            return StringUtils.split(kafkaSinkTableInfo.getPartitionKeys(), ',');
        }
        return null;
    }

    @Override
    public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
        return new TupleTypeInfo<>(org.apache.flink.table.api.Types.BOOLEAN(), new RowTypeInfo(fieldTypes, fieldNames));
    }

    @Override
    public String[] getFieldNames() {
        return fieldNames;
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return fieldTypes;
    }

    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
        return this;
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return new RowTypeInfo(fieldTypes, fieldNames);
    }
}
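To make the base-class contract concrete, here is a minimal hypothetical subclass (illustrative only; the class name is invented, and imports for KafkaSinkTableInfo and AbstractTargetTableInfo are assumed). A concrete sink only has to populate the protected state in genStreamSink; emitDataStream, configure, and the type accessors are inherited. The real implementation is the KafkaSink shown in the next file.

public class ExampleKafkaSink extends AbstractKafkaSink {
    @Override
    public ExampleKafkaSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
        KafkaSinkTableInfo sinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
        this.tableName = sinkTableInfo.getName();
        this.topic = sinkTableInfo.getTopic();
        this.properties = getKafkaProperties(sinkTableInfo);
        this.fieldNames = sinkTableInfo.getFields();
        this.fieldTypes = getTypeInformations(sinkTableInfo);
        this.schema = buildTableSchema(fieldNames, fieldTypes);
        this.partitionKeys = getPartitionKeys(sinkTableInfo);
        // e.g. topic "order_topic" and table "order_sink" yield the
        // operator name "order_topic_order_sink"
        this.sinkOperatorName = SINK_OPERATOR_NAME_TPL
                .replace("${topic}", topic)
                .replace("${table}", tableName);
        // A version-specific producer would be assigned to this.kafkaProducer
        // here (see KafkaProducerFactory in the next file).
        return this;
    }
}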
@@ -18,25 +18,8 @@
 package com.dtstack.flink.sql.sink.kafka;
 
-import com.dtstack.flink.sql.sink.IStreamSinkGener;
 import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo;
 import com.dtstack.flink.sql.table.AbstractTargetTableInfo;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.table.runtime.types.CRowTypeInfo;
-import org.apache.flink.table.sinks.RetractStreamTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.utils.TableConnectorUtils;
-import org.apache.flink.types.Row;
 
 import java.util.Optional;
 import java.util.Properties;
@@ -46,108 +29,23 @@
  * @create: 2019-11-05 11:45
  * @description:
  **/
-public class KafkaSink implements RetractStreamTableSink<Row>, IStreamSinkGener<KafkaSink> {
-
-    protected String[] fieldNames;
-
-    protected TypeInformation<?>[] fieldTypes;
-
-    protected String topic;
-
-    protected int parallelism;
-
-    protected Properties properties;
-
-    protected FlinkKafkaProducer<CRow> flinkKafkaProducer;
-    protected CRowTypeInfo typeInformation;
-
-
-    /** The schema of the table. */
-    private TableSchema schema;
-
-    /** Partitioner to select Kafka partition for each item. */
-    protected Optional<FlinkKafkaPartitioner<CRow>> partitioner;
-
-    private String[] partitionKeys;
-
+public class KafkaSink extends AbstractKafkaSink {
     @Override
     public KafkaSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
         KafkaSinkTableInfo kafkaSinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
-        this.topic = kafkaSinkTableInfo.getTopic();
-
-        properties = new Properties();
-        properties.setProperty("bootstrap.servers", kafkaSinkTableInfo.getBootstrapServers());
-
-        for (String key : kafkaSinkTableInfo.getKafkaParamKeys()) {
-            properties.setProperty(key, kafkaSinkTableInfo.getKafkaParam(key));
-        }
+        Properties kafkaProperties = getKafkaProperties(kafkaSinkTableInfo);
+        this.tableName = kafkaSinkTableInfo.getName();
+        this.topic = kafkaSinkTableInfo.getTopic();
         this.partitioner = Optional.of(new CustomerFlinkPartition<>());
         this.partitionKeys = getPartitionKeys(kafkaSinkTableInfo);
        this.fieldNames = kafkaSinkTableInfo.getFields();
-        TypeInformation[] types = new TypeInformation[kafkaSinkTableInfo.getFields().length];
-        for (int i = 0; i < kafkaSinkTableInfo.getFieldClasses().length; i++) {
-            types[i] = TypeInformation.of(kafkaSinkTableInfo.getFieldClasses()[i]);
-        }
-        this.fieldTypes = types;
-
-        TableSchema.Builder schemaBuilder = TableSchema.builder();
-        for (int i=0;i<fieldNames.length;i++) {
-            schemaBuilder.field(fieldNames[i], fieldTypes[i]);
-        }
-        this.schema = schemaBuilder.build();
-
-        Integer parallelism = kafkaSinkTableInfo.getParallelism();
-        if (parallelism != null) {
-            this.parallelism = parallelism;
-        }
-
-        typeInformation = new CRowTypeInfo(new RowTypeInfo(fieldTypes, fieldNames));
-        this.flinkKafkaProducer = (FlinkKafkaProducer<CRow>) new KafkaProducerFactory()
-                .createKafkaProducer(kafkaSinkTableInfo, typeInformation, properties, partitioner, partitionKeys);
-        return this;
-    }
-
-    @Override
-    public TypeInformation<Row> getRecordType() {
-        return new RowTypeInfo(fieldTypes, fieldNames);
-    }
-
-    @Override
-    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
-        DataStream<CRow> mapDataStream = dataStream
-                .map((Tuple2<Boolean, Row> record) -> new CRow(record.f1, record.f0))
-                .returns(typeInformation)
-                .setParallelism(parallelism);
-
-        mapDataStream.addSink(flinkKafkaProducer).name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
-    }
-
-    @Override
-    public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
-        return new TupleTypeInfo(org.apache.flink.table.api.Types.BOOLEAN(), new RowTypeInfo(fieldTypes, fieldNames));
-    }
-
-    @Override
-    public String[] getFieldNames() {
-        return fieldNames;
-    }
-
-    @Override
-    public TypeInformation<?>[] getFieldTypes() {
-        return fieldTypes;
-    }
-
-    @Override
-    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
-        this.fieldNames = fieldNames;
-        this.fieldTypes = fieldTypes;
+        this.fieldTypes = getTypeInformations(kafkaSinkTableInfo);
+        this.schema = buildTableSchema(fieldNames, fieldTypes);
+        this.parallelism = kafkaSinkTableInfo.getParallelism();
+        this.sinkOperatorName = SINK_OPERATOR_NAME_TPL.replace("${topic}", topic).replace("${table}", tableName);
+        this.kafkaProducer = new KafkaProducerFactory().createKafkaProducer(kafkaSinkTableInfo, getRowTypeInfo(), kafkaProperties, partitioner, partitionKeys);
         return this;
     }
-
-    private String[] getPartitionKeys(KafkaSinkTableInfo kafkaSinkTableInfo) {
-        if (StringUtils.isNotBlank(kafkaSinkTableInfo.getPartitionKeys())) {
-            return StringUtils.split(kafkaSinkTableInfo.getPartitionKeys(), ',');
-        }
-        return null;
-    }
 }
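One behavioral note on this refactor: the sink operator's display name changes. The old emitDataStream named the operator via TableConnectorUtils.generateRuntimeName, while the base class now expands the ${topic}_${table} template. A rough illustration, with invented topic and table names:

// Before: generated from the class and field names,
// roughly "KafkaSink(id, name)" for fields id and name.
String oldName = TableConnectorUtils.generateRuntimeName(KafkaSink.class, new String[]{"id", "name"});

// After: "order_topic_order_sink"
String newName = AbstractKafkaSink.SINK_OPERATOR_NAME_TPL
        .replace("${topic}", "order_topic")
        .replace("${table}", "order_sink");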