Skip to content

Commit

Permalink
[facotry] add factoryWithHeight for archive mode
Browse files Browse the repository at this point in the history
  • Loading branch information
dustinxie committed Nov 5, 2024
1 parent 679f001 commit b365980
Show file tree
Hide file tree
Showing 11 changed files with 288 additions and 174 deletions.
11 changes: 11 additions & 0 deletions action/protocol/managers.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package protocol

import (
"context"
"math/big"

"github.com/iotexproject/go-pkgs/hash"
"github.com/iotexproject/iotex-address/address"
"github.com/pkg/errors"

"github.com/iotexproject/iotex-core/v2/action"
"github.com/iotexproject/iotex-core/v2/state"
)

Expand Down Expand Up @@ -86,6 +89,14 @@ type (
Dock
}

// ArchiveStateSimulator defines an interface to read state and
// simulate-run actions on a given height
ArchiveStateSimulator interface {
StateReader
SimulateExecution(context.Context, address.Address, action.Envelope, ...SimulateOption) ([]byte, *action.Receipt, error)
ReadContractStorage(context.Context, address.Address, []byte) ([]byte, error)
}

// Dock defines an interface for protocol to read/write their private data in StateReader/Manager
// data are stored as interface{}, user needs to type-assert on their own upon Unload()
Dock interface {
Expand Down
8 changes: 6 additions & 2 deletions api/coreservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,7 @@ func (core *coreService) readState(ctx context.Context, p protocol.Protocol, hei
if height != "" {
inputHeight, err := strconv.ParseUint(height, 0, 64)
if err != nil {
return nil, uint64(0), err
return nil, 0, err
}
rp := rolldpos.FindProtocol(core.registry)
if rp != nil {
Expand All @@ -961,7 +961,11 @@ func (core *coreService) readState(ctx context.Context, p protocol.Protocol, hei
}
if inputHeight < tipHeight {
// old data, wrap to history state reader
d, h, err := p.ReadState(ctx, factory.NewHistoryStateReader(core.sf, inputHeight), methodName, arguments...)
sfAtHeight, err := core.sf.AtHeight(inputHeight)
if err != nil {
return nil, 0, err
}
d, h, err := p.ReadState(ctx, sfAtHeight, methodName, arguments...)
if err == nil {
key.Height = strconv.FormatUint(h, 10)
core.readCache.Put(key.Hash(), d)
Expand Down
1 change: 1 addition & 0 deletions api/serverV2_integrity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ func addActsToActPool(ctx context.Context, ap actpool.ActPool) error {

func setupChain(cfg testConfig) (blockchain.Blockchain, blockdao.BlockDAO, blockindex.Indexer, blockindex.BloomFilterIndexer, factory.Factory, actpool.ActPool, *protocol.Registry, string, error) {
cfg.chain.ProducerPrivKey = hex.EncodeToString(identityset.PrivateKey(0).Bytes())
cfg.chain.EnableArchiveMode = true
registry := protocol.NewRegistry()
factoryCfg := factory.GenerateConfig(cfg.chain, cfg.genesis)
sf, err := factory.NewFactory(factoryCfg, db.NewMemKVStore(), factory.RegistryOption(registry))
Expand Down
17 changes: 9 additions & 8 deletions blockchain/integrity/integrity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2355,14 +2355,14 @@ func testHistoryForAccount(t *testing.T, statetx bool) {

// check history account's balance
if statetx {
_, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), a)
require.Equal(factory.ErrNotSupported, errors.Cause(err))
_, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), b)
_, err = sf.AtHeight(0)
require.Equal(factory.ErrNotSupported, errors.Cause(err))
} else {
AccountA, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), a)
sr, err := sf.AtHeight(bc.TipHeight() - 1)
require.NoError(err)
AccountA, err = accountutil.AccountState(ctx, sr, a)
require.NoError(err)
AccountB, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), b)
AccountB, err = accountutil.AccountState(ctx, sr, b)
require.NoError(err)
require.Equal(big.NewInt(100), AccountA.Balance)
require.Equal(big.NewInt(100), AccountB.Balance)
Expand Down Expand Up @@ -2403,10 +2403,11 @@ func testHistoryForContract(t *testing.T, statetx bool) {

// check the the original balance again
if statetx {
_, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), contractAddr)
require.True(errors.Cause(err) == factory.ErrNotSupported)
_, err = sf.AtHeight(bc.TipHeight() - 1)
require.Equal(factory.ErrNotSupported, errors.Cause(err))
} else {
sr := factory.NewHistoryStateReader(sf, bc.TipHeight()-1)
sr, err := sf.AtHeight(bc.TipHeight() - 1)
require.NoError(err)
account, err = accountutil.AccountState(ctx, sr, contractAddr)
require.NoError(err)
balance = BalanceOfContract(contract, genesisAccount, sr, t, account.Root)
Expand Down
84 changes: 16 additions & 68 deletions state/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ type (
ReadContractStorage(context.Context, address.Address, []byte) ([]byte, error)
PutBlock(context.Context, *block.Block) error
DeleteTipBlock(context.Context, *block.Block) error
StateAtHeight(uint64, interface{}, ...protocol.StateOption) error
StatesAtHeight(uint64, ...protocol.StateOption) (state.Iterator, error)
AtHeight(uint64) (protocol.ArchiveStateSimulator, error) // returns a factory at the given height
}

// factory implements StateFactory interface, tracks changes to account/contract and batch-commits to DB
Expand Down Expand Up @@ -346,6 +345,21 @@ func (sf *factory) Validate(ctx context.Context, blk *block.Block) error {
return nil
}

func (sf *factory) AtHeight(h uint64) (protocol.ArchiveStateSimulator, error) {
sf.mutex.RLock()
defer sf.mutex.RUnlock()
if h == sf.currentChainHeight {
return sf, nil
}
if !sf.saveHistory {
return nil, errors.Wrap(ErrNoArchiveData, "archive mode not enabled")
}
return &factoryWithHeight{
factory: sf,
height: h,
}, nil
}

// NewBlockBuilder returns block builder which hasn't been signed yet
func (sf *factory) NewBlockBuilder(
ctx context.Context,
Expand Down Expand Up @@ -485,52 +499,6 @@ func (sf *factory) DeleteTipBlock(_ context.Context, _ *block.Block) error {
return errors.Wrap(ErrNotSupported, "cannot delete tip block from factory")
}

// StateAtHeight returns a confirmed state at height -- archive mode
func (sf *factory) StateAtHeight(height uint64, s interface{}, opts ...protocol.StateOption) error {
sf.mutex.RLock()
defer sf.mutex.RUnlock()
cfg, err := processOptions(opts...)
if err != nil {
return err
}
if cfg.Keys != nil {
return errors.Wrap(ErrNotSupported, "Read state with keys option has not been implemented yet")
}
if height > sf.currentChainHeight {
return errors.Errorf("query height %d is higher than tip height %d", height, sf.currentChainHeight)
}
return sf.stateAtHeight(height, cfg.Namespace, cfg.Key, s)
}

// StatesAtHeight returns a set states in the state factory at height -- archive mode
func (sf *factory) StatesAtHeight(height uint64, opts ...protocol.StateOption) (state.Iterator, error) {
sf.mutex.RLock()
defer sf.mutex.RUnlock()
if height > sf.currentChainHeight {
return nil, errors.Errorf("query height %d is higher than tip height %d", height, sf.currentChainHeight)
}
cfg, err := processOptions(opts...)
if err != nil {
return nil, err
}
if cfg.Keys != nil {
return nil, errors.Wrap(ErrNotSupported, "Read states with keys option has not been implemented yet")
}
tlt, err := newTwoLayerTrie(ArchiveTrieNamespace, sf.dao, fmt.Sprintf("%s-%d", ArchiveTrieRootKey, height), false)
if err != nil {
return nil, errors.Wrapf(err, "failed to generate trie for %d", height)
}
if err := tlt.Start(context.Background()); err != nil {
return nil, err
}
defer tlt.Stop(context.Background())
keys, values, err := readStatesFromTLT(tlt, cfg.Namespace, cfg.Keys)
if err != nil {
return nil, err
}
return state.NewIterator(keys, values)
}

// State returns a confirmed state in the state factory
func (sf *factory) State(s interface{}, opts ...protocol.StateOption) (uint64, error) {
sf.mutex.RLock()
Expand Down Expand Up @@ -602,26 +570,6 @@ func legacyKeyLen() int {
return 20
}

func (sf *factory) stateAtHeight(height uint64, ns string, key []byte, s interface{}) error {
if !sf.saveHistory {
return ErrNoArchiveData
}
tlt, err := newTwoLayerTrie(ArchiveTrieNamespace, sf.dao, fmt.Sprintf("%s-%d", ArchiveTrieRootKey, height), false)
if err != nil {
return errors.Wrapf(err, "failed to generate trie for %d", height)
}
if err := tlt.Start(context.Background()); err != nil {
return err
}
defer tlt.Stop(context.Background())

value, err := readStateFromTLT(tlt, ns, key)
if err != nil {
return err
}
return state.Deserialize(s, value)
}

func (sf *factory) createGenesisStates(ctx context.Context) error {
ws, err := sf.newWorkingSet(ctx, 0)
if err != nil {
Expand Down
12 changes: 4 additions & 8 deletions state/factory/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,20 +568,16 @@ func testHistoryState(sf Factory, t *testing.T, statetx, archive bool) {
// check archive data
if statetx {
// statetx not support archive mode
_, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), a)
require.Equal(t, ErrNotSupported, errors.Cause(err))
_, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), b)
_, err = sf.AtHeight(0)
require.Equal(t, ErrNotSupported, errors.Cause(err))
} else {
sr, err := sf.AtHeight(0)
if !archive {
_, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), a)
require.Equal(t, ErrNoArchiveData, errors.Cause(err))
_, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), b)
require.Equal(t, ErrNoArchiveData, errors.Cause(err))
} else {
accountA, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), a)
accountA, err = accountutil.AccountState(ctx, sr, a)
require.NoError(t, err)
accountB, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), b)
accountB, err = accountutil.AccountState(ctx, sr, b)
require.NoError(t, err)
require.Equal(t, big.NewInt(100), accountA.Balance)
require.Equal(t, big.NewInt(0), accountB.Balance)
Expand Down
89 changes: 89 additions & 0 deletions state/factory/factory_withheight.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright (c) 2020 IoTeX Foundation
// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability
// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed.
// This source code is governed by Apache License 2.0 that can be found in the LICENSE file.

