diff --git a/README.md b/README.md index ccf94f1ff..c84d77d5a 100644 --- a/README.md +++ b/README.md @@ -7,34 +7,11 @@ > > * 支持原生FLinkSQL所有的语法 > > * 扩展了输入和输出的性能指标到promethus - ## 新特性: - * 1.kafka源表支持not null语法,支持字符串类型的时间转换。 - * 2.rdb维表与DB建立连接时,周期进行连接,防止连接断开。rdbsink写入时,对连接进行检查。 - * 3.异步维表支持非等值连接,比如:<>,<,>。 - * 4.增加kafka数组解析 - * 5.增加kafka1.0以上版本的支持 - * 6.增加postgresql、kudu、clickhouse维表、结果表的支持 - * 7.支持插件的依赖方式,参考pluginLoadMode参数 - * 8.支持cep处理 - * 9.支持udaf - * 10.支持谓词下移 - * 11.支持状态的ttl - - ## BUG修复: - * 1.修复不能解析sql中orderby,union语法。 - * 2.修复yarnPer模式提交失败的异常。 - * 3.一些bug的修复 - # 已支持 * 源表:kafka 0.9、0.10、0.11、1.x版本 * 维表:mysql, SQlServer,oracle, hbase, mongo, redis, cassandra, serversocket, kudu, postgresql, clickhouse, impala, db2, sqlserver * 结果表:mysql, SQlServer, oracle, hbase, elasticsearch5.x, mongo, redis, cassandra, console, kudu, postgresql, clickhouse, impala, db2, sqlserver -# 后续开发计划 - * 维表快照 - * kafka avro格式 - * topN - ## 1 快速起步 ### 1.1 运行模式 @@ -205,6 +182,7 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack * [impala 结果表插件](docs/impalaSink.md) * [db2 结果表插件](docs/db2Sink.md) * [sqlserver 结果表插件](docs/sqlserverSink.md) +* [kafka 结果表插件](docs/kafkaSink.md) ### 2.3 维表插件 * [hbase 维表插件](docs/hbaseSide.md) diff --git a/docs/kafkaSink.md b/docs/kafkaSink.md new file mode 100644 index 000000000..3c6eb1dc6 --- /dev/null +++ b/docs/kafkaSink.md @@ -0,0 +1,223 @@ +## 1.格式: +``` +CREATE TABLE tableName( + colName colType, + ... + function(colNameX) AS aliasName, + WATERMARK FOR colName AS withOffset( colName , delayTime ) + )WITH( + type ='kafka11', + bootstrapServers ='ip:port,ip:port...', + zookeeperQuorum ='ip:port,ip:port/zkparent', + offsetReset ='latest', + topic ='topicName', + groupId='test', + parallelism ='parllNum', + ); +``` + +## 2.支持的版本 + kafka09,kafka10,kafka11及以上版本 + **kafka读取和写入的版本必须一致,否则会有兼容性错误。** + +## 3.表结构定义 + +|参数名称|含义| +|----|---| +| tableName | 在 sql 中使用的名称;即注册到flink-table-env上的名称| +| colName | 列名称| +| colType | 列类型 [colType支持的类型](colType.md)| + +## 4.参数: + +|参数名称|含义|是否必填|默认值| +|----|---|---|---| +|type | kafka09 | 是|kafka09、kafka10、kafka11、kafka(对应kafka1.0及以上版本)| +|groupId | 需要读取的 groupId 名称|否|| +|bootstrapServers | kafka bootstrap-server 地址信息(多个用逗号隔开)|是|| +|zookeeperQuorum | kafka zk地址信息(多个之间用逗号分隔)|是|| +|topic | 需要读取的 topic 名称|是|| +|parallelism | 并行度设置|否|1| +|partitionKeys | 用来分区的字段|否|| +|updateMode | 回溯流数据下发模式,append,upsert.upsert模式下会将是否为回溯信息以字段形式进行下发。|否|append| +|sinkdatatype | 写入kafka数据格式,json,avro,csv|否|json| +|fieldDelimiter | csv数据分隔符|否| \ | + + +**kafka相关参数可以自定义,使用kafka.开头即可。** +``` +kafka.consumer.id +kafka.socket.timeout.ms +kafka.fetch.message.max.bytes +kafka.num.consumer.fetchers +kafka.auto.commit.enable +kafka.auto.commit.interval.ms +kafka.queued.max.message.chunks +kafka.rebalance.max.retries +kafka.fetch.min.bytes +kafka.fetch.wait.max.ms +kafka.rebalance.backoff.ms +kafka.refresh.leader.backoff.ms +kafka.consumer.timeout.ms +kafka.exclude.internal.topics +kafka.partition.assignment.strategy +kafka.client.id +kafka.zookeeper.session.timeout.ms +kafka.zookeeper.connection.timeout.ms +kafka.zookeeper.sync.time.ms +kafka.offsets.storage +kafka.offsets.channel.backoff.ms +kafka.offsets.channel.socket.timeout.ms +kafka.offsets.commit.max.retries +kafka.dual.commit.enabled +kafka.partition.assignment.strategy +kafka.socket.receive.buffer.bytes +kafka.fetch.min.bytes +``` + +## 5.样例: + +### json格式: +``` +CREATE TABLE MyResult( + channel varchar, + pv varchar + )WITH( + type='kafka', + bootstrapServers='172.16.8.107:9092', + 
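+    -- Illustrative comments (not required by the plugin): with updateMode='upsert' the emitted JSON
+    -- carries an extra retract flag field (see the data-format samples below); append mode emits only
+    -- the declared columns. Custom producer properties can be passed through by prefixing them with
+    -- kafka., e.g. kafka.client.id.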
topic='mqTest02', + parallelism ='2', + partitionKeys = 'channel,pv', + updateMode='upsert' + ); + +upsert模式下发的数据格式:{"channel":"zs","pv":"330",retract:true} +append模式下发的数据格式:{"channel":"zs","pv":"330"} + +``` + +### avro格式: + +如果updateMode='upsert',schemaInfo需要包含retract属性信息。 + +``` +CREATE TABLE MyTable( + channel varchar, + pv varchar + --xctime bigint + )WITH( + type='kafka', + bootstrapServers='172.16.8.107:9092', + groupId='mqTest01', + offsetReset='latest', + topic='mqTest01', + parallelism ='1', + topicIsPattern ='false' + ); + +create table sideTable( + channel varchar, + xccount int, + PRIMARY KEY(channel), + PERIOD FOR SYSTEM_TIME + )WITH( + type='mysql', + url='jdbc:mysql://172.16.8.109:3306/test?charset=utf8', + userName='dtstack', + password='abc123', + tableName='sidetest', + cache = 'LRU', + cacheTTLMs='10000', + parallelism ='1' + + ); + + +CREATE TABLE MyResult( + channel varchar, + pv varchar + )WITH( + --type='console' + type='kafka', + bootstrapServers='172.16.8.107:9092', + topic='mqTest02', + parallelism ='1', + updateMode='upsert', + sinkdatatype = 'avro', + schemaInfo = '{"type":"record","name":"MyResult","fields":[{"name":"channel","type":"string"} + ,{"name":"pv","type":"string"},{"name":"channel","type":"string"}, + {"name":"retract","type":"boolean"}]}' + + ); + + +insert +into + MyResult + select + a.channel as channel, + a.pv as pv + from + MyTable a +``` +### csv格式: + +``` +CREATE TABLE MyTable( + channel varchar, + pv varchar + --xctime bigint + )WITH( + type='kafka', + bootstrapServers='172.16.8.107:9092', + groupId='mqTest01', + offsetReset='latest', + topic='mqTest01', + parallelism ='2', + topicIsPattern ='false' + ); + +create table sideTable( + channel varchar, + xccount int, + PRIMARY KEY(channel), + PERIOD FOR SYSTEM_TIME + )WITH( + type='mysql', + url='jdbc:mysql://172.16.8.109:3306/test?charset=utf8', + userName='dtstack', + password='abc123', + tableName='sidetest', + cache = 'LRU', + cacheTTLMs='10000', + parallelism ='1' + + ); + + +CREATE TABLE MyResult( + channel varchar, + pv varchar + )WITH( + type='kafka', + bootstrapServers='172.16.8.107:9092', + topic='mqTest02', + parallelism ='2', + updateMode='upsert', + sinkdatatype = 'csv', + fieldDelimiter='*' + + + + ); + + +insert +into + MyResult + select + a.channel as channel, + a.pv as pv + from + MyTable a +``` diff --git a/docs/kafkaSource.md b/docs/kafkaSource.md index 76096aba6..9b488863e 100644 --- a/docs/kafkaSource.md +++ b/docs/kafkaSource.md @@ -1,14 +1,12 @@ ## 1.格式: ``` -数据现在支持json格式{"xx":"bb","cc":"dd"} - CREATE TABLE tableName( colName colType, ... function(colNameX) AS aliasName, WATERMARK FOR colName AS withOffset( colName , delayTime ) )WITH( - type ='kafka09', + type ='kafka11', bootstrapServers ='ip:port,ip:port...', zookeeperQuorum ='ip:port,ip:port/zkparent', offsetReset ='latest', @@ -89,7 +87,7 @@ CREATE TABLE MyTable( xctime bigint, CHARACTER_LENGTH(channel) AS timeLeng )WITH( - type ='kafka09', + type ='kafka11', bootstrapServers ='172.16.8.198:9092', zookeeperQuorum ='172.16.8.198:2181/kafka', offsetReset ='latest', @@ -208,181 +206,3 @@ CREATE TABLE MyTable( lengthcheckpolicy = 'PAD' ); ``` -# 三、text格式数据源UDF自定义拆分 -Kafka源表数据解析流程:Kafka Source Table -> UDTF ->Realtime Compute -> SINK。从Kakfa读入的数据,都是VARBINARY(二进制)格式,对读入的每条数据,都需要用UDTF将其解析成格式化数据。 - 与其他格式不同,本格式定义DDL必须与以下SQL一摸一样,表中的五个字段顺序务必保持一致: - -## 1. 
定义源表,注意:kafka源表DDL字段必须与以下例子一模一样。WITH中参数可改。 -``` -create table kafka_stream( - _topic STRING, - _messageKey STRING, - _message STRING, - _partition INT, - _offset BIGINT, -) with ( - type ='kafka09', - bootstrapServers ='172.16.8.198:9092', - zookeeperQuorum ='172.16.8.198:2181/kafka', - offsetReset ='latest', - topic ='nbTest1', - parallelism ='1', - sourcedatatype='text' - ) -``` -## 2.参数: - -|参数名称|含义|是否必填|默认值| -|----|---|---|---| -|type | kafka09 | 是|| -|bootstrapServers | kafka bootstrap-server 地址信息(多个用逗号隔开)|是|| -|zookeeperQuorum | kafka zk地址信息(多个之间用逗号分隔)|是|| -|topic | 需要读取的 topic 名称|是|| -|offsetReset | 读取的topic 的offset初始位置[latest|earliest]|否|latest| -|parallelism | 并行度设置|否|1| -|sourcedatatype | 数据类型|否|text| -**kafka相关参数可以自定义,使用kafka.开头即可。** - -## 2.自定义: -从kafka读出的数据,需要进行窗口计算。 按照实时计算目前的设计,滚窗/滑窗等窗口操作,需要(且必须)在源表DDL上定义Watermark。Kafka源表比较特殊。如果要以kafka中message字段中的的Event Time进行窗口操作, -需要先从message字段,使用UDX解析出event time,才能定义watermark。 在kafka源表场景中,需要使用计算列。 假设,kafka中写入的数据如下: -2018-11-11 00:00:00|1|Anna|female整个计算流程为:Kafka SOURCE->UDTF->Realtime Compute->RDS SINK(单一分隔符可直接使用类csv格式模板,自定义适用于更复杂的数据类型,本说明只做参考) - -**SQL** -``` --- 定义解析Kakfa message的UDTF - CREATE FUNCTION kafkapaser AS 'com.XXXX.kafkaUDTF'; - CREATE FUNCTION kafkaUDF AS 'com.XXXX.kafkaUDF'; - -- 定义源表,注意:kafka源表DDL字段必须与以下例子一模一样。WITH中参数可改。 - create table kafka_src ( - _topic STRING, - _messageKey STRING, - _message STRING, - _partition INT, - _offset BIGINT, - ctime AS TO_TIMESTAMP(kafkaUDF(_message)), -- 定义计算列,计算列可理解为占位符,源表中并没有这一列,其中的数据可经过下游计算得出。注意计算里的类型必须为timestamp才能在做watermark。 - watermark for ctime as withoffset(ctime,0) -- 在计算列上定义watermark - ) WITH ( - type = 'kafka010', -- Kafka Source类型,与Kafka版本强相关,目前支持的Kafka版本请参考本文档 - topic = 'test_kafka_topic', - ... - ); - create table rds_sink ( - name VARCHAR, - age INT, - grade VARCHAR, - updateTime TIMESTAMP - ) WITH( - type='mysql', - url='jdbc:mysql://localhost:3306/test', - tableName='test4', - userName='test', - password='XXXXXX' - ); - -- 使用UDTF,将二进制数据解析成格式化数据 - CREATE VIEW input_view ( - name, - age, - grade, - updateTime - ) AS - SELECT - COUNT(*) as cnt, - T.ctime, - T.order, - T.name, - T.sex - from - kafka_src as S, - LATERAL TABLE (kafkapaser _message)) as T ( - ctime, - order, - name, - sex - ) - Group BY T.sex, - TUMBLE(ctime, INTERVAL '1' MINUTE); - -- 对input_view中输出的数据做计算 - CREATE VIEW view2 ( - cnt, - sex - ) AS - SELECT - COUNT(*) as cnt, - T.sex - from - input_view - Group BY sex, TUMBLE(ctime, INTERVAL '1' MINUTE); - -- 使用解析出的格式化数据进行计算,并将结果输出到RDS中 - insert into rds_sink - SELECT - cnt,sex - from view2; - ``` -**UDF&UDTF** -``` -package com.XXXX; - import com.XXXX.fastjson.JSONObject; - import org.apache.flink.table.functions.TableFunction; - import org.apache.flink.table.types.DataType; - import org.apache.flink.table.types.DataTypes; - import org.apache.flink.types.Row; - import java.io.UnsupportedEncodingException; - /** - 以下例子解析输入Kafka中的JSON字符串,并将其格式化输出 - **/ - public class kafkaUDTF extends TableFunction { - public void eval(byte[] message) { - try { - // 读入一个二进制数据,并将其转换为String格式 - String msg = new String(message, "UTF-8"); - // 提取JSON Object中各字段 - String ctime = Timestamp.valueOf(data.split('\\|')[0]); - String order = data.split('\\|')[1]; - String name = data.split('\\|')[2]; - String sex = data.split('\\|')[3]; - // 将解析出的字段放到要输出的Row()对象 - Row row = new Row(4); - row.setField(0, ctime); - row.setField(1, age); - row.setField(2, grade); - row.setField(3, updateTime); - System.out.println("Kafka message str ==>" + row.toString()); - // 输出一行 - collect(row); - } catch 
(ClassCastException e) { - System.out.println("Input data format error. Input data " + msg + "is not json string"); - } - } catch (UnsupportedEncodingException e) { - e.printStackTrace(); - } - } - @Override - // 如果返回值是Row,就必须重载实现这个方法,显式地告诉系统返回的字段类型 - // 定义输出Row()对象的字段类型 - public DataType getResultType(Object[] arguments, Class[] argTypes) { - return DataTypes.createRowType(DataTypes.TIMESTAMP,DataTypes.STRING, DataTypes.Integer, DataTypes.STRING,DataTypes.STRING); - } - } - - package com.dp58; - package com.dp58.sql.udx; - import org.apache.flink.table.functions.FunctionContext; - import org.apache.flink.table.functions.ScalarFunction; - public class KafkaUDF extends ScalarFunction { - // 可选,open方法可以不写 - // 需要import org.apache.flink.table.functions.FunctionContext; - public String eval(byte[] message) { - // 读入一个二进制数据,并将其转换为String格式 - String msg = new String(message, "UTF-8"); - return msg.split('\\|')[0]; - } - public long eval(String b, String c) { - return eval(b) + eval(c); - } - //可选,close方法可以不写 - @Override - public void close() { - } - } - ``` diff --git a/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/AbstractKafkaProducerFactory.java b/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/AbstractKafkaProducerFactory.java index ebd313b29..9dcaf222b 100644 --- a/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/AbstractKafkaProducerFactory.java +++ b/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/AbstractKafkaProducerFactory.java @@ -75,15 +75,16 @@ private SerializationSchema createSerializationSchema(KafkaSinkTableInfo k if (StringUtils.isBlank(kafkaSinkTableInfo.getFieldDelimiter())) { throw new IllegalArgumentException("sinkDataType:" + FormatType.CSV.name() + " must set fieldDelimiter"); } - final CsvCRowSerializationSchema.Builder serSchemaBuilder = new CsvCRowSerializationSchema.Builder(typeInformation); serSchemaBuilder.setFieldDelimiter(kafkaSinkTableInfo.getFieldDelimiter().toCharArray()[0]); + serSchemaBuilder.setUpdateMode(kafkaSinkTableInfo.getUpdateMode()); + serializationSchema = serSchemaBuilder.build(); } else if (FormatType.AVRO.name().equalsIgnoreCase(kafkaSinkTableInfo.getSinkDataType())) { if (StringUtils.isBlank(kafkaSinkTableInfo.getSchemaString())) { throw new IllegalArgumentException("sinkDataType:" + FormatType.AVRO.name() + " must set schemaString"); } - serializationSchema = new AvroCRowSerializationSchema(kafkaSinkTableInfo.getSchemaString(),kafkaSinkTableInfo.getUpdateMode()); + serializationSchema = new AvroCRowSerializationSchema(kafkaSinkTableInfo.getSchemaString(), kafkaSinkTableInfo.getUpdateMode()); } if (null == serializationSchema) { diff --git a/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/serialization/AvroCRowSerializationSchema.java b/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/serialization/AvroCRowSerializationSchema.java index 34fa22c99..692e208b5 100644 --- a/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/serialization/AvroCRowSerializationSchema.java +++ b/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/serialization/AvroCRowSerializationSchema.java @@ -36,8 +36,6 @@ import org.apache.avro.util.Utf8; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.serialization.SerializationSchema; -import org.apache.flink.formats.avro.AvroRowDeserializationSchema; -import 
org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; import org.apache.flink.table.runtime.types.CRow; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; @@ -55,17 +53,16 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.TimeZone; +import java.util.stream.Collectors; /** * Serialization schema that serializes CROW into Avro bytes. * *

Serializes objects that are represented in (nested) Flink rows. It support types that * are compatible with Flink's Table & SQL API. - * - * <p>
Note: Changes in this class need to be kept in sync with the corresponding runtime - * class {@link AvroRowDeserializationSchema} and schema converter {@link AvroSchemaConverter}. - * + ** * @author maqi */ public class AvroCRowSerializationSchema implements SerializationSchema { @@ -107,14 +104,14 @@ public class AvroCRowSerializationSchema implements SerializationSchema { private String updateMode; - private final String retractKey = "retract"; + private String retractKey = "retract"; /** * Creates an Avro serialization schema for the given specific record class. * * @param recordClazz Avro record class used to serialize Flink's row to Avro's record */ - public AvroCRowSerializationSchema(Class recordClazz) { + public AvroCRowSerializationSchema(Class recordClazz, String updateMode) { Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); this.recordClazz = recordClazz; this.schema = SpecificData.get().getSchema(recordClazz); @@ -122,6 +119,7 @@ public AvroCRowSerializationSchema(Class recordClazz) this.datumWriter = new SpecificDatumWriter<>(schema); this.arrayOutputStream = new ByteArrayOutputStream(); this.encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null); + this.updateMode = updateMode; } /** @@ -152,10 +150,10 @@ public byte[] serialize(CRow crow) { // convert to record final GenericRecord record = convertRowToAvroRecord(schema, row); + + dealRetractField(change, record); + arrayOutputStream.reset(); - if (StringUtils.equalsIgnoreCase(updateMode, EUpdateMode.UPSERT.name())) { - record.put(retractKey, change); - } datumWriter.write(record, encoder); encoder.flush(); return arrayOutputStream.toByteArray(); @@ -164,6 +162,18 @@ public byte[] serialize(CRow crow) { } } + protected void dealRetractField(boolean change, GenericRecord record) { + schema.getFields() + .stream() + .filter(field -> StringUtils.equalsIgnoreCase(field.name(), retractKey)) + .findFirst() + .ifPresent(field -> { + if (StringUtils.equalsIgnoreCase(updateMode, EUpdateMode.UPSERT.name())) { + record.put(retractKey, convertFlinkType(field.schema(), change)); + } + }); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -184,7 +194,12 @@ public int hashCode() { // -------------------------------------------------------------------------------------------- private GenericRecord convertRowToAvroRecord(Schema schema, Row row) { - final List fields = schema.getFields(); + + final List fields = schema.getFields() + .stream() + .filter(field -> !StringUtils.equalsIgnoreCase(field.name(), retractKey)) + .collect(Collectors.toList()); + final int length = fields.size(); final GenericRecord record = new GenericData.Record(schema); for (int i = 0; i < length; i++) { @@ -328,6 +343,8 @@ private long convertFromTimestamp(Schema schema, Timestamp date) { private void writeObject(ObjectOutputStream outputStream) throws IOException { outputStream.writeObject(recordClazz); outputStream.writeObject(schemaString); // support for null + outputStream.writeObject(retractKey); + outputStream.writeObject(updateMode); } @SuppressWarnings("unchecked") @@ -339,6 +356,9 @@ private void readObject(ObjectInputStream inputStream) throws ClassNotFoundExcep } else { schema = new Schema.Parser().parse(schemaString); } + retractKey = (String) inputStream.readObject(); + updateMode = (String) inputStream.readObject(); + datumWriter = new SpecificDatumWriter<>(schema); arrayOutputStream = new ByteArrayOutputStream(); encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null); diff 
--git a/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/serialization/CsvCRowSerializationSchema.java b/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/serialization/CsvCRowSerializationSchema.java index 903395f9d..4e57b6f2a 100644 --- a/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/serialization/CsvCRowSerializationSchema.java +++ b/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/serialization/CsvCRowSerializationSchema.java @@ -47,7 +47,10 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.util.Arrays; +import java.util.Iterator; import java.util.Objects; +import java.util.stream.IntStream; +import java.util.stream.Stream; /** * Serialization schema that serializes an object of Flink types into a CSV bytes. @@ -72,25 +75,27 @@ public final class CsvCRowSerializationSchema implements SerializationSchema kafkaParam = new HashMap(); @@ -119,9 +129,27 @@ public boolean check() { Preconditions.checkNotNull(getType(), "kafka of type is required"); Preconditions.checkNotNull(bootstrapServers, "kafka of bootstrapServers is required"); Preconditions.checkNotNull(topic, "kafka of topic is required"); + + if (StringUtils.equalsIgnoreCase(getSinkDataType(), FormatType.AVRO.name())) { + avroParamCheck(); + } + return false; } + public void avroParamCheck() { + Preconditions.checkNotNull(schemaString, "avro type schemaInfo is required"); + if (StringUtils.equalsIgnoreCase(updateMode, EUpdateMode.UPSERT.name())) { + Schema schema = new Schema.Parser().parse(schemaString); + schema.getFields() + .stream() + .filter(field -> StringUtils.equalsIgnoreCase(field.name(), RETRACT_FIELD_KEY)) + .findFirst() + .orElseThrow(() -> + new NullPointerException(String.valueOf("arvo upsert mode the retract attribute must be contained in schemaInfo field "))); + } + } + public String getEnableKeyPartition() { return enableKeyPartition; }
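Editor's note on the `KafkaSinkTableInfo` change above: in avro upsert mode, `avroParamCheck` requires the `schemaInfo` JSON to declare a `retract` field, because `AvroCRowSerializationSchema#dealRetractField` writes the change flag into that field when `updateMode` is UPSERT. The standalone sketch below shows the same validation in isolation; the class name `AvroUpsertSchemaCheck` and the `main`-method sample schema are made up for illustration and are not part of the patch.

```java
// Illustrative sketch only: mirrors the intent of KafkaSinkTableInfo#avroParamCheck.
// Only org.apache.avro is required on the classpath.
import org.apache.avro.Schema;

public class AvroUpsertSchemaCheck {

    private static final String RETRACT_FIELD_KEY = "retract";

    /** In upsert mode the Avro schemaInfo must declare a "retract" field. */
    public static void checkUpsertSchema(String schemaString) {
        Schema schema = new Schema.Parser().parse(schemaString);
        boolean hasRetract = schema.getFields().stream()
                .anyMatch(f -> f.name().equalsIgnoreCase(RETRACT_FIELD_KEY));
        if (!hasRetract) {
            throw new IllegalArgumentException(
                    "avro upsert mode: schemaInfo must contain a 'retract' field");
        }
    }

    public static void main(String[] args) {
        // Same shape as the schemaInfo used in the avro example of docs/kafkaSink.md.
        String schemaInfo = "{\"type\":\"record\",\"name\":\"MyResult\",\"fields\":["
                + "{\"name\":\"channel\",\"type\":\"string\"},"
                + "{\"name\":\"pv\",\"type\":\"string\"},"
                + "{\"name\":\"retract\",\"type\":\"boolean\"}]}";
        checkUpsertSchema(schemaInfo); // passes; drop the retract field to trigger the exception
    }
}
```

The serializer side then strips the `retract` field from the record fields used for row conversion and fills it from the CRow change flag, which is why the field must exist in the schema but not in the SQL column list.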