mcs: add lock for forward tso stream (#9095) (#9106)
close #9091

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: lhy1024 <[email protected]>
ti-chi-bot and lhy1024 authored Feb 28, 2025
1 parent 727c208 commit 2da8160
Showing 4 changed files with 166 additions and 6 deletions.
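
Why a lock is needed here: a grpc-go client stream tolerates one goroutine sending while another receives, but it is not safe for several goroutines to call Send (or Recv) on the same stream, and even without a data race an uncoordinated caller can read the response intended for another request. Since PD caches one forwarding stream per TSO address and shares it across in-flight requests, the fix wraps the stream in a struct with a mutex and holds the lock across each Send/Recv pair. Below is a minimal sketch of that pattern, not PD's actual code; guardedStream and roundTrip are hypothetical names.

// Sketch: serialize use of a shared gRPC client stream so each caller's
// Send/Recv pair is atomic and responses cannot be interleaved.
package example

import (
	"sync"

	"google.golang.org/grpc"
)

type guardedStream struct {
	grpc.ClientStream // embedding keeps the full stream method set
	sync.Mutex        // embedding exposes Lock/Unlock on the wrapper itself
}

// roundTrip sends one request and receives its response under the lock.
func (s *guardedStream) roundTrip(req, resp any) error {
	s.Lock()
	defer s.Unlock()
	if err := s.SendMsg(req); err != nil {
		return err
	}
	return s.RecvMsg(resp)
}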
server/forward.go: 12 additions, 3 deletions

@@ -432,7 +432,7 @@ func (s *GrpcServer) getGlobalTSO(ctx context.Context) (pdpb.Timestamp, error) {
 	}
 	var (
 		forwardedHost string
-		forwardStream tsopb.TSO_TsoClient
+		forwardStream *streamWrapper
 		ts            *tsopb.TsoResponse
 		err           error
 		ok            bool
@@ -464,15 +464,21 @@ func (s *GrpcServer) getGlobalTSO(ctx context.Context) (pdpb.Timestamp, error) {
 		if err != nil {
 			return pdpb.Timestamp{}, err
 		}
+		start := time.Now()
+		forwardStream.Lock()
 		err = forwardStream.Send(request)
 		if err != nil {
 			if needRetry := handleStreamError(err); needRetry {
+				forwardStream.Unlock()
 				continue
 			}
 			log.Error("send request to tso primary server failed", zap.Error(err), zap.String("tso-addr", forwardedHost))
+			forwardStream.Unlock()
 			return pdpb.Timestamp{}, err
 		}
 		ts, err = forwardStream.Recv()
+		forwardStream.Unlock()
+		forwardTsoDuration.Observe(time.Since(start).Seconds())
 		if err != nil {
 			if needRetry := handleStreamError(err); needRetry {
 				continue
@@ -486,7 +492,7 @@ func (s *GrpcServer) getGlobalTSO(ctx context.Context) (pdpb.Timestamp, error) {
 	return pdpb.Timestamp{}, err
 }
 
-func (s *GrpcServer) getTSOForwardStream(forwardedHost string) (tsopb.TSO_TsoClient, error) {
+func (s *GrpcServer) getTSOForwardStream(forwardedHost string) (*streamWrapper, error) {
 	s.tsoClientPool.RLock()
 	forwardStream, ok := s.tsoClientPool.clients[forwardedHost]
 	s.tsoClientPool.RUnlock()
@@ -512,11 +518,14 @@ func (s *GrpcServer) getTSOForwardStream(forwardedHost string) (tsopb.TSO_TsoClient, error) {
 	done := make(chan struct{})
 	ctx, cancel := context.WithCancel(s.ctx)
 	go grpcutil.CheckStream(ctx, cancel, done)
-	forwardStream, err = tsopb.NewTSOClient(client).Tso(ctx)
+	tsoClient, err := tsopb.NewTSOClient(client).Tso(ctx)
 	done <- struct{}{}
 	if err != nil {
 		return nil, err
 	}
+	forwardStream = &streamWrapper{
+		TSO_TsoClient: tsoClient,
+	}
 	s.tsoClientPool.clients[forwardedHost] = forwardStream
 	return forwardStream, nil
 }
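
getTSOForwardStream returns the cached wrapper for a forwarded host and creates one on a miss; the middle of the function is collapsed in the hunk above. A common shape for this kind of get-or-create pool is a read-locked fast path plus a write-locked re-check before creating, roughly as sketched below (hypothetical names, not the exact PD implementation):

// Sketch of a read-mostly pool keyed by address: look up under RLock, then
// re-check under Lock before creating, so concurrent misses build only one value.
package example

import "sync"

type pool[V any] struct {
	sync.RWMutex
	items map[string]V
}

func (p *pool[V]) getOrCreate(key string, create func() (V, error)) (V, error) {
	p.RLock()
	v, ok := p.items[key]
	p.RUnlock()
	if ok {
		return v, nil
	}

	p.Lock()
	defer p.Unlock()
	if v, ok := p.items[key]; ok { // another goroutine may have won the race
		return v, nil
	}
	v, err := create()
	if err != nil {
		return v, err
	}
	if p.items == nil {
		p.items = make(map[string]V)
	}
	p.items[key] = v
	return v, nil
}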
server/metrics.go: 9 additions, 0 deletions

@@ -160,6 +160,14 @@ var (
 			Name:      "forward_fail_total",
 			Help:      "Counter of forward fail.",
 		}, []string{"request", "type"})
+	forwardTsoDuration = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: "pd",
+			Subsystem: "server",
+			Name:      "forward_tso_duration_seconds",
+			Help:      "Bucketed histogram of processing time (s) of handled forward tso requests.",
+			Buckets:   prometheus.ExponentialBuckets(0.0005, 2, 13),
+		})
 )
 
 func init() {
@@ -180,4 +188,5 @@ func init() {
 	prometheus.MustRegister(bucketReportInterval)
 	prometheus.MustRegister(apiConcurrencyGauge)
 	prometheus.MustRegister(forwardFailCounter)
+	prometheus.MustRegister(forwardTsoDuration)
 }
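
The new histogram uses prometheus.ExponentialBuckets(0.0005, 2, 13), i.e. 13 upper bounds doubling from 0.5 ms up to about 2.05 s, which spans both sub-millisecond forwards and multi-second stalls. A standalone snippet to print the bounds, assuming the Prometheus Go client is available on the module path:

// Print the bucket upper bounds used by forward_tso_duration_seconds.
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// 0.0005 * 2^0 .. 0.0005 * 2^12  =>  0.0005s .. 2.048s
	for _, upper := range prometheus.ExponentialBuckets(0.0005, 2, 13) {
		fmt.Printf("%.4fs\n", upper)
	}
}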
server/server.go: 8 additions, 3 deletions

@@ -124,6 +124,11 @@ var (
 	etcdCommittedIndexGauge = etcdStateGauge.WithLabelValues("committedIndex")
 )
 
+type streamWrapper struct {
+	tsopb.TSO_TsoClient
+	syncutil.Mutex
+}
+
 // Server is the pd server. It implements bs.Server
 type Server struct {
 	diagnosticspb.DiagnosticsServer
@@ -201,7 +206,7 @@ type Server struct {
 
 	tsoClientPool struct {
 		syncutil.RWMutex
-		clients map[string]tsopb.TSO_TsoClient
+		clients map[string]*streamWrapper
 	}
 
 	// tsoDispatcher is used to dispatch different TSO requests to
@@ -262,9 +267,9 @@ func CreateServer(ctx context.Context, cfg *config.Config, services []string, le
 		mode: mode,
 		tsoClientPool: struct {
 			syncutil.RWMutex
-			clients map[string]tsopb.TSO_TsoClient
+			clients map[string]*streamWrapper
 		}{
-			clients: make(map[string]tsopb.TSO_TsoClient),
+			clients: make(map[string]*streamWrapper),
 		},
 	}
 	s.handler = newHandler(s)
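
A note on the wrapper's design, as far as the diff shows it: because streamWrapper embeds tsopb.TSO_TsoClient, it keeps the generated stream's whole method set (Send, Recv, and the underlying grpc.ClientStream methods), so the existing call sites in forward.go keep compiling unchanged; the embedded syncutil.Mutex contributes Lock and Unlock directly on the wrapper, which is why getGlobalTSO above can call forwardStream.Lock() and forwardStream.Unlock() without any extra field.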
tests/integrations/mcs/tso/server_test.go: 137 additions, 0 deletions

@@ -21,6 +21,7 @@ import (
 	"io"
 	"net/http"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -480,6 +481,142 @@ func (suite *APIServerForward) checkAvailableTSO(re *require.Assertions) {
 	re.NoError(err)
 }
 
+func TestForwardTsoConcurrently(t *testing.T) {
+	re := require.New(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	cluster, err := tests.NewTestAPICluster(ctx, 3)
+	re.NoError(err)
+	defer cluster.Destroy()
+
+	err = cluster.RunInitialServers()
+	re.NoError(err)
+
+	leaderName := cluster.WaitLeader()
+	pdLeader := cluster.GetServer(leaderName)
+	backendEndpoints := pdLeader.GetAddr()
+	re.NoError(pdLeader.BootstrapCluster())
+	leader := cluster.GetServer(cluster.WaitLeader())
+	rc := leader.GetServer().GetRaftCluster()
+	for i := range 3 {
+		region := &metapb.Region{
+			Id:       uint64(i*4 + 1),
+			Peers:    []*metapb.Peer{{Id: uint64(i*4 + 2), StoreId: uint64(i*4 + 3)}},
+			StartKey: []byte{byte(i)},
+			EndKey:   []byte{byte(i + 1)},
+		}
+		rc.HandleRegionHeartbeat(core.NewRegionInfo(region, region.Peers[0]))
+	}
+
+	re.NoError(failpoint.Enable("github.com/tikv/pd/client/usePDServiceMode", "return(true)"))
+	defer func() {
+		re.NoError(failpoint.Disable("github.com/tikv/pd/client/usePDServiceMode"))
+	}()
+
+	tc, err := tests.NewTestTSOCluster(ctx, 2, backendEndpoints)
+	re.NoError(err)
+	defer tc.Destroy()
+	tc.WaitForDefaultPrimaryServing(re)
+
+	wg := sync.WaitGroup{}
+	for i := range 3 {
+		wg.Add(1)
+		go func(i int) {
+			defer wg.Done()
+			pdClient, err := pd.NewClientWithContext(
+				context.Background(),
+				[]string{backendEndpoints},
+				pd.SecurityOption{})
+			re.NoError(err)
+			re.NotNil(pdClient)
+			defer pdClient.Close()
+			for range 10 {
+				testutil.Eventually(re, func() bool {
+					min, err := pdClient.UpdateServiceGCSafePoint(context.Background(), fmt.Sprintf("service-%d", i), 1000, 1)
+					return err == nil && min == 0
+				})
+			}
+		}(i)
+	}
+	wg.Wait()
+}
+
+func BenchmarkForwardTsoConcurrently(b *testing.B) {
+	re := require.New(b)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	cluster, err := tests.NewTestAPICluster(ctx, 3)
+	re.NoError(err)
+	defer cluster.Destroy()
+
+	err = cluster.RunInitialServers()
+	re.NoError(err)
+
+	leaderName := cluster.WaitLeader()
+	pdLeader := cluster.GetServer(leaderName)
+	backendEndpoints := pdLeader.GetAddr()
+	re.NoError(pdLeader.BootstrapCluster())
+	leader := cluster.GetServer(cluster.WaitLeader())
+	rc := leader.GetServer().GetRaftCluster()
+	for i := range 3 {
+		region := &metapb.Region{
+			Id:       uint64(i*4 + 1),
+			Peers:    []*metapb.Peer{{Id: uint64(i*4 + 2), StoreId: uint64(i*4 + 3)}},
+			StartKey: []byte{byte(i)},
+			EndKey:   []byte{byte(i + 1)},
+		}
+		rc.HandleRegionHeartbeat(core.NewRegionInfo(region, region.Peers[0]))
+	}
+
+	re.NoError(failpoint.Enable("github.com/tikv/pd/client/usePDServiceMode", "return(true)"))
+	defer func() {
+		re.NoError(failpoint.Disable("github.com/tikv/pd/client/usePDServiceMode"))
+	}()
+
+	tc, err := tests.NewTestTSOCluster(ctx, 1, backendEndpoints)
+	re.NoError(err)
+	defer tc.Destroy()
+	tc.WaitForDefaultPrimaryServing(re)
+
+	initClients := func(num int) []pd.Client {
+		var clients []pd.Client
+		for range num {
+			pdClient, err := pd.NewClientWithContext(context.Background(),
+				[]string{backendEndpoints}, pd.SecurityOption{}, pd.WithMaxErrorRetry(1))
+			re.NoError(err)
+			re.NotNil(pdClient)
+			clients = append(clients, pdClient)
+		}
+		return clients
+	}
+
+	concurrencyLevels := []int{1, 2, 5, 10, 20}
+	for _, clientsNum := range concurrencyLevels {
+		clients := initClients(clientsNum)
+		b.Run(fmt.Sprintf("clients_%d", clientsNum), func(b *testing.B) {
+			wg := sync.WaitGroup{}
+			b.ResetTimer()
+			for range b.N {
+				for j, client := range clients {
+					wg.Add(1)
+					go func(j int, client pd.Client) {
+						defer wg.Done()
+						for range 1000 {
+							min, err := client.UpdateServiceGCSafePoint(context.Background(), fmt.Sprintf("service-%d", j), 1000, 1)
+							re.NoError(err)
+							re.Equal(uint64(0), min)
+						}
+					}(j, client)
+				}
+			}
+			wg.Wait()
+		})
+		for _, c := range clients {
+			c.Close()
+		}
+	}
+}
+
 type CommonTestSuite struct {
 	suite.Suite
 	ctx context.Context
