Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: auto discovery for consul #1045

Merged
merged 3 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 57 additions & 115 deletions inputs/prometheus/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
"log"
"net/url"
"strings"
"sync"
"text/template"
"time"

Expand All @@ -23,7 +22,6 @@
Agent string `toml:"agent"`
QueryInterval config.Duration `toml:"query_interval"`
Queries []*ConsulQuery `toml:"query"`
Catalog *api.Catalog `toml:"-"`
}

// One Consul service discovery query
Expand Down Expand Up @@ -51,152 +49,98 @@
lastQueryFailed bool
}

func (ins *Instance) InitConsulClient() error {
func (ins *Instance) InitConsulClient(ctx context.Context) error {
consulAPIConfig := api.DefaultConfig()
if ins.ConsulConfig.Agent != "" {
consulAPIConfig.Address = ins.ConsulConfig.Agent
}
consul, err := api.NewClient(consulAPIConfig)
if err != nil {
return fmt.Errorf("cannot connect to the Consul agent: %w", err)
}

i := 0
// Parse the template for metrics URL, drop queries with template parse errors
for i := range ins.ConsulConfig.Queries {
for _, q := range ins.ConsulConfig.Queries {
serviceURLTemplate, err := template.New("URL").Parse(ins.ConsulConfig.Queries[i].ServiceURL)
if err != nil {
return fmt.Errorf("failed to parse the Consul query URL template (%s): %s", ins.ConsulConfig.Queries[i].ServiceURL, err)
}
ins.ConsulConfig.Queries[i].serviceURLTemplate = serviceURLTemplate
q.serviceURLTemplate = serviceURLTemplate

// Allow to use join function in tags
templateFunctions := template.FuncMap{"join": strings.Join}
// Parse the tag value templates
ins.ConsulConfig.Queries[i].serviceExtraTagsTemplate = make(map[string]*template.Template)
q.serviceExtraTagsTemplate = make(map[string]*template.Template)
for tagName, tagTemplateString := range ins.ConsulConfig.Queries[i].ServiceExtraTags {
tagTemplate, err := template.New(tagName).Funcs(templateFunctions).Parse(tagTemplateString)
if err != nil {
return fmt.Errorf("failed to parse the Consul query Extra Tag template (%s): %s", tagTemplateString, err)
log.Println("failed to parse the Consul query Extra Tag template (%s): %s", tagTemplateString, err)

Check failure on line 78 in inputs/prometheus/consul.go

View workflow job for this annotation

GitHub Actions / Code Analysis

log.Println call has possible Printf formatting directive %s
continue
}
ins.ConsulConfig.Queries[i].serviceExtraTagsTemplate[tagName] = tagTemplate
q.serviceExtraTagsTemplate[tagName] = tagTemplate
}
ins.ConsulConfig.Queries[i] = q
i++
}

// Prevent memory leak by erasing truncated values
// for j := i; j < len(ins.ConsulConfig.Queries); j++ {
// ins.ConsulConfig.Queries[j] = nil
// }
// ins.ConsulConfig.Queries = ins.ConsulConfig.Queries[:i]

consul, err := api.NewClient(consulAPIConfig)
if err != nil {
return fmt.Errorf("failed to connect the Consul agent(%s): %v", consulAPIConfig.Address, err)
}

ins.ConsulConfig.Catalog = consul.Catalog()

return nil
}

func (ins *Instance) UrlsFromConsul(ctx context.Context) ([]ScrapeUrl, error) {
if !ins.ConsulConfig.Enabled {
return []ScrapeUrl{}, nil
for j := i; j < len(ins.ConsulConfig.Queries); j++ {
ins.ConsulConfig.Queries[j] = nil
}
ins.ConsulConfig.Queries = ins.ConsulConfig.Queries[:i]

if ins.DebugMod {
log.Println("D! get urls from consul:", ins.ConsulConfig.Agent)
}

urlset := map[string]struct{}{}
var returls []ScrapeUrl

for _, q := range ins.ConsulConfig.Queries {
queryOptions := api.QueryOptions{}
if q.ServiceDc != "" {
queryOptions.Datacenter = q.ServiceDc
}
catalog := consul.Catalog()

// Request services from Consul
consulServices, _, err := ins.ConsulConfig.Catalog.Service(q.ServiceName, q.ServiceTag, &queryOptions)
ins.wg.Add(1)
go func() {
		// Store last error status and change log level depending on repeated occurrence
var refreshFailed = false
defer ins.wg.Done()
err := ins.refreshConsulServices(catalog)
if err != nil {
return nil, err
refreshFailed = true
log.Printf("Unable to refreh Consul services: %v", err)
}

if len(consulServices) == 0 {
if ins.DebugMod {
log.Println("D! query consul did not find any instances, service:", q.ServiceName, " tag:", q.ServiceTag)
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Duration(ins.ConsulConfig.QueryInterval)):
err := ins.refreshConsulServices(catalog)
if err != nil {
message := fmt.Sprintf("Unable to refreh Consul services: %v", err)
if refreshFailed {
log.Println("E!", message)
} else {
log.Println("W!", message)
}
refreshFailed = true
} else if refreshFailed {
refreshFailed = false
log.Println("Successfully refreshed Consul services after previous errors")
}
}
continue
}
}()

if ins.DebugMod {
log.Println("D! query consul found", len(consulServices), "instances, service:", q.ServiceName, " tag:", q.ServiceTag)
}

for _, consulService := range consulServices {
su, err := ins.getConsulServiceURL(q, consulService)
if err != nil {
return nil, fmt.Errorf("unable to get scrape URLs from Consul for Service (%s, %s): %s", q.ServiceName, q.ServiceTag, err)
}

if _, has := urlset[su.URL.String()]; has {
continue
}

urlset[su.URL.String()] = struct{}{}
returls = append(returls, *su)
}
}
return nil
}

if ins.firstRun {
var wg sync.WaitGroup
consulAPIConfig := api.DefaultConfig()
if ins.ConsulConfig.Agent != "" {
consulAPIConfig.Address = ins.ConsulConfig.Agent
}
func (ins *Instance) UrlsFromConsul() ([]*ScrapeUrl, error) {
ins.lock.Lock()
defer ins.lock.Unlock()

consul, err := api.NewClient(consulAPIConfig)
if err != nil {
return []ScrapeUrl{}, fmt.Errorf("cannot connect to the Consul agent: %w", err)
}
catalog := consul.Catalog()

wg.Add(1)
go func() {
// Store last error status and change log level depending on repeated occurrence
var refreshFailed = false
defer wg.Done()
err := ins.refreshConsulServices(catalog)
if err != nil {
refreshFailed = true
log.Printf("Unable to refresh Consul services: %v\n", err)
}
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Duration(ins.ConsulConfig.QueryInterval)):
err := ins.refreshConsulServices(catalog)
if err != nil {
message := fmt.Sprintf("Unable to refresh Consul services: %v", err)
if refreshFailed {
log.Println("E!", message)
} else {
log.Println("W!", message)
}
refreshFailed = true
} else if refreshFailed {
refreshFailed = false
log.Println("Successfully refreshed Consul services after previous errors")
}
}
}
}()
ins.firstRun = false
wg.Wait()
urls := make([]*ScrapeUrl, 0, len(ins.consulServices))
for _, u := range ins.consulServices {
urls = append(urls, u)
}

return returls, nil
return urls, nil
}

