Skip to content

Commit

Permalink
refactor(main): bau as long as there is >=1 healthy beacon node
Browse files (browse the repository at this point in the history)
  • Loading branch information
mattevans committed Feb 7, 2025
1 parent 7ce7948 commit 3a27bbf
Show file tree
Hide file tree
Showing 7 changed files with 446 additions and 124 deletions.
152 changes: 125 additions & 27 deletions cmd/sentry/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
var log = logrus.New()

type beaconNodeInstance struct {
node *ethereum.BeaconNode
node ethereum.BeaconNodeAPI
cache *events.DuplicateCache
sinks []sinks.ContributoorSink
metrics *events.Metrics
Expand Down Expand Up @@ -168,7 +168,7 @@ func main() {
return err
}

if err := s.initBeaconNodes(ctx); err != nil {
if err := s.initBeacons(ctx); err != nil {
return err
}

Expand Down Expand Up @@ -250,11 +250,31 @@ func (s *contributoor) start(ctx context.Context) error {
}

for _, instance := range s.beaconNodes {
// Start the cache.
instance.cache.Start()
go instance.summary.Start(ctx)

// We don't really want to output/log a summary until the node is healthy.
// Wait for node to become healthy before starting summary.
go func(instance *beaconNodeInstance) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if node, ok := instance.node.(*ethereum.BeaconNode); ok && node.IsHealthy() {
instance.summary.Start(ctx)

return
}
}
}
}(instance)
}

return s.startBeaconNodes(ctx)
return s.connectBeacons(ctx)
}

