Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: race condition in last processed height #38

Merged
merged 2 commits into from
Nov 11, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix event processor
gusin13 committed Nov 11, 2024
commit a546ea4d8edb39a8597ad0f30686292ce6e2c615
7 changes: 5 additions & 2 deletions internal/db/error.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package db

import "errors"
import (
"errors"
"fmt"
)

// DuplicateKeyError is an error type for duplicate key errors
type DuplicateKeyError struct {
@@ -41,7 +44,7 @@ type NotFoundError struct {
}

func (e *NotFoundError) Error() string {
return e.Message
return fmt.Sprintf("%s: %s", e.Message, e.Key)
}

func IsNotFoundError(err error) bool {
16 changes: 8 additions & 8 deletions internal/services/bootstrap.go
Original file line number Diff line number Diff line change
@@ -19,11 +19,9 @@ const (
// If an error occurs, it logs the error and terminates the program.
// The method runs asynchronously to allow non-blocking operation.
func (s *Service) StartBbnBlockProcessor(ctx context.Context) {
go func() {
if err := s.processBlocksSequentially(ctx); err != nil {
log.Fatal().Msgf("BBN block processor exited with error: %v", err)
}
}()
if err := s.processBlocksSequentially(ctx); err != nil {
log.Fatal().Msgf("BBN block processor exited with error: %v", err)
}
}

// processBlocksSequentially processes BBN blockchain blocks in sequential order,
@@ -63,7 +61,7 @@ func (s *Service) processBlocksSequentially(ctx context.Context) *types.Error {
}

// Process blocks from lastProcessedHeight + 1 to latestHeight
for i := lastProcessedHeight + 1; i <= uint64(latestHeight); i++ {
for i := lastProcessedHeight; i <= uint64(latestHeight); i++ {
select {
case <-ctx.Done():
return types.NewError(
@@ -76,11 +74,13 @@ func (s *Service) processBlocksSequentially(ctx context.Context) *types.Error {
if err != nil {
return err
}

for _, event := range events {
s.bbnEventProcessor <- event
if err := s.processEvent(ctx, event); err != nil {
return err
}
}

// Update lastProcessedHeight after successful processing
if dbErr := s.db.UpdateLastProcessedBbnHeight(ctx, uint64(i)); dbErr != nil {
return types.NewError(
http.StatusInternalServerError,
21 changes: 4 additions & 17 deletions internal/services/events.go
Original file line number Diff line number Diff line change
@@ -35,23 +35,8 @@ func NewBbnEvent(category EventCategory, event abcitypes.Event) BbnEvent {
}
}

// startBbnEventProcessor continuously listens for events from the channel and
// processes them in the main thread
func (s *Service) StartBbnEventProcessor(ctx context.Context) {
for event := range s.bbnEventProcessor {
if event.Event.Type == "" {
log.Warn().Msg("Empty event received, skipping")
continue
}
// Create a new context with a timeout for each event
ctx, cancel := context.WithTimeout(context.Background(), eventProcessingTimeout)
defer cancel()
s.processEvent(ctx, event)
}
}

// Entry point for processing events
func (s *Service) processEvent(ctx context.Context, event BbnEvent) {
func (s *Service) processEvent(ctx context.Context, event BbnEvent) *types.Error {
// Note: We no longer need to check for the event category here. We can directly
// process the event based on its type.
bbnEvent := event.Event
@@ -89,8 +74,10 @@ func (s *Service) processEvent(ctx context.Context, event BbnEvent) {

if err != nil {
log.Error().Err(err).Msg("Failed to process event")
panic(err)
return err
}

return nil
}

func parseEvent[T proto.Message](
6 changes: 2 additions & 4 deletions internal/services/service.go
Original file line number Diff line number Diff line change
@@ -64,12 +64,10 @@ func (s *Service) StartIndexerSync(ctx context.Context) {
s.SyncGlobalParams(ctx)
// Start the expiry checker
s.StartExpiryChecker(ctx)
// Start the BBN block processor
s.StartBbnBlockProcessor(ctx)
// Start the websocket event subscription process
s.SubscribeToBbnEvents(ctx)
// Keep processing events in the main thread
s.StartBbnEventProcessor(ctx)
// Start the BBN block processor
s.StartBbnBlockProcessor(ctx)
}

func (s *Service) quitContext() (context.Context, func()) {