Skip to content

Commit

Permalink
feat(kafka): allow setting debug contexts
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisgacsal committed Jul 15, 2024
1 parent e878779 commit e56de4c
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 0 deletions.
5 changes: 5 additions & 0 deletions config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ telemetry:
# topicMetadataRefreshInterval: 1m
# # Use this config parameter to enable TCP keep-alive in order to prevent the Kafka broker to close idle network connection.
# socketKeepAliveEnabled: true
# # Set list of debug contexts to enable for librdkafka
# # See: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#debug-contexts
# debugContexts:
# - broker
# - topic

# dedupe:
# enabled: true
Expand Down
5 changes: 5 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ func TestComplete(t *testing.T) {
TopicMetadataRefreshInterval: pkgkafka.TimeDurationMilliSeconds(time.Minute),
StatsInterval: pkgkafka.TimeDurationMilliSeconds(5 * time.Second),
SocketKeepAliveEnabled: true,
DebugContexts: pkgkafka.DebugContexts{
"broker",
"topic",
"consumer",
},
},
Partitions: 1,
EventsTopicTemplate: "om_%s_events",
Expand Down
8 changes: 8 additions & 0 deletions config/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ type KafkaConfiguration struct {
// in case of large clusters where changes are more frequent.
// This value must not be set to value lower than 10s.
TopicMetadataRefreshInterval pkgkafka.TimeDurationMilliSeconds

// Enable contexts for extensive debugging of librdkafka.
// See: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#debug-contexts
DebugContexts pkgkafka.DebugContexts
}

func (c KafkaConfiguration) Validate() error {
Expand Down Expand Up @@ -134,6 +138,10 @@ func (c KafkaConfiguration) CreateKafkaConfig() kafka.ConfigMap {
config["metadata.max.age.ms"] = 3 * c.TopicMetadataRefreshInterval
}

if len(c.DebugContexts) > 0 {
config["debug"] = c.DebugContexts.String()
}

return config
}

