Merge pull request #5 from celestiaorg/jose/watchdog-to-torch
feat: merge - watchdog to torch
tty47 authored Nov 16, 2023
2 parents e79df58 + 68e52d3 commit fb5c979
Showing 4 changed files with 282 additions and 1 deletion.
67 changes: 66 additions & 1 deletion README.md
@@ -10,6 +10,11 @@ You can use Torch to manage the nodes connections from a config file and Torch w

Torch uses the Kubernetes API to manage the nodes: it gets their multiaddress information and stores it in a Redis instance. It also provides metrics to expose the node IDs through the `/metrics` endpoint.

Torch automatically detects Load Balancer resources in a Kubernetes cluster and exposes metrics related to these Load Balancers.
The service uses OpenTelemetry to instrument the metrics and Prometheus to expose them.
It uses a watcher on the Kubernetes API server to receive service events, then filters them to include only services of type **LoadBalancer**.
For each LoadBalancer service found, it retrieves the LoadBalancer's public IP and name and generates metrics with custom labels. These metrics are then exposed via a Prometheus endpoint, making them available for monitoring and visualization in Grafana or other monitoring tools.

---

## Workflow
@@ -243,6 +248,66 @@ Torch uses [Redis](https://redis.io/) as a DB, so to use Torch, you need to have

We are using Redis in two different ways:
- Store the Nodes IDs and reuse them.
- As a message broker, where Torch uses Producer & Consumer approach to process data async.
- As a message broker, Torch uses the Producer & Consumer approach to process data async.

---

## Metrics

### MultiAddress

Custom metric to expose the nodes' multiaddresses; a hypothetical sample of the exposed metric is shown after the list:

- `multiaddr`: This metric represents the node's multiaddress and includes the following labels:
  - `service_name`: The service name. In this case, it is set to **torch**.
  - `node_name`: The name of the node.
  - `multiaddress`: The node's multiaddress.
  - `namespace`: The namespace in which Torch is deployed.
  - `value`: The value of the metric. In this example, it is set to 1.
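
A hypothetical example of how this metric might look when scraped from the `/metrics` endpoint (the node name, multiaddress, and namespace values are made up, and the exact rendering depends on the OpenTelemetry Prometheus exporter):

```
multiaddr{service_name="torch",node_name="da-bridge-1",multiaddress="/ip4/10.0.0.15/tcp/2121/p2p/12D3KooWExample",namespace="celestia"} 1
```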

### BlockHeight

Custom metric to expose the first block height of the chain; a hypothetical sample of the exposed metric is shown after the list:

- `block_height_1`: This metric represents the first block height of the chain and includes the following labels:
  - `service_name`: The service name. In this case, it is set to **torch**.
  - `block_height_1`: The ID of the first block generated.
  - `earliest_block_time`: Timestamp when the chain was created.
  - `days_running`: Number of days the chain has been running.
  - `namespace`: The namespace in which Torch is deployed.
  - `value`: The value of the metric. In this example, it is set to 1.
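
A hypothetical example of the exposed metric (all label values are illustrative):

```
block_height_1{service_name="torch",block_height_1="1",earliest_block_time="2023-01-01T00:00:00Z",days_running="320",namespace="celestia"} 1
```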

### Load Balancer

Custom metric to expose the LoadBalancer public IPs; a hypothetical sample of the exposed metric is shown after the list:

- `load_balancer`: This metric represents the LoadBalancer resource and includes the following labels:
  - `service_name`: The service name. In this case, it is set to **torch**.
  - `load_balancer_name`: The name of the LoadBalancer service.
  - `load_balancer_ip`: The IP address of the LoadBalancer.
  - `namespace`: The namespace in which the LoadBalancer is deployed.
  - `value`: The value of the metric. In this example, it is set to 1, but it can be customized to represent different load balancing states.
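
A hypothetical example of the exposed metric (the LoadBalancer name, IP, and namespace values are illustrative):

```
load_balancer{service_name="torch",load_balancer_name="da-bridge-lb",load_balancer_ip="203.0.113.10",namespace="celestia"} 1
```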


---

## Monitoring and Visualization

Torch exposes some custom metrics through the Prometheus endpoint.
You can use Grafana to connect to Prometheus and create custom dashboards to visualize these metrics.

To access the Prometheus and Grafana dashboards and view the metrics, follow these steps:

1. Access the Prometheus dashboard:
   - Open a web browser and navigate to the Prometheus server's URL (e.g., `http://prometheus-server:9090`).
   - In the Prometheus web interface, you can explore and query the metrics collected by the Torch service.

2. Access the Grafana dashboard:
   - Open a web browser and navigate to the Grafana server's URL (e.g., `http://grafana-server:3000`).
   - Log in to Grafana using your credentials.
   - Create a new dashboard or import an existing one to visualize the LoadBalancer metrics from Prometheus.
   - Use the `load_balancer` metric and its labels to filter and display the relevant information (see the example query below).
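
For example, a PromQL query such as the following (the namespace value is illustrative) can back a Grafana panel that lists the LoadBalancer IPs exposed by Torch:

```
load_balancer{service_name="torch", namespace="celestia"}
```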

Customizing dashboards and setting up alerts in Grafana will help you monitor the performance and health of your LoadBalancer resources effectively.

---
26 changes: 26 additions & 0 deletions pkg/http/server.go
@@ -82,6 +82,7 @@ func Run(cfg config.MutualPeersConfig) {

// Check whether Torch has to generate the metrics; these functions are invoked async to continue the execution flow.
go BackgroundGenerateHashMetric(cfg)
go BackgroundGenerateLBMetric()

// Initialize the goroutine to check the nodes in the queue.
log.Info("Initializing queues to process the nodes...")
@@ -129,6 +130,31 @@ func Run(cfg config.MutualPeersConfig) {
log.Info("Server Exited Properly")
}

// BackgroundGenerateLBMetric initializes a goroutine to generate the load_balancer metric.
func BackgroundGenerateLBMetric() {
log.Info("Initializing goroutine to generate the metric: load_balancer ")

// Retrieve the list of Load Balancers
_, err := k8s.RetrieveAndGenerateMetrics()
if err != nil {
log.Error("Failed to update metrics: ", err)
}

// Start watching for changes to the services in a separate goroutine
done := make(chan error)
go k8s.WatchServices(done)

// Handle errors from WatchServices; the loop ends when the channel is closed
for err := range done {
if err != nil {
log.Error("Error in WatchServices: ", err)
}
}
}

// BackgroundGenerateHashMetric checks if the consensusNode field is defined in the config to generate the metric from the Genesis Hash data.
func BackgroundGenerateHashMetric(cfg config.MutualPeersConfig) {
log.Info("BackgroundGenerateHashMetric...")
147 changes: 147 additions & 0 deletions pkg/k8s/services.go
@@ -0,0 +1,147 @@
package k8s

import (
"context"
"errors"
"fmt"

log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

"github.com/celestiaorg/torch/pkg/metrics"
)

// RetrieveAndGenerateMetrics retrieves the list of Load Balancers and generates metrics
func RetrieveAndGenerateMetrics() ([]metrics.LoadBalancer, error) {
log.Info("Retrieving the list of Load Balancers")

// Get the list of services in the namespace
svc, err := ListServices()
if err != nil {
log.Error("Failed to retrieve the services: ", err)
return nil, err
}

// Filter the services to keep only the LoadBalancers
loadBalancers, err := GetLoadBalancers(svc)
if err != nil {
log.Error("Error getting the load balancers: ", err)
return nil, err
}

// Generate the metrics with the LBs
err = metrics.WithMetricsLoadBalancer(loadBalancers)
if err != nil {
log.Error("Failed to update metrics: ", err)
return nil, err
}

return loadBalancers, nil
}

// ListServices retrieves the list of services in a namespace
func ListServices() (*corev1.ServiceList, error) {
// Authentication in cluster - using Service Account, Role, RoleBinding
config, err := rest.InClusterConfig()
if err != nil {
log.Error("ERROR: ", err)
return nil, err
}

// Create the Kubernetes clientSet
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
log.Error("ERROR: ", err)
return nil, err
}

// Get all services in the namespace
services, err := clientSet.CoreV1().Services(GetCurrentNamespace()).List(context.Background(), metav1.ListOptions{})
if err != nil {
log.Error("ERROR: ", err)
return nil, err
}

return services, nil
}

// GetLoadBalancers filters the list of services to include only Load Balancers and returns a list of them
func GetLoadBalancers(svcList *corev1.ServiceList) ([]metrics.LoadBalancer, error) {
var loadBalancers []metrics.LoadBalancer

for _, svc := range svcList.Items {
if svc.Spec.Type == corev1.ServiceTypeLoadBalancer {
for _, ingress := range svc.Status.LoadBalancer.Ingress {
log.Info(fmt.Sprintf("Updating metrics for service: [%s] with IP: [%s]", svc.Name, ingress.IP))

// Create a LoadBalancer struct and append it to the loadBalancers list
loadBalancer := metrics.LoadBalancer{
ServiceName: "torch",
LoadBalancerName: svc.Name,
LoadBalancerIP: ingress.IP,
Namespace: svc.Namespace,
Value: 1, // Set the value of the metric here (e.g., 1)
}
loadBalancers = append(loadBalancers, loadBalancer)
}
}
}

if len(loadBalancers) == 0 {
return nil, errors.New("no Load Balancers found")
}

return loadBalancers, nil
}

// WatchServices watches for changes to the services in the specified namespace and updates the metrics accordingly
func WatchServices(done chan<- error) {
defer close(done)

// Authentication in cluster - using Service Account, Role, RoleBinding
config, err := rest.InClusterConfig()
if err != nil {
log.Error("Failed to get in-cluster config: ", err)
done <- err
return
}

// Create the Kubernetes clientSet
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
log.Error("Failed to create Kubernetes clientSet: ", err)
done <- err
return
}

// Create a service watcher
watcher, err := clientSet.CoreV1().Services(GetCurrentNamespace()).Watch(context.Background(), metav1.ListOptions{})
if err != nil {
log.Error("Failed to create service watcher: ", err)
done <- err
return
}

// Watch for events on the watcher channel
for event := range watcher.ResultChan() {
if service, ok := event.Object.(*corev1.Service); ok {
if service.Spec.Type == corev1.ServiceTypeLoadBalancer {
loadBalancers, err := GetLoadBalancers(&corev1.ServiceList{Items: []corev1.Service{*service}})
if err != nil {
log.Error("Failed to get the load balancers metrics: %v", err)
done <- err
return
}

if err := metrics.WithMetricsLoadBalancer(loadBalancers); err != nil {
log.Error("Failed to update metrics with load balancers: ", err)
done <- err
return
}
}
}
}
}
43 changes: 43 additions & 0 deletions pkg/metrics/metrics.go
@@ -121,3 +121,46 @@ func calculateDaysDifference(inputTimeString string) (int, error) {

return daysDifference, nil
}

// LoadBalancer represents the information for a load balancer.
type LoadBalancer struct {
ServiceName string // ServiceName Name of the service associated with the load balancer.
LoadBalancerName string // LoadBalancerName Name of the load balancer.
LoadBalancerIP string // LoadBalancerIP IP address of the load balancer.
Namespace string // Namespace where the service is deployed.
Value float64 // Value to be observed for the load balancer.
}

// WithMetricsLoadBalancer creates a callback function to observe metrics for multiple load balancers.
func WithMetricsLoadBalancer(loadBalancers []LoadBalancer) error {
// Create a Float64ObservableGauge named "load_balancer" with a description for the metric.
loadBalancersGauge, err := meter.Float64ObservableGauge(
"load_balancer",
metric.WithDescription("Torch - Load Balancers"),
)
if err != nil {
log.Error("Failed to create the load_balancer gauge: ", err)
return err
}

// Define the callback function that will be called periodically to observe metrics.
callback := func(ctx context.Context, observer metric.Observer) error {
for _, lb := range loadBalancers {
// Create labels with attributes for each load balancer.
labels := metric.WithAttributes(
attribute.String("service_name", lb.ServiceName),
attribute.String("load_balancer_name", lb.LoadBalancerName),
attribute.String("load_balancer_ip", lb.LoadBalancerIP),
attribute.String("namespace", lb.Namespace),
)
// Observe the float64 value for the current load balancer with the associated labels.
observer.ObserveFloat64(loadBalancersGauge, lb.Value, labels)
}

return nil
}

// Register the callback with the meter and the Float64ObservableGauge.
_, err = meter.RegisterCallback(callback, loadBalancersGauge)
return err
}
