Skip to content

Commit

Permalink
Merge pull request #12 from nodeset-org/dev
Browse files Browse the repository at this point in the history
Prep for v0.1.1
  • Loading branch information
jclapis authored May 2, 2024
2 parents b0d4b64 + 4d8861f commit e96abb3
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 87 deletions.
5 changes: 2 additions & 3 deletions common/nodeset-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ const (
authHeader string = "Authorization"

// API paths
devPath string = "dev"
depositDataPath string = "deposit-data"
metaPath string = "meta"
validatorsPath string = "validators"
Expand Down Expand Up @@ -133,7 +132,7 @@ func (c *NodesetClient) UploadSignedExitData(ctx context.Context, exitData []Exi
"network": c.res.EthNetworkName,
}
// Submit the PATCH request with the serialized JSON
response, err := c.submitRequest(ctx, http.MethodPatch, bytes.NewBuffer(jsonData), params, devPath, validatorsPath)
response, err := c.submitRequest(ctx, http.MethodPatch, bytes.NewBuffer(jsonData), params, validatorsPath)
if err != nil {
return nil, fmt.Errorf("error submitting exit data: %w", err)
}
Expand Down Expand Up @@ -186,7 +185,7 @@ func (c *NodesetClient) GetRegisteredValidators(ctx context.Context) ([]Validato
queryParams := map[string]string{
"network": c.res.EthNetworkName,
}
response, err := c.submitRequest(ctx, http.MethodGet, nil, queryParams, devPath, validatorsPath)
response, err := c.submitRequest(ctx, http.MethodGet, nil, queryParams, validatorsPath)
if err != nil {
return nil, fmt.Errorf("error getting registered validators: %w", err)
}
Expand Down
44 changes: 37 additions & 7 deletions common/requirements.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,56 @@ import (
)

func (sp *StakewiseServiceProvider) RequireStakewiseWalletReady(ctx context.Context, status wallet.WalletStatus) error {
err := services.CheckIfWalletReady(status)
// No wallet initialized for Hyperdrive
// Get the logger
logger, exists := log.FromContext(ctx)
if !exists {
panic("context didn't have a logger!")
}

// Check if the wallet files exist
exists, err := sp.wallet.CheckIfStakewiseWalletExists()
if exists {
return nil
}
if err != nil {
return err
logger.Error("Error checking if Stakewise wallet exists", log.Err(err))
}

// Check if the Hyperdrive wallet is ready
err = services.CheckIfWalletReady(status)
if err != nil {
return fmt.Errorf("hyperdrive wallet not initialized, can't initialize stakewise wallet yet")
}

return sp.initializeStakewiseWallet(logger)
}

func (sp *StakewiseServiceProvider) WaitForStakewiseWallet(ctx context.Context) error {
// Get the logger
logger, exists := log.FromContext(ctx)
if !exists {
panic("context didn't have a logger!")
}

// Check if the wallet files exist
exists, err = sp.wallet.CheckIfStakewiseWalletExists()
if exists && err == nil {
exists, err := sp.wallet.CheckIfStakewiseWalletExists()
if exists {
return nil
}
if err != nil {
logger.Debug("Error checking if Stakewise wallet exists", log.Err(err))
logger.Error("Error checking if Stakewise wallet exists", log.Err(err))
}

// Wait for the Hyperdrive wallet first, then initialize the Stakewise one
err = sp.WaitForWallet(ctx)
if err != nil {
return err
}
return sp.initializeStakewiseWallet(logger)
}

// If wallet is not initialized for SW, just initialize it
func (sp *StakewiseServiceProvider) initializeStakewiseWallet(logger *log.Logger) error {
// Get the wallet from Hyperdrive
logger.Warn("Stakewise wallet not found, initializing now")
ethkeyResponse, err := sp.GetHyperdriveClient().Wallet.ExportEthKey()
if err != nil {
Expand All @@ -45,5 +73,7 @@ func (sp *StakewiseServiceProvider) RequireStakewiseWalletReady(ctx context.Cont
if err != nil {
return err
}

logger.Info("Stakewise wallet initialized successfully")
return nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/goccy/go-json v0.10.2
github.com/google/uuid v1.6.0
github.com/gorilla/mux v1.8.1
github.com/nodeset-org/hyperdrive-daemon v0.4.2-dev.0.20240429205402-54c7fd4f2c3b
github.com/nodeset-org/hyperdrive-daemon v0.4.2-dev.0.20240501154944-195be498d365
github.com/rocket-pool/batch-query v1.0.0
github.com/rocket-pool/node-manager-core v0.3.0
github.com/urfave/cli/v2 v2.27.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 h1:RWengNIwukTxcDr9
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826/go.mod h1:TaXosZuwdSHYgviHp1DAtfrULt5eUgsSMsZf+YrPgl8=
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
github.com/nodeset-org/hyperdrive-daemon v0.4.2-dev.0.20240429205402-54c7fd4f2c3b h1:HM5d2N5YBLauvHiSaghrnI9Uf25VYu0ek5vKSUcZJjc=
github.com/nodeset-org/hyperdrive-daemon v0.4.2-dev.0.20240429205402-54c7fd4f2c3b/go.mod h1:65/oNCQjYcUeSr+hfb7EC1gGLNkaTJlCx6EEJhXh1N0=
github.com/nodeset-org/hyperdrive-daemon v0.4.2-dev.0.20240501154944-195be498d365 h1:RSunSkBjs5ILC+7syrK8bVIagrZEAvYGyyq0Ohcoa68=
github.com/nodeset-org/hyperdrive-daemon v0.4.2-dev.0.20240501154944-195be498d365/go.mod h1:65/oNCQjYcUeSr+hfb7EC1gGLNkaTJlCx6EEJhXh1N0=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
Expand Down
2 changes: 1 addition & 1 deletion shared/config/stakewise-config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
const (
// Tags
daemonTag string = "nodeset/hyperdrive-stakewise:v" + shared.StakewiseVersion
operatorTag string = "europe-west4-docker.pkg.dev/stakewiselabs/public/v3-operator:v1.1.0"
operatorTag string = "europe-west4-docker.pkg.dev/stakewiselabs/public/v3-operator:v1.2.2"
)

// Configuration for Stakewise
Expand Down
5 changes: 5 additions & 0 deletions shared/keys/keys.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package keys

const (
RetryKey string = "retry"
)
2 changes: 1 addition & 1 deletion shared/version.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package shared

const (
StakewiseVersion string = "0.1.1-dev"
StakewiseVersion string = "0.1.1"
)
181 changes: 123 additions & 58 deletions tasks/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package swtasks

import (
"context"
"log/slog"
"strings"
"sync"
"time"

Expand All @@ -22,87 +24,69 @@ const (
SendExitDataColor = color.FgGreen
)

type waitUntilReadyResult int

const (
waitUntilReadyExit waitUntilReadyResult = iota
waitUntilReadyContinue
waitUntilReadySuccess
)

type TaskLoop struct {
// Services
ctx context.Context
logger *log.Logger
sp *swcommon.StakewiseServiceProvider
wg *sync.WaitGroup

// Tasks
updateDepositData *UpdateDepositDataTask
sendExitData *SendExitDataTask

// Internal
wasExecutionClientSynced bool
wasBeaconClientSynced bool
}

func NewTaskLoop(sp *swcommon.StakewiseServiceProvider, wg *sync.WaitGroup) *TaskLoop {
logger := sp.GetTasksLogger()
ctx := logger.CreateContextWithLogger(sp.GetBaseContext())
taskLoop := &TaskLoop{
sp: sp,
logger: sp.GetTasksLogger(),
wg: wg,
sp: sp,
logger: logger,
ctx: ctx,
wg: wg,
updateDepositData: NewUpdateDepositDataTask(ctx, sp, logger),
sendExitData: NewSendExitDataTask(ctx, sp, logger),

wasExecutionClientSynced: true,
wasBeaconClientSynced: true,
}
taskLoop.ctx = taskLoop.logger.CreateContextWithLogger(sp.GetBaseContext())
return taskLoop
}

// Run daemon
func (t *TaskLoop) Run() error {
// Initialize tasks
updateDepositData := NewUpdateDepositDataTask(t.ctx, t.sp, t.logger)
sendExitData := NewSendExitData(t.ctx, t.sp, t.logger)

// Initialize the Stakewise wallet if it's not ready
response, err := t.sp.GetHyperdriveClient().Wallet.Status()
if err != nil {
t.logger.Warn("Couldn't check Hyperdrive wallet status", log.Err(err))
} else {
err = t.sp.RequireStakewiseWalletReady(t.ctx, response.Data.WalletStatus)
if err != nil {
t.logger.Warn("Couldn't initialize Stakewise wallet", log.Err(err))
}
}

// Run the loop
// Run task loop
t.wg.Add(1)
go func() {
for {
err := t.sp.WaitEthClientSynced(t.ctx, false) // Force refresh the primary / fallback EC status
if err != nil {
t.logger.Error(err.Error())
if utils.SleepWithCancel(t.ctx, taskCooldown) {
break
}
continue
}
defer t.wg.Done()

// Check the BC status
err = t.sp.WaitBeaconClientSynced(t.ctx, false) // Force refresh the primary / fallback BC status
if err != nil {
t.logger.Error(err.Error())
if utils.SleepWithCancel(t.ctx, taskCooldown) {
break
}
for {
// Make sure all of the resources are ready for task processing
readyResult := t.waitUntilReady()
switch readyResult {
case waitUntilReadyExit:
return
case waitUntilReadyContinue:
continue
}

// Tasks start here

// Update deposit data from the NodeSet server
if err := updateDepositData.Run(); err != nil {
t.logger.Error(err.Error())
}
if utils.SleepWithCancel(t.ctx, taskCooldown) {
break
}

// Submit missing exit messages to the NodeSet server
if err := sendExitData.Run(); err != nil {
t.logger.Error(err.Error())
}

// Tasks end here

if utils.SleepWithCancel(t.ctx, tasksInterval) {
break
// === Task execution ===
if t.runTasks() {
return
}
}

// Signal the task loop is done
t.wg.Done()
}()

/*
Expand All @@ -117,3 +101,84 @@ func (t *TaskLoop) Run() error {
*/
return nil
}

// Wait until the chains and other resources are ready to be queried
// Returns true if the owning loop needs to exit, false if it can continue
func (t *TaskLoop) waitUntilReady() waitUntilReadyResult {
// Check the EC status
err := t.sp.WaitEthClientSynced(t.ctx, false) // Force refresh the primary / fallback EC status
if err != nil {
errMsg := err.Error()
if strings.Contains(errMsg, "context canceled") {
return waitUntilReadyExit
}
t.wasExecutionClientSynced = false
t.logger.Error("Execution Client not synced. Waiting for sync...", slog.String(log.ErrorKey, errMsg))
return t.sleepAndReturnReadyResult()
}

if !t.wasExecutionClientSynced {
t.logger.Info("Execution Client is now synced.")
t.wasExecutionClientSynced = true
}

// Check the BC status
err = t.sp.WaitBeaconClientSynced(t.ctx, false) // Force refresh the primary / fallback BC status
if err != nil {
errMsg := err.Error()
if strings.Contains(errMsg, "context canceled") {
return waitUntilReadyExit
}
// NOTE: if not synced, it returns an error - so there isn't necessarily an underlying issue
t.wasBeaconClientSynced = false
t.logger.Error("Beacon Node not synced. Waiting for sync...", slog.String(log.ErrorKey, errMsg))
return t.sleepAndReturnReadyResult()
}

if !t.wasBeaconClientSynced {
t.logger.Info("Beacon Node is now synced.")
t.wasBeaconClientSynced = true
}

// Wait until the Stakewise wallet has been initialized
err = t.sp.WaitForStakewiseWallet(t.ctx)
if err != nil {
errMsg := err.Error()
if strings.Contains(errMsg, "context canceled") {
return waitUntilReadyExit
}
t.logger.Error("Error waiting for Stakewise wallet initialization", slog.String(log.ErrorKey, errMsg))
return t.sleepAndReturnReadyResult()
}

return waitUntilReadySuccess
}

// Sleep on the context for the task cooldown time, and return either exit or continue
// based on whether the context was cancelled.
func (t *TaskLoop) sleepAndReturnReadyResult() waitUntilReadyResult {
if utils.SleepWithCancel(t.ctx, taskCooldown) {
return waitUntilReadyExit
} else {
return waitUntilReadyContinue
}
}

// Runs an iteration of the node tasks.
// Returns true if the task loop should exit, false if it should continue.
func (t *TaskLoop) runTasks() bool {
// Update deposit data from the NodeSet server
if err := t.updateDepositData.Run(); err != nil {
t.logger.Error(err.Error())
}
if utils.SleepWithCancel(t.ctx, taskCooldown) {
return true
}

// Submit missing exit messages to the NodeSet server
if err := t.sendExitData.Run(); err != nil {
t.logger.Error(err.Error())
}

return utils.SleepWithCancel(t.ctx, tasksInterval)
}
Loading

0 comments on commit e96abb3

Please sign in to comment.