From 757c59a2c76acf08e537d4e00685dd0c89a8e391 Mon Sep 17 00:00:00 2001 From: Gurjot Singh <111540954+gusin13@users.noreply.github.com> Date: Fri, 29 Nov 2024 15:20:22 +0530 Subject: [PATCH] chore: update events schema (#75) --- cmd/babylon-staking-indexer/main.go | 13 +- config/config-docker.yml | 4 +- config/config-local.yml | 6 +- consumer/event_consumer.go | 12 + go.mod | 2 +- go.sum | 4 + internal/queue/client/client.go | 37 --- internal/queue/client/rabbitmq_client.go | 279 ------------------- internal/queue/client/schema.go | 330 ----------------------- internal/queue/queue.go | 151 ----------- internal/services/consumer_events.go | 66 +---- internal/services/delegation.go | 22 +- internal/services/expiry_checker.go | 5 - internal/services/service.go | 12 +- 14 files changed, 62 insertions(+), 881 deletions(-) create mode 100644 consumer/event_consumer.go delete mode 100644 internal/queue/client/client.go delete mode 100644 internal/queue/client/rabbitmq_client.go delete mode 100644 internal/queue/client/schema.go delete mode 100644 internal/queue/queue.go diff --git a/cmd/babylon-staking-indexer/main.go b/cmd/babylon-staking-indexer/main.go index 8956aac..4e88645 100644 --- a/cmd/babylon-staking-indexer/main.go +++ b/cmd/babylon-staking-indexer/main.go @@ -6,6 +6,7 @@ import ( "github.com/joho/godotenv" "github.com/rs/zerolog/log" + "go.uber.org/zap" "github.com/babylonlabs-io/babylon-staking-indexer/cmd/babylon-staking-indexer/cli" "github.com/babylonlabs-io/babylon-staking-indexer/internal/clients/bbnclient" @@ -13,8 +14,8 @@ import ( "github.com/babylonlabs-io/babylon-staking-indexer/internal/config" "github.com/babylonlabs-io/babylon-staking-indexer/internal/db" "github.com/babylonlabs-io/babylon-staking-indexer/internal/observability/metrics" - "github.com/babylonlabs-io/babylon-staking-indexer/internal/queue" "github.com/babylonlabs-io/babylon-staking-indexer/internal/services" + "github.com/babylonlabs-io/staking-queue-client/queuemngr" ) func init() { @@ -44,9 +45,13 @@ func main() { log.Fatal().Err(err).Msg("error while creating db client") } - qm, err := queue.NewQueueManager(&cfg.Queue) + // Create a basic zap logger + zapLogger, _ := zap.NewProduction() + defer zapLogger.Sync() + + queueConsumer, err := queuemngr.NewQueueManager(&cfg.Queue, zapLogger) if err != nil { - log.Fatal().Err(err).Msg("error while creating queue manager") + log.Fatal().Err(err).Msg("failed to initialize event consumer") } btcClient, err := btcclient.NewBTCClient(&cfg.BTC) @@ -64,7 +69,7 @@ func main() { log.Fatal().Err(err).Msg("error while creating btc notifier") } - service := services.NewService(cfg, dbClient, btcClient, btcNotifier, bbnClient, qm) + service := services.NewService(cfg, dbClient, btcClient, btcNotifier, bbnClient, queueConsumer) if err != nil { log.Fatal().Err(err).Msg("error while creating service") } diff --git a/config/config-docker.yml b/config/config-docker.yml index d4530b2..cbc7cfe 100644 --- a/config/config-docker.yml +++ b/config/config-docker.yml @@ -28,9 +28,9 @@ queue: queue_user: user # can be replaced by values in .env file queue_password: password url: "localhost:5672" - processing_timeout: 5 # 5 second + processing_timeout: 5s # 5 second msg_max_retry_attempts: 10 - requeue_delay_time: 300 + requeue_delay_time: 300s queue_type: quorum metrics: host: 0.0.0.0 diff --git a/config/config-local.yml b/config/config-local.yml index 0b568e6..13a1a94 100644 --- a/config/config-local.yml +++ b/config/config-local.yml @@ -28,9 +28,9 @@ queue: queue_user: user # can be replaced by values in .env file queue_password: password url: "localhost:5672" - processing_timeout: 5 # 5 second - msg_max_retry_attempts: 3 - requeue_delay_time: 60 + processing_timeout: 5s # 5 second + msg_max_retry_attempts: 10 + requeue_delay_time: 300s queue_type: quorum metrics: host: 0.0.0.0 diff --git a/consumer/event_consumer.go b/consumer/event_consumer.go new file mode 100644 index 0000000..6aec021 --- /dev/null +++ b/consumer/event_consumer.go @@ -0,0 +1,12 @@ +package consumer + +import ( + "github.com/babylonlabs-io/staking-queue-client/client" +) + +type EventConsumer interface { + Start() error + PushStakingEvent(ev *client.StakingEvent) error + PushUnbondingEvent(ev *client.StakingEvent) error + Stop() error +} diff --git a/go.mod b/go.mod index 20b24ea..bbdfa89 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.23.1 require ( github.com/avast/retry-go/v4 v4.5.1 github.com/babylonlabs-io/babylon v0.17.1 - github.com/babylonlabs-io/staking-queue-client v0.4.1 + github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241129073153-a69b329ff376 github.com/btcsuite/btcd v0.24.3-0.20241011125836-24eb815168f4 github.com/btcsuite/btcd/btcec/v2 v2.3.4 github.com/btcsuite/btcd/btcutil v1.1.6 diff --git a/go.sum b/go.sum index c42fdea..6988672 100644 --- a/go.sum +++ b/go.sum @@ -284,6 +284,10 @@ github.com/babylonlabs-io/babylon v0.17.1 h1:lyWGdR7B49qDw5pllLyTW/HAM5uQWXXPZef github.com/babylonlabs-io/babylon v0.17.1/go.mod h1:sT+KG2U+M0tDMNZZ2L5CwlXX0OpagGEs56BiWXqaZFw= github.com/babylonlabs-io/staking-queue-client v0.4.1 h1:AW+jtrNxZYN/isRx+njqjHbUU9CzhF42Ke6roK+0N3I= github.com/babylonlabs-io/staking-queue-client v0.4.1/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q= +github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241128065526-664aebdb9c1b h1:kdBDqW+wm4fiBhEiUzos9TnhmRcf6//tWyUBiBkyoqQ= +github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241128065526-664aebdb9c1b/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q= +github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241129073153-a69b329ff376 h1:m2jkCF17HzW59ER5iezaK0HBtVFSmEsoA88N+iT4TW4= +github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241129073153-a69b329ff376/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= diff --git a/internal/queue/client/client.go b/internal/queue/client/client.go deleted file mode 100644 index 1c0f92b..0000000 --- a/internal/queue/client/client.go +++ /dev/null @@ -1,37 +0,0 @@ -package client - -import ( - "context" - - "github.com/babylonlabs-io/staking-queue-client/config" -) - -type QueueMessage struct { - Body string - Receipt string - RetryAttempts int32 -} - -func (m QueueMessage) IncrementRetryAttempts() int32 { - m.RetryAttempts++ - return m.RetryAttempts -} - -func (m QueueMessage) GetRetryAttempts() int32 { - return m.RetryAttempts -} - -// A common interface for queue clients regardless if it's a SQS, RabbitMQ, etc. -type QueueClient interface { - SendMessage(ctx context.Context, messageBody string) error - ReceiveMessages() (<-chan QueueMessage, error) - DeleteMessage(receipt string) error - Stop() error - GetQueueName() string - ReQueueMessage(ctx context.Context, message QueueMessage) error - Ping(ctx context.Context) error -} - -func NewQueueClient(config *config.QueueConfig, queueName string) (QueueClient, error) { - return NewRabbitMqClient(config, queueName) -} diff --git a/internal/queue/client/rabbitmq_client.go b/internal/queue/client/rabbitmq_client.go deleted file mode 100644 index 975921f..0000000 --- a/internal/queue/client/rabbitmq_client.go +++ /dev/null @@ -1,279 +0,0 @@ -package client - -import ( - "context" - "fmt" - "strconv" - "time" - - amqp "github.com/rabbitmq/amqp091-go" - - "github.com/babylonlabs-io/staking-queue-client/config" -) - -const ( - dlxName = "common_dlx" - dlxRoutingPostfix = "_routing_key" - delayedQueuePostfix = "_delay" -) - -type RabbitMqClient struct { - connection *amqp.Connection - channel *amqp.Channel - queueName string - stopCh chan struct{} // This is used to gracefully stop the message receiving loop - delayedRequeueTime time.Duration -} - -func NewRabbitMqClient(config *config.QueueConfig, queueName string) (*RabbitMqClient, error) { - amqpURI := fmt.Sprintf("amqp://%s:%s@%s", config.QueueUser, config.QueuePassword, config.Url) - - conn, err := amqp.Dial(amqpURI) - if err != nil { - return nil, err - } - - ch, err := conn.Channel() - if err != nil { - return nil, err - } - - // Declare a single common DLX for all queues - err = ch.ExchangeDeclare(dlxName, - "direct", - true, - false, - false, - false, - amqp.Table{ - "x-queue-type": config.QueueType, - }, - ) - if err != nil { - return nil, err - } - - // Declare a delay queue specific to this particular queue - delayQueueName := queueName + delayedQueuePostfix - _, err = ch.QueueDeclare( - delayQueueName, - true, - false, - false, - false, - amqp.Table{ - // Default exchange to route messages back to the main queue - // The "" in rabbitMq referring to the default exchange which allows - // to route messages to the queue by the routing key which is the queue name - "x-queue-type": config.QueueType, - "x-dead-letter-exchange": "", - "x-dead-letter-routing-key": queueName, - }, - ) - if err != nil { - return nil, err - } - - // Declare the queue that will be created if not exists - customDlxRoutingKey := queueName + dlxRoutingPostfix - _, err = ch.QueueDeclare( - queueName, // name - true, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - amqp.Table{ - "x-queue-type": config.QueueType, - "x-dead-letter-exchange": dlxName, - "x-dead-letter-routing-key": customDlxRoutingKey, - }, - ) - if err != nil { - return nil, err - } - - // Bind the delay queue to the common DLX - err = ch.QueueBind(delayQueueName, customDlxRoutingKey, dlxName, false, nil) - if err != nil { - return nil, err - } - - err = ch.Confirm(false) - if err != nil { - return nil, err - } - - return &RabbitMqClient{ - connection: conn, - channel: ch, - queueName: queueName, - stopCh: make(chan struct{}), - delayedRequeueTime: time.Duration(config.ReQueueDelayTime) * time.Second, - }, nil -} - -// Ping checks the health of the RabbitMQ infrastructure. -func (c *RabbitMqClient) Ping(ctx context.Context) error { - select { - case <-ctx.Done(): - return ctx.Err() - default: - // Check if the RabbitMQ connection is closed - if c.connection.IsClosed() { - return fmt.Errorf("rabbitMQ connection is closed") - } - - // Check if the RabbitMQ channel is closed - if c.channel.IsClosed() { - return fmt.Errorf("rabbitMQ channel is closed") - } - } - - return nil -} - -func (c *RabbitMqClient) ReceiveMessages() (<-chan QueueMessage, error) { - msgs, err := c.channel.Consume( - c.queueName, // queueName - "", // consumer - false, // auto-ack. We want to manually acknowledge the message after processing it. - false, // exclusive - false, // no-local - false, // no-wait - nil, // args - ) - if err != nil { - return nil, err - } - output := make(chan QueueMessage) - go func() { - defer close(output) - for { - select { - case d, ok := <-msgs: - if !ok { - return // Channel closed, exit goroutine - } - attempts := d.Headers["x-processing-attempts"] - if attempts == nil { - attempts = int32(0) - } - currentAttempt := attempts.(int32) - - output <- QueueMessage{ - Body: string(d.Body), - Receipt: strconv.FormatUint(d.DeliveryTag, 10), - RetryAttempts: currentAttempt, - } - case <-c.stopCh: - return // Stop signal received, exit goroutine - } - } - }() - - return output, nil -} - -// DeleteMessage deletes a message from the queue. In RabbitMQ, this is equivalent to acknowledging the message. -// The deliveryTag is the unique identifier for the message. -func (c *RabbitMqClient) DeleteMessage(deliveryTag string) error { - deliveryTagInt, err := strconv.ParseUint(deliveryTag, 10, 64) - if err != nil { - return err - } - return c.channel.Ack(deliveryTagInt, false) -} - -// ReQueueMessage requeues a message back to the queue with a delay. -// This is done by sending the message again with an incremented counter. -// The original message is then deleted from the queue. -func (c *RabbitMqClient) ReQueueMessage(ctx context.Context, message QueueMessage) error { - // For requeueing, we will send the message to a delay queue that has a TTL pre-configured. - delayQueueName := c.queueName + delayedQueuePostfix - err := c.sendMessageWithAttempts(ctx, message.Body, delayQueueName, message.IncrementRetryAttempts(), c.delayedRequeueTime) - if err != nil { - return fmt.Errorf("failed to requeue message: %w", err) - } - - err = c.DeleteMessage(message.Receipt) - if err != nil { - return fmt.Errorf("failed to delete message while requeuing: %w", err) - } - - return nil -} - -// SendMessage sends a message to the queue. the ctx is used to control the timeout of the operation. -func (c *RabbitMqClient) sendMessageWithAttempts(ctx context.Context, messageBody, queueName string, attempts int32, ttl time.Duration) error { - // Ensure the channel is open - if c.channel == nil { - return fmt.Errorf("RabbitMQ channel not initialized") - } - - // Prepare new headers with the incremented counter - newHeaders := amqp.Table{ - "x-processing-attempts": attempts, - } - - publishMsg := amqp.Publishing{ - DeliveryMode: amqp.Persistent, - ContentType: "text/plain", - Body: []byte(messageBody), - Headers: newHeaders, - } - - // Exclude the expiration if the TTL is 0. - if ttl > 0 { - publishMsg.Expiration = strconv.Itoa(int(ttl.Milliseconds())) - } - - // Publish a message to the queue - confirmation, err := c.channel.PublishWithDeferredConfirmWithContext( - ctx, - "", // exchange: Use the default exchange - queueName, // routing key: The queue this message should be routed to - true, // mandatory: true indicates the server must route the message to a queue, otherwise error - false, // immediate: false indicates the server may wait to send the message until a consumer is available - publishMsg, - ) - - if err != nil { - return fmt.Errorf("failed to publish a message to queue %s: %w", queueName, err) - } - - if confirmation == nil { - return fmt.Errorf("message not confirmed when publishing into queue %s", queueName) - } - confirmed, err := confirmation.WaitContext(ctx) - if err != nil { - return fmt.Errorf("failed to confirm message when publishing into queue %s: %w", queueName, err) - } - if !confirmed { - return fmt.Errorf("message not confirmed when publishing into queue %s", queueName) - } - - return nil -} - -// SendMessage sends a message to the queue. the ctx is used to control the timeout of the operation. -func (c *RabbitMqClient) SendMessage(ctx context.Context, messageBody string) error { - return c.sendMessageWithAttempts(ctx, messageBody, c.queueName, 0, 0) -} - -// Stop stops the message receiving process. -func (c *RabbitMqClient) Stop() error { - if err := c.channel.Close(); err != nil { - return err - } - if err := c.connection.Close(); err != nil { - return err - } - - close(c.stopCh) - - return nil -} - -func (c *RabbitMqClient) GetQueueName() string { - return c.queueName -} diff --git a/internal/queue/client/schema.go b/internal/queue/client/schema.go deleted file mode 100644 index 1af41c7..0000000 --- a/internal/queue/client/schema.go +++ /dev/null @@ -1,330 +0,0 @@ -package client - -const ( - ActiveStakingQueueName string = "active_staking_queue" - UnbondingStakingQueueName string = "unbonding_staking_queue" - WithdrawStakingQueueName string = "withdraw_staking_queue" - ExpiredStakingQueueName string = "expired_staking_queue" - StakingStatsQueueName string = "staking_stats_queue" - BtcInfoQueueName string = "btc_info_queue" - ConfirmedInfoQueueName string = "confirmed_info_queue" - VerifiedStakingQueueName string = "verified_staking_queue" - PendingStakingQueueName string = "pending_staking_queue" -) - -const ( - ActiveStakingEventType EventType = 1 - UnbondingStakingEventType EventType = 2 - WithdrawStakingEventType EventType = 3 - ExpiredStakingEventType EventType = 4 - StatsEventType EventType = 5 - BtcInfoEventType EventType = 6 - ConfirmedInfoEventType EventType = 7 - VerifiedStakingEventType EventType = 8 - PendingStakingEventType EventType = 9 -) - -// Event schema versions, only increment when the schema changes -const ( - ActiveEventVersion int = 0 - UnbondingEventVersion int = 0 - WithdrawEventVersion int = 1 - ExpiredEventVersion int = 0 - StatsEventVersion int = 1 - BtcInfoEventVersion int = 0 - ConfirmedInfoEventVersion int = 0 - VerifiedEventVersion int = 0 - PendingEventVersion int = 0 -) - -type EventType int - -type EventMessage interface { - GetEventType() EventType - GetStakingTxHashHex() string -} - -type ActiveStakingEvent struct { - SchemaVersion int `json:"schema_version"` - EventType EventType `json:"event_type"` // always 1. ActiveStakingEventType - StakingTxHashHex string `json:"staking_tx_hash_hex"` - StakerBtcPkHex string `json:"staker_btc_pk_hex"` - FinalityProviderBtcPksHex []string `json:"finality_provider_btc_pks_hex"` - StakingValue uint64 `json:"staking_value"` - StakingStartHeight uint64 `json:"staking_start_height"` - StakingStartTimestamp int64 `json:"staking_start_timestamp"` - StakingTimeLock uint64 `json:"staking_timelock"` - StakingOutputIndex uint64 `json:"staking_output_index"` - StakingTxHex string `json:"staking_tx_hex"` - IsOverflow bool `json:"is_overflow"` -} - -func (e ActiveStakingEvent) GetEventType() EventType { - return ActiveStakingEventType -} - -func (e ActiveStakingEvent) GetStakingTxHashHex() string { - return e.StakingTxHashHex -} - -func NewActiveStakingEvent( - stakingTxHashHex string, - stakerBtcPkHex string, - finalityProviderBtcPksHex []string, - stakingValue uint64, - stakingStartHeight uint64, - stakingStartTimestamp int64, - stakingTimeLock uint64, - stakingOutputIndex uint64, - stakingTxHex string, - isOverflow bool, -) ActiveStakingEvent { - return ActiveStakingEvent{ - SchemaVersion: ActiveEventVersion, - EventType: ActiveStakingEventType, - StakingTxHashHex: stakingTxHashHex, - StakerBtcPkHex: stakerBtcPkHex, - FinalityProviderBtcPksHex: finalityProviderBtcPksHex, - StakingValue: stakingValue, - StakingStartHeight: stakingStartHeight, - StakingStartTimestamp: stakingStartTimestamp, - StakingTimeLock: stakingTimeLock, - StakingOutputIndex: stakingOutputIndex, - StakingTxHex: stakingTxHex, - IsOverflow: isOverflow, - } -} - -type UnbondingStakingEvent struct { - SchemaVersion int `json:"schema_version"` - EventType EventType `json:"event_type"` // always 2. UnbondingStakingEventType - StakingTxHashHex string `json:"staking_tx_hash_hex"` - UnbondingStartHeight uint64 `json:"unbonding_start_height"` - UnbondingStartTimestamp int64 `json:"unbonding_start_timestamp"` - UnbondingTimeLock uint64 `json:"unbonding_timelock"` - UnbondingOutputIndex uint64 `json:"unbonding_output_index"` - UnbondingTxHex string `json:"unbonding_tx_hex"` - UnbondingTxHashHex string `json:"unbonding_tx_hash_hex"` -} - -func (e UnbondingStakingEvent) GetEventType() EventType { - return UnbondingStakingEventType -} - -func (e UnbondingStakingEvent) GetStakingTxHashHex() string { - return e.StakingTxHashHex -} - -func NewUnbondingStakingEvent( - stakingTxHashHex string, - unbondingStartHeight uint64, - unbondingStartTimestamp int64, - unbondingTimeLock uint64, - unbondingOutputIndex uint64, - unbondingTxHex string, - unbondingTxHashHex string, -) UnbondingStakingEvent { - return UnbondingStakingEvent{ - SchemaVersion: UnbondingEventVersion, - EventType: UnbondingStakingEventType, - StakingTxHashHex: stakingTxHashHex, - UnbondingStartHeight: unbondingStartHeight, - UnbondingStartTimestamp: unbondingStartTimestamp, - UnbondingTimeLock: unbondingTimeLock, - UnbondingOutputIndex: unbondingOutputIndex, - UnbondingTxHex: unbondingTxHex, - UnbondingTxHashHex: unbondingTxHashHex, - } -} - -type WithdrawStakingEvent struct { - SchemaVersion int `json:"schema_version"` - EventType EventType `json:"event_type"` // always 3. WithdrawStakingEventType - StakingTxHashHex string `json:"staking_tx_hash_hex"` - WithdrawTxHashHex string `json:"withdraw_tx_hash_hex"` - WithdrawTxBtcHeight uint64 `json:"withdraw_tx_btc_height"` - WithdrawTxHex string `json:"withdraw_tx_hex"` -} - -func (e WithdrawStakingEvent) GetEventType() EventType { - return WithdrawStakingEventType -} - -func (e WithdrawStakingEvent) GetStakingTxHashHex() string { - return e.StakingTxHashHex -} - -func NewWithdrawStakingEvent( - stakingTxHashHex string, - withdrawTxHashHex string, - withdrawTxBtcHeight uint64, - withdrawTxHex string, -) WithdrawStakingEvent { - return WithdrawStakingEvent{ - SchemaVersion: WithdrawEventVersion, - EventType: WithdrawStakingEventType, - StakingTxHashHex: stakingTxHashHex, - WithdrawTxHashHex: withdrawTxHashHex, - WithdrawTxBtcHeight: withdrawTxBtcHeight, - WithdrawTxHex: withdrawTxHex, - } -} - -type ExpiredStakingEvent struct { - SchemaVersion int `json:"schema_version"` - EventType EventType `json:"event_type"` // always 4. ExpiredStakingEventType - StakingTxHashHex string `json:"staking_tx_hash_hex"` - TxType string `json:"tx_type"` -} - -func (e ExpiredStakingEvent) GetEventType() EventType { - return ExpiredStakingEventType -} - -func (e ExpiredStakingEvent) GetStakingTxHashHex() string { - return e.StakingTxHashHex -} - -func NewExpiredStakingEvent(stakingTxHashHex string, txType string) ExpiredStakingEvent { - return ExpiredStakingEvent{ - SchemaVersion: ExpiredEventVersion, - EventType: ExpiredStakingEventType, - StakingTxHashHex: stakingTxHashHex, - TxType: txType, - } -} - -type StatsEvent struct { - SchemaVersion int `json:"schema_version"` - EventType EventType `json:"event_type"` // always 5. StatsEventType - StakingTxHashHex string `json:"staking_tx_hash_hex"` - StakerPkHex string `json:"staker_pk_hex"` - FinalityProviderPkHex string `json:"finality_provider_pk_hex"` - StakingValue uint64 `json:"staking_value"` - State string `json:"state"` - IsOverflow bool `json:"is_overflow"` -} - -func (e StatsEvent) GetEventType() EventType { - return StatsEventType -} - -func (e StatsEvent) GetStakingTxHashHex() string { - return e.StakingTxHashHex -} - -func NewStatsEvent( - stakingTxHashHex string, - stakerPkHex string, - finalityProviderPkHex string, - stakingValue uint64, - state string, - isOverflow bool, -) StatsEvent { - return StatsEvent{ - SchemaVersion: StatsEventVersion, - EventType: StatsEventType, - StakingTxHashHex: stakingTxHashHex, - StakerPkHex: stakerPkHex, - FinalityProviderPkHex: finalityProviderPkHex, - StakingValue: stakingValue, - State: state, - IsOverflow: isOverflow, - } -} - -type BtcInfoEvent struct { - SchemaVersion int `json:"schema_version"` - EventType EventType `json:"event_type"` // always 6. BtcInfoEventType - Height uint64 `json:"height"` - ConfirmedTvl uint64 `json:"confirmed_tvl"` - UnconfirmedTvl uint64 `json:"unconfirmed_tvl"` -} - -func (e BtcInfoEvent) GetEventType() EventType { - return BtcInfoEventType -} - -// Not applicable, add it here to implement the EventMessage interface -func (e BtcInfoEvent) GetStakingTxHashHex() string { - return "" -} - -func NewBtcInfoEvent(height, confirmedTvl, unconfirmedTvl uint64) BtcInfoEvent { - return BtcInfoEvent{ - SchemaVersion: BtcInfoEventVersion, - EventType: BtcInfoEventType, - Height: height, - ConfirmedTvl: confirmedTvl, - UnconfirmedTvl: unconfirmedTvl, - } -} - -type ConfirmedInfoEvent struct { - SchemaVersion int `json:"schema_version"` - EventType EventType `json:"event_type"` // always 7. ConfirmedInfoEventType - Height uint64 `json:"height"` - Tvl uint64 `json:"tvl"` -} - -func (e ConfirmedInfoEvent) GetEventType() EventType { - return ConfirmedInfoEventType -} - -// Not applicable, add it here to implement the EventMessage interface -func (e ConfirmedInfoEvent) GetStakingTxHashHex() string { - return "" -} - -func NewConfirmedInfoEvent(height, tvl uint64) ConfirmedInfoEvent { - return ConfirmedInfoEvent{ - SchemaVersion: ConfirmedInfoEventVersion, - EventType: ConfirmedInfoEventType, - Height: height, - Tvl: tvl, - } -} - -type VerifiedStakingEvent struct { - SchemaVersion int `json:"schema_version"` - EventType EventType `json:"event_type"` // always 8. VerifiedStakingEventType - StakingTxHashHex string `json:"staking_tx_hash_hex"` -} - -func (e VerifiedStakingEvent) GetEventType() EventType { - return VerifiedStakingEventType -} - -func (e VerifiedStakingEvent) GetStakingTxHashHex() string { - return e.StakingTxHashHex -} - -func NewVerifiedStakingEvent(stakingTxHashHex string) VerifiedStakingEvent { - return VerifiedStakingEvent{ - SchemaVersion: VerifiedEventVersion, - EventType: VerifiedStakingEventType, - StakingTxHashHex: stakingTxHashHex, - } -} - -type PendingStakingEvent struct { - SchemaVersion int `json:"schema_version"` - EventType EventType `json:"event_type"` // always 9. PendingStakingEventType - StakingTxHashHex string `json:"staking_tx_hash_hex"` -} - -func (e PendingStakingEvent) GetEventType() EventType { - return PendingStakingEventType -} - -func (e PendingStakingEvent) GetStakingTxHashHex() string { - return e.StakingTxHashHex -} - -func NewPendingStakingEvent(stakingTxHashHex string) PendingStakingEvent { - return PendingStakingEvent{ - SchemaVersion: PendingEventVersion, - EventType: PendingStakingEventType, - StakingTxHashHex: stakingTxHashHex, - } -} diff --git a/internal/queue/queue.go b/internal/queue/queue.go deleted file mode 100644 index 62cf0a8..0000000 --- a/internal/queue/queue.go +++ /dev/null @@ -1,151 +0,0 @@ -package queue - -import ( - "context" - "encoding/json" - "fmt" - - "github.com/babylonlabs-io/babylon-staking-indexer/internal/queue/client" - "github.com/rs/zerolog/log" - - "github.com/babylonlabs-io/babylon-staking-indexer/internal/observability/metrics" - queueConfig "github.com/babylonlabs-io/staking-queue-client/config" -) - -type QueueManager struct { - stakingExpiredEventQueue client.QueueClient - unbondingEventQueue client.QueueClient - activeStakingEventQueue client.QueueClient - verifiedStakingEventQueue client.QueueClient - pendingStakingEventQueue client.QueueClient -} - -func NewQueueManager(cfg *queueConfig.QueueConfig) (*QueueManager, error) { - stakingEventQueue, err := client.NewQueueClient(cfg, client.ExpiredStakingQueueName) - if err != nil { - return nil, fmt.Errorf("failed to initialize staking event queue: %w", err) - } - - unbondingEventQueue, err := client.NewQueueClient(cfg, client.UnbondingStakingQueueName) - if err != nil { - return nil, fmt.Errorf("failed to initialize unbonding event queue: %w", err) - } - - activeStakingEventQueue, err := client.NewQueueClient(cfg, client.ActiveStakingQueueName) - if err != nil { - return nil, fmt.Errorf("failed to initialize active staking event queue: %w", err) - } - - verifiedStakingEventQueue, err := client.NewQueueClient(cfg, client.VerifiedStakingQueueName) - if err != nil { - return nil, fmt.Errorf("failed to initialize verified staking event queue: %w", err) - } - - pendingStakingEventQueue, err := client.NewQueueClient(cfg, client.PendingStakingQueueName) - if err != nil { - return nil, fmt.Errorf("failed to initialize pending staking event queue: %w", err) - } - - return &QueueManager{ - stakingExpiredEventQueue: stakingEventQueue, - unbondingEventQueue: unbondingEventQueue, - activeStakingEventQueue: activeStakingEventQueue, - verifiedStakingEventQueue: verifiedStakingEventQueue, - pendingStakingEventQueue: pendingStakingEventQueue, - }, nil -} - -func (qm *QueueManager) SendExpiredStakingEvent(ctx context.Context, ev client.ExpiredStakingEvent) error { - jsonBytes, err := json.Marshal(ev) - if err != nil { - return err - } - messageBody := string(jsonBytes) - - log.Debug().Str("tx_hash", ev.StakingTxHashHex).Msg("publishing expired staking event") - err = qm.stakingExpiredEventQueue.SendMessage(ctx, messageBody) - if err != nil { - metrics.RecordQueueSendError() - log.Fatal().Err(err).Str("tx_hash", ev.StakingTxHashHex).Msg("failed to publish staking event") - } - log.Debug().Str("tx_hash", ev.StakingTxHashHex).Msg("successfully published expired staking event") - - return nil -} - -func (qm *QueueManager) SendUnbondingStakingEvent(ctx context.Context, ev *client.UnbondingStakingEvent) error { - jsonBytes, err := json.Marshal(ev) - if err != nil { - return err - } - messageBody := string(jsonBytes) - - log.Info().Str("staking_tx_hash", ev.UnbondingTxHashHex).Msg("pushing unbonding event") - err = qm.unbondingEventQueue.SendMessage(ctx, messageBody) - if err != nil { - return fmt.Errorf("failed to push unbonding event: %w", err) - } - log.Info().Str("staking_tx_hash", ev.UnbondingTxHashHex).Msg("successfully pushed unbonding event") - - return nil -} - -func (qm *QueueManager) SendActiveStakingEvent(ctx context.Context, ev *client.ActiveStakingEvent) error { - jsonBytes, err := json.Marshal(ev) - if err != nil { - return err - } - messageBody := string(jsonBytes) - - log.Info().Str("staking_tx_hash", ev.StakingTxHashHex).Msg("pushing active staking event") - err = qm.activeStakingEventQueue.SendMessage(ctx, messageBody) - if err != nil { - return fmt.Errorf("failed to push active staking event: %w", err) - } - log.Info().Str("staking_tx_hash", ev.StakingTxHashHex).Msg("successfully pushed active staking event") - - return nil -} - -func (qm *QueueManager) SendVerifiedStakingEvent(ctx context.Context, ev *client.VerifiedStakingEvent) error { - jsonBytes, err := json.Marshal(ev) - if err != nil { - return err - } - messageBody := string(jsonBytes) - - log.Info().Str("staking_tx_hash", ev.StakingTxHashHex).Msg("pushing verified staking event") - err = qm.verifiedStakingEventQueue.SendMessage(ctx, messageBody) - if err != nil { - return fmt.Errorf("failed to push verified staking event: %w", err) - } - log.Info().Str("staking_tx_hash", ev.StakingTxHashHex).Msg("successfully pushed verified staking event") - - return nil -} - -func (qm *QueueManager) SendPendingStakingEvent(ctx context.Context, ev *client.PendingStakingEvent) error { - jsonBytes, err := json.Marshal(ev) - if err != nil { - return err - } - messageBody := string(jsonBytes) - - log.Info().Str("staking_tx_hash", ev.StakingTxHashHex).Msg("pushing pending staking event") - err = qm.pendingStakingEventQueue.SendMessage(ctx, messageBody) - if err != nil { - return fmt.Errorf("failed to push pending staking event: %w", err) - } - log.Info().Str("staking_tx_hash", ev.StakingTxHashHex).Msg("successfully pushed pending staking event") - - return nil -} - -// Shutdown gracefully stops the interaction with the queue, ensuring all resources are properly released. -func (qm *QueueManager) Shutdown() { - err := qm.stakingExpiredEventQueue.Stop() - if err != nil { - log.Error().Err(err).Msg("failed to stop staking expired event queue") - } - -} diff --git a/internal/services/consumer_events.go b/internal/services/consumer_events.go index 6e7e18f..b48b5a3 100644 --- a/internal/services/consumer_events.go +++ b/internal/services/consumer_events.go @@ -4,12 +4,10 @@ import ( "context" "fmt" "net/http" - "time" "github.com/babylonlabs-io/babylon-staking-indexer/internal/db/model" - queueclient "github.com/babylonlabs-io/babylon-staking-indexer/internal/queue/client" "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" - "github.com/rs/zerolog/log" + queuecli "github.com/babylonlabs-io/staking-queue-client/client" ) func (s *Service) emitConsumerEvent( @@ -18,14 +16,8 @@ func (s *Service) emitConsumerEvent( switch newState { case types.StateActive: return s.sendActiveDelegationEvent(ctx, delegation) - case types.StateVerified: - return s.sendVerifiedDelegationEvent(ctx, delegation) - case types.StatePending: - return s.sendPendingDelegationEvent(ctx, delegation) case types.StateUnbonding: return s.sendUnbondingDelegationEvent(ctx, delegation) - case types.StateWithdrawable: - return s.sendWithdrawableDelegationEvent(ctx, delegation) default: return types.NewError( http.StatusInternalServerError, @@ -37,64 +29,28 @@ func (s *Service) emitConsumerEvent( // TODO: fix the queue event schema func (s *Service) sendActiveDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { - ev := queueclient.NewActiveStakingEvent( + stakingEvent := queuecli.NewActiveStakingEventV2( delegation.StakingTxHashHex, delegation.StakerBtcPkHex, delegation.FinalityProviderBtcPksHex, - 0, - uint64(delegation.StartHeight), - time.Now().Unix(), - uint64(delegation.StakingTime), - uint64(delegation.StakingOutputIdx), - delegation.StakingTxHex, - false, + delegation.StakingAmount, ) - if err := s.queueManager.SendActiveStakingEvent(ctx, &ev); err != nil { - return types.NewInternalServiceError(fmt.Errorf("failed to send active staking event: %w", err)) - } - return nil -} - -func (s *Service) sendVerifiedDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { - ev := queueclient.NewVerifiedStakingEvent(delegation.StakingTxHashHex) - if err := s.queueManager.SendVerifiedStakingEvent(ctx, &ev); err != nil { - return types.NewInternalServiceError(fmt.Errorf("failed to send verified staking event: %w", err)) - } - return nil -} -func (s *Service) sendPendingDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { - ev := queueclient.NewPendingStakingEvent(delegation.StakingTxHashHex) - if err := s.queueManager.SendPendingStakingEvent(ctx, &ev); err != nil { - return types.NewInternalServiceError(fmt.Errorf("failed to send pending staking event: %w", err)) + if err := s.queueManager.PushStakingEvent(&stakingEvent); err != nil { + return types.NewInternalServiceError(fmt.Errorf("failed to push the staking event to the queue: %w", err)) } return nil } func (s *Service) sendUnbondingDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { - ev := queueclient.NewUnbondingStakingEvent( + ev := queuecli.NewUnbondingStakingEventV2( delegation.StakingTxHashHex, - uint64(delegation.EndHeight), - time.Now().Unix(), - uint64(delegation.StartHeight), - uint64(delegation.EndHeight), - delegation.UnbondingTx, - "", + delegation.StakerBtcPkHex, + delegation.FinalityProviderBtcPksHex, + delegation.StakingAmount, ) - if err := s.queueManager.SendUnbondingStakingEvent(ctx, &ev); err != nil { - return types.NewInternalServiceError(fmt.Errorf("failed to send unbonding staking event: %w", err)) + if err := s.queueManager.PushUnbondingEvent(&ev); err != nil { + return types.NewInternalServiceError(fmt.Errorf("failed to push the unbonding event to the queue: %w", err)) } return nil } - -func (s *Service) sendWithdrawableDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { - ev := queueclient.NewExpiredStakingEvent(delegation.StakingTxHashHex, "") // TODO: add the correct tx type - if err := s.queueManager.SendExpiredStakingEvent(ctx, ev); err != nil { - log.Error().Err(err).Msg("Error sending expired staking event") - return types.NewInternalServiceError( - fmt.Errorf("failed to send expired staking event: %w", err), - ) - } - - return nil -} diff --git a/internal/services/delegation.go b/internal/services/delegation.go index 49bfcea..d55ed88 100644 --- a/internal/services/delegation.go +++ b/internal/services/delegation.go @@ -55,10 +55,6 @@ func (s *Service) processNewBTCDelegationEvent( return err } - if err = s.emitConsumerEvent(ctx, types.StatePending, delegationDoc); err != nil { - return err - } - if dbErr := s.db.SaveNewBTCDelegation( ctx, delegationDoc, ); dbErr != nil { @@ -153,9 +149,12 @@ func (s *Service) processCovenantQuorumReachedEvent( ) } newState := types.DelegationState(covenantQuorumReachedEvent.NewState) - err = s.emitConsumerEvent(ctx, newState, delegation) - if err != nil { - return err + // Emit consumer event if the new state is active + if newState == types.StateActive { + err = s.emitConsumerEvent(ctx, newState, delegation) + if err != nil { + return err + } } if dbErr := s.db.UpdateBTCDelegationState( @@ -204,9 +203,12 @@ func (s *Service) processBTCDelegationInclusionProofReceivedEvent( } newState := types.DelegationState(inclusionProofEvent.NewState) - err = s.emitConsumerEvent(ctx, newState, delegation) - if err != nil { - return err + // Emit consumer event if the new state is active + if newState == types.StateActive { + err = s.emitConsumerEvent(ctx, newState, delegation) + if err != nil { + return err + } } if dbErr := s.db.UpdateBTCDelegationDetails( diff --git a/internal/services/expiry_checker.go b/internal/services/expiry_checker.go index c81c49f..0df880d 100644 --- a/internal/services/expiry_checker.go +++ b/internal/services/expiry_checker.go @@ -58,11 +58,6 @@ func (s *Service) checkExpiry(ctx context.Context) *types.Error { continue } - consumerErr := s.emitConsumerEvent(ctx, types.StateWithdrawable, delegation) - if consumerErr != nil { - return consumerErr - } - if err := s.db.UpdateBTCDelegationState( ctx, delegation.StakingTxHashHex, diff --git a/internal/services/service.go b/internal/services/service.go index 75b56a0..0bddfea 100644 --- a/internal/services/service.go +++ b/internal/services/service.go @@ -6,11 +6,11 @@ import ( "github.com/rs/zerolog/log" + "github.com/babylonlabs-io/babylon-staking-indexer/consumer" "github.com/babylonlabs-io/babylon-staking-indexer/internal/clients/bbnclient" "github.com/babylonlabs-io/babylon-staking-indexer/internal/clients/btcclient" "github.com/babylonlabs-io/babylon-staking-indexer/internal/config" "github.com/babylonlabs-io/babylon-staking-indexer/internal/db" - "github.com/babylonlabs-io/babylon-staking-indexer/internal/queue" notifier "github.com/lightningnetwork/lnd/chainntnfs" ) @@ -23,7 +23,7 @@ type Service struct { btc btcclient.BtcInterface btcNotifier notifier.ChainNotifier bbn bbnclient.BbnInterface - queueManager *queue.QueueManager + queueManager consumer.EventConsumer bbnEventProcessor chan BbnEvent latestHeightChan chan int64 } @@ -34,7 +34,7 @@ func NewService( btc btcclient.BtcInterface, btcNotifier notifier.ChainNotifier, bbn bbnclient.BbnInterface, - qm *queue.QueueManager, + consumer consumer.EventConsumer, ) *Service { eventProcessor := make(chan BbnEvent, eventProcessorSize) latestHeightChan := make(chan int64) @@ -45,7 +45,7 @@ func NewService( btc: btc, btcNotifier: btcNotifier, bbn: bbn, - queueManager: qm, + queueManager: consumer, bbnEventProcessor: eventProcessor, latestHeightChan: latestHeightChan, } @@ -60,6 +60,10 @@ func (s *Service) StartIndexerSync(ctx context.Context) { log.Fatal().Err(err).Msg("failed to start btc chain notifier") } + if err := s.queueManager.Start(); err != nil { + log.Fatal().Err(err).Msg("failed to start the event consumer") + } + // Sync global parameters s.SyncGlobalParams(ctx) // Start the expiry checker