Expand Down
6 changes: 6 additions & 0 deletions config/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ func TestKafkaIngestConfiguration(t *testing.T) {
BrokerAddressFamily: "v6",
SocketKeepAliveEnabled: true,
TopicMetadataRefreshInterval: pkgkafka.TimeDurationMilliSeconds(time.Minute),
DebugContexts: pkgkafka.DebugContexts{
"broker",
"topic",
"consumer",
},
},
ExpectedKafkaConfigMap: kafka.ConfigMap{
"bootstrap.servers": "127.0.0.1:29092",
Expand All @@ -43,6 +48,7 @@ func TestKafkaIngestConfiguration(t *testing.T) {
"socket.keepalive.enable": true,
"statistics.interval.ms": pkgkafka.TimeDurationMilliSeconds(10 * time.Second),
"topic.metadata.refresh.interval.ms": pkgkafka.TimeDurationMilliSeconds(time.Minute),
"debug": "broker,topic,consumer",
},
},
{
Expand Down
4 changes: 4 additions & 0 deletions config/testdata/complete.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ ingest:
brokerAddressFamily: any
socketKeepAliveEnabled: true
topicMetadataRefreshInterval: 1m
debugContexts:
- broker
- topic
- consumer

aggregation:
clickhouse:
Expand Down
142 changes: 142 additions & 0 deletions pkg/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,145 @@ func (d TimeDurationMilliSeconds) Duration() time.Duration {
func (d TimeDurationMilliSeconds) String() string {
return strconv.Itoa(int(time.Duration(d).Milliseconds()))
}

var _ configValue = (*DebugContext)(nil)

type DebugContext string

func (c DebugContext) String() string {
return string(c)
}

func (c *DebugContext) UnmarshalText(text []byte) error {
switch strings.ToLower(strings.TrimSpace(string(text))) {
case "generic":
*c = DebugContextGeneric
case "broker":
*c = DebugContextBroker
case "topic":
*c = DebugContextTopic
case "metadata":
*c = DebugContextMetadata
case "feature":
*c = DebugContextFeature
case "queue":
*c = DebugContextQueue
case "msg":
*c = DebugContextMessage
case "protocol":
*c = DebugContextProtocol
case "cgrp":
*c = DebugContextConsumerGroup
case "security":
*c = DebugContextSecurity
case "fetch":
*c = DebugContextFetch
case "interceptor":
*c = DebugContextInterceptor
case "plugin":
*c = DebugContextPlugin
case "consumer":
*c = DebugContextConsumer
case "admin":
*c = DebugContextAdmin
case "eos":
*c = DebugContextIdempotentProducer
case "mock":
*c = DebugContextMock
case "assignor":
*c = DebugContextAssignor
case "conf":
*c = DebugContextConfig
case "all":
*c = DebugContextAll
default:
return fmt.Errorf("invalid debug context: %s", text)
}

return nil
}

func (c *DebugContext) UnmarshalJSON(data []byte) error {
return c.UnmarshalText(data)
}

const (
// DebugContextGeneric enables generic client instance level debugging.
// Includes initialization and termination debugging.
// Client Type: producer, consumer
DebugContextGeneric DebugContext = "generic"
// DebugContextBroker enables broker and connection state debugging.
// Client Type: producer, consumer
DebugContextBroker DebugContext = "broker"
// DebugContextTopic enables topic and partition state debugging. Includes leader changes.
// Client Type: producer, consumer
DebugContextTopic DebugContext = "topic"
// DebugContextMetadata enables cluster and topic metadata retrieval debugging.
// Client Type: producer, consumer
DebugContextMetadata DebugContext = "metadata"
// DebugContextFeature enables Kafka protocol feature support as negotiated with the broker.
// Client Type: producer, consumer
DebugContextFeature DebugContext = "feature"
// DebugContextQueue enables message queue debugging.
// Client Type: producer
DebugContextQueue DebugContext = "queue"
// DebugContextMessage enables message debugging. Includes information about batching, compression, sizes, etc.
// Client Type: producer, consumer
DebugContextMessage DebugContext = "msg"
// DebugContextProtocol enables Kafka protocol request/response debugging. Includes latency (rtt) printouts.
// Client Type: producer, consumer
DebugContextProtocol DebugContext = "protocol"
// DebugContextConsumerGroup enables low-level consumer group state debugging.
// Client Type: consumer
DebugContextConsumerGroup DebugContext = "cgrp"
// DebugContextSecurity enables security and authentication debugging.
// Client Type: producer, consumer
DebugContextSecurity DebugContext = "security"
// DebugContextFetch enables consumer message fetch debugging. Includes decision when and why messages are fetched.
// Client Type: consumer
DebugContextFetch DebugContext = "fetch"
// DebugContextInterceptor enables interceptor interface debugging.
// Client Type: producer, consumer
DebugContextInterceptor DebugContext = "interceptor"
// DebugContextPlugin enables plugin loading debugging.
// Client Type: producer, consumer
DebugContextPlugin DebugContext = "plugin"
// DebugContextConsumer enables high-level consumer debugging.
// Client Type: consumer
DebugContextConsumer DebugContext = "consumer"
// DebugContextAdmin enables admin API debugging.
// Client Type: admin
DebugContextAdmin DebugContext = "admin"
// DebugContextIdempotentProducer enables idempotent Producer debugging.
// Client Type: producer
DebugContextIdempotentProducer DebugContext = "eos"
// DebugContextMock enables mock cluster functionality debugging.
// Client Type: producer, consumer
DebugContextMock DebugContext = "mock"
// DebugContextAssignor enables detailed consumer group partition assignor debugging.
// Client Type: consumer
DebugContextAssignor DebugContext = "assignor"
// DebugContextConfig enables displaying set configuration properties on startup.
// Client Type: producer, consumer
DebugContextConfig DebugContext = "conf"
// DebugContextAll enables all of the above.
// Client Type: producer, consumer
DebugContextAll DebugContext = "all"
)

var _ fmt.Stringer = (DebugContexts)(nil)

type DebugContexts []DebugContext

func (d DebugContexts) String() string {
if len(d) > 0 {
dd := make([]string, len(d))
for idx, v := range d {
dd[idx] = v.String()
}

return strings.Join(dd, ",")
}

return ""
}

0 comments on commit e56de4c

Please sign in to comment.