diff --git a/cmd/thanos/config.go b/cmd/thanos/config.go index f72d19fd79..f16f885737 100644 --- a/cmd/thanos/config.go +++ b/cmd/thanos/config.go @@ -14,11 +14,17 @@ import ( "github.com/KimMachineGun/automemlimit/memlimit" extflag "github.com/efficientgo/tools/extkingpin" + "github.com/go-kit/log" + "github.com/opentracing/opentracing-go" "github.com/pkg/errors" + "google.golang.org/grpc" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/thanos-io/thanos/pkg/extgrpc" + "github.com/thanos-io/thanos/pkg/extgrpc/snappy" "github.com/thanos-io/thanos/pkg/extkingpin" "github.com/thanos-io/thanos/pkg/shipper" ) @@ -58,6 +64,38 @@ func (gc *grpcConfig) registerFlag(cmd extkingpin.FlagClause) *grpcConfig { return gc } +type grpcClientConfig struct { + secure bool + skipVerify bool + cert, key, caCert string + serverName string + compression string +} + +func (gc *grpcClientConfig) registerFlag(cmd extkingpin.FlagClause) *grpcClientConfig { + cmd.Flag("grpc-client-tls-secure", "Use TLS when talking to the gRPC server").Default("false").BoolVar(&gc.secure) + cmd.Flag("grpc-client-tls-skip-verify", "Disable TLS certificate verification i.e self signed, signed by fake CA").Default("false").BoolVar(&gc.skipVerify) + cmd.Flag("grpc-client-tls-cert", "TLS Certificates to use to identify this client to the server").Default("").StringVar(&gc.cert) + cmd.Flag("grpc-client-tls-key", "TLS Key for the client's certificate").Default("").StringVar(&gc.key) + cmd.Flag("grpc-client-tls-ca", "TLS CA Certificates to use to verify gRPC servers").Default("").StringVar(&gc.caCert) + cmd.Flag("grpc-client-server-name", "Server name to verify the hostname on the returned gRPC certificates. See https://tools.ietf.org/html/rfc4366#section-3.1").Default("").StringVar(&gc.serverName) + compressionOptions := strings.Join([]string{snappy.Name, compressionNone}, ", ") + cmd.Flag("grpc-compression", "Compression algorithm to use for gRPC requests to other clients. Must be one of: "+compressionOptions).Default(compressionNone).EnumVar(&gc.compression, snappy.Name, compressionNone) + + return gc +} + +func (gc *grpcClientConfig) dialOptions(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer) ([]grpc.DialOption, error) { + dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer, gc.secure, gc.skipVerify, gc.cert, gc.key, gc.caCert, gc.serverName) + if err != nil { + return nil, errors.Wrapf(err, "building gRPC client") + } + if gc.compression != compressionNone { + dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(gc.compression))) + } + return dialOpts, nil +} + type httpConfig struct { bindAddress string tlsConfig string diff --git a/cmd/thanos/endpointset.go b/cmd/thanos/endpointset.go new file mode 100644 index 0000000000..d8610e0d2e --- /dev/null +++ b/cmd/thanos/endpointset.go @@ -0,0 +1,289 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package main + +import ( + "context" + "fmt" + "sync" + "time" + + extflag "github.com/efficientgo/tools/extkingpin" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/oklog/run" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "google.golang.org/grpc" + "gopkg.in/yaml.v3" + + "github.com/thanos-io/thanos/pkg/discovery/dns" + "github.com/thanos-io/thanos/pkg/errors" + "github.com/thanos-io/thanos/pkg/extgrpc" + "github.com/thanos-io/thanos/pkg/extkingpin" + "github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/query" + "github.com/thanos-io/thanos/pkg/runutil" +) + +type EndpointSpec struct { + Strict bool `yaml:"strict"` + Group bool `yaml:"group"` + Address string `yaml:"address"` +} + +type EndpointConfig struct { + Endpoints []EndpointSpec `yaml:"endpoints"` +} + +type endpointConfigProvider struct { + mu sync.Mutex + cfg EndpointConfig + + // statically defined endpoints from flags for backwards compatibility + endpoints []string + endpointGroups []string + strictEndpoints []string + strictEndpointGroups []string +} + +func (er *endpointConfigProvider) config() EndpointConfig { + er.mu.Lock() + defer er.mu.Unlock() + + res := EndpointConfig{Endpoints: make([]EndpointSpec, len(er.cfg.Endpoints))} + copy(res.Endpoints, er.cfg.Endpoints) + return res +} + +func (er *endpointConfigProvider) parse(configFile *extflag.PathOrContent) (EndpointConfig, error) { + content, err := configFile.Content() + if err != nil { + return EndpointConfig{}, errors.Wrapf(err, "unable to load config content: %s", configFile.Path()) + } + var cfg EndpointConfig + if err := yaml.Unmarshal(content, &cfg); err != nil { + return EndpointConfig{}, errors.Wrapf(err, "unable to unmarshal config content: %s", configFile.Path()) + } + return cfg, nil +} + +func (er *endpointConfigProvider) addStaticEndpoints(cfg *EndpointConfig) { + for _, e := range er.endpoints { + cfg.Endpoints = append(cfg.Endpoints, EndpointSpec{ + Address: e, + }) + } + for _, e := range er.endpointGroups { + cfg.Endpoints = append(cfg.Endpoints, EndpointSpec{ + Address: e, + Group: true, + }) + } + for _, e := range er.strictEndpoints { + cfg.Endpoints = append(cfg.Endpoints, EndpointSpec{ + Address: e, + Strict: true, + }) + } + for _, e := range er.strictEndpointGroups { + cfg.Endpoints = append(cfg.Endpoints, EndpointSpec{ + Address: e, + Group: true, + Strict: true, + }) + } +} + +func validateEndpointConfig(cfg EndpointConfig) error { + for _, ecfg := range cfg.Endpoints { + if dns.IsDynamicNode(ecfg.Address) && ecfg.Strict { + return errors.Newf("%s is a dynamically specified endpoint i.e. it uses SD and that is not permitted under strict mode.", ecfg.Address) + } + if dns.IsDynamicNode(ecfg.Address) && ecfg.Group { + return errors.Newf("%s is a dynamically specified endpoint i.e. it uses SD and that is not permitted under group mode.", ecfg.Address) + } + } + return nil +} + +func newEndpointConfigProvider( + g *run.Group, + logger log.Logger, + configFile *extflag.PathOrContent, + configReloadInterval time.Duration, + endpoints []string, + endpointGroups []string, + strictEndpoints []string, + strictEndpointGroups []string, +) (*endpointConfigProvider, error) { + res := &endpointConfigProvider{ + endpoints: endpoints, + endpointGroups: endpointGroups, + strictEndpoints: strictEndpoints, + strictEndpointGroups: strictEndpointGroups, + } + + cfg, err := res.parse(configFile) + if err != nil { + return nil, errors.Wrapf(err, "unable to load config file") + } + res.addStaticEndpoints(&cfg) + res.cfg = cfg + + // only static endpoints + if len(configFile.Path()) == 0 { + return res, nil + } + + if err := extkingpin.PathContentReloader(context.Background(), configFile, logger, func() { + res.mu.Lock() + defer res.mu.Unlock() + + level.Info(logger).Log("msg", "reloading endpoint config") + cfg, err := res.parse(configFile) + if err != nil { + level.Error(logger).Log("msg", "failed to reload endpoint config", "err", err) + return + } + res.addStaticEndpoints(&cfg) + if err := validateEndpointConfig(cfg); err != nil { + level.Error(logger).Log("msg", "failed to validate endpoint config", "err", err) + return + } + res.cfg = cfg + }, configReloadInterval); err != nil { + return nil, errors.Wrapf(err, "unable to create config reloader") + } + return res, nil +} + +func setupEndpointset( + g *run.Group, + reg prometheus.Registerer, + logger log.Logger, + configFile *extflag.PathOrContent, + configReloadInterval time.Duration, + endpoints []string, + endpointGroups []string, + strictEndpoints []string, + strictEndpointGroups []string, + dnsSDResolver string, + dnsSDInterval time.Duration, + unhealthyTimeout time.Duration, + endpointTimeout time.Duration, + dialOpts []grpc.DialOption, + queryConnMetricLabels ...string, +) (*query.EndpointSet, error) { + configProvider, err := newEndpointConfigProvider( + g, + logger, + configFile, + configReloadInterval, + endpoints, + endpointGroups, + strictEndpoints, + strictEndpointGroups, + ) + if err != nil { + return nil, errors.Wrapf(err, "unable to load config initially") + } + // Register resolver for the "thanos:///" scheme for endpoint-groups + dns.RegisterGRPCResolver( + logger, + dns.NewProvider( + logger, + extprom.WrapRegistererWithPrefix("thanos_query_endpoint_groups_", reg), + dns.ResolverType(dnsSDResolver), + ), + dnsSDInterval, + ) + dnsEndpointProvider := dns.NewProvider( + logger, + extprom.WrapRegistererWithPrefix("thanos_query_endpoints_", reg), + dns.ResolverType(dnsSDResolver), + ) + duplicatedStores := promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_query_duplicated_store_addresses_total", + Help: "The number of times a duplicated store addresses is detected from the different configs in query", + }) + + removeDuplicateEndpointSpecs := func(specs []*query.GRPCEndpointSpec) []*query.GRPCEndpointSpec { + set := make(map[string]*query.GRPCEndpointSpec) + for _, spec := range specs { + addr := spec.Addr() + if _, ok := set[addr]; ok { + level.Warn(logger).Log("msg", "Duplicate store address is provided", "addr", addr) + duplicatedStores.Inc() + } + set[addr] = spec + } + deduplicated := make([]*query.GRPCEndpointSpec, 0, len(set)) + for _, value := range set { + deduplicated = append(deduplicated, value) + } + return deduplicated + } + + ctxDNSUpdate, cancelDNSUpdate := context.WithCancel(context.Background()) + g.Add(func() error { + return runutil.Repeat(dnsSDInterval, ctxDNSUpdate.Done(), func() error { + ctxUpdateIter, cancelUpdateIter := context.WithTimeout(ctxDNSUpdate, dnsSDInterval) + defer cancelUpdateIter() + + endpointConfig := configProvider.config() + + addresses := make([]string, 0, len(endpointConfig.Endpoints)) + for _, ecfg := range endpointConfig.Endpoints { + if addr := ecfg.Address; dns.IsDynamicNode(addr) { + addresses = append(addresses, addr) + } + } + if err := dnsEndpointProvider.Resolve(ctxUpdateIter, addresses, true); err != nil { + level.Error(logger).Log("msg", "failed to resolve addresses for endpoints", "err", err) + } + return nil + }) + }, func(error) { + cancelDNSUpdate() + }) + + endpointset := query.NewEndpointSet(time.Now, logger, reg, func() []*query.GRPCEndpointSpec { + endpointConfig := configProvider.config() + + specs := make([]*query.GRPCEndpointSpec, 0) + // add static nodes + for _, ecfg := range endpointConfig.Endpoints { + strict, group, addr := ecfg.Strict, ecfg.Group, ecfg.Address + if dns.IsDynamicNode(addr) { + continue + } + if group { + specs = append(specs, query.NewGRPCEndpointSpec(fmt.Sprintf("thanos:///%s", addr), strict, append(dialOpts, extgrpc.EndpointGroupGRPCOpts()...)...)) + } else { + specs = append(specs, query.NewGRPCEndpointSpec(addr, strict, dialOpts...)) + } + } + // add dynamic nodes + for _, addr := range dnsEndpointProvider.Addresses() { + specs = append(specs, query.NewGRPCEndpointSpec(addr, false, dialOpts...)) + } + return removeDuplicateEndpointSpecs(specs) + }, unhealthyTimeout, endpointTimeout, queryConnMetricLabels...) + + ctxEndpointUpdate, cancelEndpointUpdate := context.WithCancel(context.Background()) + g.Add(func() error { + return runutil.Repeat(endpointTimeout, ctxEndpointUpdate.Done(), func() error { + ctxIter, cancelIter := context.WithTimeout(ctxEndpointUpdate, endpointTimeout) + defer cancelIter() + + endpointset.Update(ctxIter) + return nil + }) + }, func(error) { + cancelEndpointUpdate() + }) + + return endpointset, nil +} diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 69ffb8ea32..b48af3596c 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -4,7 +4,6 @@ package main import ( - "context" "fmt" "math" "net/http" @@ -12,7 +11,6 @@ import ( "time" extflag "github.com/efficientgo/tools/extkingpin" - "google.golang.org/grpc" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -21,11 +19,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/route" - "github.com/prometheus/prometheus/discovery" - "github.com/prometheus/prometheus/discovery/file" - "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" "github.com/thanos-io/promql-engine/api" @@ -35,11 +29,8 @@ import ( "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/component" - "github.com/thanos-io/thanos/pkg/discovery/cache" "github.com/thanos-io/thanos/pkg/discovery/dns" "github.com/thanos-io/thanos/pkg/exemplars" - "github.com/thanos-io/thanos/pkg/extgrpc" - "github.com/thanos-io/thanos/pkg/extgrpc/snappy" "github.com/thanos-io/thanos/pkg/extkingpin" "github.com/thanos-io/thanos/pkg/extprom" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" @@ -51,7 +42,6 @@ import ( "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/query" "github.com/thanos-io/thanos/pkg/rules" - "github.com/thanos-io/thanos/pkg/runutil" grpcserver "github.com/thanos-io/thanos/pkg/server/grpc" httpserver "github.com/thanos-io/thanos/pkg/server/http" "github.com/thanos-io/thanos/pkg/store" @@ -86,14 +76,8 @@ func registerQuery(app *extkingpin.App) { var grpcServerConfig grpcConfig grpcServerConfig.registerFlag(cmd) - secure := cmd.Flag("grpc-client-tls-secure", "Use TLS when talking to the gRPC server").Default("false").Bool() - skipVerify := cmd.Flag("grpc-client-tls-skip-verify", "Disable TLS certificate verification i.e self signed, signed by fake CA").Default("false").Bool() - cert := cmd.Flag("grpc-client-tls-cert", "TLS Certificates to use to identify this client to the server").Default("").String() - key := cmd.Flag("grpc-client-tls-key", "TLS Key for the client's certificate").Default("").String() - caCert := cmd.Flag("grpc-client-tls-ca", "TLS CA Certificates to use to verify gRPC servers").Default("").String() - serverName := cmd.Flag("grpc-client-server-name", "Server name to verify the hostname on the returned gRPC certificates. See https://tools.ietf.org/html/rfc4366#section-3.1").Default("").String() - compressionOptions := strings.Join([]string{snappy.Name, compressionNone}, ", ") - grpcCompression := cmd.Flag("grpc-compression", "Compression algorithm to use for gRPC requests to other clients. Must be one of: "+compressionOptions).Default(compressionNone).Enum(snappy.Name, compressionNone) + var grpcClientConfig grpcClientConfig + grpcClientConfig.registerFlag(cmd) webRoutePrefix := cmd.Flag("web.route-prefix", "Prefix for API and UI endpoints. This allows thanos UI to be served on a sub-path. Defaults to the value of --web.external-prefix. This option is analogous to --web.route-prefix of Prometheus.").Default("").String() webExternalPrefix := cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the UI query web interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos UI to be served behind a reverse proxy that strips a URL sub-path.").Default("").String() @@ -134,55 +118,6 @@ func registerQuery(app *extkingpin.App) { selectorLabels := cmd.Flag("selector-label", "Query selector labels that will be exposed in info endpoint (repeated)."). PlaceHolder("=\"\"").Strings() - endpoints := extkingpin.Addrs(cmd.Flag("endpoint", "Addresses of statically configured Thanos API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect Thanos API servers through respective DNS lookups."). - PlaceHolder("")) - - endpointGroups := extkingpin.Addrs(cmd.Flag("endpoint-group", "Experimental: DNS name of statically configured Thanos API server groups (repeatable). Targets resolved from the DNS name will be queried in a round-robin, instead of a fanout manner. This flag should be used when connecting a Thanos Query to HA groups of Thanos components."). - PlaceHolder("")) - - stores := extkingpin.Addrs(cmd.Flag("store", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Addresses of statically configured store API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect store API servers through respective DNS lookups."). - PlaceHolder("")) - - // TODO(bwplotka): Hidden because we plan to extract discovery to separate API: https://github.com/thanos-io/thanos/issues/2600. - ruleEndpoints := extkingpin.Addrs(cmd.Flag("rule", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Experimental: Addresses of statically configured rules API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect rule API servers through respective DNS lookups."). - Hidden().PlaceHolder("")) - - metadataEndpoints := extkingpin.Addrs(cmd.Flag("metadata", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Experimental: Addresses of statically configured metadata API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect metadata API servers through respective DNS lookups."). - Hidden().PlaceHolder("")) - - exemplarEndpoints := extkingpin.Addrs(cmd.Flag("exemplar", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Experimental: Addresses of statically configured exemplars API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect exemplars API servers through respective DNS lookups."). - Hidden().PlaceHolder("")) - - // TODO(atunik): Hidden because we plan to extract discovery to separate API: https://github.com/thanos-io/thanos/issues/2600. - targetEndpoints := extkingpin.Addrs(cmd.Flag("target", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Experimental: Addresses of statically configured target API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect target API servers through respective DNS lookups."). - Hidden().PlaceHolder("")) - - strictStores := cmd.Flag("store-strict", "Deprecation Warning - This flag is deprecated and replaced with `endpoint-strict`. Addresses of only statically configured store API servers that are always used, even if the health check fails. Useful if you have a caching layer on top."). - PlaceHolder("").Strings() - - strictEndpoints := cmd.Flag("endpoint-strict", "Addresses of only statically configured Thanos API servers that are always used, even if the health check fails. Useful if you have a caching layer on top."). - PlaceHolder("").Strings() - - strictEndpointGroups := extkingpin.Addrs(cmd.Flag("endpoint-group-strict", "Experimental: DNS name of statically configured Thanos API server groups (repeatable) that are always used, even if the health check fails."). - PlaceHolder("")) - - fileSDFiles := cmd.Flag("store.sd-files", "Path to files that contain addresses of store API servers. The path can be a glob pattern (repeatable)."). - PlaceHolder("").Strings() - - fileSDInterval := extkingpin.ModelDuration(cmd.Flag("store.sd-interval", "Refresh interval to re-read file SD files. It is used as a resync fallback."). - Default("5m")) - - // TODO(bwplotka): Grab this from TTL at some point. - dnsSDInterval := extkingpin.ModelDuration(cmd.Flag("store.sd-dns-interval", "Interval between DNS resolutions."). - Default("30s")) - - dnsSDResolver := cmd.Flag("store.sd-dns-resolver", fmt.Sprintf("Resolver to use. Possible options: [%s, %s]", dns.GolangResolverType, dns.MiekgdnsResolverType)). - Default(string(dns.MiekgdnsResolverType)).Hidden().String() - - unhealthyStoreTimeout := extkingpin.ModelDuration(cmd.Flag("store.unhealthy-timeout", "Timeout before an unhealthy store is cleaned from the store UI page.").Default("5m")) - - endpointInfoTimeout := extkingpin.ModelDuration(cmd.Flag("endpoint.info-timeout", "Timeout of gRPC Info requests.").Default("5s").Hidden()) - enableAutodownsampling := cmd.Flag("query.auto-downsampling", "Enable automatic adjustment (step / 5) to what source of data should be used in store gateways if no max_source_resolution param is specified."). Default("false").Bool() @@ -233,6 +168,29 @@ func registerQuery(app *extkingpin.App) { tenantCertField := cmd.Flag("query.tenant-certificate-field", "Use TLS client's certificate field to determine tenant for write requests. Must be one of "+tenancy.CertificateFieldOrganization+", "+tenancy.CertificateFieldOrganizationalUnit+" or "+tenancy.CertificateFieldCommonName+". This setting will cause the query.tenant-header flag value to be ignored.").Default("").Enum("", tenancy.CertificateFieldOrganization, tenancy.CertificateFieldOrganizationalUnit, tenancy.CertificateFieldCommonName) enforceTenancy := cmd.Flag("query.enforce-tenancy", "Enforce tenancy on Query APIs. Responses are returned only if the label value of the configured tenant-label-name and the value of the tenant header matches.").Default("false").Bool() tenantLabel := cmd.Flag("query.tenant-label-name", "Label name to use when enforcing tenancy (if --query.enforce-tenancy is enabled).").Default(tenancy.DefaultTenantLabel).String() + // TODO(bwplotka): Grab this from TTL at some point. + dnsSDInterval := extkingpin.ModelDuration(cmd.Flag("store.sd-dns-interval", "Interval between DNS resolutions."). + Default("30s")) + + dnsSDResolver := cmd.Flag("store.sd-dns-resolver", fmt.Sprintf("Resolver to use. Possible options: [%s, %s]", dns.GolangResolverType, dns.MiekgdnsResolverType)). + Default(string(dns.MiekgdnsResolverType)).Hidden().String() + + unhealthyStoreTimeout := extkingpin.ModelDuration(cmd.Flag("store.unhealthy-timeout", "Timeout before an unhealthy store is cleaned from the store UI page.").Default("5m")) + + endpointInfoTimeout := extkingpin.ModelDuration(cmd.Flag("endpoint.info-timeout", "Timeout of gRPC Info requests.").Default("5s").Hidden()) + + endpointSetConfig := extflag.RegisterPathOrContent(cmd, "endpoint.sd-config", "Config File with endpoint definitions") + + endpointSetConfigReloadInterval := extkingpin.ModelDuration(cmd.Flag("endpoint.sd-config-reload-interval", "Interval between endpoint config refreshes").Default("1m")) + + endpoints := extkingpin.Addrs(cmd.Flag("endpoint", "(Deprecated): Addresses of statically configured Thanos API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect Thanos API servers through respective DNS lookups.").Hidden().PlaceHolder("")) + + endpointGroups := extkingpin.Addrs(cmd.Flag("endpoint-group", "(Deprecated, Experimental): DNS name of statically configured Thanos API server groups (repeatable). Targets resolved from the DNS name will be queried in a round-robin, instead of a fanout manner. This flag should be used when connecting a Thanos Query to HA groups of Thanos components.").Hidden().PlaceHolder("")) + + strictEndpoints := extkingpin.Addrs(cmd.Flag("endpoint-strict", "(Deprecated): Addresses of only statically configured Thanos API servers that are always used, even if the health check fails. Useful if you have a caching layer on top."). + Hidden().PlaceHolder("")) + + strictEndpointGroups := extkingpin.Addrs(cmd.Flag("endpoint-group-strict", "(Deprecated, Experimental): DNS name of statically configured Thanos API server groups (repeatable) that are always used, even if the health check fails.").Hidden().PlaceHolder("")) var storeRateLimits store.SeriesSelectLimits storeRateLimits.RegisterFlags(cmd) @@ -266,18 +224,6 @@ func registerQuery(app *extkingpin.App) { return errors.Wrap(err, "error while parsing config for request logging") } - var fileSD *file.Discovery - if len(*fileSDFiles) > 0 { - conf := &file.SDConfig{ - Files: *fileSDFiles, - RefreshInterval: *fileSDInterval, - } - var err error - if fileSD, err = file.NewDiscovery(conf, logger, conf.NewDiscovererMetrics(reg, discovery.NewRefreshMetrics(reg))); err != nil { - return err - } - } - if *webRoutePrefix == "" { *webRoutePrefix = *webExternalPrefix } @@ -295,23 +241,43 @@ func registerQuery(app *extkingpin.App) { return err } + dialOpts, err := grpcClientConfig.dialOptions(logger, reg, tracer) + if err != nil { + return err + } + + endpointSet, err := setupEndpointset( + g, + reg, + logger, + endpointSetConfig, + time.Duration(*endpointSetConfigReloadInterval), + *endpoints, + *endpointGroups, + *strictEndpoints, + *strictEndpointGroups, + *dnsSDResolver, + time.Duration(*dnsSDInterval), + time.Duration(*unhealthyStoreTimeout), + time.Duration(*endpointInfoTimeout), + dialOpts, + *queryConnMetricLabels..., + ) + if err != nil { + return err + } + return runQuery( g, logger, debugLogging, + endpointSet, reg, tracer, httpLogOpts, grpcLogOpts, logFilterMethods, grpcServerConfig, - *grpcCompression, - *secure, - *skipVerify, - *cert, - *key, - *caCert, - *serverName, *httpBindAddr, *httpTLSConfig, time.Duration(*httpGracePeriod), @@ -326,18 +292,10 @@ func registerQuery(app *extkingpin.App) { *dynamicLookbackDelta, time.Duration(*defaultEvaluationInterval), time.Duration(*storeResponseTimeout), - *queryConnMetricLabels, *queryReplicaLabels, *queryPartitionLabels, selectorLset, getFlagsMap(cmd.Flags()), - *endpoints, - *endpointGroups, - *stores, - *ruleEndpoints, - *targetEndpoints, - *metadataEndpoints, - *exemplarEndpoints, *enableAutodownsampling, *enableQueryPartialResponse, *enableRulePartialResponse, @@ -345,16 +303,8 @@ func registerQuery(app *extkingpin.App) { *enableMetricMetadataPartialResponse, *enableExemplarPartialResponse, *activeQueryDir, - fileSD, - time.Duration(*dnsSDInterval), - *dnsSDResolver, - time.Duration(*unhealthyStoreTimeout), - time.Duration(*endpointInfoTimeout), time.Duration(*instantDefaultMaxSourceResolution), *defaultMetadataTimeRange, - *strictStores, - *strictEndpoints, - *strictEndpointGroups, *webDisableCORS, *alertQueryURL, *grpcProxyStrategy, @@ -381,19 +331,13 @@ func runQuery( g *run.Group, logger log.Logger, debugLogging bool, + endpointSet *query.EndpointSet, reg *prometheus.Registry, tracer opentracing.Tracer, httpLogOpts []logging.Option, grpcLogOpts []grpc_logging.Option, logFilterMethods []string, grpcServerConfig grpcConfig, - grpcCompression string, - secure bool, - skipVerify bool, - cert string, - key string, - caCert string, - serverName string, httpBindAddr string, httpTLSConfig string, httpGracePeriod time.Duration, @@ -408,18 +352,10 @@ func runQuery( dynamicLookbackDelta bool, defaultEvaluationInterval time.Duration, storeResponseTimeout time.Duration, - queryConnMetricLabels []string, queryReplicaLabels []string, queryPartitionLabels []string, selectorLset labels.Labels, flagsMap map[string]string, - endpointAddrs []string, - endpointGroupAddrs []string, - storeAddrs []string, - ruleAddrs []string, - targetAddrs []string, - metadataAddrs []string, - exemplarAddrs []string, enableAutodownsampling bool, enableQueryPartialResponse bool, enableRulePartialResponse bool, @@ -427,16 +363,8 @@ func runQuery( enableMetricMetadataPartialResponse bool, enableExemplarPartialResponse bool, activeQueryDir string, - fileSD *file.Discovery, - dnsSDInterval time.Duration, - dnsSDResolver string, - unhealthyStoreTimeout time.Duration, - endpointInfoTimeout time.Duration, instantDefaultMaxSourceResolution time.Duration, defaultMetadataTimeRange time.Duration, - strictStores []string, - strictEndpoints []string, - strictEndpointGroups []string, disableCORS bool, alertQueryURL string, grpcProxyStrategy string, @@ -462,79 +390,6 @@ func runQuery( } // NOTE(GiedriusS): default is set in config.ts. } - // TODO(bplotka in PR #513 review): Move arguments into struct. - duplicatedStores := promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_query_duplicated_store_addresses_total", - Help: "The number of times a duplicated store addresses is detected from the different configs in query", - }) - - dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer, secure, skipVerify, cert, key, caCert, serverName) - if err != nil { - return errors.Wrap(err, "building gRPC client") - } - if grpcCompression != compressionNone { - dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(grpcCompression))) - } - - fileSDCache := cache.New() - dnsStoreProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_store_apis_", reg), - dns.ResolverType(dnsSDResolver), - ) - - for _, store := range strictStores { - if dns.IsDynamicNode(store) { - return errors.Errorf("%s is a dynamically specified store i.e. it uses SD and that is not permitted under strict mode. Use --store for this", store) - } - } - - for _, endpoint := range strictEndpoints { - if dns.IsDynamicNode(endpoint) { - return errors.Errorf("%s is a dynamically specified endpoint i.e. it uses SD and that is not permitted under strict mode. Use --endpoint for this", endpoint) - } - } - - // Register resolver for the "thanos:///" scheme for endpoint-groups - dns.RegisterGRPCResolver( - dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_endpoint_groups_", reg), - dns.ResolverType(dnsSDResolver), - ), - dnsSDInterval, - logger, - ) - - dnsEndpointProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_endpoints_", reg), - dns.ResolverType(dnsSDResolver), - ) - - dnsRuleProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_rule_apis_", reg), - dns.ResolverType(dnsSDResolver), - ) - - dnsTargetProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_target_apis_", reg), - dns.ResolverType(dnsSDResolver), - ) - - dnsMetadataProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_metadata_apis_", reg), - dns.ResolverType(dnsSDResolver), - ) - - dnsExemplarProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_exemplar_apis_", reg), - dns.ResolverType(dnsSDResolver), - ) options := []store.ProxyStoreOption{ store.WithTSDBSelector(tsdbSelector), @@ -545,35 +400,12 @@ func runQuery( queryReplicaLabels = strutil.ParseFlagLabels(queryReplicaLabels) var ( - endpoints = prepareEndpointSet( - g, - logger, - reg, - []*dns.Provider{ - dnsStoreProvider, - dnsRuleProvider, - dnsExemplarProvider, - dnsMetadataProvider, - dnsTargetProvider, - dnsEndpointProvider, - }, - duplicatedStores, - strictStores, - strictEndpoints, - endpointGroupAddrs, - strictEndpointGroups, - dialOpts, - unhealthyStoreTimeout, - endpointInfoTimeout, - queryConnMetricLabels..., - ) - - proxyStore = store.NewProxyStore(logger, reg, endpoints.GetStoreClients, component.Query, selectorLset, storeResponseTimeout, store.RetrievalStrategy(grpcProxyStrategy), options...) + proxyStore = store.NewProxyStore(logger, reg, endpointSet.GetStoreClients, component.Query, selectorLset, storeResponseTimeout, store.RetrievalStrategy(grpcProxyStrategy), options...) seriesProxy = store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxyStore), reg, storeRateLimits) - rulesProxy = rules.NewProxy(logger, endpoints.GetRulesClients) - targetsProxy = targets.NewProxy(logger, endpoints.GetTargetsClients) - metadataProxy = metadata.NewProxy(logger, endpoints.GetMetricMetadataClients) - exemplarsProxy = exemplars.NewProxy(logger, endpoints.GetExemplarsStores, selectorLset) + rulesProxy = rules.NewProxy(logger, endpointSet.GetRulesClients) + targetsProxy = targets.NewProxy(logger, endpointSet.GetTargetsClients) + metadataProxy = metadata.NewProxy(logger, endpointSet.GetMetricMetadataClients) + exemplarsProxy = exemplars.NewProxy(logger, endpointSet.GetExemplarsStores, selectorLset) queryableCreator = query.NewQueryableCreator( logger, extprom.WrapRegistererWithPrefix("thanos_query_", reg), @@ -583,78 +415,6 @@ func runQuery( ) ) - // Run File Service Discovery and update the store set when the files are modified. - if fileSD != nil { - var fileSDUpdates chan []*targetgroup.Group - ctxRun, cancelRun := context.WithCancel(context.Background()) - - fileSDUpdates = make(chan []*targetgroup.Group) - - g.Add(func() error { - fileSD.Run(ctxRun, fileSDUpdates) - return nil - }, func(error) { - cancelRun() - }) - - ctxUpdate, cancelUpdate := context.WithCancel(context.Background()) - g.Add(func() error { - for { - select { - case update := <-fileSDUpdates: - // Discoverers sometimes send nil updates so need to check for it to avoid panics. - if update == nil { - continue - } - fileSDCache.Update(update) - endpoints.Update(ctxUpdate) - - if err := dnsStoreProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...), true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for storeAPIs", "err", err) - } - - // Rules apis do not support file service discovery as of now. - case <-ctxUpdate.Done(): - return nil - } - } - }, func(error) { - cancelUpdate() - }) - } - // Periodically update the addresses from static flags and file SD by resolving them using DNS SD if necessary. - { - ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - return runutil.Repeat(dnsSDInterval, ctx.Done(), func() error { - resolveCtx, resolveCancel := context.WithTimeout(ctx, dnsSDInterval) - defer resolveCancel() - if err := dnsStoreProvider.Resolve(resolveCtx, append(fileSDCache.Addresses(), storeAddrs...), true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for storeAPIs", "err", err) - } - if err := dnsRuleProvider.Resolve(resolveCtx, ruleAddrs, true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for rulesAPIs", "err", err) - } - if err := dnsTargetProvider.Resolve(ctx, targetAddrs, true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for targetsAPIs", "err", err) - } - if err := dnsMetadataProvider.Resolve(resolveCtx, metadataAddrs, true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for metadataAPIs", "err", err) - } - if err := dnsExemplarProvider.Resolve(resolveCtx, exemplarAddrs, true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for exemplarsAPI", "err", err) - } - if err := dnsEndpointProvider.Resolve(resolveCtx, endpointAddrs, true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses passed using endpoint flag", "err", err) - - } - return nil - }) - }, func(error) { - cancel() - }) - } - grpcProbe := prober.NewGRPC() httpProbe := prober.NewHTTP() statusProber := prober.Combine( @@ -687,7 +447,7 @@ func runQuery( if queryMode != queryModeLocal { level.Info(logger).Log("msg", "Distributed query mode enabled, using Thanos as the default query engine.") defaultEngine = string(apiv1.PromqlEngineThanos) - remoteEngineEndpoints = query.NewRemoteEndpoints(logger, endpoints.GetQueryAPIClients, query.Opts{ + remoteEngineEndpoints = query.NewRemoteEndpoints(logger, endpointSet.GetQueryAPIClients, query.Opts{ AutoDownsample: enableAutodownsampling, ReplicaLabels: queryReplicaLabels, PartitionLabels: queryPartitionLabels, @@ -727,11 +487,11 @@ func runQuery( ins := extpromhttp.NewTenantInstrumentationMiddleware(tenantHeader, defaultTenant, reg, nil) // TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting. - ui.NewQueryUI(logger, endpoints, webExternalPrefix, webPrefixHeaderName, alertQueryURL, tenantHeader, defaultTenant, enforceTenancy).Register(router, ins) + ui.NewQueryUI(logger, endpointSet, webExternalPrefix, webPrefixHeaderName, alertQueryURL, tenantHeader, defaultTenant, enforceTenancy).Register(router, ins) api := apiv1.NewQueryAPI( logger, - endpoints.GetEndpointStatus, + endpointSet.GetEndpointStatus, engineFactory, apiv1.PromqlEngineType(defaultEngine), lookbackDeltaCreator, @@ -844,7 +604,7 @@ func runQuery( }, func(error) { statusProber.NotReady(err) s.Shutdown(err) - endpoints.Close() + endpointSet.Close() }) } @@ -852,96 +612,6 @@ func runQuery( return nil } -func removeDuplicateEndpointSpecs(logger log.Logger, duplicatedStores prometheus.Counter, specs []*query.GRPCEndpointSpec) []*query.GRPCEndpointSpec { - set := make(map[string]*query.GRPCEndpointSpec) - for _, spec := range specs { - addr := spec.Addr() - if _, ok := set[addr]; ok { - level.Warn(logger).Log("msg", "Duplicate store address is provided", "addr", addr) - duplicatedStores.Inc() - } - set[addr] = spec - } - deduplicated := make([]*query.GRPCEndpointSpec, 0, len(set)) - for _, value := range set { - deduplicated = append(deduplicated, value) - } - return deduplicated -} - -func prepareEndpointSet( - g *run.Group, - logger log.Logger, - reg *prometheus.Registry, - dnsProviders []*dns.Provider, - duplicatedStores prometheus.Counter, - strictStores []string, - strictEndpoints []string, - endpointGroupAddrs []string, - strictEndpointGroups []string, - dialOpts []grpc.DialOption, - unhealthyStoreTimeout time.Duration, - endpointInfoTimeout time.Duration, - queryConnMetricLabels ...string, -) *query.EndpointSet { - endpointSet := query.NewEndpointSet( - time.Now, - logger, - reg, - func() (specs []*query.GRPCEndpointSpec) { - // Add strict & static nodes. - for _, addr := range strictStores { - specs = append(specs, query.NewGRPCEndpointSpec(addr, true)) - } - - for _, addr := range strictEndpoints { - specs = append(specs, query.NewGRPCEndpointSpec(addr, true)) - } - - for _, dnsProvider := range dnsProviders { - var tmpSpecs []*query.GRPCEndpointSpec - - for _, addr := range dnsProvider.Addresses() { - tmpSpecs = append(tmpSpecs, query.NewGRPCEndpointSpec(addr, false)) - } - tmpSpecs = removeDuplicateEndpointSpecs(logger, duplicatedStores, tmpSpecs) - specs = append(specs, tmpSpecs...) - } - - for _, eg := range endpointGroupAddrs { - spec := query.NewGRPCEndpointSpec(fmt.Sprintf("thanos:///%s", eg), false, extgrpc.EndpointGroupGRPCOpts()...) - specs = append(specs, spec) - } - - for _, eg := range strictEndpointGroups { - spec := query.NewGRPCEndpointSpec(fmt.Sprintf("thanos:///%s", eg), true, extgrpc.EndpointGroupGRPCOpts()...) - specs = append(specs, spec) - } - - return specs - }, - dialOpts, - unhealthyStoreTimeout, - endpointInfoTimeout, - queryConnMetricLabels..., - ) - - // Periodically update the store set with the addresses we see in our cluster. - { - ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - return runutil.Repeat(5*time.Second, ctx.Done(), func() error { - endpointSet.Update(ctx) - return nil - }) - }, func(error) { - cancel() - }) - } - - return endpointSet -} - // LookbackDeltaFactory creates from 1 to 3 lookback deltas depending on // dynamicLookbackDelta and eo.LookbackDelta and returns a function // that returns appropriate lookback delta for given maxSourceResolutionMillis. diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index e0780452fd..72e161b862 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -79,8 +79,6 @@ import ( "github.com/thanos-io/thanos/pkg/ui" ) -const dnsSDResolver = "miekgdns" - type ruleConfig struct { http httpConfig grpc grpcConfig @@ -404,17 +402,6 @@ func runRule( } if len(grpcEndpoints) > 0 { - duplicatedGRPCEndpoints := promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_rule_grpc_endpoints_duplicated_total", - Help: "The number of times a duplicated grpc endpoint is detected from the different configs in rule", - }) - - dnsEndpointProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_rule_grpc_endpoints_", reg), - dnsSDResolver, - ) - dialOpts, err := extgrpc.StoreClientGRPCOpts( logger, reg, @@ -430,36 +417,24 @@ func runRule( return err } - grpcEndpointSet = prepareEndpointSet( + grpcEndpointSet, err = setupEndpointset( g, - logger, reg, - []*dns.Provider{dnsEndpointProvider}, - duplicatedGRPCEndpoints, + logger, nil, + 1*time.Minute, + grpcEndpoints, nil, nil, nil, - dialOpts, + conf.query.dnsSDResolver, + conf.query.dnsSDInterval, 5*time.Minute, 5*time.Second, + dialOpts, ) - - // Periodically update the GRPC addresses from query config by resolving them using DNS SD if necessary. - { - ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - return runutil.Repeat(5*time.Second, ctx.Done(), func() error { - resolveCtx, resolveCancel := context.WithTimeout(ctx, 5*time.Second) - defer resolveCancel() - if err := dnsEndpointProvider.Resolve(resolveCtx, grpcEndpoints, true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses passed using grpc query config", "err", err) - } - return nil - }) - }, func(error) { - cancel() - }) + if err != nil { + return nil } } diff --git a/docs/components/query.md b/docs/components/query.md index 10975138b6..a97bef900c 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -299,27 +299,14 @@ Flags: detected maximum container or system memory. --enable-auto-gomemlimit Enable go runtime to automatically limit memory consumption. - --endpoint= ... Addresses of statically configured Thanos - API servers (repeatable). The scheme may be - prefixed with 'dns+' or 'dnssrv+' to detect - Thanos API servers through respective DNS - lookups. - --endpoint-group= ... - Experimental: DNS name of statically configured - Thanos API server groups (repeatable). Targets - resolved from the DNS name will be queried in - a round-robin, instead of a fanout manner. - This flag should be used when connecting a - Thanos Query to HA groups of Thanos components. - --endpoint-group-strict= ... - Experimental: DNS name of statically configured - Thanos API server groups (repeatable) that are - always used, even if the health check fails. - --endpoint-strict= ... - Addresses of only statically configured Thanos - API servers that are always used, even if - the health check fails. Useful if you have a - caching layer on top. + --endpoint.sd-config= + Alternative to 'endpoint.sd-config-file' flag + (mutually exclusive). Content of Config File + with endpoint definitions + --endpoint.sd-config-file= + Path to Config File with endpoint definitions + --endpoint.sd-config-reload-interval=1m + Interval between endpoint config refreshes --grpc-address="0.0.0.0:10901" Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable @@ -500,19 +487,6 @@ Flags: It follows the Thanos sharding relabel-config syntax. For format details see: https://thanos.io/tip/thanos/sharding.md/#relabelling - --store= ... Deprecation Warning - This flag is deprecated - and replaced with `endpoint`. Addresses of - statically configured store API servers - (repeatable). The scheme may be prefixed with - 'dns+' or 'dnssrv+' to detect store API servers - through respective DNS lookups. - --store-strict= ... - Deprecation Warning - This flag is deprecated - and replaced with `endpoint-strict`. Addresses - of only statically configured store API servers - that are always used, even if the health check - fails. Useful if you have a caching layer on - top. --store.limits.request-samples=0 The maximum samples allowed for a single Series request, The Series call fails if @@ -531,12 +505,6 @@ Flags: enabled. 0 disables timeout. --store.sd-dns-interval=30s Interval between DNS resolutions. - --store.sd-files= ... - Path to files that contain addresses of store - API servers. The path can be a glob pattern - (repeatable). - --store.sd-interval=5m Refresh interval to re-read file SD files. - It is used as a resync fallback. --store.unhealthy-timeout=5m Timeout before an unhealthy store is cleaned from the store UI page. diff --git a/pkg/discovery/dns/grpc.go b/pkg/discovery/dns/grpc.go index 79e832b652..7971e7991c 100644 --- a/pkg/discovery/dns/grpc.go +++ b/pkg/discovery/dns/grpc.go @@ -23,7 +23,7 @@ type builder struct { logger log.Logger } -func RegisterGRPCResolver(provider *Provider, interval time.Duration, logger log.Logger) { +func RegisterGRPCResolver(logger log.Logger, provider *Provider, interval time.Duration) { grpcresolver.Register(&builder{ resolveInterval: interval, provider: provider, diff --git a/pkg/extkingpin/flags.go b/pkg/extkingpin/flags.go index 62b9142beb..033769c56e 100644 --- a/pkg/extkingpin/flags.go +++ b/pkg/extkingpin/flags.go @@ -47,10 +47,8 @@ func Addrs(flags *kingpin.FlagClause) (target *addressSlice) { return } -// validateAddrs checks an address slice for duplicates and empty or invalid elements. +// validateAddrs checks an address slice for empty or invalid elements. func validateAddrs(addrs addressSlice) error { - set := map[string]struct{}{} - for _, addr := range addrs { if addr == "" { return errors.New("Address is empty.") @@ -61,12 +59,6 @@ func validateAddrs(addrs addressSlice) error { if len(qtypeAndName) != 2 && len(hostAndPort) != 2 { return errors.Errorf("Address %s is not of : format or a valid DNS query.", addr) } - - if _, ok := set[addr]; ok { - return errors.Errorf("Address %s is duplicated.", addr) - } - - set[addr] = struct{}{} } return nil diff --git a/pkg/query/endpointset.go b/pkg/query/endpointset.go index 4c519bf925..071e04a846 100644 --- a/pkg/query/endpointset.go +++ b/pkg/query/endpointset.go @@ -211,8 +211,7 @@ type EndpointSet struct { // Endpoint specifications can change dynamically. If some component is missing from the list, we assume it is no longer // accessible and we close gRPC client for it, unless it is strict. - endpointSpec func() map[string]*GRPCEndpointSpec - dialOpts []grpc.DialOption + endpointSpecs func() map[string]*GRPCEndpointSpec endpointInfoTimeout time.Duration unhealthyEndpointTimeout time.Duration @@ -235,7 +234,6 @@ func NewEndpointSet( logger log.Logger, reg prometheus.Registerer, endpointSpecs func() []*GRPCEndpointSpec, - dialOpts []grpc.DialOption, unhealthyEndpointTimeout time.Duration, endpointInfoTimeout time.Duration, endpointMetricLabels ...string, @@ -254,19 +252,17 @@ func NewEndpointSet( } return &EndpointSet{ - now: now, - logger: log.With(logger, "component", "endpointset"), - endpointsMetric: endpointsMetric, - - dialOpts: dialOpts, + now: now, + logger: log.With(logger, "component", "endpointset"), + endpointsMetric: endpointsMetric, endpointInfoTimeout: endpointInfoTimeout, unhealthyEndpointTimeout: unhealthyEndpointTimeout, - endpointSpec: func() map[string]*GRPCEndpointSpec { - specs := make(map[string]*GRPCEndpointSpec) + endpointSpecs: func() map[string]*GRPCEndpointSpec { + res := make(map[string]*GRPCEndpointSpec) for _, s := range endpointSpecs() { - specs[s.addr] = s + res[s.addr] = s } - return specs + return res }, endpoints: make(map[string]*endpointRef), } @@ -288,7 +284,7 @@ func (e *EndpointSet) Update(ctx context.Context) { mu sync.Mutex ) - for _, spec := range e.endpointSpec() { + for _, spec := range e.endpointSpecs() { spec := spec if er, existingRef := e.endpoints[spec.Addr()]; existingRef { @@ -571,11 +567,7 @@ type endpointRef struct { // newEndpointRef creates a new endpointRef with a gRPC channel to the given the IP address. // The call to newEndpointRef will return an error if establishing the channel fails. func (e *EndpointSet) newEndpointRef(spec *GRPCEndpointSpec) (*endpointRef, error) { - var dialOpts []grpc.DialOption - - dialOpts = append(dialOpts, e.dialOpts...) - dialOpts = append(dialOpts, spec.dialOpts...) - conn, err := grpc.NewClient(spec.Addr(), dialOpts...) + conn, err := grpc.NewClient(spec.Addr(), spec.dialOpts...) if err != nil { return nil, errors.Wrap(err, "dialing connection") } diff --git a/pkg/query/endpointset_test.go b/pkg/query/endpointset_test.go index 6f061211ab..68b00aff5e 100644 --- a/pkg/query/endpointset_test.go +++ b/pkg/query/endpointset_test.go @@ -675,11 +675,11 @@ func TestEndpointSetUpdate_AvailabilityScenarios(t *testing.T) { endpointSet := NewEndpointSet(nowFunc, nil, nil, func() (specs []*GRPCEndpointSpec) { for _, addr := range discoveredEndpointAddr { - specs = append(specs, NewGRPCEndpointSpec(addr, false)) + specs = append(specs, NewGRPCEndpointSpec(addr, false, testGRPCOpts...)) } return specs }, - testGRPCOpts, time.Minute, 2*time.Second) + time.Minute, 2*time.Second) defer endpointSet.Close() // Initial update. @@ -1052,7 +1052,7 @@ func TestEndpointSet_Update_NoneAvailable(t *testing.T) { } return specs }, - testGRPCOpts, time.Minute, 2*time.Second) + time.Minute, 2*time.Second) defer endpointSet.Close() // Should not matter how many of these we run. @@ -1159,11 +1159,11 @@ func TestEndpoint_Update_QuerierStrict(t *testing.T) { slowStaticEndpointAddr := discoveredEndpointAddr[2] endpointSet := NewEndpointSet(time.Now, nil, nil, func() (specs []*GRPCEndpointSpec) { return []*GRPCEndpointSpec{ - NewGRPCEndpointSpec(discoveredEndpointAddr[0], true), - NewGRPCEndpointSpec(discoveredEndpointAddr[1], false), - NewGRPCEndpointSpec(discoveredEndpointAddr[2], true), + NewGRPCEndpointSpec(discoveredEndpointAddr[0], true, testGRPCOpts...), + NewGRPCEndpointSpec(discoveredEndpointAddr[1], false, testGRPCOpts...), + NewGRPCEndpointSpec(discoveredEndpointAddr[2], true, testGRPCOpts...), } - }, testGRPCOpts, time.Minute, 1*time.Second) + }, time.Minute, 1*time.Second) defer endpointSet.Close() // Initial update. @@ -1273,7 +1273,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { endpointSpec: func() []*GRPCEndpointSpec { endpointSpec := make([]*GRPCEndpointSpec, 0, len(endpoints.orderAddrs)) for _, addr := range endpoints.orderAddrs { - endpointSpec = append(endpointSpec, NewGRPCEndpointSpec(addr, false)) + endpointSpec = append(endpointSpec, NewGRPCEndpointSpec(addr, false, testGRPCOpts...)) } return endpointSpec }, @@ -1297,7 +1297,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { name: "Sidecar discovered, no Ruler discovered", endpointSpec: func() []*GRPCEndpointSpec { return []*GRPCEndpointSpec{ - NewGRPCEndpointSpec(endpoints.orderAddrs[0], false), + NewGRPCEndpointSpec(endpoints.orderAddrs[0], false, testGRPCOpts...), } }, expectedStores: 1, // sidecar @@ -1310,8 +1310,8 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { name: "Ruler discovered", endpointSpec: func() []*GRPCEndpointSpec { return []*GRPCEndpointSpec{ - NewGRPCEndpointSpec(endpoints.orderAddrs[0], false), - NewGRPCEndpointSpec(endpoints.orderAddrs[1], false), + NewGRPCEndpointSpec(endpoints.orderAddrs[0], false, testGRPCOpts...), + NewGRPCEndpointSpec(endpoints.orderAddrs[1], false, testGRPCOpts...), } }, expectedStores: 2, // sidecar + ruler @@ -1324,7 +1324,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { name: "Sidecar removed", endpointSpec: func() []*GRPCEndpointSpec { return []*GRPCEndpointSpec{ - NewGRPCEndpointSpec(endpoints.orderAddrs[1], false), + NewGRPCEndpointSpec(endpoints.orderAddrs[1], false, testGRPCOpts...), } }, expectedStores: 1, // ruler @@ -1344,7 +1344,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { return tc.states[currentState].endpointSpec() }, - testGRPCOpts, time.Minute, 2*time.Second) + time.Minute, 2*time.Second) defer endpointSet.Close() @@ -1532,11 +1532,11 @@ func makeEndpointSet(discoveredEndpointAddr []string, strict bool, now nowFunc, endpointSet := NewEndpointSet(now, nil, nil, func() (specs []*GRPCEndpointSpec) { for _, addr := range discoveredEndpointAddr { - specs = append(specs, NewGRPCEndpointSpec(addr, strict)) + specs = append(specs, NewGRPCEndpointSpec(addr, strict, testGRPCOpts...)) } return specs }, - testGRPCOpts, time.Minute, time.Second, metricLabels...) + time.Minute, time.Second, metricLabels...) return endpointSet } diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index c8a9e7fc62..a25f8f6ab9 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -17,9 +17,7 @@ import ( e2edb "github.com/efficientgo/e2e/db" e2eobs "github.com/efficientgo/e2e/observable" "github.com/pkg/errors" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/relabel" "gopkg.in/yaml.v2" @@ -429,26 +427,25 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) { "--store.sd-dns-interval": "5s", "--log.level": infoLogLevel, "--query.max-concurrent": "1", - "--store.sd-interval": "5s", }) for _, repl := range q.replicaLabels { args = append(args, "--query.replica-label="+repl) } for _, addr := range q.storeAddresses { - args = append(args, "--store="+addr) + args = append(args, "--endpoint="+addr) } for _, addr := range q.ruleAddresses { - args = append(args, "--rule="+addr) + args = append(args, "--endpoint="+addr) } for _, addr := range q.targetAddresses { - args = append(args, "--target="+addr) + args = append(args, "--endpoint="+addr) } for _, addr := range q.metadataAddresses { - args = append(args, "--metadata="+addr) + args = append(args, "--endpoint="+addr) } for _, addr := range q.exemplarAddresses { - args = append(args, "--exemplar="+addr) + args = append(args, "--endpoint="+addr) } for _, feature := range q.enableFeatures { args = append(args, "--enable-feature="+feature) @@ -470,21 +467,27 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) { return nil, errors.Wrap(err, "create query dir failed") } - fileSD := []*targetgroup.Group{{}} + type EndpointSpec struct{ Address string } + + endpoints := make([]EndpointSpec, 0) for _, a := range q.fileSDStoreAddresses { - fileSD[0].Targets = append(fileSD[0].Targets, model.LabelSet{model.AddressLabel: model.LabelValue(a)}) + endpoints = append(endpoints, EndpointSpec{Address: a}) } - b, err := yaml.Marshal(fileSD) + endpointSDConfig := struct { + Endpoints []EndpointSpec `yaml:"endpoints"` + }{Endpoints: endpoints} + b, err := yaml.Marshal(endpointSDConfig) if err != nil { return nil, err } - if err := os.WriteFile(q.Dir()+"/filesd.yaml", b, 0600); err != nil { + if err := os.WriteFile(q.Dir()+"/endpoint-sd-config.yaml", b, 0600); err != nil { return nil, errors.Wrap(err, "creating query SD config failed") } - args = append(args, "--store.sd-files="+filepath.Join(q.InternalDir(), "filesd.yaml")) + args = append(args, "--endpoint.sd-config-file="+filepath.Join(q.InternalDir(), "endpoint-sd-config.yaml")) + args = append(args, "--endpoint.sd-config-reload-interval=5s") } if q.routePrefix != "" { args = append(args, "--web.route-prefix="+q.routePrefix)