From 79f45dcb90523e75539e5f268e1ca477d5780a21 Mon Sep 17 00:00:00 2001 From: Congqi Xia Date: Thu, 13 Feb 2025 11:38:46 +0800 Subject: [PATCH] fix: Remove dispatcher tasks before cleaning up targets Related to #39840 The target could be updated async in previous code. This PR make remove collection from target observer block until all tasks related in dispatchers are removed preventing the metrics being updated after collection released. Signed-off-by: Congqi Xia --- internal/querycoordv2/observers/target_observer.go | 3 +++ internal/querycoordv2/observers/task_dispatcher.go | 8 ++++++++ 2 files changed, 11 insertions(+) diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index 5844577cb743f..d2d9870aa9129 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -219,6 +219,9 @@ func (ob *TargetObserver) schedule(ctx context.Context) { delete(ob.readyNotifiers, req.CollectionID) ob.mut.Unlock() + ob.loadedDispatcher.RemoveTask(req.CollectionID) + ob.loadingDispatcher.RemoveTask(req.CollectionID) + ob.targetMgr.RemoveCollection(ctx, req.CollectionID) req.Notifier <- nil case ReleasePartition: diff --git a/internal/querycoordv2/observers/task_dispatcher.go b/internal/querycoordv2/observers/task_dispatcher.go index 29ede76b1bb28..4848aabd8716e 100644 --- a/internal/querycoordv2/observers/task_dispatcher.go +++ b/internal/querycoordv2/observers/task_dispatcher.go @@ -33,6 +33,7 @@ type taskDispatcher[K comparable] struct { tasks *typeutil.ConcurrentMap[K, bool] pool *conc.Pool[any] notifyCh chan struct{} + removeCh chan K taskRunner task[K] wg sync.WaitGroup cancel context.CancelFunc @@ -46,6 +47,7 @@ func newTaskDispatcher[K comparable](runner task[K]) *taskDispatcher[K] { tasks: typeutil.NewConcurrentMap[K, bool](), pool: conc.NewPool[any](paramtable.Get().QueryCoordCfg.ObserverTaskParallel.GetAsInt()), notifyCh: make(chan struct{}, 1), + removeCh: make(chan K), taskRunner: runner, } } @@ -81,6 +83,10 @@ func (d *taskDispatcher[K]) AddTask(keys ...K) { } } +func (d *taskDispatcher[K]) RemoveTask(key K) { + d.removeCh <- key +} + func (d *taskDispatcher[K]) notify() { select { case d.notifyCh <- struct{}{}: @@ -105,6 +111,8 @@ func (d *taskDispatcher[K]) schedule(ctx context.Context) { } return true }) + case key := <-d.removeCh: + d.tasks.Remove(key) } } }