Skip to content

Commit

Permalink
Receiver: cache matchers for series calls (#7353)
Browse files Browse the repository at this point in the history
* Receiver: cache matchers for series calls

We have tried caching matchers before with a time-based expiration cache, this time we are trying with LRU cache.

We saw some of our receivers busy with compiling regexes and with high CPU usage, similar to the profile of the benchmark I added here:

* Adding matcher cache for method `MatchersToPromMatchers` and a new version which uses the cache.
* The main change is in `matchesExternalLabels` function which now receives a cache instance.

adding matcher cache and refactor matchers

Co-authored-by: Andre Branchizio <[email protected]>

Signed-off-by: Pedro Tanaka <[email protected]>

Using the cache in proxy and tsdb stores (only receiver)

Signed-off-by: Pedro Tanaka <[email protected]>

fixing problem with deep equality

Signed-off-by: Pedro Tanaka <[email protected]>

adding some docs

Signed-off-by: Pedro Tanaka <[email protected]>

Adding benchmark

Signed-off-by: Pedro Tanaka <[email protected]>

undo unecessary changes

Signed-off-by: Pedro Tanaka <[email protected]>

Adjusting metric names

Signed-off-by: Pedro Tanaka <[email protected]>

adding changelog

Signed-off-by: Pedro Tanaka <[email protected]>

wiring changes to the receiver

Signed-off-by: Pedro Tanaka <[email protected]>

Fixing linting

Signed-off-by: Pedro Tanaka <[email protected]>

docs

Signed-off-by: Pedro Tanaka <[email protected]>

* using singleflight to get or set items

Signed-off-by: Pedro Tanaka <[email protected]>

* improve metrics

Signed-off-by: Pedro Tanaka <[email protected]>

* Introduce interface for matchers cache

Signed-off-by: Pedro Tanaka <[email protected]>

* fixing unit test

Signed-off-by: Pedro Tanaka <[email protected]>

* adding changelog

Signed-off-by: Pedro Tanaka <[email protected]>

* fixing benchmark

Signed-off-by: Pedro Tanaka <[email protected]>

* moving matcher cache to storecache package

Signed-off-by: Pedro Tanaka <[email protected]>

* Trying to make the cache more reusable introducing interface

Signed-off-by: Pedro Tanaka <[email protected]>

Fixing problem with wrong initialization

Signed-off-by: Pedro Tanaka <[email protected]>

Moving interface to storecache package

Signed-off-by: Pedro Tanaka <[email protected]>

remove empty file and fix calls to constructor passing nil;

Signed-off-by: Pedro Tanaka <[email protected]>

* Fix false entry on change log

Signed-off-by: Pedro Tanaka <[email protected]>

* Removing default value for registry and rename test file

Signed-off-by: Pedro Tanaka <[email protected]>

* Using fmt.Errf()

Signed-off-by: Pedro Tanaka <[email protected]>

* Remove method that is not on interface anymore

Signed-off-by: Pedro Tanaka <[email protected]>

* Remove duplicate get call

Signed-off-by: Pedro Tanaka <[email protected]>

---------

Signed-off-by: Pedro Tanaka <[email protected]>
Signed-off-by: Pedro Tanaka <[email protected]>
  • Loading branch information
pedro-stanaka authored Jan 3, 2025
1 parent ca40906 commit 626d0e5
Show file tree
Hide file tree
Showing 17 changed files with 478 additions and 85 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7907](https://github.com/thanos-io/thanos/pull/7907) Receive: Add `--receive.grpc-service-config` flag to configure gRPC service config for the receivers.
- [#7961](https://github.com/thanos-io/thanos/pull/7961) Store Gateway: Add `--store.posting-group-max-keys` flag to mark posting group as lazy if it exceeds number of keys limit. Added `thanos_bucket_store_lazy_expanded_posting_groups_total` for total number of lazy posting groups and corresponding reasons.
- [#8000](https://github.com/thanos-io/thanos/pull/8000) Query: Bump promql-engine, pass partial response through options
- [#7353](https://github.com/thanos-io/thanos/pull/7353) Receiver: introduce optional cache for matchers in series calls.

### Changed

Expand Down
20 changes: 17 additions & 3 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@ import (
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/wlog"
"google.golang.org/grpc"
"gopkg.in/yaml.v2"

"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/client"
objstoretracing "github.com/thanos-io/objstore/tracing/opentracing"
"google.golang.org/grpc"
"gopkg.in/yaml.v2"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
Expand All @@ -50,6 +49,7 @@ import (
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/store"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/pkg/tls"
Expand Down Expand Up @@ -225,6 +225,15 @@ func runReceive(
return errors.Wrap(err, "parse relabel configuration")
}

var cache = storecache.NewNoopMatcherCache()
if conf.matcherCacheSize > 0 {
cache, err = storecache.NewMatchersCache(storecache.WithSize(conf.matcherCacheSize), storecache.WithPromRegistry(reg))
if err != nil {
return errors.Wrap(err, "failed to create matchers cache")
}
multiTSDBOptions = append(multiTSDBOptions, receive.WithMatchersCache(cache))
}

dbs := receive.NewMultiTSDB(
conf.dataDir,
logger,
Expand Down Expand Up @@ -345,6 +354,7 @@ func runReceive(

options := []store.ProxyStoreOption{
store.WithProxyStoreDebugLogging(debugLogging),
store.WithMatcherCache(cache),
store.WithoutDedup(),
}

Expand Down Expand Up @@ -893,6 +903,8 @@ type receiveConfig struct {

asyncForwardWorkerCount uint

matcherCacheSize int

featureList *[]string

headExpandedPostingsCacheSize uint64
Expand Down Expand Up @@ -1046,6 +1058,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
"about order.").
Default("false").Hidden().BoolVar(&rc.allowOutOfOrderUpload)

cmd.Flag("matcher-cache-size", "The size of the cache used for matching against external labels. Using 0 disables caching.").Default("0").IntVar(&rc.matcherCacheSize)

rc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd)

rc.writeLimitsConfig = extflag.RegisterPathOrContent(cmd, "receive.limits-config", "YAML file that contains limit configuration.", extflag.WithEnvSubstitution(), extflag.WithHidden())
Expand Down
2 changes: 2 additions & 0 deletions docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,8 @@ Flags:
--log.format=logfmt Log format to use. Possible options: logfmt or
json.
--log.level=info Log filtering level.
--matcher-cache-size=0 The size of the cache used for matching against
external labels. Using 0 disables caching.
--objstore.config=<content>
Alternative to 'objstore.config-file'
flag (mutually exclusive). Content of
Expand Down
6 changes: 5 additions & 1 deletion pkg/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ import (
"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/prometheus/prometheus/storage"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/store"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/storepb"
storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil"
"github.com/thanos-io/thanos/pkg/testutil/custom"
Expand Down Expand Up @@ -54,6 +56,8 @@ func TestQuerier_Proxy(t *testing.T) {
files, err := filepath.Glob("testdata/promql/**/*.test")
testutil.Ok(t, err)
testutil.Equals(t, 10, len(files), "%v", files)
cache, err := storecache.NewMatchersCache()
testutil.Ok(t, err)

logger := log.NewLogfmtLogger(os.Stderr)
t.Run("proxy", func(t *testing.T) {
Expand All @@ -62,7 +66,7 @@ func TestQuerier_Proxy(t *testing.T) {
logger,
nil,
store.NewProxyStore(logger, nil, func() []store.Client { return sc.get() },
component.Debug, nil, 5*time.Minute, store.EagerRetrieval),
component.Debug, nil, 5*time.Minute, store.EagerRetrieval, store.WithMatcherCache(cache)),
1000000,
5*time.Minute,
)
Expand Down
3 changes: 1 addition & 2 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package receive
import (
"bytes"
"context"
goerrors "errors"
"fmt"
"io"
"math"
Expand All @@ -24,8 +25,6 @@ import (

"gopkg.in/yaml.v3"

goerrors "errors"

"github.com/alecthomas/units"
"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
Expand Down
23 changes: 17 additions & 6 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@ import (
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"

"github.com/thanos-io/objstore"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"

"github.com/thanos-io/thanos/pkg/api/status"
"github.com/thanos-io/thanos/pkg/block/metadata"
Expand All @@ -39,6 +37,7 @@ import (
"github.com/thanos-io/thanos/pkg/receive/expandedpostingscache"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
)
Expand All @@ -64,6 +63,8 @@ type MultiTSDB struct {
hashFunc metadata.HashFunc
hashringConfigs []HashringConfig

matcherCache storecache.MatchersCache

tsdbClients []store.Client
exemplarClients map[string]*exemplars.TSDB

Expand Down Expand Up @@ -95,6 +96,12 @@ func WithBlockExpandedPostingsCacheSize(size uint64) MultiTSDBOption {
}
}

func WithMatchersCache(cache storecache.MatchersCache) MultiTSDBOption {
return func(s *MultiTSDB) {
s.matcherCache = cache
}
}

// NewMultiTSDB creates new MultiTSDB.
// NOTE: Passed labels must be sorted lexicographically (alphabetically).
func NewMultiTSDB(
Expand Down Expand Up @@ -127,6 +134,7 @@ func NewMultiTSDB(
bucket: bucket,
allowOutOfOrderUpload: allowOutOfOrderUpload,
hashFunc: hashFunc,
matcherCache: storecache.NewNoopMatcherCache(),
}

for _, option := range options {
Expand Down Expand Up @@ -755,10 +763,13 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
shipper.DefaultMetaFilename,
)
}
options := []store.TSDBStoreOption{}
var options []store.TSDBStoreOption
if t.metricNameFilterEnabled {
options = append(options, store.WithCuckooMetricNameStoreFilter())
}
if t.matcherCache != nil {
options = append(options, store.WithMatcherCacheInstance(t.matcherCache))
}
tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, options...), s, ship, exemplars.NewTSDB(s, lset))
t.addTenantLocked(tenantID, tenant) // need to update the client list once store is ready & client != nil
level.Info(logger).Log("msg", "TSDB is now ready")
Expand Down
42 changes: 14 additions & 28 deletions pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,14 @@ func TestMultiTSDB(t *testing.T) {

logger := log.NewLogfmtLogger(os.Stderr)
t.Run("run fresh", func(t *testing.T) {
m := NewMultiTSDB(
dir, logger, prometheus.NewRegistry(), &tsdb.Options{
MinBlockDuration: (2 * time.Hour).Milliseconds(),
MaxBlockDuration: (2 * time.Hour).Milliseconds(),
RetentionDuration: (6 * time.Hour).Milliseconds(),
NoLockfile: true,
MaxExemplars: 100,
EnableExemplarStorage: true,
},
labels.FromStrings("replica", "01"),
"tenant_id",
nil,
false,
metadata.NoneFunc,
)
m := NewMultiTSDB(dir, logger, prometheus.NewRegistry(), &tsdb.Options{
MinBlockDuration: (2 * time.Hour).Milliseconds(),
MaxBlockDuration: (2 * time.Hour).Milliseconds(),
RetentionDuration: (6 * time.Hour).Milliseconds(),
NoLockfile: true,
MaxExemplars: 100,
EnableExemplarStorage: true,
}, labels.FromStrings("replica", "01"), "tenant_id", nil, false, metadata.NoneFunc)
defer func() { testutil.Ok(t, m.Close()) }()

testutil.Ok(t, m.Flush())
Expand Down Expand Up @@ -175,19 +168,12 @@ func TestMultiTSDB(t *testing.T) {

t.Run("flush with one sample produces a block", func(t *testing.T) {
const testTenant = "test_tenant"
m := NewMultiTSDB(
dir, logger, prometheus.NewRegistry(), &tsdb.Options{
MinBlockDuration: (2 * time.Hour).Milliseconds(),
MaxBlockDuration: (2 * time.Hour).Milliseconds(),
RetentionDuration: (6 * time.Hour).Milliseconds(),
NoLockfile: true,
},
labels.FromStrings("replica", "01"),
"tenant_id",
nil,
false,
metadata.NoneFunc,
)
m := NewMultiTSDB(dir, logger, prometheus.NewRegistry(), &tsdb.Options{
MinBlockDuration: (2 * time.Hour).Milliseconds(),
MaxBlockDuration: (2 * time.Hour).Milliseconds(),
RetentionDuration: (6 * time.Hour).Milliseconds(),
NoLockfile: true,
}, labels.FromStrings("replica", "01"), "tenant_id", nil, false, metadata.NoneFunc)
defer func() { testutil.Ok(t, m.Close()) }()

testutil.Ok(t, m.Flush())
Expand Down
5 changes: 2 additions & 3 deletions pkg/receive/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@ import (
"time"

"github.com/go-kit/log"
"github.com/stretchr/testify/require"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb"

"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
Expand Down
3 changes: 1 addition & 2 deletions pkg/receive/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"testing"
"time"

"github.com/thanos-io/thanos/pkg/receive/writecapnp"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/pkg/errors"
Expand All @@ -24,6 +22,7 @@ import (
"github.com/prometheus/prometheus/tsdb/tsdbutil"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/receive/writecapnp"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
Expand Down
Loading

0 comments on commit 626d0e5

Please sign in to comment.