Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ledger: Clear Merkle Trie on Commit Error #5568

Merged
merged 25 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
a523d09
Clear Merkle Trie on Commit Error
AlgoAxel Jul 13, 2023
f255dd8
Move trie = nil to handleUnorderedCommitOrError
AlgoAxel Jul 13, 2023
b17b33b
i before e except after c
AlgoAxel Jul 13, 2023
7f24ba7
Refactor trackers to use specific handling functions during commit
AlgoAxel Jul 17, 2023
e9a347f
Add unit test for commit error handling in catchpoint tracker
AlgoAxel Jul 17, 2023
54bb588
inline clearBalancesTrie
AlgoAxel Jul 17, 2023
dea79e8
Add additional checks for other handlers
AlgoAxel Jul 17, 2023
f0b102e
Register server.Stop for logrus Fatal
AlgoAxel Jul 18, 2023
4b1ec5e
Fatal on sql3.ErrIoErr in tracker.go
AlgoAxel Jul 18, 2023
88703cd
Add unit test for RegisterExitHandler
AlgoAxel Jul 18, 2023
4c5adb3
Represent sql ErrIOErr as trackerdb ErrIOErr
AlgoAxel Jul 18, 2023
533265a
inline ioErr
AlgoAxel Jul 18, 2023
fdd32a3
Add TestFatalExitHandler comment
AlgoAxel Jul 21, 2023
f8ae83d
Fix Error cast and add unit test
AlgoAxel Jul 24, 2023
a79f89e
Add IO Error Handling Tracker Test
AlgoAxel Jul 24, 2023
26ef4b3
Add os.Exit back to Fatal handling, and give tests a logger option wi…
AlgoAxel Jul 24, 2023
b4930aa
improve error matching
AlgoAxel Jul 24, 2023
d7c89e4
Add Code Check for Error Matching
AlgoAxel Jul 24, 2023
ed4fa4d
Merge branch 'master' into fix/catchpointMerkleCommit
AlgoAxel Jul 24, 2023
7b484fa
reviewdog
AlgoAxel Jul 24, 2023
b427adf
add more wrapping; rename to wrap
AlgoAxel Aug 8, 2023
0af3eb3
update test
AlgoAxel Aug 8, 2023
81adcec
Merge branch 'master' into fix/catchpointMerkleCommit
AlgoAxel Aug 9, 2023
09ebb4e
Update ledger/catchpointtracker.go
AlgoAxel Aug 11, 2023
8fca9ce
Update ledger/catchpointtracker_test.go
AlgoAxel Aug 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions daemon/algod/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@
return fmt.Errorf("couldn't initialize the node: %s", err)
}
s.node = serverNode

// When a caller to logging uses Fatal, we want to stop the node before os.Exit is called.
logging.RegisterExitHandler(s.Stop)

Check warning on line 236 in daemon/algod/server.go

View check run for this annotation

Codecov / codecov/patch

daemon/algod/server.go#L236

Added line #L236 was not covered by tests

return nil
}

Expand Down
6 changes: 5 additions & 1 deletion ledger/acctonline.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,11 @@
return offset
}

