Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Caplin: Automatic retirement of state tables to their own snapshot files #12508

Open
wants to merge 107 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 99 commits
Commits
Show all changes
107 commits
Select commit Hold shift + click to select a range
5567da9
save
Giulio2002 Oct 25, 2024
11dac56
save
Giulio2002 Oct 25, 2024
af37440
save
Giulio2002 Oct 25, 2024
ae5f2ba
save
Giulio2002 Oct 25, 2024
4f8eca4
save
Giulio2002 Oct 26, 2024
352332c
save
Giulio2002 Oct 26, 2024
9d9bd2f
save
Giulio2002 Oct 26, 2024
c62a248
save
Giulio2002 Oct 26, 2024
bda7346
save
Giulio2002 Oct 26, 2024
9a01dff
save
Giulio2002 Oct 26, 2024
93fc749
save
Giulio2002 Oct 26, 2024
a9ed413
save
Giulio2002 Oct 26, 2024
aa960d9
save
Giulio2002 Oct 26, 2024
b4801e2
save
Giulio2002 Oct 26, 2024
8b606a6
save
Giulio2002 Oct 26, 2024
0588cae
save
Giulio2002 Oct 26, 2024
d8d972b
save
Giulio2002 Oct 26, 2024
a98bd84
save
Giulio2002 Oct 26, 2024
d51b414
save
Giulio2002 Oct 26, 2024
3749e48
save
Giulio2002 Oct 26, 2024
29be491
save
Giulio2002 Oct 26, 2024
1bc75ac
save
Giulio2002 Oct 26, 2024
2071381
save
Giulio2002 Oct 26, 2024
b390a57
save
Giulio2002 Oct 26, 2024
193967a
save
Giulio2002 Oct 26, 2024
1f359a6
save
Giulio2002 Oct 26, 2024
52d3d1f
save
Giulio2002 Oct 26, 2024
8ffef87
save
Giulio2002 Oct 26, 2024
ea95bc3
save
Giulio2002 Oct 26, 2024
2e7f829
save
Giulio2002 Oct 26, 2024
5423862
save
Giulio2002 Oct 26, 2024
4a72820
save
Giulio2002 Oct 26, 2024
60317d9
save
Giulio2002 Oct 26, 2024
6c12a91
save
Giulio2002 Oct 26, 2024
e772766
save
Giulio2002 Oct 26, 2024
904248b
save
Giulio2002 Oct 26, 2024
e123bf7
save
Giulio2002 Oct 26, 2024
df8e2ef
save
Giulio2002 Oct 26, 2024
22e2565
save
Giulio2002 Oct 26, 2024
0fd5197
save
Giulio2002 Oct 26, 2024
f5c52bd
save
Giulio2002 Oct 26, 2024
35f6c3d
save
Giulio2002 Oct 26, 2024
8d71404
save
Giulio2002 Oct 26, 2024
296e2e5
save
Giulio2002 Oct 26, 2024
9efb112
save
Giulio2002 Oct 26, 2024
1421683
save
Giulio2002 Oct 26, 2024
0a79e0e
save
Giulio2002 Oct 26, 2024
d26fd6c
save
Giulio2002 Oct 26, 2024
9ba3457
save
Giulio2002 Oct 26, 2024
9a69e31
save
Giulio2002 Oct 26, 2024
7462a5c
save
Giulio2002 Oct 26, 2024
a23be29
save
Giulio2002 Oct 26, 2024
ede1c48
save
Giulio2002 Oct 26, 2024
44fc37f
save
Giulio2002 Oct 26, 2024
9ec82fd
save
Giulio2002 Oct 26, 2024
17db8c9
save
Giulio2002 Oct 26, 2024
a43d9d5
save
Giulio2002 Oct 26, 2024
f94332d
save
Giulio2002 Oct 26, 2024
6370a83
save
Giulio2002 Oct 26, 2024
6c4da9f
save
Giulio2002 Oct 26, 2024
5ed3f72
save
Giulio2002 Oct 26, 2024
c9c69d1
save
Giulio2002 Oct 26, 2024
155b5f2
save
Giulio2002 Oct 26, 2024
9a4a46e
save
Giulio2002 Oct 26, 2024
39b4336
save
Giulio2002 Oct 26, 2024
5574844
save
Giulio2002 Oct 26, 2024
33dab63
save
Giulio2002 Oct 26, 2024
1c208be
save
Giulio2002 Oct 26, 2024
66b0d8d
save
Giulio2002 Oct 26, 2024
ced1247
save
Giulio2002 Oct 26, 2024
f1d8710
save
Giulio2002 Oct 26, 2024
8063667
save
Giulio2002 Oct 26, 2024
a1b7331
save
Giulio2002 Oct 26, 2024
ce25fd7
save
Giulio2002 Oct 26, 2024
a270509
save
Giulio2002 Oct 26, 2024
8be282f
save
Giulio2002 Oct 26, 2024
5a376f9
save
Giulio2002 Oct 26, 2024
661d7bf
save
Giulio2002 Oct 26, 2024
c6810c5
save
Giulio2002 Oct 26, 2024
feac154
save
Giulio2002 Oct 26, 2024
a96de6b
save
Giulio2002 Oct 26, 2024
35913bc
save
Giulio2002 Oct 27, 2024
810e0a7
save
Giulio2002 Oct 27, 2024
5470be1
save
Giulio2002 Oct 27, 2024
1df5851
save
Giulio2002 Oct 27, 2024
2485742
save
Giulio2002 Oct 27, 2024
0fe1e64
save
Giulio2002 Oct 27, 2024
4866159
save
Giulio2002 Oct 27, 2024
ed7d368
save
Giulio2002 Oct 27, 2024
07c3075
save
Giulio2002 Oct 27, 2024
1052ee6
save
Giulio2002 Oct 27, 2024
ac4e98c
save
Giulio2002 Oct 27, 2024
a4de21e
save
Giulio2002 Oct 27, 2024
bfd791a
save
Giulio2002 Oct 27, 2024
0766013
save
Giulio2002 Oct 27, 2024
b07c55c
save
Giulio2002 Oct 27, 2024
26c7d9d
save
Giulio2002 Oct 27, 2024
b881496
save
Giulio2002 Oct 27, 2024
70c0538
save
Giulio2002 Oct 27, 2024
2953bbf
save
Giulio2002 Oct 27, 2024
37838b0
save
Giulio2002 Oct 27, 2024
a1259e1
save
Giulio2002 Oct 27, 2024
106f4e5
save
Giulio2002 Oct 27, 2024
a13ef79
save
Giulio2002 Oct 27, 2024
e8cc733
save
Giulio2002 Oct 27, 2024
4147064
save
Giulio2002 Oct 27, 2024
032aed1
save
Giulio2002 Oct 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cl/antiquary/antiquary.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type Antiquary struct {
downloader proto_downloader.DownloaderClient
logger log.Logger
sn *freezeblocks.CaplinSnapshots
stateSn *freezeblocks.CaplinStateSnapshots
snReader freezeblocks.BeaconSnapshotReader
snBuildSema *semaphore.Weighted // semaphore for building only one type (blocks, caplin, v3) at a time
ctx context.Context
Expand All @@ -65,7 +66,7 @@ type Antiquary struct {
balances32 []byte
}

func NewAntiquary(ctx context.Context, blobStorage blob_storage.BlobStorage, genesisState *state.CachingBeaconState, validatorsTable *state_accessors.StaticValidatorTable, cfg *clparams.BeaconChainConfig, dirs datadir.Dirs, downloader proto_downloader.DownloaderClient, mainDB kv.RwDB, sn *freezeblocks.CaplinSnapshots, reader freezeblocks.BeaconSnapshotReader, logger log.Logger, states, blocks, blobs, snapgen bool, snBuildSema *semaphore.Weighted) *Antiquary {
func NewAntiquary(ctx context.Context, blobStorage blob_storage.BlobStorage, genesisState *state.CachingBeaconState, validatorsTable *state_accessors.StaticValidatorTable, cfg *clparams.BeaconChainConfig, dirs datadir.Dirs, downloader proto_downloader.DownloaderClient, mainDB kv.RwDB, stateSn *freezeblocks.CaplinStateSnapshots, sn *freezeblocks.CaplinSnapshots, reader freezeblocks.BeaconSnapshotReader, logger log.Logger, states, blocks, blobs, snapgen bool, snBuildSema *semaphore.Weighted) *Antiquary {
backfilled := &atomic.Bool{}
blobBackfilled := &atomic.Bool{}
backfilled.Store(false)
Expand All @@ -89,6 +90,7 @@ func NewAntiquary(ctx context.Context, blobStorage blob_storage.BlobStorage, gen
blocks: blocks,
blobs: blobs,
snapgen: snapgen,
stateSn: stateSn,
}
}

Expand Down
133 changes: 122 additions & 11 deletions cl/antiquary/state_antiquary.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ import (
"bytes"
"context"
"fmt"
"runtime"
"sync"
"time"

"github.com/klauspost/compress/zstd"

"github.com/erigontech/erigon-lib/common"
libcommon "github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/downloader/snaptype"
"github.com/erigontech/erigon-lib/etl"
"github.com/erigontech/erigon-lib/kv"
"github.com/erigontech/erigon-lib/log/v3"
Expand Down Expand Up @@ -111,6 +113,9 @@ func (s *Antiquary) readHistoricalProcessingProgress(ctx context.Context) (progr
if err != nil {
return
}
if s.stateSn != nil {
progress = max(progress, s.stateSn.BlocksAvailable())
}

finalized, err = beacon_indicies.ReadHighestFinalized(tx)
if err != nil {
Expand All @@ -119,17 +124,84 @@ func (s *Antiquary) readHistoricalProcessingProgress(ctx context.Context) (progr
return
}

func (s *Antiquary) fillStaticValidatorsTable(ctx context.Context) error {
if s.stateSn == nil || s.stateSn.BlocksAvailable() <= s.validatorsTable.Slot() {
return nil
}
if err := s.stateSn.OpenFolder(); err != nil {
return err
}
blocksAvaiable := s.stateSn.BlocksAvailable()
stateSnRoTx := s.stateSn.View()
defer stateSnRoTx.Close()
if s.genesisState == nil {
return fmt.Errorf("genesis state is nil")
}
startSlot := s.validatorsTable.Slot() + 1
if startSlot == 1 {
startSlot = 0
}

start := time.Now()
for slot := s.validatorsTable.Slot() + 1; slot <= blocksAvaiable; slot++ {
seg, ok := stateSnRoTx.VisibleSegment(slot, kv.StateEvents)
if !ok {
return fmt.Errorf("segment not found for slot %d", slot)
}
buf, err := seg.Get(slot)
if err != nil {
return err
}
if len(buf) == 0 {
continue
}
event := state_accessors.NewStateEventsFromBytes(buf)
state_accessors.ReplayEvents(
func(validatorIndex uint64, validator solid.Validator) error {
return s.validatorsTable.AddValidator(validator, validatorIndex, slot)
},
func(validatorIndex uint64, exitEpoch uint64) error {
return s.validatorsTable.AddExitEpoch(validatorIndex, slot, exitEpoch)
},
func(validatorIndex uint64, withdrawableEpoch uint64) error {
return s.validatorsTable.AddWithdrawableEpoch(validatorIndex, slot, withdrawableEpoch)
},
func(validatorIndex uint64, withdrawalCredentials libcommon.Hash) error {
return s.validatorsTable.AddWithdrawalCredentials(validatorIndex, slot, withdrawalCredentials)
},
func(validatorIndex uint64, activationEpoch uint64) error {
return s.validatorsTable.AddActivationEpoch(validatorIndex, slot, activationEpoch)
},
func(validatorIndex uint64, activationEligibilityEpoch uint64) error {
return s.validatorsTable.AddActivationEligibility(validatorIndex, slot, activationEligibilityEpoch)
},
func(validatorIndex uint64, slashed bool) error {
return s.validatorsTable.AddSlashed(validatorIndex, slot, slashed)
},
event,
)
s.validatorsTable.SetSlot(slot)
}
s.logger.Info("[Antiquary] Filled static validators table", "slots", blocksAvaiable, "elapsed", time.Since(start))
return nil
}

func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
var tx kv.Tx

// Check if you need to fill the static validators table
if err := s.fillStaticValidatorsTable(ctx); err != nil {
return err
}

tx, err := s.mainDB.BeginRo(ctx)
if err != nil {
return err
}
defer tx.Rollback()

// maps which validators changes
changedValidators := make(map[uint64]struct{})
var changedValidators sync.Map

stateAntiquaryCollector := newBeaconStatesCollector(s.cfg, s.dirs.Tmp, s.logger)
defer stateAntiquaryCollector.close()
Expand All @@ -144,7 +216,7 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
}
// Mark all validators as touched because we just initizialized the whole state.
s.currentState.ForEachValidator(func(v solid.Validator, index, total int) bool {
changedValidators[uint64(index)] = struct{}{}
changedValidators.Store(uint64(index), struct{}{})
if err = s.validatorsTable.AddValidator(v, uint64(index), 0); err != nil {
return false
}
Expand Down Expand Up @@ -175,37 +247,37 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
return stateAntiquaryCollector.collectIntraEpochRandaoMix(slot, mix)
},
OnNewValidator: func(index int, v solid.Validator, balance uint64) error {
changedValidators[uint64(index)] = struct{}{}
changedValidators.Store(uint64(index), struct{}{})
events.AddValidator(uint64(index), v)
return s.validatorsTable.AddValidator(v, uint64(index), slot)
},
OnNewValidatorActivationEpoch: func(index int, epoch uint64) error {
changedValidators[uint64(index)] = struct{}{}
changedValidators.Store(uint64(index), struct{}{})
events.ChangeActivationEpoch(uint64(index), epoch)
return s.validatorsTable.AddActivationEpoch(uint64(index), slot, epoch)
},
OnNewValidatorExitEpoch: func(index int, epoch uint64) error {
changedValidators[uint64(index)] = struct{}{}
changedValidators.Store(uint64(index), struct{}{})
events.ChangeExitEpoch(uint64(index), epoch)
return s.validatorsTable.AddExitEpoch(uint64(index), slot, epoch)
},
OnNewValidatorWithdrawableEpoch: func(index int, epoch uint64) error {
changedValidators[uint64(index)] = struct{}{}
changedValidators.Store(uint64(index), struct{}{})
events.ChangeWithdrawableEpoch(uint64(index), epoch)
return s.validatorsTable.AddWithdrawableEpoch(uint64(index), slot, epoch)
},
OnNewValidatorSlashed: func(index int, newSlashed bool) error {
changedValidators[uint64(index)] = struct{}{}
changedValidators.Store(uint64(index), struct{}{})
events.ChangeSlashed(uint64(index), newSlashed)
return s.validatorsTable.AddSlashed(uint64(index), slot, newSlashed)
},
OnNewValidatorActivationEligibilityEpoch: func(index int, epoch uint64) error {
changedValidators[uint64(index)] = struct{}{}
changedValidators.Store(uint64(index), struct{}{})
events.ChangeActivationEligibilityEpoch(uint64(index), epoch)
return s.validatorsTable.AddActivationEligibility(uint64(index), slot, epoch)
},
OnNewValidatorWithdrawalCredentials: func(index int, wc []byte) error {
changedValidators[uint64(index)] = struct{}{}
changedValidators.Store(uint64(index), struct{}{})
events.ChangeWithdrawalCredentials(uint64(index), libcommon.BytesToHash(wc))
return s.validatorsTable.AddWithdrawalCredentials(uint64(index), slot, libcommon.BytesToHash(wc))
},
Expand Down Expand Up @@ -389,7 +461,7 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {

buf := &bytes.Buffer{}
s.validatorsTable.ForEach(func(validatorIndex uint64, validator *state_accessors.StaticValidator) bool {
if _, ok := changedValidators[validatorIndex]; !ok {
if _, ok := changedValidators.Load(validatorIndex); !ok {
return true
}
buf.Reset()
Expand All @@ -413,6 +485,41 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
return err
}
log.Info("Historical states antiquated", "slot", s.currentState.Slot(), "root", libcommon.Hash(stateRoot), "latency", endTime)
if s.snapgen {
if err := s.stateSn.OpenFolder(); err != nil {
return err
}
blocksPerStatefulFile := uint64(snaptype.CaplinMergeLimit * 5)
from := s.stateSn.BlocksAvailable() + 1
if from+blocksPerStatefulFile+safetyMargin > s.currentState.Slot() {
return nil
}
to := s.currentState.Slot()
if to < (safetyMargin + blocksPerStatefulFile) {
return nil
}
to = to - (safetyMargin + blocksPerStatefulFile)
if from >= to {
return nil
}
if err := s.stateSn.DumpCaplinState(
ctx,
s.stateSn.BlocksAvailable()+1,
to,
blocksPerStatefulFile,
s.sn.Salt,
s.dirs,
runtime.NumCPU(),
log.LvlInfo,
s.logger,
); err != nil {
return err
}
if err := s.stateSn.OpenFolder(); err != nil {
return err
}
}

return nil
}

Expand All @@ -439,12 +546,15 @@ func (s *Antiquary) initializeStateAntiquaryIfNeeded(ctx context.Context, tx kv.
if err != nil {
return err
}
if s.stateSn != nil {
targetSlot = max(targetSlot, s.stateSn.BlocksAvailable())
}
// We want to backoff by some slots until we get a correct state from DB.
// we start from 10 * clparams.SlotsPerDump.
backoffStrides := uint64(10)
backoffStep := backoffStrides

historicalReader := historical_states_reader.NewHistoricalStatesReader(s.cfg, s.snReader, s.validatorsTable, s.genesisState)
historicalReader := historical_states_reader.NewHistoricalStatesReader(s.cfg, s.snReader, s.validatorsTable, s.genesisState, s.stateSn)

for {
attempt, err := computeSlotToBeRequested(tx, s.cfg, s.genesisState.Slot(), targetSlot, backoffStep)
Expand All @@ -465,6 +575,7 @@ func (s *Antiquary) initializeStateAntiquaryIfNeeded(ctx context.Context, tx kv.
if err != nil {
return fmt.Errorf("failed to read historical state at slot %d: %w", attempt, err)
}
fmt.Println("currentState", s.currentState, attempt)
if s.currentState == nil {
log.Warn("historical state not found, backoff more and try again", "slot", attempt)
backoffStep += backoffStrides
Expand Down
14 changes: 9 additions & 5 deletions cl/beacon/handler/attestation_rewards.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,26 +178,30 @@ func (a *ApiHandler) PostEthV1BeaconRewardsAttestations(w http.ResponseWriter, r
if lastSlot > stateProgress {
return nil, beaconhttp.NewEndpointError(http.StatusNotFound, errors.New("requested range is not yet processed or the node is not archivial"))
}
snRoTx := a.caplinStateSnapshots.View()
defer snRoTx.Close()

epochData, err := state_accessors.ReadEpochData(tx, a.beaconChainCfg.RoundSlotToEpoch(lastSlot))
stateGetter := state_accessors.GetValFnTxAndSnapshot(tx, snRoTx)

epochData, err := state_accessors.ReadEpochData(stateGetter, a.beaconChainCfg.RoundSlotToEpoch(lastSlot))
if err != nil {
return nil, err
}

validatorSet, err := a.stateReader.ReadValidatorsForHistoricalState(tx, lastSlot)
validatorSet, err := a.stateReader.ReadValidatorsForHistoricalState(tx, stateGetter, lastSlot)
if err != nil {
return nil, err
}
if validatorSet == nil {
return nil, beaconhttp.NewEndpointError(http.StatusNotFound, errors.New("no validator set found for this epoch"))
}

_, previousIdx, err := a.stateReader.ReadParticipations(tx, lastSlot)
_, previousIdx, err := a.stateReader.ReadParticipations(tx, stateGetter, lastSlot)
if err != nil {
return nil, err
}

_, _, finalizedCheckpoint, ok, err := state_accessors.ReadCheckpoints(tx, epoch*a.beaconChainCfg.SlotsPerEpoch)
_, _, finalizedCheckpoint, ok, err := state_accessors.ReadCheckpoints(stateGetter, epoch*a.beaconChainCfg.SlotsPerEpoch)
if err != nil {
return nil, err
}
Expand All @@ -212,7 +216,7 @@ func (a *ApiHandler) PostEthV1BeaconRewardsAttestations(w http.ResponseWriter, r
return resp.WithFinalized(true).WithOptimistic(a.forkchoiceStore.IsRootOptimistic(root)), nil
}
inactivityScores := solid.NewUint64ListSSZ(int(a.beaconChainCfg.ValidatorRegistryLimit))
if err := a.stateReader.ReconstructUint64ListDump(tx, lastSlot, kv.InactivityScores, validatorSet.Length(), inactivityScores); err != nil {
if err := a.stateReader.ReconstructUint64ListDump(stateGetter, lastSlot, kv.InactivityScores, validatorSet.Length(), inactivityScores); err != nil {
return nil, err
}
resp, err := a.computeAttestationsRewardsForAltair(
Expand Down
9 changes: 7 additions & 2 deletions cl/beacon/handler/committees.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,13 @@ func (a *ApiHandler) getCommittees(w http.ResponseWriter, r *http.Request) (*bea
}
return newBeaconResponse(resp).WithFinalized(isFinalized).WithOptimistic(isOptimistic), nil
}
snRoTx := a.caplinStateSnapshots.View()
defer snRoTx.Close()
stateGetter := state_accessors.GetValFnTxAndSnapshot(tx, snRoTx)
// finality case
activeIdxs, err := state_accessors.ReadActiveIndicies(tx, epoch*a.beaconChainCfg.SlotsPerEpoch)
activeIdxs, err := state_accessors.ReadActiveIndicies(
stateGetter,
epoch*a.beaconChainCfg.SlotsPerEpoch)
if err != nil {
return nil, err
}
Expand All @@ -134,7 +139,7 @@ func (a *ApiHandler) getCommittees(w http.ResponseWriter, r *http.Request) (*bea
}

mixPosition := (epoch + a.beaconChainCfg.EpochsPerHistoricalVector - a.beaconChainCfg.MinSeedLookahead - 1) % a.beaconChainCfg.EpochsPerHistoricalVector
mix, err := a.stateReader.ReadRandaoMixBySlotAndIndex(tx, epoch*a.beaconChainCfg.SlotsPerEpoch, mixPosition)
mix, err := a.stateReader.ReadRandaoMixBySlotAndIndex(tx, stateGetter, epoch*a.beaconChainCfg.SlotsPerEpoch, mixPosition)
if err != nil {
return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Errorf("could not read randao mix: %v", err))
}
Expand Down
11 changes: 9 additions & 2 deletions cl/beacon/handler/duties_attester.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,15 @@ func (a *ApiHandler) getAttesterDuties(w http.ResponseWriter, r *http.Request) (
if (epoch)*a.beaconChainCfg.SlotsPerEpoch >= stageStateProgress {
return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, fmt.Errorf("epoch %d is too far in the future", epoch))
}

snRoTx := a.caplinStateSnapshots.View()
defer snRoTx.Close()

stateGetter := state_accessors.GetValFnTxAndSnapshot(tx, snRoTx)
// finality case
activeIdxs, err := state_accessors.ReadActiveIndicies(tx, epoch*a.beaconChainCfg.SlotsPerEpoch)
activeIdxs, err := state_accessors.ReadActiveIndicies(
stateGetter,
epoch*a.beaconChainCfg.SlotsPerEpoch)
if err != nil {
return nil, err
}
Expand All @@ -164,7 +171,7 @@ func (a *ApiHandler) getAttesterDuties(w http.ResponseWriter, r *http.Request) (
}

mixPosition := (epoch + a.beaconChainCfg.EpochsPerHistoricalVector - a.beaconChainCfg.MinSeedLookahead - 1) % a.beaconChainCfg.EpochsPerHistoricalVector
mix, err := a.stateReader.ReadRandaoMixBySlotAndIndex(tx, epoch*a.beaconChainCfg.SlotsPerEpoch, mixPosition)
mix, err := a.stateReader.ReadRandaoMixBySlotAndIndex(tx, stateGetter, epoch*a.beaconChainCfg.SlotsPerEpoch, mixPosition)
if err != nil {
return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Errorf("could not read randao mix: %v", err))
}
Expand Down
6 changes: 5 additions & 1 deletion cl/beacon/handler/duties_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,13 @@ func (a *ApiHandler) getSyncDuties(w http.ResponseWriter, r *http.Request) (*bea
if !ok {
_, syncCommittee, ok = a.forkchoiceStore.GetSyncCommittees(period - 1)
}
snRoTx := a.caplinStateSnapshots.View()
defer snRoTx.Close()
// Read them from the archive node if we do not have them in the fast-access storage
if !ok {
syncCommittee, err = state_accessors.ReadCurrentSyncCommittee(tx, a.beaconChainCfg.RoundSlotToSyncCommitteePeriod(startSlotAtEpoch))
syncCommittee, err = state_accessors.ReadCurrentSyncCommittee(
state_accessors.GetValFnTxAndSnapshot(tx, snRoTx),
a.beaconChainCfg.RoundSlotToSyncCommitteePeriod(startSlotAtEpoch))
if syncCommittee == nil {
log.Warn("could not find sync committee for epoch", "epoch", epoch, "period", period)
return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Errorf("could not find sync committee for epoch %d", epoch))
Expand Down
Loading
Loading