Skip to content

Commit

Permalink
feat: add name and labels in cluster metrics (#17870) (#18453)
Browse files Browse the repository at this point in the history
Signed-off-by: flbla <[email protected]>
Signed-off-by: Alexandre Gaudreault <[email protected]>
Co-authored-by: Alexandre Gaudreault <[email protected]>
  • Loading branch information
flbla and agaudreault authored Jan 29, 2025
1 parent e147247 commit e4311d8
Show file tree
Hide file tree
Showing 9 changed files with 200 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func NewCommand() *cobra.Command {
metricsCacheExpiration time.Duration
metricsAplicationLabels []string
metricsAplicationConditions []string
metricsClusterLabels []string
kubectlParallelismLimit int64
cacheSource func() (*appstatecache.Cache, error)
redisClient *redis.Client
Expand Down Expand Up @@ -202,6 +203,7 @@ func NewCommand() *cobra.Command {
metricsCacheExpiration,
metricsAplicationLabels,
metricsAplicationConditions,
metricsClusterLabels,
kubectlParallelismLimit,
persistResourceHealth,
clusterSharding,
Expand Down Expand Up @@ -272,6 +274,7 @@ func NewCommand() *cobra.Command {
command.Flags().BoolVar(&repoServerStrictTLS, "repo-server-strict-tls", env.ParseBoolFromEnv("ARGOCD_APPLICATION_CONTROLLER_REPO_SERVER_STRICT_TLS", false), "Whether to use strict validation of the TLS cert presented by the repo server")
command.Flags().StringSliceVar(&metricsAplicationLabels, "metrics-application-labels", []string{}, "List of Application labels that will be added to the argocd_application_labels metric")
command.Flags().StringSliceVar(&metricsAplicationConditions, "metrics-application-conditions", []string{}, "List of Application conditions that will be added to the argocd_application_conditions metric")
command.Flags().StringSliceVar(&metricsClusterLabels, "metrics-cluster-labels", []string{}, "List of Cluster labels that will be added to the argocd_cluster_labels metric")
command.Flags().StringVar(&otlpAddress, "otlp-address", env.StringFromEnv("ARGOCD_APPLICATION_CONTROLLER_OTLP_ADDRESS", ""), "OpenTelemetry collector address to send traces to")
command.Flags().BoolVar(&otlpInsecure, "otlp-insecure", env.ParseBoolFromEnv("ARGOCD_APPLICATION_CONTROLLER_OTLP_INSECURE", true), "OpenTelemetry collector insecure mode")
command.Flags().StringToStringVar(&otlpHeaders, "otlp-headers", env.ParseStringToStringFromEnv("ARGOCD_APPLICATION_CONTROLLER_OTLP_HEADERS", map[string]string{}, ","), "List of OpenTelemetry collector extra headers sent with traces, headers are comma-separated key-value pairs(e.g. key1=value1,key2=value2)")
Expand Down
5 changes: 4 additions & 1 deletion controller/appcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ type ApplicationController struct {
refreshRequestedApps map[string]CompareWith
refreshRequestedAppsMutex *sync.Mutex
metricsServer *metrics.MetricsServer
metricsClusterLabels []string
kubectlSemaphore *semaphore.Weighted
clusterSharding sharding.ClusterShardingCache
projByNameCache sync.Map
Expand Down Expand Up @@ -173,6 +174,7 @@ func NewApplicationController(
metricsCacheExpiration time.Duration,
metricsApplicationLabels []string,
metricsApplicationConditions []string,
metricsClusterLabels []string,
kubectlParallelismLimit int64,
persistResourceHealth bool,
clusterSharding sharding.ClusterShardingCache,
Expand Down Expand Up @@ -218,6 +220,7 @@ func NewApplicationController(
applicationNamespaces: applicationNamespaces,
dynamicClusterDistributionEnabled: dynamicClusterDistributionEnabled,
ignoreNormalizerOpts: ignoreNormalizerOpts,
metricsClusterLabels: metricsClusterLabels,
}
if hydratorEnabled {
ctrl.hydrator = hydrator.NewHydrator(&ctrl, appResyncPeriod, commitClientset)
Expand Down Expand Up @@ -857,8 +860,8 @@ func (ctrl *ApplicationController) Run(ctx context.Context, statusProcessors int
defer ctrl.appHydrateQueue.ShutDown()
defer ctrl.hydrationQueue.ShutDown()

ctrl.metricsServer.RegisterClustersInfoSource(ctx, ctrl.stateCache)
ctrl.RegisterClusterSecretUpdater(ctx)
ctrl.metricsServer.RegisterClustersInfoSource(ctx, ctrl.stateCache, ctrl.db, ctrl.metricsClusterLabels)

go ctrl.appInformer.Run(ctx.Done())
go ctrl.projInformer.Run(ctx.Done())
Expand Down
1 change: 1 addition & 0 deletions controller/appcontroller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func newFakeControllerWithResync(data *fakeData, appResyncPeriod time.Duration,
data.metricsCacheExpiration,
[]string{},
[]string{},
[]string{},
0,
true,
nil,
Expand Down
136 changes: 118 additions & 18 deletions controller/metrics/clustercollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,27 @@ import (
"time"

"github.com/argoproj/gitops-engine/pkg/cache"

"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"

argoappv1 "github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
metricsutil "github.com/argoproj/argo-cd/v3/util/metrics"
)

const (
	// metricsCollectionInterval is how often the background goroutine
	// refreshes the cached cluster metrics snapshot.
	metricsCollectionInterval = 30 * time.Second
	// metricsCollectionTimeout bounds a single cluster-list call made while
	// building the snapshot.
	metricsCollectionTimeout = 10 * time.Second
)

var (
descClusterDefaultLabels = []string{"server"}

descClusterLabels *prometheus.Desc

descClusterInfo = prometheus.NewDesc(
"argocd_cluster_info",
"Information about cluster.",
append(descClusterDefaultLabels, "k8s_version"),
append(descClusterDefaultLabels, "k8s_version", "name"),
nil,
)
descClusterCacheResources = prometheus.NewDesc(
Expand Down Expand Up @@ -53,26 +59,99 @@ type HasClustersInfo interface {
GetClustersInfo() []cache.ClusterInfo
}

type ClusterLister func(ctx context.Context) (*argoappv1.ClusterList, error)

// clusterCollector implements prometheus.Collector for per-cluster metrics.
// A background goroutine (see run) periodically snapshots cluster state into
// latestInfo so that Collect never blocks on listing clusters.
type clusterCollector struct {
	infoSource    HasClustersInfo // source of per-cluster cache statistics
	lock          sync.RWMutex    // guards latestInfo
	clusterLabels []string        // cluster label keys exported via argocd_cluster_labels
	clusterLister ClusterLister   // resolves Cluster resources (name/labels) by server URL

	// latestInfo is the most recent snapshot pairing cache info with its
	// Cluster resource; written by setClusterData, read by Collect.
	latestInfo []*clusterData
}

// clusterData pairs a cluster's cache statistics with the Cluster resource it
// belongs to, so metrics can carry the cluster name and labels.
type clusterData struct {
	info *cache.ClusterInfo // cache statistics for the cluster
	cluster *argoappv1.Cluster // matching Cluster resource (matched by Server URL)
}

func (c *clusterCollector) Run(ctx context.Context) {
func NewClusterCollector(ctx context.Context, source HasClustersInfo, clusterLister ClusterLister, clusterLabels []string) prometheus.Collector {
if len(clusterLabels) > 0 {
normalizedClusterLabels := metricsutil.NormalizeLabels("label", clusterLabels)
descClusterLabels = prometheus.NewDesc(
"argocd_cluster_labels",
"Argo Cluster labels converted to Prometheus labels",
append(append(descClusterDefaultLabels, "name"), normalizedClusterLabels...),
nil,
)
}

collector := &clusterCollector{
infoSource: source,
clusterLabels: clusterLabels,
clusterLister: clusterLister,
lock: sync.RWMutex{},
}

collector.setClusterData()
go collector.run(ctx)

return collector
}

// run refreshes the cluster metrics snapshot every metricsCollectionInterval
// until ctx is cancelled. Started on its own goroutine by NewClusterCollector.
func (c *clusterCollector) run(ctx context.Context) {
	// time.NewTicker (instead of time.Tick) lets us stop the ticker and
	// release its resources when the goroutine exits.
	ticker := time.NewTicker(metricsCollectionInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			// Returning here (rather than falling through) stops the refresh
			// loop; a bare case would spin forever on the closed Done channel.
			return
		case <-ticker.C:
			c.setClusterData()
		}
	}
}

// setClusterData rebuilds the metrics snapshot and publishes it under the
// write lock. On failure it keeps the previous snapshot and only logs a
// warning, so a transient listing error does not blank out the metrics.
func (c *clusterCollector) setClusterData() {
	if clusterData, err := c.getClusterData(); err == nil {
		c.lock.Lock()
		c.latestInfo = clusterData
		c.lock.Unlock()
	} else {
		log.Warnf("error collecting cluster metrics: %v", err)
	}
}

// getClusterData joins the controller's per-cluster cache statistics with the
// corresponding Cluster resources (to obtain name and labels). Clusters that
// cannot be matched to a Cluster resource are skipped so no incomplete metric
// is emitted.
func (c *clusterCollector) getClusterData() ([]*clusterData, error) {
	clusterDatas := []*clusterData{}
	clusterInfos := c.infoSource.GetClustersInfo()

	// Bound the list call so a slow backend cannot stall the collection loop.
	ctx, cancel := context.WithTimeout(context.Background(), metricsCollectionTimeout)
	defer cancel()
	clusters, err := c.clusterLister(ctx)
	if err != nil {
		return nil, err
	}

	// Index clusters by server URL; take the address of the slice element
	// (not the loop variable) so the stored pointer remains valid.
	clusterMap := map[string]*argoappv1.Cluster{}
	for i, cluster := range clusters.Items {
		clusterMap[cluster.Server] = &clusters.Items[i]
	}

	// Base the cluster data on the ClusterInfo because it only contains the
	// clusters managed by this controller instance
	for i, info := range clusterInfos {
		cluster, ok := clusterMap[info.Server]
		if !ok {
			// This should not happen, but we cannot emit incomplete metrics, so we skip this cluster
			log.WithField("server", info.Server).Warnf("could not find cluster for metrics collection")
			continue
		}
		clusterDatas = append(clusterDatas, &clusterData{
			info:    &clusterInfos[i],
			cluster: cluster,
		})
	}
	return clusterDatas, nil
}

// Describe implements the prometheus.Collector interface
Expand All @@ -82,20 +161,41 @@ func (c *clusterCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- descClusterAPIs
ch <- descClusterCacheAgeSeconds
ch <- descClusterConnectionStatus
if len(c.clusterLabels) > 0 {
ch <- descClusterLabels
}
}

func (c *clusterCollector) Collect(ch chan<- prometheus.Metric) {
c.lock.RLock()
latestInfo := c.latestInfo
c.lock.RUnlock()

now := time.Now()
for _, c := range c.info {
defaultValues := []string{c.Server}
ch <- prometheus.MustNewConstMetric(descClusterInfo, prometheus.GaugeValue, 1, append(defaultValues, c.K8SVersion)...)
ch <- prometheus.MustNewConstMetric(descClusterCacheResources, prometheus.GaugeValue, float64(c.ResourcesCount), defaultValues...)
ch <- prometheus.MustNewConstMetric(descClusterAPIs, prometheus.GaugeValue, float64(c.APIsCount), defaultValues...)
for _, clusterData := range latestInfo {
info := clusterData.info
name := clusterData.cluster.Name
labels := clusterData.cluster.Labels

defaultValues := []string{info.Server}
ch <- prometheus.MustNewConstMetric(descClusterInfo, prometheus.GaugeValue, 1, append(defaultValues, info.K8SVersion, name)...)
ch <- prometheus.MustNewConstMetric(descClusterCacheResources, prometheus.GaugeValue, float64(info.ResourcesCount), defaultValues...)
ch <- prometheus.MustNewConstMetric(descClusterAPIs, prometheus.GaugeValue, float64(info.APIsCount), defaultValues...)
cacheAgeSeconds := -1
if c.LastCacheSyncTime != nil {
cacheAgeSeconds = int(now.Sub(*c.LastCacheSyncTime).Seconds())
if info.LastCacheSyncTime != nil {
cacheAgeSeconds = int(now.Sub(*info.LastCacheSyncTime).Seconds())
}
ch <- prometheus.MustNewConstMetric(descClusterCacheAgeSeconds, prometheus.GaugeValue, float64(cacheAgeSeconds), defaultValues...)
ch <- prometheus.MustNewConstMetric(descClusterConnectionStatus, prometheus.GaugeValue, boolFloat64(c.SyncError == nil), append(defaultValues, c.K8SVersion)...)
ch <- prometheus.MustNewConstMetric(descClusterConnectionStatus, prometheus.GaugeValue, boolFloat64(info.SyncError == nil), append(defaultValues, info.K8SVersion)...)

if len(c.clusterLabels) > 0 && labels != nil {
labelValues := []string{}
labelValues = append(labelValues, info.Server, name)
for _, desiredLabel := range c.clusterLabels {
value := labels[desiredLabel]
labelValues = append(labelValues, value)
}
ch <- prometheus.MustNewConstMetric(descClusterLabels, prometheus.GaugeValue, 1, labelValues...)
}
}
}
55 changes: 42 additions & 13 deletions controller/metrics/clustercollector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,36 @@ import (
"testing"

gitopsCache "github.com/argoproj/gitops-engine/pkg/cache"
"github.com/stretchr/testify/mock"

dbmocks "github.com/argoproj/argo-cd/v3/util/db/mocks"

"github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
)

func TestMetricClusterConnectivity(t *testing.T) {
db := dbmocks.ArgoDB{}
cluster1 := v1alpha1.Cluster{Name: "cluster1", Server: "server1", Labels: map[string]string{"env": "dev", "team": "team1"}}
cluster2 := v1alpha1.Cluster{Name: "cluster2", Server: "server2", Labels: map[string]string{"env": "staging", "team": "team2"}}
cluster3 := v1alpha1.Cluster{Name: "cluster3", Server: "server3", Labels: map[string]string{"env": "production", "team": "team3"}}
clusterList := &v1alpha1.ClusterList{Items: []v1alpha1.Cluster{cluster1, cluster2, cluster3}}
db.On("ListClusters", mock.Anything).Return(clusterList, nil)

type testCases struct {
testCombination
skip bool
description string
metricLabels []string
clustersInfo []gitopsCache.ClusterInfo
skip bool
description string
metricLabels []string
clusterLabels []string
clustersInfo []gitopsCache.ClusterInfo
}

cases := []testCases{
{
description: "metric will have value 1 if connected with the cluster",
skip: false,
metricLabels: []string{"non-existing"},
description: "metric will have value 1 if connected with the cluster",
skip: false,
metricLabels: []string{"non-existing"},
clusterLabels: []string{"env"},
testCombination: testCombination{
applications: []string{fakeApp},
responseContains: `
Expand All @@ -36,9 +51,10 @@ argocd_cluster_connection_status{k8s_version="1.21",server="server1"} 1
},
},
{
description: "metric will have value 0 if not connected with the cluster",
skip: false,
metricLabels: []string{"non-existing"},
description: "metric will have value 0 if not connected with the cluster",
skip: false,
metricLabels: []string{"non-existing"},
clusterLabels: []string{"env"},
testCombination: testCombination{
applications: []string{fakeApp},
responseContains: `
Expand All @@ -55,16 +71,27 @@ argocd_cluster_connection_status{k8s_version="1.21",server="server1"} 0
},
},
{
description: "will have one metric per cluster",
skip: false,
metricLabels: []string{"non-existing"},
description: "will have one metric per cluster",
skip: false,
metricLabels: []string{"non-existing"},
clusterLabels: []string{"env", "team"},
testCombination: testCombination{
applications: []string{fakeApp},
responseContains: `
# TYPE argocd_cluster_connection_status gauge
argocd_cluster_connection_status{k8s_version="1.21",server="server1"} 1
argocd_cluster_connection_status{k8s_version="1.21",server="server2"} 1
argocd_cluster_connection_status{k8s_version="1.21",server="server3"} 1
# TYPE argocd_cluster_info gauge
argocd_cluster_info{k8s_version="1.21",name="cluster1",server="server1"} 1
argocd_cluster_info{k8s_version="1.21",name="cluster2",server="server2"} 1
argocd_cluster_info{k8s_version="1.21",name="cluster3",server="server3"} 1
# TYPE argocd_cluster_labels gauge
argocd_cluster_labels{label_env="dev",label_team="team1",name="cluster1",server="server1"} 1
argocd_cluster_labels{label_env="staging",label_team="team2",name="cluster2",server="server2"} 1
argocd_cluster_labels{label_env="production",label_team="team3",name="cluster3",server="server3"} 1
`,
},
clustersInfo: []gitopsCache.ClusterInfo{
Expand Down Expand Up @@ -95,7 +122,9 @@ argocd_cluster_connection_status{k8s_version="1.21",server="server3"} 1
FakeAppYAMLs: c.applications,
ExpectedResponse: c.responseContains,
AppLabels: c.metricLabels,
ClusterLabels: c.clusterLabels,
ClustersInfo: c.clustersInfo,
ClusterLister: db.ListClusters,
}
runTest(t, cfg)
}
Expand Down
6 changes: 3 additions & 3 deletions controller/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/argoproj/argo-cd/v3/common"
argoappv1 "github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
applister "github.com/argoproj/argo-cd/v3/pkg/client/listers/application/v1alpha1"
"github.com/argoproj/argo-cd/v3/util/db"
"github.com/argoproj/argo-cd/v3/util/git"
"github.com/argoproj/argo-cd/v3/util/healthz"
metricsutil "github.com/argoproj/argo-cd/v3/util/metrics"
Expand Down Expand Up @@ -222,9 +223,8 @@ func NewMetricsServer(addr string, appLister applister.ApplicationLister, appFil
}, nil
}

func (m *MetricsServer) RegisterClustersInfoSource(ctx context.Context, source HasClustersInfo) {
collector := &clusterCollector{infoSource: source}
go collector.Run(ctx)
func (m *MetricsServer) RegisterClustersInfoSource(ctx context.Context, source HasClustersInfo, db db.ArgoDB, clusterLabels []string) {
collector := NewClusterCollector(ctx, source, db.ListClusters, clusterLabels)
m.registry.MustRegister(collector)
}

Expand Down
Loading

0 comments on commit e4311d8

Please sign in to comment.