Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async checkpoint poc used in devmos 2024 benchmarks #1001

Draft
wants to merge 2 commits into
base: nightly/v2.0.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions multitree.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"sync/atomic"

"github.com/cosmos/iavl/v2/metrics"
"github.com/dustin/go-humanize"
"golang.org/x/exp/slices"
)

Expand Down Expand Up @@ -186,10 +185,10 @@ func (mt *MultiTree) SaveVersionConcurrently() ([]byte, int64, error) {
mt.shouldCheckpoint = false

if mt.treeOpts.MetricsProxy != nil {
bz := workingBytes.Load()
sz := workingSize.Load()
fmt.Printf("version=%d work-bytes=%s work-size=%s mem-ceiling=%s\n",
version, humanize.IBytes(bz), humanize.Comma(sz), humanize.IBytes(mt.treeOpts.CheckpointMemory))
//bz := workingBytes.Load()
//sz := workingSize.Load()
//fmt.Printf("version=%d work-bytes=%s work-size=%s mem-ceiling=%s\n",
// version, humanize.IBytes(bz), humanize.Comma(sz), humanize.IBytes(mt.treeOpts.CheckpointMemory))
mt.treeOpts.MetricsProxy.SetGauge(float32(workingBytes.Load()), "iavl_v2", "working_bytes")
mt.treeOpts.MetricsProxy.SetGauge(float32(workingSize.Load()), "iavl_v2", "working_size")
}
Expand Down
18 changes: 16 additions & 2 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ type Node struct {
}

func (node *Node) String() string {
return fmt.Sprintf("Node{hash: %x, nodeKey: %s, leftNodeKey: %v, rightNodeKey: %v, size: %d, subtreeHeight: %d, poolId: %d}",
node.hash, node.nodeKey, node.leftNodeKey, node.rightNodeKey, node.size, node.subtreeHeight, node.poolId)
return fmt.Sprintf("Node{hash: %x, nodeKey: %s, leftNodeKey: %v, rightNodeKey: %v, size: %d, subtreeHeight: %d, evict: %t, dirty: %t, poolId: %d}",
node.hash, node.nodeKey, node.leftNodeKey, node.rightNodeKey, node.size, node.subtreeHeight, node.evict, node.dirty, node.poolId)
}

func (node *Node) isLeaf() bool {
Expand Down Expand Up @@ -107,6 +107,13 @@ func (node *Node) getLeftNode(t *Tree) (*Node, error) {
if node.leftNode != nil {
return node.leftNode, nil
}
// check writer cache
var ok bool
node.leftNode, ok = t.sqlWriter.cachePop(node.leftNodeKey)
if ok {
return node.leftNode, nil
}

var err error
node.leftNode, err = t.sql.getLeftNode(node)
if err != nil {
Expand All @@ -122,6 +129,13 @@ func (node *Node) getRightNode(t *Tree) (*Node, error) {
if node.rightNode != nil {
return node.rightNode, nil
}
// check writer cache
var ok bool
node.rightNode, ok = t.sqlWriter.cachePop(node.rightNodeKey)
if ok {
return node.rightNode, nil
}

var err error
node.rightNode, err = t.sql.getRightNode(node)
if err != nil {
Expand Down
17 changes: 14 additions & 3 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ import (
type NodePool struct {
syncPool *sync.Pool

free chan int
nodes []Node

free chan int
poolId uint64
}

Expand Down Expand Up @@ -54,3 +52,16 @@ func (np *NodePool) Put(node *Node) {
node.poolId = 0
np.syncPool.Put(node)
}

func (np *NodePool) clone(n *Node) *Node {
node := np.Get()
node.leftNodeKey = n.leftNodeKey
node.rightNodeKey = n.rightNodeKey
node.nodeKey = n.nodeKey
node.hash = n.hash
node.key = n.key
node.value = n.value
node.subtreeHeight = n.subtreeHeight
node.size = n.size
return node
}
9 changes: 5 additions & 4 deletions pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (
"github.com/stretchr/testify/require"
)

func TestNodePool_Get(t *testing.T) {
func Test_TheLimitsOfMySanity(t *testing.T) {
pool := NewNodePool()
node := pool.Get()
node.key = []byte("hello")
require.Equal(t, node.key, pool.nodes[node.poolId].key)
pool.Put(node)
require.Equal(t, []byte(nil), pool.nodes[node.poolId].key)
n2 := pool.clone(node)
require.Equal(t, node.key, n2.key)
node.key = []byte("world")
require.NotEqual(t, node.key, n2.key)
}
10 changes: 6 additions & 4 deletions sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ func defaultSqliteDbOptions(opts SqliteDbOptions) SqliteDbOptions {
opts.MmapSize = 8 * 1024 * 1024 * 1024
}
if opts.WalSize == 0 {
opts.WalSize = 1024 * 1024 * 100
opts.WalSize = 1024 * 1024 * 500
}
opts.ShardTrees = true
opts.walPages = opts.WalSize / os.Getpagesize()
return opts
}
Expand Down Expand Up @@ -722,8 +723,8 @@ func (sql *SqliteDb) getRightNode(node *Node) (*Node, error) {

node.rightNode, err = sql.Get(node.rightNodeKey)
if err != nil {
return nil, fmt.Errorf("failed to get right node node_key=%s height=%d path=%s: %w",
node.rightNodeKey, node.subtreeHeight, sql.opts.Path, err)
return nil, fmt.Errorf("failed to get right node node_key=%s for %v at path=%s: %w",
node.rightNodeKey, node, sql.opts.Path, err)
}
return node.rightNode, nil
}
Expand All @@ -743,7 +744,8 @@ func (sql *SqliteDb) getLeftNode(node *Node) (*Node, error) {

node.leftNode, err = sql.Get(node.leftNodeKey)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get left node node_key=%s for %v at path=%s: %w",
node.leftNodeKey, node, sql.opts.Path, err)
}
return node.leftNode, err
}
Expand Down
31 changes: 19 additions & 12 deletions sqlite_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@ import (
)

type sqliteBatch struct {
tree *Tree
// used in leaves, TODO remove
tree *Tree

// used in branch checkpoint
version int64
branchOrphans []NodeKey
branches []*Node

sql *SqliteDb
size int64
logger zerolog.Logger
Expand All @@ -33,11 +40,11 @@ func (b *sqliteBatch) newChangeLogBatch() (err error) {
if err = b.sql.leafWrite.Begin(); err != nil {
return err
}
b.leafInsert, err = b.sql.leafWrite.Prepare("INSERT OR REPLACE INTO leaf (version, sequence, bytes) VALUES (?, ?, ?)")
b.leafInsert, err = b.sql.leafWrite.Prepare("INSERT INTO leaf (version, sequence, bytes) VALUES (?, ?, ?)")
if err != nil {
return err
}
b.deleteInsert, err = b.sql.leafWrite.Prepare("INSERT OR REPLACE INTO leaf_delete (version, sequence, key) VALUES (?, ?, ?)")
b.deleteInsert, err = b.sql.leafWrite.Prepare("INSERT INTO leaf_delete (version, sequence, key) VALUES (?, ?, ?)")
if err != nil {
return err
}
Expand Down Expand Up @@ -233,30 +240,29 @@ func (b *sqliteBatch) saveLeaves() (int64, error) {
}

func (b *sqliteBatch) isCheckpoint() bool {
return len(b.tree.branches) > 0
return len(b.branches) > 0
}

func (b *sqliteBatch) saveBranches() (n int64, err error) {
if b.isCheckpoint() {
tree := b.tree
b.treeCount = 0

shardID, err := tree.sql.nextShard(tree.version)
shardID, err := b.sql.nextShard(b.version)
if err != nil {
return 0, err
}
b.logger.Debug().Msgf("checkpoint db=tree version=%d shard=%d orphans=%s",
tree.version, shardID, humanize.Comma(int64(len(tree.branchOrphans))))
b.logger.Debug().Msgf("checkpoint version=%d shard=%d orphans=%s",
b.version, shardID, humanize.Comma(int64(len(b.branchOrphans))))

if err = b.newTreeBatch(shardID); err != nil {
return 0, err
}

for _, node := range tree.branches {
for _, node := range b.branches {
b.treeCount++
bz, err := node.Bytes()
if err != nil {
return 0, err
return 0, fmt.Errorf("failed to encode node: %v %w", node, err)
}
if err = b.treeInsert.Exec(node.nodeKey.Version(), int(node.nodeKey.Sequence()), bz); err != nil {
return 0, err
Expand All @@ -265,11 +271,12 @@ func (b *sqliteBatch) saveBranches() (n int64, err error) {
return 0, err
}
if node.evict {
tree.returnNode(node)
// TODO, remove tree reference
b.tree.returnNode(node)
}
}

for _, orphan := range tree.branchOrphans {
for _, orphan := range b.branchOrphans {
b.treeCount++
err = b.execBranchOrphan(orphan)
if err != nil {
Expand Down
Loading
Loading