diff --git a/core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java b/core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java index b2486b1ab..10a6b4f63 100644 --- a/core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java +++ b/core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java @@ -219,14 +219,14 @@ public static String addJdbcParam(String dbUrl, Map addParams, b return preStr + "?" + sb.toString(); } - public static boolean isJosn(String str){ + public static boolean isJson(String str) { boolean flag = false; - if(StringUtils.isNotBlank(str)){ + if (StringUtils.isNotBlank(str)) { try { - objectMapper.readValue(str,Map.class); + objectMapper.readValue(str, Map.class); flag = true; } catch (Throwable e) { - flag=false; + flag = false; } } return flag; diff --git a/docs/kafkaSource.md b/docs/kafkaSource.md index 76096aba6..00c252c99 100644 --- a/docs/kafkaSource.md +++ b/docs/kafkaSource.md @@ -1,6 +1,5 @@ ## 1.格式: ``` -数据现在支持json格式{"xx":"bb","cc":"dd"} CREATE TABLE tableName( colName colType, @@ -15,9 +14,8 @@ CREATE TABLE tableName( topic ='topicName', groupId='test', parallelism ='parllNum', - --timezone='America/Los_Angeles', timezone='Asia/Shanghai', - sourcedatatype ='json' #可不设置 + sourcedatatype ='dt_nest' #可不设置 ); ``` @@ -47,7 +45,9 @@ CREATE TABLE tableName( |topicIsPattern | topic是否是正则表达式格式(true|false) |否| false |offsetReset | 读取的topic 的offset初始位置[latest|earliest|指定offset值({"0":12312,"1":12321,"2":12312},{"partition_no":offset_value})]|否|latest| |parallelism | 并行度设置|否|1| -|sourcedatatype | 数据类型|否|json| +|sourcedatatype | 数据类型,avro,csv,json,dt_nest。dt_nest为默认JSON解析器,能够解析嵌套JSON数据类型,其他仅支持非嵌套格式|否|dt_nest| +|schemaInfo | avro类型使用的schema信息|否|| +|fieldDelimiter |csv类型使用的数据分隔符|否| | | |timezone|时区设置[timezone支持的参数](timeZone.md)|否|'Asia/Shanghai' **kafka相关参数可以自定义,使用kafka.开头即可。** ``` @@ -169,24 +169,10 @@ CREATE TABLE MyTable( parallelism ='1' ); ``` -# 二、csv格式数据源 -根据字段分隔符进行数据分隔,按顺序匹配sql中配置的列。如数据分隔列数和sql中配置的列数相等直接匹配;如不同参照lengthcheckpolicy策略处理。 -## 1.参数: - -|参数名称|含义|是否必填|默认值| -|----|---|---|---| -|type | kafka09 | 是|| -|bootstrapServers | kafka bootstrap-server 地址信息(多个用逗号隔开)|是|| -|zookeeperQuorum | kafka zk地址信息(多个之间用逗号分隔)|是|| -|topic | 需要读取的 topic 名称|是|| -|offsetReset | 读取的topic 的offset初始位置[latest|earliest]|否|latest| -|parallelism | 并行度设置 |否|1| -|sourcedatatype | 数据类型|是 |csv| -|fielddelimiter | 字段分隔符|是 || -|lengthcheckpolicy | 单行字段条数检查策略 |否|可选,默认为SKIP,其它可选值为EXCEPTION、PAD。SKIP:字段数目不符合时跳过 。EXCEPTION:字段数目不符合时抛出异常。PAD:按顺序填充,不存在的置为null。| -**kafka相关参数可以自定义,使用kafka.开头即可。** -## 2.样例: +## 7.csv格式数据源 + + ``` CREATE TABLE MyTable( name varchar, @@ -203,186 +189,28 @@ CREATE TABLE MyTable( --topic ='mqTest.*', --topicIsPattern='true' parallelism ='1', - sourcedatatype ='csv', - fielddelimiter ='\|', - lengthcheckpolicy = 'PAD' + sourceDatatype ='csv' ); ``` -# 三、text格式数据源UDF自定义拆分 -Kafka源表数据解析流程:Kafka Source Table -> UDTF ->Realtime Compute -> SINK。从Kakfa读入的数据,都是VARBINARY(二进制)格式,对读入的每条数据,都需要用UDTF将其解析成格式化数据。 - 与其他格式不同,本格式定义DDL必须与以下SQL一摸一样,表中的五个字段顺序务必保持一致: - -## 1. 
定义源表,注意:kafka源表DDL字段必须与以下例子一模一样。WITH中参数可改。 -``` -create table kafka_stream( - _topic STRING, - _messageKey STRING, - _message STRING, - _partition INT, - _offset BIGINT, -) with ( - type ='kafka09', - bootstrapServers ='172.16.8.198:9092', - zookeeperQuorum ='172.16.8.198:2181/kafka', - offsetReset ='latest', - topic ='nbTest1', - parallelism ='1', - sourcedatatype='text' - ) -``` -## 2.参数: - -|参数名称|含义|是否必填|默认值| -|----|---|---|---| -|type | kafka09 | 是|| -|bootstrapServers | kafka bootstrap-server 地址信息(多个用逗号隔开)|是|| -|zookeeperQuorum | kafka zk地址信息(多个之间用逗号分隔)|是|| -|topic | 需要读取的 topic 名称|是|| -|offsetReset | 读取的topic 的offset初始位置[latest|earliest]|否|latest| -|parallelism | 并行度设置|否|1| -|sourcedatatype | 数据类型|否|text| -**kafka相关参数可以自定义,使用kafka.开头即可。** +## 8.avro格式数据源 -## 2.自定义: -从kafka读出的数据,需要进行窗口计算。 按照实时计算目前的设计,滚窗/滑窗等窗口操作,需要(且必须)在源表DDL上定义Watermark。Kafka源表比较特殊。如果要以kafka中message字段中的的Event Time进行窗口操作, -需要先从message字段,使用UDX解析出event time,才能定义watermark。 在kafka源表场景中,需要使用计算列。 假设,kafka中写入的数据如下: -2018-11-11 00:00:00|1|Anna|female整个计算流程为:Kafka SOURCE->UDTF->Realtime Compute->RDS SINK(单一分隔符可直接使用类csv格式模板,自定义适用于更复杂的数据类型,本说明只做参考) - -**SQL** ``` --- 定义解析Kakfa message的UDTF - CREATE FUNCTION kafkapaser AS 'com.XXXX.kafkaUDTF'; - CREATE FUNCTION kafkaUDF AS 'com.XXXX.kafkaUDF'; - -- 定义源表,注意:kafka源表DDL字段必须与以下例子一模一样。WITH中参数可改。 - create table kafka_src ( - _topic STRING, - _messageKey STRING, - _message STRING, - _partition INT, - _offset BIGINT, - ctime AS TO_TIMESTAMP(kafkaUDF(_message)), -- 定义计算列,计算列可理解为占位符,源表中并没有这一列,其中的数据可经过下游计算得出。注意计算里的类型必须为timestamp才能在做watermark。 - watermark for ctime as withoffset(ctime,0) -- 在计算列上定义watermark - ) WITH ( - type = 'kafka010', -- Kafka Source类型,与Kafka版本强相关,目前支持的Kafka版本请参考本文档 - topic = 'test_kafka_topic', - ... - ); - create table rds_sink ( - name VARCHAR, - age INT, - grade VARCHAR, - updateTime TIMESTAMP - ) WITH( - type='mysql', - url='jdbc:mysql://localhost:3306/test', - tableName='test4', - userName='test', - password='XXXXXX' +CREATE TABLE MyTable( + channel varchar, + pv varchar + --xctime bigint + )WITH( + type='kafka', + bootstrapServers='172.16.8.107:9092', + groupId='mqTest01', + offsetReset='latest', + topic='mqTest01', + parallelism ='1', + topicIsPattern ='false', + kafka.group.id='mqTest', + sourceDataType ='avro', + schemaInfo = '{"type":"record","name":"MyResult","fields":[{"name":"channel","type":"string"},{"name":"pv","type":"string"}]}' ); - -- 使用UDTF,将二进制数据解析成格式化数据 - CREATE VIEW input_view ( - name, - age, - grade, - updateTime - ) AS - SELECT - COUNT(*) as cnt, - T.ctime, - T.order, - T.name, - T.sex - from - kafka_src as S, - LATERAL TABLE (kafkapaser _message)) as T ( - ctime, - order, - name, - sex - ) - Group BY T.sex, - TUMBLE(ctime, INTERVAL '1' MINUTE); - -- 对input_view中输出的数据做计算 - CREATE VIEW view2 ( - cnt, - sex - ) AS - SELECT - COUNT(*) as cnt, - T.sex - from - input_view - Group BY sex, TUMBLE(ctime, INTERVAL '1' MINUTE); - -- 使用解析出的格式化数据进行计算,并将结果输出到RDS中 - insert into rds_sink - SELECT - cnt,sex - from view2; - ``` -**UDF&UDTF** + ``` -package com.XXXX; - import com.XXXX.fastjson.JSONObject; - import org.apache.flink.table.functions.TableFunction; - import org.apache.flink.table.types.DataType; - import org.apache.flink.table.types.DataTypes; - import org.apache.flink.types.Row; - import java.io.UnsupportedEncodingException; - /** - 以下例子解析输入Kafka中的JSON字符串,并将其格式化输出 - **/ - public class kafkaUDTF extends TableFunction { - public void eval(byte[] message) { - try { - // 读入一个二进制数据,并将其转换为String格式 - String msg = new String(message, "UTF-8"); - // 提取JSON 
Object中各字段 - String ctime = Timestamp.valueOf(data.split('\\|')[0]); - String order = data.split('\\|')[1]; - String name = data.split('\\|')[2]; - String sex = data.split('\\|')[3]; - // 将解析出的字段放到要输出的Row()对象 - Row row = new Row(4); - row.setField(0, ctime); - row.setField(1, age); - row.setField(2, grade); - row.setField(3, updateTime); - System.out.println("Kafka message str ==>" + row.toString()); - // 输出一行 - collect(row); - } catch (ClassCastException e) { - System.out.println("Input data format error. Input data " + msg + "is not json string"); - } - } catch (UnsupportedEncodingException e) { - e.printStackTrace(); - } - } - @Override - // 如果返回值是Row,就必须重载实现这个方法,显式地告诉系统返回的字段类型 - // 定义输出Row()对象的字段类型 - public DataType getResultType(Object[] arguments, Class[] argTypes) { - return DataTypes.createRowType(DataTypes.TIMESTAMP,DataTypes.STRING, DataTypes.Integer, DataTypes.STRING,DataTypes.STRING); - } - } - - package com.dp58; - package com.dp58.sql.udx; - import org.apache.flink.table.functions.FunctionContext; - import org.apache.flink.table.functions.ScalarFunction; - public class KafkaUDF extends ScalarFunction { - // 可选,open方法可以不写 - // 需要import org.apache.flink.table.functions.FunctionContext; - public String eval(byte[] message) { - // 读入一个二进制数据,并将其转换为String格式 - String msg = new String(message, "UTF-8"); - return msg.split('\\|')[0]; - } - public long eval(String b, String c) { - return eval(b) + eval(c); - } - //可选,close方法可以不写 - @Override - public void close() { - } - } - ``` + diff --git a/kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/AbstractKafkaSource.java b/kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/AbstractKafkaSource.java new file mode 100644 index 000000000..852a381e2 --- /dev/null +++ b/kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/AbstractKafkaSource.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.dtstack.flink.sql.source.kafka;
+
+import com.dtstack.flink.sql.source.IStreamSourceGener;
+import com.dtstack.flink.sql.source.kafka.enums.EKafkaOffset;
+import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
+import com.dtstack.flink.sql.util.DtStringUtil;
+import com.dtstack.flink.sql.util.PluginUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.types.Row;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Date: 2020/3/20
+ * Company: www.dtstack.com
+ * @author maqi
+ */
+public abstract class AbstractKafkaSource implements IStreamSourceGener<Table> {
+
+    private static final String SOURCE_OPERATOR_NAME_TPL = "${topic}_${table}";
+
+    protected Properties getKafkaProperties(KafkaSourceTableInfo kafkaSourceTableInfo) {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSourceTableInfo.getBootstrapServers());
+
+        if (DtStringUtil.isJson(kafkaSourceTableInfo.getOffsetReset())) {
+            props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, EKafkaOffset.NONE.name().toLowerCase());
+        } else {
+            props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaSourceTableInfo.getOffsetReset());
+        }
+
+        if (StringUtils.isNotBlank(kafkaSourceTableInfo.getGroupId())) {
+            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaSourceTableInfo.getGroupId());
+        }
+
+        for (String key : kafkaSourceTableInfo.getKafkaParamKeys()) {
+            props.setProperty(key, kafkaSourceTableInfo.getKafkaParam(key));
+        }
+        return props;
+    }
+
+    protected String generateOperatorName(String tabName, String topicName) {
+        return SOURCE_OPERATOR_NAME_TPL.replace("${topic}", topicName).replace("${table}", tabName);
+    }
+
+    protected TypeInformation<Row> getRowTypeInformation(KafkaSourceTableInfo kafkaSourceTableInfo) {
+        Class[] fieldClasses = kafkaSourceTableInfo.getFieldClasses();
+        TypeInformation[] types = IntStream.range(0, fieldClasses.length)
+                .mapToObj(i -> TypeInformation.of(fieldClasses[i]))
+                .toArray(TypeInformation[]::new);
+
+        return new RowTypeInfo(types, kafkaSourceTableInfo.getFields());
+    }
+
+    protected void setStartPosition(String offset, String topicName, FlinkKafkaConsumerBase<Row> kafkaSrc) {
+        if (StringUtils.equalsIgnoreCase(offset, EKafkaOffset.EARLIEST.name())) {
+            kafkaSrc.setStartFromEarliest();
+        } else if (DtStringUtil.isJson(offset)) {
+            Map<KafkaTopicPartition, Long> specificStartupOffsets = buildOffsetMap(offset, topicName);
+            kafkaSrc.setStartFromSpecificOffsets(specificStartupOffsets);
+        } else {
+            kafkaSrc.setStartFromLatest();
+        }
+    }
+
+    /**
+     * kafka offset, e.g. {"0":12312,"1":12321,"2":12312}
+     * @param offsetJson json string mapping partition id to start offset
+     * @param topicName  kafka topic name
+     * @return partition -> offset map used for the specific-offsets start position
+     */
+    protected Map<KafkaTopicPartition, Long> buildOffsetMap(String offsetJson, String topicName) {
+        try {
+            Properties properties = PluginUtil.jsonStrToObject(offsetJson, Properties.class);
+            Map<String, Object> offsetMap = PluginUtil.objectToMap(properties);
+            Map<KafkaTopicPartition, Long> specificStartupOffsets = offsetMap
+                    .entrySet()
+                    .stream()
+                    .collect(Collectors.toMap(
+                            (Map.Entry<String, Object> entry) -> new KafkaTopicPartition(topicName, Integer.valueOf(entry.getKey())),
+                            (Map.Entry<String, Object> entry) -> Long.valueOf(entry.getValue().toString()))
+                    );
+
+            return specificStartupOffsets;
+        } catch (Exception e) {
+            throw new RuntimeException("not support offsetReset type:" + offsetJson);
+        }
+    }
+
+}
diff --git a/kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/enums/EKafkaOffset.java b/kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/enums/EKafkaOffset.java
new file mode 100644
index 000000000..476ccc8b7
--- /dev/null
+++ b/kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/enums/EKafkaOffset.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package com.dtstack.flink.sql.source.kafka.enums; + +/** + * Date: 2020/3/20 + * Company: www.dtstack.com + * @author maqi + */ +public enum EKafkaOffset { + + LATEST, + EARLIEST, + NONE +} diff --git a/kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java b/kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java index 2dfb1e697..bef86f10d 100644 --- a/kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java +++ b/kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java @@ -19,11 +19,14 @@ package com.dtstack.flink.sql.source.kafka.table; +import com.dtstack.flink.sql.format.FormatType; +import com.dtstack.flink.sql.source.kafka.enums.EKafkaOffset; import com.dtstack.flink.sql.table.AbstractSourceParser; import com.dtstack.flink.sql.table.AbstractTableInfo; import com.dtstack.flink.sql.util.MathUtil; import java.util.Map; +import java.util.stream.Collectors; /** * Reason: @@ -37,28 +40,33 @@ public class KafkaSourceParser extends AbstractSourceParser { public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map props) throws Exception { KafkaSourceTableInfo kafkaSourceTableInfo = new KafkaSourceTableInfo(); - kafkaSourceTableInfo.setName(tableName); - kafkaSourceTableInfo.setType(MathUtil.getString(props.get(KafkaSourceTableInfo.TYPE_KEY.toLowerCase()))); parseFieldsInfo(fieldsInfo, kafkaSourceTableInfo); + kafkaSourceTableInfo.setName(tableName); + kafkaSourceTableInfo.setType(MathUtil.getString(props.get(KafkaSourceTableInfo.TYPE_KEY.toLowerCase()))); kafkaSourceTableInfo.setParallelism(MathUtil.getIntegerVal(props.get(KafkaSourceTableInfo.PARALLELISM_KEY.toLowerCase()))); - String bootstrapServer = MathUtil.getString(props.get(KafkaSourceTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())); - if (bootstrapServer == null || "".equals(bootstrapServer.trim())) { - throw new Exception("BootstrapServers can not be empty!"); - } else { - kafkaSourceTableInfo.setBootstrapServers(bootstrapServer); - } + kafkaSourceTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSourceTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase()))); kafkaSourceTableInfo.setGroupId(MathUtil.getString(props.get(KafkaSourceTableInfo.GROUPID_KEY.toLowerCase()))); kafkaSourceTableInfo.setTopic(MathUtil.getString(props.get(KafkaSourceTableInfo.TOPIC_KEY.toLowerCase()))); kafkaSourceTableInfo.setOffsetReset(MathUtil.getString(props.get(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase()))); kafkaSourceTableInfo.setTopicIsPattern(MathUtil.getBoolean(props.get(KafkaSourceTableInfo.TOPICISPATTERN_KEY.toLowerCase()), false)); + kafkaSourceTableInfo.setOffsetReset(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase(), EKafkaOffset.LATEST.name().toLowerCase()))); + kafkaSourceTableInfo.setTopicIsPattern(MathUtil.getBoolean(props.get(KafkaSourceTableInfo.TOPICISPATTERN_KEY.toLowerCase()))); kafkaSourceTableInfo.setTimeZone(MathUtil.getString(props.get(KafkaSourceTableInfo.TIME_ZONE_KEY.toLowerCase()))); - for (String key : props.keySet()) { - if (!key.isEmpty() && key.startsWith("kafka.")) { - kafkaSourceTableInfo.addKafkaParam(key.substring(6), props.get(key).toString()); - } - } + + kafkaSourceTableInfo.setSchemaString(MathUtil.getString(props.get(KafkaSourceTableInfo.SCHEMA_STRING_KEY.toLowerCase()))); + 
kafkaSourceTableInfo.setFieldDelimiter(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.CSV_FIELD_DELIMITER_KEY.toLowerCase(), "|"))); + kafkaSourceTableInfo.setSourceDataType(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.SOURCE_DATA_TYPE_KEY.toLowerCase(), FormatType.DT_NEST.name()))); + + Map kafkaParams = props.keySet().stream() + .filter(key -> !key.isEmpty() && key.startsWith("kafka.")) + .collect(Collectors.toMap( + key -> key.substring(6), key -> props.get(key).toString()) + ); + + kafkaSourceTableInfo.addKafkaParam(kafkaParams); kafkaSourceTableInfo.check(); + return kafkaSourceTableInfo; } } diff --git a/kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java b/kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java index 364cbff49..c27eee376 100644 --- a/kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java +++ b/kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java @@ -17,7 +17,6 @@ */ - package com.dtstack.flink.sql.source.kafka.table; import com.dtstack.flink.sql.format.FormatType; @@ -37,134 +36,124 @@ public class KafkaSourceTableInfo extends AbstractSourceTableInfo { - public static final String BOOTSTRAPSERVERS_KEY = "bootstrapServers"; + public static final String BOOTSTRAPSERVERS_KEY = "bootstrapServers"; + + public static final String TOPIC_KEY = "topic"; - public static final String TOPIC_KEY = "topic"; + public static final String TYPE_KEY = "type"; - public static final String TYPE_KEY = "type"; + public static final String GROUPID_KEY = "groupId"; - public static final String GROUPID_KEY = "groupId"; + public static final String OFFSETRESET_KEY = "offsetReset"; - public static final String OFFSETRESET_KEY = "offsetReset"; + public static final String TOPICISPATTERN_KEY = "topicIsPattern"; - public static final String TOPICISPATTERN_KEY = "topicIsPattern"; + public static final String SCHEMA_STRING_KEY = "schemaInfo"; - private String bootstrapServers; + public static final String CSV_FIELD_DELIMITER_KEY = "fieldDelimiter"; - private String topic; + public static final String SOURCE_DATA_TYPE_KEY = "sourceDataType"; - private String groupId; + private String bootstrapServers; - //latest, earliest - private String offsetReset = "latest"; + private String topic; - private String offset; + private String groupId; - private Boolean topicIsPattern = false; + private String offsetReset; - private String sourceDataType = FormatType.DT_NEST.name(); + private Boolean topicIsPattern = false; - private String schemaString; + private String sourceDataType; - private String fieldDelimiter; + private String schemaString; - public String getBootstrapServers() { - return bootstrapServers; - } + private String fieldDelimiter; - public void setBootstrapServers(String bootstrapServers) { - this.bootstrapServers = bootstrapServers; - } + public Map kafkaParam = new HashMap<>(); - public String getTopic() { - return topic; - } - public void setTopic(String topic) { - this.topic = topic; - } + public String getBootstrapServers() { + return bootstrapServers; + } - public String getGroupId() { - return groupId; - } + public void setBootstrapServers(String bootstrapServers) { + this.bootstrapServers = bootstrapServers; + } - public void setGroupId(String groupId) { - this.groupId = groupId; - } + public String getTopic() { + return topic; + } - public 
String getOffsetReset() { - return offsetReset; - } + public void setTopic(String topic) { + this.topic = topic; + } - public void setOffsetReset(String offsetReset) { - if(offsetReset == null){ - return; - } - this.offsetReset = offsetReset; - } + public String getGroupId() { + return groupId; + } - public String getOffset() { - return offset; - } + public void setGroupId(String groupId) { + this.groupId = groupId; + } - public void setOffset(String offset) { - if (offsetReset == null) { - return; - } - this.offset = offset; - } + public String getOffsetReset() { + return offsetReset; + } - public Boolean getTopicIsPattern() { - return topicIsPattern; - } + public void setOffsetReset(String offsetReset) { + this.offsetReset = offsetReset; + } - public void setTopicIsPattern(Boolean topicIsPattern) { - this.topicIsPattern = topicIsPattern; - } + public Boolean getTopicIsPattern() { + return topicIsPattern; + } - public Map kafkaParam = new HashMap<>(); + public void setTopicIsPattern(Boolean topicIsPattern) { + this.topicIsPattern = topicIsPattern; + } - public void addKafkaParam(String key, String value) { - kafkaParam.put(key, value); - } + public void addKafkaParam(Map kafkaParam) { + kafkaParam.putAll(kafkaParam); + } - public String getKafkaParam(String key) { - return kafkaParam.get(key); - } + public String getKafkaParam(String key) { + return kafkaParam.get(key); + } - public Set getKafkaParamKeys() { - return kafkaParam.keySet(); - } + public Set getKafkaParamKeys() { + return kafkaParam.keySet(); + } - public String getSourceDataType() { - return sourceDataType; - } + public String getSourceDataType() { + return sourceDataType; + } - public void setSourceDataType(String sourceDataType) { - this.sourceDataType = sourceDataType; - } + public void setSourceDataType(String sourceDataType) { + this.sourceDataType = sourceDataType; + } - public String getSchemaString() { - return schemaString; - } + public String getSchemaString() { + return schemaString; + } - public void setSchemaString(String schemaString) { - this.schemaString = schemaString; - } + public void setSchemaString(String schemaString) { + this.schemaString = schemaString; + } - public String getFieldDelimiter() { - return fieldDelimiter; - } + public String getFieldDelimiter() { + return fieldDelimiter; + } - public void setFieldDelimiter(String fieldDelimiter) { - this.fieldDelimiter = fieldDelimiter; - } + public void setFieldDelimiter(String fieldDelimiter) { + this.fieldDelimiter = fieldDelimiter; + } - @Override - public boolean check() { - Preconditions.checkNotNull(getType(), "kafka of type is required"); - Preconditions.checkNotNull(bootstrapServers, "kafka of bootstrapServers is required"); - Preconditions.checkNotNull(topic, "kafka of topic is required"); - return false; - } + @Override + public boolean check() { + Preconditions.checkNotNull(getType(), "kafka of type is required"); + Preconditions.checkNotNull(bootstrapServers, "kafka of bootstrapServers is required"); + Preconditions.checkNotNull(topic, "kafka of topic is required"); + return false; + } } diff --git a/kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java b/kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java index 7e2d77c27..394ea86ee 100644 --- a/kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java +++ b/kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java @@ -18,24 +18,17 @@ package 
com.dtstack.flink.sql.source.kafka; -import com.dtstack.flink.sql.source.IStreamSourceGener; import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo; import com.dtstack.flink.sql.table.AbstractSourceTableInfo; -import com.dtstack.flink.sql.util.DtStringUtil; -import com.dtstack.flink.sql.util.PluginUtil; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.types.Row; -import java.util.HashMap; -import java.util.Map; import java.util.Properties; /** @@ -43,75 +36,24 @@ * @create: 2019-11-05 10:55 * @description: **/ -public class KafkaSource implements IStreamSourceGener
{ +public class KafkaSource extends AbstractKafkaSource { - private static final String SOURCE_OPERATOR_NAME_TPL = "${topic}_${table}"; - - /** - * Get kafka data source, you need to provide the data field names, data types - * If you do not specify auto.offset.reset, the default use groupoffset - * - * @param sourceTableInfo - * @return - */ - @SuppressWarnings("rawtypes") @Override public Table genStreamSource(AbstractSourceTableInfo sourceTableInfo, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv) { - KafkaSourceTableInfo kafkaSourceTableInfo = (KafkaSourceTableInfo) sourceTableInfo; String topicName = kafkaSourceTableInfo.getTopic(); - Properties props = new Properties(); - for (String key : kafkaSourceTableInfo.getKafkaParamKeys()) { - props.setProperty(key, kafkaSourceTableInfo.getKafkaParam(key)); - } - props.setProperty("bootstrap.servers", kafkaSourceTableInfo.getBootstrapServers()); - if (DtStringUtil.isJosn(kafkaSourceTableInfo.getOffsetReset())) { - props.setProperty("auto.offset.reset", "none"); - } else { - props.setProperty("auto.offset.reset", kafkaSourceTableInfo.getOffsetReset()); - } - if (StringUtils.isNotBlank(kafkaSourceTableInfo.getGroupId())) { - props.setProperty("group.id", kafkaSourceTableInfo.getGroupId()); - } - - TypeInformation[] types = new TypeInformation[kafkaSourceTableInfo.getFields().length]; - for (int i = 0; i < kafkaSourceTableInfo.getFieldClasses().length; i++) { - types[i] = TypeInformation.of(kafkaSourceTableInfo.getFieldClasses()[i]); - } - - TypeInformation typeInformation = new RowTypeInfo(types, kafkaSourceTableInfo.getFields()); + Properties kafkaProperties = getKafkaProperties(kafkaSourceTableInfo); + TypeInformation typeInformation = getRowTypeInformation(kafkaSourceTableInfo); + FlinkKafkaConsumer kafkaSrc = (FlinkKafkaConsumer) new KafkaConsumerFactory().createKafkaTableSource(kafkaSourceTableInfo, typeInformation, kafkaProperties); - FlinkKafkaConsumer kafkaSrc = (FlinkKafkaConsumer) new KafkaConsumerFactory().createKafkaTableSource(kafkaSourceTableInfo, typeInformation, props); - - - //earliest,latest - if ("earliest".equalsIgnoreCase(kafkaSourceTableInfo.getOffsetReset())) { - kafkaSrc.setStartFromEarliest(); - } else if (DtStringUtil.isJosn(kafkaSourceTableInfo.getOffsetReset())) {// {"0":12312,"1":12321,"2":12312} - try { - Properties properties = PluginUtil.jsonStrToObject(kafkaSourceTableInfo.getOffsetReset(), Properties.class); - Map offsetMap = PluginUtil.objectToMap(properties); - Map specificStartupOffsets = new HashMap<>(); - for (Map.Entry entry : offsetMap.entrySet()) { - specificStartupOffsets.put(new KafkaTopicPartition(topicName, Integer.valueOf(entry.getKey())), Long.valueOf(entry.getValue().toString())); - } - kafkaSrc.setStartFromSpecificOffsets(specificStartupOffsets); - } catch (Exception e) { - throw new RuntimeException("not support offsetReset type:" + kafkaSourceTableInfo.getOffsetReset()); - } - } else { - kafkaSrc.setStartFromLatest(); - } + String sourceOperatorName = generateOperatorName(sourceTableInfo.getName(), topicName); + DataStreamSource kafkaSource = env.addSource(kafkaSrc, sourceOperatorName, typeInformation); + kafkaSource.setParallelism(kafkaSourceTableInfo.getParallelism()); + setStartPosition(kafkaSourceTableInfo.getOffsetReset(), topicName, kafkaSrc); String fields = StringUtils.join(kafkaSourceTableInfo.getFields(), ","); - String sourceOperatorName = SOURCE_OPERATOR_NAME_TPL.replace("${topic}", topicName).replace("${table}", sourceTableInfo.getName()); - DataStreamSource 
kafkaSource = env.addSource(kafkaSrc, sourceOperatorName, typeInformation); - Integer parallelism = kafkaSourceTableInfo.getParallelism(); - if (parallelism != null) { - kafkaSource.setParallelism(parallelism); - } return tableEnv.fromDataStream(kafkaSource, fields); } } diff --git a/kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java b/kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java index ffb466a4c..9f8917761 100644 --- a/kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java +++ b/kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java @@ -17,17 +17,13 @@ */ - package com.dtstack.flink.sql.source.kafka; -import com.dtstack.flink.sql.source.IStreamSourceGener; import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo; import com.dtstack.flink.sql.table.AbstractSourceTableInfo; import com.dtstack.flink.sql.util.DtStringUtil; -import com.dtstack.flink.sql.util.PluginUtil; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09; @@ -36,7 +32,6 @@ import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.types.Row; -import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -47,74 +42,23 @@ * @author xuchao */ -public class KafkaSource implements IStreamSourceGener
{ - - private static final String SOURCE_OPERATOR_NAME_TPL = "${topic}_${table}"; - - /** - * Get kafka data source, you need to provide the data field names, data types - * If you do not specify auto.offset.reset, the default use groupoffset - * @param sourceTableInfo - * @return - */ - @SuppressWarnings("rawtypes") +public class KafkaSource extends AbstractKafkaSource { @Override public Table genStreamSource(AbstractSourceTableInfo sourceTableInfo, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv) { - KafkaSourceTableInfo kafkaSourceTableInfo = (KafkaSourceTableInfo) sourceTableInfo; String topicName = kafkaSourceTableInfo.getTopic(); - Properties props = new Properties(); - for (String key : kafkaSourceTableInfo.getKafkaParamKeys()) { - props.setProperty(key, kafkaSourceTableInfo.getKafkaParam(key)); - } - props.setProperty("bootstrap.servers", kafkaSourceTableInfo.getBootstrapServers()); - if (DtStringUtil.isJosn(kafkaSourceTableInfo.getOffsetReset())){ - props.setProperty("auto.offset.reset", "none"); - } else { - props.setProperty("auto.offset.reset", kafkaSourceTableInfo.getOffsetReset()); - } - if (StringUtils.isNotBlank(kafkaSourceTableInfo.getGroupId())){ - props.setProperty("group.id", kafkaSourceTableInfo.getGroupId()); - } - // only required for Kafka 0.8 - //TODO props.setProperty("zookeeper.connect", kafkaSourceTableInfo.) + Properties kafkaProperties = getKafkaProperties(kafkaSourceTableInfo); + TypeInformation typeInformation = getRowTypeInformation(kafkaSourceTableInfo); + FlinkKafkaConsumer09 kafkaSrc = (FlinkKafkaConsumer09) new KafkaConsumer09Factory().createKafkaTableSource(kafkaSourceTableInfo, typeInformation, kafkaProperties); - TypeInformation[] types = new TypeInformation[kafkaSourceTableInfo.getFields().length]; - for(int i = 0; i< kafkaSourceTableInfo.getFieldClasses().length; i++){ - types[i] = TypeInformation.of(kafkaSourceTableInfo.getFieldClasses()[i]); - } - - TypeInformation typeInformation = new RowTypeInfo(types, kafkaSourceTableInfo.getFields()); - FlinkKafkaConsumer09 kafkaSrc = (FlinkKafkaConsumer09) new KafkaConsumer09Factory().createKafkaTableSource(kafkaSourceTableInfo, typeInformation, props); - - //earliest,latest - if("earliest".equalsIgnoreCase(kafkaSourceTableInfo.getOffsetReset())){ - kafkaSrc.setStartFromEarliest(); - }else if(DtStringUtil.isJosn(kafkaSourceTableInfo.getOffsetReset())){// {"0":12312,"1":12321,"2":12312} - try { - Properties properties = PluginUtil.jsonStrToObject(kafkaSourceTableInfo.getOffsetReset(), Properties.class); - Map offsetMap = PluginUtil.objectToMap(properties); - Map specificStartupOffsets = new HashMap<>(); - for(Map.Entry entry:offsetMap.entrySet()){ - specificStartupOffsets.put(new KafkaTopicPartition(topicName,Integer.valueOf(entry.getKey())),Long.valueOf(entry.getValue().toString())); - } - kafkaSrc.setStartFromSpecificOffsets(specificStartupOffsets); - } catch (Exception e) { - throw new RuntimeException("not support offsetReset type:" + kafkaSourceTableInfo.getOffsetReset()); - } - }else { - kafkaSrc.setStartFromLatest(); - } + String sourceOperatorName = generateOperatorName(sourceTableInfo.getName(), topicName); + DataStreamSource kafkaSource = env.addSource(kafkaSrc, sourceOperatorName, typeInformation); + kafkaSource.setParallelism(kafkaSourceTableInfo.getParallelism()); + setStartPosition(kafkaSourceTableInfo.getOffsetReset(), topicName, kafkaSrc); String fields = StringUtils.join(kafkaSourceTableInfo.getFields(), ","); - String sourceOperatorName = 
SOURCE_OPERATOR_NAME_TPL.replace("${topic}", topicName).replace("${table}", sourceTableInfo.getName()); - DataStreamSource kafkaSource = env.addSource(kafkaSrc, sourceOperatorName, typeInformation); - Integer parallelism = kafkaSourceTableInfo.getParallelism(); - if (parallelism != null) { - kafkaSource.setParallelism(parallelism); - } return tableEnv.fromDataStream(kafkaSource, fields); } } \ No newline at end of file diff --git a/kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java b/kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java index ff4aed89d..23989ab7e 100644 --- a/kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java +++ b/kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java @@ -19,24 +19,17 @@ package com.dtstack.flink.sql.source.kafka; -import com.dtstack.flink.sql.source.IStreamSourceGener; import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo; import com.dtstack.flink.sql.table.AbstractSourceTableInfo; -import com.dtstack.flink.sql.util.DtStringUtil; -import com.dtstack.flink.sql.util.PluginUtil; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.types.Row; -import java.util.HashMap; -import java.util.Map; import java.util.Properties; /** @@ -47,74 +40,24 @@ * @author sishu.yss */ -public class KafkaSource implements IStreamSourceGener
{ +public class KafkaSource extends AbstractKafkaSource { - private static final String SOURCE_OPERATOR_NAME_TPL = "${topic}_${table}"; + @Override + public Table genStreamSource(AbstractSourceTableInfo sourceTableInfo, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv) { + KafkaSourceTableInfo kafkaSourceTableInfo = (KafkaSourceTableInfo) sourceTableInfo; + String topicName = kafkaSourceTableInfo.getTopic(); - /** - * Get kafka data source, you need to provide the data field names, data types - * If you do not specify auto.offset.reset, the default use groupoffset - * - * @param sourceTableInfo - * @return - */ - @SuppressWarnings("rawtypes") - @Override - public Table genStreamSource(AbstractSourceTableInfo sourceTableInfo, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv) { + Properties kafkaProperties = getKafkaProperties(kafkaSourceTableInfo); + TypeInformation typeInformation = getRowTypeInformation(kafkaSourceTableInfo); + FlinkKafkaConsumer010 kafkaSrc = (FlinkKafkaConsumer010) new KafkaConsumer010Factory().createKafkaTableSource(kafkaSourceTableInfo, typeInformation, kafkaProperties); - KafkaSourceTableInfo kafkaSourceTableInfo = (KafkaSourceTableInfo) sourceTableInfo; - String topicName = kafkaSourceTableInfo.getTopic(); + String sourceOperatorName = generateOperatorName(sourceTableInfo.getName(), topicName); + DataStreamSource kafkaSource = env.addSource(kafkaSrc, sourceOperatorName, typeInformation); + kafkaSource.setParallelism(kafkaSourceTableInfo.getParallelism()); - Properties props = new Properties(); - for (String key : kafkaSourceTableInfo.getKafkaParamKeys()) { - props.setProperty(key, kafkaSourceTableInfo.getKafkaParam(key)); - } - props.setProperty("bootstrap.servers", kafkaSourceTableInfo.getBootstrapServers()); - if (DtStringUtil.isJosn(kafkaSourceTableInfo.getOffsetReset())){ - props.setProperty("auto.offset.reset", "none"); - } else { - props.setProperty("auto.offset.reset", kafkaSourceTableInfo.getOffsetReset()); - } - if (StringUtils.isNotBlank(kafkaSourceTableInfo.getGroupId())){ - props.setProperty("group.id", kafkaSourceTableInfo.getGroupId()); - } + setStartPosition(kafkaSourceTableInfo.getOffsetReset(), topicName, kafkaSrc); + String fields = StringUtils.join(kafkaSourceTableInfo.getFields(), ","); - TypeInformation[] types = new TypeInformation[kafkaSourceTableInfo.getFields().length]; - for (int i = 0; i < kafkaSourceTableInfo.getFieldClasses().length; i++) { - types[i] = TypeInformation.of(kafkaSourceTableInfo.getFieldClasses()[i]); - } - - TypeInformation typeInformation = new RowTypeInfo(types, kafkaSourceTableInfo.getFields()); - - FlinkKafkaConsumer010 kafkaSrc = (FlinkKafkaConsumer010) new KafkaConsumer010Factory().createKafkaTableSource(kafkaSourceTableInfo, typeInformation, props); - - //earliest,latest - if ("earliest".equalsIgnoreCase(kafkaSourceTableInfo.getOffsetReset())) { - kafkaSrc.setStartFromEarliest(); - } else if (DtStringUtil.isJosn(kafkaSourceTableInfo.getOffsetReset())) {// {"0":12312,"1":12321,"2":12312} - try { - Properties properties = PluginUtil.jsonStrToObject(kafkaSourceTableInfo.getOffsetReset(), Properties.class); - Map offsetMap = PluginUtil.objectToMap(properties); - Map specificStartupOffsets = new HashMap<>(); - for (Map.Entry entry : offsetMap.entrySet()) { - specificStartupOffsets.put(new KafkaTopicPartition(topicName, Integer.valueOf(entry.getKey())), Long.valueOf(entry.getValue().toString())); - } - kafkaSrc.setStartFromSpecificOffsets(specificStartupOffsets); - } catch (Exception e) { - 
throw new RuntimeException("not support offsetReset type:" + kafkaSourceTableInfo.getOffsetReset()); - } - } else { - kafkaSrc.setStartFromLatest(); - } - - String fields = StringUtils.join(kafkaSourceTableInfo.getFields(), ","); - String sourceOperatorName = SOURCE_OPERATOR_NAME_TPL.replace("${topic}", topicName).replace("${table}", sourceTableInfo.getName()); - - DataStreamSource kafkaSource = env.addSource(kafkaSrc, sourceOperatorName, typeInformation); - Integer parallelism = kafkaSourceTableInfo.getParallelism(); - if (parallelism != null) { - kafkaSource.setParallelism(parallelism); - } - return tableEnv.fromDataStream(kafkaSource, fields); - } + return tableEnv.fromDataStream(kafkaSource, fields); + } } \ No newline at end of file diff --git a/kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java b/kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java index 11be1898a..f58d59d05 100644 --- a/kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java +++ b/kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java @@ -19,24 +19,16 @@ package com.dtstack.flink.sql.source.kafka; -import com.dtstack.flink.sql.source.IStreamSourceGener; import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo; import com.dtstack.flink.sql.table.AbstractSourceTableInfo; -import com.dtstack.flink.sql.util.DtStringUtil; -import com.dtstack.flink.sql.util.PluginUtil; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.types.Row; - -import java.util.HashMap; -import java.util.Map; import java.util.Properties; /** @@ -47,76 +39,24 @@ * @author sishu.yss */ -public class KafkaSource implements IStreamSourceGener
{ - - private static final String SOURCE_OPERATOR_NAME_TPL = "${topic}_${table}"; - - /** - * Get kafka data source, you need to provide the data field names, data types - * If you do not specify auto.offset.reset, the default use groupoffset - * - * @param sourceTableInfo - * @return - */ - @SuppressWarnings("rawtypes") - @Override - public Table genStreamSource(AbstractSourceTableInfo sourceTableInfo, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv) { - - KafkaSourceTableInfo kafkaSourceTableInfo = (KafkaSourceTableInfo) sourceTableInfo; - String topicName = kafkaSourceTableInfo.getTopic(); - - Properties props = new Properties(); - for (String key : kafkaSourceTableInfo.getKafkaParamKeys()) { - props.setProperty(key, kafkaSourceTableInfo.getKafkaParam(key)); - } - props.setProperty("bootstrap.servers", kafkaSourceTableInfo.getBootstrapServers()); - if (DtStringUtil.isJosn(kafkaSourceTableInfo.getOffsetReset())){ - props.setProperty("auto.offset.reset", "none"); - } else { - props.setProperty("auto.offset.reset", kafkaSourceTableInfo.getOffsetReset()); - } - if (StringUtils.isNotBlank(kafkaSourceTableInfo.getGroupId())){ - props.setProperty("group.id", kafkaSourceTableInfo.getGroupId()); - } - // only required for Kafka 0.8 - //TODO props.setProperty("zookeeper.connect", kafka09SourceTableInfo.) - - TypeInformation[] types = new TypeInformation[kafkaSourceTableInfo.getFields().length]; - for (int i = 0; i < kafkaSourceTableInfo.getFieldClasses().length; i++) { - types[i] = TypeInformation.of(kafkaSourceTableInfo.getFieldClasses()[i]); - } +public class KafkaSource extends AbstractKafkaSource { - TypeInformation typeInformation = new RowTypeInfo(types, kafkaSourceTableInfo.getFields()); + @Override + public Table genStreamSource(AbstractSourceTableInfo sourceTableInfo, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv) { + KafkaSourceTableInfo kafkaSourceTableInfo = (KafkaSourceTableInfo) sourceTableInfo; + String topicName = kafkaSourceTableInfo.getTopic(); - FlinkKafkaConsumer011 kafkaSrc = (FlinkKafkaConsumer011) new KafkaConsumer011Factory().createKafkaTableSource(kafkaSourceTableInfo, typeInformation, props); + Properties kafkaProperties = getKafkaProperties(kafkaSourceTableInfo); + TypeInformation typeInformation = getRowTypeInformation(kafkaSourceTableInfo); + FlinkKafkaConsumer011 kafkaSrc = (FlinkKafkaConsumer011) new KafkaConsumer011Factory().createKafkaTableSource(kafkaSourceTableInfo, typeInformation, kafkaProperties); - //earliest,latest - if ("earliest".equalsIgnoreCase(kafkaSourceTableInfo.getOffsetReset())) { - kafkaSrc.setStartFromEarliest(); - } else if (DtStringUtil.isJosn(kafkaSourceTableInfo.getOffsetReset())) {// {"0":12312,"1":12321,"2":12312} - try { - Properties properties = PluginUtil.jsonStrToObject(kafkaSourceTableInfo.getOffsetReset(), Properties.class); - Map offsetMap = PluginUtil.objectToMap(properties); - Map specificStartupOffsets = new HashMap<>(); - for (Map.Entry entry : offsetMap.entrySet()) { - specificStartupOffsets.put(new KafkaTopicPartition(topicName, Integer.valueOf(entry.getKey())), Long.valueOf(entry.getValue().toString())); - } - kafkaSrc.setStartFromSpecificOffsets(specificStartupOffsets); - } catch (Exception e) { - throw new RuntimeException("not support offsetReset type:" + kafkaSourceTableInfo.getOffsetReset()); - } - } else { - kafkaSrc.setStartFromLatest(); - } + String sourceOperatorName = generateOperatorName(sourceTableInfo.getName(), topicName); + DataStreamSource kafkaSource = env.addSource(kafkaSrc, 
sourceOperatorName, typeInformation); + kafkaSource.setParallelism(kafkaSourceTableInfo.getParallelism()); - String fields = StringUtils.join(kafkaSourceTableInfo.getFields(), ","); - String sourceOperatorName = SOURCE_OPERATOR_NAME_TPL.replace("${topic}", topicName).replace("${table}", sourceTableInfo.getName()); + setStartPosition(kafkaSourceTableInfo.getOffsetReset(), topicName, kafkaSrc); + String fields = StringUtils.join(kafkaSourceTableInfo.getFields(), ","); - DataStreamSource kafkaSource = env.addSource(kafkaSrc, sourceOperatorName, typeInformation); - Integer parallelism = kafkaSourceTableInfo.getParallelism(); - if (parallelism != null) { - kafkaSource.setParallelism(parallelism); - } - return tableEnv.fromDataStream(kafkaSource, fields); - } + return tableEnv.fromDataStream(kafkaSource, fields); + } }
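
For reference, the `offsetReset` value handled by `setStartPosition`/`buildOffsetMap` in `AbstractKafkaSource` above is either a keyword (`latest`/`earliest`) or a JSON map of partition id to offset, e.g. `{"0":12312,"1":12321,"2":12312}`. The following is a minimal standalone sketch of that translation; it uses Jackson directly instead of the project's `PluginUtil` helpers, and the class and method names are illustrative only, not part of this patch.

```
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.types.Row;

import java.util.HashMap;
import java.util.Map;

// Illustrative only: shows how a JSON offsetReset such as {"0":12312,"1":12321,"2":12312}
// becomes the specific-offsets start position passed to the Kafka consumer.
public class OffsetResetSketch {

    public static Map<KafkaTopicPartition, Long> toSpecificOffsets(String offsetJson, String topic) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // partition number (as string) -> start offset
        Map<String, Long> raw = mapper.readValue(offsetJson, new TypeReference<Map<String, Long>>() {});

        Map<KafkaTopicPartition, Long> specific = new HashMap<>();
        for (Map.Entry<String, Long> e : raw.entrySet()) {
            specific.put(new KafkaTopicPartition(topic, Integer.parseInt(e.getKey())), e.getValue());
        }
        return specific;
    }

    public static void apply(String offsetReset, String topic, FlinkKafkaConsumerBase<Row> consumer) throws Exception {
        if ("earliest".equalsIgnoreCase(offsetReset)) {
            consumer.setStartFromEarliest();
        } else if (offsetReset != null && offsetReset.trim().startsWith("{")) {
            // JSON form: jump straight to the given partition offsets
            consumer.setStartFromSpecificOffsets(toSpecificOffsets(offsetReset, topic));
        } else {
            consumer.setStartFromLatest();
        }
    }
}
```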
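
Similarly, the `schemaInfo` property introduced in the avro example of docs/kafkaSource.md is a plain Avro record schema string. A small sketch, assuming only the standard Avro API (the class name is illustrative; the actual record deserialization is done by the source's format layer and is not shown here), of how that string can be parsed and inspected:

```
import org.apache.avro.Schema;

// Illustrative only: parse the schemaInfo string from the avro example
// and list the field names the source table expects.
public class SchemaInfoSketch {

    public static void main(String[] args) {
        String schemaInfo = "{\"type\":\"record\",\"name\":\"MyResult\",\"fields\":"
                + "[{\"name\":\"channel\",\"type\":\"string\"},{\"name\":\"pv\",\"type\":\"string\"}]}";

        Schema schema = new Schema.Parser().parse(schemaInfo);
        // prints: channel : STRING and pv : STRING, matching the DDL columns channel, pv
        schema.getFields().forEach(f -> System.out.println(f.name() + " : " + f.schema().getType()));
    }
}
```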