forked from cmaster11/overseer
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.go
153 lines (130 loc) · 3.56 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
//
// This is the queue bridge, which should be built like so:
//
// go build .
//
// Once built launch it as follows:
//
// $ ./queue-bridge [-redis-queue-key=overseer.results] -dest-queue overseer.results.email -dest-queue overseer.results.webhook[tag=hello.*,name=dsf]
//
// It is possible to conditionally clone results to different queues by using regex filters, e.g.
//
// -dest-queue overseer.results.webhook[tag=k8s-cluster.*]
//
// Results are filterable on:
//
// - type: type=k8s-event
// - tag: tag=my-k8s-cluster
// - input
// - target: target=10\.0\.123\.111
// - error: error=(ssl|SSL)
// - isDedup: isDedup=true/isDedup=false
// - recovered: recovered=true/recovered=false
//
// When a test result arrives on the source queue, it gets cloned into the destination queues.
// This helps when using multiple bridges, e.g. to send an email and a webhook for each test result.
//
// When the queue bridge is used, the email and webhook bridges can be started like:
//
// $ ./email-bridge -email=alice@example.com,bob@example.com -redis-queue-key overseer.results.email
// $ ./webhook-bridge -url=https://example.com/bla -redis-queue-key overseer.results.webhook
//
// Alberto
// --
//
package main
import (
	"flag"
	"fmt"
	"os"
	"time"

	"github.com/cmaster11/overseer/test"
	"github.com/go-redis/redis"
)
// QueueBridge clones incoming test results into a set of destination
// redis queues.
type QueueBridge struct {
	// R is the redis client used to push results to the destinations.
	R *redis.Client

	// Queues holds the destination queues results are cloned into.
	Queues []*destinationQueue
}
//
// Process decodes a JSON-encoded test result and clones the raw message
// into every destination queue whose filter accepts it. Queues without a
// filter receive every result.
//
// A message which cannot be decoded is logged and dropped instead of
// panicking, so that a single malformed entry on the source queue does not
// take the whole bridge down. A failed push to one queue is logged and does
// not prevent delivery to the remaining queues.
//
func (bridge *QueueBridge) Process(msg []byte) {
	testResult, err := test.ResultFromJSON(msg)
	if err != nil {
		fmt.Printf("Skipping result which failed to decode: %s\n", err)
		return
	}

	fmt.Printf("Processing result: %+v\n", testResult)

	for _, queue := range bridge.Queues {
		// A nil filter means "accept everything".
		if queue.Filter != nil && !queue.Filter.Matches(testResult) {
			continue
		}

		// The original bytes are forwarded untouched, so downstream
		// bridges see exactly what was placed on the source queue.
		if _, err := bridge.R.RPush(queue.QueueKey, msg).Result(); err != nil {
			fmt.Printf("Result clone failed for queue [%s]: %s\n", queue.QueueKey, err)
		}
	}
}
//
// Entry Point
//
// main parses the command-line flags, builds the list of destination
// queues, connects to redis and then loops forever, cloning every result
// popped from the source queue into the destination queues.
//
func main() {

	//
	// Parse our flags
	//
	redisHost := flag.String("redis-host", "127.0.0.1:6379", "Specify the address of the redis queue.")
	redisPass := flag.String("redis-pass", "", "Specify the password of the redis queue.")
	redisQueueKey := flag.String("redis-queue-key", "overseer.results", "Specify the redis queue key to use as source.")

	var queuesArray stringsFlag
	flag.Var(&queuesArray, "dest-queue", "The redis queues to clone results into")

	flag.Parse()

	queues, err := newDestinationQueuesFromStringArray(queuesArray)
	if err != nil {
		fmt.Printf("Error parsing queues: %+v\n", err)
		os.Exit(1)
	}

	//
	// Sanity-check.
	//
	if len(queues) == 0 {
		fmt.Printf("Usage: ./queue-bridge [-redis-queue-key=overseer.results] -dest-queue=overseer.results.queue -dest-queue=overseer.results.webhook[tag=my-cluster-.*]\n")
		os.Exit(1)
	}

	fmt.Printf("started with %d queues\n", len(queues))

	//
	// Create the redis client
	//
	r := redis.NewClient(&redis.Options{
		Addr:     *redisHost,
		Password: *redisPass,
		DB:       0, // use default DB
	})

	//
	// And run a ping, just to make sure it worked.
	//
	_, err = r.Ping().Result()
	if err != nil {
		fmt.Printf("Redis connection failed: %s\n", err.Error())
		os.Exit(1)
	}

	bridge := QueueBridge{
		R:      r,
		Queues: queues,
	}

	//
	// How long to wait before retrying after a failed read, so that a
	// broken connection does not turn this loop into a busy-spin.
	//
	const retryDelay = time.Second

	for {

		//
		// Get test-results, blocking until one is available.
		//
		msg, err := r.BLPop(0, *redisQueueKey).Result()
		if err == redis.Nil {
			// Nothing was available before the timeout; try again.
			continue
		}
		if err != nil {
			fmt.Printf("Failed to read from queue [%s]: %s\n", *redisQueueKey, err)
			time.Sleep(retryDelay)
			continue
		}

		//
		// If they were non-empty, process them.
		//
		//   msg[0] will be the name of the source queue.
		//
		//   msg[1] will be the value removed from the list.
		//
		// Both entries must be present before msg[1] is read.
		//
		if len(msg) >= 2 {
			bridge.Process([]byte(msg[1]))
		}
	}
}