Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[facotry] add factoryWithHeight for archive mode #4474

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions action/protocol/managers.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package protocol

import (
"context"
"math/big"

"github.com/iotexproject/go-pkgs/hash"
"github.com/iotexproject/iotex-address/address"
"github.com/pkg/errors"

"github.com/iotexproject/iotex-core/v2/action"
"github.com/iotexproject/iotex-core/v2/state"
)

Expand Down Expand Up @@ -86,6 +89,14 @@ type (
Dock
}

// ArchiveStateSimulator defines an interface to read state and
// simulate-run actions on a given height
ArchiveStateSimulator interface {
StateReader
SimulateExecution(context.Context, address.Address, action.Envelope, ...SimulateOption) ([]byte, *action.Receipt, error)
ReadContractStorage(context.Context, address.Address, []byte) ([]byte, error)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these 2 are needed for archive-node API

}

// Dock defines an interface for protocol to read/write their private data in StateReader/Manager
// data are stored as interface{}, user needs to type-assert on their own upon Unload()
Dock interface {
Expand Down
8 changes: 6 additions & 2 deletions api/coreservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,7 @@ func (core *coreService) readState(ctx context.Context, p protocol.Protocol, hei
if height != "" {
inputHeight, err := strconv.ParseUint(height, 0, 64)
if err != nil {
return nil, uint64(0), err
return nil, 0, err
}
rp := rolldpos.FindProtocol(core.registry)
if rp != nil {
Expand All @@ -961,7 +961,11 @@ func (core *coreService) readState(ctx context.Context, p protocol.Protocol, hei
}
if inputHeight < tipHeight {
// old data, wrap to history state reader
d, h, err := p.ReadState(ctx, factory.NewHistoryStateReader(core.sf, inputHeight), methodName, arguments...)
sfAtHeight, err := core.sf.AtHeight(inputHeight)
if err != nil {
return nil, 0, err
}
d, h, err := p.ReadState(ctx, sfAtHeight, methodName, arguments...)
if err == nil {
key.Height = strconv.FormatUint(h, 10)
core.readCache.Put(key.Hash(), d)
Expand Down
1 change: 1 addition & 0 deletions api/serverV2_integrity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ func addActsToActPool(ctx context.Context, ap actpool.ActPool) error {

func setupChain(cfg testConfig) (blockchain.Blockchain, blockdao.BlockDAO, blockindex.Indexer, blockindex.BloomFilterIndexer, factory.Factory, actpool.ActPool, *protocol.Registry, string, error) {
cfg.chain.ProducerPrivKey = hex.EncodeToString(identityset.PrivateKey(0).Bytes())
cfg.chain.EnableArchiveMode = true
registry := protocol.NewRegistry()
factoryCfg := factory.GenerateConfig(cfg.chain, cfg.genesis)
sf, err := factory.NewFactory(factoryCfg, db.NewMemKVStore(), factory.RegistryOption(registry))
Expand Down
17 changes: 9 additions & 8 deletions blockchain/integrity/integrity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2355,14 +2355,14 @@ func testHistoryForAccount(t *testing.T, statetx bool) {

// check history account's balance
if statetx {
_, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), a)
require.Equal(factory.ErrNotSupported, errors.Cause(err))
_, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), b)
_, err = sf.AtHeight(0)
require.Equal(factory.ErrNotSupported, errors.Cause(err))
} else {
AccountA, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), a)
sr, err := sf.AtHeight(bc.TipHeight() - 1)
require.NoError(err)
AccountA, err = accountutil.AccountState(ctx, sr, a)
require.NoError(err)
AccountB, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), b)
AccountB, err = accountutil.AccountState(ctx, sr, b)
require.NoError(err)
require.Equal(big.NewInt(100), AccountA.Balance)
require.Equal(big.NewInt(100), AccountB.Balance)
Expand Down Expand Up @@ -2403,10 +2403,11 @@ func testHistoryForContract(t *testing.T, statetx bool) {

// check the the original balance again
if statetx {
_, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), contractAddr)
require.True(errors.Cause(err) == factory.ErrNotSupported)
_, err = sf.AtHeight(bc.TipHeight() - 1)
require.Equal(factory.ErrNotSupported, errors.Cause(err))
} else {
sr := factory.NewHistoryStateReader(sf, bc.TipHeight()-1)
sr, err := sf.AtHeight(bc.TipHeight() - 1)
require.NoError(err)
account, err = accountutil.AccountState(ctx, sr, contractAddr)
require.NoError(err)
balance = BalanceOfContract(contract, genesisAccount, sr, t, account.Root)
Expand Down
84 changes: 16 additions & 68 deletions state/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ type (
ReadContractStorage(context.Context, address.Address, []byte) ([]byte, error)
PutBlock(context.Context, *block.Block) error
DeleteTipBlock(context.Context, *block.Block) error
StateAtHeight(uint64, interface{}, ...protocol.StateOption) error
StatesAtHeight(uint64, ...protocol.StateOption) (state.Iterator, error)
AtHeight(uint64) (protocol.ArchiveStateSimulator, error) // returns a factory at the given height
}

// factory implements StateFactory interface, tracks changes to account/contract and batch-commits to DB
Expand Down Expand Up @@ -346,6 +345,21 @@ func (sf *factory) Validate(ctx context.Context, blk *block.Block) error {
return nil
}

func (sf *factory) AtHeight(h uint64) (protocol.ArchiveStateSimulator, error) {
sf.mutex.RLock()
defer sf.mutex.RUnlock()
if h == sf.currentChainHeight {
return sf, nil
}
if !sf.saveHistory {
return nil, errors.Wrap(ErrNoArchiveData, "archive mode not enabled")
}
return &factoryWithHeight{
factory: sf,
height: h,
}, nil
}

// NewBlockBuilder returns block builder which hasn't been signed yet
func (sf *factory) NewBlockBuilder(
ctx context.Context,
Expand Down Expand Up @@ -485,52 +499,6 @@ func (sf *factory) DeleteTipBlock(_ context.Context, _ *block.Block) error {
return errors.Wrap(ErrNotSupported, "cannot delete tip block from factory")
}

// StateAtHeight returns a confirmed state at height -- archive mode
func (sf *factory) StateAtHeight(height uint64, s interface{}, opts ...protocol.StateOption) error {
sf.mutex.RLock()
defer sf.mutex.RUnlock()
cfg, err := processOptions(opts...)
if err != nil {
return err
}
if cfg.Keys != nil {
return errors.Wrap(ErrNotSupported, "Read state with keys option has not been implemented yet")
}
if height > sf.currentChainHeight {
return errors.Errorf("query height %d is higher than tip height %d", height, sf.currentChainHeight)
}
return sf.stateAtHeight(height, cfg.Namespace, cfg.Key, s)
}

// StatesAtHeight returns a set states in the state factory at height -- archive mode
func (sf *factory) StatesAtHeight(height uint64, opts ...protocol.StateOption) (state.Iterator, error) {
sf.mutex.RLock()
defer sf.mutex.RUnlock()
if height > sf.currentChainHeight {
return nil, errors.Errorf("query height %d is higher than tip height %d", height, sf.currentChainHeight)
}
cfg, err := processOptions(opts...)
if err != nil {
return nil, err
}
if cfg.Keys != nil {
return nil, errors.Wrap(ErrNotSupported, "Read states with keys option has not been implemented yet")
}
tlt, err := newTwoLayerTrie(ArchiveTrieNamespace, sf.dao, fmt.Sprintf("%s-%d", ArchiveTrieRootKey, height), false)
if err != nil {
return nil, errors.Wrapf(err, "failed to generate trie for %d", height)
}
if err := tlt.Start(context.Background()); err != nil {
return nil, err
}
defer tlt.Stop(context.Background())
keys, values, err := readStatesFromTLT(tlt, cfg.Namespace, cfg.Keys)
if err != nil {
return nil, err
}
return state.NewIterator(keys, values)
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move to factoryWithHeight

// State returns a confirmed state in the state factory
func (sf *factory) State(s interface{}, opts ...protocol.StateOption) (uint64, error) {
sf.mutex.RLock()
Expand Down Expand Up @@ -602,26 +570,6 @@ func legacyKeyLen() int {
return 20
}

func (sf *factory) stateAtHeight(height uint64, ns string, key []byte, s interface{}) error {
if !sf.saveHistory {
return ErrNoArchiveData
}
tlt, err := newTwoLayerTrie(ArchiveTrieNamespace, sf.dao, fmt.Sprintf("%s-%d", ArchiveTrieRootKey, height), false)
if err != nil {
return errors.Wrapf(err, "failed to generate trie for %d", height)
}
if err := tlt.Start(context.Background()); err != nil {
return err
}
defer tlt.Stop(context.Background())

value, err := readStateFromTLT(tlt, ns, key)
if err != nil {
return err
}
return state.Deserialize(s, value)
}

func (sf *factory) createGenesisStates(ctx context.Context) error {
ws, err := sf.newWorkingSet(ctx, 0)
if err != nil {
Expand Down
12 changes: 4 additions & 8 deletions state/factory/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,20 +568,16 @@ func testHistoryState(sf Factory, t *testing.T, statetx, archive bool) {
// check archive data
if statetx {
// statetx not support archive mode
_, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), a)
require.Equal(t, ErrNotSupported, errors.Cause(err))
_, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), b)
_, err = sf.AtHeight(0)
require.Equal(t, ErrNotSupported, errors.Cause(err))
} else {
sr, err := sf.AtHeight(0)
if !archive {
_, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), a)
require.Equal(t, ErrNoArchiveData, errors.Cause(err))
_, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), b)
require.Equal(t, ErrNoArchiveData, errors.Cause(err))
} else {
accountA, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), a)
accountA, err = accountutil.AccountState(ctx, sr, a)
require.NoError(t, err)
accountB, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), b)
accountB, err = accountutil.AccountState(ctx, sr, b)
require.NoError(t, err)
require.Equal(t, big.NewInt(100), accountA.Balance)
require.Equal(t, big.NewInt(0), accountB.Balance)
Expand Down
89 changes: 89 additions & 0 deletions state/factory/factory_withheight.go
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SimulateExecution and ReadContractStorage also need re-implement, they are not currently based on the specified height, but rather on the latest height

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok just added, was thinking to add it later in API PR

Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright (c) 2020 IoTeX Foundation
// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability
// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed.
// This source code is governed by Apache License 2.0 that can be found in the LICENSE file.