func (s *contributoor) stop(ctx context.Context) error {
Expand All @@ -267,7 +287,7 @@ func (s *contributoor) stop(ctx context.Context) error {
for traceID, instance := range s.beaconNodes {
// Stop the beacon + any sinks we have.
if err := instance.node.Stop(stopCtx); err != nil {
s.log.WithError(err).WithField("trace_id", traceID).Error("Failed to stop beacon node")
s.log.WithError(err).WithField("trace_id", traceID).Error("Failed to stop beacon")
}

for _, sink := range instance.sinks {
Expand Down Expand Up @@ -411,43 +431,114 @@ func (s *contributoor) startHealthCheckServer() error {
return nil
}

func (s *contributoor) startBeaconNodes(ctx context.Context) error {
func (s *contributoor) connectBeacons(ctx context.Context) error {
var (
errChan = make(chan error, len(s.beaconNodes))
doneChan = make(chan struct{}, len(s.beaconNodes))
timeout = time.After(10 * time.Minute)
errChan = make(chan error, len(s.beaconNodes))
doneChan = make(chan string, len(s.beaconNodes))
timeout = time.After(30 * time.Second)
healthyNodes = make(map[string]struct{})
)

// Start all beacons concurrently.
// Connect to all beacon nodes concurrently.
for traceID, instance := range s.beaconNodes {
go func() {
if err := instance.node.Start(ctx); err != nil {
errChan <- fmt.Errorf("failed to start beacon node %s: %w", traceID, err)
readyChan, err := instance.node.Start(ctx)
if err != nil {
errChan <- fmt.Errorf("failed to connect to beacon %s: %w", traceID, err)

return
}

doneChan <- struct{}{}
// Wait for the node to become healthy.
select {
case <-readyChan:
doneChan <- traceID
case <-ctx.Done():
errChan <- fmt.Errorf("context cancelled while waiting to connect to beacon %s", traceID)
}
}()
}

// Wait for all nodes to start or error.
for i := 0; i < len(s.beaconNodes); i++ {
// Wait for at least one node to connect successfully or all to fail.
remainingNodes := len(s.beaconNodes)
for remainingNodes > 0 {
select {
case err := <-errChan:
return err
case <-doneChan:
continue
s.log.WithError(err).Error("Failed to connect to beacon")

remainingNodes--

// If we've failed on all nodes, return the last error.
if remainingNodes == 0 && len(healthyNodes) == 0 {
return fmt.Errorf("all beacons failed to connect: %w", err)
}
case traceID := <-doneChan:
s.log.WithField("trace_id", traceID).Info("Beacon connected successfully")

healthyNodes[traceID] = struct{}{}
remainingNodes--

// If we have at least one healthy node, we're good to grab metrics from that.
// Process any remaining nodes in background.
if len(healthyNodes) == 1 && remainingNodes > 0 {
s.connectRemainingBeaconNodesInBackground(ctx, &remainingNodes, errChan, doneChan, healthyNodes)

return nil
}
case <-ctx.Done():
return ctx.Err()
case <-timeout:
return fmt.Errorf("timeout waiting for beacon nodes to start")
// Only timeout if we have no healthy nodes.
if len(healthyNodes) == 0 {
return fmt.Errorf("timeout waiting to connect to any beacon")
}

// If we have healthy nodes, continue waiting on the others in the background.
s.connectRemainingBeaconNodesInBackground(ctx, &remainingNodes, errChan, doneChan, healthyNodes)

return nil
}
}

// If we get here with no healthy nodes, all nodes failed.
if len(healthyNodes) == 0 {
return fmt.Errorf("all beacons failed to connect")
}

return nil
}

// connectRemainingBeaconNodesInBackground logs the current healthy/remaining
// counts and then spawns a goroutine that keeps consuming connection results
// from errChan and doneChan.
//
// For every result received the goroutine decrements *remainingNodes; on a
// success it also records the trace ID in healthyNodes. The goroutine exits
// when *remainingNodes is no longer positive or when ctx is done. The function
// itself returns immediately after starting the goroutine.
func (s *contributoor) connectRemainingBeaconNodesInBackground(
	ctx context.Context,
	remainingNodes *int,
	errChan chan error,
	doneChan chan string,
	healthyNodes map[string]struct{},
) {
	// Snapshot the counters for the log line before handing them to the goroutine.
	fields := logrus.Fields{
		"healthy_nodes":   len(healthyNodes),
		"remaining_nodes": *remainingNodes,
	}
	s.log.WithFields(fields).Info("Continuing beacon connection in background")

	go func() {
		for {
			// Nothing left to wait for.
			if *remainingNodes <= 0 {
				return
			}

			select {
			case <-ctx.Done():
				return
			case connectErr := <-errChan:
				s.log.WithError(connectErr).Error("Failed to connect to beacon in background")
			case id := <-doneChan:
				s.log.WithField("trace_id", id).Info("Additional beacon connected successfully")

				healthyNodes[id] = struct{}{}
			}

			// Either an error or a success was consumed: one fewer node outstanding.
			*remainingNodes--
		}
	}()
}

func (s *contributoor) initClockDrift(ctx context.Context) error {
clockDriftService := clockdrift.NewService(s.log, &clockdrift.ClockDriftConfig{
NTPServer: "pool.ntp.org",
Expand All @@ -463,7 +554,7 @@ func (s *contributoor) initClockDrift(ctx context.Context) error {
return nil
}

func (s *contributoor) initBeaconNodes(ctx context.Context) error {
func (s *contributoor) initBeacons(ctx context.Context) error {
addresses := strings.Split(s.config.BeaconNodeAddress, ",")

traceIDs, err := generateBeaconTraceIDs(addresses)
Expand All @@ -477,17 +568,17 @@ func (s *contributoor) initBeaconNodes(ctx context.Context) error {
"count": len(addresses),
"trace_ids": traceIDs,
"addresses": addresses,
}).Info("Initializing beacon nodes")
}).Info("Initializing beacons")

for i, address := range addresses {
address = strings.TrimSpace(address)
traceID := traceIDs[i]

logCtx := s.log.WithField("trace_id", traceID)

instance, err := s.createBeaconNodeInstance(ctx, address, traceID, logCtx)
instance, err := s.createBeaconInstance(ctx, address, traceID, logCtx)
if err != nil {
return fmt.Errorf("failed to create beacon node instance: %w", err)
return fmt.Errorf("failed to create beacon instance: %w", err)
}

s.beaconNodes[traceID] = instance
Expand All @@ -496,7 +587,14 @@ func (s *contributoor) initBeaconNodes(ctx context.Context) error {
return nil
}

func (s *contributoor) initBeaconNode(log logrus.FieldLogger, address, traceID string, sinks []sinks.ContributoorSink, cache *events.DuplicateCache, summary *events.Summary, metrics *events.Metrics) (*ethereum.BeaconNode, error) {
func (s *contributoor) initBeacon(
log logrus.FieldLogger,
address, traceID string,
sinks []sinks.ContributoorSink,
cache *events.DuplicateCache,
summary *events.Summary,
metrics *events.Metrics,
) (ethereum.BeaconNodeAPI, error) {
return ethereum.NewBeaconNode(
log,
traceID,
Expand Down Expand Up @@ -556,7 +654,7 @@ func (s *contributoor) initSinks(ctx context.Context, log logrus.FieldLogger) ([
return eventSinks, nil
}

func (s *contributoor) createBeaconNodeInstance(ctx context.Context, address, traceID string, log logrus.FieldLogger) (*beaconNodeInstance, error) {
func (s *contributoor) createBeaconInstance(ctx context.Context, address, traceID string, log logrus.FieldLogger) (*beaconNodeInstance, error) {
cache, err := s.initCache()
if err != nil {
return nil, fmt.Errorf("failed to init cache: %w", err)
Expand All @@ -577,9 +675,9 @@ func (s *contributoor) createBeaconNodeInstance(ctx context.Context, address, tr
return nil, fmt.Errorf("failed to init sinks: %w", err)
}

node, err := s.initBeaconNode(log, address, traceID, sinks, cache, summary, metrics)
node, err := s.initBeacon(log, address, traceID, sinks, cache, summary, metrics)
if err != nil {
return nil, fmt.Errorf("failed to init beacon node: %w", err)
return nil, fmt.Errorf("failed to init beacon: %w", err)
}

return &beaconNodeInstance{
Expand Down
Loading

0 comments on commit 3a27bbf

Please sign in to comment.