diff --git a/openmeter/watermill/driver/kafka/broker.go b/openmeter/watermill/driver/kafka/broker.go index 0c017622e..6e2536fab 100644 --- a/openmeter/watermill/driver/kafka/broker.go +++ b/openmeter/watermill/driver/kafka/broker.go @@ -8,10 +8,10 @@ import ( "time" "github.com/IBM/sarama" - gometrics "github.com/rcrowley/go-metrics" otelmetric "go.opentelemetry.io/otel/metric" "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka/metrics" ) const ( @@ -91,19 +91,16 @@ func (o *BrokerOptions) createKafkaConfig(role string) (*sarama.Config, error) { config.Producer.Retry.Max = 10 config.Producer.Return.Successes = true - //meterRegistry, err := metrics.NewRegistry(metrics.NewRegistryOptions{ - // MetricMeter: o.MetricMeter, - // NameTransformFn: SaramaMetricRenamer(role), - // ErrorHandler: metrics.LoggingErrorHandler(o.Logger), - //}) - //if err != nil { - // return nil, err - //} - // - //config.MetricRegistry = meterRegistry - - // FIXME(chrisgacsal): disable metric collection to test possibler mem/goroutine leak - gometrics.UseNilMetrics = true + meterRegistry, err := metrics.NewRegistry(metrics.NewRegistryOptions{ + MetricMeter: o.MetricMeter, + NameTransformFn: SaramaMetricRenamer(role), + ErrorHandler: metrics.LoggingErrorHandler(o.Logger), + }) + if err != nil { + return nil, err + } + + config.MetricRegistry = meterRegistry return config, nil }