Skip to content

Commit

Permalink
Changes based on PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
amsanghi committed Nov 5, 2024
1 parent bf7f8cd commit 68c34bf
Show file tree
Hide file tree
Showing 17 changed files with 101 additions and 76 deletions.
4 changes: 2 additions & 2 deletions arbnode/delayed_seq_reorg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ func TestSequencerReorgFromDelayed(t *testing.T) {
tracker, err := NewInboxTracker(db, streamer, nil, DefaultSnapSyncConfig)
Require(t, err)

err = streamer.Start(ctx, 0)
err = streamer.Start(ctx)
Require(t, err)
exec.Start(ctx, 0)
exec.Start(ctx)
init, err := streamer.GetMessage(0)
Require(t, err)

Expand Down
28 changes: 15 additions & 13 deletions arbnode/delayed_sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type DelayedSequencer struct {
waitingForFinalizedBlock uint64
mutex sync.Mutex
config DelayedSequencerConfigFetcher
syncTillBlock uint64
}

type DelayedSequencerConfig struct {
Expand Down Expand Up @@ -64,15 +65,16 @@ var TestDelayedSequencerConfig = DelayedSequencerConfig{
UseMergeFinality: false,
}

func NewDelayedSequencer(l1Reader *headerreader.HeaderReader, reader *InboxReader, exec execution.ExecutionSequencer, coordinator *SeqCoordinator, config DelayedSequencerConfigFetcher) (*DelayedSequencer, error) {
func NewDelayedSequencer(l1Reader *headerreader.HeaderReader, reader *InboxReader, exec execution.ExecutionSequencer, coordinator *SeqCoordinator, config DelayedSequencerConfigFetcher, syncTillBlock uint64) (*DelayedSequencer, error) {
d := &DelayedSequencer{
l1Reader: l1Reader,
bridge: reader.DelayedBridge(),
inbox: reader.Tracker(),
reader: reader,
coordinator: coordinator,
exec: exec,
config: config,
l1Reader: l1Reader,
bridge: reader.DelayedBridge(),
inbox: reader.Tracker(),
reader: reader,
coordinator: coordinator,
exec: exec,
config: config,
syncTillBlock: syncTillBlock,
}
if coordinator != nil {
coordinator.SetDelayedSequencer(d)
Expand Down Expand Up @@ -211,7 +213,7 @@ func (d *DelayedSequencer) ForceSequenceDelayed(ctx context.Context) error {
return d.sequenceWithoutLockout(ctx, lastBlockHeader)
}

func (d *DelayedSequencer) run(ctx context.Context, syncTillBlock uint64) {
func (d *DelayedSequencer) run(ctx context.Context) {
headerChan, cancel := d.l1Reader.Subscribe(false)
defer cancel()

Expand All @@ -221,8 +223,8 @@ func (d *DelayedSequencer) run(ctx context.Context, syncTillBlock uint64) {
log.Warn("error reading delayed count", "err", err)
continue
}
if syncTillBlock > 0 && delayedCount >= syncTillBlock {
log.Info("stopping block creation in delayed sequencer", "syncTillBlock", syncTillBlock)
if d.syncTillBlock > 0 && delayedCount >= d.syncTillBlock {
log.Info("stopping block creation in delayed sequencer", "syncTillBlock", d.syncTillBlock)
return
}
select {
Expand All @@ -241,9 +243,9 @@ func (d *DelayedSequencer) run(ctx context.Context, syncTillBlock uint64) {
}
}

func (d *DelayedSequencer) Start(ctxIn context.Context, syncTillBlock uint64) {
func (d *DelayedSequencer) Start(ctxIn context.Context) {
d.StopWaiter.Start(ctxIn, d)
d.LaunchThread(func(ctx context.Context) {
d.run(ctx, syncTillBlock)
d.run(ctx)
})
}
10 changes: 6 additions & 4 deletions arbnode/inbox_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,14 @@ type InboxReader struct {
caughtUpChan chan struct{}
client *ethclient.Client
l1Reader *headerreader.HeaderReader
syncTillBlock uint64

// Atomic
lastSeenBatchCount atomic.Uint64
lastReadBatchCount atomic.Uint64
}

func NewInboxReader(tracker *InboxTracker, client *ethclient.Client, l1Reader *headerreader.HeaderReader, firstMessageBlock *big.Int, delayedBridge *DelayedBridge, sequencerInbox *SequencerInbox, config InboxReaderConfigFetcher) (*InboxReader, error) {
func NewInboxReader(tracker *InboxTracker, client *ethclient.Client, l1Reader *headerreader.HeaderReader, firstMessageBlock *big.Int, delayedBridge *DelayedBridge, sequencerInbox *SequencerInbox, config InboxReaderConfigFetcher, syncTillBlock uint64) (*InboxReader, error) {
err := config().Validate()
if err != nil {
return nil, err
Expand All @@ -116,10 +117,11 @@ func NewInboxReader(tracker *InboxTracker, client *ethclient.Client, l1Reader *h
firstMessageBlock: firstMessageBlock,
caughtUpChan: make(chan struct{}),
config: config,
syncTillBlock: syncTillBlock,
}, nil
}

func (r *InboxReader) Start(ctxIn context.Context, syncTillBlock uint64) error {
func (r *InboxReader) Start(ctxIn context.Context) error {
r.StopWaiter.Start(ctxIn, r)
hadError := false
r.LaunchThread(func(ctx context.Context) {
Expand All @@ -129,8 +131,8 @@ func (r *InboxReader) Start(ctxIn context.Context, syncTillBlock uint64) error {
log.Warn("error reading delayed count", "err", err)
hadError = true
}
if syncTillBlock > 0 && delayedCount >= syncTillBlock {
log.Info("stopping block creation in inbox reader", "syncTillBlock", syncTillBlock)
if r.syncTillBlock > 0 && delayedCount >= r.syncTillBlock {
log.Info("stopping block creation in inbox reader", "syncTillBlock", r.syncTillBlock)
return
}
err = r.run(ctx, hadError)
Expand Down
6 changes: 3 additions & 3 deletions arbnode/inbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func NewTransactionStreamerForTest(t *testing.T, ownerAddress common.Address) (*
}

transactionStreamerConfigFetcher := func() *TransactionStreamerConfig { return &DefaultTransactionStreamerConfig }
execEngine, err := gethexec.NewExecutionEngine(bc)
execEngine, err := gethexec.NewExecutionEngine(bc, 0)
if err != nil {
Fail(t, err)
}
Expand Down Expand Up @@ -106,9 +106,9 @@ func TestTransactionStreamer(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := inbox.Start(ctx, 0)
err := inbox.Start(ctx)
Require(t, err)
exec.Start(ctx, 0)
exec.Start(ctx)

maxExpectedGasCost := big.NewInt(l2pricing.InitialBaseFeeWei)
maxExpectedGasCost.Mul(maxExpectedGasCost, big.NewInt(2100*2))
Expand Down
22 changes: 10 additions & 12 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ func GenerateRollupConfig(prod bool, wasmModuleRoot common.Hash, rollupOwner com

type Config struct {
Sequencer bool `koanf:"sequencer"`
SyncTillBlock uint64 `koanf:"sync-till-block"`
ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"`
InboxReader InboxReaderConfig `koanf:"inbox-reader" reload:"hot"`
DelayedSequencer DelayedSequencerConfig `koanf:"delayed-sequencer" reload:"hot"`
Expand Down Expand Up @@ -146,7 +145,6 @@ func (c *Config) ValidatorRequired() bool {

func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feedOutputEnable bool) {
f.Bool(prefix+".sequencer", ConfigDefault.Sequencer, "enable sequencer")
f.Uint64(prefix+".sync-till-block", ConfigDefault.SyncTillBlock, "sync till block")
headerreader.AddOptions(prefix+".parent-chain-reader", f)
InboxReaderConfigAddOptions(prefix+".inbox-reader", f)
DelayedSequencerConfigAddOptions(prefix+".delayed-sequencer", f)
Expand All @@ -165,7 +163,6 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feed

var ConfigDefault = Config{
Sequencer: false,
SyncTillBlock: 0,
ParentChainReader: headerreader.DefaultConfig,
InboxReader: DefaultInboxReaderConfig,
DelayedSequencer: DefaultDelayedSequencerConfig,
Expand Down Expand Up @@ -473,7 +470,7 @@ func createNodeImpl(
}

if config.SeqCoordinator.Enable {
coordinator, err = NewSeqCoordinator(dataSigner, bpVerifier, txStreamer, exec, syncMonitor, config.SeqCoordinator)
coordinator, err = NewSeqCoordinator(dataSigner, bpVerifier, txStreamer, exec, syncMonitor, config.SeqCoordinator, config.TransactionStreamer.SyncTillBlock)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -501,6 +498,7 @@ func createNodeImpl(
nil,
fatalErrChan,
bpVerifier,
config.TransactionStreamer.SyncTillBlock,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -592,7 +590,7 @@ func createNodeImpl(
if err != nil {
return nil, err
}
inboxReader, err := NewInboxReader(inboxTracker, l1client, l1Reader, new(big.Int).SetUint64(deployInfo.DeployedAt), delayedBridge, sequencerInbox, func() *InboxReaderConfig { return &configFetcher.Get().InboxReader })
inboxReader, err := NewInboxReader(inboxTracker, l1client, l1Reader, new(big.Int).SetUint64(deployInfo.DeployedAt), delayedBridge, sequencerInbox, func() *InboxReaderConfig { return &configFetcher.Get().InboxReader }, configFetcher.Get().TransactionStreamer.SyncTillBlock)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -741,7 +739,7 @@ func createNodeImpl(
}

// always create DelayedSequencer, it won't do anything if it is disabled
delayedSequencer, err = NewDelayedSequencer(l1Reader, inboxReader, exec, coordinator, func() *DelayedSequencerConfig { return &configFetcher.Get().DelayedSequencer })
delayedSequencer, err = NewDelayedSequencer(l1Reader, inboxReader, exec, coordinator, func() *DelayedSequencerConfig { return &configFetcher.Get().DelayedSequencer }, configFetcher.Get().TransactionStreamer.SyncTillBlock)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -842,7 +840,7 @@ func (n *Node) Start(ctx context.Context) error {
if execClient != nil {
execClient.SetConsensusClient(n)
}
err = n.Execution.Start(ctx, n.configFetcher.Get().SyncTillBlock)
err = n.Execution.Start(ctx)
if err != nil {
return fmt.Errorf("error starting exec client: %w", err)
}
Expand Down Expand Up @@ -872,12 +870,12 @@ func (n *Node) Start(ctx context.Context) error {
return fmt.Errorf("error populating feed backlog on startup: %w", err)
}
}
err = n.TxStreamer.Start(ctx, n.configFetcher.Get().SyncTillBlock)
err = n.TxStreamer.Start(ctx)
if err != nil {
return fmt.Errorf("error starting transaction streamer: %w", err)
}
if n.InboxReader != nil {
err = n.InboxReader.Start(ctx, n.configFetcher.Get().SyncTillBlock)
err = n.InboxReader.Start(ctx)
if err != nil {
return fmt.Errorf("error starting inbox reader: %w", err)
}
Expand All @@ -890,15 +888,15 @@ func (n *Node) Start(ctx context.Context) error {
}
}
if n.SeqCoordinator != nil {
n.SeqCoordinator.Start(ctx, n.configFetcher.Get().SyncTillBlock)
n.SeqCoordinator.Start(ctx)
} else {
n.Execution.Activate()
}
if n.MaintenanceRunner != nil {
n.MaintenanceRunner.Start(ctx)
}
if n.DelayedSequencer != nil {
n.DelayedSequencer.Start(ctx, n.configFetcher.Get().SyncTillBlock)
n.DelayedSequencer.Start(ctx)
}
if n.BatchPoster != nil {
n.BatchPoster.Start(ctx)
Expand Down Expand Up @@ -948,7 +946,7 @@ func (n *Node) Start(ctx context.Context) error {
return
}
}
n.BroadcastClients.Start(ctx, n.configFetcher.Get().SyncTillBlock)
n.BroadcastClients.Start(ctx)
}()
}
if n.configFetcher != nil {
Expand Down
9 changes: 6 additions & 3 deletions arbnode/seq_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type SeqCoordinator struct {

prevChosenSequencer string
reportedWantsLockout bool
syncTillBlock uint64

lockoutUntil atomic.Int64 // atomic

Expand Down Expand Up @@ -150,6 +151,7 @@ func NewSeqCoordinator(
sequencer execution.ExecutionSequencer,
sync *SyncMonitor,
config SeqCoordinatorConfig,
syncTillBlock uint64,
) (*SeqCoordinator, error) {
redisCoordinator, err := redisutil.NewRedisCoordinator(config.RedisUrl)
if err != nil {
Expand All @@ -166,6 +168,7 @@ func NewSeqCoordinator(
sequencer: sequencer,
config: config,
signer: signer,
syncTillBlock: syncTillBlock,
}
streamer.SetSeqCoordinator(coordinator)
return coordinator, nil
Expand Down Expand Up @@ -854,7 +857,7 @@ func (c *SeqCoordinator) launchHealthcheckServer(ctx context.Context) {
}
}

func (c *SeqCoordinator) Start(ctxIn context.Context, syncTillBlock uint64) {
func (c *SeqCoordinator) Start(ctxIn context.Context) {
c.StopWaiter.Start(ctxIn, c)
var newRedisCoordinator *redisutil.RedisCoordinator
if c.config.NewRedisUrl != "" {
Expand All @@ -872,8 +875,8 @@ func (c *SeqCoordinator) Start(ctxIn context.Context, syncTillBlock uint64) {
if err != nil {
log.Warn("failed to get message count", "err", err)
}
if syncTillBlock > 0 && uint64(count) >= syncTillBlock {
log.Info("stopping block creation in sequencer", "syncTillBlock", syncTillBlock)
if c.syncTillBlock > 0 && uint64(count) >= c.syncTillBlock {
log.Info("stopping block creation in sequencer", "syncTillBlock", c.syncTillBlock)
return
}
interval := c.chooseRedisAndUpdate(ctx, newRedisCoordinator)
Expand Down
10 changes: 7 additions & 3 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type TransactionStreamerConfig struct {
MaxBroadcasterQueueSize int `koanf:"max-broadcaster-queue-size"`
MaxReorgResequenceDepth int64 `koanf:"max-reorg-resequence-depth" reload:"hot"`
ExecuteMessageLoopDelay time.Duration `koanf:"execute-message-loop-delay" reload:"hot"`
SyncTillBlock uint64 `koanf:"sync-till-block"`
}

type TransactionStreamerConfigFetcher func() *TransactionStreamerConfig
Expand All @@ -82,18 +83,21 @@ var DefaultTransactionStreamerConfig = TransactionStreamerConfig{
MaxBroadcasterQueueSize: 50_000,
MaxReorgResequenceDepth: 1024,
ExecuteMessageLoopDelay: time.Millisecond * 100,
SyncTillBlock: 0,
}

var TestTransactionStreamerConfig = TransactionStreamerConfig{
MaxBroadcasterQueueSize: 10_000,
MaxReorgResequenceDepth: 128 * 1024,
ExecuteMessageLoopDelay: time.Millisecond,
SyncTillBlock: 0,
}

func TransactionStreamerConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Int(prefix+".max-broadcaster-queue-size", DefaultTransactionStreamerConfig.MaxBroadcasterQueueSize, "maximum cache of pending broadcaster messages")
f.Int64(prefix+".max-reorg-resequence-depth", DefaultTransactionStreamerConfig.MaxReorgResequenceDepth, "maximum number of messages to attempt to resequence on reorg (0 = never resequence, -1 = always resequence)")
f.Duration(prefix+".execute-message-loop-delay", DefaultTransactionStreamerConfig.ExecuteMessageLoopDelay, "delay when polling calls to execute messages")
f.Uint64(prefix+".sync-till-block", DefaultTransactionStreamerConfig.SyncTillBlock, "node will not sync past this block")
}

func NewTransactionStreamer(
Expand Down Expand Up @@ -1218,14 +1222,14 @@ func (s *TransactionStreamer) executeMessages(ctx context.Context, ignored struc
return s.config().ExecuteMessageLoopDelay
}

func (s *TransactionStreamer) Start(ctxIn context.Context, syncTillBlock uint64) error {
func (s *TransactionStreamer) Start(ctxIn context.Context) error {
s.StopWaiter.Start(ctxIn, s)
return s.LaunchThreadSafe(func(ctx context.Context) {
var defaultVal struct{}
var val struct{}
for {
if syncTillBlock > 0 && uint64(s.execLastMsgCount) >= syncTillBlock {
log.Info("stopping block creation in transaction streamer", "syncTillBlock", syncTillBlock)
if s.config().SyncTillBlock > 0 && uint64(s.execLastMsgCount) >= s.config().SyncTillBlock {
log.Info("stopping block creation in transaction streamer", "syncTillBlock", s.config().SyncTillBlock)
return
}
interval := s.executeMessages(ctx, val)
Expand Down
10 changes: 7 additions & 3 deletions broadcastclient/broadcastclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ type BroadcastClient struct {

retryCount atomic.Int64

syncTillBlock uint64

retrying bool
shuttingDown bool
confirmedSequenceNumberListener chan arbutil.MessageIndex
Expand All @@ -158,6 +160,7 @@ func NewBroadcastClient(
fatalErrChan chan error,
addrVerifier contracts.AddressVerifierInterface,
adjustCount func(int32),
syncTillBlock uint64,
) (*BroadcastClient, error) {
sigVerifier, err := signature.NewVerifier(&config().Verify, addrVerifier)
if err != nil {
Expand All @@ -173,10 +176,11 @@ func NewBroadcastClient(
fatalErrChan: fatalErrChan,
sigVerifier: sigVerifier,
adjustCount: adjustCount,
syncTillBlock: syncTillBlock,
}, err
}

func (bc *BroadcastClient) Start(ctxIn context.Context, syncTillBlock uint64) {
func (bc *BroadcastClient) Start(ctxIn context.Context) {
bc.StopWaiter.Start(ctxIn, bc)
if bc.StopWaiter.Stopped() {
log.Info("broadcast client has already been stopped, not starting")
Expand All @@ -186,8 +190,8 @@ func (bc *BroadcastClient) Start(ctxIn context.Context, syncTillBlock uint64) {
backoffDuration := bc.config().ReconnectInitialBackoff
for {

if syncTillBlock > 0 && uint64(bc.nextSeqNum) >= syncTillBlock {
log.Info("stopping block creation in broadcast client", "syncTillBlock", syncTillBlock)
if bc.syncTillBlock > 0 && uint64(bc.nextSeqNum) >= bc.syncTillBlock {
log.Info("stopping block creation in broadcast client", "syncTillBlock", bc.syncTillBlock)
return
}
earlyFrameData, err := bc.connect(ctx, bc.nextSeqNum)
Expand Down
Loading

0 comments on commit 68c34bf

Please sign in to comment.