package factory

import (
"context"
"fmt"

"github.com/pkg/errors"

"github.com/iotexproject/iotex-core/v2/action/protocol"
"github.com/iotexproject/iotex-core/v2/state"
)

type factoryWithHeight struct {
*factory
height uint64
}

func (sf *factoryWithHeight) State(s interface{}, opts ...protocol.StateOption) (uint64, error) {
sf.mutex.RLock()
defer sf.mutex.RUnlock()
cfg, err := processOptions(opts...)
if err != nil {
return 0, err
}
if cfg.Keys != nil {
return 0, errors.Wrap(ErrNotSupported, "Read state with keys option has not been implemented yet")
}
if sf.height > sf.currentChainHeight {
return 0, errors.Errorf("query height %d is higher than tip height %d", sf.height, sf.currentChainHeight)
}
return sf.height, sf.stateAtHeight(sf.height, cfg.Namespace, cfg.Key, s)
}

func (sf *factoryWithHeight) stateAtHeight(height uint64, ns string, key []byte, s interface{}) error {
if !sf.saveHistory {
return ErrNoArchiveData
}
tlt, err := newTwoLayerTrie(ArchiveTrieNamespace, sf.dao, fmt.Sprintf("%s-%d", ArchiveTrieRootKey, height), false)
if err != nil {
return errors.Wrapf(err, "failed to generate trie for %d", height)
}
if err := tlt.Start(context.Background()); err != nil {
return err
}
defer tlt.Stop(context.Background())

value, err := readStateFromTLT(tlt, ns, key)
if err != nil {
return err
}
return state.Deserialize(s, value)
}

