Skip to content

Commit

Permalink
WIP: Set Provider SynchronizationState
Browse files Browse the repository at this point in the history
  • Loading branch information
alexgeorgousis committed Jan 17, 2025
1 parent 59c07d4 commit f58ede9
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 13 deletions.
2 changes: 1 addition & 1 deletion apis/pipelines/v1alpha6/provider_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
// +kubebuilder:object:root=true
// +kubebuilder:resource:shortName="mlprv"
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="SynchronizationState",type="string",JSONPath=".status.conditions[?(@.type == 'Synchronized')].reason"
// +kubebuilder:printcolumn:name="SynchronizationState",type="string",JSONPath=".status.synchronizationState"
// +kubebuilder:storageversion
// +kubebuilder:pruning:PreserveUnknownFields
type Provider struct {
Expand Down
3 changes: 2 additions & 1 deletion apis/pipelines/v1alpha6/test_generators.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ func RandomProviderSpec() ProviderSpec {
}

return ProviderSpec{
CliImage: "kfp-operator-stub-provider",
ServiceImage: fmt.Sprintf("%s:%s", RandomLowercaseString(), RandomShortHash()),
CliImage: fmt.Sprintf("%s:%s", RandomLowercaseString(), RandomShortHash()),
ExecutionMode: "none",
ServiceAccount: "default",
DefaultBeamArgs: RandomNamedValues(),
Expand Down
2 changes: 1 addition & 1 deletion config/crd/bases/pipelines.kubeflow.org_providers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ spec:
subresources:
status: {}
- additionalPrinterColumns:
- jsonPath: .status.conditions[?(@.type == 'Synchronized')].reason
- jsonPath: .status.synchronizationState
name: SynchronizationState
type: string
name: v1alpha6
Expand Down
53 changes: 51 additions & 2 deletions controllers/pipelines/provider_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"k8s.io/apimachinery/pkg/runtime"

"github.com/sky-uk/kfp-operator/apis"
config "github.com/sky-uk/kfp-operator/apis/config/v1alpha6"
"github.com/sky-uk/kfp-operator/apis/pipelines"
pipelinesv1 "github.com/sky-uk/kfp-operator/apis/pipelines/v1alpha6"
Expand Down Expand Up @@ -67,38 +68,75 @@ func (r *ProviderReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
desiredDeployment, err := r.constructDeployment(provider, req.Namespace, *r.Config.DeepCopy())
if err != nil {
logger.Error(err, "unable to construct provider service deployment")
provider.Status.SynchronizationState = apis.Failed
if err = r.EC.Client.Status().Update(ctx, provider); err != nil {
logger.Error(err, "unable to update provider resource status", "provider", provider)
}
return ctrl.Result{}, err
}

logger.Info("desired provider deployment", "deployment", desiredDeployment)

existingDeployment, err := r.getDeployment(ctx, req.Namespace, provider.Name, *provider)
if err != nil && !apierrors.IsNotFound(err) {
logger.Error(err, "unable to get existing deployment")
provider.Status.SynchronizationState = apis.Failed
if err = r.EC.Client.Status().Update(ctx, provider); err != nil {
logger.Error(err, "unable to update provider resource status", "provider", provider)
}
return ctrl.Result{}, err
}

logger.Info("desired provider deployment", "deployment", desiredDeployment)

if existingDeployment != nil {
if deploymentFailed(existingDeployment) {
provider.Status.SynchronizationState = apis.Failed
if err = r.EC.Client.Status().Update(ctx, provider); err != nil {
logger.Error(err, "unable to update provider resource status", "provider", provider)
}
}

logger.Info("found existing provider deployment", "deployment", existingDeployment)
if existingDeployment.Annotations != nil && existingDeployment.Annotations[ResourceHashAnnotation] != desiredDeployment.Annotations[ResourceHashAnnotation] {

logger.Info("resource hash mismatch, updating deployment")
existingDeployment.Spec = desiredDeployment.Spec
existingDeployment.SetLabels(desiredDeployment.Labels)
existingDeployment.Annotations[ResourceHashAnnotation] = desiredDeployment.Annotations[ResourceHashAnnotation]

if err = r.EC.Client.Update(ctx, existingDeployment); err != nil {
logger.Error(err, "unable to update provider service deployment", "deployment", desiredDeployment)
provider.Status.SynchronizationState = apis.Failed
if err = r.EC.Client.Status().Update(ctx, provider); err != nil {
logger.Error(err, "unable to update provider resource status", "provider", provider)
}
return ctrl.Result{}, err
}
}
} else {
provider.Status.SynchronizationState = apis.Creating
if err = r.EC.Client.Status().Update(ctx, provider); err != nil {
logger.Error(err, "unable to update provider resource status", "provider", provider)
}

if err = r.EC.Client.Create(ctx, desiredDeployment); err != nil {
logger.Error(err, "unable to create provider service deployment")
provider.Status.SynchronizationState = apis.Failed
if err = r.EC.Client.Status().Update(ctx, provider); err != nil {
logger.Error(err, "unable to update provider resource status", "provider", provider)
}
return ctrl.Result{}, err
}

logger.Info("created provider deployment", "deployment", desiredDeployment)
}

if !deploymentFailed(existingDeployment) {
provider.Status.SynchronizationState = apis.Succeeded
if err = r.EC.Client.Status().Update(ctx, provider); err != nil {
logger.Error(err, "unable to update provider resource status", "provider", provider)
}
}

duration := time.Since(startTime)
logger.Info("reconciliation ended", logkeys.Duration, duration)

Expand Down Expand Up @@ -203,3 +241,14 @@ func setResourceHashAnnotation(deployment *appsv1.Deployment) error {

return nil
}

func deploymentFailed(deployment *appsv1.Deployment) bool {
for _, condition := range deployment.Status.Conditions {
if (condition.Type == appsv1.DeploymentProgressing && condition.Status == v1.ConditionFalse) ||
(condition.Type == appsv1.DeploymentAvailable && condition.Status == v1.ConditionFalse) ||
(condition.Type == appsv1.DeploymentReplicaFailure && condition.Status == v1.ConditionTrue) {
return true
}
}
return false
}
17 changes: 12 additions & 5 deletions controllers/pipelines/suite_decoupled_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,10 @@ var _ = BeforeSuite(func() {
Expect(NewRunConfigurationReconciler(ec, k8sManager.GetScheme(), TestConfig).SetupWithManager(k8sManager)).To(Succeed())
Expect(NewRunScheduleReconciler(ec, &workflowRepository, TestConfig).SetupWithManager(k8sManager)).To(Succeed())
Expect(NewExperimentReconciler(ec, &workflowRepository, TestConfig).SetupWithManager(k8sManager)).To(Succeed())
Expect(NewProviderReconciler(ec, TestConfig).SetupWithManager(k8sManager)).To(Succeed())
Expect((&pipelinesv1.RunConfiguration{}).SetupWebhookWithManager(k8sManager)).To(Succeed())
Expect((&pipelinesv1.Run{}).SetupWebhookWithManager(k8sManager)).To(Succeed())

Provider = pipelinesv1.RandomProvider()
Provider.Name = apis.RandomLowercaseString()
Provider.Namespace = TestConfig.WorkflowNamespace
Expect(K8sClient.Create(Ctx, Provider)).To(Succeed())

go func() {
Expect(k8sManager.Start(ctrl.SetupSignalHandler())).To(Succeed())
}()
Expand Down Expand Up @@ -137,6 +133,17 @@ var _ = BeforeEach(func() {
for _, r := range allPipelines.Items {
Expect(client.IgnoreNotFound(K8sClient.Delete(Ctx, &r))).To(Succeed())
}

allProviders := &pipelinesv1.ProviderList{}
Expect(K8sClient.List(Ctx, allProviders)).To(Succeed())
for _, r := range allProviders.Items {
Expect(client.IgnoreNotFound(K8sClient.Delete(Ctx, &r))).To(Succeed())
}

Provider = pipelinesv1.RandomProvider()
Provider.Name = apis.RandomLowercaseString()
Provider.Namespace = TestConfig.WorkflowNamespace
Expect(K8sClient.Create(Ctx, Provider)).To(Succeed())
})

var _ = AfterSuite(func() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ spec:
status: {}
{{- end }}
- additionalPrinterColumns:
- jsonPath: .status.conditions[?(@.type == 'Synchronized')].reason
- jsonPath: .status.synchronizationState
name: SynchronizationState
type: string
name: v1alpha6
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit f58ede9

Please sign in to comment.