diff --git a/pkg/syncer/server.go b/pkg/syncer/server.go index 7d339e75dbe..d3d7c983509 100644 --- a/pkg/syncer/server.go +++ b/pkg/syncer/server.go @@ -32,6 +32,11 @@ import ( "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/grpcutil" +<<<<<<< HEAD +======= + "github.com/tikv/pd/pkg/utils/keypath" + "github.com/tikv/pd/pkg/utils/logutil" +>>>>>>> cbb4dfeb7 (syncer: exit broadcast as soon as possible (#9018)) "github.com/tikv/pd/pkg/utils/syncutil" "go.uber.org/zap" "google.golang.org/grpc/codes" @@ -160,13 +165,13 @@ func (s *RegionSyncer) RunServer(ctx context.Context, regionNotifier <-chan *cor RegionLeaders: leaders, Buckets: buckets, } - s.broadcast(regions) + s.broadcast(ctx, regions) case <-ticker.C: alive := &pdpb.SyncRegionResponse{ Header: &pdpb.ResponseHeader{ClusterId: s.server.ClusterID()}, StartIndex: s.history.GetNextIndex(), } - s.broadcast(alive) + s.broadcast(ctx, alive) } requests = requests[:0] stats = stats[:0] @@ -332,23 +337,39 @@ func (s *RegionSyncer) bindStream(name string, stream ServerStream) { s.mu.streams[name] = stream } -func (s *RegionSyncer) broadcast(regions *pdpb.SyncRegionResponse) { - var failed []string - s.mu.RLock() - for name, sender := range s.mu.streams { - err := sender.Send(regions) - if err != nil { - log.Error("region syncer send data meet error", errs.ZapError(errs.ErrGRPCSend, err)) - failed = append(failed, name) +func (s *RegionSyncer) broadcast(ctx context.Context, regions *pdpb.SyncRegionResponse) { + broadcastDone := make(chan struct{}, 1) + go func() { + defer logutil.LogPanic() + var failed []string + s.mu.RLock() + for name, sender := range s.mu.streams { + select { + case <-ctx.Done(): + s.mu.RUnlock() + close(broadcastDone) + return + default: + } + err := sender.Send(regions) + if err != nil { + log.Error("region syncer send data meet error", errs.ZapError(errs.ErrGRPCSend, err)) + failed = append(failed, name) + } } - } - s.mu.RUnlock() - if len(failed) > 0 { - s.mu.Lock() - for _, name := range failed { - delete(s.mu.streams, name) - log.Info("region syncer delete the stream", zap.String("stream", name)) + s.mu.RUnlock() + if len(failed) > 0 { + s.mu.Lock() + for _, name := range failed { + delete(s.mu.streams, name) + log.Info("region syncer delete the stream", zap.String("stream", name)) + } + s.mu.Unlock() } - s.mu.Unlock() + close(broadcastDone) + }() + select { + case <-broadcastDone: + case <-ctx.Done(): } }