Skip to content

Commit

Permalink
Merge pull request #136 from Taraxa-project/prune_light_node
Browse files Browse the repository at this point in the history
chore: prune light node
mfrankovi authored Jan 23, 2023
2 parents 8dd99aa + dbbe193 commit 79439f8
Showing 5 changed files with 213 additions and 0 deletions.
2 changes: 2 additions & 0 deletions common/types.go
Original file line number Diff line number Diff line change
@@ -37,6 +37,8 @@ const (
HashLength = 32
// AddressLength is the expected length of the address
AddressLength = 20
// VersionedKeyLength is the expected length of the versioned key
VersionedKeyLength = 40
)

var (
16 changes: 16 additions & 0 deletions taraxa/C/state.go
Original file line number Diff line number Diff line change
@@ -362,6 +362,22 @@ func taraxa_evm_state_api_db_snapshot(
util.PanicIfNotNil(db.Snapshot(dir, log_size_for_flush))
}

// taraxa_evm_state_api_prune is the C-exported entry point for state pruning.
// It RLP-decodes the parameters (root to keep, roots to prune, block number)
// and forwards them to the state DB of the API instance identified by ptr.
// Panics are reported through cb_err via handle_err instead of crossing into C.
//
//export taraxa_evm_state_api_prune
func taraxa_evm_state_api_prune(
	ptr C.taraxa_evm_state_API_ptr,
	params_enc C.taraxa_evm_Bytes,
	cb_err C.taraxa_evm_BytesCallback,
) {
	defer handle_err(cb_err)
	// Field order must match the RLP encoding produced by the caller.
	var args struct {
		StateRootToKeep  common.Hash
		StateRootToPrune []common.Hash
		BlkNum           types.BlockNum
	}
	dec_rlp(params_enc, &args)
	api := state_API_instances[ptr]
	api.db.Prune(args.StateRootToKeep, args.StateRootToPrune, args.BlkNum)
}

type state_API_ptr = byte

const state_API_max_instances = ^state_API_ptr(0)
25 changes: 25 additions & 0 deletions taraxa/state/state_db/block_reader.go
Original file line number Diff line number Diff line change
@@ -55,6 +55,31 @@ func (self ExtendedReader) ForEachStorage(addr *common.Address, f func(*common.H
})
}

// ForEachAccountNodeHashByRoot calls f with the hash of every node of the
// account storage trie rooted at storage_root. A nil root is a no-op.
func (self ExtendedReader) ForEachAccountNodeHashByRoot(storage_root *common.Hash, f func(*common.Hash)) {
	if storage_root == nil {
		return
	}
	// The adapter needs an address; the zero address is used here since the
	// walk is keyed purely by node hashes.
	var no_addr common.Address
	adapter := AccountTrieInputAdapter{&no_addr, self}
	trie.Reader{AccountTrieSchema{}}.ForEachNodeHash(adapter, storage_root, f)
}

// ForEachMainNodeHashByRoot calls f with the hash of every node of the main
// (account) trie rooted at storage_root. A nil root is a no-op.
func (self ExtendedReader) ForEachMainNodeHashByRoot(storage_root *common.Hash, f func(*common.Hash)) {
	if storage_root == nil {
		return
	}
	reader := trie.Reader{MainTrieSchema{}}
	reader.ForEachNodeHash(MainTrieInputAdapter{self}, storage_root, f)
}

