From 626d0e5bfbbff6f40a01f67ec7063e922096282c Mon Sep 17 00:00:00 2001 From: Pedro Tanaka Date: Fri, 3 Jan 2025 18:59:24 +0100 Subject: [PATCH] Receiver: cache matchers for series calls (#7353) * Receiver: cache matchers for series calls We have tried caching matchers before with a time-based expiration cache, this time we are trying with LRU cache. We saw some of our receivers busy with compiling regexes and with high CPU usage, similar to the profile of the benchmark I added here: * Adding matcher cache for method `MatchersToPromMatchers` and a new version which uses the cache. * The main change is in `matchesExternalLabels` function which now receives a cache instance. adding matcher cache and refactor matchers Co-authored-by: Andre Branchizio Signed-off-by: Pedro Tanaka Using the cache in proxy and tsdb stores (only receiver) Signed-off-by: Pedro Tanaka fixing problem with deep equality Signed-off-by: Pedro Tanaka adding some docs Signed-off-by: Pedro Tanaka Adding benchmark Signed-off-by: Pedro Tanaka undo unecessary changes Signed-off-by: Pedro Tanaka Adjusting metric names Signed-off-by: Pedro Tanaka adding changelog Signed-off-by: Pedro Tanaka wiring changes to the receiver Signed-off-by: Pedro Tanaka Fixing linting Signed-off-by: Pedro Tanaka docs Signed-off-by: Pedro Tanaka * using singleflight to get or set items Signed-off-by: Pedro Tanaka * improve metrics Signed-off-by: Pedro Tanaka * Introduce interface for matchers cache Signed-off-by: Pedro Tanaka * fixing unit test Signed-off-by: Pedro Tanaka * adding changelog Signed-off-by: Pedro Tanaka * fixing benchmark Signed-off-by: Pedro Tanaka * moving matcher cache to storecache package Signed-off-by: Pedro Tanaka * Trying to make the cache more reusable introducing interface Signed-off-by: Pedro Tanaka Fixing problem with wrong initialization Signed-off-by: Pedro Tanaka Moving interface to storecache package Signed-off-by: Pedro Tanaka remove empty file and fix calls to constructor passing nil; Signed-off-by: Pedro Tanaka * Fix false entry on change log Signed-off-by: Pedro Tanaka * Removing default value for registry and rename test file Signed-off-by: Pedro Tanaka * Using fmt.Errf() Signed-off-by: Pedro Tanaka * Remove method that is not on interface anymore Signed-off-by: Pedro Tanaka * Remove duplicate get call Signed-off-by: Pedro Tanaka --------- Signed-off-by: Pedro Tanaka Signed-off-by: Pedro Tanaka --- CHANGELOG.md | 1 + cmd/thanos/receive.go | 20 ++- docs/components/receive.md | 2 + pkg/query/query_test.go | 6 +- pkg/receive/handler_test.go | 3 +- pkg/receive/multitsdb.go | 23 ++- pkg/receive/multitsdb_test.go | 42 ++---- pkg/receive/receive_test.go | 5 +- pkg/receive/writer_test.go | 3 +- pkg/store/cache/matchers_cache.go | 186 +++++++++++++++++++++++++ pkg/store/cache/matchers_cache_test.go | 112 +++++++++++++++ pkg/store/local.go | 3 +- pkg/store/prometheus.go | 16 ++- pkg/store/proxy.go | 18 ++- pkg/store/proxy_test.go | 55 +++++++- pkg/store/storepb/custom.go | 47 ++++--- pkg/store/tsdb.go | 21 ++- 17 files changed, 478 insertions(+), 85 deletions(-) create mode 100644 pkg/store/cache/matchers_cache.go create mode 100644 pkg/store/cache/matchers_cache_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index cb965c19fd..a509e3c149 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#7907](https://github.com/thanos-io/thanos/pull/7907) Receive: Add `--receive.grpc-service-config` flag to configure gRPC service config for the receivers. - [#7961](https://github.com/thanos-io/thanos/pull/7961) Store Gateway: Add `--store.posting-group-max-keys` flag to mark posting group as lazy if it exceeds number of keys limit. Added `thanos_bucket_store_lazy_expanded_posting_groups_total` for total number of lazy posting groups and corresponding reasons. - [#8000](https://github.com/thanos-io/thanos/pull/8000) Query: Bump promql-engine, pass partial response through options +- [#7353](https://github.com/thanos-io/thanos/pull/7353) Receiver: introduce optional cache for matchers in series calls. ### Changed diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 0372e83690..e3724d65fe 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -27,12 +27,11 @@ import ( "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/wlog" - "google.golang.org/grpc" - "gopkg.in/yaml.v2" - "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/client" objstoretracing "github.com/thanos-io/objstore/tracing/opentracing" + "google.golang.org/grpc" + "gopkg.in/yaml.v2" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" @@ -50,6 +49,7 @@ import ( grpcserver "github.com/thanos-io/thanos/pkg/server/grpc" httpserver "github.com/thanos-io/thanos/pkg/server/http" "github.com/thanos-io/thanos/pkg/store" + storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/tls" @@ -225,6 +225,15 @@ func runReceive( return errors.Wrap(err, "parse relabel configuration") } + var cache = storecache.NewNoopMatcherCache() + if conf.matcherCacheSize > 0 { + cache, err = storecache.NewMatchersCache(storecache.WithSize(conf.matcherCacheSize), storecache.WithPromRegistry(reg)) + if err != nil { + return errors.Wrap(err, "failed to create matchers cache") + } + multiTSDBOptions = append(multiTSDBOptions, receive.WithMatchersCache(cache)) + } + dbs := receive.NewMultiTSDB( conf.dataDir, logger, @@ -345,6 +354,7 @@ func runReceive( options := []store.ProxyStoreOption{ store.WithProxyStoreDebugLogging(debugLogging), + store.WithMatcherCache(cache), store.WithoutDedup(), } @@ -893,6 +903,8 @@ type receiveConfig struct { asyncForwardWorkerCount uint + matcherCacheSize int + featureList *[]string headExpandedPostingsCacheSize uint64 @@ -1046,6 +1058,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { "about order."). Default("false").Hidden().BoolVar(&rc.allowOutOfOrderUpload) + cmd.Flag("matcher-cache-size", "The size of the cache used for matching against external labels. Using 0 disables caching.").Default("0").IntVar(&rc.matcherCacheSize) + rc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd) rc.writeLimitsConfig = extflag.RegisterPathOrContent(cmd, "receive.limits-config", "YAML file that contains limit configuration.", extflag.WithEnvSubstitution(), extflag.WithHidden()) diff --git a/docs/components/receive.md b/docs/components/receive.md index fc0a64d98b..536b2d2cdb 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -407,6 +407,8 @@ Flags: --log.format=logfmt Log format to use. Possible options: logfmt or json. --log.level=info Log filtering level. + --matcher-cache-size=0 The size of the cache used for matching against + external labels. Using 0 disables caching. --objstore.config= Alternative to 'objstore.config-file' flag (mutually exclusive). Content of diff --git a/pkg/query/query_test.go b/pkg/query/query_test.go index 8d1df8593c..cc19f43ab6 100644 --- a/pkg/query/query_test.go +++ b/pkg/query/query_test.go @@ -14,8 +14,10 @@ import ( "github.com/efficientgo/core/testutil" "github.com/go-kit/log" "github.com/prometheus/prometheus/storage" + "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/store" + storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/storepb" storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil" "github.com/thanos-io/thanos/pkg/testutil/custom" @@ -54,6 +56,8 @@ func TestQuerier_Proxy(t *testing.T) { files, err := filepath.Glob("testdata/promql/**/*.test") testutil.Ok(t, err) testutil.Equals(t, 10, len(files), "%v", files) + cache, err := storecache.NewMatchersCache() + testutil.Ok(t, err) logger := log.NewLogfmtLogger(os.Stderr) t.Run("proxy", func(t *testing.T) { @@ -62,7 +66,7 @@ func TestQuerier_Proxy(t *testing.T) { logger, nil, store.NewProxyStore(logger, nil, func() []store.Client { return sc.get() }, - component.Debug, nil, 5*time.Minute, store.EagerRetrieval), + component.Debug, nil, 5*time.Minute, store.EagerRetrieval, store.WithMatcherCache(cache)), 1000000, 5*time.Minute, ) diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index f3a58e128f..5711c67be8 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -6,6 +6,7 @@ package receive import ( "bytes" "context" + goerrors "errors" "fmt" "io" "math" @@ -24,8 +25,6 @@ import ( "gopkg.in/yaml.v3" - goerrors "errors" - "github.com/alecthomas/units" "github.com/efficientgo/core/testutil" "github.com/go-kit/log" diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 2cfa967a2a..96e4e43345 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -18,16 +18,14 @@ import ( "github.com/go-kit/log/level" "github.com/oklog/ulid" "github.com/pkg/errors" - "go.uber.org/atomic" - "golang.org/x/sync/errgroup" - "google.golang.org/grpc" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" - "github.com/thanos-io/objstore" + "go.uber.org/atomic" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" "github.com/thanos-io/thanos/pkg/api/status" "github.com/thanos-io/thanos/pkg/block/metadata" @@ -39,6 +37,7 @@ import ( "github.com/thanos-io/thanos/pkg/receive/expandedpostingscache" "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" + storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" ) @@ -64,6 +63,8 @@ type MultiTSDB struct { hashFunc metadata.HashFunc hashringConfigs []HashringConfig + matcherCache storecache.MatchersCache + tsdbClients []store.Client exemplarClients map[string]*exemplars.TSDB @@ -95,6 +96,12 @@ func WithBlockExpandedPostingsCacheSize(size uint64) MultiTSDBOption { } } +func WithMatchersCache(cache storecache.MatchersCache) MultiTSDBOption { + return func(s *MultiTSDB) { + s.matcherCache = cache + } +} + // NewMultiTSDB creates new MultiTSDB. // NOTE: Passed labels must be sorted lexicographically (alphabetically). func NewMultiTSDB( @@ -127,6 +134,7 @@ func NewMultiTSDB( bucket: bucket, allowOutOfOrderUpload: allowOutOfOrderUpload, hashFunc: hashFunc, + matcherCache: storecache.NewNoopMatcherCache(), } for _, option := range options { @@ -755,10 +763,13 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant shipper.DefaultMetaFilename, ) } - options := []store.TSDBStoreOption{} + var options []store.TSDBStoreOption if t.metricNameFilterEnabled { options = append(options, store.WithCuckooMetricNameStoreFilter()) } + if t.matcherCache != nil { + options = append(options, store.WithMatcherCacheInstance(t.matcherCache)) + } tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, options...), s, ship, exemplars.NewTSDB(s, lset)) t.addTenantLocked(tenantID, tenant) // need to update the client list once store is ready & client != nil level.Info(logger).Log("msg", "TSDB is now ready") diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index a36db4b402..eb1a999c94 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -46,21 +46,14 @@ func TestMultiTSDB(t *testing.T) { logger := log.NewLogfmtLogger(os.Stderr) t.Run("run fresh", func(t *testing.T) { - m := NewMultiTSDB( - dir, logger, prometheus.NewRegistry(), &tsdb.Options{ - MinBlockDuration: (2 * time.Hour).Milliseconds(), - MaxBlockDuration: (2 * time.Hour).Milliseconds(), - RetentionDuration: (6 * time.Hour).Milliseconds(), - NoLockfile: true, - MaxExemplars: 100, - EnableExemplarStorage: true, - }, - labels.FromStrings("replica", "01"), - "tenant_id", - nil, - false, - metadata.NoneFunc, - ) + m := NewMultiTSDB(dir, logger, prometheus.NewRegistry(), &tsdb.Options{ + MinBlockDuration: (2 * time.Hour).Milliseconds(), + MaxBlockDuration: (2 * time.Hour).Milliseconds(), + RetentionDuration: (6 * time.Hour).Milliseconds(), + NoLockfile: true, + MaxExemplars: 100, + EnableExemplarStorage: true, + }, labels.FromStrings("replica", "01"), "tenant_id", nil, false, metadata.NoneFunc) defer func() { testutil.Ok(t, m.Close()) }() testutil.Ok(t, m.Flush()) @@ -175,19 +168,12 @@ func TestMultiTSDB(t *testing.T) { t.Run("flush with one sample produces a block", func(t *testing.T) { const testTenant = "test_tenant" - m := NewMultiTSDB( - dir, logger, prometheus.NewRegistry(), &tsdb.Options{ - MinBlockDuration: (2 * time.Hour).Milliseconds(), - MaxBlockDuration: (2 * time.Hour).Milliseconds(), - RetentionDuration: (6 * time.Hour).Milliseconds(), - NoLockfile: true, - }, - labels.FromStrings("replica", "01"), - "tenant_id", - nil, - false, - metadata.NoneFunc, - ) + m := NewMultiTSDB(dir, logger, prometheus.NewRegistry(), &tsdb.Options{ + MinBlockDuration: (2 * time.Hour).Milliseconds(), + MaxBlockDuration: (2 * time.Hour).Milliseconds(), + RetentionDuration: (6 * time.Hour).Milliseconds(), + NoLockfile: true, + }, labels.FromStrings("replica", "01"), "tenant_id", nil, false, metadata.NoneFunc) defer func() { testutil.Ok(t, m.Close()) }() testutil.Ok(t, m.Flush()) diff --git a/pkg/receive/receive_test.go b/pkg/receive/receive_test.go index bf38cb06ed..1ab909fd5f 100644 --- a/pkg/receive/receive_test.go +++ b/pkg/receive/receive_test.go @@ -8,13 +8,12 @@ import ( "time" "github.com/go-kit/log" - "github.com/stretchr/testify/require" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb" - + "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/labelpb" diff --git a/pkg/receive/writer_test.go b/pkg/receive/writer_test.go index 2db5e6a341..434512c694 100644 --- a/pkg/receive/writer_test.go +++ b/pkg/receive/writer_test.go @@ -10,8 +10,6 @@ import ( "testing" "time" - "github.com/thanos-io/thanos/pkg/receive/writecapnp" - "github.com/efficientgo/core/testutil" "github.com/go-kit/log" "github.com/pkg/errors" @@ -24,6 +22,7 @@ import ( "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/receive/writecapnp" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" diff --git a/pkg/store/cache/matchers_cache.go b/pkg/store/cache/matchers_cache.go new file mode 100644 index 0000000000..fe4fd4c668 --- /dev/null +++ b/pkg/store/cache/matchers_cache.go @@ -0,0 +1,186 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package storecache + +import ( + "fmt" + + lru "github.com/hashicorp/golang-lru/v2" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/model/labels" + "golang.org/x/sync/singleflight" + + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/store/storepb/prompb" +) + +const DefaultCacheSize = 200 + +type NewItemFunc func(matcher ConversionLabelMatcher) (*labels.Matcher, error) + +type MatchersCache interface { + // GetOrSet retrieves a matcher from cache or creates and stores it if not present. + // If the matcher is not in cache, it uses the provided newItem function to create it. + GetOrSet(key ConversionLabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) +} + +// Ensure implementations satisfy the interface. +var ( + _ MatchersCache = (*LruMatchersCache)(nil) + _ MatchersCache = (*NoopMatcherCache)(nil) +) + +// NoopMatcherCache is a no-op implementation of MatchersCache that doesn't cache anything. +type NoopMatcherCache struct{} + +// NewNoopMatcherCache creates a new no-op matcher cache. +func NewNoopMatcherCache() MatchersCache { + return &NoopMatcherCache{} +} + +// GetOrSet implements MatchersCache by always creating a new matcher without caching. +func (n *NoopMatcherCache) GetOrSet(key ConversionLabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) { + return newItem(key) +} + +// LruMatchersCache implements MatchersCache with an LRU cache and metrics. +type LruMatchersCache struct { + reg prometheus.Registerer + cache *lru.Cache[ConversionLabelMatcher, *labels.Matcher] + metrics *matcherCacheMetrics + size int + sf singleflight.Group +} + +type MatcherCacheOption func(*LruMatchersCache) + +func WithPromRegistry(reg prometheus.Registerer) MatcherCacheOption { + return func(c *LruMatchersCache) { + c.reg = reg + } +} + +func WithSize(size int) MatcherCacheOption { + return func(c *LruMatchersCache) { + c.size = size + } +} + +func NewMatchersCache(opts ...MatcherCacheOption) (*LruMatchersCache, error) { + cache := &LruMatchersCache{ + size: DefaultCacheSize, + } + + for _, opt := range opts { + opt(cache) + } + cache.metrics = newMatcherCacheMetrics(cache.reg) + + lruCache, err := lru.NewWithEvict[ConversionLabelMatcher, *labels.Matcher](cache.size, cache.onEvict) + if err != nil { + return nil, err + } + cache.cache = lruCache + + return cache, nil +} + +func (c *LruMatchersCache) GetOrSet(key ConversionLabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) { + c.metrics.requestsTotal.Inc() + + v, err, _ := c.sf.Do(key.String(), func() (interface{}, error) { + if item, ok := c.cache.Get(key); ok { + c.metrics.hitsTotal.Inc() + return item, nil + } + + item, err := newItem(key) + if err != nil { + return nil, err + } + c.cache.Add(key, item) + c.metrics.numItems.Set(float64(c.cache.Len())) + return item, nil + }) + + if err != nil { + return nil, err + } + return v.(*labels.Matcher), nil +} + +func (c *LruMatchersCache) onEvict(_ ConversionLabelMatcher, _ *labels.Matcher) { + c.metrics.evicted.Inc() + c.metrics.numItems.Set(float64(c.cache.Len())) +} + +type matcherCacheMetrics struct { + requestsTotal prometheus.Counter + hitsTotal prometheus.Counter + numItems prometheus.Gauge + maxItems prometheus.Gauge + evicted prometheus.Counter +} + +func newMatcherCacheMetrics(reg prometheus.Registerer) *matcherCacheMetrics { + return &matcherCacheMetrics{ + requestsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_matchers_cache_requests_total", + Help: "Total number of cache requests for series matchers", + }), + hitsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_matchers_cache_hits_total", + Help: "Total number of cache hits for series matchers", + }), + numItems: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "thanos_matchers_cache_items", + Help: "Total number of cached items", + }), + maxItems: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "thanos_matchers_cache_max_items", + Help: "Maximum number of items that can be cached", + }), + evicted: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_matchers_cache_evicted_total", + Help: "Total number of items evicted from the cache", + }), + } +} + +// MatchersToPromMatchersCached returns Prometheus matchers from proto matchers. +// Works analogously to MatchersToPromMatchers but uses cache to avoid unnecessary allocations and conversions. +// NOTE: It allocates memory. +func MatchersToPromMatchersCached(cache MatchersCache, ms ...storepb.LabelMatcher) ([]*labels.Matcher, error) { + res := make([]*labels.Matcher, 0, len(ms)) + for i := range ms { + pm, err := cache.GetOrSet(&ms[i], MatcherToPromMatcher) + if err != nil { + return nil, err + } + res = append(res, pm) + } + return res, nil +} + +func MatcherToPromMatcher(m ConversionLabelMatcher) (*labels.Matcher, error) { + mi, ok := m.(*storepb.LabelMatcher) + if !ok { + return nil, fmt.Errorf("invalid matcher type. Got: %T", m) + } + + return storepb.MatcherToPromMatcher(*mi) +} + +// ConversionLabelMatcher is a common interface for the Prometheus and Thanos label matchers. +type ConversionLabelMatcher interface { + String() string + GetName() string + GetValue() string +} + +var ( + _ ConversionLabelMatcher = (*storepb.LabelMatcher)(nil) + _ ConversionLabelMatcher = (*prompb.LabelMatcher)(nil) +) diff --git a/pkg/store/cache/matchers_cache_test.go b/pkg/store/cache/matchers_cache_test.go new file mode 100644 index 0000000000..957c10d9ff --- /dev/null +++ b/pkg/store/cache/matchers_cache_test.go @@ -0,0 +1,112 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package storecache_test + +import ( + "testing" + + "github.com/efficientgo/core/testutil" + "github.com/prometheus/prometheus/model/labels" + + storecache "github.com/thanos-io/thanos/pkg/store/cache" + "github.com/thanos-io/thanos/pkg/store/storepb" +) + +func TestMatchersCache(t *testing.T) { + cache, err := storecache.NewMatchersCache(storecache.WithSize(2)) + testutil.Ok(t, err) + + matcher := &storepb.LabelMatcher{ + Type: storepb.LabelMatcher_EQ, + Name: "key", + Value: "val", + } + + matcher2 := &storepb.LabelMatcher{ + Type: storepb.LabelMatcher_RE, + Name: "key2", + Value: "val2|val3", + } + + matcher3 := &storepb.LabelMatcher{ + Type: storepb.LabelMatcher_EQ, + Name: "key3", + Value: "val3", + } + + var cacheHit bool + newItem := func(matcher storecache.ConversionLabelMatcher) (*labels.Matcher, error) { + cacheHit = false + return storecache.MatcherToPromMatcher(matcher) + } + expected := labels.MustNewMatcher(labels.MatchEqual, "key", "val") + expected2 := labels.MustNewMatcher(labels.MatchRegexp, "key2", "val2|val3") + expected3 := labels.MustNewMatcher(labels.MatchEqual, "key3", "val3") + + item, err := cache.GetOrSet(matcher, newItem) + testutil.Ok(t, err) + testutil.Equals(t, false, cacheHit) + testutil.Equals(t, expected.String(), item.String()) + + cacheHit = true + item, err = cache.GetOrSet(matcher, newItem) + testutil.Ok(t, err) + testutil.Equals(t, true, cacheHit) + testutil.Equals(t, expected.String(), item.String()) + + cacheHit = true + item, err = cache.GetOrSet(matcher2, newItem) + testutil.Ok(t, err) + testutil.Equals(t, false, cacheHit) + testutil.Equals(t, expected2.String(), item.String()) + + cacheHit = true + item, err = cache.GetOrSet(matcher2, newItem) + testutil.Ok(t, err) + testutil.Equals(t, true, cacheHit) + testutil.Equals(t, expected2.String(), item.String()) + + cacheHit = true + item, err = cache.GetOrSet(matcher, newItem) + testutil.Ok(t, err) + testutil.Equals(t, true, cacheHit) + testutil.Equals(t, expected, item) + + cacheHit = true + item, err = cache.GetOrSet(matcher3, newItem) + testutil.Ok(t, err) + testutil.Equals(t, false, cacheHit) + testutil.Equals(t, expected3, item) + + cacheHit = true + item, err = cache.GetOrSet(matcher2, newItem) + testutil.Ok(t, err) + testutil.Equals(t, false, cacheHit) + testutil.Equals(t, expected2.String(), item.String()) +} + +func BenchmarkMatchersCache(b *testing.B) { + cache, err := storecache.NewMatchersCache(storecache.WithSize(100)) + if err != nil { + b.Fatalf("failed to create cache: %v", err) + } + + matchers := []*storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "key1", Value: "val1"}, + {Type: storepb.LabelMatcher_EQ, Name: "key2", Value: "val2"}, + {Type: storepb.LabelMatcher_EQ, Name: "key3", Value: "val3"}, + {Type: storepb.LabelMatcher_EQ, Name: "key4", Value: "val4"}, + {Type: storepb.LabelMatcher_RE, Name: "key5", Value: "^(val5|val6|val7|val8|val9).*$"}, + } + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + matcher := matchers[i%len(matchers)] + _, err := cache.GetOrSet(matcher, storecache.MatcherToPromMatcher) + if err != nil { + b.Fatalf("failed to get or set cache item: %v", err) + } + } +} diff --git a/pkg/store/local.go b/pkg/store/local.go index cb80f8f8cb..5d72ee28af 100644 --- a/pkg/store/local.go +++ b/pkg/store/local.go @@ -21,6 +21,7 @@ import ( "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/runutil" + storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" ) @@ -130,7 +131,7 @@ func ScanGRPCCurlProtoStreamMessages(data []byte, atEOF bool) (advance int, toke // Series returns all series for a requested time range and label matcher. The returned data may // exceed the requested time bounds. func (s *LocalStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { - match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels) + match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels, storecache.NewNoopMatcherCache()) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 11d7f1ff77..a503d15689 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -36,6 +36,7 @@ import ( "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/runutil" + storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" @@ -125,7 +126,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto extLset := p.externalLabelsFn() - match, matchers, err := matchesExternalLabels(r.Matchers, extLset) + match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storecache.NewNoopMatcherCache()) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } @@ -488,8 +489,13 @@ func (p *PrometheusStore) startPromRemoteRead(ctx context.Context, q *prompb.Que // matchesExternalLabels returns false if given matchers are not matching external labels. // If true, matchesExternalLabels also returns Prometheus matchers without those matching external labels. -func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels) (bool, []*labels.Matcher, error) { - tms, err := storepb.MatchersToPromMatchers(ms...) +func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels, cache storecache.MatchersCache) (bool, []*labels.Matcher, error) { + var ( + tms []*labels.Matcher + err error + ) + + tms, err = storecache.MatchersToPromMatchersCached(cache, ms...) if err != nil { return false, nil, err } @@ -537,7 +543,7 @@ func (p *PrometheusStore) encodeChunk(ss []prompb.Sample) (storepb.Chunk_Encodin func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { extLset := p.externalLabelsFn() - match, matchers, err := matchesExternalLabels(r.Matchers, extLset) + match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storecache.NewNoopMatcherCache()) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } @@ -600,7 +606,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue extLset := p.externalLabelsFn() - match, matchers, err := matchesExternalLabels(r.Matchers, extLset) + match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storecache.NewNoopMatcherCache()) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 498c80e2e7..af1ba9dae1 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -25,6 +25,7 @@ import ( "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/info/infopb" + storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/strutil" @@ -89,6 +90,7 @@ type ProxyStore struct { retrievalStrategy RetrievalStrategy debugLogging bool tsdbSelector *TSDBSelector + matcherCache storecache.MatchersCache enableDedup bool } @@ -113,7 +115,7 @@ func RegisterStoreServer(storeSrv storepb.StoreServer, logger log.Logger) func(* } } -// BucketStoreOption are functions that configure BucketStore. +// ProxyStoreOption are functions that configure the ProxyStore. type ProxyStoreOption func(s *ProxyStore) // WithProxyStoreDebugLogging toggles debug logging. @@ -137,6 +139,13 @@ func WithoutDedup() ProxyStoreOption { } } +// WithMatcherCache sets the matcher cache instance for the proxy. +func WithMatcherCache(cache storecache.MatchersCache) ProxyStoreOption { + return func(s *ProxyStore) { + s.matcherCache = cache + } +} + // NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client. // Note that there is no deduplication support. Deduplication should be done on the highest level (just before PromQL). func NewProxyStore( @@ -168,6 +177,7 @@ func NewProxyStore( retrievalStrategy: retrievalStrategy, tsdbSelector: DefaultSelector, enableDedup: true, + matcherCache: storecache.NewNoopMatcherCache(), } for _, option := range options { @@ -248,7 +258,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. reqLogger = log.With(reqLogger, "request", originalRequest.String()) } - match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels) + match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels, s.matcherCache) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } @@ -353,7 +363,7 @@ func (s *ProxyStore) LabelNames(ctx context.Context, originalRequest *storepb.La if s.debugLogging { reqLogger = log.With(reqLogger, "request", originalRequest.String()) } - match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels) + match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels, s.matcherCache) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } @@ -456,7 +466,7 @@ func (s *ProxyStore) LabelValues(ctx context.Context, originalRequest *storepb.L return nil, status.Error(codes.InvalidArgument, "label name parameter cannot be empty") } - match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels) + match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels, s.matcherCache) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 1389e89b1d..a9db1d11a0 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -6,22 +6,21 @@ package store import ( "context" "fmt" - "strings" - - "github.com/pkg/errors" - "math" "math/rand" "os" "path/filepath" + "strings" "sync" "testing" "time" "github.com/cespare/xxhash/v2" + "github.com/efficientgo/core/testutil" "github.com/go-kit/log" "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" @@ -29,11 +28,10 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" "google.golang.org/grpc" - "github.com/efficientgo/core/testutil" - "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/info/infopb" + storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil" @@ -2086,6 +2084,47 @@ func BenchmarkProxySeries(b *testing.B) { }) } +func BenchmarkProxySeriesRegex(b *testing.B) { + tb := testutil.NewTB(b) + + cache, err := storecache.NewMatchersCache(storecache.WithSize(200)) + testutil.Ok(b, err) + + q := NewProxyStore(nil, + nil, + func() []Client { return nil }, + component.Query, + labels.EmptyLabels(), 0*time.Second, EagerRetrieval, + WithMatcherCache(cache), + ) + + words := []string{"foo", "bar", "baz", "qux", "quux", "corge", "grault", "garply", "waldo", "fred", "plugh", "xyzzy", "thud"} + bigRegex := strings.Builder{} + for i := 0; i < 200; i++ { + bigRegex.WriteString(words[rand.Intn(len(words))]) + bigRegex.WriteString("|") + } + + matchers := []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_RE, Name: "foo", Value: ".*"}, + {Type: storepb.LabelMatcher_RE, Name: "bar", Value: bigRegex.String()}, + } + + // Create a regex that matches all series. + req := &storepb.SeriesRequest{ + MinTime: 0, + MaxTime: math.MaxInt64, + Matchers: matchers, + } + s := newStoreSeriesServer(context.Background()) + + tb.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + testutil.Ok(b, q.Series(req, s)) + } +} + func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) { tmpDir := t.TempDir() @@ -2136,6 +2175,7 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) { responseTimeout: 5 * time.Second, retrievalStrategy: EagerRetrieval, tsdbSelector: DefaultSelector, + matcherCache: storecache.NewNoopMatcherCache(), } var allResps []*storepb.SeriesResponse @@ -2272,6 +2312,7 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) { responseTimeout: 50 * time.Millisecond, retrievalStrategy: respStrategy, tsdbSelector: DefaultSelector, + matcherCache: storecache.NewNoopMatcherCache(), } ctx, cancel := context.WithCancel(context.Background()) @@ -2309,6 +2350,7 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) { responseTimeout: 50 * time.Millisecond, retrievalStrategy: respStrategy, tsdbSelector: DefaultSelector, + matcherCache: storecache.NewNoopMatcherCache(), } ctx := context.Background() @@ -2469,5 +2511,4 @@ func TestDedupRespHeap_Deduplication(t *testing.T) { tcase.testFn(tcase.responses, h) }) } - } diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index d5461a5947..b165d76fcc 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -385,30 +385,35 @@ func PromMatchersToMatchers(ms ...*labels.Matcher) ([]LabelMatcher, error) { // NOTE: It allocates memory. func MatchersToPromMatchers(ms ...LabelMatcher) ([]*labels.Matcher, error) { res := make([]*labels.Matcher, 0, len(ms)) - for _, m := range ms { - var t labels.MatchType - - switch m.Type { - case LabelMatcher_EQ: - t = labels.MatchEqual - case LabelMatcher_NEQ: - t = labels.MatchNotEqual - case LabelMatcher_RE: - t = labels.MatchRegexp - case LabelMatcher_NRE: - t = labels.MatchNotRegexp - default: - return nil, errors.Errorf("unrecognized label matcher type %d", m.Type) - } - m, err := labels.NewMatcher(t, m.Name, m.Value) + for i := range ms { + pm, err := MatcherToPromMatcher(ms[i]) if err != nil { return nil, err } - res = append(res, m) + res = append(res, pm) } return res, nil } +// MatcherToPromMatcher converts a Thanos label matcher to Prometheus label matcher. +func MatcherToPromMatcher(m LabelMatcher) (*labels.Matcher, error) { + var t labels.MatchType + + switch m.Type { + case LabelMatcher_EQ: + t = labels.MatchEqual + case LabelMatcher_NEQ: + t = labels.MatchNotEqual + case LabelMatcher_RE: + t = labels.MatchRegexp + case LabelMatcher_NRE: + t = labels.MatchNotRegexp + default: + return nil, errors.Errorf("unrecognized label matcher type %d", m.Type) + } + return labels.NewMatcher(t, m.Name, m.Value) +} + // MatchersToString converts label matchers to string format. // String should be parsable as a valid PromQL query metric selector. func MatchersToString(ms ...LabelMatcher) string { @@ -439,6 +444,14 @@ func (m *LabelMatcher) PromString() string { return fmt.Sprintf("%s%s%q", m.Name, m.Type.PromString(), m.Value) } +func (m *LabelMatcher) GetName() string { + return m.Name +} + +func (m *LabelMatcher) GetValue() string { + return m.Value +} + func (x LabelMatcher_Type) PromString() string { typeToStr := map[LabelMatcher_Type]string{ LabelMatcher_EQ: "=", diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 737fee3bbd..a62481a53f 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -27,6 +27,7 @@ import ( "github.com/thanos-io/thanos/pkg/filter" "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/runutil" + storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" ) @@ -53,6 +54,12 @@ func WithCuckooMetricNameStoreFilter() TSDBStoreOption { } } +func WithMatcherCacheInstance(cache storecache.MatchersCache) TSDBStoreOption { + return func(s *TSDBStore) { + s.matcherCache = cache + } +} + // TSDBStore implements the store API against a local TSDB instance. // It attaches the provided external labels to all results. It only responds with raw data // and does not support downsampling. @@ -62,6 +69,7 @@ type TSDBStore struct { component component.StoreAPI buffers sync.Pool maxBytesPerFrame int + matcherCache storecache.MatchersCache extLset labels.Labels startStoreFilterUpdate bool @@ -112,6 +120,7 @@ func NewTSDBStore( b := make([]byte, 0, initialBufSize) return &b }}, + matcherCache: storecache.NewNoopMatcherCache(), } for _, option := range options { @@ -177,13 +186,13 @@ func (s *TSDBStore) LabelSet() []labelpb.ZLabelSet { return labelSets } -func (p *TSDBStore) TSDBInfos() []infopb.TSDBInfo { - labels := p.LabelSet() +func (s *TSDBStore) TSDBInfos() []infopb.TSDBInfo { + labels := s.LabelSet() if len(labels) == 0 { return []infopb.TSDBInfo{} } - mint, maxt := p.TimeRange() + mint, maxt := s.TimeRange() return []infopb.TSDBInfo{ { Labels: labelpb.ZLabelSet{ @@ -247,7 +256,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_Ser srv = fs } - match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset()) + match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset(), s.matcherCache) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } @@ -370,7 +379,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_Ser func (s *TSDBStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) ( *storepb.LabelNamesResponse, error, ) { - match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset()) + match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset(), s.matcherCache) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } @@ -432,7 +441,7 @@ func (s *TSDBStore) LabelValues(ctx context.Context, r *storepb.LabelValuesReque } } - match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset()) + match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset(), s.matcherCache) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) }