Skip to content
This repository has been archived by the owner on Apr 12, 2024. It is now read-only.

Commit

Permalink
Reuse http client when getting metrics
Browse files Browse the repository at this point in the history
Simplifies context handling when using requests with their own ctx
Enables debugging when fetching metrics
  • Loading branch information
snichme committed Feb 23, 2021
1 parent fca2c24 commit 7698548
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 88 deletions.
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ var (
WebRequestTimeout time.Duration = 5 * time.Second
DevMode bool = false
NoConsumers bool = false
VerboseLogging bool = false
)

func PrintConfig() {
Expand Down
5 changes: 5 additions & 0 deletions config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var (
kafkaDir = flag.String("kafkadir", "/opt/kafka", "The directory where kafka lives")
devMode = flag.Bool("dev", false, "Devmode add more logging and reloadable assets")
noConsumers = flag.Bool("no-consumers", false, "Disable listing of consumer groups")
verbose = flag.Bool("verbose", false, "Verbose logging")
)

func Parse() {
Expand All @@ -27,5 +28,9 @@ func Parse() {
ZookeeperURL = strings.Split(*zk, ",")
DevMode = *devMode
NoConsumers = *noConsumers
VerboseLogging = *verbose
if DevMode {
VerboseLogging = true
}
PrintConfig()
}
14 changes: 12 additions & 2 deletions server/api/store_helpers.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package api

import (
"github.com/cloudkarafka/cloudkarafka-manager/config"
"github.com/cloudkarafka/cloudkarafka-manager/log"
"github.com/cloudkarafka/cloudkarafka-manager/store"
"github.com/cloudkarafka/cloudkarafka-manager/zookeeper"
)
Expand All @@ -22,15 +24,23 @@ func topics(fn zookeeper.PermissionFunc) store.TopicSlice {

// consumers returns the stored consumer groups filtered down to those the
// given permission function allows. When verbose logging is enabled it
// reports how many groups the filter removed.
//
// Fix: the diff artifact had `consumers` and `i` declared twice in the var
// block; declare each exactly once.
func consumers(fn zookeeper.PermissionFunc) store.ConsumerSlice {
	var (
		consumers   = store.Consumers()
		beforeCount = len(consumers)
		i           = 0
	)
	// Filter in place: drop any group the caller is not permitted to see.
	for i < len(consumers) {
		if !fn(consumers[i].Name) {
			consumers = append(consumers[:i], consumers[i+1:]...)
		} else {
			i += 1
		}
	}
	if config.VerboseLogging {
		log.Debug("store consumers", log.MapEntry{
			"before_filter": beforeCount,
			"after_filter":  len(consumers),
		})
	}
	return consumers
}
48 changes: 27 additions & 21 deletions store/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"time"

Expand Down Expand Up @@ -77,7 +76,7 @@ func (g ConsumerGroups) Topics(group string) []map[string]interface{} {
t = map[string]interface{}{
"lag": member.Lag(),
"clients": map[string]struct{}{
member.ConsumerId: struct{}{},
member.ConsumerId: {},
},
}
}
Expand Down Expand Up @@ -140,7 +139,7 @@ func (g ConsumerGroups) MarshalJSON() ([]byte, error) {
res = make([]map[string]interface{}, len(g))
i = 0
)
for group, _ := range g {
for group := range g {
res[i] = map[string]interface{}{
"name": group,
"topics": g.Topics(group),
Expand All @@ -159,32 +158,39 @@ func FetchConsumerGroups(ctx context.Context, out chan ConsumerGroups) {
v ConsumerGroups
r *http.Response
)
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
ctx, cancel := context.WithTimeout(ctx, Timeout)
defer cancel()
host := config.BrokerUrls.Rand()
if host == "" {
log.Error("fetch_consumer_groups", log.StringEntry("No brokers to request consumer group metrics from"))
return
}
url := fmt.Sprintf("%s/consumer-groups", host)
select {
case <-ctx.Done():
log.Warn("fetch_consumer_groups", log.ErrorEntry{ctx.Err()})
if config.VerboseLogging {
log.Debug("fetch_consumer_groups", log.MapEntry{"url": url})
}
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
r, err = httpClient.Do(req)
if err != nil {
log.Warn("fetch_consumer_groups req_failed", log.ErrorEntry{Err: err})
return
default:
r, err = http.Get(url)
if err != nil {
log.Warn("fetch_consumer_groups", log.ErrorEntry{err})
return
}
err = json.NewDecoder(r.Body).Decode(&v)
if err != nil {
b, err := ioutil.ReadAll(r.Body)
if err == nil {
log.Error("fetch_consumer_groups", log.MapEntry{"err": "could_not_parse", "body": string(b)})
}
return
}
defer r.Body.Close()
if r.StatusCode != 200 {
log.Warn("fetch_consumer_groups status_code", log.MapEntry{"url": req.URL, "status": r.StatusCode})
return
}

err = json.NewDecoder(r.Body).Decode(&v)
if err != nil {
log.Warn("fetch_consumer_groups parse_failed", log.ErrorEntry{Err: err})
return
}
if config.VerboseLogging {
log_data := log.MapEntry{
"num_groups": len(v),
}
out <- v
log.Debug("fetch_consumer_groups", log_data)
}
out <- v
}
86 changes: 41 additions & 45 deletions store/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ import (
var (
jmxCache1Min = cache.New(1*time.Minute, 1*time.Minute)
plainCache = cache.New(1*time.Hour, 1*time.Hour)

tr = &http.Transport{
MaxIdleConns: 10,
IdleConnTimeout: 30 * time.Second,
}
httpClient = &http.Client{
Transport: tr,
}
)

type MetricRequest struct {
Expand Down Expand Up @@ -43,64 +51,50 @@ type Metric struct {
Error string `json:"-"`
}

// doRequest GETs url, which is expected to return a JSON array of Metric,
// and decodes it. The request is bound to ctx so callers can cancel it or
// impose a deadline.
//
// Fix: the original accepted ctx but ignored it (plain http.Get), so the
// request could never be cancelled; it now goes through the shared
// httpClient with NewRequestWithContext. Commented-out dead code removed.
func doRequest(ctx context.Context, url string) ([]Metric, error) {
	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
	if err != nil {
		return nil, err
	}
	r, err := httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer r.Body.Close()
	if r.StatusCode != 200 {
		log.Warn("bean_request", log.MapEntry{"url": url, "status": r.StatusCode})
		return nil, fmt.Errorf("URL %s returned %d", url, r.StatusCode)
	}
	var v []Metric
	if err = json.NewDecoder(r.Body).Decode(&v); err != nil {
		return nil, err
	}
	return v, nil
}

func GetMetrics(ctx context.Context, query MetricRequest) ([]Metric, error) {
switch query.Attr {
case "OneMinuteRate":
if r, found := jmxCache1Min.Get(query.String()); found {
log.Info("GetMetrics cached", log.MapEntry{"Bean": query.Bean.String(), "Attr": query.Attr})
return r.([]Metric), nil
}
case "TimeSerie", "Count", "Value":
// Never cache
default:
log.Info("GetMetrics nocache", log.MapEntry{"Attr": query.Attr})
}
host := config.BrokerUrls.HttpUrl(query.BrokerId)
if host == "" {
return nil, fmt.Errorf("Broker %d not available", query.BrokerId)
}
url := fmt.Sprintf("%s/jmx?bean=%s&attrs=%s", host, query.Bean, query.Attr)
select {
case <-ctx.Done():
return []Metric{}, ctx.Err()
default:
v, err := doRequest(ctx, url)
if err == nil {
for i, _ := range v {
v[i].Broker = query.BrokerId
}
}
switch query.Attr {
case "OneMinuteRate":
jmxCache1Min.Set(query.String(), v, cache.DefaultExpiration)
}
return v, err
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, err
}

r, err := httpClient.Do(req)
if err != nil {
return nil, err
}
defer r.Body.Close()
if r.StatusCode != 200 {
log.Warn("bean_request", log.MapEntry{"url": req.URL, "status": r.StatusCode})
return nil, fmt.Errorf("URL %s returned %d", req.URL, r.StatusCode)
}

var v []Metric
if err = json.NewDecoder(r.Body).Decode(&v); err != nil {
return nil, err
}

for i := range v {
v[i].Broker = query.BrokerId
}
switch query.Attr {
case "OneMinuteRate":
jmxCache1Min.Set(query.String(), v, cache.DefaultExpiration)
}
return v, nil
}

func getSimpleValue(url string) (string, error) {
r, err := http.Get(url)
func getSimpleValue(req *http.Request) (string, error) {
r, err := httpClient.Do(req)
if err != nil {
return "", err
}
Expand All @@ -116,7 +110,8 @@ func KafkaVersion(brokerId int) (string, error) {
if r, found := plainCache.Get(key); found {
return r.(string), nil
}
res, err := getSimpleValue(fmt.Sprintf("%s/kafka-version", config.BrokerUrls.HttpUrl(brokerId)))
req, err := http.NewRequest("GET", fmt.Sprintf("%s/kafka-version", config.BrokerUrls.HttpUrl(brokerId)), nil)
res, err := getSimpleValue(req)
if err != nil {
return "", nil
}
Expand All @@ -129,7 +124,8 @@ func PluginVersion(brokerId int) (string, error) {
if r, found := plainCache.Get(key); found {
return r.(string), nil
}
res, err := getSimpleValue(fmt.Sprintf("%s/plugin-version", config.BrokerUrls.HttpUrl(brokerId)))
req, err := http.NewRequest("GET", fmt.Sprintf("%s/plugin-version", config.BrokerUrls.HttpUrl(brokerId)), nil)
res, err := getSimpleValue(req)
if err != nil {
return "", nil
}
Expand Down
28 changes: 14 additions & 14 deletions store/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,36 +19,36 @@ type storage struct {
consumers ConsumerGroups
}

var store = storage{
var store = &storage{
brokers: make(brokers),
topics: make(topics),
consumers: make(ConsumerGroups),
}

func (me storage) DeleteTopic(name string) {
func (me *storage) DeleteTopic(name string) {
delete(me.topics, name)
}

func (me storage) UpdateBroker(b broker) {
func (me *storage) UpdateBroker(b broker) {
me.Lock()
defer me.Unlock()
me.brokers[strconv.Itoa(b.Id)] = b
}

func (me storage) Brokers() brokers {
func (me *storage) Brokers() brokers {
me.RLock()
defer me.RUnlock()
return me.brokers
}

func (me storage) Broker(id string) (broker, bool) {
func (me *storage) Broker(id string) (broker, bool) {
me.RLock()
defer me.RUnlock()
b, ok := me.brokers[id]
return b, ok
}

func (me storage) Topics() TopicSlice {
func (me *storage) Topics() TopicSlice {
me.RLock()
defer me.RUnlock()
var (
Expand All @@ -61,13 +61,13 @@ func (me storage) Topics() TopicSlice {
}
return topics
}
func (me storage) Topic(name string) (topic, bool) {
func (me *storage) Topic(name string) (topic, bool) {
me.RLock()
defer me.RUnlock()
t, ok := me.topics[name]
return t, ok
}
func (me storage) UpdateTopic(t topic) {
func (me *storage) UpdateTopic(t topic) {
me.Lock()
defer me.Unlock()
me.topics[string(t.Name)] = t
Expand Down Expand Up @@ -105,7 +105,7 @@ func (me *storage) UpdateTopicMetric(m Metric) {
me.topics[m.Topic] = t
}

func (me storage) BrokerTopicStats(brokerId int) (int, int, string) {
func (me *storage) BrokerTopicStats(brokerId int) (int, int, string) {
me.RLock()
defer me.RUnlock()
var (
Expand Down Expand Up @@ -146,7 +146,7 @@ func (me *storage) UpdateBrokerMetrics(m Metric) {
b.ISRExpand.Add(int(m.Value))
}
}
func (me storage) SumBrokerSeries(metric string) TimeSerie {
func (me *storage) SumBrokerSeries(metric string) TimeSerie {
me.RLock()
defer me.RUnlock()
var (
Expand Down Expand Up @@ -174,7 +174,7 @@ func (me storage) SumBrokerSeries(metric string) TimeSerie {
return NewSumTimeSerie(series)
}

func (me storage) Consumers() ConsumerSlice {
func (me *storage) Consumers() ConsumerSlice {
if config.NoConsumers {
return ConsumerSlice{}
}
Expand All @@ -184,15 +184,15 @@ func (me storage) Consumers() ConsumerSlice {
cs = make(ConsumerSlice, len(me.consumers))
i = 0
)
for c, _ := range me.consumers {
for c := range me.consumers {
consumer, _ := me.Consumer(c)
cs[i] = consumer
i += 1
}
return cs
}

func (me storage) Consumer(name string) (ConsumerGroup, bool) {
func (me *storage) Consumer(name string) (ConsumerGroup, bool) {
me.RLock()
defer me.RUnlock()
members, ok := me.consumers[name]
Expand All @@ -206,7 +206,7 @@ func (me storage) Consumer(name string) (ConsumerGroup, bool) {
return cg, ok
}

func (me storage) UpdateConsumers(cgs ConsumerGroups) {
func (me *storage) UpdateConsumers(cgs ConsumerGroups) {
me.Lock()
defer me.Unlock()
for name, cg := range cgs {
Expand Down
Loading

0 comments on commit 7698548

Please sign in to comment.