forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathapplication_insights.go
359 lines (297 loc) · 9.92 KB
/
application_insights.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
package application_insights
import (
"fmt"
"log"
"math"
"time"
"unsafe"
"github.com/Microsoft/ApplicationInsights-Go/appinsights"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
)
type TelemetryTransmitter interface {
Track(appinsights.Telemetry)
Close() <-chan struct{}
}
type DiagnosticsMessageSubscriber interface {
Subscribe(appinsights.DiagnosticsMessageHandler) appinsights.DiagnosticsMessageListener
}
type ApplicationInsights struct {
InstrumentationKey string
EndpointURL string
Timeout internal.Duration
EnableDiagnosticLogging bool
ContextTagSources map[string]string
diagMsgSubscriber DiagnosticsMessageSubscriber
transmitter TelemetryTransmitter
diagMsgListener appinsights.DiagnosticsMessageListener
}
const (
Error = "E! "
Warning = "W! "
Info = "I! "
Debug = "D! "
)
var (
sampleConfig = `
## Instrumentation key of the Application Insights resource.
instrumentation_key = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxx"
## Regions that require endpoint modification https://docs.microsoft.com/en-us/azure/azure-monitor/app/custom-endpoints
# endpoint_url = "https://dc.services.visualstudio.com/v2/track"
## Timeout for closing (default: 5s).
# timeout = "5s"
## Enable additional diagnostic logging.
# enable_diagnostic_logging = false
## Context Tag Sources add Application Insights context tags to a tag value.
##
## For list of allowed context tag keys see:
## https://github.com/Microsoft/ApplicationInsights-Go/blob/master/appinsights/contracts/contexttagkeys.go
# [outputs.application_insights.context_tag_sources]
# "ai.cloud.role" = "kubernetes_container_name"
# "ai.cloud.roleInstance" = "kubernetes_pod_name"
`
is32Bit bool
is32BitChecked bool
)
func (a *ApplicationInsights) SampleConfig() string {
return sampleConfig
}
func (a *ApplicationInsights) Description() string {
return "Send metrics to Azure Application Insights"
}
func (a *ApplicationInsights) Connect() error {
if a.InstrumentationKey == "" {
return fmt.Errorf("Instrumentation key is required")
}
if a.transmitter == nil {
a.transmitter = NewTransmitter(a.InstrumentationKey, a.EndpointURL)
}
if a.EnableDiagnosticLogging && a.diagMsgSubscriber != nil {
a.diagMsgListener = a.diagMsgSubscriber.Subscribe(func(msg string) error {
logOutputMsg(Info, "%s", msg)
return nil
})
}
return nil
}
func (a *ApplicationInsights) Write(metrics []telegraf.Metric) error {
for _, metric := range metrics {
allMetricTelemetry := a.createTelemetry(metric)
for _, telemetry := range allMetricTelemetry {
a.transmitter.Track(telemetry)
}
}
return nil
}
func (a *ApplicationInsights) Close() error {
if a.diagMsgListener != nil {
// We want to listen to diagnostic messages during closing
// That is why we stop listening only after Close() ends (or a timeout occurs)
defer a.diagMsgListener.Remove()
}
if a.transmitter == nil {
return nil
}
select {
case <-a.transmitter.Close():
logOutputMsg(Info, "Closed")
case <-time.After(a.Timeout.Duration):
logOutputMsg(Warning, "Close operation timed out after %v", a.Timeout.Duration)
}
return nil
}
func (a *ApplicationInsights) createTelemetry(metric telegraf.Metric) []appinsights.Telemetry {
aggregateTelemetry, usedFields := a.createAggregateMetricTelemetry(metric)
if aggregateTelemetry != nil {
telemetry := a.createTelemetryForUnusedFields(metric, usedFields)
telemetry = append(telemetry, aggregateTelemetry)
return telemetry
}
fields := metric.Fields()
if len(fields) == 1 && metric.FieldList()[0].Key == "value" {
// Just use metric name as the telemetry name
telemetry := a.createSimpleMetricTelemetry(metric, "value", false)
if telemetry != nil {
return []appinsights.Telemetry{telemetry}
} else {
return nil
}
} else {
// AppInsights does not support multi-dimensional metrics at the moment, so we need to disambiguate resulting telemetry
// by adding field name as the telemetry name suffix
retval := a.createTelemetryForUnusedFields(metric, nil)
return retval
}
}
func (a *ApplicationInsights) createSimpleMetricTelemetry(metric telegraf.Metric, fieldName string, useFieldNameInTelemetryName bool) *appinsights.MetricTelemetry {
telemetryValue, err := getFloat64TelemetryPropertyValue([]string{fieldName}, metric, nil)
if err != nil {
return nil
}
var telemetryName string
if useFieldNameInTelemetryName {
telemetryName = metric.Name() + "_" + fieldName
} else {
telemetryName = metric.Name()
}
telemetry := appinsights.NewMetricTelemetry(telemetryName, telemetryValue)
telemetry.Properties = metric.Tags()
a.addContextTags(metric, telemetry)
telemetry.Timestamp = metric.Time()
return telemetry
}
func (a *ApplicationInsights) createAggregateMetricTelemetry(metric telegraf.Metric) (*appinsights.AggregateMetricTelemetry, []string) {
usedFields := make([]string, 0, 6) // We will use up to 6 fields
// Get the sum of all individual measurements(mandatory property)
telemetryValue, err := getFloat64TelemetryPropertyValue([]string{"sum", "value"}, metric, &usedFields)
if err != nil {
return nil, nil
}
// Get the count of measurements (mandatory property)
telemetryCount, err := getIntTelemetryPropertyValue([]string{"count", "samples"}, metric, &usedFields)
if err != nil {
return nil, nil
}
telemetry := appinsights.NewAggregateMetricTelemetry(metric.Name())
telemetry.Value = telemetryValue
telemetry.Count = telemetryCount
telemetry.Properties = metric.Tags()
a.addContextTags(metric, telemetry)
telemetry.Timestamp = metric.Time()
// We attempt to set min, max, variance and stddev fields but do not really care if they are not present--
// they are not essential for aggregate metric.
// By convention AppInsights prefers stddev over variance, so to be consistent, we test for stddev after testing for variance.
telemetry.Min, _ = getFloat64TelemetryPropertyValue([]string{"min"}, metric, &usedFields)
telemetry.Max, _ = getFloat64TelemetryPropertyValue([]string{"max"}, metric, &usedFields)
telemetry.Variance, _ = getFloat64TelemetryPropertyValue([]string{"variance"}, metric, &usedFields)
telemetry.StdDev, _ = getFloat64TelemetryPropertyValue([]string{"stddev"}, metric, &usedFields)
return telemetry, usedFields
}
func (a *ApplicationInsights) createTelemetryForUnusedFields(metric telegraf.Metric, usedFields []string) []appinsights.Telemetry {
fields := metric.Fields()
retval := make([]appinsights.Telemetry, 0, len(fields))
for fieldName := range fields {
if contains(usedFields, fieldName) {
continue
}
telemetry := a.createSimpleMetricTelemetry(metric, fieldName, true)
if telemetry != nil {
retval = append(retval, telemetry)
}
}
return retval
}
func (a *ApplicationInsights) addContextTags(metric telegraf.Metric, telemetry appinsights.Telemetry) {
for contextTagName, tagSourceName := range a.ContextTagSources {
if contextTagValue, found := metric.GetTag(tagSourceName); found {
telemetry.ContextTags()[contextTagName] = contextTagValue
}
}
}
func getFloat64TelemetryPropertyValue(
candidateFields []string,
metric telegraf.Metric,
usedFields *[]string) (float64, error) {
for _, fieldName := range candidateFields {
fieldValue, found := metric.GetField(fieldName)
if !found {
continue
}
metricValue, err := toFloat64(fieldValue)
if err != nil {
continue
}
if usedFields != nil {
*usedFields = append(*usedFields, fieldName)
}
return metricValue, nil
}
return 0.0, fmt.Errorf("No field from the candidate list was found in the metric")
}
func getIntTelemetryPropertyValue(
candidateFields []string,
metric telegraf.Metric,
usedFields *[]string) (int, error) {
for _, fieldName := range candidateFields {
fieldValue, found := metric.GetField(fieldName)
if !found {
continue
}
metricValue, err := toInt(fieldValue)
if err != nil {
continue
}
if usedFields != nil {
*usedFields = append(*usedFields, fieldName)
}
return metricValue, nil
}
return 0, fmt.Errorf("No field from the candidate list was found in the metric")
}
func contains(set []string, val string) bool {
for _, elem := range set {
if elem == val {
return true
}
}
return false
}
func toFloat64(value interface{}) (float64, error) {
// Out of all Golang numerical types Telegraf only uses int64, unit64 and float64 for fields
switch v := value.(type) {
case int64:
return float64(v), nil
case uint64:
return float64(v), nil
case float64:
return v, nil
}
return 0.0, fmt.Errorf("[%s] cannot be converted to a float64 value", value)
}
func toInt(value interface{}) (int, error) {
if !is32BitChecked {
is32BitChecked = true
var i int
if unsafe.Sizeof(i) == 4 {
is32Bit = true
} else {
is32Bit = false
}
}
// Out of all Golang numerical types Telegraf only uses int64, unit64 and float64 for fields
switch v := value.(type) {
case uint64:
if is32Bit {
if v > math.MaxInt32 {
return 0, fmt.Errorf("Value [%d] out of range of 32-bit integers", v)
}
} else {
if v > math.MaxInt64 {
return 0, fmt.Errorf("Value [%d] out of range of 64-bit integers", v)
}
}
return int(v), nil
case int64:
if is32Bit {
if v > math.MaxInt32 || v < math.MinInt32 {
return 0, fmt.Errorf("Value [%d] out of range of 32-bit integers", v)
}
}
return int(v), nil
}
return 0.0, fmt.Errorf("[%s] cannot be converted to an int value", value)
}
func logOutputMsg(level string, format string, v ...interface{}) {
log.Printf(level+"[outputs.application_insights] "+format, v...)
}
func init() {
outputs.Add("application_insights", func() telegraf.Output {
return &ApplicationInsights{
Timeout: internal.Duration{Duration: time.Second * 5},
diagMsgSubscriber: diagnosticsMessageSubscriber{},
// It is very common to set Cloud.RoleName and Cloud.RoleInstance context properties, hence initial capacity of two
ContextTagSources: make(map[string]string, 2),
}
})
}