Skip to content

Commit

Permalink
Merge pull request #1253 from wzshiming/fix/taken-node
Browse files Browse the repository at this point in the history
Fixed node being taken repeatedly causing queue block
  • Loading branch information
wzshiming authored Oct 29, 2024
2 parents 45e59c0 + edf20c5 commit 8c62e4b
Showing 1 changed file with 18 additions and 11 deletions.
29 changes: 18 additions & 11 deletions pkg/kwok/controllers/node_lease_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type NodeLeaseController struct {
mutateLeaseFunc func(*coordinationv1.Lease) error

delayQueue queue.WeightDelayingQueue[string]
holdLeaseSet maps.SyncMap[string, struct{}]
holdLeaseSet maps.SyncMap[string, bool]

holderIdentity string
onNodeManagedFunc func(nodeName string)
Expand Down Expand Up @@ -112,14 +112,14 @@ func (c *NodeLeaseController) syncWorker(ctx context.Context) {
if !ok {
return
}
_, ok = c.holdLeaseSet.Load(nodeName)
first, ok := c.holdLeaseSet.Load(nodeName)
if !ok {
continue
}

dur := c.interval()

lease, err := c.sync(ctx, nodeName)
lease, err := c.sync(ctx, nodeName, first)
if err != nil {
logger.Error("Failed to sync lease", err,
"node", nodeName,
Expand All @@ -134,6 +134,10 @@ func (c *NodeLeaseController) syncWorker(ctx context.Context) {
continue
}

if first {
c.holdLeaseSet.Store(nodeName, false)
}

now := c.clock.Now()
expireDuration := expireTime.Sub(now)
hold := tryAcquireOrRenew(lease, c.holderIdentity, now)
Expand All @@ -148,7 +152,7 @@ func (c *NodeLeaseController) interval() time.Duration {

// TryHold tries to hold a lease for the NodeLeaseController
func (c *NodeLeaseController) TryHold(name string) {
_, loaded := c.holdLeaseSet.LoadOrStore(name, struct{}{})
_, loaded := c.holdLeaseSet.LoadOrStore(name, true)
if !loaded {
c.delayQueue.Add(name)
}
Expand All @@ -171,7 +175,7 @@ func (c *NodeLeaseController) Held(name string) bool {
}

// sync syncs a lease for a node
func (c *NodeLeaseController) sync(ctx context.Context, nodeName string) (lease *coordinationv1.Lease, err error) {
func (c *NodeLeaseController) sync(ctx context.Context, nodeName string, first bool) (lease *coordinationv1.Lease, err error) {
logger := log.FromContext(ctx)
logger = logger.With("node", nodeName)

Expand All @@ -182,12 +186,15 @@ func (c *NodeLeaseController) sync(ctx context.Context, nodeName string) (lease
return nil, nil
}
logger.Info("Syncing lease")
lease, err := c.renewLease(ctx, lease)
lease, transitions, err := c.renewLease(ctx, lease)
if err != nil {
return nil, fmt.Errorf("failed to update lease using lease: %w", err)
}

c.onNodeManaged(nodeName)
// it is first or it has been transitioned, and then manage the node.
if first || transitions {
c.onNodeManaged(nodeName)
}
return lease, nil
}

Expand Down Expand Up @@ -249,7 +256,7 @@ func (c *NodeLeaseController) ensureLease(ctx context.Context, leaseName string)
}

// renewLease attempts to update the lease for maxUpdateRetries, call this once you're sure the lease has been created
func (c *NodeLeaseController) renewLease(ctx context.Context, base *coordinationv1.Lease) (*coordinationv1.Lease, error) {
func (c *NodeLeaseController) renewLease(ctx context.Context, base *coordinationv1.Lease) (*coordinationv1.Lease, bool, error) {
lease := base.DeepCopy()

transitions := format.ElemOrDefault(lease.Spec.HolderIdentity) != c.holderIdentity
Expand All @@ -263,15 +270,15 @@ func (c *NodeLeaseController) renewLease(ctx context.Context, base *coordination
if c.mutateLeaseFunc != nil {
err := c.mutateLeaseFunc(lease)
if err != nil {
return nil, err
return nil, false, err
}
}

lease, err := c.typedClient.CoordinationV1().Leases(lease.Namespace).Update(ctx, lease, metav1.UpdateOptions{})
if err != nil {
return nil, err
return nil, false, err
}
return lease, nil
return lease, transitions, nil
}

// setNodeOwnerFunc helps construct a mutateLeaseFunc which sets a node OwnerReference to the given lease object
Expand Down

0 comments on commit 8c62e4b

Please sign in to comment.