Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[filedao] get recent blocks from staging buffer #4488

Merged
merged 2 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion blockchain/blockdao/blob_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestBlobStore(t *testing.T) {
for i := 0; i < cfg.BlockStoreBatchSize+7; i++ {
blk, err := dao.GetBlockByHeight(1 + uint64(i))
r.NoError(err)
if i < cfg.BlockStoreBatchSize {
if i < 7 {
// blocks written to disk has sidecar removed
r.False(blk.HasBlob())
r.Equal(4, len(blk.Actions))
Expand Down
2 changes: 1 addition & 1 deletion blockchain/filedao/filedao_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func TestBlockWithSidecar(t *testing.T) {
for i := 0; i < cfg.BlockStoreBatchSize+2; i++ {
blk, err := fd.GetBlockByHeight(start + uint64(i))
r.NoError(err)
if i < cfg.BlockStoreBatchSize {
if i < 2 {
// blocks written to disk has sidecar removed
r.False(blk.HasBlob())
r.Equal(4, len(blk.Actions))
Expand Down
42 changes: 19 additions & 23 deletions blockchain/filedao/filedao_v2_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

func (fd *fileDAOv2) populateStagingBuffer() (*stagingBuffer, error) {
buffer := newStagingBuffer(fd.header.BlockStoreSize, fd.deser)
buffer := newStagingBuffer(fd.header.BlockStoreSize, fd.header.Start)
blockStoreTip := fd.highestBlockOfStoreTip()
for i := uint64(0); i < fd.header.BlockStoreSize; i++ {
v, err := fd.kvStore.Get(_headerDataNs, byteutil.Uint64ToBytesBigEndian(i))
Expand All @@ -42,7 +42,7 @@ func (fd *fileDAOv2) populateStagingBuffer() (*stagingBuffer, error) {
// populate to staging buffer, if the block is in latest round
height := info.Block.Height()
if height > blockStoreTip {
if _, err = buffer.Put(stagingKey(height, fd.header), info); err != nil {
if _, err = buffer.Put(height, info); err != nil {
return nil, err
}
} else {
Expand Down Expand Up @@ -87,12 +87,12 @@ func (fd *fileDAOv2) putBlock(blk *block.Block) error {
}

// add to staging buffer
index := stagingKey(blk.Height(), fd.header)
full, err := fd.blkBuffer.Put(index, blkInfo)
full, err := fd.blkBuffer.Put(blk.Height(), blkInfo)
if err != nil {
return err
}
if !full {
index := fd.blkBuffer.slot(blk.Height())
fd.batch.Put(_headerDataNs, byteutil.Uint64ToBytesBigEndian(index), blkBytes, "failed to put block")
return nil
}
Expand Down Expand Up @@ -151,11 +151,6 @@ func blockStoreKey(height uint64, header *FileHeader) uint64 {
return (height - header.Start) / header.BlockStoreSize
}

// stagingKey is the position of block in the staging buffer
func stagingKey(height uint64, header *FileHeader) uint64 {
return (height - header.Start) % header.BlockStoreSize
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replaced by stagingBuffer.slot()


// lowestBlockOfStoreTip is the lowest height of the tip of block storage
// used in DeleteTipBlock(), once new tip height drops below this, the tip of block storage can be deleted
func (fd *fileDAOv2) lowestBlockOfStoreTip() uint64 {
Expand All @@ -178,12 +173,7 @@ func (fd *fileDAOv2) getBlock(height uint64) (*block.Block, error) {
return nil, db.ErrNotExist
}
// check whether block in staging buffer or not
storeKey := blockStoreKey(height, fd.header)
if storeKey >= fd.blkStore.Size() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

root-cause of the bug:
the batch has not been written to DB yet, but blkStore.Size() already increased 1. This caused code go to L190 attempting to read the block from DB, which is not yet written to disk yet.

blkStore, err := fd.blkBuffer.Get(stagingKey(height, fd.header))
if err != nil {
return nil, err
}
if blkStore := fd.getFromStagingBuffer(height); blkStore != nil {
return blkStore.Block, nil
}
// read from storage DB
Expand All @@ -199,12 +189,7 @@ func (fd *fileDAOv2) getReceipt(height uint64) ([]*action.Receipt, error) {
return nil, db.ErrNotExist
}
// check whether block in staging buffer or not
storeKey := blockStoreKey(height, fd.header)
if storeKey >= fd.blkStore.Size() {
blkStore, err := fd.blkBuffer.Get(stagingKey(height, fd.header))
if err != nil {
return nil, err
}
if blkStore := fd.getFromStagingBuffer(height); blkStore != nil {
return blkStore.Receipts, nil
}
// read from storage DB
Expand All @@ -215,12 +200,23 @@ func (fd *fileDAOv2) getReceipt(height uint64) ([]*action.Receipt, error) {
return fd.deser.ReceiptsFromBlockStoreProto(blockStore)
}

func (fd *fileDAOv2) getFromStagingBuffer(height uint64) *block.Store {
if fd.loadTip().Height-height >= fd.header.BlockStoreSize {
return nil
}
blkStore := fd.blkBuffer.Get(height)
if blkStore == nil || blkStore.Block.Height() != height {
return nil
}
return blkStore
}

func (fd *fileDAOv2) getBlockStore(height uint64) (*iotextypes.BlockStore, error) {
// check whether blockStore in read cache or not
storeKey := blockStoreKey(height, fd.header)
if value, ok := fd.blkStorePbCache.Get(storeKey); ok {
pbInfos := value.(*iotextypes.BlockStores)
return pbInfos.BlockStores[stagingKey(height, fd.header)], nil
return pbInfos.BlockStores[fd.blkBuffer.slot(height)], nil
}
// read from storage DB
value, err := fd.blkStore.Get(storeKey)
Expand All @@ -240,5 +236,5 @@ func (fd *fileDAOv2) getBlockStore(height uint64) (*iotextypes.BlockStore, error
}
// add to read cache
fd.blkStorePbCache.Add(storeKey, pbStores)
return pbStores.BlockStores[stagingKey(height, fd.header)], nil
return pbStores.BlockStores[fd.blkBuffer.slot(height)], nil
}
32 changes: 23 additions & 9 deletions blockchain/filedao/staging_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package filedao

import (
"sync"

"google.golang.org/protobuf/proto"

"github.com/iotexproject/iotex-proto/golang/iotextypes"
Expand All @@ -15,41 +17,53 @@ import (

type (
stagingBuffer struct {
lock sync.RWMutex
size uint64
start uint64
buffer []*block.Store
deser *block.Deserializer
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not needed

}
)

func newStagingBuffer(size uint64, deser *block.Deserializer) *stagingBuffer {
func newStagingBuffer(size, start uint64) *stagingBuffer {
return &stagingBuffer{
size: size,
start: start,
buffer: make([]*block.Store, size),
deser: deser,
}
}

func (s *stagingBuffer) Get(pos uint64) (*block.Store, error) {
if pos >= s.size {
return nil, ErrNotSupported
func (s *stagingBuffer) Get(height uint64) *block.Store {
if height < s.start {
return nil
}
return s.buffer[pos], nil
s.lock.RLock()
defer s.lock.RUnlock()
return s.buffer[s.slot(height)]
}

func (s *stagingBuffer) Put(pos uint64, blk *block.Store) (bool, error) {
if pos >= s.size {
func (s *stagingBuffer) Put(height uint64, blk *block.Store) (bool, error) {
if height < s.start {
return false, ErrNotSupported
}
pos := s.slot(height)
s.lock.Lock()
defer s.lock.Unlock()
s.buffer[pos] = blk
return pos == s.size-1, nil
}

func (s *stagingBuffer) slot(height uint64) uint64 {
return (height - s.start) % s.size
}

func (s *stagingBuffer) Serialize() ([]byte, error) {
blkStores := []*iotextypes.BlockStore{}
// blob sidecar data are stored separately
s.lock.RLock()
for _, v := range s.buffer {
blkStores = append(blkStores, v.ToProtoWithoutSidecar())
}
s.lock.RUnlock()
allBlks := &iotextypes.BlockStores{
BlockStores: blkStores,
}
Expand Down
Loading