From 529954c505d91051aaa9578a812e655f64b9e160 Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Thu, 7 Nov 2024 14:46:34 +0100 Subject: [PATCH] query, rule: make endpoint discovery dynamically reloadable * Removed previously deprecated and hidden flags to configure endpoints (--rule, --target, ...) * Removed --store.sd-files and --store.sd-interval flags * Added new flags --endpoint.sd-config, --endpoint.sd-config-reload-interval to configure a dynamic SD file * Moved endpoint set construction into cmd/thanos/endpointset.go for a little cleanup The new config makes it possible to also set "strict" and "group" flags on the endpoint instead of only their addresses, making it possible to have file based service discovery for endpoint groups too. Signed-off-by: Michael Hoffmann --- cmd/thanos/config.go | 38 +++ cmd/thanos/endpointset.go | 289 +++++++++++++++++++++ cmd/thanos/query.go | 454 +++++---------------- cmd/thanos/rule.go | 43 +--- docs/components/query.md | 48 +--- pkg/discovery/dns/grpc.go | 2 +- pkg/extkingpin/flags.go | 10 +- pkg/query/endpointset.go | 28 +- pkg/query/endpointset_test.go | 30 +-- test/e2e/e2ethanos/services.go | 29 ++- 10 files changed, 449 insertions(+), 522 deletions(-) create mode 100644 cmd/thanos/endpointset.go diff --git a/cmd/thanos/config.go b/cmd/thanos/config.go index f72d19fd79..f16f885737 100644 --- a/cmd/thanos/config.go +++ b/cmd/thanos/config.go @@ -14,11 +14,17 @@ import ( "github.com/KimMachineGun/automemlimit/memlimit" extflag "github.com/efficientgo/tools/extkingpin" + "github.com/go-kit/log" + "github.com/opentracing/opentracing-go" "github.com/pkg/errors" + "google.golang.org/grpc" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/thanos-io/thanos/pkg/extgrpc" + "github.com/thanos-io/thanos/pkg/extgrpc/snappy" "github.com/thanos-io/thanos/pkg/extkingpin" "github.com/thanos-io/thanos/pkg/shipper" ) @@ -58,6 
+64,38 @@ func (gc *grpcConfig) registerFlag(cmd extkingpin.FlagClause) *grpcConfig { return gc } +type grpcClientConfig struct { + secure bool + skipVerify bool + cert, key, caCert string + serverName string + compression string +} + +func (gc *grpcClientConfig) registerFlag(cmd extkingpin.FlagClause) *grpcClientConfig { + cmd.Flag("grpc-client-tls-secure", "Use TLS when talking to the gRPC server").Default("false").BoolVar(&gc.secure) + cmd.Flag("grpc-client-tls-skip-verify", "Disable TLS certificate verification i.e self signed, signed by fake CA").Default("false").BoolVar(&gc.skipVerify) + cmd.Flag("grpc-client-tls-cert", "TLS Certificates to use to identify this client to the server").Default("").StringVar(&gc.cert) + cmd.Flag("grpc-client-tls-key", "TLS Key for the client's certificate").Default("").StringVar(&gc.key) + cmd.Flag("grpc-client-tls-ca", "TLS CA Certificates to use to verify gRPC servers").Default("").StringVar(&gc.caCert) + cmd.Flag("grpc-client-server-name", "Server name to verify the hostname on the returned gRPC certificates. See https://tools.ietf.org/html/rfc4366#section-3.1").Default("").StringVar(&gc.serverName) + compressionOptions := strings.Join([]string{snappy.Name, compressionNone}, ", ") + cmd.Flag("grpc-compression", "Compression algorithm to use for gRPC requests to other clients. 
Must be one of: "+compressionOptions).Default(compressionNone).EnumVar(&gc.compression, snappy.Name, compressionNone) + + return gc +} + +func (gc *grpcClientConfig) dialOptions(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer) ([]grpc.DialOption, error) { + dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer, gc.secure, gc.skipVerify, gc.cert, gc.key, gc.caCert, gc.serverName) + if err != nil { + return nil, errors.Wrapf(err, "building gRPC client") + } + if gc.compression != compressionNone { + dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(gc.compression))) + } + return dialOpts, nil +} + type httpConfig struct { bindAddress string tlsConfig string diff --git a/cmd/thanos/endpointset.go b/cmd/thanos/endpointset.go new file mode 100644 index 0000000000..d8610e0d2e --- /dev/null +++ b/cmd/thanos/endpointset.go @@ -0,0 +1,289 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package main + +import ( + "context" + "fmt" + "sync" + "time" + + extflag "github.com/efficientgo/tools/extkingpin" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/oklog/run" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "google.golang.org/grpc" + "gopkg.in/yaml.v3" + + "github.com/thanos-io/thanos/pkg/discovery/dns" + "github.com/thanos-io/thanos/pkg/errors" + "github.com/thanos-io/thanos/pkg/extgrpc" + "github.com/thanos-io/thanos/pkg/extkingpin" + "github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/query" + "github.com/thanos-io/thanos/pkg/runutil" +) + +type EndpointSpec struct { + Strict bool `yaml:"strict"` + Group bool `yaml:"group"` + Address string `yaml:"address"` +} + +type EndpointConfig struct { + Endpoints []EndpointSpec `yaml:"endpoints"` +} + +type endpointConfigProvider struct { + mu sync.Mutex + cfg EndpointConfig + + // statically defined endpoints from flags 
for backwards compatibility + endpoints []string + endpointGroups []string + strictEndpoints []string + strictEndpointGroups []string +} + +func (er *endpointConfigProvider) config() EndpointConfig { + er.mu.Lock() + defer er.mu.Unlock() + + res := EndpointConfig{Endpoints: make([]EndpointSpec, len(er.cfg.Endpoints))} + copy(res.Endpoints, er.cfg.Endpoints) + return res +} + +func (er *endpointConfigProvider) parse(configFile *extflag.PathOrContent) (EndpointConfig, error) { + content, err := configFile.Content() + if err != nil { + return EndpointConfig{}, errors.Wrapf(err, "unable to load config content: %s", configFile.Path()) + } + var cfg EndpointConfig + if err := yaml.Unmarshal(content, &cfg); err != nil { + return EndpointConfig{}, errors.Wrapf(err, "unable to unmarshal config content: %s", configFile.Path()) + } + return cfg, nil +} + +func (er *endpointConfigProvider) addStaticEndpoints(cfg *EndpointConfig) { + for _, e := range er.endpoints { + cfg.Endpoints = append(cfg.Endpoints, EndpointSpec{ + Address: e, + }) + } + for _, e := range er.endpointGroups { + cfg.Endpoints = append(cfg.Endpoints, EndpointSpec{ + Address: e, + Group: true, + }) + } + for _, e := range er.strictEndpoints { + cfg.Endpoints = append(cfg.Endpoints, EndpointSpec{ + Address: e, + Strict: true, + }) + } + for _, e := range er.strictEndpointGroups { + cfg.Endpoints = append(cfg.Endpoints, EndpointSpec{ + Address: e, + Group: true, + Strict: true, + }) + } +} + +func validateEndpointConfig(cfg EndpointConfig) error { + for _, ecfg := range cfg.Endpoints { + if dns.IsDynamicNode(ecfg.Address) && ecfg.Strict { + return errors.Newf("%s is a dynamically specified endpoint i.e. it uses SD and that is not permitted under strict mode.", ecfg.Address) + } + if dns.IsDynamicNode(ecfg.Address) && ecfg.Group { + return errors.Newf("%s is a dynamically specified endpoint i.e. 
it uses SD and that is not permitted under group mode.", ecfg.Address) + } + } + return nil +} + +func newEndpointConfigProvider( + g *run.Group, + logger log.Logger, + configFile *extflag.PathOrContent, + configReloadInterval time.Duration, + endpoints []string, + endpointGroups []string, + strictEndpoints []string, + strictEndpointGroups []string, +) (*endpointConfigProvider, error) { + res := &endpointConfigProvider{ + endpoints: endpoints, + endpointGroups: endpointGroups, + strictEndpoints: strictEndpoints, + strictEndpointGroups: strictEndpointGroups, + } + + cfg, err := res.parse(configFile) + if err != nil { + return nil, errors.Wrapf(err, "unable to load config file") + } + res.addStaticEndpoints(&cfg) + res.cfg = cfg + + // only static endpoints + if len(configFile.Path()) == 0 { + return res, nil + } + + if err := extkingpin.PathContentReloader(context.Background(), configFile, logger, func() { + res.mu.Lock() + defer res.mu.Unlock() + + level.Info(logger).Log("msg", "reloading endpoint config") + cfg, err := res.parse(configFile) + if err != nil { + level.Error(logger).Log("msg", "failed to reload endpoint config", "err", err) + return + } + res.addStaticEndpoints(&cfg) + if err := validateEndpointConfig(cfg); err != nil { + level.Error(logger).Log("msg", "failed to validate endpoint config", "err", err) + return + } + res.cfg = cfg + }, configReloadInterval); err != nil { + return nil, errors.Wrapf(err, "unable to create config reloader") + } + return res, nil +} + +func setupEndpointset( + g *run.Group, + reg prometheus.Registerer, + logger log.Logger, + configFile *extflag.PathOrContent, + configReloadInterval time.Duration, + endpoints []string, + endpointGroups []string, + strictEndpoints []string, + strictEndpointGroups []string, + dnsSDResolver string, + dnsSDInterval time.Duration, + unhealthyTimeout time.Duration, + endpointTimeout time.Duration, + dialOpts []grpc.DialOption, + queryConnMetricLabels ...string, +) (*query.EndpointSet, error) { + 
configProvider, err := newEndpointConfigProvider( + g, + logger, + configFile, + configReloadInterval, + endpoints, + endpointGroups, + strictEndpoints, + strictEndpointGroups, + ) + if err != nil { + return nil, errors.Wrapf(err, "unable to load config initially") + } + // Register resolver for the "thanos:///" scheme for endpoint-groups + dns.RegisterGRPCResolver( + logger, + dns.NewProvider( + logger, + extprom.WrapRegistererWithPrefix("thanos_query_endpoint_groups_", reg), + dns.ResolverType(dnsSDResolver), + ), + dnsSDInterval, + ) + dnsEndpointProvider := dns.NewProvider( + logger, + extprom.WrapRegistererWithPrefix("thanos_query_endpoints_", reg), + dns.ResolverType(dnsSDResolver), + ) + duplicatedStores := promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_query_duplicated_store_addresses_total", + Help: "The number of times a duplicated store addresses is detected from the different configs in query", + }) + + removeDuplicateEndpointSpecs := func(specs []*query.GRPCEndpointSpec) []*query.GRPCEndpointSpec { + set := make(map[string]*query.GRPCEndpointSpec) + for _, spec := range specs { + addr := spec.Addr() + if _, ok := set[addr]; ok { + level.Warn(logger).Log("msg", "Duplicate store address is provided", "addr", addr) + duplicatedStores.Inc() + } + set[addr] = spec + } + deduplicated := make([]*query.GRPCEndpointSpec, 0, len(set)) + for _, value := range set { + deduplicated = append(deduplicated, value) + } + return deduplicated + } + + ctxDNSUpdate, cancelDNSUpdate := context.WithCancel(context.Background()) + g.Add(func() error { + return runutil.Repeat(dnsSDInterval, ctxDNSUpdate.Done(), func() error { + ctxUpdateIter, cancelUpdateIter := context.WithTimeout(ctxDNSUpdate, dnsSDInterval) + defer cancelUpdateIter() + + endpointConfig := configProvider.config() + + addresses := make([]string, 0, len(endpointConfig.Endpoints)) + for _, ecfg := range endpointConfig.Endpoints { + if addr := ecfg.Address; dns.IsDynamicNode(addr) { + 
addresses = append(addresses, addr) + } + } + if err := dnsEndpointProvider.Resolve(ctxUpdateIter, addresses, true); err != nil { + level.Error(logger).Log("msg", "failed to resolve addresses for endpoints", "err", err) + } + return nil + }) + }, func(error) { + cancelDNSUpdate() + }) + + endpointset := query.NewEndpointSet(time.Now, logger, reg, func() []*query.GRPCEndpointSpec { + endpointConfig := configProvider.config() + + specs := make([]*query.GRPCEndpointSpec, 0) + // add static nodes + for _, ecfg := range endpointConfig.Endpoints { + strict, group, addr := ecfg.Strict, ecfg.Group, ecfg.Address + if dns.IsDynamicNode(addr) { + continue + } + if group { + specs = append(specs, query.NewGRPCEndpointSpec(fmt.Sprintf("thanos:///%s", addr), strict, append(dialOpts, extgrpc.EndpointGroupGRPCOpts()...)...)) + } else { + specs = append(specs, query.NewGRPCEndpointSpec(addr, strict, dialOpts...)) + } + } + // add dynamic nodes + for _, addr := range dnsEndpointProvider.Addresses() { + specs = append(specs, query.NewGRPCEndpointSpec(addr, false, dialOpts...)) + } + return removeDuplicateEndpointSpecs(specs) + }, unhealthyTimeout, endpointTimeout, queryConnMetricLabels...) 
+ + ctxEndpointUpdate, cancelEndpointUpdate := context.WithCancel(context.Background()) + g.Add(func() error { + return runutil.Repeat(endpointTimeout, ctxEndpointUpdate.Done(), func() error { + ctxIter, cancelIter := context.WithTimeout(ctxEndpointUpdate, endpointTimeout) + defer cancelIter() + + endpointset.Update(ctxIter) + return nil + }) + }, func(error) { + cancelEndpointUpdate() + }) + + return endpointset, nil +} diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 69ffb8ea32..b48af3596c 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -4,7 +4,6 @@ package main import ( - "context" "fmt" "math" "net/http" @@ -12,7 +11,6 @@ import ( "time" extflag "github.com/efficientgo/tools/extkingpin" - "google.golang.org/grpc" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -21,11 +19,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/route" - "github.com/prometheus/prometheus/discovery" - "github.com/prometheus/prometheus/discovery/file" - "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" "github.com/thanos-io/promql-engine/api" @@ -35,11 +29,8 @@ import ( "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/component" - "github.com/thanos-io/thanos/pkg/discovery/cache" "github.com/thanos-io/thanos/pkg/discovery/dns" "github.com/thanos-io/thanos/pkg/exemplars" - "github.com/thanos-io/thanos/pkg/extgrpc" - "github.com/thanos-io/thanos/pkg/extgrpc/snappy" "github.com/thanos-io/thanos/pkg/extkingpin" "github.com/thanos-io/thanos/pkg/extprom" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" @@ -51,7 +42,6 @@ import ( "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/query" 
"github.com/thanos-io/thanos/pkg/rules" - "github.com/thanos-io/thanos/pkg/runutil" grpcserver "github.com/thanos-io/thanos/pkg/server/grpc" httpserver "github.com/thanos-io/thanos/pkg/server/http" "github.com/thanos-io/thanos/pkg/store" @@ -86,14 +76,8 @@ func registerQuery(app *extkingpin.App) { var grpcServerConfig grpcConfig grpcServerConfig.registerFlag(cmd) - secure := cmd.Flag("grpc-client-tls-secure", "Use TLS when talking to the gRPC server").Default("false").Bool() - skipVerify := cmd.Flag("grpc-client-tls-skip-verify", "Disable TLS certificate verification i.e self signed, signed by fake CA").Default("false").Bool() - cert := cmd.Flag("grpc-client-tls-cert", "TLS Certificates to use to identify this client to the server").Default("").String() - key := cmd.Flag("grpc-client-tls-key", "TLS Key for the client's certificate").Default("").String() - caCert := cmd.Flag("grpc-client-tls-ca", "TLS CA Certificates to use to verify gRPC servers").Default("").String() - serverName := cmd.Flag("grpc-client-server-name", "Server name to verify the hostname on the returned gRPC certificates. See https://tools.ietf.org/html/rfc4366#section-3.1").Default("").String() - compressionOptions := strings.Join([]string{snappy.Name, compressionNone}, ", ") - grpcCompression := cmd.Flag("grpc-compression", "Compression algorithm to use for gRPC requests to other clients. Must be one of: "+compressionOptions).Default(compressionNone).Enum(snappy.Name, compressionNone) + var grpcClientConfig grpcClientConfig + grpcClientConfig.registerFlag(cmd) webRoutePrefix := cmd.Flag("web.route-prefix", "Prefix for API and UI endpoints. This allows thanos UI to be served on a sub-path. Defaults to the value of --web.external-prefix. This option is analogous to --web.route-prefix of Prometheus.").Default("").String() webExternalPrefix := cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the UI query web interface. 
Actual endpoints are still served on / or the web.route-prefix. This allows thanos UI to be served behind a reverse proxy that strips a URL sub-path.").Default("").String() @@ -134,55 +118,6 @@ func registerQuery(app *extkingpin.App) { selectorLabels := cmd.Flag("selector-label", "Query selector labels that will be exposed in info endpoint (repeated)."). PlaceHolder("=\"\"").Strings() - endpoints := extkingpin.Addrs(cmd.Flag("endpoint", "Addresses of statically configured Thanos API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect Thanos API servers through respective DNS lookups."). - PlaceHolder("")) - - endpointGroups := extkingpin.Addrs(cmd.Flag("endpoint-group", "Experimental: DNS name of statically configured Thanos API server groups (repeatable). Targets resolved from the DNS name will be queried in a round-robin, instead of a fanout manner. This flag should be used when connecting a Thanos Query to HA groups of Thanos components."). - PlaceHolder("")) - - stores := extkingpin.Addrs(cmd.Flag("store", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Addresses of statically configured store API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect store API servers through respective DNS lookups."). - PlaceHolder("")) - - // TODO(bwplotka): Hidden because we plan to extract discovery to separate API: https://github.com/thanos-io/thanos/issues/2600. - ruleEndpoints := extkingpin.Addrs(cmd.Flag("rule", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Experimental: Addresses of statically configured rules API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect rule API servers through respective DNS lookups."). - Hidden().PlaceHolder("")) - - metadataEndpoints := extkingpin.Addrs(cmd.Flag("metadata", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. 
Experimental: Addresses of statically configured metadata API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect metadata API servers through respective DNS lookups."). - Hidden().PlaceHolder("")) - - exemplarEndpoints := extkingpin.Addrs(cmd.Flag("exemplar", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Experimental: Addresses of statically configured exemplars API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect exemplars API servers through respective DNS lookups."). - Hidden().PlaceHolder("")) - - // TODO(atunik): Hidden because we plan to extract discovery to separate API: https://github.com/thanos-io/thanos/issues/2600. - targetEndpoints := extkingpin.Addrs(cmd.Flag("target", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Experimental: Addresses of statically configured target API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect target API servers through respective DNS lookups."). - Hidden().PlaceHolder("")) - - strictStores := cmd.Flag("store-strict", "Deprecation Warning - This flag is deprecated and replaced with `endpoint-strict`. Addresses of only statically configured store API servers that are always used, even if the health check fails. Useful if you have a caching layer on top."). - PlaceHolder("").Strings() - - strictEndpoints := cmd.Flag("endpoint-strict", "Addresses of only statically configured Thanos API servers that are always used, even if the health check fails. Useful if you have a caching layer on top."). - PlaceHolder("").Strings() - - strictEndpointGroups := extkingpin.Addrs(cmd.Flag("endpoint-group-strict", "Experimental: DNS name of statically configured Thanos API server groups (repeatable) that are always used, even if the health check fails."). - PlaceHolder("")) - - fileSDFiles := cmd.Flag("store.sd-files", "Path to files that contain addresses of store API servers. 
The path can be a glob pattern (repeatable)."). - PlaceHolder("").Strings() - - fileSDInterval := extkingpin.ModelDuration(cmd.Flag("store.sd-interval", "Refresh interval to re-read file SD files. It is used as a resync fallback."). - Default("5m")) - - // TODO(bwplotka): Grab this from TTL at some point. - dnsSDInterval := extkingpin.ModelDuration(cmd.Flag("store.sd-dns-interval", "Interval between DNS resolutions."). - Default("30s")) - - dnsSDResolver := cmd.Flag("store.sd-dns-resolver", fmt.Sprintf("Resolver to use. Possible options: [%s, %s]", dns.GolangResolverType, dns.MiekgdnsResolverType)). - Default(string(dns.MiekgdnsResolverType)).Hidden().String() - - unhealthyStoreTimeout := extkingpin.ModelDuration(cmd.Flag("store.unhealthy-timeout", "Timeout before an unhealthy store is cleaned from the store UI page.").Default("5m")) - - endpointInfoTimeout := extkingpin.ModelDuration(cmd.Flag("endpoint.info-timeout", "Timeout of gRPC Info requests.").Default("5s").Hidden()) - enableAutodownsampling := cmd.Flag("query.auto-downsampling", "Enable automatic adjustment (step / 5) to what source of data should be used in store gateways if no max_source_resolution param is specified."). Default("false").Bool() @@ -233,6 +168,29 @@ func registerQuery(app *extkingpin.App) { tenantCertField := cmd.Flag("query.tenant-certificate-field", "Use TLS client's certificate field to determine tenant for write requests. Must be one of "+tenancy.CertificateFieldOrganization+", "+tenancy.CertificateFieldOrganizationalUnit+" or "+tenancy.CertificateFieldCommonName+". This setting will cause the query.tenant-header flag value to be ignored.").Default("").Enum("", tenancy.CertificateFieldOrganization, tenancy.CertificateFieldOrganizationalUnit, tenancy.CertificateFieldCommonName) enforceTenancy := cmd.Flag("query.enforce-tenancy", "Enforce tenancy on Query APIs. 
Responses are returned only if the label value of the configured tenant-label-name and the value of the tenant header matches.").Default("false").Bool() tenantLabel := cmd.Flag("query.tenant-label-name", "Label name to use when enforcing tenancy (if --query.enforce-tenancy is enabled).").Default(tenancy.DefaultTenantLabel).String() + // TODO(bwplotka): Grab this from TTL at some point. + dnsSDInterval := extkingpin.ModelDuration(cmd.Flag("store.sd-dns-interval", "Interval between DNS resolutions."). + Default("30s")) + + dnsSDResolver := cmd.Flag("store.sd-dns-resolver", fmt.Sprintf("Resolver to use. Possible options: [%s, %s]", dns.GolangResolverType, dns.MiekgdnsResolverType)). + Default(string(dns.MiekgdnsResolverType)).Hidden().String() + + unhealthyStoreTimeout := extkingpin.ModelDuration(cmd.Flag("store.unhealthy-timeout", "Timeout before an unhealthy store is cleaned from the store UI page.").Default("5m")) + + endpointInfoTimeout := extkingpin.ModelDuration(cmd.Flag("endpoint.info-timeout", "Timeout of gRPC Info requests.").Default("5s").Hidden()) + + endpointSetConfig := extflag.RegisterPathOrContent(cmd, "endpoint.sd-config", "Config File with endpoint definitions") + + endpointSetConfigReloadInterval := extkingpin.ModelDuration(cmd.Flag("endpoint.sd-config-reload-interval", "Interval between endpoint config refreshes").Default("1m")) + + endpoints := extkingpin.Addrs(cmd.Flag("endpoint", "(Deprecated): Addresses of statically configured Thanos API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect Thanos API servers through respective DNS lookups.").Hidden().PlaceHolder("")) + + endpointGroups := extkingpin.Addrs(cmd.Flag("endpoint-group", "(Deprecated, Experimental): DNS name of statically configured Thanos API server groups (repeatable). Targets resolved from the DNS name will be queried in a round-robin, instead of a fanout manner. 
This flag should be used when connecting a Thanos Query to HA groups of Thanos components.").Hidden().PlaceHolder("")) + + strictEndpoints := extkingpin.Addrs(cmd.Flag("endpoint-strict", "(Deprecated): Addresses of only statically configured Thanos API servers that are always used, even if the health check fails. Useful if you have a caching layer on top."). + Hidden().PlaceHolder("")) + + strictEndpointGroups := extkingpin.Addrs(cmd.Flag("endpoint-group-strict", "(Deprecated, Experimental): DNS name of statically configured Thanos API server groups (repeatable) that are always used, even if the health check fails.").Hidden().PlaceHolder("")) var storeRateLimits store.SeriesSelectLimits storeRateLimits.RegisterFlags(cmd) @@ -266,18 +224,6 @@ func registerQuery(app *extkingpin.App) { return errors.Wrap(err, "error while parsing config for request logging") } - var fileSD *file.Discovery - if len(*fileSDFiles) > 0 { - conf := &file.SDConfig{ - Files: *fileSDFiles, - RefreshInterval: *fileSDInterval, - } - var err error - if fileSD, err = file.NewDiscovery(conf, logger, conf.NewDiscovererMetrics(reg, discovery.NewRefreshMetrics(reg))); err != nil { - return err - } - } - if *webRoutePrefix == "" { *webRoutePrefix = *webExternalPrefix } @@ -295,23 +241,43 @@ func registerQuery(app *extkingpin.App) { return err } + dialOpts, err := grpcClientConfig.dialOptions(logger, reg, tracer) + if err != nil { + return err + } + + endpointSet, err := setupEndpointset( + g, + reg, + logger, + endpointSetConfig, + time.Duration(*endpointSetConfigReloadInterval), + *endpoints, + *endpointGroups, + *strictEndpoints, + *strictEndpointGroups, + *dnsSDResolver, + time.Duration(*dnsSDInterval), + time.Duration(*unhealthyStoreTimeout), + time.Duration(*endpointInfoTimeout), + dialOpts, + *queryConnMetricLabels..., + ) + if err != nil { + return err + } + return runQuery( g, logger, debugLogging, + endpointSet, reg, tracer, httpLogOpts, grpcLogOpts, logFilterMethods, grpcServerConfig, - 
*grpcCompression, - *secure, - *skipVerify, - *cert, - *key, - *caCert, - *serverName, *httpBindAddr, *httpTLSConfig, time.Duration(*httpGracePeriod), @@ -326,18 +292,10 @@ func registerQuery(app *extkingpin.App) { *dynamicLookbackDelta, time.Duration(*defaultEvaluationInterval), time.Duration(*storeResponseTimeout), - *queryConnMetricLabels, *queryReplicaLabels, *queryPartitionLabels, selectorLset, getFlagsMap(cmd.Flags()), - *endpoints, - *endpointGroups, - *stores, - *ruleEndpoints, - *targetEndpoints, - *metadataEndpoints, - *exemplarEndpoints, *enableAutodownsampling, *enableQueryPartialResponse, *enableRulePartialResponse, @@ -345,16 +303,8 @@ func registerQuery(app *extkingpin.App) { *enableMetricMetadataPartialResponse, *enableExemplarPartialResponse, *activeQueryDir, - fileSD, - time.Duration(*dnsSDInterval), - *dnsSDResolver, - time.Duration(*unhealthyStoreTimeout), - time.Duration(*endpointInfoTimeout), time.Duration(*instantDefaultMaxSourceResolution), *defaultMetadataTimeRange, - *strictStores, - *strictEndpoints, - *strictEndpointGroups, *webDisableCORS, *alertQueryURL, *grpcProxyStrategy, @@ -381,19 +331,13 @@ func runQuery( g *run.Group, logger log.Logger, debugLogging bool, + endpointSet *query.EndpointSet, reg *prometheus.Registry, tracer opentracing.Tracer, httpLogOpts []logging.Option, grpcLogOpts []grpc_logging.Option, logFilterMethods []string, grpcServerConfig grpcConfig, - grpcCompression string, - secure bool, - skipVerify bool, - cert string, - key string, - caCert string, - serverName string, httpBindAddr string, httpTLSConfig string, httpGracePeriod time.Duration, @@ -408,18 +352,10 @@ func runQuery( dynamicLookbackDelta bool, defaultEvaluationInterval time.Duration, storeResponseTimeout time.Duration, - queryConnMetricLabels []string, queryReplicaLabels []string, queryPartitionLabels []string, selectorLset labels.Labels, flagsMap map[string]string, - endpointAddrs []string, - endpointGroupAddrs []string, - storeAddrs []string, - 
ruleAddrs []string, - targetAddrs []string, - metadataAddrs []string, - exemplarAddrs []string, enableAutodownsampling bool, enableQueryPartialResponse bool, enableRulePartialResponse bool, @@ -427,16 +363,8 @@ func runQuery( enableMetricMetadataPartialResponse bool, enableExemplarPartialResponse bool, activeQueryDir string, - fileSD *file.Discovery, - dnsSDInterval time.Duration, - dnsSDResolver string, - unhealthyStoreTimeout time.Duration, - endpointInfoTimeout time.Duration, instantDefaultMaxSourceResolution time.Duration, defaultMetadataTimeRange time.Duration, - strictStores []string, - strictEndpoints []string, - strictEndpointGroups []string, disableCORS bool, alertQueryURL string, grpcProxyStrategy string, @@ -462,79 +390,6 @@ func runQuery( } // NOTE(GiedriusS): default is set in config.ts. } - // TODO(bplotka in PR #513 review): Move arguments into struct. - duplicatedStores := promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_query_duplicated_store_addresses_total", - Help: "The number of times a duplicated store addresses is detected from the different configs in query", - }) - - dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer, secure, skipVerify, cert, key, caCert, serverName) - if err != nil { - return errors.Wrap(err, "building gRPC client") - } - if grpcCompression != compressionNone { - dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(grpcCompression))) - } - - fileSDCache := cache.New() - dnsStoreProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_store_apis_", reg), - dns.ResolverType(dnsSDResolver), - ) - - for _, store := range strictStores { - if dns.IsDynamicNode(store) { - return errors.Errorf("%s is a dynamically specified store i.e. it uses SD and that is not permitted under strict mode. 
Use --store for this", store) - } - } - - for _, endpoint := range strictEndpoints { - if dns.IsDynamicNode(endpoint) { - return errors.Errorf("%s is a dynamically specified endpoint i.e. it uses SD and that is not permitted under strict mode. Use --endpoint for this", endpoint) - } - } - - // Register resolver for the "thanos:///" scheme for endpoint-groups - dns.RegisterGRPCResolver( - dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_endpoint_groups_", reg), - dns.ResolverType(dnsSDResolver), - ), - dnsSDInterval, - logger, - ) - - dnsEndpointProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_endpoints_", reg), - dns.ResolverType(dnsSDResolver), - ) - - dnsRuleProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_rule_apis_", reg), - dns.ResolverType(dnsSDResolver), - ) - - dnsTargetProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_target_apis_", reg), - dns.ResolverType(dnsSDResolver), - ) - - dnsMetadataProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_metadata_apis_", reg), - dns.ResolverType(dnsSDResolver), - ) - - dnsExemplarProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_exemplar_apis_", reg), - dns.ResolverType(dnsSDResolver), - ) options := []store.ProxyStoreOption{ store.WithTSDBSelector(tsdbSelector), @@ -545,35 +400,12 @@ func runQuery( queryReplicaLabels = strutil.ParseFlagLabels(queryReplicaLabels) var ( - endpoints = prepareEndpointSet( - g, - logger, - reg, - []*dns.Provider{ - dnsStoreProvider, - dnsRuleProvider, - dnsExemplarProvider, - dnsMetadataProvider, - dnsTargetProvider, - dnsEndpointProvider, - }, - duplicatedStores, - strictStores, - strictEndpoints, - endpointGroupAddrs, - strictEndpointGroups, - dialOpts, - unhealthyStoreTimeout, - endpointInfoTimeout, - queryConnMetricLabels..., - ) - - proxyStore = 
store.NewProxyStore(logger, reg, endpoints.GetStoreClients, component.Query, selectorLset, storeResponseTimeout, store.RetrievalStrategy(grpcProxyStrategy), options...) + proxyStore = store.NewProxyStore(logger, reg, endpointSet.GetStoreClients, component.Query, selectorLset, storeResponseTimeout, store.RetrievalStrategy(grpcProxyStrategy), options...) seriesProxy = store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxyStore), reg, storeRateLimits) - rulesProxy = rules.NewProxy(logger, endpoints.GetRulesClients) - targetsProxy = targets.NewProxy(logger, endpoints.GetTargetsClients) - metadataProxy = metadata.NewProxy(logger, endpoints.GetMetricMetadataClients) - exemplarsProxy = exemplars.NewProxy(logger, endpoints.GetExemplarsStores, selectorLset) + rulesProxy = rules.NewProxy(logger, endpointSet.GetRulesClients) + targetsProxy = targets.NewProxy(logger, endpointSet.GetTargetsClients) + metadataProxy = metadata.NewProxy(logger, endpointSet.GetMetricMetadataClients) + exemplarsProxy = exemplars.NewProxy(logger, endpointSet.GetExemplarsStores, selectorLset) queryableCreator = query.NewQueryableCreator( logger, extprom.WrapRegistererWithPrefix("thanos_query_", reg), @@ -583,78 +415,6 @@ func runQuery( ) ) - // Run File Service Discovery and update the store set when the files are modified. - if fileSD != nil { - var fileSDUpdates chan []*targetgroup.Group - ctxRun, cancelRun := context.WithCancel(context.Background()) - - fileSDUpdates = make(chan []*targetgroup.Group) - - g.Add(func() error { - fileSD.Run(ctxRun, fileSDUpdates) - return nil - }, func(error) { - cancelRun() - }) - - ctxUpdate, cancelUpdate := context.WithCancel(context.Background()) - g.Add(func() error { - for { - select { - case update := <-fileSDUpdates: - // Discoverers sometimes send nil updates so need to check for it to avoid panics. 
- if update == nil { - continue - } - fileSDCache.Update(update) - endpoints.Update(ctxUpdate) - - if err := dnsStoreProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...), true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for storeAPIs", "err", err) - } - - // Rules apis do not support file service discovery as of now. - case <-ctxUpdate.Done(): - return nil - } - } - }, func(error) { - cancelUpdate() - }) - } - // Periodically update the addresses from static flags and file SD by resolving them using DNS SD if necessary. - { - ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - return runutil.Repeat(dnsSDInterval, ctx.Done(), func() error { - resolveCtx, resolveCancel := context.WithTimeout(ctx, dnsSDInterval) - defer resolveCancel() - if err := dnsStoreProvider.Resolve(resolveCtx, append(fileSDCache.Addresses(), storeAddrs...), true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for storeAPIs", "err", err) - } - if err := dnsRuleProvider.Resolve(resolveCtx, ruleAddrs, true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for rulesAPIs", "err", err) - } - if err := dnsTargetProvider.Resolve(ctx, targetAddrs, true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for targetsAPIs", "err", err) - } - if err := dnsMetadataProvider.Resolve(resolveCtx, metadataAddrs, true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for metadataAPIs", "err", err) - } - if err := dnsExemplarProvider.Resolve(resolveCtx, exemplarAddrs, true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for exemplarsAPI", "err", err) - } - if err := dnsEndpointProvider.Resolve(resolveCtx, endpointAddrs, true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses passed using endpoint flag", "err", err) - - } - return nil - }) - }, func(error) { - cancel() - }) 
- } - grpcProbe := prober.NewGRPC() httpProbe := prober.NewHTTP() statusProber := prober.Combine( @@ -687,7 +447,7 @@ func runQuery( if queryMode != queryModeLocal { level.Info(logger).Log("msg", "Distributed query mode enabled, using Thanos as the default query engine.") defaultEngine = string(apiv1.PromqlEngineThanos) - remoteEngineEndpoints = query.NewRemoteEndpoints(logger, endpoints.GetQueryAPIClients, query.Opts{ + remoteEngineEndpoints = query.NewRemoteEndpoints(logger, endpointSet.GetQueryAPIClients, query.Opts{ AutoDownsample: enableAutodownsampling, ReplicaLabels: queryReplicaLabels, PartitionLabels: queryPartitionLabels, @@ -727,11 +487,11 @@ func runQuery( ins := extpromhttp.NewTenantInstrumentationMiddleware(tenantHeader, defaultTenant, reg, nil) // TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting. - ui.NewQueryUI(logger, endpoints, webExternalPrefix, webPrefixHeaderName, alertQueryURL, tenantHeader, defaultTenant, enforceTenancy).Register(router, ins) + ui.NewQueryUI(logger, endpointSet, webExternalPrefix, webPrefixHeaderName, alertQueryURL, tenantHeader, defaultTenant, enforceTenancy).Register(router, ins) api := apiv1.NewQueryAPI( logger, - endpoints.GetEndpointStatus, + endpointSet.GetEndpointStatus, engineFactory, apiv1.PromqlEngineType(defaultEngine), lookbackDeltaCreator, @@ -844,7 +604,7 @@ func runQuery( }, func(error) { statusProber.NotReady(err) s.Shutdown(err) - endpoints.Close() + endpointSet.Close() }) } @@ -852,96 +612,6 @@ func runQuery( return nil } -func removeDuplicateEndpointSpecs(logger log.Logger, duplicatedStores prometheus.Counter, specs []*query.GRPCEndpointSpec) []*query.GRPCEndpointSpec { - set := make(map[string]*query.GRPCEndpointSpec) - for _, spec := range specs { - addr := spec.Addr() - if _, ok := set[addr]; ok { - level.Warn(logger).Log("msg", "Duplicate store address is provided", "addr", addr) - duplicatedStores.Inc() - } - set[addr] = spec - } - deduplicated := 
make([]*query.GRPCEndpointSpec, 0, len(set)) - for _, value := range set { - deduplicated = append(deduplicated, value) - } - return deduplicated -} - -func prepareEndpointSet( - g *run.Group, - logger log.Logger, - reg *prometheus.Registry, - dnsProviders []*dns.Provider, - duplicatedStores prometheus.Counter, - strictStores []string, - strictEndpoints []string, - endpointGroupAddrs []string, - strictEndpointGroups []string, - dialOpts []grpc.DialOption, - unhealthyStoreTimeout time.Duration, - endpointInfoTimeout time.Duration, - queryConnMetricLabels ...string, -) *query.EndpointSet { - endpointSet := query.NewEndpointSet( - time.Now, - logger, - reg, - func() (specs []*query.GRPCEndpointSpec) { - // Add strict & static nodes. - for _, addr := range strictStores { - specs = append(specs, query.NewGRPCEndpointSpec(addr, true)) - } - - for _, addr := range strictEndpoints { - specs = append(specs, query.NewGRPCEndpointSpec(addr, true)) - } - - for _, dnsProvider := range dnsProviders { - var tmpSpecs []*query.GRPCEndpointSpec - - for _, addr := range dnsProvider.Addresses() { - tmpSpecs = append(tmpSpecs, query.NewGRPCEndpointSpec(addr, false)) - } - tmpSpecs = removeDuplicateEndpointSpecs(logger, duplicatedStores, tmpSpecs) - specs = append(specs, tmpSpecs...) - } - - for _, eg := range endpointGroupAddrs { - spec := query.NewGRPCEndpointSpec(fmt.Sprintf("thanos:///%s", eg), false, extgrpc.EndpointGroupGRPCOpts()...) - specs = append(specs, spec) - } - - for _, eg := range strictEndpointGroups { - spec := query.NewGRPCEndpointSpec(fmt.Sprintf("thanos:///%s", eg), true, extgrpc.EndpointGroupGRPCOpts()...) - specs = append(specs, spec) - } - - return specs - }, - dialOpts, - unhealthyStoreTimeout, - endpointInfoTimeout, - queryConnMetricLabels..., - ) - - // Periodically update the store set with the addresses we see in our cluster. 
- { - ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - return runutil.Repeat(5*time.Second, ctx.Done(), func() error { - endpointSet.Update(ctx) - return nil - }) - }, func(error) { - cancel() - }) - } - - return endpointSet -} - // LookbackDeltaFactory creates from 1 to 3 lookback deltas depending on // dynamicLookbackDelta and eo.LookbackDelta and returns a function // that returns appropriate lookback delta for given maxSourceResolutionMillis. diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index e0780452fd..72e161b862 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -79,8 +79,6 @@ import ( "github.com/thanos-io/thanos/pkg/ui" ) -const dnsSDResolver = "miekgdns" - type ruleConfig struct { http httpConfig grpc grpcConfig @@ -404,17 +402,6 @@ func runRule( } if len(grpcEndpoints) > 0 { - duplicatedGRPCEndpoints := promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_rule_grpc_endpoints_duplicated_total", - Help: "The number of times a duplicated grpc endpoint is detected from the different configs in rule", - }) - - dnsEndpointProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_rule_grpc_endpoints_", reg), - dnsSDResolver, - ) - dialOpts, err := extgrpc.StoreClientGRPCOpts( logger, reg, @@ -430,36 +417,24 @@ func runRule( return err } - grpcEndpointSet = prepareEndpointSet( + grpcEndpointSet, err = setupEndpointset( g, - logger, reg, - []*dns.Provider{dnsEndpointProvider}, - duplicatedGRPCEndpoints, + logger, nil, + 1*time.Minute, + grpcEndpoints, nil, nil, nil, - dialOpts, + conf.query.dnsSDResolver, + conf.query.dnsSDInterval, 5*time.Minute, 5*time.Second, + dialOpts, ) - - // Periodically update the GRPC addresses from query config by resolving them using DNS SD if necessary. 
- { - ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - return runutil.Repeat(5*time.Second, ctx.Done(), func() error { - resolveCtx, resolveCancel := context.WithTimeout(ctx, 5*time.Second) - defer resolveCancel() - if err := dnsEndpointProvider.Resolve(resolveCtx, grpcEndpoints, true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses passed using grpc query config", "err", err) - } - return nil - }) - }, func(error) { - cancel() - }) + if err != nil { + return err + } } } diff --git a/docs/components/query.md index 10975138b6..a97bef900c 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -299,27 +299,14 @@ Flags: detected maximum container or system memory. --enable-auto-gomemlimit Enable go runtime to automatically limit memory consumption. - --endpoint= ... Addresses of statically configured Thanos - API servers (repeatable). The scheme may be - prefixed with 'dns+' or 'dnssrv+' to detect - Thanos API servers through respective DNS - lookups. - --endpoint-group= ... - Experimental: DNS name of statically configured - Thanos API server groups (repeatable). Targets - resolved from the DNS name will be queried in - a round-robin, instead of a fanout manner. - This flag should be used when connecting a - Thanos Query to HA groups of Thanos components. - --endpoint-group-strict= ... - Experimental: DNS name of statically configured - Thanos API server groups (repeatable) that are - always used, even if the health check fails. - --endpoint-strict= ... - Addresses of only statically configured Thanos - API servers that are always used, even if - the health check fails. Useful if you have a - caching layer on top. + --endpoint.sd-config= + Alternative to 'endpoint.sd-config-file' flag + (mutually exclusive). 
Content of Config File + with endpoint definitions + --endpoint.sd-config-file= + Path to Config File with endpoint definitions + --endpoint.sd-config-reload-interval=1m + Interval between endpoint config refreshes --grpc-address="0.0.0.0:10901" Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable @@ -500,19 +487,6 @@ Flags: It follows the Thanos sharding relabel-config syntax. For format details see: https://thanos.io/tip/thanos/sharding.md/#relabelling - --store= ... Deprecation Warning - This flag is deprecated - and replaced with `endpoint`. Addresses of - statically configured store API servers - (repeatable). The scheme may be prefixed with - 'dns+' or 'dnssrv+' to detect store API servers - through respective DNS lookups. - --store-strict= ... - Deprecation Warning - This flag is deprecated - and replaced with `endpoint-strict`. Addresses - of only statically configured store API servers - that are always used, even if the health check - fails. Useful if you have a caching layer on - top. --store.limits.request-samples=0 The maximum samples allowed for a single Series request, The Series call fails if @@ -531,12 +505,6 @@ Flags: enabled. 0 disables timeout. --store.sd-dns-interval=30s Interval between DNS resolutions. - --store.sd-files= ... - Path to files that contain addresses of store - API servers. The path can be a glob pattern - (repeatable). - --store.sd-interval=5m Refresh interval to re-read file SD files. - It is used as a resync fallback. --store.unhealthy-timeout=5m Timeout before an unhealthy store is cleaned from the store UI page. 
diff --git a/pkg/discovery/dns/grpc.go b/pkg/discovery/dns/grpc.go index 79e832b652..7971e7991c 100644 --- a/pkg/discovery/dns/grpc.go +++ b/pkg/discovery/dns/grpc.go @@ -23,7 +23,7 @@ type builder struct { logger log.Logger } -func RegisterGRPCResolver(provider *Provider, interval time.Duration, logger log.Logger) { +func RegisterGRPCResolver(logger log.Logger, provider *Provider, interval time.Duration) { grpcresolver.Register(&builder{ resolveInterval: interval, provider: provider, diff --git a/pkg/extkingpin/flags.go b/pkg/extkingpin/flags.go index 62b9142beb..033769c56e 100644 --- a/pkg/extkingpin/flags.go +++ b/pkg/extkingpin/flags.go @@ -47,10 +47,8 @@ func Addrs(flags *kingpin.FlagClause) (target *addressSlice) { return } -// validateAddrs checks an address slice for duplicates and empty or invalid elements. +// validateAddrs checks an address slice for empty or invalid elements. func validateAddrs(addrs addressSlice) error { - set := map[string]struct{}{} - for _, addr := range addrs { if addr == "" { return errors.New("Address is empty.") @@ -61,12 +59,6 @@ func validateAddrs(addrs addressSlice) error { if len(qtypeAndName) != 2 && len(hostAndPort) != 2 { return errors.Errorf("Address %s is not of : format or a valid DNS query.", addr) } - - if _, ok := set[addr]; ok { - return errors.Errorf("Address %s is duplicated.", addr) - } - - set[addr] = struct{}{} } return nil diff --git a/pkg/query/endpointset.go b/pkg/query/endpointset.go index 4c519bf925..071e04a846 100644 --- a/pkg/query/endpointset.go +++ b/pkg/query/endpointset.go @@ -211,8 +211,7 @@ type EndpointSet struct { // Endpoint specifications can change dynamically. If some component is missing from the list, we assume it is no longer // accessible and we close gRPC client for it, unless it is strict. 
- endpointSpec func() map[string]*GRPCEndpointSpec - dialOpts []grpc.DialOption + endpointSpecs func() map[string]*GRPCEndpointSpec endpointInfoTimeout time.Duration unhealthyEndpointTimeout time.Duration @@ -235,7 +234,6 @@ func NewEndpointSet( logger log.Logger, reg prometheus.Registerer, endpointSpecs func() []*GRPCEndpointSpec, - dialOpts []grpc.DialOption, unhealthyEndpointTimeout time.Duration, endpointInfoTimeout time.Duration, endpointMetricLabels ...string, @@ -254,19 +252,17 @@ func NewEndpointSet( } return &EndpointSet{ - now: now, - logger: log.With(logger, "component", "endpointset"), - endpointsMetric: endpointsMetric, - - dialOpts: dialOpts, + now: now, + logger: log.With(logger, "component", "endpointset"), + endpointsMetric: endpointsMetric, endpointInfoTimeout: endpointInfoTimeout, unhealthyEndpointTimeout: unhealthyEndpointTimeout, - endpointSpec: func() map[string]*GRPCEndpointSpec { - specs := make(map[string]*GRPCEndpointSpec) + endpointSpecs: func() map[string]*GRPCEndpointSpec { + res := make(map[string]*GRPCEndpointSpec) for _, s := range endpointSpecs() { - specs[s.addr] = s + res[s.addr] = s } - return specs + return res }, endpoints: make(map[string]*endpointRef), } @@ -288,7 +284,7 @@ func (e *EndpointSet) Update(ctx context.Context) { mu sync.Mutex ) - for _, spec := range e.endpointSpec() { + for _, spec := range e.endpointSpecs() { spec := spec if er, existingRef := e.endpoints[spec.Addr()]; existingRef { @@ -571,11 +567,7 @@ type endpointRef struct { // newEndpointRef creates a new endpointRef with a gRPC channel to the given the IP address. // The call to newEndpointRef will return an error if establishing the channel fails. func (e *EndpointSet) newEndpointRef(spec *GRPCEndpointSpec) (*endpointRef, error) { - var dialOpts []grpc.DialOption - - dialOpts = append(dialOpts, e.dialOpts...) - dialOpts = append(dialOpts, spec.dialOpts...) - conn, err := grpc.NewClient(spec.Addr(), dialOpts...) 
+ conn, err := grpc.NewClient(spec.Addr(), spec.dialOpts...) if err != nil { return nil, errors.Wrap(err, "dialing connection") } diff --git a/pkg/query/endpointset_test.go b/pkg/query/endpointset_test.go index 6f061211ab..68b00aff5e 100644 --- a/pkg/query/endpointset_test.go +++ b/pkg/query/endpointset_test.go @@ -675,11 +675,11 @@ func TestEndpointSetUpdate_AvailabilityScenarios(t *testing.T) { endpointSet := NewEndpointSet(nowFunc, nil, nil, func() (specs []*GRPCEndpointSpec) { for _, addr := range discoveredEndpointAddr { - specs = append(specs, NewGRPCEndpointSpec(addr, false)) + specs = append(specs, NewGRPCEndpointSpec(addr, false, testGRPCOpts...)) } return specs }, - testGRPCOpts, time.Minute, 2*time.Second) + time.Minute, 2*time.Second) defer endpointSet.Close() // Initial update. @@ -1052,7 +1052,7 @@ func TestEndpointSet_Update_NoneAvailable(t *testing.T) { } return specs }, - testGRPCOpts, time.Minute, 2*time.Second) + time.Minute, 2*time.Second) defer endpointSet.Close() // Should not matter how many of these we run. @@ -1159,11 +1159,11 @@ func TestEndpoint_Update_QuerierStrict(t *testing.T) { slowStaticEndpointAddr := discoveredEndpointAddr[2] endpointSet := NewEndpointSet(time.Now, nil, nil, func() (specs []*GRPCEndpointSpec) { return []*GRPCEndpointSpec{ - NewGRPCEndpointSpec(discoveredEndpointAddr[0], true), - NewGRPCEndpointSpec(discoveredEndpointAddr[1], false), - NewGRPCEndpointSpec(discoveredEndpointAddr[2], true), + NewGRPCEndpointSpec(discoveredEndpointAddr[0], true, testGRPCOpts...), + NewGRPCEndpointSpec(discoveredEndpointAddr[1], false, testGRPCOpts...), + NewGRPCEndpointSpec(discoveredEndpointAddr[2], true, testGRPCOpts...), } - }, testGRPCOpts, time.Minute, 1*time.Second) + }, time.Minute, 1*time.Second) defer endpointSet.Close() // Initial update. 
@@ -1273,7 +1273,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { endpointSpec: func() []*GRPCEndpointSpec { endpointSpec := make([]*GRPCEndpointSpec, 0, len(endpoints.orderAddrs)) for _, addr := range endpoints.orderAddrs { - endpointSpec = append(endpointSpec, NewGRPCEndpointSpec(addr, false)) + endpointSpec = append(endpointSpec, NewGRPCEndpointSpec(addr, false, testGRPCOpts...)) } return endpointSpec }, @@ -1297,7 +1297,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { name: "Sidecar discovered, no Ruler discovered", endpointSpec: func() []*GRPCEndpointSpec { return []*GRPCEndpointSpec{ - NewGRPCEndpointSpec(endpoints.orderAddrs[0], false), + NewGRPCEndpointSpec(endpoints.orderAddrs[0], false, testGRPCOpts...), } }, expectedStores: 1, // sidecar @@ -1310,8 +1310,8 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { name: "Ruler discovered", endpointSpec: func() []*GRPCEndpointSpec { return []*GRPCEndpointSpec{ - NewGRPCEndpointSpec(endpoints.orderAddrs[0], false), - NewGRPCEndpointSpec(endpoints.orderAddrs[1], false), + NewGRPCEndpointSpec(endpoints.orderAddrs[0], false, testGRPCOpts...), + NewGRPCEndpointSpec(endpoints.orderAddrs[1], false, testGRPCOpts...), } }, expectedStores: 2, // sidecar + ruler @@ -1324,7 +1324,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { name: "Sidecar removed", endpointSpec: func() []*GRPCEndpointSpec { return []*GRPCEndpointSpec{ - NewGRPCEndpointSpec(endpoints.orderAddrs[1], false), + NewGRPCEndpointSpec(endpoints.orderAddrs[1], false, testGRPCOpts...), } }, expectedStores: 1, // ruler @@ -1344,7 +1344,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { return tc.states[currentState].endpointSpec() }, - testGRPCOpts, time.Minute, 2*time.Second) + time.Minute, 2*time.Second) defer endpointSet.Close() @@ -1532,11 +1532,11 @@ func makeEndpointSet(discoveredEndpointAddr []string, strict bool, now nowFunc, endpointSet := NewEndpointSet(now, nil, nil, func() (specs []*GRPCEndpointSpec) { for _, addr := 
range discoveredEndpointAddr { - specs = append(specs, NewGRPCEndpointSpec(addr, strict)) + specs = append(specs, NewGRPCEndpointSpec(addr, strict, testGRPCOpts...)) } return specs }, - testGRPCOpts, time.Minute, time.Second, metricLabels...) + time.Minute, time.Second, metricLabels...) return endpointSet } diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index c8a9e7fc62..a25f8f6ab9 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -17,9 +17,7 @@ import ( e2edb "github.com/efficientgo/e2e/db" e2eobs "github.com/efficientgo/e2e/observable" "github.com/pkg/errors" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/relabel" "gopkg.in/yaml.v2" @@ -429,26 +427,25 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) { "--store.sd-dns-interval": "5s", "--log.level": infoLogLevel, "--query.max-concurrent": "1", - "--store.sd-interval": "5s", }) for _, repl := range q.replicaLabels { args = append(args, "--query.replica-label="+repl) } for _, addr := range q.storeAddresses { - args = append(args, "--store="+addr) + args = append(args, "--endpoint="+addr) } for _, addr := range q.ruleAddresses { - args = append(args, "--rule="+addr) + args = append(args, "--endpoint="+addr) } for _, addr := range q.targetAddresses { - args = append(args, "--target="+addr) + args = append(args, "--endpoint="+addr) } for _, addr := range q.metadataAddresses { - args = append(args, "--metadata="+addr) + args = append(args, "--endpoint="+addr) } for _, addr := range q.exemplarAddresses { - args = append(args, "--exemplar="+addr) + args = append(args, "--endpoint="+addr) } for _, feature := range q.enableFeatures { args = append(args, "--enable-feature="+feature) @@ -470,21 +467,27 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) { return nil, errors.Wrap(err, "create query dir failed") } - 
fileSD := []*targetgroup.Group{{}} + type EndpointSpec struct{ Address string } + + endpoints := make([]EndpointSpec, 0) for _, a := range q.fileSDStoreAddresses { - fileSD[0].Targets = append(fileSD[0].Targets, model.LabelSet{model.AddressLabel: model.LabelValue(a)}) + endpoints = append(endpoints, EndpointSpec{Address: a}) } - b, err := yaml.Marshal(fileSD) + endpointSDConfig := struct { + Endpoints []EndpointSpec `yaml:"endpoints"` + }{Endpoints: endpoints} + b, err := yaml.Marshal(endpointSDConfig) if err != nil { return nil, err } - if err := os.WriteFile(q.Dir()+"/filesd.yaml", b, 0600); err != nil { + if err := os.WriteFile(q.Dir()+"/endpoint-sd-config.yaml", b, 0600); err != nil { return nil, errors.Wrap(err, "creating query SD config failed") } - args = append(args, "--store.sd-files="+filepath.Join(q.InternalDir(), "filesd.yaml")) + args = append(args, "--endpoint.sd-config-file="+filepath.Join(q.InternalDir(), "endpoint-sd-config.yaml")) + args = append(args, "--endpoint.sd-config-reload-interval=5s") } if q.routePrefix != "" { args = append(args, "--web.route-prefix="+q.routePrefix)