Skip to content

Commit

Permalink
core, triedb/pathdb: integrate state snapshot inth pathdb
Browse files Browse the repository at this point in the history
  • Loading branch information
rjl493456442 committed Dec 23, 2024
1 parent 341647f commit 985a84e
Show file tree
Hide file tree
Showing 26 changed files with 2,577 additions and 207 deletions.
63 changes: 39 additions & 24 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,13 @@ func (c *CacheConfig) triedbConfig(isVerkle bool) *triedb.Config {
}
if c.StateScheme == rawdb.PathScheme {
config.PathDB = &pathdb.Config{
StateHistory: c.StateHistory,
CleanCacheSize: c.TrieCleanLimit * 1024 * 1024,
StateHistory: c.StateHistory,
TrieCleanSize: c.TrieCleanLimit * 1024 * 1024,
StateCleanSize: c.SnapshotLimit * 1024 * 1024,

// TODO(rjl493456442): The write buffer represents the memory limit used
// for flushing both trie data and state data to disk. The config name
// should be updated to eliminate the confusion.
WriteBufferSize: c.TrieDirtyLimit * 1024 * 1024,
}
}
Expand Down Expand Up @@ -349,11 +354,14 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
// Do nothing here until the state syncer picks it up.
log.Info("Genesis state is missing, wait state sync")
} else {
// Head state is missing, before the state recovery, find out the
// disk layer point of snapshot(if it's enabled). Make sure the
// rewound point is lower than disk layer.
// Head state is missing, before the state recovery, find out the disk
// layer point of snapshot(if it's enabled). Make sure the rewound point
// is lower than disk layer.
//
// Note it's unnecessary in path mode which always keep trie data and
// state data consistent.
var diskRoot common.Hash
if bc.cacheConfig.SnapshotLimit > 0 {
if bc.cacheConfig.SnapshotLimit > 0 && bc.cacheConfig.StateScheme == rawdb.HashScheme {
diskRoot = rawdb.ReadSnapshotRoot(bc.db)
}
if diskRoot != (common.Hash{}) {
Expand Down Expand Up @@ -426,15 +434,39 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
bc.logger.OnGenesisBlock(bc.genesisBlock, alloc)
}
}
bc.setupSnapshot()

// Rewind the chain in case of an incompatible config upgrade.
if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
log.Warn("Rewinding chain to upgrade configuration", "err", compat)
if compat.RewindToTime > 0 {
bc.SetHeadWithTimestamp(compat.RewindToTime)
} else {
bc.SetHead(compat.RewindToBlock)
}
rawdb.WriteChainConfig(db, genesisHash, chainConfig)
}

// Start tx indexer if it's enabled.
if txLookupLimit != nil {
bc.txIndexer = newTxIndexer(*txLookupLimit, bc)
}
return bc, nil
}

func (bc *BlockChain) setupSnapshot() {
// Short circuit if the chain is established with path scheme, as the
// state snapshot has been integrated into path database natively.
if bc.cacheConfig.StateScheme == rawdb.PathScheme {
return
}
// Load any existing snapshot, regenerating it if loading failed
if bc.cacheConfig.SnapshotLimit > 0 {
// If the chain was rewound past the snapshot persistent layer (causing
// a recovery block number to be persisted to disk), check if we're still
// in recovery mode and in that case, don't invalidate the snapshot on a
// head mismatch.
var recover bool

head := bc.CurrentBlock()
if layer := rawdb.ReadSnapshotRecoveryNumber(bc.db); layer != nil && *layer >= head.Number.Uint64() {
log.Warn("Enabling snapshot recovery", "chainhead", head.Number, "diskbase", *layer)
Expand All @@ -451,23 +483,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
// Re-initialize the state database with snapshot
bc.statedb = state.NewDatabase(bc.triedb, bc.snaps)
}

// Rewind the chain in case of an incompatible config upgrade.
if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
log.Warn("Rewinding chain to upgrade configuration", "err", compat)
if compat.RewindToTime > 0 {
bc.SetHeadWithTimestamp(compat.RewindToTime)
} else {
bc.SetHead(compat.RewindToBlock)
}
rawdb.WriteChainConfig(db, genesisHash, chainConfig)
}

// Start tx indexer if it's enabled.
if txLookupLimit != nil {
bc.txIndexer = newTxIndexer(*txLookupLimit, bc)
}
return bc, nil
}

