diff --git a/io/aerospike/xdr/metrics.go b/io/aerospike/xdr/metrics.go index 9d05bf55..90921f8e 100644 --- a/io/aerospike/xdr/metrics.go +++ b/io/aerospike/xdr/metrics.go @@ -15,43 +15,55 @@ package xdr import ( + "context" "log/slog" "sync/atomic" "time" ) -type MetricsCollector struct { - requestCount uint64 +type metricsCollector struct { + ctx context.Context + + requestCount atomic.Uint64 + increment func() lastTime time.Time - logger *slog.Logger + + logger *slog.Logger } -func NewMetricsCollector(logger *slog.Logger) *MetricsCollector { - mc := &MetricsCollector{ - lastTime: time.Now(), - logger: logger, +func mewMetricsCollector(ctx context.Context, logger *slog.Logger) *metricsCollector { + mc := &metricsCollector{ + ctx: ctx, + increment: func() {}, + lastTime: time.Now(), + logger: logger, } - go mc.reportMetrics() - return mc -} + // enable only for logger debug level + if mc.logger.Enabled(mc.ctx, slog.LevelDebug) { + mc.increment = func() { mc.requestCount.Add(1) } + go mc.reportMetrics() + } -func (mc *MetricsCollector) IncrementRequests() { - atomic.AddUint64(&mc.requestCount, 1) + return mc } -func (mc *MetricsCollector) reportMetrics() { +func (mc *metricsCollector) reportMetrics() { ticker := time.NewTicker(time.Second) defer ticker.Stop() - for range ticker.C { - now := time.Now() - count := atomic.SwapUint64(&mc.requestCount, 0) - elapsed := now.Sub(mc.lastTime).Seconds() - rps := float64(count) / elapsed + for { + select { + case t := <-ticker.C: + count := mc.requestCount.Swap(0) + elapsed := t.Sub(mc.lastTime).Seconds() + rps := float64(count) / elapsed - mc.logger.Debug("metrics", slog.Float64("rps", rps)) + mc.logger.Debug("metrics", slog.Float64("rps", rps)) - mc.lastTime = now + mc.lastTime = t + case <-mc.ctx.Done(): + break + } } } diff --git a/io/aerospike/xdr/tcp_server.go b/io/aerospike/xdr/tcp_server.go index 5144e6ba..2c6730ff 100644 --- a/io/aerospike/xdr/tcp_server.go +++ b/io/aerospike/xdr/tcp_server.go @@ -176,7 +176,7 @@ func (s *TCPServer) GetActiveConnections() int32 { // acceptConnections serves connections, not more than maxConnections. // All connections over pool will be rejected. func (s *TCPServer) acceptConnections(ctx context.Context) { - metrics := NewMetricsCollector(s.logger) + metrics := mewMetricsCollector(ctx, s.logger) for { select { @@ -251,7 +251,7 @@ type ConnectionHandler struct { timeNow int64 logger *slog.Logger - metrics *MetricsCollector + metrics *metricsCollector } // NewConnectionHandler returns new connection handler. @@ -263,7 +263,7 @@ func NewConnectionHandler( readTimeout int64, writeTimeout int64, logger *slog.Logger, - metrics *MetricsCollector, + metrics *metricsCollector, ) *ConnectionHandler { return &ConnectionHandler{ conn: conn, @@ -368,7 +368,7 @@ func (h *ConnectionHandler) handleMessages(ctx context.Context) { return } - h.metrics.IncrementRequests() + h.metrics.increment() // Process message asynchronously h.bodyQueue <- message