From 9d78eab9e763e7a9c0634655a073e20bd2401aab Mon Sep 17 00:00:00 2001 From: Sam Date: Thu, 7 Sep 2023 11:43:44 +0545 Subject: [PATCH] Calculate batchElapsedTime correctly by converting startTime from nanoseconds to milliseconds --- .../java/org/apache/flume/channel/kafka/KafkaChannel.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java index 65cad9937d..79fda1cf6f 100644 --- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java @@ -60,6 +60,7 @@ import org.slf4j.LoggerFactory; import scala.Option; +import java.util.concurrent.TimeUnit; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -810,7 +811,8 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { log.trace("Error sending message to Kafka due to " + exception.getMessage()); } if (log.isDebugEnabled()) { - long batchElapsedTime = System.currentTimeMillis() - startTime; + long batchElapsedTime = System.currentTimeMillis() + - TimeUnit.NANOSECONDS.toMillis(startTime); if (metadata != null) { log.debug("Acked message_no " + index + ": " + metadata.topic() + "-" + metadata.partition() + "-" + metadata.offset() + "-" + batchElapsedTime);