Skip to content

Commit

Permalink
feat(torch): update from main
Browse files Browse the repository at this point in the history
Signed-off-by: Jose Ramon Mañes <[email protected]>
  • Loading branch information
tty47 committed Nov 14, 2023
2 parents 70ea48d + e79df58 commit 81ee66c
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 31 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Torch uses the Kubernetes API to manage the nodes, it gets their multi addresses

Torch automatically detects Load Balancer resources in a Kubernetes cluster and exposes metrics related to these Load Balancers.
The service uses OpenTelemetry to instrument the metrics and Prometheus to expose them.
It uses the Kubernetes API server with a watcher to receive events from it. Then filters the list to include only services of type **LoadBalancer**.
It uses the Kubernetes API server with a watcher to receive events from it. Then filters the list to include only services of type **LoadBalancer**.
For each LoadBalancer service found, it retrieves the LoadBalancer public IP and name and generates metrics with custom labels. These metrics are then exposed via a Prometheus endpoint, making them available for monitoring and visualization in Grafana or other monitoring tools.

---
Expand Down Expand Up @@ -290,7 +290,7 @@ Custom metrics to expose the LoadBalancer public IPs:


---

## Monitoring and Visualization

Torch exposes some custom metrics through the Prometheus endpoint.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ require (
go.opentelemetry.io/otel/trace v1.19.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/oauth2 v0.8.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/term v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
122 changes: 93 additions & 29 deletions pkg/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package handlers

import (
"context"
"golang.org/x/sync/errgroup"
"net/http"
"os"
"os/signal"
Expand All @@ -20,7 +21,8 @@ import (
)

const (
retryInterval = 10 * time.Second // retryInterval Retry interval in seconds to generate the consensus metric.
retryInterval = 10 * time.Second // retryInterval Retry interval in seconds to generate the consensus metric.
hashMetricGenTimeout = 5 * time.Minute // hashMetricGenTimeout specify the max time to retry to generate the metric.
)

// GetHttpPort GetPort retrieves the namespace where the service will be deployed
Expand Down Expand Up @@ -78,9 +80,9 @@ func Run(cfg config.MutualPeersConfig) {
log.Info("Server Started...")
log.Info("Listening on port: " + httpPort)

go BackgroundGenerateLBMetric()
// check if Torch has to generate the metric or not.
BackgroundGenerateHashMetric(cfg)
BackgroundGenerateLBMetric()
go BackgroundGenerateHashMetric(cfg)

// Initialize the goroutine to check the nodes in the queue.
log.Info("Initializing queues to process the nodes...")
Expand Down Expand Up @@ -128,31 +130,6 @@ func Run(cfg config.MutualPeersConfig) {
log.Info("Server Exited Properly")
}

// BackgroundGenerateHashMetric check if we have defined the consensus in the config, if so, it creates a goroutine
// to generate the metric
func BackgroundGenerateHashMetric(cfg config.MutualPeersConfig) {
// Check if the config has the consensusNode field defined to generate the metric from the Genesis Hash data.
if cfg.MutualPeers[0].ConsensusNode != "" {
log.Info("Initializing goroutine to generate the metric: block_height_1")
// Initialise the goroutine to generate the metric in the background, only if we specify the node in the config.
go func() {
log.Info("Consensus node defined to get the first block")
for {
err := GenerateHashMetrics(cfg)
// check if err is nil, if so, that means that Torch was able to generate the metric.
if err == nil {
log.Info("Metric generated for the first block...")
// The metric was successfully generated, stop the retries.
break
}

// Wait for the retry interval before the next execution
time.Sleep(retryInterval)
}
}()
}
}

// BackgroundGenerateLBMetric initializes a goroutine to generate the load_balancer metric.
func BackgroundGenerateLBMetric() {
log.Info("Initializing goroutine to generate the metric: load_balancer ")
Expand All @@ -178,9 +155,96 @@ func BackgroundGenerateLBMetric() {
}
}

// BackgroundGenerateHashMetric checks if the consensusNode field is defined in the config to generate the metric from the Genesis Hash data.
func BackgroundGenerateHashMetric(cfg config.MutualPeersConfig) {
log.Info("BackgroundGenerateHashMetric...")

if cfg.MutualPeers[0].ConsensusNode != "" {
log.Info("Initializing goroutine to generate the metric: hash ")

// Create an errgroup with a context
eg, ctx := errgroup.WithContext(context.Background())

// Run the WatchHashMetric function in a separate goroutine
eg.Go(func() error {
log.Info("Consensus node defined to get the first block")
return WatchHashMetric(cfg, ctx)
})

// Wait for all goroutines to finish
if err := eg.Wait(); err != nil {
log.Error("Error in BackgroundGenerateHashMetric: ", err)
// Handle the error as needed
}
}
}

// WatchHashMetric watches for changes to generate hash metrics in the specified interval.
func WatchHashMetric(cfg config.MutualPeersConfig, ctx context.Context) error {
// Create a new context derived from the input context with a timeout
ctx, cancel := context.WithTimeout(ctx, hashMetricGenTimeout)
defer cancel()

// Create an errgroup with the context
eg, ctx := errgroup.WithContext(ctx)

// Run the WatchHashMetric function in a separate goroutine
eg.Go(func() error {
return watchMetricsWithRetry(cfg, ctx)
})

// Wait for all goroutines to finish
return eg.Wait()
}

// watchMetricsWithRetry is a helper function for WatchHashMetric that encapsulates the retry logic.
func watchMetricsWithRetry(cfg config.MutualPeersConfig, ctx context.Context) error {
// Continue generating metrics with retries
for {
select {
case <-ctx.Done():
// Context canceled, stop the process
log.Info("Context canceled, stopping WatchHashMetric.")
return ctx.Err()
default:
err := GenerateHashMetrics(cfg)
// Check if err is nil, if so, Torch was able to generate the metric.
if err == nil {
log.Info("Metric generated for the first block, let's stop the process successfully...")
// The metric was successfully generated, stop the retries.
return nil
}

// Log the error
log.Error("Error generating hash metrics: ", err)

// Wait for the retry interval before the next execution using a timer
if err := waitForRetry(ctx); err != nil {
return err
}
}
}
}

// waitForRetry is a helper function to wait for the retry interval or stop if the context is canceled.
func waitForRetry(ctx context.Context) error {
timer := time.NewTimer(retryInterval)
defer timer.Stop()

select {
case <-ctx.Done():
// Context canceled, stop the process
log.Info("Context canceled, stopping WatchHashMetric.")
return ctx.Err()
case <-timer.C:
// Continue to the next iteration
return nil
}
}

// GenerateHashMetrics generates the metric by getting the first block and calculating the days.
func GenerateHashMetrics(cfg config.MutualPeersConfig) error {
log.Info("Generating the metric for the first block generated...")
log.Info("Trying to generate the metric for the first block generated...")

// Get the genesisHash
blockHash, earliestBlockTime, err := nodes.GenesisHash(cfg.MutualPeers[0].ConsensusNode)
Expand Down
6 changes: 6 additions & 0 deletions pkg/k8s/statefulsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ func WatchStatefulSets() error {
for event := range watcher.ResultChan() {
// Check if the event object is of type *v1.StatefulSet
if statefulSet, ok := event.Object.(*v1.StatefulSet); ok {
if !ok {
// If it's not a StatefulSet, log a warning and continue with the next event.
log.Warn("Received an event that is not a StatefulSet. Skipping this resource...")
continue
}

// Check if the StatefulSet is valid based on the conditions
if isStatefulSetValid(statefulSet) {
// Perform necessary actions, such as adding the node to the Redis queue
Expand Down

0 comments on commit 81ee66c

Please sign in to comment.