From bbb292a503f219d4513cb69eca91f7384482e2a2 Mon Sep 17 00:00:00 2001 From: root Date: Thu, 25 Feb 2021 20:22:13 +0800 Subject: [PATCH 1/2] aggregate the event's message in a time windows and Resolve issue #66 --- core/types.go | 29 ++ manager/manager.go | 3 + sinks/dingtalk/dingtalk.go | 26 +- sinks/dingtalk/dingtalk/OWNERS | 4 + sinks/dingtalk/dingtalk/dingtalk.go | 289 ++++++++++++++++++ sinks/dingtalk/dingtalk/dingtalk_test.go | 81 +++++ sinks/dingtalk/dingtalk/dingtalkbuffer.go | 124 ++++++++ sinks/dingtalk/dingtalk/markdownMsgBuilder.go | 132 ++++++++ .../dingtalk/markdownMsgBuilder_test.go | 130 ++++++++ sinks/dingtalk/dingtalkbuffer.go | 124 ++++++++ sinks/dingtalk/markdownMsgBuilder.go | 14 +- sinks/manager.go | 5 + 12 files changed, 951 insertions(+), 10 deletions(-) create mode 100644 sinks/dingtalk/dingtalk/OWNERS create mode 100755 sinks/dingtalk/dingtalk/dingtalk.go create mode 100755 sinks/dingtalk/dingtalk/dingtalk_test.go create mode 100755 sinks/dingtalk/dingtalk/dingtalkbuffer.go create mode 100644 sinks/dingtalk/dingtalk/markdownMsgBuilder.go create mode 100644 sinks/dingtalk/dingtalk/markdownMsgBuilder_test.go create mode 100755 sinks/dingtalk/dingtalkbuffer.go diff --git a/core/types.go b/core/types.go index d5dc07a9..94b96998 100755 --- a/core/types.go +++ b/core/types.go @@ -20,6 +20,35 @@ import ( kube_api "k8s.io/api/core/v1" ) + + + +//DistinctSameResourceEvent : Distinct Events base on involvedObject.Name & event.Reason +func (eventBatch *EventBatch) DistinctSameResourceEvent() { + tempMap := make(map[string]bool) + var finalEvents []*kube_api.Event + for _, event := range eventBatch.Events { + involvedObject := event.InvolvedObject + if &involvedObject == nil { + continue + } + resourceName := involvedObject.Name + reason := event.Reason + msg:=event.Message + key := resourceName + reason + msg + if _, contain := tempMap[key]; !contain { + // fmt.Printf("key: %s \n", key) + tempMap[key] = true + finalEvents = append(finalEvents, event) + } + } + + if len(finalEvents) > 0 { + eventBatch.Events = finalEvents + } +} + + type EventBatch struct { // When this batch was created. Timestamp time.Time diff --git a/manager/manager.go b/manager/manager.go index 14ea9e7b..9ca87619 100755 --- a/manager/manager.go +++ b/manager/manager.go @@ -89,6 +89,8 @@ func (rm *realManager) Housekeep() { } } + + func (rm *realManager) housekeep() { defer func() { lastHousekeepTimestamp.Set(float64(time.Now().Unix())) @@ -100,5 +102,6 @@ func (rm *realManager) housekeep() { // when this stops to be true. events := rm.source.GetNewEvents() klog.V(0).Infof("Exporting %d events", len(events.Events)) + rm.sink.ExportEvents(events) } diff --git a/sinks/dingtalk/dingtalk.go b/sinks/dingtalk/dingtalk.go index 5d92adad..4e169a02 100755 --- a/sinks/dingtalk/dingtalk.go +++ b/sinks/dingtalk/dingtalk.go @@ -17,6 +17,7 @@ package dingtalk import ( "bytes" "encoding/json" + "flag" "fmt" "net/http" "net/url" @@ -52,6 +53,15 @@ var ( } ) +var ArgDDbufferWindows time.Duration + +func init() { + //dingding buffer windows + flag.DurationVar(&ArgDDbufferWindows, "bufferwindows", 0, "if you wanna aggregate the event's message what type is"+ + " Waring in a given time windows to dingding sink, just set bufferwindows >0 ,but Sugget you set bufferwindows > 300"+ + "(defult 0s means do not aggregate message) ") +} + /** dingtalk msg struct */ @@ -98,13 +108,19 @@ func (d *DingTalkSink) Stop() { } func (d *DingTalkSink) ExportEvents(batch *core.EventBatch) { - for _, event := range batch.Events { - if d.isEventLevelDangerous(event.Type) { - d.Ding(event) - // add threshold - time.Sleep(time.Millisecond * 50) + if ArgDDbufferWindows == 0 { + for _, event := range batch.Events { + if d.isEventLevelDangerous(event.Type) { + d.Ding(event) + // add threshold + time.Sleep(time.Millisecond * 50) + } } + } else { + klog.V(2).Info("ArgDDbufferWindows value is ", ArgDDbufferWindows, "!=0 , then Trun on dingdingtalk buffer windows.") + d.ExportBufferEvents(batch) } + } func (d *DingTalkSink) isEventLevelDangerous(level string) bool { diff --git a/sinks/dingtalk/dingtalk/OWNERS b/sinks/dingtalk/dingtalk/OWNERS new file mode 100644 index 00000000..ec7c0a1d --- /dev/null +++ b/sinks/dingtalk/dingtalk/OWNERS @@ -0,0 +1,4 @@ +approvers: +- ringtail +reviewers: +- ringtail \ No newline at end of file diff --git a/sinks/dingtalk/dingtalk/dingtalk.go b/sinks/dingtalk/dingtalk/dingtalk.go new file mode 100755 index 00000000..4e169a02 --- /dev/null +++ b/sinks/dingtalk/dingtalk/dingtalk.go @@ -0,0 +1,289 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dingtalk + +import ( + "bytes" + "encoding/json" + "flag" + "fmt" + "net/http" + "net/url" + "strings" + "time" + + "github.com/AliyunContainerService/kube-eventer/core" + "k8s.io/api/core/v1" + "k8s.io/klog" +) + +const ( + DINGTALK_SINK = "DingTalkSink" + WARNING int = 2 + NORMAL int = 1 + DEFAULT_MSG_TYPE = "text" + CONTENT_TYPE_JSON = "application/json" + LABE_TEMPLATE = "%s\n" + TIME_FORMAT = "2006-01-02 15:04:05" +) + +var ( + MSG_TEMPLATE = "Level:%s \nKind:%s \nNamespace:%s \nName:%s \nReason:%s \nTimestamp:%s \nMessage:%s" + + MSG_TEMPLATE_ARR = [][]string{ + {"Level"}, + {"Kind"}, + {"Namespace"}, + {"Name"}, + {"Reason"}, + {"Timestamp"}, + {"Message"}, + } +) + +var ArgDDbufferWindows time.Duration + +func init() { + //dingding buffer windows + flag.DurationVar(&ArgDDbufferWindows, "bufferwindows", 0, "if you wanna aggregate the event's message what type is"+ + " Waring in a given time windows to dingding sink, just set bufferwindows >0 ,but Sugget you set bufferwindows > 300"+ + "(defult 0s means do not aggregate message) ") +} + +/** +dingtalk msg struct +*/ +type DingTalkMsg struct { + MsgType string `json:"msgtype"` + Text DingTalkText `json:"text"` + Markdown DingTalkMarkdown `json:"markdown"` +} + +type DingTalkMarkdown struct { + Title string `json:"title"` + Text string `json:"text"` +} + +type DingTalkText struct { + Content string `json:"content"` +} + +/** +dingtalk sink usage +--sink:dingtalk:https://oapi.dingtalk.com/robot/send?access_token=[access_token]&level=Warning&label=[label] + +level: Normal or Warning. The event level greater than global level will emit. +label: some thing unique when you want to distinguish different k8s clusters. +*/ +type DingTalkSink struct { + Endpoint string + Namespaces []string + Kinds []string + Token string + Level int + Labels []string + MsgType string + ClusterID string + Region string +} + +func (d *DingTalkSink) Name() string { + return DINGTALK_SINK +} + +func (d *DingTalkSink) Stop() { + //do nothing +} + +func (d *DingTalkSink) ExportEvents(batch *core.EventBatch) { + if ArgDDbufferWindows == 0 { + for _, event := range batch.Events { + if d.isEventLevelDangerous(event.Type) { + d.Ding(event) + // add threshold + time.Sleep(time.Millisecond * 50) + } + } + } else { + klog.V(2).Info("ArgDDbufferWindows value is ", ArgDDbufferWindows, "!=0 , then Trun on dingdingtalk buffer windows.") + d.ExportBufferEvents(batch) + } + +} + +func (d *DingTalkSink) isEventLevelDangerous(level string) bool { + score := getLevel(level) + if score >= d.Level { + return true + } + return false +} + +func (d *DingTalkSink) Ding(event *v1.Event) { + if d.Namespaces != nil { + skip := true + for _, namespace := range d.Namespaces { + if namespace == event.Namespace { + skip = false + break + } + } + if skip { + return + } + } + + if d.Kinds != nil { + skip := true + for _, kind := range d.Kinds { + if kind == event.InvolvedObject.Kind { + skip = false + break + } + } + if skip { + return + } + } + + msg := createMsgFromEvent(d, event) + if msg == nil { + klog.Warningf("failed to create msg from event,because of %v", event) + return + } + + msg_bytes, err := json.Marshal(msg) + if err != nil { + klog.Warningf("failed to marshal msg %v", msg) + return + } + + b := bytes.NewBuffer(msg_bytes) + + resp, err := http.Post(fmt.Sprintf("https://%s?access_token=%s", d.Endpoint, d.Token), CONTENT_TYPE_JSON, b) + if err != nil { + klog.Errorf("failed to send msg to dingtalk. error: %s", err.Error()) + return + } + defer resp.Body.Close() + if resp != nil && resp.StatusCode != http.StatusOK { + klog.Errorf("failed to send msg to dingtalk, because the response code is %d", resp.StatusCode) + return + } +} + +func getLevel(level string) int { + score := 0 + switch level { + case v1.EventTypeWarning: + score += 2 + case v1.EventTypeNormal: + score += 1 + default: + //score will remain 0 + } + return score +} + +func createMsgFromEvent(d *DingTalkSink, event *v1.Event) *DingTalkMsg { + msg := &DingTalkMsg{} + msg.MsgType = d.MsgType + + switch msg.MsgType { + //https://open-doc.dingtalk.com/microapp/serverapi2/ye8tup#-6 + case MARKDOWN_MSG_TYPE: + markdownCreator := NewMarkdownMsgBuilder(d.ClusterID, d.Region, event) + markdownCreator.AddNodeName(event.Source.Host) + markdownCreator.AddLabels(d.Labels) + msg.Markdown = DingTalkMarkdown{ + //title 加不加其实没所谓,最终不会显示 + Title: fmt.Sprintf("Kubernetes(ID:%s) Event", d.ClusterID), + Text: markdownCreator.Build(), + } + break + + default: + //默认按文本模式推送 + template := MSG_TEMPLATE + if len(d.Labels) > 0 { + for _, label := range d.Labels { + template = fmt.Sprintf(LABE_TEMPLATE, label) + template + } + } + msg.Text = DingTalkText{ + Content: fmt.Sprintf(template, event.Type, event.InvolvedObject.Kind, event.Namespace, event.Name, event.Reason, event.LastTimestamp.Format(TIME_FORMAT), event.Message), + } + break + } + + return msg +} + +func NewDingTalkSink(uri *url.URL) (*DingTalkSink, error) { + d := &DingTalkSink{ + Level: WARNING, + } + if len(uri.Host) > 0 { + d.Endpoint = uri.Host + uri.Path + } + opts := uri.Query() + + if len(opts["access_token"]) >= 1 { + d.Token = opts["access_token"][0] + } else { + return nil, fmt.Errorf("you must provide dingtalk bot access_token") + } + + if len(opts["level"]) >= 1 { + d.Level = getLevel(opts["level"][0]) + } + + //add extra labels + if len(opts["label"]) >= 1 { + d.Labels = opts["label"] + } + + if msgType := opts["msg_type"]; len(msgType) >= 1 { + d.MsgType = msgType[0] + } else { + //向下兼容,覆盖以前的版本,没有这个参数的情况 + d.MsgType = DEFAULT_MSG_TYPE + } + + if clusterID := opts["cluster_id"]; len(clusterID) >= 1 { + d.ClusterID = clusterID[0] + } + + if region := opts["region"]; len(region) >= 1 { + d.Region = region[0] + } + + d.Namespaces = getValues(opts["namespaces"]) + // kinds:https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#lists-and-simple-kinds + // such as node,pod,component and so on + d.Kinds = getValues(opts["kinds"]) + + return d, nil +} + +func getValues(o []string) []string { + if len(o) >= 1 { + if len(o[0]) == 0 { + return nil + } + return strings.Split(o[0], ",") + } + return nil +} diff --git a/sinks/dingtalk/dingtalk/dingtalk_test.go b/sinks/dingtalk/dingtalk/dingtalk_test.go new file mode 100755 index 00000000..8daf2cf7 --- /dev/null +++ b/sinks/dingtalk/dingtalk/dingtalk_test.go @@ -0,0 +1,81 @@ +package dingtalk + +import ( + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + + // metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + //"github.com/olekukonko/tablewriter" + //"os" + "encoding/json" + "net/url" +) + +func TestGetLevel(t *testing.T) { + warning := getLevel(v1.EventTypeWarning) + normal := getLevel(v1.EventTypeNormal) + none := getLevel("") + assert.True(t, warning > normal) + assert.True(t, warning == WARNING) + assert.True(t, normal == NORMAL) + assert.True(t, 0 == none) +} + +func TestCreateMsgFromEvent(t *testing.T) { + labels := make([]string, 2) + labels[0] = "abcd" + labels[1] = "defg" + event := createTestEvent() + event.Source.Host = TEST_NODENAME + event.InvolvedObject.Kind = TEST_RESOURCE_TYPE + event.Name = TEST_DEPLOY_NAME + event.Namespace = TEST_NAMESPACE + u, _ := url.Parse("dingtalk:https://oapi.dingtalk.com/robot/send?access_token=&label=