Skip to content

Commit

Permalink
tortoise: stay in verifying mode while recovering from disk (#5514)
Browse files Browse the repository at this point in the history
closes #5246

verifying mode is a faster way to count ballots, and it also has fewer known security issues.

during debug i observed the following problems, which are mostly fixed here:
- certificates are not kept forever, and hence tortoise cannot form an opinion on the layer without them. to correctly handle that, we update the last layer and the opinion is loaded from the database.
- when data is recovered from disk we need to start counting from the first layer in the epoch, in order to compute reference epoch height. this variable is used later in verifyLayers call 
- after layers were recovered we compute matching opinions and evict unnecessary events. this eviction was too aggressive and should be limited to layers that were verified by tortoise. this is to avoid the same race as in #5523

additionally i switched tortoise to use IterateAtxs directly on the database without using lru cache, as it is faster, and i intend to work on dropping that cache in other components.
  • Loading branch information
dshulyak committed Feb 5, 2024
1 parent 9d6e559 commit 5622163
Show file tree
Hide file tree
Showing 13 changed files with 406 additions and 77 deletions.
2 changes: 1 addition & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ func (app *App) initServices(ctx context.Context) error {
start := time.Now()
trtl, err := tortoise.Recover(
ctx,
app.cachedDB,
app.db,
app.clock.CurrentLayer(), trtlopts...,
)
if err != nil {
Expand Down
20 changes: 20 additions & 0 deletions sql/blocks/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,26 @@ func Get(db sql.Executor, id types.BlockID) (rst *types.Block, err error) {
return rst, err
}

// LastValid returns the highest layer that has a block with validity = 1.
//
// When no such block exists the returned error wraps sql.ErrNotFound.
// Any failure of the underlying query is wrapped and returned as well.
func LastValid(db sql.Executor) (types.LayerID, error) {
	var last types.LayerID
	// decode reads the single selected column; returning false stops iteration
	// after the first (and only) row.
	decode := func(stmt *sql.Statement) bool {
		last = types.LayerID(uint32(stmt.ColumnInt64(0)))
		return false
	}
	// max(layer) is deliberately not used: an aggregate always yields exactly one row,
	// so the row-count check below could never detect the "no valid layers" case.
	count, err := db.Exec(
		"select layer from blocks where validity = 1 order by layer desc limit 1;",
		nil,
		decode,
	)
	switch {
	case err != nil:
		return last, fmt.Errorf("get last valid layer: %w", err)
	case count == 0:
		return last, fmt.Errorf("%w: no valid layers", sql.ErrNotFound)
	}
	return last, nil
}

func UpdateValid(db sql.Executor, id types.BlockID, valid bool) error {
if valid {
return SetValid(db, id)
Expand Down
53 changes: 53 additions & 0 deletions sql/blocks/blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,56 @@ func TestGetLayer(t *testing.T) {
require.Equal(t, b.LayerIndex, lid)
}
}

func TestLastValid(t *testing.T) {
t.Run("empty", func(t *testing.T) {
db := sql.InMemory()
_, err := LastValid(db)
require.ErrorIs(t, err, sql.ErrNotFound)
})
t.Run("all valid", func(t *testing.T) {
db := sql.InMemory()
blocks := map[types.BlockID]struct {
lid types.LayerID
}{
{1}: {lid: 11},
{2}: {lid: 22},
{3}: {lid: 33},
}
for bid, layer := range blocks {
block := types.NewExistingBlock(
bid,
types.InnerBlock{LayerIndex: layer.lid},
)
require.NoError(t, Add(db, block))
require.NoError(t, SetValid(db, bid))
}
last, err := LastValid(db)
require.NoError(t, err)
require.Equal(t, 33, int(last))
})
t.Run("last is invalid", func(t *testing.T) {
db := sql.InMemory()
blocks := map[types.BlockID]struct {
invalid bool
lid types.LayerID
}{
{1}: {lid: 11},
{2}: {lid: 22},
{3}: {invalid: true, lid: 33},
}
for bid, layer := range blocks {
block := types.NewExistingBlock(
bid,
types.InnerBlock{LayerIndex: layer.lid},
)
require.NoError(t, Add(db, block))
if !layer.invalid {
require.NoError(t, SetValid(db, bid))
}
}
last, err := LastValid(db)
require.NoError(t, err)
require.Equal(t, 22, int(last))
})
}
71 changes: 62 additions & 9 deletions tortoise/algorithm.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,12 @@ func New(opts ...Opt) (*Tortoise, error) {
}

func (t *Tortoise) RecoverFrom(lid types.LayerID, opinion, prev types.Hash32) {
if lid <= types.GetEffectiveGenesis() {
t.logger.Panic("recover should be after effective genesis",
zap.Uint32("lid", lid.Uint32()),
zap.Uint32("effective genesis", types.GetEffectiveGenesis().Uint32()),
)
}
t.mu.Lock()
defer t.mu.Unlock()
t.logger.Debug("recover from",
Expand All @@ -140,7 +146,7 @@ func (t *Tortoise) RecoverFrom(lid types.LayerID, opinion, prev types.Hash32) {
t.trtl.evicted = lid - 1
t.trtl.pending = lid
t.trtl.verified = lid
t.trtl.processed = lid
t.trtl.processed = lid - 1 // -1 so that iteration in tallyVotes starts from the target layer
t.trtl.last = lid
layer := t.trtl.layer(lid)
layer.opinion = opinion
Expand Down Expand Up @@ -276,7 +282,7 @@ func (t *Tortoise) TallyVotes(ctx context.Context, lid types.LayerID) {
defer t.mu.Unlock()
waitTallyVotes.Observe(float64(time.Since(start).Nanoseconds()))
start = time.Now()
t.trtl.onLayer(ctx, lid)
t.trtl.tallyVotes(ctx, lid)
executeTallyVotes.Observe(float64(time.Since(start).Nanoseconds()))
if t.tracer != nil {
t.tracer.On(&TallyTrace{Layer: lid})
Expand Down Expand Up @@ -307,16 +313,43 @@ func (t *Tortoise) OnBlock(header types.BlockHeader) {
}
}

// OnValidBlock inserts block, updates that data is stored locally
// and that block was previously considered valid by tortoise.
func (t *Tortoise) OnValidBlock(header types.BlockHeader) {
start := time.Now()
// OnRecoveredBlocks uploads blocks to the state with all metadata.
//
// Implementation assumes that they will be uploaded in order.
func (t *Tortoise) OnRecoveredBlocks(lid types.LayerID, validity map[types.BlockHeader]bool, hare *types.BlockID) {
t.mu.Lock()
defer t.mu.Unlock()
waitBlockDuration.Observe(float64(time.Since(start).Nanoseconds()))
t.trtl.onBlock(header, true, true)

for block, valid := range validity {
if exists := t.trtl.getBlock(block); exists != nil {
continue
}
info := newBlockInfo(block)
info.data = true
if valid {
info.validity = support
}
t.trtl.addBlock(info)
}
if hare != nil {
t.trtl.onHareOutput(lid, *hare)
} else if !withinDistance(t.cfg.Hdist, lid, t.trtl.last) {
layer := t.trtl.state.layer(lid)
layer.hareTerminated = true
for _, info := range layer.blocks {
if info.validity == abstain {
info.validity = against
}
}
}

t.logger.Debug("loaded recovered blocks",
zap.Uint32("lid", lid.Uint32()),
zap.Uint32("last", t.trtl.last.Uint32()),
zapBlocks(t.trtl.state.layer(lid).blocks),
)
if t.tracer != nil {
t.tracer.On(&BlockTrace{Header: header, Valid: true})
t.tracer.On(newRecoveredBlocksTrace(lid, validity, hare))
}
}

Expand Down Expand Up @@ -607,3 +640,23 @@ func (t *Tortoise) Mode() Mode {
}
return Verifying
}

// UpdateLastLayer updates last layer which is used for determining weight thresholds.
//
// The update is delegated to the internal state's updateLast method and is
// performed while the tortoise mutex is held.
func (t *Tortoise) UpdateLastLayer(last types.LayerID) {
t.mu.Lock()
defer t.mu.Unlock()
t.trtl.updateLast(last)
}

// UpdateVerified overwrites the verified layer with the given, previously known, value.
//
// The assignment is unconditional (no comparison with the current value is made)
// and happens while the tortoise mutex is held.
func (t *Tortoise) UpdateVerified(verified types.LayerID) {
t.mu.Lock()
defer t.mu.Unlock()
t.trtl.verified = verified
}

// WithinHdist reports the result of withinDistance for the given layer, using the
// configured Hdist and the last layer currently known to tortoise.
//
// Both values are read while the tortoise mutex is held.
func (t *Tortoise) WithinHdist(lid types.LayerID) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	hdist, last := t.cfg.Hdist, t.trtl.last
	return withinDistance(hdist, lid, last)
}
2 changes: 1 addition & 1 deletion tortoise/model/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (c *core) OnMessage(m Messenger, event Message) {
m.Send(MessageBallot{Ballot: ballot})
case MessageLayerEnd:
if ev.LayerID.After(types.GetEffectiveGenesis()) {
tortoise.RecoverLayer(context.Background(), c.tortoise, c.cdb, ev.LayerID, c.tortoise.OnBallot)
tortoise.RecoverLayer(context.Background(), c.tortoise, c.cdb.Executor, ev.LayerID, c.tortoise.OnBallot)
c.tortoise.TallyVotes(context.Background(), ev.LayerID)
m.Notify(EventVerified{ID: c.id, Verified: c.tortoise.LatestComplete(), Layer: ev.LayerID})
}
Expand Down
87 changes: 54 additions & 33 deletions tortoise/recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
"github.com/spacemeshos/go-spacemesh/sql/ballots"
Expand All @@ -20,7 +19,7 @@ import (
// Recover tortoise state from database.
func Recover(
ctx context.Context,
db *datastore.CachedDB,
db sql.Executor,
current types.LayerID,
opts ...Opt,
) (*Tortoise, error) {
Expand All @@ -33,17 +32,21 @@ func Recover(
if err != nil {
return nil, fmt.Errorf("failed to load latest known layer: %w", err)
}

applied, err := layers.GetLastApplied(db)
if err != nil {
return nil, fmt.Errorf("get last applied: %w", err)
}

start := types.GetEffectiveGenesis() + 1
if applied > types.LayerID(trtl.cfg.WindowSize) {
// we want to emulate the same condition as during genesis with one difference.
// genesis starts with zero opinion (aggregated hash) - see computeOpinion method.
// but in this case first processed layer should use non-zero opinion of the previous layer.

window := applied - types.LayerID(trtl.cfg.WindowSize)
window = window.GetEpoch().
FirstLayer()
// windback to the start of the epoch to load ref ballots
// we start tallying votes from the first layer of the epoch to guarantee that we load reference ballots.
// reference ballots track beacon and eligibilities
window = window.GetEpoch().FirstLayer()
if window > start {
prev, err1 := layers.GetAggregatedHash(db, window-1)
opinion, err2 := layers.GetAggregatedHash(db, window)
Expand All @@ -70,18 +73,14 @@ func Recover(
}
}

epoch, err := atxs.LatestEpoch(db)
if err != nil {
return nil, fmt.Errorf("failed to load latest epoch: %w", err)
valid, err := blocks.LastValid(db)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
return nil, fmt.Errorf("get last valid: %w", err)
}
epoch++ // recoverEpoch expects target epoch, rather than publish
if last.GetEpoch() != epoch {
for eid := last.GetEpoch(); eid <= epoch; eid++ {
if err := recoverEpoch(eid, trtl, db); err != nil {
return nil, err
}
}
if err == nil {
trtl.UpdateVerified(valid)
}
trtl.UpdateLastLayer(last)
for lid := start; !lid.After(last); lid = lid.Add(1) {
select {
case <-ctx.Done():
Expand All @@ -101,9 +100,9 @@ func Recover(
return trtl, nil
}
trtl.TallyVotes(ctx, last)
// find topmost layer that was already applied and reset pending
// so that result for that layer is not returned
for prev := last - 1; prev >= start; prev-- {
// find topmost layer that was already applied with same result
// and reset pending so that result for that layer is not returned
for prev := valid; prev >= start; prev-- {
opinion, err := layers.GetAggregatedHash(db, prev)
if err == nil && opinion != types.EmptyLayerHash {
if trtl.OnApplied(prev, opinion) {
Expand All @@ -114,19 +113,33 @@ func Recover(
return nil, fmt.Errorf("check opinion %w", err)
}
}
// load activations from future epochs that are not yet referenced by the ballots
epoch, err := atxs.LatestEpoch(db)
if err != nil {
return nil, fmt.Errorf("failed to load latest epoch: %w", err)
}
epoch++ // recoverEpoch expects target epoch, rather than publish
if last.GetEpoch() != epoch {
for eid := last.GetEpoch() + 1; eid <= epoch; eid++ {
if err := recoverEpoch(eid, trtl, db); err != nil {
return nil, err
}
}
}
return trtl, nil
}

func recoverEpoch(epoch types.EpochID, trtl *Tortoise, db *datastore.CachedDB) error {
if err := db.IterateEpochATXHeaders(epoch, func(header *types.ActivationTxHeader) error {
trtl.OnAtx(header.ToData())
return nil
func recoverEpoch(target types.EpochID, trtl *Tortoise, db sql.Executor) error {
publish := target - 1
if err := atxs.IterateAtxs(db, publish, publish, func(atx *types.VerifiedActivationTx) bool {
trtl.OnAtx(atx.ToHeader().ToData())
return true
}); err != nil {
return err
return fmt.Errorf("iterate atxs: %w", err)
}
beacon, err := beacons.Get(db, epoch)
beacon, err := beacons.Get(db, target)
if err == nil && beacon != types.EmptyBeacon {
trtl.OnBeacon(epoch, beacon)
trtl.OnBeacon(target, beacon)
}
return nil
}
Expand All @@ -136,7 +149,7 @@ type ballotFunc func(*types.BallotTortoiseData)
func RecoverLayer(
ctx context.Context,
trtl *Tortoise,
db *datastore.CachedDB,
db sql.Executor,
lid types.LayerID,
onBallot ballotFunc,
) error {
Expand All @@ -149,29 +162,37 @@ func RecoverLayer(
if err != nil {
return err
}

results := map[types.BlockHeader]bool{}
for _, block := range blocksrst {
valid, err := blocks.IsValid(db, block.ID())
if err != nil && errors.Is(err, sql.ErrNotFound) {
return err
}
if valid {
trtl.OnValidBlock(block.ToVote())
} else {
trtl.OnBlock(block.ToVote())
}
results[block.ToVote()] = valid
}
var hareResult *types.BlockID
if trtl.WithinHdist(lid) {
hare, err := certificates.GetHareOutput(db, lid)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
return err
}
if err == nil {
trtl.OnHareOutput(lid, hare)
hareResult = &hare
}
}
trtl.OnRecoveredBlocks(lid, results, hareResult)
// tortoise votes according to the hare only within hdist (protocol parameter).
// also node is free to prune certificates outside hdist to minimize space usage.

// NOTE(dshulyak) we loaded information about malicious identities earlier.
ballotsrst, err := ballots.LayerNoMalicious(db, lid)
if err != nil {
return err
}
// NOTE(dshulyak) it is done in two steps so that if ballot from the same layer was used
// as reference or base ballot we will be able to decode it.
// it might be possible to invalidate such ballots, but until then this is required
for _, ballot := range ballotsrst {
if ballot.EpochData != nil {
onBallot(ballot.ToTortoiseData())
Expand Down
Loading

0 comments on commit 5622163

Please sign in to comment.