Skip to content

Commit

Permalink
query, rule: make endpoint discovery dynamically reloadable
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann committed Nov 7, 2024
1 parent 731e460 commit 7b8c118
Show file tree
Hide file tree
Showing 5 changed files with 432 additions and 488 deletions.
330 changes: 330 additions & 0 deletions cmd/thanos/endpointset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,330 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package main

import (
"context"
"fmt"
"slices"
"sync"
"time"

extflag "github.com/efficientgo/tools/extkingpin"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/run"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc"
"gopkg.in/yaml.v3"

"github.com/thanos-io/thanos/pkg/discovery/dns"
"github.com/thanos-io/thanos/pkg/errors"
"github.com/thanos-io/thanos/pkg/extgrpc"
"github.com/thanos-io/thanos/pkg/extgrpc/snappy"
"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/query"
"github.com/thanos-io/thanos/pkg/runutil"
)

// EndpointSpec describes a single gRPC endpoint entry from the endpoint
// configuration file.
type EndpointSpec struct {
	// Strict marks the endpoint as statically required; strict endpoints
	// must not use DNS service discovery (validated in parse).
	Strict bool
	// Group marks the endpoint as an endpoint group resolved via the
	// "thanos:///" gRPC resolver scheme; also incompatible with SD.
	Group bool
	// Address is the endpoint address, possibly a dynamic SD node
	// (see dns.IsDynamicNode).
	Address string
}

// EndpointConfig is the YAML-backed configuration for endpoint discovery,
// covering the endpoint list plus the gRPC client TLS/compression settings.
type EndpointConfig struct {
	Endpoints  []EndpointSpec
	Secure     bool   `yaml:"grpc-client-tls-secure"`
	SkipVerify bool   `yaml:"grpc-client-tls-skip-verify"`
	Cert       string `yaml:"grpc-client-tls-cert"`
	Key        string `yaml:"grpc-client-tls-key"`
	CaCert     string `yaml:"grpc-client-tls-ca"`
	ServerName string `yaml:"grpc-client-server-name"`
	// Compression is validated in parse to be snappy.Name or
	// compressionNone.
	Compression string `yaml:"grpc-compression"`
}

// grpcOpts builds the gRPC dial options for clients connecting to the
// configured endpoints, applying the TLS settings and, when configured,
// a non-default compressor.
func (cfg EndpointConfig) grpcOpts(
	logger log.Logger,
	reg prometheus.Registerer,
	tracer opentracing.Tracer,
) ([]grpc.DialOption, error) {
	opts, err := extgrpc.StoreClientGRPCOpts(
		logger, reg, tracer,
		cfg.Secure, cfg.SkipVerify,
		cfg.Cert, cfg.Key, cfg.CaCert, cfg.ServerName,
	)
	if err != nil {
		return nil, errors.Wrapf(err, "building gRPC client")
	}
	if cfg.Compression == compressionNone {
		return opts, nil
	}
	return append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor(cfg.Compression))), nil
}

// endpointConfigReloader holds the most recently parsed endpoint
// configuration and guards it with a mutex so the config can be swapped
// by the file-watch reload loop while readers call config().
type endpointConfigReloader struct {
	mu  sync.Mutex
	cfg EndpointConfig
}

// config returns a defensive copy of the currently loaded endpoint
// configuration. The Endpoints slice is cloned so callers cannot observe
// or cause mutations of the shared state after the lock is released.
func (er *endpointConfigReloader) config() EndpointConfig {
	er.mu.Lock()
	defer er.mu.Unlock()

	res := EndpointConfig{
		Secure:      er.cfg.Secure,
		SkipVerify:  er.cfg.SkipVerify,
		Cert:        er.cfg.Cert,
		Key:         er.cfg.Key,
		CaCert:      er.cfg.CaCert,
		ServerName:  er.cfg.ServerName,
		Compression: er.cfg.Compression,
	}
	// BUG FIX: `copy` into a nil destination slice copies zero elements,
	// so the returned config previously always had an empty Endpoints
	// list. Clone allocates and copies in one step.
	res.Endpoints = slices.Clone(er.cfg.Endpoints)
	return res
}

