-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
query, rule: make endpoint discovery dynamically reloadable
* Removed previously deprecated and hidden flags to configure endpoints ( --rule, --target, ...) * Removed --store.sd-file and --store.sd-interval flags * Added new flags --endpoint.sd-config, --endpoint-sd-config-reload-interval to configure a dynamic SD file * Moved endpoint set construction into cmd/thanos/endpointset.go for a little cleanup The new config makes it possible to also set "strict" and "group" flags on the endpoint instead of only their addresses, making it possible to have file based service discovery for endpoint groups too. Signed-off-by: Michael Hoffmann <[email protected]>
- Loading branch information
1 parent
bfbabbb
commit e9f1ff4
Showing
9 changed files
with
455 additions
and
513 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,298 @@ | ||
// Copyright (c) The Thanos Authors. | ||
// Licensed under the Apache License 2.0. | ||
|
||
package main | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
extflag "github.com/efficientgo/tools/extkingpin" | ||
"github.com/go-kit/log" | ||
"github.com/go-kit/log/level" | ||
"github.com/oklog/run" | ||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/prometheus/client_golang/prometheus/promauto" | ||
"google.golang.org/grpc" | ||
"gopkg.in/yaml.v3" | ||
|
||
"github.com/thanos-io/thanos/pkg/discovery/dns" | ||
"github.com/thanos-io/thanos/pkg/errors" | ||
"github.com/thanos-io/thanos/pkg/extgrpc" | ||
"github.com/thanos-io/thanos/pkg/extkingpin" | ||
"github.com/thanos-io/thanos/pkg/extprom" | ||
"github.com/thanos-io/thanos/pkg/query" | ||
"github.com/thanos-io/thanos/pkg/runutil" | ||
) | ||
|
||
type EndpointSpec struct { | ||
Strict bool | ||
Group bool | ||
Address string | ||
} | ||
|
||
type EndpointConfig struct { | ||
Endpoints []EndpointSpec | ||
} | ||
|
||
type endpointConfigProvider struct { | ||
mu sync.Mutex | ||
cfg EndpointConfig | ||
|
||
// statically defined endpoints from flags for backwards compatibility | ||
endpoints []string | ||
endpointGroups []string | ||
strictEndpoints []string | ||
strictEndpointGroups []string | ||
} | ||
|
||
func (er *endpointConfigProvider) config() EndpointConfig { | ||
er.mu.Lock() | ||
defer er.mu.Unlock() | ||
|
||
res := EndpointConfig{} | ||
copy(res.Endpoints, er.cfg.Endpoints) | ||
return res | ||
} | ||
|
||
func (er *endpointConfigProvider) parse(configFile *extflag.PathOrContent) (EndpointConfig, error) { | ||
content, err := configFile.Content() | ||
if err != nil { | ||
return EndpointConfig{}, errors.Wrapf(err, "unable to load config content: %s", configFile.Path()) | ||
} | ||
var cfg EndpointConfig | ||
if err := yaml.Unmarshal(content, &cfg); err != nil { | ||
return EndpointConfig{}, errors.Wrapf(err, "unable to unmarshal config content: %s", configFile.Path()) | ||
} | ||
return cfg, nil | ||
} | ||
|
||
func (er *endpointConfigProvider) addStaticEndpoints(cfg *EndpointConfig) { | ||
for _, e := range er.endpoints { | ||
cfg.Endpoints = append(cfg.Endpoints, EndpointSpec{ | ||
Address: e, | ||
}) | ||
} | ||
for _, e := range er.endpointGroups { | ||
cfg.Endpoints = append(cfg.Endpoints, EndpointSpec{ | ||
Address: e, | ||
Group: true, | ||
}) | ||
} | ||
for _, e := range er.strictEndpoints { | ||
cfg.Endpoints = append(cfg.Endpoints, EndpointSpec{ | ||
Address: e, | ||
Strict: true, | ||
}) | ||
} | ||
for _, e := range er.strictEndpointGroups { | ||
cfg.Endpoints = append(cfg.Endpoints, EndpointSpec{ | ||
Address: e, | ||
Group: true, | ||
Strict: true, | ||
}) | ||
} | ||
} | ||
|
||
func validateEndpointConfig(cfg EndpointConfig) error { | ||
for _, ecfg := range cfg.Endpoints { | ||
if dns.IsDynamicNode(ecfg.Address) && ecfg.Strict { | ||
return errors.Newf("%s is a dynamically specified endpoint i.e. it uses SD and that is not permitted under strict mode.", ecfg.Address) | ||
} | ||
if dns.IsDynamicNode(ecfg.Address) && ecfg.Group { | ||
return errors.Newf("%s is a dynamically specified endpoint i.e. it uses SD and that is not permitted under group mode.", ecfg.Address) | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func newEndpointConfigProvider( | ||
g *run.Group, | ||
logger log.Logger, | ||
configFile *extflag.PathOrContent, | ||
configReloadInterval time.Duration, | ||
endpoints []string, | ||
endpointGroups []string, | ||
strictEndpoints []string, | ||
strictEndpointGroups []string, | ||
) (*endpointConfigProvider, error) { | ||
res := &endpointConfigProvider{ | ||
endpoints: endpoints, | ||
endpointGroups: endpointGroups, | ||
strictEndpoints: strictEndpoints, | ||
strictEndpointGroups: strictEndpointGroups, | ||
} | ||
|
||
// only static endpoints | ||
if configFile == nil { | ||
cfg := EndpointConfig{} | ||
res.addStaticEndpoints(&cfg) | ||
if err := validateEndpointConfig(cfg); err != nil { | ||
return nil, err | ||
} | ||
res.cfg = cfg | ||
return res, nil | ||
} | ||
|
||
// dynamically reload config file and merge with static endpoints | ||
cfg, err := res.parse(configFile) | ||
if err != nil { | ||
return nil, errors.Wrapf(err, "unable to load config file") | ||
} | ||
res.cfg = cfg | ||
|
||
reloadCtx, reloadCancel := context.WithCancel(context.Background()) | ||
g.Add(func() error { | ||
return extkingpin.PathContentReloader(reloadCtx, configFile, logger, func() { | ||
res.mu.Lock() | ||
defer res.mu.Unlock() | ||
|
||
level.Info(logger).Log("msg", "reloading endpoint config") | ||
cfg, err := res.parse(configFile) | ||
if err != nil { | ||
level.Error(logger).Log("msg", "failed to reload endpoint config", "err", err) | ||
return | ||
} | ||
res.addStaticEndpoints(&cfg) | ||
if err := validateEndpointConfig(cfg); err != nil { | ||
level.Error(logger).Log("msg", "failed to validate endpoint config", "err", err) | ||
return | ||
} | ||
res.cfg = cfg | ||
}, configReloadInterval) | ||
}, func(err error) { | ||
reloadCancel() | ||
}) | ||
return res, nil | ||
} | ||
|
||
func setupEndpointset( | ||
g *run.Group, | ||
reg prometheus.Registerer, | ||
logger log.Logger, | ||
configFile *extflag.PathOrContent, | ||
configReloadInterval time.Duration, | ||
endpoints []string, | ||
endpointGroups []string, | ||
strictEndpoints []string, | ||
strictEndpointGroups []string, | ||
dnsSDResolver string, | ||
dnsSDInterval time.Duration, | ||
unhealthyTimeout time.Duration, | ||
endpointTimeout time.Duration, | ||
dialOpts []grpc.DialOption, | ||
queryConnMetricLabels ...string, | ||
) (*query.EndpointSet, error) { | ||
configProvider, err := newEndpointConfigProvider( | ||
g, | ||
logger, | ||
configFile, | ||
configReloadInterval, | ||
endpoints, | ||
endpointGroups, | ||
strictEndpoints, | ||
strictEndpointGroups, | ||
) | ||
if err != nil { | ||
return nil, errors.Wrapf(err, "unable to load config initially") | ||
} | ||
// Register resolver for the "thanos:///" scheme for endpoint-groups | ||
dns.RegisterGRPCResolver( | ||
logger, | ||
dns.NewProvider( | ||
logger, | ||
extprom.WrapRegistererWithPrefix("thanos_query_endpoint_groups_", reg), | ||
dns.ResolverType(dnsSDResolver), | ||
), | ||
dnsSDInterval, | ||
) | ||
dnsEndpointProvider := dns.NewProvider( | ||
logger, | ||
extprom.WrapRegistererWithPrefix("thanos_query_endpoints_", reg), | ||
dns.ResolverType(dnsSDResolver), | ||
) | ||
duplicatedStores := promauto.With(reg).NewCounter(prometheus.CounterOpts{ | ||
Name: "thanos_query_duplicated_store_addresses_total", | ||
Help: "The number of times a duplicated store addresses is detected from the different configs in query", | ||
}) | ||
|
||
removeDuplicateEndpointSpecs := func(specs []*query.GRPCEndpointSpec) []*query.GRPCEndpointSpec { | ||
set := make(map[string]*query.GRPCEndpointSpec) | ||
for _, spec := range specs { | ||
addr := spec.Addr() | ||
if _, ok := set[addr]; ok { | ||
level.Warn(logger).Log("msg", "Duplicate store address is provided", "addr", addr) | ||
duplicatedStores.Inc() | ||
} | ||
set[addr] = spec | ||
} | ||
deduplicated := make([]*query.GRPCEndpointSpec, 0, len(set)) | ||
for _, value := range set { | ||
deduplicated = append(deduplicated, value) | ||
} | ||
return deduplicated | ||
} | ||
|
||
ctxDNSUpdate, cancelDNSUpdate := context.WithCancel(context.Background()) | ||
g.Add(func() error { | ||
return runutil.Repeat(dnsSDInterval, ctxDNSUpdate.Done(), func() error { | ||
ctxUpdateIter, cancelUpdateIter := context.WithTimeout(ctxDNSUpdate, dnsSDInterval) | ||
defer cancelUpdateIter() | ||
|
||
endpointConfig := configProvider.config() | ||
|
||
addresses := make([]string, 0, len(endpointConfig.Endpoints)) | ||
for _, ecfg := range endpointConfig.Endpoints { | ||
if addr := ecfg.Address; dns.IsDynamicNode(addr) { | ||
addresses = append(addresses, addr) | ||
} | ||
} | ||
if err := dnsEndpointProvider.Resolve(ctxUpdateIter, addresses, true); err != nil { | ||
level.Error(logger).Log("msg", "failed to resolve addresses for endpoints", "err", err) | ||
} | ||
return nil | ||
}) | ||
}, func(error) { | ||
cancelDNSUpdate() | ||
}) | ||
|
||
endpointset := query.NewEndpointSet(time.Now, logger, reg, func() []*query.GRPCEndpointSpec { | ||
endpointConfig := configProvider.config() | ||
|
||
specs := make([]*query.GRPCEndpointSpec, 0) | ||
// add static nodes | ||
for _, ecfg := range endpointConfig.Endpoints { | ||
strict, group, addr := ecfg.Strict, ecfg.Group, ecfg.Address | ||
if dns.IsDynamicNode(addr) { | ||
continue | ||
} | ||
if group { | ||
specs = append(specs, query.NewGRPCEndpointSpec(fmt.Sprintf("thanos:///%s", addr), strict, append(dialOpts, extgrpc.EndpointGroupGRPCOpts()...)...)) | ||
} else { | ||
specs = append(specs, query.NewGRPCEndpointSpec(addr, strict, dialOpts...)) | ||
} | ||
|
||
} | ||
// add dynamic nodes | ||
for _, addr := range dnsEndpointProvider.Addresses() { | ||
specs = append(specs, query.NewGRPCEndpointSpec(addr, false, dialOpts...)) | ||
} | ||
return removeDuplicateEndpointSpecs(specs) | ||
}, unhealthyTimeout, endpointTimeout, queryConnMetricLabels...) | ||
|
||
ctxEndpointUpdate, cancelEndpointUpdate := context.WithCancel(context.Background()) | ||
g.Add(func() error { | ||
return runutil.Repeat(5*time.Second, ctxEndpointUpdate.Done(), func() error { | ||
ctxIter, cancelIter := context.WithTimeout(ctxEndpointUpdate, 5*time.Second) | ||
defer cancelIter() | ||
endpointset.Update(ctxIter) | ||
return nil | ||
}) | ||
}, func(error) { | ||
cancelEndpointUpdate() | ||
}) | ||
|
||
return endpointset, nil | ||
} |
Oops, something went wrong.