Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ledger: Clear Merkle Trie on Commit Error #5568

Merged
merged 25 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
a523d09
Clear Merkle Trie on Commit Error
AlgoAxel Jul 13, 2023
f255dd8
Move trie = nil to handleUnorderedCommitOrError
AlgoAxel Jul 13, 2023
b17b33b
i before e except after c
AlgoAxel Jul 13, 2023
7f24ba7
Refactor trackers to use specific handling functions during commit
AlgoAxel Jul 17, 2023
e9a347f
Add unit test for commit error handling in catchpoint tracker
AlgoAxel Jul 17, 2023
54bb588
inline clearBalancesTrie
AlgoAxel Jul 17, 2023
dea79e8
Add additional checks for other handlers
AlgoAxel Jul 17, 2023
f0b102e
Register server.Stop for logrus Fatal
AlgoAxel Jul 18, 2023
4b1ec5e
Fatal on sql3.ErrIoErr in tracker.go
AlgoAxel Jul 18, 2023
88703cd
Add unit test for RegisterExitHandler
AlgoAxel Jul 18, 2023
4c5adb3
Represent sql ErrIOErr as trackerdb ErrIOErr
AlgoAxel Jul 18, 2023
533265a
inline ioErr
AlgoAxel Jul 18, 2023
fdd32a3
Add TestFatalExitHandler comment
AlgoAxel Jul 21, 2023
f8ae83d
Fix Error cast and add unit test
AlgoAxel Jul 24, 2023
a79f89e
Add IO Error Handling Tracker Test
AlgoAxel Jul 24, 2023
26ef4b3
Add os.Exit back to Fatal handling, and give tests a logger option wi…
AlgoAxel Jul 24, 2023
b4930aa
improve error matching
AlgoAxel Jul 24, 2023
d7c89e4
Add Code Check for Error Matching
AlgoAxel Jul 24, 2023
ed4fa4d
Merge branch 'master' into fix/catchpointMerkleCommit
AlgoAxel Jul 24, 2023
7b484fa
reviewdog
AlgoAxel Jul 24, 2023
b427adf
add more wrapping; rename to wrap
AlgoAxel Aug 8, 2023
0af3eb3
update test
AlgoAxel Aug 8, 2023
81adcec
Merge branch 'master' into fix/catchpointMerkleCommit
AlgoAxel Aug 9, 2023
09ebb4e
Update ledger/catchpointtracker.go
AlgoAxel Aug 11, 2023
8fca9ce
Update ledger/catchpointtracker_test.go
AlgoAxel Aug 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions daemon/algod/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ func (s *Server) Initialize(cfg config.Local, phonebookAddresses []string, genes
return fmt.Errorf("couldn't initialize the node: %s", err)
}
s.node = serverNode

// When a caller to logging uses Fatal, we want to stop the node before os.Exit is called.
logging.RegisterExitHandler(s.Stop)

return nil
}

Expand Down
6 changes: 5 additions & 1 deletion ledger/acctonline.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,11 @@ func (ao *onlineAccounts) consecutiveVersion(offset uint64) uint64 {
return offset
}

