-
Notifications
You must be signed in to change notification settings - Fork 324
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[filedao] get recent blocks from staging buffer #4488
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,7 +19,7 @@ import ( | |
) | ||
|
||
func (fd *fileDAOv2) populateStagingBuffer() (*stagingBuffer, error) { | ||
buffer := newStagingBuffer(fd.header.BlockStoreSize, fd.deser) | ||
buffer := newStagingBuffer(fd.header.BlockStoreSize, fd.header.Start) | ||
blockStoreTip := fd.highestBlockOfStoreTip() | ||
for i := uint64(0); i < fd.header.BlockStoreSize; i++ { | ||
v, err := fd.kvStore.Get(_headerDataNs, byteutil.Uint64ToBytesBigEndian(i)) | ||
|
@@ -42,7 +42,7 @@ func (fd *fileDAOv2) populateStagingBuffer() (*stagingBuffer, error) { | |
// populate to staging buffer, if the block is in latest round | ||
height := info.Block.Height() | ||
if height > blockStoreTip { | ||
if _, err = buffer.Put(stagingKey(height, fd.header), info); err != nil { | ||
if _, err = buffer.Put(height, info); err != nil { | ||
return nil, err | ||
} | ||
} else { | ||
|
@@ -87,12 +87,12 @@ func (fd *fileDAOv2) putBlock(blk *block.Block) error { | |
} | ||
|
||
// add to staging buffer | ||
index := stagingKey(blk.Height(), fd.header) | ||
full, err := fd.blkBuffer.Put(index, blkInfo) | ||
full, err := fd.blkBuffer.Put(blk.Height(), blkInfo) | ||
if err != nil { | ||
return err | ||
} | ||
if !full { | ||
index := fd.blkBuffer.slot(blk.Height()) | ||
fd.batch.Put(_headerDataNs, byteutil.Uint64ToBytesBigEndian(index), blkBytes, "failed to put block") | ||
return nil | ||
} | ||
|
@@ -151,11 +151,6 @@ func blockStoreKey(height uint64, header *FileHeader) uint64 { | |
return (height - header.Start) / header.BlockStoreSize | ||
} | ||
|
||
// stagingKey is the position of block in the staging buffer | ||
func stagingKey(height uint64, header *FileHeader) uint64 { | ||
return (height - header.Start) % header.BlockStoreSize | ||
} | ||
|
||
// lowestBlockOfStoreTip is the lowest height of the tip of block storage | ||
// used in DeleteTipBlock(), once new tip height drops below this, the tip of block storage can be deleted | ||
func (fd *fileDAOv2) lowestBlockOfStoreTip() uint64 { | ||
|
@@ -178,12 +173,7 @@ func (fd *fileDAOv2) getBlock(height uint64) (*block.Block, error) { | |
return nil, db.ErrNotExist | ||
} | ||
// check whether block in staging buffer or not | ||
storeKey := blockStoreKey(height, fd.header) | ||
if storeKey >= fd.blkStore.Size() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. root-cause of the bug: |
||
blkStore, err := fd.blkBuffer.Get(stagingKey(height, fd.header)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if blkStore := fd.getFromStagingBuffer(height); blkStore != nil { | ||
return blkStore.Block, nil | ||
} | ||
// read from storage DB | ||
|
@@ -199,12 +189,7 @@ func (fd *fileDAOv2) getReceipt(height uint64) ([]*action.Receipt, error) { | |
return nil, db.ErrNotExist | ||
} | ||
// check whether block in staging buffer or not | ||
storeKey := blockStoreKey(height, fd.header) | ||
if storeKey >= fd.blkStore.Size() { | ||
blkStore, err := fd.blkBuffer.Get(stagingKey(height, fd.header)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if blkStore := fd.getFromStagingBuffer(height); blkStore != nil { | ||
return blkStore.Receipts, nil | ||
} | ||
// read from storage DB | ||
|
@@ -215,12 +200,23 @@ func (fd *fileDAOv2) getReceipt(height uint64) ([]*action.Receipt, error) { | |
return fd.deser.ReceiptsFromBlockStoreProto(blockStore) | ||
} | ||
|
||
func (fd *fileDAOv2) getFromStagingBuffer(height uint64) *block.Store { | ||
if fd.loadTip().Height-height >= fd.header.BlockStoreSize { | ||
return nil | ||
} | ||
blkStore := fd.blkBuffer.Get(height) | ||
if blkStore == nil || blkStore.Block.Height() != height { | ||
return nil | ||
} | ||
return blkStore | ||
} | ||
|
||
func (fd *fileDAOv2) getBlockStore(height uint64) (*iotextypes.BlockStore, error) { | ||
// check whether blockStore in read cache or not | ||
storeKey := blockStoreKey(height, fd.header) | ||
if value, ok := fd.blkStorePbCache.Get(storeKey); ok { | ||
pbInfos := value.(*iotextypes.BlockStores) | ||
return pbInfos.BlockStores[stagingKey(height, fd.header)], nil | ||
return pbInfos.BlockStores[fd.blkBuffer.slot(height)], nil | ||
} | ||
// read from storage DB | ||
value, err := fd.blkStore.Get(storeKey) | ||
|
@@ -240,5 +236,5 @@ func (fd *fileDAOv2) getBlockStore(height uint64) (*iotextypes.BlockStore, error | |
} | ||
// add to read cache | ||
fd.blkStorePbCache.Add(storeKey, pbStores) | ||
return pbStores.BlockStores[stagingKey(height, fd.header)], nil | ||
return pbStores.BlockStores[fd.blkBuffer.slot(height)], nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,8 @@ | |
package filedao | ||
|
||
import ( | ||
"sync" | ||
|
||
"google.golang.org/protobuf/proto" | ||
|
||
"github.com/iotexproject/iotex-proto/golang/iotextypes" | ||
|
@@ -15,41 +17,53 @@ import ( | |
|
||
type ( | ||
stagingBuffer struct { | ||
lock sync.RWMutex | ||
size uint64 | ||
start uint64 | ||
buffer []*block.Store | ||
deser *block.Deserializer | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not needed |
||
} | ||
) | ||
|
||
func newStagingBuffer(size uint64, deser *block.Deserializer) *stagingBuffer { | ||
func newStagingBuffer(size, start uint64) *stagingBuffer { | ||
return &stagingBuffer{ | ||
size: size, | ||
start: start, | ||
buffer: make([]*block.Store, size), | ||
deser: deser, | ||
} | ||
} | ||
|
||
func (s *stagingBuffer) Get(pos uint64) (*block.Store, error) { | ||
if pos >= s.size { | ||
return nil, ErrNotSupported | ||
func (s *stagingBuffer) Get(height uint64) *block.Store { | ||
if height < s.start { | ||
return nil | ||
} | ||
return s.buffer[pos], nil | ||
s.lock.RLock() | ||
defer s.lock.RUnlock() | ||
return s.buffer[s.slot(height)] | ||
} | ||
|
||
func (s *stagingBuffer) Put(pos uint64, blk *block.Store) (bool, error) { | ||
if pos >= s.size { | ||
func (s *stagingBuffer) Put(height uint64, blk *block.Store) (bool, error) { | ||
if height < s.start { | ||
return false, ErrNotSupported | ||
} | ||
pos := s.slot(height) | ||
s.lock.Lock() | ||
defer s.lock.Unlock() | ||
s.buffer[pos] = blk | ||
return pos == s.size-1, nil | ||
} | ||
|
||
func (s *stagingBuffer) slot(height uint64) uint64 { | ||
return (height - s.start) % s.size | ||
} | ||
|
||
func (s *stagingBuffer) Serialize() ([]byte, error) { | ||
blkStores := []*iotextypes.BlockStore{} | ||
// blob sidecar data are stored separately | ||
s.lock.RLock() | ||
for _, v := range s.buffer { | ||
blkStores = append(blkStores, v.ToProtoWithoutSidecar()) | ||
} | ||
s.lock.RUnlock() | ||
allBlks := &iotextypes.BlockStores{ | ||
BlockStores: blkStores, | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replaced by
stagingBuffer.slot()