From 168abacf95bde85ad75b26c149d8df6c638eb26a Mon Sep 17 00:00:00 2001 From: lidezhu <47731263+lidezhu@users.noreply.github.com> Date: Tue, 24 Dec 2024 13:38:18 +0800 Subject: [PATCH] This is an automated cherry-pick of #11903 Signed-off-by: ti-chi-bot --- cdc/kv/shared_client.go | 975 +++++++++++++++++++++++++++++++++++ cdc/kv/shared_client_test.go | 407 +++++++++++++++ cdc/kv/shared_stream.go | 540 +++++++++++++++++++ pkg/version/check.go | 4 + 4 files changed, 1926 insertions(+) create mode 100644 cdc/kv/shared_client.go create mode 100644 cdc/kv/shared_client_test.go create mode 100644 cdc/kv/shared_stream.go diff --git a/cdc/kv/shared_client.go b/cdc/kv/shared_client.go new file mode 100644 index 00000000000..3f116c47f01 --- /dev/null +++ b/cdc/kv/shared_client.go @@ -0,0 +1,975 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package kv + +import ( + "context" + "encoding/binary" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/cdcpb" + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/kv/regionlock" + "github.com/pingcap/tiflow/cdc/kv/sharedconn" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/tablepb" + "github.com/pingcap/tiflow/pkg/chann" + "github.com/pingcap/tiflow/pkg/config" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/pdutil" + "github.com/pingcap/tiflow/pkg/spanz" + "github.com/pingcap/tiflow/pkg/txnutil" + "github.com/pingcap/tiflow/pkg/util" + "github.com/pingcap/tiflow/pkg/util/seahash" + "github.com/pingcap/tiflow/pkg/version" + "github.com/prometheus/client_golang/prometheus" + kvclientv2 "github.com/tikv/client-go/v2/kv" + "github.com/tikv/client-go/v2/oracle" + "github.com/tikv/client-go/v2/tikv" + pd "github.com/tikv/pd/client" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +const ( + // Maximum total sleep time(in ms), 20 seconds. + tikvRequestMaxBackoff = 20000 + + // TiCDC always interacts with region leader, every time something goes wrong, + // failed region will be reloaded via `BatchLoadRegionsWithKeyRange` API. So we + // don't need to force reload region anymore. + regionScheduleReload = false + + scanRegionsConcurrency = 1024 + + loadRegionRetryInterval time.Duration = 100 * time.Millisecond + resolveLockMinInterval time.Duration = 10 * time.Second + invalidSubscriptionID SubscriptionID = SubscriptionID(0) +) + +var ( + // To generate an ID for a new subscription. And the subscription ID will also be used as + // `RequestId` in region requests of the table. + subscriptionIDGen atomic.Uint64 + // To generate a streamID in `newStream`. 
+ streamIDGen atomic.Uint64 +) + +var ( + // unreachable error, only used in unit test + errUnreachable = errors.New("kv client unreachable error") + logPanic = log.Panic +) + +var ( + metricFeedNotLeaderCounter = eventFeedErrorCounter.WithLabelValues("NotLeader") + metricFeedEpochNotMatchCounter = eventFeedErrorCounter.WithLabelValues("EpochNotMatch") + metricFeedRegionNotFoundCounter = eventFeedErrorCounter.WithLabelValues("RegionNotFound") + metricFeedDuplicateRequestCounter = eventFeedErrorCounter.WithLabelValues("DuplicateRequest") + metricFeedUnknownErrorCounter = eventFeedErrorCounter.WithLabelValues("Unknown") + metricFeedRPCCtxUnavailable = eventFeedErrorCounter.WithLabelValues("RPCCtxUnavailable") + metricGetStoreErr = eventFeedErrorCounter.WithLabelValues("GetStoreErr") + metricStoreSendRequestErr = eventFeedErrorCounter.WithLabelValues("SendRequestToStore") + metricKvIsBusyCounter = eventFeedErrorCounter.WithLabelValues("KvIsBusy") + metricKvCongestedCounter = eventFeedErrorCounter.WithLabelValues("KvCongested") +) + +type eventError struct { + err *cdcpb.Error +} + +// Error implement error interface. +func (e *eventError) Error() string { + return e.err.String() +} + +type rpcCtxUnavailableErr struct { + verID tikv.RegionVerID +} + +func (e *rpcCtxUnavailableErr) Error() string { + return fmt.Sprintf("cannot get rpcCtx for region %v. ver:%v, confver:%v", + e.verID.GetID(), e.verID.GetVer(), e.verID.GetConfVer()) +} + +type getStoreErr struct{} + +func (e *getStoreErr) Error() string { return "get store error" } + +type sendRequestToStoreErr struct{} + +func (e *sendRequestToStoreErr) Error() string { return "send request to store error" } + +// SubscriptionID comes from `SharedClient.AllocSubscriptionID`. +type SubscriptionID uint64 + +// MultiplexingEvent wrap a region event with +// SubscriptionID to indicate which subscription it belongs to. 
+type MultiplexingEvent struct { + model.RegionFeedEvent + SubscriptionID SubscriptionID + Start time.Time +} + +// newMultiplexingEvent creates a new MultiplexingEvent. +func newMultiplexingEvent(e model.RegionFeedEvent, table *subscribedTable) MultiplexingEvent { + return MultiplexingEvent{ + RegionFeedEvent: e, + SubscriptionID: table.subscriptionID, + Start: time.Now(), + } +} + +// SharedClient is shared by many tables to pull events from TiKV. +// All exported Methods are thread-safe. +type SharedClient struct { + changefeed model.ChangeFeedID + config *config.ServerConfig + metrics sharedClientMetrics + + clusterID uint64 + filterLoop bool + + pd pd.Client + grpcPool *sharedconn.ConnAndClientPool + regionCache *tikv.RegionCache + pdClock pdutil.Clock + lockResolver txnutil.LockResolver + + totalSpans struct { + sync.RWMutex + v map[SubscriptionID]*subscribedTable + } + + workers []*sharedRegionWorker + // Note: stores is only motified in handleRegion goroutine, + // so it is not protected by a lock. + stores map[string]*requestedStore + + // rangeTaskCh is used to receive range tasks. + // The tasks will be handled in `handleRangeTask` goroutine. + rangeTaskCh *chann.DrainableChann[rangeTask] + // regionCh is used to receive region tasks have been locked in rangeLock. + // The region will be handled in `handleRegions` goroutine. + regionCh *chann.DrainableChann[regionInfo] + // resolveLockTaskCh is used to receive resolve lock tasks. + // The tasks will be handled in `handleResolveLockTasks` goroutine. + resolveLockTaskCh *chann.DrainableChann[resolveLockTask] + errCh *chann.DrainableChann[regionErrorInfo] + + logRegionDetails func(msg string, fields ...zap.Field) +} + +type resolveLockTask struct { + regionID uint64 + targetTs uint64 + state *regionlock.LockedRangeState + create time.Time +} + +// rangeTask represents a task to subscribe a range span of a table. +// It can be a part of a table or a whole table, it also can be a part of a region. 
+type rangeTask struct { + span tablepb.Span + subscribedTable *subscribedTable +} + +// requestedStore represents a store that has been connected. +// A store may have multiple streams. +type requestedStore struct { + storeID uint64 + storeAddr string + // Use to select a stream to send request. + nextStream atomic.Uint32 + streams []*requestedStream +} + +func (rs *requestedStore) getStream() *requestedStream { + index := rs.nextStream.Add(1) % uint32(len(rs.streams)) + return rs.streams[index] +} + +// subscribedTable represents a table to subscribe. +// It contains the span of the table, the startTs of the table, and the output event channel. +type subscribedTable struct { + subscriptionID SubscriptionID + startTs model.Ts + + // The whole span of the table. + span tablepb.Span + // The range lock of the table, + // it is used to prevent duplicate requests to the same region range, + // and it also used to calculate this table's resolvedTs. + rangeLock *regionlock.RangeLock + // The output event channel of the table. + eventCh chan<- MultiplexingEvent + + // To handle table removing. + stopped atomic.Bool + + // To handle stale lock resolvings. + tryResolveLock func(regionID uint64, state *regionlock.LockedRangeState) + staleLocksTargetTs atomic.Uint64 + + lastAdvanceTime atomic.Int64 +} + +// NewSharedClient creates a client. 
+func NewSharedClient( + changefeed model.ChangeFeedID, + cfg *config.ServerConfig, + filterLoop bool, + pd pd.Client, + grpcPool *sharedconn.ConnAndClientPool, + regionCache *tikv.RegionCache, + pdClock pdutil.Clock, + lockResolver txnutil.LockResolver, +) *SharedClient { + s := &SharedClient{ + changefeed: changefeed, + config: cfg, + clusterID: 0, + filterLoop: filterLoop, + + pd: pd, + grpcPool: grpcPool, + regionCache: regionCache, + pdClock: pdClock, + lockResolver: lockResolver, + + rangeTaskCh: chann.NewAutoDrainChann[rangeTask](), + regionCh: chann.NewAutoDrainChann[regionInfo](), + resolveLockTaskCh: chann.NewAutoDrainChann[resolveLockTask](), + errCh: chann.NewAutoDrainChann[regionErrorInfo](), + + stores: make(map[string]*requestedStore), + } + s.totalSpans.v = make(map[SubscriptionID]*subscribedTable) + if cfg.Debug.Puller.LogRegionDetails { + s.logRegionDetails = log.Info + } else { + s.logRegionDetails = log.Debug + } + + s.initMetrics() + return s +} + +// AllocSubscriptionID gets an ID can be used in `Subscribe`. +func (s *SharedClient) AllocSubscriptionID() SubscriptionID { + return SubscriptionID(subscriptionIDGen.Add(1)) +} + +// Subscribe the given table span. +// NOTE: `span.TableID` must be set correctly. +// It new a subscribedTable and store it in `s.totalSpans`, +// and send a rangeTask to `s.rangeTaskCh`. +// The rangeTask will be handled in `handleRangeTasks` goroutine. 
+func (s *SharedClient) Subscribe(subID SubscriptionID, span tablepb.Span, startTs uint64, eventCh chan<- MultiplexingEvent) { + if span.TableID == 0 { + log.Panic("event feed subscribe with zero tablepb.Span.TableID", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID)) + } + + rt := s.newSubscribedTable(subID, span, startTs, eventCh) + s.totalSpans.Lock() + s.totalSpans.v[subID] = rt + s.totalSpans.Unlock() + s.rangeTaskCh.In() <- rangeTask{span: span, subscribedTable: rt} + log.Info("event feed subscribes table success", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", rt.subscriptionID), + zap.String("span", rt.span.String())) +} + +// Unsubscribe the given table span. All covered regions will be deregistered asynchronously. +// NOTE: `span.TableID` must be set correctly. +func (s *SharedClient) Unsubscribe(subID SubscriptionID) { + // NOTE: `subID` is cleared from `s.totalSpans` in `onTableDrained`. + s.totalSpans.Lock() + rt := s.totalSpans.v[subID] + s.totalSpans.Unlock() + if rt != nil { + s.setTableStopped(rt) + log.Info("event feed unsubscribes table", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", rt.subscriptionID), + zap.String("span", rt.span.String())) + return + } + log.Warn("event feed unsubscribes table, but not found", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", subID)) +} + +// ResolveLock is a function. If outsider subscribers find a span resolved timestamp is +// advanced slowly or stopped, they can try to resolve locks in the given span. 
+func (s *SharedClient) ResolveLock(subID SubscriptionID, targetTs uint64) { + s.totalSpans.Lock() + rt := s.totalSpans.v[subID] + s.totalSpans.Unlock() + if rt != nil { + rt.resolveStaleLocks(s, targetTs) + } +} + +// RegionCount returns subscribed region count for the span. +func (s *SharedClient) RegionCount(subID SubscriptionID) uint64 { + s.totalSpans.RLock() + defer s.totalSpans.RUnlock() + if rt := s.totalSpans.v[subID]; rt != nil { + return uint64(rt.rangeLock.Len()) + } + return 0 +} + +// Run the client. +func (s *SharedClient) Run(ctx context.Context) error { + s.clusterID = s.pd.GetClusterID(ctx) + + g, ctx := errgroup.WithContext(ctx) + s.workers = make([]*sharedRegionWorker, 0, s.config.KVClient.WorkerConcurrent) + for i := uint(0); i < s.config.KVClient.WorkerConcurrent; i++ { + worker := newSharedRegionWorker(s) + g.Go(func() error { return worker.run(ctx) }) + s.workers = append(s.workers, worker) + } + + g.Go(func() error { return s.handleRangeTasks(ctx) }) + g.Go(func() error { return s.handleRegions(ctx, g) }) + g.Go(func() error { return s.handleErrors(ctx) }) + g.Go(func() error { return s.handleResolveLockTasks(ctx) }) + g.Go(func() error { return s.logSlowRegions(ctx) }) + + log.Info("event feed started", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID)) + defer log.Info("event feed exits", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID)) + return g.Wait() +} + +// Close closes the client. Must be called after `Run` returns. 
+func (s *SharedClient) Close() { + s.rangeTaskCh.CloseAndDrain() + s.regionCh.CloseAndDrain() + s.resolveLockTaskCh.CloseAndDrain() + s.errCh.CloseAndDrain() + s.clearMetrics() + + for _, rs := range s.stores { + for _, stream := range rs.streams { + stream.requests.CloseAndDrain() + } + } +} + +func (s *SharedClient) setTableStopped(rt *subscribedTable) { + log.Info("event feed starts to stop table", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", rt.subscriptionID), + zap.Int64("tableID", rt.span.TableID)) + + // Set stopped to true so we can stop handling region events from the table. + // Then send a special singleRegionInfo to regionRouter to deregister the table + // from all TiKV instances. + if rt.stopped.CompareAndSwap(false, true) { + s.regionCh.In() <- regionInfo{subscribedTable: rt} + if rt.rangeLock.Stop() { + s.onTableDrained(rt) + } + } +} + +func (s *SharedClient) onTableDrained(rt *subscribedTable) { + log.Info("event feed stop table is finished", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", rt.subscriptionID), + zap.Int64("tableID", rt.span.TableID)) + + s.totalSpans.Lock() + defer s.totalSpans.Unlock() + delete(s.totalSpans.v, rt.subscriptionID) +} + +func (s *SharedClient) onRegionFail(errInfo regionErrorInfo) { + s.errCh.In() <- errInfo +} + +// handleRegions receives regionInfo from regionCh and attch rpcCtx to them, +// then send them to corresponding requestedStore. 
+func (s *SharedClient) handleRegions(ctx context.Context, eg *errgroup.Group) error { + for { + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case region := <-s.regionCh.Out(): + if region.isStopped() { + for _, rs := range s.stores { + s.broadcastRequest(rs, region) + } + continue + } + + region, ok := s.attachRPCContextForRegion(ctx, region) + // If attachRPCContextForRegion fails, the region will be re-scheduled. + if !ok { + continue + } + + store := s.getStore(ctx, eg, region.rpcCtx.Peer.StoreId, region.rpcCtx.Addr) + stream := store.getStream() + stream.requests.In() <- region + + s.logRegionDetails("event feed will request a region", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Uint64("streamID", stream.streamID), + zap.Any("subscriptionID", region.subscribedTable.subscriptionID), + zap.Uint64("regionID", region.verID.GetID()), + zap.String("span", region.span.String()), + zap.Uint64("storeID", store.storeID), + zap.String("addr", store.storeAddr)) + } + } +} + +func (s *SharedClient) attachRPCContextForRegion(ctx context.Context, region regionInfo) (regionInfo, bool) { + bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) + rpcCtx, err := s.regionCache.GetTiKVRPCContext(bo, region.verID, kvclientv2.ReplicaReadLeader, 0) + if rpcCtx != nil { + region.rpcCtx = rpcCtx + locateTime := time.Since(region.lockedRangeState.Created).Milliseconds() + s.metrics.regionLocateDuration.Observe(float64(locateTime)) + return region, true + } + if err != nil { + log.Debug("event feed get RPC context fail", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", region.subscribedTable.subscriptionID), + zap.Uint64("regionID", region.verID.GetID()), + zap.Error(err)) + } + s.onRegionFail(newRegionErrorInfo(region, &rpcCtxUnavailableErr{verID: region.verID})) + return region, false +} + +// getStore gets a requestedStore from 
requestedStores by storeAddr. +func (s *SharedClient) getStore( + ctx context.Context, g *errgroup.Group, + storeID uint64, storeAddr string, +) *requestedStore { + var rs *requestedStore + if rs = s.stores[storeAddr]; rs != nil { + return rs + } + rs = &requestedStore{storeID: storeID, storeAddr: storeAddr} + s.stores[storeAddr] = rs + for i := uint(0); i < s.config.KVClient.GrpcStreamConcurrent; i++ { + stream := newStream(ctx, s, g, rs) + rs.streams = append(rs.streams, stream) + } + + return rs +} + +func (s *SharedClient) createRegionRequest(region regionInfo) *cdcpb.ChangeDataRequest { + return &cdcpb.ChangeDataRequest{ + Header: &cdcpb.Header{ClusterId: s.clusterID, TicdcVersion: version.ReleaseSemver()}, + RegionId: region.verID.GetID(), + RequestId: uint64(region.subscribedTable.subscriptionID), + RegionEpoch: region.rpcCtx.Meta.RegionEpoch, + CheckpointTs: region.resolvedTs(), + StartKey: region.span.StartKey, + EndKey: region.span.EndKey, + ExtraOp: kvrpcpb.ExtraOp_ReadOldValue, + FilterLoop: s.filterLoop, + } +} + +func (s *SharedClient) broadcastRequest(r *requestedStore, region regionInfo) { + for _, stream := range r.streams { + stream.requests.In() <- region + } +} + +func (s *SharedClient) handleRangeTasks(ctx context.Context) error { + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(scanRegionsConcurrency) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case task := <-s.rangeTaskCh.Out(): + g.Go(func() error { return s.divideSpanAndScheduleRegionRequests(ctx, task.span, task.subscribedTable) }) + } + } +} + +// divideSpanAndScheduleRegionRequests processes the specified span by dividing it into +// manageable regions and schedules requests to subscribe to these regions. +// 1. Load regions from PD. +// 2. Find the intersection of each region.span and the subscribedTable.span. +// 3. Schedule a region request to subscribe the region. 
+func (s *SharedClient) divideSpanAndScheduleRegionRequests( + ctx context.Context, + span tablepb.Span, + subscribedTable *subscribedTable, +) error { + // Limit the number of regions loaded at a time to make the load more stable. + limit := 1024 + nextSpan := span + backoffBeforeLoad := false + for { + if backoffBeforeLoad { + if err := util.Hang(ctx, loadRegionRetryInterval); err != nil { + return err + } + backoffBeforeLoad = false + } + log.Debug("event feed is going to load regions", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", subscribedTable.subscriptionID), + zap.Any("span", nextSpan)) + + backoff := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) + regions, err := s.regionCache.BatchLoadRegionsWithKeyRange(backoff, nextSpan.StartKey, nextSpan.EndKey, limit) + if err != nil { + log.Warn("event feed load regions failed", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", subscribedTable.subscriptionID), + zap.String("span", nextSpan.String()), + zap.Error(err)) + backoffBeforeLoad = true + continue + } + + regionMetas := make([]*metapb.Region, 0, len(regions)) + for _, region := range regions { + if meta := region.GetMeta(); meta != nil { + regionMetas = append(regionMetas, meta) + } + } + regionMetas = regionlock.CutRegionsLeftCoverSpan(regionMetas, nextSpan) + if len(regionMetas) == 0 { + log.Warn("event feed load regions with holes", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", subscribedTable.subscriptionID), + zap.String("span", nextSpan.String())) + backoffBeforeLoad = true + continue + } + + for _, regionMeta := range regionMetas { + regionSpan := tablepb.Span{StartKey: regionMeta.StartKey, EndKey: regionMeta.EndKey} + // NOTE: the End key return by the PD API will be nil to represent the biggest key. 
+ // So we need to fix it by calling spanz.HackSpan. + regionSpan = spanz.HackSpan(regionSpan) + + // Find the intersection of the regionSpan returned by PD and the subscribedTable.span. + // The intersection is the span that needs to be subscribed. + intersectantSpan, err := spanz.Intersect(subscribedTable.span, regionSpan) + if err != nil { + log.Panic("event feed check spans intersect shouldn't fail", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", subscribedTable.subscriptionID), + zap.String("span", nextSpan.String())) + } + + verID := tikv.NewRegionVerID(regionMeta.Id, regionMeta.RegionEpoch.ConfVer, regionMeta.RegionEpoch.Version) + regionInfo := newRegionInfo(verID, intersectantSpan, nil, subscribedTable) + + // Schedule a region request to subscribe the region. + s.scheduleRegionRequest(ctx, regionInfo) + + nextSpan.StartKey = regionMeta.EndKey + // If the nextSpan.StartKey is larger than the subscribedTable.span.EndKey, + // it means all span of the subscribedTable have been requested. So we return. + if spanz.EndCompare(nextSpan.StartKey, span.EndKey) >= 0 { + return nil + } + } + } +} + +// scheduleRegionRequest locks the region's range and send the region to regionCh, +// which will be handled by handleRegions. 
+func (s *SharedClient) scheduleRegionRequest(ctx context.Context, region regionInfo) { + lockRangeResult := region.subscribedTable.rangeLock.LockRange( + ctx, region.span.StartKey, region.span.EndKey, region.verID.GetID(), region.verID.GetVer()) + + if lockRangeResult.Status == regionlock.LockRangeStatusWait { + lockRangeResult = lockRangeResult.WaitFn() + } + + switch lockRangeResult.Status { + case regionlock.LockRangeStatusSuccess: + region.lockedRangeState = lockRangeResult.LockedRangeState + lockTime := time.Since(region.lockedRangeState.Created).Milliseconds() + s.metrics.regionLockDuration.Observe(float64(lockTime)) + select { + case s.regionCh.In() <- region: + case <-ctx.Done(): + } + case regionlock.LockRangeStatusStale: + for _, r := range lockRangeResult.RetryRanges { + s.scheduleRangeRequest(ctx, r, region.subscribedTable) + } + default: + return + } +} + +func (s *SharedClient) scheduleRangeRequest( + ctx context.Context, span tablepb.Span, + subscribedTable *subscribedTable, +) { + select { + case s.rangeTaskCh.In() <- rangeTask{span: span, subscribedTable: subscribedTable}: + case <-ctx.Done(): + } +} + +func (s *SharedClient) handleErrors(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case errInfo := <-s.errCh.Out(): + if err := s.doHandleError(ctx, errInfo); err != nil { + return err + } + } + } +} + +func (s *SharedClient) doHandleError(ctx context.Context, errInfo regionErrorInfo) error { + if errInfo.subscribedTable.rangeLock.UnlockRange( + errInfo.span.StartKey, errInfo.span.EndKey, + errInfo.verID.GetID(), errInfo.verID.GetVer(), errInfo.resolvedTs()) { + s.onTableDrained(errInfo.subscribedTable) + return nil + } + + err := errors.Cause(errInfo.err) + switch eerr := err.(type) { + case *eventError: + innerErr := eerr.err + s.logRegionDetails("cdc region error", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", 
errInfo.subscribedTable.subscriptionID), + zap.Uint64("regionID", errInfo.verID.GetID()), + zap.Int64("tableID", errInfo.span.TableID), + zap.Stringer("error", innerErr)) + + if notLeader := innerErr.GetNotLeader(); notLeader != nil { + metricFeedNotLeaderCounter.Inc() + s.regionCache.UpdateLeader(errInfo.verID, notLeader.GetLeader(), errInfo.rpcCtx.AccessIdx) + s.scheduleRegionRequest(ctx, errInfo.regionInfo) + return nil + } + if innerErr.GetEpochNotMatch() != nil { + metricFeedEpochNotMatchCounter.Inc() + s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedTable) + return nil + } + if innerErr.GetRegionNotFound() != nil { + metricFeedRegionNotFoundCounter.Inc() + s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedTable) + return nil + } + if innerErr.GetCongested() != nil { + metricKvCongestedCounter.Inc() + s.scheduleRegionRequest(ctx, errInfo.regionInfo) + return nil + } + if innerErr.GetServerIsBusy() != nil { + metricKvIsBusyCounter.Inc() + s.scheduleRegionRequest(ctx, errInfo.regionInfo) + return nil + } + if duplicated := innerErr.GetDuplicateRequest(); duplicated != nil { + metricFeedDuplicateRequestCounter.Inc() + // TODO(qupeng): It's better to add a new mechanism to deregister one region. 
+ return errors.New("duplicate request") + } + if compatibility := innerErr.GetCompatibility(); compatibility != nil { + return cerror.ErrVersionIncompatible.GenWithStackByArgs(compatibility) + } + if mismatch := innerErr.GetClusterIdMismatch(); mismatch != nil { + return cerror.ErrClusterIDMismatch.GenWithStackByArgs(mismatch.Current, mismatch.Request) + } + + log.Warn("empty or unknown cdc error", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", errInfo.subscribedTable.subscriptionID), + zap.Uint64("regionID", errInfo.verID.GetID()), + zap.Int64("tableID", errInfo.span.TableID), + zap.Stringer("error", innerErr)) + metricFeedUnknownErrorCounter.Inc() + s.scheduleRegionRequest(ctx, errInfo.regionInfo) + return nil + case *rpcCtxUnavailableErr: + metricFeedRPCCtxUnavailable.Inc() + s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedTable) + return nil + case *getStoreErr: + metricGetStoreErr.Inc() + bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) + // cannot get the store the region belongs to, so we need to reload the region. + s.regionCache.OnSendFail(bo, errInfo.rpcCtx, true, err) + s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedTable) + return nil + case *sendRequestToStoreErr: + metricStoreSendRequestErr.Inc() + bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) + s.regionCache.OnSendFail(bo, errInfo.rpcCtx, regionScheduleReload, err) + s.scheduleRegionRequest(ctx, errInfo.regionInfo) + return nil + default: + // TODO(qupeng): for some errors it's better to just deregister the region from TiKVs. 
+ log.Warn("event feed meets an internal error, fail the changefeed", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", errInfo.subscribedTable.subscriptionID), + zap.Uint64("regionID", errInfo.verID.GetID()), + zap.Int64("tableID", errInfo.span.TableID), + zap.Error(err)) + return err + } +} + +func (s *SharedClient) handleResolveLockTasks(ctx context.Context) error { + resolveLastRun := make(map[uint64]time.Time) + + gcResolveLastRun := func() { + if len(resolveLastRun) > 1024 { + copied := make(map[uint64]time.Time) + now := time.Now() + for regionID, lastRun := range resolveLastRun { + if now.Sub(lastRun) < resolveLockMinInterval { + resolveLastRun[regionID] = lastRun + } + } + resolveLastRun = copied + } + } + + doResolve := func(regionID uint64, state *regionlock.LockedRangeState, targetTs uint64) { + if state.ResolvedTs.Load() > targetTs || !state.Initialzied.Load() { + return + } + if lastRun, ok := resolveLastRun[regionID]; ok { + if time.Since(lastRun) < resolveLockMinInterval { + return + } + } + start := time.Now() + defer func() { + s.metrics.lockResolveRunDuration.Observe(float64(time.Since(start).Milliseconds())) + }() + + if err := s.lockResolver.Resolve(ctx, regionID, targetTs); err != nil { + log.Warn("event feed resolve lock fail", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Uint64("regionID", regionID), + zap.Error(err)) + } + resolveLastRun[regionID] = time.Now() + } + + gcTicker := time.NewTicker(resolveLockMinInterval * 3 / 2) + defer gcTicker.Stop() + for { + var task resolveLockTask + select { + case <-ctx.Done(): + return ctx.Err() + case <-gcTicker.C: + gcResolveLastRun() + case task = <-s.resolveLockTaskCh.Out(): + s.metrics.lockResolveWaitDuration.Observe(float64(time.Since(task.create).Milliseconds())) + doResolve(task.regionID, task.state, task.targetTs) + } + } +} + +func (s *SharedClient) 
logSlowRegions(ctx context.Context) error { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + log.Info("event feed starts to check locked regions", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID)) + + currTime := s.pdClock.CurrentTime() + s.totalSpans.RLock() + var slowInitializeRegionCount int + for subscriptionID, rt := range s.totalSpans.v { + attr := rt.rangeLock.IterAll(nil) + ckptTime := oracle.GetTimeFromTS(attr.SlowestRegion.ResolvedTs) + if attr.SlowestRegion.Initialized { + if currTime.Sub(ckptTime) > 2*resolveLockMinInterval { + log.Info("event feed finds a initialized slow region", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", subscriptionID), + zap.Int64("tableID", rt.span.TableID), + zap.Any("slowRegion", attr.SlowestRegion)) + } + } else if currTime.Sub(attr.SlowestRegion.Created) > 10*time.Minute { + slowInitializeRegionCount += 1 + log.Info("event feed initializes a region too slow", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", subscriptionID), + zap.Int64("tableID", rt.span.TableID), + zap.Any("slowRegion", attr.SlowestRegion)) + } else if currTime.Sub(ckptTime) > 10*time.Minute { + log.Info("event feed finds a uninitialized slow region", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", subscriptionID), + zap.Int64("tableID", rt.span.TableID), + zap.Any("slowRegion", attr.SlowestRegion)) + } + if len(attr.UnLockedRanges) > 0 { + log.Info("event feed holes exist", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", subscriptionID), + zap.Int64("tableID", rt.span.TableID), + zap.Any("holes", 
attr.UnLockedRanges)) + } + } + s.totalSpans.RUnlock() + s.metrics.slowInitializeRegion.Set(float64(slowInitializeRegionCount)) + } +} + +func (s *SharedClient) newSubscribedTable( + subID SubscriptionID, span tablepb.Span, startTs uint64, + eventCh chan<- MultiplexingEvent, +) *subscribedTable { + cfName := s.changefeed.String() + rangeLock := regionlock.NewRangeLock(uint64(subID), span.StartKey, span.EndKey, startTs, cfName) + + rt := &subscribedTable{ + subscriptionID: subID, + span: span, + startTs: startTs, + rangeLock: rangeLock, + eventCh: eventCh, + } + + rt.tryResolveLock = func(regionID uint64, state *regionlock.LockedRangeState) { + targetTs := rt.staleLocksTargetTs.Load() + if state.ResolvedTs.Load() < targetTs && state.Initialzied.Load() { + s.resolveLockTaskCh.In() <- resolveLockTask{ + regionID: regionID, + targetTs: targetTs, + state: state, + create: time.Now(), + } + } + } + return rt +} + +func (r *subscribedTable) resolveStaleLocks(s *SharedClient, targetTs uint64) { + util.MustCompareAndMonotonicIncrease(&r.staleLocksTargetTs, targetTs) + res := r.rangeLock.IterAll(r.tryResolveLock) + s.logRegionDetails("event feed finds slow locked ranges", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", r.subscriptionID), + zap.Any("ranges", res)) +} + +type sharedClientMetrics struct { + regionLockDuration prometheus.Observer + regionLocateDuration prometheus.Observer + regionConnectDuration prometheus.Observer + batchResolvedSize prometheus.Observer + lockResolveWaitDuration prometheus.Observer + lockResolveRunDuration prometheus.Observer + slowInitializeRegion prometheus.Gauge +} + +func (s *SharedClient) initMetrics() { + eventFeedGauge.Inc() + + s.metrics.regionLockDuration = regionConnectDuration. + WithLabelValues(s.changefeed.Namespace, s.changefeed.ID, "lock") + s.metrics.regionLocateDuration = regionConnectDuration. 
+ WithLabelValues(s.changefeed.Namespace, s.changefeed.ID, "locate") + s.metrics.regionConnectDuration = regionConnectDuration. + WithLabelValues(s.changefeed.Namespace, s.changefeed.ID, "connect") + + s.metrics.lockResolveWaitDuration = lockResolveDuration. + WithLabelValues(s.changefeed.Namespace, s.changefeed.ID, "wait") + s.metrics.lockResolveRunDuration = lockResolveDuration. + WithLabelValues(s.changefeed.Namespace, s.changefeed.ID, "run") + + s.metrics.batchResolvedSize = batchResolvedEventSize. + WithLabelValues(s.changefeed.Namespace, s.changefeed.ID) + + s.metrics.slowInitializeRegion = slowInitializeRegion. + WithLabelValues(s.changefeed.Namespace, s.changefeed.ID) +} + +func (s *SharedClient) clearMetrics() { + eventFeedGauge.Dec() + + regionConnectDuration.DeleteLabelValues(s.changefeed.Namespace, s.changefeed.ID, "lock") + regionConnectDuration.DeleteLabelValues(s.changefeed.Namespace, s.changefeed.ID, "locate") + regionConnectDuration.DeleteLabelValues(s.changefeed.Namespace, s.changefeed.ID, "connect") + + lockResolveDuration.DeleteLabelValues(s.changefeed.Namespace, s.changefeed.ID, "wait") + lockResolveDuration.DeleteLabelValues(s.changefeed.Namespace, s.changefeed.ID, "run") + + batchResolvedEventSize.DeleteLabelValues(s.changefeed.Namespace, s.changefeed.ID) +} + +func hashRegionID(regionID uint64, slots int) int { + b := make([]byte, 8) + binary.LittleEndian.PutUint64(b, regionID) + return int(seahash.Sum64(b) % uint64(slots)) +} diff --git a/cdc/kv/shared_client_test.go b/cdc/kv/shared_client_test.go new file mode 100644 index 00000000000..b6db06e5c82 --- /dev/null +++ b/cdc/kv/shared_client_test.go @@ -0,0 +1,407 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kv + +import ( + "context" + "net" + "sync" + "testing" + "time" + + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/cdcpb" + "github.com/pingcap/log" + "github.com/pingcap/tidb/pkg/store/mockstore/mockcopr" + "github.com/pingcap/tiflow/cdc/kv/regionlock" + "github.com/pingcap/tiflow/cdc/kv/sharedconn" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/tablepb" + "github.com/pingcap/tiflow/pkg/chann" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/pdutil" + "github.com/pingcap/tiflow/pkg/security" + "github.com/pingcap/tiflow/pkg/txnutil" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" + "github.com/tikv/client-go/v2/testutils" + "github.com/tikv/client-go/v2/tikv" + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" +) + +func newMockService( + ctx context.Context, + t *testing.T, + srv cdcpb.ChangeDataServer, + wg *sync.WaitGroup, +) (grpcServer *grpc.Server, addr string) { + return newMockServiceSpecificAddr(ctx, t, srv, "127.0.0.1:0", wg) +} + +func newMockServiceSpecificAddr( + ctx context.Context, + t *testing.T, + srv cdcpb.ChangeDataServer, + listenAddr string, + wg *sync.WaitGroup, +) (grpcServer *grpc.Server, addr string) { + lc := &net.ListenConfig{} + lis, err := lc.Listen(ctx, "tcp", listenAddr) + require.Nil(t, err) + addr = lis.Addr().String() + kaep := keepalive.EnforcementPolicy{ + // force minimum ping interval + MinTime: 3 * time.Second, + PermitWithoutStream: true, + } + // Some tests rely on connect timeout and ping test, so we use a 
smaller num
+	kasp := keepalive.ServerParameters{
+		MaxConnectionIdle:     10 * time.Second, // If a client is idle for 10 seconds, send a GOAWAY
+		MaxConnectionAge:      10 * time.Second, // If any connection is alive for more than 10 seconds, send a GOAWAY
+		MaxConnectionAgeGrace: 5 * time.Second,  // Allow 5 seconds for pending RPCs to complete before forcibly closing connections
+		Time:                  3 * time.Second,  // Ping the client if it is idle for 3 seconds to ensure the connection is still active
+		Timeout:               1 * time.Second,  // Wait 1 second for the ping ack before assuming the connection is dead
+	}
+	grpcServer = grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp))
+	// grpcServer is the server, srv is the service
+	cdcpb.RegisterChangeDataServer(grpcServer, srv)
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		err := grpcServer.Serve(lis)
+		require.Nil(t, err)
+	}()
+	return
+}
+
+func TestRequestedStreamRequestedRegions(t *testing.T) {
+	stream := newRequestedStream(100)
+
+	require.Nil(t, stream.getState(1, 2))
+	require.Nil(t, stream.takeState(1, 2))
+
+	stream.setState(1, 2, &regionFeedState{})
+	require.NotNil(t, stream.getState(1, 2))
+	require.NotNil(t, stream.takeState(1, 2))
+	require.Nil(t, stream.getState(1, 2))
+	require.Equal(t, 0, len(stream.requestedRegions.m))
+
+	stream.setState(1, 2, &regionFeedState{})
+	require.NotNil(t, stream.getState(1, 2))
+	require.NotNil(t, stream.takeState(1, 2))
+	require.Nil(t, stream.getState(1, 2))
+	require.Equal(t, 0, len(stream.requestedRegions.m))
+}
+
+func TestSubscribedTable(t *testing.T) {
+	s := &SharedClient{resolveLockTaskCh: chann.NewAutoDrainChann[resolveLockTask]()}
+	s.logRegionDetails = log.Info
+	span := tablepb.Span{TableID: 1, StartKey: []byte{'a'}, EndKey: []byte{'z'}}
+	table := s.newSubscribedTable(SubscriptionID(1), span, 100, nil)
+	s.totalSpans.v = make(map[SubscriptionID]*subscribedTable)
+	s.totalSpans.v[SubscriptionID(1)] = table
+	s.pdClock = pdutil.NewClock4Test()
+
+	// Lock a
range, and then ResolveLock will trigger a task for it. + res := table.rangeLock.LockRange(context.Background(), []byte{'b'}, []byte{'c'}, 1, 100) + require.Equal(t, regionlock.LockRangeStatusSuccess, res.Status) + res.LockedRangeState.Initialzied.Store(true) + + s.ResolveLock(SubscriptionID(1), 200) + select { + case <-s.resolveLockTaskCh.Out(): + case <-time.After(100 * time.Millisecond): + require.True(t, false, "must get a resolve lock task") + } + + // Lock another range, no task will be triggered before initialized. + res = table.rangeLock.LockRange(context.Background(), []byte{'c'}, []byte{'d'}, 2, 100) + require.Equal(t, regionlock.LockRangeStatusSuccess, res.Status) + state := newRegionFeedState(regionInfo{lockedRangeState: res.LockedRangeState, subscribedTable: table}, 1) + select { + case <-s.resolveLockTaskCh.Out(): + require.True(t, false, "shouldn't get a resolve lock task") + case <-time.After(100 * time.Millisecond): + } + + // Task will be triggered after initialized. + state.setInitialized() + state.updateResolvedTs(101) + select { + case <-s.resolveLockTaskCh.Out(): + case <-time.After(100 * time.Millisecond): + require.True(t, false, "must get a resolve lock task") + } + + s.resolveLockTaskCh.CloseAndDrain() +} + +func TestConnectToOfflineOrFailedTiKV(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + + events1 := make(chan *cdcpb.ChangeDataEvent, 10) + events2 := make(chan *cdcpb.ChangeDataEvent, 10) + srv1 := newMockChangeDataServer(events1) + server1, addr1 := newMockService(ctx, t, srv1, wg) + srv2 := newMockChangeDataServer(events2) + server2, addr2 := newMockService(ctx, t, srv2, wg) + + rpcClient, cluster, pdClient, _ := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) + + pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} + + grpcPool := sharedconn.NewConnAndClientPool(&security.Credential{}, nil) + + regionCache := tikv.NewRegionCache(pdClient) + + pdClock 
:= pdutil.NewClock4Test() + + kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) + require.Nil(t, err) + lockResolver := txnutil.NewLockerResolver(kvStorage, model.ChangeFeedID{}) + + invalidStore := "localhost:1" + cluster.AddStore(1, addr1) + cluster.AddStore(2, addr2) + cluster.AddStore(3, invalidStore) + cluster.Bootstrap(11, []uint64{1, 2, 3}, []uint64{4, 5, 6}, 6) + + client := NewSharedClient( + model.ChangeFeedID{ID: "test"}, + &config.ServerConfig{ + KVClient: &config.KVClientConfig{ + WorkerConcurrent: 1, + GrpcStreamConcurrent: 1, + AdvanceIntervalInMs: 10, + }, + Debug: &config.DebugConfig{Puller: &config.PullerConfig{LogRegionDetails: false}}, + }, + false, pdClient, grpcPool, regionCache, pdClock, lockResolver, + ) + + defer func() { + cancel() + client.Close() + _ = kvStorage.Close() + regionCache.Close() + pdClient.Close() + srv1.wg.Wait() + srv2.wg.Wait() + server1.Stop() + server2.Stop() + wg.Wait() + }() + + wg.Add(1) + go func() { + defer wg.Done() + err := client.Run(ctx) + require.Equal(t, context.Canceled, errors.Cause(err)) + }() + + subID := client.AllocSubscriptionID() + span := tablepb.Span{TableID: 1, StartKey: []byte("a"), EndKey: []byte("b")} + eventCh := make(chan MultiplexingEvent, 50) + client.Subscribe(subID, span, 1, eventCh) + + makeTsEvent := func(regionID, ts, requestID uint64) *cdcpb.ChangeDataEvent { + return &cdcpb.ChangeDataEvent{ + Events: []*cdcpb.Event{ + { + RegionId: regionID, + RequestId: requestID, + Event: &cdcpb.Event_ResolvedTs{ResolvedTs: ts}, + }, + }, + } + } + + checkTsEvent := func(event model.RegionFeedEvent, ts uint64) { + require.Equal(t, ts, event.Resolved.ResolvedTs) + } + + events1 <- mockInitializedEvent(11, uint64(subID)) + ts := oracle.GoTimeToTS(pdClock.CurrentTime()) + events1 <- makeTsEvent(11, ts, uint64(subID)) + // After trying to receive something from the invalid store, + // it should auto switch to other stores and fetch events finally. 
+ select { + case event := <-eventCh: + checkTsEvent(event.RegionFeedEvent, ts) + case <-time.After(5 * time.Second): + require.True(t, false, "reconnection not succeed in 5 second") + } + + // Stop server1 and the client needs to handle it. + server1.Stop() + + events2 <- mockInitializedEvent(11, uint64(subID)) + ts = oracle.GoTimeToTS(pdClock.CurrentTime()) + events2 <- makeTsEvent(11, ts, uint64(subID)) + // After trying to receive something from a failed store, + // it should auto switch to other stores and fetch events finally. + select { + case event := <-eventCh: + checkTsEvent(event.RegionFeedEvent, ts) + case <-time.After(5 * time.Second): + require.True(t, false, "reconnection not succeed in 5 second") + } +} + +func TestGetStoreFailed(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + + events1 := make(chan *cdcpb.ChangeDataEvent, 10) + srv1 := newMockChangeDataServer(events1) + server1, addr1 := newMockService(ctx, t, srv1, wg) + + rpcClient, cluster, pdClient, _ := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) + + pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} + + grpcPool := sharedconn.NewConnAndClientPool(&security.Credential{}, nil) + + regionCache := tikv.NewRegionCache(pdClient) + + pdClock := pdutil.NewClock4Test() + + kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) + require.Nil(t, err) + lockResolver := txnutil.NewLockerResolver(kvStorage, model.ChangeFeedID{}) + + invalidStore1 := "localhost:1" + invalidStore2 := "localhost:2" + cluster.AddStore(1, addr1) + cluster.AddStore(2, invalidStore1) + cluster.AddStore(3, invalidStore2) + cluster.Bootstrap(11, []uint64{1, 2, 3}, []uint64{4, 5, 6}, 4) + + client := NewSharedClient( + model.ChangeFeedID{ID: "test"}, + &config.ServerConfig{ + KVClient: &config.KVClientConfig{ + WorkerConcurrent: 1, + GrpcStreamConcurrent: 1, + AdvanceIntervalInMs: 10, + }, + Debug: &config.DebugConfig{Puller: 
&config.PullerConfig{LogRegionDetails: false}}, + }, + false, pdClient, grpcPool, regionCache, pdClock, lockResolver, + ) + + defer func() { + cancel() + client.Close() + _ = kvStorage.Close() + regionCache.Close() + pdClient.Close() + srv1.wg.Wait() + server1.Stop() + wg.Wait() + }() + + wg.Add(1) + go func() { + defer wg.Done() + err := client.Run(ctx) + require.Equal(t, context.Canceled, errors.Cause(err)) + }() + + failpoint.Enable("github.com/pingcap/tiflow/pkg/version/GetStoreFailed", `return(true)`) + subID := client.AllocSubscriptionID() + span := tablepb.Span{TableID: 1, StartKey: []byte("a"), EndKey: []byte("b")} + eventCh := make(chan MultiplexingEvent, 50) + client.Subscribe(subID, span, 1, eventCh) + + makeTsEvent := func(regionID, ts, requestID uint64) *cdcpb.ChangeDataEvent { + return &cdcpb.ChangeDataEvent{ + Events: []*cdcpb.Event{ + { + RegionId: regionID, + RequestId: requestID, + Event: &cdcpb.Event_ResolvedTs{ResolvedTs: ts}, + }, + }, + } + } + + checkTsEvent := func(event model.RegionFeedEvent, ts uint64) { + require.Equal(t, ts, event.Resolved.ResolvedTs) + } + + events1 <- mockInitializedEvent(11, uint64(subID)) + ts := oracle.GoTimeToTS(pdClock.CurrentTime()) + events1 <- makeTsEvent(11, ts, uint64(subID)) + select { + case <-eventCh: + require.True(t, false, "should not get event when get store failed") + case <-time.After(5 * time.Second): + } + failpoint.Disable("github.com/pingcap/tiflow/pkg/version/GetStoreFailed") + select { + case event := <-eventCh: + checkTsEvent(event.RegionFeedEvent, ts) + case <-time.After(5 * time.Second): + require.True(t, false, "reconnection not succeed in 5 second") + } +} + +type mockChangeDataServer struct { + ch chan *cdcpb.ChangeDataEvent + wg sync.WaitGroup +} + +func newMockChangeDataServer(ch chan *cdcpb.ChangeDataEvent) *mockChangeDataServer { + return &mockChangeDataServer{ch: ch} +} + +func (m *mockChangeDataServer) EventFeed(s cdcpb.ChangeData_EventFeedServer) error { + closed := make(chan 
struct{}) + m.wg.Add(1) + go func() { + defer m.wg.Done() + defer close(closed) + for { + if _, err := s.Recv(); err != nil { + return + } + } + }() + m.wg.Add(1) + defer m.wg.Done() + ticker := time.NewTicker(20 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-closed: + return nil + case <-ticker.C: + } + select { + case event := <-m.ch: + if err := s.Send(event); err != nil { + return err + } + default: + } + } +} + +func (m *mockChangeDataServer) EventFeedV2(s cdcpb.ChangeData_EventFeedV2Server) error { + return m.EventFeed(s) +} diff --git a/cdc/kv/shared_stream.go b/cdc/kv/shared_stream.go new file mode 100644 index 00000000000..419cf4b0d12 --- /dev/null +++ b/cdc/kv/shared_stream.go @@ -0,0 +1,540 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kv + +import ( + "context" + "sync" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/cdcpb" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/kv/sharedconn" + "github.com/pingcap/tiflow/pkg/chann" + cerrors "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/util" + "github.com/pingcap/tiflow/pkg/version" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + grpcstatus "google.golang.org/grpc/status" +) + +type requestedStream struct { + streamID uint64 + + // To trigger a connect action lazily. 
+ preFetchForConnecting *regionInfo + requests *chann.DrainableChann[regionInfo] + + requestedRegions struct { + sync.RWMutex + // map[SubscriptionID]map[RegionID]*regionFeedState + m map[SubscriptionID]map[uint64]*regionFeedState + } + + logRegionDetails func(msg string, fields ...zap.Field) + + // multiplexing is for sharing one GRPC stream in many tables. + multiplexing *sharedconn.ConnAndClient + + // tableExclusives means one GRPC stream is exclusive by one table. + tableExclusives chan tableExclusive +} + +type tableExclusive struct { + subscriptionID SubscriptionID + cc *sharedconn.ConnAndClient +} + +func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *requestedStore) *requestedStream { + stream := newRequestedStream(streamIDGen.Add(1)) + stream.logRegionDetails = c.logRegionDetails + stream.requests = chann.NewAutoDrainChann[regionInfo]() + + waitForPreFetching := func() error { + if stream.preFetchForConnecting != nil { + log.Panic("preFetchForConnecting should be nil", + zap.String("namespace", c.changefeed.Namespace), + zap.String("changefeed", c.changefeed.ID), + zap.Uint64("streamID", stream.streamID), + zap.Uint64("storeID", r.storeID), + zap.String("addr", r.storeAddr)) + } + for { + select { + case <-ctx.Done(): + return ctx.Err() + case region := <-stream.requests.Out(): + if !region.isStopped() { + stream.preFetchForConnecting = new(regionInfo) + *stream.preFetchForConnecting = region + return nil + } + } + } + } + + g.Go(func() error { + for { + if err := waitForPreFetching(); err != nil { + return err + } + var regionErr error + if err := version.CheckStoreVersion(ctx, c.pd, r.storeID); err != nil { + log.Info("event feed check store version fails", + zap.String("namespace", c.changefeed.Namespace), + zap.String("changefeed", c.changefeed.ID), + zap.Uint64("streamID", stream.streamID), + zap.Uint64("storeID", r.storeID), + zap.String("addr", r.storeAddr), + zap.Error(err)) + if errors.Cause(err) == context.Canceled { + 
return nil + } else if cerrors.Is(err, cerrors.ErrGetAllStoresFailed) { + regionErr = &getStoreErr{} + } else { + regionErr = &sendRequestToStoreErr{} + } + } else { + if canceled := stream.run(ctx, c, r); canceled { + return nil + } + regionErr = &sendRequestToStoreErr{} + } + for _, m := range stream.clearStates() { + for _, state := range m { + state.markStopped(regionErr) + sfEvent := newEventItem(nil, state, stream) + slot := hashRegionID(state.region.verID.GetID(), len(c.workers)) + _ = c.workers[slot].sendEvent(ctx, sfEvent) + } + } + // Why we need to re-schedule pending regions? This because the store can + // fail forever, and all regions are scheduled to other stores. + for _, region := range stream.clearPendingRegions() { + if region.isStopped() { + // It means it's a special task for stopping the table. + continue + } + c.onRegionFail(newRegionErrorInfo(region, regionErr)) + } + if err := util.Hang(ctx, time.Second); err != nil { + return err + } + } + }) + + return stream +} + +func newRequestedStream(streamID uint64) *requestedStream { + stream := &requestedStream{streamID: streamID} + stream.requestedRegions.m = make(map[SubscriptionID]map[uint64]*regionFeedState) + return stream +} + +func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requestedStore) (canceled bool) { + isCanceled := func() bool { + select { + case <-ctx.Done(): + return true + default: + return false + } + } + + log.Info("event feed going to create grpc stream", + zap.String("namespace", c.changefeed.Namespace), + zap.String("changefeed", c.changefeed.ID), + zap.Uint64("streamID", s.streamID), + zap.Uint64("storeID", rs.storeID), + zap.String("addr", rs.storeAddr)) + + defer func() { + log.Info("event feed grpc stream exits", + zap.String("namespace", c.changefeed.Namespace), + zap.String("changefeed", c.changefeed.ID), + zap.Uint64("streamID", s.streamID), + zap.Uint64("storeID", rs.storeID), + zap.String("addr", rs.storeAddr), + zap.Bool("canceled", 
canceled)) + if s.multiplexing != nil { + s.multiplexing = nil + } else if s.tableExclusives != nil { + close(s.tableExclusives) + s.tableExclusives = nil + } + }() + + // grpc stream can be canceled by this context when any goroutine meet error, + // the underline established grpc connections is unaffected. + g, gctx := errgroup.WithContext(ctx) + cc, err := c.grpcPool.Connect(gctx, rs.storeAddr) + if err != nil { + log.Warn("event feed create grpc stream failed", + zap.String("namespace", c.changefeed.Namespace), + zap.String("changefeed", c.changefeed.ID), + zap.Uint64("streamID", s.streamID), + zap.Uint64("storeID", rs.storeID), + zap.String("addr", rs.storeAddr), + zap.Error(err)) + return isCanceled() + } + + if cc.Multiplexing() { + s.multiplexing = cc + g.Go(func() error { return s.receive(gctx, c, rs, s.multiplexing, invalidSubscriptionID) }) + } else { + log.Info("event feed stream multiplexing is not supported, will fallback", + zap.String("namespace", c.changefeed.Namespace), + zap.String("changefeed", c.changefeed.ID), + zap.Uint64("streamID", s.streamID), + zap.Uint64("storeID", rs.storeID), + zap.String("addr", rs.storeAddr)) + cc.Release() + + s.tableExclusives = make(chan tableExclusive, 8) + g.Go(func() error { + for { + select { + case <-gctx.Done(): + return gctx.Err() + case tableExclusive := <-s.tableExclusives: + subscriptionID := tableExclusive.subscriptionID + cc := tableExclusive.cc + g.Go(func() error { return s.receive(gctx, c, rs, cc, subscriptionID) }) + } + } + }) + } + g.Go(func() error { return s.send(gctx, c, rs) }) + _ = g.Wait() + return isCanceled() +} + +func (s *requestedStream) receive( + ctx context.Context, + c *SharedClient, + rs *requestedStore, + cc *sharedconn.ConnAndClient, + subscriptionID SubscriptionID, +) error { + client := cc.Client() + for { + cevent, err := client.Recv() + if err != nil { + s.logRegionDetails("event feed receive from grpc stream failed", + zap.String("namespace", c.changefeed.Namespace), + 
zap.String("changefeed", c.changefeed.ID), + zap.Uint64("streamID", s.streamID), + zap.Uint64("storeID", rs.storeID), + zap.String("addr", rs.storeAddr), + zap.String("code", grpcstatus.Code(err).String()), + zap.Error(err)) + if sharedconn.StatusIsEOF(grpcstatus.Convert(err)) { + return nil + } + return errors.Trace(err) + } + if len(cevent.Events) > 0 { + if err := s.sendRegionChangeEvents(ctx, c, cevent.Events, subscriptionID); err != nil { + return err + } + } + if cevent.ResolvedTs != nil { + c.metrics.batchResolvedSize.Observe(float64(len(cevent.ResolvedTs.Regions))) + if err := s.sendResolvedTs(ctx, c, cevent.ResolvedTs, subscriptionID); err != nil { + return err + } + } + } +} + +func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *requestedStore) (err error) { + fetchMoreReq := func() (regionInfo, error) { + waitReqTicker := time.NewTicker(60 * time.Second) + defer waitReqTicker.Stop() + for { + var region regionInfo + select { + case <-ctx.Done(): + return region, ctx.Err() + case region = <-s.requests.Out(): + return region, nil + case <-waitReqTicker.C: + // The stream is idle now, will be re-established when necessary. 
+ if s.countStates() == 0 { + return region, errors.New("closed as idle") + } + } + } + } + + tableExclusives := make(map[SubscriptionID]*sharedconn.ConnAndClient) + getTableExclusiveConn := func(subscriptionID SubscriptionID) (cc *sharedconn.ConnAndClient, err error) { + if cc = tableExclusives[subscriptionID]; cc == nil { + if cc, err = c.grpcPool.Connect(ctx, rs.storeAddr); err != nil { + return + } + if cc.Multiplexing() { + cc.Release() + cc, err = nil, errors.New("multiplexing is enabled, will re-establish the stream") + return + } + tableExclusives[subscriptionID] = cc + select { + case <-ctx.Done(): + case s.tableExclusives <- tableExclusive{subscriptionID, cc}: + } + } + return + } + defer func() { + if s.multiplexing != nil { + s.multiplexing.Release() + } + for _, cc := range tableExclusives { + cc.Release() + } + }() + + region := *s.preFetchForConnecting + s.preFetchForConnecting = nil + for { + subscriptionID := region.subscribedTable.subscriptionID + // It means it's a special task for stopping the table. + if region.isStopped() { + if s.multiplexing != nil { + req := &cdcpb.ChangeDataRequest{ + RequestId: uint64(subscriptionID), + Request: &cdcpb.ChangeDataRequest_Deregister_{ + Deregister: &cdcpb.ChangeDataRequest_Deregister{}, + }, + } + if err = s.multiplexing.Client().Send(req); err != nil { + log.Warn("event feed send deregister request to grpc stream failed", + zap.String("namespace", c.changefeed.Namespace), + zap.String("changefeed", c.changefeed.ID), + zap.Uint64("streamID", s.streamID), + zap.Any("subscriptionID", subscriptionID), + zap.Int64("tableID", region.span.TableID), + zap.Uint64("regionID", req.RegionId), + zap.Uint64("storeID", rs.storeID), + zap.String("addr", rs.storeAddr), + zap.Error(err)) + } + } else if cc := tableExclusives[subscriptionID]; cc != nil { + delete(tableExclusives, subscriptionID) + cc.Release() + } + // NOTE: some principles to help understand deregistering a table: + // 1. 
after a Deregister(requestID) message is sent out, no more region requests + // with the same requestID will be sent out in the same GRPC stream; + // 2. so it's OK to clear all pending states in the GRPC stream; + // 3. is it possible that TiKV is keeping to send events belong to a removed state? + // I guess no because internal errors will cause the changefeed or table stopped, + // and then those regions from the bad requestID will be unsubscribed finally. + for _, state := range s.takeStates(subscriptionID) { + state.markStopped(&sendRequestToStoreErr{}) + sfEvent := newEventItem(nil, state, s) + slot := hashRegionID(state.region.verID.GetID(), len(c.workers)) + if err = c.workers[slot].sendEvent(ctx, sfEvent); err != nil { + return errors.Trace(err) + } + } + } else if region.subscribedTable.stopped.Load() { + // It can be skipped directly because there must be no pending states from + // the stopped subscribedTable, or the special singleRegionInfo for stopping + // the table will be handled later. 
+ c.onRegionFail(newRegionErrorInfo(region, &sendRequestToStoreErr{})) + } else { + connectTime := time.Since(region.lockedRangeState.Created).Milliseconds() + c.metrics.regionConnectDuration.Observe(float64(connectTime)) + + state := newRegionFeedState(region, uint64(subscriptionID)) + state.start() + s.setState(subscriptionID, region.verID.GetID(), state) + + var cc *sharedconn.ConnAndClient + if s.multiplexing != nil { + cc = s.multiplexing + } else if cc, err = getTableExclusiveConn(subscriptionID); err != nil { + return err + } + if err = cc.Client().Send(c.createRegionRequest(region)); err != nil { + log.Warn("event feed send request to grpc stream failed", + zap.String("namespace", c.changefeed.Namespace), + zap.String("changefeed", c.changefeed.ID), + zap.Uint64("streamID", s.streamID), + zap.Any("subscriptionID", subscriptionID), + zap.Uint64("regionID", region.verID.GetID()), + zap.Int64("tableID", region.span.TableID), + zap.Uint64("storeID", rs.storeID), + zap.String("addr", rs.storeAddr), + zap.Error(err)) + } + } + + if region, err = fetchMoreReq(); err != nil { + return err + } + } +} + +func (s *requestedStream) countStates() (sum int) { + s.requestedRegions.Lock() + defer s.requestedRegions.Unlock() + for _, mm := range s.requestedRegions.m { + sum += len(mm) + } + return +} + +func (s *requestedStream) setState(subscriptionID SubscriptionID, regionID uint64, state *regionFeedState) { + s.requestedRegions.Lock() + defer s.requestedRegions.Unlock() + var m map[uint64]*regionFeedState + if m = s.requestedRegions.m[subscriptionID]; m == nil { + m = make(map[uint64]*regionFeedState) + s.requestedRegions.m[subscriptionID] = m + } + m[regionID] = state +} + +func (s *requestedStream) getState(subscriptionID SubscriptionID, regionID uint64) (state *regionFeedState) { + s.requestedRegions.RLock() + defer s.requestedRegions.RUnlock() + if m, ok := s.requestedRegions.m[subscriptionID]; ok { + state = m[regionID] + } + return state +} + +func (s 
*requestedStream) takeState(subscriptionID SubscriptionID, regionID uint64) (state *regionFeedState) { + s.requestedRegions.Lock() + defer s.requestedRegions.Unlock() + if m, ok := s.requestedRegions.m[subscriptionID]; ok { + state = m[regionID] + delete(m, regionID) + if len(m) == 0 { + delete(s.requestedRegions.m, subscriptionID) + } + } + return +} + +func (s *requestedStream) takeStates(subscriptionID SubscriptionID) (v map[uint64]*regionFeedState) { + s.requestedRegions.Lock() + defer s.requestedRegions.Unlock() + v = s.requestedRegions.m[subscriptionID] + delete(s.requestedRegions.m, subscriptionID) + return +} + +func (s *requestedStream) clearStates() (v map[SubscriptionID]map[uint64]*regionFeedState) { + s.requestedRegions.Lock() + defer s.requestedRegions.Unlock() + v = s.requestedRegions.m + s.requestedRegions.m = make(map[SubscriptionID]map[uint64]*regionFeedState) + return +} + +func (s *requestedStream) clearPendingRegions() []regionInfo { + regions := make([]regionInfo, 0, s.requests.Len()+1) + if s.preFetchForConnecting != nil { + region := *s.preFetchForConnecting + s.preFetchForConnecting = nil + regions = append(regions, region) + } + for i := 1; i < cap(regions); i++ { + regions = append(regions, <-s.requests.Out()) + } + return regions +} + +func (s *requestedStream) sendRegionChangeEvents( + ctx context.Context, c *SharedClient, events []*cdcpb.Event, + tableSubID SubscriptionID, +) error { + for _, event := range events { + regionID := event.RegionId + var subscriptionID SubscriptionID + if tableSubID == invalidSubscriptionID { + subscriptionID = SubscriptionID(event.RequestId) + } else { + subscriptionID = tableSubID + } + + state := s.getState(subscriptionID, regionID) + switch x := event.Event.(type) { + case *cdcpb.Event_Error: + fields := []zap.Field{ + zap.String("namespace", c.changefeed.Namespace), + zap.String("changefeed", c.changefeed.ID), + zap.Uint64("streamID", s.streamID), + zap.Any("subscriptionID", subscriptionID), + 
zap.Uint64("regionID", event.RegionId), + zap.Bool("stateIsNil", state == nil), + zap.Any("error", x.Error), + } + if state != nil { + fields = append(fields, zap.Int64("tableID", state.region.span.TableID)) + } + s.logRegionDetails("event feed receives a region error", fields...) + } + + if state != nil { + sfEvent := newEventItem(event, state, s) + slot := hashRegionID(regionID, len(c.workers)) + if err := c.workers[slot].sendEvent(ctx, sfEvent); err != nil { + return errors.Trace(err) + } + } + } + return nil +} + +func (s *requestedStream) sendResolvedTs( + ctx context.Context, c *SharedClient, resolvedTs *cdcpb.ResolvedTs, + tableSubID SubscriptionID, +) error { + var subscriptionID SubscriptionID + if tableSubID == invalidSubscriptionID { + subscriptionID = SubscriptionID(resolvedTs.RequestId) + } else { + subscriptionID = tableSubID + } + sfEvents := make([]statefulEvent, len(c.workers)) + for _, regionID := range resolvedTs.Regions { + slot := hashRegionID(regionID, len(c.workers)) + if sfEvents[slot].stream == nil { + sfEvents[slot] = newResolvedTsBatch(resolvedTs.Ts, s) + } + x := &sfEvents[slot].resolvedTsBatch + if state := s.getState(subscriptionID, regionID); state != nil { + x.regions = append(x.regions, state) + } + } + + for i, sfEvent := range sfEvents { + if len(sfEvent.resolvedTsBatch.regions) > 0 { + sfEvent.stream = s + if err := c.workers[i].sendEvent(ctx, sfEvent); err != nil { + return err + } + } + } + return nil +} diff --git a/pkg/version/check.go b/pkg/version/check.go index 2be6566f2a3..a313a1d2d5c 100644 --- a/pkg/version/check.go +++ b/pkg/version/check.go @@ -24,6 +24,7 @@ import ( "github.com/coreos/go-semver/semver" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/pingcap/tidb/util/engine" @@ -196,6 +197,9 @@ func checkPDVersion(ctx context.Context, pdAddr string, credential *security.Cre // CheckStoreVersion checks whether the given TiKV is 
compatible with this CDC. // If storeID is 0, it checks all TiKV. func CheckStoreVersion(ctx context.Context, client pd.Client, storeID uint64) error { + failpoint.Inject("GetStoreFailed", func() { + failpoint.Return(cerror.WrapError(cerror.ErrGetAllStoresFailed, fmt.Errorf("unknown store %d", storeID))) + }) var stores []*metapb.Store var err error if storeID == 0 {