Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: race condition in last processed height #38

Merged
merged 2 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions internal/db/error.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package db

import "errors"
import (
"errors"
"fmt"
)

// DuplicateKeyError is an error type for duplicate key errors
type DuplicateKeyError struct {
Expand Down Expand Up @@ -41,7 +44,7 @@ type NotFoundError struct {
}

func (e *NotFoundError) Error() string {
return e.Message
return fmt.Sprintf("%s: %s", e.Message, e.Key)
}

func IsNotFoundError(err error) bool {
Expand Down
16 changes: 8 additions & 8 deletions internal/services/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ const (
// If an error occurs, it logs the error and terminates the program.
// The method runs asynchronously to allow non-blocking operation.
func (s *Service) StartBbnBlockProcessor(ctx context.Context) {
go func() {
if err := s.processBlocksSequentially(ctx); err != nil {
log.Fatal().Msgf("BBN block processor exited with error: %v", err)
}
}()
if err := s.processBlocksSequentially(ctx); err != nil {
log.Fatal().Msgf("BBN block processor exited with error: %v", err)
}
}

// processBlocksSequentially processes BBN blockchain blocks in sequential order,
Expand Down Expand Up @@ -63,7 +61,7 @@ func (s *Service) processBlocksSequentially(ctx context.Context) *types.Error {
}

// Process blocks from lastProcessedHeight + 1 to latestHeight
for i := lastProcessedHeight + 1; i <= uint64(latestHeight); i++ {
for i := lastProcessedHeight; i <= uint64(latestHeight); i++ {
select {
case <-ctx.Done():
return types.NewError(
Expand All @@ -76,11 +74,13 @@ func (s *Service) processBlocksSequentially(ctx context.Context) *types.Error {
if err != nil {
return err
}

for _, event := range events {
s.bbnEventProcessor <- event
if err := s.processEvent(ctx, event); err != nil {
return err
}
}

// Update lastProcessedHeight after successful processing
if dbErr := s.db.UpdateLastProcessedBbnHeight(ctx, uint64(i)); dbErr != nil {
return types.NewError(
http.StatusInternalServerError,
Expand Down
21 changes: 4 additions & 17 deletions internal/services/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,8 @@ func NewBbnEvent(category EventCategory, event abcitypes.Event) BbnEvent {
}
}

// startBbnEventProcessor continuously listens for events from the channel and
// processes them in the main thread
func (s *Service) StartBbnEventProcessor(ctx context.Context) {
for event := range s.bbnEventProcessor {
if event.Event.Type == "" {
log.Warn().Msg("Empty event received, skipping")
continue
}
// Create a new context with a timeout for each event
ctx, cancel := context.WithTimeout(context.Background(), eventProcessingTimeout)
defer cancel()
s.processEvent(ctx, event)
}
}

// Entry point for processing events
func (s *Service) processEvent(ctx context.Context, event BbnEvent) {
func (s *Service) processEvent(ctx context.Context, event BbnEvent) *types.Error {
// Note: We no longer need to check for the event category here. We can directly
// process the event based on its type.
bbnEvent := event.Event
Expand Down Expand Up @@ -89,8 +74,10 @@ func (s *Service) processEvent(ctx context.Context, event BbnEvent) {

if err != nil {
log.Error().Err(err).Msg("Failed to process event")
panic(err)
return err
}

return nil
}

func parseEvent[T proto.Message](
Expand Down
6 changes: 2 additions & 4 deletions internal/services/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,10 @@ func (s *Service) StartIndexerSync(ctx context.Context) {
s.SyncGlobalParams(ctx)
// Start the expiry checker
s.StartExpiryChecker(ctx)
// Start the BBN block processor
s.StartBbnBlockProcessor(ctx)
// Start the websocket event subscription process
s.SubscribeToBbnEvents(ctx)
// Keep processing events in the main thread
s.StartBbnEventProcessor(ctx)
// Keep processing BBN blocks in the main thread
s.StartBbnBlockProcessor(ctx)
}

func (s *Service) quitContext() (context.Context, func()) {
Expand Down
Loading