Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/sei-protocol/sei-db into Pe…
Browse files Browse the repository at this point in the history
…bbleDBIteratorBug
  • Loading branch information
kbhat1 authored and kbhat1 committed Jan 19, 2024
2 parents 24c9618 + 5b5a5a2 commit 1076007
Show file tree
Hide file tree
Showing 10 changed files with 58 additions and 25 deletions.
3 changes: 2 additions & 1 deletion sc/memiavl/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package memiavl

import (
"bytes"
"context"
"encoding/binary"
"math/rand"
"sort"
Expand Down Expand Up @@ -32,7 +33,7 @@ func BenchmarkRandomGet(b *testing.B) {
}

snapshotDir := b.TempDir()
err := tree.WriteSnapshot(snapshotDir)
err := tree.WriteSnapshot(context.Background(), snapshotDir)
require.NoError(b, err)
snapshot, err := OpenSnapshot(snapshotDir)
require.NoError(b, err)
Expand Down
29 changes: 22 additions & 7 deletions sc/memiavl/db.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package memiavl

import (
"context"
"errors"
"fmt"
"os"
Expand Down Expand Up @@ -51,6 +52,8 @@ type DB struct {

// result channel of snapshot rewrite goroutine
snapshotRewriteChan chan snapshotResult
// context cancel function to cancel the snapshot rewrite goroutine
snapshotRewriteCancelFunc context.CancelFunc
// the number of old snapshots to keep (excluding the latest one)
snapshotKeepRecent uint32
// block interval to take a new snapshot
Expand Down Expand Up @@ -352,6 +355,7 @@ func (db *DB) checkBackgroundSnapshotRewrite() error {
select {
case result := <-db.snapshotRewriteChan:
db.snapshotRewriteChan = nil
db.snapshotRewriteCancelFunc = nil

if result.mtree == nil {
// background snapshot rewrite failed
Expand Down Expand Up @@ -380,7 +384,7 @@ func (db *DB) checkBackgroundSnapshotRewrite() error {
if err := db.reloadMultiTree(result.mtree); err != nil {
return fmt.Errorf("switch multitree failed: %w", err)
}
db.logger.Info("switched to new snapshot", "version", db.MultiTree.Version())
db.logger.Info("switched to new memiavl snapshot", "version", db.MultiTree.Version())

db.pruneSnapshots()
default:
Expand Down Expand Up @@ -494,7 +498,7 @@ func (db *DB) copy(cacheSize int) *DB {
}

// RewriteSnapshot writes the current version of memiavl into a snapshot, and update the `current` symlink.
func (db *DB) RewriteSnapshot() error {
func (db *DB) RewriteSnapshot(ctx context.Context) error {
db.mtx.Lock()
defer db.mtx.Unlock()

Expand All @@ -505,7 +509,7 @@ func (db *DB) RewriteSnapshot() error {
snapshotDir := snapshotName(db.lastCommitInfo.Version)
tmpDir := snapshotDir + "-tmp"
path := filepath.Join(db.dir, tmpDir)
if err := db.MultiTree.WriteSnapshot(path, db.snapshotWriterPool); err != nil {
if err := db.MultiTree.WriteSnapshot(ctx, path, db.snapshotWriterPool); err != nil {
return errorutils.Join(err, os.RemoveAll(path))
}
if err := os.Rename(path, filepath.Join(db.dir, snapshotDir)); err != nil {
Expand Down Expand Up @@ -573,15 +577,17 @@ func (db *DB) rewriteSnapshotBackground() error {
return errors.New("there's another ongoing snapshot rewriting process")
}

ctx, cancel := context.WithCancel(context.Background())
ch := make(chan snapshotResult)
db.snapshotRewriteChan = ch
db.snapshotRewriteCancelFunc = cancel

cloned := db.copy(0)
go func() {
defer close(ch)

cloned.logger.Info("start rewriting snapshot", "version", cloned.Version())
if err := cloned.RewriteSnapshot(); err != nil {
if err := cloned.RewriteSnapshot(ctx); err != nil {
ch <- snapshotResult{err: err}
return
}
Expand Down Expand Up @@ -611,8 +617,17 @@ func (db *DB) Close() error {
defer db.mtx.Unlock()

errs := []error{
db.streamHandler.Close(), db.MultiTree.Close(),
db.streamHandler.Close(),
}
if db.snapshotRewriteChan != nil {
db.snapshotRewriteCancelFunc()
<-db.snapshotRewriteChan
db.snapshotRewriteChan = nil
db.snapshotRewriteCancelFunc = nil
}

errs = append(errs, db.MultiTree.Close())

db.streamHandler = nil

if db.fileLock != nil {
Expand Down Expand Up @@ -682,7 +697,7 @@ func (db *DB) WriteSnapshot(dir string) error {
db.mtx.Lock()
defer db.mtx.Unlock()

return db.MultiTree.WriteSnapshot(dir, db.snapshotWriterPool)
return db.MultiTree.WriteSnapshot(context.Background(), dir, db.snapshotWriterPool)
}

func snapshotName(version int64) string {
Expand Down Expand Up @@ -782,7 +797,7 @@ func initEmptyDB(dir string, initialVersion uint32) error {
pool := pond.New(config.DefaultSnapshotWriterLimit, config.DefaultSnapshotWriterLimit*10)
defer pool.Stop()

if err := tmp.WriteSnapshot(filepath.Join(dir, snapshotDir), pool); err != nil {
if err := tmp.WriteSnapshot(context.Background(), filepath.Join(dir, snapshotDir), pool); err != nil {
return err
}
return updateCurrentSymlink(dir, snapshotDir)
Expand Down
7 changes: 4 additions & 3 deletions sc/memiavl/db_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package memiavl

import (
"context"
"encoding/hex"
"os"
"path/filepath"
Expand Down Expand Up @@ -39,7 +40,7 @@ func TestRewriteSnapshot(t *testing.T) {
require.NoError(t, err)
require.Equal(t, i+1, int(v))
require.Equal(t, RefHashes[i], db.lastCommitInfo.StoreInfos[0].CommitId.Hash)
require.NoError(t, db.RewriteSnapshot())
require.NoError(t, db.RewriteSnapshot(context.Background()))
require.NoError(t, db.Reload())
})
}
Expand Down Expand Up @@ -250,7 +251,7 @@ func TestInitialVersion(t *testing.T) {
// the nodes are created with version 1, which is compatible with iavl behavior: https://github.com/cosmos/iavl/pull/660
require.Equal(t, info.CommitId.Hash, HashNode(newLeafNode([]byte(key), []byte(value), 1)))

require.NoError(t, db.RewriteSnapshot())
require.NoError(t, db.RewriteSnapshot(context.Background()))
require.NoError(t, db.Reload())

db.ApplyUpgrades([]*proto.TreeNameUpgrade{{Name: name2}})
Expand Down Expand Up @@ -326,7 +327,7 @@ func TestZeroCopy(t *testing.T) {
_, err = db.Commit()
require.NoError(t, err)
require.NoError(t, errors.Join(
db.RewriteSnapshot(),
db.RewriteSnapshot(context.Background()),
db.Reload(),
))

Expand Down
3 changes: 2 additions & 1 deletion sc/memiavl/import.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package memiavl

import (
"context"
"errors"
"fmt"
"math"
Expand Down Expand Up @@ -134,7 +135,7 @@ func doImport(dir string, version int64, nodes <-chan *types.SnapshotNode) (retu
return errors.New("version overflows uint32")
}

return writeSnapshot(dir, uint32(version), func(w *snapshotWriter) (uint32, error) {
return writeSnapshot(context.Background(), dir, uint32(version), func(w *snapshotWriter) (uint32, error) {
i := &importer{
snapshotWriter: *w,
}
Expand Down
4 changes: 2 additions & 2 deletions sc/memiavl/multitree.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func (t *MultiTree) Catchup(stream types.Stream[proto.ChangelogEntry], endVersio
return nil
}

func (t *MultiTree) WriteSnapshot(dir string, wp *pond.WorkerPool) error {
func (t *MultiTree) WriteSnapshot(ctx context.Context, dir string, wp *pond.WorkerPool) error {
if err := os.MkdirAll(dir, os.ModePerm); err != nil {
return err
}
Expand All @@ -367,7 +367,7 @@ func (t *MultiTree) WriteSnapshot(dir string, wp *pond.WorkerPool) error {
for _, entry := range t.trees {
tree, name := entry.Tree, entry.Name
group.Submit(func() error {
return tree.WriteSnapshot(filepath.Join(dir, name))
return tree.WriteSnapshot(ctx, filepath.Join(dir, name))
})
}

Expand Down
3 changes: 2 additions & 1 deletion sc/memiavl/proof_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package memiavl

import (
"context"
"strconv"
"testing"

Expand Down Expand Up @@ -41,7 +42,7 @@ func TestProofs(t *testing.T) {
require.True(t, tree.VerifyNonMembership(proof, tc.nonExistKey))

// test persisted tree
require.NoError(t, tree.WriteSnapshot(tmpDir))
require.NoError(t, tree.WriteSnapshot(context.Background(), tmpDir))
snapshot, err := OpenSnapshot(tmpDir)
require.NoError(t, err)
ptree := NewFromSnapshot(snapshot, true, 0)
Expand Down
20 changes: 16 additions & 4 deletions sc/memiavl/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package memiavl

import (
"bufio"
"context"
"encoding/binary"
"fmt"
"io"
Expand Down Expand Up @@ -350,11 +351,12 @@ func (snapshot *Snapshot) export(callback func(*types.SnapshotNode) bool) {
}

// WriteSnapshot save the IAVL tree to a new snapshot directory.
func (t *Tree) WriteSnapshot(snapshotDir string) error {
return writeSnapshot(snapshotDir, t.version, func(w *snapshotWriter) (uint32, error) {
func (t *Tree) WriteSnapshot(ctx context.Context, snapshotDir string) error {
return writeSnapshot(ctx, snapshotDir, t.version, func(w *snapshotWriter) (uint32, error) {
if t.root == nil {
return 0, nil
}

if err := w.writeRecursive(t.root); err != nil {
return 0, err
}
Expand All @@ -363,6 +365,7 @@ func (t *Tree) WriteSnapshot(snapshotDir string) error {
}

func writeSnapshot(
ctx context.Context,
dir string, version uint32,
doWrite func(*snapshotWriter) (uint32, error),
) (returnErr error) {
Expand Down Expand Up @@ -408,7 +411,7 @@ func writeSnapshot(
leavesWriter := bufio.NewWriterSize(fpLeaves, bufIOSize)
kvsWriter := bufio.NewWriterSize(fpKVs, bufIOSize)

w := newSnapshotWriter(nodesWriter, leavesWriter, kvsWriter)
w := newSnapshotWriter(ctx, nodesWriter, leavesWriter, kvsWriter)
leaves, err := doWrite(w)
if err != nil {
return err
Expand Down Expand Up @@ -461,6 +464,9 @@ func writeSnapshot(
}

type snapshotWriter struct {
// context for cancel the writing process
ctx context.Context

nodesWriter, leavesWriter, kvWriter io.Writer

// count how many nodes have been written
Expand All @@ -470,8 +476,9 @@ type snapshotWriter struct {
kvsOffset uint64
}

func newSnapshotWriter(nodesWriter, leavesWriter, kvsWriter io.Writer) *snapshotWriter {
func newSnapshotWriter(ctx context.Context, nodesWriter, leavesWriter, kvsWriter io.Writer) *snapshotWriter {
return &snapshotWriter{
ctx: ctx,
nodesWriter: nodesWriter,
leavesWriter: leavesWriter,
kvWriter: kvsWriter,
Expand Down Expand Up @@ -545,6 +552,11 @@ func (w *snapshotWriter) writeBranch(version, size uint32, height, preTrees uint
// writeRecursive write the node recursively in depth-first post-order,
// returns `(nodeIndex, err)`.
func (w *snapshotWriter) writeRecursive(node Node) error {
select {
case <-w.ctx.Done():
return w.ctx.Err()
default:
}
if node.IsLeaf() {
return w.writeLeaf(node.Version(), node.Key(), node.Value(), node.Hash())
}
Expand Down
9 changes: 5 additions & 4 deletions sc/memiavl/snapshot_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package memiavl

import (
"context"
"errors"
"testing"

Expand All @@ -21,7 +22,7 @@ func TestSnapshotEncodingRoundTrip(t *testing.T) {
}

snapshotDir := t.TempDir()
require.NoError(t, tree.WriteSnapshot(snapshotDir))
require.NoError(t, tree.WriteSnapshot(context.Background(), snapshotDir))

snapshot, err := OpenSnapshot(snapshotDir)
require.NoError(t, err)
Expand Down Expand Up @@ -71,7 +72,7 @@ func TestSnapshotExport(t *testing.T) {
}

snapshotDir := t.TempDir()
require.NoError(t, tree.WriteSnapshot(snapshotDir))
require.NoError(t, tree.WriteSnapshot(context.Background(), snapshotDir))

snapshot, err := OpenSnapshot(snapshotDir)
require.NoError(t, err)
Expand Down Expand Up @@ -100,7 +101,7 @@ func TestSnapshotImportExport(t *testing.T) {
}

snapshotDir := t.TempDir()
require.NoError(t, tree.WriteSnapshot(snapshotDir))
require.NoError(t, tree.WriteSnapshot(context.Background(), snapshotDir))
snapshot, err := OpenSnapshot(snapshotDir)
require.NoError(t, err)

Expand Down Expand Up @@ -161,7 +162,7 @@ func TestDBSnapshotRestore(t *testing.T) {
testSnapshotRoundTrip(t, db)
}

require.NoError(t, db.RewriteSnapshot())
require.NoError(t, db.RewriteSnapshot(context.Background()))
require.NoError(t, db.Reload())
require.Equal(t, len(ChangeSets), int(db.metadata.CommitInfo.Version))
testSnapshotRoundTrip(t, db)
Expand Down
3 changes: 2 additions & 1 deletion sc/memiavl/tree_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package memiavl

import (
"context"
"fmt"
"strconv"
"testing"
Expand Down Expand Up @@ -254,7 +255,7 @@ func TestGetByIndex(t *testing.T) {

// test persisted tree
dir := t.TempDir()
require.NoError(t, tree.WriteSnapshot(dir))
require.NoError(t, tree.WriteSnapshot(context.Background(), dir))
snapshot, err := OpenSnapshot(dir)
require.NoError(t, err)
ptree := NewFromSnapshot(snapshot, true, 0)
Expand Down
2 changes: 1 addition & 1 deletion ss/pebbledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func (db *Database) Prune(version int64) error {
// Delete a key if another entry for that key exists a larger version than original but leq to the prune height
// Also delete a key if it has been tombstoned and its version is leq to the prune height
if prevVersionDecoded <= version && (bytes.Equal(prevKey, currKey) || valTombstoned(prevValEncoded)) {
err = batch.Delete(prevKeyEncoded, defaultWriteOpts)
err = batch.Delete(prevKeyEncoded, nil)
if err != nil {
return err
}
Expand Down

0 comments on commit 1076007

Please sign in to comment.