Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

coordinator: send checkpointTs message when changefeed state is finished #1039

Merged
merged 4 commits into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 40 additions & 42 deletions coordinator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,25 +75,25 @@ type Controller struct {
taskHandlers []*threadpool.TaskHandle
messageCenter messaging.MessageCenter

changefeedProgressReportCh chan map[common.ChangeFeedID]*changefeed.Changefeed
changefeedStateChangedCh chan *ChangefeedStateChangeEvent
changefeedChangeCh chan []*ChangefeedChange

lastPrintStatusTime time.Time

apiLock sync.RWMutex
}

type ChangefeedStateChangeEvent struct {
ChangefeedID common.ChangeFeedID
State model.FeedState
type ChangefeedChange struct {
changefeedID common.ChangeFeedID
changefeed *changefeed.Changefeed
state model.FeedState
changeType ChangeType
err *model.RunningError
}

func NewController(
version int64,
selfNode *node.Info,
updatedChangefeedCh chan map[common.ChangeFeedID]*changefeed.Changefeed,
stateChangedCh chan *ChangefeedStateChangeEvent,
changefeedChangeCh chan []*ChangefeedChange,
backend changefeed.Backend,
eventCh *chann.DrainableChann[*Event],
taskScheduler threadpool.ThreadPool,
Expand Down Expand Up @@ -128,17 +128,16 @@ func NewController(
oc.NewMoveMaintainerOperator,
),
}),
eventCh: eventCh,
operatorController: oc,
messageCenter: mc,
changefeedDB: changefeedDB,
nodeManager: nodeManager,
taskScheduler: taskScheduler,
backend: backend,
changefeedProgressReportCh: updatedChangefeedCh,
changefeedStateChangedCh: stateChangedCh,
lastPrintStatusTime: time.Now(),
pdClient: pdClient,
eventCh: eventCh,
operatorController: oc,
messageCenter: mc,
changefeedDB: changefeedDB,
nodeManager: nodeManager,
taskScheduler: taskScheduler,
backend: backend,
changefeedChangeCh: changefeedChangeCh,
lastPrintStatusTime: time.Now(),
pdClient: pdClient,
}
c.nodeChanged.changed = false

