From 5ab921e6da83d4b1d699fe101c19242692553e09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jose=20Ramon=20Ma=C3=B1es?= Date: Wed, 8 Nov 2023 17:51:37 +0100 Subject: [PATCH 1/9] fix(torch): consensus metrics fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jose Ramon Mañes --- pkg/http/server.go | 67 ++++++++++++++++++++++++++++-------------- pkg/metrics/metrics.go | 16 +++++++--- pkg/nodes/consensus.go | 21 +++++++------ 3 files changed, 67 insertions(+), 37 deletions(-) diff --git a/pkg/http/server.go b/pkg/http/server.go index 1c5682a..304afa1 100644 --- a/pkg/http/server.go +++ b/pkg/http/server.go @@ -19,6 +19,10 @@ import ( "github.com/celestiaorg/torch/pkg/nodes" ) +const ( + retryInterval = 10 * time.Second // retryInterval Retry interval in seconds to generate the consensus metric. +) + // GetHttpPort GetPort retrieves the namespace where the service will be deployed func GetHttpPort() string { port := os.Getenv("HTTP_PORT") @@ -57,13 +61,6 @@ func Run(cfg config.MutualPeersConfig) { return } - // generate the metric from the Genesis Hash data - notOk := GenerateHashMetrics(cfg, err) - if notOk { - log.Error("Error registering metric block_height_1") - return - } - // Create the server server := &http.Server{ Addr: ":" + httpPort, @@ -81,6 +78,26 @@ func Run(cfg config.MutualPeersConfig) { log.Info("Server Started...") log.Info("Listening on port: " + httpPort) + // Check if the config has the consensusNode field defined to generate the metric from the Genesis Hash data. + if cfg.MutualPeers[0].ConsensusNode != "" { + // Initialise the goroutine. + go func() { + log.Info("Consensus node defined to get the first block") + for { + err := GenerateHashMetrics(cfg, err) + // check if err is nil, if so, that means that Torch was able to generate the metric. + if err == nil { + log.Info("Metric generated for the first block...") + // The metric was successfully generated, stop the retries. + break + } + + // Wait for the retry interval before the next execution + time.Sleep(retryInterval) + } + }() + } + // Initialize the goroutine to check the nodes in the queue. log.Info("Initializing queues to process the nodes...") // Create a new context without timeout as we want to keep this goroutine running forever, if we specify a timeout, @@ -127,23 +144,29 @@ func Run(cfg config.MutualPeersConfig) { log.Info("Server Exited Properly") } -func GenerateHashMetrics(cfg config.MutualPeersConfig, err error) bool { +// GenerateHashMetrics generates the metric by getting the first block and calculating the days. +func GenerateHashMetrics(cfg config.MutualPeersConfig, err error) error { + log.Info("Generating the metric for the first block generated...") + // Get the genesisHash - // check if the config has the consensusNode field defined - if cfg.MutualPeers[0].ConsensusNode != "" { - blockHash, earliestBlockTime := nodes.GenesisHash(cfg) - err = metrics.WithMetricsBlockHeight( - blockHash, - earliestBlockTime, - cfg.MutualPeers[0].ConsensusNode, - os.Getenv("POD_NAMESPACE"), - ) - if err != nil { - log.Errorf("Error registering metric block_height_1: %v", err) - return true - } + blockHash, earliestBlockTime, err := nodes.GenesisHash(cfg.MutualPeers[0].ConsensusNode) + if err != nil { + return err } - return false + + // check if earliestBlockTime is not empty, otherwise torch skips this process for now. + err = metrics.WithMetricsBlockHeight( + blockHash, + earliestBlockTime, + cfg.MutualPeers[0].ConsensusNode, + os.Getenv("POD_NAMESPACE"), + ) + if err != nil { + log.Error("Error registering metric block_height_1: ", err) + return err + } + + return nil } // RegisterMetrics generates and registers the metrics for all nodes in case they already exist in the DB. diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index ba712ce..409b4e0 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -77,6 +77,14 @@ func WithMetricsBlockHeight(blockHeight, earliestBlockTime, serviceName, namespa log.Fatalf(err.Error()) return err } + + // Calculate the days that the chain is live. + daysRunning, err := CalculateDaysDifference(earliestBlockTime) + if err != nil { + log.Error("ERROR: ", err) + return err + } + callback := func(ctx context.Context, observer metric.Observer) error { // Define the callback function that will be called periodically to observe metrics. // Create labels with attributes for each block_height_1. @@ -84,7 +92,7 @@ func WithMetricsBlockHeight(blockHeight, earliestBlockTime, serviceName, namespa attribute.String("service_name", serviceName), attribute.String("block_height_1", blockHeight), attribute.String("earliest_block_time", earliestBlockTime), - attribute.Int("days_running", CalculateDaysDifference(earliestBlockTime)), + attribute.Int("days_running", daysRunning), attribute.String("namespace", namespace), ) // Observe the float64 value for the current block_height_1 with the associated labels. @@ -99,17 +107,17 @@ func WithMetricsBlockHeight(blockHeight, earliestBlockTime, serviceName, namespa } // CalculateDaysDifference based on the date received, returns the number of days since this day. -func CalculateDaysDifference(inputTimeString string) int { +func CalculateDaysDifference(inputTimeString string) (int, error) { layout := "2006-01-02T15:04:05.999999999Z" inputTime, err := time.Parse(layout, inputTimeString) if err != nil { log.Error("Error parsing time: [", inputTimeString, "]", err) - return -1 + return -1, err } currentTime := time.Now() timeDifference := currentTime.Sub(inputTime) daysDifference := int(timeDifference.Hours() / 24) - return daysDifference + return daysDifference, nil } diff --git a/pkg/nodes/consensus.go b/pkg/nodes/consensus.go index 03d3a18..a265be2 100644 --- a/pkg/nodes/consensus.go +++ b/pkg/nodes/consensus.go @@ -15,7 +15,7 @@ import ( var ( consContainerSetupName = "consensus-setup" // consContainerSetupName initContainer that we use to configure the nodes. consContainerName = "consensus" // consContainerName container name which the pod runs. - namespace = k8s.GetCurrentNamespace() // ns namespace of the node. + namespace = k8s.GetCurrentNamespace() // namespace of the node. ) // SetConsNodeDefault sets all the default values in case they are empty @@ -34,26 +34,25 @@ func SetConsNodeDefault(peer config.Peer) config.Peer { // GenesisHash connects to the node specified in: config.MutualPeersConfig.ConsensusNode // makes a request to the API and gets the info about the genesis and return it -func GenesisHash(pods config.MutualPeersConfig) (string, string) { - consensusNode := pods.MutualPeers[0].ConsensusNode +func GenesisHash(consensusNode string) (string, string, error) { url := fmt.Sprintf("http://%s:26657/block?height=1", consensusNode) response, err := http.Get(url) if err != nil { - log.Error("Error making GET request:", err) - return "", "" + log.Error("Error making the request to the node [", consensusNode, "] - ", err) + return "", "", err } defer response.Body.Close() if response.StatusCode != http.StatusOK { log.Error("Non-OK response:", response.Status) - return "", "" + return "", "", err } bodyBytes, err := ioutil.ReadAll(response.Body) if err != nil { log.Error("Error reading response body:", err) - return "", "" + return "", "", err } bodyString := string(bodyBytes) @@ -64,26 +63,26 @@ func GenesisHash(pods config.MutualPeersConfig) (string, string) { err = json.Unmarshal([]byte(bodyString), &jsonResponse) if err != nil { log.Error("Error parsing JSON:", err) - return "", "" + return "", "", err } // Access and print the .block_id.hash field blockIDHash, ok := jsonResponse["result"].(map[string]interface{})["block_id"].(map[string]interface{})["hash"].(string) if !ok { log.Error("Unable to access .block_id.hash") - return "", "" + return "", "", err } // Access and print the .block.header.time field blockTime, ok := jsonResponse["result"].(map[string]interface{})["block"].(map[string]interface{})["header"].(map[string]interface{})["time"].(string) if !ok { log.Error("Unable to access .block.header.time") - return "", "" + return "", "", err } log.Info("Block ID Hash: ", blockIDHash) log.Info("Block Time: ", blockTime) log.Info("Full output: ", bodyString) - return blockIDHash, blockTime + return blockIDHash, blockTime, nil } From ecde58fe5c9fed413c3e820877dd446ac5b4abf3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jose=20Ramon=20Ma=C3=B1es?= Date: Wed, 8 Nov 2023 18:00:56 +0100 Subject: [PATCH 2/9] fix(torch): fix func MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jose Ramon Mañes --- pkg/http/server.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/http/server.go b/pkg/http/server.go index 304afa1..2fd134c 100644 --- a/pkg/http/server.go +++ b/pkg/http/server.go @@ -80,11 +80,11 @@ func Run(cfg config.MutualPeersConfig) { // Check if the config has the consensusNode field defined to generate the metric from the Genesis Hash data. if cfg.MutualPeers[0].ConsensusNode != "" { - // Initialise the goroutine. + // Initialise the goroutine to generate the metric in the background, only if we specify the node in the config. go func() { log.Info("Consensus node defined to get the first block") for { - err := GenerateHashMetrics(cfg, err) + err := GenerateHashMetrics(cfg) // check if err is nil, if so, that means that Torch was able to generate the metric. if err == nil { log.Info("Metric generated for the first block...") @@ -145,7 +145,7 @@ func Run(cfg config.MutualPeersConfig) { } // GenerateHashMetrics generates the metric by getting the first block and calculating the days. -func GenerateHashMetrics(cfg config.MutualPeersConfig, err error) error { +func GenerateHashMetrics(cfg config.MutualPeersConfig) error { log.Info("Generating the metric for the first block generated...") // Get the genesisHash From e25c68e9efb9c9b4c691338f55ee3d8b9952bf82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jose=20Ramon=20Ma=C3=B1es?= Date: Thu, 9 Nov 2023 12:40:07 +0100 Subject: [PATCH 3/9] fix(torch): refactor extract to func - priv func MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jose Ramon Mañes --- pkg/http/server.go | 44 ++++++++++++++++++++++++------------------ pkg/metrics/metrics.go | 6 +++--- 2 files changed, 28 insertions(+), 22 deletions(-) diff --git a/pkg/http/server.go b/pkg/http/server.go index 2fd134c..ab53b57 100644 --- a/pkg/http/server.go +++ b/pkg/http/server.go @@ -78,25 +78,7 @@ func Run(cfg config.MutualPeersConfig) { log.Info("Server Started...") log.Info("Listening on port: " + httpPort) - // Check if the config has the consensusNode field defined to generate the metric from the Genesis Hash data. - if cfg.MutualPeers[0].ConsensusNode != "" { - // Initialise the goroutine to generate the metric in the background, only if we specify the node in the config. - go func() { - log.Info("Consensus node defined to get the first block") - for { - err := GenerateHashMetrics(cfg) - // check if err is nil, if so, that means that Torch was able to generate the metric. - if err == nil { - log.Info("Metric generated for the first block...") - // The metric was successfully generated, stop the retries. - break - } - - // Wait for the retry interval before the next execution - time.Sleep(retryInterval) - } - }() - } + BackgroundGenerateHashMetric(cfg) // Initialize the goroutine to check the nodes in the queue. log.Info("Initializing queues to process the nodes...") @@ -144,6 +126,30 @@ func Run(cfg config.MutualPeersConfig) { log.Info("Server Exited Properly") } +// BackgroundGenerateHashMetric check if we have defined the consensus in the config, if so, it creates a goroutine +// to generate the metric +func BackgroundGenerateHashMetric(cfg config.MutualPeersConfig) { + // Check if the config has the consensusNode field defined to generate the metric from the Genesis Hash data. + if cfg.MutualPeers[0].ConsensusNode != "" { + // Initialise the goroutine to generate the metric in the background, only if we specify the node in the config. + go func() { + log.Info("Consensus node defined to get the first block") + for { + err := GenerateHashMetrics(cfg) + // check if err is nil, if so, that means that Torch was able to generate the metric. + if err == nil { + log.Info("Metric generated for the first block...") + // The metric was successfully generated, stop the retries. + break + } + + // Wait for the retry interval before the next execution + time.Sleep(retryInterval) + } + }() + } +} + // GenerateHashMetrics generates the metric by getting the first block and calculating the days. func GenerateHashMetrics(cfg config.MutualPeersConfig) error { log.Info("Generating the metric for the first block generated...") diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 409b4e0..4bd0826 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -79,7 +79,7 @@ func WithMetricsBlockHeight(blockHeight, earliestBlockTime, serviceName, namespa } // Calculate the days that the chain is live. - daysRunning, err := CalculateDaysDifference(earliestBlockTime) + daysRunning, err := calculateDaysDifference(earliestBlockTime) if err != nil { log.Error("ERROR: ", err) return err @@ -106,8 +106,8 @@ func WithMetricsBlockHeight(blockHeight, earliestBlockTime, serviceName, namespa return err } -// CalculateDaysDifference based on the date received, returns the number of days since this day. -func CalculateDaysDifference(inputTimeString string) (int, error) { +// calculateDaysDifference based on the date received, returns the number of days since this day. +func calculateDaysDifference(inputTimeString string) (int, error) { layout := "2006-01-02T15:04:05.999999999Z" inputTime, err := time.Parse(layout, inputTimeString) if err != nil { From d3353f2f9390c98424b6c171781769b37af55730 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jose=20Ramon=20Ma=C3=B1es?= Date: Thu, 9 Nov 2023 12:41:40 +0100 Subject: [PATCH 4/9] fix(torch): add comment MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jose Ramon Mañes --- pkg/http/server.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/http/server.go b/pkg/http/server.go index ab53b57..6c3dd79 100644 --- a/pkg/http/server.go +++ b/pkg/http/server.go @@ -78,6 +78,7 @@ func Run(cfg config.MutualPeersConfig) { log.Info("Server Started...") log.Info("Listening on port: " + httpPort) + // check if Torch has to generate the metric or not. BackgroundGenerateHashMetric(cfg) // Initialize the goroutine to check the nodes in the queue. From fd6cc870f4e8b0eae0adcf1ad52eaeccc0f3a6b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jose=20Ramon=20Ma=C3=B1es?= Date: Thu, 9 Nov 2023 12:43:21 +0100 Subject: [PATCH 5/9] fix(torch): reset sts file MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jose Ramon Mañes --- pkg/k8s/statefulsets.go | 25 +++++++------------------ 1 file changed, 7 insertions(+), 18 deletions(-) diff --git a/pkg/k8s/statefulsets.go b/pkg/k8s/statefulsets.go index ec5ce3c..b556847 100644 --- a/pkg/k8s/statefulsets.go +++ b/pkg/k8s/statefulsets.go @@ -40,28 +40,17 @@ func WatchStatefulSets() error { return err } - // Variable to keep track of the last processed name - lastProcessedName := "" - // Watch for events on the watcher channel for event := range watcher.ResultChan() { if statefulSet, ok := event.Object.(*v1.StatefulSet); ok { - // Check if the name has the "da" prefix - if strings.HasPrefix(statefulSet.Name, "da") { - // Check if the name is different from the last processed one - if statefulSet.Name != lastProcessedName { - // Process the name and perform necessary actions - err := redis.Producer(statefulSet.Name, queueK8SNodes) - if err != nil { - log.Error("ERROR adding the node to the queue: ", err) - return err - } + //log.Info("StatefulSet containers: ", statefulSet.Spec.Template.Spec.Containers) - // Update the last processed name to don't process it more than once. - lastProcessedName = statefulSet.Name - } else { - // cleanup the previous processed to add it in future iterations. - lastProcessedName = "" + // check if the node is DA, if so, send it to the queue to generate the multi address + if strings.HasPrefix(statefulSet.Name, "da") { + err := redis.Producer(statefulSet.Name, queueK8SNodes) + if err != nil { + log.Error("ERROR adding the node to the queue: ", err) + return err } } } From 3d52d14b4158150101d19d0b1feead74f7a78813 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jose=20Ramon=20Ma=C3=B1es?= Date: Fri, 10 Nov 2023 09:06:13 +0100 Subject: [PATCH 6/9] feat(torch): manage errors from chan MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jose Ramon Mañes --- pkg/http/server.go | 45 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/pkg/http/server.go b/pkg/http/server.go index 6c3dd79..d21307f 100644 --- a/pkg/http/server.go +++ b/pkg/http/server.go @@ -78,8 +78,8 @@ func Run(cfg config.MutualPeersConfig) { log.Info("Server Started...") log.Info("Listening on port: " + httpPort) - // check if Torch has to generate the metric or not. - BackgroundGenerateHashMetric(cfg) + // check if Torch has to generate the metric or not, we invoke this function async to continue the execution flow. + go BackgroundGenerateHashMetric(cfg) // Initialize the goroutine to check the nodes in the queue. log.Info("Initializing queues to process the nodes...") @@ -131,26 +131,49 @@ func Run(cfg config.MutualPeersConfig) { // to generate the metric func BackgroundGenerateHashMetric(cfg config.MutualPeersConfig) { // Check if the config has the consensusNode field defined to generate the metric from the Genesis Hash data. + log.Info("BackgroundGenerateHashMetric...") + if cfg.MutualPeers[0].ConsensusNode != "" { + log.Info("Initializing goroutine to generate the metric: hash ") // Initialise the goroutine to generate the metric in the background, only if we specify the node in the config. go func() { log.Info("Consensus node defined to get the first block") + + // Watch for events on the watcher channel + done := make(chan error) + go WatchHashMetric(cfg, done) + + // Handle errors from WatchHashMetric for { - err := GenerateHashMetrics(cfg) - // check if err is nil, if so, that means that Torch was able to generate the metric. - if err == nil { - log.Info("Metric generated for the first block...") - // The metric was successfully generated, stop the retries. - break + select { + case err := <-done: + if err != nil { + log.Error("Error in WatchHashMetric: ", err) + // Handle the error as needed + } } - - // Wait for the retry interval before the next execution - time.Sleep(retryInterval) } }() } } +// WatchHashMetric watches for changes to generate hash metrics in the specified interval +func WatchHashMetric(cfg config.MutualPeersConfig, done chan<- error) { + for { + err := GenerateHashMetrics(cfg) + // check if err is nil, if so, that means that Torch was able to generate the metric. + if err == nil { + log.Info("Metric generated for the first block...") + // The metric was successfully generated, stop the retries. + done <- nil + return + } + + // Wait for the retry interval before the next execution + time.Sleep(retryInterval) + } +} + // GenerateHashMetrics generates the metric by getting the first block and calculating the days. func GenerateHashMetrics(cfg config.MutualPeersConfig) error { log.Info("Generating the metric for the first block generated...") From 0384196b88ceba8f4069d28f6cd770ea189f8ff1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jose=20Ramon=20Ma=C3=B1es?= Date: Fri, 10 Nov 2023 15:19:00 +0100 Subject: [PATCH 7/9] fix(torch): use ctx with timeouts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jose Ramon Mañes --- pkg/http/server.go | 53 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 39 insertions(+), 14 deletions(-) diff --git a/pkg/http/server.go b/pkg/http/server.go index d21307f..6870968 100644 --- a/pkg/http/server.go +++ b/pkg/http/server.go @@ -20,7 +20,8 @@ import ( ) const ( - retryInterval = 10 * time.Second // retryInterval Retry interval in seconds to generate the consensus metric. + retryInterval = 10 * time.Second // retryInterval Retry interval in seconds to generate the consensus metric. + hashMetricGenTimeout = 5 * time.Minute // hashMetricGenTimeout specify the max time to retry to generate the metric. ) // GetHttpPort GetPort retrieves the namespace where the service will be deployed @@ -157,26 +158,50 @@ func BackgroundGenerateHashMetric(cfg config.MutualPeersConfig) { } } -// WatchHashMetric watches for changes to generate hash metrics in the specified interval +// WatchHashMetric watches for changes to generate hash metrics in the specified interval. func WatchHashMetric(cfg config.MutualPeersConfig, done chan<- error) { - for { - err := GenerateHashMetrics(cfg) - // check if err is nil, if so, that means that Torch was able to generate the metric. - if err == nil { - log.Info("Metric generated for the first block...") - // The metric was successfully generated, stop the retries. - done <- nil - return - } + // Use context.WithTimeout to set a clear deadline for the process + ctx, cancel := context.WithTimeout(context.Background(), hashMetricGenTimeout) + defer cancel() - // Wait for the retry interval before the next execution - time.Sleep(retryInterval) + // Use a select statement to handle both generating metrics and timing out + select { + case <-ctx.Done(): + // Timeout occurred, return an error + done <- ctx.Err() + return + default: + // Continue generating metrics with retries + for { + err := GenerateHashMetrics(cfg) + // Check if err is nil, if so, Torch was able to generate the metric. + if err == nil { + log.Info("Metric generated for the first block, let's stop the process successfully...") + // The metric was successfully generated, stop the retries. + done <- nil + return + } + + // Log the error + log.Error("Error generating hash metrics: ", err) + + // Wait for the retry interval before the next execution using a timer + timer := time.NewTimer(retryInterval) + select { + case <-ctx.Done(): + log.Info("Context timeout reached, we won't generate the metric as the validator seems to be unavailable.") + timer.Stop() + return + case <-timer.C: + // Continue to the next iteration + } + } } } // GenerateHashMetrics generates the metric by getting the first block and calculating the days. func GenerateHashMetrics(cfg config.MutualPeersConfig) error { - log.Info("Generating the metric for the first block generated...") + log.Info("Trying to generate the metric for the first block generated...") // Get the genesisHash blockHash, earliestBlockTime, err := nodes.GenesisHash(cfg.MutualPeers[0].ConsensusNode) From 6178d2bc2c26877d58eb8d202b40d598f93a03f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jose=20Ramon=20Ma=C3=B1es?= Date: Fri, 10 Nov 2023 15:43:55 +0100 Subject: [PATCH 8/9] fix(torch): use of errorgroup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jose Ramon Mañes --- go.mod | 1 + go.sum | 2 + pkg/http/server.go | 103 +++++++++++++++++++++++---------------------- 3 files changed, 56 insertions(+), 50 deletions(-) diff --git a/go.mod b/go.mod index 79d8abb..cc64b7f 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( go.opentelemetry.io/otel/exporters/prometheus v0.41.0 go.opentelemetry.io/otel/metric v1.19.0 go.opentelemetry.io/otel/sdk/metric v0.41.0 + golang.org/x/sync v0.5.0 gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.27.4 k8s.io/apimachinery v0.27.4 diff --git a/go.sum b/go.sum index 8a3b205..8cbcabb 100644 --- a/go.sum +++ b/go.sum @@ -190,6 +190,8 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= +golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/pkg/http/server.go b/pkg/http/server.go index 6870968..fe89925 100644 --- a/pkg/http/server.go +++ b/pkg/http/server.go @@ -11,6 +11,7 @@ import ( "github.com/gorilla/mux" log "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" "github.com/celestiaorg/torch/config" "github.com/celestiaorg/torch/pkg/db/redis" @@ -128,75 +129,77 @@ func Run(cfg config.MutualPeersConfig) { log.Info("Server Exited Properly") } -// BackgroundGenerateHashMetric check if we have defined the consensus in the config, if so, it creates a goroutine -// to generate the metric +// BackgroundGenerateHashMetric checks if the consensusNode field is defined in the config to generate the metric from the Genesis Hash data. func BackgroundGenerateHashMetric(cfg config.MutualPeersConfig) { - // Check if the config has the consensusNode field defined to generate the metric from the Genesis Hash data. log.Info("BackgroundGenerateHashMetric...") if cfg.MutualPeers[0].ConsensusNode != "" { log.Info("Initializing goroutine to generate the metric: hash ") - // Initialise the goroutine to generate the metric in the background, only if we specify the node in the config. - go func() { - log.Info("Consensus node defined to get the first block") - // Watch for events on the watcher channel - done := make(chan error) - go WatchHashMetric(cfg, done) + // Create an errgroup with a context + eg, ctx := errgroup.WithContext(context.Background()) - // Handle errors from WatchHashMetric - for { - select { - case err := <-done: - if err != nil { - log.Error("Error in WatchHashMetric: ", err) - // Handle the error as needed - } - } - } - }() + // Run the WatchHashMetric function in a separate goroutine + eg.Go(func() error { + log.Info("Consensus node defined to get the first block") + return WatchHashMetric(cfg, ctx) + }) + + // Wait for all goroutines to finish + if err := eg.Wait(); err != nil { + log.Error("Error in BackgroundGenerateHashMetric: ", err) + // Handle the error as needed + } } } // WatchHashMetric watches for changes to generate hash metrics in the specified interval. -func WatchHashMetric(cfg config.MutualPeersConfig, done chan<- error) { - // Use context.WithTimeout to set a clear deadline for the process - ctx, cancel := context.WithTimeout(context.Background(), hashMetricGenTimeout) +func WatchHashMetric(cfg config.MutualPeersConfig, ctx context.Context) error { + // Create a new context derived from the input context with a timeout + ctx, cancel := context.WithTimeout(ctx, hashMetricGenTimeout) defer cancel() - // Use a select statement to handle both generating metrics and timing out - select { - case <-ctx.Done(): - // Timeout occurred, return an error - done <- ctx.Err() - return - default: + // Create an errgroup with the context + eg, ctx := errgroup.WithContext(ctx) + + // Run the WatchHashMetric function in a separate goroutine + eg.Go(func() error { // Continue generating metrics with retries for { - err := GenerateHashMetrics(cfg) - // Check if err is nil, if so, Torch was able to generate the metric. - if err == nil { - log.Info("Metric generated for the first block, let's stop the process successfully...") - // The metric was successfully generated, stop the retries. - done <- nil - return - } - - // Log the error - log.Error("Error generating hash metrics: ", err) - - // Wait for the retry interval before the next execution using a timer - timer := time.NewTimer(retryInterval) select { case <-ctx.Done(): - log.Info("Context timeout reached, we won't generate the metric as the validator seems to be unavailable.") - timer.Stop() - return - case <-timer.C: - // Continue to the next iteration + // Context canceled, stop the process + log.Info("Context canceled, stopping WatchHashMetric.") + return ctx.Err() + default: + err := GenerateHashMetrics(cfg) + // Check if err is nil, if so, Torch was able to generate the metric. + if err == nil { + log.Info("Metric generated for the first block, let's stop the process successfully...") + // The metric was successfully generated, stop the retries. + return nil + } + + // Log the error + log.Error("Error generating hash metrics: ", err) + + // Wait for the retry interval before the next execution using a timer + timer := time.NewTimer(retryInterval) + select { + case <-ctx.Done(): + // Context canceled, stop the process + log.Info("Context canceled, stopping WatchHashMetric.") + timer.Stop() + return ctx.Err() + case <-timer.C: + // Continue to the next iteration + } } } - } + }) + + // Wait for all goroutines to finish + return eg.Wait() } // GenerateHashMetrics generates the metric by getting the first block and calculating the days. From 78bbf75eec49ecbd86e56449c9c76e7b5ab5970a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jose=20Ramon=20Ma=C3=B1es?= Date: Mon, 13 Nov 2023 13:10:37 +0100 Subject: [PATCH 9/9] refactor(torch): split in priv funcs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jose Ramon Mañes --- pkg/http/server.go | 78 +++++++++++++++++++++++++++------------------- 1 file changed, 46 insertions(+), 32 deletions(-) diff --git a/pkg/http/server.go b/pkg/http/server.go index fe89925..50840d4 100644 --- a/pkg/http/server.go +++ b/pkg/http/server.go @@ -164,44 +164,58 @@ func WatchHashMetric(cfg config.MutualPeersConfig, ctx context.Context) error { // Run the WatchHashMetric function in a separate goroutine eg.Go(func() error { - // Continue generating metrics with retries - for { - select { - case <-ctx.Done(): - // Context canceled, stop the process - log.Info("Context canceled, stopping WatchHashMetric.") - return ctx.Err() - default: - err := GenerateHashMetrics(cfg) - // Check if err is nil, if so, Torch was able to generate the metric. - if err == nil { - log.Info("Metric generated for the first block, let's stop the process successfully...") - // The metric was successfully generated, stop the retries. - return nil - } - - // Log the error - log.Error("Error generating hash metrics: ", err) - - // Wait for the retry interval before the next execution using a timer - timer := time.NewTimer(retryInterval) - select { - case <-ctx.Done(): - // Context canceled, stop the process - log.Info("Context canceled, stopping WatchHashMetric.") - timer.Stop() - return ctx.Err() - case <-timer.C: - // Continue to the next iteration - } - } - } + return watchMetricsWithRetry(cfg, ctx) }) // Wait for all goroutines to finish return eg.Wait() } +// watchMetricsWithRetry is a helper function for WatchHashMetric that encapsulates the retry logic. +func watchMetricsWithRetry(cfg config.MutualPeersConfig, ctx context.Context) error { + // Continue generating metrics with retries + for { + select { + case <-ctx.Done(): + // Context canceled, stop the process + log.Info("Context canceled, stopping WatchHashMetric.") + return ctx.Err() + default: + err := GenerateHashMetrics(cfg) + // Check if err is nil, if so, Torch was able to generate the metric. + if err == nil { + log.Info("Metric generated for the first block, let's stop the process successfully...") + // The metric was successfully generated, stop the retries. + return nil + } + + // Log the error + log.Error("Error generating hash metrics: ", err) + + // Wait for the retry interval before the next execution using a timer + if err := waitForRetry(ctx); err != nil { + return err + } + } + } +} + +// waitForRetry is a helper function to wait for the retry interval or stop if the context is canceled. +func waitForRetry(ctx context.Context) error { + timer := time.NewTimer(retryInterval) + defer timer.Stop() + + select { + case <-ctx.Done(): + // Context canceled, stop the process + log.Info("Context canceled, stopping WatchHashMetric.") + return ctx.Err() + case <-timer.C: + // Continue to the next iteration + return nil + } +} + // GenerateHashMetrics generates the metric by getting the first block and calculating the days. func GenerateHashMetrics(cfg config.MutualPeersConfig) error { log.Info("Trying to generate the metric for the first block generated...")