Improve handling of watch expired errors for attached pods
We are regularly seeing pods succeed while Theatre concludes that
something went wrong.

This updates the code to handle the watch expired error specifically,
and to retry 2 more times with growing backoff.
mbarrin-incident committed Dec 12, 2024
1 parent 2956e6f commit e282019
Showing 1 changed file with 86 additions and 55 deletions.
141 changes: 86 additions & 55 deletions pkg/workloads/console/runner/runner.go
@@ -251,72 +251,103 @@ func (c *Runner) waitForSuccess(ctx context.Context, csl *workloadsv1alpha1.Cons
     }
 
     listOptions := metav1.SingleObject(pod.ObjectMeta)
-    w, err := c.clientset.CoreV1().Pods(pod.Namespace).Watch(ctx, listOptions)
-    if err != nil {
-        return fmt.Errorf("error watching pod: %w", err)
-    }
 
-    // We need to fetch the pod again now we have a watcher to avoid a race
-    // where the pod completed before we were listening for watch events
-    pod, _, err = c.GetAttachablePod(ctx, csl)
-    if err != nil {
-        // If we can't find the pod, then we should assume it finished successfully. Otherwise
-        // we might race against the operator to access a pod it wants to delete, and cause
-        // our runner to exit with error when all is fine.
-        //
-        // TODO: It may be better to recheck the console and look in its status?
-        if apierrors.IsNotFound(err) {
-            return nil
-        }
-
-        return fmt.Errorf("error retrieving pod: %w", err)
-    }
-
-    if succeeded(pod) {
-        return nil
-    }
+    // Set list options to the original resource version from the initial attach pod request
+    rv := pod.ObjectMeta.ResourceVersion
 
-    if !isRunning(pod) {
-        return fmt.Errorf("pod in unsuccessful state %s: %s", pod.Status.Phase, pod.Status.Message)
-    }
+    // Attempt to watch the pod for a successful completion up to 3 times
+    // This loop will only iterate if the watch returns the expired error
+    maxAttempts := 3
+    for i := 0; i < maxAttempts; i++ {
+        // Set the list options resource version to the last known resource version
+        // On the first loop this will be from the GetAttachablePod call above
+        // And if this has looped, then it will be the last value from the watch
+        listOptions.ResourceVersion = rv
 
-    status := w.ResultChan()
-    defer w.Stop()
+        w, err := c.clientset.CoreV1().Pods(pod.Namespace).Watch(ctx, listOptions)
+        if err != nil {
+            return fmt.Errorf("error watching pod: %w", err)
+        }
 
-    for {
-        select {
-        case event, ok := <-status:
-            // If our channel is closed, exit with error, as we'll otherwise assume
-            // we were successful when we never reached this state.
-            if !ok {
-                return errors.New("watch channel closed")
-            }
+        // We need to fetch the pod again now we have a watcher to avoid a race
+        // where the pod completed before we were listening for watch events
+        pod, _, err = c.GetAttachablePod(ctx, csl)
+        if err != nil {
+            // If we can't find the pod, then we should assume it finished successfully. Otherwise
+            // we might race against the operator to access a pod it wants to delete, and cause
+            // our runner to exit with error when all is fine.
+            //
+            // TODO: It may be better to recheck the console and look in its status?
+            if apierrors.IsNotFound(err) {
+                return nil
+            }
 
-            // We can receive *metav1.Status events in the situation where there's an error, in
-            // which case we should exit early.
-            if status, ok := event.Object.(*metav1.Status); ok {
-                return fmt.Errorf("received failure from Kubernetes: %s", status.Reason)
-            }
+            return fmt.Errorf("error retrieving pod: %w", err)
+        }
 
-            // We should be safe now, as a watcher should return either Status or the type we
-            // asked it for. But we've been wrong before, and it wasn't easy to figure out what
-            // happened when we didn't print the type of the event.
-            pod, ok := event.Object.(*corev1.Pod)
-            if !ok {
-                return fmt.Errorf("received an event that didn't reference a pod, which is unexpected: %v",
-                    reflect.TypeOf(event.Object))
-            }
+        if succeeded(pod) {
+            return nil
+        }
 
-            if succeeded(pod) {
-                return nil
-            }
-            if !isRunning(pod) {
-                return fmt.Errorf("pod in unsuccessful state %s: %s", pod.Status.Phase, pod.Status.Message)
-            }
-        case <-ctx.Done():
-            return fmt.Errorf("pod's last phase was: %v: %w", pod.Status.Phase, ctx.Err())
+        if !isRunning(pod) {
+            return fmt.Errorf("pod in unsuccessful state %s: %s", pod.Status.Phase, pod.Status.Message)
         }
+
+        status := w.ResultChan()
+        defer w.Stop()
+
+        // If the watch expires we break to this label, which exits the select and
+        // the inner loop and triggers another attempt to watch the pod
+    WATCHEXPIRED:
+        for {
+            select {
+            case event, ok := <-status:
+                // If our channel is closed, exit with error, as we'll otherwise assume
+                // we were successful when we never reached this state.
+                if !ok {
+                    return errors.New("watch channel closed")
+                }
+
+                // We can receive *metav1.Status events in the situation where there's an error, in
+                // which case we should exit early.
+                if status, ok := event.Object.(*metav1.Status); ok {
+                    // Handle the watch having expired
+                    if status.Reason == metav1.StatusReasonExpired {
+                        // The status object carries the resource version, so we
+                        // record it here and pick up where we left off on the next attempt.
+                        rv = status.ResourceVersion
+                        break WATCHEXPIRED
+                    }
+                    return fmt.Errorf("received failure from Kubernetes: %s", status.Reason)
+                }
+
+                // We should be safe now, as a watcher should return either Status or the type we
+                // asked it for. But we've been wrong before, and it wasn't easy to figure out what
+                // happened when we didn't print the type of the event.
+                pod, ok := event.Object.(*corev1.Pod)
+                if !ok {
+                    return fmt.Errorf("received an event that didn't reference a pod, which is unexpected: %v",
+                        reflect.TypeOf(event.Object))
+                }
+
+                if succeeded(pod) {
+                    return nil
+                }
+                if !isRunning(pod) {
+                    return fmt.Errorf("pod in unsuccessful state %s: %s", pod.Status.Phase, pod.Status.Message)
+                }
+            case <-ctx.Done():
+                return fmt.Errorf("pod's last phase was: %v: %w", pod.Status.Phase, ctx.Err())
+            }
+        }
+
+        // Sleep for a backoff period before trying again
+        // 0.5s -> 1s -> 1.5s
+        backoff := time.Duration(500*(i+1)) * time.Millisecond
+        time.Sleep(backoff)
     }
+
+    // This error will only be raised after we have used all attempts to get a successful watch for the pod
+    return fmt.Errorf("received watch expired %d times", maxAttempts)
 }

type GetOptions struct {
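For context on the mechanics: a Kubernetes watch is not durable, and when the client's resourceVersion falls too far behind, the API server ends the watch with a *metav1.Status event whose Reason is Expired; as the diff above shows, that Status object carries a resourceVersion to resume from. The pattern the commit applies can be sketched standalone. This is a minimal, illustrative sketch assuming a client-go clientset; watchPodUntilSucceeded and the package name are invented for the example, and it omits the re-fetch of the pod that the real code performs after opening each watch.

package sketch

import (
    "context"
    "fmt"
    "time"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// watchPodUntilSucceeded waits for a pod to reach the Succeeded phase,
// re-establishing the watch from the last known resourceVersion whenever
// the API server reports that the watch expired.
func watchPodUntilSucceeded(ctx context.Context, cs kubernetes.Interface, pod *corev1.Pod) error {
    opts := metav1.SingleObject(pod.ObjectMeta)
    rv := pod.ResourceVersion

    const maxAttempts = 3
    for i := 0; i < maxAttempts; i++ {
        // Resume the watch from the last resourceVersion we saw.
        opts.ResourceVersion = rv

        w, err := cs.CoreV1().Pods(pod.Namespace).Watch(ctx, opts)
        if err != nil {
            return fmt.Errorf("error watching pod: %w", err)
        }

    attempt:
        for {
            select {
            case event, ok := <-w.ResultChan():
                if !ok {
                    w.Stop()
                    return fmt.Errorf("watch channel closed")
                }
                // An expired watch arrives as a *metav1.Status event; its
                // resourceVersion tells us where to pick up again.
                if status, ok := event.Object.(*metav1.Status); ok {
                    if status.Reason == metav1.StatusReasonExpired {
                        rv = status.ResourceVersion
                        break attempt
                    }
                    w.Stop()
                    return fmt.Errorf("received failure from Kubernetes: %s", status.Reason)
                }
                if p, ok := event.Object.(*corev1.Pod); ok && p.Status.Phase == corev1.PodSucceeded {
                    w.Stop()
                    return nil
                }
            case <-ctx.Done():
                w.Stop()
                return ctx.Err()
            }
        }
        w.Stop()

        // Linear backoff between attempts: 0.5s -> 1s -> 1.5s.
        time.Sleep(time.Duration(500*(i+1)) * time.Millisecond)
    }

    return fmt.Errorf("received watch expired %d times", maxAttempts)
}

The committed code additionally re-fetches the attachable pod after each watch is established, so a pod that completed before the watch started is not missed, and treats NotFound on that re-fetch as success to avoid racing the operator's cleanup.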
