From d38f19c5de8873c1398c3a2ea0e9910c2502ed45 Mon Sep 17 00:00:00 2001 From: Martin Schneppenheim Date: Mon, 11 Feb 2019 15:18:05 +0100 Subject: [PATCH] Add healthcheck --- collector/collector.go | 5 +++++ kafka/client.go | 15 +++++++++++++++ main.go | 30 +++++++++++++++++------------- 3 files changed, 37 insertions(+), 13 deletions(-) diff --git a/collector/collector.go b/collector/collector.go index 1fbb073..f5ef535 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -177,3 +177,8 @@ func getVersionedConsumerGroup(consumerGroupName string) *versionedConsumerGroup } return &versionedConsumerGroup{BaseName: baseName, Name: consumerGroupName, Version: uint8(parsedVersion)} } + +// IsHealthy returns a bool which indicates if the collector is in a healthy state or not +func (e *Collector) IsHealthy() bool { + return e.kafkaClient.IsHealthy() +} diff --git a/kafka/client.go b/kafka/client.go index 168500a..832c956 100644 --- a/kafka/client.go +++ b/kafka/client.go @@ -99,6 +99,11 @@ func NewKafkaClient(opts *options.Options) (*Client, error) { // GetTopicNames returns an array of topic names func (c *Client) GetTopicNames() ([]string, error) { + err := c.client.RefreshMetadata() + if err != nil { + return nil, fmt.Errorf("Cannot refresh metadata. %s", err) + } + topicNames, err := c.consumer.Topics() if err != nil { return nil, fmt.Errorf("Cannot fetch topic names. %s", err) @@ -107,6 +112,16 @@ func (c *Client) GetTopicNames() ([]string, error) { return topicNames, nil } +// IsHealthy returns true if communication with kafka brokers is fine +func (c *Client) IsHealthy() bool { + err := c.client.RefreshMetadata() + if err != nil { + return false + } + + return true +} + // GetPartitionIDs returns an int32 array with all partitionIDs for a specific topic func (c *Client) GetPartitionIDs(topicName string) ([]int32, error) { partitionIDs, err := c.client.Partitions(topicName) diff --git a/main.go b/main.go index c5ffe80..9adb205 100644 --- a/main.go +++ b/main.go @@ -33,25 +33,29 @@ func main() { log.SetLevel(level) log.Info("Starting kafka minion version%v", opts.Version) - startCollector(opts) + kafkaCollector, err := collector.NewKafkaCollector(opts) + if err != nil { + log.Fatal("Could not create kafka exporter. ", err) + } + prometheus.MustRegister(kafkaCollector) + log.Infof("Successfully started kafka exporter") // Start listening on /metrics endpoint http.Handle("/metrics", promhttp.Handler()) + http.Handle("/healthcheck", healthcheck(kafkaCollector)) listenAddress := fmt.Sprintf(":%d", opts.Port) log.Fatal(http.ListenAndServe(listenAddress, nil)) log.Infof("Listening on: '%s", listenAddress) - } -func startCollector(opts *options.Options) { - log.Infof("Starting kafka lag exporter v%v", opts.Version) - - // Start kafka exporter - log.Infof("Starting kafka exporter") - kafkaCollector, err := collector.NewKafkaCollector(opts) - if err != nil { - log.Fatal("Could not create kafka exporter. ", err) - } - prometheus.MustRegister(kafkaCollector) - log.Infof("Successfully started kafka exporter") +func healthcheck(kafkaCollector *collector.Collector) http.HandlerFunc { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + log.Debug("Healthcheck has been called") + isHealthy := kafkaCollector.IsHealthy() + if isHealthy { + w.Write([]byte("Status: Healthy")) + } else { + http.Error(w, "Healthcheck failed", http.StatusServiceUnavailable) + } + }) }