Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a test for qps.go #8391

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion server/util/qps/BUILD
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "qps",
srcs = ["qps.go"],
importpath = "github.com/buildbuddy-io/buildbuddy/server/util/qps",
visibility = ["//visibility:public"],
)

go_test(
name = "qps_test",
size = "small",
srcs = ["qps_test.go"],
deps = [
":qps",
"@com_github_stretchr_testify//require",
],
)
19 changes: 16 additions & 3 deletions server/util/qps/qps.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,26 @@ type Counter struct {
nValidBins uint64
window time.Duration
startOnce sync.Once
ticker <-chan time.Time
stop chan struct{}
}

// NewCounter returns a QPS counter using the given duration as the averaging
// window. The caller must call Stop() on the returned counter when it is no
// longer needed.
func NewCounter(window time.Duration) *Counter {
return new(window, nil)
}

func NewCounterForTesting(window time.Duration, ticker <-chan time.Time) *Counter {
return new(window, ticker)
}
Comment on lines 41 to +47
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: probably more conventional to have only a single constructor and have it accept an instance of clockwork.Clock rather than a ticker channel


func new(window time.Duration, ticker <-chan time.Time) *Counter {
return &Counter{
nValidBins: 1,
window: window,
ticker: ticker,
stop: make(chan struct{}),
}
}
Expand Down Expand Up @@ -68,13 +78,16 @@ func (c *Counter) Get() float64 {
}

func (c *Counter) start() {
t := time.NewTicker(time.Duration(float64(c.window) / float64(len(c.counts))))
defer t.Stop()
if c.ticker == nil {
ticker := time.NewTicker(time.Duration(float64(c.window) / float64(len(c.counts))))
c.ticker = ticker.C
defer ticker.Stop()
}
for {
select {
case <-c.stop:
return
case <-t.C:
case <-c.ticker:
}
// Advance to the next bin, reset its current count, and mark it valid
// if we haven't done so already.
Expand Down
71 changes: 71 additions & 0 deletions server/util/qps/qps_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package qps_test

import (
"testing"
"time"

"github.com/buildbuddy-io/buildbuddy/server/util/qps"
"github.com/stretchr/testify/require"
)

func TestQPS(t *testing.T) {
ticker := make(chan time.Time, 1)
counter := qps.NewCounterForTesting(5*time.Second, ticker)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably want to defer counter.Stop() here


// Test cases where the ring buffer is not full.
require.Equal(t, float64(0), counter.Get())
counter.Inc()
require.Equal(t, float64(12), counter.Get())
for i := 0; i < 59; i++ {
counter.Inc()
}
require.Equal(t, float64(720), counter.Get())
for i := 0; i < 29; i++ {
ticker <- time.Now()
}
// The current bin is incremented asynchronously upon receiving a tick via
// the ticker channel. Give it some time to run.
time.Sleep(10 * time.Millisecond)
Copy link
Member

@bduffany bduffany Feb 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will likely be flaky. Could do one of these instead

  • Have the QPS counter accept an optional updates channel (chan struct{}) and send a non-blocking message on that channel whenever a tick is processed. Then the test can receive from the channel here instead of waiting for an update.
  • Do whitebox testing - move qps_test to the qps package, and manually call some update(now time.Time) function in the QPS counter rather than driving the counter using a ticker.
  • Do a poll here and wait indefinitely for the condition to become true - probably the simplest option but the downside though is that the test will just reach the test timeout if there's a bug rather than immediately failing - a little annoying but probably fine.

require.Equal(t, float64(24), counter.Get())

// Test cases where the ring buffer is full.
for i := 0; i < 100; i++ {
ticker <- time.Now()
}
time.Sleep(10 * time.Millisecond)
require.Equal(t, float64(0), counter.Get())
for i := 0; i < 61; i++ {
ticker <- time.Now()
time.Sleep(10 * time.Millisecond)
counter.Inc()
}
require.Equal(t, float64(12), counter.Get())
for i := 0; i < 60; i++ {
ticker <- time.Now()
time.Sleep(10 * time.Millisecond)
for j := 0; j < i; j++ {
counter.Inc()
}
}
// Sum(0, n) = n * (n-1) / 2
// So, sum(0, 60) = 60 * 59 / 2 = 1,770
// 1,770 Queries / 5s = 354 QPS.
require.Equal(t, float64(354), counter.Get())
}

func TestRaciness(t *testing.T) {
ticker := make(chan time.Time, 1)
counter := qps.NewCounterForTesting(5*time.Second, ticker)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defer counter.Stop() here too


for i := 1; i < 1000; i++ {
go func() {
for j := i; j < 1000; j++ {
counter.Inc()
if i%2 == 0 {
counter.Get()
}
}
ticker <- time.Now()
}()
}
}