Skip to content

Commit

Permalink
merkledb -- Add Clearer interface (#2277)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dan Laine authored Nov 13, 2023
1 parent b8746de commit baf0ef7
Show file tree
Hide file tree
Showing 13 changed files with 275 additions and 49 deletions.
103 changes: 54 additions & 49 deletions proto/pb/sync/sync.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 37 additions & 0 deletions proto/pb/sync/sync_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions proto/sync/sync.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ message Request {
service DB {
rpc GetMerkleRoot(google.protobuf.Empty) returns (GetMerkleRootResponse);

rpc Clear(google.protobuf.Empty) returns (google.protobuf.Empty);

rpc GetProof(GetProofRequest) returns (GetProofResponse);

rpc GetChangeProof(GetChangeProofRequest) returns (GetChangeProofResponse);
Expand Down
37 changes: 37 additions & 0 deletions x/merkledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
// TODO: name better
rebuildViewSizeFractionOfCacheSize = 50
minRebuildViewSizePerCommit = 1000
clearBatchSize = units.MiB
rebuildIntermediateDeletionWriteSize = units.MiB
valueNodePrefixLen = 1
)
Expand Down Expand Up @@ -113,6 +114,12 @@ type RangeProofer interface {
CommitRangeProof(ctx context.Context, start, end maybe.Maybe[[]byte], proof *RangeProof) error
}

type Clearer interface {
// Deletes all key/value pairs from the database
// and clears the change history.
Clear() error
}

type Prefetcher interface {
// PrefetchPath attempts to load all trie nodes on the path of [key]
// into the cache.
Expand All @@ -128,6 +135,7 @@ type Prefetcher interface {

type MerkleDB interface {
database.Database
Clearer
Trie
MerkleRootGetter
ProofGetter
Expand Down Expand Up @@ -1264,6 +1272,35 @@ func (db *merkleDB) getNode(key Key, hasValue bool) (*node, error) {
return db.intermediateNodeDB.Get(key)
}

func (db *merkleDB) Clear() error {
db.commitLock.Lock()
defer db.commitLock.Unlock()

db.lock.Lock()
defer db.lock.Unlock()

// Clear nodes from disk and caches
if err := db.valueNodeDB.Clear(); err != nil {
return err
}
if err := db.intermediateNodeDB.Clear(); err != nil {
return err
}

// Clear root
db.sentinelNode = newNode(Key{})
db.sentinelNode.calculateID(db.metrics)

// Clear history
db.history = newTrieHistory(db.history.maxHistoryLen)
db.history.record(&changeSummary{
rootID: db.getMerkleRoot(),
values: map[Key]*change[maybe.Maybe[[]byte]]{},
nodes: map[Key]*change[*node]{},
})
return nil
}

// Returns [key] prefixed by [prefix].
// The returned []byte is taken from [bufferPool] and
// should be returned to it when the caller is done with it.
Expand Down
45 changes: 45 additions & 0 deletions x/merkledb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (

const defaultHistoryLength = 300

var emptyKey Key

// newDB returns a new merkle database with the underlying type so that tests can access unexported fields
func newDB(ctx context.Context, db database.Database, config Config) (*merkleDB, error) {
db, err := New(ctx, db, config)
Expand Down Expand Up @@ -773,6 +775,49 @@ func Test_MerkleDB_Random_Insert_Ordering(t *testing.T) {
}
}

func TestMerkleDBClear(t *testing.T) {
require := require.New(t)

// Make a database and insert some key-value pairs.
db, err := getBasicDB()
require.NoError(err)

emptyRootID := db.getMerkleRoot()

now := time.Now().UnixNano()
t.Logf("seed: %d", now)
r := rand.New(rand.NewSource(now)) // #nosec G404

insertRandomKeyValues(
require,
r,
[]database.Database{db},
1_000,
0.25,
)

// Clear the database.
require.NoError(db.Clear())

// Assert that the database is empty.
iter := db.NewIterator()
defer iter.Release()
require.False(iter.Next())
require.Equal(emptyRootID, db.getMerkleRoot())
require.Equal(emptyKey, db.sentinelNode.key)

// Assert caches are empty.
require.Zero(db.valueNodeDB.nodeCache.Len())
require.Zero(db.intermediateNodeDB.nodeCache.currentSize)

// Assert history has only the clearing change.
require.Len(db.history.lastChanges, 1)
change, ok := db.history.lastChanges[emptyRootID]
require.True(ok)
require.Empty(change.nodes)
require.Empty(change.values)
}

func FuzzMerkleDBEmptyRandomizedActions(f *testing.F) {
f.Fuzz(
func(
Expand Down
11 changes: 11 additions & 0 deletions x/merkledb/intermediate_node_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,14 @@ func (db *intermediateNodeDB) Flush() error {
func (db *intermediateNodeDB) Delete(key Key) error {
return db.nodeCache.Put(key, nil)
}

func (db *intermediateNodeDB) Clear() error {
// Reset the cache. Note we don't flush because that would cause us to
// persist intermediate nodes we're about to delete.
db.nodeCache = newOnEvictCache(
db.nodeCache.maxSize,
db.nodeCache.size,
db.nodeCache.onEviction,
)
return database.AtomicClearPrefix(db.baseDB, db.baseDB, intermediateNodePrefix)
}
29 changes: 29 additions & 0 deletions x/merkledb/intermediate_node_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,32 @@ func Test_IntermediateNodeDB_ConstructDBKey_DirtyBuffer(t *testing.T) {
require.Equal(intermediateNodePrefix, constructedKey[:len(intermediateNodePrefix)])
require.Equal(p.Extend(ToToken(1, 4)).Bytes(), constructedKey[len(intermediateNodePrefix):])
}

func TestIntermediateNodeDBClear(t *testing.T) {
require := require.New(t)
cacheSize := 200
evictionBatchSize := cacheSize
baseDB := memdb.New()
db := newIntermediateNodeDB(
baseDB,
&sync.Pool{
New: func() interface{} { return make([]byte, 0) },
},
&mockMetrics{},
cacheSize,
evictionBatchSize,
4,
)

for _, b := range [][]byte{{1}, {2}, {3}} {
require.NoError(db.Put(ToKey(b), newNode(ToKey(b))))
}

require.NoError(db.Clear())

iter := baseDB.NewIteratorWithPrefix(intermediateNodePrefix)
defer iter.Release()
require.False(iter.Next())

require.Zero(db.nodeCache.currentSize)
}
Loading

0 comments on commit baf0ef7

Please sign in to comment.