Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#9018
Browse files Browse the repository at this point in the history
close tikv#9017

Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
rleungx authored and ti-chi-bot committed Feb 12, 2025
1 parent ce2d0f1 commit d05c709
Showing 1 changed file with 39 additions and 18 deletions.
57 changes: 39 additions & 18 deletions pkg/syncer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ import (
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/grpcutil"
<<<<<<< HEAD

Check failure on line 35 in pkg/syncer/server.go

View workflow job for this annotation

GitHub Actions / statics

missing import path
=======
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/logutil"
>>>>>>> cbb4dfeb7 (syncer: exit broadcast as soon as possible (#9018))
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -160,13 +165,13 @@ func (s *RegionSyncer) RunServer(ctx context.Context, regionNotifier <-chan *cor
RegionLeaders: leaders,
Buckets: buckets,
}
s.broadcast(regions)
s.broadcast(ctx, regions)
case <-ticker.C:
alive := &pdpb.SyncRegionResponse{
Header: &pdpb.ResponseHeader{ClusterId: s.server.ClusterID()},
StartIndex: s.history.GetNextIndex(),
}
s.broadcast(alive)
s.broadcast(ctx, alive)
}
requests = requests[:0]
stats = stats[:0]
Expand Down Expand Up @@ -332,23 +337,39 @@ func (s *RegionSyncer) bindStream(name string, stream ServerStream) {
s.mu.streams[name] = stream
}

func (s *RegionSyncer) broadcast(regions *pdpb.SyncRegionResponse) {
var failed []string
s.mu.RLock()
for name, sender := range s.mu.streams {
err := sender.Send(regions)
if err != nil {
log.Error("region syncer send data meet error", errs.ZapError(errs.ErrGRPCSend, err))
failed = append(failed, name)
func (s *RegionSyncer) broadcast(ctx context.Context, regions *pdpb.SyncRegionResponse) {
broadcastDone := make(chan struct{}, 1)
go func() {
defer logutil.LogPanic()
var failed []string
s.mu.RLock()
for name, sender := range s.mu.streams {
select {
case <-ctx.Done():
s.mu.RUnlock()
close(broadcastDone)
return
default:
}
err := sender.Send(regions)
if err != nil {
log.Error("region syncer send data meet error", errs.ZapError(errs.ErrGRPCSend, err))
failed = append(failed, name)
}
}
}
s.mu.RUnlock()
if len(failed) > 0 {
s.mu.Lock()
for _, name := range failed {
delete(s.mu.streams, name)
log.Info("region syncer delete the stream", zap.String("stream", name))
s.mu.RUnlock()
if len(failed) > 0 {
s.mu.Lock()
for _, name := range failed {
delete(s.mu.streams, name)
log.Info("region syncer delete the stream", zap.String("stream", name))
}
s.mu.Unlock()
}
s.mu.Unlock()
close(broadcastDone)
}()
select {
case <-broadcastDone:
case <-ctx.Done():
}
}

0 comments on commit d05c709

Please sign in to comment.