diff --git a/daemon/algod/server.go b/daemon/algod/server.go index 9f702a038c..fb846d6556 100644 --- a/daemon/algod/server.go +++ b/daemon/algod/server.go @@ -231,6 +231,10 @@ func (s *Server) Initialize(cfg config.Local, phonebookAddresses []string, genes return fmt.Errorf("couldn't initialize the node: %s", err) } s.node = serverNode + + // When a caller to logging uses Fatal, we want to stop the node before os.Exit is called. + logging.RegisterExitHandler(s.Stop) + return nil } diff --git a/ledger/acctonline.go b/ledger/acctonline.go index c73dfa010a..e2760ee0a2 100644 --- a/ledger/acctonline.go +++ b/ledger/acctonline.go @@ -349,7 +349,11 @@ func (ao *onlineAccounts) consecutiveVersion(offset uint64) uint64 { return offset } -func (ao *onlineAccounts) handleUnorderedCommitOrError(dcc *deferredCommitContext) { +func (ao *onlineAccounts) handleUnorderedCommit(dcc *deferredCommitContext) { +} +func (ao *onlineAccounts) handlePrepareCommitError(dcc *deferredCommitContext) { +} +func (ao *onlineAccounts) handleCommitError(dcc *deferredCommitContext) { } func (ao *onlineAccounts) maxBalLookback() uint64 { diff --git a/ledger/acctupdates.go b/ledger/acctupdates.go index cf89d770f5..d6876f13e9 100644 --- a/ledger/acctupdates.go +++ b/ledger/acctupdates.go @@ -1414,7 +1414,11 @@ func (au *accountUpdates) roundOffset(rnd basics.Round) (offset uint64, err erro return off, nil } -func (au *accountUpdates) handleUnorderedCommitOrError(dcc *deferredCommitContext) { +func (au *accountUpdates) handleUnorderedCommit(dcc *deferredCommitContext) { +} +func (au *accountUpdates) handlePrepareCommitError(dcc *deferredCommitContext) { +} +func (au *accountUpdates) handleCommitError(dcc *deferredCommitContext) { } // prepareCommit prepares data to write to the database a "chunk" of rounds, and update the cached dbRound accordingly. diff --git a/ledger/bulletin.go b/ledger/bulletin.go index b05848193c..4465a52381 100644 --- a/ledger/bulletin.go +++ b/ledger/bulletin.go @@ -123,7 +123,11 @@ func (b *bulletin) postCommit(ctx context.Context, dcc *deferredCommitContext) { func (b *bulletin) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { } -func (b *bulletin) handleUnorderedCommitOrError(*deferredCommitContext) { +func (b *bulletin) handleUnorderedCommit(dcc *deferredCommitContext) { +} +func (b *bulletin) handlePrepareCommitError(dcc *deferredCommitContext) { +} +func (b *bulletin) handleCommitError(dcc *deferredCommitContext) { } func (b *bulletin) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange { diff --git a/ledger/catchpointtracker.go b/ledger/catchpointtracker.go index 427494dec1..7e59406025 100644 --- a/ledger/catchpointtracker.go +++ b/ledger/catchpointtracker.go @@ -953,10 +953,27 @@ func (ct *catchpointTracker) postCommitUnlocked(ctx context.Context, dcc *deferr } } -// handleUnorderedCommitOrError is a special method for handling deferred commits that are out of order. -// Tracker might update own state in this case. For example, account catchpoint tracker cancels -// scheduled catchpoint writing that deferred commit. -func (ct *catchpointTracker) handleUnorderedCommitOrError(dcc *deferredCommitContext) { +// when the deferred commit is found to be out of order, cancel writing +func (ct *catchpointTracker) handleUnorderedCommit(dcc *deferredCommitContext) { + ct.cancelWrite(dcc) +} + +// if an error is encountered during commit preparation, cancel writing +func (ct *catchpointTracker) handlePrepareCommitError(dcc *deferredCommitContext) { + ct.cancelWrite(dcc) +} + +// if an error is encountered during commit, cancel writing and clear the balances trie +func (ct *catchpointTracker) handleCommitError(dcc *deferredCommitContext) { + // in cases where the commitRound fails, it is not certain that the merkle trie is in a clean state, and should be cleared. + // Specifically, modifications to the trie happen through accountsUpdateBalances, + // which happens before commit to disk. Errors in this tracker, subsequent trackers, or the commit to disk may cause the trie cache to be incorrect, + // affecting the perceived root on subsequent rounds + ct.balancesTrie = nil + ct.cancelWrite(dcc) +} + +func (ct *catchpointTracker) cancelWrite(dcc *deferredCommitContext) { // if the node is configured to generate catchpoint files, we might need to update the catchpointWriting variable. if ct.enableGeneratingCatchpointFiles { // determine if this was a catchpoint round diff --git a/ledger/catchpointtracker_test.go b/ledger/catchpointtracker_test.go index 6669eab4af..d5ed7621e8 100644 --- a/ledger/catchpointtracker_test.go +++ b/ledger/catchpointtracker_test.go @@ -342,6 +342,88 @@ func createCatchpoint(t *testing.T, ct *catchpointTracker, accountsRound basics. require.NoError(t, err) } +// TestCatchpointCommitErrorHandling exists to confirm that when an error occurs during catchpoint generation, +// the catchpoint tracker will clear the appropriate state - specifically, the balancesTrie will be cleared, +// and the balancesTrie will remain functional if loaded from disk, or if lazily loaded during commitRound +func TestCatchpointCommitErrorHandling(t *testing.T) { + partitiontest.PartitionTest(t) + + temporaryDirectory := t.TempDir() + + accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(20, true)} + ml := makeMockLedgerForTracker(t, true, 10, protocol.ConsensusCurrentVersion, accts) + defer ml.Close() + + ct := &catchpointTracker{} + conf := config.GetDefaultLocal() + + conf.Archival = true + ct.initialize(conf, ".") + defer ct.close() + ct.dbDirectory = temporaryDirectory + + _, err := trackerDBInitialize(ml, true, ct.dbDirectory) + require.NoError(t, err) + + err = ct.loadFromDisk(ml, ml.Latest()) + require.NoError(t, err) + + txn, err := ml.dbs.BeginTransaction(context.Background()) + require.NoError(t, err) + dcc := deferredCommitContext{ + compactKvDeltas: map[string]modifiedKvValue{"key": {data: []byte("value")}}, + } + + // before commitRound is called, record the trie RootHash + require.NotNil(t, ct.balancesTrie) + root1, err := ct.balancesTrie.RootHash() + require.NoError(t, err) + + ct.commitRound(context.Background(), txn, &dcc) + + txn.Commit() + + // after commitRound is called, confirm the RootHash has changed + root2, err := ct.balancesTrie.RootHash() + require.NoError(t, err) + require.NotEqual(t, root1, root2) + + // demonstrate that handleUnordered does not restore the trie + ct.handleUnorderedCommit(&dcc) + root2a, err := ct.balancesTrie.RootHash() + require.NoError(t, err) + require.Equal(t, root2, root2a) + + // demonstrate that handlePrepareCommitError does not restore the trie + ct.handlePrepareCommitError(&dcc) + root2b, err := ct.balancesTrie.RootHash() + require.NoError(t, err) + require.Equal(t, root2, root2b) + + // now have the ct handle a commit error + ct.handleCommitError(&dcc) + // after error handling, the trie should be nil + require.Nil(t, ct.balancesTrie) + + // after reloading from disk, the trie should be equal to root1 + err = ct.loadFromDisk(ml, ml.Latest()) + require.NoError(t, err) + root3, err := ct.balancesTrie.RootHash() + require.NoError(t, err) + require.Equal(t, root1, root3) + + // also demonstrate that lazy initialization allows a nil trie to go back to root2 immediately after error if the same delta is applied + txn, err = ml.dbs.BeginTransaction(context.Background()) + require.NoError(t, err) + ct.handleCommitError(&dcc) // clear trie + require.Nil(t, ct.balancesTrie) + ct.commitRound(context.Background(), txn, &dcc) + txn.Commit() + root4, err := ct.balancesTrie.RootHash() + require.NoError(t, err) + require.Equal(t, root2, root4) +} + // TestCatchpointFileWithLargeSpVerification makes sure that CatchpointFirstStageInfo.BiggestChunkLen is calculated based on state proof verification contexts // as well as other chunks in the catchpoint files. func TestCatchpointFileWithLargeSpVerification(t *testing.T) { @@ -702,8 +784,12 @@ func (bt *blockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferred } } -// handleUnorderedCommitOrError is not used by the blockingTracker -func (bt *blockingTracker) handleUnorderedCommitOrError(*deferredCommitContext) { +// control functions are not used by the blockingTracker +func (bt *blockingTracker) handleUnorderedCommit(dcc *deferredCommitContext) { +} +func (bt *blockingTracker) handlePrepareCommitError(dcc *deferredCommitContext) { +} +func (bt *blockingTracker) handleCommitError(dcc *deferredCommitContext) { } // close is not used by the blockingTracker diff --git a/ledger/metrics.go b/ledger/metrics.go index 661cb0c2dc..017960d907 100644 --- a/ledger/metrics.go +++ b/ledger/metrics.go @@ -87,7 +87,11 @@ func (mt *metricsTracker) postCommit(ctx context.Context, dcc *deferredCommitCon func (mt *metricsTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { } -func (mt *metricsTracker) handleUnorderedCommitOrError(*deferredCommitContext) { +func (mt *metricsTracker) handleUnorderedCommit(dcc *deferredCommitContext) { +} +func (mt *metricsTracker) handlePrepareCommitError(dcc *deferredCommitContext) { +} +func (mt *metricsTracker) handleCommitError(dcc *deferredCommitContext) { } func (mt *metricsTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange { diff --git a/ledger/notifier.go b/ledger/notifier.go index d89badce7b..c9ea45e41a 100644 --- a/ledger/notifier.go +++ b/ledger/notifier.go @@ -123,7 +123,11 @@ func (bn *blockNotifier) postCommit(ctx context.Context, dcc *deferredCommitCont func (bn *blockNotifier) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { } -func (bn *blockNotifier) handleUnorderedCommitOrError(*deferredCommitContext) { +func (bn *blockNotifier) handleUnorderedCommit(dcc *deferredCommitContext) { +} +func (bn *blockNotifier) handlePrepareCommitError(dcc *deferredCommitContext) { +} +func (bn *blockNotifier) handleCommitError(dcc *deferredCommitContext) { } func (bn *blockNotifier) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange { diff --git a/ledger/spverificationtracker.go b/ledger/spverificationtracker.go index e3d3c214ea..c34825a9c5 100644 --- a/ledger/spverificationtracker.go +++ b/ledger/spverificationtracker.go @@ -160,7 +160,11 @@ func (spt *spVerificationTracker) postCommit(_ context.Context, dcc *deferredCom func (spt *spVerificationTracker) postCommitUnlocked(context.Context, *deferredCommitContext) { } -func (spt *spVerificationTracker) handleUnorderedCommitOrError(*deferredCommitContext) { +func (spt *spVerificationTracker) handleUnorderedCommit(dcc *deferredCommitContext) { +} +func (spt *spVerificationTracker) handlePrepareCommitError(dcc *deferredCommitContext) { +} +func (spt *spVerificationTracker) handleCommitError(dcc *deferredCommitContext) { } func (spt *spVerificationTracker) close() { diff --git a/ledger/store/trackerdb/interface.go b/ledger/store/trackerdb/interface.go index 2ddfa2020b..fa52e9fd58 100644 --- a/ledger/store/trackerdb/interface.go +++ b/ledger/store/trackerdb/interface.go @@ -19,6 +19,7 @@ package trackerdb import ( "context" "errors" + "fmt" "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/crypto" @@ -30,6 +31,19 @@ import ( // ErrNotFound is returned when a record is not found. var ErrNotFound = errors.New("trackerdb: not found") +// ErrIoErr is returned when a Disk/IO error is encountered +type ErrIoErr struct { + InnerError error +} + +func (e *ErrIoErr) Error() string { + return fmt.Sprintf("trackerdb: io error: %v", e.InnerError) +} + +func (e *ErrIoErr) Unwrap() error { + return e.InnerError +} + // AccountRef is an opaque ref to an account in the db. type AccountRef interface { AccountRefMarker() diff --git a/ledger/store/trackerdb/sqlitedriver/sql_test.go b/ledger/store/trackerdb/sqlitedriver/sql_test.go index 7e4dff97d8..65e4782b5d 100644 --- a/ledger/store/trackerdb/sqlitedriver/sql_test.go +++ b/ledger/store/trackerdb/sqlitedriver/sql_test.go @@ -19,12 +19,15 @@ package sqlitedriver import ( "context" "database/sql" + "errors" "testing" "github.com/algorand/go-algorand/data/basics" storetesting "github.com/algorand/go-algorand/ledger/store/testing" + "github.com/algorand/go-algorand/ledger/store/trackerdb" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/test/partitiontest" + "github.com/mattn/go-sqlite3" "github.com/stretchr/testify/require" ) @@ -81,3 +84,28 @@ func TestAccountsDbQueriesCreateClose(t *testing.T) { qs.Close() require.Nil(t, qs.lookupAccountStmt) } + +// TestWrapIOError ensures that SQL ErrIOErr is converted to trackerdb.ErrIoErr +// github.com/mattn/go-sqlite3/blob/master/error.go +// github.com/mattn/go-sqlite3/blob/master/sqlite3.go#L830 +func TestWrapIOError(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + // This structure is how sqlite3 returns Errors + err := sqlite3.Error{Code: sqlite3.ErrIoErr} + var trackerIOErr *trackerdb.ErrIoErr + require.ErrorAs(t, wrapIOError(err), &trackerIOErr) + + // ErrNo10 is a sqlite3 error code for ErrIoErr + err = sqlite3.Error{Code: sqlite3.ErrNo(10)} + require.ErrorAs(t, wrapIOError(err), &trackerIOErr) + + err = sqlite3.Error{Code: sqlite3.ErrSchema} + require.False(t, errors.As(wrapIOError(err), &trackerIOErr)) + + // confirm that double wrapping only applies once + err = sqlite3.Error{Code: sqlite3.ErrIoErr} + require.Equal(t, wrapIOError(err), wrapIOError(wrapIOError(err))) + +} diff --git a/ledger/store/trackerdb/sqlitedriver/sqlitedriver.go b/ledger/store/trackerdb/sqlitedriver/sqlitedriver.go index 34f4d363cc..02c290be2d 100644 --- a/ledger/store/trackerdb/sqlitedriver/sqlitedriver.go +++ b/ledger/store/trackerdb/sqlitedriver/sqlitedriver.go @@ -19,6 +19,7 @@ package sqlitedriver import ( "context" "database/sql" + "errors" "testing" "time" @@ -28,6 +29,7 @@ import ( "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/util/db" + "github.com/mattn/go-sqlite3" ) type trackerSQLStore struct { @@ -66,60 +68,60 @@ func (s *trackerSQLStore) Batch(fn trackerdb.BatchFn) (err error) { } func (s *trackerSQLStore) BatchContext(ctx context.Context, fn trackerdb.BatchFn) (err error) { - return s.pair.Wdb.AtomicContext(ctx, func(ctx context.Context, tx *sql.Tx) error { + return wrapIOError(s.pair.Wdb.AtomicContext(ctx, func(ctx context.Context, tx *sql.Tx) error { return fn(ctx, &sqlBatchScope{tx, false, &sqlWriter{tx}}) - }) + })) } func (s *trackerSQLStore) BeginBatch(ctx context.Context) (trackerdb.Batch, error) { handle, err := s.pair.Wdb.Handle.BeginTx(ctx, nil) if err != nil { - return nil, err + return nil, wrapIOError(err) } return &sqlBatchScope{handle, false, &sqlWriter{handle}}, nil } func (s *trackerSQLStore) Snapshot(fn trackerdb.SnapshotFn) (err error) { - return s.SnapshotContext(context.Background(), fn) + return wrapIOError(s.SnapshotContext(context.Background(), fn)) } func (s *trackerSQLStore) SnapshotContext(ctx context.Context, fn trackerdb.SnapshotFn) (err error) { - return s.pair.Rdb.AtomicContext(ctx, func(ctx context.Context, tx *sql.Tx) error { + return wrapIOError(s.pair.Rdb.AtomicContext(ctx, func(ctx context.Context, tx *sql.Tx) error { return fn(ctx, &sqlSnapshotScope{tx, &sqlReader{tx}}) - }) + })) } func (s *trackerSQLStore) BeginSnapshot(ctx context.Context) (trackerdb.Snapshot, error) { handle, err := s.pair.Rdb.Handle.BeginTx(ctx, nil) if err != nil { - return nil, err + return nil, wrapIOError(err) } return &sqlSnapshotScope{handle, &sqlReader{handle}}, nil } func (s *trackerSQLStore) Transaction(fn trackerdb.TransactionFn) (err error) { - return s.TransactionContext(context.Background(), fn) + return wrapIOError(s.TransactionContext(context.Background(), fn)) } func (s *trackerSQLStore) TransactionContext(ctx context.Context, fn trackerdb.TransactionFn) (err error) { - return s.pair.Wdb.AtomicContext(ctx, func(ctx context.Context, tx *sql.Tx) error { + return wrapIOError(s.pair.Wdb.AtomicContext(ctx, func(ctx context.Context, tx *sql.Tx) error { return fn(ctx, &sqlTransactionScope{tx, false, &sqlReader{tx}, &sqlWriter{tx}, &sqlCatchpoint{tx}}) - }) + })) } func (s *trackerSQLStore) BeginTransaction(ctx context.Context) (trackerdb.Transaction, error) { handle, err := s.pair.Wdb.Handle.BeginTx(ctx, nil) if err != nil { - return nil, err + return nil, wrapIOError(err) } return &sqlTransactionScope{handle, false, &sqlReader{handle}, &sqlWriter{handle}, &sqlCatchpoint{handle}}, nil } func (s trackerSQLStore) RunMigrations(ctx context.Context, params trackerdb.Params, log logging.Logger, targetVersion int32) (mgr trackerdb.InitParams, err error) { - err = s.pair.Wdb.AtomicContext(ctx, func(ctx context.Context, tx *sql.Tx) error { + err = wrapIOError(s.pair.Wdb.AtomicContext(ctx, func(ctx context.Context, tx *sql.Tx) error { mgr, err = RunMigrations(ctx, tx, params, log, targetVersion) return err - }) + })) return } @@ -283,7 +285,7 @@ func (bs *sqlBatchScope) ResetTransactionWarnDeadline(ctx context.Context, deadl func (bs *sqlBatchScope) Close() error { if !bs.committed { - return bs.tx.Rollback() + return wrapIOError(bs.tx.Rollback()) } return nil } @@ -291,7 +293,7 @@ func (bs *sqlBatchScope) Close() error { func (bs *sqlBatchScope) Commit() error { err := bs.tx.Commit() if err != nil { - return err + return wrapIOError(err) } bs.committed = true return nil @@ -307,7 +309,7 @@ func (ss *sqlSnapshotScope) ResetTransactionWarnDeadline(ctx context.Context, de } func (ss *sqlSnapshotScope) Close() error { - return ss.tx.Rollback() + return wrapIOError(ss.tx.Rollback()) } type sqlTransactionScope struct { @@ -328,7 +330,7 @@ func (txs *sqlTransactionScope) ResetTransactionWarnDeadline(ctx context.Context func (txs *sqlTransactionScope) Close() error { if !txs.committed { - return txs.tx.Rollback() + return wrapIOError(txs.tx.Rollback()) } return nil } @@ -336,8 +338,28 @@ func (txs *sqlTransactionScope) Close() error { func (txs *sqlTransactionScope) Commit() error { err := txs.tx.Commit() if err != nil { - return err + return wrapIOError(err) } txs.committed = true return nil } + +// wrapIOError allows for SQL IO Errors to be represented as trackerdb.ErrIoErr +// in places which may enconter them. +func wrapIOError(err error) error { + if err == nil { + return nil + } + // if it's already a trackerdb error, don't wrap it again + var alreadyWrapped *trackerdb.ErrIoErr + if errors.As(err, &alreadyWrapped) { + return err + } + var sqliteErr sqlite3.Error + if errors.As(err, &sqliteErr) { + if sqliteErr.Code == sqlite3.ErrIoErr { + return &trackerdb.ErrIoErr{InnerError: err} + } + } + return err +} diff --git a/ledger/tracker.go b/ledger/tracker.go index 5975a91409..c5f3e84781 100644 --- a/ledger/tracker.go +++ b/ledger/tracker.go @@ -119,11 +119,16 @@ type ledgerTracker interface { // An optional context is provided for long-running operations. postCommitUnlocked(context.Context, *deferredCommitContext) - // handleUnorderedCommitOrError is a special method for handling deferred commits that are out of order - // or to handle errors reported by other trackers while committing a batch. - // Tracker might update own state in this case. For example, account updates tracker cancels + // handleUnorderedCommit is a control method for handling deferred commits that are out of order + // Tracker might update its own state in this case. For example, account updates tracker cancels // scheduled catchpoint writing flag for this batch. - handleUnorderedCommitOrError(*deferredCommitContext) + handleUnorderedCommit(*deferredCommitContext) + // handlePrepareCommitError is a control method for handling self-cleanup or update if any trackers report + // error during the prepare commit phase of commitRound + handlePrepareCommitError(*deferredCommitContext) + // handleCommitError is a control method for handling self-cleanup or update if any trackers report + // error during the commit phase of commitRound + handleCommitError(*deferredCommitContext) // close terminates the tracker, reclaiming any resources // like open database connections or goroutines. close may @@ -512,7 +517,7 @@ func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) error { if tr.dbRound < dbRound || offset < uint64(tr.dbRound-dbRound) { tr.log.Warnf("out of order deferred commit: offset %d, dbRound %d but current tracker DB round is %d", offset, dbRound, tr.dbRound) for _, lt := range tr.trackers { - lt.handleUnorderedCommitOrError(dcc) + lt.handleUnorderedCommit(dcc) } tr.mu.RUnlock() return nil @@ -546,7 +551,7 @@ func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) error { } if err != nil { for _, lt := range tr.trackers { - lt.handleUnorderedCommitOrError(dcc) + lt.handlePrepareCommitError(dcc) } tr.mu.RUnlock() return err @@ -574,10 +579,18 @@ func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) error { ledgerCommitroundMicros.AddMicrosecondsSince(start, nil) if err != nil { + for _, lt := range tr.trackers { - lt.handleUnorderedCommitOrError(dcc) + lt.handleCommitError(dcc) } tr.log.Warnf("unable to advance tracker db snapshot (%d-%d): %v", dbRound, dbRound+basics.Round(offset), err) + + // if the error is an IO error, shut down the node. + var trackerIOErr *trackerdb.ErrIoErr + if errors.As(err, &trackerIOErr) { + tr.log.Fatalf("Fatal IO error during CommitRound, exiting: %v", err) + } + return err } diff --git a/ledger/tracker_test.go b/ledger/tracker_test.go index 5ab2f63c99..87646f24a6 100644 --- a/ledger/tracker_test.go +++ b/ledger/tracker_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "github.com/mattn/go-sqlite3" "github.com/stretchr/testify/require" "github.com/algorand/go-algorand/agreement" @@ -142,6 +143,59 @@ func TestTrackerScheduleCommit(t *testing.T) { a.Equal(expectedOffset, dc.offset) } +type ioErrorTracker struct { +} + +// loadFromDisk is not implemented in the blockingTracker. +func (io *ioErrorTracker) loadFromDisk(ledgerForTracker, basics.Round) error { + return nil +} + +// newBlock is not implemented in the blockingTracker. +func (io *ioErrorTracker) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) { +} + +// committedUpTo in the blockingTracker just stores the committed round. +func (io *ioErrorTracker) committedUpTo(committedRnd basics.Round) (minRound, lookback basics.Round) { + return 0, basics.Round(0) +} + +func (io *ioErrorTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange { + return dcr +} + +// prepareCommit, is not used by the blockingTracker +func (io *ioErrorTracker) prepareCommit(*deferredCommitContext) error { + return nil +} + +// commitRound is not used by the blockingTracker +func (io *ioErrorTracker) commitRound(context.Context, trackerdb.TransactionScope, *deferredCommitContext) error { + return sqlite3.Error{Code: sqlite3.ErrIoErr} +} + +func (io *ioErrorTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) { +} + +// postCommitUnlocked implements entry/exit blockers, designed for testing. +func (io *ioErrorTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { +} + +// control functions are not used by the blockingTracker +func (io *ioErrorTracker) handleUnorderedCommit(dcc *deferredCommitContext) { +} +func (io *ioErrorTracker) handlePrepareCommitError(dcc *deferredCommitContext) { +} +func (io *ioErrorTracker) handleCommitError(dcc *deferredCommitContext) { +} + +// close is not used by the blockingTracker +func (io *ioErrorTracker) close() { +} + +func (io *ioErrorTracker) reset() { +} + type producePrepareBlockingTracker struct { produceReleaseLock chan struct{} prepareCommitEntryLock chan struct{} @@ -191,8 +245,12 @@ func (bt *producePrepareBlockingTracker) postCommit(ctx context.Context, dcc *de func (bt *producePrepareBlockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { } -// handleUnorderedCommitOrError is not used by the blockingTracker -func (bt *producePrepareBlockingTracker) handleUnorderedCommitOrError(*deferredCommitContext) { +// control functions are not used by the blockingTracker +func (bt *producePrepareBlockingTracker) handleUnorderedCommit(dcc *deferredCommitContext) { +} +func (bt *producePrepareBlockingTracker) handlePrepareCommitError(dcc *deferredCommitContext) { +} +func (bt *producePrepareBlockingTracker) handleCommitError(dcc *deferredCommitContext) { } // close is not used by the blockingTracker @@ -302,6 +360,53 @@ func TestTrackerDbRoundDataRace(t *testing.T) { close(stallingTracker.produceReleaseLock) } +func TestCommitRoundIOError(t *testing.T) { + partitiontest.PartitionTest(t) + + a := require.New(t) + + genesisInitState, _ := ledgertesting.GenerateInitState(t, protocol.ConsensusCurrentVersion, 1) + const inMem = true + log := logging.TestingLogWithoutFatalExit(t) + log.SetLevel(logging.Warn) + cfg := config.GetDefaultLocal() + ledger, err := OpenLedger(log, t.Name(), inMem, genesisInitState, cfg) + a.NoError(err, "could not open ledger") + defer ledger.Close() + + // flip the flag when the exit handler is called, + // which happens when Fatal logging is called + flag := false + logging.RegisterExitHandler(func() { + flag = true + }) + + io := &ioErrorTracker{} + ledger.trackerMu.Lock() + ledger.trackers.mu.Lock() + ledger.trackers.trackers = append([]ledgerTracker{io}, ledger.trackers.trackers...) + ledger.trackers.mu.Unlock() + ledger.trackerMu.Unlock() + + // create update content which would trigger a commit + targetRound := basics.Round(100) + blk := genesisInitState.Block + for i := basics.Round(0); i < targetRound-1; i++ { + blk.BlockHeader.Round++ + blk.BlockHeader.TimeStamp += int64(crypto.RandUint64() % 100 * 1000) + err := ledger.AddBlock(blk, agreement.Certificate{}) + a.NoError(err) + } + blk.BlockHeader.Round++ + blk.BlockHeader.TimeStamp += int64(crypto.RandUint64() % 100 * 1000) + err = ledger.AddBlock(blk, agreement.Certificate{}) + a.NoError(err) + + // confirm that after 100 blocks, the scheduled commit generated an error + // which triggered Fatal logging (and would therefore call any registered exit handlers) + a.True(flag) +} + func TestAccountUpdatesLedgerEvaluatorNoBlockHdr(t *testing.T) { partitiontest.PartitionTest(t) diff --git a/ledger/txtail.go b/ledger/txtail.go index 31e44be77a..6f05b47f98 100644 --- a/ledger/txtail.go +++ b/ledger/txtail.go @@ -325,7 +325,11 @@ func (t *txTail) postCommit(ctx context.Context, dcc *deferredCommitContext) { func (t *txTail) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { } -func (t *txTail) handleUnorderedCommitOrError(*deferredCommitContext) { +func (t *txTail) handleUnorderedCommit(dcc *deferredCommitContext) { +} +func (t *txTail) handlePrepareCommitError(dcc *deferredCommitContext) { +} +func (t *txTail) handleCommitError(dcc *deferredCommitContext) { } func (t *txTail) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange { diff --git a/logging/log.go b/logging/log.go index ebe600f7f7..d9b79eb9aa 100644 --- a/logging/log.go +++ b/logging/log.go @@ -376,6 +376,12 @@ func NewWrappedLogger(l *logrus.Logger) Logger { return out } +// RegisterExitHandler registers a function to be called on exit by logrus +// Exit handling happens when logrus.Exit is called, which is called by logrus.Fatal +func RegisterExitHandler(handler func()) { + logrus.RegisterExitHandler(handler) +} + func (l logger) EnableTelemetry(cfg TelemetryConfig) (err error) { if l.loggerState.telemetry != nil || (!cfg.Enable && !cfg.SendToLog) { return nil diff --git a/logging/log_test.go b/logging/log_test.go index af62bae912..bf6db060c7 100644 --- a/logging/log_test.go +++ b/logging/log_test.go @@ -119,3 +119,23 @@ func TestSetJSONFormatter(t *testing.T) { a.True(isJSON(bufNewLogger.String())) } + +// This test ensures that handler functions registered to Fatal Logging trigger +// when Fatal logs are emitted. We attach graceful service shutdown to Fatal logging, +// and we want to notice if changes to our logging dependencies change how these handlers are called +func TestFatalExitHandler(t *testing.T) { + partitiontest.PartitionTest(t) + + nl := TestingLogWithoutFatalExit(t) + + // Make an exit handler that sets a flag to demonstrate it was called + flag := false + RegisterExitHandler(func() { + flag = true + }) + nl.Fatal("OH NO") + + // Check that the exit handler was called + require.True(t, flag) + +} diff --git a/logging/testingLogger.go b/logging/testingLogger.go index 6492c7ddd8..0f53e6225b 100644 --- a/logging/testingLogger.go +++ b/logging/testingLogger.go @@ -18,6 +18,8 @@ package logging import ( "testing" + + "github.com/sirupsen/logrus" ) // TestLogWriter is an io.Writer that wraps a testing.T (or a testing.B) -- anything written to it gets logged with t.Log(...) @@ -39,6 +41,18 @@ func (tb TestLogWriter) Write(p []byte) (n int, err error) { return len(p), nil } +// TestingLogWithoutFatalExit is a test-only convenience function to configure logging for testing in situations where Fatal() may be called +// (e.g. in the case of an expected failure) +// Calls to Fatal() will still call any registered exit handlers +func TestingLogWithoutFatalExit(tb testing.TB) Logger { + l := logrus.New() + l.ExitFunc = func(code int) {} + wl := NewWrappedLogger(l) + wl.SetLevel(Debug) + wl.SetOutput(TestLogWriter{tb}) + return wl +} + // TestingLog is a test-only convenience function to configure logging for testing func TestingLog(tb testing.TB) Logger { l := NewLogger()