diff --git a/pkg/k8s/statefulsets.go b/pkg/k8s/statefulsets.go index b556847..2d3ae3b 100644 --- a/pkg/k8s/statefulsets.go +++ b/pkg/k8s/statefulsets.go @@ -13,7 +13,10 @@ import ( "github.com/celestiaorg/torch/pkg/db/redis" ) -const queueK8SNodes = "k8s" +const ( + queueK8SNodes = "k8s" // queueK8SNodes name of the queue. + daNodePrefix = "da" // daNodePrefix name prefix that Torch will use to filter the StatefulSets. +) // WatchStatefulSets watches for changes to the StatefulSets in the specified namespace and updates the metrics accordingly func WatchStatefulSets() error { @@ -43,10 +46,12 @@ func WatchStatefulSets() error { // Watch for events on the watcher channel for event := range watcher.ResultChan() { if statefulSet, ok := event.Object.(*v1.StatefulSet); ok { - //log.Info("StatefulSet containers: ", statefulSet.Spec.Template.Spec.Containers) + if !ok { + log.Warn("Received an event that is not a StatefulSet. Skipping this resource...") + continue + } - // check if the node is DA, if so, send it to the queue to generate the multi address - if strings.HasPrefix(statefulSet.Name, "da") { + if isStatefulSetValid(statefulSet) { err := redis.Producer(statefulSet.Name, queueK8SNodes) if err != nil { log.Error("ERROR adding the node to the queue: ", err) @@ -58,3 +63,11 @@ func WatchStatefulSets() error { return nil } + +// isStatefulSetValid validates the StatefulSet received. +// checks if the StatefulSet name contains the daNodePrefix, and if the StatefulSet is in the "Running" state. +func isStatefulSetValid(statefulSet *v1.StatefulSet) bool { + return strings.HasPrefix(statefulSet.Name, daNodePrefix) && + statefulSet.Status.CurrentReplicas > 0 && + statefulSet.Status.Replicas == statefulSet.Status.ReadyReplicas +}