// parse loads and validates the endpoint configuration from configFile.
// It rejects dynamically discovered (SD) addresses combined with strict
// or group mode, and restricts compression to the supported algorithms.
// On success it returns the parsed configuration.
func (er *endpointConfigReloader) parse(configFile *extflag.PathOrContent) (EndpointConfig, error) {
	content, err := configFile.Content()
	if err != nil {
		return EndpointConfig{}, errors.Wrapf(err, "unable to load config content: %s", configFile.Path())
	}
	var cfg EndpointConfig
	if err := yaml.Unmarshal(content, &cfg); err != nil {
		return EndpointConfig{}, errors.Wrapf(err, "unable to unmarshal config content: %s", configFile.Path())
	}
	for _, ecfg := range cfg.Endpoints {
		// SD-resolved endpoints cannot be strict (we cannot guarantee a
		// dynamic set is always present) nor grouped.
		if dns.IsDynamicNode(ecfg.Address) && ecfg.Strict {
			return EndpointConfig{}, errors.Newf("%s is a dynamically specified endpoint i.e. it uses SD and that is not permitted under strict mode.", ecfg.Address)
		}
		if dns.IsDynamicNode(ecfg.Address) && ecfg.Group {
			return EndpointConfig{}, errors.Newf("%s is a dynamically specified endpoint i.e. it uses SD and that is not permitted under group mode.", ecfg.Address)
		}
	}

	if !slices.Contains([]string{snappy.Name, compressionNone}, cfg.Compression) {
		return EndpointConfig{}, errors.Newf("compression must be one of [%q, %q], got: %s", snappy.Name, compressionNone, cfg.Compression)
	}

	// BUG FIX: previously returned the zero EndpointConfig here, silently
	// discarding the successfully parsed and validated configuration.
	return cfg, nil
}

// newEndpointReloader parses configFile once up front (failing fast on a
// broken initial config) and registers a background actor on g that
// re-parses the file whenever it changes, atomically swapping in the new
// configuration. A config that fails to parse on reload is logged and
// skipped, keeping the last good config active.
func newEndpointReloader(
	g *run.Group,
	logger log.Logger,
	configFile *extflag.PathOrContent,
	configReloadInterval time.Duration,
) (*endpointConfigReloader, error) {
	res := &endpointConfigReloader{}

	cfg, err := res.parse(configFile)
	if err != nil {
		return nil, errors.Wrapf(err, "unable to load config file")
	}
	res.cfg = cfg

	reloadCtx, reloadCancel := context.WithCancel(context.Background())
	g.Add(func() error {
		return extkingpin.PathContentReloader(reloadCtx, configFile, logger, func() {
			res.mu.Lock()
			defer res.mu.Unlock()

			// FIX: this is an informational message, not an error; it was
			// previously logged at error level.
			level.Info(logger).Log("msg", "reloading endpoint config")
			cfg, err := res.parse(configFile)
			if err != nil {
				level.Error(logger).Log("msg", "failed to reload endpoint config", "err", err)
				return
			}
			res.cfg = cfg
		}, configReloadInterval)
	}, func(err error) {
		reloadCancel()
	})

	return res, nil
}

// configProvider abstracts over the two sources of endpoint configuration:
// the file-watching reloader (querier) and a fixed config (ruler).
type configProvider interface {
	config() EndpointConfig
}

// providerFunc adapts a plain function to the configProvider interface.
type providerFunc func() EndpointConfig

func (f providerFunc) config() EndpointConfig { return f() }

