Skip to content

Commit

Permalink
puller: close kvclient correctly when stopping a processor (#11957)
Browse files Browse the repository at this point in the history
close #11954
  • Loading branch information
hicqu authored Jan 6, 2025
1 parent 686f8ea commit 80f49c6
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 6 deletions.
6 changes: 5 additions & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1084,4 +1084,8 @@ func (d *ddlHandler) Run(ctx context.Context, _ ...chan<- error) error {

func (d *ddlHandler) WaitForReady(_ context.Context) {}

func (d *ddlHandler) Close() {}
func (d *ddlHandler) Close() {
if d.puller != nil {
d.puller.Close()
}
}
6 changes: 4 additions & 2 deletions cdc/processor/sourcemanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,10 @@ func (m *SourceManager) Close() {
zap.String("changefeed", m.changefeedID.ID))

start := time.Now()

log.Info("All pullers have been closed",
if m.puller != nil {
m.puller.Close()
}
log.Info("SourceManager puller have been closed",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Duration("cost", time.Since(start)))
Expand Down
19 changes: 16 additions & 3 deletions cdc/puller/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,11 @@ func (p *ddlJobPullerImpl) Run(ctx context.Context, _ ...chan<- error) error {
func (p *ddlJobPullerImpl) WaitForReady(_ context.Context) {}

// Close implements util.Runnable.
func (p *ddlJobPullerImpl) Close() {}
func (p *ddlJobPullerImpl) Close() {
if p.mp != nil {
p.mp.Close()
}
}

// Output implements DDLJobPuller, it returns the output channel of DDL job.
func (p *ddlJobPullerImpl) Output() <-chan *model.DDLJobEntry {
Expand Down Expand Up @@ -737,6 +741,12 @@ func (h *ddlPullerImpl) Run(ctx context.Context) error {
zap.String("changefeed", h.changefeedID.ID),
zap.Uint64("resolvedTS", atomic.LoadUint64(&h.resolvedTS)))

defer func() {
log.Info("DDL puller stopped",
zap.String("namespace", h.changefeedID.Namespace),
zap.String("changefeed", h.changefeedID.ID))
}()

return g.Wait()
}

Expand All @@ -754,10 +764,13 @@ func (h *ddlPullerImpl) PopFrontDDL() (uint64, *timodel.Job) {

// Close the ddl puller, release all resources.
func (h *ddlPullerImpl) Close() {
log.Info("close the ddl puller",
h.cancel()
if h.ddlJobPuller != nil {
h.ddlJobPuller.Close()
}
log.Info("DDL puller closed",
zap.String("namespace", h.changefeedID.Namespace),
zap.String("changefeed", h.changefeedID.ID))
h.cancel()
}

func (h *ddlPullerImpl) ResolvedTs() uint64 {
Expand Down
10 changes: 10 additions & 0 deletions cdc/puller/multiplexing_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,16 @@ func (p *MultiplexingPuller) run(ctx context.Context, includeClient bool) error
return eg.Wait()
}

// Close closes the puller.
func (p *MultiplexingPuller) Close() {
if p.client != nil {
p.client.Close()
}
log.Info("MultiplexingPuller is closed",
zap.String("namespace", p.changefeed.Namespace),
zap.String("changefeed", p.changefeed.ID))
}

// runEventHandler consumes events from inputCh:
// 1. If the event is a kv event, consume by calling progress.consume.f.
// 2. If the event is a resolved event, send it to the resolvedEventsCache of the corresponding progress.
Expand Down

0 comments on commit 80f49c6

Please sign in to comment.