Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OM-184 - support pod-name/ server-ip #116

Merged
merged 3 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions internal/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ type Config struct {
OtelPushInterval uint8 `toml:"push_interval"`
OtelServerStatFetchInterval uint8 `toml:"server_stat_fetch_interval"`
} `toml:"OpenTelemetry"`

IsKubernetes bool
KubernetesPodName string
} `toml:"Agent"`

Aerospike struct {
Expand Down Expand Up @@ -200,6 +203,27 @@ func (c *Config) FetchCloudInfo(md toml.MetaData) {
}
}

func (c *Config) FetchKubernetesInfo(md toml.MetaData) {
// use kubectl to fetch required Kubernetes context and find the required Kubenetes environment variables
envKubeServiceHost := os.Getenv("KUBERNETES_SERVICE_HOST")

Cfg.Agent.IsKubernetes = false

if envKubeServiceHost != "" && len(strings.TrimSpace(envKubeServiceHost)) > 0 {
Cfg.Agent.IsKubernetes = true
log.Info("Exporter is running in Kubernetes")

// get host-name
var err error
Cfg.Agent.KubernetesPodName, err = os.Hostname()
if err != nil {
log.Errorln(err)
return
}

}
}

// Initialize exporter configuration
func InitConfig(configFile string) {
// to print everything out regarding reading the config in app init
Expand All @@ -225,6 +249,8 @@ func InitConfig(configFile string) {

Cfg.ValidateAndUpdate(md)
Cfg.FetchCloudInfo(md)

Cfg.FetchKubernetesInfo(md)
}

// Set log file path
Expand Down
4 changes: 4 additions & 0 deletions internal/pkg/executors/otel_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ func sendNodeUp(meter metric.Meter, ctx context.Context, commonLabels []attribut
metric.WithDescription("Aerospike node active status"),
)

if config.Cfg.Agent.IsKubernetes {
statprocessors.Service = config.Cfg.Agent.KubernetesPodName
}

labels := []attribute.KeyValue{
attribute.String("cluster_name", statprocessors.ClusterName),
attribute.String("service", statprocessors.Service),
Expand Down
4 changes: 4 additions & 0 deletions internal/pkg/executors/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func (o *PrometheusImpl) Collect(ch chan<- prometheus.Metric) {
return
}

// if kubernetes then send host-name/pod-name else send server-ip as-isnh
if config.Cfg.Agent.IsKubernetes {
statprocessors.Service = config.Cfg.Agent.KubernetesPodName
}
ch <- prometheus.MustNewConstMetric(nodeActiveDesc, prometheus.GaugeValue, 1.0, statprocessors.ClusterName, statprocessors.Service, statprocessors.Build)

for _, wm := range refreshed_metrics {
Expand Down
6 changes: 2 additions & 4 deletions internal/pkg/statprocessors/sp_host_systeminfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,6 @@ func getNetworkInfo() []AerospikeStat {
arrReceiveStats, arrTransferStats := dataprovider.GetSystemProvider().GetNetDevStats()

// netdev receive
clusterName := ClusterName
service := Service
for _, stats := range arrReceiveStats {
deviceName := stats["device_name"]
statName := "receive_bytes_total"
Expand All @@ -132,7 +130,7 @@ func getNetworkInfo() []AerospikeStat {
continue
}

labelValues := []string{clusterName, service, deviceName}
labelValues := []string{ClusterName, Service, deviceName}

allowed := isMetricAllowed(commons.CTX_SYSINFO_NETWORK_STATS, statName)
sysMetric := NewAerospikeStat(commons.CTX_SYSINFO_NETWORK_STATS, statName, allowed)
Expand All @@ -154,7 +152,7 @@ func getNetworkInfo() []AerospikeStat {
continue
}

labelValues := []string{clusterName, service, deviceName}
labelValues := []string{ClusterName, Service, deviceName}
allowed := isMetricAllowed(commons.CTX_SYSINFO_NETWORK_STATS, statName)
sysMetric := NewAerospikeStat(commons.CTX_SYSINFO_NETWORK_STATS, statName, allowed)
sysMetric.Labels = networkLabels
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/statprocessors/sp_latency.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"strings"

commons "github.com/aerospike/aerospike-prometheus-exporter/internal/pkg/commons"
"github.com/aerospike/aerospike-prometheus-exporter/internal/pkg/config"
config "github.com/aerospike/aerospike-prometheus-exporter/internal/pkg/config"

log "github.com/sirupsen/logrus"
)
Expand Down
6 changes: 3 additions & 3 deletions internal/pkg/statprocessors/sp_namespaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (nw *NamespaceStatsProcessor) refreshIndexPressure(singleInfoKey string, in
nsName := values[0]

labels := []string{commons.METRIC_LABEL_CLUSTER_NAME, commons.METRIC_LABEL_SERVICE, commons.METRIC_LABEL_NS}
labelValues := []string{rawMetrics[Infokey_ClusterName], rawMetrics[Infokey_Service], nsName}
labelValues := []string{ClusterName, Service, nsName}

// Server index-pressure output: test:0:0;bar_device:0:0;materials:0:0
// ignore first element - namespace
Expand Down Expand Up @@ -182,7 +182,7 @@ func (nw *NamespaceStatsProcessor) refreshNamespaceStats(singleInfoKey string, i
// default: aerospike_namespace_<stat-name>
constructedStatname = stat
labels = []string{commons.METRIC_LABEL_CLUSTER_NAME, commons.METRIC_LABEL_SERVICE, commons.METRIC_LABEL_NS}
labelValues = []string{rawMetrics[Infokey_ClusterName], rawMetrics[Infokey_Service], nsName}
labelValues = []string{ClusterName, Service, nsName}

if isArrayType {
constructedStatname, labels, labelValues = nw.handleArrayStats(nsName, stat, pv, stats, deviceType, rawMetrics)
Expand Down Expand Up @@ -287,7 +287,7 @@ func (nw *NamespaceStatsProcessor) handleArrayStats(nsName string, statToProcess
compositeStatName := deviceType + "_" + statType + "_" + statName
deviceOrFileName := allNamespaceStats[deviceType+"."+statType+"["+statIndex+"]"]
labels := []string{commons.METRIC_LABEL_CLUSTER_NAME, commons.METRIC_LABEL_SERVICE, commons.METRIC_LABEL_NS, statType + "_index", statType}
labelValues := []string{rawMetrics[Infokey_ClusterName], rawMetrics[Infokey_Service], nsName, statIndex, deviceOrFileName}
labelValues := []string{ClusterName, Service, nsName, statIndex, deviceOrFileName}

return compositeStatName, labels, labelValues

Expand Down
11 changes: 4 additions & 7 deletions internal/pkg/statprocessors/sp_node_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,14 @@ func (sw *NodeStatsProcessor) Refresh(infoKeys []string, rawMetrics map[string]s
log.Tracef("node-configs:%s", nodeConfigs)
log.Tracef("node-stats:%s", nodeStats)

clusterName := rawMetrics[Infokey_ClusterName]
service := rawMetrics[Infokey_Service]

// we are sending configs and stats in same refresh call, as both are being sent to prom, instead of doing prom-push in 2 functions
// handle configs
var allMetricsToSend = []AerospikeStat{}

lCfgMetricsToSend := sw.handleRefresh(nodeConfigs, clusterName, service)
lCfgMetricsToSend := sw.handleRefresh(nodeConfigs)

// handle stats
lStatMetricsToSend := sw.handleRefresh(nodeStats, clusterName, service)
lStatMetricsToSend := sw.handleRefresh(nodeStats)

// merge both array into single
allMetricsToSend = append(allMetricsToSend, lCfgMetricsToSend...)
Expand All @@ -61,7 +58,7 @@ func (sw *NodeStatsProcessor) Refresh(infoKeys []string, rawMetrics map[string]s
return allMetricsToSend, nil
}

func (sw *NodeStatsProcessor) handleRefresh(nodeRawMetrics string, clusterName string, service string) []AerospikeStat {
func (sw *NodeStatsProcessor) handleRefresh(nodeRawMetrics string) []AerospikeStat {

stats := commons.ParseStats(nodeRawMetrics, ";")

Expand All @@ -81,7 +78,7 @@ func (sw *NodeStatsProcessor) handleRefresh(nodeRawMetrics string, clusterName s
}

labels := []string{commons.METRIC_LABEL_CLUSTER_NAME, commons.METRIC_LABEL_SERVICE}
labelValues := []string{clusterName, service}
labelValues := []string{ClusterName, Service}

// pushToPrometheus(asMetric, pv, labels, labelsValues)
asMetric.updateValues(pv, labels, labelValues)
Expand Down
5 changes: 1 addition & 4 deletions internal/pkg/statprocessors/sp_sets.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ func (sw *SetsStatsProcessor) Refresh(infoKeys []string, rawMetrics map[string]s
var allMetricsToSend = []AerospikeStat{}

for i := range setStats {
clusterName := rawMetrics[Infokey_ClusterName]
service := rawMetrics[Infokey_Service]

stats := commons.ParseStats(setStats[i], ":")
for stat, value := range stats {
pv, err := commons.TryConvert(value)
Expand All @@ -59,7 +56,7 @@ func (sw *SetsStatsProcessor) Refresh(infoKeys []string, rawMetrics map[string]s
}

labels := []string{commons.METRIC_LABEL_CLUSTER_NAME, commons.METRIC_LABEL_SERVICE, commons.METRIC_LABEL_NS, commons.METRIC_LABEL_SET}
labelValues := []string{clusterName, service, stats["ns"], stats["set"]}
labelValues := []string{ClusterName, Service, stats["ns"], stats["set"]}

// pushToPrometheus(asMetric, pv, labels, labelsValues, ch)
asMetric.updateValues(pv, labels, labelValues)
Expand Down
5 changes: 1 addition & 4 deletions internal/pkg/statprocessors/sp_sindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ func (siw *SindexStatsProcessor) Refresh(infoKeys []string, rawMetrics map[strin
sindexName := sindexInfoKeySplit[1]
log.Tracef("sindex-stats:%s:%s:%s", nsName, sindexName, rawMetrics[sindex])

clusterName := rawMetrics[Infokey_ClusterName]
service := rawMetrics[Infokey_Service]

stats := commons.ParseStats(rawMetrics[sindex], ";")
for stat, value := range stats {
pv, err := commons.TryConvert(value)
Expand All @@ -90,7 +87,7 @@ func (siw *SindexStatsProcessor) Refresh(infoKeys []string, rawMetrics map[strin
}

labels := []string{commons.METRIC_LABEL_CLUSTER_NAME, commons.METRIC_LABEL_SERVICE, commons.METRIC_LABEL_NS, commons.METRIC_LABEL_SINDEX}
labelValues := []string{clusterName, service, nsName, sindexName}
labelValues := []string{ClusterName, Service, nsName, sindexName}

asMetric.updateValues(pv, labels, labelValues)
allMetricsToSend = append(allMetricsToSend, asMetric)
Expand Down
10 changes: 5 additions & 5 deletions internal/pkg/statprocessors/sp_users.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,21 +122,21 @@ func (uw *UserStatsProcessor) refreshUserStats(infoKeys []string, rawMetrics map
readInfoStats := []string{"read_quota", "read_single_record_tps", "read_scan_query_rps", "limitless_read_scan_query"}
writeInfoStats := []string{"write_quota", "write_single_record_tps", "write_scan_query_rps", "limitless_write_scan_query"}

asMetric, labels, labelValues := internalCreateLocalAerospikeStat(rawMetrics, "conns_in_use", user.User)
asMetric, labels, labelValues := internalCreateLocalAerospikeStat("conns_in_use", user.User)
asMetric.updateValues(float64(user.ConnsInUse), labels, labelValues)
allMetricsToSend = append(allMetricsToSend, asMetric)

if len(user.ReadInfo) >= 4 && len(user.WriteInfo) >= 4 {

for idxReadinfo := 0; idxReadinfo < len(user.ReadInfo); idxReadinfo++ {
riAeroMetric, riLabels, riLabelValues := internalCreateLocalAerospikeStat(rawMetrics, readInfoStats[idxReadinfo], user.User)
riAeroMetric, riLabels, riLabelValues := internalCreateLocalAerospikeStat(readInfoStats[idxReadinfo], user.User)
riAeroMetric.updateValues(float64(user.ReadInfo[idxReadinfo]), riLabels, riLabelValues)

allMetricsToSend = append(allMetricsToSend, riAeroMetric)

}
for idxWriteinfo := 0; idxWriteinfo < len(user.WriteInfo); idxWriteinfo++ {
wiAeroMetric, wiLabels, wiLabelValues := internalCreateLocalAerospikeStat(rawMetrics, writeInfoStats[idxWriteinfo], user.User)
wiAeroMetric, wiLabels, wiLabelValues := internalCreateLocalAerospikeStat(writeInfoStats[idxWriteinfo], user.User)
wiAeroMetric.updateValues(float64(user.WriteInfo[idxWriteinfo]), wiLabels, wiLabelValues)
allMetricsToSend = append(allMetricsToSend, wiAeroMetric)

Expand All @@ -147,9 +147,9 @@ func (uw *UserStatsProcessor) refreshUserStats(infoKeys []string, rawMetrics map
return allMetricsToSend, nil
}

func internalCreateLocalAerospikeStat(rawMetrics map[string]string, pStatName string, username string) (AerospikeStat, []string, []string) {
func internalCreateLocalAerospikeStat(pStatName string, username string) (AerospikeStat, []string, []string) {
labels := []string{commons.METRIC_LABEL_CLUSTER_NAME, commons.METRIC_LABEL_SERVICE, commons.METRIC_LABEL_USER}
labelValues := []string{rawMetrics[Infokey_ClusterName], rawMetrics[Infokey_Service], username}
labelValues := []string{ClusterName, Service, username}
allowed := isMetricAllowed(commons.CTX_USERS, pStatName)
asMetric := NewAerospikeStat(commons.CTX_USERS, pStatName, allowed)

Expand Down
11 changes: 4 additions & 7 deletions internal/pkg/statprocessors/sp_xdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,14 @@ func (xw *XdrStatsProcessor) Refresh(infoKeys []string, rawMetrics map[string]st
xw.xdrMetrics = make(map[string]AerospikeStat)
}

clusterName := rawMetrics[Infokey_ClusterName]
service := rawMetrics[Infokey_Service]

var allMetricsToSend = []AerospikeStat{}

for _, key := range infoKeys {

xdrRawMetrics := rawMetrics[key]
// find and construct metric name
dcName, ns, metricPrefix := xw.constructMetricNamePrefix(key)
tmpXdrMetricsToSend := xw.handleRefresh(key, xdrRawMetrics, clusterName, service, dcName, ns, metricPrefix)
tmpXdrMetricsToSend := xw.handleRefresh(key, xdrRawMetrics, dcName, ns, metricPrefix)

allMetricsToSend = append(allMetricsToSend, tmpXdrMetricsToSend...)
}
Expand Down Expand Up @@ -108,7 +105,7 @@ func (xw *XdrStatsProcessor) constructMetricNamePrefix(infoKeyToProcess string)
}

func (xw *XdrStatsProcessor) handleRefresh(infoKeyToProcess string, xdrRawMetrics string,
clusterName string, service string, dcName string, ns string, metricPrefix string) []AerospikeStat {
dcName string, ns string, metricPrefix string) []AerospikeStat {
log.Tracef("xdr-%s:%s", infoKeyToProcess, xdrRawMetrics)

stats := commons.ParseStats(xdrRawMetrics, ";")
Expand All @@ -129,12 +126,12 @@ func (xw *XdrStatsProcessor) handleRefresh(infoKeyToProcess string, xdrRawMetric
}

labels := []string{commons.METRIC_LABEL_CLUSTER_NAME, commons.METRIC_LABEL_SERVICE, commons.METRIC_LABEL_DC_NAME}
labelValues := []string{clusterName, service, dcName}
labelValues := []string{ClusterName, Service, dcName}

// if namespace exists, add it to the label and label-values array
if len(ns) > 0 {
labels = []string{commons.METRIC_LABEL_CLUSTER_NAME, commons.METRIC_LABEL_SERVICE, commons.METRIC_LABEL_DC_NAME, commons.METRIC_LABEL_NS}
labelValues = []string{clusterName, service, dcName, ns}
labelValues = []string{ClusterName, Service, dcName, ns}
}

// pushToPrometheus(asMetric, pv, labels, labelsValues, ch)
Expand Down
4 changes: 4 additions & 0 deletions internal/pkg/statprocessors/statsrefresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package statprocessors

import (
commons "github.com/aerospike/aerospike-prometheus-exporter/internal/pkg/commons"
"github.com/aerospike/aerospike-prometheus-exporter/internal/pkg/config"
"github.com/aerospike/aerospike-prometheus-exporter/internal/pkg/dataprovider"
log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -68,6 +69,9 @@ func Refresh() ([]AerospikeStat, error) {

// set global values
ClusterName, Service, Build = rawMetrics[Infokey_ClusterName], rawMetrics[Infokey_Service], rawMetrics[Infokey_Build]
if config.Cfg.Agent.IsKubernetes {
Service = config.Cfg.Agent.KubernetesPodName
}

// sanitize the utf8 strings before sending them to watchers
for k, v := range rawMetrics {
Expand Down
Loading