Skip to content

Commit

Permalink
[filedao] get recent blocks from staging buffer (#4488)
Browse files Browse the repository at this point in the history
  • Loading branch information
dustinxie authored Nov 15, 2024
1 parent 8e7d62f commit e3f51ac
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 34 deletions.
2 changes: 1 addition & 1 deletion blockchain/blockdao/blob_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestBlobStore(t *testing.T) {
for i := 0; i < cfg.BlockStoreBatchSize+7; i++ {
blk, err := dao.GetBlockByHeight(1 + uint64(i))
r.NoError(err)
if i < cfg.BlockStoreBatchSize {
if i < 7 {
// blocks written to disk has sidecar removed
r.False(blk.HasBlob())
r.Equal(4, len(blk.Actions))
Expand Down
2 changes: 1 addition & 1 deletion blockchain/filedao/filedao_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func TestBlockWithSidecar(t *testing.T) {
for i := 0; i < cfg.BlockStoreBatchSize+2; i++ {
blk, err := fd.GetBlockByHeight(start + uint64(i))
r.NoError(err)
if i < cfg.BlockStoreBatchSize {
if i < 2 {
// blocks written to disk has sidecar removed
r.False(blk.HasBlob())
r.Equal(4, len(blk.Actions))
Expand Down
42 changes: 19 additions & 23 deletions blockchain/filedao/filedao_v2_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

func (fd *fileDAOv2) populateStagingBuffer() (*stagingBuffer, error) {
buffer := newStagingBuffer(fd.header.BlockStoreSize, fd.deser)
buffer := newStagingBuffer(fd.header.BlockStoreSize, fd.header.Start)
blockStoreTip := fd.highestBlockOfStoreTip()
for i := uint64(0); i < fd.header.BlockStoreSize; i++ {
v, err := fd.kvStore.Get(_headerDataNs, byteutil.Uint64ToBytesBigEndian(i))
Expand All @@ -42,7 +42,7 @@ func (fd *fileDAOv2) populateStagingBuffer() (*stagingBuffer, error) {
// populate to staging buffer, if the block is in latest round
height := info.Block.Height()
if height > blockStoreTip {
if _, err = buffer.Put(stagingKey(height, fd.header), info); err != nil {
if _, err = buffer.Put(height, info); err != nil {
return nil, err
}
} else {
Expand Down Expand Up @@ -87,12 +87,12 @@ func (fd *fileDAOv2) putBlock(blk *block.Block) error {
}

// add to staging buffer
index := stagingKey(blk.Height(), fd.header)
full, err := fd.blkBuffer.Put(index, blkInfo)
full, err := fd.blkBuffer.Put(blk.Height(), blkInfo)
if err != nil {
return err
}
if !full {
index := fd.blkBuffer.slot(blk.Height())
fd.batch.Put(_headerDataNs, byteutil.Uint64ToBytesBigEndian(index), blkBytes, "failed to put block")
return nil
}
Expand Down Expand Up @@ -151,11 +151,6 @@ func blockStoreKey(height uint64, header *FileHeader) uint64 {
return (height - header.Start) / header.BlockStoreSize
}

// stagingKey is the position of block in the staging buffer
func stagingKey(height uint64, header *FileHeader) uint64 {
return (height - header.Start) % header.BlockStoreSize
}

// lowestBlockOfStoreTip is the lowest height of the tip of block storage
// used in DeleteTipBlock(), once new tip height drops below this, the tip of block storage can be deleted
func (fd *fileDAOv2) lowestBlockOfStoreTip() uint64 {
Expand All @@ -178,12 +173,7 @@ func (fd *fileDAOv2) getBlock(height uint64) (*block.Block, error) {
return nil, db.ErrNotExist
}
// check whether block in staging buffer or not
storeKey := blockStoreKey(height, fd.header)
if storeKey >= fd.blkStore.Size() {
blkStore, err := fd.blkBuffer.Get(stagingKey(height, fd.header))
if err != nil {
return nil, err
}
if blkStore := fd.getFromStagingBuffer(height); blkStore != nil {
return blkStore.Block, nil
}
// read from storage DB
Expand All @@ -199,12 +189,7 @@ func (fd *fileDAOv2) getReceipt(height uint64) ([]*action.Receipt, error) {
return nil, db.ErrNotExist
}
// check whether block in staging buffer or not
storeKey := blockStoreKey(height, fd.header)
if storeKey >= fd.blkStore.Size() {
blkStore, err := fd.blkBuffer.Get(stagingKey(height, fd.header))
if err != nil {
return nil, err
}
if blkStore := fd.getFromStagingBuffer(height); blkStore != nil {
return blkStore.Receipts, nil
}
// read from storage DB
Expand All @@ -215,12 +200,23 @@ func (fd *fileDAOv2) getReceipt(height uint64) ([]*action.Receipt, error) {
return fd.deser.ReceiptsFromBlockStoreProto(blockStore)
}

func (fd *fileDAOv2) getFromStagingBuffer(height uint64) *block.Store {
if fd.loadTip().Height-height >= fd.header.BlockStoreSize {
return nil
}
blkStore := fd.blkBuffer.Get(height)
if blkStore == nil || blkStore.Block.Height() != height {
return nil
}
return blkStore
}

func (fd *fileDAOv2) getBlockStore(height uint64) (*iotextypes.BlockStore, error) {
// check whether blockStore in read cache or not
storeKey := blockStoreKey(height, fd.header)
if value, ok := fd.blkStorePbCache.Get(storeKey); ok {
pbInfos := value.(*iotextypes.BlockStores)
return pbInfos.BlockStores[stagingKey(height, fd.header)], nil
return pbInfos.BlockStores[fd.blkBuffer.slot(height)], nil
}
// read from storage DB
value, err := fd.blkStore.Get(storeKey)
Expand All @@ -240,5 +236,5 @@ func (fd *fileDAOv2) getBlockStore(height uint64) (*iotextypes.BlockStore, error
}
// add to read cache
fd.blkStorePbCache.Add(storeKey, pbStores)
return pbStores.BlockStores[stagingKey(height, fd.header)], nil
return pbStores.BlockStores[fd.blkBuffer.slot(height)], nil
}
32 changes: 23 additions & 9 deletions blockchain/filedao/staging_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package filedao

import (
"sync"

"google.golang.org/protobuf/proto"

"github.com/iotexproject/iotex-proto/golang/iotextypes"
Expand All @@ -15,41 +17,53 @@ import (

type (
stagingBuffer struct {
lock sync.RWMutex
size uint64
start uint64
buffer []*block.Store
deser *block.Deserializer
}
)

func newStagingBuffer(size uint64, deser *block.Deserializer) *stagingBuffer {
func newStagingBuffer(size, start uint64) *stagingBuffer {
return &stagingBuffer{
size: size,
start: start,
buffer: make([]*block.Store, size),
deser: deser,
}
}

func (s *stagingBuffer) Get(pos uint64) (*block.Store, error) {
if pos >= s.size {
return nil, ErrNotSupported
func (s *stagingBuffer) Get(height uint64) *block.Store {
if height < s.start {
return nil
}
return s.buffer[pos], nil
s.lock.RLock()
defer s.lock.RUnlock()
return s.buffer[s.slot(height)]
}

func (s *stagingBuffer) Put(pos uint64, blk *block.Store) (bool, error) {
if pos >= s.size {
func (s *stagingBuffer) Put(height uint64, blk *block.Store) (bool, error) {
if height < s.start {
return false, ErrNotSupported
}
pos := s.slot(height)
s.lock.Lock()
defer s.lock.Unlock()
s.buffer[pos] = blk
return pos == s.size-1, nil
}

func (s *stagingBuffer) slot(height uint64) uint64 {
return (height - s.start) % s.size
}

func (s *stagingBuffer) Serialize() ([]byte, error) {
blkStores := []*iotextypes.BlockStore{}
// blob sidecar data are stored separately
s.lock.RLock()
for _, v := range s.buffer {
blkStores = append(blkStores, v.ToProtoWithoutSidecar())
}
s.lock.RUnlock()
allBlks := &iotextypes.BlockStores{
BlockStores: blkStores,
}
Expand Down

0 comments on commit e3f51ac

Please sign in to comment.