diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index c6afcacfbdfde..80905bff23505 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -1183,7 +1183,7 @@ func (i *instance) updateOwnedStreams(isOwnedStream func(*stream) (bool, error)) }() var err error - i.streams.WithLock(func() { + i.streams.WithRLock(func() { i.ownedStreamsSvc.resetStreamCounts() err = i.streams.ForEach(func(s *stream) (bool, error) { ownedStream, err := isOwnedStream(s) diff --git a/pkg/ingester/limiter_test.go b/pkg/ingester/limiter_test.go index b611db4d109e1..78e579187a502 100644 --- a/pkg/ingester/limiter_test.go +++ b/pkg/ingester/limiter_test.go @@ -130,7 +130,7 @@ func TestStreamCountLimiter_AssertNewStreamAllowed(t *testing.T) { ownedStreamSvc := &ownedStreamService{ fixedLimit: atomic.NewInt32(testData.fixedLimit), - ownedStreamCount: testData.ownedStreamCount, + ownedStreamCount: atomic.NewInt64(int64(testData.ownedStreamCount)), } strategy := &fixedStrategy{localLimit: testData.calculatedLocalLimit} limiter := NewLimiter(limits, NilMetrics, strategy, &TenantBasedStrategy{limits: limits}) diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index ff4db43747676..5f144038bc094 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -318,8 +318,8 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges Namespace: constants.Loki, Name: "ingester_streams_ownership_check_duration_ms", Help: "Distribution of streams ownership check durations in milliseconds.", - // 100ms to 5s. - Buckets: []float64{100, 250, 350, 500, 750, 1000, 1500, 2000, 5000}, + // 1ms -> 16s + Buckets: prometheus.ExponentialBuckets(1, 4, 8), }), duplicateLogBytesTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ diff --git a/pkg/ingester/owned_streams.go b/pkg/ingester/owned_streams.go index 3bb729815e718..56c5a77fa768e 100644 --- a/pkg/ingester/owned_streams.go +++ b/pkg/ingester/owned_streams.go @@ -21,17 +21,18 @@ type ownedStreamService struct { tenantID string limiter *Limiter fixedLimit *atomic.Int32 - ownedStreamCount int + ownedStreamCount *atomic.Int64 lock sync.RWMutex notOwnedStreams map[model.Fingerprint]any } func newOwnedStreamService(tenantID string, limiter *Limiter) *ownedStreamService { svc := &ownedStreamService{ - tenantID: tenantID, - limiter: limiter, - fixedLimit: atomic.NewInt32(0), - notOwnedStreams: make(map[model.Fingerprint]any), + tenantID: tenantID, + limiter: limiter, + fixedLimit: atomic.NewInt32(0), + ownedStreamCount: atomic.NewInt64(0), + notOwnedStreams: make(map[model.Fingerprint]any), } svc.updateFixedLimit() @@ -39,9 +40,7 @@ func newOwnedStreamService(tenantID string, limiter *Limiter) *ownedStreamServic } func (s *ownedStreamService) getOwnedStreamCount() int { - s.lock.RLock() - defer s.lock.RUnlock() - return s.ownedStreamCount + return int(s.ownedStreamCount.Load()) } func (s *ownedStreamService) updateFixedLimit() (old, new int32) { @@ -55,12 +54,15 @@ func (s *ownedStreamService) getFixedLimit() int { } func (s *ownedStreamService) trackStreamOwnership(fp model.Fingerprint, owned bool) { - s.lock.Lock() - defer s.lock.Unlock() + // only need to inc the owned count; can use sync atomics. if owned { - s.ownedStreamCount++ + s.ownedStreamCount.Inc() return } + + // need to update map; lock required + s.lock.Lock() + defer s.lock.Unlock() notOwnedStreamsMetric.Inc() s.notOwnedStreams[fp] = nil } @@ -74,13 +76,13 @@ func (s *ownedStreamService) trackRemovedStream(fp model.Fingerprint) { delete(s.notOwnedStreams, fp) return } - s.ownedStreamCount-- + s.ownedStreamCount.Dec() } func (s *ownedStreamService) resetStreamCounts() { s.lock.Lock() defer s.lock.Unlock() - s.ownedStreamCount = 0 + s.ownedStreamCount.Store(0) notOwnedStreamsMetric.Sub(float64(len(s.notOwnedStreams))) s.notOwnedStreams = make(map[model.Fingerprint]any) } diff --git a/pkg/ingester/recalculate_owned_streams_test.go b/pkg/ingester/recalculate_owned_streams_test.go index 3e531dcdef66f..f3bea57f69bae 100644 --- a/pkg/ingester/recalculate_owned_streams_test.go +++ b/pkg/ingester/recalculate_owned_streams_test.go @@ -37,7 +37,7 @@ func Test_recalculateOwnedStreams_newRecalculateOwnedStreamsIngester(t *testing. func Test_recalculateOwnedStreams_recalculateWithIngesterStrategy(t *testing.T) { tests := map[string]struct { featureEnabled bool - expectedOwnedStreamCount int + expectedOwnedStreamCount int64 expectedNotOwnedStreamCount int }{ "expected streams ownership to be recalculated": { @@ -101,7 +101,7 @@ func Test_recalculateOwnedStreams_recalculateWithIngesterStrategy(t *testing.T) mockRing.addMapping(createStream(t, tenant, 100), true) mockRing.addMapping(createStream(t, tenant, 250), true) - require.Equal(t, 7, tenant.ownedStreamsSvc.ownedStreamCount) + require.Equal(t, int64(7), tenant.ownedStreamsSvc.ownedStreamCount.Load()) require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, 0) mockTenantsSupplier := &mockTenantsSuplier{tenants: []*instance{tenant}} @@ -116,7 +116,7 @@ func Test_recalculateOwnedStreams_recalculateWithIngesterStrategy(t *testing.T) if testData.featureEnabled { require.Equal(t, 50, tenant.ownedStreamsSvc.getFixedLimit(), "fixed limit must be updated after recalculation") } - require.Equal(t, testData.expectedOwnedStreamCount, tenant.ownedStreamsSvc.ownedStreamCount) + require.Equal(t, testData.expectedOwnedStreamCount, tenant.ownedStreamsSvc.ownedStreamCount.Load()) require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, testData.expectedNotOwnedStreamCount) }) }