Skip to content

Commit

Permalink
Lint + fix tests
Browse files Browse the repository at this point in the history
Signed-off-by: Justin Jung <[email protected]>
  • Loading branch information
justinjung04 committed Jan 23, 2025
1 parent c167f4a commit b13ae90
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 61 deletions.
8 changes: 4 additions & 4 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1228,8 +1228,8 @@ func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, t
}

// LabelValuesForLabelName returns all the label values that are associated with a given label name.
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, label model.LabelName, hint *storage.LabelHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelValuesForLabelNameCommon(ctx, from, to, label, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, hint *storage.LabelHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, partialDataEnabled, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
resp, err := client.LabelValues(ctx, req)
if err != nil {
Expand All @@ -1241,8 +1241,8 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to mode
}

// LabelValuesForLabelNameStream returns all the label values that are associated with a given label name.
func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, to model.Time, label model.LabelName, hint *storage.LabelHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelValuesForLabelNameCommon(ctx, from, to, label, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, to model.Time, labelName model.LabelName, hint *storage.LabelHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, partialDataEnabled, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
stream, err := client.LabelValuesStream(ctx, req)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri

if partialdata.IsPartialDataError(err) {
level.Info(d.log).Log("msg", "returning partial data")
return resp, partialdata.Error{}
return resp, err
}

return resp, nil
Expand Down
14 changes: 5 additions & 9 deletions pkg/querier/distributor_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (q *distributorQuerier) LabelValues(ctx context.Context, name string, hints

func (q *distributorQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
if len(matchers) > 0 && !q.labelNamesMatchers {
return q.labelNamesWithMatchers(ctx, hints, matchers...)
return q.labelNamesWithMatchers(ctx, hints, q.isPartialDataEnabled(ctx), matchers...)
}

log, ctx := spanlogger.New(ctx, "distributorQuerier.LabelNames")
Expand All @@ -227,7 +227,7 @@ func (q *distributorQuerier) LabelNames(ctx context.Context, hints *storage.Labe
}

// labelNamesWithMatchers performs the LabelNames call by calling ingester's MetricsForLabelMatchers method
func (q *distributorQuerier) labelNamesWithMatchers(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
func (q *distributorQuerier) labelNamesWithMatchers(ctx context.Context, hints *storage.LabelHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
log, ctx := spanlogger.New(ctx, "distributorQuerier.labelNamesWithMatchers")
defer log.Span.Finish()

Expand All @@ -237,9 +237,9 @@ func (q *distributorQuerier) labelNamesWithMatchers(ctx context.Context, hints *
)

if q.streamingMetadata {
ms, err = q.distributor.MetricsForLabelMatchersStream(ctx, model.Time(q.mint), model.Time(q.maxt), labelHintsToSelectHints(hints), false, matchers...)
ms, err = q.distributor.MetricsForLabelMatchersStream(ctx, model.Time(q.mint), model.Time(q.maxt), labelHintsToSelectHints(hints), partialDataEnabled, matchers...)
} else {
ms, err = q.distributor.MetricsForLabelMatchers(ctx, model.Time(q.mint), model.Time(q.maxt), labelHintsToSelectHints(hints), false, matchers...)
ms, err = q.distributor.MetricsForLabelMatchers(ctx, model.Time(q.mint), model.Time(q.maxt), labelHintsToSelectHints(hints), partialDataEnabled, matchers...)
}

if err != nil {
Expand Down Expand Up @@ -267,16 +267,12 @@ func (q *distributorQuerier) Close() error {
}

func (q *distributorQuerier) isPartialDataEnabled(ctx context.Context) bool {
if q.limits != nil {
return false
}

userID, err := tenant.TenantID(ctx)
if err != nil {
return false
}

return q.limits.QueryPartialData(userID)
return q.limits != nil && q.limits.QueryPartialData(userID)
}

type distributorExemplarQueryable struct {
Expand Down
21 changes: 8 additions & 13 deletions pkg/querier/distributor_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T)
distributor.On("MetricsForLabelMatchersStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]model.Metric{}, nil)

ctx := user.InjectOrgID(context.Background(), "test")
queryable := newDistributorQueryable(distributor, streamingMetadataEnabled, true, nil, testData.queryIngestersWithin, queryPartialDataDisabledFn)
queryable := newDistributorQueryable(distributor, streamingMetadataEnabled, true, nil, testData.queryIngestersWithin, nil)
querier, err := queryable.Querier(testData.queryMinT, testData.queryMaxT)
require.NoError(t, err)

Expand Down Expand Up @@ -129,7 +129,7 @@ func TestDistributorQueryableFilter(t *testing.T) {
t.Parallel()

d := &MockDistributor{}
dq := newDistributorQueryable(d, false, true, nil, 1*time.Hour, queryPartialDataDisabledFn)
dq := newDistributorQueryable(d, false, true, nil, 1*time.Hour, nil)

now := time.Now()

Expand Down Expand Up @@ -174,13 +174,16 @@ func TestIngesterStreaming(t *testing.T) {
d.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(queryResponse, nil)

ctx := user.InjectOrgID(context.Background(), "0")
partialDataFn := queryPartialDataDisabledFn
if partialDataEnabled {
partialDataFn = queryPartialDataEnabledFn
d = &MockDistributor{}
d.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(queryResponse, partialdata.Error{})
}
queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, 0, partialDataFn)

limits := validation.Limits{QueryPartialData: partialDataEnabled}
overrides, err := validation.NewOverrides(limits, nil)
require.NoError(t, err)

queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, 0, overrides)
querier, err := queryable.Querier(mint, maxt)
require.NoError(t, err)

Expand Down Expand Up @@ -254,11 +257,3 @@ func TestDistributorQuerier_LabelNames(t *testing.T) {
}
}
}

func queryPartialDataEnabledFn(string) bool {
return true
}

func queryPartialDataDisabledFn(string) bool {
return false
}
4 changes: 0 additions & 4 deletions pkg/querier/partialdata/partia_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@ func (e Error) Error() string {
return ErrorMsg
}

func ReturnPartialData(err error, isEnabled bool) bool {
return isEnabled && errors.As(err, &Error{})
}

func IsPartialDataError(err error) bool {
return errors.As(err, &Error{})
}
6 changes: 2 additions & 4 deletions pkg/querier/partialdata/partial_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
)

func TestPartialData_ReturnPartialData(t *testing.T) {
assert.False(t, ReturnPartialData(fmt.Errorf(""), false))
assert.False(t, ReturnPartialData(fmt.Errorf(""), true))
assert.False(t, ReturnPartialData(Error{}, false))
assert.True(t, ReturnPartialData(Error{}, true))
assert.False(t, IsPartialDataError(fmt.Errorf("")))
assert.True(t, IsPartialDataError(Error{}))
}
27 changes: 1 addition & 26 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ func TestLimits(t *testing.T) {
response: &streamResponse,
}

distributorQueryableStreaming := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, queryPartialDataDisabledFn)
distributorQueryableStreaming := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, nil)

tCases := []struct {
name string
Expand Down Expand Up @@ -1702,28 +1702,3 @@ func (m *mockQueryableWithFilter) UseQueryable(_ time.Time, _, _ int64) bool {
m.useQueryableCalled = true
return true
}

type mockChunkStore struct {
chunks []chunk.Chunk
}

func (m mockChunkStore) Get() ([]chunk.Chunk, error) {
return m.chunks, nil
}

func makeMockChunkStore(t require.TestingT, numChunks int, enc promchunk.Encoding) (mockChunkStore, model.Time) {
chks, from := makeMockChunks(t, numChunks, enc, 0)
return mockChunkStore{chks}, from
}

func makeMockChunks(t require.TestingT, numChunks int, enc promchunk.Encoding, from model.Time, additionalLabels ...labels.Label) ([]chunk.Chunk, model.Time) {
var (
chunks = make([]chunk.Chunk, 0, numChunks)
)
for i := 0; i < numChunks; i++ {
c := util.GenerateChunk(t, sampleRate, from, int(samplesPerChunk), enc, additionalLabels...)
chunks = append(chunks, c)
from = from.Add(chunkOffset)
}
return chunks, from
}

0 comments on commit b13ae90

Please sign in to comment.