fix: consensus metric #4

Merged
merged 11 commits on Nov 13, 2023
1 change: 1 addition & 0 deletions go.mod
@@ -12,6 +12,7 @@ require (
go.opentelemetry.io/otel/exporters/prometheus v0.41.0
go.opentelemetry.io/otel/metric v1.19.0
go.opentelemetry.io/otel/sdk/metric v0.41.0
golang.org/x/sync v0.5.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.27.4
k8s.io/apimachinery v0.27.4
2 changes: 2 additions & 0 deletions go.sum
@@ -190,6 +190,8 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
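The only dependency change is golang.org/x/sync, pulled in for its errgroup package. As a hedged aside (not part of the PR), a minimal sketch of the general errgroup pattern the new code relies on:

package main

import (
    "context"
    "fmt"

    "golang.org/x/sync/errgroup"
)

func main() {
    // WithContext returns a group and a context that is canceled when any
    // goroutine in the group returns a non-nil error.
    eg, ctx := errgroup.WithContext(context.Background())

    for i := 0; i < 3; i++ {
        i := i // capture the loop variable for the closure (pre-Go 1.22 semantics)
        eg.Go(func() error {
            select {
            case <-ctx.Done():
                return ctx.Err() // another task failed; stop early
            default:
                fmt.Println("task", i, "finished")
                return nil
            }
        })
    }

    // Wait blocks until all goroutines return and yields the first error, if any.
    if err := eg.Wait(); err != nil {
        fmt.Println("error:", err)
    }
}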
137 changes: 116 additions & 21 deletions pkg/http/server.go
@@ -11,6 +11,7 @@ import (

"github.com/gorilla/mux"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"

"github.com/celestiaorg/torch/config"
"github.com/celestiaorg/torch/pkg/db/redis"
Expand All @@ -19,6 +20,11 @@ import (
"github.com/celestiaorg/torch/pkg/nodes"
)

const (
retryInterval = 10 * time.Second // retryInterval is the delay between attempts to generate the consensus metric.
hashMetricGenTimeout = 5 * time.Minute // hashMetricGenTimeout is the maximum time to keep retrying metric generation.
)

// GetHttpPort retrieves the HTTP port that the service will listen on
func GetHttpPort() string {
port := os.Getenv("HTTP_PORT")
@@ -57,13 +63,6 @@ func Run(cfg config.MutualPeersConfig) {
return
}

// generate the metric from the Genesis Hash data
notOk := GenerateHashMetrics(cfg, err)
if notOk {
log.Error("Error registering metric block_height_1")
return
}

// Create the server
server := &http.Server{
Addr: ":" + httpPort,
@@ -81,6 +80,9 @@
log.Info("Server Started...")
log.Info("Listening on port: " + httpPort)

// Check whether Torch has to generate the metric; invoked asynchronously so the main execution flow can continue.
go BackgroundGenerateHashMetric(cfg)

// Initialize the goroutine to check the nodes in the queue.
log.Info("Initializing queues to process the nodes...")
// Create a new context without timeout as we want to keep this goroutine running forever, if we specify a timeout,
@@ -127,23 +129,116 @@ func Run(cfg config.MutualPeersConfig) {
log.Info("Server Exited Properly")
}

func GenerateHashMetrics(cfg config.MutualPeersConfig, err error) bool {
// Get the genesisHash
// check if the config has the consensusNode field defined
// BackgroundGenerateHashMetric checks whether the consensusNode field is defined in the config and, if so, generates the metric from the Genesis Hash data.
func BackgroundGenerateHashMetric(cfg config.MutualPeersConfig) {
log.Info("BackgroundGenerateHashMetric...")

if cfg.MutualPeers[0].ConsensusNode != "" {
blockHash, earliestBlockTime := nodes.GenesisHash(cfg)
err = metrics.WithMetricsBlockHeight(
blockHash,
earliestBlockTime,
cfg.MutualPeers[0].ConsensusNode,
os.Getenv("POD_NAMESPACE"),
)
if err != nil {
log.Errorf("Error registering metric block_height_1: %v", err)
return true
log.Info("Initializing goroutine to generate the metric: hash ")

// Create an errgroup with a context
eg, ctx := errgroup.WithContext(context.Background())

// Run the WatchHashMetric function in a separate goroutine
eg.Go(func() error {
log.Info("Consensus node defined to get the first block")
return WatchHashMetric(cfg, ctx)
})

// Wait for all goroutines to finish
if err := eg.Wait(); err != nil {
log.Error("Error in BackgroundGenerateHashMetric: ", err)
// Handle the error as needed
}
}
}

// WatchHashMetric retries hash-metric generation at the configured interval until it succeeds or the timeout expires.
func WatchHashMetric(cfg config.MutualPeersConfig, ctx context.Context) error {
// Create a new context derived from the input context with a timeout
ctx, cancel := context.WithTimeout(ctx, hashMetricGenTimeout)
defer cancel()

// Create an errgroup with the context
eg, ctx := errgroup.WithContext(ctx)

// Run watchMetricsWithRetry in a separate goroutine
eg.Go(func() error {
return watchMetricsWithRetry(cfg, ctx)
})

// Wait for all goroutines to finish
return eg.Wait()
}

// watchMetricsWithRetry is a helper function for WatchHashMetric that encapsulates the retry logic.
func watchMetricsWithRetry(cfg config.MutualPeersConfig, ctx context.Context) error {
// Continue generating metrics with retries
for {
select {
case <-ctx.Done():
// Context canceled, stop the process
log.Info("Context canceled, stopping WatchHashMetric.")
return ctx.Err()
default:
err := GenerateHashMetrics(cfg)
// If err is nil, Torch was able to generate the metric.
if err == nil {
log.Info("Metric generated for the first block; stopping the retry process...")
// The metric was successfully generated, stop the retries.
return nil
}

// Log the error
log.Error("Error generating hash metrics: ", err)

// Wait for the retry interval before the next execution using a timer
if err := waitForRetry(ctx); err != nil {
return err
}
}
}
return false
}

// waitForRetry is a helper function to wait for the retry interval or stop if the context is canceled.
func waitForRetry(ctx context.Context) error {
timer := time.NewTimer(retryInterval)
defer timer.Stop()

select {
case <-ctx.Done():
// Context canceled, stop the process
log.Info("Context canceled, stopping WatchHashMetric.")
return ctx.Err()
case <-timer.C:
// Continue to the next iteration
return nil
}
}

// GenerateHashMetrics generates the metric by getting the first block and calculating the days.
func GenerateHashMetrics(cfg config.MutualPeersConfig) error {
log.Info("Trying to generate the metric for the first block generated...")

// Get the genesisHash
blockHash, earliestBlockTime, err := nodes.GenesisHash(cfg.MutualPeers[0].ConsensusNode)
if err != nil {
return err
}

// Register the metric using the genesis block data.
err = metrics.WithMetricsBlockHeight(
blockHash,
earliestBlockTime,
cfg.MutualPeers[0].ConsensusNode,
os.Getenv("POD_NAMESPACE"),
)
if err != nil {
log.Error("Error registering metric block_height_1: ", err)
return err
}

return nil
}

// RegisterMetrics generates and registers the metrics for all nodes that already exist in the DB.
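Taken together, WatchHashMetric, watchMetricsWithRetry, and waitForRetry implement a retry-until-timeout loop. A self-contained sketch of the same pattern, assuming a hypothetical doWork callback standing in for GenerateHashMetrics and shortened demo intervals:

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// retryUntil calls work every interval until it succeeds or ctx is done.
// This mirrors the PR's watchMetricsWithRetry/waitForRetry structure.
func retryUntil(ctx context.Context, interval time.Duration, work func() error) error {
    for {
        if err := work(); err == nil {
            return nil // success: stop retrying
        }
        timer := time.NewTimer(interval)
        select {
        case <-ctx.Done():
            timer.Stop() // release the timer before bailing out
            return ctx.Err()
        case <-timer.C:
            // interval elapsed; fall through to the next attempt
        }
    }
}

func main() {
    // Demo values; the PR uses retryInterval = 10s and hashMetricGenTimeout = 5m.
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    attempts := 0
    err := retryUntil(ctx, 100*time.Millisecond, func() error {
        attempts++
        if attempts < 3 {
            return errors.New("node not ready") // hypothetical transient failure
        }
        return nil
    })
    fmt.Println("attempts:", attempts, "err:", err)
}

Using a timer plus select, rather than time.Sleep, is what lets the loop abort immediately when the context is canceled instead of finishing the sleep first.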
18 changes: 13 additions & 5 deletions pkg/metrics/metrics.go
@@ -77,14 +77,22 @@ func WithMetricsBlockHeight(blockHeight, earliestBlockTime, serviceName, namespa
log.Fatalf(err.Error())
return err
}

// Calculate the number of days the chain has been live.
daysRunning, err := calculateDaysDifference(earliestBlockTime)
if err != nil {
log.Error("ERROR: ", err)
return err
}

callback := func(ctx context.Context, observer metric.Observer) error {
// Define the callback function that will be called periodically to observe metrics.
// Create labels with attributes for each block_height_1.
labels := metric.WithAttributes(
attribute.String("service_name", serviceName),
attribute.String("block_height_1", blockHeight),
attribute.String("earliest_block_time", earliestBlockTime),
attribute.Int("days_running", CalculateDaysDifference(earliestBlockTime)),
attribute.Int("days_running", daysRunning),
attribute.String("namespace", namespace),
)
// Observe the float64 value for the current block_height_1 with the associated labels.
@@ -98,18 +106,18 @@ return err
return err
}

// CalculateDaysDifference based on the date received, returns the number of days since this day.
func CalculateDaysDifference(inputTimeString string) int {
// calculateDaysDifference returns the number of days elapsed since the given date.
func calculateDaysDifference(inputTimeString string) (int, error) {
layout := "2006-01-02T15:04:05.999999999Z"
inputTime, err := time.Parse(layout, inputTimeString)
if err != nil {
log.Error("Error parsing time: [", inputTimeString, "]", err)
return -1
return -1, err
}

currentTime := time.Now()
timeDifference := currentTime.Sub(inputTime)
daysDifference := int(timeDifference.Hours() / 24)

return daysDifference
return daysDifference, nil
}
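For reference, the refactored helper's parse-and-diff computation in isolation (not from the PR; the sample timestamp is made up). The layout string is effectively an RFC 3339 UTC timestamp with optional nanoseconds:

package main

import (
    "fmt"
    "time"
)

// daysSince parses a UTC timestamp like the node's earliest block time and
// returns the whole days elapsed since then.
func daysSince(inputTimeString string) (int, error) {
    layout := "2006-01-02T15:04:05.999999999Z"
    inputTime, err := time.Parse(layout, inputTimeString)
    if err != nil {
        return -1, err
    }
    return int(time.Since(inputTime).Hours() / 24), nil
}

func main() {
    // Hypothetical genesis time in the same format the node returns.
    days, err := daysSince("2023-01-01T00:00:00.000000000Z")
    if err != nil {
        fmt.Println("parse error:", err)
        return
    }
    fmt.Println("days running:", days)
}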
21 changes: 10 additions & 11 deletions pkg/nodes/consensus.go
@@ -15,7 +15,7 @@ import (
var (
consContainerSetupName = "consensus-setup" // consContainerSetupName initContainer that we use to configure the nodes.
consContainerName = "consensus" // consContainerName container name which the pod runs.
namespace = k8s.GetCurrentNamespace() // ns namespace of the node.
namespace = k8s.GetCurrentNamespace() // namespace of the node.
)

// SetConsNodeDefault sets all the default values in case they are empty
@@ -34,26 +34,25 @@ func SetConsNodeDefault(peer config.Peer) config.Peer {

// GenesisHash connects to the consensus node (config.MutualPeersConfig.ConsensusNode),
// requests the first block from the API, and returns its genesis hash and block time
func GenesisHash(pods config.MutualPeersConfig) (string, string) {
consensusNode := pods.MutualPeers[0].ConsensusNode
func GenesisHash(consensusNode string) (string, string, error) {
url := fmt.Sprintf("http://%s:26657/block?height=1", consensusNode)

response, err := http.Get(url)
if err != nil {
log.Error("Error making GET request:", err)
return "", ""
log.Error("Error making the request to the node [", consensusNode, "] - ", err)
return "", "", err
}
defer response.Body.Close()

if response.StatusCode != http.StatusOK {
log.Error("Non-OK response:", response.Status)
return "", ""
return "", "", err
}

bodyBytes, err := ioutil.ReadAll(response.Body)
if err != nil {
log.Error("Error reading response body:", err)
return "", ""
return "", "", err
}

bodyString := string(bodyBytes)
@@ -64,26 +63,26 @@ func GenesisHash(pods config.MutualPeersConfig) (string, string) {
err = json.Unmarshal([]byte(bodyString), &jsonResponse)
if err != nil {
log.Error("Error parsing JSON:", err)
return "", ""
return "", "", err
}

// Access and print the .block_id.hash field
blockIDHash, ok := jsonResponse["result"].(map[string]interface{})["block_id"].(map[string]interface{})["hash"].(string)
if !ok {
log.Error("Unable to access .block_id.hash")
return "", ""
return "", "", err
}

// Access and print the .block.header.time field
blockTime, ok := jsonResponse["result"].(map[string]interface{})["block"].(map[string]interface{})["header"].(map[string]interface{})["time"].(string)
if !ok {
log.Error("Unable to access .block.header.time")
return "", ""
return "", "", err
}

log.Info("Block ID Hash: ", blockIDHash)
log.Info("Block Time: ", blockTime)
log.Info("Full output: ", bodyString)

return blockIDHash, blockTime
return blockIDHash, blockTime, nil
}
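GenesisHash walks the response through nested map[string]interface{} type assertions. A hedged alternative sketch, with struct fields inferred from the accessed paths (.result.block_id.hash and .result.block.header.time) rather than from a documented schema, would decode into typed structs:

package main

import (
    "encoding/json"
    "fmt"
)

// blockResponse mirrors only the fields GenesisHash reads; the shape is
// inferred from the accessed JSON paths, not from a documented schema.
type blockResponse struct {
    Result struct {
        BlockID struct {
            Hash string `json:"hash"`
        } `json:"block_id"`
        Block struct {
            Header struct {
                Time string `json:"time"`
            } `json:"header"`
        } `json:"block"`
    } `json:"result"`
}

func main() {
    // Trimmed sample payload in the shape the code expects.
    body := []byte(`{"result":{"block_id":{"hash":"ABC123"},"block":{"header":{"time":"2023-01-01T00:00:00Z"}}}}`)

    var resp blockResponse
    if err := json.Unmarshal(body, &resp); err != nil {
        fmt.Println("parse error:", err)
        return
    }
    fmt.Println(resp.Result.BlockID.Hash, resp.Result.Block.Header.Time)
}

With typed decoding, a missing or mistyped field simply yields a zero value that can be checked once, instead of a chain of assertions that each need their own ok guard.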