-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathopenwhisk_status.go
465 lines (354 loc) · 13.5 KB
/
openwhisk_status.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
package keynuker
import (
"bufio"
"fmt"
"log"
"net/http"
"net/url"
"os"
"strings"
"regexp"
"strconv"
"github.com/apache/incubator-openwhisk-client-go/whisk"
"github.com/pkg/errors"
"gopkg.in/mailgun/mailgun-go.v1"
"github.com/dustin/go-humanize"
"encoding/json"
)
// ParamsMonitorActivations carries the settings SendMonitorNotifications and SendReportNotifications
// need to build and send a Mailgun email.
type ParamsMonitorActivations struct {
// MailerParams is embedded; its Domain, ApiKey and PublicApiKey fields are passed to mailgun.NewMailgun.
MailerParams
// This is the name of the KeyNuker "org/tenant". Defaults to "default", but allows to be extended multi-tenant.
// NOTE(review): no json tag and not read anywhere in this file; confirm where the "default" value is applied.
KeyNukerOrg string
// The FROM address that will be used for any notifications
EmailFromAddress string `json:"email_from_address"`
// Admin email address for notifications about leaked/nuked keys. Despite the name, the notification
// functions in this file pass it as the recipient ("to") argument of mailgun.NewMessage, not as a CC.
KeynukerAdminEmailCCAddress string `json:"admin_email_cc_address"`
}
// SendMonitorNotifications emails an indented-JSON rendering of activationStatus to
// params.KeynukerAdminEmailCCAddress via Mailgun and returns the Mailgun message id.
// A JSON marshaling error is returned unchanged; a send failure is returned along with
// Mailgun's response text.
func SendMonitorNotifications(params ParamsMonitorActivations, activationStatus map[string]interface{}) (deliveryId string, err error) {
	mg := mailgun.NewMailgun(params.MailerParams.Domain, params.MailerParams.ApiKey, params.MailerParams.PublicApiKey)

	statusJSON, marshalErr := json.MarshalIndent(activationStatus, "", " ")
	if marshalErr != nil {
		return "", marshalErr
	}

	body := fmt.Sprintf("Failed activations: %s", statusJSON)
	msg := mailgun.NewMessage(
		params.EmailFromAddress,
		"KeyNuker Monitoring: failed activations 💥",
		body,
		params.KeynukerAdminEmailCCAddress,
	)

	response, id, sendErr := mg.Send(msg)
	if sendErr != nil {
		return "", fmt.Errorf("Error sending message: %v. Mes: %v", sendErr, response)
	}
	return id, nil
}
// SendReportNotifications emails a short activity summary built from report to
// params.KeynukerAdminEmailCCAddress via Mailgun and returns the Mailgun message id.
// The body starts with a WARNING paragraph when report lists failed activation IDs,
// followed by a human-readable total of bytes scanned.
func SendReportNotifications(params ParamsMonitorActivations, report RecentActivationsReportOutput) (deliveryId string, err error) {
	mg := mailgun.NewMailgun(params.MailerParams.Domain, params.MailerParams.ApiKey, params.MailerParams.PublicApiKey)

	// NOTE(review): "200" is hard-coded; it only matches reality if the report was produced with
	// MaxActivationsToScan == 200 — confirm with the caller.
	scannedLine := fmt.Sprintf("Raw content scanned in most recent 200 activations: %s", humanize.Bytes(uint64(report.TotalNumBytesScanned)))

	body := scannedLine
	if numFailed := len(report.FailedActivationIds); numFailed > 0 {
		body = fmt.Sprintf("WARNING: %d failed activations. IDs: %+v\n\n", numFailed, report.FailedActivationIds) + scannedLine
	}

	msg := mailgun.NewMessage(params.EmailFromAddress, "KeyNuker Activity Report 🔐", body, params.KeynukerAdminEmailCCAddress)
	response, id, sendErr := mg.Send(msg)
	if sendErr != nil {
		return "", fmt.Errorf("Error sending message: %v. Mes: %v", sendErr, response)
	}
	return id, nil
}
// RecentActivationsReportInput configures OpenWhiskRecentActivationsReport.
type RecentActivationsReportInput struct {
// MaxActivationsToScan caps how far back through the most recent activations the report pages.
// A value <= 0 makes the report return immediately without listing any activations.
MaxActivationsToScan int
}
// RecentActivationsReportOutput is the result produced by OpenWhiskRecentActivationsReport.
type RecentActivationsReportOutput struct {
// FailedActivationIds holds the IDs of scanned activations whose Response.Success was false.
FailedActivationIds []string
// TotalNumBytesScanned is the sum of the "Scanning N bytes" figures found in the scanned activations' logs.
TotalNumBytesScanned int64
}
// OpenWhiskRecentActivationsReport is a more generalized version of OpenWhiskRecentActivationsStatus.
//
// It pages back through at most input.MaxActivationsToScan of the most recent activations and:
//   - records the ID of every activation whose Response.Success is false, and
//   - sums the "Scanning N bytes" figures found in each activation's logs (see CalculateBytesScanned).
//
// Activations of the "monitor-activations" and "activity-report" actions are skipped, but they still
// count toward the scan budget because paging is offset-based.
//
// Errors from loading the whisk config, creating the client, listing activations, or parsing logs are
// wrapped and returned together with whatever has been accumulated so far.
//
// TODO #1: Moving to structured logging (logrus?) will make this a lot more tenable. Either that or json stats.
func OpenWhiskRecentActivationsReport(input RecentActivationsReportInput) (output RecentActivationsReportOutput, err error) {
	whiskConfig, err := WhiskConfigFromEnvironment()
	if err != nil {
		return output, errors.Wrapf(err, "Error getting whisk config from environment")
	}
	client, err := whisk.NewClient(http.DefaultClient, whiskConfig)
	if err != nil {
		return output, errors.Wrapf(err, "Error creating whisk.NewClient")
	}

	// This must be limited to a small number, otherwise it will exceed memory limits and get killed
	// abruptly: Docs: true below loads every activation's full document (including logs) per page.
	const pageSize = 25

	// skipOffset is the number of activations already requested, i.e. the paging offset.
	skipOffset := 0
	for {
		remaining := input.MaxActivationsToScan - skipOffset
		if remaining <= 0 {
			// Budget exhausted: return everything accumulated so far (failures included).
			log.Printf("Exceeded max activations to scan: %d. Returning results.", input.MaxActivationsToScan)
			return output, nil
		}

		// Never request more than the remaining budget, so no more than MaxActivationsToScan
		// activations are ever fetched (previously the last page could overshoot by up to pageSize-1).
		limit := pageSize
		if remaining < limit {
			limit = remaining
		}

		listActivationsOptions := &whisk.ActivationListOptions{
			Docs:  true, // Need to include this to get the activation doc body, which ends up using lots of memory
			Limit: limit,
			Skip:  skipOffset,
		}
		log.Printf("Getting activation list %+v. Max activations to scan: %d", listActivationsOptions, input.MaxActivationsToScan)

		// Make REST call to OpenWhisk API to load list of activations
		activations, _, err := client.Activations.List(listActivationsOptions)
		if err != nil {
			return output, errors.Wrapf(err, "client.Activations.List with options %+v returned error", listActivationsOptions)
		}
		if len(activations) == 0 {
			// Reached the end of the available activations.
			return output, nil
		}

		for _, activation := range activations {
			// Don't report on the monitoring/reporting actions themselves.
			if activation.Name == "monitor-activations" || activation.Name == "activity-report" {
				continue
			}
			log.Printf("CalculateBytesScanned for activation %v w/ id: %v", activation.Name, activation.ActivationID)
			if !activation.Response.Success {
				output.FailedActivationIds = append(output.FailedActivationIds, activation.ActivationID)
			}
			bytesScanned, err := CalculateBytesScanned(activation.Logs)
			if err != nil {
				return output, errors.Wrapf(err, "Error calculating bytes scanned from activation %v logs", activation.ActivationID)
			}
			output.TotalNumBytesScanned += bytesScanned
		}

		// Go to the next page of data
		skipOffset += limit
	}
}
// bytesScannedPattern matches log text of the form "Scanning 1833 bytes" and captures the byte count.
// It requires at least one digit, so "Scanning  bytes" is simply not a match instead of an
// unparseable empty number. Compiled once at package init rather than on every call.
var bytesScannedPattern = regexp.MustCompile(`Scanning (\d+) bytes`)

