From 4398782f21a7734c79f2728216ae1fba70debdc4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jose=20Ramon=20Ma=C3=B1es?= Date: Wed, 8 Nov 2023 11:52:59 +0100 Subject: [PATCH 1/4] fix(torch): processing duplications MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jose Ramon Mañes --- pkg/k8s/statefulsets.go | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/pkg/k8s/statefulsets.go b/pkg/k8s/statefulsets.go index b556847..ec5ce3c 100644 --- a/pkg/k8s/statefulsets.go +++ b/pkg/k8s/statefulsets.go @@ -40,17 +40,28 @@ func WatchStatefulSets() error { return err } + // Variable to keep track of the last processed name + lastProcessedName := "" + // Watch for events on the watcher channel for event := range watcher.ResultChan() { if statefulSet, ok := event.Object.(*v1.StatefulSet); ok { - //log.Info("StatefulSet containers: ", statefulSet.Spec.Template.Spec.Containers) - - // check if the node is DA, if so, send it to the queue to generate the multi address + // Check if the name has the "da" prefix if strings.HasPrefix(statefulSet.Name, "da") { - err := redis.Producer(statefulSet.Name, queueK8SNodes) - if err != nil { - log.Error("ERROR adding the node to the queue: ", err) - return err + // Check if the name is different from the last processed one + if statefulSet.Name != lastProcessedName { + // Process the name and perform necessary actions + err := redis.Producer(statefulSet.Name, queueK8SNodes) + if err != nil { + log.Error("ERROR adding the node to the queue: ", err) + return err + } + + // Update the last processed name to don't process it more than once. + lastProcessedName = statefulSet.Name + } else { + // cleanup the previous processed to add it in future iterations. + lastProcessedName = "" } } } From eddceca7b2f92c22071bd9f51a24fcc8f143db75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jose=20Ramon=20Ma=C3=B1es?= Date: Thu, 9 Nov 2023 12:18:07 +0100 Subject: [PATCH 2/4] fix(torch): refactor extract to func - check if pods are in Running state MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jose Ramon Mañes --- pkg/k8s/statefulsets.go | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/pkg/k8s/statefulsets.go b/pkg/k8s/statefulsets.go index ec5ce3c..9defb87 100644 --- a/pkg/k8s/statefulsets.go +++ b/pkg/k8s/statefulsets.go @@ -13,7 +13,10 @@ import ( "github.com/celestiaorg/torch/pkg/db/redis" ) -const queueK8SNodes = "k8s" +const ( + queueK8SNodes = "k8s" // queueK8SNodes name of the queue. + daNodePrefix = "da" // daNodePrefix name prefix that Torch will use to filter the StatefulSets. +) // WatchStatefulSets watches for changes to the StatefulSets in the specified namespace and updates the metrics accordingly func WatchStatefulSets() error { @@ -40,28 +43,17 @@ func WatchStatefulSets() error { return err } - // Variable to keep track of the last processed name - lastProcessedName := "" - // Watch for events on the watcher channel for event := range watcher.ResultChan() { + // Check if the event object is of type *v1.StatefulSet if statefulSet, ok := event.Object.(*v1.StatefulSet); ok { - // Check if the name has the "da" prefix - if strings.HasPrefix(statefulSet.Name, "da") { - // Check if the name is different from the last processed one - if statefulSet.Name != lastProcessedName { - // Process the name and perform necessary actions - err := redis.Producer(statefulSet.Name, queueK8SNodes) - if err != nil { - log.Error("ERROR adding the node to the queue: ", err) - return err - } - - // Update the last processed name to don't process it more than once. - lastProcessedName = statefulSet.Name - } else { - // cleanup the previous processed to add it in future iterations. - lastProcessedName = "" + // Check if the StatefulSet is valid based on the conditions + if isStatefulSetValid(statefulSet) { + // Perform necessary actions, such as adding the node to the Redis queue + err := redis.Producer(statefulSet.Name, queueK8SNodes) + if err != nil { + log.Error("ERROR adding the node to the queue: ", err) + return err } } } @@ -69,3 +61,11 @@ func WatchStatefulSets() error { return nil } + +// isStatefulSetValid validates the StatefulSet received. +// checks if the StatefulSet name contains the daNodePrefix, and if the StatefulSet is in the "Running" state. +func isStatefulSetValid(statefulSet *v1.StatefulSet) bool { + return strings.HasPrefix(statefulSet.Name, daNodePrefix) && + statefulSet.Status.CurrentReplicas > 0 && + statefulSet.Status.Replicas == statefulSet.Status.ReadyReplicas +} From aa62e48d5124af728df7e482416054316c0836b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jose=20Ramon=20Ma=C3=B1es?= Date: Fri, 10 Nov 2023 10:49:17 +0100 Subject: [PATCH 3/4] fix(torch): check statements before continue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jose Ramon Mañes --- pkg/k8s/statefulsets.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/k8s/statefulsets.go b/pkg/k8s/statefulsets.go index 9defb87..1917955 100644 --- a/pkg/k8s/statefulsets.go +++ b/pkg/k8s/statefulsets.go @@ -47,6 +47,12 @@ func WatchStatefulSets() error { for event := range watcher.ResultChan() { // Check if the event object is of type *v1.StatefulSet if statefulSet, ok := event.Object.(*v1.StatefulSet); ok { + if !ok { + // If it's not a StatefulSet, log a warning and continue with the next event. + log.Warn("Received an event that is not a StatefulSet. Skipping this resource...") + continue + } + // Check if the StatefulSet is valid based on the conditions if isStatefulSetValid(statefulSet) { // Perform necessary actions, such as adding the node to the Redis queue From 3c5a220f3d24d21592a87fa9baa0c1a017e6653c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jose=20Ramon=20Ma=C3=B1es?= Date: Mon, 13 Nov 2023 11:32:57 +0100 Subject: [PATCH 4/4] fix(torch): cleanup not needed comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jose Ramon Mañes --- pkg/k8s/statefulsets.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/k8s/statefulsets.go b/pkg/k8s/statefulsets.go index 1917955..2d3ae3b 100644 --- a/pkg/k8s/statefulsets.go +++ b/pkg/k8s/statefulsets.go @@ -45,17 +45,13 @@ func WatchStatefulSets() error { // Watch for events on the watcher channel for event := range watcher.ResultChan() { - // Check if the event object is of type *v1.StatefulSet if statefulSet, ok := event.Object.(*v1.StatefulSet); ok { if !ok { - // If it's not a StatefulSet, log a warning and continue with the next event. log.Warn("Received an event that is not a StatefulSet. Skipping this resource...") continue } - // Check if the StatefulSet is valid based on the conditions if isStatefulSetValid(statefulSet) { - // Perform necessary actions, such as adding the node to the Redis queue err := redis.Producer(statefulSet.Name, queueK8SNodes) if err != nil { log.Error("ERROR adding the node to the queue: ", err)