Skip to content

Commit

Permalink
Remove value_node_db batch (#2922)
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph authored Apr 8, 2024
1 parent 0ba0ace commit 5d76351
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 72 deletions.
14 changes: 9 additions & 5 deletions x/merkledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -965,7 +965,7 @@ func (db *merkleDB) commitView(ctx context.Context, trieToCommit *view) error {
return nil
}

valueNodeBatch := db.valueNodeDB.NewBatch()
valueNodeBatch := db.baseDB.NewBatch()
if err := db.applyChanges(ctx, valueNodeBatch, changes); err != nil {
return err
}
Expand Down Expand Up @@ -1001,7 +1001,7 @@ func (db *merkleDB) moveChildViewsToDB(trieToCommit *view) {
// and [valueNodeBatch].
//
// assumes [db.lock] is held
func (db *merkleDB) applyChanges(ctx context.Context, valueNodeBatch *valueNodeBatch, changes *changeSummary) error {
func (db *merkleDB) applyChanges(ctx context.Context, valueNodeBatch database.KeyValueWriterDeleter, changes *changeSummary) error {
_, span := db.infoTracer.Start(ctx, "MerkleDB.applyChanges")
defer span.End()

Expand All @@ -1023,17 +1023,21 @@ func (db *merkleDB) applyChanges(ctx context.Context, valueNodeBatch *valueNodeB
}

if shouldAddValue {
valueNodeBatch.Put(key, nodeChange.after)
if err := db.valueNodeDB.Write(valueNodeBatch, key, nodeChange.after); err != nil {
return err
}
} else if shouldDeleteValue {
valueNodeBatch.Delete(key)
if err := db.valueNodeDB.Write(valueNodeBatch, key, nil); err != nil {
return err
}
}
}
return nil
}

// commitValueChanges is a thin wrapper around [valueNodeBatch.Write()] to
// provide tracing.
func (db *merkleDB) commitValueChanges(ctx context.Context, valueNodeBatch *valueNodeBatch) error {
func (db *merkleDB) commitValueChanges(ctx context.Context, valueNodeBatch database.Batch) error {
_, span := db.infoTracer.Start(ctx, "MerkleDB.commitValueChanges")
defer span.End()

Expand Down
36 changes: 35 additions & 1 deletion x/merkledb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package merkledb
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"math/rand"
"slices"
Expand Down Expand Up @@ -39,7 +40,7 @@ func newDB(ctx context.Context, db database.Database, config Config) (*merkleDB,

func newDefaultConfig() Config {
return Config{
IntermediateWriteBatchSize: 10,
IntermediateWriteBatchSize: 256 * units.KiB,
HistoryLength: defaultHistoryLength,
ValueNodeCacheSize: units.MiB,
IntermediateNodeCacheSize: units.MiB,
Expand Down Expand Up @@ -1330,3 +1331,36 @@ func TestCrashRecovery(t *testing.T) {
require.NoError(err)
require.Equal(expectedRoot, rootAfterRecovery)
}

func BenchmarkCommitView(b *testing.B) {
db, err := getBasicDB()
require.NoError(b, err)

ops := make([]database.BatchOp, 1_000)
for i := range ops {
k := binary.AppendUvarint(nil, uint64(i))
ops[i] = database.BatchOp{
Key: k,
Value: hashing.ComputeHash256(k),
}
}

ctx := context.Background()
viewIntf, err := db.NewView(ctx, ViewChanges{BatchOps: ops})
require.NoError(b, err)

view := viewIntf.(*view)
require.NoError(b, view.applyValueChanges(ctx))

b.Run("apply and commit changes", func(b *testing.B) {
require := require.New(b)

for i := 0; i < b.N; i++ {
db.baseDB = memdb.New() // Keep each iteration independent

valueNodeBatch := db.baseDB.NewBatch()
require.NoError(db.applyChanges(ctx, valueNodeBatch, view.changes))
require.NoError(db.commitValueChanges(ctx, valueNodeBatch))
}
})
}
58 changes: 12 additions & 46 deletions x/merkledb/value_node_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"github.com/ava-labs/avalanchego/utils"
)

const defaultBatchOpsLength = 256

var _ database.Iterator = (*iterator)(nil)

type valueNodeDB struct {
Expand Down Expand Up @@ -42,6 +40,18 @@ func newValueNodeDB(
}
}

func (db *valueNodeDB) Write(batch database.KeyValueWriterDeleter, key Key, n *node) error {
db.metrics.DatabaseNodeWrite()
db.nodeCache.Put(key, n)
prefixedKey := addPrefixToKey(db.bufferPool, valueNodePrefix, key.Bytes())
defer db.bufferPool.Put(prefixedKey)

if n == nil {
return batch.Delete(*prefixedKey)
}
return batch.Put(*prefixedKey, n.bytes())
}

func (db *valueNodeDB) newIteratorWithStartAndPrefix(start, prefix []byte) database.Iterator {
prefixedStart := addPrefixToKey(db.bufferPool, valueNodePrefix, start)
defer db.bufferPool.Put(prefixedStart)
Expand All @@ -59,13 +69,6 @@ func (db *valueNodeDB) Close() {
db.closed.Set(true)
}

func (db *valueNodeDB) NewBatch() *valueNodeBatch {
return &valueNodeBatch{
db: db,
ops: make(map[Key]*node, defaultBatchOpsLength),
}
}

func (db *valueNodeDB) Get(key Key) (*node, error) {
if cachedValue, isCached := db.nodeCache.Get(key); isCached {
db.metrics.ValueNodeCacheHit()
Expand Down Expand Up @@ -93,43 +96,6 @@ func (db *valueNodeDB) Clear() error {
return database.AtomicClearPrefix(db.baseDB, db.baseDB, valueNodePrefix)
}

// Batch of database operations
type valueNodeBatch struct {
db *valueNodeDB
ops map[Key]*node
}

func (b *valueNodeBatch) Put(key Key, value *node) {
b.ops[key] = value
}

func (b *valueNodeBatch) Delete(key Key) {
b.ops[key] = nil
}

// Write flushes any accumulated data to the underlying database.
func (b *valueNodeBatch) Write() error {
dbBatch := b.db.baseDB.NewBatch()
for key, n := range b.ops {
b.db.metrics.DatabaseNodeWrite()
b.db.nodeCache.Put(key, n)
prefixedKey := addPrefixToKey(b.db.bufferPool, valueNodePrefix, key.Bytes())

var err error
if n == nil {
err = dbBatch.Delete(*prefixedKey)
} else {
err = dbBatch.Put(*prefixedKey, n.bytes())
}
b.db.bufferPool.Put(prefixedKey)
if err != nil {
return err
}
}

return dbBatch.Write()
}

type iterator struct {
db *valueNodeDB
nodeIter database.Iterator
Expand Down
40 changes: 20 additions & 20 deletions x/merkledb/value_node_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func TestValueNodeDB(t *testing.T) {
},
key: key,
}
batch := db.NewBatch()
batch.Put(key, node1)
batch := db.baseDB.NewBatch()
require.NoError(db.Write(batch, key, node1))
require.NoError(batch.Write())

// Get the key-node pair.
Expand All @@ -50,18 +50,18 @@ func TestValueNodeDB(t *testing.T) {
require.Equal(node1, node1Read)

// Delete the key-node pair.
batch = db.NewBatch()
batch.Delete(key)
batch = db.baseDB.NewBatch()
require.NoError(db.Write(batch, key, nil))
require.NoError(batch.Write())

// Key should be gone now.
_, err = db.Get(key)
require.ErrorIs(err, database.ErrNotFound)

// Put a key-node pair and delete it in the same batch.
batch = db.NewBatch()
batch.Put(key, node1)
batch.Delete(key)
batch = db.baseDB.NewBatch()
require.NoError(db.Write(batch, key, node1))
require.NoError(db.Write(batch, key, nil))
require.NoError(batch.Write())

// Key should still be gone.
Expand All @@ -75,9 +75,9 @@ func TestValueNodeDB(t *testing.T) {
},
key: key,
}
batch = db.NewBatch()
batch.Put(key, node1)
batch.Put(key, node2)
batch = db.baseDB.NewBatch()
require.NoError(db.Write(batch, key, node1))
require.NoError(db.Write(batch, key, node2))
require.NoError(batch.Write())

// Get the key-node pair.
Expand All @@ -86,8 +86,8 @@ func TestValueNodeDB(t *testing.T) {
require.Equal(node2, node2Read)

// Overwrite the key-node pair in a subsequent batch.
batch = db.NewBatch()
batch.Put(key, node1)
batch = db.baseDB.NewBatch()
require.NoError(db.Write(batch, key, node1))
require.NoError(batch.Write())

// Get the key-node pair.
Expand Down Expand Up @@ -130,8 +130,8 @@ func TestValueNodeDBIterator(t *testing.T) {
},
key: key,
}
batch := db.NewBatch()
batch.Put(key, node)
batch := db.baseDB.NewBatch()
require.NoError(db.Write(batch, key, node))
require.NoError(batch.Write())
}

Expand Down Expand Up @@ -168,8 +168,8 @@ func TestValueNodeDBIterator(t *testing.T) {
},
key: key,
}
batch := db.NewBatch()
batch.Put(key, n)
batch := db.baseDB.NewBatch()
require.NoError(db.Write(batch, key, n))
require.NoError(batch.Write())

key = ToKey([]byte{0xFF, 0x01})
Expand All @@ -179,8 +179,8 @@ func TestValueNodeDBIterator(t *testing.T) {
},
key: key,
}
batch = db.NewBatch()
batch.Put(key, n)
batch = db.baseDB.NewBatch()
require.NoError(db.Write(batch, key, n))
require.NoError(batch.Write())

// Iterate over the key-node pairs with a prefix.
Expand Down Expand Up @@ -226,9 +226,9 @@ func TestValueNodeDBClear(t *testing.T) {
cacheSize,
)

batch := db.NewBatch()
batch := db.baseDB.NewBatch()
for _, b := range [][]byte{{1}, {2}, {3}} {
batch.Put(ToKey(b), newNode(ToKey(b)))
require.NoError(db.Write(batch, ToKey(b), newNode(ToKey(b))))
}
require.NoError(batch.Write())

Expand Down

0 comments on commit 5d76351

Please sign in to comment.