Skip to content

Commit

Permalink
perf(ingester): refactor lock acquisitions related to not_owned ser…
Browse files Browse the repository at this point in the history
…ies limit functionality (#15839)
  • Loading branch information
owen-d authored Jan 21, 2025
1 parent c62dc2e commit 2a2b528
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 20 deletions.
2 changes: 1 addition & 1 deletion pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -1183,7 +1183,7 @@ func (i *instance) updateOwnedStreams(isOwnedStream func(*stream) (bool, error))
}()

var err error
i.streams.WithLock(func() {
i.streams.WithRLock(func() {
i.ownedStreamsSvc.resetStreamCounts()
err = i.streams.ForEach(func(s *stream) (bool, error) {
ownedStream, err := isOwnedStream(s)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func TestStreamCountLimiter_AssertNewStreamAllowed(t *testing.T) {

ownedStreamSvc := &ownedStreamService{
fixedLimit: atomic.NewInt32(testData.fixedLimit),
ownedStreamCount: testData.ownedStreamCount,
ownedStreamCount: atomic.NewInt64(int64(testData.ownedStreamCount)),
}
strategy := &fixedStrategy{localLimit: testData.calculatedLocalLimit}
limiter := NewLimiter(limits, NilMetrics, strategy, &TenantBasedStrategy{limits: limits})
Expand Down
4 changes: 2 additions & 2 deletions pkg/ingester/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,8 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges
Namespace: constants.Loki,
Name: "ingester_streams_ownership_check_duration_ms",
Help: "Distribution of streams ownership check durations in milliseconds.",
// 100ms to 5s.
Buckets: []float64{100, 250, 350, 500, 750, 1000, 1500, 2000, 5000},
// 1ms -> 16s
Buckets: prometheus.ExponentialBuckets(1, 4, 8),
}),

duplicateLogBytesTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Expand Down
28 changes: 15 additions & 13 deletions pkg/ingester/owned_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,26 @@ type ownedStreamService struct {
tenantID string
limiter *Limiter
fixedLimit *atomic.Int32
ownedStreamCount int
ownedStreamCount *atomic.Int64
lock sync.RWMutex
notOwnedStreams map[model.Fingerprint]any
}

func newOwnedStreamService(tenantID string, limiter *Limiter) *ownedStreamService {
svc := &ownedStreamService{
tenantID: tenantID,
limiter: limiter,
fixedLimit: atomic.NewInt32(0),
notOwnedStreams: make(map[model.Fingerprint]any),
tenantID: tenantID,
limiter: limiter,
fixedLimit: atomic.NewInt32(0),
ownedStreamCount: atomic.NewInt64(0),
notOwnedStreams: make(map[model.Fingerprint]any),
}

svc.updateFixedLimit()
return svc
}

func (s *ownedStreamService) getOwnedStreamCount() int {
s.lock.RLock()
defer s.lock.RUnlock()
return s.ownedStreamCount
return int(s.ownedStreamCount.Load())
}

func (s *ownedStreamService) updateFixedLimit() (old, new int32) {
Expand All @@ -55,12 +54,15 @@ func (s *ownedStreamService) getFixedLimit() int {
}

func (s *ownedStreamService) trackStreamOwnership(fp model.Fingerprint, owned bool) {
s.lock.Lock()
defer s.lock.Unlock()
// only need to inc the owned count; can use sync atomics.
if owned {
s.ownedStreamCount++
s.ownedStreamCount.Inc()
return
}

// need to update map; lock required
s.lock.Lock()
defer s.lock.Unlock()
notOwnedStreamsMetric.Inc()
s.notOwnedStreams[fp] = nil
}
Expand All @@ -74,13 +76,13 @@ func (s *ownedStreamService) trackRemovedStream(fp model.Fingerprint) {
delete(s.notOwnedStreams, fp)
return
}
s.ownedStreamCount--
s.ownedStreamCount.Dec()
}

func (s *ownedStreamService) resetStreamCounts() {
s.lock.Lock()
defer s.lock.Unlock()
s.ownedStreamCount = 0
s.ownedStreamCount.Store(0)
notOwnedStreamsMetric.Sub(float64(len(s.notOwnedStreams)))
s.notOwnedStreams = make(map[model.Fingerprint]any)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingester/recalculate_owned_streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func Test_recalculateOwnedStreams_newRecalculateOwnedStreamsIngester(t *testing.
func Test_recalculateOwnedStreams_recalculateWithIngesterStrategy(t *testing.T) {
tests := map[string]struct {
featureEnabled bool
expectedOwnedStreamCount int
expectedOwnedStreamCount int64
expectedNotOwnedStreamCount int
}{
"expected streams ownership to be recalculated": {
Expand Down Expand Up @@ -101,7 +101,7 @@ func Test_recalculateOwnedStreams_recalculateWithIngesterStrategy(t *testing.T)
mockRing.addMapping(createStream(t, tenant, 100), true)
mockRing.addMapping(createStream(t, tenant, 250), true)

require.Equal(t, 7, tenant.ownedStreamsSvc.ownedStreamCount)
require.Equal(t, int64(7), tenant.ownedStreamsSvc.ownedStreamCount.Load())
require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, 0)

mockTenantsSupplier := &mockTenantsSuplier{tenants: []*instance{tenant}}
Expand All @@ -116,7 +116,7 @@ func Test_recalculateOwnedStreams_recalculateWithIngesterStrategy(t *testing.T)
if testData.featureEnabled {
require.Equal(t, 50, tenant.ownedStreamsSvc.getFixedLimit(), "fixed limit must be updated after recalculation")
}
require.Equal(t, testData.expectedOwnedStreamCount, tenant.ownedStreamsSvc.ownedStreamCount)
require.Equal(t, testData.expectedOwnedStreamCount, tenant.ownedStreamsSvc.ownedStreamCount.Load())
require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, testData.expectedNotOwnedStreamCount)
})
}
Expand Down

0 comments on commit 2a2b528

Please sign in to comment.