package factory

import (
"context"
"fmt"

"github.com/pkg/errors"

"github.com/iotexproject/iotex-core/v2/action/protocol"
"github.com/iotexproject/iotex-core/v2/state"
)

type factoryWithHeight struct {
*factory
height uint64
}

func (sf *factoryWithHeight) State(s interface{}, opts ...protocol.StateOption) (uint64, error) {
sf.mutex.RLock()
defer sf.mutex.RUnlock()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does it share the same mutex with factory?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I think reading on archive state can be lock-free

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated in latest commit

cfg, err := processOptions(opts...)
if err != nil {
return 0, err
}
if cfg.Keys != nil {
return 0, errors.Wrap(ErrNotSupported, "Read state with keys option has not been implemented yet")
}
if sf.height > sf.currentChainHeight {
return 0, errors.Errorf("query height %d is higher than tip height %d", sf.height, sf.currentChainHeight)
}
return sf.height, sf.stateAtHeight(sf.height, cfg.Namespace, cfg.Key, s)
}

func (sf *factoryWithHeight) stateAtHeight(height uint64, ns string, key []byte, s interface{}) error {
if !sf.saveHistory {
return ErrNoArchiveData
}
tlt, err := newTwoLayerTrie(ArchiveTrieNamespace, sf.dao, fmt.Sprintf("%s-%d", ArchiveTrieRootKey, height), false)
if err != nil {
return errors.Wrapf(err, "failed to generate trie for %d", height)
}
if err := tlt.Start(context.Background()); err != nil {
return err
}
defer tlt.Stop(context.Background())

value, err := readStateFromTLT(tlt, ns, key)
if err != nil {
return err
}
return state.Deserialize(s, value)
}

