Skip to content

Commit

Permalink
Merge pull request #4 from celestiaorg/jose/fix-cons-metric
Browse files Browse the repository at this point in the history
fix: consensus metric
  • Loading branch information
tty47 authored Nov 13, 2023
2 parents 51f8e1d + 78bbf75 commit e79df58
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 37 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
go.opentelemetry.io/otel/exporters/prometheus v0.41.0
go.opentelemetry.io/otel/metric v1.19.0
go.opentelemetry.io/otel/sdk/metric v0.41.0
golang.org/x/sync v0.5.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.27.4
k8s.io/apimachinery v0.27.4
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
137 changes: 116 additions & 21 deletions pkg/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/gorilla/mux"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"

"github.com/celestiaorg/torch/config"
"github.com/celestiaorg/torch/pkg/db/redis"
Expand All @@ -19,6 +20,11 @@ import (
"github.com/celestiaorg/torch/pkg/nodes"
)

const (
	retryInterval        = 10 * time.Second // retryInterval is the pause between consecutive attempts to generate the consensus metric.
	hashMetricGenTimeout = 5 * time.Minute  // hashMetricGenTimeout is the maximum total time to keep retrying metric generation.
)

// GetHttpPort retrieves the HTTP port the server will listen on.
func GetHttpPort() string {
port := os.Getenv("HTTP_PORT")
Expand Down Expand Up @@ -57,13 +63,6 @@ func Run(cfg config.MutualPeersConfig) {
return
}

// generate the metric from the Genesis Hash data
notOk := GenerateHashMetrics(cfg, err)
if notOk {
log.Error("Error registering metric block_height_1")
return
}

// Create the server
server := &http.Server{
Addr: ":" + httpPort,
Expand All @@ -81,6 +80,9 @@ func Run(cfg config.MutualPeersConfig) {
log.Info("Server Started...")
log.Info("Listening on port: " + httpPort)

// Check whether Torch has to generate the metric; invoked asynchronously so the execution flow continues.
go BackgroundGenerateHashMetric(cfg)

// Initialize the goroutine to check the nodes in the queue.
log.Info("Initializing queues to process the nodes...")
// Create a new context without timeout as we want to keep this goroutine running forever, if we specify a timeout,
Expand Down Expand Up @@ -127,23 +129,116 @@ func Run(cfg config.MutualPeersConfig) {
log.Info("Server Exited Properly")
}

func GenerateHashMetrics(cfg config.MutualPeersConfig, err error) bool {
// Get the genesisHash
// check if the config has the consensusNode field defined
// BackgroundGenerateHashMetric checks if the consensusNode field is defined in the config to generate the metric from the Genesis Hash data.
func BackgroundGenerateHashMetric(cfg config.MutualPeersConfig) {
log.Info("BackgroundGenerateHashMetric...")

if cfg.MutualPeers[0].ConsensusNode != "" {
blockHash, earliestBlockTime := nodes.GenesisHash(cfg)
err = metrics.WithMetricsBlockHeight(
blockHash,
earliestBlockTime,
cfg.MutualPeers[0].ConsensusNode,
os.Getenv("POD_NAMESPACE"),
)
if err != nil {
log.Errorf("Error registering metric block_height_1: %v", err)
return true
log.Info("Initializing goroutine to generate the metric: hash ")

// Create an errgroup with a context
eg, ctx := errgroup.WithContext(context.Background())

// Run the WatchHashMetric function in a separate goroutine
eg.Go(func() error {
log.Info("Consensus node defined to get the first block")
return WatchHashMetric(cfg, ctx)
})

// Wait for all goroutines to finish
if err := eg.Wait(); err != nil {
log.Error("Error in BackgroundGenerateHashMetric: ", err)
// Handle the error as needed
}
}
}

// WatchHashMetric keeps trying to generate the hash metric at the configured
// interval until it succeeds, the parent context is canceled, or
// hashMetricGenTimeout elapses.
func WatchHashMetric(cfg config.MutualPeersConfig, ctx context.Context) error {
	// Bound the entire retry process by hashMetricGenTimeout.
	timeoutCtx, cancel := context.WithTimeout(ctx, hashMetricGenTimeout)
	defer cancel()

	// Run the retry loop under an errgroup so its error (or the timeout)
	// is surfaced to the caller.
	group, groupCtx := errgroup.WithContext(timeoutCtx)
	group.Go(func() error {
		return watchMetricsWithRetry(cfg, groupCtx)
	})

	return group.Wait()
}

// watchMetricsWithRetry is a helper for WatchHashMetric that encapsulates the
// retry logic: it attempts to generate the hash metric, and on failure waits
// retryInterval before trying again, until it succeeds or ctx is done.
// Returns nil on success, or the context's error on cancellation/timeout.
func watchMetricsWithRetry(cfg config.MutualPeersConfig, ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			// Context canceled or timed out, stop the process.
			log.Info("Context canceled, stopping WatchHashMetric.")
			return ctx.Err()
		default:
			err := GenerateHashMetrics(cfg)
			if err == nil {
				// The metric was successfully generated, stop the retries.
				log.Info("Metric generated for the first block, let's stop the process successfully...")
				return nil
			}

			log.Error("Error generating hash metrics: ", err)

			// Sleep for retryInterval (or bail out early on cancellation)
			// before the next attempt.
			if err := waitForRetry(ctx); err != nil {
				return err
			}
		}
	}
}

// waitForRetry blocks for retryInterval, returning nil when the interval has
// elapsed, or ctx.Err() immediately if the context is canceled first.
func waitForRetry(ctx context.Context) error {
	t := time.NewTimer(retryInterval)
	defer t.Stop()

	select {
	case <-t.C:
		// Interval elapsed; the caller may retry.
		return nil
	case <-ctx.Done():
		// Context canceled, stop the process
		log.Info("Context canceled, stopping WatchHashMetric.")
		return ctx.Err()
	}
}

// GenerateHashMetrics generates the block_height_1 metric: it fetches the
// first block (genesis) from the consensus node and registers the metric
// with the block hash, the earliest block time, and the days the chain has
// been live. Returns a non-nil error if the fetch or registration fails, so
// the caller can retry.
func GenerateHashMetrics(cfg config.MutualPeersConfig) error {
	log.Info("Trying to generate the metric for the first block generated...")

	// Get the genesisHash from the consensus node's RPC API.
	// NOTE(review): assumes cfg.MutualPeers is non-empty — callers guard on
	// ConsensusNode being set, confirm the slice itself is always populated.
	blockHash, earliestBlockTime, err := nodes.GenesisHash(cfg.MutualPeers[0].ConsensusNode)
	if err != nil {
		return err
	}

	// Register the metric with the genesis data; WithMetricsBlockHeight also
	// derives the days-running value from earliestBlockTime.
	err = metrics.WithMetricsBlockHeight(
		blockHash,
		earliestBlockTime,
		cfg.MutualPeers[0].ConsensusNode,
		os.Getenv("POD_NAMESPACE"),
	)
	if err != nil {
		log.Error("Error registering metric block_height_1: ", err)
		return err
	}

	return nil
}

// RegisterMetrics generates and registers the metrics for all nodes in case they already exist in the DB.
Expand Down
18 changes: 13 additions & 5 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,22 @@ func WithMetricsBlockHeight(blockHeight, earliestBlockTime, serviceName, namespa
log.Fatalf(err.Error())
return err
}

// Calculate the days that the chain is live.
daysRunning, err := calculateDaysDifference(earliestBlockTime)
if err != nil {
log.Error("ERROR: ", err)
return err
}

callback := func(ctx context.Context, observer metric.Observer) error {
// Define the callback function that will be called periodically to observe metrics.
// Create labels with attributes for each block_height_1.
labels := metric.WithAttributes(
attribute.String("service_name", serviceName),
attribute.String("block_height_1", blockHeight),
attribute.String("earliest_block_time", earliestBlockTime),
attribute.Int("days_running", CalculateDaysDifference(earliestBlockTime)),
attribute.Int("days_running", daysRunning),
attribute.String("namespace", namespace),
)
// Observe the float64 value for the current block_height_1 with the associated labels.
Expand All @@ -98,18 +106,18 @@ func WithMetricsBlockHeight(blockHeight, earliestBlockTime, serviceName, namespa
return err
}

// calculateDaysDifference returns the number of whole days elapsed since the
// timestamp in inputTimeString (layout "2006-01-02T15:04:05.999999999Z", the
// format the consensus node reports for block times). On a parse failure it
// returns -1 and the parse error; the caller is responsible for logging it.
func calculateDaysDifference(inputTimeString string) (int, error) {
	layout := "2006-01-02T15:04:05.999999999Z"
	inputTime, err := time.Parse(layout, inputTimeString)
	if err != nil {
		// Propagate the parse error; the visible caller already logs it,
		// so don't log-and-return here.
		return -1, err
	}

	// Whole days between the chain's first block and now.
	daysDifference := int(time.Since(inputTime).Hours() / 24)

	return daysDifference, nil
}
21 changes: 10 additions & 11 deletions pkg/nodes/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
var (
consContainerSetupName = "consensus-setup" // consContainerSetupName initContainer that we use to configure the nodes.
consContainerName = "consensus" // consContainerName container name which the pod runs.
namespace = k8s.GetCurrentNamespace() // ns namespace of the node.
namespace = k8s.GetCurrentNamespace() // namespace of the node.
)

// SetConsNodeDefault sets all the default values in case they are empty
Expand All @@ -34,26 +34,25 @@ func SetConsNodeDefault(peer config.Peer) config.Peer {

// GenesisHash connects to the node specified in: config.MutualPeersConfig.ConsensusNode
// makes a request to the API and gets the info about the genesis and return it
func GenesisHash(pods config.MutualPeersConfig) (string, string) {
consensusNode := pods.MutualPeers[0].ConsensusNode
func GenesisHash(consensusNode string) (string, string, error) {
url := fmt.Sprintf("http://%s:26657/block?height=1", consensusNode)

response, err := http.Get(url)
if err != nil {
log.Error("Error making GET request:", err)
return "", ""
log.Error("Error making the request to the node [", consensusNode, "] - ", err)
return "", "", err
}
defer response.Body.Close()

if response.StatusCode != http.StatusOK {
log.Error("Non-OK response:", response.Status)
return "", ""
return "", "", err
}

bodyBytes, err := ioutil.ReadAll(response.Body)
if err != nil {
log.Error("Error reading response body:", err)
return "", ""
return "", "", err
}

bodyString := string(bodyBytes)
Expand All @@ -64,26 +63,26 @@ func GenesisHash(pods config.MutualPeersConfig) (string, string) {
err = json.Unmarshal([]byte(bodyString), &jsonResponse)
if err != nil {
log.Error("Error parsing JSON:", err)
return "", ""
return "", "", err
}

// Access and print the .block_id.hash field
blockIDHash, ok := jsonResponse["result"].(map[string]interface{})["block_id"].(map[string]interface{})["hash"].(string)
if !ok {
log.Error("Unable to access .block_id.hash")
return "", ""
return "", "", err
}

// Access and print the .block.header.time field
blockTime, ok := jsonResponse["result"].(map[string]interface{})["block"].(map[string]interface{})["header"].(map[string]interface{})["time"].(string)
if !ok {
log.Error("Unable to access .block.header.time")
return "", ""
return "", "", err
}

log.Info("Block ID Hash: ", blockIDHash)
log.Info("Block Time: ", blockTime)
log.Info("Full output: ", bodyString)

return blockIDHash, blockTime
return blockIDHash, blockTime, nil
}

0 comments on commit e79df58

Please sign in to comment.