Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Quorum-based Series() query short circuit #137

Open
wants to merge 4 commits into
base: db_main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1611,6 +1611,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
false,
s.metrics.emptyPostingCount.WithLabelValues(tenant),
nil,
nil,
)
} else {
resp = newLazyRespSet(
Expand All @@ -1623,6 +1624,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
shardMatcher,
false,
s.metrics.emptyPostingCount.WithLabelValues(tenant),
nil,
)
}

Expand Down
32 changes: 30 additions & 2 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -274,6 +275,25 @@ func (s *ProxyStore) TSDBInfos() []infopb.TSDBInfo {
return infos
}

// quorumGroup tracks how many store streams in a group have finished, so
// the remaining in-flight streams can be short-circuited once a quorum of
// complete responses has been received.
type quorumGroup struct {
	name    string        // group identifier, used in log/error messages
	counter *atomic.Int32 // number of streams that have completed successfully
	quorum  int32         // completions required before canceling the rest
	// NOTE(review): storing a Context in a struct is unconventional in Go;
	// confirm this group's lifetime is scoped to a single Series call.
	ctx    context.Context
	cancel context.CancelCauseFunc // cancels ctx with a cause once quorum is reached
}

// newQuorumGroup builds a quorumGroup whose context derives from the given
// parent context and can be canceled (with a cause) after quorum streams
// have completed.
// NOTE(review): by Go convention the context parameter would come first.
func newQuorumGroup(name string, quorum int32, ctx context.Context) *quorumGroup {
	groupCtx, cancelFn := context.WithCancelCause(ctx)
	qg := &quorumGroup{
		name:    name,
		counter: atomic.NewInt32(0),
		quorum:  quorum,
		ctx:     groupCtx,
		cancel:  cancelFn,
	}
	return qg
}

func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
// TODO(bwplotka): This should be part of request logger, otherwise it does not make much sense. Also, could be
// triggered by tracing span to reduce cognitive load.
Expand Down Expand Up @@ -385,11 +405,19 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
}
}
defer logGroupReplicaErrors()
var qg *quorumGroup
storeCtx := ctx
if r.PartialResponseStrategy == storepb.PartialResponseStrategy_GROUP_REPLICA {
// Currently we implement a single quorum group for all store APIs.
// Quorum is one less than the number of stores. This would tolerate one slow store to mitigate long-tail latency.
// TODO: Use a more fine-grained quorum strategy.
qg = newQuorumGroup("all-store-apis", int32(len(stores)-1), ctx)
storeCtx = qg.ctx
}