func (ins *Instance) refreshConsulServices(c *api.Catalog) error {
consulServiceURLs := make(map[string]ScrapeUrl)
consulServiceURLs := make(map[string]*ScrapeUrl)

if ins.DebugMod {
log.Println("Refreshing Consul services")
Expand Down Expand Up @@ -240,14 +184,12 @@
}
q.lastQueryFailed = false
log.Printf("Adding scrape URL from Consul for Service (%s, %s): %s\n", q.ServiceName, q.ServiceTag, uaa.URL.String())
consulServiceURLs[uaa.URL.String()] = *uaa
consulServiceURLs[uaa.URL.String()] = uaa
}
}

ins.lock.Lock()
for _, u := range consulServiceURLs {
ins.URLs = append(ins.URLs, u.URL.String())
}
ins.consulServices = consulServiceURLs
ins.lock.Unlock()

return nil
Expand Down
25 changes: 17 additions & 8 deletions inputs/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@ type Instance struct {
ignoreLabelKeysFilter filter.Filter
cancel context.CancelFunc
lock sync.Mutex
firstRun bool
tls.ClientConfig
client *http.Client

wg sync.WaitGroup
consulServices map[string]*ScrapeUrl
}

func (ins *Instance) Empty() bool {
Expand All @@ -67,8 +69,11 @@ func (ins *Instance) Init() error {
return types.ErrInstancesEmpty
}

var ctx context.Context
ctx, ins.cancel = context.WithCancel(context.Background())

if ins.ConsulConfig.Enabled && len(ins.ConsulConfig.Queries) > 0 {
if err := ins.InitConsulClient(); err != nil {
if err := ins.InitConsulClient(ctx); err != nil {
return err
}
}
Expand All @@ -80,7 +85,6 @@ func (ins *Instance) Init() error {
if ins.Timeout <= 0 {
ins.Timeout = config.Duration(time.Second * 3)
}
ins.firstRun = true

client, err := ins.createHTTPClient()
if err != nil {
Expand Down Expand Up @@ -156,8 +160,13 @@ func (p *Prometheus) GetInstances() []inputs.Instance {
return ret
}

// Drop tears down the plugin by propagating Drop to every configured
// instance, giving each one the chance to stop its background work.
func (p *Prometheus) Drop() {
	for i := range p.Instances {
		p.Instances[i].Drop()
	}
}

func (ins *Instance) Gather(slist *types.SampleList) {
var ctx context.Context
urlwg := new(sync.WaitGroup)
defer urlwg.Wait()

Expand All @@ -170,11 +179,10 @@ func (ins *Instance) Gather(slist *types.SampleList) {

urlwg.Add(1)

go ins.gatherUrl(urlwg, slist, ScrapeUrl{URL: u, Tags: map[string]string{}})
go ins.gatherUrl(urlwg, slist, &ScrapeUrl{URL: u, Tags: map[string]string{}})
}

ctx, ins.cancel = context.WithCancel(context.Background())
urls, err := ins.UrlsFromConsul(ctx)
urls, err := ins.UrlsFromConsul()
if err != nil {
log.Println("E! failed to query urls from consul:", err)
return
Expand All @@ -186,7 +194,7 @@ func (ins *Instance) Gather(slist *types.SampleList) {
}
}

func (ins *Instance) gatherUrl(urlwg *sync.WaitGroup, slist *types.SampleList, uri ScrapeUrl) {
func (ins *Instance) gatherUrl(urlwg *sync.WaitGroup, slist *types.SampleList, uri *ScrapeUrl) {
defer urlwg.Done()

u := uri.URL
Expand Down Expand Up @@ -271,4 +279,5 @@ func (ins *Instance) setHeaders(req *http.Request) {

// Drop stops the instance: it cancels the context that drives the
// background Consul-refresh goroutine and then blocks until that
// goroutine has exited.
func (ins *Instance) Drop() {
	ins.cancel()  // signal the refresh loop to stop via ctx.Done()
	ins.wg.Wait() // wait for the goroutine (added to wg at start) to return
}
Loading