Skip to content

Commit

Permalink
network: integrate state sync module with blockfetcher
Browse files Browse the repository at this point in the history
Close #3574

Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Jan 27, 2025
1 parent 30c2f9b commit b8b92ce
Show file tree
Hide file tree
Showing 12 changed files with 289 additions and 51 deletions.
17 changes: 17 additions & 0 deletions internal/fakechain/fakechain.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,11 @@ func (s *FakeStateSync) BlockHeight() uint32 {
return 0
}

// HeaderHeight implements the StateSync interface.
func (s *FakeStateSync) HeaderHeight() uint32 {
return 0
}

// IsActive implements the StateSync interface.
func (s *FakeStateSync) IsActive() bool { return s.IsActiveFlag.Load() }

Expand All @@ -447,6 +452,8 @@ func (s *FakeStateSync) Init(currChainHeight uint32) error {
// NeedHeaders implements the StateSync interface.
func (s *FakeStateSync) NeedHeaders() bool { return s.RequestHeaders.Load() }

func (s *FakeStateSync) NeedBlocks() bool { return false }

// NeedMPTNodes implements the StateSync interface.
func (s *FakeStateSync) NeedMPTNodes() bool {
panic("TODO")
Expand All @@ -464,3 +471,13 @@ func (s *FakeStateSync) Traverse(root util.Uint256, process func(node mpt.Node,
func (s *FakeStateSync) GetUnknownMPTNodesBatch(limit int) []util.Uint256 {
panic("TODO")
}

// GetConfig implements the StateSync interface.
func (s *FakeStateSync) GetConfig() config.Blockchain {
panic("TODO")
}

// SetOnStageChanged implements the StateSync interface.
func (s *FakeStateSync) SetOnStageChanged(func()) {
panic("TODO")
}
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (c Config) Blockchain() Blockchain {
return Blockchain{
ProtocolConfiguration: c.ProtocolConfiguration,
Ledger: c.ApplicationConfiguration.Ledger,
NeoFSBlockFetcher: c.ApplicationConfiguration.NeoFSBlockFetcher,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/config/ledger_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ type Ledger struct {
type Blockchain struct {
ProtocolConfiguration
Ledger
NeoFSBlockFetcher
}
2 changes: 2 additions & 0 deletions pkg/config/protocol_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type (
P2PSigExtensions bool `yaml:"P2PSigExtensions"`
// P2PStateExchangeExtensions enables additional P2P MPT state data exchange logic.
P2PStateExchangeExtensions bool `yaml:"P2PStateExchangeExtensions"`
// NeoFSStateSyncExtensions enables additional MPT state data exchange logic via NeoFS.
NeoFSStateSyncExtensions bool `yaml:"NeoFSStateSyncExtensions"`
// ReservedAttributes allows to have reserved attributes range for experimental or private purposes.
ReservedAttributes bool `yaml:"ReservedAttributes"`

Expand Down
10 changes: 10 additions & 0 deletions pkg/core/block/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ func (b *Header) GetIndex() uint32 {
return b.Index
}

// GetExpectedHeaderSize returns the expected header size with empty witness.
func (b *Header) GetExpectedHeaderSize() int {
size := expectedHeaderSizeWithEmptyWitness - 1 - 1 + // 1 is for the zero-length (new(Header)).Script.Invocation/Verification
io.GetVarSize(&b.Script)
if b.StateRootEnabled {
size += util.Uint256Size
}
return size
}

// Hash returns the hash of the block. Notice that it is cached internally,
// so no matter how you change the [Header] after the first invocation of this
// method it won't change. To get an updated hash in case you're changing
Expand Down
4 changes: 2 additions & 2 deletions pkg/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,8 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
zap.Uint32("MaxValidUntilBlockIncrement", cfg.MaxValidUntilBlockIncrement))
}
if cfg.P2PStateExchangeExtensions {
if !cfg.StateRootInHeader {
return nil, errors.New("P2PStatesExchangeExtensions are enabled, but StateRootInHeader is off")
if !cfg.StateRootInHeader && !cfg.NeoFSStateSyncExtensions {
return nil, errors.New("P2PStatesExchangeExtensions are enabled, but StateRootInHeader or NeoFSStateSyncExtensions are off")
}
if cfg.KeepOnlyLatestState && !cfg.RemoveUntraceableBlocks {
return nil, errors.New("P2PStateExchangeExtensions can be enabled either on MPT-complete node (KeepOnlyLatestState=false) or on light GC-enabled node (RemoveUntraceableBlocks=true)")
Expand Down
67 changes: 63 additions & 4 deletions pkg/core/statesync/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ type Module struct {
billet *mpt.Billet

jumpCallback func(p uint32) error

// stageChangedCallback is an optional callback that is triggered whenever
// the sync stage changes.
stageChangedCallback func()
}

// NewModule returns new instance of statesync module.
Expand Down Expand Up @@ -120,9 +124,14 @@ func NewModule(bc Ledger, stateMod *stateroot.Module, log *zap.Logger, s *dao.Si
// Init initializes state sync module for the current chain's height with given
// callback for MPT nodes requests.
func (s *Module) Init(currChainHeight uint32) error {
oldStage := s.syncStage
s.lock.Lock()
defer s.lock.Unlock()

defer func() {
if s.syncStage != oldStage {
go s.notifyStageChanged()
}
}()
if s.syncStage != none {
return errors.New("already initialized or inactive")
}
Expand Down Expand Up @@ -176,6 +185,20 @@ func (s *Module) Init(currChainHeight uint32) error {
return s.defineSyncStage()
}

// SetOnStageChanged sets callback that is triggered whenever the sync stage changes.
func (s *Module) SetOnStageChanged(cb func()) {
s.lock.Lock()
defer s.lock.Unlock()
s.stageChangedCallback = cb
}

// notifyStageChanged triggers stage callback if it's set.
func (s *Module) notifyStageChanged() {
if s.stageChangedCallback != nil {
s.stageChangedCallback()
}
}

// TemporaryPrefix accepts current storage prefix and returns prefix
// to use for storing intermediate items during synchronization.
func TemporaryPrefix(currPrefix storage.KeyPrefix) storage.KeyPrefix {
Expand All @@ -192,6 +215,12 @@ func TemporaryPrefix(currPrefix storage.KeyPrefix) storage.KeyPrefix {
// defineSyncStage sequentially checks and sets sync state process stage after Module
// initialization. It also performs initialization of MPT Billet if necessary.
func (s *Module) defineSyncStage() error {
oldStage := s.syncStage
defer func() {
if s.syncStage != oldStage {
go s.notifyStageChanged()
}
}()
// check headers sync stage first
ltstHeaderHeight := s.bc.HeaderHeight()
if ltstHeaderHeight > s.syncPoint {
Expand Down Expand Up @@ -306,6 +335,7 @@ func (s *Module) AddHeaders(hdrs ...*block.Header) error {

// AddBlock verifies and saves block skipping executable scripts.
func (s *Module) AddBlock(block *block.Block) error {
oldStage := s.syncStage
s.lock.Lock()
defer s.lock.Unlock()

Expand Down Expand Up @@ -351,6 +381,9 @@ func (s *Module) AddBlock(block *block.Block) error {
s.syncStage |= blocksSynced
s.log.Info("blocks are in sync",
zap.Uint32("blockHeight", s.blockHeight))
if s.syncStage != oldStage {
go s.notifyStageChanged()
}
s.checkSyncIsCompleted()
}
return nil
Expand All @@ -361,6 +394,7 @@ func (s *Module) AddBlock(block *block.Block) error {
func (s *Module) AddMPTNodes(nodes [][]byte) error {
s.lock.Lock()
defer s.lock.Unlock()
oldStage := s.syncStage

if s.syncStage&headersSynced == 0 || s.syncStage&mptSynced != 0 {
return errors.New("MPT nodes were not requested")
Expand All @@ -382,6 +416,9 @@ func (s *Module) AddMPTNodes(nodes [][]byte) error {
s.syncStage |= mptSynced
s.log.Info("MPT is in sync",
zap.Uint32("height", s.syncPoint))
if s.syncStage != oldStage {
go s.notifyStageChanged()
}
s.checkSyncIsCompleted()
}
return nil
Expand Down Expand Up @@ -425,6 +462,12 @@ func (s *Module) restoreNode(n mpt.Node) error {
// If so, then jumping to P state sync point occurs. It is not protected by lock, thus caller
// should take care of it.
func (s *Module) checkSyncIsCompleted() {
oldStage := s.syncStage
defer func() {
if s.syncStage != oldStage {
go s.notifyStageChanged()
}
}()
if s.syncStage != headersSynced|mptSynced|blocksSynced {
return
}
Expand All @@ -450,6 +493,14 @@ func (s *Module) BlockHeight() uint32 {
return s.blockHeight
}

// HeaderHeight returns index of the last stored header.
func (s *Module) HeaderHeight() uint32 {
s.lock.RLock()
defer s.lock.RUnlock()

return s.bc.HeaderHeight()
}

// IsActive tells whether state sync module is on and still gathering state
// synchronisation data (headers, blocks or MPT nodes).
func (s *Module) IsActive() bool {
Expand Down Expand Up @@ -484,6 +535,14 @@ func (s *Module) NeedMPTNodes() bool {
return s.syncStage&headersSynced != 0 && s.syncStage&mptSynced == 0
}

// NeedBlocks returns whether the module hasn't completed blocks synchronisation.
func (s *Module) NeedBlocks() bool {
s.lock.RLock()
defer s.lock.RUnlock()

return s.syncStage&headersSynced != 0 && s.syncStage&blocksSynced == 0
}

// Traverse traverses local MPT nodes starting from the specified root down to its
// children calling `process` for each serialised node until stop condition is satisfied.
func (s *Module) Traverse(root util.Uint256, process func(node mpt.Node, nodeBytes []byte) bool) error {
Expand All @@ -509,7 +568,7 @@ func (s *Module) GetUnknownMPTNodesBatch(limit int) []util.Uint256 {
return s.mptpool.GetBatch(limit)
}

// HeaderHeight returns the height of the latest header.
func (s *Module) HeaderHeight() uint32 {
return s.bc.HeaderHeight()
// GetConfig returns current blockchain configuration.
func (s *Module) GetConfig() config.Blockchain {
return s.bc.GetConfig()
}
2 changes: 2 additions & 0 deletions pkg/network/bqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync/atomic"
"time"

"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"go.uber.org/zap"
)
Expand All @@ -15,6 +16,7 @@ type Blockqueuer interface {
AddHeaders(...*block.Header) error
BlockHeight() uint32
HeaderHeight() uint32
GetConfig() config.Blockchain
}

// OperationMode is the mode of operation for the block queue.
Expand Down
Loading

0 comments on commit b8b92ce

Please sign in to comment.