diff --git a/multitree.go b/multitree.go index ccfbcd4c9..edecd0695 100644 --- a/multitree.go +++ b/multitree.go @@ -8,7 +8,6 @@ import ( "sync/atomic" "github.com/cosmos/iavl/v2/metrics" - "github.com/dustin/go-humanize" "golang.org/x/exp/slices" ) @@ -186,10 +185,10 @@ func (mt *MultiTree) SaveVersionConcurrently() ([]byte, int64, error) { mt.shouldCheckpoint = false if mt.treeOpts.MetricsProxy != nil { - bz := workingBytes.Load() - sz := workingSize.Load() - fmt.Printf("version=%d work-bytes=%s work-size=%s mem-ceiling=%s\n", - version, humanize.IBytes(bz), humanize.Comma(sz), humanize.IBytes(mt.treeOpts.CheckpointMemory)) + //bz := workingBytes.Load() + //sz := workingSize.Load() + //fmt.Printf("version=%d work-bytes=%s work-size=%s mem-ceiling=%s\n", + // version, humanize.IBytes(bz), humanize.Comma(sz), humanize.IBytes(mt.treeOpts.CheckpointMemory)) mt.treeOpts.MetricsProxy.SetGauge(float32(workingBytes.Load()), "iavl_v2", "working_bytes") mt.treeOpts.MetricsProxy.SetGauge(float32(workingSize.Load()), "iavl_v2", "working_size") } diff --git a/node.go b/node.go index f27c208ec..66aa590ed 100644 --- a/node.go +++ b/node.go @@ -65,8 +65,8 @@ type Node struct { } func (node *Node) String() string { - return fmt.Sprintf("Node{hash: %x, nodeKey: %s, leftNodeKey: %v, rightNodeKey: %v, size: %d, subtreeHeight: %d, poolId: %d}", - node.hash, node.nodeKey, node.leftNodeKey, node.rightNodeKey, node.size, node.subtreeHeight, node.poolId) + return fmt.Sprintf("Node{hash: %x, nodeKey: %s, leftNodeKey: %v, rightNodeKey: %v, size: %d, subtreeHeight: %d, evict: %t, dirty: %t, poolId: %d}", + node.hash, node.nodeKey, node.leftNodeKey, node.rightNodeKey, node.size, node.subtreeHeight, node.evict, node.dirty, node.poolId) } func (node *Node) isLeaf() bool { @@ -107,6 +107,13 @@ func (node *Node) getLeftNode(t *Tree) (*Node, error) { if node.leftNode != nil { return node.leftNode, nil } + // check writer cache + var ok bool + node.leftNode, ok = t.sqlWriter.cachePop(node.leftNodeKey) + if ok { + return node.leftNode, nil + } + var err error node.leftNode, err = t.sql.getLeftNode(node) if err != nil { @@ -122,6 +129,13 @@ func (node *Node) getRightNode(t *Tree) (*Node, error) { if node.rightNode != nil { return node.rightNode, nil } + // check writer cache + var ok bool + node.rightNode, ok = t.sqlWriter.cachePop(node.rightNodeKey) + if ok { + return node.rightNode, nil + } + var err error node.rightNode, err = t.sql.getRightNode(node) if err != nil { diff --git a/pool.go b/pool.go index d68bd479e..db7e9b900 100644 --- a/pool.go +++ b/pool.go @@ -8,9 +8,7 @@ import ( type NodePool struct { syncPool *sync.Pool - free chan int - nodes []Node - + free chan int poolId uint64 } @@ -54,3 +52,16 @@ func (np *NodePool) Put(node *Node) { node.poolId = 0 np.syncPool.Put(node) } + +func (np *NodePool) clone(n *Node) *Node { + node := np.Get() + node.leftNodeKey = n.leftNodeKey + node.rightNodeKey = n.rightNodeKey + node.nodeKey = n.nodeKey + node.hash = n.hash + node.key = n.key + node.value = n.value + node.subtreeHeight = n.subtreeHeight + node.size = n.size + return node +} diff --git a/pool_test.go b/pool_test.go index d8a90ac0a..41a010cf9 100644 --- a/pool_test.go +++ b/pool_test.go @@ -6,11 +6,12 @@ import ( "github.com/stretchr/testify/require" ) -func TestNodePool_Get(t *testing.T) { +func Test_TheLimitsOfMySanity(t *testing.T) { pool := NewNodePool() node := pool.Get() node.key = []byte("hello") - require.Equal(t, node.key, pool.nodes[node.poolId].key) - pool.Put(node) - require.Equal(t, []byte(nil), pool.nodes[node.poolId].key) + n2 := pool.clone(node) + require.Equal(t, node.key, n2.key) + node.key = []byte("world") + require.NotEqual(t, node.key, n2.key) } diff --git a/sqlite.go b/sqlite.go index 78feff512..1a05514ad 100644 --- a/sqlite.go +++ b/sqlite.go @@ -61,8 +61,9 @@ func defaultSqliteDbOptions(opts SqliteDbOptions) SqliteDbOptions { opts.MmapSize = 8 * 1024 * 1024 * 1024 } if opts.WalSize == 0 { - opts.WalSize = 1024 * 1024 * 100 + opts.WalSize = 1024 * 1024 * 500 } + opts.ShardTrees = true opts.walPages = opts.WalSize / os.Getpagesize() return opts } @@ -722,8 +723,8 @@ func (sql *SqliteDb) getRightNode(node *Node) (*Node, error) { node.rightNode, err = sql.Get(node.rightNodeKey) if err != nil { - return nil, fmt.Errorf("failed to get right node node_key=%s height=%d path=%s: %w", - node.rightNodeKey, node.subtreeHeight, sql.opts.Path, err) + return nil, fmt.Errorf("failed to get right node node_key=%s for %v at path=%s: %w", + node.rightNodeKey, node, sql.opts.Path, err) } return node.rightNode, nil } @@ -743,7 +744,8 @@ func (sql *SqliteDb) getLeftNode(node *Node) (*Node, error) { node.leftNode, err = sql.Get(node.leftNodeKey) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get left node node_key=%s for %v at path=%s: %w", + node.leftNodeKey, node, sql.opts.Path, err) } return node.leftNode, err } diff --git a/sqlite_batch.go b/sqlite_batch.go index 304ca44fd..6e21bb786 100644 --- a/sqlite_batch.go +++ b/sqlite_batch.go @@ -10,7 +10,14 @@ import ( ) type sqliteBatch struct { - tree *Tree + // used in leaves, TODO remove + tree *Tree + + // used in branch checkpoint + version int64 + branchOrphans []NodeKey + branches []*Node + sql *SqliteDb size int64 logger zerolog.Logger @@ -33,11 +40,11 @@ func (b *sqliteBatch) newChangeLogBatch() (err error) { if err = b.sql.leafWrite.Begin(); err != nil { return err } - b.leafInsert, err = b.sql.leafWrite.Prepare("INSERT OR REPLACE INTO leaf (version, sequence, bytes) VALUES (?, ?, ?)") + b.leafInsert, err = b.sql.leafWrite.Prepare("INSERT INTO leaf (version, sequence, bytes) VALUES (?, ?, ?)") if err != nil { return err } - b.deleteInsert, err = b.sql.leafWrite.Prepare("INSERT OR REPLACE INTO leaf_delete (version, sequence, key) VALUES (?, ?, ?)") + b.deleteInsert, err = b.sql.leafWrite.Prepare("INSERT INTO leaf_delete (version, sequence, key) VALUES (?, ?, ?)") if err != nil { return err } @@ -233,30 +240,29 @@ func (b *sqliteBatch) saveLeaves() (int64, error) { } func (b *sqliteBatch) isCheckpoint() bool { - return len(b.tree.branches) > 0 + return len(b.branches) > 0 } func (b *sqliteBatch) saveBranches() (n int64, err error) { if b.isCheckpoint() { - tree := b.tree b.treeCount = 0 - shardID, err := tree.sql.nextShard(tree.version) + shardID, err := b.sql.nextShard(b.version) if err != nil { return 0, err } - b.logger.Debug().Msgf("checkpoint db=tree version=%d shard=%d orphans=%s", - tree.version, shardID, humanize.Comma(int64(len(tree.branchOrphans)))) + b.logger.Debug().Msgf("checkpoint version=%d shard=%d orphans=%s", + b.version, shardID, humanize.Comma(int64(len(b.branchOrphans)))) if err = b.newTreeBatch(shardID); err != nil { return 0, err } - for _, node := range tree.branches { + for _, node := range b.branches { b.treeCount++ bz, err := node.Bytes() if err != nil { - return 0, err + return 0, fmt.Errorf("failed to encode node: %v %w", node, err) } if err = b.treeInsert.Exec(node.nodeKey.Version(), int(node.nodeKey.Sequence()), bz); err != nil { return 0, err @@ -265,11 +271,12 @@ func (b *sqliteBatch) saveBranches() (n int64, err error) { return 0, err } if node.evict { - tree.returnNode(node) + // TODO, remove tree reference + b.tree.returnNode(node) } } - for _, orphan := range tree.branchOrphans { + for _, orphan := range b.branchOrphans { b.treeCount++ err = b.execBranchOrphan(orphan) if err != nil { diff --git a/sqlite_test.go b/sqlite_test.go index 1468e6868..275080c2e 100644 --- a/sqlite_test.go +++ b/sqlite_test.go @@ -3,12 +3,14 @@ package iavl import ( "fmt" "math/rand" + "os" "testing" "time" "github.com/bvinc/go-sqlite-lite/sqlite3" - "github.com/cosmos/iavl/v2/testutil" + "github.com/cosmos/iavl-bench/bench" "github.com/dustin/go-humanize" + api "github.com/kocubinski/costor-api" "github.com/stretchr/testify/require" ) @@ -45,202 +47,253 @@ and potentially re-balancing the BTree index. Writes: 245,000 nodes/sec Reads: 30,000 nodes/sec !!! +# Testing variables +- payload size +- PRIMARY KEY vs ROWID (index b-tree vs table b-tree) +- B*Tree index (WITHOUT ROWID + PRIMARY KEY) vs b-tree table (WITH ROWID) +- structured vs unstructured (sqlite raw columns vs bytes marshal) +- clustered reads (optimized for page size) vs uniformly distributed reads + */ var testDbLocation = "/tmp/sqlite_test" -func TestBuildSqlite(t *testing.T) { - //dir := t.TempDir() - dir := testDbLocation - t.Logf("using temp dir %s", dir) - - sql, err := NewSqliteDb(NewNodePool(), SqliteDbOptions{Path: dir}) - - require.NoError(t, err) +type sqliteTestFixture struct { + t *testing.T - gen := testutil.OsmoLike() - version1 := gen.Iterator.Nodes() - var count int - require.Equal(t, int64(1), gen.Iterator.Version()) + batchSize int + startTime time.Time + since time.Time + conn *sqlite3.Conn + gen bench.ChangesetGenerator +} - since := time.Now() +func newSqliteTestFixture(t *testing.T) *sqliteTestFixture { + return &sqliteTestFixture{t: t, batchSize: 200_000} +} - err = sql.leafWrite.Exec("CREATE TABLE node (seq INTEGER, version INTEGER, hash BLOB, key BLOB, height INTEGER, size INTEGER, l_seq INTEGER, l_version INTEGER, r_seq INTEGER, r_version INTEGER)") +func (f *sqliteTestFixture) init(dir string) { + t := f.t + require.NoError(t, os.RemoveAll(dir)) + require.NoError(t, os.MkdirAll(dir, 0o755)) + conn, err := sqlite3.Open(fmt.Sprintf("file:%s/sqlite.db", dir)) require.NoError(t, err) - - err = sql.leafWrite.Exec("CREATE INDEX trie_idx ON node (key)") - //err = sql.leafWrite.Exec("CREATE INDEX node_idx ON node (version, seq)") + // opt for typical page size instead of call to os + pageSize := os.Getpagesize() + // pageSize := 4 * 1024 + log.Info().Msgf("setting page size to %s", humanize.Bytes(uint64(pageSize))) + require.NoError(t, conn.Exec(fmt.Sprintf("PRAGMA page_size=%d; VACUUM;", pageSize))) + require.NoError(t, conn.Exec("PRAGMA journal_mode=WAL;")) + require.NoError(t, conn.Exec("PRAGMA synchronous=0;")) + require.NoError(t, conn.Exec(fmt.Sprintf("PRAGMA wal_autocheckpoint=%d;", (200*1000*1000)/pageSize))) + + q, err := conn.Prepare("PRAGMA synchronous;") require.NoError(t, err) - err = sql.leafWrite.Exec("CREATE INDEX tree_idx ON tree (version, sequence)") + hasRow, err := q.Step() require.NoError(t, err) + require.True(t, hasRow) + sync, ok, err := q.ColumnText(0) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, "0", sync) + require.NoError(t, q.Close()) - require.NoError(t, sql.leafWrite.Begin()) + f.conn = conn +} - var stmt *sqlite3.Stmt - //stmt, err = sql.leafWrite.Prepare("INSERT INTO tree(version, sequence, bytes) VALUES (?, ?, ?)") - stmt, err = sql.leafWrite.Prepare("INSERT INTO node(version, seq, hash, key, height, size, l_seq, l_version, r_seq, r_version)" + - "VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") +func (f *sqliteTestFixture) maybeWrite(count int) { + if count%f.batchSize == 0 { + require.NoError(f.t, f.conn.Commit()) + require.NoError(f.t, f.conn.Begin()) + log.Info().Msgf("nodes=%s dur=%s; rate=%s", + humanize.Comma(int64(count)), + time.Since(f.since).Round(time.Millisecond), + humanize.Comma(int64(float64(f.batchSize)/time.Since(f.since).Seconds()))) + f.since = time.Now() + } +} +func (f *sqliteTestFixture) finishWrite(title string) { + require.NoError(f.t, f.conn.Commit()) + f.t.Logf("_%s_\ndur=%s rate=%s\ngen=%+v", + title, + time.Since(f.startTime).Round(time.Millisecond), + humanize.Comma(int64(float64(f.gen.InitialSize)/time.Since(f.startTime).Seconds())), + f.gen, + ) + require.NoError(f.t, f.conn.Close()) +} + +func (f *sqliteTestFixture) startBatch() { + f.startTime = time.Now() + f.since = time.Now() + require.NoError(f.t, f.conn.Begin()) +} + +func (f *sqliteTestFixture) genNodes(itr api.NodeIterator) []*api.Node { + f.t.Logf("generating %s nodes", humanize.Comma(int64(f.gen.InitialSize))) + nodes := make([]*api.Node, f.gen.InitialSize) + var i int + var err error + for ; itr.Valid(); err = itr.Next() { + require.NoError(f.t, err) + n := itr.GetNode() + n.Value = nil + nodes[i] = n + i++ + } + return nodes +} + +func (f *sqliteTestFixture) writeStructuredTableBTree(pregenNodes bool) { + t := f.t + itr, err := f.gen.Iterator() + require.NoError(t, err) + require.Equal(t, int64(1), itr.Version()) + + err = f.conn.Exec(` +CREATE TABLE node ( + version INTEGER, + seq INTEGER, + hash BLOB, + key BLOB, + value BLOB, + height INTEGER, + size INTEGER, + l_seq INTEGER, l_version INTEGER, + r_seq INTEGER, r_version INTEGER +)`) require.NoError(t, err) - startTime := time.Now() - batchSize := 200_000 - //nodeBz := new(bytes.Buffer) - for ; version1.Valid(); err = version1.Next() { - node := version1.GetNode() - lnk := NewNodeKey(1, uint32(count+1)) - rnk := NewNodeKey(1, uint32(count+2)) - n := &Node{key: node.Key, hash: node.Key[:32], - subtreeHeight: 13, size: 4, leftNodeKey: lnk, rightNodeKey: rnk} - - //nodeBz.Reset() - //require.NoError(t, n.WriteBytes(nodeBz)) - - // tree table - //nk := NewNodeKey(1, uint32(count)) - //nodeBz, err := n.Bytes() - //require.NoError(t, err) - //err = stmt.Exec(int(nk.Version()), int(nk.Sequence()), nodeBz) - //require.NoError(t, err) + stmt, err := f.conn.Prepare("INSERT INTO node(version, seq, hash, key, height, size, l_seq, l_version, r_seq, r_version)" + + "VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") + var count int + write := func(node *api.Node) { // node table err = stmt.Exec( - 1, // version - count, // seq - n.key[:32], // hash - n.key, // key - 13, // height - 4, // size - count+1, // l_seq - 1, // l_version - count+2, // r_seq - 1, // r_version + 1, // version + count, // seq + node.Key[:32], // hash + node.Key, // key + 13, // height + 4, // size + count+1, // l_seq + 1, // l_version + count+2, // r_seq + 1, // r_version ) + require.NoError(t, err) - if count%batchSize == 0 { - err := sql.leafWrite.Commit() + f.maybeWrite(count) + count++ + } + + f.startBatch() + version1 := itr.Nodes() + require.NoError(t, err) + + if pregenNodes { + require.Equal(t, int64(1), itr.Version()) + nodes := f.genNodes(version1) + for _, node := range nodes { + write(node) + } + } else { + for ; version1.Valid(); err = version1.Next() { require.NoError(t, err) - //stmt, err = newBatch() - //require.NoError(t, err) - require.NoError(t, sql.leafWrite.Begin()) - log.Info().Msgf("nodes=%s dur=%s; rate=%s", - humanize.Comma(int64(count)), - time.Since(since).Round(time.Millisecond), - humanize.Comma(int64(float64(batchSize)/time.Since(since).Seconds()))) - since = time.Now() + write(version1.GetNode()) } - count++ - require.NoError(t, err) } - log.Info().Msg("final commit") - require.NoError(t, sql.leafWrite.Commit()) - log.Info().Msgf("total dur=%s rate=%s", - time.Since(startTime).Round(time.Millisecond), - humanize.Comma(int64(40_000_000/time.Since(startTime).Seconds())), - ) require.NoError(t, stmt.Close()) - require.NoError(t, sql.Close()) + f.finishWrite("write structured table b-tree") } -func TestReadSqlite_Trie(t *testing.T) { - dir := testDbLocation - sql, err := NewSqliteDb(NewNodePool(), SqliteDbOptions{Path: dir}) +func (f *sqliteTestFixture) readStructuredBTree(dir string, chunkSize int, tableBTree bool) { + t := f.t + var err error + conn, err := sqlite3.Open(fmt.Sprintf("file:%s/sqlite.db", dir)) require.NoError(t, err) + require.NoError(t, conn.Exec(fmt.Sprintf("PRAGMA mmap_size=%d", 4*1000*1000*1000))) + require.NoError(t, conn.Exec("PRAGMA page_cache=10000;")) - read, err := sql.getReadConn() + var stmt *sqlite3.Stmt require.NoError(t, err) - - query, err := read.Prepare("SELECT version, seq, hash, key, height, size, l_seq, l_version, r_seq, r_version FROM node WHERE key = ?") + if tableBTree { + stmt, err = conn.Prepare("SELECT hash, key, height, size, l_seq, l_version, r_seq, r_version FROM node WHERE ROWID = ?") + } else { + stmt, err = conn.Prepare("SELECT hash, key, height, size, l_seq, l_version, r_seq, r_version FROM node WHERE seq = ? AND version = ?") + } require.NoError(t, err) var hash, key []byte - var version, seq, height, size, lSeq, lVersion, rSeq, rVersion int + var height, size, lSeq, lVersion, rSeq, rVersion int - i := int64(1) since := time.Now() - gen := testutil.OsmoLike() - version1 := gen.Iterator.Nodes() - for ; version1.Valid(); err = version1.Next() { - node := version1.GetNode() - require.NoError(t, query.Bind(node.Key)) - hasRow, err := query.Step() - require.NoError(t, err) - require.True(t, hasRow) - require.NoError(t, query.Scan(&version, &seq, &hash, &key, &height, &size, &lSeq, &lVersion, &rSeq, &rVersion)) - require.NoError(t, err) + startTime := time.Now() + for i := 0; i < f.gen.InitialSize; { + j := rand.Intn(f.gen.InitialSize-chunkSize-1) + chunkSize + 1 - if i%100_000 == 0 { + for m := 0; m < chunkSize; m++ { i++ - log.Info().Msgf("nodes=%s dur=%s; rate=%s", - humanize.Comma(i), - time.Since(since), - humanize.Comma(int64(float64(100_000)/time.Since(since).Seconds()))) - since = time.Now() + if tableBTree { + require.NoError(t, stmt.Bind(j+m)) + } else { + require.NoError(t, stmt.Bind(j+m, 1)) + } + hasRow, err := stmt.Step() + require.NoError(t, err) + require.Truef(t, hasRow, "no row for %d", j+m) + require.NoError(t, stmt.Scan(&hash, &key, &height, &size, &lSeq, &lVersion, &rSeq, &rVersion)) + require.NoError(t, stmt.Reset()) + if i%250_000 == 0 { + log.Info().Msgf("nodes=%s dur=%s; rate=%s", + humanize.Comma(int64(i)), + time.Since(since), + humanize.Comma(int64(float64(250_000)/time.Since(since).Seconds()))) + since = time.Now() + } } - require.NoError(t, query.Reset()) - i++ } + t.Logf("_structured read_\nuse-primary-key=%t chunk-size=%d dur=%s rate=%s\ngen=%+v", + tableBTree, chunkSize, + time.Since(startTime).Round(time.Millisecond), + humanize.Comma(int64(float64(f.gen.InitialSize)/time.Since(startTime).Seconds())), + f.gen, + ) + + require.NoError(t, stmt.Close()) + require.NoError(t, conn.Close()) } -func TestReadSqlite(t *testing.T) { - //pool := NewNodePool() - //dir := t.TempDir() - var err error +func TestSqlite_ReadWriteUpdate_Performance(t *testing.T) { dir := testDbLocation t.Logf("using temp dir %s", dir) - sql, err := NewSqliteDb(NewNodePool(), SqliteDbOptions{Path: dir}) - require.NoError(t, err) - - var stmt *sqlite3.Stmt - //stmt, err = sql.leafWrite.Prepare("SELECT bytes FROM tree WHERE node_key = ?") - - sqlRead, err := sql.getReadConn() - require.NoError(t, err) - //stmt, err = sqlRead.Prepare("SELECT bytes FROM tree WHERE version = ? AND sequence = ?") - stmt, err = sqlRead.Prepare("SELECT hash, key, height, size, l_seq, l_version, r_seq, r_version FROM node WHERE seq = ? AND version = ?") - require.NoError(t, err) - - var hash, key []byte - var height, size, lSeq, lVersion, rSeq, rVersion int - - since := time.Now() - for i := 1; i < 40_000_000; i++ { - j := rand.Intn(40_000_000) - - // unstructured leafRead: - //nk := NewNodeKey(1, uint32(j)) - //require.NoError(t, stmt.Bind(1, j)) - //hasRow, err := stmt.Step() - //require.Truef(t, hasRow, "no row for %d", j) - //require.NoError(t, err) - //nodeBz, err := stmt.ColumnBlob(0) - //require.NoError(t, err) - //_, err = MakeNode(pool, nk, nodeBz) - //require.NoError(t, err) - - // structured leafRead: - require.NoError(t, stmt.Bind(j, 1)) - hasRow, err := stmt.Step() - require.NoError(t, err) - require.True(t, hasRow) - require.NoError(t, stmt.Scan(&hash, &key, &height, &size, &lSeq, &lVersion, &rSeq, &rVersion)) - - if i%100_000 == 0 { - log.Info().Msgf("nodes=%s dur=%s; rate=%s", - humanize.Comma(int64(i)), - time.Since(since), - humanize.Comma(int64(float64(100_000)/time.Since(since).Seconds()))) - since = time.Now() - } - require.NoError(t, stmt.Reset()) + gen := bench.ChangesetGenerator{ + StoreKey: "test", + Seed: 1234, + KeyMean: 1500, + KeyStdDev: 3, + ValueMean: 5, + ValueStdDev: 3, + InitialSize: 7_000_000, + FinalSize: 40_100_000, + Versions: 10, + ChangePerVersion: 100, + DeleteFraction: 0.25, } + fix := newSqliteTestFixture(t) + fix.gen = gen + + // fix.init(dir) + // fix.writeStructuredTableBTree(true) + fix.readStructuredBTree(dir, 1, true) - //gen := testutil.OsmoLike() - //version1 := gen.TreeIterator.Nodes() - //var count int - //require.Equal(t, int64(1), gen.TreeIterator.Version()) + // fix.init(dir) + // fix.writeStructuredTableBTree() + // fix.readStructuredBTree(dir, 16, true) } func TestNodeKeyFormat(t *testing.T) { diff --git a/sqlite_writer.go b/sqlite_writer.go index 0a49b1055..c736a2e16 100644 --- a/sqlite_writer.go +++ b/sqlite_writer.go @@ -2,8 +2,10 @@ package iavl import ( "context" - "errors" "fmt" + "os" + "strings" + "sync/atomic" "time" "github.com/bvinc/go-sqlite-lite/sqlite3" @@ -19,7 +21,6 @@ type pruneSignal struct { type saveSignal struct { batch *sqliteBatch root *Node - version int64 wantCheckpoint bool } @@ -31,6 +32,7 @@ type saveResult struct { type sqlWriter struct { sql *SqliteDb logger zerolog.Logger + cache map[NodeKey]*Node treePruneCh chan *pruneSignal treeCh chan *saveSignal @@ -39,6 +41,8 @@ type sqlWriter struct { leafPruneCh chan *pruneSignal leafCh chan *saveSignal leafResult chan *saveResult + + saving atomic.Bool } func (sql *SqliteDb) newSQLWriter() *sqlWriter { @@ -50,13 +54,14 @@ func (sql *SqliteDb) newSQLWriter() *sqlWriter { treeCh: make(chan *saveSignal), leafResult: make(chan *saveResult), treeResult: make(chan *saveResult), + cache: make(map[NodeKey]*Node), logger: sql.logger.With().Str("module", "write").Logger(), } } func (w *sqlWriter) start(ctx context.Context) { go func() { - err := w.treeLoop(ctx) + err := w.treeLoopAsyncCommit(ctx) if err != nil { w.logger.Fatal().Err(err).Msg("tree loop failed") } @@ -300,20 +305,33 @@ func (w *sqlWriter) treeLoop(ctx context.Context) error { return nil } saveTree := func(sig *saveSignal) { - res := &saveResult{} - res.n, res.err = sig.batch.saveBranches() - if res.err == nil { - err := w.sql.SaveRoot(sig.version, sig.root, sig.wantCheckpoint) - if err != nil { - res.err = fmt.Errorf("failed to save root path=%s version=%d: %w", w.sql.opts.Path, sig.version, err) - } + err := w.sql.SaveRoot(sig.batch.version, sig.root, sig.wantCheckpoint) + if err != nil { + w.treeResult <- &saveResult{err: fmt.Errorf("failed to save root path=%s version=%d: %w", w.sql.opts.Path, sig.batch.version, err)} + return } - if sig.batch.isCheckpoint() { - if err := w.sql.treeWrite.Exec("PRAGMA wal_checkpoint(TRUNCATE)"); err != nil { - res.err = fmt.Errorf("failed tree checkpoint; %w", err) - } + if !sig.batch.isCheckpoint() { + w.treeResult <- &saveResult{n: 0} + return } - w.treeResult <- res + + w.logger.Debug().Msgf("will save branches=%d", len(sig.batch.branches)) + w.saving.Store(true) + w.treeResult <- &saveResult{n: -34} + + n, err := sig.batch.saveBranches() + if err != nil { + w.treeResult <- &saveResult{err: fmt.Errorf("failed to save branches; path=%s %w", w.sql.opts.Path, err)} + w.saving.Store(false) + return + } + if err := w.sql.treeWrite.Exec("PRAGMA wal_checkpoint(TRUNCATE)"); err != nil { + w.treeResult <- &saveResult{err: fmt.Errorf("failed tree checkpoint; %w", err)} + w.saving.Store(false) + } + w.logger.Debug().Msgf("tree save done branches=%d", n) + w.saving.Store(false) + w.treeResult <- &saveResult{n: n} } startPrune := func(startPruningVersion int64) error { w.logger.Debug().Msgf("tree prune to version=%d", startPruningVersion) @@ -435,9 +453,37 @@ func (w *sqlWriter) treeLoop(ctx context.Context) error { } } +func (w *sqlWriter) treeLoopAsyncCommit(ctx context.Context) error { + for { + select { + case sig := <-w.treeCh: + res := &saveResult{} + res.n, res.err = w.saveCheckpoint(sig.batch, sig.root) + w.treeResult <- res + + case <-ctx.Done(): + return nil + } + } +} + +func (w *sqlWriter) saveCheckpoint(batch *sqliteBatch, root *Node) (int64, error) { + n, err := batch.saveBranches() + if err != nil { + return n, fmt.Errorf("batch.saveBranches failed; %w", err) + } + if err := w.sql.treeWrite.Exec("PRAGMA wal_checkpoint(TRUNCATE)"); err != nil { + return n, fmt.Errorf("wal_checkpoint failed; %w", err) + } + err = w.sql.SaveRoot(batch.version, root, true) + if err != nil { + return n, fmt.Errorf("sql.SaveRoot failed node=%v; %w", root, err) + } + return n, err +} + func (w *sqlWriter) saveTree(tree *Tree) error { saveStart := time.Now() - batch := &sqliteBatch{ sql: tree.sql, tree: tree, @@ -445,20 +491,94 @@ func (w *sqlWriter) saveTree(tree *Tree) error { logger: log.With(). Str("module", "sqlite-batch"). Str("path", tree.sql.opts.Path).Logger(), + version: tree.version, } - saveSig := &saveSignal{batch: batch, root: tree.root, version: tree.version, wantCheckpoint: tree.shouldCheckpoint} - w.treeCh <- saveSig + + select { + case treeRes := <-w.treeResult: + if treeRes.err != nil { + return fmt.Errorf("err from last checkpoint: %w", treeRes.err) + } + w.logger.Debug().Msgf("ACK last checkpoint branches=%d", treeRes.n) + // TODO empty cache if successful result found + + default: + } + + saveSig := &saveSignal{batch: batch, root: tree.pool.clone(tree.root), wantCheckpoint: tree.shouldCheckpoint} w.leafCh <- saveSig - treeResult := <-w.treeResult + + if tree.shouldCheckpoint { + // nextCache := make(map[NodeKey]*Node) + // TODO! + // for caching, need a double copy. + // 1) once to freeze state for save + // 2) once to cache; if an evicted node is pulled from cache and mutated it will result in a bad write + for _, branch := range tree.branches { + batch.branches = append(batch.branches, tree.pool.clone(branch)) + // if branch.evict { + // branch.leftNode = nil + // branch.rightNode = nil + // nextCache[branch.nodeKey] = branch + // } + } + // if err := debugDump(tree.sql.opts.Path, tree.version, nextCache); err != nil { + // return err + // } + + // wait for prior save to complete + startWait := time.Now() + w.treeCh <- saveSig + w.logger.Debug().Msgf("save signal sent, waited %s", time.Since(startWait).Round(time.Millisecond)) + + // swap caches + // for _, n := range w.cache { + // tree.returnNode(n) + // } + // w.cache = nextCache + + // for _, node := range tree.branches { + // if node.evict { + // evictCount++ + // } + // } + // w.logger.Debug().Msgf("saving checkpoint version=%d branches=%d evict=%d", + // tree.version, len(tree.branches), evictCount) + } + leafResult := <-w.leafResult dur := time.Since(saveStart) tree.sql.metrics.WriteDurations = append(tree.sql.metrics.WriteDurations, dur) tree.sql.metrics.WriteTime += dur tree.sql.metrics.WriteLeaves += int64(len(tree.leaves)) - err := errors.Join(treeResult.err, leafResult.err) + return leafResult.err +} + +func (w *sqlWriter) cachePop(key NodeKey) (*Node, bool) { + n, ok := w.cache[key] + if ok { + delete(w.cache, key) + } + return n, ok +} - return err +func debugDump(path string, version int64, cache map[NodeKey]*Node) error { + module := strings.Split(path, "/")[len(strings.Split(path, "/"))-1] + if module == "slashing" || module == "lockup" { + f, err := os.Create(fmt.Sprintf("%s-%d.txt", module, version)) + if err != nil { + return err + } + defer f.Close() + for k := range cache { + _, err := f.WriteString(fmt.Sprintf("%s\n", k)) + if err != nil { + return err + } + } + } + return nil } // TODO diff --git a/tree.go b/tree.go index 0bac6fb65..3287c724a 100644 --- a/tree.go +++ b/tree.go @@ -242,7 +242,7 @@ func (tree *Tree) deepHash(node *Node, depth int8) { if node.hash != nil { return } - } else { + } else if node.nodeKey.Version() > tree.checkpoints.Last() { // otherwise accumulate the branch node for checkpointing tree.branches = append(tree.branches, node) @@ -282,7 +282,7 @@ func (tree *Tree) deepHash(node *Node, depth int8) { } // finally, if checkpointing, remove node's children from memory if we're at the eviction height - if tree.shouldCheckpoint { + if tree.shouldCheckpoint && tree.evictionDepth > 0 { if depth >= tree.evictionDepth { node.evictChildren() }