Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dd add #176

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,35 @@ import (
kube_api "k8s.io/api/core/v1"
)




//DistinctSameResourceEvent : Distinct Events base on involvedObject.Name & event.Reason
func (eventBatch *EventBatch) DistinctSameResourceEvent() {
tempMap := make(map[string]bool)
var finalEvents []*kube_api.Event
for _, event := range eventBatch.Events {
involvedObject := event.InvolvedObject
if &involvedObject == nil {
continue
}
resourceName := involvedObject.Name
reason := event.Reason
msg:=event.Message
key := resourceName + reason + msg
if _, contain := tempMap[key]; !contain {
// fmt.Printf("key: %s \n", key)
tempMap[key] = true
finalEvents = append(finalEvents, event)
}
}

if len(finalEvents) > 0 {
eventBatch.Events = finalEvents
}
}


type EventBatch struct {
// When this batch was created.
Timestamp time.Time
Expand Down
3 changes: 3 additions & 0 deletions manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ func (rm *realManager) Housekeep() {
}
}



func (rm *realManager) housekeep() {
defer func() {
lastHousekeepTimestamp.Set(float64(time.Now().Unix()))
Expand All @@ -100,5 +102,6 @@ func (rm *realManager) housekeep() {
// when this stops to be true.
events := rm.source.GetNewEvents()
klog.V(0).Infof("Exporting %d events", len(events.Events))

rm.sink.ExportEvents(events)
}
26 changes: 21 additions & 5 deletions sinks/dingtalk/dingtalk.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package dingtalk
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"net/http"
"net/url"
Expand Down Expand Up @@ -52,6 +53,15 @@ var (
}
)

var ArgDDbufferWindows time.Duration

func init() {
//dingding buffer windows
flag.DurationVar(&ArgDDbufferWindows, "bufferwindows", 0, "if you wanna aggregate the event's message what type is"+
" Waring in a given time windows to dingding sink, just set bufferwindows >0 ,but Sugget you set bufferwindows > 300"+
"(defult 0s means do not aggregate message) ")
}

/**
dingtalk msg struct
*/
Expand Down Expand Up @@ -98,13 +108,19 @@ func (d *DingTalkSink) Stop() {
}

func (d *DingTalkSink) ExportEvents(batch *core.EventBatch) {
for _, event := range batch.Events {
if d.isEventLevelDangerous(event.Type) {
d.Ding(event)
// add threshold
time.Sleep(time.Millisecond * 50)
if ArgDDbufferWindows == 0 {
for _, event := range batch.Events {
if d.isEventLevelDangerous(event.Type) {
d.Ding(event)
// add threshold
time.Sleep(time.Millisecond * 50)
}
}
} else {
klog.V(2).Info("ArgDDbufferWindows value is ", ArgDDbufferWindows, "!=0 , then Trun on dingdingtalk buffer windows.")
d.ExportBufferEvents(batch)
}

}

func (d *DingTalkSink) isEventLevelDangerous(level string) bool {
Expand Down
124 changes: 124 additions & 0 deletions sinks/dingtalk/dingtalkbuffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package dingtalk

import (
"bytes"
"encoding/json"
"fmt"
"github.com/AliyunContainerService/kube-eventer/core"
kube_api "k8s.io/api/core/v1"
"k8s.io/klog"
"net/http"
"sync"
"time"
)

type BufferEventBatch map[string][]*kube_api.Event

func (d *DingTalkSink) ExportBufferEvents(batch *core.EventBatch) {

var wg sync.WaitGroup
var bufferEventBatch = BufferEventBatch{}
defer func() {
bufferEventBatch = BufferEventBatch{}
}()
// dump level is error event into buffer
wg.Add(1)
go func() {
defer wg.Done()
for _, event := range batch.Events {
// only handler Warning Buffer
if event.Type == "Warning" {
bufferEventBatch[event.InvolvedObject.Name] = append(bufferEventBatch[event.InvolvedObject.Name], event)
}
}
}()

//buffer windows
klog.V(2).Info("dingding buffer windows is ", ArgDDbufferWindows)
time.Sleep(ArgDDbufferWindows)
klog.V(2).Info("NewEventBatch len:", len(bufferEventBatch))

for _, bufferEvent := range bufferEventBatch {
d.DingBuffer(bufferEvent)
// add threshold
time.Sleep(time.Millisecond * 50)
}

wg.Wait()
}

func (d *DingTalkSink) DingBuffer(bufferevent []*kube_api.Event) {

msg := NewcreateMsgFromEvent(d, bufferevent)

if msg == nil {
klog.Warningf("failed to create msg from event,because of %v", bufferevent)
return
}

msg_bytes, err := json.Marshal(msg)
if err != nil {
klog.Warningf("failed to marshal msg %v", msg)
return
}

b := bytes.NewBuffer(msg_bytes)

resp, err := http.Post(fmt.Sprintf("https://%s?access_token=%s", d.Endpoint, d.Token), CONTENT_TYPE_JSON, b)
if err != nil {
klog.Errorf("failed to send msg to dingtalk. error: %s", err.Error())
return
}

defer resp.Body.Close()
if resp != nil && resp.StatusCode != http.StatusOK {
klog.Errorf("failed to send msg to dingtalk, because the response code is %d", resp.StatusCode)
return
}
}

func NewcreateMsgFromEvent(d *DingTalkSink, bufferevent []*kube_api.Event) *DingTalkMsg {
msg := &DingTalkMsg{}
msg.MsgType = d.MsgType

m := ""
m2 := ""
i := 0
for _, event := range bufferevent {
i = i + 1
m = m + fmt.Sprintf("msg%d : ", i) + event.Message + "\n" + " "
m2 = m2 + "### " + fmt.Sprintf("msg%d : ", i) + event.Message + "\n" + " "
}
msgs := fmt.Sprintf("[%s]", m)
msgs_markdown := fmt.Sprintf("[\n%s]", m2)

switch msg.MsgType {
//https://open-doc.dingtalk.com/microapp/serverapi2/ye8tup#-6
case MARKDOWN_MSG_TYPE:
markdownCreator := NewMarkdownMsgBuilder(d.ClusterID, d.Region, bufferevent[0], msgs_markdown)
markdownCreator.AddNodeName(bufferevent[0].Source.Host)
markdownCreator.AddLabels(d.Labels)
msg.Markdown = DingTalkMarkdown{
//title 加不加其实没所谓,最终不会显示
Title: fmt.Sprintf("Kubernetes(ID:%s) Event", d.ClusterID),
Text: markdownCreator.Build(),
}
break

default:
//默认按文本模式推送
template := MSG_TEMPLATE
if len(d.Labels) > 0 {
for _, label := range d.Labels {
template = fmt.Sprintf(LABE_TEMPLATE, label) + template
}
}

event := bufferevent[0]
msg.Text = DingTalkText{
Content: fmt.Sprintf(template, event.Type, event.InvolvedObject.Kind, event.Namespace, event.InvolvedObject.Name, event.Reason, event.LastTimestamp.Format(TIME_FORMAT), msgs),
}
}

return msg
}
14 changes: 9 additions & 5 deletions sinks/dingtalk/markdownMsgBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package dingtalk
import (
"fmt"
"strings"

v1 "k8s.io/api/core/v1"
)

Expand All @@ -13,7 +12,6 @@ const (
MARKDOWN_LINK_TEMPLATE = "[%s](%s)"
MARKDOWN_TEXT_BOLD = "**%s**"
MARKDOWN_NEW_LINE = "\n\n"

URL_ALIYUN_K8S_CONSULE = "https://cs.console.aliyun.com/#/k8s"
//阿里云 kubernetes 管理控制台, Deployment,StatefulSet,DaemonSet 有同样的URL规律
URL_ALIYUN_RESOURCE_DETAIL_TEMPLATE = URL_ALIYUN_K8S_CONSULE + "/%s/detail/%s/%s/%s/%s/pods"
Expand All @@ -31,7 +29,7 @@ type MarkdownMsgBuilder struct {
OutputText string
}

func NewMarkdownMsgBuilder(clusterID, region string, event *v1.Event) *MarkdownMsgBuilder {
func NewMarkdownMsgBuilder(clusterID, region string, event *v1.Event,msgs ...string) *MarkdownMsgBuilder {

m := MarkdownMsgBuilder{
Region: region,
Expand All @@ -43,6 +41,7 @@ func NewMarkdownMsgBuilder(clusterID, region string, event *v1.Event) *MarkdownM
namespace := fmt.Sprintf(MARKDOWN_LINK_TEMPLATE, event.Namespace, URL_ALIYUN_NAMESPACE_TEMPLATE)
name := ""


switch event.InvolvedObject.Kind {
case "Deployment":
deployName := removeDotContent(event.Name)
Expand Down Expand Up @@ -81,8 +80,13 @@ func NewMarkdownMsgBuilder(clusterID, region string, event *v1.Event) *MarkdownM
}
reason := fmt.Sprintf(MARKDOWN_TEXT_BOLD, event.Reason)
timestamp := fmt.Sprintf(MARKDOWN_TEXT_BOLD, event.LastTimestamp.String())
message := fmt.Sprintf(MARKDOWN_TEXT_BOLD, event.Message)
m.OutputText = fmt.Sprintf(MARKDOWN_TEMPLATE, level, kind, namespace, name, reason, timestamp, message)

if len(msgs)==0{
message := fmt.Sprintf(MARKDOWN_TEXT_BOLD, event.Message)
m.OutputText = fmt.Sprintf(MARKDOWN_TEMPLATE, level, kind, namespace, name, reason, timestamp, message)
}else{
m.OutputText = fmt.Sprintf(MARKDOWN_TEMPLATE, level, kind, namespace, name, reason, timestamp, msgs[0])
}
return &m

}
Expand Down
5 changes: 5 additions & 0 deletions sinks/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,17 @@ func (this *sinkManager) Stop() {
}
}

// export to Eventsink ,for example dingding
func export(s core.EventSink, data *core.EventBatch) {
startTime := time.Now()
defer func() {
exporterDuration.
WithLabelValues(s.Name()).
Observe(float64(time.Since(startTime)) / float64(time.Millisecond))
}()

data.DistinctSameResourceEvent()
s.ExportEvents(data)
}