// empty returns an indicator whether the blockchain is empty.
Expand Down
32 changes: 21 additions & 11 deletions core/blockchain_repair_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1791,7 +1791,7 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s
}
)
defer engine.Close()
if snapshots {
if snapshots && scheme == rawdb.HashScheme {
config.SnapshotLimit = 256
config.SnapshotWait = true
}
Expand Down Expand Up @@ -1820,7 +1820,7 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s
if err := chain.triedb.Commit(canonblocks[tt.commitBlock-1].Root(), false); err != nil {
t.Fatalf("Failed to flush trie state: %v", err)
}
if snapshots {
if snapshots && scheme == rawdb.HashScheme {
if err := chain.snaps.Cap(canonblocks[tt.commitBlock-1].Root(), 0); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
}
Expand Down Expand Up @@ -1952,8 +1952,10 @@ func testIssue23496(t *testing.T, scheme string) {
if _, err := chain.InsertChain(blocks[1:2]); err != nil {
t.Fatalf("Failed to import canonical chain start: %v", err)
}
if err := chain.snaps.Cap(blocks[1].Root(), 0); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
if scheme == rawdb.HashScheme {
if err := chain.snaps.Cap(blocks[1].Root(), 0); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
}
}

// Insert block B3 and commit the state into disk
Expand Down Expand Up @@ -1997,15 +1999,21 @@ func testIssue23496(t *testing.T, scheme string) {
}
expHead := uint64(1)
if scheme == rawdb.PathScheme {
expHead = uint64(2)
expHead = uint64(3)
}
if head := chain.CurrentBlock(); head.Number.Uint64() != expHead {
t.Errorf("Head block mismatch: have %d, want %d", head.Number, expHead)
}

// Reinsert B2-B4
if _, err := chain.InsertChain(blocks[1:]); err != nil {
t.Fatalf("Failed to import canonical chain tail: %v", err)
if scheme == rawdb.PathScheme {
// Reinsert B3-B4
if _, err := chain.InsertChain(blocks[2:]); err != nil {
t.Fatalf("Failed to import canonical chain tail: %v", err)
}
} else {
// Reinsert B2-B4
if _, err := chain.InsertChain(blocks[1:]); err != nil {
t.Fatalf("Failed to import canonical chain tail: %v", err)
}
}
if head := chain.CurrentHeader(); head.Number.Uint64() != uint64(4) {
t.Errorf("Head header mismatch: have %d, want %d", head.Number, 4)
Expand All @@ -2016,7 +2024,9 @@ func testIssue23496(t *testing.T, scheme string) {
if head := chain.CurrentBlock(); head.Number.Uint64() != uint64(4) {
t.Errorf("Head block mismatch: have %d, want %d", head.Number, uint64(4))
}
if layer := chain.Snapshots().Snapshot(blocks[2].Root()); layer == nil {
t.Error("Failed to regenerate the snapshot of known state")
if scheme == rawdb.HashScheme {
if layer := chain.Snapshots().Snapshot(blocks[2].Root()); layer == nil {
t.Error("Failed to regenerate the snapshot of known state")
}
}
}
2 changes: 1 addition & 1 deletion core/blockchain_sethead_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2023,7 +2023,7 @@ func testSetHeadWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme
}
if tt.commitBlock > 0 {
chain.triedb.Commit(canonblocks[tt.commitBlock-1].Root(), false)
if snapshots {
if snapshots && scheme == rawdb.HashScheme {
if err := chain.snaps.Cap(canonblocks[tt.commitBlock-1].Root(), 0); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
}
Expand Down
16 changes: 10 additions & 6 deletions core/blockchain_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo
if basic.commitBlock > 0 && basic.commitBlock == point {
chain.TrieDB().Commit(blocks[point-1].Root(), false)
}
if basic.snapshotBlock > 0 && basic.snapshotBlock == point {
if basic.snapshotBlock > 0 && basic.snapshotBlock == point && basic.scheme == rawdb.HashScheme {
// Flushing the entire snap tree into the disk, the
// relevant (a) snapshot root and (b) snapshot generator
// will be persisted atomically.
Expand Down Expand Up @@ -149,13 +149,17 @@ func (basic *snapshotTestBasic) verify(t *testing.T, chain *BlockChain, blocks [
block := chain.GetBlockByNumber(basic.expSnapshotBottom)
if block == nil {
t.Errorf("The corresponding block[%d] of snapshot disk layer is missing", basic.expSnapshotBottom)
} else if !bytes.Equal(chain.snaps.DiskRoot().Bytes(), block.Root().Bytes()) {
t.Errorf("The snapshot disk layer root is incorrect, want %x, get %x", block.Root(), chain.snaps.DiskRoot())
} else if basic.scheme == rawdb.HashScheme {
if !bytes.Equal(chain.snaps.DiskRoot().Bytes(), block.Root().Bytes()) {
t.Errorf("The snapshot disk layer root is incorrect, want %x, get %x", block.Root(), chain.snaps.DiskRoot())
}
}

// Check the snapshot, ensure it's integrated
if err := chain.snaps.Verify(block.Root()); err != nil {
t.Errorf("The disk layer is not integrated %v", err)
if basic.scheme == rawdb.HashScheme {
if err := chain.snaps.Verify(block.Root()); err != nil {
t.Errorf("The disk layer is not integrated %v", err)
}
}
}

Expand Down Expand Up @@ -570,7 +574,7 @@ func TestHighCommitCrashWithNewSnapshot(t *testing.T) {
for _, scheme := range []string{rawdb.HashScheme, rawdb.PathScheme} {
expHead := uint64(0)
if scheme == rawdb.PathScheme {
expHead = uint64(4)
expHead = uint64(6)
}
test := &crashSnapshotTest{
snapshotTestBasic{
Expand Down
5 changes: 3 additions & 2 deletions core/state/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,9 @@ func (db *CachingDB) Reader(stateRoot common.Hash) (Reader, error) {
readers = append(readers, newFlatReader(snap))
}
} else {
// If standalone state snapshot is not available, try to construct
// the state reader with database.
// If standalone state snapshot is not available (path scheme
// or the state snapshot is explicitly disabled in hash mode),
// try to construct the state reader with database.
reader, err := db.triedb.StateReader(stateRoot)
if err == nil {
readers = append(readers, newFlatReader(reader)) // state reader is optional
Expand Down
4 changes: 3 additions & 1 deletion core/state/snapshot/generate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ func newHelper(scheme string) *testHelper {
diskdb := rawdb.NewMemoryDatabase()
config := &triedb.Config{}
if scheme == rawdb.PathScheme {
config.PathDB = &pathdb.Config{} // disable caching
config.PathDB = &pathdb.Config{
SnapshotNoBuild: true,
} // disable caching
} else {
config.HashDB = &hashdb.Config{} // disable caching
}
Expand Down
3 changes: 2 additions & 1 deletion core/state/statedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,8 @@ func testMissingTrieNodes(t *testing.T, scheme string) {
)
if scheme == rawdb.PathScheme {
tdb = triedb.NewDatabase(memDb, &triedb.Config{PathDB: &pathdb.Config{
CleanCacheSize: 0,
TrieCleanSize: 0,
StateCleanSize: 0,
WriteBufferSize: 0,
}}) // disable caching
} else {
Expand Down
3 changes: 2 additions & 1 deletion eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/forkid"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
Expand Down Expand Up @@ -175,7 +176,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
}
}
// If snap sync is requested but snapshots are disabled, fail loudly
if h.snapSync.Load() && config.Chain.Snapshots() == nil {
if h.snapSync.Load() && (config.Chain.Snapshots() == nil && config.Chain.TrieDB().Scheme() == rawdb.HashScheme) {
return nil, errors.New("snap sync not supported with snapshots disabled")
}
// Construct the downloader (long sync)
Expand Down
44 changes: 38 additions & 6 deletions eth/protocols/snap/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state/snapshot"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
Expand All @@ -31,6 +33,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/trie"
"github.com/ethereum/go-ethereum/trie/trienode"
"github.com/ethereum/go-ethereum/triedb/database"
)

const (
Expand Down Expand Up @@ -279,7 +282,16 @@ func ServiceGetAccountRangeQuery(chain *core.BlockChain, req *GetAccountRangePac
if err != nil {
return nil, nil
}
it, err := chain.Snapshots().AccountIterator(req.Root, req.Origin)
// Temporary solution: using the snapshot interface for both cases.
// This can be removed once the hash scheme is deprecated.
var it snapshot.AccountIterator
if chain.TrieDB().Scheme() == rawdb.HashScheme {
// The snapshot is assumed to be available in hash mode if
// the SNAP protocol is enabled.
it, err = chain.Snapshots().AccountIterator(req.Root, req.Origin)
} else {
it, err = chain.TrieDB().AccountIterator(req.Root, req.Origin)
}
if err != nil {
return nil, nil
}
Expand Down Expand Up @@ -359,7 +371,19 @@ func ServiceGetStorageRangesQuery(chain *core.BlockChain, req *GetStorageRangesP
limit, req.Limit = common.BytesToHash(req.Limit), nil
}
// Retrieve the requested state and bail out if non existent
it, err := chain.Snapshots().StorageIterator(req.Root, account, origin)
var (
err error
it snapshot.StorageIterator
)
// Temporary solution: using the snapshot interface for both cases.
// This can be removed once the hash scheme is deprecated.
if chain.TrieDB().Scheme() == rawdb.HashScheme {
// The snapshot is assumed to be available in hash mode if
// the SNAP protocol is enabled.
it, err = chain.Snapshots().StorageIterator(req.Root, account, origin)
} else {
it, err = chain.TrieDB().StorageIterator(req.Root, account, origin)
}
if err != nil {
return nil, nil
}
Expand Down Expand Up @@ -479,8 +503,15 @@ func ServiceGetTrieNodesQuery(chain *core.BlockChain, req *GetTrieNodesPacket, s
// We don't have the requested state available, bail out
return nil, nil
}
// The 'snap' might be nil, in which case we cannot serve storage slots.
snap := chain.Snapshots().Snapshot(req.Root)
// The 'reader' might be nil, in which case we cannot serve storage slots
// via snapshot.
var reader database.StateReader
if chain.Snapshots() != nil {
reader = chain.Snapshots().Snapshot(req.Root)
}
if reader == nil {
reader, _ = triedb.StateReader(req.Root)
}
// Retrieve trie nodes until the packet size limit is reached
var (
nodes [][]byte
Expand All @@ -505,8 +536,9 @@ func ServiceGetTrieNodesQuery(chain *core.BlockChain, req *GetTrieNodesPacket, s

default:
var stRoot common.Hash

// Storage slots requested, open the storage trie and retrieve from there
if snap == nil {
if reader == nil {
// We don't have the requested state snapshotted yet (or it is stale),
// but can look up the account via the trie instead.
account, err := accTrie.GetAccountByHash(common.BytesToHash(pathset[0]))
Expand All @@ -516,7 +548,7 @@ func ServiceGetTrieNodesQuery(chain *core.BlockChain, req *GetTrieNodesPacket, s
}
stRoot = account.Root
} else {
account, err := snap.Account(common.BytesToHash(pathset[0]))
account, err := reader.Account(common.BytesToHash(pathset[0]))
loads++ // always account database reads, even for failures
if err != nil || account == nil {
break
Expand Down
2 changes: 1 addition & 1 deletion eth/protocols/snap/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1962,5 +1962,5 @@ func newDbConfig(scheme string) *triedb.Config {
if scheme == rawdb.HashScheme {
return &triedb.Config{}
}
return &triedb.Config{PathDB: pathdb.Defaults}
return &triedb.Config{PathDB: &pathdb.Config{SnapshotNoBuild: true}}
}
6 changes: 4 additions & 2 deletions tests/block_test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,10 @@ func (t *BlockTest) Run(snapshotter bool, scheme string, witness bool, tracer *t
}
// Cross-check the snapshot-to-hash against the trie hash
if snapshotter {
if err := chain.Snapshots().Verify(chain.CurrentBlock().Root); err != nil {
return err
if chain.Snapshots() != nil {
if err := chain.Snapshots().Verify(chain.CurrentBlock().Root); err != nil {
return err
}
}
}
return t.validateImportedHeaders(chain, validBlocks)
Expand Down
Loading

0 comments on commit 985a84e

Please sign in to comment.