Skip to content

Commit

Permalink
Make E2E service accept a prometheus registerer
Browse files Browse the repository at this point in the history
I was informed about a usecase that uses KMinions  end to end service
as a library. In order to provide more control about the exported
metrics the E2E service now accepts a Prometheus Gatherer that is used to
register all exposed metrics
  • Loading branch information
Martin Schneppenheim committed Nov 15, 2021
1 parent 2b50411 commit d0d9e9d
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 10 deletions.
19 changes: 10 additions & 9 deletions e2e/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/cloudhut/kminion/v2/kafka"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -44,7 +43,7 @@ type Service struct {
}

// NewService creates a new instance of the e2e moinitoring service (wow)
func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricNamespace string) (*Service, error) {
func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, promRegisterer prometheus.Registerer) (*Service, error) {
minionID := uuid.NewString()
groupID := fmt.Sprintf("%v-%v", cfg.Consumer.GroupIdPrefix, minionID)

Expand Down Expand Up @@ -94,29 +93,32 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k
svc.messageTracker = newMessageTracker(svc)

makeCounterVec := func(name string, labelNames []string, help string) *prometheus.CounterVec {
return promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: metricNamespace,
cv := prometheus.NewCounterVec(prometheus.CounterOpts{
Subsystem: "end_to_end",
Name: name,
Help: help,
}, labelNames)
promRegisterer.MustRegister(cv)
return cv
}
makeGaugeVec := func(name string, labelNames []string, help string) *prometheus.GaugeVec {
return promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: metricNamespace,
gv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: "end_to_end",
Name: name,
Help: help,
}, labelNames)
promRegisterer.MustRegister(gv)
return gv
}
makeHistogramVec := func(name string, maxLatency time.Duration, labelNames []string, help string) *prometheus.HistogramVec {
return promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricNamespace,
hv := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: "end_to_end",
Name: name,
Help: help,
Buckets: createHistogramBuckets(maxLatency),
}, labelNames)
promRegisterer.MustRegister(hv)
return hv
}

// Low-level info
Expand Down Expand Up @@ -178,7 +180,6 @@ func (s *Service) Start(ctx context.Context) error {
case <-initCh:
isInitialized = true
s.logger.Info("consumer has been successfully initialized")
break
case <-ctx.Done():
return nil
}
Expand Down
4 changes: 3 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ func main() {
}
}()

wrappedRegisterer := promclient.WrapRegistererWithPrefix(cfg.Exporter.Namespace+"_", promclient.DefaultRegisterer)

// Create kafka service
kafkaSvc := kafka.NewService(cfg.Kafka, logger)

Expand All @@ -77,7 +79,7 @@ func main() {
cfg.Minion.EndToEnd,
logger,
kafkaSvc,
cfg.Exporter.Namespace,
wrappedRegisterer,
)
if err != nil {
logger.Fatal("failed to create end-to-end monitoring service: %w", zap.Error(err))
Expand Down

0 comments on commit d0d9e9d

Please sign in to comment.