Skip to content

Commit

Permalink
syncer: exit broadcast as soon as possible (#9018) (#9070)
Browse files Browse the repository at this point in the history
close #9017

Signed-off-by: ti-chi-bot <[email protected]>
Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: Ryan Leung <[email protected]>
  • Loading branch information
ti-chi-bot and rleungx authored Feb 12, 2025
1 parent ce2d0f1 commit ca61765
Showing 1 changed file with 35 additions and 18 deletions.
53 changes: 35 additions & 18 deletions pkg/syncer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -160,13 +161,13 @@ func (s *RegionSyncer) RunServer(ctx context.Context, regionNotifier <-chan *cor
RegionLeaders: leaders,
Buckets: buckets,
}
s.broadcast(regions)
s.broadcast(ctx, regions)
case <-ticker.C:
alive := &pdpb.SyncRegionResponse{
Header: &pdpb.ResponseHeader{ClusterId: s.server.ClusterID()},
StartIndex: s.history.GetNextIndex(),
}
s.broadcast(alive)
s.broadcast(ctx, alive)
}
requests = requests[:0]
stats = stats[:0]
Expand Down Expand Up @@ -332,23 +333,39 @@ func (s *RegionSyncer) bindStream(name string, stream ServerStream) {
s.mu.streams[name] = stream
}

func (s *RegionSyncer) broadcast(regions *pdpb.SyncRegionResponse) {
var failed []string
s.mu.RLock()
for name, sender := range s.mu.streams {
err := sender.Send(regions)
if err != nil {
log.Error("region syncer send data meet error", errs.ZapError(errs.ErrGRPCSend, err))
failed = append(failed, name)
func (s *RegionSyncer) broadcast(ctx context.Context, regions *pdpb.SyncRegionResponse) {
broadcastDone := make(chan struct{}, 1)
go func() {
defer logutil.LogPanic()
var failed []string
s.mu.RLock()
for name, sender := range s.mu.streams {
select {
case <-ctx.Done():
s.mu.RUnlock()
close(broadcastDone)
return
default:
}
err := sender.Send(regions)
if err != nil {
log.Error("region syncer send data meet error", errs.ZapError(errs.ErrGRPCSend, err))
failed = append(failed, name)
}
}
}
s.mu.RUnlock()
if len(failed) > 0 {
s.mu.Lock()
for _, name := range failed {
delete(s.mu.streams, name)
log.Info("region syncer delete the stream", zap.String("stream", name))
s.mu.RUnlock()
if len(failed) > 0 {
s.mu.Lock()
for _, name := range failed {
delete(s.mu.streams, name)
log.Info("region syncer delete the stream", zap.String("stream", name))
}
s.mu.Unlock()
}
s.mu.Unlock()
close(broadcastDone)
}()
select {
case <-broadcastDone:
case <-ctx.Done():
}
}

0 comments on commit ca61765

Please sign in to comment.