diff --git a/cache/lru_cache.go b/cache/lru_cache.go index f35804ac448d..bab5e9549a58 100644 --- a/cache/lru_cache.go +++ b/cache/lru_cache.go @@ -92,7 +92,9 @@ func (c *LRU[K, _]) evict(key K) { } func (c *LRU[K, V]) flush() { - c.elements = linked.NewHashmap[K, V]() + if c.elements != nil { + c.elements.Clear() + } } func (c *LRU[_, _]) len() int { diff --git a/cache/lru_sized_cache.go b/cache/lru_sized_cache.go index 592674cb222b..e8c8b0c76e7b 100644 --- a/cache/lru_sized_cache.go +++ b/cache/lru_sized_cache.go @@ -113,7 +113,7 @@ func (c *sizedLRU[K, _]) evict(key K) { } func (c *sizedLRU[K, V]) flush() { - c.elements = linked.NewHashmap[K, V]() + c.elements.Clear() c.currentSize = 0 } diff --git a/database/prefixdb/db.go b/database/prefixdb/db.go index 8698453683ff..0e203653acc1 100644 --- a/database/prefixdb/db.go +++ b/database/prefixdb/db.go @@ -9,13 +9,10 @@ import ( "sync" "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/utils" "github.com/ava-labs/avalanchego/utils/hashing" ) -const ( - defaultBufCap = 256 -) - var ( _ database.Database = (*Database)(nil) _ database.Batch = (*batch)(nil) @@ -26,9 +23,8 @@ var ( // a unique value. type Database struct { // All keys in this db begin with this byte slice - dbPrefix []byte - // Holds unused []byte - bufferPool sync.Pool + dbPrefix []byte + bufferPool *utils.BytesPool // lock needs to be held during Close to guarantee db will not be set to nil // concurrently with another operation. All other operations can hold RLock. @@ -40,13 +36,9 @@ type Database struct { func newDB(prefix []byte, db database.Database) *Database { return &Database{ - dbPrefix: prefix, - db: db, - bufferPool: sync.Pool{ - New: func() interface{} { - return make([]byte, 0, defaultBufCap) - }, - }, + dbPrefix: prefix, + db: db, + bufferPool: utils.NewBytesPool(), } } @@ -91,9 +83,6 @@ func PrefixKey(prefix, key []byte) []byte { return prefixedKey } -// Assumes that it is OK for the argument to db.db.Has -// to be modified after db.db.Has returns -// [key] may be modified after this method returns. func (db *Database) Has(key []byte) (bool, error) { db.lock.RLock() defer db.lock.RUnlock() @@ -102,14 +91,11 @@ func (db *Database) Has(key []byte) (bool, error) { return false, database.ErrClosed } prefixedKey := db.prefix(key) - has, err := db.db.Has(prefixedKey) - db.bufferPool.Put(prefixedKey) - return has, err + defer db.bufferPool.Put(prefixedKey) + + return db.db.Has(*prefixedKey) } -// Assumes that it is OK for the argument to db.db.Get -// to be modified after db.db.Get returns. -// [key] may be modified after this method returns. func (db *Database) Get(key []byte) ([]byte, error) { db.lock.RLock() defer db.lock.RUnlock() @@ -118,15 +104,11 @@ func (db *Database) Get(key []byte) ([]byte, error) { return nil, database.ErrClosed } prefixedKey := db.prefix(key) - val, err := db.db.Get(prefixedKey) - db.bufferPool.Put(prefixedKey) - return val, err + defer db.bufferPool.Put(prefixedKey) + + return db.db.Get(*prefixedKey) } -// Assumes that it is OK for the argument to db.db.Put -// to be modified after db.db.Put returns. -// [key] can be modified after this method returns. -// [value] should not be modified. func (db *Database) Put(key, value []byte) error { db.lock.RLock() defer db.lock.RUnlock() @@ -135,14 +117,11 @@ func (db *Database) Put(key, value []byte) error { return database.ErrClosed } prefixedKey := db.prefix(key) - err := db.db.Put(prefixedKey, value) - db.bufferPool.Put(prefixedKey) - return err + defer db.bufferPool.Put(prefixedKey) + + return db.db.Put(*prefixedKey, value) } -// Assumes that it is OK for the argument to db.db.Delete -// to be modified after db.db.Delete returns. -// [key] may be modified after this method returns. func (db *Database) Delete(key []byte) error { db.lock.RLock() defer db.lock.RUnlock() @@ -151,9 +130,9 @@ func (db *Database) Delete(key []byte) error { return database.ErrClosed } prefixedKey := db.prefix(key) - err := db.db.Delete(prefixedKey) - db.bufferPool.Put(prefixedKey) - return err + defer db.bufferPool.Put(prefixedKey) + + return db.db.Delete(*prefixedKey) } func (db *Database) NewBatch() database.Batch { @@ -186,15 +165,17 @@ func (db *Database) NewIteratorWithStartAndPrefix(start, prefix []byte) database Err: database.ErrClosed, } } + prefixedStart := db.prefix(start) + defer db.bufferPool.Put(prefixedStart) + prefixedPrefix := db.prefix(prefix) - it := &iterator{ - Iterator: db.db.NewIteratorWithStartAndPrefix(prefixedStart, prefixedPrefix), + defer db.bufferPool.Put(prefixedPrefix) + + return &iterator{ + Iterator: db.db.NewIteratorWithStartAndPrefix(*prefixedStart, *prefixedPrefix), db: db, } - db.bufferPool.Put(prefixedStart) - db.bufferPool.Put(prefixedPrefix) - return it } func (db *Database) Compact(start, limit []byte) error { @@ -204,7 +185,14 @@ func (db *Database) Compact(start, limit []byte) error { if db.closed { return database.ErrClosed } - return db.db.Compact(db.prefix(start), db.prefix(limit)) + + prefixedStart := db.prefix(start) + defer db.bufferPool.Put(prefixedStart) + + prefixedLimit := db.prefix(limit) + defer db.bufferPool.Put(prefixedLimit) + + return db.db.Compact(*prefixedStart, *prefixedLimit) } func (db *Database) Close() error { @@ -236,23 +224,12 @@ func (db *Database) HealthCheck(ctx context.Context) (interface{}, error) { } // Return a copy of [key], prepended with this db's prefix. -// The returned slice should be put back in the pool -// when it's done being used. -func (db *Database) prefix(key []byte) []byte { - // Get a []byte from the pool - prefixedKey := db.bufferPool.Get().([]byte) +// The returned slice should be put back in the pool when it's done being used. +func (db *Database) prefix(key []byte) *[]byte { keyLen := len(db.dbPrefix) + len(key) - if cap(prefixedKey) >= keyLen { - // The [] byte we got from the pool is big enough to hold the prefixed key - prefixedKey = prefixedKey[:keyLen] - } else { - // The []byte from the pool wasn't big enough. - // Put it back and allocate a new, bigger one - db.bufferPool.Put(prefixedKey) - prefixedKey = make([]byte, keyLen) - } - copy(prefixedKey, db.dbPrefix) - copy(prefixedKey[len(db.dbPrefix):], key) + prefixedKey := db.bufferPool.Get(keyLen) + copy(*prefixedKey, db.dbPrefix) + copy((*prefixedKey)[len(db.dbPrefix):], key) return prefixedKey } @@ -264,33 +241,32 @@ type batch struct { // Each key is prepended with the database's prefix. // Each byte slice underlying a key should be returned to the pool // when this batch is reset. - ops []database.BatchOp + ops []batchOp +} + +type batchOp struct { + Key *[]byte + Value []byte + Delete bool } -// Assumes that it is OK for the argument to b.Batch.Put -// to be modified after b.Batch.Put returns -// [key] may be modified after this method returns. -// [value] may be modified after this method returns. func (b *batch) Put(key, value []byte) error { prefixedKey := b.db.prefix(key) copiedValue := slices.Clone(value) - b.ops = append(b.ops, database.BatchOp{ + b.ops = append(b.ops, batchOp{ Key: prefixedKey, Value: copiedValue, }) - return b.Batch.Put(prefixedKey, copiedValue) + return b.Batch.Put(*prefixedKey, copiedValue) } -// Assumes that it is OK for the argument to b.Batch.Delete -// to be modified after b.Batch.Delete returns -// [key] may be modified after this method returns. func (b *batch) Delete(key []byte) error { prefixedKey := b.db.prefix(key) - b.ops = append(b.ops, database.BatchOp{ + b.ops = append(b.ops, batchOp{ Key: prefixedKey, Delete: true, }) - return b.Batch.Delete(prefixedKey) + return b.Batch.Delete(*prefixedKey) } // Write flushes any accumulated data to the memory database. @@ -316,19 +292,17 @@ func (b *batch) Reset() { // Clear b.writes if cap(b.ops) > len(b.ops)*database.MaxExcessCapacityFactor { - b.ops = make([]database.BatchOp, 0, cap(b.ops)/database.CapacityReductionFactor) + b.ops = make([]batchOp, 0, cap(b.ops)/database.CapacityReductionFactor) } else { b.ops = b.ops[:0] } b.Batch.Reset() } -// Replay replays the batch contents. -// Assumes it's safe to modify the key argument to w.Delete and w.Put -// after those methods return. +// Replay the batch contents. func (b *batch) Replay(w database.KeyValueWriterDeleter) error { for _, op := range b.ops { - keyWithoutPrefix := op.Key[len(b.db.dbPrefix):] + keyWithoutPrefix := (*op.Key)[len(b.db.dbPrefix):] if op.Delete { if err := w.Delete(keyWithoutPrefix); err != nil { return err diff --git a/network/p2p/client.go b/network/p2p/client.go index 80d0118513e3..18556bfad1fa 100644 --- a/network/p2p/client.go +++ b/network/p2p/client.go @@ -8,7 +8,10 @@ import ( "errors" "fmt" + "go.uber.org/zap" + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/message" "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/utils/set" ) @@ -72,6 +75,14 @@ func (c *Client) AppRequest( appRequestBytes []byte, onResponse AppResponseCallback, ) error { + // Cancellation is removed from this context to avoid erroring unexpectedly. + // SendAppRequest should be non-blocking and any error other than context + // cancellation is unexpected. + // + // This guarantees that the router should never receive an unexpected + // AppResponse. + ctxWithoutCancel := context.WithoutCancel(ctx) + c.router.lock.Lock() defer c.router.lock.Unlock() @@ -87,11 +98,17 @@ func (c *Client) AppRequest( } if err := c.sender.SendAppRequest( - ctx, + ctxWithoutCancel, set.Of(nodeID), requestID, appRequestBytes, ); err != nil { + c.router.log.Error("unexpected error when sending message", + zap.Stringer("op", message.AppRequestOp), + zap.Stringer("nodeID", nodeID), + zap.Uint32("requestID", requestID), + zap.Error(err), + ) return err } @@ -111,8 +128,13 @@ func (c *Client) AppGossip( config common.SendConfig, appGossipBytes []byte, ) error { + // Cancellation is removed from this context to avoid erroring unexpectedly. + // SendAppGossip should be non-blocking and any error other than context + // cancellation is unexpected. + ctxWithoutCancel := context.WithoutCancel(ctx) + return c.sender.SendAppGossip( - ctx, + ctxWithoutCancel, config, PrefixMessage(c.handlerPrefix, appGossipBytes), ) @@ -126,6 +148,14 @@ func (c *Client) CrossChainAppRequest( appRequestBytes []byte, onResponse CrossChainAppResponseCallback, ) error { + // Cancellation is removed from this context to avoid erroring unexpectedly. + // SendCrossChainAppRequest should be non-blocking and any error other than + // context cancellation is unexpected. + // + // This guarantees that the router should never receive an unexpected + // CrossChainAppResponse. + ctxWithoutCancel := context.WithoutCancel(ctx) + c.router.lock.Lock() defer c.router.lock.Unlock() @@ -139,11 +169,17 @@ func (c *Client) CrossChainAppRequest( } if err := c.sender.SendCrossChainAppRequest( - ctx, + ctxWithoutCancel, chainID, requestID, PrefixMessage(c.handlerPrefix, appRequestBytes), ); err != nil { + c.router.log.Error("unexpected error when sending message", + zap.Stringer("op", message.CrossChainAppRequestOp), + zap.Stringer("chainID", chainID), + zap.Uint32("requestID", requestID), + zap.Error(err), + ) return err } diff --git a/network/p2p/network_test.go b/network/p2p/network_test.go index 73b6ef64d1a2..5339a6eeb315 100644 --- a/network/p2p/network_test.go +++ b/network/p2p/network_test.go @@ -180,6 +180,48 @@ func TestAppRequestResponse(t *testing.T) { <-done } +// Tests that the Client does not provide a cancelled context to the AppSender. +func TestAppRequestCancelledContext(t *testing.T) { + require := require.New(t) + ctx := context.Background() + + sentMessages := make(chan []byte, 1) + sender := &common.SenderTest{ + SendAppRequestF: func(ctx context.Context, _ set.Set[ids.NodeID], _ uint32, msgBytes []byte) error { + require.NoError(ctx.Err()) + sentMessages <- msgBytes + return nil + }, + } + network, err := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "") + require.NoError(err) + client := network.NewClient(handlerID) + + wantResponse := []byte("response") + wantNodeID := ids.GenerateTestNodeID() + done := make(chan struct{}) + + callback := func(_ context.Context, gotNodeID ids.NodeID, gotResponse []byte, err error) { + require.Equal(wantNodeID, gotNodeID) + require.NoError(err) + require.Equal(wantResponse, gotResponse) + + close(done) + } + + cancelledCtx, cancel := context.WithCancel(ctx) + cancel() + + want := []byte("request") + require.NoError(client.AppRequest(cancelledCtx, set.Of(wantNodeID), want, callback)) + got := <-sentMessages + require.Equal(handlerPrefix, got[0]) + require.Equal(want, got[1:]) + + require.NoError(network.AppResponse(ctx, wantNodeID, 1, wantResponse)) + <-done +} + // Tests that the Client callback is given an error if the request fails func TestAppRequestFailed(t *testing.T) { require := require.New(t) @@ -241,6 +283,44 @@ func TestCrossChainAppRequestResponse(t *testing.T) { <-done } +// Tests that the Client does not provide a cancelled context to the AppSender. +func TestCrossChainAppRequestCancelledContext(t *testing.T) { + require := require.New(t) + ctx := context.Background() + + sentMessages := make(chan []byte, 1) + sender := &common.SenderTest{ + SendCrossChainAppRequestF: func(ctx context.Context, _ ids.ID, _ uint32, msgBytes []byte) { + require.NoError(ctx.Err()) + sentMessages <- msgBytes + }, + } + network, err := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "") + require.NoError(err) + client := network.NewClient(handlerID) + + cancelledCtx, cancel := context.WithCancel(ctx) + cancel() + + wantChainID := ids.GenerateTestID() + wantResponse := []byte("response") + done := make(chan struct{}) + + callback := func(_ context.Context, gotChainID ids.ID, gotResponse []byte, err error) { + require.Equal(wantChainID, gotChainID) + require.NoError(err) + require.Equal(wantResponse, gotResponse) + + close(done) + } + + require.NoError(client.CrossChainAppRequest(cancelledCtx, wantChainID, []byte("request"), callback)) + <-sentMessages + + require.NoError(network.CrossChainAppResponse(ctx, wantChainID, 1, wantResponse)) + <-done +} + // Tests that the Client callback is given an error if the request fails func TestCrossChainAppRequestFailed(t *testing.T) { require := require.New(t) diff --git a/snow/engine/common/sender.go b/snow/engine/common/sender.go index 09389061985e..69b53a899568 100644 --- a/snow/engine/common/sender.go +++ b/snow/engine/common/sender.go @@ -167,11 +167,16 @@ type QuerySender interface { // NetworkAppSender sends VM-level messages to nodes in the network. type NetworkAppSender interface { // Send an application-level request. - // A nil return value guarantees that for each nodeID in [nodeIDs], - // the VM corresponding to this AppSender eventually receives either: + // + // The VM corresponding to this AppSender may receive either: // * An AppResponse from nodeID with ID [requestID] // * An AppRequestFailed from nodeID with ID [requestID] - // Exactly one of the above messages will eventually be received per nodeID. + // + // A nil return value guarantees that the VM corresponding to this AppSender + // will receive exactly one of the above messages. + // + // A non-nil return value guarantees that the VM corresponding to this + // AppSender will receive at most one of the above messages. SendAppRequest(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, appRequestBytes []byte) error // Send an application-level response to a request. // This response must be in response to an AppRequest that the VM corresponding @@ -192,12 +197,16 @@ type CrossChainAppSender interface { // SendCrossChainAppRequest sends an application-level request to a // specific chain. // - // A nil return value guarantees that the VM corresponding to this - // CrossChainAppSender eventually receives either: + // The VM corresponding to this CrossChainAppSender may receive either: // * A CrossChainAppResponse from [chainID] with ID [requestID] // * A CrossChainAppRequestFailed from [chainID] with ID [requestID] - // Exactly one of the above messages will eventually be received from - // [chainID]. + // + // A nil return value guarantees that the VM corresponding to this + // CrossChainAppSender will eventually receive exactly one of the above + // messages. + // + // A non-nil return value guarantees that the VM corresponding to this + // CrossChainAppSender will receive at most one of the above messages. SendCrossChainAppRequest(ctx context.Context, chainID ids.ID, requestID uint32, appRequestBytes []byte) error // SendCrossChainAppResponse sends an application-level response to a // specific chain diff --git a/utils/bytes.go b/utils/bytes.go index a32f353cf75e..4232b98fc66f 100644 --- a/utils/bytes.go +++ b/utils/bytes.go @@ -3,7 +3,11 @@ package utils -import "crypto/rand" +import ( + "crypto/rand" + "math/bits" + "sync" +) // RandomBytes returns a slice of n random bytes // Intended for use in testing @@ -12,3 +16,68 @@ func RandomBytes(n int) []byte { _, _ = rand.Read(b) return b } + +// Constant taken from the "math" package +const intSize = 32 << (^uint(0) >> 63) // 32 or 64 + +// BytesPool tracks buckets of available buffers to be allocated. Each bucket +// allocates buffers of the following length: +// +// 0 +// 1 +// 3 +// 7 +// 15 +// 31 +// 63 +// 127 +// ... +// MaxInt +// +// In order to allocate a buffer of length 19 (for example), we calculate the +// number of bits required to represent 19 (5). And therefore allocate a slice +// from bucket 5, which has length 31. This is the bucket which produces the +// smallest slices that are at least length 19. +// +// When replacing a buffer of length 19, we calculate the number of bits +// required to represent 20 (5). And therefore place the slice into bucket 4, +// which has length 15. This is the bucket which produces the largest slices +// that a length 19 slice can be used for. +type BytesPool [intSize]sync.Pool + +func NewBytesPool() *BytesPool { + var p BytesPool + for i := range p { + // uint is used here to avoid overflowing int during the shift + size := uint(1)< 0; size-- { + p.Put(p.Get(size)) + } + } +} + +func BenchmarkBytesPool_Ascending(b *testing.B) { + p := NewBytesPool() + for i := 0; i < b.N; i++ { + for size := 0; size < 100_000; size++ { + p.Put(p.Get(size)) + } + } +} + +func BenchmarkBytesPool_Random(b *testing.B) { + p := NewBytesPool() + sizes := make([]int, 1_000) + for i := range sizes { + sizes[i] = rand.Intn(100_000) //#nosec G404 + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + for _, size := range sizes { + p.Put(p.Get(size)) + } + } +} diff --git a/utils/linked/hashmap.go b/utils/linked/hashmap.go index b17b7b60972d..9e85e22f0081 100644 --- a/utils/linked/hashmap.go +++ b/utils/linked/hashmap.go @@ -20,10 +20,19 @@ type Hashmap[K comparable, V any] struct { } func NewHashmap[K comparable, V any]() *Hashmap[K, V] { - return &Hashmap[K, V]{ - entryMap: make(map[K]*ListElement[keyValue[K, V]]), + return NewHashmapWithSize[K, V](0) +} + +func NewHashmapWithSize[K comparable, V any](initialSize int) *Hashmap[K, V] { + lh := &Hashmap[K, V]{ + entryMap: make(map[K]*ListElement[keyValue[K, V]], initialSize), entryList: NewList[keyValue[K, V]](), + freeList: make([]*ListElement[keyValue[K, V]], initialSize), } + for i := range lh.freeList { + lh.freeList[i] = &ListElement[keyValue[K, V]]{} + } + return lh } func (lh *Hashmap[K, V]) Put(key K, value V) { @@ -63,14 +72,25 @@ func (lh *Hashmap[K, V]) Get(key K) (V, bool) { func (lh *Hashmap[K, V]) Delete(key K) bool { e, ok := lh.entryMap[key] if ok { - lh.entryList.Remove(e) - delete(lh.entryMap, key) - e.Value = keyValue[K, V]{} // Free the key value pair - lh.freeList = append(lh.freeList, e) + lh.remove(e) } return ok } +func (lh *Hashmap[K, V]) Clear() { + for _, e := range lh.entryMap { + lh.remove(e) + } +} + +// remove assumes that [e] is currently in the Hashmap. +func (lh *Hashmap[K, V]) remove(e *ListElement[keyValue[K, V]]) { + delete(lh.entryMap, e.Value.key) + lh.entryList.Remove(e) + e.Value = keyValue[K, V]{} // Free the key value pair + lh.freeList = append(lh.freeList, e) +} + func (lh *Hashmap[K, V]) Len() int { return len(lh.entryMap) } diff --git a/utils/linked/hashmap_test.go b/utils/linked/hashmap_test.go index 1920180b180f..25131888dcbc 100644 --- a/utils/linked/hashmap_test.go +++ b/utils/linked/hashmap_test.go @@ -95,6 +95,23 @@ func TestHashmap(t *testing.T) { require.Equal(1, val1, "wrong value") } +func TestHashmapClear(t *testing.T) { + require := require.New(t) + + lh := NewHashmap[int, int]() + lh.Put(1, 1) + lh.Put(2, 2) + + lh.Clear() + + require.Empty(lh.entryMap) + require.Zero(lh.entryList.Len()) + require.Len(lh.freeList, 2) + for _, e := range lh.freeList { + require.Zero(e.Value) // Make sure the value is cleared + } +} + func TestIterator(t *testing.T) { require := require.New(t) id1, id2, id3 := ids.GenerateTestID(), ids.GenerateTestID(), ids.GenerateTestID() diff --git a/x/merkledb/cache.go b/x/merkledb/cache.go index cdd553d82954..92e8c8739c5e 100644 --- a/x/merkledb/cache.go +++ b/x/merkledb/cache.go @@ -65,15 +65,14 @@ func (c *onEvictCache[K, V]) Put(key K, value V) error { } // Flush removes all elements from the cache. -// Returns the last non-nil error during [c.onEviction], if any. -// If [c.onEviction] errors, it will still be called for any -// subsequent elements and the cache will still be emptied. +// +// Returns the first non-nil error returned by [c.onEviction], if any. +// +// If [c.onEviction] errors, it will still be called for any subsequent elements +// and the cache will still be emptied. func (c *onEvictCache[K, V]) Flush() error { c.lock.Lock() - defer func() { - c.fifo = linked.NewHashmap[K, V]() - c.lock.Unlock() - }() + defer c.lock.Unlock() return c.resize(0) } diff --git a/x/merkledb/db.go b/x/merkledb/db.go index 46a6fc653568..ea34d0a8d9f3 100644 --- a/x/merkledb/db.go +++ b/x/merkledb/db.go @@ -260,13 +260,9 @@ func newDatabase( rootGenConcurrency = int(config.RootGenConcurrency) } - // Share a sync.Pool of []byte between the intermediateNodeDB and valueNodeDB - // to reduce memory allocations. - bufferPool := &sync.Pool{ - New: func() interface{} { - return make([]byte, 0, defaultBufferLength) - }, - } + // Share a bytes pool between the intermediateNodeDB and valueNodeDB to + // reduce memory allocations. + bufferPool := utils.NewBytesPool() trieDB := &merkleDB{ metrics: metrics, @@ -932,10 +928,10 @@ func (db *merkleDB) commitBatch(ops []database.BatchOp) error { return view.commitToDB(context.Background()) } -// commitChanges commits the changes in [trieToCommit] to [db]. +// commitView commits the changes in [trieToCommit] to [db]. // Assumes [trieToCommit]'s node IDs have been calculated. // Assumes [db.commitLock] is held. -func (db *merkleDB) commitChanges(ctx context.Context, trieToCommit *view) error { +func (db *merkleDB) commitView(ctx context.Context, trieToCommit *view) error { db.lock.Lock() defer db.lock.Unlock() @@ -953,7 +949,7 @@ func (db *merkleDB) commitChanges(ctx context.Context, trieToCommit *view) error } changes := trieToCommit.changes - _, span := db.infoTracer.Start(ctx, "MerkleDB.commitChanges", oteltrace.WithAttributes( + _, span := db.infoTracer.Start(ctx, "MerkleDB.commitView", oteltrace.WithAttributes( attribute.Int("nodesChanged", len(changes.nodes)), attribute.Int("valuesChanged", len(changes.values)), )) @@ -969,8 +965,46 @@ func (db *merkleDB) commitChanges(ctx context.Context, trieToCommit *view) error return nil } - currentValueNodeBatch := db.valueNodeDB.NewBatch() - _, nodesSpan := db.infoTracer.Start(ctx, "MerkleDB.commitChanges.writeNodes") + valueNodeBatch := db.baseDB.NewBatch() + if err := db.applyChanges(ctx, valueNodeBatch, changes); err != nil { + return err + } + + if err := db.commitValueChanges(ctx, valueNodeBatch); err != nil { + return err + } + + db.history.record(changes) + + // Update root in database. + db.root = changes.rootChange.after + db.rootID = changes.rootID + return nil +} + +// moveChildViewsToDB removes any child views from the trieToCommit and moves +// them to the db. +// +// assumes [db.lock] is held +func (db *merkleDB) moveChildViewsToDB(trieToCommit *view) { + trieToCommit.validityTrackingLock.Lock() + defer trieToCommit.validityTrackingLock.Unlock() + + for _, childView := range trieToCommit.childViews { + childView.updateParent(db) + db.childViews = append(db.childViews, childView) + } + trieToCommit.childViews = make([]*view, 0, defaultPreallocationSize) +} + +// applyChanges takes the [changes] and applies them to [db.intermediateNodeDB] +// and [valueNodeBatch]. +// +// assumes [db.lock] is held +func (db *merkleDB) applyChanges(ctx context.Context, valueNodeBatch database.KeyValueWriterDeleter, changes *changeSummary) error { + _, span := db.infoTracer.Start(ctx, "MerkleDB.applyChanges") + defer span.End() + for key, nodeChange := range changes.nodes { shouldAddIntermediate := nodeChange.after != nil && !nodeChange.after.hasValue() shouldDeleteIntermediate := !shouldAddIntermediate && nodeChange.before != nil && !nodeChange.before.hasValue() @@ -980,50 +1014,34 @@ func (db *merkleDB) commitChanges(ctx context.Context, trieToCommit *view) error if shouldAddIntermediate { if err := db.intermediateNodeDB.Put(key, nodeChange.after); err != nil { - nodesSpan.End() return err } } else if shouldDeleteIntermediate { if err := db.intermediateNodeDB.Delete(key); err != nil { - nodesSpan.End() return err } } if shouldAddValue { - currentValueNodeBatch.Put(key, nodeChange.after) + if err := db.valueNodeDB.Write(valueNodeBatch, key, nodeChange.after); err != nil { + return err + } } else if shouldDeleteValue { - currentValueNodeBatch.Delete(key) + if err := db.valueNodeDB.Write(valueNodeBatch, key, nil); err != nil { + return err + } } } - nodesSpan.End() - - _, commitSpan := db.infoTracer.Start(ctx, "MerkleDB.commitChanges.valueNodeDBCommit") - err := currentValueNodeBatch.Write() - commitSpan.End() - if err != nil { - return err - } - - db.history.record(changes) - - // Update root in database. - db.root = changes.rootChange.after - db.rootID = changes.rootID return nil } -// moveChildViewsToDB removes any child views from the trieToCommit and moves them to the db -// assumes [db.lock] is held -func (db *merkleDB) moveChildViewsToDB(trieToCommit *view) { - trieToCommit.validityTrackingLock.Lock() - defer trieToCommit.validityTrackingLock.Unlock() +// commitValueChanges is a thin wrapper around [valueNodeBatch.Write()] to +// provide tracing. +func (db *merkleDB) commitValueChanges(ctx context.Context, valueNodeBatch database.Batch) error { + _, span := db.infoTracer.Start(ctx, "MerkleDB.commitValueChanges") + defer span.End() - for _, childView := range trieToCommit.childViews { - childView.updateParent(db) - db.childViews = append(db.childViews, childView) - } - trieToCommit.childViews = make([]*view, 0, defaultPreallocationSize) + return valueNodeBatch.Write() } // CommitToDB is a no-op for db since it is already in sync with itself. @@ -1341,33 +1359,17 @@ func (db *merkleDB) getTokenSize() int { } // Returns [key] prefixed by [prefix]. -// The returned []byte is taken from [bufferPool] and -// should be returned to it when the caller is done with it. -func addPrefixToKey(bufferPool *sync.Pool, prefix []byte, key []byte) []byte { +// The returned *[]byte is taken from [bufferPool] and should be returned to it +// when the caller is done with it. +func addPrefixToKey(bufferPool *utils.BytesPool, prefix []byte, key []byte) *[]byte { prefixLen := len(prefix) keyLen := prefixLen + len(key) - prefixedKey := getBufferFromPool(bufferPool, keyLen) - copy(prefixedKey, prefix) - copy(prefixedKey[prefixLen:], key) + prefixedKey := bufferPool.Get(keyLen) + copy(*prefixedKey, prefix) + copy((*prefixedKey)[prefixLen:], key) return prefixedKey } -// Returns a []byte from [bufferPool] with length exactly [size]. -// The []byte is not guaranteed to be zeroed. -func getBufferFromPool(bufferPool *sync.Pool, size int) []byte { - buffer := bufferPool.Get().([]byte) - if cap(buffer) >= size { - // The [] byte we got from the pool is big enough to hold the prefixed key - buffer = buffer[:size] - } else { - // The []byte from the pool wasn't big enough. - // Put it back and allocate a new, bigger one - bufferPool.Put(buffer) - buffer = make([]byte, size) - } - return buffer -} - // cacheEntrySize returns a rough approximation of the memory consumed by storing the key and node. func cacheEntrySize(key Key, n *node) int { if n == nil { diff --git a/x/merkledb/db_test.go b/x/merkledb/db_test.go index 02ce9cf84d0f..4108d4c5cdfd 100644 --- a/x/merkledb/db_test.go +++ b/x/merkledb/db_test.go @@ -6,6 +6,7 @@ package merkledb import ( "bytes" "context" + "encoding/binary" "fmt" "math/rand" "slices" @@ -39,7 +40,7 @@ func newDB(ctx context.Context, db database.Database, config Config) (*merkleDB, func newDefaultConfig() Config { return Config{ - IntermediateWriteBatchSize: 10, + IntermediateWriteBatchSize: 256 * units.KiB, HistoryLength: defaultHistoryLength, ValueNodeCacheSize: units.MiB, IntermediateNodeCacheSize: units.MiB, @@ -1330,3 +1331,72 @@ func TestCrashRecovery(t *testing.T) { require.NoError(err) require.Equal(expectedRoot, rootAfterRecovery) } + +func BenchmarkCommitView(b *testing.B) { + db, err := getBasicDB() + require.NoError(b, err) + + ops := make([]database.BatchOp, 1_000) + for i := range ops { + k := binary.AppendUvarint(nil, uint64(i)) + ops[i] = database.BatchOp{ + Key: k, + Value: hashing.ComputeHash256(k), + } + } + + ctx := context.Background() + viewIntf, err := db.NewView(ctx, ViewChanges{BatchOps: ops}) + require.NoError(b, err) + + view := viewIntf.(*view) + require.NoError(b, view.applyValueChanges(ctx)) + + b.Run("apply and commit changes", func(b *testing.B) { + require := require.New(b) + + for i := 0; i < b.N; i++ { + db.baseDB = memdb.New() // Keep each iteration independent + + valueNodeBatch := db.baseDB.NewBatch() + require.NoError(db.applyChanges(ctx, valueNodeBatch, view.changes)) + require.NoError(db.commitValueChanges(ctx, valueNodeBatch)) + } + }) +} + +func BenchmarkIteration(b *testing.B) { + db, err := getBasicDB() + require.NoError(b, err) + + ops := make([]database.BatchOp, 1_000) + for i := range ops { + k := binary.AppendUvarint(nil, uint64(i)) + ops[i] = database.BatchOp{ + Key: k, + Value: hashing.ComputeHash256(k), + } + } + + ctx := context.Background() + view, err := db.NewView(ctx, ViewChanges{BatchOps: ops}) + require.NoError(b, err) + + require.NoError(b, view.CommitToDB(ctx)) + + b.Run("create iterator", func(b *testing.B) { + for i := 0; i < b.N; i++ { + it := db.NewIterator() + it.Release() + } + }) + + b.Run("iterate", func(b *testing.B) { + for i := 0; i < b.N; i++ { + it := db.NewIterator() + for it.Next() { + } + it.Release() + } + }) +} diff --git a/x/merkledb/intermediate_node_db.go b/x/merkledb/intermediate_node_db.go index e57dcb31834b..4542f864f188 100644 --- a/x/merkledb/intermediate_node_db.go +++ b/x/merkledb/intermediate_node_db.go @@ -4,20 +4,16 @@ package merkledb import ( - "sync" - "github.com/ava-labs/avalanchego/cache" "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/utils" ) -const defaultBufferLength = 256 - // Holds intermediate nodes. That is, those without values. // Changes to this database aren't written to [baseDB] until // they're evicted from the [nodeCache] or Flush is called. type intermediateNodeDB struct { - // Holds unused []byte - bufferPool *sync.Pool + bufferPool *utils.BytesPool // The underlying storage. // Keys written to [baseDB] are prefixed with [intermediateNodePrefix]. @@ -41,7 +37,7 @@ type intermediateNodeDB struct { func newIntermediateNodeDB( db database.Database, - bufferPool *sync.Pool, + bufferPool *utils.BytesPool, metrics merkleMetrics, cacheSize int, writeBufferSize int, @@ -98,14 +94,15 @@ func (db *intermediateNodeDB) onEviction(key Key, n *node) error { return nil } -func (db *intermediateNodeDB) addToBatch(b database.Batch, key Key, n *node) error { +func (db *intermediateNodeDB) addToBatch(b database.KeyValueWriterDeleter, key Key, n *node) error { dbKey := db.constructDBKey(key) defer db.bufferPool.Put(dbKey) + db.metrics.DatabaseNodeWrite() if n == nil { - return b.Delete(dbKey) + return b.Delete(*dbKey) } - return b.Put(dbKey, n.bytes()) + return b.Put(*dbKey, n.bytes()) } func (db *intermediateNodeDB) Get(key Key) (*node, error) { @@ -128,12 +125,13 @@ func (db *intermediateNodeDB) Get(key Key) (*node, error) { db.metrics.IntermediateNodeCacheMiss() dbKey := db.constructDBKey(key) + defer db.bufferPool.Put(dbKey) + db.metrics.DatabaseNodeRead() - nodeBytes, err := db.baseDB.Get(dbKey) + nodeBytes, err := db.baseDB.Get(*dbKey) if err != nil { return nil, err } - db.bufferPool.Put(dbKey) return parseNode(key, nodeBytes) } @@ -142,7 +140,7 @@ func (db *intermediateNodeDB) Get(key Key) (*node, error) { // We need to be able to differentiate between two keys of equal // byte length but different bit length, so we add padding to differentiate. // Additionally, we add a prefix indicating it is part of the intermediateNodeDB. -func (db *intermediateNodeDB) constructDBKey(key Key) []byte { +func (db *intermediateNodeDB) constructDBKey(key Key) *[]byte { if db.tokenSize == 8 { // For tokens of size byte, no padding is needed since byte length == token length return addPrefixToKey(db.bufferPool, intermediateNodePrefix, key.Bytes()) diff --git a/x/merkledb/intermediate_node_db_test.go b/x/merkledb/intermediate_node_db_test.go index 26ad722ffa45..4c0d29c44cfd 100644 --- a/x/merkledb/intermediate_node_db_test.go +++ b/x/merkledb/intermediate_node_db_test.go @@ -4,13 +4,13 @@ package merkledb import ( - "sync" "testing" "github.com/stretchr/testify/require" "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/database/memdb" + "github.com/ava-labs/avalanchego/utils" "github.com/ava-labs/avalanchego/utils/maybe" ) @@ -35,9 +35,7 @@ func Test_IntermediateNodeDB(t *testing.T) { baseDB := memdb.New() db := newIntermediateNodeDB( baseDB, - &sync.Pool{ - New: func() interface{} { return make([]byte, 0) }, - }, + utils.NewBytesPool(), &mockMetrics{}, cacheSize, bufferSize, @@ -149,9 +147,7 @@ func FuzzIntermediateNodeDBConstructDBKey(f *testing.F) { for _, tokenSize := range validTokenSizes { db := newIntermediateNodeDB( baseDB, - &sync.Pool{ - New: func() interface{} { return make([]byte, 0) }, - }, + utils.NewBytesPool(), &mockMetrics{}, cacheSize, bufferSize, @@ -167,18 +163,18 @@ func FuzzIntermediateNodeDBConstructDBKey(f *testing.F) { p = p.Take(int(uBitLength)) constructedKey := db.constructDBKey(p) baseLength := len(p.value) + len(intermediateNodePrefix) - require.Equal(intermediateNodePrefix, constructedKey[:len(intermediateNodePrefix)]) + require.Equal(intermediateNodePrefix, (*constructedKey)[:len(intermediateNodePrefix)]) switch { case tokenSize == 8: // for keys with tokens of size byte, no padding is added - require.Equal(p.Bytes(), constructedKey[len(intermediateNodePrefix):]) + require.Equal(p.Bytes(), (*constructedKey)[len(intermediateNodePrefix):]) case p.hasPartialByte(): - require.Len(constructedKey, baseLength) - require.Equal(p.Extend(ToToken(1, tokenSize)).Bytes(), constructedKey[len(intermediateNodePrefix):]) + require.Len(*constructedKey, baseLength) + require.Equal(p.Extend(ToToken(1, tokenSize)).Bytes(), (*constructedKey)[len(intermediateNodePrefix):]) default: // when a whole number of bytes, there is an extra padding byte - require.Len(constructedKey, baseLength+1) - require.Equal(p.Extend(ToToken(1, tokenSize)).Bytes(), constructedKey[len(intermediateNodePrefix):]) + require.Len(*constructedKey, baseLength+1) + require.Equal(p.Extend(ToToken(1, tokenSize)).Bytes(), (*constructedKey)[len(intermediateNodePrefix):]) } } }) @@ -192,9 +188,7 @@ func Test_IntermediateNodeDB_ConstructDBKey_DirtyBuffer(t *testing.T) { baseDB := memdb.New() db := newIntermediateNodeDB( baseDB, - &sync.Pool{ - New: func() interface{} { return make([]byte, 0) }, - }, + utils.NewBytesPool(), &mockMetrics{}, cacheSize, bufferSize, @@ -202,23 +196,19 @@ func Test_IntermediateNodeDB_ConstructDBKey_DirtyBuffer(t *testing.T) { 4, ) - db.bufferPool.Put([]byte{0xFF, 0xFF, 0xFF}) + db.bufferPool.Put(&[]byte{0xFF, 0xFF, 0xFF}) constructedKey := db.constructDBKey(ToKey([]byte{})) - require.Len(constructedKey, 2) - require.Equal(intermediateNodePrefix, constructedKey[:len(intermediateNodePrefix)]) - require.Equal(byte(16), constructedKey[len(constructedKey)-1]) - - db.bufferPool = &sync.Pool{ - New: func() interface{} { - return make([]byte, 0, defaultBufferLength) - }, - } - db.bufferPool.Put([]byte{0xFF, 0xFF, 0xFF}) + require.Len(*constructedKey, 2) + require.Equal(intermediateNodePrefix, (*constructedKey)[:len(intermediateNodePrefix)]) + require.Equal(byte(16), (*constructedKey)[len(*constructedKey)-1]) + + db.bufferPool = utils.NewBytesPool() + db.bufferPool.Put(&[]byte{0xFF, 0xFF, 0xFF}) p := ToKey([]byte{0xF0}).Take(4) constructedKey = db.constructDBKey(p) - require.Len(constructedKey, 2) - require.Equal(intermediateNodePrefix, constructedKey[:len(intermediateNodePrefix)]) - require.Equal(p.Extend(ToToken(1, 4)).Bytes(), constructedKey[len(intermediateNodePrefix):]) + require.Len(*constructedKey, 2) + require.Equal(intermediateNodePrefix, (*constructedKey)[:len(intermediateNodePrefix)]) + require.Equal(p.Extend(ToToken(1, 4)).Bytes(), (*constructedKey)[len(intermediateNodePrefix):]) } func TestIntermediateNodeDBClear(t *testing.T) { @@ -229,9 +219,7 @@ func TestIntermediateNodeDBClear(t *testing.T) { baseDB := memdb.New() db := newIntermediateNodeDB( baseDB, - &sync.Pool{ - New: func() interface{} { return make([]byte, 0) }, - }, + utils.NewBytesPool(), &mockMetrics{}, cacheSize, bufferSize, @@ -265,9 +253,7 @@ func TestIntermediateNodeDBDeleteEmptyKey(t *testing.T) { baseDB := memdb.New() db := newIntermediateNodeDB( baseDB, - &sync.Pool{ - New: func() interface{} { return make([]byte, 0) }, - }, + utils.NewBytesPool(), &mockMetrics{}, cacheSize, bufferSize, @@ -280,7 +266,7 @@ func TestIntermediateNodeDBDeleteEmptyKey(t *testing.T) { require.NoError(db.Flush()) emptyDBKey := db.constructDBKey(emptyKey) - has, err := baseDB.Has(emptyDBKey) + has, err := baseDB.Has(*emptyDBKey) require.NoError(err) require.True(has) @@ -288,7 +274,7 @@ func TestIntermediateNodeDBDeleteEmptyKey(t *testing.T) { require.NoError(db.Flush()) emptyDBKey = db.constructDBKey(emptyKey) - has, err = baseDB.Has(emptyDBKey) + has, err = baseDB.Has(*emptyDBKey) require.NoError(err) require.False(has) } diff --git a/x/merkledb/value_node_db.go b/x/merkledb/value_node_db.go index 8ee7d7436fcc..42457a1b7076 100644 --- a/x/merkledb/value_node_db.go +++ b/x/merkledb/value_node_db.go @@ -4,18 +4,21 @@ package merkledb import ( - "sync" + "errors" "github.com/ava-labs/avalanchego/cache" "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/utils" ) -var _ database.Iterator = (*iterator)(nil) +var ( + _ database.Iterator = (*iterator)(nil) + + errNodeMissingValue = errors.New("valueNodeDB contains node without a value") +) type valueNodeDB struct { - // Holds unused []byte - bufferPool *sync.Pool + bufferPool *utils.BytesPool // The underlying storage. // Keys written to [baseDB] are prefixed with [valueNodePrefix]. @@ -31,7 +34,7 @@ type valueNodeDB struct { func newValueNodeDB( db database.Database, - bufferPool *sync.Pool, + bufferPool *utils.BytesPool, metrics merkleMetrics, cacheSize int, ) *valueNodeDB { @@ -43,29 +46,35 @@ func newValueNodeDB( } } +func (db *valueNodeDB) Write(batch database.KeyValueWriterDeleter, key Key, n *node) error { + db.metrics.DatabaseNodeWrite() + db.nodeCache.Put(key, n) + prefixedKey := addPrefixToKey(db.bufferPool, valueNodePrefix, key.Bytes()) + defer db.bufferPool.Put(prefixedKey) + + if n == nil { + return batch.Delete(*prefixedKey) + } + return batch.Put(*prefixedKey, n.bytes()) +} + func (db *valueNodeDB) newIteratorWithStartAndPrefix(start, prefix []byte) database.Iterator { prefixedStart := addPrefixToKey(db.bufferPool, valueNodePrefix, start) + defer db.bufferPool.Put(prefixedStart) + prefixedPrefix := addPrefixToKey(db.bufferPool, valueNodePrefix, prefix) - i := &iterator{ + defer db.bufferPool.Put(prefixedPrefix) + + return &iterator{ db: db, - nodeIter: db.baseDB.NewIteratorWithStartAndPrefix(prefixedStart, prefixedPrefix), + nodeIter: db.baseDB.NewIteratorWithStartAndPrefix(*prefixedStart, *prefixedPrefix), } - db.bufferPool.Put(prefixedStart) - db.bufferPool.Put(prefixedPrefix) - return i } func (db *valueNodeDB) Close() { db.closed.Set(true) } -func (db *valueNodeDB) NewBatch() *valueNodeBatch { - return &valueNodeBatch{ - db: db, - ops: make(map[Key]*node, defaultBufferLength), - } -} - func (db *valueNodeDB) Get(key Key) (*node, error) { if cachedValue, isCached := db.nodeCache.Get(key); isCached { db.metrics.ValueNodeCacheHit() @@ -80,7 +89,7 @@ func (db *valueNodeDB) Get(key Key) (*node, error) { defer db.bufferPool.Put(prefixedKey) db.metrics.DatabaseNodeRead() - nodeBytes, err := db.baseDB.Get(prefixedKey) + nodeBytes, err := db.baseDB.Get(*prefixedKey) if err != nil { return nil, err } @@ -93,45 +102,11 @@ func (db *valueNodeDB) Clear() error { return database.AtomicClearPrefix(db.baseDB, db.baseDB, valueNodePrefix) } -// Batch of database operations -type valueNodeBatch struct { - db *valueNodeDB - ops map[Key]*node -} - -func (b *valueNodeBatch) Put(key Key, value *node) { - b.ops[key] = value -} - -func (b *valueNodeBatch) Delete(key Key) { - b.ops[key] = nil -} - -// Write flushes any accumulated data to the underlying database. -func (b *valueNodeBatch) Write() error { - dbBatch := b.db.baseDB.NewBatch() - for key, n := range b.ops { - b.db.metrics.DatabaseNodeWrite() - b.db.nodeCache.Put(key, n) - prefixedKey := addPrefixToKey(b.db.bufferPool, valueNodePrefix, key.Bytes()) - if n == nil { - if err := dbBatch.Delete(prefixedKey); err != nil { - return err - } - } else if err := dbBatch.Put(prefixedKey, n.bytes()); err != nil { - return err - } - - b.db.bufferPool.Put(prefixedKey) - } - - return dbBatch.Write() -} - type iterator struct { db *valueNodeDB nodeIter database.Iterator - current *node + key []byte + value []byte err error } @@ -146,21 +121,16 @@ func (i *iterator) Error() error { } func (i *iterator) Key() []byte { - if i.current == nil { - return nil - } - return i.current.key.Bytes() + return i.key } func (i *iterator) Value() []byte { - if i.current == nil { - return nil - } - return i.current.value.Value() + return i.value } func (i *iterator) Next() bool { - i.current = nil + i.key = nil + i.value = nil if i.Error() != nil || i.db.closed.Get() { return false } @@ -169,15 +139,25 @@ func (i *iterator) Next() bool { } i.db.metrics.DatabaseNodeRead() - key := i.nodeIter.Key() - key = key[valueNodePrefixLen:] - n, err := parseNode(ToKey(key), i.nodeIter.Value()) + + r := codecReader{ + b: i.nodeIter.Value(), + // We are discarding the other bytes from the node, so we avoid copying + // the value here. + copy: false, + } + maybeValue, err := r.MaybeBytes() if err != nil { i.err = err return false } + if maybeValue.IsNothing() { + i.err = errNodeMissingValue + return false + } - i.current = n + i.key = i.nodeIter.Key()[valueNodePrefixLen:] + i.value = maybeValue.Value() return true } diff --git a/x/merkledb/value_node_db_test.go b/x/merkledb/value_node_db_test.go index 224a4fe94ac1..b30efa008fce 100644 --- a/x/merkledb/value_node_db_test.go +++ b/x/merkledb/value_node_db_test.go @@ -4,13 +4,13 @@ package merkledb import ( - "sync" "testing" "github.com/stretchr/testify/require" "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/database/memdb" + "github.com/ava-labs/avalanchego/utils" "github.com/ava-labs/avalanchego/utils/maybe" ) @@ -23,9 +23,7 @@ func TestValueNodeDB(t *testing.T) { cacheSize := 10_000 db := newValueNodeDB( baseDB, - &sync.Pool{ - New: func() interface{} { return make([]byte, 0) }, - }, + utils.NewBytesPool(), &mockMetrics{}, cacheSize, ) @@ -42,8 +40,8 @@ func TestValueNodeDB(t *testing.T) { }, key: key, } - batch := db.NewBatch() - batch.Put(key, node1) + batch := db.baseDB.NewBatch() + require.NoError(db.Write(batch, key, node1)) require.NoError(batch.Write()) // Get the key-node pair. @@ -52,8 +50,8 @@ func TestValueNodeDB(t *testing.T) { require.Equal(node1, node1Read) // Delete the key-node pair. - batch = db.NewBatch() - batch.Delete(key) + batch = db.baseDB.NewBatch() + require.NoError(db.Write(batch, key, nil)) require.NoError(batch.Write()) // Key should be gone now. @@ -61,9 +59,9 @@ func TestValueNodeDB(t *testing.T) { require.ErrorIs(err, database.ErrNotFound) // Put a key-node pair and delete it in the same batch. - batch = db.NewBatch() - batch.Put(key, node1) - batch.Delete(key) + batch = db.baseDB.NewBatch() + require.NoError(db.Write(batch, key, node1)) + require.NoError(db.Write(batch, key, nil)) require.NoError(batch.Write()) // Key should still be gone. @@ -77,9 +75,9 @@ func TestValueNodeDB(t *testing.T) { }, key: key, } - batch = db.NewBatch() - batch.Put(key, node1) - batch.Put(key, node2) + batch = db.baseDB.NewBatch() + require.NoError(db.Write(batch, key, node1)) + require.NoError(db.Write(batch, key, node2)) require.NoError(batch.Write()) // Get the key-node pair. @@ -88,8 +86,8 @@ func TestValueNodeDB(t *testing.T) { require.Equal(node2, node2Read) // Overwrite the key-node pair in a subsequent batch. - batch = db.NewBatch() - batch.Put(key, node1) + batch = db.baseDB.NewBatch() + require.NoError(db.Write(batch, key, node1)) require.NoError(batch.Write()) // Get the key-node pair. @@ -118,9 +116,7 @@ func TestValueNodeDBIterator(t *testing.T) { cacheSize := 10 db := newValueNodeDB( baseDB, - &sync.Pool{ - New: func() interface{} { return make([]byte, 0) }, - }, + utils.NewBytesPool(), &mockMetrics{}, cacheSize, ) @@ -134,8 +130,8 @@ func TestValueNodeDBIterator(t *testing.T) { }, key: key, } - batch := db.NewBatch() - batch.Put(key, node) + batch := db.baseDB.NewBatch() + require.NoError(db.Write(batch, key, node)) require.NoError(batch.Write()) } @@ -172,8 +168,8 @@ func TestValueNodeDBIterator(t *testing.T) { }, key: key, } - batch := db.NewBatch() - batch.Put(key, n) + batch := db.baseDB.NewBatch() + require.NoError(db.Write(batch, key, n)) require.NoError(batch.Write()) key = ToKey([]byte{0xFF, 0x01}) @@ -183,8 +179,8 @@ func TestValueNodeDBIterator(t *testing.T) { }, key: key, } - batch = db.NewBatch() - batch.Put(key, n) + batch = db.baseDB.NewBatch() + require.NoError(db.Write(batch, key, n)) require.NoError(batch.Write()) // Iterate over the key-node pairs with a prefix. @@ -225,16 +221,14 @@ func TestValueNodeDBClear(t *testing.T) { baseDB := memdb.New() db := newValueNodeDB( baseDB, - &sync.Pool{ - New: func() interface{} { return make([]byte, 0) }, - }, + utils.NewBytesPool(), &mockMetrics{}, cacheSize, ) - batch := db.NewBatch() + batch := db.baseDB.NewBatch() for _, b := range [][]byte{{1}, {2}, {3}} { - batch.Put(ToKey(b), newNode(ToKey(b))) + require.NoError(db.Write(batch, ToKey(b), newNode(ToKey(b)))) } require.NoError(batch.Write()) diff --git a/x/merkledb/view.go b/x/merkledb/view.go index 87325847e777..4ced77e51a02 100644 --- a/x/merkledb/view.go +++ b/x/merkledb/view.go @@ -495,13 +495,13 @@ func (v *view) commitToDB(ctx context.Context) error { )) defer span.End() - // Call this here instead of in [v.db.commitChanges] - // because doing so there would be a deadlock. + // Call this here instead of in [v.db.commitView] because doing so there + // would be a deadlock. if err := v.applyValueChanges(ctx); err != nil { return err } - if err := v.db.commitChanges(ctx, v); err != nil { + if err := v.db.commitView(ctx, v); err != nil { return err } diff --git a/x/sync/network_client.go b/x/sync/network_client.go index 15f59cc5885a..18530d1c4e7c 100644 --- a/x/sync/network_client.go +++ b/x/sync/network_client.go @@ -286,7 +286,14 @@ func (c *networkClient) sendRequestLocked( // Send an app request to the peer. nodeIDs := set.Of(nodeID) - if err := c.appSender.SendAppRequest(ctx, nodeIDs, requestID, request); err != nil { + // Cancellation is removed from this context to avoid erroring unexpectedly. + // SendAppRequest should be non-blocking and any error other than context + // cancellation is unexpected. + // + // This guarantees that the network should never receive an unexpected + // AppResponse. + ctxWithoutCancel := context.WithoutCancel(ctx) + if err := c.appSender.SendAppRequest(ctxWithoutCancel, nodeIDs, requestID, request); err != nil { c.lock.Unlock() c.log.Fatal("failed to send app request", zap.Stringer("nodeID", nodeID),