The Confluent.Kafka.Extensions.OpenTelemetry
package enables collection of instrumentation data of the Confluent.Kafka
library.
The actual instrumentation of the Confluent.Kafka
library should be configured using
Confluent.Kafka.Extensions.Diagnostics.
Install-Package Confluent.Kafka.Extensions.OpenTelemetry
As Confluent.Kafka
does not expose any instrumentation data, additional, configuration is required.
Full documentation is available at Confluent.Kafka.Extensions.Diagnostics docs.
There is also an example on how to use the package in real world application.
using Confluent.Kafka;
using Confluent.Kafka.Extensions.Diagnostics;
using var producer =
new ProducerBuilder<Null, string>(new ProducerConfig(new ClientConfig { BootstrapServers = "localhost:9092" }))
.SetKeySerializer(Serializers.Null)
.SetValueSerializer(Serializers.Utf8)
.BuildWithInstrumentation();
await producer.ProduceAsync("topic", new Message<Null, string> { Value = "Hello World!" });
using Confluent.Kafka;
using Confluent.Kafka.Extensions.Diagnostics;
using var consumer = new ConsumerBuilder<Ignore, string>(
new ConsumerConfig(new ClientConfig { BootstrapServers = "localhost:9092" })
{
GroupId = "group", AutoOffsetReset = AutoOffsetReset.Earliest
})
.SetValueDeserializer(Deserializers.Utf8)
.Build();
consumer.Subscribe("topic");
consumer.ConsumeWithInstrumentation((result) =>
{
Console.WriteLine(result.Message.Value);
});
using Confluent.Kafka.Extensions.OpenTelemetry;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
builder.Services.AddOpenTelemetry().WithTracing(traceBuilder =>
{
traceBuilder
.AddInMemoryExporter()
.AddHttpClientInstrumentation()
.AddAspNetCoreInstrumentation()
.AddConfluentKafkaInstrumentation(); // <-- Add Confluent.Kafka OpenTelemetry support
});