// setupEndpointset builds an EndpointSet for the querier, backed by a
// dynamically reloadable endpoint config file. It wires the file reloader
// into g and delegates the rest to setupEndpointsetWithProvider.
func setupEndpointset(
	g *run.Group,
	reg prometheus.Registerer,
	logger log.Logger,
	configFile *extflag.PathOrContent,
	configReloadInterval time.Duration,
	dnsSDResolver string,
	dnsSDInterval time.Duration,
	unhealthyTimeout time.Duration,
	endpointTimeout time.Duration,
	queryConnMetricLabels ...string,
) (*query.EndpointSet, error) {
	reloader, err := newEndpointReloader(g, logger, configFile, configReloadInterval)
	if err != nil {
		return nil, errors.Wrapf(err, "unable to load config initially")
	}
	return setupEndpointsetWithProvider(g, reg, logger, reloader,
		dnsSDResolver, dnsSDInterval, unhealthyTimeout, endpointTimeout,
		queryConnMetricLabels...)
}

// setupStaticEndpointset builds an EndpointSet from a fixed, non-reloadable
// endpoint configuration. Used by the ruler, which passes its config
// directly instead of watching a file.
func setupStaticEndpointset(
	g *run.Group,
	reg prometheus.Registerer,
	logger log.Logger,
	endpointCfg EndpointConfig,
	dnsSDResolver string,
	dnsSDInterval time.Duration,
	unhealthyTimeout time.Duration,
	endpointTimeout time.Duration,
	queryConnMetricLabels ...string,
) (*query.EndpointSet, error) {
	// Wrap the static config in a providerFunc that always yields the
	// same snapshot.
	staticProvider := providerFunc(func() EndpointConfig { return endpointCfg })
	return setupEndpointsetWithProvider(g, reg, logger, staticProvider,
		dnsSDResolver, dnsSDInterval, unhealthyTimeout, endpointTimeout,
		queryConnMetricLabels...)
}

