diff --git a/apis/pipelines/v1alpha6/provider_types.go b/apis/pipelines/v1alpha6/provider_types.go index 4e0957c5..9451df11 100644 --- a/apis/pipelines/v1alpha6/provider_types.go +++ b/apis/pipelines/v1alpha6/provider_types.go @@ -13,7 +13,7 @@ import ( // +kubebuilder:object:root=true // +kubebuilder:resource:shortName="mlprv" // +kubebuilder:subresource:status -// +kubebuilder:printcolumn:name="SynchronizationState",type="string",JSONPath=".status.conditions[?(@.type == 'Synchronized')].reason" +// +kubebuilder:printcolumn:name="SynchronizationState",type="string",JSONPath=".status.synchronizationState" // +kubebuilder:storageversion // +kubebuilder:pruning:PreserveUnknownFields type Provider struct { diff --git a/apis/pipelines/v1alpha6/test_generators.go b/apis/pipelines/v1alpha6/test_generators.go index 62faebb8..6dfdfcaa 100644 --- a/apis/pipelines/v1alpha6/test_generators.go +++ b/apis/pipelines/v1alpha6/test_generators.go @@ -55,7 +55,8 @@ func RandomProviderSpec() ProviderSpec { } return ProviderSpec{ - CliImage: "kfp-operator-stub-provider", + ServiceImage: fmt.Sprintf("%s:%s", RandomLowercaseString(), RandomShortHash()), + CliImage: fmt.Sprintf("%s:%s", RandomLowercaseString(), RandomShortHash()), ExecutionMode: "none", ServiceAccount: "default", DefaultBeamArgs: RandomNamedValues(), diff --git a/config/crd/bases/pipelines.kubeflow.org_providers.yaml b/config/crd/bases/pipelines.kubeflow.org_providers.yaml index 8f6eb46c..f300d589 100644 --- a/config/crd/bases/pipelines.kubeflow.org_providers.yaml +++ b/config/crd/bases/pipelines.kubeflow.org_providers.yaml @@ -156,7 +156,7 @@ spec: subresources: status: {} - additionalPrinterColumns: - - jsonPath: .status.conditions[?(@.type == 'Synchronized')].reason + - jsonPath: .status.synchronizationState name: SynchronizationState type: string name: v1alpha6 diff --git a/controllers/pipelines/provider_controller.go b/controllers/pipelines/provider_controller.go index d483ddc4..0ade679c 100644 --- a/controllers/pipelines/provider_controller.go +++ b/controllers/pipelines/provider_controller.go @@ -11,6 +11,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" + "github.com/sky-uk/kfp-operator/apis" config "github.com/sky-uk/kfp-operator/apis/config/v1alpha6" "github.com/sky-uk/kfp-operator/apis/pipelines" pipelinesv1 "github.com/sky-uk/kfp-operator/apis/pipelines/v1alpha6" @@ -67,38 +68,75 @@ func (r *ProviderReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c desiredDeployment, err := r.constructDeployment(provider, req.Namespace, *r.Config.DeepCopy()) if err != nil { logger.Error(err, "unable to construct provider service deployment") + provider.Status.SynchronizationState = apis.Failed + if err = r.EC.Client.Status().Update(ctx, provider); err != nil { + logger.Error(err, "unable to update provider resource status", "provider", provider) + } return ctrl.Result{}, err } + logger.Info("desired provider deployment", "deployment", desiredDeployment) + existingDeployment, err := r.getDeployment(ctx, req.Namespace, provider.Name, *provider) if err != nil && !apierrors.IsNotFound(err) { logger.Error(err, "unable to get existing deployment") + provider.Status.SynchronizationState = apis.Failed + if err = r.EC.Client.Status().Update(ctx, provider); err != nil { + logger.Error(err, "unable to update provider resource status", "provider", provider) + } return ctrl.Result{}, err } - logger.Info("desired provider deployment", "deployment", desiredDeployment) - if existingDeployment != nil { + if deploymentFailed(existingDeployment) { + provider.Status.SynchronizationState = apis.Failed + if err = r.EC.Client.Status().Update(ctx, provider); err != nil { + logger.Error(err, "unable to update provider resource status", "provider", provider) + } + } + logger.Info("found existing provider deployment", "deployment", existingDeployment) if existingDeployment.Annotations != nil && existingDeployment.Annotations[ResourceHashAnnotation] != desiredDeployment.Annotations[ResourceHashAnnotation] { + logger.Info("resource hash mismatch, updating deployment") existingDeployment.Spec = desiredDeployment.Spec existingDeployment.SetLabels(desiredDeployment.Labels) existingDeployment.Annotations[ResourceHashAnnotation] = desiredDeployment.Annotations[ResourceHashAnnotation] + if err = r.EC.Client.Update(ctx, existingDeployment); err != nil { logger.Error(err, "unable to update provider service deployment", "deployment", desiredDeployment) + provider.Status.SynchronizationState = apis.Failed + if err = r.EC.Client.Status().Update(ctx, provider); err != nil { + logger.Error(err, "unable to update provider resource status", "provider", provider) + } return ctrl.Result{}, err } } } else { + provider.Status.SynchronizationState = apis.Creating + if err = r.EC.Client.Status().Update(ctx, provider); err != nil { + logger.Error(err, "unable to update provider resource status", "provider", provider) + } + if err = r.EC.Client.Create(ctx, desiredDeployment); err != nil { logger.Error(err, "unable to create provider service deployment") + provider.Status.SynchronizationState = apis.Failed + if err = r.EC.Client.Status().Update(ctx, provider); err != nil { + logger.Error(err, "unable to update provider resource status", "provider", provider) + } return ctrl.Result{}, err } logger.Info("created provider deployment", "deployment", desiredDeployment) } + if !deploymentFailed(existingDeployment) { + provider.Status.SynchronizationState = apis.Succeeded + if err = r.EC.Client.Status().Update(ctx, provider); err != nil { + logger.Error(err, "unable to update provider resource status", "provider", provider) + } + } + duration := time.Since(startTime) logger.Info("reconciliation ended", logkeys.Duration, duration) @@ -203,3 +241,14 @@ func setResourceHashAnnotation(deployment *appsv1.Deployment) error { return nil } + +func deploymentFailed(deployment *appsv1.Deployment) bool { + for _, condition := range deployment.Status.Conditions { + if (condition.Type == appsv1.DeploymentProgressing && condition.Status == v1.ConditionFalse) || + (condition.Type == appsv1.DeploymentAvailable && condition.Status == v1.ConditionFalse) || + (condition.Type == appsv1.DeploymentReplicaFailure && condition.Status == v1.ConditionTrue) { + return true + } + } + return false +} diff --git a/controllers/pipelines/suite_decoupled_test.go b/controllers/pipelines/suite_decoupled_test.go index 58b75b4a..89286af7 100644 --- a/controllers/pipelines/suite_decoupled_test.go +++ b/controllers/pipelines/suite_decoupled_test.go @@ -100,14 +100,10 @@ var _ = BeforeSuite(func() { Expect(NewRunConfigurationReconciler(ec, k8sManager.GetScheme(), TestConfig).SetupWithManager(k8sManager)).To(Succeed()) Expect(NewRunScheduleReconciler(ec, &workflowRepository, TestConfig).SetupWithManager(k8sManager)).To(Succeed()) Expect(NewExperimentReconciler(ec, &workflowRepository, TestConfig).SetupWithManager(k8sManager)).To(Succeed()) + Expect(NewProviderReconciler(ec, TestConfig).SetupWithManager(k8sManager)).To(Succeed()) Expect((&pipelinesv1.RunConfiguration{}).SetupWebhookWithManager(k8sManager)).To(Succeed()) Expect((&pipelinesv1.Run{}).SetupWebhookWithManager(k8sManager)).To(Succeed()) - Provider = pipelinesv1.RandomProvider() - Provider.Name = apis.RandomLowercaseString() - Provider.Namespace = TestConfig.WorkflowNamespace - Expect(K8sClient.Create(Ctx, Provider)).To(Succeed()) - go func() { Expect(k8sManager.Start(ctrl.SetupSignalHandler())).To(Succeed()) }() @@ -137,6 +133,17 @@ var _ = BeforeEach(func() { for _, r := range allPipelines.Items { Expect(client.IgnoreNotFound(K8sClient.Delete(Ctx, &r))).To(Succeed()) } + + allProviders := &pipelinesv1.ProviderList{} + Expect(K8sClient.List(Ctx, allProviders)).To(Succeed()) + for _, r := range allProviders.Items { + Expect(client.IgnoreNotFound(K8sClient.Delete(Ctx, &r))).To(Succeed()) + } + + Provider = pipelinesv1.RandomProvider() + Provider.Name = apis.RandomLowercaseString() + Provider.Namespace = TestConfig.WorkflowNamespace + Expect(K8sClient.Create(Ctx, Provider)).To(Succeed()) }) var _ = AfterSuite(func() { diff --git a/helm/kfp-operator/templates/crd/bases/pipelines.kubeflow.org_providers.yaml b/helm/kfp-operator/templates/crd/bases/pipelines.kubeflow.org_providers.yaml index 64092e0f..88d6400b 100644 --- a/helm/kfp-operator/templates/crd/bases/pipelines.kubeflow.org_providers.yaml +++ b/helm/kfp-operator/templates/crd/bases/pipelines.kubeflow.org_providers.yaml @@ -157,7 +157,7 @@ spec: status: {} {{- end }} - additionalPrinterColumns: - - jsonPath: .status.conditions[?(@.type == 'Synchronized')].reason + - jsonPath: .status.synchronizationState name: SynchronizationState type: string name: v1alpha6 diff --git a/provider-service/kfp/internal/client/ml_metadata/metadata_store.pb.go b/provider-service/kfp/internal/client/ml_metadata/metadata_store.pb.go index 13afac0c..7f883be9 100644 --- a/provider-service/kfp/internal/client/ml_metadata/metadata_store.pb.go +++ b/provider-service/kfp/internal/client/ml_metadata/metadata_store.pb.go @@ -16,7 +16,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.28.1 -// protoc v3.21.12 +// protoc v4.24.4 // source: ml_metadata/proto/metadata_store.proto package ml_metadata diff --git a/provider-service/kfp/internal/client/ml_metadata/metadata_store_service.pb.go b/provider-service/kfp/internal/client/ml_metadata/metadata_store_service.pb.go index 7535cddf..655e70d1 100644 --- a/provider-service/kfp/internal/client/ml_metadata/metadata_store_service.pb.go +++ b/provider-service/kfp/internal/client/ml_metadata/metadata_store_service.pb.go @@ -16,7 +16,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.28.1 -// protoc v3.21.12 +// protoc v4.24.4 // source: ml_metadata/proto/metadata_store_service.proto package ml_metadata