From e170684bb86e860212b3175a7d15a297627aadc4 Mon Sep 17 00:00:00 2001 From: Krisztian Gacsal Date: Wed, 18 Dec 2024 16:35:39 +0100 Subject: [PATCH] fix: Kafka metrics (#1985) --- pkg/kafka/metrics/broker.go | 72 +++++++++++++++--------------- pkg/kafka/metrics/consumergroup.go | 6 +-- pkg/kafka/metrics/metrics.go | 54 +++++++++++----------- pkg/kafka/metrics/topic.go | 30 ++++++------- 4 files changed, 81 insertions(+), 81 deletions(-) diff --git a/pkg/kafka/metrics/broker.go b/pkg/kafka/metrics/broker.go index ebeab8e21..c929f58b9 100644 --- a/pkg/kafka/metrics/broker.go +++ b/pkg/kafka/metrics/broker.go @@ -26,29 +26,29 @@ type BrokerMetrics struct { // Number of messages in-flight to broker awaiting response InflightMessagesAwaitingResponse metric.Int64Gauge // Total number of requests sent - RequestsSent metric.Int64Counter + RequestsSent metric.Int64Gauge // Total number of bytes sent - RequestBytesSent metric.Int64Counter + RequestBytesSent metric.Int64Gauge // Total number of transmission errors - RequestErrors metric.Int64Counter + RequestErrors metric.Int64Gauge // Total number of request retries - RequestRetries metric.Int64Counter + RequestRetries metric.Int64Gauge // Microseconds since last socket send (or -1 if no sends yet for current connection). - LastSocketSend metric.Int64Counter + LastSocketSend metric.Int64Gauge // Total number of requests timed out - RequestTimeouts metric.Int64Counter + RequestTimeouts metric.Int64Gauge // Total number of responses received - ResponsesReceived metric.Int64Counter + ResponsesReceived metric.Int64Gauge // Total number of bytes received - ResponseBytesReceived metric.Int64Counter + ResponseBytesReceived metric.Int64Gauge // Total number of receive errors - ResponseErrors metric.Int64Counter + ResponseErrors metric.Int64Gauge // Microseconds since last socket receive (or -1 if no receives yet for current connection). - LastSocketReceive metric.Int64Counter + LastSocketReceive metric.Int64Gauge // Number of connection attempts, including successful and failed, and name resolution failures. - Connects metric.Int64Counter + Connects metric.Int64Gauge // Number of disconnects (triggered by broker, network, load-balancer, etc.). - Disconnects metric.Int64Counter + Disconnects metric.Int64Gauge // Smallest value LatencyMin metric.Int64Gauge @@ -112,18 +112,18 @@ func (m *BrokerMetrics) Add(ctx context.Context, stats *stats.BrokerStats, attrs m.MessagesAwaitingTransmission.Record(ctx, stats.MessagesAwaitingTransmission, metric.WithAttributes(attrs...)) m.InflightRequestsAwaitingResponse.Record(ctx, stats.InflightRequestsAwaitingResponse, metric.WithAttributes(attrs...)) m.InflightMessagesAwaitingResponse.Record(ctx, stats.InflightMessagesAwaitingResponse, metric.WithAttributes(attrs...)) - m.RequestsSent.Add(ctx, stats.RequestsSent, metric.WithAttributes(attrs...)) - m.RequestBytesSent.Add(ctx, stats.RequestBytesSent, metric.WithAttributes(attrs...)) - m.RequestErrors.Add(ctx, stats.RequestErrors, metric.WithAttributes(attrs...)) - m.RequestRetries.Add(ctx, stats.RequestRetries, metric.WithAttributes(attrs...)) - m.LastSocketSend.Add(ctx, stats.LastSocketSend, metric.WithAttributes(attrs...)) - m.RequestTimeouts.Add(ctx, stats.RequestTimeouts, metric.WithAttributes(attrs...)) - m.ResponsesReceived.Add(ctx, stats.ResponsesReceived, metric.WithAttributes(attrs...)) - m.ResponseBytesReceived.Add(ctx, stats.ResponseBytesReceived, metric.WithAttributes(attrs...)) - m.ResponseErrors.Add(ctx, stats.ResponseErrors, metric.WithAttributes(attrs...)) - m.LastSocketReceive.Add(ctx, stats.LastSocketReceive, metric.WithAttributes(attrs...)) - m.Connects.Add(ctx, stats.Connects, metric.WithAttributes(attrs...)) - m.Disconnects.Add(ctx, stats.Disconnects, metric.WithAttributes(attrs...)) + m.RequestsSent.Record(ctx, stats.RequestsSent, metric.WithAttributes(attrs...)) + m.RequestBytesSent.Record(ctx, stats.RequestBytesSent, metric.WithAttributes(attrs...)) + m.RequestErrors.Record(ctx, stats.RequestErrors, metric.WithAttributes(attrs...)) + m.RequestRetries.Record(ctx, stats.RequestRetries, metric.WithAttributes(attrs...)) + m.LastSocketSend.Record(ctx, stats.LastSocketSend, metric.WithAttributes(attrs...)) + m.RequestTimeouts.Record(ctx, stats.RequestTimeouts, metric.WithAttributes(attrs...)) + m.ResponsesReceived.Record(ctx, stats.ResponsesReceived, metric.WithAttributes(attrs...)) + m.ResponseBytesReceived.Record(ctx, stats.ResponseBytesReceived, metric.WithAttributes(attrs...)) + m.ResponseErrors.Record(ctx, stats.ResponseErrors, metric.WithAttributes(attrs...)) + m.LastSocketReceive.Record(ctx, stats.LastSocketReceive, metric.WithAttributes(attrs...)) + m.Connects.Record(ctx, stats.Connects, metric.WithAttributes(attrs...)) + m.Disconnects.Record(ctx, stats.Disconnects, metric.WithAttributes(attrs...)) m.LatencyMin.Record(ctx, stats.Latency.Min, metric.WithAttributes(attrs...)) m.LatencyMax.Record(ctx, stats.Latency.Max, metric.WithAttributes(attrs...)) @@ -215,7 +215,7 @@ func NewBrokerMetrics(meter metric.Meter) (*BrokerMetrics, error) { return nil, fmt.Errorf("failed to create metric: kafka.broker.inflight_messages_awaiting_response: %w", err) } - m.RequestsSent, err = meter.Int64Counter( + m.RequestsSent, err = meter.Int64Gauge( "kafka.broker.request_sent", metric.WithDescription("Total number of requests sent"), metric.WithUnit("{request}"), @@ -224,7 +224,7 @@ func NewBrokerMetrics(meter metric.Meter) (*BrokerMetrics, error) { return nil, fmt.Errorf("failed to create metric: kafka.broker.request_sent: %w", err) } - m.RequestBytesSent, err = meter.Int64Counter( + m.RequestBytesSent, err = meter.Int64Gauge( "kafka.broker.request_bytes_sent", metric.WithDescription("Total number of bytes sent"), metric.WithUnit("{byte}"), @@ -233,7 +233,7 @@ func NewBrokerMetrics(meter metric.Meter) (*BrokerMetrics, error) { return nil, fmt.Errorf("failed to create metric: kafka.broker.request_bytes_sent: %w", err) } - m.RequestErrors, err = meter.Int64Counter( + m.RequestErrors, err = meter.Int64Gauge( "kafka.broker.request_errors", metric.WithDescription("Total number of transmission errors"), metric.WithUnit("{request}"), @@ -242,7 +242,7 @@ func NewBrokerMetrics(meter metric.Meter) (*BrokerMetrics, error) { return nil, fmt.Errorf("failed to create metric: kafka.broker.request_errors: %w", err) } - m.RequestRetries, err = meter.Int64Counter( + m.RequestRetries, err = meter.Int64Gauge( "kafka.broker.request_retries", metric.WithDescription("Total number of request retries"), metric.WithUnit("{request}"), @@ -251,7 +251,7 @@ func NewBrokerMetrics(meter metric.Meter) (*BrokerMetrics, error) { return nil, fmt.Errorf("failed to create metric: kafka.broker.request_retries: %w", err) } - m.LastSocketSend, err = meter.Int64Counter( + m.LastSocketSend, err = meter.Int64Gauge( "kafka.broker.last_socket_send", metric.WithDescription("Microseconds since last socket send (or -1 if no sends yet for current connection)"), metric.WithUnit("{microsecond}"), @@ -260,7 +260,7 @@ func NewBrokerMetrics(meter metric.Meter) (*BrokerMetrics, error) { return nil, fmt.Errorf("failed to create metric: kafka.broker.last_socket_send: %w", err) } - m.RequestTimeouts, err = meter.Int64Counter( + m.RequestTimeouts, err = meter.Int64Gauge( "kafka.broker.request_timeouts", metric.WithDescription("Total number of requests timed out"), metric.WithUnit("{request}"), @@ -269,7 +269,7 @@ func NewBrokerMetrics(meter metric.Meter) (*BrokerMetrics, error) { return nil, fmt.Errorf("failed to create metric: kafka.broker.request_retries: %w", err) } - m.ResponsesReceived, err = meter.Int64Counter( + m.ResponsesReceived, err = meter.Int64Gauge( "kafka.broker.responses_received", metric.WithDescription("Total number of responses received"), metric.WithUnit("{response}"), @@ -278,7 +278,7 @@ func NewBrokerMetrics(meter metric.Meter) (*BrokerMetrics, error) { return nil, fmt.Errorf("failed to create metric: kafka.broker.responses_received: %w", err) } - m.ResponseBytesReceived, err = meter.Int64Counter( + m.ResponseBytesReceived, err = meter.Int64Gauge( "kafka.broker.responses_bytes_received", metric.WithDescription("Total number of bytes received"), metric.WithUnit("{byte}"), @@ -287,7 +287,7 @@ func NewBrokerMetrics(meter metric.Meter) (*BrokerMetrics, error) { return nil, fmt.Errorf("failed to create metric: kafka.broker.responses_bytes_received: %w", err) } - m.ResponseErrors, err = meter.Int64Counter( + m.ResponseErrors, err = meter.Int64Gauge( "kafka.broker.responses_errors", metric.WithDescription("Total number of receive errors"), metric.WithUnit("{event}"), @@ -296,7 +296,7 @@ func NewBrokerMetrics(meter metric.Meter) (*BrokerMetrics, error) { return nil, fmt.Errorf("failed to create metric: kafka.broker.responses_errors: %w", err) } - m.LastSocketReceive, err = meter.Int64Counter( + m.LastSocketReceive, err = meter.Int64Gauge( "kafka.broker.last_socket_receive", metric.WithDescription("Microseconds since last socket receive (or -1 if no receives yet for current connection)"), metric.WithUnit("{microsecond}"), @@ -305,7 +305,7 @@ func NewBrokerMetrics(meter metric.Meter) (*BrokerMetrics, error) { return nil, fmt.Errorf("failed to create metric: kafka.broker.last_socket_receive: %w", err) } - m.Connects, err = meter.Int64Counter( + m.Connects, err = meter.Int64Gauge( "kafka.broker.connects", metric.WithDescription("Number of connection attempts, including successful and failed, and name resolution failures"), metric.WithUnit("{event}"), @@ -314,7 +314,7 @@ func NewBrokerMetrics(meter metric.Meter) (*BrokerMetrics, error) { return nil, fmt.Errorf("failed to create metric: kafka.broker.connects: %w", err) } - m.Disconnects, err = meter.Int64Counter( + m.Disconnects, err = meter.Int64Gauge( "kafka.broker.disconnects", metric.WithDescription("Number of disconnects (triggered by broker, network, load-balancer, etc.)"), metric.WithUnit("{event}"), diff --git a/pkg/kafka/metrics/consumergroup.go b/pkg/kafka/metrics/consumergroup.go index 834bdda7b..719f5fbf2 100644 --- a/pkg/kafka/metrics/consumergroup.go +++ b/pkg/kafka/metrics/consumergroup.go @@ -20,7 +20,7 @@ type ConsumerGroupMetrics struct { // Time elapsed since last rebalance (assign or revoke) (milliseconds) RebalanceAge metric.Int64Gauge // Total number of rebalances (assign or revoke) - RebalanceCount metric.Int64Counter + RebalanceCount metric.Int64Gauge // Current assignment's partition count PartitionAssigned metric.Int64Gauge } @@ -30,7 +30,7 @@ func (m *ConsumerGroupMetrics) Add(ctx context.Context, stats *stats.ConsumerGro m.StateAge.Record(ctx, stats.StateAge, metric.WithAttributes(attrs...)) m.JoinState.Record(ctx, stats.JoinState.Int64(), metric.WithAttributes(attrs...)) m.RebalanceAge.Record(ctx, stats.RebalanceAge, metric.WithAttributes(attrs...)) - m.RebalanceCount.Add(ctx, stats.RebalanceCount, metric.WithAttributes(attrs...)) + m.RebalanceCount.Record(ctx, stats.RebalanceCount, metric.WithAttributes(attrs...)) m.PartitionAssigned.Record(ctx, stats.PartitionAssigned, metric.WithAttributes(attrs...)) } @@ -72,7 +72,7 @@ func NewConsumerGroupMetrics(meter metric.Meter) (*ConsumerGroupMetrics, error) return nil, fmt.Errorf("failed to create metric: kafka.consumer_group.rebalance_age: %w", err) } - m.RebalanceCount, err = meter.Int64Counter( + m.RebalanceCount, err = meter.Int64Gauge( "kafka.consumer_group.rebalance_count", metric.WithDescription("Time elapsed since last rebalance (assign or revoke) (milliseconds)"), metric.WithUnit("{event}"), diff --git a/pkg/kafka/metrics/metrics.go b/pkg/kafka/metrics/metrics.go index 7a4a17a95..fc5f85ae5 100644 --- a/pkg/kafka/metrics/metrics.go +++ b/pkg/kafka/metrics/metrics.go @@ -18,7 +18,7 @@ type Metrics struct { *ConsumerGroupMetrics // Time since this client instance was created (microseconds) - Age metric.Int64Counter + Age metric.Int64Gauge // Number of ops (callbacks, events, etc) waiting in queue for application to serve with rd_kafka_poll() ReplyQueue metric.Int64Gauge // Current number of messages in producer queues @@ -26,21 +26,21 @@ type Metrics struct { // Current total size of messages in producer queues MessageSize metric.Int64Gauge // Total number of requests sent to Kafka brokers - RequestsSent metric.Int64Counter + RequestsSent metric.Int64Gauge // Total number of bytes transmitted to Kafka brokers - RequestsBytesSent metric.Int64Counter + RequestsBytesSent metric.Int64Gauge // Total number of responses received from Kafka brokers - RequestsReceived metric.Int64Counter + RequestsReceived metric.Int64Gauge // Total number of bytes received from Kafka brokers - RequestsBytesReceived metric.Int64Counter + RequestsBytesReceived metric.Int64Gauge // Total number of messages transmitted (produced) to Kafka brokers - MessagesProduced metric.Int64Counter + MessagesProduced metric.Int64Gauge // Total number of message bytes (including framing, such as per-Message framing and MessageSet/batch framing) transmitted to Kafka brokers - MessagesBytesProduced metric.Int64Counter + MessagesBytesProduced metric.Int64Gauge // Total number of messages consumed, not including ignored messages (due to offset, etc), from Kafka brokers. - MessagesConsumed metric.Int64Counter + MessagesConsumed metric.Int64Gauge // Total number of message bytes (including framing) received from Kafka brokers - MessagesBytesConsumed metric.Int64Counter + MessagesBytesConsumed metric.Int64Gauge // Number of topics in the metadata cache TopicsInMetadataCache metric.Int64Gauge } @@ -52,18 +52,18 @@ func (m *Metrics) Add(ctx context.Context, stats *stats.Stats, attrs ...attribut attribute.String("type", stats.Type), }...) - m.Age.Add(ctx, stats.Age, metric.WithAttributes(attrs...)) + m.Age.Record(ctx, stats.Age, metric.WithAttributes(attrs...)) m.ReplyQueue.Record(ctx, stats.ReplyQueue, metric.WithAttributes(attrs...)) m.MessageCount.Record(ctx, stats.MessageCount, metric.WithAttributes(attrs...)) m.MessageSize.Record(ctx, stats.MessageSize, metric.WithAttributes(attrs...)) - m.RequestsSent.Add(ctx, stats.RequestsSent, metric.WithAttributes(attrs...)) - m.RequestsBytesSent.Add(ctx, stats.RequestsBytesSent, metric.WithAttributes(attrs...)) - m.RequestsReceived.Add(ctx, stats.RequestsReceived, metric.WithAttributes(attrs...)) - m.RequestsBytesReceived.Add(ctx, stats.RequestsBytesReceived, metric.WithAttributes(attrs...)) - m.MessagesProduced.Add(ctx, stats.MessagesProduced, metric.WithAttributes(attrs...)) - m.MessagesBytesProduced.Add(ctx, stats.MessagesBytesProduced, metric.WithAttributes(attrs...)) - m.MessagesConsumed.Add(ctx, stats.MessagesConsumed, metric.WithAttributes(attrs...)) - m.MessagesBytesConsumed.Add(ctx, stats.MessagesBytesConsumed, metric.WithAttributes(attrs...)) + m.RequestsSent.Record(ctx, stats.RequestsSent, metric.WithAttributes(attrs...)) + m.RequestsBytesSent.Record(ctx, stats.RequestsBytesSent, metric.WithAttributes(attrs...)) + m.RequestsReceived.Record(ctx, stats.RequestsReceived, metric.WithAttributes(attrs...)) + m.RequestsBytesReceived.Record(ctx, stats.RequestsBytesReceived, metric.WithAttributes(attrs...)) + m.MessagesProduced.Record(ctx, stats.MessagesProduced, metric.WithAttributes(attrs...)) + m.MessagesBytesProduced.Record(ctx, stats.MessagesBytesProduced, metric.WithAttributes(attrs...)) + m.MessagesConsumed.Record(ctx, stats.MessagesConsumed, metric.WithAttributes(attrs...)) + m.MessagesBytesConsumed.Record(ctx, stats.MessagesBytesConsumed, metric.WithAttributes(attrs...)) m.TopicsInMetadataCache.Record(ctx, stats.TopicsInMetadataCache, metric.WithAttributes(attrs...)) if m.BrokerMetrics != nil { @@ -107,7 +107,7 @@ func New(meter metric.Meter) (*Metrics, error) { return nil, fmt.Errorf("failed to create topic metrics: %w", err) } - m.Age, err = meter.Int64Counter( + m.Age, err = meter.Int64Gauge( "kafka.age_microseconds", metric.WithDescription("Time since this client instance was created (microseconds)"), metric.WithUnit("{microseconds}"), @@ -143,7 +143,7 @@ func New(meter metric.Meter) (*Metrics, error) { return nil, fmt.Errorf("failed to create metric: kafka.message_size_bytes: %w", err) } - m.RequestsSent, err = meter.Int64Counter( + m.RequestsSent, err = meter.Int64Gauge( "kafka.requests_sent_count", metric.WithDescription("Total number of requests sent to Kafka brokers"), metric.WithUnit("{request}"), @@ -152,7 +152,7 @@ func New(meter metric.Meter) (*Metrics, error) { return nil, fmt.Errorf("failed to create metric: kafka.requests_sent_count: %w", err) } - m.RequestsBytesSent, err = meter.Int64Counter( + m.RequestsBytesSent, err = meter.Int64Gauge( "kafka.request_sent_bytes", metric.WithDescription("Total number of bytes transmitted to Kafka brokers"), metric.WithUnit("{byte}"), @@ -161,7 +161,7 @@ func New(meter metric.Meter) (*Metrics, error) { return nil, fmt.Errorf("failed to create metric: kafka.request_sent_bytes: %w", err) } - m.RequestsReceived, err = meter.Int64Counter( + m.RequestsReceived, err = meter.Int64Gauge( "kafka.requests_received_count", metric.WithDescription("Total number of responses received from Kafka brokers"), metric.WithUnit("{request}"), @@ -170,7 +170,7 @@ func New(meter metric.Meter) (*Metrics, error) { return nil, fmt.Errorf("failed to create metric: kafka.requests_received_count: %w", err) } - m.RequestsBytesReceived, err = meter.Int64Counter( + m.RequestsBytesReceived, err = meter.Int64Gauge( "kafka.requests_received_bytes", metric.WithDescription("Total number of bytes received from Kafka brokers"), metric.WithUnit("{byte}"), @@ -179,7 +179,7 @@ func New(meter metric.Meter) (*Metrics, error) { return nil, fmt.Errorf("failed to create metric: kafka.requests-sent: %w", err) } - m.MessagesProduced, err = meter.Int64Counter( + m.MessagesProduced, err = meter.Int64Gauge( "kafka.messages_produced_count", metric.WithDescription("Total number of messages transmitted (produced) to Kafka brokers"), metric.WithUnit("{message}"), @@ -188,7 +188,7 @@ func New(meter metric.Meter) (*Metrics, error) { return nil, fmt.Errorf("failed to create metric: kafka.messages_produced_count: %w", err) } - m.MessagesBytesProduced, err = meter.Int64Counter( + m.MessagesBytesProduced, err = meter.Int64Gauge( "kafka.messages_produced_bytes", metric.WithDescription("Total number of message bytes (including framing, such as per-Message framing and MessageSet/batch framing) transmitted to Kafka brokers"), metric.WithUnit("{byte}"), @@ -197,7 +197,7 @@ func New(meter metric.Meter) (*Metrics, error) { return nil, fmt.Errorf("failed to create metric: kafka.messages_produced_bytes: %w", err) } - m.MessagesConsumed, err = meter.Int64Counter( + m.MessagesConsumed, err = meter.Int64Gauge( "kafka.messages_consumed_count", metric.WithDescription("Total number of messages consumed, not including ignored messages (due to offset, etc), from Kafka brokers."), metric.WithUnit("{message}"), @@ -206,7 +206,7 @@ func New(meter metric.Meter) (*Metrics, error) { return nil, fmt.Errorf("failed to create metric: kafka.messages_consumed_count: %w", err) } - m.MessagesBytesConsumed, err = meter.Int64Counter( + m.MessagesBytesConsumed, err = meter.Int64Gauge( "kafka.messages_consumed_bytes", metric.WithDescription("Total number of message bytes (including framing) received from Kafka brokers"), metric.WithUnit("{byte}"), diff --git a/pkg/kafka/metrics/topic.go b/pkg/kafka/metrics/topic.go index 454a8e3ff..c5b384ae0 100644 --- a/pkg/kafka/metrics/topic.go +++ b/pkg/kafka/metrics/topic.go @@ -362,15 +362,15 @@ type PartitionMetrics struct { // Difference between (HighWatermarkOffset or LowWatermarkOffset) and StoredOffset. See ConsumerLag and StoredOffset. ConsumerLagStored metric.Int64Gauge // Total number of messages transmitted (produced) - MessagesSent metric.Int64Counter + MessagesSent metric.Int64Gauge // Total number of bytes transmitted - MessageBytesSent metric.Int64Counter + MessageBytesSent metric.Int64Gauge // Total number of messages consumed, not including ignored messages (due to offset, etc). - MessagesReceived metric.Int64Counter + MessagesReceived metric.Int64Gauge // Total number of bytes received - MessageBytesReceived metric.Int64Counter + MessageBytesReceived metric.Int64Gauge // Total number of messages received (consumer), or total number of messages produced (possibly not yet transmitted) (producer). - TotalNumOfMessages metric.Int64Counter + TotalNumOfMessages metric.Int64Gauge // Current number of messages in-flight to/from broker MessagesInflight metric.Int64Gauge } @@ -399,11 +399,11 @@ func (m *PartitionMetrics) Add(ctx context.Context, stats *stats.Partition, attr m.LastStableOffsetOnBroker.Record(ctx, stats.LastStableOffsetOnBroker, metric.WithAttributes(attrs...)) m.ConsumerLag.Record(ctx, stats.ConsumerLag, metric.WithAttributes(attrs...)) m.ConsumerLagStored.Record(ctx, stats.ConsumerLagStored, metric.WithAttributes(attrs...)) - m.MessagesSent.Add(ctx, stats.MessagesSent, metric.WithAttributes(attrs...)) - m.MessageBytesSent.Add(ctx, stats.MessageBytesSent, metric.WithAttributes(attrs...)) - m.MessagesReceived.Add(ctx, stats.MessagesReceived, metric.WithAttributes(attrs...)) - m.MessageBytesReceived.Add(ctx, stats.MessageBytesReceived, metric.WithAttributes(attrs...)) - m.TotalNumOfMessages.Add(ctx, stats.TotalNumOfMessages, metric.WithAttributes(attrs...)) + m.MessagesSent.Record(ctx, stats.MessagesSent, metric.WithAttributes(attrs...)) + m.MessageBytesSent.Record(ctx, stats.MessageBytesSent, metric.WithAttributes(attrs...)) + m.MessagesReceived.Record(ctx, stats.MessagesReceived, metric.WithAttributes(attrs...)) + m.MessageBytesReceived.Record(ctx, stats.MessageBytesReceived, metric.WithAttributes(attrs...)) + m.TotalNumOfMessages.Record(ctx, stats.TotalNumOfMessages, metric.WithAttributes(attrs...)) m.MessagesInflight.Record(ctx, stats.MessagesInflight, metric.WithAttributes(attrs...)) } @@ -580,7 +580,7 @@ func NewPartitionMetrics(meter metric.Meter) (*PartitionMetrics, error) { return nil, fmt.Errorf("failed to create metric: kafka.partition.consumer_lag_stored: %w", err) } - m.MessagesSent, err = meter.Int64Counter( + m.MessagesSent, err = meter.Int64Gauge( "kafka.partition.messages_sent", metric.WithDescription("Total number of messages transmitted (produced)"), metric.WithUnit("{message}"), @@ -589,7 +589,7 @@ func NewPartitionMetrics(meter metric.Meter) (*PartitionMetrics, error) { return nil, fmt.Errorf("failed to create metric: kafka.partition.messages_sent: %w", err) } - m.MessageBytesSent, err = meter.Int64Counter( + m.MessageBytesSent, err = meter.Int64Gauge( "kafka.partition.message_bytes_sent", metric.WithDescription("Total number of bytes transmitted for messages_sent"), metric.WithUnit("{byte}"), @@ -598,7 +598,7 @@ func NewPartitionMetrics(meter metric.Meter) (*PartitionMetrics, error) { return nil, fmt.Errorf("failed to create metric: kafka.partition.message_bytes_sent: %w", err) } - m.MessagesReceived, err = meter.Int64Counter( + m.MessagesReceived, err = meter.Int64Gauge( "kafka.partition.messages_received", metric.WithDescription("Total number of messages transmitted (produced)"), metric.WithUnit("{message}"), @@ -607,7 +607,7 @@ func NewPartitionMetrics(meter metric.Meter) (*PartitionMetrics, error) { return nil, fmt.Errorf("failed to create metric: kafka.partition.messages_received: %w", err) } - m.MessageBytesReceived, err = meter.Int64Counter( + m.MessageBytesReceived, err = meter.Int64Gauge( "kafka.partition.message_bytes_received", metric.WithDescription("Total number of bytes received for messages_received"), metric.WithUnit("{byte}"), @@ -616,7 +616,7 @@ func NewPartitionMetrics(meter metric.Meter) (*PartitionMetrics, error) { return nil, fmt.Errorf("failed to create metric: kafka.partition.message_bytes_received: %w", err) } - m.TotalNumOfMessages, err = meter.Int64Counter( + m.TotalNumOfMessages, err = meter.Int64Gauge( "kafka.partition.total_num_of_messages", metric.WithDescription("Total number of messages received (consumer, same as MessageBytesReceived), or total number of messages produced (possibly not yet transmitted) (producer)"), metric.WithUnit("{message}"),