Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: subscribe to bbn websocket new block events #32

Merged
merged 15 commits into from
Oct 30, 2024
16 changes: 16 additions & 0 deletions internal/clients/bbnclient/bbnclient.go
Original file line number Diff line number Diff line change
@@ -107,6 +107,22 @@ func (c *BbnClient) GetAllStakingParams(ctx context.Context) (map[uint32]*Stakin
return allParams, nil
}

func (c *BbnClient) Subscribe(subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) {
return c.queryClient.RPCClient.Subscribe(context.Background(), subscriber, query, outCapacity...)
}

func (c *BbnClient) UnsubscribeAll(subscriber string) error {
return c.queryClient.RPCClient.UnsubscribeAll(context.Background(), subscriber)
}

func (c *BbnClient) IsRunning() bool {
return c.queryClient.RPCClient.IsRunning()
}

func (c *BbnClient) Start() error {
return c.queryClient.RPCClient.Start()
}

func (c *BbnClient) getBlockResults(ctx context.Context, blockHeight *int64) (*ctypes.ResultBlockResults, *types.Error) {
resp, err := c.queryClient.RPCClient.BlockResults(ctx, blockHeight)
if err != nil {
4 changes: 4 additions & 0 deletions internal/clients/bbnclient/interface.go
Original file line number Diff line number Diff line change
@@ -14,4 +14,8 @@ type BbnInterface interface {
GetBlockResults(
ctx context.Context, blockHeight int64,
) (*ctypes.ResultBlockResults, *types.Error)
Subscribe(subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error)
UnsubscribeAll(subscriber string) error
IsRunning() bool
Start() error
}
13 changes: 13 additions & 0 deletions internal/db/interface.go
Original file line number Diff line number Diff line change
@@ -139,4 +139,17 @@ type DbInterface interface {
* @return An error if the operation failed
*/
DeleteExpiredDelegation(ctx context.Context, stakingTxHashHex string) error
/**
* GetLastProcessedHeight retrieves the last processed height.
* @param ctx The context
* @return The last processed height or an error
*/
GetLastProcessedHeight(ctx context.Context) (uint64, error)
/**
* UpdateLastProcessedHeight updates the last processed height.
* @param ctx The context
* @param height The last processed height
* @return An error if the operation failed
*/
UpdateLastProcessedHeight(ctx context.Context, height uint64) error
}
34 changes: 34 additions & 0 deletions internal/db/last_processed_height.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package db

import (
"context"

"github.com/babylonlabs-io/babylon-staking-indexer/internal/db/model"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

func (db *Database) GetLastProcessedHeight(ctx context.Context) (uint64, error) {
var result model.LastProcessedHeight
err := db.client.Database(db.dbName).
Collection(model.LastProcessedHeightCollection).
FindOne(ctx, bson.M{}).Decode(&result)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to make the collection generic for storing pointers so that this collection can be used for both BTC and BBN heights. (even for any other pointer values)
So it probably make sense to have a hardcoded primary key for the BBN height

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, is it ok if we change in later pr when the requirement arises?

btw in what case would we store btc pointer?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The initial plan for syncing withdrawal transactions was to perform the same block scan as BBN blocks by storing the BTC height as a pointer to track the last processed height, avoiding the need to sync historical data. However, since the decision has been made to use this library to subscribe to BTC transaction events, this approach is no longer necessary.

That said, I would still argue for keeping this table generic, allowing us to store any pointer in this collection for future use.

Yes, of course. feel free to raise a ticket and we can track it in later PR

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i see sg, have made a ticket to track
#35

if err == mongo.ErrNoDocuments {
// If no document exists, return 0
return 0, nil
}
if err != nil {
return 0, err
}
return result.Height, nil
}

func (db *Database) UpdateLastProcessedHeight(ctx context.Context, height uint64) error {
update := bson.M{"$set": bson.M{"height": height}}
opts := options.Update().SetUpsert(true)
_, err := db.client.Database(db.dbName).
Collection(model.LastProcessedHeightCollection).
UpdateOne(ctx, bson.M{}, update, opts)
return err
}
5 changes: 5 additions & 0 deletions internal/db/model/last_processed_height.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package model

type LastProcessedHeight struct {
Height uint64 `bson:"height"`
}
2 changes: 2 additions & 0 deletions internal/db/model/setup.go
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ const (
BTCDelegationDetailsCollection = "btc_delegation_details"
TimeLockCollection = "timelock"
GlobalParamsCollection = "global_params"
LastProcessedHeightCollection = "last_processed_height"
)

type index struct {
@@ -30,6 +31,7 @@ var collections = map[string][]index{
BTCDelegationDetailsCollection: {{Indexes: map[string]int{}}},
TimeLockCollection: {{Indexes: map[string]int{}}},
GlobalParamsCollection: {{Indexes: map[string]int{}}},
LastProcessedHeightCollection: {{Indexes: map[string]int{}}},
}

func Setup(ctx context.Context, cfg *config.Config) error {
111 changes: 61 additions & 50 deletions internal/services/bootstrap.go
Original file line number Diff line number Diff line change
@@ -2,54 +2,26 @@ package services

import (
"context"
"time"
"fmt"
"net/http"

"github.com/babylonlabs-io/babylon-staking-indexer/internal/types"
"github.com/rs/zerolog/log"
)

// TODO: To be replaced by the actual values later and moved to a config file
const (
lastProcessedHeight = int64(0)
eventProcessorSize = 5000
retryInterval = 10 * time.Second
maxRetries = 10
eventProcessorSize = 5000
)

// bootstrapBbn handles its own retry logic and runs in a goroutine.
// It will try to bootstrap the BBN blockchain by fetching until the latest block
// height and processing events. If any errors occur during the process,
// it will retry with exponential backoff, up to a maximum of maxRetries.
// BootstrapBbn initiates the BBN blockchain bootstrapping process in a separate goroutine.
// It attempts to bootstrap by processing blocks and events.
// If an error occurs, it logs the error and terminates the program.
// The method runs asynchronously to allow non-blocking operation.
func (s *Service) BootstrapBbn(ctx context.Context) {
go func() {
bootstrapCtx, cancel := context.WithCancel(ctx)
defer cancel()

for retries := 0; retries < maxRetries; retries++ {
err := s.attemptBootstrap(bootstrapCtx)
if err != nil {
log.Err(err).
Msgf(
"Failed to bootstrap BBN blockchain, attempt %d/%d",
retries+1,
maxRetries,
)

// If the retry count reaches maxRetries, log the failure and exit
if retries == maxRetries-1 {
log.Fatal().
Msg(
"Failed to bootstrap BBN blockchain after max retries, exiting",
)
}

// Exponential backoff
time.Sleep(retryInterval * time.Duration(retries))
} else {
log.Info().Msg("Successfully bootstrapped BBN blockchain")
break // Exit the loop if successful
}
if err := s.attemptBootstrap(ctx); err != nil {
log.Fatal().Msgf("BBN bootstrap process exited with error: %v", err)
}
}()
}
@@ -58,24 +30,63 @@ func (s *Service) BootstrapBbn(ctx context.Context) {
// block height and processing the blocks from the last processed height.
// It returns an error if it fails to get the block results or events from the block.
func (s *Service) attemptBootstrap(ctx context.Context) *types.Error {
latestBbnHeight, err := s.bbn.GetLatestBlockNumber(ctx)
if err != nil {
return err
lastProcessedHeight, dbErr := s.db.GetLastProcessedHeight(ctx)
if dbErr != nil {
return types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("failed to get last processed height: %w", dbErr),
)
}
log.Debug().Msgf("Latest BBN block height: %d", latestBbnHeight)

// lastProcessedHeight is already synced, so start from the next block
for i := lastProcessedHeight + 1; i <= latestBbnHeight; i++ {
events, err := s.getEventsFromBlock(ctx, i)
if err != nil {
log.Err(err).Msgf("Failed to get events from block %d", i)
return err
}
for _, event := range events {
s.bbnEventProcessor <- event
for {
select {
case <-ctx.Done():
return types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("context cancelled during bootstrap"),
)

case latestHeight := <-s.latestHeightChan:
log.Info().
Uint64("last_processed_height", lastProcessedHeight).
Int64("latest_height", latestHeight).
Msg("Received new block height")

// Process blocks from lastProcessedHeight + 1 to latestHeight
for i := lastProcessedHeight + 1; i <= uint64(latestHeight); i++ {
select {
case <-ctx.Done():
return types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("context cancelled during block processing"),
)
default:
events, err := s.getEventsFromBlock(ctx, int64(i))
if err != nil {
return err
}
for _, event := range events {
s.bbnEventProcessor <- event
}

// Update lastProcessedHeight after successful processing
if dbErr := s.db.UpdateLastProcessedHeight(ctx, i); dbErr != nil {
return types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("failed to update last processed height in database: %w", dbErr),
)
}
lastProcessedHeight = i
}
}

log.Info().Msgf("Processed blocks up to height %d", lastProcessedHeight)
}
}
return nil
}

// getEventsFromBlock fetches the events for a given block by its block height
3 changes: 3 additions & 0 deletions internal/services/service.go
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ type Service struct {
bbn bbnclient.BbnInterface
queueManager *queue.QueueManager
bbnEventProcessor chan BbnEvent
latestHeightChan chan int64
}

func NewService(
@@ -27,13 +28,15 @@ func NewService(
qm *queue.QueueManager,
) *Service {
eventProcessor := make(chan BbnEvent, eventProcessorSize)
latestHeightChan := make(chan int64)
return &Service{
cfg: cfg,
db: db,
btc: btc,
bbn: bbn,
queueManager: qm,
bbnEventProcessor: eventProcessor,
latestHeightChan: latestHeightChan,
}
}

43 changes: 41 additions & 2 deletions internal/services/subscription.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,52 @@
package services

import "context"
import (
"context"

"github.com/cometbft/cometbft/types"
"github.com/rs/zerolog/log"
)

// TODO: Placeholder for subscribing to BBN events via websocket
func (s *Service) SubscribeToBbnEvents(ctx context.Context) {
subscriberName := "babylon-staking-indexer"
query := "tm.event='NewBlock'"

err := s.bbn.Start()
if err != nil {
log.Fatal().Msgf("Failed to start BBN client: %v", err)
}

if !s.bbn.IsRunning() {
log.Fatal().Msg("BBN client is not running")
}

eventChan, err := s.bbn.Subscribe(subscriberName, query)
if err != nil {
log.Fatal().Msgf("Failed to subscribe to events: %v", err)
}

go func() {
for {
select {
case event := <-eventChan:
newBlockEvent, ok := event.Data.(types.EventDataNewBlock)
if !ok {
log.Fatal().Msg("Event is not a NewBlock event")
}

latestHeight := newBlockEvent.Block.Height
if latestHeight == 0 {
log.Fatal().Msg("Event doesn't contain block height information")
}

// Send the latest height to the bootstrap process
s.latestHeightChan <- latestHeight

case <-ctx.Done():
err := s.bbn.UnsubscribeAll(subscriberName)
if err != nil {
log.Error().Msgf("Failed to unsubscribe from events: %v", err)
}
return
}
}
Loading
Loading