From e878779ba499cc47d02bebf6575033b871d2a1b3 Mon Sep 17 00:00:00 2001
From: Krisztian Gacsal
Date: Mon, 15 Jul 2024 11:55:41 +0200
Subject: [PATCH 1/3] feat(sink-worker): enable consumer logging

---
 cmd/sink-worker/main.go | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/cmd/sink-worker/main.go b/cmd/sink-worker/main.go
index 626a3ca77..3ce343339 100644
--- a/cmd/sink-worker/main.go
+++ b/cmd/sink-worker/main.go
@@ -35,6 +35,7 @@ import (
 	"github.com/openmeterio/openmeter/internal/meter"
 	"github.com/openmeterio/openmeter/internal/sink"
 	"github.com/openmeterio/openmeter/pkg/gosundheit"
+	pkgkafka "github.com/openmeterio/openmeter/pkg/kafka"
 	"github.com/openmeterio/openmeter/pkg/models"
 	"github.com/openmeterio/openmeter/pkg/slicesx"
 )
@@ -276,6 +277,9 @@ func initSink(config config.Configuration, logger *slog.Logger, metricMeter metr
 		return nil, fmt.Errorf("failed to initialize kafka consumer: %s", err)
 	}
 
+	// Enable Kafka client logging
+	go pkgkafka.ConsumeLogChannel(consumer, logger.WithGroup("kafka").WithGroup("consumer"))
+
 	sinkConfig := sink.SinkConfig{
 		Logger: logger,
 		Tracer: tracer,

From e56de4cc26250daaca6978df51454440ed85fae5 Mon Sep 17 00:00:00 2001
From: Krisztian Gacsal
Date: Mon, 15 Jul 2024 12:01:41 +0200
Subject: [PATCH 2/3] feat(kafka): allow setting debug contexts

---
 config.example.yaml           |   5 ++
 config/config_test.go         |   5 ++
 config/ingest.go              |   8 ++
 config/ingest_test.go         |   6 ++
 config/testdata/complete.yaml |   4 +
 pkg/kafka/config.go           | 142 ++++++++++++++++++++++++++++++++++
 6 files changed, 170 insertions(+)

diff --git a/config.example.yaml b/config.example.yaml
index 7fbaaff93..cc50ffb70 100644
--- a/config.example.yaml
+++ b/config.example.yaml
@@ -16,6 +16,11 @@ telemetry:
 #   topicMetadataRefreshInterval: 1m
 #   # Use this config parameter to enable TCP keep-alive in order to prevent the Kafka broker to close idle network connection.
 #   socketKeepAliveEnabled: true
+#   # Set the list of debug contexts to enable for librdkafka
+#   # See: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#debug-contexts
+#   debugContexts:
+#     - broker
+#     - topic
 
 # dedupe:
 #   enabled: true

diff --git a/config/config_test.go b/config/config_test.go
index b9f0e27a5..129c12219 100644
--- a/config/config_test.go
+++ b/config/config_test.go
@@ -89,6 +89,11 @@ func TestComplete(t *testing.T) {
 				TopicMetadataRefreshInterval: pkgkafka.TimeDurationMilliSeconds(time.Minute),
 				StatsInterval:                pkgkafka.TimeDurationMilliSeconds(5 * time.Second),
 				SocketKeepAliveEnabled:       true,
+				DebugContexts: pkgkafka.DebugContexts{
+					"broker",
+					"topic",
+					"consumer",
+				},
 			},
 			Partitions:          1,
 			EventsTopicTemplate: "om_%s_events",

diff --git a/config/ingest.go b/config/ingest.go
index 5c7b06306..65f543820 100644
--- a/config/ingest.go
+++ b/config/ingest.go
@@ -64,6 +64,10 @@ type KafkaConfiguration struct {
 	// in case of large clusters where changes are more frequent.
 	// This value must not be set to value lower than 10s.
 	TopicMetadataRefreshInterval pkgkafka.TimeDurationMilliSeconds
+
+	// DebugContexts enables contexts for extensive debugging of librdkafka.
+	// See: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#debug-contexts
+	DebugContexts pkgkafka.DebugContexts
 }
 
 func (c KafkaConfiguration) Validate() error {
@@ -134,6 +138,10 @@ func (c KafkaConfiguration) CreateKafkaConfig() kafka.ConfigMap {
 		config["metadata.max.age.ms"] = 3 * c.TopicMetadataRefreshInterval
 	}
 
+	if len(c.DebugContexts) > 0 {
+		config["debug"] = c.DebugContexts.String()
+	}
+
 	return config
 }

diff --git a/config/ingest_test.go b/config/ingest_test.go
index 21aa8d75d..c12740c2e 100644
--- a/config/ingest_test.go
+++ b/config/ingest_test.go
@@ -30,6 +30,11 @@ func TestKafkaIngestConfiguration(t *testing.T) {
 				BrokerAddressFamily:          "v6",
 				SocketKeepAliveEnabled:       true,
 				TopicMetadataRefreshInterval: pkgkafka.TimeDurationMilliSeconds(time.Minute),
+				DebugContexts: pkgkafka.DebugContexts{
+					"broker",
+					"topic",
+					"consumer",
+				},
 			},
 			ExpectedKafkaConfigMap: kafka.ConfigMap{
 				"bootstrap.servers": "127.0.0.1:29092",
@@ -43,6 +48,7 @@
 				"socket.keepalive.enable":            true,
 				"statistics.interval.ms":             pkgkafka.TimeDurationMilliSeconds(10 * time.Second),
 				"topic.metadata.refresh.interval.ms": pkgkafka.TimeDurationMilliSeconds(time.Minute),
+				"debug":                              "broker,topic,consumer",
 			},
 		},
 		{

diff --git a/config/testdata/complete.yaml b/config/testdata/complete.yaml
index 96b340d77..cbdb16341 100644
--- a/config/testdata/complete.yaml
+++ b/config/testdata/complete.yaml
@@ -39,6 +39,10 @@ ingest:
     brokerAddressFamily: any
     socketKeepAliveEnabled: true
     topicMetadataRefreshInterval: 1m
+    debugContexts:
+      - broker
+      - topic
+      - consumer
 
 aggregation:
   clickhouse:

diff --git a/pkg/kafka/config.go b/pkg/kafka/config.go
index e2f0358dd..7f5036b2b 100644
--- a/pkg/kafka/config.go
+++ b/pkg/kafka/config.go
@@ -74,3 +74,145 @@ func (d TimeDurationMilliSeconds) Duration() time.Duration {
 func (d TimeDurationMilliSeconds) String() string {
 	return strconv.Itoa(int(time.Duration(d).Milliseconds()))
 }
+
+var _ configValue = (*DebugContext)(nil)
+
+type DebugContext string
+
+func (c DebugContext) String() string {
+	return string(c)
+}
+
+func (c *DebugContext) UnmarshalText(text []byte) error {
+	switch strings.ToLower(strings.TrimSpace(string(text))) {
+	case "generic":
+		*c = DebugContextGeneric
+	case "broker":
+		*c = DebugContextBroker
+	case "topic":
+		*c = DebugContextTopic
+	case "metadata":
+		*c = DebugContextMetadata
+	case "feature":
+		*c = DebugContextFeature
+	case "queue":
+		*c = DebugContextQueue
+	case "msg":
+		*c = DebugContextMessage
+	case "protocol":
+		*c = DebugContextProtocol
+	case "cgrp":
+		*c = DebugContextConsumerGroup
+	case "security":
+		*c = DebugContextSecurity
+	case "fetch":
+		*c = DebugContextFetch
+	case "interceptor":
+		*c = DebugContextInterceptor
+	case "plugin":
+		*c = DebugContextPlugin
+	case "consumer":
+		*c = DebugContextConsumer
+	case "admin":
+		*c = DebugContextAdmin
+	case "eos":
+		*c = DebugContextIdempotentProducer
+	case "mock":
+		*c = DebugContextMock
+	case "assignor":
+		*c = DebugContextAssignor
+	case "conf":
+		*c = DebugContextConfig
+	case "all":
+		*c = DebugContextAll
+	default:
+		return fmt.Errorf("invalid debug context: %s", text)
+	}
+
+	return nil
+}
+
+func (c *DebugContext) UnmarshalJSON(data []byte) error {
+	return c.UnmarshalText(data)
+}
+
+const (
+	// DebugContextGeneric enables generic client instance level debugging.
+	// Includes initialization and termination debugging.
+	// Client Type: producer, consumer
+	DebugContextGeneric DebugContext = "generic"
+	// DebugContextBroker enables broker and connection state debugging.
+	// Client Type: producer, consumer
+	DebugContextBroker DebugContext = "broker"
+	// DebugContextTopic enables topic and partition state debugging. Includes leader changes.
+	// Client Type: producer, consumer
+	DebugContextTopic DebugContext = "topic"
+	// DebugContextMetadata enables cluster and topic metadata retrieval debugging.
+	// Client Type: producer, consumer
+	DebugContextMetadata DebugContext = "metadata"
+	// DebugContextFeature enables debugging of Kafka protocol feature support as negotiated with the broker.
+	// Client Type: producer, consumer
+	DebugContextFeature DebugContext = "feature"
+	// DebugContextQueue enables message queue debugging.
+	// Client Type: producer
+	DebugContextQueue DebugContext = "queue"
+	// DebugContextMessage enables message debugging. Includes information about batching, compression, sizes, etc.
+	// Client Type: producer, consumer
+	DebugContextMessage DebugContext = "msg"
+	// DebugContextProtocol enables Kafka protocol request/response debugging. Includes latency (rtt) printouts.
+	// Client Type: producer, consumer
+	DebugContextProtocol DebugContext = "protocol"
+	// DebugContextConsumerGroup enables low-level consumer group state debugging.
+	// Client Type: consumer
+	DebugContextConsumerGroup DebugContext = "cgrp"
+	// DebugContextSecurity enables security and authentication debugging.
+	// Client Type: producer, consumer
+	DebugContextSecurity DebugContext = "security"
+	// DebugContextFetch enables consumer message fetch debugging. Includes decisions on when and why messages are fetched.
+	// Client Type: consumer
+	DebugContextFetch DebugContext = "fetch"
+	// DebugContextInterceptor enables interceptor interface debugging.
+	// Client Type: producer, consumer
+	DebugContextInterceptor DebugContext = "interceptor"
+	// DebugContextPlugin enables plugin loading debugging.
+	// Client Type: producer, consumer
+	DebugContextPlugin DebugContext = "plugin"
+	// DebugContextConsumer enables high-level consumer debugging.
+	// Client Type: consumer
+	DebugContextConsumer DebugContext = "consumer"
+	// DebugContextAdmin enables admin API debugging.
+	// Client Type: admin
+	DebugContextAdmin DebugContext = "admin"
+	// DebugContextIdempotentProducer enables idempotent producer debugging.
+	// Client Type: producer
+	DebugContextIdempotentProducer DebugContext = "eos"
+	// DebugContextMock enables mock cluster functionality debugging.
+	// Client Type: producer, consumer
+	DebugContextMock DebugContext = "mock"
+	// DebugContextAssignor enables detailed consumer group partition assignor debugging.
+	// Client Type: consumer
+	DebugContextAssignor DebugContext = "assignor"
+	// DebugContextConfig enables displaying the set configuration properties on startup.
+	// Client Type: producer, consumer
+	DebugContextConfig DebugContext = "conf"
+	// DebugContextAll enables all of the above.
+	// Client Type: producer, consumer
+	DebugContextAll DebugContext = "all"
+)
+
+var _ fmt.Stringer = (DebugContexts)(nil)
+
+type DebugContexts []DebugContext
+
+func (d DebugContexts) String() string {
+	if len(d) > 0 {
+		dd := make([]string, len(d))
+		for idx, v := range d {
+			dd[idx] = v.String()
+		}
+
+		return strings.Join(dd, ",")
+	}
+
+	return ""
+}

From bb795e9375eb3e763161c404774b0c6101b62c83 Mon Sep 17 00:00:00 2001
From: Krisztian Gacsal
Date: Mon, 15 Jul 2024 13:12:00 +0200
Subject: [PATCH 3/3] refactor(backend): local Kafka errors logged on warning level

---
 internal/ingest/kafkaingest/collector.go | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/internal/ingest/kafkaingest/collector.go b/internal/ingest/kafkaingest/collector.go
index 50db7c0cb..788b8eece 100644
--- a/internal/ingest/kafkaingest/collector.go
+++ b/internal/ingest/kafkaingest/collector.go
@@ -152,7 +152,19 @@ func KafkaProducerGroup(ctx context.Context, producer *kafka.Producer, logger *s
 			// as the underlying client will automatically try to
 			// recover from any errors encountered, the application
 			// does not need to take action on them.
-			logger.Error("kafka error", "error", ev)
+			attrs := []any{
+				slog.Int("code", int(ev.Code())),
+				slog.String("error", ev.Error()),
+			}
+
+			// Log Kafka client "local" errors at warning level, as those are mostly informational and the client is
+			// able to handle/recover from them automatically.
+			// See: https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka.h#L415
+			if ev.Code() <= -100 {
+				logger.Warn("kafka local error", attrs...)
+			} else {
+				logger.Error("kafka broker error", attrs...)
+			}
 		}
 	case <-ctx.Done():
 		return ctx.Err()
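
Note on PATCH 1/3: the pkgkafka.ConsumeLogChannel helper called from initSink is assumed to already exist in pkg/kafka; its implementation is not part of this series. For context, a minimal sketch of what such a helper can look like on top of confluent-kafka-go follows. The level mapping and attribute names are assumptions, and the consumer must be created with the "go.logs.channel.enable" config property set to true, otherwise Logs() never delivers events.

package kafka

import (
	"context"
	"log/slog"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// ConsumeLogChannel forwards librdkafka log events emitted on the client's
// log channel to an slog.Logger. A sketch only: the real helper in pkg/kafka
// may differ.
func ConsumeLogChannel(consumer *kafka.Consumer, logger *slog.Logger) {
	for event := range consumer.Logs() {
		logger.Log(context.Background(), mapLogLevel(event.Level), event.Message,
			slog.String("name", event.Name),
			slog.String("tag", event.Tag),
		)
	}
}

// mapLogLevel translates librdkafka syslog severities (0=emerg .. 7=debug)
// to slog levels. Hypothetical helper, named here only for this example.
func mapLogLevel(level int) slog.Level {
	switch {
	case level <= 3: // emerg, alert, crit, err
		return slog.LevelError
	case level == 4: // warning
		return slog.LevelWarn
	case level <= 6: // notice, info
		return slog.LevelInfo
	default: // debug
		return slog.LevelDebug
	}
}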
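Note on PATCH 2/3: DebugContexts flattens to librdkafka's comma-separated "debug" property, which CreateKafkaConfig only sets when at least one context is configured. A standalone sketch of that round trip, mirroring the patched CreateKafkaConfig (the broker address is a placeholder):

package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
	pkgkafka "github.com/openmeterio/openmeter/pkg/kafka"
)

func main() {
	debug := pkgkafka.DebugContexts{
		pkgkafka.DebugContextBroker,
		pkgkafka.DebugContextTopic,
		pkgkafka.DebugContextConsumer,
	}

	config := kafka.ConfigMap{
		"bootstrap.servers": "127.0.0.1:29092",
	}

	// Mirrors KafkaConfiguration.CreateKafkaConfig: the "debug" property is
	// only set when at least one context is configured.
	if len(debug) > 0 {
		config["debug"] = debug.String()
	}

	fmt.Println(config["debug"]) // Output: broker,topic,consumer
}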
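Note on PATCH 3/3: the ev.Code() <= -100 threshold works because librdkafka reserves negative codes for client-local errors (the internal range runs from RD_KAFKA_RESP_ERR__BEGIN at -200 up to RD_KAFKA_RESP_ERR__END at -100), while broker-reported error codes are non-negative. A hedged sketch of the same level selection as a standalone helper:

package main

import (
	"fmt"
	"log/slog"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// levelFor mirrors the level selection in KafkaProducerGroup: client-local
// errors are mostly recoverable by the client, so they only warrant a warning.
func levelFor(err kafka.Error) slog.Level {
	if err.Code() <= -100 {
		return slog.LevelWarn
	}

	return slog.LevelError
}

func main() {
	// kafka.ErrTransport (-195) is a local error: the client reconnects on its own.
	local := kafka.NewError(kafka.ErrTransport, "broker connection down", false)
	// kafka.ErrUnknownTopicOrPart (3) is reported by the broker.
	broker := kafka.NewError(kafka.ErrUnknownTopicOrPart, "unknown topic", false)

	fmt.Println(levelFor(local), levelFor(broker)) // Output: WARN ERROR
}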