Skip to content

Commit

Permalink
Merge pull request #3 from celestiaorg/jose/fix-processing-duplications
Browse files Browse the repository at this point in the history
fix(torch): processing duplications
  • Loading branch information
tty47 authored Nov 13, 2023
2 parents 6ae5bd1 + 3c5a220 commit 51f8e1d
Showing 1 changed file with 17 additions and 4 deletions.
21 changes: 17 additions & 4 deletions pkg/k8s/statefulsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ import (
"github.com/celestiaorg/torch/pkg/db/redis"
)

const queueK8SNodes = "k8s"
const (
queueK8SNodes = "k8s" // queueK8SNodes name of the queue.
daNodePrefix = "da" // daNodePrefix name prefix that Torch will use to filter the StatefulSets.
)

// WatchStatefulSets watches for changes to the StatefulSets in the specified namespace and updates the metrics accordingly
func WatchStatefulSets() error {
Expand Down Expand Up @@ -43,10 +46,12 @@ func WatchStatefulSets() error {
// Watch for events on the watcher channel
for event := range watcher.ResultChan() {
if statefulSet, ok := event.Object.(*v1.StatefulSet); ok {
//log.Info("StatefulSet containers: ", statefulSet.Spec.Template.Spec.Containers)
if !ok {
log.Warn("Received an event that is not a StatefulSet. Skipping this resource...")
continue
}

// check if the node is DA, if so, send it to the queue to generate the multi address
if strings.HasPrefix(statefulSet.Name, "da") {
if isStatefulSetValid(statefulSet) {
err := redis.Producer(statefulSet.Name, queueK8SNodes)
if err != nil {
log.Error("ERROR adding the node to the queue: ", err)
Expand All @@ -58,3 +63,11 @@ func WatchStatefulSets() error {

return nil
}

// isStatefulSetValid validates the StatefulSet received.
// checks if the StatefulSet name contains the daNodePrefix, and if the StatefulSet is in the "Running" state.
func isStatefulSetValid(statefulSet *v1.StatefulSet) bool {
return strings.HasPrefix(statefulSet.Name, daNodePrefix) &&
statefulSet.Status.CurrentReplicas > 0 &&
statefulSet.Status.Replicas == statefulSet.Status.ReadyReplicas
}

0 comments on commit 51f8e1d

Please sign in to comment.