Expand Down Expand Up @@ -319,19 +318,18 @@ func (c *Controller) onBootstrapDone(cachedResp map[node.ID]*heartbeatpb.Coordin

// handleMaintainerStatus handles the status reports from the maintainers
func (c *Controller) handleMaintainerStatus(from node.ID, statusList []*heartbeatpb.MaintainerStatus) {
changedCfs := make(map[common.ChangeFeedID]*changefeed.Changefeed, len(statusList))

changes := make([]*ChangefeedChange, 0, len(statusList))
for _, status := range statusList {
cfID := common.NewChangefeedIDFromPB(status.ChangefeedID)
cf := c.handleSingleMaintainerStatus(from, status, cfID)
if cf != nil {
changedCfs[cfID] = cf
change := c.handleSingleMaintainerStatus(from, status, cfID)
if change != nil {
changes = append(changes, change)
}
}

// Try to send updated changefeeds without blocking
select {
case c.changefeedProgressReportCh <- changedCfs:
case c.changefeedChangeCh <- changes:
default:
}
}
Expand All @@ -340,7 +338,7 @@ func (c *Controller) handleSingleMaintainerStatus(
from node.ID,
status *heartbeatpb.MaintainerStatus,
cfID common.ChangeFeedID,
) *changefeed.Changefeed {
) *ChangefeedChange {
// Update the operator status first
c.operatorController.UpdateOperatorStatus(cfID, from, status)

Expand All @@ -354,8 +352,8 @@ func (c *Controller) handleSingleMaintainerStatus(
return nil
}

c.updateChangefeedStatus(cf, cfID, status)
return cf
change := c.updateChangefeedStatus(cf, cfID, status)
return change
}

func (c *Controller) handleNonExistentChangefeed(
Expand Down Expand Up @@ -403,31 +401,31 @@ func (c *Controller) updateChangefeedStatus(
cf *changefeed.Changefeed,
cfID common.ChangeFeedID,
status *heartbeatpb.MaintainerStatus,
) {
) *ChangefeedChange {
changed, state, err := cf.UpdateStatus(status)
change := &ChangefeedChange{
changefeedID: cfID,
changefeed: cf,
state: state,
changeType: ChangeStateAndTs,
}
if !changed {
return
change.changeType = ChangeTs
return change
}

log.Info("changefeed status changed",
zap.Stringer("changefeed", cfID),
zap.String("state", string(state)),
zap.Stringer("error", err))

var mErr *model.RunningError
if err != nil {
mErr = &model.RunningError{
change.err = &model.RunningError{
Time: time.Now(),
Addr: err.Node,
Code: err.Code,
Message: err.Message,
}
}
c.changefeedStateChangedCh <- &ChangefeedStateChangeEvent{
ChangefeedID: cfID,
State: state,
err: mErr,
}
log.Info("changefeed status changed",
zap.Stringer("changefeed", cfID),
zap.String("state", string(change.state)),
zap.Stringer("error", err))
return change
}

// FinishBootstrap is called when all nodes have sent bootstrap response
Expand Down
106 changes: 55 additions & 51 deletions coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,8 @@ type coordinator struct {
// eventCh is used to receive events from the message center; these messages
// basically come from the maintainers.
eventCh *chann.DrainableChann[*Event]
// changefeedProgressReportCh is used to receive the changefeed progress report from the controller
changefeedProgressReportCh chan map[common.ChangeFeedID]*changefeed.Changefeed
// changefeedStateChangedCh is used to receive the changefeed state changed event from the controller
changefeedStateChangedCh chan *ChangefeedStateChangeEvent
// changefeedChangeCh is used to receive the changefeed change from the controller
changefeedChangeCh chan []*ChangefeedChange

cancel func()
closed atomic.Bool
Expand All @@ -109,18 +107,17 @@ func New(node *node.Info,
) server.Coordinator {
mc := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter)
c := &coordinator{
version: version,
nodeInfo: node,
gcServiceID: gcServiceID,
lastTickTime: time.Now(),
gcManager: gc.NewManager(gcServiceID, pdClient, pdClock),
eventCh: chann.NewAutoDrainChann[*Event](),
pdClient: pdClient,
pdClock: pdClock,
mc: mc,
changefeedProgressReportCh: make(chan map[common.ChangeFeedID]*changefeed.Changefeed, 1024),
changefeedStateChangedCh: make(chan *ChangefeedStateChangeEvent, 1024),
backend: backend,
version: version,
nodeInfo: node,
gcServiceID: gcServiceID,
lastTickTime: time.Now(),
gcManager: gc.NewManager(gcServiceID, pdClient, pdClock),
eventCh: chann.NewAutoDrainChann[*Event](),
pdClient: pdClient,
pdClock: pdClock,
mc: mc,
changefeedChangeCh: make(chan []*ChangefeedChange, 1024),
backend: backend,
}
// handle messages from message center
mc.RegisterHandler(messaging.CoordinatorTopic, c.recvMessages)
Expand All @@ -131,8 +128,7 @@ func New(node *node.Info,
controller := NewController(
c.version,
c.nodeInfo,
c.changefeedProgressReportCh,
c.changefeedStateChangedCh,
c.changefeedChangeCh,
c.backend,
c.eventCh,
c.taskScheduler,
Expand Down Expand Up @@ -210,13 +206,16 @@ func (c *coordinator) run(ctx context.Context) error {
now := time.Now()
metrics.CoordinatorCounter.Add(float64(now.Sub(c.lastTickTime)) / float64(time.Second))
c.lastTickTime = now
case cfs := <-c.changefeedProgressReportCh:
if err := c.saveCheckpointTs(ctx, cfs); err != nil {
case changes := <-c.changefeedChangeCh:
if err := c.saveCheckpointTs(ctx, changes); err != nil {
return errors.Trace(err)
}
case event := <-c.changefeedStateChangedCh:
if err := c.handleStateChangedEvent(ctx, event); err != nil {
return errors.Trace(err)
for _, change := range changes {
if change.changeType == ChangeState || change.changeType == ChangeStateAndTs {
if err := c.handleStateChange(ctx, change); err != nil {
return errors.Trace(err)
}
}
}
}
}
Expand All @@ -234,23 +233,23 @@ func (c *coordinator) runHandleEvent(ctx context.Context) error {
}
}

func (c *coordinator) handleStateChangedEvent(
func (c *coordinator) handleStateChange(
ctx context.Context,
event *ChangefeedStateChangeEvent,
event *ChangefeedChange,
) error {
cf := c.controller.getChangefeed(event.ChangefeedID)
cf := c.controller.getChangefeed(event.changefeedID)
if cf == nil {
log.Warn("changefeed not found", zap.String("changefeed", event.ChangefeedID.String()))
log.Warn("changefeed not found", zap.String("changefeed", event.changefeedID.String()))
return nil
}
cfInfo, err := cf.GetInfo().Clone()
if err != nil {
return errors.Trace(err)
}
cfInfo.State = event.State
cfInfo.State = event.state
cfInfo.Error = event.err
progress := config.ProgressNone
if event.State == model.StateFailed || event.State == model.StateFinished {
if event.state == model.StateFailed || event.state == model.StateFinished {
progress = config.ProgressStopping
}
if err := c.backend.UpdateChangefeed(context.Background(), cfInfo, cf.GetStatus().CheckpointTs, progress); err != nil {
Expand All @@ -260,24 +259,24 @@ func (c *coordinator) handleStateChangedEvent(
}
cf.SetInfo(cfInfo)

switch event.State {
switch event.state {
case model.StateWarning:
c.controller.operatorController.StopChangefeed(ctx, event.ChangefeedID, false)
c.controller.updateChangefeedEpoch(ctx, event.ChangefeedID)
c.controller.moveChangefeedToSchedulingQueue(event.ChangefeedID, false, false)
c.controller.operatorController.StopChangefeed(ctx, event.changefeedID, false)
c.controller.updateChangefeedEpoch(ctx, event.changefeedID)
c.controller.moveChangefeedToSchedulingQueue(event.changefeedID, false, false)
case model.StateFailed, model.StateFinished:
c.controller.operatorController.StopChangefeed(ctx, event.ChangefeedID, false)
c.controller.operatorController.StopChangefeed(ctx, event.changefeedID, false)
case model.StateNormal:
log.Info("changefeed is resumed or created successfully, try to delete its safeguard gc safepoint",
zap.String("changefeed", event.ChangefeedID.String()))
zap.String("changefeed", event.changefeedID.String()))
// We need to clean its gc safepoint when changefeed is resumed or created
gcServiceID := c.getEnsureGCServiceID(gc.EnsureGCServiceCreating)
err := gc.UndoEnsureChangefeedStartTsSafety(ctx, c.pdClient, gcServiceID, event.ChangefeedID)
err := gc.UndoEnsureChangefeedStartTsSafety(ctx, c.pdClient, gcServiceID, event.changefeedID)
if err != nil {
log.Warn("failed to delete create changefeed gc safepoint", zap.Error(err))
}
gcServiceID = c.getEnsureGCServiceID(gc.EnsureGCServiceResuming)
err = gc.UndoEnsureChangefeedStartTsSafety(ctx, c.pdClient, gcServiceID, event.ChangefeedID)
err = gc.UndoEnsureChangefeedStartTsSafety(ctx, c.pdClient, gcServiceID, event.changefeedID)
if err != nil {
log.Warn("failed to delete resume changefeed gc safepoint", zap.Error(err))
}
Expand All @@ -297,31 +296,39 @@ func (c *coordinator) checkStaleCheckpointTs(ctx context.Context, id common.Chan
if !errors.IsChangefeedGCFastFailErrorCode(errCode) {
state = model.StateWarning
}
change := &ChangefeedChange{
changefeedID: id,
state: state,
err: &model.RunningError{
Code: string(errCode),
Message: err.Error(),
},
changeType: ChangeState,
}
select {
case <-ctx.Done():
log.Warn("Failed to send state change event to stateChangedCh since context timeout, "+
"there may be a lot of state need to be handled. Try next time",
zap.String("changefeed", id.String()),
zap.Error(ctx.Err()))
return
case c.changefeedStateChangedCh <- &ChangefeedStateChangeEvent{
ChangefeedID: id,
State: state,
err: &model.RunningError{
Code: string(errCode),
Message: err.Error(),
},
}:
case c.changefeedChangeCh <- []*ChangefeedChange{change}:
}
}
}

func (c *coordinator) saveCheckpointTs(ctx context.Context, cfs map[common.ChangeFeedID]*changefeed.Changefeed) error {
func (c *coordinator) saveCheckpointTs(ctx context.Context, changes []*ChangefeedChange) error {
statusMap := make(map[common.ChangeFeedID]uint64)
for _, upCf := range cfs {
cfsMap := make(map[common.ChangeFeedID]*changefeed.Changefeed)
for _, change := range changes {
if change.changeType == ChangeState {
continue
}
upCf := change.changefeed
reportedCheckpointTs := upCf.GetStatus().CheckpointTs
if upCf.GetLastSavedCheckPointTs() < reportedCheckpointTs {
statusMap[upCf.ID] = reportedCheckpointTs
cfsMap[upCf.ID] = upCf
c.checkStaleCheckpointTs(ctx, upCf.ID, reportedCheckpointTs)
}
}
Expand All @@ -335,10 +342,7 @@ func (c *coordinator) saveCheckpointTs(ctx context.Context, cfs map[common.Chang
}
// update the last saved checkpoint ts and send checkpointTs to maintainer
for id, cp := range statusMap {
cf, ok := cfs[id]
if !ok {
continue
}
cf := cfsMap[id]
cf.SetLastSavedCheckPointTs(cp)
if cf.IsMQSink() {
msg := cf.NewCheckpointTsMessage(cf.GetLastSavedCheckPointTs())
Expand Down
11 changes: 11 additions & 0 deletions coordinator/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ import (
"github.com/pingcap/ticdc/pkg/messaging"
)

// ChangeType identifies which aspects of a changefeed (its state, its
// checkpointTs, or both) need to be updated.
type ChangeType int

// The ChangeType values. ChangeStateAndTs is the first constant and is
// therefore also the zero value of ChangeType.
const (
	// ChangeStateAndTs indicates that both the changefeed state and the
	// checkpointTs need to be updated.
	ChangeStateAndTs = ChangeType(iota)
	// ChangeTs indicates that only the changefeed checkpointTs needs to be
	// updated.
	ChangeTs
	// ChangeState indicates that only the changefeed state needs to be
	// updated.
	ChangeState
)

const (
// EventMessage is triggered when a grpc message received
EventMessage = iota
Expand Down
Loading