-
Notifications
You must be signed in to change notification settings - Fork 0
/
transformer.go
75 lines (64 loc) · 1.41 KB
/
transformer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package main
import (
"strings"
"sync/atomic"
"time"
log "github.com/sirupsen/logrus"
// "github.com/pkg/profile"
)
// runTransformer is the main loop of the transformer thread.
//
// It first assigns the transformer logger (tlog, declared outside this
// function) with the thread name, tenant, prefix and immutable prefixes as
// fields, then polls inputChan. Every metric received is handed to
// transformMetric in its own goroutine, together with outputChans, tenant,
// forceTenant, prefix and immutablePrefix, which are passed through unchanged.
// When no metric is ready the loop sleeps for 100 ms before polling again.
//
// The function returns once inputChan has been closed and drained. Without
// that check a closed channel would yield nil metrics forever, and each one
// would be dereferenced by transformMetric.
func runTransformer(
	inputChan chan *Metric,
	outputChans []chan *Metric,
	tenant string,
	forceTenant bool,
	prefix string,
	immutablePrefix []string) {
	tlog = clog.WithFields(log.Fields{
		"thread":          "transformer",
		"tenant":          tenant,
		"prefix":          prefix,
		"immutablePrefix": immutablePrefix,
	})
	tlog.Info("Starting Transformer.")

	for {
		select {
		case metric, ok := <-inputChan:
			if !ok {
				// A closed channel delivers the zero value (a nil *Metric) on
				// every receive; stop instead of spawning workers on nil.
				tlog.Info("Input channel closed, stopping Transformer.")
				return
			}
			go transformMetric(metric, outputChans, tenant, forceTenant, prefix, immutablePrefix)
		default:
			// Nothing queued: back off briefly before polling again.
			time.Sleep(100 * time.Millisecond)
		}
	}
}
// transformMetric applies the tenant and prefix rules to one metric and then
// forwards it to every output channel.
//
// Tenant: metric.Tenant is overwritten with tenant when forceTenant is set;
// otherwise it is only filled in when it is empty.
//
// Prefix: when the metric has no prefix yet and prefix is non-empty,
// metric.Prefix becomes prefix + "." unless metric.Path starts with one of the
// entries in immutablePrefix.
//
// The same *Metric pointer is sent to each channel of outputChans in slice
// order, after which state.Transformed is incremented atomically.
func transformMetric(
	metric *Metric,
	outputChans []chan *Metric,
	tenant string,
	forceTenant bool,
	prefix string,
	immutablePrefix []string) {
	// A forced tenant always wins; otherwise only a missing tenant is set.
	if forceTenant || metric.Tenant == "" {
		metric.Tenant = tenant
	}

	if prefix != "" && metric.Prefix == "" {
		// Paths under an immutable prefix keep their empty Prefix.
		protected := false
		for _, fixed := range immutablePrefix {
			if strings.HasPrefix(metric.Path, fixed) {
				protected = true
				break
			}
		}
		if !protected {
			metric.Prefix = prefix + "."
		}
	}

	tlog.WithFields(log.Fields{"metric": metric}).Debug("Metric transformed.")

	// Fan the metric out to every configured output.
	for _, out := range outputChans {
		out <- metric
	}

	// Update state
	atomic.AddInt64(&state.Transformed, 1)
}