func (ao *onlineAccounts) handleUnorderedCommitOrError(dcc *deferredCommitContext) {
func (ao *onlineAccounts) handleUnorderedCommit(dcc *deferredCommitContext) {
}
func (ao *onlineAccounts) handlePrepareCommitError(dcc *deferredCommitContext) {

Check warning on line 354 in ledger/acctonline.go

View check run for this annotation

Codecov / codecov/patch

ledger/acctonline.go#L354

Added line #L354 was not covered by tests
}
func (ao *onlineAccounts) handleCommitError(dcc *deferredCommitContext) {
}

func (ao *onlineAccounts) maxBalLookback() uint64 {
Expand Down
6 changes: 5 additions & 1 deletion ledger/acctupdates.go
Original file line number Diff line number Diff line change
Expand Up @@ -1414,7 +1414,11 @@
return off, nil
}

func (au *accountUpdates) handleUnorderedCommitOrError(dcc *deferredCommitContext) {
func (au *accountUpdates) handleUnorderedCommit(dcc *deferredCommitContext) {
}
func (au *accountUpdates) handlePrepareCommitError(dcc *deferredCommitContext) {

Check warning on line 1419 in ledger/acctupdates.go

View check run for this annotation

Codecov / codecov/patch

ledger/acctupdates.go#L1419

Added line #L1419 was not covered by tests
}
func (au *accountUpdates) handleCommitError(dcc *deferredCommitContext) {
}

// prepareCommit prepares data to write to the database a "chunk" of rounds, and update the cached dbRound accordingly.
Expand Down
6 changes: 5 additions & 1 deletion ledger/bulletin.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,11 @@
func (b *bulletin) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
}

func (b *bulletin) handleUnorderedCommitOrError(*deferredCommitContext) {
func (b *bulletin) handleUnorderedCommit(dcc *deferredCommitContext) {
}
func (b *bulletin) handlePrepareCommitError(dcc *deferredCommitContext) {

Check warning on line 128 in ledger/bulletin.go

View check run for this annotation

Codecov / codecov/patch

ledger/bulletin.go#L128

Added line #L128 was not covered by tests
}
func (b *bulletin) handleCommitError(dcc *deferredCommitContext) {
}

func (b *bulletin) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
Expand Down
25 changes: 21 additions & 4 deletions ledger/catchpointtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,10 +953,27 @@ func (ct *catchpointTracker) postCommitUnlocked(ctx context.Context, dcc *deferr
}
}

// handleUnorderedCommitOrError is a special method for handling deferred commits that are out of order.
// Tracker might update own state in this case. For example, account catchpoint tracker cancels
// scheduled catchpoint writing that deferred commit.
func (ct *catchpointTracker) handleUnorderedCommitOrError(dcc *deferredCommitContext) {
// when the deferred commit is found to be out of order, cancel writing
func (ct *catchpointTracker) handleUnorderedCommit(dcc *deferredCommitContext) {
ct.cancelWrite(dcc)
}

// if an error is encountered during commit preparation, cancel writing
func (ct *catchpointTracker) handlePrepareCommitError(dcc *deferredCommitContext) {
ct.cancelWrite(dcc)
}

// if an error is encountered during commit, cancel writing and clear the balances trie
func (ct *catchpointTracker) handleCommitError(dcc *deferredCommitContext) {
// in cases where the commitRound fails, it is not certain that the merkle trie is in a clean state, and should be cleared.
// Specifically, modifications to the trie happen through accountsUpdateBalances,
// which happens before commit to disk. Errors in this tracker, subsequent trackers, or the commit to disk may cause the trie cache to be incorrect,
// affecting the perceived root on subsequent rounds
ct.balancesTrie = nil
ct.cancelWrite(dcc)
}

func (ct *catchpointTracker) cancelWrite(dcc *deferredCommitContext) {
// if the node is configured to generate catchpoint files, we might need to update the catchpointWriting variable.
if ct.enableGeneratingCatchpointFiles {
// determine if this was a catchpoint round
Expand Down
90 changes: 88 additions & 2 deletions ledger/catchpointtracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,88 @@ func createCatchpoint(t *testing.T, ct *catchpointTracker, accountsRound basics.
require.NoError(t, err)
}

// TestCatchpointCommitErrorHandling exists to confirm that when an error occurs during catchpoint generation,
// the catchpoint tracker will clear the appropriate state - specifically, the balancesTrie will be cleared,
// and the balancesTrie will remain functional if loaded from disk, or if lazily loaded during commitRound
func TestCatchpointCommitErrorHandling(t *testing.T) {
partitiontest.PartitionTest(t)

temporaryDirectory := t.TempDir()

accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(20, true)}
ml := makeMockLedgerForTracker(t, true, 10, protocol.ConsensusCurrentVersion, accts)
defer ml.Close()

ct := &catchpointTracker{}
conf := config.GetDefaultLocal()

conf.Archival = true
ct.initialize(conf, ".")
defer ct.close()
ct.dbDirectory = temporaryDirectory

_, err := trackerDBInitialize(ml, true, ct.dbDirectory)
require.NoError(t, err)

err = ct.loadFromDisk(ml, ml.Latest())
require.NoError(t, err)

txn, err := ml.dbs.BeginTransaction(context.Background())
require.NoError(t, err)
dcc := deferredCommitContext{
compactKvDeltas: map[string]modifiedKvValue{"key": {data: []byte("value")}},
}

// before commitRound is called, record the trie RootHash
require.NotNil(t, ct.balancesTrie)
root1, err := ct.balancesTrie.RootHash()
require.NoError(t, err)

ct.commitRound(context.Background(), txn, &dcc)

txn.Commit()

// after commitRound is called, confirm the RootHash has changed
root2, err := ct.balancesTrie.RootHash()
require.NoError(t, err)
require.NotEqual(t, root1, root2)

// demonstrate that handleUnordered does not restore the trie
ct.handleUnorderedCommit(&dcc)
root2a, err := ct.balancesTrie.RootHash()
require.NoError(t, err)
require.Equal(t, root2, root2a)

// demonstrate that handlePrepareCommitError does not restore the trie
ct.handlePrepareCommitError(&dcc)
root2b, err := ct.balancesTrie.RootHash()
require.NoError(t, err)
require.Equal(t, root2, root2b)

// now have the ct handle a commit error
ct.handleCommitError(&dcc)
// after error handling, the trie should be nil
require.Nil(t, ct.balancesTrie)

// after reloading from disk, the trie should be equal to root1
err = ct.loadFromDisk(ml, ml.Latest())
require.NoError(t, err)
root3, err := ct.balancesTrie.RootHash()
require.NoError(t, err)
require.Equal(t, root1, root3)

// also demonstrate that lazy initialization allows a nil trie to go back to root2 immediately after error if the same delta is applied
txn, err = ml.dbs.BeginTransaction(context.Background())
require.NoError(t, err)
ct.handleCommitError(&dcc) // clear trie
require.Nil(t, ct.balancesTrie)
ct.commitRound(context.Background(), txn, &dcc)
txn.Commit()
root4, err := ct.balancesTrie.RootHash()
require.NoError(t, err)
require.Equal(t, root2, root4)
}

// TestCatchpointFileWithLargeSpVerification makes sure that CatchpointFirstStageInfo.BiggestChunkLen is calculated based on state proof verification contexts
// as well as other chunks in the catchpoint files.
func TestCatchpointFileWithLargeSpVerification(t *testing.T) {
Expand Down Expand Up @@ -702,8 +784,12 @@ func (bt *blockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferred
}
}

// handleUnorderedCommitOrError is not used by the blockingTracker
func (bt *blockingTracker) handleUnorderedCommitOrError(*deferredCommitContext) {
// control functions are not used by the blockingTracker
func (bt *blockingTracker) handleUnorderedCommit(dcc *deferredCommitContext) {
}
func (bt *blockingTracker) handlePrepareCommitError(dcc *deferredCommitContext) {
}
func (bt *blockingTracker) handleCommitError(dcc *deferredCommitContext) {
}

// close is not used by the blockingTracker
Expand Down
6 changes: 5 additions & 1 deletion ledger/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@
func (mt *metricsTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
}

func (mt *metricsTracker) handleUnorderedCommitOrError(*deferredCommitContext) {
func (mt *metricsTracker) handleUnorderedCommit(dcc *deferredCommitContext) {
}
func (mt *metricsTracker) handlePrepareCommitError(dcc *deferredCommitContext) {

Check warning on line 92 in ledger/metrics.go

View check run for this annotation

Codecov / codecov/patch

ledger/metrics.go#L92

Added line #L92 was not covered by tests
}
func (mt *metricsTracker) handleCommitError(dcc *deferredCommitContext) {
}

func (mt *metricsTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
Expand Down
6 changes: 5 additions & 1 deletion ledger/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,11 @@
func (bn *blockNotifier) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
}

func (bn *blockNotifier) handleUnorderedCommitOrError(*deferredCommitContext) {
func (bn *blockNotifier) handleUnorderedCommit(dcc *deferredCommitContext) {
}
func (bn *blockNotifier) handlePrepareCommitError(dcc *deferredCommitContext) {

Check warning on line 128 in ledger/notifier.go

View check run for this annotation

Codecov / codecov/patch

ledger/notifier.go#L128

Added line #L128 was not covered by tests
}
func (bn *blockNotifier) handleCommitError(dcc *deferredCommitContext) {
}

func (bn *blockNotifier) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
Expand Down
6 changes: 5 additions & 1 deletion ledger/spverificationtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,11 @@
func (spt *spVerificationTracker) postCommitUnlocked(context.Context, *deferredCommitContext) {
}

func (spt *spVerificationTracker) handleUnorderedCommitOrError(*deferredCommitContext) {
func (spt *spVerificationTracker) handleUnorderedCommit(dcc *deferredCommitContext) {
}
func (spt *spVerificationTracker) handlePrepareCommitError(dcc *deferredCommitContext) {

Check warning on line 165 in ledger/spverificationtracker.go

View check run for this annotation

Codecov / codecov/patch

ledger/spverificationtracker.go#L165

Added line #L165 was not covered by tests
}
func (spt *spVerificationTracker) handleCommitError(dcc *deferredCommitContext) {
}

func (spt *spVerificationTracker) close() {
Expand Down
14 changes: 14 additions & 0 deletions ledger/store/trackerdb/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import (
"context"
"errors"
"fmt"

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/crypto"
Expand All @@ -30,6 +31,19 @@
// ErrNotFound is returned when a record is not found.
var ErrNotFound = errors.New("trackerdb: not found")

// ErrIoErr is returned when a Disk/IO error is encountered
type ErrIoErr struct {
InnerError error
}

func (e *ErrIoErr) Error() string {
return fmt.Sprintf("trackerdb: io error: %v", e.InnerError)

Check warning on line 40 in ledger/store/trackerdb/interface.go

View check run for this annotation

Codecov / codecov/patch

ledger/store/trackerdb/interface.go#L39-L40

Added lines #L39 - L40 were not covered by tests
}

func (e *ErrIoErr) Unwrap() error {
return e.InnerError

Check warning on line 44 in ledger/store/trackerdb/interface.go

View check run for this annotation

Codecov / codecov/patch

ledger/store/trackerdb/interface.go#L43-L44

Added lines #L43 - L44 were not covered by tests
}

// AccountRef is an opaque ref to an account in the db.
type AccountRef interface {
AccountRefMarker()
Expand Down
28 changes: 28 additions & 0 deletions ledger/store/trackerdb/sqlitedriver/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ package sqlitedriver
import (
"context"
"database/sql"
"errors"
"testing"

"github.com/algorand/go-algorand/data/basics"
storetesting "github.com/algorand/go-algorand/ledger/store/testing"
"github.com/algorand/go-algorand/ledger/store/trackerdb"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
"github.com/mattn/go-sqlite3"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -81,3 +84,28 @@ func TestAccountsDbQueriesCreateClose(t *testing.T) {
qs.Close()
require.Nil(t, qs.lookupAccountStmt)
}

// TestWrapIOError ensures that SQL ErrIOErr is converted to trackerdb.ErrIoErr
// github.com/mattn/go-sqlite3/blob/master/error.go
// github.com/mattn/go-sqlite3/blob/master/sqlite3.go#L830
func TestWrapIOError(t *testing.T) {
partitiontest.PartitionTest(t)
t.Parallel()

// This structure is how sqlite3 returns Errors
err := sqlite3.Error{Code: sqlite3.ErrIoErr}
var trackerIOErr *trackerdb.ErrIoErr
require.ErrorAs(t, wrapIOError(err), &trackerIOErr)

// ErrNo10 is a sqlite3 error code for ErrIoErr
err = sqlite3.Error{Code: sqlite3.ErrNo(10)}
require.ErrorAs(t, wrapIOError(err), &trackerIOErr)

err = sqlite3.Error{Code: sqlite3.ErrSchema}
require.False(t, errors.As(wrapIOError(err), &trackerIOErr))

// confirm that double wrapping only applies once
err = sqlite3.Error{Code: sqlite3.ErrIoErr}
require.Equal(t, wrapIOError(err), wrapIOError(wrapIOError(err)))

}
Loading