From b67839652e1ffa4cf0142c6752767a57fba93aab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E7=A1=95?= Date: Tue, 12 Sep 2023 17:16:43 +0800 Subject: [PATCH] AddAfter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 刘硕 --- ...advancedcronjob_broadcastjob_controller.go | 35 ++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/pkg/controller/advancedcronjob/advancedcronjob_broadcastjob_controller.go b/pkg/controller/advancedcronjob/advancedcronjob_broadcastjob_controller.go index f615c0912c..4e513d77fb 100644 --- a/pkg/controller/advancedcronjob/advancedcronjob_broadcastjob_controller.go +++ b/pkg/controller/advancedcronjob/advancedcronjob_broadcastjob_controller.go @@ -20,6 +20,11 @@ import ( "sort" "time" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" "github.com/robfig/cron/v3" corev1 "k8s.io/api/core/v1" @@ -40,7 +45,9 @@ func watchBroadcastJob(c controller.Controller) error { }); err != nil { return err } - + if err := c.Watch(&source.Kind{Type: &appsv1alpha1.BroadcastJob{}}, &DelayEnqueueRequest{}); err != nil { + return err + } return nil } @@ -383,3 +390,29 @@ func (r *ReconcileAdvancedCronJob) reconcileBroadcastJob(ctx context.Context, re // we'll requeue once we see the running job, and update our status return scheduledResult, nil } + +type DelayEnqueueRequest struct { +} + +func (d DelayEnqueueRequest) Create(event event.CreateEvent, limitingInterface workqueue.RateLimitingInterface) { + if v, ok := event.Object.(*appsv1alpha1.BroadcastJob); ok { + if v.Spec.CompletionPolicy.Type == appsv1alpha1.Always && v.Spec.CompletionPolicy.ActiveDeadlineSeconds != nil { + limitingInterface.AddAfter(reconcile.Request{NamespacedName: types.NamespacedName{ + Name: event.Object.GetName(), + Namespace: event.Object.GetNamespace(), + }}, time.Duration(*v.Spec.CompletionPolicy.ActiveDeadlineSeconds)*time.Second) + } + } +} + +func (d DelayEnqueueRequest) Update(event event.UpdateEvent, limitingInterface workqueue.RateLimitingInterface) { + return +} + +func (d DelayEnqueueRequest) Delete(event event.DeleteEvent, limitingInterface workqueue.RateLimitingInterface) { + return +} + +func (d DelayEnqueueRequest) Generic(event event.GenericEvent, limitingInterface workqueue.RateLimitingInterface) { + return +}