func (sf *factoryWithHeight) States(opts ...protocol.StateOption) (uint64, state.Iterator, error) {
sf.mutex.RLock()
defer sf.mutex.RUnlock()
if sf.height > sf.currentChainHeight {
return 0, nil, errors.Errorf("query height %d is higher than tip height %d", sf.height, sf.currentChainHeight)
}
cfg, err := processOptions(opts...)
if err != nil {
return 0, nil, err
}
if cfg.Keys != nil {
return 0, nil, errors.Wrap(ErrNotSupported, "Read states with keys option has not been implemented yet")
}
tlt, err := newTwoLayerTrie(ArchiveTrieNamespace, sf.dao, fmt.Sprintf("%s-%d", ArchiveTrieRootKey, sf.height), false)
if err != nil {
return 0, nil, errors.Wrapf(err, "failed to generate trie for %d", sf.height)
}
if err := tlt.Start(context.Background()); err != nil {
return 0, nil, err
}
defer tlt.Stop(context.Background())
keys, values, err := readStatesFromTLT(tlt, cfg.Namespace, cfg.Keys)
if err != nil {
return 0, nil, err
}
iter, err := state.NewIterator(keys, values)
if err != nil {
return 0, nil, err
}
return sf.height, iter, err
}
49 changes: 0 additions & 49 deletions state/factory/historyfactory.go

This file was deleted.

5 changes: 5 additions & 0 deletions state/factory/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,11 @@ func (sdb *stateDB) DeleteTipBlock(_ context.Context, _ *block.Block) error {
return errors.Wrap(ErrNotSupported, "cannot delete tip block from state db")
}

func (sdb *stateDB) AtHeight(h uint64) (protocol.ArchiveStateSimulator, error) {
// TODO: implement state db at height
return nil, ErrNotSupported
}

// State returns a confirmed state in the state factory
func (sdb *stateDB) State(s interface{}, opts ...protocol.StateOption) (uint64, error) {
cfg, err := processOptions(opts...)
Expand Down
Loading

0 comments on commit b365980

Please sign in to comment.