for _, st := range stores {
st := st

respSet, err := newAsyncRespSet(ctx, st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses)
respSet, err := newAsyncRespSet(storeCtx, st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses, qg)
if err != nil {
level.Warn(s.logger).Log("msg", "Store failure", "group", st.GroupKey(), "replica", st.ReplicaKey(), "err", err)
s.metrics.storeFailureCount.WithLabelValues(st.GroupKey(), st.ReplicaKey()).Inc()
Expand Down
15 changes: 15 additions & 0 deletions pkg/store/proxy_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ func newLazyRespSet(
shardMatcher *storepb.ShardMatcher,
applySharding bool,
emptyStreamResponses prometheus.Counter,
qg *quorumGroup,
) respSet {
bufferedResponses := []*storepb.SeriesResponse{}
bufferedResponsesMtx := &sync.Mutex{}
Expand Down Expand Up @@ -393,6 +394,9 @@ func newLazyRespSet(
l.noMoreData = true
l.dataOrFinishEvent.Signal()
l.bufferedResponsesMtx.Unlock()
if qg != nil && qg.counter.Add(1) == qg.quorum {
qg.cancel(errors.Errorf("Quorum %d is reached in Series RPC streaming.", qg.quorum))
}
return false
}

Expand All @@ -403,6 +407,8 @@ func newLazyRespSet(
<-t.C // Drain the channel if it was already stopped.
}
rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, st)
} else if qg != nil && qg.counter.Load() >= qg.quorum {
rerr = errors.Wrapf(err, "quorum reached in group %s and %s got canceled or failed", qg.name, storeName)
} else {
rerr = errors.Wrapf(err, "receive series from %s", st)
}
Expand Down Expand Up @@ -474,6 +480,7 @@ func newAsyncRespSet(
shardInfo *storepb.ShardInfo,
logger log.Logger,
emptyStreamResponses prometheus.Counter,
qg *quorumGroup,
) (respSet, error) {

var (
Expand Down Expand Up @@ -535,6 +542,7 @@ func newAsyncRespSet(
shardMatcher,
applySharding,
emptyStreamResponses,
qg,
), nil
case EagerRetrieval:
span.SetTag("retrival_strategy", EagerRetrieval)
Expand All @@ -549,6 +557,7 @@ func newAsyncRespSet(
applySharding,
emptyStreamResponses,
labelsToRemove,
qg,
), nil
default:
panic(fmt.Sprintf("unsupported retrieval strategy %s", retrievalStrategy))
Expand Down Expand Up @@ -602,6 +611,7 @@ func newEagerRespSet(
applySharding bool,
emptyStreamResponses prometheus.Counter,
removeLabels map[string]struct{},
qg *quorumGroup,
) respSet {
ret := &eagerRespSet{
span: span,
Expand Down Expand Up @@ -655,6 +665,9 @@ func newEagerRespSet(
resp, err := cl.Recv()
if err != nil {
if err == io.EOF {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also check whether this response stream was canceled by another stream because the quorum was reached.

if errors.Is(err, context.Canceled) {
  log something

if qg != nil && qg.counter.Add(1) == qg.quorum {
qg.cancel(errors.Errorf("Quorum %d is reached in Series RPC streaming.", qg.quorum))
}
return false
}

Expand All @@ -665,6 +678,8 @@ func newEagerRespSet(
<-t.C // Drain the channel if it was already stopped.
}
rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, storeName)
} else if qg != nil && qg.counter.Load() >= qg.quorum {
rerr = errors.Wrapf(err, "quorum reached in group %s and %s got canceled or failed", qg.name, storeName)
} else {
rerr = errors.Wrapf(err, "receive series from %s", storeName)
}
Expand Down
202 changes: 202 additions & 0 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1586,6 +1586,49 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
},
expectedWarningsLen: 2,
},
{
title: "group replica strategy; 1st store is fast, 2nd store is slow;",
storeAPIs: []Client{
&storetestutil.TestClient{
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}),
},
},
ExtLset: []labels.Labels{labels.FromStrings("ext", "1")},
MinTime: 1,
MaxTime: 300,
GroupKeyStr: "group1",
ReplicaKeyStr: "replica1",
},
&storetestutil.TestClient{
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{1, 1}, {2, 2}, {3, 3}}),
},
RespDuration: 10 * time.Second,
},
ExtLset: []labels.Labels{labels.FromStrings("ext", "1")},
MinTime: 1,
MaxTime: 300,
GroupKeyStr: "group1",
ReplicaKeyStr: "replica1",
},
},
req: &storepb.SeriesRequest{
MinTime: 1,
MaxTime: 300,
Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}},
PartialResponseStrategy: storepb.PartialResponseStrategy_GROUP_REPLICA,
},
expectedSeries: []rawSeries{
{
lset: labels.FromStrings("a", "b"),
chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}},
},
},
expectedWarningsLen: 2,
},
} {
if ok := t.Run(tc.title, func(t *testing.T) {
for _, strategy := range []RetrievalStrategy{EagerRetrieval, LazyRetrieval} {
Expand Down Expand Up @@ -1632,6 +1675,165 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
time.Sleep(5 * time.Second)
}

// TestProxyStore_SeriesQuorum verifies the quorum-based short circuit for
// GROUP_REPLICA Series requests: once all but one store (quorum = n-1) have
// finished streaming, the remaining slow store's stream is canceled, the
// query returns promptly, and a single warning describing the cancellation
// is attached to the response.
func TestProxyStore_SeriesQuorum(t *testing.T) {
	t.Parallel()

	// These tests rely on real wall-clock delays and are opt-in, mirroring
	// the other store-read-timeout tests in this file.
	enable := os.Getenv("THANOS_ENABLE_STORE_READ_TIMEOUT_TESTS")
	if enable == "" {
		t.Skip("enable THANOS_ENABLE_STORE_READ_TIMEOUT_TESTS to run store-read-timeout tests")
	}

	for _, tc := range []struct {
		title          string
		storeAPIs      []Client
		selectorLabels labels.Labels

		req *storepb.SeriesRequest

		expectedSeries  []rawSeries
		expectedErr     error
		expectedWarning string
	}{
		{
			// Slow store stalls on its first (only) frame; the fast store
			// alone satisfies the quorum of 1 and the slow stream is canceled.
			title: "group replica strategy; 1st store is fast, 2nd store is slow and got canceled;",
			storeAPIs: []Client{
				&storetestutil.TestClient{
					Name: "fast_store",
					StoreClient: &mockedStoreAPI{
						RespSeries: []*storepb.SeriesResponse{
							storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}),
						},
					},
					ExtLset:       []labels.Labels{labels.FromStrings("ext", "1")},
					MinTime:       1,
					MaxTime:       300,
					GroupKeyStr:   "group1",
					ReplicaKeyStr: "replica1",
				},
				&storetestutil.TestClient{
					Name: "slow_store",
					StoreClient: &mockedStoreAPI{
						RespSeries: []*storepb.SeriesResponse{
							storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}),
						},
						RespDuration: 2 * time.Second, // greater than frame timeout
					},
					ExtLset:       []labels.Labels{labels.FromStrings("ext", "1")},
					MinTime:       1,
					MaxTime:       300,
					GroupKeyStr:   "group1",
					ReplicaKeyStr: "replica1",
				},
			},
			req: &storepb.SeriesRequest{
				MinTime:                 1,
				MaxTime:                 300,
				Matchers:                []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}},
				PartialResponseStrategy: storepb.PartialResponseStrategy_GROUP_REPLICA,
			},
			expectedSeries: []rawSeries{
				{
					lset:   labels.FromStrings("a", "b"),
					chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}},
				},
			},
			expectedWarning: "quorum reached in group all-store-apis and slow_store got canceled or failed: context canceled",
		},
		{
			// Slow store delivers its first frames fine but stalls mid-stream
			// (SlowSeriesIndex: 2); cancellation must also work after partial data.
			title: "group replica strategy; 1st store is fast, 2nd store is slow on second series and got canceled;",
			storeAPIs: []Client{
				&storetestutil.TestClient{
					Name: "store1",
					StoreClient: &mockedStoreAPI{
						RespSeries: []*storepb.SeriesResponse{
							storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}),
							storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{3, 1}, {4, 2}, {5, 3}}),
							storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{6, 1}, {7, 2}, {8, 3}})},
					},
					ExtLset:       []labels.Labels{labels.FromStrings("ext", "1")},
					MinTime:       1,
					MaxTime:       300,
					GroupKeyStr:   "group1",
					ReplicaKeyStr: "replica1",
				},
				&storetestutil.TestClient{
					Name: "store2",
					StoreClient: &mockedStoreAPI{
						RespSeries: []*storepb.SeriesResponse{
							storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}),
							storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{3, 1}, {4, 2}, {5, 3}}),
							storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{6, 1}, {7, 2}, {8, 3}})},
						RespDuration:    2 * time.Second, // greater than frame timeout
						SlowSeriesIndex: 2,
					},
					ExtLset:       []labels.Labels{labels.FromStrings("ext", "1")},
					MinTime:       1,
					MaxTime:       300,
					GroupKeyStr:   "group1",
					ReplicaKeyStr: "replica1",
				},
			},
			req: &storepb.SeriesRequest{
				MinTime:                 1,
				MaxTime:                 300,
				Matchers:                []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}},
				PartialResponseStrategy: storepb.PartialResponseStrategy_GROUP_REPLICA,
			},
			expectedSeries: []rawSeries{
				{
					lset:   labels.FromStrings("a", "b"),
					chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}, {{3, 1}, {4, 2}, {5, 3}}, {{6, 1}, {7, 2}, {8, 3}}},
				},
			},
			expectedWarning: "quorum reached in group all-store-apis and store2 got canceled or failed: context canceled",
		},
	} {
		if ok := t.Run(tc.title, func(t *testing.T) {
			// Each case must hold for both retrieval strategies, since the
			// quorum hook is wired into both lazy and eager response sets.
			for _, strategy := range []RetrievalStrategy{EagerRetrieval, LazyRetrieval} {
				if ok := t.Run(string(strategy), func(t *testing.T) {
					q := NewProxyStore(nil,
						nil,
						func() []Client { return tc.storeAPIs },
						component.Query,
						tc.selectorLabels,
						1*time.Second, strategy,
					)

					ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
					defer cancel()
					s := newStoreSeriesServer(ctx)

					// Time the request: the short circuit should make it finish
					// well before the slow store's 2s delay.
					t0 := time.Now()
					err := q.Series(tc.req, s)
					elapsedTime := time.Since(t0)
					if tc.expectedErr != nil {
						testutil.NotOk(t, err)
						testutil.Equals(t, tc.expectedErr.Error(), err.Error())
						return
					}

					testutil.Ok(t, err)

					seriesEquals(t, tc.expectedSeries, s.SeriesSet)
					// Exactly one warning is expected: the canceled slow store.
					testutil.Equals(t, 1, len(s.Warnings), "got %v", s.Warnings)
					testutil.Equals(t, tc.expectedWarning, s.Warnings[0])

					testutil.Assert(t, elapsedTime < 500*time.Millisecond, fmt.Sprintf("Request has taken %d ms, expected: < %d ms, it seems that quorum-based query short circuit doesn't work properly.", elapsedTime.Milliseconds(), 500))

				}); !ok {
					return
				}
			}
		}); !ok {
			return
		}
	}

	// Wait until the last goroutine exits which is stuck on time.Sleep().
	// Otherwise, goleak complains.
	time.Sleep(5 * time.Second)
}

func TestProxyStore_Series_RequestParamsProxied(t *testing.T) {
t.Parallel()

Expand Down
Loading