From a179f3ee64a7054a715cb6edce86ac957ba76a39 Mon Sep 17 00:00:00 2001 From: envestcc Date: Fri, 19 Apr 2024 08:53:29 +0800 Subject: [PATCH 01/10] make abi param as common pkg --- pkg/util/abiutil/param.go | 103 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 103 insertions(+) create mode 100644 pkg/util/abiutil/param.go diff --git a/pkg/util/abiutil/param.go b/pkg/util/abiutil/param.go new file mode 100644 index 0000000000..6de100f22c --- /dev/null +++ b/pkg/util/abiutil/param.go @@ -0,0 +1,103 @@ +package abiutil + +import ( + "math/big" + + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + "github.com/pkg/errors" + + "github.com/iotexproject/iotex-address/address" + + "github.com/iotexproject/iotex-core/action" +) + +type ( + // EventParam is a struct to hold smart contract event parameters, which can easily convert a param to go type + EventParam map[string]any +) + +var ( + // ErrInvlidEventParam is an error for invalid event param + ErrInvlidEventParam = errors.New("invalid event param") +) + +// EventField is a helper function to get a field from event param +func EventField[T any](e EventParam, name string) (T, error) { + field, ok := e[name].(T) + if !ok { + return field, errors.Wrapf(ErrInvlidEventParam, "field %s got %#v, expect %T", name, e[name], field) + } + return field, nil +} + +// FieldUint256 is a helper function to get a uint256 field from event param +func (e EventParam) FieldUint256(name string) (*big.Int, error) { + return EventField[*big.Int](e, name) +} + +// FieldBytes12 is a helper function to get a bytes12 field from event param +func (e EventParam) FieldBytes12(name string) (string, error) { + data, err := EventField[[12]byte](e, name) + if err != nil { + return "", err + } + // remove trailing zeros + tail := len(data) - 1 + for ; tail >= 0 && data[tail] == 0; tail-- { + } + return string(data[:tail+1]), nil +} + +// FieldUint256Slice is a helper function to get a uint256 slice field from event param +func (e EventParam) FieldUint256Slice(name string) ([]*big.Int, error) { + return EventField[[]*big.Int](e, name) +} + +// FieldAddress is a helper function to get an address field from event param +func (e EventParam) FieldAddress(name string) (address.Address, error) { + commAddr, err := EventField[common.Address](e, name) + if err != nil { + return nil, err + } + return address.FromBytes(commAddr.Bytes()) +} + +// IndexedFieldAddress is a helper function to get an indexed address field from event param +func (e EventParam) IndexedFieldAddress(name string) (address.Address, error) { + return e.FieldAddress(name) +} + +// IndexedFieldUint256 is a helper function to get an indexed uint256 field from event param +func (e EventParam) IndexedFieldUint256(name string) (*big.Int, error) { + return EventField[*big.Int](e, name) +} + +// UnpackEventParam is a helper function to unpack event parameters +func UnpackEventParam(abiEvent *abi.Event, log *action.Log) (EventParam, error) { + event := make(EventParam) + // unpack non-indexed fields + if len(log.Data) > 0 { + if err := abiEvent.Inputs.UnpackIntoMap(event, log.Data); err != nil { + return nil, errors.Wrap(err, "unpack event data failed") + } + } + // unpack indexed fields + args := make(abi.Arguments, 0) + for _, arg := range abiEvent.Inputs { + if arg.Indexed { + args = append(args, arg) + } + } + topics := make([]common.Hash, 0) + for i, topic := range log.Topics { + if i > 0 { + topics = append(topics, common.Hash(topic)) + } + } + err := abi.ParseTopicsIntoMap(event, args, topics) + if err != nil { + return nil, errors.Wrap(err, "unpack event indexed fields failed") + } + return event, nil +} From 106262d2469381827436c72298af129edb7aa979 Mon Sep 17 00:00:00 2001 From: envestcc Date: Fri, 19 Apr 2024 08:54:02 +0800 Subject: [PATCH 02/10] add contract indexer common --- systemcontractindex/common.go | 77 +++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 systemcontractindex/common.go diff --git a/systemcontractindex/common.go b/systemcontractindex/common.go new file mode 100644 index 0000000000..6f6fe12894 --- /dev/null +++ b/systemcontractindex/common.go @@ -0,0 +1,77 @@ +package systemcontractindex + +import ( + "github.com/pkg/errors" + + "github.com/iotexproject/iotex-core/db" + "github.com/iotexproject/iotex-core/pkg/util/byteutil" +) + +// IndexerCommon is the common struct for all contract indexers +// It provides the basic functions, including +// 1. kvstore +// 2. put/get index height +// 3. contract address +type IndexerCommon struct { + kvstore db.KVStore + ns string + key []byte + startHeight uint64 + contractAddress string +} + +// NewIndexerCommon creates a new IndexerCommon +func NewIndexerCommon(kvstore db.KVStore, ns string, key []byte, contractAddress string, startHeight uint64) *IndexerCommon { + return &IndexerCommon{ + kvstore: kvstore, + ns: ns, + key: key, + startHeight: startHeight, + contractAddress: contractAddress, + } +} + +// KVStore returns the kvstore +func (s *IndexerCommon) KVStore() db.KVStore { return s.kvstore } + +// ContractAddress returns the contract address +func (s *IndexerCommon) ContractAddress() string { return s.contractAddress } + +// Height returns the tip block height +func (s *IndexerCommon) Height() (uint64, error) { + // get the tip block height + var height uint64 + h, err := s.kvstore.Get(s.ns, s.key) + if err != nil { + if !errors.Is(err, db.ErrNotExist) { + return 0, err + } + height = 0 + } else { + height = byteutil.BytesToUint64BigEndian(h) + } + return height, nil +} + +// StartHeight returns the start height of the indexer +func (s *IndexerCommon) StartHeight() uint64 { return s.startHeight } + +// PutHeight puts the tip block height +func (s *IndexerCommon) PutHeight(height uint64) error { + return s.kvstore.Put(s.ns, s.key, byteutil.Uint64ToBytesBigEndian(height)) +} + +// BlockContinuity checks the block continuity +func (s *IndexerCommon) BlockContinuity(height uint64) (existed bool, err error) { + expectHeight, err := s.Height() + if err != nil { + return false, err + } + if expectHeight < s.startHeight { + expectHeight = s.startHeight + } + if expectHeight >= height { + return expectHeight > height, nil + } + return false, errors.Errorf("invalid block height %d, expect %d", height, expectHeight) +} From d28c47aa2a0c1bf129c2a6fd6066f671a69d131c Mon Sep 17 00:00:00 2001 From: envestcc Date: Fri, 19 Apr 2024 08:54:37 +0800 Subject: [PATCH 03/10] add contract staking indexer v2 --- systemcontractindex/stakingindex/bucket.go | 125 +++++++ systemcontractindex/stakingindex/cache.go | 139 ++++++++ .../stakingindex/event_handler.go | 310 +++++++++++++++++ systemcontractindex/stakingindex/index.go | 202 ++++++++++++ systemcontractindex/stakingindex/staking.json | 311 ++++++++++++++++++ .../stakingindex/stakingpb/staking.proto | 20 ++ .../stakingindex/stakingpb/stakingpb.go | 216 ++++++++++++ 7 files changed, 1323 insertions(+) create mode 100644 systemcontractindex/stakingindex/bucket.go create mode 100644 systemcontractindex/stakingindex/cache.go create mode 100644 systemcontractindex/stakingindex/event_handler.go create mode 100644 systemcontractindex/stakingindex/index.go create mode 100644 systemcontractindex/stakingindex/staking.json create mode 100644 systemcontractindex/stakingindex/stakingpb/staking.proto create mode 100644 systemcontractindex/stakingindex/stakingpb/stakingpb.go diff --git a/systemcontractindex/stakingindex/bucket.go b/systemcontractindex/stakingindex/bucket.go new file mode 100644 index 0000000000..cde0019615 --- /dev/null +++ b/systemcontractindex/stakingindex/bucket.go @@ -0,0 +1,125 @@ +package stakingindex + +import ( + "math/big" + "time" + + "github.com/iotexproject/iotex-address/address" + "github.com/pkg/errors" + "google.golang.org/protobuf/proto" + + "github.com/iotexproject/iotex-core/action/protocol/staking" + "github.com/iotexproject/iotex-core/pkg/util/byteutil" + "github.com/iotexproject/iotex-core/systemcontractindex/stakingindex/stakingpb" +) + +type VoteBucket = staking.VoteBucket + +type Bucket struct { + Candidate address.Address + Owner address.Address + StakedAmount *big.Int + StakedDurationBlockNumber uint64 + CreatedAt uint64 + UnlockedAt uint64 + UnstakedAt uint64 +} + +func (bi *Bucket) Serialize() []byte { + return byteutil.Must(proto.Marshal(bi.toProto())) +} + +// Deserialize deserializes the bucket info +func (bi *Bucket) Deserialize(b []byte) error { + m := stakingpb.Bucket{} + if err := proto.Unmarshal(b, &m); err != nil { + return err + } + return bi.loadProto(&m) +} + +// clone clones the bucket info +func (bi *Bucket) toProto() *stakingpb.Bucket { + pb := &stakingpb.Bucket{ + Candidate: bi.Candidate.String(), + CreatedAt: bi.CreatedAt, + Owner: bi.Owner.String(), + UnlockedAt: bi.UnlockedAt, + UnstakedAt: bi.UnstakedAt, + Amount: bi.StakedAmount.String(), + Duration: bi.StakedDurationBlockNumber, + } + return pb +} + +func (bi *Bucket) loadProto(p *stakingpb.Bucket) error { + candidate, err := address.FromString(p.Candidate) + if err != nil { + return err + } + owner, err := address.FromString(p.Owner) + if err != nil { + return err + } + amount, ok := new(big.Int).SetString(p.Amount, 10) + if !ok { + return errors.Errorf("invalid staked amount %s", p.Amount) + } + bi.CreatedAt = p.CreatedAt + bi.UnlockedAt = p.UnlockedAt + bi.UnstakedAt = p.UnstakedAt + bi.Candidate = candidate + bi.Owner = owner + bi.StakedAmount = amount + bi.StakedDurationBlockNumber = p.Duration + return nil +} + +func (b *Bucket) Clone() *Bucket { + clone := &Bucket{ + StakedAmount: b.StakedAmount, + StakedDurationBlockNumber: b.StakedDurationBlockNumber, + CreatedAt: b.CreatedAt, + UnlockedAt: b.UnlockedAt, + UnstakedAt: b.UnstakedAt, + } + candidate, _ := address.FromBytes(b.Candidate.Bytes()) + clone.Candidate = candidate + owner, _ := address.FromBytes(b.Owner.Bytes()) + clone.Owner = owner + stakingAmount := new(big.Int).Set(b.StakedAmount) + clone.StakedAmount = stakingAmount + return clone +} + +func assembleVoteBucket(token uint64, bkt *Bucket, contractAddr string, blockInterval time.Duration) *VoteBucket { + vb := VoteBucket{ + Index: token, + StakedAmount: bkt.StakedAmount, + StakedDuration: time.Duration(bkt.StakedDurationBlockNumber) * blockInterval, + StakedDurationBlockNumber: bkt.StakedDurationBlockNumber, + CreateBlockHeight: bkt.CreatedAt, + StakeStartBlockHeight: bkt.CreatedAt, + UnstakeStartBlockHeight: bkt.UnstakedAt, + AutoStake: bkt.UnlockedAt == maxBlockNumber, + Candidate: bkt.Candidate, + Owner: bkt.Owner, + ContractAddress: contractAddr, + } + if bkt.UnlockedAt != maxBlockNumber { + vb.StakeStartBlockHeight = bkt.UnlockedAt + } + return &vb +} + +func batchAssembleVoteBucket(idxs []uint64, bkts []*Bucket, contractAddr string, blockInterval time.Duration) []*VoteBucket { + vbs := make([]*VoteBucket, 0, len(idxs)) + for i := range idxs { + if bkts[i] == nil { + vbs = append(vbs, nil) + continue + } + vbs = append(vbs, assembleVoteBucket(idxs[i], bkts[i], contractAddr, blockInterval)) + } + return vbs +} diff --git a/systemcontractindex/stakingindex/cache.go b/systemcontractindex/stakingindex/cache.go new file mode 100644 index 0000000000..a83c796f12 --- /dev/null +++ b/systemcontractindex/stakingindex/cache.go @@ -0,0 +1,139 @@ +package stakingindex + +import ( + "errors" + + "github.com/iotexproject/iotex-address/address" + + "github.com/iotexproject/iotex-core/db" + "github.com/iotexproject/iotex-core/pkg/util/byteutil" +) + +// cache is the in-memory cache for staking index +// it is not thread-safe and should be protected by the caller +type cache struct { + buckets map[uint64]*Bucket + bucketsByCandidate map[string]map[uint64]struct{} + totalBucketCount uint64 +} + +func newCache() *cache { + return &cache{ + buckets: make(map[uint64]*Bucket), + bucketsByCandidate: make(map[string]map[uint64]struct{}), + } +} + +func (s *cache) Load(kvstore db.KVStore) error { + // load total bucket count + var totalBucketCount uint64 + tbc, err := kvstore.Get(stakingNS, stakingTotalBucketCountKey) + if err != nil { + if !errors.Is(err, db.ErrNotExist) { + return err + } + totalBucketCount = 0 + } else { + totalBucketCount = byteutil.BytesToUint64BigEndian(tbc) + } + s.totalBucketCount = totalBucketCount + + // load buckets + ks, vs, err := kvstore.Filter(stakingBucketNS, func(k, v []byte) bool { return true }, nil, nil) + if err != nil && !errors.Is(err, db.ErrBucketNotExist) { + return err + } + for i := range vs { + var b Bucket + if err := b.Deserialize(vs[i]); err != nil { + return err + } + s.PutBucket(byteutil.BytesToUint64BigEndian(ks[i]), &b) + } + return nil +} + +func (s *cache) Copy() *cache { + c := newCache() + for k, v := range s.buckets { + c.buckets[k] = v.Clone() + } + for cand, btks := range s.bucketsByCandidate { + c.bucketsByCandidate[cand] = make(map[uint64]struct{}) + for btxIdx := range btks { + c.bucketsByCandidate[cand][btxIdx] = struct{}{} + } + } + c.totalBucketCount = s.totalBucketCount + return c +} + +func (s *cache) PutBucket(id uint64, bkt *Bucket) { + cand := bkt.Candidate.String() + if s.buckets[id] != nil { + prevCand := s.buckets[id].Candidate.String() + if prevCand != cand { + delete(s.bucketsByCandidate[prevCand], id) + if len(s.bucketsByCandidate[prevCand]) == 0 { + delete(s.bucketsByCandidate, prevCand) + } + } + } + s.buckets[id] = bkt + if s.bucketsByCandidate[cand] == nil { + s.bucketsByCandidate[cand] = make(map[uint64]struct{}) + } + s.bucketsByCandidate[cand][id] = struct{}{} + return +} + +func (s *cache) DeleteBucket(id uint64) { + bkt, ok := s.buckets[id] + if !ok { + return + } + cand := bkt.Candidate.String() + delete(s.bucketsByCandidate[cand], id) + if len(s.bucketsByCandidate[cand]) == 0 { + delete(s.bucketsByCandidate, cand) + } + delete(s.buckets, id) +} + +func (s *cache) BucketIdxs() []uint64 { + idxs := make([]uint64, 0, len(s.buckets)) + for id := range s.buckets { + idxs = append(idxs, id) + } + return idxs +} + +func (s *cache) Bucket(id uint64) *Bucket { + if bkt, ok := s.buckets[id]; ok { + return bkt + } + return nil +} + +func (s *cache) BucketsByIndices(indices []uint64) []*Bucket { + buckets := make([]*Bucket, 0, len(indices)) + for _, idx := range indices { + if bkt, ok := s.buckets[idx]; ok { + buckets = append(buckets, bkt) + } + } + return buckets +} + +func (s *cache) BucketIdxsByCandidate(candidate address.Address) []uint64 { + cand := candidate.String() + buckets := make([]uint64, 0, len(s.bucketsByCandidate[cand])) + for idx := range s.bucketsByCandidate[cand] { + buckets = append(buckets, idx) + } + return buckets +} + +func (s *cache) TotalBucketCount() uint64 { + return s.totalBucketCount +} diff --git a/systemcontractindex/stakingindex/event_handler.go b/systemcontractindex/stakingindex/event_handler.go new file mode 100644 index 0000000000..7ac797cf4c --- /dev/null +++ b/systemcontractindex/stakingindex/event_handler.go @@ -0,0 +1,310 @@ +package stakingindex + +import ( + "context" + _ "embed" + "math" + "strings" + + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + "github.com/pkg/errors" + + "github.com/iotexproject/iotex-address/address" + + "github.com/iotexproject/iotex-core/action" + "github.com/iotexproject/iotex-core/blockchain/block" + "github.com/iotexproject/iotex-core/db/batch" + "github.com/iotexproject/iotex-core/pkg/util/abiutil" +) + +const ( + maxBlockNumber uint64 = math.MaxUint64 +) + +type eventHandler struct { + dirty *cache // dirty cache, a view for current block + delta batch.KVStoreBatch // delta for db to store buckets of current block + tokenOwner map[uint64]address.Address +} + +var ( + // TODO: fill in the ABI of staking contract + //go:embed staking.json + stakingContractJSONABI string + stakingContractABI abi.ABI + + // ErrBucketNotExist is the error when bucket does not exist + ErrBucketNotExist = errors.New("bucket does not exist") +) + +func init() { + var err error + stakingContractABI, err = abi.JSON(strings.NewReader(stakingContractJSONABI)) + if err != nil { + panic(err) + } +} + +func newEventHandler(dirty *cache) *eventHandler { + return &eventHandler{ + dirty: dirty, + delta: batch.NewBatch(), + } +} + +func (eh *eventHandler) HandleEvent(ctx context.Context, blk *block.Block, log *action.Log) error { + // get event abi + abiEvent, err := stakingContractABI.EventByID(common.Hash(log.Topics[0])) + if err != nil { + return errors.Wrapf(err, "get event abi from topic %v failed", log.Topics[0]) + } + + // unpack event data + event, err := abiutil.UnpackEventParam(abiEvent, log) + if err != nil { + return err + } + + // handle different kinds of event + switch abiEvent.Name { + case "Staked": + return eh.handleStakedEvent(event, blk.Height()) + case "Locked": + return eh.handleLockedEvent(event) + case "Unlocked": + return eh.handleUnlockedEvent(event, blk.Height()) + case "Unstaked": + return eh.handleUnstakedEvent(event, blk.Height()) + case "Merged": + return eh.handleMergedEvent(event) + case "BucketExpanded": + return eh.handleBucketExpandedEvent(event) + case "DelegateChanged": + return eh.handleDelegateChangedEvent(event) + case "Withdrawal": + return eh.handleWithdrawalEvent(event) + case "Donated": + return eh.handleDonatedEvent(event) + case "Transfer": + return eh.handleTransferEvent(event) + case "Approval", "ApprovalForAll", "OwnershipTransferred", "Paused", "Unpaused": + // not require handling events + return nil + default: + return errors.Errorf("unknown event name %s", abiEvent.Name) + } +} + +func (eh *eventHandler) handleStakedEvent(event abiutil.EventParam, height uint64) error { + tokenIDParam, err := event.IndexedFieldUint256("tokenId") + if err != nil { + return err + } + delegateParam, err := event.FieldAddress("delegate") + if err != nil { + return err + } + amountParam, err := event.FieldUint256("amount") + if err != nil { + return err + } + durationParam, err := event.FieldUint256("duration") + if err != nil { + return err + } + owner, ok := eh.tokenOwner[tokenIDParam.Uint64()] + if !ok { + return errors.Errorf("no owner for token id %d", tokenIDParam.Uint64()) + } + bucket := &Bucket{ + Candidate: delegateParam, + Owner: owner, + StakedAmount: amountParam, + StakedDurationBlockNumber: durationParam.Uint64(), + CreatedAt: height, + UnlockedAt: maxBlockNumber, + UnstakedAt: maxBlockNumber, + } + eh.dirty.PutBucket(tokenIDParam.Uint64(), bucket) + return nil +} + +func (eh *eventHandler) handleLockedEvent(event abiutil.EventParam) error { + tokenIDParam, err := event.IndexedFieldUint256("tokenId") + if err != nil { + return err + } + durationParam, err := event.FieldUint256("duration") + if err != nil { + return err + } + + bkt := eh.dirty.Bucket(tokenIDParam.Uint64()) + if bkt == nil { + return errors.Errorf("no bucket for token id %d", tokenIDParam.Uint64()) + } + bkt.StakedDurationBlockNumber = durationParam.Uint64() + bkt.UnlockedAt = maxBlockNumber + eh.dirty.PutBucket(tokenIDParam.Uint64(), bkt) + return nil +} + +func (eh *eventHandler) handleUnlockedEvent(event abiutil.EventParam, height uint64) error { + tokenIDParam, err := event.IndexedFieldUint256("tokenId") + if err != nil { + return err + } + + bkt := eh.dirty.Bucket(tokenIDParam.Uint64()) + if bkt == nil { + return errors.Errorf("no bucket for token id %d", tokenIDParam.Uint64()) + } + bkt.UnlockedAt = height + eh.dirty.PutBucket(tokenIDParam.Uint64(), bkt) + return nil +} + +func (eh *eventHandler) handleUnstakedEvent(event abiutil.EventParam, height uint64) error { + tokenIDParam, err := event.IndexedFieldUint256("tokenId") + if err != nil { + return err + } + + bkt := eh.dirty.Bucket(tokenIDParam.Uint64()) + if bkt == nil { + return errors.Errorf("no bucket for token id %d", tokenIDParam.Uint64()) + } + bkt.UnstakedAt = height + eh.dirty.PutBucket(tokenIDParam.Uint64(), bkt) + return nil +} + +func (eh *eventHandler) handleDelegateChangedEvent(event abiutil.EventParam) error { + tokenIDParam, err := event.IndexedFieldUint256("tokenId") + if err != nil { + return err + } + delegateParam, err := event.FieldAddress("newDelegate") + if err != nil { + return err + } + + bkt := eh.dirty.Bucket(tokenIDParam.Uint64()) + if bkt == nil { + return errors.Errorf("no bucket for token id %d", tokenIDParam.Uint64()) + } + bkt.Candidate = delegateParam + eh.dirty.PutBucket(tokenIDParam.Uint64(), bkt) + return nil +} + +func (eh *eventHandler) handleWithdrawalEvent(event abiutil.EventParam) error { + tokenIDParam, err := event.IndexedFieldUint256("tokenId") + if err != nil { + return err + } + + eh.dirty.DeleteBucket(tokenIDParam.Uint64()) + return nil +} + +func (eh *eventHandler) handleTransferEvent(event abiutil.EventParam) error { + to, err := event.IndexedFieldAddress("to") + if err != nil { + return err + } + tokenIDParam, err := event.IndexedFieldUint256("tokenId") + if err != nil { + return err + } + + tokenID := tokenIDParam.Uint64() + // cache token owner for stake event + eh.tokenOwner[tokenID] = to + // update bucket owner if token exists + bkt := eh.dirty.Bucket(tokenID) + if bkt != nil { + bkt.Owner = to + eh.dirty.PutBucket(tokenID, bkt) + } + return nil +} + +func (eh *eventHandler) handleMergedEvent(event abiutil.EventParam) error { + tokenIDsParam, err := event.FieldUint256Slice("tokenIds") + if err != nil { + return err + } + amountParam, err := event.FieldUint256("amount") + if err != nil { + return err + } + durationParam, err := event.FieldUint256("duration") + if err != nil { + return err + } + + // merge to the first bucket + b := eh.dirty.Bucket(tokenIDsParam[0].Uint64()) + if b == nil { + return errors.Wrapf(ErrBucketNotExist, "token id %d", tokenIDsParam[0].Uint64()) + } + b.StakedAmount = amountParam + b.StakedDurationBlockNumber = durationParam.Uint64() + b.UnlockedAt = maxBlockNumber + for i := 1; i < len(tokenIDsParam); i++ { + eh.dirty.DeleteBucket(tokenIDsParam[i].Uint64()) + } + eh.dirty.PutBucket(tokenIDsParam[0].Uint64(), b) + return nil +} + +func (eh *eventHandler) handleBucketExpandedEvent(event abiutil.EventParam) error { + tokenIDParam, err := event.IndexedFieldUint256("tokenId") + if err != nil { + return err + } + amountParam, err := event.FieldUint256("amount") + if err != nil { + return err + } + durationParam, err := event.FieldUint256("duration") + if err != nil { + return err + } + + b := eh.dirty.Bucket(tokenIDParam.Uint64()) + if b == nil { + return errors.Wrapf(ErrBucketNotExist, "token id %d", tokenIDParam.Uint64()) + } + b.StakedAmount = amountParam + b.StakedDurationBlockNumber = durationParam.Uint64() + eh.dirty.PutBucket(tokenIDParam.Uint64(), b) + return nil +} + +func (eh *eventHandler) handleDonatedEvent(event abiutil.EventParam) error { + tokenIDParam, err := event.IndexedFieldUint256("tokenId") + if err != nil { + return err + } + amountParam, err := event.FieldUint256("amount") + if err != nil { + return err + } + + b := eh.dirty.Bucket(tokenIDParam.Uint64()) + if b == nil { + return errors.Wrapf(ErrBucketNotExist, "token id %d", tokenIDParam.Uint64()) + } + b.StakedAmount.Sub(b.StakedAmount, amountParam) + eh.dirty.PutBucket(tokenIDParam.Uint64(), b) + return nil +} + +func (eh *eventHandler) Finalize() (batch.KVStoreBatch, *cache) { + delta, dirty := eh.delta, eh.dirty + eh.delta, eh.dirty = nil, nil + return delta, dirty +} diff --git a/systemcontractindex/stakingindex/index.go b/systemcontractindex/stakingindex/index.go new file mode 100644 index 0000000000..857abac39a --- /dev/null +++ b/systemcontractindex/stakingindex/index.go @@ -0,0 +1,202 @@ +package stakingindex + +import ( + "context" + "sync" + "time" + + "github.com/iotexproject/iotex-address/address" + "github.com/iotexproject/iotex-proto/golang/iotextypes" + "github.com/pkg/errors" + + "github.com/iotexproject/iotex-core/blockchain/block" + "github.com/iotexproject/iotex-core/db" + "github.com/iotexproject/iotex-core/systemcontractindex" +) + +const ( + stakingNS = "sns" + stakingBucketNS = "sbn" +) + +var ( + stakingHeightKey = []byte("shk") + stakingTotalBucketCountKey = []byte("stbck") +) + +type ( + // Indexer is the staking indexer + Indexer struct { + common *systemcontractindex.IndexerCommon + cache *cache // in-memory cache, used to query index data + mutex sync.RWMutex + blockInterval time.Duration + } +) + +// NewIndexer creates a new staking indexer +func NewIndexer(kvstore db.KVStore, contractAddr string, startHeight uint64, blockInterval time.Duration) *Indexer { + return &Indexer{ + common: systemcontractindex.NewIndexerCommon(kvstore, stakingNS, stakingHeightKey, contractAddr, startHeight), + cache: newCache(), + blockInterval: blockInterval, + } +} + +// Start starts the indexer +func (s *Indexer) Start(ctx context.Context) error { + if err := s.common.KVStore().Start(ctx); err != nil { + return err + } + return s.cache.Load(s.common.KVStore()) +} + +// Stop stops the indexer +func (s *Indexer) Stop(ctx context.Context) error { + return s.common.KVStore().Stop(ctx) +} + +// Height returns the tip block height +func (s *Indexer) Height() (uint64, error) { + s.mutex.RLock() + defer s.mutex.RUnlock() + return s.common.Height() +} + +// StartHeight returns the start height of the indexer +func (s *Indexer) StartHeight() uint64 { + return s.common.StartHeight() +} + +// Buckets returns the buckets +func (s *Indexer) Buckets(height uint64) ([]*VoteBucket, error) { + s.mutex.RLock() + defer s.mutex.RUnlock() + + if unstart, err := s.checkHeight(height); err != nil { + return nil, err + } else if unstart { + return nil, nil + } + idxs := s.cache.BucketIdxs() + bkts := s.cache.BucketsByIndices(idxs) + vbs := batchAssembleVoteBucket(idxs, bkts, s.common.ContractAddress(), s.blockInterval) + return vbs, nil +} + +// Bucket returns the bucket +func (s *Indexer) Bucket(id uint64, height uint64) (*VoteBucket, bool, error) { + s.mutex.RLock() + defer s.mutex.RUnlock() + + if unstart, err := s.checkHeight(height); err != nil { + return nil, false, err + } else if unstart { + return nil, false, nil + } + bkt := s.cache.Bucket(id) + if bkt == nil { + return nil, false, nil + } + vbs := assembleVoteBucket(id, bkt, s.common.ContractAddress(), s.blockInterval) + return vbs, true, nil +} + +// BucketsByIndices returns the buckets by indices +func (s *Indexer) BucketsByIndices(indices []uint64, height uint64) ([]*VoteBucket, error) { + s.mutex.RLock() + defer s.mutex.RUnlock() + + if unstart, err := s.checkHeight(height); err != nil { + return nil, err + } else if unstart { + return nil, nil + } + bkts := s.cache.BucketsByIndices(indices) + vbs := batchAssembleVoteBucket(indices, bkts, s.common.ContractAddress(), s.blockInterval) + return vbs, nil +} + +// BucketsByCandidate returns the buckets by candidate +func (s *Indexer) BucketsByCandidate(candidate address.Address, height uint64) ([]*VoteBucket, error) { + s.mutex.RLock() + defer s.mutex.RUnlock() + + if unstart, err := s.checkHeight(height); err != nil { + return nil, err + } else if unstart { + return nil, nil + } + idxs := s.cache.BucketIdxsByCandidate(candidate) + bkts := s.cache.BucketsByIndices(idxs) + vbs := batchAssembleVoteBucket(idxs, bkts, s.common.ContractAddress(), s.blockInterval) + return vbs, nil +} + +// TotalBucketCount returns the total bucket count including active and burnt buckets +func (s *Indexer) TotalBucketCount(height uint64) (uint64, error) { + s.mutex.RLock() + defer s.mutex.RUnlock() + + if unstart, err := s.checkHeight(height); err != nil { + return 0, err + } else if unstart { + return 0, nil + } + return s.cache.TotalBucketCount(), nil +} + +// PutBlock puts a block into indexer +func (s *Indexer) PutBlock(ctx context.Context, blk *block.Block) error { + // check block continuity + if existed, err := s.common.BlockContinuity(blk.Height()); err != nil { + return err + } else if existed { + return nil + } + // handle events of block + handler := newEventHandler(s.cache.Copy()) + for _, receipt := range blk.Receipts { + if receipt.Status != uint64(iotextypes.ReceiptStatus_Success) { + continue + } + for _, log := range receipt.Logs() { + if log.Address != s.common.ContractAddress() { + continue + } + if err := handler.HandleEvent(ctx, blk, log); err != nil { + return err + } + } + } + // commit + return s.commit(handler, blk.Height()) +} + +func (s *Indexer) commit(handler *eventHandler, height uint64) error { + delta, dirty := handler.Finalize() + s.mutex.Lock() + defer s.mutex.Unlock() + // update db + if err := s.common.KVStore().WriteBatch(delta); err != nil { + return err + } + s.common.PutHeight(height) + // update cache + s.cache = dirty + return nil +} + +func (s *Indexer) checkHeight(height uint64) (unstart bool, err error) { + if height < s.common.StartHeight() { + return true, nil + } + tipHeight, err := s.common.Height() + if err != nil { + return false, err + } + if height != tipHeight { + return false, errors.Errorf("invalid block height %d, expect %d", height, tipHeight) + } + return false, nil +} diff --git a/systemcontractindex/stakingindex/staking.json b/systemcontractindex/stakingindex/staking.json new file mode 100644 index 0000000000..0c2f798df6 --- /dev/null +++ b/systemcontractindex/stakingindex/staking.json @@ -0,0 +1,311 @@ +[ + { + "anonymous": false, + "inputs": [ + { + "indexed": true, + "internalType": "address", + "name": "owner", + "type": "address" + }, + { + "indexed": true, + "internalType": "address", + "name": "approved", + "type": "address" + }, + { + "indexed": true, + "internalType": "uint256", + "name": "tokenId", + "type": "uint256" + } + ], + "name": "Approval", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": true, + "internalType": "address", + "name": "owner", + "type": "address" + }, + { + "indexed": true, + "internalType": "address", + "name": "operator", + "type": "address" + }, + { + "indexed": false, + "internalType": "bool", + "name": "approved", + "type": "bool" + } + ], + "name": "ApprovalForAll", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": true, + "internalType": "uint256", + "name": "bucketId", + "type": "uint256" + }, + { + "indexed": false, + "internalType": "uint256", + "name": "amount", + "type": "uint256" + }, + { + "indexed": false, + "internalType": "uint256", + "name": "duration", + "type": "uint256" + } + ], + "name": "BucketExpanded", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": true, + "internalType": "uint256", + "name": "bucketId", + "type": "uint256" + }, + { + "indexed": false, + "internalType": "address", + "name": "newDelegate", + "type": "address" + } + ], + "name": "DelegateChanged", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": true, + "internalType": "uint256", + "name": "bucketId", + "type": "uint256" + }, + { + "indexed": true, + "internalType": "address", + "name": "beneficiary", + "type": "address" + }, + { + "indexed": false, + "internalType": "uint256", + "name": "amount", + "type": "uint256" + } + ], + "name": "Donated", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": true, + "internalType": "uint256", + "name": "bucketId", + "type": "uint256" + }, + { + "indexed": false, + "internalType": "uint256", + "name": "duration", + "type": "uint256" + } + ], + "name": "Locked", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": false, + "internalType": "uint256[]", + "name": "bucketIds", + "type": "uint256[]" + }, + { + "indexed": false, + "internalType": "uint256", + "name": "amount", + "type": "uint256" + }, + { + "indexed": false, + "internalType": "uint256", + "name": "duration", + "type": "uint256" + } + ], + "name": "Merged", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": true, + "internalType": "address", + "name": "previousOwner", + "type": "address" + }, + { + "indexed": true, + "internalType": "address", + "name": "newOwner", + "type": "address" + } + ], + "name": "OwnershipTransferred", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": false, + "internalType": "address", + "name": "account", + "type": "address" + } + ], + "name": "Paused", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": true, + "internalType": "uint256", + "name": "bucketId", + "type": "uint256" + }, + { + "indexed": false, + "internalType": "address", + "name": "delegate", + "type": "address" + }, + { + "indexed": false, + "internalType": "uint256", + "name": "amount", + "type": "uint256" + }, + { + "indexed": false, + "internalType": "uint256", + "name": "duration", + "type": "uint256" + } + ], + "name": "Staked", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": true, + "internalType": "address", + "name": "from", + "type": "address" + }, + { + "indexed": true, + "internalType": "address", + "name": "to", + "type": "address" + }, + { + "indexed": true, + "internalType": "uint256", + "name": "tokenId", + "type": "uint256" + } + ], + "name": "Transfer", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": true, + "internalType": "uint256", + "name": "bucketId", + "type": "uint256" + } + ], + "name": "Unlocked", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": false, + "internalType": "address", + "name": "account", + "type": "address" + } + ], + "name": "Unpaused", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": true, + "internalType": "uint256", + "name": "bucketId", + "type": "uint256" + } + ], + "name": "Unstaked", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": true, + "internalType": "uint256", + "name": "bucketId", + "type": "uint256" + }, + { + "indexed": true, + "internalType": "address", + "name": "recipient", + "type": "address" + } + ], + "name": "Withdrawal", + "type": "event" + } +] \ No newline at end of file diff --git a/systemcontractindex/stakingindex/stakingpb/staking.proto b/systemcontractindex/stakingindex/stakingpb/staking.proto new file mode 100644 index 0000000000..454335577c --- /dev/null +++ b/systemcontractindex/stakingindex/stakingpb/staking.proto @@ -0,0 +1,20 @@ +// Copyright (c) 2019 IoTeX +// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability +// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed. +// This source code is governed by Apache License 2.0 that can be found in the LICENSE file. + +// To compile the proto, run: +// protoc --go_out=plugins=grpc:. *.proto +syntax = "proto3"; +package contractstakingpb; +option go_package = "github.com/iotexproject/iotex-core/systemcontractindex/stakingindex/stakingpb"; + +message Bucket { + string candidate = 1; + string owner = 2; + string amount = 3; + uint64 duration = 4; + uint64 createdAt = 5; + uint64 unlockedAt = 6; + uint64 unstakedAt = 7; +} \ No newline at end of file diff --git a/systemcontractindex/stakingindex/stakingpb/stakingpb.go b/systemcontractindex/stakingindex/stakingpb/stakingpb.go new file mode 100644 index 0000000000..030b21c2cb --- /dev/null +++ b/systemcontractindex/stakingindex/stakingpb/stakingpb.go @@ -0,0 +1,216 @@ +// Copyright (c) 2019 IoTeX +// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability +// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed. +// This source code is governed by Apache License 2.0 that can be found in the LICENSE file. + +// To compile the proto, run: +// protoc --go_out=plugins=grpc:. *.proto + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.26.0 +// protoc v4.23.3 +// source: systemcontractindex/stakingindex/stakingpb/staking.proto + +package stakingpb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Bucket struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Candidate string `protobuf:"bytes,1,opt,name=candidate,proto3" json:"candidate,omitempty"` + Owner string `protobuf:"bytes,2,opt,name=owner,proto3" json:"owner,omitempty"` + Amount string `protobuf:"bytes,3,opt,name=amount,proto3" json:"amount,omitempty"` + Duration uint64 `protobuf:"varint,4,opt,name=duration,proto3" json:"duration,omitempty"` + CreatedAt uint64 `protobuf:"varint,5,opt,name=createdAt,proto3" json:"createdAt,omitempty"` + UnlockedAt uint64 `protobuf:"varint,6,opt,name=unlockedAt,proto3" json:"unlockedAt,omitempty"` + UnstakedAt uint64 `protobuf:"varint,7,opt,name=unstakedAt,proto3" json:"unstakedAt,omitempty"` +} + +func (x *Bucket) Reset() { + *x = Bucket{} + if protoimpl.UnsafeEnabled { + mi := &file_systemcontractindex_stakingindex_stakingpb_staking_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Bucket) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Bucket) ProtoMessage() {} + +func (x *Bucket) ProtoReflect() protoreflect.Message { + mi := &file_systemcontractindex_stakingindex_stakingpb_staking_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Bucket.ProtoReflect.Descriptor instead. +func (*Bucket) Descriptor() ([]byte, []int) { + return file_systemcontractindex_stakingindex_stakingpb_staking_proto_rawDescGZIP(), []int{0} +} + +func (x *Bucket) GetCandidate() string { + if x != nil { + return x.Candidate + } + return "" +} + +func (x *Bucket) GetOwner() string { + if x != nil { + return x.Owner + } + return "" +} + +func (x *Bucket) GetAmount() string { + if x != nil { + return x.Amount + } + return "" +} + +func (x *Bucket) GetDuration() uint64 { + if x != nil { + return x.Duration + } + return 0 +} + +func (x *Bucket) GetCreatedAt() uint64 { + if x != nil { + return x.CreatedAt + } + return 0 +} + +func (x *Bucket) GetUnlockedAt() uint64 { + if x != nil { + return x.UnlockedAt + } + return 0 +} + +func (x *Bucket) GetUnstakedAt() uint64 { + if x != nil { + return x.UnstakedAt + } + return 0 +} + +var File_systemcontractindex_stakingindex_stakingpb_staking_proto protoreflect.FileDescriptor + +var file_systemcontractindex_stakingindex_stakingpb_staking_proto_rawDesc = []byte{ + 0x0a, 0x38, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, + 0x69, 0x6e, 0x64, 0x65, 0x78, 0x2f, 0x73, 0x74, 0x61, 0x6b, 0x69, 0x6e, 0x67, 0x69, 0x6e, 0x64, + 0x65, 0x78, 0x2f, 0x73, 0x74, 0x61, 0x6b, 0x69, 0x6e, 0x67, 0x70, 0x62, 0x2f, 0x73, 0x74, 0x61, + 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x11, 0x63, 0x6f, 0x6e, 0x74, + 0x72, 0x61, 0x63, 0x74, 0x73, 0x74, 0x61, 0x6b, 0x69, 0x6e, 0x67, 0x70, 0x62, 0x22, 0xce, 0x01, + 0x0a, 0x06, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x63, 0x61, 0x6e, 0x64, + 0x69, 0x64, 0x61, 0x74, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x61, 0x6e, + 0x64, 0x69, 0x64, 0x61, 0x74, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x12, 0x16, 0x0a, 0x06, + 0x61, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x61, 0x6d, + 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x12, 0x1c, 0x0a, 0x09, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x18, 0x05, 0x20, + 0x01, 0x28, 0x04, 0x52, 0x09, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x12, 0x1e, + 0x0a, 0x0a, 0x75, 0x6e, 0x6c, 0x6f, 0x63, 0x6b, 0x65, 0x64, 0x41, 0x74, 0x18, 0x06, 0x20, 0x01, + 0x28, 0x04, 0x52, 0x0a, 0x75, 0x6e, 0x6c, 0x6f, 0x63, 0x6b, 0x65, 0x64, 0x41, 0x74, 0x12, 0x1e, + 0x0a, 0x0a, 0x75, 0x6e, 0x73, 0x74, 0x61, 0x6b, 0x65, 0x64, 0x41, 0x74, 0x18, 0x07, 0x20, 0x01, + 0x28, 0x04, 0x52, 0x0a, 0x75, 0x6e, 0x73, 0x74, 0x61, 0x6b, 0x65, 0x64, 0x41, 0x74, 0x42, 0x4f, + 0x5a, 0x4d, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x69, 0x6f, 0x74, + 0x65, 0x78, 0x70, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x2f, 0x69, 0x6f, 0x74, 0x65, 0x78, 0x2d, + 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x63, 0x6f, 0x6e, 0x74, 0x72, + 0x61, 0x63, 0x74, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x2f, 0x73, 0x74, 0x61, 0x6b, 0x69, 0x6e, 0x67, + 0x69, 0x6e, 0x64, 0x65, 0x78, 0x2f, 0x73, 0x74, 0x61, 0x6b, 0x69, 0x6e, 0x67, 0x70, 0x62, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_systemcontractindex_stakingindex_stakingpb_staking_proto_rawDescOnce sync.Once + file_systemcontractindex_stakingindex_stakingpb_staking_proto_rawDescData = file_systemcontractindex_stakingindex_stakingpb_staking_proto_rawDesc +) + +func file_systemcontractindex_stakingindex_stakingpb_staking_proto_rawDescGZIP() []byte { + file_systemcontractindex_stakingindex_stakingpb_staking_proto_rawDescOnce.Do(func() { + file_systemcontractindex_stakingindex_stakingpb_staking_proto_rawDescData = protoimpl.X.CompressGZIP(file_systemcontractindex_stakingindex_stakingpb_staking_proto_rawDescData) + }) + return file_systemcontractindex_stakingindex_stakingpb_staking_proto_rawDescData +} + +var file_systemcontractindex_stakingindex_stakingpb_staking_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_systemcontractindex_stakingindex_stakingpb_staking_proto_goTypes = []interface{}{ + (*Bucket)(nil), // 0: contractstakingpb.Bucket +} +var file_systemcontractindex_stakingindex_stakingpb_staking_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_systemcontractindex_stakingindex_stakingpb_staking_proto_init() } +func file_systemcontractindex_stakingindex_stakingpb_staking_proto_init() { + if File_systemcontractindex_stakingindex_stakingpb_staking_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_systemcontractindex_stakingindex_stakingpb_staking_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Bucket); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_systemcontractindex_stakingindex_stakingpb_staking_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_systemcontractindex_stakingindex_stakingpb_staking_proto_goTypes, + DependencyIndexes: file_systemcontractindex_stakingindex_stakingpb_staking_proto_depIdxs, + MessageInfos: file_systemcontractindex_stakingindex_stakingpb_staking_proto_msgTypes, + }.Build() + File_systemcontractindex_stakingindex_stakingpb_staking_proto = out.File + file_systemcontractindex_stakingindex_stakingpb_staking_proto_rawDesc = nil + file_systemcontractindex_stakingindex_stakingpb_staking_proto_goTypes = nil + file_systemcontractindex_stakingindex_stakingpb_staking_proto_depIdxs = nil +} \ No newline at end of file From cc94de3cf65e15331a0468c2534c0076c9e7699f Mon Sep 17 00:00:00 2001 From: envestcc Date: Tue, 23 Apr 2024 08:24:47 +0800 Subject: [PATCH 04/10] fix event handle and put height bug --- systemcontractindex/common.go | 15 ++-- .../stakingindex/event_handler.go | 72 +++++++++++-------- systemcontractindex/stakingindex/index.go | 3 +- 3 files changed, 54 insertions(+), 36 deletions(-) diff --git a/systemcontractindex/common.go b/systemcontractindex/common.go index 6f6fe12894..8a1fa47127 100644 --- a/systemcontractindex/common.go +++ b/systemcontractindex/common.go @@ -4,6 +4,7 @@ import ( "github.com/pkg/errors" "github.com/iotexproject/iotex-core/db" + "github.com/iotexproject/iotex-core/db/batch" "github.com/iotexproject/iotex-core/pkg/util/byteutil" ) @@ -57,21 +58,25 @@ func (s *IndexerCommon) Height() (uint64, error) { func (s *IndexerCommon) StartHeight() uint64 { return s.startHeight } // PutHeight puts the tip block height -func (s *IndexerCommon) PutHeight(height uint64) error { - return s.kvstore.Put(s.ns, s.key, byteutil.Uint64ToBytesBigEndian(height)) +func (s *IndexerCommon) Commit(height uint64, delta batch.KVStoreBatch) error { + delta.Put(s.ns, s.key, byteutil.Uint64ToBytesBigEndian(height), "failed to put height") + return s.kvstore.WriteBatch(delta) } // BlockContinuity checks the block continuity func (s *IndexerCommon) BlockContinuity(height uint64) (existed bool, err error) { - expectHeight, err := s.Height() + tipHeight, err := s.Height() if err != nil { return false, err } + expectHeight := tipHeight + 1 if expectHeight < s.startHeight { expectHeight = s.startHeight } - if expectHeight >= height { - return expectHeight > height, nil + if expectHeight == height { + return false, nil + } else if expectHeight > height { + return true, nil } return false, errors.Errorf("invalid block height %d, expect %d", height, expectHeight) } diff --git a/systemcontractindex/stakingindex/event_handler.go b/systemcontractindex/stakingindex/event_handler.go index 7ac797cf4c..bd9ff808f6 100644 --- a/systemcontractindex/stakingindex/event_handler.go +++ b/systemcontractindex/stakingindex/event_handler.go @@ -9,13 +9,16 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" "github.com/pkg/errors" + "go.uber.org/zap" "github.com/iotexproject/iotex-address/address" "github.com/iotexproject/iotex-core/action" "github.com/iotexproject/iotex-core/blockchain/block" "github.com/iotexproject/iotex-core/db/batch" + "github.com/iotexproject/iotex-core/pkg/log" "github.com/iotexproject/iotex-core/pkg/util/abiutil" + "github.com/iotexproject/iotex-core/pkg/util/byteutil" ) const ( @@ -31,7 +34,7 @@ type eventHandler struct { var ( // TODO: fill in the ABI of staking contract //go:embed staking.json - stakingContractJSONABI string + StakingContractJSONABI string stakingContractABI abi.ABI // ErrBucketNotExist is the error when bucket does not exist @@ -40,7 +43,7 @@ var ( func init() { var err error - stakingContractABI, err = abi.JSON(strings.NewReader(stakingContractJSONABI)) + stakingContractABI, err = abi.JSON(strings.NewReader(StakingContractJSONABI)) if err != nil { panic(err) } @@ -48,24 +51,25 @@ func init() { func newEventHandler(dirty *cache) *eventHandler { return &eventHandler{ - dirty: dirty, - delta: batch.NewBatch(), + dirty: dirty, + delta: batch.NewBatch(), + tokenOwner: make(map[uint64]address.Address), } } -func (eh *eventHandler) HandleEvent(ctx context.Context, blk *block.Block, log *action.Log) error { +func (eh *eventHandler) HandleEvent(ctx context.Context, blk *block.Block, actLog *action.Log) error { // get event abi - abiEvent, err := stakingContractABI.EventByID(common.Hash(log.Topics[0])) + abiEvent, err := stakingContractABI.EventByID(common.Hash(actLog.Topics[0])) if err != nil { - return errors.Wrapf(err, "get event abi from topic %v failed", log.Topics[0]) + return errors.Wrapf(err, "get event abi from topic %v failed", actLog.Topics[0]) } // unpack event data - event, err := abiutil.UnpackEventParam(abiEvent, log) + event, err := abiutil.UnpackEventParam(abiEvent, actLog) if err != nil { return err } - + log.L().Info("handle staking event", zap.String("event", abiEvent.Name), zap.Any("event", event)) // handle different kinds of event switch abiEvent.Name { case "Staked": @@ -97,7 +101,7 @@ func (eh *eventHandler) HandleEvent(ctx context.Context, blk *block.Block, log * } func (eh *eventHandler) handleStakedEvent(event abiutil.EventParam, height uint64) error { - tokenIDParam, err := event.IndexedFieldUint256("tokenId") + tokenIDParam, err := event.IndexedFieldUint256("bucketId") if err != nil { return err } @@ -126,12 +130,12 @@ func (eh *eventHandler) handleStakedEvent(event abiutil.EventParam, height uint6 UnlockedAt: maxBlockNumber, UnstakedAt: maxBlockNumber, } - eh.dirty.PutBucket(tokenIDParam.Uint64(), bucket) + eh.putBucket(tokenIDParam.Uint64(), bucket) return nil } func (eh *eventHandler) handleLockedEvent(event abiutil.EventParam) error { - tokenIDParam, err := event.IndexedFieldUint256("tokenId") + tokenIDParam, err := event.IndexedFieldUint256("bucketId") if err != nil { return err } @@ -146,12 +150,12 @@ func (eh *eventHandler) handleLockedEvent(event abiutil.EventParam) error { } bkt.StakedDurationBlockNumber = durationParam.Uint64() bkt.UnlockedAt = maxBlockNumber - eh.dirty.PutBucket(tokenIDParam.Uint64(), bkt) + eh.putBucket(tokenIDParam.Uint64(), bkt) return nil } func (eh *eventHandler) handleUnlockedEvent(event abiutil.EventParam, height uint64) error { - tokenIDParam, err := event.IndexedFieldUint256("tokenId") + tokenIDParam, err := event.IndexedFieldUint256("bucketId") if err != nil { return err } @@ -161,12 +165,12 @@ func (eh *eventHandler) handleUnlockedEvent(event abiutil.EventParam, height uin return errors.Errorf("no bucket for token id %d", tokenIDParam.Uint64()) } bkt.UnlockedAt = height - eh.dirty.PutBucket(tokenIDParam.Uint64(), bkt) + eh.putBucket(tokenIDParam.Uint64(), bkt) return nil } func (eh *eventHandler) handleUnstakedEvent(event abiutil.EventParam, height uint64) error { - tokenIDParam, err := event.IndexedFieldUint256("tokenId") + tokenIDParam, err := event.IndexedFieldUint256("bucketId") if err != nil { return err } @@ -176,12 +180,12 @@ func (eh *eventHandler) handleUnstakedEvent(event abiutil.EventParam, height uin return errors.Errorf("no bucket for token id %d", tokenIDParam.Uint64()) } bkt.UnstakedAt = height - eh.dirty.PutBucket(tokenIDParam.Uint64(), bkt) + eh.putBucket(tokenIDParam.Uint64(), bkt) return nil } func (eh *eventHandler) handleDelegateChangedEvent(event abiutil.EventParam) error { - tokenIDParam, err := event.IndexedFieldUint256("tokenId") + tokenIDParam, err := event.IndexedFieldUint256("bucketId") if err != nil { return err } @@ -195,17 +199,17 @@ func (eh *eventHandler) handleDelegateChangedEvent(event abiutil.EventParam) err return errors.Errorf("no bucket for token id %d", tokenIDParam.Uint64()) } bkt.Candidate = delegateParam - eh.dirty.PutBucket(tokenIDParam.Uint64(), bkt) + eh.putBucket(tokenIDParam.Uint64(), bkt) return nil } func (eh *eventHandler) handleWithdrawalEvent(event abiutil.EventParam) error { - tokenIDParam, err := event.IndexedFieldUint256("tokenId") + tokenIDParam, err := event.IndexedFieldUint256("bucketId") if err != nil { return err } - eh.dirty.DeleteBucket(tokenIDParam.Uint64()) + eh.delBucket(tokenIDParam.Uint64()) return nil } @@ -226,13 +230,13 @@ func (eh *eventHandler) handleTransferEvent(event abiutil.EventParam) error { bkt := eh.dirty.Bucket(tokenID) if bkt != nil { bkt.Owner = to - eh.dirty.PutBucket(tokenID, bkt) + eh.putBucket(tokenID, bkt) } return nil } func (eh *eventHandler) handleMergedEvent(event abiutil.EventParam) error { - tokenIDsParam, err := event.FieldUint256Slice("tokenIds") + tokenIDsParam, err := event.FieldUint256Slice("bucketIds") if err != nil { return err } @@ -254,14 +258,14 @@ func (eh *eventHandler) handleMergedEvent(event abiutil.EventParam) error { b.StakedDurationBlockNumber = durationParam.Uint64() b.UnlockedAt = maxBlockNumber for i := 1; i < len(tokenIDsParam); i++ { - eh.dirty.DeleteBucket(tokenIDsParam[i].Uint64()) + eh.delBucket(tokenIDsParam[i].Uint64()) } - eh.dirty.PutBucket(tokenIDsParam[0].Uint64(), b) + eh.putBucket(tokenIDsParam[0].Uint64(), b) return nil } func (eh *eventHandler) handleBucketExpandedEvent(event abiutil.EventParam) error { - tokenIDParam, err := event.IndexedFieldUint256("tokenId") + tokenIDParam, err := event.IndexedFieldUint256("bucketId") if err != nil { return err } @@ -280,12 +284,12 @@ func (eh *eventHandler) handleBucketExpandedEvent(event abiutil.EventParam) erro } b.StakedAmount = amountParam b.StakedDurationBlockNumber = durationParam.Uint64() - eh.dirty.PutBucket(tokenIDParam.Uint64(), b) + eh.putBucket(tokenIDParam.Uint64(), b) return nil } func (eh *eventHandler) handleDonatedEvent(event abiutil.EventParam) error { - tokenIDParam, err := event.IndexedFieldUint256("tokenId") + tokenIDParam, err := event.IndexedFieldUint256("bucketId") if err != nil { return err } @@ -299,7 +303,7 @@ func (eh *eventHandler) handleDonatedEvent(event abiutil.EventParam) error { return errors.Wrapf(ErrBucketNotExist, "token id %d", tokenIDParam.Uint64()) } b.StakedAmount.Sub(b.StakedAmount, amountParam) - eh.dirty.PutBucket(tokenIDParam.Uint64(), b) + eh.putBucket(tokenIDParam.Uint64(), b) return nil } @@ -308,3 +312,13 @@ func (eh *eventHandler) Finalize() (batch.KVStoreBatch, *cache) { eh.delta, eh.dirty = nil, nil return delta, dirty } + +func (eh *eventHandler) putBucket(id uint64, bkt *Bucket) { + eh.dirty.PutBucket(id, bkt) + eh.delta.Put(stakingBucketNS, byteutil.Uint64ToBytesBigEndian(id), bkt.Serialize(), "failed to put bucket") +} + +func (eh *eventHandler) delBucket(id uint64) { + eh.dirty.DeleteBucket(id) + eh.delta.Delete(stakingBucketNS, byteutil.Uint64ToBytesBigEndian(id), "failed to put bucket") +} diff --git a/systemcontractindex/stakingindex/index.go b/systemcontractindex/stakingindex/index.go index 857abac39a..24bfc88a4f 100644 --- a/systemcontractindex/stakingindex/index.go +++ b/systemcontractindex/stakingindex/index.go @@ -178,10 +178,9 @@ func (s *Indexer) commit(handler *eventHandler, height uint64) error { s.mutex.Lock() defer s.mutex.Unlock() // update db - if err := s.common.KVStore().WriteBatch(delta); err != nil { + if err := s.common.Commit(height, delta); err != nil { return err } - s.common.PutHeight(height) // update cache s.cache = dirty return nil From f39909e578ba78bcfcd12a71536b8eb1d10daea5 Mon Sep 17 00:00:00 2001 From: envestcc Date: Wed, 24 Apr 2024 08:35:43 +0800 Subject: [PATCH 05/10] fix indexer bug in height and buckets --- systemcontractindex/stakingindex/cache.go | 4 ++-- systemcontractindex/stakingindex/index.go | 6 +++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/systemcontractindex/stakingindex/cache.go b/systemcontractindex/stakingindex/cache.go index a83c796f12..5f5f11a49a 100644 --- a/systemcontractindex/stakingindex/cache.go +++ b/systemcontractindex/stakingindex/cache.go @@ -110,7 +110,7 @@ func (s *cache) BucketIdxs() []uint64 { func (s *cache) Bucket(id uint64) *Bucket { if bkt, ok := s.buckets[id]; ok { - return bkt + return bkt.Clone() } return nil } @@ -119,7 +119,7 @@ func (s *cache) BucketsByIndices(indices []uint64) []*Bucket { buckets := make([]*Bucket, 0, len(indices)) for _, idx := range indices { if bkt, ok := s.buckets[idx]; ok { - buckets = append(buckets, bkt) + buckets = append(buckets, bkt.Clone()) } } return buckets diff --git a/systemcontractindex/stakingindex/index.go b/systemcontractindex/stakingindex/index.go index 24bfc88a4f..058448f0c1 100644 --- a/systemcontractindex/stakingindex/index.go +++ b/systemcontractindex/stakingindex/index.go @@ -190,11 +190,15 @@ func (s *Indexer) checkHeight(height uint64) (unstart bool, err error) { if height < s.common.StartHeight() { return true, nil } + // means latest height + if height == 0 { + return false, nil + } tipHeight, err := s.common.Height() if err != nil { return false, err } - if height != tipHeight { + if height > tipHeight { return false, errors.Errorf("invalid block height %d, expect %d", height, tipHeight) } return false, nil From cc60c07fee8eff0e58fde61acb5c5787323e00f1 Mon Sep 17 00:00:00 2001 From: envestcc Date: Wed, 24 Apr 2024 11:41:02 +0800 Subject: [PATCH 06/10] parse event param by id --- pkg/util/abiutil/param.go | 85 +++++++++++++++---- .../stakingindex/event_handler.go | 62 +++++++------- 2 files changed, 100 insertions(+), 47 deletions(-) diff --git a/pkg/util/abiutil/param.go b/pkg/util/abiutil/param.go index 6de100f22c..a83fbe8e9d 100644 --- a/pkg/util/abiutil/param.go +++ b/pkg/util/abiutil/param.go @@ -1,6 +1,7 @@ package abiutil import ( + "fmt" "math/big" "github.com/ethereum/go-ethereum/accounts/abi" @@ -14,7 +15,10 @@ import ( type ( // EventParam is a struct to hold smart contract event parameters, which can easily convert a param to go type - EventParam map[string]any + EventParam struct { + params []any + nameToIndex map[string]int + } ) var ( @@ -24,21 +28,57 @@ var ( // EventField is a helper function to get a field from event param func EventField[T any](e EventParam, name string) (T, error) { - field, ok := e[name].(T) + id, ok := e.nameToIndex[name] if !ok { - return field, errors.Wrapf(ErrInvlidEventParam, "field %s got %#v, expect %T", name, e[name], field) + var zeroValue T + return zeroValue, errors.Wrapf(ErrInvlidEventParam, "field %s not found", name) + } + return EventFieldByID[T](e, id) +} + +// EventFieldByID is a helper function to get a field from event param +func EventFieldByID[T any](e EventParam, id int) (T, error) { + field, ok := e.fieldByID(id).(T) + if !ok { + return field, errors.Wrapf(ErrInvlidEventParam, "field %d got %#v, expect %T", id, e.fieldByID(id), field) } return field, nil } +func (e EventParam) field(name string) any { + return e.params[e.nameToIndex[name]] +} + +func (e EventParam) fieldByID(id int) any { + return e.params[id] +} + +func (e EventParam) String() string { + return fmt.Sprintf("%+v", e.params) +} + // FieldUint256 is a helper function to get a uint256 field from event param func (e EventParam) FieldUint256(name string) (*big.Int, error) { return EventField[*big.Int](e, name) } +// FieldByIDUint256 is a helper function to get a uint256 field from event param +func (e EventParam) FieldByIDUint256(id int) (*big.Int, error) { + return EventFieldByID[*big.Int](e, id) +} + // FieldBytes12 is a helper function to get a bytes12 field from event param func (e EventParam) FieldBytes12(name string) (string, error) { - data, err := EventField[[12]byte](e, name) + id, ok := e.nameToIndex[name] + if !ok { + return "", errors.Wrapf(ErrInvlidEventParam, "field %s not found", name) + } + return e.FieldByIDBytes12(id) +} + +// FieldByIDBytes12 is a helper function to get a bytes12 field from event param +func (e EventParam) FieldByIDBytes12(id int) (string, error) { + data, err := EventFieldByID[[12]byte](e, id) if err != nil { return "", err } @@ -54,6 +94,11 @@ func (e EventParam) FieldUint256Slice(name string) ([]*big.Int, error) { return EventField[[]*big.Int](e, name) } +// FieldByIDUint256Slice is a helper function to get a uint256 slice field from event param +func (e EventParam) FieldByIDUint256Slice(id int) ([]*big.Int, error) { + return EventFieldByID[[]*big.Int](e, id) +} + // FieldAddress is a helper function to get an address field from event param func (e EventParam) FieldAddress(name string) (address.Address, error) { commAddr, err := EventField[common.Address](e, name) @@ -63,22 +108,21 @@ func (e EventParam) FieldAddress(name string) (address.Address, error) { return address.FromBytes(commAddr.Bytes()) } -// IndexedFieldAddress is a helper function to get an indexed address field from event param -func (e EventParam) IndexedFieldAddress(name string) (address.Address, error) { - return e.FieldAddress(name) -} - -// IndexedFieldUint256 is a helper function to get an indexed uint256 field from event param -func (e EventParam) IndexedFieldUint256(name string) (*big.Int, error) { - return EventField[*big.Int](e, name) +// FieldByIDAddress is a helper function to get an address field from event param +func (e EventParam) FieldByIDAddress(id int) (address.Address, error) { + commAddr, err := EventFieldByID[common.Address](e, id) + if err != nil { + return nil, err + } + return address.FromBytes(commAddr.Bytes()) } // UnpackEventParam is a helper function to unpack event parameters -func UnpackEventParam(abiEvent *abi.Event, log *action.Log) (EventParam, error) { - event := make(EventParam) +func UnpackEventParam(abiEvent *abi.Event, log *action.Log) (*EventParam, error) { // unpack non-indexed fields + params := make(map[string]any) if len(log.Data) > 0 { - if err := abiEvent.Inputs.UnpackIntoMap(event, log.Data); err != nil { + if err := abiEvent.Inputs.UnpackIntoMap(params, log.Data); err != nil { return nil, errors.Wrap(err, "unpack event data failed") } } @@ -95,9 +139,18 @@ func UnpackEventParam(abiEvent *abi.Event, log *action.Log) (EventParam, error) topics = append(topics, common.Hash(topic)) } } - err := abi.ParseTopicsIntoMap(event, args, topics) + err := abi.ParseTopicsIntoMap(params, args, topics) if err != nil { return nil, errors.Wrap(err, "unpack event indexed fields failed") } + // create event param + event := &EventParam{ + params: make([]any, 0, len(abiEvent.Inputs)), + nameToIndex: make(map[string]int), + } + for i, arg := range abiEvent.Inputs { + event.params = append(event.params, params[arg.Name]) + event.nameToIndex[arg.Name] = i + } return event, nil } diff --git a/systemcontractindex/stakingindex/event_handler.go b/systemcontractindex/stakingindex/event_handler.go index bd9ff808f6..25766288f2 100644 --- a/systemcontractindex/stakingindex/event_handler.go +++ b/systemcontractindex/stakingindex/event_handler.go @@ -100,20 +100,20 @@ func (eh *eventHandler) HandleEvent(ctx context.Context, blk *block.Block, actLo } } -func (eh *eventHandler) handleStakedEvent(event abiutil.EventParam, height uint64) error { - tokenIDParam, err := event.IndexedFieldUint256("bucketId") +func (eh *eventHandler) handleStakedEvent(event *abiutil.EventParam, height uint64) error { + tokenIDParam, err := event.FieldByIDUint256(0) if err != nil { return err } - delegateParam, err := event.FieldAddress("delegate") + delegateParam, err := event.FieldByIDAddress(1) if err != nil { return err } - amountParam, err := event.FieldUint256("amount") + amountParam, err := event.FieldByIDUint256(2) if err != nil { return err } - durationParam, err := event.FieldUint256("duration") + durationParam, err := event.FieldByIDUint256(3) if err != nil { return err } @@ -134,12 +134,12 @@ func (eh *eventHandler) handleStakedEvent(event abiutil.EventParam, height uint6 return nil } -func (eh *eventHandler) handleLockedEvent(event abiutil.EventParam) error { - tokenIDParam, err := event.IndexedFieldUint256("bucketId") +func (eh *eventHandler) handleLockedEvent(event *abiutil.EventParam) error { + tokenIDParam, err := event.FieldByIDUint256(0) if err != nil { return err } - durationParam, err := event.FieldUint256("duration") + durationParam, err := event.FieldByIDUint256(1) if err != nil { return err } @@ -154,8 +154,8 @@ func (eh *eventHandler) handleLockedEvent(event abiutil.EventParam) error { return nil } -func (eh *eventHandler) handleUnlockedEvent(event abiutil.EventParam, height uint64) error { - tokenIDParam, err := event.IndexedFieldUint256("bucketId") +func (eh *eventHandler) handleUnlockedEvent(event *abiutil.EventParam, height uint64) error { + tokenIDParam, err := event.FieldByIDUint256(0) if err != nil { return err } @@ -169,8 +169,8 @@ func (eh *eventHandler) handleUnlockedEvent(event abiutil.EventParam, height uin return nil } -func (eh *eventHandler) handleUnstakedEvent(event abiutil.EventParam, height uint64) error { - tokenIDParam, err := event.IndexedFieldUint256("bucketId") +func (eh *eventHandler) handleUnstakedEvent(event *abiutil.EventParam, height uint64) error { + tokenIDParam, err := event.FieldByIDUint256(0) if err != nil { return err } @@ -184,12 +184,12 @@ func (eh *eventHandler) handleUnstakedEvent(event abiutil.EventParam, height uin return nil } -func (eh *eventHandler) handleDelegateChangedEvent(event abiutil.EventParam) error { - tokenIDParam, err := event.IndexedFieldUint256("bucketId") +func (eh *eventHandler) handleDelegateChangedEvent(event *abiutil.EventParam) error { + tokenIDParam, err := event.FieldByIDUint256(0) if err != nil { return err } - delegateParam, err := event.FieldAddress("newDelegate") + delegateParam, err := event.FieldByIDAddress(1) if err != nil { return err } @@ -203,8 +203,8 @@ func (eh *eventHandler) handleDelegateChangedEvent(event abiutil.EventParam) err return nil } -func (eh *eventHandler) handleWithdrawalEvent(event abiutil.EventParam) error { - tokenIDParam, err := event.IndexedFieldUint256("bucketId") +func (eh *eventHandler) handleWithdrawalEvent(event *abiutil.EventParam) error { + tokenIDParam, err := event.FieldByIDUint256(0) if err != nil { return err } @@ -213,12 +213,12 @@ func (eh *eventHandler) handleWithdrawalEvent(event abiutil.EventParam) error { return nil } -func (eh *eventHandler) handleTransferEvent(event abiutil.EventParam) error { - to, err := event.IndexedFieldAddress("to") +func (eh *eventHandler) handleTransferEvent(event *abiutil.EventParam) error { + to, err := event.FieldByIDAddress(1) if err != nil { return err } - tokenIDParam, err := event.IndexedFieldUint256("tokenId") + tokenIDParam, err := event.FieldByIDUint256(2) if err != nil { return err } @@ -235,16 +235,16 @@ func (eh *eventHandler) handleTransferEvent(event abiutil.EventParam) error { return nil } -func (eh *eventHandler) handleMergedEvent(event abiutil.EventParam) error { - tokenIDsParam, err := event.FieldUint256Slice("bucketIds") +func (eh *eventHandler) handleMergedEvent(event *abiutil.EventParam) error { + tokenIDsParam, err := event.FieldByIDUint256Slice(0) if err != nil { return err } - amountParam, err := event.FieldUint256("amount") + amountParam, err := event.FieldByIDUint256(1) if err != nil { return err } - durationParam, err := event.FieldUint256("duration") + durationParam, err := event.FieldByIDUint256(2) if err != nil { return err } @@ -264,16 +264,16 @@ func (eh *eventHandler) handleMergedEvent(event abiutil.EventParam) error { return nil } -func (eh *eventHandler) handleBucketExpandedEvent(event abiutil.EventParam) error { - tokenIDParam, err := event.IndexedFieldUint256("bucketId") +func (eh *eventHandler) handleBucketExpandedEvent(event *abiutil.EventParam) error { + tokenIDParam, err := event.FieldByIDUint256(0) if err != nil { return err } - amountParam, err := event.FieldUint256("amount") + amountParam, err := event.FieldByIDUint256(1) if err != nil { return err } - durationParam, err := event.FieldUint256("duration") + durationParam, err := event.FieldByIDUint256(2) if err != nil { return err } @@ -288,12 +288,12 @@ func (eh *eventHandler) handleBucketExpandedEvent(event abiutil.EventParam) erro return nil } -func (eh *eventHandler) handleDonatedEvent(event abiutil.EventParam) error { - tokenIDParam, err := event.IndexedFieldUint256("bucketId") +func (eh *eventHandler) handleDonatedEvent(event *abiutil.EventParam) error { + tokenIDParam, err := event.FieldByIDUint256(0) if err != nil { return err } - amountParam, err := event.FieldUint256("amount") + amountParam, err := event.FieldByIDUint256(2) if err != nil { return err } From ef40dc851a9421d736adf74475af2cadaf71d288 Mon Sep 17 00:00:00 2001 From: envestcc Date: Wed, 24 Apr 2024 17:23:58 +0800 Subject: [PATCH 07/10] add contractAddress() --- systemcontractindex/stakingindex/index.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/systemcontractindex/stakingindex/index.go b/systemcontractindex/stakingindex/index.go index 058448f0c1..f77ab36801 100644 --- a/systemcontractindex/stakingindex/index.go +++ b/systemcontractindex/stakingindex/index.go @@ -68,6 +68,11 @@ func (s *Indexer) StartHeight() uint64 { return s.common.StartHeight() } +// ContractAddress returns the contract address +func (s *Indexer) ContractAddress() string { + return s.common.ContractAddress() +} + // Buckets returns the buckets func (s *Indexer) Buckets(height uint64) ([]*VoteBucket, error) { s.mutex.RLock() From 1aaa7ef738c6a9e118b0a5c397b98e48dad797b7 Mon Sep 17 00:00:00 2001 From: envestcc Date: Wed, 24 Apr 2024 21:20:00 +0800 Subject: [PATCH 08/10] address comments --- systemcontractindex/common.go | 50 ++++++++++++------- systemcontractindex/stakingindex/bucket.go | 3 +- systemcontractindex/stakingindex/cache.go | 4 +- .../stakingindex/event_handler.go | 2 +- systemcontractindex/stakingindex/index.go | 41 +++++++++------ 5 files changed, 62 insertions(+), 38 deletions(-) diff --git a/systemcontractindex/common.go b/systemcontractindex/common.go index 8a1fa47127..1bc1ffacb7 100644 --- a/systemcontractindex/common.go +++ b/systemcontractindex/common.go @@ -1,6 +1,8 @@ package systemcontractindex import ( + "context" + "github.com/pkg/errors" "github.com/iotexproject/iotex-core/db" @@ -18,6 +20,7 @@ type IndexerCommon struct { ns string key []byte startHeight uint64 + height uint64 contractAddress string } @@ -32,6 +35,24 @@ func NewIndexerCommon(kvstore db.KVStore, ns string, key []byte, contractAddress } } +// Start starts the indexer +func (s *IndexerCommon) Start(ctx context.Context) error { + if err := s.kvstore.Start(ctx); err != nil { + return err + } + h, err := s.loadHeight() + if err != nil { + return err + } + s.height = h + return nil +} + +// Stop stops the indexer +func (s *IndexerCommon) Stop(ctx context.Context) error { + return s.kvstore.Stop(ctx) +} + // KVStore returns the kvstore func (s *IndexerCommon) KVStore() db.KVStore { return s.kvstore } @@ -39,7 +60,11 @@ func (s *IndexerCommon) KVStore() db.KVStore { return s.kvstore } func (s *IndexerCommon) ContractAddress() string { return s.contractAddress } // Height returns the tip block height -func (s *IndexerCommon) Height() (uint64, error) { +func (s *IndexerCommon) Height() uint64 { + return s.height +} + +func (s *IndexerCommon) loadHeight() (uint64, error) { // get the tip block height var height uint64 h, err := s.kvstore.Get(s.ns, s.key) @@ -57,26 +82,17 @@ func (s *IndexerCommon) Height() (uint64, error) { // StartHeight returns the start height of the indexer func (s *IndexerCommon) StartHeight() uint64 { return s.startHeight } -// PutHeight puts the tip block height +// Commit commits the height to the indexer func (s *IndexerCommon) Commit(height uint64, delta batch.KVStoreBatch) error { + s.height = height delta.Put(s.ns, s.key, byteutil.Uint64ToBytesBigEndian(height), "failed to put height") return s.kvstore.WriteBatch(delta) } -// BlockContinuity checks the block continuity -func (s *IndexerCommon) BlockContinuity(height uint64) (existed bool, err error) { - tipHeight, err := s.Height() - if err != nil { - return false, err - } - expectHeight := tipHeight + 1 - if expectHeight < s.startHeight { - expectHeight = s.startHeight - } - if expectHeight == height { - return false, nil - } else if expectHeight > height { - return true, nil +// ExpectedHeight returns the expected height +func (s *IndexerCommon) ExpectedHeight() uint64 { + if s.height < s.startHeight { + return s.startHeight } - return false, errors.Errorf("invalid block height %d, expect %d", height, expectHeight) + return s.height + 1 } diff --git a/systemcontractindex/stakingindex/bucket.go b/systemcontractindex/stakingindex/bucket.go index cde0019615..97c599bf9b 100644 --- a/systemcontractindex/stakingindex/bucket.go +++ b/systemcontractindex/stakingindex/bucket.go @@ -40,7 +40,7 @@ func (bi *Bucket) Deserialize(b []byte) error { // clone clones the bucket info func (bi *Bucket) toProto() *stakingpb.Bucket { - pb := &stakingpb.Bucket{ + return &stakingpb.Bucket{ Candidate: bi.Candidate.String(), CreatedAt: bi.CreatedAt, Owner: bi.Owner.String(), @@ -49,7 +49,6 @@ func (bi *Bucket) toProto() *stakingpb.Bucket { Amount: bi.StakedAmount.String(), Duration: bi.StakedDurationBlockNumber, } - return pb } func (bi *Bucket) loadProto(p *stakingpb.Bucket) error { diff --git a/systemcontractindex/stakingindex/cache.go b/systemcontractindex/stakingindex/cache.go index 5f5f11a49a..93b5591746 100644 --- a/systemcontractindex/stakingindex/cache.go +++ b/systemcontractindex/stakingindex/cache.go @@ -115,7 +115,7 @@ func (s *cache) Bucket(id uint64) *Bucket { return nil } -func (s *cache) BucketsByIndices(indices []uint64) []*Bucket { +func (s *cache) Buckets(indices []uint64) []*Bucket { buckets := make([]*Bucket, 0, len(indices)) for _, idx := range indices { if bkt, ok := s.buckets[idx]; ok { @@ -125,7 +125,7 @@ func (s *cache) BucketsByIndices(indices []uint64) []*Bucket { return buckets } -func (s *cache) BucketIdxsByCandidate(candidate address.Address) []uint64 { +func (s *cache) BucketIdsByCandidate(candidate address.Address) []uint64 { cand := candidate.String() buckets := make([]uint64, 0, len(s.bucketsByCandidate[cand])) for idx := range s.bucketsByCandidate[cand] { diff --git a/systemcontractindex/stakingindex/event_handler.go b/systemcontractindex/stakingindex/event_handler.go index 25766288f2..8eab36dd9b 100644 --- a/systemcontractindex/stakingindex/event_handler.go +++ b/systemcontractindex/stakingindex/event_handler.go @@ -69,7 +69,7 @@ func (eh *eventHandler) HandleEvent(ctx context.Context, blk *block.Block, actLo if err != nil { return err } - log.L().Info("handle staking event", zap.String("event", abiEvent.Name), zap.Any("event", event)) + log.L().Debug("handle staking event", zap.String("event", abiEvent.Name), zap.Any("event", event)) // handle different kinds of event switch abiEvent.Name { case "Staked": diff --git a/systemcontractindex/stakingindex/index.go b/systemcontractindex/stakingindex/index.go index f77ab36801..6433e13285 100644 --- a/systemcontractindex/stakingindex/index.go +++ b/systemcontractindex/stakingindex/index.go @@ -8,9 +8,11 @@ import ( "github.com/iotexproject/iotex-address/address" "github.com/iotexproject/iotex-proto/golang/iotextypes" "github.com/pkg/errors" + "go.uber.org/zap" "github.com/iotexproject/iotex-core/blockchain/block" "github.com/iotexproject/iotex-core/db" + "github.com/iotexproject/iotex-core/pkg/log" "github.com/iotexproject/iotex-core/systemcontractindex" ) @@ -45,7 +47,9 @@ func NewIndexer(kvstore db.KVStore, contractAddr string, startHeight uint64, blo // Start starts the indexer func (s *Indexer) Start(ctx context.Context) error { - if err := s.common.KVStore().Start(ctx); err != nil { + s.mutex.Lock() + defer s.mutex.Unlock() + if err := s.common.Start(ctx); err != nil { return err } return s.cache.Load(s.common.KVStore()) @@ -53,23 +57,29 @@ func (s *Indexer) Start(ctx context.Context) error { // Stop stops the indexer func (s *Indexer) Stop(ctx context.Context) error { - return s.common.KVStore().Stop(ctx) + s.mutex.Lock() + defer s.mutex.Unlock() + return s.common.Stop(ctx) } // Height returns the tip block height func (s *Indexer) Height() (uint64, error) { s.mutex.RLock() defer s.mutex.RUnlock() - return s.common.Height() + return s.common.Height(), nil } // StartHeight returns the start height of the indexer func (s *Indexer) StartHeight() uint64 { + s.mutex.RLock() + defer s.mutex.RUnlock() return s.common.StartHeight() } // ContractAddress returns the contract address func (s *Indexer) ContractAddress() string { + s.mutex.RLock() + defer s.mutex.RUnlock() return s.common.ContractAddress() } @@ -84,7 +94,7 @@ func (s *Indexer) Buckets(height uint64) ([]*VoteBucket, error) { return nil, nil } idxs := s.cache.BucketIdxs() - bkts := s.cache.BucketsByIndices(idxs) + bkts := s.cache.Buckets(idxs) vbs := batchAssembleVoteBucket(idxs, bkts, s.common.ContractAddress(), s.blockInterval) return vbs, nil } @@ -117,7 +127,7 @@ func (s *Indexer) BucketsByIndices(indices []uint64, height uint64) ([]*VoteBuck } else if unstart { return nil, nil } - bkts := s.cache.BucketsByIndices(indices) + bkts := s.cache.Buckets(indices) vbs := batchAssembleVoteBucket(indices, bkts, s.common.ContractAddress(), s.blockInterval) return vbs, nil } @@ -132,8 +142,8 @@ func (s *Indexer) BucketsByCandidate(candidate address.Address, height uint64) ( } else if unstart { return nil, nil } - idxs := s.cache.BucketIdxsByCandidate(candidate) - bkts := s.cache.BucketsByIndices(idxs) + idxs := s.cache.BucketIdsByCandidate(candidate) + bkts := s.cache.Buckets(idxs) vbs := batchAssembleVoteBucket(idxs, bkts, s.common.ContractAddress(), s.blockInterval) return vbs, nil } @@ -153,10 +163,14 @@ func (s *Indexer) TotalBucketCount(height uint64) (uint64, error) { // PutBlock puts a block into indexer func (s *Indexer) PutBlock(ctx context.Context, blk *block.Block) error { + s.mutex.Lock() + defer s.mutex.Unlock() // check block continuity - if existed, err := s.common.BlockContinuity(blk.Height()); err != nil { - return err - } else if existed { + expect := s.common.ExpectedHeight() + if blk.Height() > expect { + return errors.Errorf("invalid block height %d, expect %d", blk.Height(), expect) + } else if blk.Height() < expect { + log.L().Debug("indexer skip block", zap.Uint64("height", blk.Height()), zap.Uint64("expect", expect)) return nil } // handle events of block @@ -180,8 +194,6 @@ func (s *Indexer) PutBlock(ctx context.Context, blk *block.Block) error { func (s *Indexer) commit(handler *eventHandler, height uint64) error { delta, dirty := handler.Finalize() - s.mutex.Lock() - defer s.mutex.Unlock() // update db if err := s.common.Commit(height, delta); err != nil { return err @@ -199,10 +211,7 @@ func (s *Indexer) checkHeight(height uint64) (unstart bool, err error) { if height == 0 { return false, nil } - tipHeight, err := s.common.Height() - if err != nil { - return false, err - } + tipHeight := s.common.Height() if height > tipHeight { return false, errors.Errorf("invalid block height %d, expect %d", height, tipHeight) } From 5e672cde998b6b3e4d6595520c3f3403a52b8c14 Mon Sep 17 00:00:00 2001 From: envestcc Date: Tue, 30 Apr 2024 15:27:45 +0800 Subject: [PATCH 09/10] address comment --- systemcontractindex/stakingindex/event_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/systemcontractindex/stakingindex/event_handler.go b/systemcontractindex/stakingindex/event_handler.go index 8eab36dd9b..88ce14f015 100644 --- a/systemcontractindex/stakingindex/event_handler.go +++ b/systemcontractindex/stakingindex/event_handler.go @@ -320,5 +320,5 @@ func (eh *eventHandler) putBucket(id uint64, bkt *Bucket) { func (eh *eventHandler) delBucket(id uint64) { eh.dirty.DeleteBucket(id) - eh.delta.Delete(stakingBucketNS, byteutil.Uint64ToBytesBigEndian(id), "failed to put bucket") + eh.delta.Delete(stakingBucketNS, byteutil.Uint64ToBytesBigEndian(id), "failed to delete bucket") } From 68c8016eef3728f8b6aea32bd05ca287688adb57 Mon Sep 17 00:00:00 2001 From: envestcc Date: Wed, 5 Jun 2024 17:52:17 +0800 Subject: [PATCH 10/10] historical indexer --- systemcontractindex/common.go | 45 ++++++++++------ systemcontractindex/stakingindex/index.go | 63 ++++++++++++++++++----- 2 files changed, 81 insertions(+), 27 deletions(-) diff --git a/systemcontractindex/common.go b/systemcontractindex/common.go index 1bc1ffacb7..5db35595f6 100644 --- a/systemcontractindex/common.go +++ b/systemcontractindex/common.go @@ -10,19 +10,29 @@ import ( "github.com/iotexproject/iotex-core/pkg/util/byteutil" ) -// IndexerCommon is the common struct for all contract indexers -// It provides the basic functions, including -// 1. kvstore -// 2. put/get index height -// 3. contract address -type IndexerCommon struct { - kvstore db.KVStore - ns string - key []byte - startHeight uint64 - height uint64 - contractAddress string -} +type ( + // IndexerCommon is the common struct for all contract indexers + // It provides the basic functions, including + // 1. kvstore + // 2. put/get index height + // 3. contract address + IndexerCommon struct { + kvstore db.KVStore + ns string + key []byte + startHeight uint64 + height uint64 + contractAddress string + } + + stateType interface { + Load(kvstore db.KVStore) error + } + kvStoreWithVersion interface { + db.KVStore + WithVersion(version uint64) db.KVStore + } +) // NewIndexerCommon creates a new IndexerCommon func NewIndexerCommon(kvstore db.KVStore, ns string, key []byte, contractAddress string, startHeight uint64) *IndexerCommon { @@ -53,8 +63,13 @@ func (s *IndexerCommon) Stop(ctx context.Context) error { return s.kvstore.Stop(ctx) } -// KVStore returns the kvstore -func (s *IndexerCommon) KVStore() db.KVStore { return s.kvstore } +// StateAt loads the state at the given height +func (s *IndexerCommon) StateAt(state stateType, height uint64) error { + if kvstore, ok := s.kvstore.(kvStoreWithVersion); ok { + return state.Load(kvstore.WithVersion(height)) + } + return errors.New("kvstore does not support versioning") +} // ContractAddress returns the contract address func (s *IndexerCommon) ContractAddress() string { return s.contractAddress } diff --git a/systemcontractindex/stakingindex/index.go b/systemcontractindex/stakingindex/index.go index 6433e13285..d4708f9a89 100644 --- a/systemcontractindex/stakingindex/index.go +++ b/systemcontractindex/stakingindex/index.go @@ -5,6 +5,7 @@ import ( "sync" "time" + iocache "github.com/iotexproject/go-pkgs/cache" "github.com/iotexproject/iotex-address/address" "github.com/iotexproject/iotex-proto/golang/iotextypes" "github.com/pkg/errors" @@ -30,9 +31,10 @@ type ( // Indexer is the staking indexer Indexer struct { common *systemcontractindex.IndexerCommon - cache *cache // in-memory cache, used to query index data mutex sync.RWMutex blockInterval time.Duration + + caches iocache.LRUCache } ) @@ -40,8 +42,8 @@ type ( func NewIndexer(kvstore db.KVStore, contractAddr string, startHeight uint64, blockInterval time.Duration) *Indexer { return &Indexer{ common: systemcontractindex.NewIndexerCommon(kvstore, stakingNS, stakingHeightKey, contractAddr, startHeight), - cache: newCache(), blockInterval: blockInterval, + caches: iocache.NewThreadSafeLruCache(8), } } @@ -52,7 +54,7 @@ func (s *Indexer) Start(ctx context.Context) error { if err := s.common.Start(ctx); err != nil { return err } - return s.cache.Load(s.common.KVStore()) + return nil } // Stop stops the indexer @@ -93,8 +95,12 @@ func (s *Indexer) Buckets(height uint64) ([]*VoteBucket, error) { } else if unstart { return nil, nil } - idxs := s.cache.BucketIdxs() - bkts := s.cache.Buckets(idxs) + cache, err := s.cacheAt(height) + if err != nil { + return nil, err + } + idxs := cache.BucketIdxs() + bkts := cache.Buckets(idxs) vbs := batchAssembleVoteBucket(idxs, bkts, s.common.ContractAddress(), s.blockInterval) return vbs, nil } @@ -109,7 +115,11 @@ func (s *Indexer) Bucket(id uint64, height uint64) (*VoteBucket, bool, error) { } else if unstart { return nil, false, nil } - bkt := s.cache.Bucket(id) + cache, err := s.cacheAt(height) + if err != nil { + return nil, false, err + } + bkt := cache.Bucket(id) if bkt == nil { return nil, false, nil } @@ -127,7 +137,11 @@ func (s *Indexer) BucketsByIndices(indices []uint64, height uint64) ([]*VoteBuck } else if unstart { return nil, nil } - bkts := s.cache.Buckets(indices) + cache, err := s.cacheAt(height) + if err != nil { + return nil, err + } + bkts := cache.Buckets(indices) vbs := batchAssembleVoteBucket(indices, bkts, s.common.ContractAddress(), s.blockInterval) return vbs, nil } @@ -142,8 +156,12 @@ func (s *Indexer) BucketsByCandidate(candidate address.Address, height uint64) ( } else if unstart { return nil, nil } - idxs := s.cache.BucketIdsByCandidate(candidate) - bkts := s.cache.Buckets(idxs) + cache, err := s.cacheAt(height) + if err != nil { + return nil, err + } + idxs := cache.BucketIdsByCandidate(candidate) + bkts := cache.Buckets(idxs) vbs := batchAssembleVoteBucket(idxs, bkts, s.common.ContractAddress(), s.blockInterval) return vbs, nil } @@ -158,7 +176,11 @@ func (s *Indexer) TotalBucketCount(height uint64) (uint64, error) { } else if unstart { return 0, nil } - return s.cache.TotalBucketCount(), nil + cache, err := s.cacheAt(height) + if err != nil { + return 0, err + } + return cache.TotalBucketCount(), nil } // PutBlock puts a block into indexer @@ -174,7 +196,11 @@ func (s *Indexer) PutBlock(ctx context.Context, blk *block.Block) error { return nil } // handle events of block - handler := newEventHandler(s.cache.Copy()) + cache, err := s.cacheAt(blk.Height()) + if err != nil { + return err + } + handler := newEventHandler(cache.Copy()) for _, receipt := range blk.Receipts { if receipt.Status != uint64(iotextypes.ReceiptStatus_Success) { continue @@ -199,7 +225,7 @@ func (s *Indexer) commit(handler *eventHandler, height uint64) error { return err } // update cache - s.cache = dirty + s.caches.Add(height, dirty) return nil } @@ -217,3 +243,16 @@ func (s *Indexer) checkHeight(height uint64) (unstart bool, err error) { } return false, nil } + +func (s *Indexer) cacheAt(height uint64) (*cache, error) { + c, ok := s.caches.Get(height) + if ok { + return c.(*cache), nil + } + nc := newCache() + if err := s.common.StateAt(nc, height); err != nil { + return nil, err + } + s.caches.Add(height, nc) + return nc, nil +}