type Proof struct {
AccountProof trie.Proof
StorageProofs []trie.Proof
144 changes: 144 additions & 0 deletions taraxa/state/state_db_rocksdb/db.go
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ package state_db_rocksdb

import (
"bytes"
"encoding/binary"
"math/big"
"runtime"
"strconv"
@@ -112,6 +113,149 @@ func (self *DB) Snapshot(dir string, log_size_for_flush uint64) error {
return c.CreateCheckpoint(dir, log_size_for_flush)
}

// Prune removes state no longer needed by a light node, keeping only what is
// reachable from state_root_to_keep (the main-trie root at blk_num).
//
// state_root_to_prune lists main-trie roots of older blocks (newest first);
// their nodes are deleted unless shared with the kept state. Versioned values
// (key = 32-byte hash || 8-byte big-endian block number) are deleted when a
// newer version of the same entry exists that is itself below blk_num. Each
// touched column family is compacted afterwards so deletions are reclaimed.
//
// NOTE(review): DeleteCF/CompactRangeCF errors are ignored (best effort),
// matching the original code; a failed delete only leaves stale data behind.
func (self *DB) Prune(state_root_to_keep common.Hash, state_root_to_prune []common.Hash, blk_num types.BlockNum) {
	var member struct{}
	set_node_to_keep := make(map[common.Hash]struct{})
	set_node_to_remove := make(map[common.Hash]struct{})
	blk_num_counter := blk_num

	// Select main-trie nodes which are not to be deleted.
	set_node_to_keep[state_root_to_keep] = member
	state_db.GetBlockState(self, blk_num_counter).ForEachMainNodeHashByRoot(&state_root_to_keep, func(h *common.Hash) {
		set_node_to_keep[*h] = member
	})
	blk_num_counter--

	// Select nodes from older blocks to remove, only if not shared with the
	// kept state.
	for _, root_to_prune := range state_root_to_prune {
		state_db.GetBlockState(self, blk_num_counter).ForEachMainNodeHashByRoot(&root_to_prune, func(h *common.Hash) {
			if _, ok := set_node_to_keep[*h]; !ok {
				set_node_to_remove[*h] = member
			}
		})
		blk_num_counter--
	}

	// Select main-trie values to prune. Versions of one entry are adjacent in
	// iteration order (same 32-byte hash prefix, ascending block number), so a
	// value is obsolete when the NEXT key belongs to the same entry and is
	// still below blk_num. Also collect storage roots of kept/pruned accounts
	// for the account-trie pass below. (The original also built an unused
	// set_value_to_keep map over the whole column family; dropped.)
	set_value_to_prune := make(map[string]struct{})
	set_storage_root_to_keep := make(map[common.Hash]struct{})
	set_storage_root_to_prune := make(map[common.Hash]struct{})

	itr := self.db.NewIteratorCF(self.opts_r_itr, self.cf_handles[state_db.COL_main_trie_value])
	itr.SeekToFirst()
	prev_key := make([]byte, common.VersionedKeyLength)
	for itr.Valid() {
		account := state_db.DecodeAccountFromTrie(itr.Value().Data())
		copy(prev_key, itr.Key().Data())
		itr.Next()
		keep := true
		if itr.Valid() {
			ver_blk_num := binary.BigEndian.Uint64(itr.Key().Data()[common.HashLength:common.VersionedKeyLength])
			if bytes.Equal(prev_key[0:common.HashLength], itr.Key().Data()[0:common.HashLength]) {
				// Only prune the previous value if the newer version of the
				// same entry is below blk_num.
				if ver_blk_num < blk_num {
					keep = false
				}
			}
		}
		if keep {
			if account.StorageRootHash != nil {
				set_storage_root_to_keep[*account.StorageRootHash] = member
			}
		} else {
			set_value_to_prune[string(prev_key)] = member
			if account.StorageRootHash != nil {
				set_storage_root_to_prune[*account.StorageRootHash] = member
			}
		}
	}
	// Iterators pin rocksdb resources; the original version leaked them.
	itr.Close()

	// Select account-trie nodes to prune: nodes of pruned storage tries that
	// are not shared with any kept storage trie.
	set_account_node_to_keep := make(map[common.Hash]struct{})
	set_account_node_to_remove := make(map[common.Hash]struct{})
	for root_to_keep := range set_storage_root_to_keep {
		set_account_node_to_keep[root_to_keep] = member
		state_db.GetBlockState(self, blk_num_counter).ForEachAccountNodeHashByRoot(&root_to_keep, func(h *common.Hash) {
			set_account_node_to_keep[*h] = member
		})
	}
	for root_to_prune := range set_storage_root_to_prune {
		if _, ok := set_account_node_to_keep[root_to_prune]; !ok {
			state_db.GetBlockState(self, blk_num_counter).ForEachAccountNodeHashByRoot(&root_to_prune, func(h *common.Hash) {
				if _, ok := set_account_node_to_keep[*h]; !ok {
					set_account_node_to_remove[*h] = member
				}
			})
		}
	}

	// Select account storage values to prune — same versioned-key scheme as
	// the main-trie values above.
	set_account_storage_value_to_keep := make(map[string]struct{})
	set_account_storage_value_to_prune := make(map[string]struct{})
	itr = self.db.NewIteratorCF(self.opts_r_itr, self.cf_handles[state_db.COL_acc_trie_value])
	itr.SeekToFirst()
	for itr.Valid() {
		copy(prev_key, itr.Key().Data())
		itr.Next()
		keep := true
		if itr.Valid() {
			ver_blk_num := binary.BigEndian.Uint64(itr.Key().Data()[common.HashLength:common.VersionedKeyLength])
			if bytes.Equal(prev_key[0:common.HashLength], itr.Key().Data()[0:common.HashLength]) {
				if ver_blk_num < blk_num {
					keep = false
				}
			}
		}
		if keep {
			set_account_storage_value_to_keep[string(prev_key)] = member
		} else {
			set_account_storage_value_to_prune[string(prev_key)] = member
		}
	}
	itr.Close()

	// Delete everything selected for removal, then compact each column family
	// over its full key range so tombstones are reclaimed.
	// NOTE(review): the original limit `[...]byte{255}` set only the first
	// byte, so keys above {0xff, 0x00, ...} fell outside the compaction range;
	// an all-0xff limit covers the whole keyspace as the comment intended.
	var range_limit [common.VersionedKeyLength]byte
	for i := range range_limit {
		range_limit[i] = 255
	}
	range32 := grocksdb.Range{
		Start: make([]byte, common.HashLength),
		Limit: make([]byte, common.HashLength),
	}
	copy(range32.Limit, range_limit[0:common.HashLength])
	range40 := grocksdb.Range{
		Start: make([]byte, common.VersionedKeyLength),
		Limit: make([]byte, common.VersionedKeyLength),
	}
	copy(range40.Limit, range_limit[0:common.VersionedKeyLength])

	for node_to_remove := range set_node_to_remove {
		self.db.DeleteCF(self.latest_state.opts_w, self.cf_handles[state_db.COL_main_trie_node], node_to_remove[:])
	}
	self.db.CompactRangeCF(self.cf_handles[state_db.COL_main_trie_node], range32)

	for value_to_remove := range set_value_to_prune {
		self.db.DeleteCF(self.latest_state.opts_w, self.cf_handles[state_db.COL_main_trie_value], []byte(value_to_remove))
	}
	self.db.CompactRangeCF(self.cf_handles[state_db.COL_main_trie_value], range40)

	for node_to_remove := range set_account_node_to_remove {
		self.db.DeleteCF(self.latest_state.opts_w, self.cf_handles[state_db.COL_acc_trie_node], node_to_remove[:])
	}
	self.db.CompactRangeCF(self.cf_handles[state_db.COL_acc_trie_node], range32)

	for value_to_remove := range set_account_storage_value_to_prune {
		// A key can only land in one of the two sets; the guard from the
		// original code is kept as a cheap safety check.
		if _, ok := set_account_storage_value_to_keep[value_to_remove]; !ok {
			self.db.DeleteCF(self.latest_state.opts_w, self.cf_handles[state_db.COL_acc_trie_value], []byte(value_to_remove))
		}
	}
	self.db.CompactRangeCF(self.cf_handles[state_db.COL_acc_trie_value], range40)
}

func (self *DB) Close() {
self.latest_state.Close()
self.invalidate_versioned_read_pools()
26 changes: 26 additions & 0 deletions taraxa/trie/reader.go
Original file line number Diff line number Diff line change
@@ -70,6 +70,32 @@ func (self Reader) ForEach(db_tx Input, root_hash *common.Hash, with_values bool
self.for_each(db_tx, (*node_hash)(root_hash), with_values, cb, kbuf[:0])
}

// ForEachNodeHash walks the trie rooted at root_hash and invokes cb with the
// hash of every node it resolves (descendants are reported before parents).
func (self Reader) ForEachNodeHash(db_tx Input, root_hash *common.Hash, cb func(*common.Hash)) {
	var key_buf hex_key
	root := (*node_hash)(root_hash)
	self.for_each_node_hash(db_tx, root, cb, key_buf[:0])
}

// for_each_node_hash recursively visits the sub-trie rooted at n and reports
// the hash of every hash-referenced node via cb. Hashes are reported
// post-order: a node's descendants are walked before cb sees the node's own
// hash. prefix is the hex-nibble path from the root to n, needed by resolve
// to load nodes from db_tx.
func (self Reader) for_each_node_hash(db_tx Input, n node, cb func(*common.Hash), prefix []byte) {
	switch n := n.(type) {
	case *node_hash:
		// Resolve the referenced node, walk it, then report this node's hash.
		self.for_each_node_hash(db_tx, self.resolve(db_tx, n, prefix), cb, prefix)
		cb(n.common_hash())
	case *short_node:
		key_extended := append(prefix, n.key_part...)
		// value_node leaves carry no node hash of their own; only descend
		// into real child nodes.
		if _, has_val := n.val.(value_node); !has_val {
			self.for_each_node_hash(db_tx, n.val, cb, key_extended)
		}
	case *full_node:
		for i := 0; i < full_node_child_cnt; i++ {
			if c := n.children[i]; c != nil {
				self.for_each_node_hash(db_tx, c, cb, append(prefix, byte(i)))
			}
		}
	default:
		// The reader only produces the node kinds above; anything else is a
		// programmer error.
		panic("impossible")
	}
}

func (self Reader) for_each(db_tx Input, n node, with_values bool, cb KVCallback, prefix []byte) {
switch n := n.(type) {
case *node_hash:

0 comments on commit 79439f8

Please sign in to comment.