From 245d50c544658e1b24103ddb025d196e8618ed12 Mon Sep 17 00:00:00 2001 From: Filinto Duran Date: Sun, 8 Dec 2024 11:46:15 -0600 Subject: [PATCH] remove other commits, and add test files missing Signed-off-by: Filinto Duran --- go.mod | 2 +- pkg/kubernetes/client.go | 49 ++++++++++++ pkg/kubernetes/resources.go | 75 +++++++++++++++++++ pkg/kubernetes/resources_test.go | 50 +++++++++++++ pkg/kubernetes/run.go | 25 +++++-- .../testdata/resources/observability.yaml | 10 +++ .../testdata/resources/resiliency.yaml | 26 +++++++ .../testdata/resources/state_redis.yaml | 15 ++++ 8 files changed, 245 insertions(+), 7 deletions(-) create mode 100644 pkg/kubernetes/resources.go create mode 100644 pkg/kubernetes/resources_test.go create mode 100644 pkg/kubernetes/testdata/resources/observability.yaml create mode 100644 pkg/kubernetes/testdata/resources/resiliency.yaml create mode 100644 pkg/kubernetes/testdata/resources/state_redis.yaml diff --git a/go.mod b/go.mod index 146089460..f2297bd26 100644 --- a/go.mod +++ b/go.mod @@ -253,7 +253,7 @@ require ( k8s.io/kubectl v0.26.0 // indirect k8s.io/utils v0.0.0-20240102154912-e7106e64919e // indirect oras.land/oras-go v1.2.2 // indirect - sigs.k8s.io/controller-runtime v0.16.3 // indirect + sigs.k8s.io/controller-runtime v0.16.3 sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/kustomize/api v0.15.0 // indirect sigs.k8s.io/kustomize/kyaml v0.15.0 // indirect diff --git a/pkg/kubernetes/client.go b/pkg/kubernetes/client.go index 115bc07be..a153033cb 100644 --- a/pkg/kubernetes/client.go +++ b/pkg/kubernetes/client.go @@ -14,12 +14,15 @@ limitations under the License. package kubernetes import ( + "errors" "flag" "sync" + "k8s.io/apimachinery/pkg/runtime" k8s "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "sigs.k8s.io/controller-runtime/pkg/client" scheme "github.com/dapr/dapr/pkg/client/clientset/versioned" @@ -31,6 +34,14 @@ import ( // oidc auth _ "k8s.io/client-go/plugin/pkg/client/auth/oidc" + + componentsapi "github.com/dapr/dapr/pkg/apis/components/v1alpha1" + configurationapi "github.com/dapr/dapr/pkg/apis/configuration/v1alpha1" + httpendpointsapi "github.com/dapr/dapr/pkg/apis/httpEndpoint/v1alpha1" + resiliencyapi "github.com/dapr/dapr/pkg/apis/resiliency/v1alpha1" + subscriptionsapiV1alpha1 "github.com/dapr/dapr/pkg/apis/subscriptions/v1alpha1" + subapi "github.com/dapr/dapr/pkg/apis/subscriptions/v2alpha1" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" ) var ( @@ -96,3 +107,41 @@ func DaprClient() (scheme.Interface, error) { } return scheme.NewForConfig(config) } + +// buildScheme builds the scheme for the controller-runtime client +// from https://github.com/dapr/dapr/blob/eb49e564fbd704ceb1379498fc8e94ad7110840f/pkg/operator/operator.go#L444-L466 +func buildScheme() (*runtime.Scheme, error) { + builders := []func(*runtime.Scheme) error{ + clientgoscheme.AddToScheme, + componentsapi.AddToScheme, + configurationapi.AddToScheme, + resiliencyapi.AddToScheme, + httpendpointsapi.AddToScheme, + subscriptionsapiV1alpha1.AddToScheme, + subapi.AddToScheme, + } + + errs := make([]error, len(builders)) + scheme := runtime.NewScheme() + for i, builder := range builders { + errs[i] = builder(scheme) + } + + return scheme, errors.Join(errs...) +} + +// CtrlClient returns a new Controller-Runtime Client (https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/client) - no caching +// with the scheme built with the Dapr API groups. +func CtrlClient() (client.Client, error) { + config, err := getConfig() + if err != nil { + return nil, err + } + + scheme, err := buildScheme() + if err != nil { + return nil, err + } + + return client.New(config, client.Options{Scheme: scheme}) +} diff --git a/pkg/kubernetes/resources.go b/pkg/kubernetes/resources.go new file mode 100644 index 000000000..db6640a1d --- /dev/null +++ b/pkg/kubernetes/resources.go @@ -0,0 +1,75 @@ +package kubernetes + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" + + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/serializer/yaml" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/dapr/cli/pkg/print" +) + +func getResources(resourcesFolder string) ([]client.Object, error) { + // Create YAML decoder + decUnstructured := yaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme) + + // Read files from the resources folder + files, err := os.ReadDir(resourcesFolder) + if err != nil { + return nil, fmt.Errorf("error reading resources folder: %w", err) + } + + var resources []client.Object + for _, file := range files { + if file.IsDir() || (!strings.HasSuffix(file.Name(), ".yaml") && !strings.HasSuffix(file.Name(), ".json")) { + continue + } + + // Read file content + content, err := os.ReadFile(filepath.Join(resourcesFolder, file.Name())) + if err != nil { + return nil, fmt.Errorf("error reading file %s: %w", file.Name(), err) + } + + // Decode YAML/JSON to unstructured + obj := &unstructured.Unstructured{} + _, _, err = decUnstructured.Decode(content, nil, obj) + if err != nil { + return nil, fmt.Errorf("error decoding file %s: %w", file.Name(), err) + } + + resources = append(resources, obj) + } + + return resources, nil +} + +func createOrUpdateResources(ctx context.Context, cl client.Client, resources []client.Object, namespace string) error { + // create resources in k8s + for _, resource := range resources { + // clone the resource to avoid modifying the original + obj := resource.DeepCopyObject().(*unstructured.Unstructured) + // Set namespace on the resource metadata + obj.SetNamespace(namespace) + + print.InfoStatusEvent(os.Stdout, "Deploying resource %q kind %q to Kubernetes", obj.GetName(), obj.GetKind()) + + if err := cl.Create(ctx, obj); err != nil { + if k8serrors.IsAlreadyExists(err) { + print.InfoStatusEvent(os.Stdout, "Resource %q kind %q already exists, updating", obj.GetName(), obj.GetKind()) + if err := cl.Update(ctx, obj); err != nil { + return err + } + } else { + return fmt.Errorf("error deploying resource %q kind %q to Kubernetes: %w", obj.GetName(), obj.GetKind(), err) + } + } + } + return nil +} diff --git a/pkg/kubernetes/resources_test.go b/pkg/kubernetes/resources_test.go new file mode 100644 index 000000000..8ea9c1946 --- /dev/null +++ b/pkg/kubernetes/resources_test.go @@ -0,0 +1,50 @@ +package kubernetes + +import ( + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestGetResources(t *testing.T) { + tests := []struct { + name string + folder string + expectError bool + expectedCount int + expectedResourceKinds []string + }{ + { + name: "resources from testdata", + folder: filepath.Join("testdata", "resources"), + expectError: false, + expectedCount: 3, + expectedResourceKinds: []string{"Component", "Configuration", "Resiliency"}, + }, + { + name: "non-existent folder", + folder: filepath.Join("testdata", "non-existent"), + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + resources, err := getResources(tt.folder) + if tt.expectError { + assert.Error(t, err) + return + } + + require.NoError(t, err) + assert.Len(t, resources, tt.expectedCount) + foundKinds := []string{} + for _, resource := range resources { + foundKinds = append(foundKinds, resource.GetObjectKind().GroupVersionKind().Kind) + } + assert.ElementsMatch(t, tt.expectedResourceKinds, foundKinds) + }) + } +} diff --git a/pkg/kubernetes/run.go b/pkg/kubernetes/run.go index 6a7199d64..0eb7f33cb 100644 --- a/pkg/kubernetes/run.go +++ b/pkg/kubernetes/run.go @@ -119,6 +119,23 @@ func Run(runFilePath string, config runfileconfig.RunFileConfig) (bool, error) { runStates := []runState{} print.InfoStatusEvent(os.Stdout, "This is a preview feature and subject to change in future releases.") + ctrlClient, cErr := CtrlClient() + if cErr != nil { + // exit with error. + return true, fmt.Errorf("error getting controller-runtime k8s client: %w", cErr) + } + + resources, err := getResources(config.Common.ResourcesPath) + if err != nil { + print.FailureStatusEvent(os.Stderr, "Error getting resources from %q: %s", config.Common.ResourcesPath, err.Error()) + exitWithError = true + } + + if err := createOrUpdateResources(context.Background(), ctrlClient, resources, namespace); err != nil { + print.FailureStatusEvent(os.Stderr, "Error creating or updating resources: %s", err.Error()) + exitWithError = true + } + for _, app := range config.Apps { print.StatusEvent(os.Stdout, print.LogInfo, "Validating config and starting app %q", app.RunConfig.AppID) // Set defaults if zero value provided in config yaml. @@ -140,11 +157,7 @@ func Run(runFilePath string, config runfileconfig.RunFileConfig) (bool, error) { // create default deployment config. dep := createDeploymentConfig(daprClient, app) - if err != nil { - print.FailureStatusEvent(os.Stderr, "Error creating deployment file for app %q present in %s: %s", app.RunConfig.AppID, runFilePath, err.Error()) - exitWithError = true - break - } + // overwrite /.dapr/deploy/service.yaml. // overwrite /.dapr/deploy/deployment.yaml. @@ -297,7 +310,7 @@ func createDeploymentConfig(client versioned.Interface, app runfileconfig.App) d Name: app.AppID, Image: app.ContainerImage, Env: getEnv(app), - ImagePullPolicy: corev1.PullAlways, + ImagePullPolicy: corev1.PullPolicy(app.ContainerImagePullPolicy), }, }, }, diff --git a/pkg/kubernetes/testdata/resources/observability.yaml b/pkg/kubernetes/testdata/resources/observability.yaml new file mode 100644 index 000000000..74a388977 --- /dev/null +++ b/pkg/kubernetes/testdata/resources/observability.yaml @@ -0,0 +1,10 @@ +apiVersion: dapr.io/v1alpha1 +kind: Configuration +metadata: + name: daprConfig + namespace: default +spec: + tracing: + samplingRate: "1" + zipkin: + endpointAddress: "http://localhost:9411/api/v2/spans" \ No newline at end of file diff --git a/pkg/kubernetes/testdata/resources/resiliency.yaml b/pkg/kubernetes/testdata/resources/resiliency.yaml new file mode 100644 index 000000000..30c6e0706 --- /dev/null +++ b/pkg/kubernetes/testdata/resources/resiliency.yaml @@ -0,0 +1,26 @@ +apiVersion: dapr.io/v1alpha1 +kind: Resiliency +metadata: + name: myresiliency +scopes: + - checkout + +spec: + policies: + retries: + retryForever: + policy: constant + duration: 5s + maxRetries: -1 + + circuitBreakers: + simpleCB: + maxRequests: 1 + timeout: 5s + trip: consecutiveFailures >= 5 + + targets: + apps: + order-processor: + retry: retryForever + circuitBreaker: simpleCB \ No newline at end of file diff --git a/pkg/kubernetes/testdata/resources/state_redis.yaml b/pkg/kubernetes/testdata/resources/state_redis.yaml new file mode 100644 index 000000000..7c45ff885 --- /dev/null +++ b/pkg/kubernetes/testdata/resources/state_redis.yaml @@ -0,0 +1,15 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.redis + version: v1 + initTimeout: 1m + metadata: + - name: redisHost + value: localhost:6379 + - name: redisPassword + value: "" + - name: actorStateStore + value: "true" \ No newline at end of file