Skip to content

Commit

Permalink
handle the case where there is no block to iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangchiqing committed Jan 31, 2025
1 parent 6eb1941 commit 95c3acd
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 20 deletions.
7 changes: 5 additions & 2 deletions module/block_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,17 @@ type BlockIterator interface {
// then after restart, the iterator will resume from A.
// make sure to call this after all the blocks for processing the block IDs returned by
// Next() are completed.
// It returns the saved index (next index to iterate), and error returned are exceptions
// It returns the saved index (next index to iterate)
// any error returned are exceptions
Checkpoint() (savedIndex uint64, exception error)
}

// IteratorCreator creates block iterators.
// a block iterator iterates through a saved index to the latest block.
// after iterating through all the blocks in the range, the iterator can be discarded.
// a new block iterator can be created to iterate through the next range.
// if there is no block to iterate, hasNext is false
// any error returned are exception
type IteratorCreator interface {
Create() (fromSavedIndexToLatest BlockIterator, exception error)
Create() (fromSavedIndexToLatest BlockIterator, hasNext bool, exception error)
}
13 changes: 9 additions & 4 deletions module/block_iterator/creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,23 @@ func NewCreator(
}, nil
}

func (c *Creator) Create() (module.BlockIterator, error) {
func (c *Creator) Create() (iter module.BlockIterator, hasNext bool, exception error) {
// create a iteration range from the first un-iterated to the latest block
iterRange, err := c.progress.NextRange()
iterRange, hasNext, err := c.progress.NextRange()
if err != nil {
return nil, fmt.Errorf("failed to create range for block iteration: %w", err)
return nil, false, fmt.Errorf("failed to create range for block iteration: %w", err)
}

if !hasNext {
// no block to iterate
return nil, false, nil
}

// create a block iterator with
// the function to get block ID by index,
// the progress writer to update the progress in storage,
// and the iteration range
return NewIndexedBlockIterator(c.getBlockIDByIndex, c.progress, iterRange), nil
return NewIndexedBlockIterator(c.getBlockIDByIndex, c.progress, iterRange), true, nil
}

// NewHeightBasedCreator creates a block iterator that iterates through blocks
Expand Down
52 changes: 47 additions & 5 deletions module/block_iterator/creator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
// TestCanResume: stop at a height, and take checkpoint, create a new iterator,
// verify it will resume from the next height to the latest
// TestCanSkipViewsIfNotIndexed: iterate through all views, and skip views that are not indexed
// TestCanSkipIfThereIsNoBlockToIterate: skip iterationg if there is no block to iterate

func TestCanIterate(t *testing.T) {
root := &flow.Header{Height: 0}
Expand Down Expand Up @@ -55,8 +56,9 @@ func TestCanIterate(t *testing.T) {
)
require.NoError(t, err)

iterator, err := creator.Create()
iterator, hasNext, err := creator.Create()
require.NoError(t, err)
require.True(t, hasNext)

// Iterate through blocks
visitedBlocks := make([]flow.Identifier, 0, len(blocks))
Expand Down Expand Up @@ -99,8 +101,9 @@ func TestCanIterate(t *testing.T) {
blocks = append(blocks, additionalBlocks...)

// Create another iterator
iterator, err = creator.Create()
iterator, hasNext, err = creator.Create()
require.NoError(t, err)
require.True(t, hasNext)

// Iterate through initial blocks
for i := 0; i < len(additionalBlocks); i++ {
Expand Down Expand Up @@ -171,8 +174,9 @@ func TestCanResume(t *testing.T) {
)
require.NoError(t, err)

iterator, err := creator.Create()
iterator, hasNext, err := creator.Create()
require.NoError(t, err)
require.True(t, hasNext)

// Iterate through blocks
visitedBlocks := make([]flow.Identifier, 0, len(blocks))
Expand Down Expand Up @@ -209,8 +213,9 @@ func TestCanResume(t *testing.T) {
)
require.NoError(t, err)

newIterator, err := newCreator.Create()
newIterator, hasNext, err := newCreator.Create()
require.NoError(t, err)
require.True(t, hasNext)

// iterate until the end
for {
Expand Down Expand Up @@ -271,8 +276,9 @@ func TestCanSkipViewsIfNotIndexed(t *testing.T) {
)
require.NoError(t, err)

iterator, err := creator.Create()
iterator, hasNext, err := creator.Create()
require.NoError(t, err)
require.True(t, hasNext)

// Iterate through blocks
visitedBlocks := make(map[flow.Identifier]struct{})
Expand Down Expand Up @@ -304,6 +310,42 @@ func TestCanSkipViewsIfNotIndexed(t *testing.T) {
require.Equal(t, uint64(8), savedView, "Expected next view to be 8 (last View + 1)")
}

func TestCanSkipIfThereIsNoBlockToIterate(t *testing.T) {
// Set up root block
root := &flow.Header{Height: 10}

// Mock getBlockIDByHeight function
getBlockIDByHeight := func(height uint64) (flow.Identifier, error) {
return flow.Identifier{}, fmt.Errorf("block not found at height %d", height)
}

// Mock progress tracker
progress := &mockProgress{}

// Mock latest function that returns the same height as root
latest := func() (*flow.Header, error) {
return root, nil
}

// Create iterator
creator, err := NewHeightBasedCreator(
getBlockIDByHeight,
progress,
root,
latest,
)
require.NoError(t, err)

// Create the iterator
_, hasNext, err := creator.Create()
require.NoError(t, err)
require.False(t, hasNext, "Expected no blocks to iterate")

savedHeight, err := progress.ProcessedIndex()
require.NoError(t, err)
require.Equal(t, root.Height+1, savedHeight, "Expected saved height to be root height + 1")
}

type mockProgress struct {
index uint64
initialized bool
Expand Down
18 changes: 13 additions & 5 deletions module/block_iterator/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func NewPersistentIteratorState(store storage.ConsumerProgress, root uint64, lat
}

func (n *PersistentIteratorState) LoadState() (uint64, error) {
// TODO: adding cache
return n.store.ProcessedIndex()
}

Expand All @@ -41,24 +42,31 @@ func (n *PersistentIteratorState) SaveState(next uint64) error {
}

// NextRange returns the next range of blocks to iterate over
func (n *PersistentIteratorState) NextRange() (module.IteratorRange, error) {
// the range is inclusive, and the end is the latest block
// if there is no block to iterate, hasNext is false
func (n *PersistentIteratorState) NextRange() (rg module.IteratorRange, hasNext bool, exception error) {
next, err := n.LoadState()
if err != nil {
return module.IteratorRange{}, fmt.Errorf("failed to read next height: %w", err)
return module.IteratorRange{}, false, fmt.Errorf("failed to read next height: %w", err)
}

latest, err := n.latest()
if err != nil {
return module.IteratorRange{}, fmt.Errorf("failed to get latest block: %w", err)
return module.IteratorRange{}, false, fmt.Errorf("failed to get latest block: %w", err)
}

// if the next is the next of the latest, then there is no block to iterate
if latest+1 == next {
return module.IteratorRange{}, false, nil
}

if latest < next {
return module.IteratorRange{}, fmt.Errorf("latest block is less than next block: %d < %d", latest, next)
return module.IteratorRange{}, false, fmt.Errorf("latest block is less than next block: %d < %d", latest, next)
}

// iterate from next to latest (inclusive)
return module.IteratorRange{
Start: next,
End: latest,
}, nil
}, true, nil
}
23 changes: 19 additions & 4 deletions module/block_iterator/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,47 @@ func TestProgress(t *testing.T) {
progress, err := NewPersistentIteratorState(store, root, getLatest)
require.NoError(t, err)

// initial state should be the next of root
// 1. verify initial state should be the next of root
next, err := progress.LoadState()
require.NoError(t, err)
require.Equal(t, root+1, next)

rg, err := progress.NextRange()
rg, hasNext, err := progress.NextRange()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, root+1, rg.Start)
require.Equal(t, latest, rg.End)

// save the state
err = progress.SaveState(latest + 1)
require.NoError(t, err)

// 2. verify the saved state
next, err = progress.LoadState()
require.NoError(t, err)
require.Equal(t, latest+1, next)

// update latest
// 3. verify when latest is updated to a higher height
// the end height of the next range should be updated
oldLatest := latest
latest = latest + 20
rg, err = progress.NextRange()
rg, hasNext, err = progress.NextRange()
require.NoError(t, err)
require.True(t, hasNext)

// verify the new range
require.Equal(t, oldLatest+1, rg.Start)
require.Equal(t, latest, rg.End)

// 4. verify when state is up to date, and latest
// does not change, the next range should include no block
err = progress.SaveState(latest + 1)
require.NoError(t, err)

// verify that NextRange will return an error indicating that
// there is no block to iterate
rg, hasNext, err = progress.NextRange()
require.NoError(t, err)
require.False(t, hasNext)
})
}

0 comments on commit 95c3acd

Please sign in to comment.