Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Chunk Data Pack Pruner] Add block view index #6933

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions module/metrics/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const (
ResourceProposal = "proposal"
ResourceHeader = "header"
ResourceFinalizedHeight = "finalized_height"
ResourceCertifiedView = "certified_view"
ResourceIndex = "index"
ResourceIdentity = "identity"
ResourceGuarantee = "guarantee"
Expand Down
13 changes: 13 additions & 0 deletions state/protocol/badger/mutator.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,12 @@ func (m *FollowerState) headerExtend(ctx context.Context, candidate *flow.Block,
return fmt.Errorf("could not store incorporated qc: %w", err)
}
} else {
// parent is a block that has been received and certified by a QC.
err := transaction.WithTx(operation.SkipDuplicates(operation.IndexCertifiedBlockByView(parent.View, qc.BlockID)))(tx)
Copy link
Member Author

@zhangchiqing zhangchiqing Feb 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AlexHentschel I was using the db operation directly, for 2 reasons:

  • I wanted to add a IndexCertifiedBlockByView method to headers, and simply call m.headers.IndexCertifiedBlockByView(parent) here. But the storage.Headers interface is mostly for read methods, we didn't expose methods like IndexFinalizedBlockByView because we didn't want this method being misused.
  • For the same reason I also didn't add IndexCertifiedBlockByView method to Blocks, and also, the parent block body doesn't exist in this context, passing a block to blocks.IndexCertifiedBlockByView would require us to make additional query to the parent block body.

if err != nil {
return fmt.Errorf("could not index certified block: %w", err)
}

// trigger BlockProcessable for parent block above root height
if parent.Height > m.finalizedRootHeight {
tx.OnSucceed(func() {
Expand All @@ -389,6 +395,13 @@ func (m *FollowerState) headerExtend(ctx context.Context, candidate *flow.Block,
if err != nil {
return fmt.Errorf("could not store certifying qc: %w", err)
}

// candidate is a block that has been received and certified by a QC
err := transaction.WithTx(operation.SkipDuplicates(operation.IndexCertifiedBlockByView(candidate.Header.View, blockID)))(tx)
if err != nil {
return fmt.Errorf("could not index certified block: %w", err)
}

tx.OnSucceed(func() { // queue a BlockProcessable event for candidate block, since it is certified
m.consumer.BlockProcessable(candidate.Header, certifyingQC)
})
Expand Down
11 changes: 10 additions & 1 deletion state/protocol/badger/mutator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,15 @@ func TestExtendValid(t *testing.T) {
consumer.On("BlockProcessable", block1.Header, mock.Anything).Once()
err := fullState.Extend(context.Background(), block2)
require.NoError(t, err)

// verify that block1's view is indexed
var indexedID flow.Identifier
require.NoError(t, db.View(operation.LookupCertifiedBlockByView(block1.Header.View, &indexedID)))
require.Equal(t, block1.ID(), indexedID)

// verify that block2's view is not indexed
err = db.View(operation.LookupCertifiedBlockByView(block2.Header.View, &indexedID))
require.ErrorIs(t, err, stoerr.ErrNotFound)
})
})
}
Expand Down Expand Up @@ -1518,7 +1527,7 @@ func TestExtendEpochCommitInvalid(t *testing.T) {
// swap consensus node for a new one for epoch 2
epoch2NewParticipant := unittest.IdentityFixture(unittest.WithRole(flow.RoleConsensus))
epoch2Participants := append(
participants.Filter(filter.Not[flow.Identity](filter.HasRole[flow.Identity](flow.RoleConsensus))),
participants.Filter(filter.Not(filter.HasRole[flow.Identity](flow.RoleConsensus))),
epoch2NewParticipant,
).Sort(flow.Canonical[flow.Identity]).ToSkeleton()

Expand Down
11 changes: 11 additions & 0 deletions storage/badger/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,17 @@ func (b *Blocks) ByHeight(height uint64) (*flow.Block, error) {
return b.retrieveTx(blockID)(tx)
}

func (b *Blocks) ByView(view uint64) (*flow.Block, error) {
header, err := b.headers.ByView(view)
if err != nil {
return nil, err
}

tx := b.db.NewTransaction(false)
defer tx.Discard()
return b.retrieveTx(header.ID())(tx)
}

// ByCollectionID ...
func (b *Blocks) ByCollectionID(collID flow.Identifier) (*flow.Block, error) {
var blockID flow.Identifier
Expand Down
42 changes: 42 additions & 0 deletions storage/badger/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Headers struct {
db *badger.DB
cache *Cache[flow.Identifier, *flow.Header]
heightCache *Cache[uint64, flow.Identifier]
viewCache *Cache[uint64, flow.Identifier]
}

func NewHeaders(collector module.CacheMetrics, db *badger.DB) *Headers {
Expand Down Expand Up @@ -48,6 +49,14 @@ func NewHeaders(collector module.CacheMetrics, db *badger.DB) *Headers {
}
}

retrieveView := func(view uint64) func(tx *badger.Txn) (flow.Identifier, error) {
return func(tx *badger.Txn) (flow.Identifier, error) {
var id flow.Identifier
err := operation.LookupCertifiedBlockByView(view, &id)(tx)
return id, err
}
}

h := &Headers{
db: db,
cache: newCache(collector, metrics.ResourceHeader,
Expand All @@ -59,6 +68,10 @@ func NewHeaders(collector module.CacheMetrics, db *badger.DB) *Headers {
withLimit[uint64, flow.Identifier](4*flow.DefaultTransactionExpiry),
withStore(storeHeight),
withRetrieve(retrieveHeight)),

viewCache: newCache(collector, metrics.ResourceCertifiedView,
withLimit[uint64, flow.Identifier](4*flow.DefaultTransactionExpiry),
withRetrieve(retrieveView)),
Copy link
Member Author

@zhangchiqing zhangchiqing Jan 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cache is only filled by retrieval op.

}

return h
Expand Down Expand Up @@ -110,6 +123,19 @@ func (h *Headers) ByHeight(height uint64) (*flow.Header, error) {
return h.retrieveTx(blockID)(tx)
}

// ByView returns block header for the given view. It is only available for certified blocks.
// Note: this method is not available until next spork or a migration that builds the index.
func (h *Headers) ByView(view uint64) (*flow.Header, error) {
tx := h.db.NewTransaction(false)
defer tx.Discard()

blockID, err := h.viewCache.Get(view)(tx)
if err != nil {
return nil, err
}
return h.retrieveTx(blockID)(tx)
}

// Exists returns true if a header with the given ID has been stored.
// No errors are expected during normal operation.
func (h *Headers) Exists(blockID flow.Identifier) (bool, error) {
Expand Down Expand Up @@ -140,6 +166,22 @@ func (h *Headers) BlockIDByHeight(height uint64) (flow.Identifier, error) {
return blockID, nil
}

// BlockIDByView returns the block ID that is certified at the given view. It is an optimized
// version of `ByView` that skips retrieving the block. Expected errors during normal operations:
// - `storage.ErrNotFound` if no certified block is known at given view.
//
// NOTE: this method is not available until next spork (mainnet27) or a migration that builds the index.
func (h *Headers) BlockIDByView(view uint64) (flow.Identifier, error) {
tx := h.db.NewTransaction(false)
defer tx.Discard()

blockID, err := h.viewCache.Get(view)(tx)
if err != nil {
return flow.ZeroID, fmt.Errorf("could not lookup block id by view %d: %w", view, err)
}
return blockID, nil
}

func (h *Headers) ByParentID(parentID flow.Identifier) ([]*flow.Header, error) {
var blockIDs flow.IdentifierList
err := h.db.View(procedure.LookupBlockChildren(parentID, &blockIDs))
Expand Down
31 changes: 31 additions & 0 deletions storage/badger/headers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,34 @@ func TestHeaderRetrieveWithoutStore(t *testing.T) {
require.True(t, errors.Is(err, storage.ErrNotFound))
})
}

func TestHeaderGetByView(t *testing.T) {

unittest.RunWithBadgerDB(t, func(db *badger.DB) {
metrics := metrics.NewNoopCollector()
headers := badgerstorage.NewHeaders(metrics, db)

block := unittest.BlockFixture()

// store header
err := headers.Store(block.Header)
require.NoError(t, err)

// verify storing the block doesn't not index the view automatically
_, err = headers.BlockIDByView(block.Header.View)
require.ErrorIs(t, err, storage.ErrNotFound)

// index block by view
require.NoError(t, db.Update(operation.IndexCertifiedBlockByView(block.Header.View, block.ID())))

// verify that the block ID can be retrieved by view
indexedID, err := headers.BlockIDByView(block.Header.View)
require.NoError(t, err)
require.Equal(t, block.ID(), indexedID)

// verify that the block header can be retrieved by view
actual, err := headers.ByView(block.Header.View)
require.NoError(t, err)
require.Equal(t, block.Header, actual)
})
}
14 changes: 14 additions & 0 deletions storage/badger/operation/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,25 @@ func IndexBlockHeight(height uint64, blockID flow.Identifier) func(*badger.Txn)
return insert(makePrefix(codeHeightToBlock, height), blockID)
}

// IndexCertifiedBlockByView indexes the view of a block.
// HotStuff guarantees that there is at most one certified block. Caution: this does not hold for
// uncertified proposals, as byzantine actors might produce multiple proposals for the same block.
// Hence, only certified blocks (i.e. blocks that have received a QC) can be indexed!
func IndexCertifiedBlockByView(view uint64, blockID flow.Identifier) func(*badger.Txn) error {
return insert(makePrefix(codeCertifiedBlockByView, view), blockID)
}

// LookupBlockHeight retrieves finalized blocks by height.
func LookupBlockHeight(height uint64, blockID *flow.Identifier) func(*badger.Txn) error {
return retrieve(makePrefix(codeHeightToBlock, height), blockID)
}

// LookupCertifiedBlockByView retrieves the certified block by view. (certified blocks are blocks that have received QC)
// Returns `storage.ErrNotFound` if no certified block for the specified view is known.
func LookupCertifiedBlockByView(view uint64, blockID *flow.Identifier) func(*badger.Txn) error {
return retrieve(makePrefix(codeCertifiedBlockByView, view), blockID)
}

// BlockExists checks whether the block exists in the database.
// No errors are expected during normal operation.
func BlockExists(blockID flow.Identifier, blockExists *bool) func(*badger.Txn) error {
Expand Down
1 change: 1 addition & 0 deletions storage/badger/operation/prefix.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ const (
codeBlockIDToQuorumCertificate = 45 // index of quorum certificates by block ID
codeEpochProtocolStateByBlockID = 46 // index of epoch protocol state entry by block ID
codeProtocolKVStoreByBlockID = 47 // index of protocol KV store entry by block ID
codeCertifiedBlockByView = 48 // index mapping view to ID of certified block (guaranteed by HotStuff that there is at most one per view)

// codes for indexing multiple identifiers by identifier
codeBlockChildren = 50 // index mapping block ID to children blocks
Expand Down
2 changes: 1 addition & 1 deletion storage/badger/qcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func NewQuorumCertificates(collector module.CacheMetrics, db *badger.DB, cacheSi

return &QuorumCertificates{
db: db,
cache: newCache[flow.Identifier, *flow.QuorumCertificate](collector, metrics.ResourceQC,
cache: newCache(collector, metrics.ResourceQC,
withLimit[flow.Identifier, *flow.QuorumCertificate](cacheSize),
withStore(store),
withRetrieve(retrieve)),
Expand Down
6 changes: 6 additions & 0 deletions storage/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ type Blocks interface {
// for finalized blocks.
ByHeight(height uint64) (*flow.Block, error)

// ByView returns the block with the given view. It is only available for certified blocks.
// certified blocks are the blocks that have received QC.
//
// TODO: this method is not available until next spork (mainnet27) or a migration that builds the index.
// ByView(view uint64) (*flow.Header, error)

// ByCollectionID returns the block for the given collection ID.
ByCollectionID(collID flow.Identifier) (*flow.Block, error)

Expand Down
8 changes: 8 additions & 0 deletions storage/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ type Headers interface {
// ByHeight returns the block with the given number. It is only available for finalized blocks.
ByHeight(height uint64) (*flow.Header, error)

// ByView returns the block with the given view. It is only available for certified blocks.
// Certified blocks are the blocks that have received QC. Hotstuff guarantees that for each view,
// at most one block is certified. Hence, the return value of `ByView` is guaranteed to be unique
// oven for non-finalized blocks.
//
// TODO: this method is not available until next spork (mainnet27) or a migration that builds the index.
// ByView(view uint64) (*flow.Header, error)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will uncomment in next spork.


// Exists returns true if a header with the given ID has been stored.
// No errors are expected during normal operation.
Exists(blockID flow.Identifier) (bool, error)
Expand Down
Loading