func (sf *factoryWithHeight) States(opts ...protocol.StateOption) (uint64, state.Iterator, error) {
sf.mutex.RLock()
defer sf.mutex.RUnlock()
if sf.height > sf.currentChainHeight {
return 0, nil, errors.Errorf("query height %d is higher than tip height %d", sf.height, sf.currentChainHeight)
}
cfg, err := processOptions(opts...)
if err != nil {
return 0, nil, err
}
if cfg.Keys != nil {
return 0, nil, errors.Wrap(ErrNotSupported, "Read states with keys option has not been implemented yet")
}
tlt, err := newTwoLayerTrie(ArchiveTrieNamespace, sf.dao, fmt.Sprintf("%s-%d", ArchiveTrieRootKey, sf.height), false)
if err != nil {
return 0, nil, errors.Wrapf(err, "failed to generate trie for %d", sf.height)
}
if err := tlt.Start(context.Background()); err != nil {
return 0, nil, err
}
defer tlt.Stop(context.Background())
keys, values, err := readStatesFromTLT(tlt, cfg.Namespace, cfg.Keys)
if err != nil {
return 0, nil, err
}
iter, err := state.NewIterator(keys, values)
if err != nil {
return 0, nil, err
}
return sf.height, iter, err
}
49 changes: 0 additions & 49 deletions state/factory/historyfactory.go

This file was deleted.

5 changes: 5 additions & 0 deletions state/factory/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,11 @@ func (sdb *stateDB) DeleteTipBlock(_ context.Context, _ *block.Block) error {
return errors.Wrap(ErrNotSupported, "cannot delete tip block from state db")
}

func (sdb *stateDB) AtHeight(h uint64) (protocol.ArchiveStateSimulator, error) {
// TODO: implement state db at height
return nil, ErrNotSupported
}

// State returns a confirmed state in the state factory
func (sdb *stateDB) State(s interface{}, opts ...protocol.StateOption) (uint64, error) {
cfg, err := processOptions(opts...)
Expand Down
Loading
Loading