// CalculateBytesScanned looks for log messages with form:
//
//	Scanning 1833 bytes
//
// extracts the number of bytes from each matching line (anywhere in the line), and returns their sum.
// Lines that don't match are ignored; a nil or empty slice yields 0.
//
// An error is returned only when a captured number cannot be parsed as an int64 (i.e. it overflows).
func CalculateBytesScanned(logs []string) (int64, error) {
	log.Printf("CalculateBytesScanned called with %d log lines", len(logs))
	defer log.Printf("/CalculateBytesScanned finished scanning %d log lines", len(logs))

	var total int64
	for _, logLine := range logs {
		match := bytesScannedPattern.FindStringSubmatch(logLine)
		if match == nil {
			continue
		}
		// ParseInt with bitSize 64 avoids the platform-sized int of strconv.Atoi (32 bits on some targets).
		numBytes, err := strconv.ParseInt(match[1], 10, 64)
		if err != nil {
			return 0, fmt.Errorf("Error converting %s to a number: %v", match[1], err)
		}
		total += numBytes
	}
	return total, nil
}
// OpenWhiskRecentActivationsStatus connects to the OpenWhisk API, scans up to maxActivationsToScan recent
// activations (via ScanActivationsForFailures) and summarizes the result as a JSON-friendly map:
//
//   - {"status": "success"} when no failed activations were found,
//   - {"status": "failure", "failedActivations": [...]} when failures were found (their Logs are emptied),
//   - {"status": "failure", "error": "<message>"} when the config could not be loaded or the scan failed.
//
// The idea is that this would be served up by a web action that a monitoring tool could poll
// and send alerts if any failures occurred. It never returns an error so the web action always
// has a JSON response to return.
func OpenWhiskRecentActivationsStatus(maxActivationsToScan int) (keynukerStatus map[string]interface{}) {
	keynukerStatus = map[string]interface{}{"status": "failure"}

	whiskConfig, err := WhiskConfigFromEnvironment()
	if err != nil {
		msg := fmt.Sprintf("Error getting whisk config from environment: %v", err)
		// Print, not Printf: msg embeds arbitrary error text and must not be treated as a format string.
		log.Print(msg)
		keynukerStatus["error"] = msg
		return keynukerStatus
	}

	// whiskConfig.Debug = true
	failedActivations, err := ScanActivationsForFailures(whiskConfig, maxActivationsToScan)
	log.Printf("ScanActivationsForFailures returned %d failedActivations", len(failedActivations))
	if err != nil {
		msg := fmt.Sprintf("Error scanning activations for failures: %v", err)
		log.Print(msg)
		keynukerStatus["error"] = msg
		// Don't return an actual error since this is a monitoring tool and it should always return a result
		// so the upstream web action returns the JSON response to the caller
		return keynukerStatus
	}

	if len(failedActivations) == 0 {
		keynukerStatus["status"] = "success"
	} else {
		// Empty each activation's logs before embedding them in the response (presumably to keep
		// the payload small). [:0] keeps a non-nil slice so it still encodes as [] in JSON.
		for i := range failedActivations {
			log.Printf("Trimming %d activation logs", len(failedActivations[i].Logs))
			failedActivations[i].Logs = failedActivations[i].Logs[:0]
		}
		keynukerStatus["failedActivations"] = failedActivations
	}
	log.Printf("keynukerStatus: %+v", keynukerStatus)
	return keynukerStatus
}
// ScanActivationsForFailures pages back through the most recent activations, fetching at most
// maxActivationsToScan of them, and returns the ones whose Response.Success is false.
// Activations of the "monitor-activations" action are ignored.
//
// Scanning stops at the end of the first page that contains a failure, so the result holds the
// failures from that page only, not every failure within the budget. On success the returned
// slice is non-nil (possibly empty). Errors from creating the client or listing activations are
// returned unwrapped.
func ScanActivationsForFailures(whiskConfig *whisk.Config, maxActivationsToScan int) (failedActivations []whisk.Activation, err error) {
	client, err := whisk.NewClient(http.DefaultClient, whiskConfig)
	if err != nil {
		return failedActivations, err
	}

	failedActivations = []whisk.Activation{}

	// This must be limited to a small number, otherwise it will exceed memory limits and get killed
	// abruptly: Docs: true below loads every activation's full document (including logs) per page.
	const pageSize = 25

	// skipOffset is the number of activations already requested, i.e. the paging offset.
	skipOffset := 0
	for {
		remaining := maxActivationsToScan - skipOffset
		if remaining <= 0 {
			// Budget exhausted. Because we return as soon as a page contains a failure, this is
			// only reached when no failures were found.
			log.Printf("numActivationsScanned (%d) >= maxActivationsToScan (%d). return failedActivations: %v", skipOffset, maxActivationsToScan, failedActivations)
			return failedActivations, nil
		}

		// Never request more than the remaining budget (previously the last page could overshoot).
		limit := pageSize
		if remaining < limit {
			limit = remaining
		}

		listActivationsOptions := &whisk.ActivationListOptions{
			Docs:  true, // Need to include this to get the activation doc body, which ends up using lots of memory
			Limit: limit,
			Skip:  skipOffset,
		}
		log.Printf("List Activations with: %+v", listActivationsOptions)

		// Make REST call to OpenWhisk API to load list of activations
		activations, _, err := client.Activations.List(listActivationsOptions)
		log.Printf("List Activations returned %d activations", len(activations))
		if err != nil {
			log.Printf("List Activations returned err: %v", err)
			return failedActivations, err
		}
		if len(activations) == 0 {
			// Reached the end of the available activations.
			return failedActivations, nil
		}

		for _, activation := range activations {
			if activation.Name == "monitor-activations" {
				log.Printf("Ignoring monitor-activations activation: %v", activation.ActivationID)
				continue
			}
			if !activation.Response.Success {
				log.Printf("Detected failed activation: %v", activation.ActivationID)
				failedActivations = append(failedActivations, activation)
			}
		}

		// If we found any failures, just return early
		if len(failedActivations) > 0 {
			log.Printf("len(failedActivations) > 0 (=%d). Returning: %v", len(failedActivations), failedActivations)
			return failedActivations, nil
		}

		// Go to the next page of data
		skipOffset += limit
	}
}
// WhiskConfigFromEnvironment builds the whisk client configuration.
//
// It first tries the OpenWhisk action environment variables (__OW_API_HOST / __OW_API_KEY) via
// WhiskConfigFromOwEnvVars, which is what runs inside the cloud. If those are not set, it falls
// back to the .wskprops-style file named by WSK_CONFIG_FILE and uses its AUTH and APIHOST keys
// (matched case-insensitively).
//
// An APIHOST without a scheme is treated as plain http (unchanged behavior). An APIHOST that already
// carries a scheme (e.g. "https://host:443") is used as-is; previously it was turned into the
// malformed "http://https://host:443/api".
func WhiskConfigFromEnvironment() (config *whisk.Config, err error) {
	// First try to load from env variables and return that (eg, __OW_API_HOST). This is what will run when running
	// on the BlueMix cloud
	config, err = WhiskConfigFromOwEnvVars()
	if err != nil {
		return nil, err
	}
	if config != nil {
		return config, nil
	}

	// Otherwise try to load config based on the contents of the WSK_CONFIG_FILE
	whiskPropsMap, err := WhiskPropsMapFromWskConfigFile()
	if err != nil {
		return nil, err
	}

	config = &whisk.Config{}
	for key, val := range whiskPropsMap {
		switch strings.ToUpper(key) {
		case "AUTH":
			config.AuthToken = val
		case "APIHOST":
			// Add "api" to workaround https://github.com/apache/incubator-openwhisk-client-go/issues/25
			// TODO: should the default scheme for a bare host be https rather than http?
			apiUrl := fmt.Sprintf("http://%v/api", val)
			if strings.Contains(val, "://") {
				apiUrl = fmt.Sprintf("%v/api", strings.TrimRight(val, "/"))
			}
			apiHost, err := url.Parse(apiUrl)
			if err != nil {
				return nil, fmt.Errorf("Unable to parse url (%v). Error: %v", val, err)
			}
			config.BaseURL = apiHost
			config.Host = val
		}
	}
	return config, nil
}
// CreateApiHostBaseUrl turns an OpenWhisk API host into a base URL whose path ends in "/api".
// The trailing /api is needed due to https://github.com/apache/incubator-openwhisk-client-go/issues/25
//
// Accepted forms:
//   - "https://openwhisk.ng.bluemix.net" or "https://openwhisk.ng.bluemix.net:443": used as-is.
//   - "openwhisk.ng.bluemix.net" or "openwhisk.ng.bluemix.net:443" (no scheme): https is assumed.
//     Without a scheme, url.Parse would otherwise read the host as a path, or as a URL scheme when a
//     port is present.
//
// Trailing slashes are trimmed so the result never contains "//api".
// An error is returned if the resulting string cannot be parsed as a URL.
func CreateApiHostBaseUrl(hostname string) (baseUrl *url.URL, err error) {
	base := strings.TrimRight(hostname, "/")
	if !strings.Contains(base, "://") {
		base = "https://" + base
	}
	hostnameWithPath := fmt.Sprintf("%v/api", base)
	baseUrl, err = url.Parse(hostnameWithPath)
	if err != nil {
		return nil, fmt.Errorf("Unable to parse url (%v). Error: %v", hostnameWithPath, err)
	}
	return baseUrl, nil
}
// WhiskConfigFromOwEnvVars builds a whisk.Config from the __OW_API_HOST and __OW_API_KEY
// environment variables, which OpenWhisk provides to running actions.
//
// If either variable is unset or empty it returns (nil, nil), letting the caller fall back to
// another configuration source. An error is returned only if the API host cannot be turned into
// a base URL by CreateApiHostBaseUrl.
func WhiskConfigFromOwEnvVars() (config *whisk.Config, err error) {
	apiHost, apiKey := os.Getenv("__OW_API_HOST"), os.Getenv("__OW_API_KEY")
	if apiHost == "" || apiKey == "" {
		return nil, nil
	}

	baseURL, urlErr := CreateApiHostBaseUrl(apiHost)
	if urlErr != nil {
		return nil, urlErr
	}

	return &whisk.Config{
		BaseURL:   baseURL,
		AuthToken: apiKey,
		Host:      apiHost,
	}, nil
}
// WhiskPropsMapFromWskConfigFile reads the .wskprops-style file named by the WSK_CONFIG_FILE
// environment variable and returns its KEY=VALUE pairs as a map.
//
// Parsing rules:
//   - blank lines and lines starting with '#' are skipped;
//   - lines without '=' are skipped with a log message (previously they caused an index-out-of-range panic);
//   - only the first '=' separates key from value, so values may themselves contain '=';
//   - surrounding whitespace is trimmed from keys and values.
//
// Only keys are logged; values (e.g. AUTH) are credentials and must not end up in logs.
//
// Returns an error if WSK_CONFIG_FILE is unset, the file cannot be opened, or reading it fails.
func WhiskPropsMapFromWskConfigFile() (map[string]string, error) {
	wskConfigFilePath := os.Getenv("WSK_CONFIG_FILE")
	if wskConfigFilePath == "" {
		return nil, fmt.Errorf("You need to set WSK_CONFIG_FILE to specify where to find .wskprops")
	}

	wskConfigFile, err := os.Open(wskConfigFilePath)
	if err != nil {
		return nil, fmt.Errorf("Error opening file: %v. Err: %v", wskConfigFilePath, err)
	}
	defer wskConfigFile.Close()

	whiskPropsMap := map[string]string{}
	scanner := bufio.NewScanner(wskConfigFile)
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		fields := strings.SplitN(line, "=", 2)
		if len(fields) != 2 {
			// Don't echo the line itself: it may contain secret material.
			log.Printf("Ignoring line without '=' in %v", wskConfigFilePath)
			continue
		}
		key := strings.TrimSpace(fields[0])
		log.Printf("Read key %v from %v", key, wskConfigFilePath)
		whiskPropsMap[key] = strings.TrimSpace(fields[1])
	}

	// check for errors
	if err = scanner.Err(); err != nil {
		return nil, fmt.Errorf("Error reading lines from file: %v. Err: %v", wskConfigFilePath, err)
	}
	return whiskPropsMap, nil
}