// setupEndpointsetWithProvider constructs a query.EndpointSet whose member
// specs are recomputed on every update from cfgProvider: static endpoints
// come straight from the config, dynamic (SD) endpoints are resolved via a
// DNS provider refreshed on its own interval. Two background actors are
// registered on g: one driving DNS resolution, one driving endpoint-set
// updates. Both stop when their cancel functions fire via run.Group.
func setupEndpointsetWithProvider(
	g *run.Group,
	reg prometheus.Registerer,
	logger log.Logger,
	cfgProvider configProvider,
	dnsSDResolver string,
	dnsSDInterval time.Duration,
	unhealthyTimeout time.Duration,
	endpointTimeout time.Duration,
	queryConnMetricLabels ...string,
) (*query.EndpointSet, error) {
	// Register resolver for the "thanos:///" scheme for endpoint-groups
	dns.RegisterGRPCResolver(
		dns.NewProvider(
			logger,
			extprom.WrapRegistererWithPrefix("thanos_query_endpoint_groups_", reg),
			dns.ResolverType(dnsSDResolver),
		),
		dnsSDInterval,
	)
	// Separate DNS provider for individually SD-discovered endpoints
	// (distinct metric prefix from the group resolver above).
	dnsEndpointProvider := dns.NewProvider(
		logger,
		extprom.WrapRegistererWithPrefix("thanos_query_endpoints_", reg),
		dns.ResolverType(dnsSDResolver),
	)
	duplicatedStores := promauto.With(reg).NewCounter(prometheus.CounterOpts{
		Name: "thanos_query_duplicated_store_addresses_total",
		Help: "The number of times a duplicated store addresses is detected from the different configs in query",
	})
	skippedReloading := promauto.With(reg).NewCounter(prometheus.CounterOpts{
		Name: "thanos_query_skipped_reloading_total",
		Help: "The number of times we skipped reloading because of errors during endpoint update.",
	})

	// Dedupe by address, keeping the last spec seen for each address and
	// counting duplicates into the metric.
	removeDuplicateEndpointSpecs := func(specs []*query.GRPCEndpointSpec) []*query.GRPCEndpointSpec {
		set := make(map[string]*query.GRPCEndpointSpec)
		for _, spec := range specs {
			addr := spec.Addr()
			if _, ok := set[addr]; ok {
				level.Warn(logger).Log("msg", "Duplicate store address is provided", "addr", addr)
				duplicatedStores.Inc()
			}
			set[addr] = spec
		}
		deduplicated := make([]*query.GRPCEndpointSpec, 0, len(set))
		for _, value := range set {
			deduplicated = append(deduplicated, value)
		}
		return deduplicated
	}

	// Actor 1: periodically re-resolve the dynamic (SD) addresses from the
	// current config snapshot. Each iteration is bounded by dnsSDInterval.
	ctxDNSUpdate, cancelDNSUpdate := context.WithCancel(context.Background())
	g.Add(func() error {
		return runutil.Repeat(dnsSDInterval, ctxDNSUpdate.Done(), func() error {
			ctxUpdateIter, cancelUpdateIter := context.WithTimeout(ctxDNSUpdate, dnsSDInterval)
			defer cancelUpdateIter()

			endpointConfig := cfgProvider.config()

			addresses := make([]string, 0, len(endpointConfig.Endpoints))
			for _, ecfg := range endpointConfig.Endpoints {
				if addr := ecfg.Address; dns.IsDynamicNode(addr) {
					addresses = append(addresses, addr)
				}
			}
			dnsEndpointProvider.Resolve(ctxUpdateIter, addresses)
			return nil
		})
	}, func(error) {
		cancelDNSUpdate()
	})

	// The endpoint-spec factory re-reads the config on every call, so file
	// reloads take effect on the next Update cycle without restarts.
	// NOTE(review): a nil tracer is passed to grpcOpts here — confirm
	// whether tracing is intentionally disabled for these connections.
	endpointset := query.NewEndpointSet(time.Now, logger, reg, func() ([]*query.GRPCEndpointSpec, error) {
		endpointConfig := cfgProvider.config()

		grpcOpts, err := endpointConfig.grpcOpts(logger, reg, nil)
		if err != nil {
			skippedReloading.Inc()
			level.Warn(logger).Log("msg", "Skipping reload of endpoints because of broken gRPC config", "err", err)
			return nil, err
		}

		specs := make([]*query.GRPCEndpointSpec, 0)
		// add static nodes
		for _, ecfg := range endpointConfig.Endpoints {
			strict, group, addr := ecfg.Strict, ecfg.Group, ecfg.Address
			if dns.IsDynamicNode(addr) {
				continue
			}
			if group {
				// Endpoint groups go through the "thanos:///" resolver
				// registered above, with group-specific dial options.
				specs = append(specs, query.NewGRPCEndpointSpec(fmt.Sprintf("thanos:///%s", addr), strict, append(grpcOpts, extgrpc.EndpointGroupGRPCOpts()...)...))
			} else {
				specs = append(specs, query.NewGRPCEndpointSpec(addr, strict, grpcOpts...))
			}

		}
		// add dynamic nodes
		var tmpSpecs []*query.GRPCEndpointSpec
		for _, addr := range dnsEndpointProvider.Addresses() {
			tmpSpecs = append(tmpSpecs, query.NewGRPCEndpointSpec(addr, false, grpcOpts...))
		}
		return append(specs, removeDuplicateEndpointSpecs(tmpSpecs)...), nil
	}, unhealthyTimeout, endpointTimeout, queryConnMetricLabels...)

	// Actor 2: refresh the endpoint set every 5s, each iteration bounded
	// by a 5s timeout.
	ctxEndpointUpdate, cancelEndpointUpdate := context.WithCancel(context.Background())
	g.Add(func() error {
		return runutil.Repeat(5*time.Second, ctxEndpointUpdate.Done(), func() error {
			ctxIter, cancelIter := context.WithTimeout(ctxEndpointUpdate, 5*time.Second)
			defer cancelIter()
			endpointset.Update(ctxIter)
			return nil
		})
	}, func(error) {
		cancelEndpointUpdate()
	})

	return endpointset, nil
}
Loading

0 comments on commit 7b8c118

Please sign in to comment.