Skip to content

Commit

Permalink
fix: Kafka metrics (#1985)
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisgacsal authored Dec 18, 2024
1 parent 05e5ca6 commit e170684
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 81 deletions.
72 changes: 36 additions & 36 deletions pkg/kafka/metrics/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,29 +26,29 @@ type BrokerMetrics struct {
// Number of messages in-flight to broker awaiting response
InflightMessagesAwaitingResponse metric.Int64Gauge
// Total number of requests sent
RequestsSent metric.Int64Counter
RequestsSent metric.Int64Gauge
// Total number of bytes sent
RequestBytesSent metric.Int64Counter
RequestBytesSent metric.Int64Gauge
// Total number of transmission errors
RequestErrors metric.Int64Counter
RequestErrors metric.Int64Gauge
// Total number of request retries
RequestRetries metric.Int64Counter
RequestRetries metric.Int64Gauge
// Microseconds since last socket send (or -1 if no sends yet for current connection).
LastSocketSend metric.Int64Counter
LastSocketSend metric.Int64Gauge
// Total number of requests timed out
RequestTimeouts metric.Int64Counter
RequestTimeouts metric.Int64Gauge
// Total number of responses received
ResponsesReceived metric.Int64Counter
ResponsesReceived metric.Int64Gauge
// Total number of bytes received
ResponseBytesReceived metric.Int64Counter
ResponseBytesReceived metric.Int64Gauge
// Total number of receive errors
ResponseErrors metric.Int64Counter
ResponseErrors metric.Int64Gauge
// Microseconds since last socket receive (or -1 if no receives yet for current connection).
LastSocketReceive metric.Int64Counter
LastSocketReceive metric.Int64Gauge
// Number of connection attempts, including successful and failed, and name resolution failures.
Connects metric.Int64Counter
Connects metric.Int64Gauge
// Number of disconnects (triggered by broker, network, load-balancer, etc.).
Disconnects metric.Int64Counter
Disconnects metric.Int64Gauge

// Smallest value
LatencyMin metric.Int64Gauge
Expand Down Expand Up @@ -112,18 +112,18 @@ func (m *BrokerMetrics) Add(ctx context.Context, stats *stats.BrokerStats, attrs
m.MessagesAwaitingTransmission.Record(ctx, stats.MessagesAwaitingTransmission, metric.WithAttributes(attrs...))
m.InflightRequestsAwaitingResponse.Record(ctx, stats.InflightRequestsAwaitingResponse, metric.WithAttributes(attrs...))
m.InflightMessagesAwaitingResponse.Record(ctx, stats.InflightMessagesAwaitingResponse, metric.WithAttributes(attrs...))
m.RequestsSent.Add(ctx, stats.RequestsSent, metric.WithAttributes(attrs...))
m.RequestBytesSent.Add(ctx, stats.RequestBytesSent, metric.WithAttributes(attrs...))
m.RequestErrors.Add(ctx, stats.RequestErrors, metric.WithAttributes(attrs...))
m.RequestRetries.Add(ctx, stats.RequestRetries, metric.WithAttributes(attrs...))
m.LastSocketSend.Add(ctx, stats.LastSocketSend, metric.WithAttributes(attrs...))
m.RequestTimeouts.Add(ctx, stats.RequestTimeouts, metric.WithAttributes(attrs...))
m.ResponsesReceived.Add(ctx, stats.ResponsesReceived, metric.WithAttributes(attrs...))
m.ResponseBytesReceived.Add(ctx, stats.ResponseBytesReceived, metric.WithAttributes(attrs...))
m.ResponseErrors.Add(ctx, stats.ResponseErrors, metric.WithAttributes(attrs...))
m.LastSocketReceive.Add(ctx, stats.LastSocketReceive, metric.WithAttributes(attrs...))
m.Connects.Add(ctx, stats.Connects, metric.WithAttributes(attrs...))
m.Disconnects.Add(ctx, stats.Disconnects, metric.WithAttributes(attrs...))
m.RequestsSent.Record(ctx, stats.RequestsSent, metric.WithAttributes(attrs...))
m.RequestBytesSent.Record(ctx, stats.RequestBytesSent, metric.WithAttributes(attrs...))
m.RequestErrors.Record(ctx, stats.RequestErrors, metric.WithAttributes(attrs...))
m.RequestRetries.Record(ctx, stats.RequestRetries, metric.WithAttributes(attrs...))
m.LastSocketSend.Record(ctx, stats.LastSocketSend, metric.WithAttributes(attrs...))
m.RequestTimeouts.Record(ctx, stats.RequestTimeouts, metric.WithAttributes(attrs...))
m.ResponsesReceived.Record(ctx, stats.ResponsesReceived, metric.WithAttributes(attrs...))
m.ResponseBytesReceived.Record(ctx, stats.ResponseBytesReceived, metric.WithAttributes(attrs...))
m.ResponseErrors.Record(ctx, stats.ResponseErrors, metric.WithAttributes(attrs...))
m.LastSocketReceive.Record(ctx, stats.LastSocketReceive, metric.WithAttributes(attrs...))
m.Connects.Record(ctx, stats.Connects, metric.WithAttributes(attrs...))
m.Disconnects.Record(ctx, stats.Disconnects, metric.WithAttributes(attrs...))

m.LatencyMin.Record(ctx, stats.Latency.Min, metric.WithAttributes(attrs...))
m.LatencyMax.Record(ctx, stats.Latency.Max, metric.WithAttributes(attrs...))
Expand Down Expand Up @@ -215,7 +215,7 @@ func NewBrokerMetrics(meter metric.Meter) (*BrokerMetrics, error) {
return nil, fmt.Errorf("failed to create metric: kafka.broker.inflight_messages_awaiting_response: %w", err)
}

m.RequestsSent, err = meter.Int64Counter(
m.RequestsSent, err = meter.Int64Gauge(
"kafka.broker.request_sent",
metric.WithDescription("Total number of requests sent"),
metric.WithUnit("{request}"),
Expand All @@ -224,7 +224,7 @@ func NewBrokerMetrics(meter metric.Meter) (*BrokerMetrics, error) {
return nil, fmt.Errorf("failed to create metric: kafka.broker.request_sent: %w", err)
}

m.RequestBytesSent, err = meter.Int64Counter(
m.RequestBytesSent, err = meter.Int64Gauge(
"kafka.broker.request_bytes_sent",
metric.WithDescription("Total number of bytes sent"),
metric.WithUnit("{byte}"),
Expand All @@ -233,7 +233,7 @@ func NewBrokerMetrics(meter metric.Meter) (*BrokerMetrics, error) {
return nil, fmt.Errorf("failed to create metric: kafka.broker.request_bytes_sent: %w", err)
}

m.RequestErrors, err = meter.Int64Counter(
m.RequestErrors, err = meter.Int64Gauge(
"kafka.broker.request_errors",
metric.WithDescription("Total number of transmission errors"),
metric.WithUnit("{request}"),
Expand All @@ -242,7 +242,7 @@ func NewBrokerMetrics(meter metric.Meter) (*BrokerMetrics, error) {
return nil, fmt.Errorf("failed to create metric: kafka.broker.request_errors: %w", err)
}

m.RequestRetries, err = meter.Int64Counter(
m.RequestRetries, err = meter.Int64Gauge(
"kafka.broker.request_retries",
metric.WithDescription("Total number of request retries"),
metric.WithUnit("{request}"),
Expand All @@ -251,7 +251,7 @@ func NewBrokerMetrics(meter metric.Meter) (*BrokerMetrics, error) {
return nil, fmt.Errorf("failed to create metric: kafka.broker.request_retries: %w", err)
}

m.LastSocketSend, err = meter.Int64Counter(
m.LastSocketSend, err = meter.Int64Gauge(
"kafka.broker.last_socket_send",
metric.WithDescription("Microseconds since last socket send (or -1 if no sends yet for current connection)"),
metric.WithUnit("{microsecond}"),
Expand All @@ -260,7 +260,7 @@ func NewBrokerMetrics(meter metric.Meter) (*BrokerMetrics, error) {
return nil, fmt.Errorf("failed to create metric: kafka.broker.last_socket_send: %w", err)
}

m.RequestTimeouts, err = meter.Int64Counter(
m.RequestTimeouts, err = meter.Int64Gauge(
"kafka.broker.request_timeouts",
metric.WithDescription("Total number of requests timed out"),
metric.WithUnit("{request}"),
Expand All @@ -269,7 +269,7 @@ func NewBrokerMetrics(meter metric.Meter) (*BrokerMetrics, error) {
return nil, fmt.Errorf("failed to create metric: kafka.broker.request_retries: %w", err)
}

m.ResponsesReceived, err = meter.Int64Counter(
m.ResponsesReceived, err = meter.Int64Gauge(
"kafka.broker.responses_received",
metric.WithDescription("Total number of responses received"),
metric.WithUnit("{response}"),
Expand All @@ -278,7 +278,7 @@ func NewBrokerMetrics(meter metric.Meter) (*BrokerMetrics, error) {
return nil, fmt.Errorf("failed to create metric: kafka.broker.responses_received: %w", err)
}

m.ResponseBytesReceived, err = meter.Int64Counter(
m.ResponseBytesReceived, err = meter.Int64Gauge(
"kafka.broker.responses_bytes_received",
metric.WithDescription("Total number of bytes received"),
metric.WithUnit("{byte}"),
Expand All @@ -287,7 +287,7 @@ func NewBrokerMetrics(meter metric.Meter) (*BrokerMetrics, error) {
return nil, fmt.Errorf("failed to create metric: kafka.broker.responses_bytes_received: %w", err)
}

m.ResponseErrors, err = meter.Int64Counter(
m.ResponseErrors, err = meter.Int64Gauge(
"kafka.broker.responses_errors",
metric.WithDescription("Total number of receive errors"),
metric.WithUnit("{event}"),
Expand All @@ -296,7 +296,7 @@ func NewBrokerMetrics(meter metric.Meter) (*BrokerMetrics, error) {
return nil, fmt.Errorf("failed to create metric: kafka.broker.responses_errors: %w", err)
}

m.LastSocketReceive, err = meter.Int64Counter(
m.LastSocketReceive, err = meter.Int64Gauge(
"kafka.broker.last_socket_receive",
metric.WithDescription("Microseconds since last socket receive (or -1 if no receives yet for current connection)"),
metric.WithUnit("{microsecond}"),
Expand All @@ -305,7 +305,7 @@ func NewBrokerMetrics(meter metric.Meter) (*BrokerMetrics, error) {
return nil, fmt.Errorf("failed to create metric: kafka.broker.last_socket_receive: %w", err)
}

m.Connects, err = meter.Int64Counter(
m.Connects, err = meter.Int64Gauge(
"kafka.broker.connects",
metric.WithDescription("Number of connection attempts, including successful and failed, and name resolution failures"),
metric.WithUnit("{event}"),
Expand All @@ -314,7 +314,7 @@ func NewBrokerMetrics(meter metric.Meter) (*BrokerMetrics, error) {
return nil, fmt.Errorf("failed to create metric: kafka.broker.connects: %w", err)
}

m.Disconnects, err = meter.Int64Counter(
m.Disconnects, err = meter.Int64Gauge(
"kafka.broker.disconnects",
metric.WithDescription("Number of disconnects (triggered by broker, network, load-balancer, etc.)"),
metric.WithUnit("{event}"),
Expand Down
6 changes: 3 additions & 3 deletions pkg/kafka/metrics/consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type ConsumerGroupMetrics struct {
// Time elapsed since last rebalance (assign or revoke) (milliseconds)
RebalanceAge metric.Int64Gauge
// Total number of rebalances (assign or revoke)
RebalanceCount metric.Int64Counter
RebalanceCount metric.Int64Gauge
// Current assignment's partition count
PartitionAssigned metric.Int64Gauge
}
Expand All @@ -30,7 +30,7 @@ func (m *ConsumerGroupMetrics) Add(ctx context.Context, stats *stats.ConsumerGro
m.StateAge.Record(ctx, stats.StateAge, metric.WithAttributes(attrs...))
m.JoinState.Record(ctx, stats.JoinState.Int64(), metric.WithAttributes(attrs...))
m.RebalanceAge.Record(ctx, stats.RebalanceAge, metric.WithAttributes(attrs...))
m.RebalanceCount.Add(ctx, stats.RebalanceCount, metric.WithAttributes(attrs...))
m.RebalanceCount.Record(ctx, stats.RebalanceCount, metric.WithAttributes(attrs...))
m.PartitionAssigned.Record(ctx, stats.PartitionAssigned, metric.WithAttributes(attrs...))
}

Expand Down Expand Up @@ -72,7 +72,7 @@ func NewConsumerGroupMetrics(meter metric.Meter) (*ConsumerGroupMetrics, error)
return nil, fmt.Errorf("failed to create metric: kafka.consumer_group.rebalance_age: %w", err)
}

m.RebalanceCount, err = meter.Int64Counter(
m.RebalanceCount, err = meter.Int64Gauge(
"kafka.consumer_group.rebalance_count",
metric.WithDescription("Time elapsed since last rebalance (assign or revoke) (milliseconds)"),
metric.WithUnit("{event}"),
Expand Down
54 changes: 27 additions & 27 deletions pkg/kafka/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,29 @@ type Metrics struct {
*ConsumerGroupMetrics

// Time since this client instance was created (microseconds)
Age metric.Int64Counter
Age metric.Int64Gauge
// Number of ops (callbacks, events, etc) waiting in queue for application to serve with rd_kafka_poll()
ReplyQueue metric.Int64Gauge
// Current number of messages in producer queues
MessageCount metric.Int64Gauge
// Current total size of messages in producer queues
MessageSize metric.Int64Gauge
// Total number of requests sent to Kafka brokers
RequestsSent metric.Int64Counter
RequestsSent metric.Int64Gauge
// Total number of bytes transmitted to Kafka brokers
RequestsBytesSent metric.Int64Counter
RequestsBytesSent metric.Int64Gauge
// Total number of responses received from Kafka brokers
RequestsReceived metric.Int64Counter
RequestsReceived metric.Int64Gauge
// Total number of bytes received from Kafka brokers
RequestsBytesReceived metric.Int64Counter
RequestsBytesReceived metric.Int64Gauge
// Total number of messages transmitted (produced) to Kafka brokers
MessagesProduced metric.Int64Counter
MessagesProduced metric.Int64Gauge
// Total number of message bytes (including framing, such as per-Message framing and MessageSet/batch framing) transmitted to Kafka brokers
MessagesBytesProduced metric.Int64Counter
MessagesBytesProduced metric.Int64Gauge
// Total number of messages consumed, not including ignored messages (due to offset, etc), from Kafka brokers.
MessagesConsumed metric.Int64Counter
MessagesConsumed metric.Int64Gauge
// Total number of message bytes (including framing) received from Kafka brokers
MessagesBytesConsumed metric.Int64Counter
MessagesBytesConsumed metric.Int64Gauge
// Number of topics in the metadata cache
TopicsInMetadataCache metric.Int64Gauge
}
Expand All @@ -52,18 +52,18 @@ func (m *Metrics) Add(ctx context.Context, stats *stats.Stats, attrs ...attribut
attribute.String("type", stats.Type),
}...)

m.Age.Add(ctx, stats.Age, metric.WithAttributes(attrs...))
m.Age.Record(ctx, stats.Age, metric.WithAttributes(attrs...))
m.ReplyQueue.Record(ctx, stats.ReplyQueue, metric.WithAttributes(attrs...))
m.MessageCount.Record(ctx, stats.MessageCount, metric.WithAttributes(attrs...))
m.MessageSize.Record(ctx, stats.MessageSize, metric.WithAttributes(attrs...))
m.RequestsSent.Add(ctx, stats.RequestsSent, metric.WithAttributes(attrs...))
m.RequestsBytesSent.Add(ctx, stats.RequestsBytesSent, metric.WithAttributes(attrs...))
m.RequestsReceived.Add(ctx, stats.RequestsReceived, metric.WithAttributes(attrs...))
m.RequestsBytesReceived.Add(ctx, stats.RequestsBytesReceived, metric.WithAttributes(attrs...))
m.MessagesProduced.Add(ctx, stats.MessagesProduced, metric.WithAttributes(attrs...))
m.MessagesBytesProduced.Add(ctx, stats.MessagesBytesProduced, metric.WithAttributes(attrs...))
m.MessagesConsumed.Add(ctx, stats.MessagesConsumed, metric.WithAttributes(attrs...))
m.MessagesBytesConsumed.Add(ctx, stats.MessagesBytesConsumed, metric.WithAttributes(attrs...))
m.RequestsSent.Record(ctx, stats.RequestsSent, metric.WithAttributes(attrs...))
m.RequestsBytesSent.Record(ctx, stats.RequestsBytesSent, metric.WithAttributes(attrs...))
m.RequestsReceived.Record(ctx, stats.RequestsReceived, metric.WithAttributes(attrs...))
m.RequestsBytesReceived.Record(ctx, stats.RequestsBytesReceived, metric.WithAttributes(attrs...))
m.MessagesProduced.Record(ctx, stats.MessagesProduced, metric.WithAttributes(attrs...))
m.MessagesBytesProduced.Record(ctx, stats.MessagesBytesProduced, metric.WithAttributes(attrs...))
m.MessagesConsumed.Record(ctx, stats.MessagesConsumed, metric.WithAttributes(attrs...))
m.MessagesBytesConsumed.Record(ctx, stats.MessagesBytesConsumed, metric.WithAttributes(attrs...))
m.TopicsInMetadataCache.Record(ctx, stats.TopicsInMetadataCache, metric.WithAttributes(attrs...))

if m.BrokerMetrics != nil {
Expand Down Expand Up @@ -107,7 +107,7 @@ func New(meter metric.Meter) (*Metrics, error) {
return nil, fmt.Errorf("failed to create topic metrics: %w", err)
}

m.Age, err = meter.Int64Counter(
m.Age, err = meter.Int64Gauge(
"kafka.age_microseconds",
metric.WithDescription("Time since this client instance was created (microseconds)"),
metric.WithUnit("{microseconds}"),
Expand Down Expand Up @@ -143,7 +143,7 @@ func New(meter metric.Meter) (*Metrics, error) {
return nil, fmt.Errorf("failed to create metric: kafka.message_size_bytes: %w", err)
}

m.RequestsSent, err = meter.Int64Counter(
m.RequestsSent, err = meter.Int64Gauge(
"kafka.requests_sent_count",
metric.WithDescription("Total number of requests sent to Kafka brokers"),
metric.WithUnit("{request}"),
Expand All @@ -152,7 +152,7 @@ func New(meter metric.Meter) (*Metrics, error) {
return nil, fmt.Errorf("failed to create metric: kafka.requests_sent_count: %w", err)
}

m.RequestsBytesSent, err = meter.Int64Counter(
m.RequestsBytesSent, err = meter.Int64Gauge(
"kafka.request_sent_bytes",
metric.WithDescription("Total number of bytes transmitted to Kafka brokers"),
metric.WithUnit("{byte}"),
Expand All @@ -161,7 +161,7 @@ func New(meter metric.Meter) (*Metrics, error) {
return nil, fmt.Errorf("failed to create metric: kafka.request_sent_bytes: %w", err)
}

m.RequestsReceived, err = meter.Int64Counter(
m.RequestsReceived, err = meter.Int64Gauge(
"kafka.requests_received_count",
metric.WithDescription("Total number of responses received from Kafka brokers"),
metric.WithUnit("{request}"),
Expand All @@ -170,7 +170,7 @@ func New(meter metric.Meter) (*Metrics, error) {
return nil, fmt.Errorf("failed to create metric: kafka.requests_received_count: %w", err)
}

m.RequestsBytesReceived, err = meter.Int64Counter(
m.RequestsBytesReceived, err = meter.Int64Gauge(
"kafka.requests_received_bytes",
metric.WithDescription("Total number of bytes received from Kafka brokers"),
metric.WithUnit("{byte}"),
Expand All @@ -179,7 +179,7 @@ func New(meter metric.Meter) (*Metrics, error) {
return nil, fmt.Errorf("failed to create metric: kafka.requests-sent: %w", err)
}

m.MessagesProduced, err = meter.Int64Counter(
m.MessagesProduced, err = meter.Int64Gauge(
"kafka.messages_produced_count",
metric.WithDescription("Total number of messages transmitted (produced) to Kafka brokers"),
metric.WithUnit("{message}"),
Expand All @@ -188,7 +188,7 @@ func New(meter metric.Meter) (*Metrics, error) {
return nil, fmt.Errorf("failed to create metric: kafka.messages_produced_count: %w", err)
}

m.MessagesBytesProduced, err = meter.Int64Counter(
m.MessagesBytesProduced, err = meter.Int64Gauge(
"kafka.messages_produced_bytes",
metric.WithDescription("Total number of message bytes (including framing, such as per-Message framing and MessageSet/batch framing) transmitted to Kafka brokers"),
metric.WithUnit("{byte}"),
Expand All @@ -197,7 +197,7 @@ func New(meter metric.Meter) (*Metrics, error) {
return nil, fmt.Errorf("failed to create metric: kafka.messages_produced_bytes: %w", err)
}

m.MessagesConsumed, err = meter.Int64Counter(
m.MessagesConsumed, err = meter.Int64Gauge(
"kafka.messages_consumed_count",
metric.WithDescription("Total number of messages consumed, not including ignored messages (due to offset, etc), from Kafka brokers."),
metric.WithUnit("{message}"),
Expand All @@ -206,7 +206,7 @@ func New(meter metric.Meter) (*Metrics, error) {
return nil, fmt.Errorf("failed to create metric: kafka.messages_consumed_count: %w", err)
}

m.MessagesBytesConsumed, err = meter.Int64Counter(
m.MessagesBytesConsumed, err = meter.Int64Gauge(
"kafka.messages_consumed_bytes",
metric.WithDescription("Total number of message bytes (including framing) received from Kafka brokers"),
metric.WithUnit("{byte}"),
Expand Down
Loading

0 comments on commit e170684

Please sign in to comment.