From 6bfd49ba9ba693d4d3105ba4a27cdbcb8529bd61 Mon Sep 17 00:00:00 2001
From: Mayuresh Gharat
Date: Tue, 28 Jan 2020 13:13:12 -0800
Subject: [PATCH] [LI-HOTFIX] Added support to enable passthrough mirroring of data from V1 format to V2 format (#70)

Passthrough mirroring, as it exists today, works only when the
log.message.format.version of the source and destination clusters is the
same (0.10.0 to 0.10.0, or 2.0 to 2.0). When the message formats of the
source and destination differ, KMM throws errors.

This patch makes passthrough mirroring work when the source is on the
0.10.0 message format and the destination is on the 2.0 format. It does
not address the errors/issues seen when the source is on the 2.0 format
and the destination is on the 0.10.0 format.

TICKET =
---
 .../consumer/PassThroughConsumerRecord.java   | 54 +++++++++++++++++
 .../clients/consumer/internals/Fetcher.java   |  8 +++
 .../producer/internals/RecordAccumulator.java | 16 +++++
 .../common/record/MemoryRecordsBuilder.java   |  6 +-
 .../kafka/consumer/BaseConsumerRecord.scala   |  3 +-
 .../main/scala/kafka/tools/MirrorMaker.scala  | 60 ++++++++++++++++++-
 6 files changed, 142 insertions(+), 5 deletions(-)
 create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/PassThroughConsumerRecord.java

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/PassThroughConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/PassThroughConsumerRecord.java
new file mode 100644
index 0000000000000..efe82a02c2393
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/PassThroughConsumerRecord.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.record.TimestampType;
+
+/**
+ * A {@link ConsumerRecord} used in passthrough mode: the value carries an
+ * entire message batch, the key is null, and {@code magic} holds the magic
+ * byte of the source record batch.
+ */
+public class PassThroughConsumerRecord<K, V> extends ConsumerRecord<K, V> {
+    private final byte magic;
+
+    public PassThroughConsumerRecord(String topic, int partition, long offset, K key, V value, byte magic) {
+        super(topic, partition, offset, key, value);
+        this.magic = magic;
+    }
+
+    public PassThroughConsumerRecord(String topic, int partition, long offset, long timestamp,
+        TimestampType timestampType, long checksum, int serializedKeySize, int serializedValueSize, K key, V value,
+        byte magic) {
+        super(topic, partition, offset, timestamp, timestampType, checksum, serializedKeySize, serializedValueSize, key,
+            value);
+        this.magic = magic;
+    }
+
+    public PassThroughConsumerRecord(String topic, int partition, long offset, long timestamp,
+        TimestampType timestampType, Long checksum, int serializedKeySize, int serializedValueSize, K key, V value,
+        Headers headers, byte magic) {
+        super(topic, partition, offset, timestamp, timestampType, checksum, serializedKeySize, serializedValueSize, key,
+            value, headers);
+        this.magic = magic;
+    }
+
+    public byte magic() {
+        return magic;
+    }
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 93e9a0b306089..8602bc1a0aad1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -26,6 +26,7 @@
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.PassThroughConsumerRecord;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
@@ -1147,6 +1148,13 @@ private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
                 (enableShallowIteration ? ((AbstractLegacyRecordBatch) record).outerRecord().buffer() : record.value());
             byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
             V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
+            if (enableShallowIteration) {
+                return new PassThroughConsumerRecord<>(partition.topic(), partition.partition(), offset, timestamp,
+                    timestampType, record.checksumOrNull(),
+                    keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
+                    valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length, key, value, headers,
+                    batch.magic());
+            }
             return new ConsumerRecord<>(partition.topic(), partition.partition(), offset, timestamp,
                 timestampType, record.checksumOrNull(), keyByteArray == null ?
                     ConsumerRecord.NULL_SIZE : keyByteArray.length,
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index a9bb29ce73f03..174937f758176 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -69,6 +69,8 @@
  */
 public final class RecordAccumulator {
 
+    private static final String PASS_THROUGH_MAGIC_VALUE = "__passThroughMagicValue";
+
     private final Logger log;
     private volatile boolean closed;
     private final AtomicInteger flushesInProgress;
@@ -222,6 +224,20 @@ public RecordAppendResult append(TopicPartition tp,
                 return appendResult;
             }
 
+            // HOTFIX to enable mirroring data with passthrough compression across mixed message formats.
+            // In passthrough mode, KMM sets a header with key = "__passThroughMagicValue" and value = the magic byte of the message.
+            // This enables passthrough where the source is on the 0.10 format and the destination is on the 2.0 format (not the other way round).
+            if (compression.equals(CompressionType.PASSTHROUGH)) {
+                for (Header header : headers) {
+                    if (header.key().equals(PASS_THROUGH_MAGIC_VALUE)) {
+                        byte srcMagicValue = header.value()[0];
+                        if (maxUsableMagic > srcMagicValue) {
+                            maxUsableMagic = srcMagicValue;
+                        }
+                    }
+                }
+            }
+
             MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
             ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
             FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 54e25ec10d11d..b80c32fac7996 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -792,8 +792,12 @@ public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value, Head
 
         // For passthrough V2, ensure one producerBatch only has one DefaultRecordBatch, and since
         // in this case, DefaultRecordBatch is the value part of a DefaultRecord, so we only allow one record
-        if (magic >= RecordBatch.MAGIC_VALUE_V2 && usePassthrough)
+        // For passthrough V1, we could ideally append multiple passthrough records to the same batch, but this
+        // does not work when migrating from the V1 to the V2 message format, as we cannot append V1 and V2
+        // messages in the same batch.
+        if (usePassthrough) {
             return false;
+        }
 
         final int recordSize;
         if (magic < RecordBatch.MAGIC_VALUE_V2) {
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumerRecord.scala b/core/src/main/scala/kafka/consumer/BaseConsumerRecord.scala
index 7628b6b65168c..01408a744d17b 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumerRecord.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumerRecord.scala
@@ -30,4 +30,5 @@ case class BaseConsumerRecord(topic: String,
                               timestampType: TimestampType = TimestampType.NO_TIMESTAMP_TYPE,
                               key: Array[Byte],
                               value: Array[Byte],
-                              headers: Headers = new RecordHeaders())
+                              headers: Headers = new RecordHeaders(),
+                              magic: Byte = -1)
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 24d799d922744..5d7ce16cef1bd 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -28,13 +28,15 @@ import joptsimple.OptionParser
 import kafka.consumer.BaseConsumerRecord
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.{CommandLineUtils, CoreUtils, Logging, Whitelist}
-import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerConfig, ConsumerRebalanceListener, ConsumerRecord, KafkaConsumer, OffsetAndMetadata}
+import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerConfig, ConsumerRebalanceListener, ConsumerRecord, KafkaConsumer, OffsetAndMetadata, PassThroughConsumerRecord}
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.errors.WakeupException
+import org.apache.kafka.common.header.Headers
+import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders}
 import org.apache.kafka.common.record.RecordBatch
 
 import scala.collection.JavaConverters._
@@ -69,6 +71,22 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
   private var offsetCommitIntervalMs = 0
   private var abortOnSendFailure: Boolean = true
   @volatile private var exitingOnSendFailure: Boolean = false
+  private var passThroughEnabled: Boolean = false
+  private val PASS_THROUGH_MAGIC_VALUE = "__passThroughMagicValue"
+
+  val recordHeadersV1: Headers = {
+    val magicValueV1 = Array[Byte](
+      RecordBatch.MAGIC_VALUE_V1
+    )
+    new RecordHeaders().add(new RecordHeader(PASS_THROUGH_MAGIC_VALUE, magicValueV1))
+  }
+
+  val recordHeadersV2: Headers = {
+    val magicValueV2 = Array[Byte](
+      RecordBatch.MAGIC_VALUE_V2
+    )
+    new RecordHeaders().add(new RecordHeader(PASS_THROUGH_MAGIC_VALUE, magicValueV2))
+  }
 
   // If a message send failed after retries are exhausted. The offset of the messages will also be removed from
   // the unacked offset list to avoid offset commit being stuck on that offset. In this case, the offset of that
@@ -214,6 +232,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     if (options.has(passthroughCompressionOpt)) {
+      passThroughEnabled = true
      consumerProps.setProperty("enable.shallow.iterator", "true")
       producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "passthrough")
     }
@@ -253,7 +272,11 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         else
           CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass)
       } else {
-        defaultMirrorMakerMessageHandler
+        if (passThroughEnabled) {
+          passThroughMirrorMakerMessageHandler
+        } else {
+          defaultMirrorMakerMessageHandler
+        }
       }
     }
   } catch {
@@ -362,6 +385,17 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         record.value,
         record.headers)
 
+    private def toBaseConsumerRecordWithPassThrough(record: PassThroughConsumerRecord[Array[Byte], Array[Byte]]): BaseConsumerRecord =
+      BaseConsumerRecord(record.topic,
+        record.partition,
+        record.offset,
+        record.timestamp,
+        record.timestampType,
+        record.key,
+        record.value,
+        record.headers,
+        record.magic)
+
     override def run() {
       info("Starting mirror maker thread " + threadName)
       try {
@@ -377,7 +411,13 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
           } else {
             trace("Sending message with null value and offset %d.".format(data.offset))
           }
-          val records = messageHandler.handle(toBaseConsumerRecord(data))
+          val records = {
+            if (passThroughEnabled) {
+              messageHandler.handle(toBaseConsumerRecordWithPassThrough(data.asInstanceOf[PassThroughConsumerRecord[Array[Byte], Array[Byte]]]))
+            } else {
+              messageHandler.handle(toBaseConsumerRecord(data))
+            }
+          }
           records.asScala.foreach(producer.send)
           maybeFlushAndCommitOffsets()
         }
@@ -587,6 +627,20 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     }
   }
 
+  private[tools] object passThroughMirrorMakerMessageHandler extends MirrorMakerMessageHandler {
+    override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
+      val timestamp: java.lang.Long = if (record.timestamp == RecordBatch.NO_TIMESTAMP) null else record.timestamp
+      // It is assumed that we no longer have message format V0 at LinkedIn.
+      if (record.magic == RecordBatch.MAGIC_VALUE_V1) {
+        Collections.singletonList(new ProducerRecord(record.topic, null, timestamp, record.key, record.value, recordHeadersV1))
+      } else if (record.magic == RecordBatch.MAGIC_VALUE_V2) {
+        Collections.singletonList(new ProducerRecord(record.topic, null, timestamp, record.key, record.value, recordHeadersV2))
+      } else {
+        throw new IllegalArgumentException("Record batch with magic value " + record.magic + " is not supported in passthrough mode")
+      }
+    }
+  }
+
   private class NoRecordsException extends RuntimeException
 }
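A minimal sketch of the magic-value handshake this patch introduces, assuming the Kafka 2.x client jars and Scala 2.12 on the classpath: the MirrorMaker handler attaches a header carrying the source batch's magic byte, and the producer-side accumulator clamps the destination batch magic down to that value, so a V1 payload is never wrapped in a V2 producer batch. The object and method names (PassThroughMagicSketch, headersFor, effectiveMagic) are illustrative only; just the header key and the clamping rule come from the patch.

import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders}
import org.apache.kafka.common.record.RecordBatch

import scala.collection.JavaConverters._

object PassThroughMagicSketch {
  // Same header key the patch uses in MirrorMaker and RecordAccumulator.
  val PassThroughMagicKey = "__passThroughMagicValue"

  // MirrorMaker side: tag a passthrough record with the source batch's magic byte
  // (this is what recordHeadersV1/recordHeadersV2 do above). Hypothetical helper.
  def headersFor(srcMagic: Byte): Headers =
    new RecordHeaders().add(new RecordHeader(PassThroughMagicKey, Array[Byte](srcMagic)))

  // RecordAccumulator side: clamp the destination batch magic down to the source
  // magic, mirroring the loop added to RecordAccumulator.append. Hypothetical helper.
  def effectiveMagic(maxUsableMagic: Byte, headers: Headers): Byte = {
    var magic = maxUsableMagic
    for (h <- headers.asScala if h.key == PassThroughMagicKey) {
      val srcMagic = h.value()(0)
      if (srcMagic < magic) magic = srcMagic
    }
    magic
  }

  def main(args: Array[String]): Unit = {
    // Source batch is V1; the destination could build V2, but the batch is built as V1.
    val headers = headersFor(RecordBatch.MAGIC_VALUE_V1)
    println(effectiveMagic(RecordBatch.MAGIC_VALUE_V2, headers)) // prints 1
  }
}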
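Likewise, a sketch of the property wiring behind passthroughCompressionOpt (the exact command-line flag name is not shown in this diff): passthrough mode amounts to shallow iteration on the consumer plus the passthrough compression type on the producer, exactly as set in the options.has(passthroughCompressionOpt) branch above. Both settings exist only in this LinkedIn fork of Kafka, not upstream, and PassThroughWiringSketch is a hypothetical name.

import java.util.Properties

import org.apache.kafka.clients.producer.ProducerConfig

object PassThroughWiringSketch {
  def main(args: Array[String]): Unit = {
    // Consumer side: fetch compressed batches shallowly, without decompressing them.
    val consumerProps = new Properties()
    consumerProps.setProperty("enable.shallow.iterator", "true")

    // Producer side: forward the fetched batches as-is instead of recompressing.
    val producerProps = new Properties()
    producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "passthrough")

    println(consumerProps)
    println(producerProps)
  }
}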