From ec2b84e7e1fa0c54aa7602923768b86671ab79b8 Mon Sep 17 00:00:00 2001 From: maqi Date: Tue, 17 Mar 2020 20:49:15 +0800 Subject: [PATCH] kafk update mode --- .../format/SerializationMetricWrapper.java | 11 +- .../kafka/AbstractKafkaProducerFactory.java | 31 +- .../CustomerKeyedSerializationSchema.java | 35 +- .../AvroCRowSerializationSchema.java | 346 +++++++++++++++++ .../CsvCRowSerializationSchema.java | 349 ++++++++++++++++++ .../JsonCRowSerializationSchema.java | 234 ++++++++++++ .../sql/sink/kafka/table/KafkaSinkParser.java | 2 + .../sink/kafka/table/KafkaSinkTableInfo.java | 13 +- .../KafkaDeserializationMetricWrapper.java | 10 +- .../flink/sql/sink/kafka/KafkaProducer.java | 6 +- .../sql/sink/kafka/KafkaProducerFactory.java | 4 +- .../flink/sql/sink/kafka/KafkaSink.java | 18 +- .../flink/sql/sink/kafka/KafkaProducer09.java | 5 +- .../sink/kafka/KafkaProducer09Factory.java | 3 +- .../flink/sql/sink/kafka/KafkaSink.java | 21 +- .../sql/sink/kafka/KafkaProducer010.java | 6 +- .../sink/kafka/KafkaProducer010Factory.java | 3 +- .../flink/sql/sink/kafka/KafkaSink.java | 21 +- .../sql/sink/kafka/KafkaProducer011.java | 6 +- .../sink/kafka/KafkaProducer011Factory.java | 3 +- .../flink/sql/sink/kafka/KafkaSink.java | 20 +- .../source/kafka/KafkaConsumer011Factory.java | 2 +- .../flink/sql/source/kafka/KafkaSource.java | 4 +- 23 files changed, 1067 insertions(+), 86 deletions(-) create mode 100644 kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/serialization/AvroCRowSerializationSchema.java create mode 100644 kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/serialization/CsvCRowSerializationSchema.java create mode 100644 kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/serialization/JsonCRowSerializationSchema.java diff --git a/core/src/main/java/com/dtstack/flink/sql/format/SerializationMetricWrapper.java b/core/src/main/java/com/dtstack/flink/sql/format/SerializationMetricWrapper.java index 3a5af18b1..8802198a0 100644 --- a/core/src/main/java/com/dtstack/flink/sql/format/SerializationMetricWrapper.java +++ b/core/src/main/java/com/dtstack/flink/sql/format/SerializationMetricWrapper.java @@ -24,6 +24,7 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.MeterView; +import org.apache.flink.table.runtime.types.CRow; import org.apache.flink.types.Row; @@ -34,11 +35,11 @@ * author: toutian * create: 2019/12/24 */ -public class SerializationMetricWrapper implements SerializationSchema { +public class SerializationMetricWrapper implements SerializationSchema { private static final long serialVersionUID = 1L; - private SerializationSchema serializationSchema; + private SerializationSchema serializationSchema; private transient RuntimeContext runtimeContext; @@ -47,7 +48,7 @@ public class SerializationMetricWrapper implements SerializationSchema { protected transient Meter dtNumRecordsOutRate; - public SerializationMetricWrapper(SerializationSchema serializationSchema) { + public SerializationMetricWrapper(SerializationSchema serializationSchema) { this.serializationSchema = serializationSchema; } @@ -57,7 +58,7 @@ public void initMetric() { } @Override - public byte[] serialize(Row element) { + public byte[] serialize(CRow element) { beforeSerialize(); byte[] row = serializationSchema.serialize(element); afterSerialize(); @@ -79,7 +80,7 @@ public void setRuntimeContext(RuntimeContext runtimeContext) { this.runtimeContext = runtimeContext; } - public 
SerializationSchema getSerializationSchema() { + public SerializationSchema getSerializationSchema() { return serializationSchema; } diff --git a/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/AbstractKafkaProducerFactory.java b/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/AbstractKafkaProducerFactory.java index 88c2ca939..ebd313b29 100644 --- a/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/AbstractKafkaProducerFactory.java +++ b/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/AbstractKafkaProducerFactory.java @@ -19,15 +19,18 @@ import com.dtstack.flink.sql.format.FormatType; import com.dtstack.flink.sql.format.SerializationMetricWrapper; +import com.dtstack.flink.sql.sink.kafka.serialization.AvroCRowSerializationSchema; +import com.dtstack.flink.sql.sink.kafka.serialization.CsvCRowSerializationSchema; +import com.dtstack.flink.sql.sink.kafka.serialization.JsonCRowSerializationSchema; import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.formats.avro.AvroRowSerializationSchema; import org.apache.flink.formats.csv.CsvRowSerializationSchema; -import org.apache.flink.formats.json.JsonRowSerializationSchema; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.table.runtime.types.CRow; import org.apache.flink.types.Row; import java.util.Optional; @@ -51,42 +54,36 @@ public abstract class AbstractKafkaProducerFactory { * @param partitioner * @return */ - public abstract RichSinkFunction createKafkaProducer(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation typeInformation, Properties properties, Optional> partitioner, String[] partitionKeys); + public abstract RichSinkFunction createKafkaProducer(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation typeInformation, Properties properties, Optional> partitioner, String[] partitionKeys); - protected SerializationMetricWrapper createSerializationMetricWrapper(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation typeInformation) { - return new SerializationMetricWrapper(createSerializationSchema(kafkaSinkTableInfo, typeInformation)); + protected SerializationMetricWrapper createSerializationMetricWrapper(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation typeInformation) { + SerializationSchema serializationSchema = createSerializationSchema(kafkaSinkTableInfo, typeInformation); + return new SerializationMetricWrapper(serializationSchema); } - private SerializationSchema createSerializationSchema(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation typeInformation) { - SerializationSchema serializationSchema = null; + private SerializationSchema createSerializationSchema(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation typeInformation) { + SerializationSchema serializationSchema = null; if (FormatType.JSON.name().equalsIgnoreCase(kafkaSinkTableInfo.getSinkDataType())) { - if (StringUtils.isNotBlank(kafkaSinkTableInfo.getSchemaString())) { - serializationSchema = new JsonRowSerializationSchema(kafkaSinkTableInfo.getSchemaString()); + serializationSchema = new JsonCRowSerializationSchema(kafkaSinkTableInfo.getSchemaString(), kafkaSinkTableInfo.getUpdateMode()); } else if (typeInformation 
!= null && typeInformation.getArity() != 0) { - serializationSchema = new JsonRowSerializationSchema(typeInformation); + serializationSchema = new JsonCRowSerializationSchema(typeInformation, kafkaSinkTableInfo.getUpdateMode()); } else { throw new IllegalArgumentException("sinkDataType:" + FormatType.JSON.name() + " must set schemaStringļ¼ˆJSON Schemaļ¼‰or TypeInformation"); } - } else if (FormatType.CSV.name().equalsIgnoreCase(kafkaSinkTableInfo.getSinkDataType())) { - if (StringUtils.isBlank(kafkaSinkTableInfo.getFieldDelimiter())) { throw new IllegalArgumentException("sinkDataType:" + FormatType.CSV.name() + " must set fieldDelimiter"); } - final CsvRowSerializationSchema.Builder serSchemaBuilder = new CsvRowSerializationSchema.Builder(typeInformation); + final CsvCRowSerializationSchema.Builder serSchemaBuilder = new CsvCRowSerializationSchema.Builder(typeInformation); serSchemaBuilder.setFieldDelimiter(kafkaSinkTableInfo.getFieldDelimiter().toCharArray()[0]); serializationSchema = serSchemaBuilder.build(); - } else if (FormatType.AVRO.name().equalsIgnoreCase(kafkaSinkTableInfo.getSinkDataType())) { - if (StringUtils.isBlank(kafkaSinkTableInfo.getSchemaString())) { throw new IllegalArgumentException("sinkDataType:" + FormatType.AVRO.name() + " must set schemaString"); } - - serializationSchema = new AvroRowSerializationSchema(kafkaSinkTableInfo.getSchemaString()); - + serializationSchema = new AvroCRowSerializationSchema(kafkaSinkTableInfo.getSchemaString(),kafkaSinkTableInfo.getUpdateMode()); } if (null == serializationSchema) { diff --git a/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKeyedSerializationSchema.java b/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKeyedSerializationSchema.java index 498766564..cde8d1b1d 100644 --- a/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKeyedSerializationSchema.java +++ b/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKeyedSerializationSchema.java @@ -2,18 +2,20 @@ import com.dtstack.flink.sql.format.SerializationMetricWrapper; +import com.dtstack.flink.sql.sink.kafka.serialization.JsonCRowSerializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.formats.json.JsonRowSerializationSchema; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import org.apache.flink.table.runtime.types.CRow; import org.apache.flink.types.Row; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.atomic.AtomicLong; -public class CustomerKeyedSerializationSchema implements KeyedSerializationSchema { +public class CustomerKeyedSerializationSchema implements KeyedSerializationSchema { private static final Logger LOG = LoggerFactory.getLogger(CustomerKeyedSerializationSchema.class); @@ -30,38 +32,41 @@ public CustomerKeyedSerializationSchema(SerializationMetricWrapper serialization this.mapper = new ObjectMapper(); } - public byte[] serializeKey(Row element) { - if(partitionKeys == null || partitionKeys.length <=0){ + @Override + public byte[] serializeKey(CRow element) { + if (partitionKeys == null || partitionKeys.length <= 0) { return null; - } - SerializationSchema serializationSchema = serializationMetricWrapper.getSerializationSchema(); - 
if(serializationSchema instanceof JsonRowSerializationSchema){ - return serializeJsonKey((JsonRowSerializationSchema) serializationSchema, element); + } + SerializationSchema serializationSchema = serializationMetricWrapper.getSerializationSchema(); + if (serializationSchema instanceof JsonCRowSerializationSchema) { + return serializeJsonKey((JsonCRowSerializationSchema) serializationSchema, element); } return null; } - public byte[] serializeValue(Row element) { + @Override + public byte[] serializeValue(CRow element) { return this.serializationMetricWrapper.serialize(element); } - public String getTargetTopic(Row element) { + @Override + public String getTargetTopic(CRow element) { return null; } - private byte[] serializeJsonKey(JsonRowSerializationSchema jsonRowSerializationSchema, Row element) { + private byte[] serializeJsonKey(JsonCRowSerializationSchema jsonCRowSerializationSchema, CRow element) { try { - byte[] data = jsonRowSerializationSchema.serialize(element); + byte[] data = jsonCRowSerializationSchema.serialize(element); ObjectNode objectNode = mapper.readValue(data, ObjectNode.class); StringBuilder sb = new StringBuilder(); - for(String key : partitionKeys){ - if(objectNode.has(key)){ + for (String key : partitionKeys) { + if (objectNode.has(key)) { sb.append(objectNode.get(key.trim())); } } return sb.toString().getBytes(); - } catch (Exception e){ - if(COUNTER.getAndIncrement() % 1000 == 0){ + } catch (Exception e) { + if (COUNTER.getAndIncrement() % 1000 == 0) { LOG.error("serializeJsonKey error", e); } } diff --git a/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/serialization/AvroCRowSerializationSchema.java b/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/serialization/AvroCRowSerializationSchema.java new file mode 100644 index 000000000..34fa22c99 --- /dev/null +++ b/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/serialization/AvroCRowSerializationSchema.java @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.dtstack.flink.sql.sink.kafka.serialization; + +import com.dtstack.flink.sql.enums.EUpdateMode; +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.SchemaParseException; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.util.Utf8; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.formats.avro.AvroRowDeserializationSchema; +import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; +import org.apache.flink.table.runtime.types.CRow; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TimeZone; + +/** + * Serialization schema that serializes CROW into Avro bytes. + * + *
<p>
Serializes objects that are represented in (nested) Flink rows. It support types that + * are compatible with Flink's Table & SQL API. + * + *
<p>
Note: Changes in this class need to be kept in sync with the corresponding runtime + * class {@link AvroRowDeserializationSchema} and schema converter {@link AvroSchemaConverter}. + * + * @author maqi + */ +public class AvroCRowSerializationSchema implements SerializationSchema { + + /** + * Used for time conversions from SQL types. + */ + private static final TimeZone LOCAL_TZ = TimeZone.getDefault(); + + /** + * Avro record class for serialization. Might be null if record class is not available. + */ + private Class recordClazz; + + /** + * Schema string for deserialization. + */ + private String schemaString; + + /** + * Avro serialization schema. + */ + private transient Schema schema; + + /** + * Writer to serialize Avro record into a byte array. + */ + private transient DatumWriter datumWriter; + + /** + * Output stream to serialize records into byte array. + */ + private transient ByteArrayOutputStream arrayOutputStream; + + /** + * Low-level class for serialization of Avro values. + */ + private transient Encoder encoder; + + private String updateMode; + + private final String retractKey = "retract"; + + /** + * Creates an Avro serialization schema for the given specific record class. + * + * @param recordClazz Avro record class used to serialize Flink's row to Avro's record + */ + public AvroCRowSerializationSchema(Class recordClazz) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.recordClazz = recordClazz; + this.schema = SpecificData.get().getSchema(recordClazz); + this.schemaString = schema.toString(); + this.datumWriter = new SpecificDatumWriter<>(schema); + this.arrayOutputStream = new ByteArrayOutputStream(); + this.encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null); + } + + /** + * Creates an Avro serialization schema for the given Avro schema string. 
+ * + * @param avroSchemaString Avro schema string used to serialize Flink's row to Avro's record + */ + public AvroCRowSerializationSchema(String avroSchemaString,String updateMode) { + Preconditions.checkNotNull(avroSchemaString, "Avro schema must not be null."); + this.recordClazz = null; + this.schemaString = avroSchemaString; + try { + this.schema = new Schema.Parser().parse(avroSchemaString); + } catch (SchemaParseException e) { + throw new IllegalArgumentException("Could not parse Avro schema string.", e); + } + this.datumWriter = new GenericDatumWriter<>(schema); + this.arrayOutputStream = new ByteArrayOutputStream(); + this.encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null); + this.updateMode = updateMode; + } + + @Override + public byte[] serialize(CRow crow) { + try { + Row row = crow.row(); + boolean change = crow.change(); + + // convert to record + final GenericRecord record = convertRowToAvroRecord(schema, row); + arrayOutputStream.reset(); + if (StringUtils.equalsIgnoreCase(updateMode, EUpdateMode.UPSERT.name())) { + record.put(retractKey, change); + } + datumWriter.write(record, encoder); + encoder.flush(); + return arrayOutputStream.toByteArray(); + } catch (Exception e) { + throw new RuntimeException("Failed to serialize row.", e); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final AvroCRowSerializationSchema that = (AvroCRowSerializationSchema) o; + return Objects.equals(recordClazz, that.recordClazz) && Objects.equals(schemaString, that.schemaString); + } + + @Override + public int hashCode() { + return Objects.hash(recordClazz, schemaString); + } + + // -------------------------------------------------------------------------------------------- + + private GenericRecord convertRowToAvroRecord(Schema schema, Row row) { + final List fields = schema.getFields(); + final int length = fields.size(); + final GenericRecord record = new GenericData.Record(schema); + for (int i = 0; i < length; i++) { + final Schema.Field field = fields.get(i); + record.put(i, convertFlinkType(field.schema(), row.getField(i))); + } + return record; + } + + private Object convertFlinkType(Schema schema, Object object) { + if (object == null) { + return null; + } + switch (schema.getType()) { + case RECORD: + if (object instanceof Row) { + return convertRowToAvroRecord(schema, (Row) object); + } + throw new IllegalStateException("Row expected but was: " + object.getClass()); + case ENUM: + return new GenericData.EnumSymbol(schema, object.toString()); + case ARRAY: + final Schema elementSchema = schema.getElementType(); + final Object[] array = (Object[]) object; + final GenericData.Array convertedArray = new GenericData.Array<>(array.length, schema); + for (Object element : array) { + convertedArray.add(convertFlinkType(elementSchema, element)); + } + return convertedArray; + case MAP: + final Map map = (Map) object; + final Map convertedMap = new HashMap<>(); + for (Map.Entry entry : map.entrySet()) { + convertedMap.put( + new Utf8(entry.getKey().toString()), + convertFlinkType(schema.getValueType(), entry.getValue())); + } + return convertedMap; + case UNION: + final List types = schema.getTypes(); + final int size = types.size(); + final Schema actualSchema; + if (size == 2 && types.get(0).getType() == Schema.Type.NULL) { + actualSchema = types.get(1); + } else if (size == 2 && types.get(1).getType() == Schema.Type.NULL) { + actualSchema = types.get(0); 
+ } else if (size == 1) { + actualSchema = types.get(0); + } else { + // generic type + return object; + } + return convertFlinkType(actualSchema, object); + case FIXED: + // check for logical type + if (object instanceof BigDecimal) { + return new GenericData.Fixed( + schema, + convertFromDecimal(schema, (BigDecimal) object)); + } + return new GenericData.Fixed(schema, (byte[]) object); + case STRING: + return new Utf8(object.toString()); + case BYTES: + // check for logical type + if (object instanceof BigDecimal) { + return ByteBuffer.wrap(convertFromDecimal(schema, (BigDecimal) object)); + } + return ByteBuffer.wrap((byte[]) object); + case INT: + // check for logical types + if (object instanceof Date) { + return convertFromDate(schema, (Date) object); + } else if (object instanceof Time) { + return convertFromTime(schema, (Time) object); + } + return object; + case LONG: + // check for logical type + if (object instanceof Timestamp) { + return convertFromTimestamp(schema, (Timestamp) object); + } + return object; + case FLOAT: + case DOUBLE: + case BOOLEAN: + return object; + } + throw new RuntimeException("Unsupported Avro type:" + schema); + } + + private byte[] convertFromDecimal(Schema schema, BigDecimal decimal) { + final LogicalType logicalType = schema.getLogicalType(); + if (logicalType instanceof LogicalTypes.Decimal) { + final LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType; + // rescale to target type + final BigDecimal rescaled = decimal.setScale(decimalType.getScale(), BigDecimal.ROUND_UNNECESSARY); + // byte array must contain the two's-complement representation of the + // unscaled integer value in big-endian byte order + return decimal.unscaledValue().toByteArray(); + } else { + throw new RuntimeException("Unsupported decimal type."); + } + } + + private int convertFromDate(Schema schema, Date date) { + final LogicalType logicalType = schema.getLogicalType(); + if (logicalType == LogicalTypes.date()) { + // adopted from Apache Calcite + final long time = date.getTime(); + final long converted = time + (long) LOCAL_TZ.getOffset(time); + return (int) (converted / 86400000L); + } else { + throw new RuntimeException("Unsupported date type."); + } + } + + private int convertFromTime(Schema schema, Time date) { + final LogicalType logicalType = schema.getLogicalType(); + if (logicalType == LogicalTypes.timeMillis()) { + // adopted from Apache Calcite + final long time = date.getTime(); + final long converted = time + (long) LOCAL_TZ.getOffset(time); + return (int) (converted % 86400000L); + } else { + throw new RuntimeException("Unsupported time type."); + } + } + + private long convertFromTimestamp(Schema schema, Timestamp date) { + final LogicalType logicalType = schema.getLogicalType(); + if (logicalType == LogicalTypes.timestampMillis()) { + // adopted from Apache Calcite + final long time = date.getTime(); + return time + (long) LOCAL_TZ.getOffset(time); + } else { + throw new RuntimeException("Unsupported timestamp type."); + } + } + + private void writeObject(ObjectOutputStream outputStream) throws IOException { + outputStream.writeObject(recordClazz); + outputStream.writeObject(schemaString); // support for null + } + + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream inputStream) throws ClassNotFoundException, IOException { + recordClazz = (Class) inputStream.readObject(); + schemaString = (String) inputStream.readObject(); + if (recordClazz != null) { + schema = SpecificData.get().getSchema(recordClazz); + } else { + 
schema = new Schema.Parser().parse(schemaString); + } + datumWriter = new SpecificDatumWriter<>(schema); + arrayOutputStream = new ByteArrayOutputStream(); + encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null); + } +} diff --git a/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/serialization/CsvCRowSerializationSchema.java b/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/serialization/CsvCRowSerializationSchema.java new file mode 100644 index 000000000..903395f9d --- /dev/null +++ b/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/serialization/CsvCRowSerializationSchema.java @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flink.sql.sink.kafka.serialization; + +import com.dtstack.flink.sql.enums.EUpdateMode; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.formats.csv.CsvRowDeserializationSchema; +import org.apache.flink.formats.csv.CsvRowSchemaConverter; +import org.apache.flink.formats.csv.CsvRowSerializationSchema; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ContainerNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema; +import org.apache.flink.table.runtime.types.CRow; +import org.apache.flink.table.runtime.types.CRowTypeInfo; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.Arrays; +import java.util.Objects; + +/** + * Serialization schema that serializes an object of Flink types into a CSV bytes. + * + *
<p>
Serializes the input row into a {@link ObjectNode} and + * converts it into byte[]. + * + *
<p>
Result byte[] messages can be deserialized using {@link CsvRowDeserializationSchema}. + */ +@PublicEvolving +public final class CsvCRowSerializationSchema implements SerializationSchema { + + private static final long serialVersionUID = 2098447220136965L; + + /** Type information describing the input CSV data. */ + private final RowTypeInfo typeInfo; + + /** Runtime instance that performs the actual work. */ + private final RuntimeConverter runtimeConverter; + + /** CsvMapper used to write {@link JsonNode} into bytes. */ + private final CsvMapper csvMapper; + + /** Schema describing the input CSV data. */ + private final CsvSchema csvSchema; + + /** Object writer used to write rows. It is configured by {@link CsvSchema}. */ + private final ObjectWriter objectWriter; + + /** Reusable object node. */ + private transient ObjectNode root; + + private String updateMode; + + private final String retractKey = "retract"; + + private CsvCRowSerializationSchema( + RowTypeInfo typeInfo, + CsvSchema csvSchema) { + this.typeInfo = typeInfo; + this.runtimeConverter = createRowRuntimeConverter(typeInfo, true); + this.csvMapper = new CsvMapper(); + this.csvSchema = csvSchema; + this.objectWriter = csvMapper.writer(csvSchema); + } + + /** + * A builder for creating a {@link CsvRowSerializationSchema}. + */ + @PublicEvolving + public static class Builder { + + private final RowTypeInfo typeInfo; + private CsvSchema csvSchema; + + /** + * Creates a {@link CsvRowSerializationSchema} expecting the given {@link TypeInformation}. + * + * @param typeInfo type information used to create schema. + */ + public Builder(TypeInformation typeInfo) { + Preconditions.checkNotNull(typeInfo, "Type information must not be null."); + + if (!(typeInfo instanceof CRowTypeInfo)) { + throw new IllegalArgumentException("Row type information expected."); + } + RowTypeInfo rowTypeInfo = ((CRowTypeInfo) typeInfo).rowType(); + this.typeInfo = rowTypeInfo; + this.csvSchema = CsvRowSchemaConverter.convert(rowTypeInfo); + } + + public Builder setFieldDelimiter(char c) { + this.csvSchema = this.csvSchema.rebuild().setColumnSeparator(c).build(); + return this; + } + + public Builder setLineDelimiter(String delimiter) { + Preconditions.checkNotNull(delimiter, "Delimiter must not be null."); + if (!delimiter.equals("\n") && !delimiter.equals("\r") && !delimiter.equals("\r\n")) { + throw new IllegalArgumentException( + "Unsupported new line delimiter. 
Only \\n, \\r, or \\r\\n are supported."); + } + this.csvSchema = this.csvSchema.rebuild().setLineSeparator(delimiter).build(); + return this; + } + + public Builder setArrayElementDelimiter(String delimiter) { + Preconditions.checkNotNull(delimiter, "Delimiter must not be null."); + this.csvSchema = this.csvSchema.rebuild().setArrayElementSeparator(delimiter).build(); + return this; + } + + public Builder setQuoteCharacter(char c) { + this.csvSchema = this.csvSchema.rebuild().setQuoteChar(c).build(); + return this; + } + + public Builder setEscapeCharacter(char c) { + this.csvSchema = this.csvSchema.rebuild().setEscapeChar(c).build(); + return this; + } + + public Builder setNullLiteral(String s) { + this.csvSchema = this.csvSchema.rebuild().setNullValue(s).build(); + return this; + } + + public CsvCRowSerializationSchema build() { + return new CsvCRowSerializationSchema( + typeInfo, + csvSchema); + } + } + + @Override + public byte[] serialize(CRow crow) { + Row row = crow.row(); + boolean change = crow.change(); + if (root == null) { + root = csvMapper.createObjectNode(); + } + try { + runtimeConverter.convert(csvMapper, root, row); + if (StringUtils.equalsIgnoreCase(updateMode, EUpdateMode.UPSERT.name())) { + root.put(retractKey, change); + } + return objectWriter.writeValueAsBytes(root); + } catch (Throwable t) { + throw new RuntimeException("Could not serialize row '" + row + "'.", t); + } + } + + @Override + public boolean equals(Object o) { + if (o == null || o.getClass() != this.getClass()) { + return false; + } + if (this == o) { + return true; + } + final CsvCRowSerializationSchema that = (CsvCRowSerializationSchema) o; + final CsvSchema otherSchema = that.csvSchema; + + return typeInfo.equals(that.typeInfo) && + csvSchema.getColumnSeparator() == otherSchema.getColumnSeparator() && + Arrays.equals(csvSchema.getLineSeparator(), otherSchema.getLineSeparator()) && + csvSchema.getArrayElementSeparator().equals(otherSchema.getArrayElementSeparator()) && + csvSchema.getQuoteChar() == otherSchema.getQuoteChar() && + csvSchema.getEscapeChar() == otherSchema.getEscapeChar() && + Arrays.equals(csvSchema.getNullValue(), otherSchema.getNullValue()); + } + + @Override + public int hashCode() { + return Objects.hash( + typeInfo, + csvSchema.getColumnSeparator(), + csvSchema.getLineSeparator(), + csvSchema.getArrayElementSeparator(), + csvSchema.getQuoteChar(), + csvSchema.getEscapeChar(), + csvSchema.getNullValue()); + } + + // -------------------------------------------------------------------------------------------- + + private interface RuntimeConverter extends Serializable { + JsonNode convert(CsvMapper csvMapper, ContainerNode container, Object obj); + } + + private static RuntimeConverter createRowRuntimeConverter(RowTypeInfo rowTypeInfo, boolean isTopLevel) { + final TypeInformation[] fieldTypes = rowTypeInfo.getFieldTypes(); + final String[] fieldNames = rowTypeInfo.getFieldNames(); + + final RuntimeConverter[] fieldConverters = createFieldRuntimeConverters(fieldTypes); + + return assembleRowRuntimeConverter(isTopLevel, fieldNames, fieldConverters); + } + + private static RuntimeConverter[] createFieldRuntimeConverters(TypeInformation[] fieldTypes) { + final RuntimeConverter[] fieldConverters = new RuntimeConverter[fieldTypes.length]; + for (int i = 0; i < fieldTypes.length; i++) { + fieldConverters[i] = createNullableRuntimeConverter(fieldTypes[i]); + } + return fieldConverters; + } + + private static RuntimeConverter assembleRowRuntimeConverter( + boolean isTopLevel, + String[] 
fieldNames, + RuntimeConverter[] fieldConverters) { + final int rowArity = fieldNames.length; + // top level reuses the object node container + if (isTopLevel) { + return (csvMapper, container, obj) -> { + final Row row = (Row) obj; + + validateArity(rowArity, row.getArity()); + + final ObjectNode objectNode = (ObjectNode) container; + for (int i = 0; i < rowArity; i++) { + objectNode.set( + fieldNames[i], + fieldConverters[i].convert(csvMapper, container, row.getField(i))); + } + return objectNode; + }; + } else { + return (csvMapper, container, obj) -> { + final Row row = (Row) obj; + + validateArity(rowArity, row.getArity()); + + final ArrayNode arrayNode = csvMapper.createArrayNode(); + for (int i = 0; i < rowArity; i++) { + arrayNode.add(fieldConverters[i].convert(csvMapper, arrayNode, row.getField(i))); + } + return arrayNode; + }; + } + } + + private static RuntimeConverter createNullableRuntimeConverter(TypeInformation info) { + final RuntimeConverter valueConverter = createRuntimeConverter(info); + return (csvMapper, container, obj) -> { + if (obj == null) { + return container.nullNode(); + } + return valueConverter.convert(csvMapper, container, obj); + }; + } + + private static RuntimeConverter createRuntimeConverter(TypeInformation info) { + if (info.equals(Types.VOID)) { + return (csvMapper, container, obj) -> container.nullNode(); + } else if (info.equals(Types.STRING)) { + return (csvMapper, container, obj) -> container.textNode((String) obj); + } else if (info.equals(Types.BOOLEAN)) { + return (csvMapper, container, obj) -> container.booleanNode((Boolean) obj); + } else if (info.equals(Types.BYTE)) { + return (csvMapper, container, obj) -> container.numberNode((Byte) obj); + } else if (info.equals(Types.SHORT)) { + return (csvMapper, container, obj) -> container.numberNode((Short) obj); + } else if (info.equals(Types.INT)) { + return (csvMapper, container, obj) -> container.numberNode((Integer) obj); + } else if (info.equals(Types.LONG)) { + return (csvMapper, container, obj) -> container.numberNode((Long) obj); + } else if (info.equals(Types.FLOAT)) { + return (csvMapper, container, obj) -> container.numberNode((Float) obj); + } else if (info.equals(Types.DOUBLE)) { + return (csvMapper, container, obj) -> container.numberNode((Double) obj); + } else if (info.equals(Types.BIG_DEC)) { + return (csvMapper, container, obj) -> container.numberNode((BigDecimal) obj); + } else if (info.equals(Types.BIG_INT)) { + return (csvMapper, container, obj) -> container.numberNode((BigInteger) obj); + } else if (info.equals(Types.SQL_DATE)) { + return (csvMapper, container, obj) -> container.textNode(obj.toString()); + } else if (info.equals(Types.SQL_TIME)) { + return (csvMapper, container, obj) -> container.textNode(obj.toString()); + } else if (info.equals(Types.SQL_TIMESTAMP)) { + return (csvMapper, container, obj) -> container.textNode(obj.toString()); + } else if (info instanceof RowTypeInfo){ + return createRowRuntimeConverter((RowTypeInfo) info, false); + } else if (info instanceof BasicArrayTypeInfo) { + return createObjectArrayRuntimeConverter(((BasicArrayTypeInfo) info).getComponentInfo()); + } else if (info instanceof ObjectArrayTypeInfo) { + return createObjectArrayRuntimeConverter(((ObjectArrayTypeInfo) info).getComponentInfo()); + } else if (info instanceof PrimitiveArrayTypeInfo && + ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) { + return createByteArrayRuntimeConverter(); + } + else { + throw new RuntimeException("Unsupported type information '" + info + 
"'."); + } + } + + private static RuntimeConverter createObjectArrayRuntimeConverter(TypeInformation elementType) { + final RuntimeConverter elementConverter = createNullableRuntimeConverter(elementType); + return (csvMapper, container, obj) -> { + final Object[] array = (Object[]) obj; + final ArrayNode arrayNode = csvMapper.createArrayNode(); + for (Object element : array) { + arrayNode.add(elementConverter.convert(csvMapper, arrayNode, element)); + } + return arrayNode; + }; + } + + private static RuntimeConverter createByteArrayRuntimeConverter() { + return (csvMapper, container, obj) -> container.binaryNode((byte[]) obj); + } + + private static void validateArity(int expected, int actual) { + if (expected != actual) { + throw new RuntimeException("Row length mismatch. " + expected + + " fields expected but was " + actual + "."); + } + } +} diff --git a/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/serialization/JsonCRowSerializationSchema.java b/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/serialization/JsonCRowSerializationSchema.java new file mode 100644 index 000000000..bfe801d52 --- /dev/null +++ b/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/serialization/JsonCRowSerializationSchema.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.dtstack.flink.sql.sink.kafka.serialization; + +import com.dtstack.flink.sql.enums.EUpdateMode; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.formats.json.JsonRowSchemaConverter; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ContainerNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.table.runtime.types.CRow; +import org.apache.flink.table.runtime.types.CRowTypeInfo; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Time; +import java.sql.Timestamp; +import java.text.SimpleDateFormat; +import java.util.Objects; + +/** + * + * Serialization schema that serializes an object of Flink types into a JSON bytes. + * + *
<p>
Serializes the input Flink object into a JSON string and + * converts it into byte[]. + * + */ +public class JsonCRowSerializationSchema implements SerializationSchema { + + private static final long serialVersionUID = -2885556750743978636L; + + /** Type information describing the input type. */ + private final TypeInformation typeInfo; + + /** Object mapper that is used to create output JSON objects. */ + private final ObjectMapper mapper = new ObjectMapper(); + + /** Formatter for RFC 3339-compliant string representation of a time value (with UTC timezone, without milliseconds). */ + private SimpleDateFormat timeFormat = new SimpleDateFormat("HH:mm:ss'Z'"); + + /** Formatter for RFC 3339-compliant string representation of a time value (with UTC timezone). */ + private SimpleDateFormat timeFormatWithMillis = new SimpleDateFormat("HH:mm:ss.SSS'Z'"); + + /** Formatter for RFC 3339-compliant string representation of a timestamp value (with UTC timezone). */ + private SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); + + /** Reusable object node. */ + private transient ObjectNode node; + + private String updateMode; + + private final String retractKey = "retract"; + + public JsonCRowSerializationSchema(String jsonSchema, String updateMode) { + this(JsonRowSchemaConverter.convert(jsonSchema), updateMode); + } + + /** + * Creates a JSON serialization schema for the given type information. + * + * @param typeInfo The field names of {@link Row} are used to map to JSON properties. + */ + public JsonCRowSerializationSchema(TypeInformation typeInfo, String updateMode) { + Preconditions.checkNotNull(typeInfo, "Type information"); + this.typeInfo = typeInfo; + this.updateMode = updateMode; + } + + + @Override + public byte[] serialize(CRow crow) { + Row row = crow.row(); + boolean change = crow.change(); + if (node == null) { + node = mapper.createObjectNode(); + } + + RowTypeInfo rowTypeInfo = ((CRowTypeInfo) typeInfo).rowType(); + try { + convertRow(node, rowTypeInfo, row); + if (StringUtils.equalsIgnoreCase(updateMode, EUpdateMode.UPSERT.name())) { + node.put(retractKey, change); + } + return mapper.writeValueAsBytes(node); + } catch (Throwable t) { + throw new RuntimeException("Could not serialize row '" + row + "'. 
" + + "Make sure that the schema matches the input.", t); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final JsonCRowSerializationSchema that = (JsonCRowSerializationSchema) o; + return Objects.equals(typeInfo, that.typeInfo); + } + + @Override + public int hashCode() { + return Objects.hash(typeInfo); + } + + // -------------------------------------------------------------------------------------------- + + private ObjectNode convertRow(ObjectNode reuse, RowTypeInfo info, Row row) { + if (reuse == null) { + reuse = mapper.createObjectNode(); + } + final String[] fieldNames = info.getFieldNames(); + + final TypeInformation[] fieldTypes = info.getFieldTypes(); + + // validate the row + if (row.getArity() != fieldNames.length) { + throw new IllegalStateException(String.format( + "Number of elements in the row '%s' is different from number of field names: %d", row, fieldNames.length)); + } + + for (int i = 0; i < fieldNames.length; i++) { + final String name = fieldNames[i]; + + final JsonNode fieldConverted = convert(reuse, reuse.get(name), fieldTypes[i], row.getField(i)); + reuse.set(name, fieldConverted); + } + + return reuse; + } + + private JsonNode convert(ContainerNode container, JsonNode reuse, TypeInformation info, Object object) { + if (info == Types.VOID || object == null) { + return container.nullNode(); + } else if (info == Types.BOOLEAN) { + return container.booleanNode((Boolean) object); + } else if (info == Types.STRING) { + return container.textNode((String) object); + } else if (info == Types.BIG_DEC) { + // convert decimal if necessary + if (object instanceof BigDecimal) { + return container.numberNode((BigDecimal) object); + } + return container.numberNode(BigDecimal.valueOf(((Number) object).doubleValue())); + } else if (info == Types.BIG_INT) { + // convert integer if necessary + if (object instanceof BigInteger) { + return container.numberNode((BigInteger) object); + } + return container.numberNode(BigInteger.valueOf(((Number) object).longValue())); + } else if (info == Types.SQL_DATE) { + return container.textNode(object.toString()); + } else if (info == Types.SQL_TIME) { + final Time time = (Time) object; + // strip milliseconds if possible + if (time.getTime() % 1000 > 0) { + return container.textNode(timeFormatWithMillis.format(time)); + } + return container.textNode(timeFormat.format(time)); + } else if (info == Types.SQL_TIMESTAMP) { + return container.textNode(timestampFormat.format((Timestamp) object)); + } else if (info instanceof RowTypeInfo) { + if (reuse != null && reuse instanceof ObjectNode) { + return convertRow((ObjectNode) reuse, (RowTypeInfo) info, (Row) object); + } else { + return convertRow(null, (RowTypeInfo) info, (Row) object); + } + } else if (info instanceof ObjectArrayTypeInfo) { + if (reuse != null && reuse instanceof ArrayNode) { + return convertObjectArray((ArrayNode) reuse, ((ObjectArrayTypeInfo) info).getComponentInfo(), (Object[]) object); + } else { + return convertObjectArray(null, ((ObjectArrayTypeInfo) info).getComponentInfo(), (Object[]) object); + } + } else if (info instanceof BasicArrayTypeInfo) { + if (reuse != null && reuse instanceof ArrayNode) { + return convertObjectArray((ArrayNode) reuse, ((BasicArrayTypeInfo) info).getComponentInfo(), (Object[]) object); + } else { + return convertObjectArray(null, ((BasicArrayTypeInfo) info).getComponentInfo(), (Object[]) object); + } + } else if (info instanceof 
PrimitiveArrayTypeInfo && ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) { + return container.binaryNode((byte[]) object); + } else { + // for types that were specified without JSON schema + // e.g. POJOs + try { + return mapper.valueToTree(object); + } catch (IllegalArgumentException e) { + throw new IllegalStateException("Unsupported type information '" + info + "' for object: " + object, e); + } + } + } + + private ArrayNode convertObjectArray(ArrayNode reuse, TypeInformation info, Object[] array) { + if (reuse == null) { + reuse = mapper.createArrayNode(); + } else { + reuse.removeAll(); + } + + for (Object object : array) { + reuse.add(convert(reuse, null, info, object)); + } + return reuse; + } +} diff --git a/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkParser.java b/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkParser.java index 7520de512..a42473420 100644 --- a/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkParser.java +++ b/kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkParser.java @@ -18,6 +18,7 @@ package com.dtstack.flink.sql.sink.kafka.table; +import com.dtstack.flink.sql.enums.EUpdateMode; import com.dtstack.flink.sql.format.FormatType; import com.dtstack.flink.sql.table.AbstractTableParser; import com.dtstack.flink.sql.table.AbstractTableInfo; @@ -51,6 +52,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map kafkaParam = new HashMap(); @@ -58,6 +60,8 @@ public class KafkaSinkTableInfo extends AbstractTargetTableInfo { private String partitionKeys; + private String updateMode; + public void addKafkaParam(String key, String value) { kafkaParam.put(key, value); } @@ -70,7 +74,6 @@ public Set getKafkaParamKeys() { return kafkaParam.keySet(); } - public String getBootstrapServers() { return bootstrapServers; } @@ -103,6 +106,14 @@ public void setFieldDelimiter(String fieldDelimiter) { this.fieldDelimiter = fieldDelimiter; } + public String getUpdateMode() { + return updateMode; + } + + public void setUpdateMode(String updateMode) { + this.updateMode = updateMode; + } + @Override public boolean check() { Preconditions.checkNotNull(getType(), "kafka of type is required"); diff --git a/kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaDeserializationMetricWrapper.java b/kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaDeserializationMetricWrapper.java index afa950c5b..f08287cec 100644 --- a/kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaDeserializationMetricWrapper.java +++ b/kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaDeserializationMetricWrapper.java @@ -77,7 +77,7 @@ protected void beforeDeserialize() throws IOException { protected void registerPtMetric(AbstractFetcher fetcher) throws Exception { - Field consumerThreadField = fetcher.getClass().getSuperclass().getDeclaredField("consumerThread"); + Field consumerThreadField = getConsumerThreadField(fetcher); consumerThreadField.setAccessible(true); KafkaConsumerThread consumerThread = (KafkaConsumerThread) consumerThreadField.get(fetcher); @@ -115,6 +115,14 @@ public Long getValue() { } } + private Field getConsumerThreadField(AbstractFetcher fetcher) throws NoSuchFieldException { + try { + return fetcher.getClass().getDeclaredField("consumerThread"); + } catch (Exception 
e) { + return fetcher.getClass().getSuperclass().getDeclaredField("consumerThread"); + } + } + public void setFetcher(AbstractFetcher fetcher) { this.fetcher = fetcher; } diff --git a/kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer.java b/kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer.java index 71a9cc386..1cbbeafd9 100644 --- a/kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer.java +++ b/kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer.java @@ -23,7 +23,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; -import org.apache.flink.types.Row; +import org.apache.flink.table.runtime.types.CRow; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,7 +37,7 @@ * * @author maqi */ -public class KafkaProducer extends FlinkKafkaProducer { +public class KafkaProducer extends FlinkKafkaProducer { private static final Logger LOG = LoggerFactory.getLogger(KafkaProducer.class); @@ -45,7 +45,7 @@ public class KafkaProducer extends FlinkKafkaProducer { private SerializationMetricWrapper serializationMetricWrapper; - public KafkaProducer(String topicId, SerializationSchema serializationSchema, Properties producerConfig, Optional> customPartitioner, String[] parititonKeys) { + public KafkaProducer(String topicId, SerializationSchema serializationSchema, Properties producerConfig, Optional> customPartitioner, String[] parititonKeys) { super(topicId, new CustomerKeyedSerializationSchema((SerializationMetricWrapper)serializationSchema, parititonKeys), producerConfig, customPartitioner); this.serializationMetricWrapper = (SerializationMetricWrapper) serializationSchema; } diff --git a/kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducerFactory.java b/kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducerFactory.java index f3a2f40f5..6bf9014df 100644 --- a/kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducerFactory.java +++ b/kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducerFactory.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; -import org.apache.flink.types.Row; +import org.apache.flink.table.runtime.types.CRow; import java.util.Optional; import java.util.Properties; @@ -36,7 +36,7 @@ public class KafkaProducerFactory extends AbstractKafkaProducerFactory { @Override - public RichSinkFunction createKafkaProducer(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation typeInformation, Properties properties, Optional> partitioner, String[] partitionKeys) { + public RichSinkFunction createKafkaProducer(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation typeInformation, Properties properties, Optional> partitioner, String[] partitionKeys) { return new KafkaProducer(kafkaSinkTableInfo.getTopic(), createSerializationMetricWrapper(kafkaSinkTableInfo, typeInformation), properties, partitioner, partitionKeys); } } diff --git a/kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java b/kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java index 7105bc037..71e938ba5 100644 --- 
a/kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java +++ b/kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java @@ -31,6 +31,8 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.runtime.types.CRow; +import org.apache.flink.table.runtime.types.CRowTypeInfo; import org.apache.flink.table.sinks.RetractStreamTableSink; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.utils.TableConnectorUtils; @@ -56,13 +58,15 @@ public class KafkaSink implements RetractStreamTableSink, IStreamSinkGener< protected Properties properties; - protected FlinkKafkaProducer flinkKafkaProducer; + protected FlinkKafkaProducer flinkKafkaProducer; + protected CRowTypeInfo typeInformation; + /** The schema of the table. */ private TableSchema schema; /** Partitioner to select Kafka partition for each item. */ - protected Optional> partitioner; + protected Optional> partitioner; private String[] partitionKeys; @@ -97,7 +101,9 @@ public KafkaSink genStreamSink(AbstractTargetTableInfo targetTableInfo) { this.parallelism = parallelism; } - this.flinkKafkaProducer = (FlinkKafkaProducer) new KafkaProducerFactory().createKafkaProducer(kafkaSinkTableInfo, getOutputType().getTypeAt(1), properties, partitioner, partitionKeys); + typeInformation = new CRowTypeInfo(new RowTypeInfo(fieldTypes, fieldNames)); + this.flinkKafkaProducer = (FlinkKafkaProducer) new KafkaProducerFactory() + .createKafkaProducer(kafkaSinkTableInfo, typeInformation, properties, partitioner, partitionKeys); return this; } @@ -108,9 +114,9 @@ public TypeInformation getRecordType() { @Override public void emitDataStream(DataStream> dataStream) { - DataStream mapDataStream = dataStream.filter((Tuple2 record) -> record.f0) - .map((Tuple2 record) -> record.f1) - .returns(getOutputType().getTypeAt(1)) + DataStream mapDataStream = dataStream + .map((Tuple2 record) -> new CRow(record.f1, record.f0)) + .returns(typeInformation) .setParallelism(parallelism); mapDataStream.addSink(flinkKafkaProducer).name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames())); diff --git a/kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer09.java b/kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer09.java index c815e134a..bee1865dd 100644 --- a/kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer09.java +++ b/kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer09.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.table.runtime.types.CRow; import org.apache.flink.types.Row; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,7 +38,7 @@ * * @author maqi */ -public class KafkaProducer09 extends FlinkKafkaProducer09 { +public class KafkaProducer09 extends FlinkKafkaProducer09 { private static final Logger LOG = LoggerFactory.getLogger(KafkaProducer09.class); @@ -45,7 +46,7 @@ public class KafkaProducer09 extends FlinkKafkaProducer09 { private SerializationMetricWrapper serializationMetricWrapper; - public KafkaProducer09(String topicId, SerializationSchema 
serializationSchema, Properties producerConfig, Optional> customPartitioner,String[] partitionKeys) { + public KafkaProducer09(String topicId, SerializationSchema serializationSchema, Properties producerConfig, Optional> customPartitioner, String[] partitionKeys) { super(topicId, new CustomerKeyedSerializationSchema((SerializationMetricWrapper)serializationSchema, partitionKeys), producerConfig, customPartitioner.orElse(null)); this.serializationMetricWrapper = (SerializationMetricWrapper) serializationSchema; } diff --git a/kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer09Factory.java b/kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer09Factory.java index 7fb3909ee..ee3423b07 100644 --- a/kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer09Factory.java +++ b/kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer09Factory.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.table.runtime.types.CRow; import org.apache.flink.types.Row; import java.util.Optional; @@ -36,7 +37,7 @@ public class KafkaProducer09Factory extends AbstractKafkaProducerFactory { @Override - public RichSinkFunction createKafkaProducer(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation typeInformation, Properties properties, Optional> partitioner,String[] partitionKeys) { + public RichSinkFunction createKafkaProducer(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation typeInformation, Properties properties, Optional> partitioner, String[] partitionKeys) { return new KafkaProducer09(kafkaSinkTableInfo.getTopic(), createSerializationMetricWrapper(kafkaSinkTableInfo, typeInformation), properties, partitioner, partitionKeys); } } diff --git a/kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java b/kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java index 0a991a8ea..e6dbdf3d3 100644 --- a/kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java +++ b/kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java @@ -27,10 +27,13 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.runtime.types.CRow; +import org.apache.flink.table.runtime.types.CRowTypeInfo; import org.apache.flink.table.sinks.RetractStreamTableSink; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.utils.TableConnectorUtils; @@ -56,13 +59,14 @@ public class KafkaSink implements RetractStreamTableSink, IStreamSinkGener< protected Properties properties; - protected FlinkKafkaProducer09 kafkaProducer09; + protected FlinkKafkaProducer09 kafkaProducer09; + protected CRowTypeInfo typeInformation; /** The schema of the table. 
*/ private TableSchema schema; /** Partitioner to select Kafka partition for each item. */ - protected Optional> partitioner; + protected Optional> partitioner; private String[] partitionKeys; @@ -101,8 +105,9 @@ public KafkaSink genStreamSink(AbstractTargetTableInfo targetTableInfo) { this.parallelism = parallelism; } - this.kafkaProducer09 = (FlinkKafkaProducer09) new KafkaProducer09Factory() - .createKafkaProducer(kafka09SinkTableInfo, getOutputType().getTypeAt(1), properties, partitioner, partitionKeys); + typeInformation = new CRowTypeInfo(new RowTypeInfo(fieldTypes, fieldNames)); + this.kafkaProducer09 = (FlinkKafkaProducer09) new KafkaProducer09Factory() + .createKafkaProducer(kafka09SinkTableInfo, typeInformation, properties, partitioner, partitionKeys); return this; } @@ -113,10 +118,10 @@ public TypeInformation getRecordType() { @Override public void emitDataStream(DataStream> dataStream) { - DataStream mapDataStream = dataStream.filter((Tuple2 record) -> record.f0) - .map((Tuple2 record) -> record.f1) - .returns(getOutputType().getTypeAt(1)) - .setParallelism(parallelism); + DataStream mapDataStream = dataStream + .map((Tuple2 record) -> new CRow(record.f1, record.f0)) + .returns(typeInformation) + .setParallelism(parallelism); mapDataStream.addSink(kafkaProducer09) .name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames())); diff --git a/kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer010.java b/kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer010.java index 3cdc45dec..3936575ef 100644 --- a/kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer010.java +++ b/kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer010.java @@ -23,7 +23,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; -import org.apache.flink.types.Row; +import org.apache.flink.table.runtime.types.CRow; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,7 +37,7 @@ * * @author maqi */ -public class KafkaProducer010 extends FlinkKafkaProducer010 { +public class KafkaProducer010 extends FlinkKafkaProducer010 { private static final Logger LOG = LoggerFactory.getLogger(KafkaProducer010.class); @@ -45,7 +45,7 @@ public class KafkaProducer010 extends FlinkKafkaProducer010 { private SerializationMetricWrapper serializationMetricWrapper; - public KafkaProducer010(String topicId, SerializationSchema serializationSchema, Properties producerConfig, Optional> customPartitioner, String[] partitionKeys) { + public KafkaProducer010(String topicId, SerializationSchema serializationSchema, Properties producerConfig, Optional> customPartitioner, String[] partitionKeys) { super(topicId, new CustomerKeyedSerializationSchema((SerializationMetricWrapper)serializationSchema, partitionKeys), producerConfig, customPartitioner.get()); this.serializationMetricWrapper = (SerializationMetricWrapper) serializationSchema; } diff --git a/kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer010Factory.java b/kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer010Factory.java index c44a9fe86..e0e023586 100644 --- a/kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer010Factory.java +++ 
b/kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer010Factory.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.table.runtime.types.CRow; import org.apache.flink.types.Row; import java.util.Optional; @@ -36,7 +37,7 @@ public class KafkaProducer010Factory extends AbstractKafkaProducerFactory { @Override - public RichSinkFunction createKafkaProducer(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation typeInformation, Properties properties, Optional> partitioner, String[] partitionKeys) { + public RichSinkFunction createKafkaProducer(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation typeInformation, Properties properties, Optional> partitioner, String[] partitionKeys) { return new KafkaProducer010(kafkaSinkTableInfo.getTopic(), createSerializationMetricWrapper(kafkaSinkTableInfo, typeInformation), properties, partitioner, partitionKeys); } } diff --git a/kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java b/kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java index 34ea8fc5f..ac5a11810 100644 --- a/kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java +++ b/kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java @@ -29,6 +29,8 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.runtime.types.CRow; +import org.apache.flink.table.runtime.types.CRowTypeInfo; import org.apache.flink.table.sinks.RetractStreamTableSink; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.utils.TableConnectorUtils; @@ -61,6 +63,9 @@ public class KafkaSink implements RetractStreamTableSink, IStreamSinkGener< protected KafkaSinkTableInfo kafka10SinkTableInfo; + protected RichSinkFunction kafkaProducer010; + protected CRowTypeInfo typeInformation; + /** The schema of the table. 
*/ private TableSchema schema; @@ -97,6 +102,12 @@ public KafkaSink genStreamSink(AbstractTargetTableInfo targetTableInfo) { if (parallelism != null) { this.parallelism = parallelism; } + + typeInformation = new CRowTypeInfo(new RowTypeInfo(fieldTypes, fieldNames)); + kafkaProducer010 = new KafkaProducer010Factory().createKafkaProducer(kafka10SinkTableInfo, + typeInformation, properties, + Optional.of(new CustomerFlinkPartition<>()), partitionKeys); + return this; } @@ -107,13 +118,9 @@ public TypeInformation getRecordType() { @Override public void emitDataStream(DataStream> dataStream) { - - RichSinkFunction kafkaProducer010 = new KafkaProducer010Factory().createKafkaProducer(kafka10SinkTableInfo, getOutputType().getTypeAt(1), properties, - Optional.of(new CustomerFlinkPartition<>()), partitionKeys); - - DataStream mapDataStream = dataStream.filter((Tuple2 record) -> record.f0) - .map((Tuple2 record) -> record.f1) - .returns(getOutputType().getTypeAt(1)) + DataStream mapDataStream = dataStream + .map((Tuple2 record) -> new CRow(record.f1, record.f0)) + .returns(typeInformation) .setParallelism(parallelism); mapDataStream.addSink(kafkaProducer010).name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames())); diff --git a/kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer011.java b/kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer011.java index 7880dd419..429d21a79 100644 --- a/kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer011.java +++ b/kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer011.java @@ -24,7 +24,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; -import org.apache.flink.types.Row; +import org.apache.flink.table.runtime.types.CRow; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +38,7 @@ * * @author maqi */ -public class KafkaProducer011 extends FlinkKafkaProducer011 { +public class KafkaProducer011 extends FlinkKafkaProducer011 { private static final Logger LOG = LoggerFactory.getLogger(KafkaProducer011.class); @@ -46,7 +46,7 @@ public class KafkaProducer011 extends FlinkKafkaProducer011 { private SerializationMetricWrapper serializationMetricWrapper; - public KafkaProducer011(String topicId, SerializationSchema serializationSchema, Properties producerConfig, Optional> customPartitioner, String[] partitionKeys) { + public KafkaProducer011(String topicId, SerializationSchema serializationSchema, Properties producerConfig, Optional> customPartitioner, String[] partitionKeys) { super(topicId, new CustomerKeyedSerializationSchema((SerializationMetricWrapper)serializationSchema, partitionKeys), producerConfig, customPartitioner); this.serializationMetricWrapper = (SerializationMetricWrapper) serializationSchema; } diff --git a/kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer011Factory.java b/kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer011Factory.java index e2272b16e..0cb11da82 100644 --- a/kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer011Factory.java +++ b/kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaProducer011Factory.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import 
org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.table.runtime.types.CRow; import org.apache.flink.types.Row; import java.util.Optional; @@ -36,7 +37,7 @@ public class KafkaProducer011Factory extends AbstractKafkaProducerFactory { @Override - public RichSinkFunction createKafkaProducer(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation typeInformation, Properties properties, Optional> partitioner, + public RichSinkFunction createKafkaProducer(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation typeInformation, Properties properties, Optional> partitioner, String[] partitionKeys) { return new KafkaProducer011(kafkaSinkTableInfo.getTopic(), createSerializationMetricWrapper(kafkaSinkTableInfo, typeInformation), properties, partitioner, partitionKeys); } diff --git a/kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java b/kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java index d7807a935..835941ca3 100644 --- a/kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java +++ b/kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java @@ -31,6 +31,8 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.runtime.types.CRow; +import org.apache.flink.table.runtime.types.CRowTypeInfo; import org.apache.flink.table.sinks.RetractStreamTableSink; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.utils.TableConnectorUtils; @@ -61,13 +63,15 @@ public class KafkaSink implements RetractStreamTableSink, IStreamSinkGener protected Properties properties; - protected FlinkKafkaProducer011 kafkaProducer011; + protected FlinkKafkaProducer011 kafkaProducer011; + protected CRowTypeInfo typeInformation; + /** The schema of the table. */ private TableSchema schema; /** Partitioner to select Kafka partition for each item. 
*/ - protected Optional> partitioner; + protected Optional> partitioner; private String[] partitionKeys; @@ -102,8 +106,9 @@ public KafkaSink genStreamSink(AbstractTargetTableInfo targetTableInfo) { this.parallelism = parallelism; } - this.kafkaProducer011 = (FlinkKafkaProducer011) new KafkaProducer011Factory() - .createKafkaProducer(kafka11SinkTableInfo, getOutputType().getTypeAt(1), properties, partitioner, partitionKeys); + typeInformation = new CRowTypeInfo(new RowTypeInfo(fieldTypes, fieldNames)); + this.kafkaProducer011 = (FlinkKafkaProducer011) new KafkaProducer011Factory() + .createKafkaProducer(kafka11SinkTableInfo, typeInformation, properties, partitioner, partitionKeys); return this; } @@ -114,9 +119,10 @@ public TypeInformation getRecordType() { @Override public void emitDataStream(DataStream> dataStream) { - DataStream mapDataStream = dataStream.filter((Tuple2 record) -> record.f0) - .map((Tuple2 record) -> record.f1) - .returns(getOutputType().getTypeAt(1)) + + DataStream mapDataStream = dataStream + .map((Tuple2 record) -> new CRow(record.f1, record.f0)) + .returns(typeInformation) .setParallelism(parallelism); mapDataStream.addSink(kafkaProducer011).name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames())); diff --git a/kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumer011Factory.java b/kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumer011Factory.java index 3f804fc6c..c20f0678b 100644 --- a/kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumer011Factory.java +++ b/kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumer011Factory.java @@ -31,8 +31,8 @@ /** * company: www.dtstack.com - * author: toutian * create: 2019/12/24 + * @author: toutian */ public class KafkaConsumer011Factory extends AbstractKafkaConsumerFactory { diff --git a/kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java b/kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java index 11be1898a..2f760bdf2 100644 --- a/kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java +++ b/kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java @@ -93,11 +93,11 @@ public Table genStreamSource(AbstractSourceTableInfo sourceTableInfo, StreamExec //earliest,latest if ("earliest".equalsIgnoreCase(kafkaSourceTableInfo.getOffsetReset())) { kafkaSrc.setStartFromEarliest(); - } else if (DtStringUtil.isJosn(kafkaSourceTableInfo.getOffsetReset())) {// {"0":12312,"1":12321,"2":12312} + } else if (DtStringUtil.isJosn(kafkaSourceTableInfo.getOffsetReset())) { try { Properties properties = PluginUtil.jsonStrToObject(kafkaSourceTableInfo.getOffsetReset(), Properties.class); Map offsetMap = PluginUtil.objectToMap(properties); - Map specificStartupOffsets = new HashMap<>(); + Map specificStartupOffsets = new HashMap<>(16); for (Map.Entry entry : offsetMap.entrySet()) { specificStartupOffsets.put(new KafkaTopicPartition(topicName, Integer.valueOf(entry.getKey())), Long.valueOf(entry.getValue().toString())); }
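The hunks above apply one recurring change across the kafka, kafka09, kafka10 and kafka11 sinks: instead of filtering retract records out and forwarding bare Row values, each Tuple2<Boolean, Row> is wrapped into a CRow so the change flag survives until serialization. Below is a minimal, self-contained Java sketch of that pattern; the class RetractToCRowSketch and its helper method are illustrative assumptions and not part of this patch, while the CRow / CRowTypeInfo usage mirrors the emitDataStream and genStreamSink changes shown above.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.runtime.types.CRow;
import org.apache.flink.table.runtime.types.CRowTypeInfo;
import org.apache.flink.types.Row;

/**
 * Illustrative sketch only (not part of this patch): shows the CRow-wrapping
 * pattern used by the reworked KafkaSink#emitDataStream implementations.
 */
public class RetractToCRowSketch {

    /**
     * Wraps every (changeFlag, row) pair of a retract stream into a CRow
     * instead of dropping retractions, so the change flag is still visible
     * to a CRow-aware SerializationSchema downstream.
     */
    public static DataStream<CRow> wrapRetractStream(DataStream<Tuple2<Boolean, Row>> retractStream,
                                                     TypeInformation<?>[] fieldTypes,
                                                     String[] fieldNames,
                                                     int parallelism) {
        // Same type construction as the genStreamSink changes above.
        CRowTypeInfo cRowTypeInfo = new CRowTypeInfo(new RowTypeInfo(fieldTypes, fieldNames));
        return retractStream
                // f1 is the payload Row, f0 is the accumulate/retract flag.
                .map((Tuple2<Boolean, Row> record) -> new CRow(record.f1, record.f0))
                // Lambdas lose generic type information, so declare it explicitly.
                .returns(cRowTypeInfo)
                .setParallelism(parallelism);
    }
}

Keeping the flag inside CRow is what lets the CRow serialization schemas decide per record whether to emit, drop, or mark it according to the sink's configured update mode, rather than hard-coding append-only semantics in the sink itself.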