func (ao *onlineAccounts) handleUnorderedCommitOrError(dcc *deferredCommitContext) {
// handleUnorderedCommit is a no-op for onlineAccounts.
func (ao *onlineAccounts) handleUnorderedCommit(dcc *deferredCommitContext) {
}

// handlePrepareCommitError is a no-op for onlineAccounts.
func (ao *onlineAccounts) handlePrepareCommitError(dcc *deferredCommitContext) {
}

// handleCommitError is a no-op for onlineAccounts.
func (ao *onlineAccounts) handleCommitError(dcc *deferredCommitContext) {
}

func (ao *onlineAccounts) maxBalLookback() uint64 {
Expand Down
6 changes: 5 additions & 1 deletion ledger/acctupdates.go
Original file line number Diff line number Diff line change
Expand Up @@ -1510,7 +1510,11 @@ func (au *accountUpdates) roundOffset(rnd basics.Round) (offset uint64, err erro
return off, nil
}

func (au *accountUpdates) handleUnorderedCommitOrError(dcc *deferredCommitContext) {
// handleUnorderedCommit is a no-op for accountUpdates.
func (au *accountUpdates) handleUnorderedCommit(dcc *deferredCommitContext) {
}

// handlePrepareCommitError is a no-op for accountUpdates.
func (au *accountUpdates) handlePrepareCommitError(dcc *deferredCommitContext) {
}

// handleCommitError is a no-op for accountUpdates.
func (au *accountUpdates) handleCommitError(dcc *deferredCommitContext) {
}

// prepareCommit prepares data to write to the database a "chunk" of rounds, and update the cached dbRound accordingly.
Expand Down
6 changes: 5 additions & 1 deletion ledger/bulletin.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,11 @@ func (b *bulletin) postCommit(ctx context.Context, dcc *deferredCommitContext) {
func (b *bulletin) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
}

func (b *bulletin) handleUnorderedCommitOrError(*deferredCommitContext) {
// handleUnorderedCommit is a no-op for bulletin.
func (b *bulletin) handleUnorderedCommit(dcc *deferredCommitContext) {
}

// handlePrepareCommitError is a no-op for bulletin.
func (b *bulletin) handlePrepareCommitError(dcc *deferredCommitContext) {
}

// handleCommitError is a no-op for bulletin.
func (b *bulletin) handleCommitError(dcc *deferredCommitContext) {
}

func (b *bulletin) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
Expand Down
25 changes: 21 additions & 4 deletions ledger/catchpointtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,10 +928,27 @@ func (ct *catchpointTracker) postCommitUnlocked(ctx context.Context, dcc *deferr
}
}

// handleUnorderedCommitOrError is a special method for handling deferred commits that are out of order.
// Tracker might update own state in this case. For example, account catchpoint tracker cancels
// scheduled catchpoint writing that deferred commit.
func (ct *catchpointTracker) handleUnorderedCommitOrError(dcc *deferredCommitContext) {
// handleUnorderedCommit handles a deferred commit that is found to be out of order
// by cancelling the pending write for dcc (see cancelWrite).
func (ct *catchpointTracker) handleUnorderedCommit(dcc *deferredCommitContext) {
ct.cancelWrite(dcc)
}

// handlePrepareCommitError handles an error encountered during commit preparation
// by cancelling the pending write for dcc (see cancelWrite).
func (ct *catchpointTracker) handlePrepareCommitError(dcc *deferredCommitContext) {
ct.cancelWrite(dcc)
}

// if an error is encountered during commit, cancel writing and clear the balances trie
func (ct *catchpointTracker) handleCommitError(dcc *deferredCommitContext) {
// in cases where the commitRound fails, it is not certain that the merkle trie is in a clean state, and should be cleared.
// Specifically, modifications to the trie happen through accountsUpdateBalances,
// which happens before commit to disk. Errors in this tracker, subsequent trackers, or the commit to disk may cause the trie cache to be incorrect,
AlgoAxel marked this conversation as resolved.
Show resolved Hide resolved
// affecting the perceived root on subsequent rounds
ct.balancesTrie = nil
ct.cancelWrite(dcc)
}

func (ct *catchpointTracker) cancelWrite(dcc *deferredCommitContext) {
// if the node is configured to generate catchpoint files, we might need to update the catchpointWriting variable.
if ct.enableGeneratingCatchpointFiles {
// determine if this was a catchpoint round
Expand Down
90 changes: 88 additions & 2 deletions ledger/catchpointtracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,88 @@ func createCatchpoint(t *testing.T, ct *catchpointTracker, accountsRound basics.
require.NoError(t, err)
}

// TestCatchpointCommitErrorHandling exists to confirm that when an error occurs during catchpoint generation,
// the catchpoint tracker will clear the appropriate state - specifically, the balancesTrie will be cleared,
// and the balancesTrie will remain functional if loaded from disk, or if lazily loaded during commitRound
func TestCatchpointCommitErrorHandling(t *testing.T) {
partitiontest.PartitionTest(t)

temporaryDirectory := t.TempDir()

accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(20, true)}
ml := makeMockLedgerForTracker(t, true, 10, protocol.ConsensusCurrentVersion, accts)
defer ml.Close()

ct := &catchpointTracker{}
conf := config.GetDefaultLocal()

conf.Archival = true
ct.initialize(conf, ".")
defer ct.close()
ct.dbDirectory = temporaryDirectory

_, err := trackerDBInitialize(ml, true, ct.dbDirectory)
require.NoError(t, err)

err = ct.loadFromDisk(ml, ml.Latest())
require.NoError(t, err)

txn, err := ml.dbs.BeginTransaction(context.Background())
require.NoError(t, err)
dcc := deferredCommitContext{
compactKvDeltas: map[string]modifiedKvValue{"key": {data: []byte("value")}},
}

// before commitRound is called, record the trie RootHash
require.NotNil(t, ct.balancesTrie)
root1, err := ct.balancesTrie.RootHash()
require.NoError(t, err)

ct.commitRound(context.Background(), txn, &dcc)

txn.Commit()

// after commitRound is called, confirm the RootHash has changed
root2, err := ct.balancesTrie.RootHash()
require.NoError(t, err)
require.NotEqual(t, root1, root2)

// demonstrate that handleUnordered does not restore the trie
ct.handleUnorderedCommit(&dcc)
root2a, err := ct.balancesTrie.RootHash()
require.NoError(t, err)
require.Equal(t, root2, root2a)

// demonstrate that handlePrepareCommitError does not restore the trie
ct.handlePrepareCommitError(&dcc)
root2b, err := ct.balancesTrie.RootHash()
require.NoError(t, err)
require.Equal(t, root2, root2b)

// now have the ct handle a commit error
ct.handleCommitError(&dcc)
// after error handling, the trie should be nil
require.Nil(t, ct.balancesTrie)

// after reloading from disk, the trie should be equal to root1
err = ct.loadFromDisk(ml, ml.Latest())
require.NoError(t, err)
root3, err := ct.balancesTrie.RootHash()
require.NoError(t, err)
require.Equal(t, root1, root3)

// also demonstrate that lazy initialization allows a nil trie to go back to root2 immediately after an error if the same delta is applied
bbroder-algo marked this conversation as resolved.
Show resolved Hide resolved
AlgoAxel marked this conversation as resolved.
Show resolved Hide resolved
txn, err = ml.dbs.BeginTransaction(context.Background())
require.NoError(t, err)
ct.handleCommitError(&dcc) // clear trie
require.Nil(t, ct.balancesTrie)
ct.commitRound(context.Background(), txn, &dcc)
txn.Commit()
root4, err := ct.balancesTrie.RootHash()
require.NoError(t, err)
require.Equal(t, root2, root4)
}

// TestCatchpointFileWithLargeSpVerification makes sure that CatchpointFirstStageInfo.BiggestChunkLen is calculated based on state proof verification contexts
// as well as other chunks in the catchpoint files.
func TestCatchpointFileWithLargeSpVerification(t *testing.T) {
Expand Down Expand Up @@ -692,8 +774,12 @@ func (bt *blockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferred
}
}

// handleUnorderedCommitOrError is not used by the blockingTracker
func (bt *blockingTracker) handleUnorderedCommitOrError(*deferredCommitContext) {
// The commit control functions below are not used by the blockingTracker; each is a no-op.

// handleUnorderedCommit is not used by the blockingTracker
func (bt *blockingTracker) handleUnorderedCommit(dcc *deferredCommitContext) {
}

// handlePrepareCommitError is not used by the blockingTracker
func (bt *blockingTracker) handlePrepareCommitError(dcc *deferredCommitContext) {
}

// handleCommitError is not used by the blockingTracker
func (bt *blockingTracker) handleCommitError(dcc *deferredCommitContext) {
}

// close is not used by the blockingTracker
Expand Down
6 changes: 5 additions & 1 deletion ledger/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ func (mt *metricsTracker) postCommit(ctx context.Context, dcc *deferredCommitCon
func (mt *metricsTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
}

func (mt *metricsTracker) handleUnorderedCommitOrError(*deferredCommitContext) {
// handleUnorderedCommit is a no-op for metricsTracker.
func (mt *metricsTracker) handleUnorderedCommit(dcc *deferredCommitContext) {
}

// handlePrepareCommitError is a no-op for metricsTracker.
func (mt *metricsTracker) handlePrepareCommitError(dcc *deferredCommitContext) {
}

// handleCommitError is a no-op for metricsTracker.
func (mt *metricsTracker) handleCommitError(dcc *deferredCommitContext) {
}

func (mt *metricsTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
Expand Down
6 changes: 5 additions & 1 deletion ledger/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,11 @@ func (bn *blockNotifier) postCommit(ctx context.Context, dcc *deferredCommitCont
func (bn *blockNotifier) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
}

func (bn *blockNotifier) handleUnorderedCommitOrError(*deferredCommitContext) {
// handleUnorderedCommit is a no-op for blockNotifier.
func (bn *blockNotifier) handleUnorderedCommit(dcc *deferredCommitContext) {
}

// handlePrepareCommitError is a no-op for blockNotifier.
func (bn *blockNotifier) handlePrepareCommitError(dcc *deferredCommitContext) {
}

// handleCommitError is a no-op for blockNotifier.
func (bn *blockNotifier) handleCommitError(dcc *deferredCommitContext) {
}

func (bn *blockNotifier) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
Expand Down
6 changes: 5 additions & 1 deletion ledger/spverificationtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,11 @@ func (spt *spVerificationTracker) postCommit(_ context.Context, dcc *deferredCom
func (spt *spVerificationTracker) postCommitUnlocked(context.Context, *deferredCommitContext) {
}

func (spt *spVerificationTracker) handleUnorderedCommitOrError(*deferredCommitContext) {
// handleUnorderedCommit is a no-op for spVerificationTracker.
func (spt *spVerificationTracker) handleUnorderedCommit(dcc *deferredCommitContext) {
}

// handlePrepareCommitError is a no-op for spVerificationTracker.
func (spt *spVerificationTracker) handlePrepareCommitError(dcc *deferredCommitContext) {
}

// handleCommitError is a no-op for spVerificationTracker.
func (spt *spVerificationTracker) handleCommitError(dcc *deferredCommitContext) {
}

func (spt *spVerificationTracker) close() {
Expand Down
3 changes: 3 additions & 0 deletions ledger/store/trackerdb/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import (
// ErrNotFound is returned when a record is not found.
var ErrNotFound = errors.New("trackerdb: not found")

// ErrIoErr is returned when a Disk/IO error is encountered
var ErrIoErr = errors.New("trackerdb: io error")

// AccountRef is an opaque ref to an account in the db.
type AccountRef interface {
AccountRefMarker()
Expand Down
21 changes: 21 additions & 0 deletions ledger/store/trackerdb/sqlitedriver/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (

"github.com/algorand/go-algorand/data/basics"
storetesting "github.com/algorand/go-algorand/ledger/store/testing"
"github.com/algorand/go-algorand/ledger/store/trackerdb"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
"github.com/mattn/go-sqlite3"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -81,3 +83,22 @@ func TestAccountsDbQueriesCreateClose(t *testing.T) {
qs.Close()
require.Nil(t, qs.lookupAccountStmt)
}

// TestMaybeIOError verifies that maybeIOError maps a sqlite3 error carrying the
// ErrIoErr code onto trackerdb.ErrIoErr, and does not do so for a different sqlite3 code.
// github.com/mattn/go-sqlite3/blob/master/error.go
// github.com/mattn/go-sqlite3/blob/master/sqlite3.go#L830
func TestMaybeIOError(t *testing.T) {
	partitiontest.PartitionTest(t)
	t.Parallel()

	// sqlite3 returns its errors as sqlite3.Error values, so inputs are built the same way.
	cases := []struct {
		in     sqlite3.Error
		wantIO bool
	}{
		{in: sqlite3.Error{Code: sqlite3.ErrIoErr}, wantIO: true},
		// ErrNo(10) is the sqlite3 error code for ErrIoErr.
		{in: sqlite3.Error{Code: sqlite3.ErrNo(10)}, wantIO: true},
		{in: sqlite3.Error{Code: sqlite3.ErrSchema}, wantIO: false},
	}

	for _, tc := range cases {
		got := maybeIOError(tc.in)
		if tc.wantIO {
			require.Equal(t, trackerdb.ErrIoErr, got)
			continue
		}
		require.NotEqual(t, trackerdb.ErrIoErr, got)
	}
}
40 changes: 27 additions & 13 deletions ledger/store/trackerdb/sqlitedriver/sqlitedriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/util/db"
"github.com/mattn/go-sqlite3"
)

type trackerSQLStore struct {
Expand Down Expand Up @@ -66,9 +67,9 @@ func (s *trackerSQLStore) Batch(fn trackerdb.BatchFn) (err error) {
}

func (s *trackerSQLStore) BatchContext(ctx context.Context, fn trackerdb.BatchFn) (err error) {
return s.pair.Wdb.AtomicContext(ctx, func(ctx context.Context, tx *sql.Tx) error {
return maybeIOError(s.pair.Wdb.AtomicContext(ctx, func(ctx context.Context, tx *sql.Tx) error {
return fn(ctx, &sqlBatchScope{tx, false, &sqlWriter{tx}})
})
}))
}

func (s *trackerSQLStore) BeginBatch(ctx context.Context) (trackerdb.Batch, error) {
AlgoAxel marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -84,9 +85,9 @@ func (s *trackerSQLStore) Snapshot(fn trackerdb.SnapshotFn) (err error) {
}

func (s *trackerSQLStore) SnapshotContext(ctx context.Context, fn trackerdb.SnapshotFn) (err error) {
return s.pair.Rdb.AtomicContext(ctx, func(ctx context.Context, tx *sql.Tx) error {
return maybeIOError(s.pair.Rdb.AtomicContext(ctx, func(ctx context.Context, tx *sql.Tx) error {
return fn(ctx, sqlSnapshotScope{tx, &sqlReader{tx}})
})
}))
}

func (s *trackerSQLStore) BeginSnapshot(ctx context.Context) (trackerdb.Snapshot, error) {
AlgoAxel marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -102,9 +103,9 @@ func (s *trackerSQLStore) Transaction(fn trackerdb.TransactionFn) (err error) {
}

func (s *trackerSQLStore) TransactionContext(ctx context.Context, fn trackerdb.TransactionFn) (err error) {
return s.pair.Wdb.AtomicContext(ctx, func(ctx context.Context, tx *sql.Tx) error {
return maybeIOError(s.pair.Wdb.AtomicContext(ctx, func(ctx context.Context, tx *sql.Tx) error {
return fn(ctx, &sqlTransactionScope{tx, false, &sqlReader{tx}, &sqlWriter{tx}, &sqlCatchpoint{tx}})
})
}))
}

func (s *trackerSQLStore) BeginTransaction(ctx context.Context) (trackerdb.Transaction, error) {
AlgoAxel marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -116,10 +117,10 @@ func (s *trackerSQLStore) BeginTransaction(ctx context.Context) (trackerdb.Trans
}

func (s trackerSQLStore) RunMigrations(ctx context.Context, params trackerdb.Params, log logging.Logger, targetVersion int32) (mgr trackerdb.InitParams, err error) {
err = s.pair.Wdb.AtomicContext(ctx, func(ctx context.Context, tx *sql.Tx) error {
err = maybeIOError(s.pair.Wdb.AtomicContext(ctx, func(ctx context.Context, tx *sql.Tx) error {
mgr, err = RunMigrations(ctx, tx, params, log, targetVersion)
return err
})
}))
return
}

Expand Down Expand Up @@ -283,15 +284,15 @@ func (bs *sqlBatchScope) ResetTransactionWarnDeadline(ctx context.Context, deadl

func (bs *sqlBatchScope) Close() error {
if !bs.committed {
return bs.tx.Rollback()
return maybeIOError(bs.tx.Rollback())
}
return nil
}

func (bs *sqlBatchScope) Commit() error {
err := bs.tx.Commit()
if err != nil {
return err
return maybeIOError(err)
}
bs.committed = true
return nil
Expand All @@ -303,7 +304,7 @@ type sqlSnapshotScope struct {
}

func (ss sqlSnapshotScope) Close() error {
return ss.tx.Rollback()
return maybeIOError(ss.tx.Rollback())
}

type sqlTransactionScope struct {
Expand All @@ -324,16 +325,29 @@ func (txs *sqlTransactionScope) ResetTransactionWarnDeadline(ctx context.Context

func (txs *sqlTransactionScope) Close() error {
if !txs.committed {
return txs.tx.Rollback()
return maybeIOError(txs.tx.Rollback())
}
return nil
}

func (txs *sqlTransactionScope) Commit() error {
err := txs.tx.Commit()
if err != nil {
return err
return maybeIOError(err)
}
txs.committed = true
return nil
}

// maybeIOError allows for SQL IO Errors to be represented as trackerdb.ErrIoErr
// in places which may enconter them.
func maybeIOError(err error) error {
AlgoAxel marked this conversation as resolved.
Show resolved Hide resolved
if err == nil {
return nil
}
serr, ok := err.(sqlite3.Error)
if !ok || serr.Code == sqlite3.ErrIoErr {
return trackerdb.ErrIoErr
}
return err
}
Loading