Skip to content

Commit

Permalink
chore: cosmetics + close the block API in case of failure
Browse files Browse the repository at this point in the history
  • Loading branch information
rach-id committed Nov 25, 2024
1 parent 55b868a commit 73179bb
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 7 deletions.
41 changes: 34 additions & 7 deletions rpc/grpc/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/tendermint/tendermint/libs/pubsub"
"sync"
"time"

Expand Down Expand Up @@ -54,11 +55,15 @@ type BlockAPI struct {
sync.Mutex
heightListeners map[chan NewHeightEvent]struct{}
newBlockSubscription eventstypes.Subscription
subscriptionID string
subscriptionQuery pubsub.Query
}

func NewBlockAPI() *BlockAPI {
return &BlockAPI{
heightListeners: make(map[chan NewHeightEvent]struct{}, 1000),
heightListeners: make(map[chan NewHeightEvent]struct{}, 1000),
subscriptionID: fmt.Sprintf("block-api-subscription-%s", rand.Str(6)),
subscriptionQuery: eventstypes.EventQueryNewBlock,
}
}

Expand All @@ -68,8 +73,8 @@ func (blockAPI *BlockAPI) StartNewBlockEventListener(ctx context.Context) error
var err error
blockAPI.newBlockSubscription, err = env.EventBus.Subscribe(
ctx,
fmt.Sprintf("new-block-grpc-subscription-%s", rand.Str(6)),
eventstypes.EventQueryNewBlock,
blockAPI.subscriptionID,
blockAPI.subscriptionQuery,
500,
)
if err != nil {
Expand All @@ -85,7 +90,6 @@ func (blockAPI *BlockAPI) StartNewBlockEventListener(ctx context.Context) error
env.Logger.Error("cancelled grpc subscription. retrying")
ok, err := blockAPI.retryNewBlocksSubscription(ctx)
if err != nil {
blockAPI.closeAllListeners()
return err
}
if !ok {
Expand All @@ -97,7 +101,6 @@ func (blockAPI *BlockAPI) StartNewBlockEventListener(ctx context.Context) error
env.Logger.Error("new blocks subscription closed. re-subscribing")
ok, err := blockAPI.retryNewBlocksSubscription(ctx)
if err != nil {
blockAPI.closeAllListeners()
return err
}
if !ok {
Expand Down Expand Up @@ -140,8 +143,8 @@ func (blockAPI *BlockAPI) retryNewBlocksSubscription(ctx context.Context) (bool,
var err error
blockAPI.newBlockSubscription, err = env.EventBus.Subscribe(
ctx,
fmt.Sprintf("new-block-grpc-subscription-%s", rand.Str(6)),
eventstypes.EventQueryNewBlock,
fmt.Sprintf("block-api-subscription-%s", rand.Str(6)),
blockAPI.subscriptionQuery,
SubscriptionCapacity,
)
if err != nil {
Expand All @@ -161,7 +164,9 @@ func (blockAPI *BlockAPI) broadcastToListeners(ctx context.Context, height int64
func() {
defer func() {
if r := recover(); r != nil {
// logging the error then removing the heights listener
core.GetEnvironment().Logger.Debug("failed to write to heights listener", "err", r)
blockAPI.removeHeightListener(ch)
}
}()
select {
Expand Down Expand Up @@ -190,11 +195,33 @@ func (blockAPI *BlockAPI) removeHeightListener(ch chan NewHeightEvent) {
func (blockAPI *BlockAPI) closeAllListeners() {
blockAPI.Lock()
defer blockAPI.Unlock()
if blockAPI.heightListeners == nil {
// if this is nil, then there is no need to close anything
return
}
for channel := range blockAPI.heightListeners {
delete(blockAPI.heightListeners, channel)
}
}

// Stop cleans up the BlockAPI instance by closing all listeners
// and ensuring no further events are processed.
func (blockAPI *BlockAPI) Stop(ctx context.Context) {
blockAPI.Lock()
defer blockAPI.Unlock()

// close all height listeners
blockAPI.closeAllListeners()

// stop the events subscription
if blockAPI.newBlockSubscription != nil {
core.GetEnvironment().EventBus.Unsubscribe(ctx, blockAPI.subscriptionID, blockAPI.subscriptionQuery)

Check failure on line 218 in rpc/grpc/api.go

View workflow job for this annotation

GitHub Actions / golangci-lint

Error return value of `.Unsubscribe` is not checked (errcheck)
blockAPI.newBlockSubscription = nil
}

core.GetEnvironment().Logger.Info("gRPC streaming API has been stopped")
}

func (blockAPI *BlockAPI) BlockByHash(req *BlockByHashRequest, stream BlockAPI_BlockByHashServer) error {
blockStore := core.GetEnvironment().BlockStore
blockMeta := blockStore.LoadBlockMetaByHash(req.Hash)
Expand Down
1 change: 1 addition & 0 deletions rpc/grpc/client_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func StartGRPCServer(ln net.Listener) error {
errCh <- grpcServer.Serve(ln)
}()
defer grpcServer.GracefulStop()
defer api.Stop(ctx)
// blocks until one errors or returns nil
return <-errCh
}
Expand Down

0 comments on commit 73179bb

Please sign in to comment.