From 7ed0c41408827cdb2c69b2de4788f266dd07efa2 Mon Sep 17 00:00:00 2001 From: Ben Ye <benye@amazon.com> Date: Thu, 16 Jan 2025 10:27:30 -0800 Subject: [PATCH] update thanos version to 236777732278c64ca01c1c09d726f0f712c87164 (#6514) Signed-off-by: yeya24 <benye@amazon.com> --- go.mod | 2 +- go.sum | 4 +-- pkg/storegateway/bucket_stores.go | 3 +- .../thanos/pkg/discovery/dns/grpc.go | 2 +- .../thanos/pkg/errutil/multierror.go | 9 ++++++ .../thanos-io/thanos/pkg/extkingpin/flags.go | 10 +------ .../pkg/extkingpin/path_content_reloader.go | 19 +++++++++++++ .../thanos-io/thanos/pkg/query/endpointset.go | 28 +++++++------------ .../thanos-io/thanos/pkg/store/bucket.go | 21 ++++++++++++-- .../thanos/pkg/store/lazy_postings.go | 3 +- vendor/modules.txt | 2 +- 11 files changed, 67 insertions(+), 36 deletions(-) diff --git a/go.mod b/go.mod index 85adabfc83..be6de1356e 100644 --- a/go.mod +++ b/go.mod @@ -52,7 +52,7 @@ require ( github.com/stretchr/testify v1.10.0 github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 github.com/thanos-io/promql-engine v0.0.0-20250110162513-14f995518af3 - github.com/thanos-io/thanos v0.37.3-0.20250110074750-4ba0ba403896 + github.com/thanos-io/thanos v0.37.3-0.20250115144759-236777732278 github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/weaveworks/common v0.0.0-20230728070032-dd9e68f319d5 go.etcd.io/etcd/api/v3 v3.5.17 diff --git a/go.sum b/go.sum index 92dc0762ee..64dfdce3ba 100644 --- a/go.sum +++ b/go.sum @@ -1657,8 +1657,8 @@ github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 h1:VjG0mwhN1Dkn github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97/go.mod h1:vyzFrBXgP+fGNG2FopEGWOO/zrIuoy7zt3LpLeezRsw= github.com/thanos-io/promql-engine v0.0.0-20250110162513-14f995518af3 h1:feQKBuPhRE/+xd4Ru6Jv48EzVatpXg2mnpl0x0f5OWY= github.com/thanos-io/promql-engine v0.0.0-20250110162513-14f995518af3/go.mod h1:wx0JlRZtsB2S10JYUgeg5GqLfMxw31SzArP+28yyE00= -github.com/thanos-io/thanos v0.37.3-0.20250110074750-4ba0ba403896 h1:K5YqD5JzNPh7P/XGB2J19cxJlv61K9Mm2/UZ+iPVGMU= -github.com/thanos-io/thanos v0.37.3-0.20250110074750-4ba0ba403896/go.mod h1:VOu1neDpx4n/2OCQmfT/0RMU85UzhO35ce0S3Ew+NSk= +github.com/thanos-io/thanos v0.37.3-0.20250115144759-236777732278 h1:HkZohVruRD0ENAXZIl2qDcpblbMok++jb3zHvjUeQfg= +github.com/thanos-io/thanos v0.37.3-0.20250115144759-236777732278/go.mod h1:DvlfyJhdYeufGbw3z6VQuDpGh2Q46/XvalnmEtQOf/0= github.com/tjhop/slog-gokit v0.1.2 h1:pmQI4SvU9h4gA0vIQsdhJQSqQg4mOmsPykG2/PM3j1I= github.com/tjhop/slog-gokit v0.1.2/go.mod h1:8fhlcp8C8ELbg3GCyKv06tgt4B5sDq2P1r2DQAu1HuM= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 778136b3c3..fe69645c57 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -642,7 +642,8 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro }), store.WithLazyExpandedPostings(u.cfg.BucketStore.LazyExpandedPostingsEnabled), store.WithPostingGroupMaxKeySeriesRatio(u.cfg.BucketStore.LazyExpandedPostingGroupMaxKeySeriesRatio), - store.WithDontResort(true), // Cortex doesn't need to resort series in store gateway. + store.WithSeriesMatchRatio(0.5), // TODO: expose this as a config. + store.WithDontResort(true), // Cortex doesn't need to resort series in store gateway. store.WithBlockLifecycleCallback(&shardingBlockLifecycleCallbackAdapter{ userID: userID, strategy: u.shardingStrategy, diff --git a/vendor/github.com/thanos-io/thanos/pkg/discovery/dns/grpc.go b/vendor/github.com/thanos-io/thanos/pkg/discovery/dns/grpc.go index 79e832b652..7971e7991c 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/discovery/dns/grpc.go +++ b/vendor/github.com/thanos-io/thanos/pkg/discovery/dns/grpc.go @@ -23,7 +23,7 @@ type builder struct { logger log.Logger } -func RegisterGRPCResolver(provider *Provider, interval time.Duration, logger log.Logger) { +func RegisterGRPCResolver(logger log.Logger, provider *Provider, interval time.Duration) { grpcresolver.Register(&builder{ resolveInterval: interval, provider: provider, diff --git a/vendor/github.com/thanos-io/thanos/pkg/errutil/multierror.go b/vendor/github.com/thanos-io/thanos/pkg/errutil/multierror.go index a99b714e27..600a557324 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/errutil/multierror.go +++ b/vendor/github.com/thanos-io/thanos/pkg/errutil/multierror.go @@ -71,6 +71,15 @@ func (es NonNilMultiError) Cause() error { return es.getCause() } +func (es NonNilMultiError) Is(target error) bool { + for _, err := range es { + if errors.Is(err, target) { + return true + } + } + return false +} + func (es NonNilMultiError) getCause() NonNilMultiRootError { var causes []error for _, err := range es { diff --git a/vendor/github.com/thanos-io/thanos/pkg/extkingpin/flags.go b/vendor/github.com/thanos-io/thanos/pkg/extkingpin/flags.go index 62b9142beb..033769c56e 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/extkingpin/flags.go +++ b/vendor/github.com/thanos-io/thanos/pkg/extkingpin/flags.go @@ -47,10 +47,8 @@ func Addrs(flags *kingpin.FlagClause) (target *addressSlice) { return } -// validateAddrs checks an address slice for duplicates and empty or invalid elements. +// validateAddrs checks an address slice for empty or invalid elements. func validateAddrs(addrs addressSlice) error { - set := map[string]struct{}{} - for _, addr := range addrs { if addr == "" { return errors.New("Address is empty.") @@ -61,12 +59,6 @@ func validateAddrs(addrs addressSlice) error { if len(qtypeAndName) != 2 && len(hostAndPort) != 2 { return errors.Errorf("Address %s is not of <host>:<port> format or a valid DNS query.", addr) } - - if _, ok := set[addr]; ok { - return errors.Errorf("Address %s is duplicated.", addr) - } - - set[addr] = struct{}{} } return nil diff --git a/vendor/github.com/thanos-io/thanos/pkg/extkingpin/path_content_reloader.go b/vendor/github.com/thanos-io/thanos/pkg/extkingpin/path_content_reloader.go index e96b0ddb34..b2b84db6c5 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/extkingpin/path_content_reloader.go +++ b/vendor/github.com/thanos-io/thanos/pkg/extkingpin/path_content_reloader.go @@ -21,6 +21,25 @@ type fileContent interface { Path() string } +type NopConfigContent struct{} + +var _ fileContent = (*NopConfigContent)(nil) + +// Content returns no content and no error. +func (n NopConfigContent) Content() ([]byte, error) { + return nil, nil +} + +// Path returns an empty path. +func (n NopConfigContent) Path() string { + return "" +} + +// NewNopConfig creates a no-op config content (no configuration). +func NewNopConfig() NopConfigContent { + return NopConfigContent{} +} + // PathContentReloader runs the reloadFunc when it detects that the contents of fileContent have changed. func PathContentReloader(ctx context.Context, fileContent fileContent, logger log.Logger, reloadFunc func(), debounceTime time.Duration) error { filePath, err := filepath.Abs(fileContent.Path()) diff --git a/vendor/github.com/thanos-io/thanos/pkg/query/endpointset.go b/vendor/github.com/thanos-io/thanos/pkg/query/endpointset.go index 4c519bf925..071e04a846 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/query/endpointset.go +++ b/vendor/github.com/thanos-io/thanos/pkg/query/endpointset.go @@ -211,8 +211,7 @@ type EndpointSet struct { // Endpoint specifications can change dynamically. If some component is missing from the list, we assume it is no longer // accessible and we close gRPC client for it, unless it is strict. - endpointSpec func() map[string]*GRPCEndpointSpec - dialOpts []grpc.DialOption + endpointSpecs func() map[string]*GRPCEndpointSpec endpointInfoTimeout time.Duration unhealthyEndpointTimeout time.Duration @@ -235,7 +234,6 @@ func NewEndpointSet( logger log.Logger, reg prometheus.Registerer, endpointSpecs func() []*GRPCEndpointSpec, - dialOpts []grpc.DialOption, unhealthyEndpointTimeout time.Duration, endpointInfoTimeout time.Duration, endpointMetricLabels ...string, @@ -254,19 +252,17 @@ func NewEndpointSet( } return &EndpointSet{ - now: now, - logger: log.With(logger, "component", "endpointset"), - endpointsMetric: endpointsMetric, - - dialOpts: dialOpts, + now: now, + logger: log.With(logger, "component", "endpointset"), + endpointsMetric: endpointsMetric, endpointInfoTimeout: endpointInfoTimeout, unhealthyEndpointTimeout: unhealthyEndpointTimeout, - endpointSpec: func() map[string]*GRPCEndpointSpec { - specs := make(map[string]*GRPCEndpointSpec) + endpointSpecs: func() map[string]*GRPCEndpointSpec { + res := make(map[string]*GRPCEndpointSpec) for _, s := range endpointSpecs() { - specs[s.addr] = s + res[s.addr] = s } - return specs + return res }, endpoints: make(map[string]*endpointRef), } @@ -288,7 +284,7 @@ func (e *EndpointSet) Update(ctx context.Context) { mu sync.Mutex ) - for _, spec := range e.endpointSpec() { + for _, spec := range e.endpointSpecs() { spec := spec if er, existingRef := e.endpoints[spec.Addr()]; existingRef { @@ -571,11 +567,7 @@ type endpointRef struct { // newEndpointRef creates a new endpointRef with a gRPC channel to the given the IP address. // The call to newEndpointRef will return an error if establishing the channel fails. func (e *EndpointSet) newEndpointRef(spec *GRPCEndpointSpec) (*endpointRef, error) { - var dialOpts []grpc.DialOption - - dialOpts = append(dialOpts, e.dialOpts...) - dialOpts = append(dialOpts, spec.dialOpts...) - conn, err := grpc.NewClient(spec.Addr(), dialOpts...) + conn, err := grpc.NewClient(spec.Addr(), spec.dialOpts...) if err != nil { return nil, errors.Wrap(err, "dialing connection") } diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go b/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go index 16e9e8c39d..6e5a656e62 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go @@ -442,6 +442,7 @@ type BucketStore struct { enableChunkHashCalculation bool enabledLazyExpandedPostings bool + seriesMatchRatio float64 postingGroupMaxKeySeriesRatio float64 sortingStrategy sortingStrategy @@ -591,6 +592,15 @@ func WithPostingGroupMaxKeySeriesRatio(postingGroupMaxKeySeriesRatio float64) Bu } } +// WithSeriesMatchRatio configures how many series would match when intersecting posting groups. +// This is used for lazy posting optimization strategy. Ratio should be within (0, 1). +// The closer to 1, it means matchers have bad selectivity. +func WithSeriesMatchRatio(seriesMatchRatio float64) BucketStoreOption { + return func(s *BucketStore) { + s.seriesMatchRatio = seriesMatchRatio + } +} + // WithDontResort disables series resorting in Store Gateway. func WithDontResort(true bool) BucketStoreOption { return func(s *BucketStore) { @@ -1065,6 +1075,7 @@ type blockSeriesClient struct { bytesLimiter BytesLimiter lazyExpandedPostingEnabled bool + seriesMatchRatio float64 // Mark posting group as lazy if it adds too many keys. 0 to disable. postingGroupMaxKeySeriesRatio float64 lazyExpandedPostingsCount prometheus.Counter @@ -1111,6 +1122,7 @@ func newBlockSeriesClient( chunkFetchDurationSum *prometheus.HistogramVec, extLsetToRemove map[string]struct{}, lazyExpandedPostingEnabled bool, + seriesMatchRatio float64, postingGroupMaxKeySeriesRatio float64, lazyExpandedPostingsCount prometheus.Counter, lazyExpandedPostingByReason *prometheus.CounterVec, @@ -1148,6 +1160,7 @@ func newBlockSeriesClient( chunkFetchDurationSum: chunkFetchDurationSum, lazyExpandedPostingEnabled: lazyExpandedPostingEnabled, + seriesMatchRatio: seriesMatchRatio, postingGroupMaxKeySeriesRatio: postingGroupMaxKeySeriesRatio, lazyExpandedPostingsCount: lazyExpandedPostingsCount, lazyExpandedPostingGroupByReason: lazyExpandedPostingByReason, @@ -1202,7 +1215,7 @@ func (b *blockSeriesClient) ExpandPostings( matchers sortedMatchers, seriesLimiter SeriesLimiter, ) error { - ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.postingGroupMaxKeySeriesRatio, b.lazyExpandedPostingSizeBytes, b.lazyExpandedPostingGroupByReason, b.tenant) + ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.seriesMatchRatio, b.postingGroupMaxKeySeriesRatio, b.lazyExpandedPostingSizeBytes, b.lazyExpandedPostingGroupByReason, b.tenant) if err != nil { return errors.Wrap(err, "expanded matching posting") } @@ -1635,6 +1648,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store s.metrics.chunkFetchDurationSum, extLsetToRemove, s.enabledLazyExpandedPostings, + s.seriesMatchRatio, s.postingGroupMaxKeySeriesRatio, s.metrics.lazyExpandedPostingsCount, s.metrics.lazyExpandedPostingGroupsByReason, @@ -1951,6 +1965,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq nil, extLsetToRemove, s.enabledLazyExpandedPostings, + s.seriesMatchRatio, s.postingGroupMaxKeySeriesRatio, s.metrics.lazyExpandedPostingsCount, s.metrics.lazyExpandedPostingGroupsByReason, @@ -2179,6 +2194,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR nil, nil, s.enabledLazyExpandedPostings, + s.seriesMatchRatio, s.postingGroupMaxKeySeriesRatio, s.metrics.lazyExpandedPostingsCount, s.metrics.lazyExpandedPostingGroupsByReason, @@ -2647,6 +2663,7 @@ func (r *bucketIndexReader) ExpandedPostings( ms sortedMatchers, bytesLimiter BytesLimiter, lazyExpandedPostingEnabled bool, + seriesMatchRatio float64, postingGroupMaxKeySeriesRatio float64, lazyExpandedPostingSizeBytes prometheus.Counter, lazyExpandedPostingGroupsByReason *prometheus.CounterVec, @@ -2703,7 +2720,7 @@ func (r *bucketIndexReader) ExpandedPostings( postingGroups = append(postingGroups, newPostingGroup(true, name, []string{value}, nil)) } - ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, postingGroupMaxKeySeriesRatio, lazyExpandedPostingSizeBytes, lazyExpandedPostingGroupsByReason, tenant) + ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, seriesMatchRatio, postingGroupMaxKeySeriesRatio, lazyExpandedPostingSizeBytes, lazyExpandedPostingGroupsByReason, tenant) if err != nil { return nil, errors.Wrap(err, "fetch and expand postings") } diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/lazy_postings.go b/vendor/github.com/thanos-io/thanos/pkg/store/lazy_postings.go index 57b48cc342..325e92e904 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/lazy_postings.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/lazy_postings.go @@ -214,6 +214,7 @@ func fetchLazyExpandedPostings( bytesLimiter BytesLimiter, addAllPostings bool, lazyExpandedPostingEnabled bool, + seriesMatchRatio float64, postingGroupMaxKeySeriesRatio float64, lazyExpandedPostingSizeBytes prometheus.Counter, lazyExpandedPostingGroupsByReason *prometheus.CounterVec, @@ -237,7 +238,7 @@ func fetchLazyExpandedPostings( r, postingGroups, int64(r.block.estimatedMaxSeriesSize), - 0.5, // TODO(yeya24): Expose this as a flag. + seriesMatchRatio, postingGroupMaxKeySeriesRatio, lazyExpandedPostingSizeBytes, lazyExpandedPostingGroupsByReason, diff --git a/vendor/modules.txt b/vendor/modules.txt index 0ebdd81496..a8d2eac2f3 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -988,7 +988,7 @@ github.com/thanos-io/promql-engine/query github.com/thanos-io/promql-engine/ringbuffer github.com/thanos-io/promql-engine/storage github.com/thanos-io/promql-engine/storage/prometheus -# github.com/thanos-io/thanos v0.37.3-0.20250110074750-4ba0ba403896 +# github.com/thanos-io/thanos v0.37.3-0.20250115144759-236777732278 ## explicit; go 1.23.0 github.com/thanos-io/thanos/pkg/api/query/querypb github.com/thanos-io/thanos/pkg/block