diff --git a/pkg/controller/advancedcronjob/advancedcronjob_broadcastjob_controller.go b/pkg/controller/advancedcronjob/advancedcronjob_broadcastjob_controller.go index f615c0912c..47b808b2d2 100644 --- a/pkg/controller/advancedcronjob/advancedcronjob_broadcastjob_controller.go +++ b/pkg/controller/advancedcronjob/advancedcronjob_broadcastjob_controller.go @@ -17,6 +17,10 @@ package advancedcronjob import ( "context" "fmt" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/reconcile" "sort" "time" @@ -40,7 +44,9 @@ func watchBroadcastJob(c controller.Controller) error { }); err != nil { return err } - + if err := c.Watch(&source.Kind{Type: &appsv1alpha1.BroadcastJob{}}, &DelayEnqueueRequest{}); err != nil { + return err + } return nil } @@ -383,3 +389,29 @@ func (r *ReconcileAdvancedCronJob) reconcileBroadcastJob(ctx context.Context, re // we'll requeue once we see the running job, and update our status return scheduledResult, nil } + +type DelayEnqueueRequest struct { +} + +func (d DelayEnqueueRequest) Create(event event.CreateEvent, limitingInterface workqueue.RateLimitingInterface) { + if v, ok := event.Object.(*appsv1alpha1.BroadcastJob); ok { + if v.Spec.CompletionPolicy.Type == appsv1alpha1.Always && v.Spec.CompletionPolicy.ActiveDeadlineSeconds != nil { + limitingInterface.AddAfter(reconcile.Request{NamespacedName: types.NamespacedName{ + Name: event.Object.GetName(), + Namespace: event.Object.GetNamespace(), + }}, time.Duration(*v.Spec.CompletionPolicy.ActiveDeadlineSeconds)*time.Second) + } + } +} + +func (d DelayEnqueueRequest) Update(event event.UpdateEvent, limitingInterface workqueue.RateLimitingInterface) { + return +} + +func (d DelayEnqueueRequest) Delete(event event.DeleteEvent, limitingInterface workqueue.RateLimitingInterface) { + return +} + +func (d DelayEnqueueRequest) Generic(event event.GenericEvent, limitingInterface workqueue.RateLimitingInterface) { + return +}