diff --git a/internal/evmreader/claim.go b/internal/evmreader/claim.go index 73720fc87..027aa31c5 100644 --- a/internal/evmreader/claim.go +++ b/internal/evmreader/claim.go @@ -104,9 +104,9 @@ func (r *EvmReader) readAndUpdateClaims( epochs := []*Epoch{} for _, claimAcceptance := range claimAcceptances { - // Get Previous claims and update their statuses - // rejecting all - previousClaims, err := r.repository.GetPreviousSubmittedClaims( + // Get Previous Epochs with submitted claims, that got no acceptance event + // and update their statuses rejecting them all + previousEpochs, err := r.repository.GetPreviousSubmittedClaimsEpochs( ctx, app, claimAcceptance.LastProcessedBlockNumber.Uint64()) if err != nil { slog.Error("Error retrieving previous submitted claims", @@ -114,57 +114,58 @@ func (r *EvmReader) readAndUpdateClaims( continue APP_LOOP } - for _, previousClaim := range previousClaims { - previousClaim.Status = EpochStatusClaimRejected - epochs = append(epochs, previousClaim) + for _, previousEpoch := range previousEpochs { + previousEpoch.Status = EpochStatusClaimRejected + epochs = append(epochs, previousEpoch) slog.Warn("Claim rejected", "app", app, - "lastBlock", previousClaim.LastBlock, - "hash", previousClaim.ClaimHash) + "lastBlock", previousEpoch.LastBlock, + "hash", previousEpoch.ClaimHash) } - // Get Claim - claim, err := r.repository.GetEpoch( + // Get the Epoch for the current Claim Acceptance Event + epoch, err := r.repository.GetEpoch( ctx, calculateEpochIndex( r.epochLengthCache[app], claimAcceptance.LastProcessedBlockNumber.Uint64()), app) if err != nil { - slog.Error("Error retrieving claim", "app", app, "error", err) + slog.Error("Error retrieving Epoch", "app", app, "error", err) continue APP_LOOP } - // Check Claim - if claim == nil { - slog.Error("Got unknown claim event", + // Check Epoch + if epoch == nil { + slog.Error("Got claim acceptance event for an unknown epoch", "app", app, "claim last block", claimAcceptance.LastProcessedBlockNumber, "hash", claimAcceptance.Claim) continue APP_LOOP } - // Update claim status - if claimAcceptance.Claim != *claim.ClaimHash { + // Update Epoch claim status + if claimAcceptance.Claim != *epoch.ClaimHash || + claimAcceptance.LastProcessedBlockNumber.Uint64() != epoch.LastBlock { slog.Warn("Claim Rejected", "app", app, - "lastBlock", claim.LastBlock, - "hash", claim.ClaimHash) + "lastBlock", epoch.LastBlock, + "hash", epoch.ClaimHash) - claim.Status = EpochStatusClaimRejected - epochs = append(epochs, claim) + epoch.Status = EpochStatusClaimRejected + epochs = append(epochs, epoch) } else { slog.Info("Claim Accepted", "app", app, - "lastBlock", claim.LastBlock, - "hash", claim.ClaimHash) + "lastBlock", epoch.LastBlock, + "hash", epoch.ClaimHash) - claim.Status = EpochStatusClaimAccepted - epochs = append(epochs, claim) + epoch.Status = EpochStatusClaimAccepted + epochs = append(epochs, epoch) } } // Store everything - err = r.repository.StoreClaimsTransaction( + err = r.repository.UpdateEpochsClaimStatusTransaction( ctx, app, epochs, mostRecentBlockNumber) if err != nil { slog.Error("Error storing claims", "app", app, "error", err) diff --git a/internal/evmreader/claim_test.go b/internal/evmreader/claim_test.go index 66e5574fc..883e5c5d6 100644 --- a/internal/evmreader/claim_test.go +++ b/internal/evmreader/claim_test.go @@ -50,8 +50,8 @@ func (s *EvmReaderSuite) TestNoClaimsAcceptance() { LastClaimCheckBlock: 0x11, }}, nil).Once() - s.repository.Unset("StoreClaimsTransaction") - s.repository.On("StoreClaimsTransaction", + s.repository.Unset("UpdateEpochsClaimStatusTransaction") + s.repository.On("UpdateEpochsClaimStatusTransaction", mock.Anything, mock.Anything, mock.Anything, @@ -67,7 +67,7 @@ func (s *EvmReaderSuite) TestNoClaimsAcceptance() { s.Require().Equal(uint64(17), lastClaimCheck) }).Return(nil) - s.repository.On("StoreClaimsTransaction", + s.repository.On("UpdateEpochsClaimStatusTransaction", mock.Anything, mock.Anything, mock.Anything, @@ -131,7 +131,7 @@ func (s *EvmReaderSuite) TestNoClaimsAcceptance() { s.repository.AssertNumberOfCalls( s.T(), - "StoreClaimsTransaction", + "UpdateEpochsClaimStatusTransaction", 2, ) @@ -232,15 +232,15 @@ func (s *EvmReaderSuite) TestReadClaimAcceptance() { mock.Anything, mock.Anything).Return(claim1, nil) - s.repository.Unset("GetPreviousSubmittedClaims") - s.repository.On("GetPreviousSubmittedClaims", + s.repository.Unset("GetPreviousSubmittedClaimsEpochs") + s.repository.On("GetPreviousSubmittedClaimsEpochs", mock.Anything, mock.Anything, mock.Anything, ).Return([]*Epoch{claim0}, nil) - s.repository.Unset("StoreClaimsTransaction") - s.repository.On("StoreClaimsTransaction", + s.repository.Unset("UpdateEpochsClaimStatusTransaction") + s.repository.On("UpdateEpochsClaimStatusTransaction", mock.Anything, mock.Anything, mock.Anything, @@ -295,7 +295,7 @@ func (s *EvmReaderSuite) TestReadClaimAcceptance() { s.repository.AssertNumberOfCalls( s.T(), - "StoreClaimsTransaction", + "UpdateEpochsClaimStatusTransaction", 1, ) @@ -390,15 +390,15 @@ func (s *EvmReaderSuite) TestCheckClaimFails() { mock.Anything, mock.Anything).Return(claim1, nil) - s.repository.Unset("GetPreviousSubmittedClaims") - s.repository.On("GetPreviousSubmittedClaims", + s.repository.Unset("GetPreviousSubmittedClaimsEpochs") + s.repository.On("GetPreviousSubmittedClaimsEpochs", mock.Anything, mock.Anything, mock.Anything, ).Return([]*Epoch{}, fmt.Errorf("No previous epochs for you")) - s.repository.Unset("StoreClaimsTransaction") - s.repository.On("StoreClaimsTransaction", + s.repository.Unset("UpdateEpochsClaimStatusTransaction") + s.repository.On("UpdateEpochsClaimStatusTransaction", mock.Anything, mock.Anything, mock.Anything, @@ -440,7 +440,7 @@ func (s *EvmReaderSuite) TestCheckClaimFails() { s.repository.AssertNumberOfCalls( s.T(), - "StoreClaimsTransaction", + "UpdateEpochsClaimStatusTransaction", 0, ) @@ -534,15 +534,15 @@ func (s *EvmReaderSuite) TestCheckClaimFails() { mock.Anything, mock.Anything).Return(nil, fmt.Errorf("No epoch for you")) - s.repository.Unset("GetPreviousSubmittedClaims") - s.repository.On("GetPreviousSubmittedClaims", + s.repository.Unset("GetPreviousSubmittedClaimsEpochs") + s.repository.On("GetPreviousSubmittedClaimsEpochs", mock.Anything, mock.Anything, mock.Anything, ).Return([]*Epoch{claim0}, nil) - s.repository.Unset("StoreClaimsTransaction") - s.repository.On("StoreClaimsTransaction", + s.repository.Unset("UpdateEpochsClaimStatusTransaction") + s.repository.On("UpdateEpochsClaimStatusTransaction", mock.Anything, mock.Anything, mock.Anything, @@ -584,7 +584,7 @@ func (s *EvmReaderSuite) TestCheckClaimFails() { s.repository.AssertNumberOfCalls( s.T(), - "StoreClaimsTransaction", + "UpdateEpochsClaimStatusTransaction", 0, ) diff --git a/internal/evmreader/consensus_adapter.go b/internal/evmreader/consensus_adapter.go index 22e76708a..a85ebd3d9 100644 --- a/internal/evmreader/consensus_adapter.go +++ b/internal/evmreader/consensus_adapter.go @@ -50,8 +50,7 @@ func (c *ConsensusContractAdapter) RetrieveClaimAcceptanceEvents( claimAcceptanceEvent := itr.Event events = append(events, claimAcceptanceEvent) } - err = itr.Error() - if err != nil { + if err = itr.Error(); err != nil { return nil, err } return events, nil diff --git a/internal/evmreader/evmreader.go b/internal/evmreader/evmreader.go index c15804215..dd4436718 100644 --- a/internal/evmreader/evmreader.go +++ b/internal/evmreader/evmreader.go @@ -40,8 +40,8 @@ type EvmReaderRepository interface { GetAllRunningApplications(ctx context.Context) ([]Application, error) GetNodeConfig(ctx context.Context) (*NodePersistentConfig, error) GetEpoch(ctx context.Context, indexKey uint64, appAddressKey Address) (*Epoch, error) - GetPreviousSubmittedClaims(ctx context.Context, app Address, lastBlock uint64) ([]*Epoch, error) - StoreClaimsTransaction(ctx context.Context, + GetPreviousSubmittedClaimsEpochs(ctx context.Context, app Address, lastBlock uint64) ([]*Epoch, error) + UpdateEpochsClaimStatusTransaction(ctx context.Context, app Address, claims []*Epoch, mostRecentBlockNumber uint64, @@ -91,7 +91,7 @@ type application struct { consensusContract ConsensusContract } -// EvmReader reads inputs from the blockchain +// EvmReader reads Input Added and Claim Submitted events from the blockchain type EvmReader struct { client EthClient wsClient EthWsClient @@ -170,7 +170,7 @@ func (r *EvmReader) watchForNewBlocks(ctx context.Context, ready chan<- struct{} // Get All Applications runningApps, err := r.repository.GetAllRunningApplications(ctx) if err != nil { - slog.Error("Error retrieving running applications for new inputs", + slog.Error("Error retrieving running applications", "error", err, ) @@ -191,7 +191,7 @@ func (r *EvmReader) watchForNewBlocks(ctx context.Context, ready chan<- struct{} } if len(apps) == 0 { - slog.Info("No running consistent applications") + slog.Info("No correctly configured applications running") continue } @@ -288,20 +288,6 @@ func (r *EvmReader) getAppContracts(app Application, return applicationContract, consensus, nil } -// getEpochLengthFromContract reads the application epoch length given it's consensus contract -func getEpochLengthFromContract(consensus ConsensusContract) (uint64, error) { - - epochLengthRaw, err := consensus.GetEpochLength(nil) - if err != nil { - return 0, errors.Join( - fmt.Errorf("error retrieving application epoch length"), - err, - ) - } - - return epochLengthRaw.Uint64(), nil -} - // Util functions // calculateEpochIndex calculates the epoch index given the input block number diff --git a/internal/evmreader/evmreader_test.go b/internal/evmreader/evmreader_test.go index 23bec77a1..b8d1dc3ca 100644 --- a/internal/evmreader/evmreader_test.go +++ b/internal/evmreader/evmreader_test.go @@ -252,7 +252,7 @@ func (s *EvmReaderSuite) TestItWrongIConsensus() { s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveClaimAcceptanceEvents", 0) s.repository.AssertNumberOfCalls( s.T(), - "StoreClaimsTransaction", + "UpdateEpochsClaimStatusTransaction", 0, ) } @@ -437,12 +437,12 @@ func newMockRepository() *MockRepository { mock.Anything, mock.Anything).Return(1, nil) - repo.On("GetPreviousSubmittedClaims", + repo.On("GetPreviousSubmittedClaimsEpochs", mock.Anything, mock.Anything, ).Return([]Epoch{}, nil) - repo.On("StoreClaimsTransaction", + repo.On("UpdateEpochsClaimStatusTransaction", mock.Anything, mock.Anything, mock.Anything, @@ -506,7 +506,7 @@ func (m *MockRepository) InsertEpoch( return args.Get(0).(uint64), args.Error(1) } -func (m *MockRepository) GetPreviousSubmittedClaims( +func (m *MockRepository) GetPreviousSubmittedClaimsEpochs( ctx context.Context, app Address, lastBlock uint64, @@ -519,7 +519,7 @@ func (m *MockRepository) GetPreviousSubmittedClaims( return obj.([]*Epoch), args.Error(1) } -func (m *MockRepository) StoreClaimsTransaction(ctx context.Context, +func (m *MockRepository) UpdateEpochsClaimStatusTransaction(ctx context.Context, app Address, epochs []*Epoch, mostRecentBlockNumber uint64, diff --git a/internal/evmreader/input.go b/internal/evmreader/input.go index ab5768b74..3da07a266 100644 --- a/internal/evmreader/input.go +++ b/internal/evmreader/input.go @@ -306,3 +306,17 @@ func (r *EvmReader) readInputsFromBlockchain( func byLastProcessedBlock(app application) uint64 { return app.LastProcessedBlock } + +// getEpochLengthFromContract reads the application epoch length given it's consensus contract +func getEpochLengthFromContract(consensus ConsensusContract) (uint64, error) { + + epochLengthRaw, err := consensus.GetEpochLength(nil) + if err != nil { + return 0, errors.Join( + fmt.Errorf("error retrieving application epoch length"), + err, + ) + } + + return epochLengthRaw.Uint64(), nil +} diff --git a/internal/evmreader/inputsource_adapter.go b/internal/evmreader/inputsource_adapter.go index eda9754a4..750bb3f16 100644 --- a/internal/evmreader/inputsource_adapter.go +++ b/internal/evmreader/inputsource_adapter.go @@ -47,8 +47,7 @@ func (i *InputSourceAdapter) RetrieveInputs( inputAddedEvent := itr.Event events = append(events, *inputAddedEvent) } - err = itr.Error() - if err != nil { + if err = itr.Error(); err != nil { return nil, err } return events, nil diff --git a/internal/repository/evmreader.go b/internal/repository/evmreader.go index 061057dd6..2e51386aa 100644 --- a/internal/repository/evmreader.go +++ b/internal/repository/evmreader.go @@ -12,6 +12,11 @@ import ( "github.com/jackc/pgx/v5" ) +var ( + errInsertInputs = errors.New("unable to insert inputs") + errUpdateEpochs = errors.New("unable to update epochs") +) + // This method should be called at the end of EVMReader read input cycle // In a single transaction it updates or inserts epochs, insert inputs related to each epoch // and also updates the last processed block @@ -22,8 +27,6 @@ func (pg *Database) StoreEpochAndInputsTransaction( contractAddress Address, ) (epochIndexIdMap map[uint64]uint64, epochIndexInputIdsMap map[uint64][]uint64, _ error) { - var errInsertInputs = errors.New("unable to insert inputs") - insertEpochQuery := ` INSERT INTO epoch (application_address, @@ -243,7 +246,7 @@ func (pg *Database) GetLastProcessedBlock( return block, nil } -func (pg *Database) GetPreviousSubmittedClaims( +func (pg *Database) GetPreviousSubmittedClaimsEpochs( ctx context.Context, app Address, block uint64, @@ -307,15 +310,13 @@ func (pg *Database) GetPreviousSubmittedClaims( return results, nil } -func (pg *Database) StoreClaimsTransaction( +func (pg *Database) UpdateEpochsClaimStatusTransaction( ctx context.Context, app Address, claims []*Epoch, blockNumber uint64, ) error { - var errUpdateClaims = errors.New("unable to update claims") - updateEpochQuery := ` UPDATE epoch SET @@ -326,7 +327,7 @@ func (pg *Database) StoreClaimsTransaction( tx, err := pg.db.Begin(ctx) if err != nil { - return errors.Join(errUpdateClaims, err) + return errors.Join(errUpdateEpochs, err) } for _, claim := range claims { @@ -337,11 +338,11 @@ func (pg *Database) StoreClaimsTransaction( tag, err := tx.Exec(ctx, updateEpochQuery, updateClaimArgs) if err != nil { - return errors.Join(errUpdateClaims, err, tx.Rollback(ctx)) + return errors.Join(errUpdateEpochs, err, tx.Rollback(ctx)) } if tag.RowsAffected() != 1 { - return errors.Join(errUpdateClaims, - fmt.Errorf("no rows affected when updating claim %d", claim.Index), + return errors.Join(errUpdateEpochs, + fmt.Errorf("no row affected when updating claim %d", claim.Index), tx.Rollback(ctx)) } } @@ -360,13 +361,13 @@ func (pg *Database) StoreClaimsTransaction( _, err = tx.Exec(ctx, updateLastBlockQuery, updateLastBlockArgs) if err != nil { - return errors.Join(errUpdateClaims, err, tx.Rollback(ctx)) + return errors.Join(errUpdateEpochs, err, tx.Rollback(ctx)) } // Commit transaction err = tx.Commit(ctx) if err != nil { - return errors.Join(errUpdateClaims, err, tx.Rollback(ctx)) + return errors.Join(errUpdateEpochs, err, tx.Rollback(ctx)) } return nil diff --git a/internal/repository/evmreader_test.go b/internal/repository/evmreader_test.go index 2d8f46fd4..b8d610e17 100644 --- a/internal/repository/evmreader_test.go +++ b/internal/repository/evmreader_test.go @@ -160,8 +160,8 @@ func (s *RepositorySuite) TestGetMostRecentBlock() { s.Require().Equal(block, response) } -func (s *RepositorySuite) TestGetPreviousSubmittedClaims() { - response, err := s.database.GetPreviousSubmittedClaims( +func (s *RepositorySuite) TestGetPreviousSubmittedClaimsEpochs() { + response, err := s.database.GetPreviousSubmittedClaimsEpochs( s.ctx, common.HexToAddress("deadbeef"), 300) s.Require().Nil(err) @@ -174,7 +174,7 @@ func (s *RepositorySuite) TestGetPreviousSubmittedClaims() { s.Require().Equal(epoch, response[0]) } -func (s *RepositorySuite) TestStoreClaimsTransaction() { +func (s *RepositorySuite) TestUpdateEpochsClaimStatusTransaction() { claim, err := s.database.GetEpoch(s.ctx, 2, common.HexToAddress("deadbeef")) s.Require().Nil(err) @@ -185,7 +185,7 @@ func (s *RepositorySuite) TestStoreClaimsTransaction() { claims := []*Epoch{claim} - err = s.database.StoreClaimsTransaction(s.ctx, common.HexToAddress("deadbeef"), claims, 499) + err = s.database.UpdateEpochsClaimStatusTransaction(s.ctx, common.HexToAddress("deadbeef"), claims, 499) s.Require().Nil(err) claim, err = s.database.GetEpoch(s.ctx, 2, common.HexToAddress("deadbeef"))