From ff21b78c82d7dc2e42bbcf2e350b68db5588aca3 Mon Sep 17 00:00:00 2001 From: Tomasz Janiszewski Date: Tue, 15 Dec 2015 12:20:12 +0100 Subject: [PATCH] Add metrics for resposnes and marathon --- config/config.go | 2 +- consul/agents.go | 10 ++++---- consul/consul.go | 10 ++++++++ main.go | 2 +- marathon/marathon.go | 24 ++++++++++++-------- metrics/metrics.go | 13 ++++++----- web/event_handler.go | 24 +++++++++----------- web/event_handler_test.go | 48 +++++++++++++++++++-------------------- 8 files changed, 73 insertions(+), 60 deletions(-) diff --git a/config/config.go b/config/config.go index 8b7b577..a418b40 100644 --- a/config/config.go +++ b/config/config.go @@ -104,7 +104,7 @@ func (config *Config) loadConfigFromFile() error { func (config *Config) setLogLevel() error { level, err := log.ParseLevel(config.Log.Level) if err != nil { - log.WithError(err).WithField("level", config.Log.Level).Error("bad level") + log.WithError(err).WithField("Level", config.Log.Level).Error("Bad level") return err } log.SetLevel(level) diff --git a/consul/agents.go b/consul/agents.go index 28b1c7b..376161e 100644 --- a/consul/agents.go +++ b/consul/agents.go @@ -73,20 +73,20 @@ func (a *ConcurrentAgents) createAgent(host string) (*consulapi.Client, error) { config := consulapi.DefaultConfig() config.Address = fmt.Sprintf("%s:%s", host, a.config.Port) - log.Debugf("consul address: %s", config.Address) + log.Debugf("Consul address: %s", config.Address) if a.config.Token != "" { - log.Debugf("setting token to %s", a.config.Token) + log.Debugf("Setting token to %s", a.config.Token) config.Token = a.config.Token } if a.config.SslEnabled { - log.Debugf("enabling SSL") + log.Debugf("Enabling SSL") config.Scheme = "https" } if !a.config.SslVerify { - log.Debugf("disabled SSL verification") + log.Debugf("Disabled SSL verification") config.HttpClient.Transport = &http.Transport{ TLSClientConfig: &tls.Config{ InsecureSkipVerify: true, @@ -95,7 +95,7 @@ func (a *ConcurrentAgents) createAgent(host string) (*consulapi.Client, error) { } if a.config.Auth.Enabled { - log.Debugf("setting basic auth") + log.Debugf("Setting basic auth") config.HttpAuth = &consulapi.HttpBasicAuth{ Username: a.config.Auth.Username, Password: a.config.Auth.Password, diff --git a/consul/consul.go b/consul/consul.go index ae0cdd8..10359bc 100644 --- a/consul/consul.go +++ b/consul/consul.go @@ -67,6 +67,11 @@ func contains(slice []string, search string) bool { func (c *Consul) Register(service *consulapi.AgentServiceRegistration) error { var err error metrics.Time("consul.register", func() { err = c.register(service) }) + if err != nil { + metrics.Mark("consul.register.error") + } else { + metrics.Mark("consul.register.success") + } return err } @@ -94,6 +99,11 @@ func (c *Consul) register(service *consulapi.AgentServiceRegistration) error { func (c *Consul) Deregister(serviceId string, agentHost string) error { var err error metrics.Time("consul.deregister", func() { err = c.deregister(serviceId, agentHost) }) + if err != nil { + metrics.Mark("consul.deregister.error") + } else { + metrics.Mark("consul.deregister.success") + } return err } diff --git a/main.go b/main.go index f0a88a7..e22d326 100644 --- a/main.go +++ b/main.go @@ -40,6 +40,6 @@ func main() { http.HandleFunc("/health", web.HealthHandler) http.HandleFunc("/events", web.NewEventHandler(service, remote).Handle) - log.WithField("port", config.Web.Listen).Info("Listening") + log.WithField("Port", config.Web.Listen).Info("Listening") log.Fatal(http.ListenAndServe(config.Web.Listen, nil)) } diff --git a/marathon/marathon.go b/marathon/marathon.go index f86898b..5bb015d 100644 --- a/marathon/marathon.go +++ b/marathon/marathon.go @@ -5,6 +5,7 @@ import ( "fmt" log "github.com/Sirupsen/logrus" "github.com/allegro/marathon-consul/apps" + "github.com/allegro/marathon-consul/metrics" "github.com/allegro/marathon-consul/tasks" "github.com/sethgrid/pester" "io/ioutil" @@ -47,7 +48,7 @@ func New(config Config) (*Marathon, error) { } func (m Marathon) App(appId string) (*apps.App, error) { - log.WithField("location", m.Location).Debug("asking Marathon for " + appId) + log.WithField("Location", m.Location).Debug("Asking Marathon for " + appId) body, err := m.get(m.urlWithQuery("/v2/apps/"+appId, "embed=apps.tasks")) if err != nil { @@ -58,7 +59,7 @@ func (m Marathon) App(appId string) (*apps.App, error) { } func (m Marathon) Apps() ([]*apps.App, error) { - log.WithField("location", m.Location).Debug("asking Marathon for apps") + log.WithField("Location", m.Location).Debug("Asking Marathon for apps") body, err := m.get(m.urlWithQuery("/v2/apps", "embed=apps.tasks")) if err != nil { return nil, err @@ -69,7 +70,7 @@ func (m Marathon) Apps() ([]*apps.App, error) { func (m Marathon) Tasks(app string) ([]*tasks.Task, error) { log.WithFields(log.Fields{ - "location": m.Location, + "Location": m.Location, "Id": app, }).Debug("asking Marathon for tasks") @@ -92,17 +93,21 @@ func (m Marathon) get(url string) ([]byte, error) { request.Header.Add("Accept", "application/json") log.WithFields(log.Fields{ - "uri": request.URL.RequestURI(), - "location": m.Location, - "protocol": m.Protocol, + "Uri": request.URL.RequestURI(), + "Location": m.Location, + "Protocol": m.Protocol, }).Debug("Sending GET request to marathon") - response, err := client.Do(request) + var response *http.Response + metrics.Time("marathon.get", func() { response, err = client.Do(request) }) if err != nil { + metrics.Mark("marathon.get.error") m.logHTTPError(response, err) return nil, err } if response.StatusCode != 200 { + metrics.Mark("marathon.get.error") + metrics.Mark(fmt.Sprintf("marathon.get.error.%d", response.StatusCode)) err = fmt.Errorf("Expected 200 but got %d for %s", response.StatusCode, response.Request.URL.Path) m.logHTTPError(response, err) return nil, err @@ -118,8 +123,8 @@ func (m Marathon) logHTTPError(resp *http.Response, err error) { } log.WithFields(log.Fields{ - "location": m.Location, - "protocol": m.Protocol, + "Location": m.Location, + "Protocol": m.Protocol, "statusCode": statusCode, }).Error(err) } @@ -136,7 +141,6 @@ func (m Marathon) urlWithQuery(path string, query string) string { Path: path, RawQuery: query, } - return marathon.String() } diff --git a/metrics/metrics.go b/metrics/metrics.go index ee53a8d..4ecc5bf 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -4,7 +4,7 @@ package metrics import ( "errors" "fmt" - "log" + logger "log" "net" "net/url" "os" @@ -12,6 +12,7 @@ import ( "strings" "time" + log "github.com/Sirupsen/logrus" "github.com/cyberdelia/go-metrics-graphite" "github.com/rcrowley/go-metrics" ) @@ -40,17 +41,17 @@ func Init(cfg Config) error { switch cfg.Target { case "stdout": - log.Printf("[INFO] Sending metrics to stdout") + log.Info("Sending metrics to stdout") return initStdout(cfg.Interval) case "graphite": if cfg.Addr == "" { return errors.New("metrics: graphite addr missing") } - log.Printf("[INFO] Sending metrics to Graphite on %s as %q", cfg.Addr, pfx) + log.Infof("Sending metrics to Graphite on %s as %q", cfg.Addr, pfx) return initGraphite(cfg.Addr, cfg.Interval) case "": - log.Printf("[INFO] Metrics disabled") + log.Infof("Metrics disabled") default: return fmt.Errorf("Invalid metrics target %s", cfg.Target) } @@ -81,7 +82,7 @@ var hostname = os.Hostname func defaultPrefix() (string, error) { host, err := hostname() if err != nil { - log.Printf("[FATAL] %s", err.Error()) + log.WithError(err).Error("Problem with detecting prefix") return "", err } exe := filepath.Base(os.Args[0]) @@ -89,7 +90,7 @@ func defaultPrefix() (string, error) { } func initStdout(interval time.Duration) error { - logger := log.New(os.Stderr, "localhost: ", log.Lmicroseconds) + logger := logger.New(os.Stderr, "localhost: ", logger.Lmicroseconds) go metrics.Log(metrics.DefaultRegistry, interval, logger) return nil } diff --git a/web/event_handler.go b/web/event_handler.go index bf990db..3b455ff 100644 --- a/web/event_handler.go +++ b/web/event_handler.go @@ -26,8 +26,10 @@ func NewEventHandler(service service.ConsulServices, marathon marathon.Marathone } func (fh *EventHandler) Handle(w http.ResponseWriter, r *http.Request) { + metrics.Time("events.response", func() { fh.handle(w, r) }) +} - fh.markRequest() +func (fh *EventHandler) handle(w http.ResponseWriter, r *http.Request) { body, err := ioutil.ReadAll(r.Body) if err != nil { @@ -35,7 +37,7 @@ func (fh *EventHandler) Handle(w http.ResponseWriter, r *http.Request) { fh.handleBadRequest(err, w) return } - log.Debug(string(body)) + log.WithField("Body", string(body)).Debug("Received") eventType, err := events.EventType(body) if err != nil { @@ -45,7 +47,7 @@ func (fh *EventHandler) Handle(w http.ResponseWriter, r *http.Request) { fh.markEventRequest(eventType) - log.WithField("eventType", eventType).Debug("Recieved event") + log.WithField("EventType", eventType).Debug("Received event") switch eventType { case "app_terminated_event": @@ -55,10 +57,10 @@ func (fh *EventHandler) Handle(w http.ResponseWriter, r *http.Request) { case "health_status_changed_event": fh.handleHealthStatusEvent(w, body) default: - fh.handleBadRequest(fmt.Errorf("cannot handle %s", eventType), w) + fh.handleBadRequest(fmt.Errorf("Cannot handle %s", eventType), w) } - fh.markResponse() + fh.markSuccess() } func (fh *EventHandler) handleTerminationEvent(w http.ResponseWriter, body []byte) { @@ -209,27 +211,23 @@ func replaceTaskIdWithId(body []byte) []byte { return bytes.Replace(body, []byte("taskId"), []byte("id"), -1) } -func (fh *EventHandler) markRequest() { - metrics.Mark("events.requests") -} - func (fh *EventHandler) markEventRequest(event string) { metrics.Mark("events.requests." + event) } -func (fh *EventHandler) markResponse() { - metrics.Mark("events.response") +func (fh *EventHandler) markSuccess() { + metrics.Mark("events.response.success") } func (fh *EventHandler) handleError(err error, w http.ResponseWriter) { - metrics.Mark("events.error") + metrics.Mark("events.response.error.500") w.WriteHeader(500) log.WithError(err).Debug("Returning 500 due to error") fmt.Fprintln(w, err.Error()) } func (fh *EventHandler) handleBadRequest(err error, w http.ResponseWriter) { - metrics.Mark("events.bad_request") + metrics.Mark("events.response.error.400") w.WriteHeader(400) log.WithError(err).Debug("Returning 400 due to malformed request") fmt.Fprintln(w, err.Error()) diff --git a/web/event_handler_test.go b/web/event_handler_test.go index 973ad05..c3212f0 100644 --- a/web/event_handler_test.go +++ b/web/event_handler_test.go @@ -18,20 +18,20 @@ import ( func TestForwardHandler_NotHandleUnknownEventType(t *testing.T) { t.Parallel() // given - handler := EventHandler{nil, nil} + handler := NewEventHandler(nil, nil) req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(`{"eventType":"test_event"}`))) // when recorder := httptest.NewRecorder() handler.Handle(recorder, req) // then assert.Equal(t, 400, recorder.Code) - assert.Equal(t, "cannot handle test_event\n", recorder.Body.String()) + assert.Equal(t, "Cannot handle test_event\n", recorder.Body.String()) } func TestForwardHandler_HandleRadderError(t *testing.T) { t.Parallel() // given - handler := EventHandler{nil, nil} + handler := NewEventHandler(nil, nil) req, _ := http.NewRequest("POST", "/events", BadReader{}) // when recorder := httptest.NewRecorder() @@ -44,7 +44,7 @@ func TestForwardHandler_HandleRadderError(t *testing.T) { func TestForwardHandler_HandleEmptyBody(t *testing.T) { t.Parallel() // given - handler := EventHandler{nil, nil} + handler := NewEventHandler(nil, nil) req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte{})) // when recorder := httptest.NewRecorder() @@ -57,7 +57,7 @@ func TestForwardHandler_HandleEmptyBody(t *testing.T) { func TestForwardHandler_NotHandleMalformedEventType(t *testing.T) { t.Parallel() // given - handler := EventHandler{nil, nil} + handler := NewEventHandler(nil, nil) req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(`{eventType:"test_event"}`))) // when recorder := httptest.NewRecorder() @@ -70,7 +70,7 @@ func TestForwardHandler_NotHandleMalformedEventType(t *testing.T) { func TestForwardHandler_HandleMalformedEventType(t *testing.T) { t.Parallel() // given - handler := EventHandler{nil, nil} + handler := NewEventHandler(nil, nil) req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(`{eventType:"test_event"}`))) // when recorder := httptest.NewRecorder() @@ -83,7 +83,7 @@ func TestForwardHandler_HandleMalformedEventType(t *testing.T) { func TestForwardHandler_NotHandleInvalidEventType(t *testing.T) { t.Parallel() // given - handler := EventHandler{nil, nil} + handler := NewEventHandler(nil, nil) req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(`{"eventType":[1,2]}`))) // when recorder := httptest.NewRecorder() @@ -102,7 +102,7 @@ func TestForwardHandler_HandleAppTerminatedEvent(t *testing.T) { for _, task := range app.Tasks { service.Register(consul.MarathonTaskToConsulService(task, app.HealthChecks, app.Labels)) } - handler := EventHandler{service, marathon} + handler := NewEventHandler(service, marathon) body, _ := json.Marshal(events.AppTerminatedEvent{ Type: "app_terminated_event", AppID: app.ID, @@ -121,7 +121,7 @@ func TestForwardHandler_HandleAppTerminatedEvent(t *testing.T) { func TestForwardHandler_HandleAppInvalidBody(t *testing.T) { t.Parallel() // given - handler := EventHandler{nil, nil} + handler := NewEventHandler(nil, nil) body := `{"type": "app_terminated_event", "appID": 123}` req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) // when @@ -135,7 +135,7 @@ func TestForwardHandler_HandleAppInvalidBody(t *testing.T) { func TestForwardHandler_HandleAppTerminatedEventInvalidBody(t *testing.T) { t.Parallel() // given - handler := EventHandler{nil, nil} + handler := NewEventHandler(nil, nil) body := `{"appId":"/python/simple","eventType":"app_terminated_event","timestamp":2015}` req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) // when @@ -151,7 +151,7 @@ func TestForwardHandler_HandleAppTerminatedEventForUnknownApp(t *testing.T) { // given app := ConsulApp("/test/app", 3) marathon := marathon.MarathonerStubForApps(app) - handler := EventHandler{nil, marathon} + handler := NewEventHandler(nil, marathon) body := `{"appId":"/unknown/app","eventType":"app_terminated_event","timestamp":"2015-12-07T09:02:49.934Z"}` req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) // when @@ -173,7 +173,7 @@ func TestForwardHandler_HandleAppTerminatedEventWithProblemsOnDeregistering(t *t } service.ErrorServices["/test/app.1"] = fmt.Errorf("Cannot deregister service") service.ErrorServices["/test/app.2"] = fmt.Errorf("Cannot deregister service") - handler := EventHandler{service, marathon} + handler := NewEventHandler(service, marathon) body, _ := json.Marshal(events.AppTerminatedEvent{ Type: "app_terminated_event", AppID: app.ID, @@ -194,7 +194,7 @@ func TestForwardHandler_HandleAppTerminatedEventWithProblemsOnDeregistering(t *t func TestForwardHandler_NotHandleStatusEventWithInvalidBody(t *testing.T) { t.Parallel() // given - handler := EventHandler{nil, nil} + handler := NewEventHandler(nil, nil) body := `{ "slaveId":"85e59460-a99e-4f16-b91f-145e0ea595bd-S0", "taskId":"python_simple.4a7e99d0-9cc1-11e5-b4d8-0a0027000004", @@ -220,7 +220,7 @@ func TestForwardHandler_NotHandleStatusEventWithInvalidBody(t *testing.T) { func TestForwardHandler_NotHandleStatusEventAboutStartingTask(t *testing.T) { t.Parallel() // given - handler := EventHandler{nil, nil} + handler := NewEventHandler(nil, nil) ignoringTaskStatuses := []string{"TASK_STAGING", "TASK_STARTING", "TASK_RUNNING", "unknown"} for _, taskStatus := range ignoringTaskStatuses { body := `{ @@ -257,7 +257,7 @@ func TestForwardHandler_HandleStatusEventAboutDeadTask(t *testing.T) { for _, task := range app.Tasks { service.Register(consul.MarathonTaskToConsulService(task, app.HealthChecks, app.Labels)) } - handler := EventHandler{service, marathon} + handler := NewEventHandler(service, marathon) taskStatuses := []string{"TASK_FINISHED", "TASK_FAILED", "TASK_KILLED", "TASK_LOST"} for _, taskStatus := range taskStatuses { body := `{ @@ -296,7 +296,7 @@ func TestForwardHandler_NotHandleHealthStatusEventWhenAppHasNotConsulLabel(t *te app.Labels["consul"] = "false" marathon := marathon.MarathonerStubForApps(app) service := consul.NewConsulStub() - handler := EventHandler{service, marathon} + handler := NewEventHandler(service, marathon) body := healthStatusChangeEventForTask("/test/app.1") req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) // when @@ -315,7 +315,7 @@ func TestForwardHandler_HandleHealthStatusEvent(t *testing.T) { app := ConsulApp("/test/app", 3) marathon := marathon.MarathonerStubForApps(app) service := consul.NewConsulStub() - handler := EventHandler{service, marathon} + handler := NewEventHandler(service, marathon) body := healthStatusChangeEventForTask("/test/app.1") req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) // when @@ -336,7 +336,7 @@ func TestForwardHandler_HandleHealthStatusEventWithErrorsOnRegistration(t *testi marathon := marathon.MarathonerStubForApps(app) service := consul.NewConsulStub() service.ErrorServices[app.Tasks[1].ID] = fmt.Errorf("Cannot register task") - handler := EventHandler{service, marathon} + handler := NewEventHandler(service, marathon) body := healthStatusChangeEventForTask("/test/app.1") req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) // when @@ -356,7 +356,7 @@ func TestForwardHandler_NotHandleHealthStatusEventForTaskWithNotAllHeathChecksPa app.Tasks[1].HealthCheckResults = []tasks.HealthCheckResult{tasks.HealthCheckResult{Alive: true}, tasks.HealthCheckResult{Alive: false}} marathon := marathon.MarathonerStubForApps(app) service := consul.NewConsulStub() - handler := EventHandler{service, marathon} + handler := NewEventHandler(service, marathon) body := healthStatusChangeEventForTask("/test/app.1") req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) // when @@ -376,7 +376,7 @@ func TestForwardHandler_NotHandleHealthStatusEventForTaskWithNoHealthCheck(t *te app.Tasks[0].HealthCheckResults = []tasks.HealthCheckResult{} marathon := marathon.MarathonerStubForApps(app) service := consul.NewConsulStub() - handler := EventHandler{service, marathon} + handler := NewEventHandler(service, marathon) body := healthStatusChangeEventForTask("/test/app.0") req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) // when @@ -392,7 +392,7 @@ func TestForwardHandler_NotHandleHealthStatusEventForTaskWithNoHealthCheck(t *te func TestForwardHandler_NotHandleHealthStatusEventWhenTaskIsNotAlive(t *testing.T) { t.Parallel() // given - handler := EventHandler{nil, nil} + handler := NewEventHandler(nil, nil) body := `{ "appId":"/test/app", "taskId":"/test/app.1", @@ -413,7 +413,7 @@ func TestForwardHandler_NotHandleHealthStatusEventWhenTaskIsNotAlive(t *testing. func TestForwardHandler_NotHandleHealthStatusEventWhenBodyIsInvalid(t *testing.T) { t.Parallel() // given - handler := EventHandler{nil, nil} + handler := NewEventHandler(nil, nil) body := `{ "appId":"/test/app", "taskId":"/test/app.1", @@ -436,7 +436,7 @@ func TestForwardHandler_HandleHealthStatusEventReturn500WhenMarathonReturnedErro // given app := ConsulApp("/test/app", 3) marathon := marathon.MarathonerStubForApps(app) - handler := EventHandler{nil, marathon} + handler := NewEventHandler(nil, marathon) body := `{ "appId":"unknown", "taskId":"unknown.1", @@ -459,7 +459,7 @@ func TestForwardHandler_HandleHealthStatusEventWhenTaskIsNotInMarathon(t *testin // given app := ConsulApp("/test/app", 3) marathon := marathon.MarathonerStubForApps(app) - handler := EventHandler{nil, marathon} + handler := NewEventHandler(nil, marathon) body := healthStatusChangeEventForTask("unknown.1") req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) // when