Skip to content

Commit

Permalink
Merge pull request #6890 from onflow/leo/cdp-prune-block-iterator-cre…
Browse files Browse the repository at this point in the history
…ator

[Chunk Data Pack Pruner]  Add Iterator Creator
  • Loading branch information
zhangchiqing authored Jan 31, 2025
2 parents fa68cc2 + 95c3acd commit 49bb41d
Show file tree
Hide file tree
Showing 10 changed files with 1,082 additions and 182 deletions.
114 changes: 34 additions & 80 deletions module/block_iterator.go
Original file line number Diff line number Diff line change
@@ -1,114 +1,68 @@
package module

import (
"fmt"

"github.com/onflow/flow-go/model/flow"
)

// IterateJob defines the range of blocks to iterate over
// IteratorRange defines the range of blocks to iterate over
// the range could be either view based range or height based range.
// when specifying the range, the start and end are inclusive, and the end must be greater than or
// equal to the start
type IterateJob struct {
type IteratorRange struct {
Start uint64 // the start of the range
End uint64 // the end of the range
}

// IterateJobCreator is an interface for creating iterate jobs
type IteratorJobCreator interface {
// CreateJob takes a progress reader which is used to read the progress of the iterator
// and returns an iterate job that specifies the range of blocks to iterate over
CreateJob(IterateProgressReader) (IterateJob, error)
// IteratorState is an interface for reading and writing the progress of the iterator
type IteratorState interface {
IteratorStateReader
IteratorStateWriter
}

// IterateProgressReader reads the progress of the iterator, useful for resuming the iteration
// IteratorStateReader reads the progress of the iterator, useful for resuming the iteration
// after restart
type IterateProgressReader interface {
// ReadNext reads the next block to iterate
// caller must ensure the reader is created by the IterateProgressInitializer,
// otherwise ReadNext would return exception.
ReadNext() (uint64, error)
}

// IterateProgressWriter saves the progress of the iterator
type IterateProgressWriter interface {
// SaveNext persists the next block to be iterated
SaveNext(uint64) error
type IteratorStateReader interface {
// LoadState reads the next block to iterate
// caller must ensure the state is initialized, otherwise LoadState would return an exception.
LoadState() (progress uint64, exception error)
}

// IterateProgressInitializer is an interface for initializing the progress of the iterator
// a initializer must be used to ensures the initial next block to be iterated is saved in
// storage before creating the block iterator
type IterateProgressInitializer interface {
Init() (IterateProgressReader, IterateProgressWriter, error)
// IteratorStateWriter saves the progress of the iterator
type IteratorStateWriter interface {
// SaveState persists the next block to be iterated
SaveState(uint64) (exception error)
}

// BlockIterator is an interface for iterating over blocks
type BlockIterator interface {
// Next returns the next block in the iterator
// Note: this method is not concurrent-safe
// Note: a block will only be iterated once in a single iteration, however
// if the iteration is interrupted (e.g. by a restart), the iterator can be
// resumed from the last checkpoint, which might result in the same block being
// iterated again.
// TODO: once upgraded to go 1.23, consider using the Range iterator
// Range() iter.Seq2[flow.Identifier, error]
// so that the iterator can be used in a for loop:
// for blockID, err := range heightIterator.Range()
Next() (blockID flow.Identifier, hasNext bool, exception error)

// Checkpoint saves the current state of the iterator
// so that it can be resumed later
// when Checkpoint is called, if SaveNextFunc is called with block A,
// Checkpoint saves the current state of the iterator so that it can be resumed later
// when Checkpoint is called, if SaveStateFunc is called with block A,
// then after restart, the iterator will resume from A.
Checkpoint() error
// make sure to call this only after the processing of all the block IDs returned by
// Next() is completed.
// It returns the saved index (the next index to iterate).
// any error returned is an exception
Checkpoint() (savedIndex uint64, exception error)
}

// IteratorCreator is an interface for creating block iterators
// IteratorCreator creates block iterators.
// a block iterator iterates through a saved index to the latest block.
// after iterating through all the blocks in the range, the iterator can be discarded.
// a new block iterator can be created to iterate through the next range.
// if there is no block to iterate, hasNext is false
// any error returned is an exception
type IteratorCreator interface {
// CreateIterator takes iterate job which specifies the range of blocks to iterate over
// and a progress writer which is used to save the progress of the iterator,
// and returns a block iterator that can be used to iterate over the blocks
// Note: it's up to the implementation to decide how often the progress is saved,
// it is wise to consider the trade-off between the performance and the progress saving,
// if the progress is saved too often, it might impact the iteration performance, however,
// if the progress is only saved at the end of the iteration, then if the iteration
// was interrupted, then the iterator will start from the beginning of the range again,
// which means some blocks might be iterated multiple times.
CreateIterator(IterateJob, IterateProgressWriter) (BlockIterator, error)
}

type IteratorFactory struct {
progressReader IterateProgressReader
progressWriter IterateProgressWriter
creator IteratorCreator
jobCreator IteratorJobCreator
}

func NewIteratorFactory(
initializer IterateProgressInitializer,
creator IteratorCreator,
jobCreator IteratorJobCreator,
) (*IteratorFactory, error) {
progressReader, progressWriter, err := initializer.Init()
if err != nil {
return nil, fmt.Errorf("failed to initialize progress: %w", err)
}

return &IteratorFactory{
progressReader: progressReader,
progressWriter: progressWriter,
creator: creator,
jobCreator: jobCreator,
}, nil
}

func (f *IteratorFactory) Create() (BlockIterator, error) {
job, err := f.jobCreator.CreateJob(f.progressReader)
if err != nil {
return nil, fmt.Errorf("failed to create job for block iteration: %w", err)
}

iterator, err := f.creator.CreateIterator(job, f.progressWriter)
if err != nil {
return nil, fmt.Errorf("failed to create block iterator: %w", err)
}

return iterator, nil
Create() (fromSavedIndexToLatest BlockIterator, hasNext bool, exception error)
}
117 changes: 117 additions & 0 deletions module/block_iterator/creator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package block_iterator

import (
"fmt"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/storage"
)

// Creator creates block iterators.
// A block iterator iterates from a saved index to the latest block.
// After iterating through all the blocks in the range, the iterator can be discarded,
// and a new block iterator can be created to iterate through the next range.
type Creator struct {
// getBlockIDByIndex resolves an index (a height or a view, depending on how the
// Creator was constructed) to a block ID. The bool result reports whether the
// index is indexed; the error result is an exception.
getBlockIDByIndex func(uint64) (flow.Identifier, bool, error)
// progress is the persistent iterator state. Create asks it for the next range
// to iterate and hands it to every iterator it builds.
progress *PersistentIteratorState
}

// Compile-time assertion that *Creator implements module.IteratorCreator.
var _ module.IteratorCreator = (*Creator)(nil)

// NewCreator returns a Creator whose iterators walk blocks by index.
//
//   - getBlockIDByIndex resolves an index to a block ID and reports whether the
//     index is indexed.
//   - progressStorage is the storage backing the persisted iteration progress.
//   - root is the block index to start iterating from (either the root height or
//     the root view).
//   - latest returns the latest block index. Because it is a function, the same
//     Creator can be reused to build one iterator after another: first from the
//     root to the latest, then from the last iterated index to the new latest.
//
// An error is returned when the persistent iterator state cannot be initialized.
func NewCreator(
	getBlockIDByIndex func(uint64) (blockID flow.Identifier, indexed bool, exception error),
	progressStorage storage.ConsumerProgress,
	root uint64,
	latest func() (uint64, error),
) (*Creator, error) {
	// set up the persisted progress first; according to the original author's
	// note this stores the root block index in storage
	state, initErr := NewPersistentIteratorState(progressStorage, root, latest)
	if initErr != nil {
		return nil, fmt.Errorf("failed to initialize progress: %w", initErr)
	}

	creator := &Creator{
		progress:          state,
		getBlockIDByIndex: getBlockIDByIndex,
	}
	return creator, nil
}

// Create builds a block iterator covering the range from the first un-iterated
// index to the latest block, as reported by the persisted progress.
// When there is nothing to iterate, it returns a nil iterator with hasNext set
// to false and no error. Any error returned is an exception.
func (c *Creator) Create() (iter module.BlockIterator, hasNext bool, exception error) {
	// ask the persisted progress for the next range to cover
	nextRange, more, rangeErr := c.progress.NextRange()
	switch {
	case rangeErr != nil:
		return nil, false, fmt.Errorf("failed to create range for block iteration: %w", rangeErr)
	case !more:
		// no block to iterate
		return nil, false, nil
	}

	// the iterator needs the index -> block ID lookup, the progress (so that it
	// can update the progress in storage), and the range to walk
	blockIter := NewIndexedBlockIterator(c.getBlockIDByIndex, c.progress, nextRange)
	return blockIter, true, nil
}

// NewHeightBasedCreator returns a Creator whose iterators walk blocks by height,
// from the root block to the latest (either finalized or sealed) block.
//
// getBlockIDByHeight and latest are adapted to the index-based callbacks that
// NewCreator expects; root.Height is used as the starting index.
func NewHeightBasedCreator(
	getBlockIDByHeight func(height uint64) (flow.Identifier, error),
	progress storage.ConsumerProgress,
	root *flow.Header,
	latest func() (*flow.Header, error),
) (*Creator, error) {
	// adapt the height lookup to the (blockID, indexed, exception) shape
	lookupByHeight := func(height uint64) (flow.Identifier, bool, error) {
		id, lookupErr := getBlockIDByHeight(height)
		if lookupErr != nil {
			return flow.Identifier{}, false, fmt.Errorf("failed to get block ID by height: %w", lookupErr)
		}
		// every height between root and latest (either finalized or sealed)
		// must be indexed, hence the constant true
		return id, true, nil
	}

	// expose the latest block as a plain height
	latestHeight := func() (uint64, error) {
		header, latestErr := latest()
		if latestErr != nil {
			return 0, fmt.Errorf("failed to get latest block: %w", latestErr)
		}
		return header.Height, nil
	}

	return NewCreator(lookupByHeight, progress, root.Height, latestHeight)
}

// NewViewBasedCreator returns a Creator whose iterators walk blocks by view,
// from the root block to the latest (either finalized or sealed) block.
// Views can have gaps, so the iterator skips views that have no block;
// getBlockIDByView signals such views through its viewIndexed result.
//
// root.View is used as the starting index, and latest is adapted to return the
// view of the latest block.
func NewViewBasedCreator(
	getBlockIDByView func(view uint64) (blockID flow.Identifier, viewIndexed bool, exception error),
	progress storage.ConsumerProgress,
	root *flow.Header,
	latest func() (*flow.Header, error),
) (*Creator, error) {
	// expose the latest block as a plain view number
	latestView := func() (uint64, error) {
		header, latestErr := latest()
		if latestErr != nil {
			return 0, fmt.Errorf("failed to get latest block: %w", latestErr)
		}
		return header.View, nil
	}

	return NewCreator(getBlockIDByView, progress, root.View, latestView)
}
Loading

0 comments on commit 49bb41d

Please sign in to comment.