Skip to content

Commit

Permalink
fix: Remove dispatcher tasks before cleaning up targets
Browse files Browse the repository at this point in the history
Related to milvus-io#39840

The target could be updated async in previous code. This PR make remove
collection from target observer block until all tasks related in
dispatchers are removed preventing the metrics being updated after
collection released.

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia committed Feb 13, 2025
1 parent cb1bf6d commit 79f45dc
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 0 deletions.
3 changes: 3 additions & 0 deletions internal/querycoordv2/observers/target_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ func (ob *TargetObserver) schedule(ctx context.Context) {
delete(ob.readyNotifiers, req.CollectionID)
ob.mut.Unlock()

ob.loadedDispatcher.RemoveTask(req.CollectionID)
ob.loadingDispatcher.RemoveTask(req.CollectionID)

ob.targetMgr.RemoveCollection(ctx, req.CollectionID)
req.Notifier <- nil
case ReleasePartition:
Expand Down
8 changes: 8 additions & 0 deletions internal/querycoordv2/observers/task_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type taskDispatcher[K comparable] struct {
tasks *typeutil.ConcurrentMap[K, bool]
pool *conc.Pool[any]
notifyCh chan struct{}
removeCh chan K
taskRunner task[K]
wg sync.WaitGroup
cancel context.CancelFunc
Expand All @@ -46,6 +47,7 @@ func newTaskDispatcher[K comparable](runner task[K]) *taskDispatcher[K] {
tasks: typeutil.NewConcurrentMap[K, bool](),
pool: conc.NewPool[any](paramtable.Get().QueryCoordCfg.ObserverTaskParallel.GetAsInt()),
notifyCh: make(chan struct{}, 1),
removeCh: make(chan K),
taskRunner: runner,
}
}
Expand Down Expand Up @@ -81,6 +83,10 @@ func (d *taskDispatcher[K]) AddTask(keys ...K) {
}
}

func (d *taskDispatcher[K]) RemoveTask(key K) {
d.removeCh <- key
}

func (d *taskDispatcher[K]) notify() {
select {
case d.notifyCh <- struct{}{}:
Expand All @@ -105,6 +111,8 @@ func (d *taskDispatcher[K]) schedule(ctx context.Context) {
}
return true
})
case key := <-d.removeCh:
d.tasks.Remove(key)
}
}
}

0 comments on commit 79f45dc

Please sign in to comment.