Skip to content

Commit

Permalink
Add healthcheck
Browse files Browse the repository at this point in the history
  • Loading branch information
Martin Schneppenheim committed Feb 11, 2019
1 parent 1a9521b commit d38f19c
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 13 deletions.
5 changes: 5 additions & 0 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,8 @@ func getVersionedConsumerGroup(consumerGroupName string) *versionedConsumerGroup
}
return &versionedConsumerGroup{BaseName: baseName, Name: consumerGroupName, Version: uint8(parsedVersion)}
}

// IsHealthy returns a bool which indicates if the collector is in a healthy state or not
func (e *Collector) IsHealthy() bool {
return e.kafkaClient.IsHealthy()
}
15 changes: 15 additions & 0 deletions kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ func NewKafkaClient(opts *options.Options) (*Client, error) {

// GetTopicNames returns an array of topic names
func (c *Client) GetTopicNames() ([]string, error) {
err := c.client.RefreshMetadata()
if err != nil {
return nil, fmt.Errorf("Cannot refresh metadata. %s", err)
}

topicNames, err := c.consumer.Topics()
if err != nil {
return nil, fmt.Errorf("Cannot fetch topic names. %s", err)
Expand All @@ -107,6 +112,16 @@ func (c *Client) GetTopicNames() ([]string, error) {
return topicNames, nil
}

// IsHealthy returns true if communication with kafka brokers is fine
func (c *Client) IsHealthy() bool {
err := c.client.RefreshMetadata()
if err != nil {
return false
}

return true
}

// GetPartitionIDs returns an int32 array with all partitionIDs for a specific topic
func (c *Client) GetPartitionIDs(topicName string) ([]int32, error) {
partitionIDs, err := c.client.Partitions(topicName)
Expand Down
30 changes: 17 additions & 13 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,29 @@ func main() {
log.SetLevel(level)

log.Info("Starting kafka minion version%v", opts.Version)
startCollector(opts)
kafkaCollector, err := collector.NewKafkaCollector(opts)
if err != nil {
log.Fatal("Could not create kafka exporter. ", err)
}
prometheus.MustRegister(kafkaCollector)
log.Infof("Successfully started kafka exporter")

// Start listening on /metrics endpoint
http.Handle("/metrics", promhttp.Handler())
http.Handle("/healthcheck", healthcheck(kafkaCollector))
listenAddress := fmt.Sprintf(":%d", opts.Port)
log.Fatal(http.ListenAndServe(listenAddress, nil))
log.Infof("Listening on: '%s", listenAddress)

}

func startCollector(opts *options.Options) {
log.Infof("Starting kafka lag exporter v%v", opts.Version)

// Start kafka exporter
log.Infof("Starting kafka exporter")
kafkaCollector, err := collector.NewKafkaCollector(opts)
if err != nil {
log.Fatal("Could not create kafka exporter. ", err)
}
prometheus.MustRegister(kafkaCollector)
log.Infof("Successfully started kafka exporter")
func healthcheck(kafkaCollector *collector.Collector) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
log.Debug("Healthcheck has been called")
isHealthy := kafkaCollector.IsHealthy()
if isHealthy {
w.Write([]byte("Status: Healthy"))
} else {
http.Error(w, "Healthcheck failed", http.StatusServiceUnavailable)
}
})
}

0 comments on commit d38f19c

Please sign in to comment.