diff --git a/core/environment/environment.go b/core/environment/environment.go index d3290b88..1362bc99 100644 --- a/core/environment/environment.go +++ b/core/environment/environment.go @@ -706,6 +706,7 @@ func (env *Environment) handleHooks(workflow workflow.Role, trigger string, weig // respected. callErrors = pendingCalls.AwaitAll() + delete(env.callsPendingAwait[trigger], weight) } } diff --git a/core/environment/hooks_test.go b/core/environment/hooks_test.go index 5fab31e8..b4d70131 100644 --- a/core/environment/hooks_test.go +++ b/core/environment/hooks_test.go @@ -6,6 +6,7 @@ import ( "github.com/AliceO2Group/Control/common/utils/uid" "github.com/AliceO2Group/Control/core/task" "github.com/AliceO2Group/Control/core/workflow" + "github.com/AliceO2Group/Control/core/workflow/callable" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) @@ -459,4 +460,35 @@ var _ = Describe("calling hooks on FSM events", func() { Expect(ok).To(BeTrue()) Expect(v).To(Equal("root.call1,root.call2,root.call3")) }) + + It("should allow to cancel hooks in case that await trigger never happens", func() { + env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{ + workflow.NewCallRole( + "call1", // this call should return immediately and should not be accessible later + task.Traits{Trigger: "before_CONFIGURE", Timeout: "5s", Critical: true, Await: "before_CONFIGURE"}, + "testplugin.Test()", + ""), + workflow.NewCallRole( + "call2", // this call should not return, but should be cancelled later + task.Traits{Trigger: "before_CONFIGURE", Timeout: "5s", Critical: true, Await: "after_CONFIGURE"}, + "testplugin.Test()", + "")}) + workflow.LinkChildrenToParents(env.workflow) + env.Sm.SetState("DEPLOYED") + + err := env.Sm.Event(context.Background(), "CONFIGURE", NewDummyTransition("CONFIGURE", true)) + Expect(err).To(HaveOccurred()) + + callMapForAwait := env.callsPendingAwait + Expect(callMapForAwait).To(HaveKey("after_CONFIGURE")) + callsForWeight := callMapForAwait["after_CONFIGURE"] + Expect(callsForWeight).To(HaveKey(callable.HookWeight(0))) + calls := callsForWeight[0] + Expect(calls).To(HaveLen(1)) + Expect(calls[0]).NotTo(BeNil()) + // the first cancel attempt should return "true" to say it was successful + Expect(calls[0].Cancel()).To(BeTrue()) + // the subsequent cancel attempts should return "false", because the call was already cancelled + Expect(calls[0].Cancel()).To(BeFalse()) + }) }) diff --git a/core/environment/manager.go b/core/environment/manager.go index fa537e46..7d3fc7b3 100644 --- a/core/environment/manager.go +++ b/core/environment/manager.go @@ -811,6 +811,8 @@ func (envs *Manager) TeardownEnvironment(environmentId uid.ID, force bool) error } } + envs.cancelCallsPendingAwait(env) + // we remake the pending teardown channel too, because each completed TasksReleasedEvent // automatically closes it pendingCh = make(chan *event.TasksReleasedEvent) @@ -880,6 +882,22 @@ func (envs *Manager) TeardownEnvironment(environmentId uid.ID, force bool) error return err } +func (envs *Manager) cancelCallsPendingAwait(env *Environment) { + // unblock all calls which are stuck waiting for an await trigger which never happened + if env == nil { + return + } + for _, callMapForAwait := range env.callsPendingAwait { + for _, callsForWeight := range callMapForAwait { + for _, call := range callsForWeight { + if call != nil { + call.Cancel() + } + } + } + } +} + /*func (envs *Manager) Configuration(environmentId uuid.UUID) EnvironmentCfg { envs.mu.RLock() defer envs.mu.RUnlock() diff --git a/core/workflow/callable/call.go b/core/workflow/callable/call.go index 35971cca..7b1282da 100644 --- a/core/workflow/callable/call.go +++ b/core/workflow/callable/call.go @@ -25,6 +25,7 @@ package callable import ( + "context" "errors" "fmt" "strconv" @@ -54,7 +55,8 @@ type Call struct { Traits task.Traits parentRole ParentRole - await chan error + await chan error + awaitCancel context.CancelFunc } type Calls []*Call @@ -220,11 +222,17 @@ func (c *Call) Call() error { func (c *Call) Start() { c.await = make(chan error) + ctx, cancel := context.WithCancel(context.Background()) + c.awaitCancel = cancel go func() { callId := fmt.Sprintf("hook:%s:%s", c.GetTraits().Trigger, c.GetName()) log.Debugf("%s started", callId) defer utils.TimeTrack(time.Now(), callId, log.WithPrefix("callable")) - c.await <- c.Call() + select { + case c.await <- c.Call(): + case <-ctx.Done(): + log.Debugf("%s cancelled", callId) + } close(c.await) }() } @@ -234,6 +242,15 @@ func (c *Call) Await() error { return <-c.await } +func (c *Call) Cancel() bool { + if c.awaitCancel != nil { + c.awaitCancel() + c.awaitCancel = nil + return true + } + return false +} + func (c *Call) GetParentRole() interface{} { return c.parentRole }