diff --git a/arbnode/delayed_seq_reorg_test.go b/arbnode/delayed_seq_reorg_test.go index 87d93ac3d4..699eb3e8f6 100644 --- a/arbnode/delayed_seq_reorg_test.go +++ b/arbnode/delayed_seq_reorg_test.go @@ -22,9 +22,9 @@ func TestSequencerReorgFromDelayed(t *testing.T) { tracker, err := NewInboxTracker(db, streamer, nil, DefaultSnapSyncConfig) Require(t, err) - err = streamer.Start(ctx, 0) + err = streamer.Start(ctx) Require(t, err) - exec.Start(ctx, 0) + exec.Start(ctx) init, err := streamer.GetMessage(0) Require(t, err) diff --git a/arbnode/delayed_sequencer.go b/arbnode/delayed_sequencer.go index 9e63d4ea03..10e845a393 100644 --- a/arbnode/delayed_sequencer.go +++ b/arbnode/delayed_sequencer.go @@ -32,6 +32,7 @@ type DelayedSequencer struct { waitingForFinalizedBlock uint64 mutex sync.Mutex config DelayedSequencerConfigFetcher + syncTillBlock uint64 } type DelayedSequencerConfig struct { @@ -64,15 +65,16 @@ var TestDelayedSequencerConfig = DelayedSequencerConfig{ UseMergeFinality: false, } -func NewDelayedSequencer(l1Reader *headerreader.HeaderReader, reader *InboxReader, exec execution.ExecutionSequencer, coordinator *SeqCoordinator, config DelayedSequencerConfigFetcher) (*DelayedSequencer, error) { +func NewDelayedSequencer(l1Reader *headerreader.HeaderReader, reader *InboxReader, exec execution.ExecutionSequencer, coordinator *SeqCoordinator, config DelayedSequencerConfigFetcher, syncTillBlock uint64) (*DelayedSequencer, error) { d := &DelayedSequencer{ - l1Reader: l1Reader, - bridge: reader.DelayedBridge(), - inbox: reader.Tracker(), - reader: reader, - coordinator: coordinator, - exec: exec, - config: config, + l1Reader: l1Reader, + bridge: reader.DelayedBridge(), + inbox: reader.Tracker(), + reader: reader, + coordinator: coordinator, + exec: exec, + config: config, + syncTillBlock: syncTillBlock, } if coordinator != nil { coordinator.SetDelayedSequencer(d) @@ -211,7 +213,7 @@ func (d *DelayedSequencer) ForceSequenceDelayed(ctx context.Context) error { return d.sequenceWithoutLockout(ctx, lastBlockHeader) } -func (d *DelayedSequencer) run(ctx context.Context, syncTillBlock uint64) { +func (d *DelayedSequencer) run(ctx context.Context) { headerChan, cancel := d.l1Reader.Subscribe(false) defer cancel() @@ -221,8 +223,8 @@ func (d *DelayedSequencer) run(ctx context.Context, syncTillBlock uint64) { log.Warn("error reading delayed count", "err", err) continue } - if syncTillBlock > 0 && delayedCount >= syncTillBlock { - log.Info("stopping block creation in delayed sequencer", "syncTillBlock", syncTillBlock) + if d.syncTillBlock > 0 && delayedCount >= d.syncTillBlock { + log.Info("stopping block creation in delayed sequencer", "syncTillBlock", d.syncTillBlock) return } select { @@ -241,9 +243,9 @@ func (d *DelayedSequencer) run(ctx context.Context, syncTillBlock uint64) { } } -func (d *DelayedSequencer) Start(ctxIn context.Context, syncTillBlock uint64) { +func (d *DelayedSequencer) Start(ctxIn context.Context) { d.StopWaiter.Start(ctxIn, d) d.LaunchThread(func(ctx context.Context) { - d.run(ctx, syncTillBlock) + d.run(ctx) }) } diff --git a/arbnode/inbox_reader.go b/arbnode/inbox_reader.go index c760a82202..9c432ed013 100644 --- a/arbnode/inbox_reader.go +++ b/arbnode/inbox_reader.go @@ -96,13 +96,14 @@ type InboxReader struct { caughtUpChan chan struct{} client *ethclient.Client l1Reader *headerreader.HeaderReader + syncTillBlock uint64 // Atomic lastSeenBatchCount atomic.Uint64 lastReadBatchCount atomic.Uint64 } -func NewInboxReader(tracker *InboxTracker, client *ethclient.Client, l1Reader *headerreader.HeaderReader, firstMessageBlock *big.Int, delayedBridge *DelayedBridge, sequencerInbox *SequencerInbox, config InboxReaderConfigFetcher) (*InboxReader, error) { +func NewInboxReader(tracker *InboxTracker, client *ethclient.Client, l1Reader *headerreader.HeaderReader, firstMessageBlock *big.Int, delayedBridge *DelayedBridge, sequencerInbox *SequencerInbox, config InboxReaderConfigFetcher, syncTillBlock uint64) (*InboxReader, error) { err := config().Validate() if err != nil { return nil, err @@ -116,10 +117,11 @@ func NewInboxReader(tracker *InboxTracker, client *ethclient.Client, l1Reader *h firstMessageBlock: firstMessageBlock, caughtUpChan: make(chan struct{}), config: config, + syncTillBlock: syncTillBlock, }, nil } -func (r *InboxReader) Start(ctxIn context.Context, syncTillBlock uint64) error { +func (r *InboxReader) Start(ctxIn context.Context) error { r.StopWaiter.Start(ctxIn, r) hadError := false r.LaunchThread(func(ctx context.Context) { @@ -129,8 +131,8 @@ func (r *InboxReader) Start(ctxIn context.Context, syncTillBlock uint64) error { log.Warn("error reading delayed count", "err", err) hadError = true } - if syncTillBlock > 0 && delayedCount >= syncTillBlock { - log.Info("stopping block creation in inbox reader", "syncTillBlock", syncTillBlock) + if r.syncTillBlock > 0 && delayedCount >= r.syncTillBlock { + log.Info("stopping block creation in inbox reader", "syncTillBlock", r.syncTillBlock) return } err = r.run(ctx, hadError) diff --git a/arbnode/inbox_test.go b/arbnode/inbox_test.go index e75e9ee620..a6a4984f38 100644 --- a/arbnode/inbox_test.go +++ b/arbnode/inbox_test.go @@ -68,7 +68,7 @@ func NewTransactionStreamerForTest(t *testing.T, ownerAddress common.Address) (* } transactionStreamerConfigFetcher := func() *TransactionStreamerConfig { return &DefaultTransactionStreamerConfig } - execEngine, err := gethexec.NewExecutionEngine(bc) + execEngine, err := gethexec.NewExecutionEngine(bc, 0) if err != nil { Fail(t, err) } @@ -106,9 +106,9 @@ func TestTransactionStreamer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := inbox.Start(ctx, 0) + err := inbox.Start(ctx) Require(t, err) - exec.Start(ctx, 0) + exec.Start(ctx) maxExpectedGasCost := big.NewInt(l2pricing.InitialBaseFeeWei) maxExpectedGasCost.Mul(maxExpectedGasCost, big.NewInt(2100*2)) diff --git a/arbnode/node.go b/arbnode/node.go index 3e57dba13b..28935034c6 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -79,7 +79,6 @@ func GenerateRollupConfig(prod bool, wasmModuleRoot common.Hash, rollupOwner com type Config struct { Sequencer bool `koanf:"sequencer"` - SyncTillBlock uint64 `koanf:"sync-till-block"` ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"` InboxReader InboxReaderConfig `koanf:"inbox-reader" reload:"hot"` DelayedSequencer DelayedSequencerConfig `koanf:"delayed-sequencer" reload:"hot"` @@ -146,7 +145,6 @@ func (c *Config) ValidatorRequired() bool { func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feedOutputEnable bool) { f.Bool(prefix+".sequencer", ConfigDefault.Sequencer, "enable sequencer") - f.Uint64(prefix+".sync-till-block", ConfigDefault.SyncTillBlock, "sync till block") headerreader.AddOptions(prefix+".parent-chain-reader", f) InboxReaderConfigAddOptions(prefix+".inbox-reader", f) DelayedSequencerConfigAddOptions(prefix+".delayed-sequencer", f) @@ -165,7 +163,6 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feed var ConfigDefault = Config{ Sequencer: false, - SyncTillBlock: 0, ParentChainReader: headerreader.DefaultConfig, InboxReader: DefaultInboxReaderConfig, DelayedSequencer: DefaultDelayedSequencerConfig, @@ -473,7 +470,7 @@ func createNodeImpl( } if config.SeqCoordinator.Enable { - coordinator, err = NewSeqCoordinator(dataSigner, bpVerifier, txStreamer, exec, syncMonitor, config.SeqCoordinator) + coordinator, err = NewSeqCoordinator(dataSigner, bpVerifier, txStreamer, exec, syncMonitor, config.SeqCoordinator, config.TransactionStreamer.SyncTillBlock) if err != nil { return nil, err } @@ -501,6 +498,7 @@ func createNodeImpl( nil, fatalErrChan, bpVerifier, + config.TransactionStreamer.SyncTillBlock, ) if err != nil { return nil, err @@ -592,7 +590,7 @@ func createNodeImpl( if err != nil { return nil, err } - inboxReader, err := NewInboxReader(inboxTracker, l1client, l1Reader, new(big.Int).SetUint64(deployInfo.DeployedAt), delayedBridge, sequencerInbox, func() *InboxReaderConfig { return &configFetcher.Get().InboxReader }) + inboxReader, err := NewInboxReader(inboxTracker, l1client, l1Reader, new(big.Int).SetUint64(deployInfo.DeployedAt), delayedBridge, sequencerInbox, func() *InboxReaderConfig { return &configFetcher.Get().InboxReader }, configFetcher.Get().TransactionStreamer.SyncTillBlock) if err != nil { return nil, err } @@ -741,7 +739,7 @@ func createNodeImpl( } // always create DelayedSequencer, it won't do anything if it is disabled - delayedSequencer, err = NewDelayedSequencer(l1Reader, inboxReader, exec, coordinator, func() *DelayedSequencerConfig { return &configFetcher.Get().DelayedSequencer }) + delayedSequencer, err = NewDelayedSequencer(l1Reader, inboxReader, exec, coordinator, func() *DelayedSequencerConfig { return &configFetcher.Get().DelayedSequencer }, configFetcher.Get().TransactionStreamer.SyncTillBlock) if err != nil { return nil, err } @@ -842,7 +840,7 @@ func (n *Node) Start(ctx context.Context) error { if execClient != nil { execClient.SetConsensusClient(n) } - err = n.Execution.Start(ctx, n.configFetcher.Get().SyncTillBlock) + err = n.Execution.Start(ctx) if err != nil { return fmt.Errorf("error starting exec client: %w", err) } @@ -872,12 +870,12 @@ func (n *Node) Start(ctx context.Context) error { return fmt.Errorf("error populating feed backlog on startup: %w", err) } } - err = n.TxStreamer.Start(ctx, n.configFetcher.Get().SyncTillBlock) + err = n.TxStreamer.Start(ctx) if err != nil { return fmt.Errorf("error starting transaction streamer: %w", err) } if n.InboxReader != nil { - err = n.InboxReader.Start(ctx, n.configFetcher.Get().SyncTillBlock) + err = n.InboxReader.Start(ctx) if err != nil { return fmt.Errorf("error starting inbox reader: %w", err) } @@ -890,7 +888,7 @@ func (n *Node) Start(ctx context.Context) error { } } if n.SeqCoordinator != nil { - n.SeqCoordinator.Start(ctx, n.configFetcher.Get().SyncTillBlock) + n.SeqCoordinator.Start(ctx) } else { n.Execution.Activate() } @@ -898,7 +896,7 @@ func (n *Node) Start(ctx context.Context) error { n.MaintenanceRunner.Start(ctx) } if n.DelayedSequencer != nil { - n.DelayedSequencer.Start(ctx, n.configFetcher.Get().SyncTillBlock) + n.DelayedSequencer.Start(ctx) } if n.BatchPoster != nil { n.BatchPoster.Start(ctx) @@ -948,7 +946,7 @@ func (n *Node) Start(ctx context.Context) error { return } } - n.BroadcastClients.Start(ctx, n.configFetcher.Get().SyncTillBlock) + n.BroadcastClients.Start(ctx) }() } if n.configFetcher != nil { diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index 6563ffde9a..a94bdc0ac1 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -51,6 +51,7 @@ type SeqCoordinator struct { prevChosenSequencer string reportedWantsLockout bool + syncTillBlock uint64 lockoutUntil atomic.Int64 // atomic @@ -150,6 +151,7 @@ func NewSeqCoordinator( sequencer execution.ExecutionSequencer, sync *SyncMonitor, config SeqCoordinatorConfig, + syncTillBlock uint64, ) (*SeqCoordinator, error) { redisCoordinator, err := redisutil.NewRedisCoordinator(config.RedisUrl) if err != nil { @@ -166,6 +168,7 @@ func NewSeqCoordinator( sequencer: sequencer, config: config, signer: signer, + syncTillBlock: syncTillBlock, } streamer.SetSeqCoordinator(coordinator) return coordinator, nil @@ -854,7 +857,7 @@ func (c *SeqCoordinator) launchHealthcheckServer(ctx context.Context) { } } -func (c *SeqCoordinator) Start(ctxIn context.Context, syncTillBlock uint64) { +func (c *SeqCoordinator) Start(ctxIn context.Context) { c.StopWaiter.Start(ctxIn, c) var newRedisCoordinator *redisutil.RedisCoordinator if c.config.NewRedisUrl != "" { @@ -872,8 +875,8 @@ func (c *SeqCoordinator) Start(ctxIn context.Context, syncTillBlock uint64) { if err != nil { log.Warn("failed to get message count", "err", err) } - if syncTillBlock > 0 && uint64(count) >= syncTillBlock { - log.Info("stopping block creation in sequencer", "syncTillBlock", syncTillBlock) + if c.syncTillBlock > 0 && uint64(count) >= c.syncTillBlock { + log.Info("stopping block creation in sequencer", "syncTillBlock", c.syncTillBlock) return } interval := c.chooseRedisAndUpdate(ctx, newRedisCoordinator) diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index a279bb378c..a0607a166e 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -74,6 +74,7 @@ type TransactionStreamerConfig struct { MaxBroadcasterQueueSize int `koanf:"max-broadcaster-queue-size"` MaxReorgResequenceDepth int64 `koanf:"max-reorg-resequence-depth" reload:"hot"` ExecuteMessageLoopDelay time.Duration `koanf:"execute-message-loop-delay" reload:"hot"` + SyncTillBlock uint64 `koanf:"sync-till-block"` } type TransactionStreamerConfigFetcher func() *TransactionStreamerConfig @@ -82,18 +83,21 @@ var DefaultTransactionStreamerConfig = TransactionStreamerConfig{ MaxBroadcasterQueueSize: 50_000, MaxReorgResequenceDepth: 1024, ExecuteMessageLoopDelay: time.Millisecond * 100, + SyncTillBlock: 0, } var TestTransactionStreamerConfig = TransactionStreamerConfig{ MaxBroadcasterQueueSize: 10_000, MaxReorgResequenceDepth: 128 * 1024, ExecuteMessageLoopDelay: time.Millisecond, + SyncTillBlock: 0, } func TransactionStreamerConfigAddOptions(prefix string, f *flag.FlagSet) { f.Int(prefix+".max-broadcaster-queue-size", DefaultTransactionStreamerConfig.MaxBroadcasterQueueSize, "maximum cache of pending broadcaster messages") f.Int64(prefix+".max-reorg-resequence-depth", DefaultTransactionStreamerConfig.MaxReorgResequenceDepth, "maximum number of messages to attempt to resequence on reorg (0 = never resequence, -1 = always resequence)") f.Duration(prefix+".execute-message-loop-delay", DefaultTransactionStreamerConfig.ExecuteMessageLoopDelay, "delay when polling calls to execute messages") + f.Uint64(prefix+".sync-till-block", DefaultTransactionStreamerConfig.SyncTillBlock, "node will not sync past this block") } func NewTransactionStreamer( @@ -1218,14 +1222,14 @@ func (s *TransactionStreamer) executeMessages(ctx context.Context, ignored struc return s.config().ExecuteMessageLoopDelay } -func (s *TransactionStreamer) Start(ctxIn context.Context, syncTillBlock uint64) error { +func (s *TransactionStreamer) Start(ctxIn context.Context) error { s.StopWaiter.Start(ctxIn, s) return s.LaunchThreadSafe(func(ctx context.Context) { var defaultVal struct{} var val struct{} for { - if syncTillBlock > 0 && uint64(s.execLastMsgCount) >= syncTillBlock { - log.Info("stopping block creation in transaction streamer", "syncTillBlock", syncTillBlock) + if s.config().SyncTillBlock > 0 && uint64(s.execLastMsgCount) >= s.config().SyncTillBlock { + log.Info("stopping block creation in transaction streamer", "syncTillBlock", s.config().SyncTillBlock) return } interval := s.executeMessages(ctx, val) diff --git a/broadcastclient/broadcastclient.go b/broadcastclient/broadcastclient.go index a539c104cf..08b3df3518 100644 --- a/broadcastclient/broadcastclient.go +++ b/broadcastclient/broadcastclient.go @@ -135,6 +135,8 @@ type BroadcastClient struct { retryCount atomic.Int64 + syncTillBlock uint64 + retrying bool shuttingDown bool confirmedSequenceNumberListener chan arbutil.MessageIndex @@ -158,6 +160,7 @@ func NewBroadcastClient( fatalErrChan chan error, addrVerifier contracts.AddressVerifierInterface, adjustCount func(int32), + syncTillBlock uint64, ) (*BroadcastClient, error) { sigVerifier, err := signature.NewVerifier(&config().Verify, addrVerifier) if err != nil { @@ -173,10 +176,11 @@ func NewBroadcastClient( fatalErrChan: fatalErrChan, sigVerifier: sigVerifier, adjustCount: adjustCount, + syncTillBlock: syncTillBlock, }, err } -func (bc *BroadcastClient) Start(ctxIn context.Context, syncTillBlock uint64) { +func (bc *BroadcastClient) Start(ctxIn context.Context) { bc.StopWaiter.Start(ctxIn, bc) if bc.StopWaiter.Stopped() { log.Info("broadcast client has already been stopped, not starting") @@ -186,8 +190,8 @@ func (bc *BroadcastClient) Start(ctxIn context.Context, syncTillBlock uint64) { backoffDuration := bc.config().ReconnectInitialBackoff for { - if syncTillBlock > 0 && uint64(bc.nextSeqNum) >= syncTillBlock { - log.Info("stopping block creation in broadcast client", "syncTillBlock", syncTillBlock) + if bc.syncTillBlock > 0 && uint64(bc.nextSeqNum) >= bc.syncTillBlock { + log.Info("stopping block creation in broadcast client", "syncTillBlock", bc.syncTillBlock) return } earlyFrameData, err := bc.connect(ctx, bc.nextSeqNum) diff --git a/broadcastclient/broadcastclient_test.go b/broadcastclient/broadcastclient_test.go index ecf5a88b3b..6ffa58e621 100644 --- a/broadcastclient/broadcastclient_test.go +++ b/broadcastclient/broadcastclient_test.go @@ -153,7 +153,7 @@ func TestInvalidSignature(t *testing.T) { &badSequencerAddr, ) Require(t, err) - broadcastClient.Start(ctx, 0) + broadcastClient.Start(ctx) go func() { for i := 0; i < messageCount; i++ { @@ -210,7 +210,7 @@ func newTestBroadcastClient(config Config, listenerAddress net.Addr, chainId uin } else { config.Verify.AcceptSequencer = false } - return NewBroadcastClient(func() *Config { return &config }, fmt.Sprintf("ws://127.0.0.1:%d/", port), chainId, currentMessageCount, txStreamer, confirmedSequenceNumberListener, feedErrChan, av, func(_ int32) {}) + return NewBroadcastClient(func() *Config { return &config }, fmt.Sprintf("ws://127.0.0.1:%d/", port), chainId, currentMessageCount, txStreamer, confirmedSequenceNumberListener, feedErrChan, av, func(_ int32) {}, 0) } func startMakeBroadcastClient(ctx context.Context, t *testing.T, clientConfig Config, addr net.Addr, index int, expectedCount int, chainId uint64, wg *sync.WaitGroup, sequencerAddr *common.Address) { @@ -227,7 +227,7 @@ func startMakeBroadcastClient(ctx context.Context, t *testing.T, clientConfig Co sequencerAddr, ) Require(t, err) - broadcastClient.Start(ctx, 0) + broadcastClient.Start(ctx) messageCount := 0 wg.Add(1) @@ -315,7 +315,7 @@ func TestServerClientDisconnect(t *testing.T) { &sequencerAddr, ) Require(t, err) - broadcastClient.Start(ctx, 0) + broadcastClient.Start(ctx) t.Log("broadcasting seq 0 message") Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 0, nil)) @@ -386,7 +386,7 @@ func TestBroadcastClientConfirmedMessage(t *testing.T) { &sequencerAddr, ) Require(t, err) - broadcastClient.Start(ctx, 0) + broadcastClient.Start(ctx) t.Log("broadcasting seq 0 message") Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 0, nil)) @@ -458,7 +458,7 @@ func TestServerIncorrectChainId(t *testing.T) { &sequencerAddr, ) Require(t, err) - badBroadcastClient.Start(ctx, 0) + badBroadcastClient.Start(ctx) badTimer := time.NewTimer(5 * time.Second) select { case err := <-feedErrChan: @@ -517,7 +517,7 @@ func TestServerMissingChainId(t *testing.T) { &sequencerAddr, ) Require(t, err) - badBroadcastClient.Start(ctx, 0) + badBroadcastClient.Start(ctx) badTimer := time.NewTimer(5 * time.Second) select { case err := <-feedErrChan: @@ -574,7 +574,7 @@ func TestServerIncorrectFeedServerVersion(t *testing.T) { &sequencerAddr, ) Require(t, err) - badBroadcastClient.Start(ctx, 0) + badBroadcastClient.Start(ctx) badTimer := time.NewTimer(5 * time.Second) select { case err := <-feedErrChan: @@ -633,7 +633,7 @@ func TestServerMissingFeedServerVersion(t *testing.T) { &sequencerAddr, ) Require(t, err) - badBroadcastClient.Start(ctx, 0) + badBroadcastClient.Start(ctx) badTimer := time.NewTimer(5 * time.Second) select { case err := <-feedErrChan: @@ -684,7 +684,7 @@ func TestBroadcastClientReconnectsOnServerDisconnect(t *testing.T) { &sequencerAddr, ) Require(t, err) - broadcastClient.Start(ctx, 0) + broadcastClient.Start(ctx) defer broadcastClient.StopAndWait() // Client set to timeout connection at 200 milliseconds, and server set to send ping every 50 seconds, @@ -796,7 +796,7 @@ func connectAndGetCachedMessages(ctx context.Context, addr net.Addr, chainId uin sequencerAddr, ) Require(t, err) - broadcastClient.Start(ctx, 0) + broadcastClient.Start(ctx) go func() { defer wg.Done() diff --git a/broadcastclients/broadcastclients.go b/broadcastclients/broadcastclients.go index a2c4fa13dd..a72b6088e0 100644 --- a/broadcastclients/broadcastclients.go +++ b/broadcastclients/broadcastclients.go @@ -48,6 +48,8 @@ type BroadcastClients struct { primaryRouter *Router secondaryRouter *Router + syncTillBlock uint64 + // Use atomic access connected atomic.Int32 } @@ -60,6 +62,7 @@ func NewBroadcastClients( confirmedSequenceNumberListener chan arbutil.MessageIndex, fatalErrChan chan error, addrVerifier contracts.AddressVerifierInterface, + syncTillBlock uint64, ) (*BroadcastClients, error) { config := configFetcher() if len(config.URL) == 0 && len(config.SecondaryURL) == 0 { @@ -79,6 +82,7 @@ func NewBroadcastClients( primaryClients: make([]*broadcastclient.BroadcastClient, 0, len(config.URL)), secondaryClients: make([]*broadcastclient.BroadcastClient, 0, len(config.SecondaryURL)), secondaryURL: config.SecondaryURL, + syncTillBlock: syncTillBlock, } clients.makeClient = func(url string, router *Router) (*broadcastclient.BroadcastClient, error) { return broadcastclient.NewBroadcastClient( @@ -91,6 +95,7 @@ func NewBroadcastClients( fatalErrChan, addrVerifier, func(delta int32) { clients.adjustCount(delta) }, + syncTillBlock, ) } @@ -131,12 +136,12 @@ func clearAndResetTicker(timer *time.Ticker, interval time.Duration) { timer.Reset(interval) } -func (bcs *BroadcastClients) Start(ctx context.Context, syncTillBlock uint64) { +func (bcs *BroadcastClients) Start(ctx context.Context) { bcs.primaryRouter.StopWaiter.Start(ctx, bcs.primaryRouter) bcs.secondaryRouter.StopWaiter.Start(ctx, bcs.secondaryRouter) for _, client := range bcs.primaryClients { - client.Start(ctx, syncTillBlock) + client.Start(ctx) } var lastConfirmed arbutil.MessageIndex @@ -178,8 +183,8 @@ func (bcs *BroadcastClients) Start(ctx context.Context, syncTillBlock uint64) { // Multiple select statements to prioritize reading messages from primary feeds' channels and avoid starving of timers var msg m.BroadcastFeedMessage for { - if syncTillBlock > 0 && uint64(msg.SequenceNumber) >= syncTillBlock { - log.Info("stopping block creation in broadcast client", "syncTillBlock", syncTillBlock) + if bcs.syncTillBlock > 0 && uint64(msg.SequenceNumber) >= bcs.syncTillBlock { + log.Info("stopping block creation in broadcast client", "syncTillBlock", bcs.syncTillBlock) return } select { @@ -234,7 +239,7 @@ func (bcs *BroadcastClients) Start(ctx context.Context, syncTillBlock uint64) { clearAndResetTicker(startSecondaryFeedTimer, MAX_FEED_INACTIVE_TIME) clearAndResetTicker(primaryFeedIsDownTimer, MAX_FEED_INACTIVE_TIME) case <-startSecondaryFeedTimer.C: - bcs.startSecondaryFeed(ctx, syncTillBlock) + bcs.startSecondaryFeed(ctx) case <-primaryFeedIsDownTimer.C: clearAndResetTicker(stopSecondaryFeedTimer, PRIMARY_FEED_UPTIME) } @@ -243,7 +248,7 @@ func (bcs *BroadcastClients) Start(ctx context.Context, syncTillBlock uint64) { }) } -func (bcs *BroadcastClients) startSecondaryFeed(ctx context.Context, syncTillBlock uint64) { +func (bcs *BroadcastClients) startSecondaryFeed(ctx context.Context) { pos := len(bcs.secondaryClients) if pos < len(bcs.secondaryURL) { url := bcs.secondaryURL[pos] @@ -254,7 +259,7 @@ func (bcs *BroadcastClients) startSecondaryFeed(ctx context.Context, syncTillBlo return } bcs.secondaryClients = append(bcs.secondaryClients, client) - client.Start(ctx, syncTillBlock) + client.Start(ctx) log.Info("secondary feed started", "url", url) } else if len(bcs.secondaryURL) > 0 { log.Warn("failed to start a new secondary feed all available secondary feeds were started") diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go index 076ac66ec4..95048be325 100644 --- a/cmd/nitro/nitro.go +++ b/cmd/nitro/nitro.go @@ -527,6 +527,7 @@ func mainImpl() int { l2BlockChain, l1Client, func() *gethexec.Config { return &liveNodeConfig.Get().Execution }, + liveNodeConfig.Get().Node.TransactionStreamer.SyncTillBlock, ) if err != nil { log.Error("failed to create execution node", "err", err) diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go index e7d639bf42..b8925cd5b1 100644 --- a/execution/gethexec/executionengine.go +++ b/execution/gethexec/executionengine.go @@ -92,6 +92,7 @@ type ExecutionEngine struct { prefetchBlock bool cachedL1PriceData *L1PriceData + syncTillBlock uint64 } func NewL1PriceData() *L1PriceData { @@ -100,12 +101,13 @@ func NewL1PriceData() *L1PriceData { } } -func NewExecutionEngine(bc *core.BlockChain) (*ExecutionEngine, error) { +func NewExecutionEngine(bc *core.BlockChain, syncTillBlock uint64) (*ExecutionEngine, error) { return &ExecutionEngine{ bc: bc, resequenceChan: make(chan []*arbostypes.MessageWithMetadata), newBlockNotifier: make(chan struct{}, 1), cachedL1PriceData: NewL1PriceData(), + syncTillBlock: syncTillBlock, }, nil } @@ -943,12 +945,12 @@ func (s *ExecutionEngine) ArbOSVersionForMessageNumber(messageNum arbutil.Messag return extra.ArbOSFormatVersion, nil } -func (s *ExecutionEngine) Start(ctx_in context.Context, syncTillBlock uint64) { +func (s *ExecutionEngine) Start(ctx_in context.Context) { s.StopWaiter.Start(ctx_in, s) s.LaunchThread(func(ctx context.Context) { for { - if syncTillBlock > 0 && s.latestBlock.NumberU64() >= syncTillBlock { - log.Info("stopping block creation in execution engine", "syncTillBlock", syncTillBlock) + if s.syncTillBlock > 0 && s.latestBlock.NumberU64() >= s.syncTillBlock { + log.Info("stopping block creation in execution engine", "syncTillBlock", s.syncTillBlock) return } select { diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index e2e3f7282f..3f743887be 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -173,6 +173,7 @@ type ExecutionNode struct { ParentChainReader *headerreader.HeaderReader ClassicOutbox *ClassicOutboxRetriever started atomic.Bool + syncTillBlock uint64 } func CreateExecutionNode( @@ -182,9 +183,10 @@ func CreateExecutionNode( l2BlockChain *core.BlockChain, l1client *ethclient.Client, configFetcher ConfigFetcher, + syncTillBlock uint64, ) (*ExecutionNode, error) { config := configFetcher() - execEngine, err := NewExecutionEngine(l2BlockChain) + execEngine, err := NewExecutionEngine(l2BlockChain, syncTillBlock) if config.EnablePrefetchBlock { execEngine.EnablePrefetchBlock() } @@ -308,6 +310,7 @@ func CreateExecutionNode( SyncMonitor: syncMon, ParentChainReader: parentChainReader, ClassicOutbox: classicOutbox, + syncTillBlock: syncTillBlock, }, nil } @@ -339,7 +342,7 @@ func (n *ExecutionNode) Initialize(ctx context.Context) error { } // not thread safe -func (n *ExecutionNode) Start(ctx context.Context, syncTillBlock uint64) error { +func (n *ExecutionNode) Start(ctx context.Context) error { if n.started.Swap(true) { return errors.New("already started") } @@ -348,7 +351,7 @@ func (n *ExecutionNode) Start(ctx context.Context, syncTillBlock uint64) error { // if err != nil { // return fmt.Errorf("error starting geth stack: %w", err) // } - n.ExecEngine.Start(ctx, syncTillBlock) + n.ExecEngine.Start(ctx) err := n.TxPublisher.Start(ctx) if err != nil { return fmt.Errorf("error starting transaction puiblisher: %w", err) diff --git a/execution/interface.go b/execution/interface.go index 428b79cf89..2a3d79c697 100644 --- a/execution/interface.go +++ b/execution/interface.go @@ -64,7 +64,7 @@ type FullExecutionClient interface { ExecutionRecorder ExecutionSequencer - Start(ctx context.Context, syncTillBlock uint64) error + Start(ctx context.Context) error StopAndWait() Maintenance() error diff --git a/relay/relay.go b/relay/relay.go index 936e3e5b1a..ac8906661d 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -56,6 +56,7 @@ func NewRelay(config *Config, feedErrChan chan error) (*Relay, error) { confirmedSequenceNumberListener, feedErrChan, nil, + 0, ) if err != nil { return nil, err @@ -86,7 +87,7 @@ func (r *Relay) Start(ctx context.Context) error { return errors.New("broadcast unable to start") } - r.broadcastClients.Start(ctx, 0) + r.broadcastClients.Start(ctx) r.LaunchThread(func(ctx context.Context) { for { diff --git a/relay/relay_stress_test.go b/relay/relay_stress_test.go index 287d1a795b..5d469e5f57 100644 --- a/relay/relay_stress_test.go +++ b/relay/relay_stress_test.go @@ -138,11 +138,11 @@ func largeBacklogRelayTestImpl(t *testing.T, numClients, backlogSize, l2MsgSize for i := 0; i < numClients; i++ { ts := &dummyTxStreamer{id: i} streamers = append(streamers, ts) - client, err := broadcastclient.NewBroadcastClient(func() *broadcastclient.Config { return &clientConfig }, relayURL, relayConfig.Chain.ID, 0, ts, nil, fatalErrChan, nil, func(_ int32) {}) + client, err := broadcastclient.NewBroadcastClient(func() *broadcastclient.Config { return &clientConfig }, relayURL, relayConfig.Chain.ID, 0, ts, nil, fatalErrChan, nil, func(_ int32) {}, 0) if err != nil { t.FailNow() } - client.Start(ctx, 0) + client.Start(ctx) defer client.StopOnly() } diff --git a/system_tests/common_test.go b/system_tests/common_test.go index 027a41d875..e388441d3c 100644 --- a/system_tests/common_test.go +++ b/system_tests/common_test.go @@ -476,7 +476,7 @@ func buildOnParentChain( Require(t, execConfig.Validate()) execConfigToBeUsedInConfigFetcher := execConfig execConfigFetcher := func() *gethexec.Config { return execConfigToBeUsedInConfigFetcher } - execNode, err := gethexec.CreateExecutionNode(ctx, chainTestClient.Stack, chainDb, blockchain, parentChainTestClient.Client, execConfigFetcher) + execNode, err := gethexec.CreateExecutionNode(ctx, chainTestClient.Stack, chainDb, blockchain, parentChainTestClient.Client, execConfigFetcher, 0) Require(t, err) fatalErrChan := make(chan error, 10) @@ -596,7 +596,7 @@ func (b *NodeBuilder) BuildL2(t *testing.T) func() { Require(t, b.execConfig.Validate()) execConfig := b.execConfig execConfigFetcher := func() *gethexec.Config { return execConfig } - execNode, err := gethexec.CreateExecutionNode(b.ctx, b.L2.Stack, chainDb, blockchain, nil, execConfigFetcher) + execNode, err := gethexec.CreateExecutionNode(b.ctx, b.L2.Stack, chainDb, blockchain, nil, execConfigFetcher, 0) Require(t, err) fatalErrChan := make(chan error, 10) @@ -645,7 +645,7 @@ func (b *NodeBuilder) RestartL2Node(t *testing.T) { l2info, stack, chainDb, arbDb, blockchain := createNonL1BlockChainWithStackConfig(t, b.L2Info, b.dataDir, b.chainConfig, b.initMessage, b.l2StackConfig, b.execConfig, b.wasmCacheTag) execConfigFetcher := func() *gethexec.Config { return b.execConfig } - execNode, err := gethexec.CreateExecutionNode(b.ctx, stack, chainDb, blockchain, nil, execConfigFetcher) + execNode, err := gethexec.CreateExecutionNode(b.ctx, stack, chainDb, blockchain, nil, execConfigFetcher, 0) Require(t, err) feedErrChan := make(chan error, 10) @@ -1445,7 +1445,7 @@ func Create2ndNodeWithConfig( Require(t, execConfig.Validate()) Require(t, nodeConfig.Validate()) configFetcher := func() *gethexec.Config { return execConfig } - currentExec, err := gethexec.CreateExecutionNode(ctx, chainStack, chainDb, blockchain, parentChainClient, configFetcher) + currentExec, err := gethexec.CreateExecutionNode(ctx, chainStack, chainDb, blockchain, parentChainClient, configFetcher, 0) Require(t, err) currentNode, err := arbnode.CreateNode(ctx, chainStack, currentExec, arbDb, NewFetcherFromConfig(nodeConfig), blockchain.Config(), parentChainClient, addresses, &validatorTxOpts, &sequencerTxOpts, dataSigner, feedErrChan, big.NewInt(1337), nil)