Skip to content

Commit

Permalink
metric: remove mutex around windowed histogram update
Browse files Browse the repository at this point in the history
Previously, we used an `RWMutex` to serialize access to the windowed
histogram. This was done because there's a rotation that needs to
happen that moves the currently active histogram into the `prev`
variable and creates a fresh one to update in `cur`. This is done to
maintain the window.

This change limits the mutex usage to rotation and window
snapshotting, removing the need to take a read lock during updates
since we can atomically grab the `cur` histogram.

There's a bit of nuance to manage around the fact that `prev` can be
nil on the first iteration, since `cur` is the first histogram. After
a single rotation, `prev` will not be nil again.

The prometheus histogram itself (what's stored in the `atomic.Value`)
is thread-safe.

```
old:  507c68f Merge #140092
new:  ff9dd52 metric: remove mutex around windowed histogram upd
args: benchdiff "./pkg/util/metric" "-b" "-r" "BenchmarkHistogramRecordValue" "-c" "10" "--preview"

name                                     old time/op    new time/op    delta
HistogramRecordValue/insert_zero-10        30.7ns ± 8%    24.3ns ± 2%  -20.94%  (p=0.000 n=9+8)
HistogramRecordValue/insert_integers-10    37.4ns ± 6%    33.8ns ± 1%   -9.83%  (p=0.000 n=8+8)
HistogramRecordValue/random_integers-10    44.9ns ± 9%    41.7ns ± 1%   -7.25%  (p=0.000 n=8+8)

name                                     old alloc/op   new alloc/op   delta
HistogramRecordValue/insert_integers-10     0.00B          0.00B          ~     (all equal)
HistogramRecordValue/insert_zero-10         0.00B          0.00B          ~     (all equal)
HistogramRecordValue/random_integers-10     0.00B          0.00B          ~     (all equal)

name                                     old allocs/op  new allocs/op  delta
HistogramRecordValue/insert_integers-10      0.00           0.00          ~     (all equal)
HistogramRecordValue/insert_zero-10          0.00           0.00          ~     (all equal)
HistogramRecordValue/random_integers-10      0.00           0.00          ~     (all equal)
```

Part of #133306

Release note: None
  • Loading branch information
dhartunian committed Feb 3, 2025
1 parent 507c68f commit e947cf2
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 36 deletions.
50 changes: 20 additions & 30 deletions pkg/util/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,12 @@ func newHistogram(
// intervals within a histogram's total duration.
duration/WindowedHistogramWrapNum,
func() {
h.windowed.prev = h.windowed.cur
h.windowed.cur = prometheus.NewHistogram(opts)
h.windowed.Lock()
defer h.windowed.Unlock()
if h.windowed.cur.Load() != nil {
h.windowed.prev.Store(h.windowed.cur.Load())
}
h.windowed.cur.Store(prometheus.NewHistogram(opts))
})
h.windowed.Ticker.OnTick()
return h
Expand Down Expand Up @@ -383,12 +387,9 @@ type Histogram struct {
// it up right now. It should be doable though, since there is only one
// consumer of windowed histograms - our internal timeseries system.
windowed struct {
// prometheus.Histogram is thread safe, so we only
// need an RLock to record into it. But write lock
// is held while rotating.
syncutil.RWMutex
*tick.Ticker
prev, cur prometheus.HistogramInternal
syncutil.Mutex
prev, cur atomic.Value
}
}

Expand All @@ -415,8 +416,6 @@ type IHistogram interface {
// fix and should be expected to be removed.
// TODO(obs-infra): remove this once pkg/util/aggmetric is merged with this package.
func (h *Histogram) NextTick() time.Time {
h.windowed.RLock()
defer h.windowed.RUnlock()
return h.windowed.NextTick()
}

Expand All @@ -426,27 +425,16 @@ func (h *Histogram) NextTick() time.Time {
// as part of the public API.
// TODO(obs-infra): remove this once pkg/util/aggmetric is merged with this package.
func (h *Histogram) Tick() {
h.windowed.Lock()
defer h.windowed.Unlock()
h.windowed.Tick()
}

// Windowed returns a copy of the current windowed histogram.
func (h *Histogram) Windowed() prometheus.Histogram {
h.windowed.RLock()
defer h.windowed.RUnlock()
return h.windowed.cur
}

// RecordValue adds the given value to the histogram.
func (h *Histogram) RecordValue(n int64) {
v := float64(n)
b := h.cum.FindBucket(v)
h.cum.ObserveInternal(v, b)

h.windowed.RLock()
defer h.windowed.RUnlock()
h.windowed.cur.ObserveInternal(v, b)
h.windowed.cur.Load().(prometheus.HistogramInternal).ObserveInternal(v, b)
}

// GetType returns the prometheus type enum for this metric.
Expand All @@ -470,18 +458,22 @@ func (h *Histogram) CumulativeSnapshot() HistogramSnapshot {
func (h *Histogram) WindowedSnapshot() HistogramSnapshot {
h.windowed.Lock()
defer h.windowed.Unlock()
cur := &prometheusgo.Metric{}
prev := &prometheusgo.Metric{}
if err := h.windowed.cur.Write(cur); err != nil {
cur := h.windowed.cur.Load().(prometheus.Histogram)
// Can't cast here since prev might be nil.
prev := h.windowed.prev.Load()

curMetric := &prometheusgo.Metric{}
if err := cur.Write(curMetric); err != nil {
panic(err)
}
if h.windowed.prev != nil {
if err := h.windowed.prev.Write(prev); err != nil {
if prev != nil {
prevMetric := &prometheusgo.Metric{}
if err := prev.(prometheus.Histogram).Write(prevMetric); err != nil {
panic(err)
}
MergeWindowedHistogram(cur.Histogram, prev.Histogram)
MergeWindowedHistogram(curMetric.Histogram, prevMetric.Histogram)
}
return MakeHistogramSnapshot(cur.Histogram)
return MakeHistogramSnapshot(curMetric.Histogram)
}

// GetMetadata returns the metric's metadata including the Prometheus
Expand All @@ -493,8 +485,6 @@ func (h *Histogram) GetMetadata() Metadata {
// Inspect calls the closure.
func (h *Histogram) Inspect(f func(interface{})) {
func() {
h.windowed.Lock()
defer h.windowed.Unlock()
tick.MaybeTick(&h.windowed)
}()
f(h)
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/metric/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ func TestNewHistogramRotate(t *testing.T) {
h.RecordValue(12345)
f := float64(12345) + sum
_, wSum := h.WindowedSnapshot().Total()
require.Equal(t, wSum, f)
require.Equal(t, f, wSum)
}
// Tick. This rotates the histogram.
now = now.Add(time.Duration(i+1) * 10 * time.Second)
Expand Down
12 changes: 7 additions & 5 deletions pkg/util/metric/tick/tick.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package tick

import (
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -55,7 +56,7 @@ func MaybeTick(m Periodic) {
// maintain a windowed version to work around limitations of our internal
// timeseries database.
type Ticker struct {
nextT time.Time
nextT atomic.Value
tickInterval time.Duration

onTick func()
Expand All @@ -65,11 +66,12 @@ var _ Periodic = &Ticker{}

// NewTicker returns a new *Ticker instance.
func NewTicker(nextT time.Time, tickInterval time.Duration, onTick func()) *Ticker {
return &Ticker{
nextT: nextT,
t := &Ticker{
tickInterval: tickInterval,
onTick: onTick,
}
t.nextT.Store(nextT)
return t
}

// OnTick calls the onTick function provided at construction.
Expand All @@ -85,7 +87,7 @@ func (s *Ticker) OnTick() {

// NextTick returns the timestamp of the next scheduled tick for this Ticker.
func (s *Ticker) NextTick() time.Time {
return s.nextT
return s.nextT.Load().(time.Time)
}

// Tick updates the next tick timestamp to the next tickInterval, and invokes
Expand All @@ -94,6 +96,6 @@ func (s *Ticker) NextTick() time.Time {
// NB: Generally, MaybeTick should be used instead to ensure that we don't tick
// before nextT. This function is only used when we want to tick regardless
func (s *Ticker) Tick() {
s.nextT = s.nextT.Add(s.tickInterval)
s.nextT.Store(s.nextT.Load().(time.Time).Add(s.tickInterval))
s.OnTick()
}

0 comments on commit e947cf2

Please sign in to comment.