diff --git a/pkg/util/metric/metric.go b/pkg/util/metric/metric.go index b5e94cbf0210..52af25d8912d 100644 --- a/pkg/util/metric/metric.go +++ b/pkg/util/metric/metric.go @@ -353,8 +353,12 @@ func newHistogram( // intervals within a histogram's total duration. duration/WindowedHistogramWrapNum, func() { - h.windowed.prev = h.windowed.cur - h.windowed.cur = prometheus.NewHistogram(opts) + h.windowed.Lock() + defer h.windowed.Unlock() + if h.windowed.cur.Load() != nil { + h.windowed.prev.Store(h.windowed.cur.Load()) + } + h.windowed.cur.Store(prometheus.NewHistogram(opts)) }) h.windowed.Ticker.OnTick() return h @@ -383,12 +387,9 @@ type Histogram struct { // it up right now. It should be doable though, since there is only one // consumer of windowed histograms - our internal timeseries system. windowed struct { - // prometheus.Histogram is thread safe, so we only - // need an RLock to record into it. But write lock - // is held while rotating. - syncutil.RWMutex *tick.Ticker - prev, cur prometheus.HistogramInternal + syncutil.Mutex + prev, cur atomic.Value } } @@ -415,8 +416,6 @@ type IHistogram interface { // fix and should be expected to be removed. // TODO(obs-infra): remove this once pkg/util/aggmetric is merged with this package. func (h *Histogram) NextTick() time.Time { - h.windowed.RLock() - defer h.windowed.RUnlock() return h.windowed.NextTick() } @@ -426,27 +425,16 @@ func (h *Histogram) NextTick() time.Time { // as part of the public API. // TODO(obs-infra): remove this once pkg/util/aggmetric is merged with this package. func (h *Histogram) Tick() { - h.windowed.Lock() - defer h.windowed.Unlock() h.windowed.Tick() } -// Windowed returns a copy of the current windowed histogram. -func (h *Histogram) Windowed() prometheus.Histogram { - h.windowed.RLock() - defer h.windowed.RUnlock() - return h.windowed.cur -} - // RecordValue adds the given value to the histogram. func (h *Histogram) RecordValue(n int64) { v := float64(n) b := h.cum.FindBucket(v) h.cum.ObserveInternal(v, b) - h.windowed.RLock() - defer h.windowed.RUnlock() - h.windowed.cur.ObserveInternal(v, b) + h.windowed.cur.Load().(prometheus.HistogramInternal).ObserveInternal(v, b) } // GetType returns the prometheus type enum for this metric. @@ -470,18 +458,22 @@ func (h *Histogram) CumulativeSnapshot() HistogramSnapshot { func (h *Histogram) WindowedSnapshot() HistogramSnapshot { h.windowed.Lock() defer h.windowed.Unlock() - cur := &prometheusgo.Metric{} - prev := &prometheusgo.Metric{} - if err := h.windowed.cur.Write(cur); err != nil { + cur := h.windowed.cur.Load().(prometheus.Histogram) + // Can't cast here since prev might be nil. + prev := h.windowed.prev.Load() + + curMetric := &prometheusgo.Metric{} + if err := cur.Write(curMetric); err != nil { panic(err) } - if h.windowed.prev != nil { - if err := h.windowed.prev.Write(prev); err != nil { + if prev != nil { + prevMetric := &prometheusgo.Metric{} + if err := prev.(prometheus.Histogram).Write(prevMetric); err != nil { panic(err) } - MergeWindowedHistogram(cur.Histogram, prev.Histogram) + MergeWindowedHistogram(curMetric.Histogram, prevMetric.Histogram) } - return MakeHistogramSnapshot(cur.Histogram) + return MakeHistogramSnapshot(curMetric.Histogram) } // GetMetadata returns the metric's metadata including the Prometheus @@ -493,8 +485,6 @@ func (h *Histogram) GetMetadata() Metadata { // Inspect calls the closure. func (h *Histogram) Inspect(f func(interface{})) { func() { - h.windowed.Lock() - defer h.windowed.Unlock() tick.MaybeTick(&h.windowed) }() f(h) diff --git a/pkg/util/metric/metric_test.go b/pkg/util/metric/metric_test.go index 59b5588c3526..e5ba523d3784 100644 --- a/pkg/util/metric/metric_test.go +++ b/pkg/util/metric/metric_test.go @@ -454,7 +454,7 @@ func TestNewHistogramRotate(t *testing.T) { h.RecordValue(12345) f := float64(12345) + sum _, wSum := h.WindowedSnapshot().Total() - require.Equal(t, wSum, f) + require.Equal(t, f, wSum) } // Tick. This rotates the histogram. now = now.Add(time.Duration(i+1) * 10 * time.Second) diff --git a/pkg/util/metric/tick/tick.go b/pkg/util/metric/tick/tick.go index c29af324ef9a..9538dcae37b5 100644 --- a/pkg/util/metric/tick/tick.go +++ b/pkg/util/metric/tick/tick.go @@ -6,6 +6,7 @@ package tick import ( + "sync/atomic" "time" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -55,7 +56,7 @@ func MaybeTick(m Periodic) { // maintain a windowed version to work around limitations of our internal // timeseries database. type Ticker struct { - nextT time.Time + nextT atomic.Value tickInterval time.Duration onTick func() @@ -65,11 +66,12 @@ var _ Periodic = &Ticker{} // NewTicker returns a new *Ticker instance. func NewTicker(nextT time.Time, tickInterval time.Duration, onTick func()) *Ticker { - return &Ticker{ - nextT: nextT, + t := &Ticker{ tickInterval: tickInterval, onTick: onTick, } + t.nextT.Store(nextT) + return t } // OnTick calls the onTick function provided at construction. @@ -85,7 +87,7 @@ func (s *Ticker) OnTick() { // NextTick returns the timestamp of the next scheduled tick for this Ticker. func (s *Ticker) NextTick() time.Time { - return s.nextT + return s.nextT.Load().(time.Time) } // Tick updates the next tick timestamp to the next tickInterval, and invokes @@ -94,6 +96,6 @@ func (s *Ticker) NextTick() time.Time { // NB: Generally, MaybeTick should be used instead to ensure that we don't tick // before nextT. This function is only used when we want to tick regardless func (s *Ticker) Tick() { - s.nextT = s.nextT.Add(s.tickInterval) + s.nextT.Store(s.nextT.Load().(time.Time).Add(s.tickInterval)) s.OnTick() }