diff --git a/CHANGELOG.md b/CHANGELOG.md index 28983b839..2f5bfc2b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ nav_order: 1 Enable remote-backed storage. - Add `aiven_service_integration_endpoint` resource and datasource field `external_azure_blob_storage_user_config`: ExternalAzureBlobStorage user configurable settings +- Add `aiven_flink_jar_application`, `aiven_flink_jar_application_version` and `aiven_flink_jar_application_deployment` BETA resources ## [4.32.0] - 2025-01-14 diff --git a/docs/resources/flink_jar_application.md b/docs/resources/flink_jar_application.md new file mode 100644 index 000000000..14f473566 --- /dev/null +++ b/docs/resources/flink_jar_application.md @@ -0,0 +1,113 @@ +--- +# generated by https://github.com/hashicorp/terraform-plugin-docs +page_title: "aiven_flink_jar_application Resource - terraform-provider-aiven" +subcategory: "" +description: |- + Creates and manages an Aiven for Apache Flink® jar application https://aiven.io/docs/products/flink/howto/create-jar-application. + This resource is in the beta stage and may change without notice. Set + the PROVIDER_AIVEN_ENABLE_BETA environment variable to use the resource. +--- + +# aiven_flink_jar_application (Resource) + +Creates and manages an [Aiven for Apache Flink® jar application](https://aiven.io/docs/products/flink/howto/create-jar-application). + +**This resource is in the beta stage and may change without notice.** Set +the `PROVIDER_AIVEN_ENABLE_BETA` environment variable to use the resource. + +## Example Usage + +```terraform +resource "aiven_flink_jar_application" "example" { + project = data.aiven_project.example.project + service_name = "example-flink-service" + name = "example-app-jar" +} +``` + + +## Schema + +### Required + +- `name` (String) Application name. Maximum length: `128`. +- `project` (String) Project name. Changing this property forces recreation of the resource. +- `service_name` (String) Service name. Changing this property forces recreation of the resource. + +### Optional + +- `timeouts` (Block, Optional) (see [below for nested schema](#nestedblock--timeouts)) + +### Read-Only + +- `application_id` (String) Application ID. +- `application_versions` (List of Object) JarApplicationVersions. (see [below for nested schema](#nestedatt--application_versions)) +- `created_at` (String) The creation timestamp of this entity in ISO 8601 format, always in UTC. +- `created_by` (String) The creator of this entity. +- `current_deployment` (List of Object) Flink JarApplicationDeployment. (see [below for nested schema](#nestedatt--current_deployment)) +- `id` (String) The ID of this resource. +- `updated_at` (String) The update timestamp of this entity in ISO 8601 format, always in UTC. +- `updated_by` (String) The latest updater of this entity. + + +### Nested Schema for `timeouts` + +Optional: + +- `create` (String) +- `default` (String) +- `delete` (String) +- `read` (String) +- `update` (String) + + + +### Nested Schema for `application_versions` + +Read-Only: + +- `created_at` (String) +- `created_by` (String) +- `file_info` (List of Object) (see [below for nested schema](#nestedobjatt--application_versions--file_info)) +- `id` (String) +- `version` (Number) + + +### Nested Schema for `application_versions.file_info` + +Read-Only: + +- `file_sha256` (String) +- `file_size` (Number) +- `file_status` (String) +- `url` (String) +- `verify_error_code` (Number) +- `verify_error_message` (String) + + + + +### Nested Schema for `current_deployment` + +Read-Only: + +- `created_at` (String) +- `created_by` (String) +- `entry_class` (String) +- `error_msg` (String) +- `id` (String) +- `job_id` (String) +- `last_savepoint` (String) +- `parallelism` (Number) +- `program_args` (Set of String) +- `starting_savepoint` (String) +- `status` (String) +- `version_id` (String) + +## Import + +Import is supported using the following syntax: + +```shell +terraform import aiven_flink_jar_application.example PROJECT/SERVICE_NAME/APPLICATION_ID +``` diff --git a/docs/resources/flink_jar_application_deployment.md b/docs/resources/flink_jar_application_deployment.md new file mode 100644 index 000000000..0e3e0b001 --- /dev/null +++ b/docs/resources/flink_jar_application_deployment.md @@ -0,0 +1,89 @@ +--- +# generated by https://github.com/hashicorp/terraform-plugin-docs +page_title: "aiven_flink_jar_application_deployment Resource - terraform-provider-aiven" +subcategory: "" +description: |- + Creates and manages the deployment of an Aiven for Apache Flink® application. + This resource is in the beta stage and may change without notice. Set + the PROVIDER_AIVEN_ENABLE_BETA environment variable to use the resource. +--- + +# aiven_flink_jar_application_deployment (Resource) + +Creates and manages the deployment of an Aiven for Apache Flink® application. + +**This resource is in the beta stage and may change without notice.** Set +the `PROVIDER_AIVEN_ENABLE_BETA` environment variable to use the resource. + +## Example Usage + +```terraform +resource "aiven_flink_jar_application" "example" { + project = data.aiven_project.example.project + service_name = "example-flink-service" + name = "example-app-jar" +} + +resource "aiven_flink_jar_application_version" "example" { + project = data.aiven_project.example.project + service_name = aiven_flink.example.service_name + application_id = aiven_flink_application.example.application_id + source = "./example.jar" +} + +resource "aiven_flink_jar_application_deployment" "example" { + project = data.aiven_project.example.project + service_name = aiven_flink.example.service_name + application_id = aiven_flink_jar_application.example.application_id + version_id = aiven_flink_jar_application_version.example.application_version_id +} +``` + + +## Schema + +### Required + +- `application_id` (String) Application Id. Changing this property forces recreation of the resource. +- `project` (String) Project name. Changing this property forces recreation of the resource. +- `service_name` (String) Service name. Changing this property forces recreation of the resource. +- `version_id` (String) ApplicationVersion ID. Maximum length: `36`. Changing this property forces recreation of the resource. + +### Optional + +- `entry_class` (String) The fully qualified name of the entry class to pass during Flink job submission through the entryClass parameter. Maximum length: `128`. +- `parallelism` (Number) Reading of Flink parallel execution documentation is recommended before setting this value to other than 1. Please do not set this value higher than (total number of nodes x number_of_task_slots), or every new job created will fail. +- `program_args` (Set of String) Arguments to pass during Flink job submission through the programArgsList parameter. +- `restart_enabled` (Boolean) Specifies whether a Flink Job is restarted in case it fails. Changing this property forces recreation of the resource. +- `starting_savepoint` (String) Job savepoint. Maximum length: `2048`. +- `timeouts` (Block, Optional) (see [below for nested schema](#nestedblock--timeouts)) + +### Read-Only + +- `created_at` (String) The creation timestamp of this entity in ISO 8601 format, always in UTC. +- `created_by` (String) The creator of this entity. +- `deployment_id` (String) Deployment ID. +- `error_msg` (String) Error message describing what caused deployment to fail. +- `id` (String) The ID of this resource. +- `job_id` (String) Job ID. +- `last_savepoint` (String) Job savepoint. +- `status` (String) Deployment status. The possible values are `CANCELED`, `CANCELLING`, `CANCELLING_REQUESTED`, `CREATED`, `DELETE_REQUESTED`, `DELETING`, `FAILED`, `FAILING`, `FINISHED`, `INITIALIZING`, `RECONCILING`, `RESTARTING`, `RUNNING`, `SAVING`, `SAVING_AND_STOP`, `SAVING_AND_STOP_REQUESTED` and `SUSPENDED`. + + +### Nested Schema for `timeouts` + +Optional: + +- `create` (String) +- `default` (String) +- `delete` (String) +- `read` (String) +- `update` (String) + +## Import + +Import is supported using the following syntax: + +```shell +terraform import aiven_flink_jar_application_deployment.example PROJECT/SERVICE_NAME/APPLICATION_ID/DEPLOYMENT_ID +``` diff --git a/docs/resources/flink_jar_application_version.md b/docs/resources/flink_jar_application_version.md new file mode 100644 index 000000000..ae2843a73 --- /dev/null +++ b/docs/resources/flink_jar_application_version.md @@ -0,0 +1,89 @@ +--- +# generated by https://github.com/hashicorp/terraform-plugin-docs +page_title: "aiven_flink_jar_application_version Resource - terraform-provider-aiven" +subcategory: "" +description: |- + Creates and manages an Aiven for Apache Flink® jar application version. + This resource is in the beta stage and may change without notice. Set + the PROVIDER_AIVEN_ENABLE_BETA environment variable to use the resource. +--- + +# aiven_flink_jar_application_version (Resource) + +Creates and manages an Aiven for Apache Flink® jar application version. + +**This resource is in the beta stage and may change without notice.** Set +the `PROVIDER_AIVEN_ENABLE_BETA` environment variable to use the resource. + +## Example Usage + +```terraform +resource "aiven_flink_jar_application" "example" { + project = data.aiven_project.example.project + service_name = "example-flink-service" + name = "example-app-jar" +} + +resource "aiven_flink_jar_application_version" "example" { + project = data.aiven_project.example.project + service_name = aiven_flink.example.service_name + application_id = aiven_flink_application.example.application_id + source = "./example.jar" +} +``` + + +## Schema + +### Required + +- `application_id` (String) Application Id. Changing this property forces recreation of the resource. +- `project` (String) Project name. Changing this property forces recreation of the resource. +- `service_name` (String) Service name. Changing this property forces recreation of the resource. +- `source` (String) The path to the jar file to upload. + +### Optional + +- `timeouts` (Block, Optional) (see [below for nested schema](#nestedblock--timeouts)) + +### Read-Only + +- `application_version_id` (String) ApplicationVersion ID. +- `created_at` (String) The creation timestamp of this entity in ISO 8601 format, always in UTC. +- `created_by` (String) The creator of this entity. +- `file_info` (List of Object) Flink JarApplicationVersion FileInfo. (see [below for nested schema](#nestedatt--file_info)) +- `id` (String) The ID of this resource. +- `source_checksum` (String) The sha256 checksum of the jar file to upload. +- `version` (Number) Version number. + + +### Nested Schema for `timeouts` + +Optional: + +- `create` (String) +- `default` (String) +- `delete` (String) +- `read` (String) +- `update` (String) + + + +### Nested Schema for `file_info` + +Read-Only: + +- `file_sha256` (String) +- `file_size` (Number) +- `file_status` (String) +- `url` (String) +- `verify_error_code` (Number) +- `verify_error_message` (String) + +## Import + +Import is supported using the following syntax: + +```shell +terraform import aiven_flink_jar_application_version.example PROJECT/SERVICE_NAME/APPLICATION_ID/APPLICATION_VERSION_ID +``` diff --git a/examples/resources/aiven_flink_jar_application/import.sh b/examples/resources/aiven_flink_jar_application/import.sh new file mode 100644 index 000000000..47e816617 --- /dev/null +++ b/examples/resources/aiven_flink_jar_application/import.sh @@ -0,0 +1 @@ +terraform import aiven_flink_jar_application.example PROJECT/SERVICE_NAME/APPLICATION_ID diff --git a/examples/resources/aiven_flink_jar_application/resource.tf b/examples/resources/aiven_flink_jar_application/resource.tf new file mode 100644 index 000000000..6e6599ae4 --- /dev/null +++ b/examples/resources/aiven_flink_jar_application/resource.tf @@ -0,0 +1,5 @@ +resource "aiven_flink_jar_application" "example" { + project = data.aiven_project.example.project + service_name = "example-flink-service" + name = "example-app-jar" +} diff --git a/examples/resources/aiven_flink_jar_application_deployment/import.sh b/examples/resources/aiven_flink_jar_application_deployment/import.sh new file mode 100644 index 000000000..61e8373ed --- /dev/null +++ b/examples/resources/aiven_flink_jar_application_deployment/import.sh @@ -0,0 +1 @@ +terraform import aiven_flink_jar_application_deployment.example PROJECT/SERVICE_NAME/APPLICATION_ID/DEPLOYMENT_ID diff --git a/examples/resources/aiven_flink_jar_application_deployment/resource.tf b/examples/resources/aiven_flink_jar_application_deployment/resource.tf new file mode 100644 index 000000000..b49ff4d93 --- /dev/null +++ b/examples/resources/aiven_flink_jar_application_deployment/resource.tf @@ -0,0 +1,19 @@ +resource "aiven_flink_jar_application" "example" { + project = data.aiven_project.example.project + service_name = "example-flink-service" + name = "example-app-jar" +} + +resource "aiven_flink_jar_application_version" "example" { + project = data.aiven_project.example.project + service_name = aiven_flink.example.service_name + application_id = aiven_flink_application.example.application_id + source = "./example.jar" +} + +resource "aiven_flink_jar_application_deployment" "example" { + project = data.aiven_project.example.project + service_name = aiven_flink.example.service_name + application_id = aiven_flink_jar_application.example.application_id + version_id = aiven_flink_jar_application_version.example.application_version_id +} diff --git a/examples/resources/aiven_flink_jar_application_version/import.sh b/examples/resources/aiven_flink_jar_application_version/import.sh new file mode 100644 index 000000000..f22c4f5ab --- /dev/null +++ b/examples/resources/aiven_flink_jar_application_version/import.sh @@ -0,0 +1 @@ +terraform import aiven_flink_jar_application_version.example PROJECT/SERVICE_NAME/APPLICATION_ID/APPLICATION_VERSION_ID diff --git a/examples/resources/aiven_flink_jar_application_version/resource.tf b/examples/resources/aiven_flink_jar_application_version/resource.tf new file mode 100644 index 000000000..c27f6773f --- /dev/null +++ b/examples/resources/aiven_flink_jar_application_version/resource.tf @@ -0,0 +1,12 @@ +resource "aiven_flink_jar_application" "example" { + project = data.aiven_project.example.project + service_name = "example-flink-service" + name = "example-app-jar" +} + +resource "aiven_flink_jar_application_version" "example" { + project = data.aiven_project.example.project + service_name = aiven_flink.example.service_name + application_id = aiven_flink_application.example.application_id + source = "./example.jar" +} diff --git a/internal/acctest/acctest.go b/internal/acctest/acctest.go index da84036a8..f08b40568 100644 --- a/internal/acctest/acctest.go +++ b/internal/acctest/acctest.go @@ -3,8 +3,11 @@ package acctest import ( "context" "fmt" + "io" "log" + "net/http" "os" + "path/filepath" "sort" "strings" "sync" @@ -230,3 +233,26 @@ func ResourceFromState(state *terraform.State, name string) (*terraform.Resource return rs, nil } + +func DownloadTempFile(url string) (func(), string, error) { + file, err := os.CreateTemp("", "temp_*"+filepath.Ext(url)) + if err != nil { + return nil, "", err + } + + rsp, err := http.Get(url) + if err != nil { + return nil, "", err + } + + if rsp.StatusCode != http.StatusOK { + return nil, "", fmt.Errorf("failed to download file: %s", rsp.Status) + } + + _, err = io.Copy(file, rsp.Body) + if err != nil { + return nil, "", err + } + + return func() { os.Remove(file.Name()) }, file.Name(), nil +} diff --git a/internal/sdkprovider/provider/provider.go b/internal/sdkprovider/provider/provider.go index 4e0634002..61b1e2c88 100644 --- a/internal/sdkprovider/provider/provider.go +++ b/internal/sdkprovider/provider/provider.go @@ -243,10 +243,13 @@ func Provider(version string) (*schema.Provider, error) { "aiven_m3aggregator": m3db.ResourceM3Aggregator(), // flink - "aiven_flink": flink.ResourceFlink(), - "aiven_flink_application": flink.ResourceFlinkApplication(), - "aiven_flink_application_version": flink.ResourceFlinkApplicationVersion(), - "aiven_flink_application_deployment": flink.ResourceFlinkApplicationDeployment(), + "aiven_flink": flink.ResourceFlink(), + "aiven_flink_application": flink.ResourceFlinkApplication(), + "aiven_flink_application_version": flink.ResourceFlinkApplicationVersion(), + "aiven_flink_application_deployment": flink.ResourceFlinkApplicationDeployment(), + "aiven_flink_jar_application": flink.ResourceFlinkJarApplication(), + "aiven_flink_jar_application_version": flink.ResourceFlinkJarApplicationVersion(), + "aiven_flink_jar_application_deployment": flink.ResourceFlinkJarApplicationDeployment(), // opensearch "aiven_opensearch": opensearch.ResourceOpenSearch(), @@ -294,6 +297,9 @@ func Provider(version string) (*schema.Provider, error) { "aiven_alloydbomni", "aiven_alloydbomni_user", "aiven_alloydbomni_database", + "aiven_flink_jar_application", + "aiven_flink_jar_application_version", + "aiven_flink_jar_application_deployment", } betaDataSources := []string{ diff --git a/internal/sdkprovider/service/flink/flink_jar_application.go b/internal/sdkprovider/service/flink/flink_jar_application.go new file mode 100644 index 000000000..4d00df558 --- /dev/null +++ b/internal/sdkprovider/service/flink/flink_jar_application.go @@ -0,0 +1,99 @@ +package flink + +import ( + "context" + + avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/flinkjarapplication" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" + + "github.com/aiven/terraform-provider-aiven/internal/common" + "github.com/aiven/terraform-provider-aiven/internal/schemautil" +) + +func ResourceFlinkJarApplication() *schema.Resource { + return &schema.Resource{ + Description: "Creates and manages an [Aiven for Apache Flink® jar application](https://aiven.io/docs/products/flink/howto/create-jar-application).", + ReadContext: common.WithGenClient(flinkJarApplicationRead), + CreateContext: common.WithGenClient(flinkJarApplicationCreate), + UpdateContext: common.WithGenClient(flinkJarApplicationUpdate), + DeleteContext: common.WithGenClient(flinkJarApplicationDelete), + Importer: &schema.ResourceImporter{ + StateContext: schema.ImportStatePassthroughContext, + }, + Timeouts: schemautil.DefaultResourceTimeouts(), + Schema: flinkJarApplicationSchema(), + } +} + +func flinkJarApplicationCreate(ctx context.Context, d *schema.ResourceData, client avngen.Client) error { + req := new(flinkjarapplication.ServiceFlinkCreateJarApplicationIn) + err := schemautil.ResourceDataGet(d, req, schemautil.RenameAliases(flinkJarApplicationRename())) + if err != nil { + return err + } + + project := d.Get("project").(string) + serviceName := d.Get("service_name").(string) + rsp, err := client.ServiceFlinkCreateJarApplication(ctx, project, serviceName, req) + if err != nil { + return err + } + + d.SetId(schemautil.BuildResourceID(project, serviceName, rsp.Id)) + return flinkJarApplicationRead(ctx, d, client) +} + +func flinkJarApplicationRead(ctx context.Context, d *schema.ResourceData, client avngen.Client) error { + project, serviceName, applicationID, err := schemautil.SplitResourceID3(d.Id()) + if err != nil { + return err + } + + rsp, err := client.ServiceFlinkGetJarApplication(ctx, project, serviceName, applicationID) + if err != nil { + return schemautil.ResourceReadHandleNotFound(err, d) + } + + // Deployment is created after the application is created. + // That is a circular dependency, and it fails with a non-empty plan in tests. + // Setting an empty object to suppress the diff. + if rsp.CurrentDeployment == nil { + rsp.CurrentDeployment = new(flinkjarapplication.CurrentDeploymentOut) + } + + return schemautil.ResourceDataSet( + flinkJarApplicationSchema(), d, rsp, + schemautil.RenameAliasesReverse(flinkJarApplicationRename()), + ) +} + +func flinkJarApplicationUpdate(ctx context.Context, d *schema.ResourceData, client avngen.Client) error { + req := new(flinkjarapplication.ServiceFlinkUpdateJarApplicationIn) + err := schemautil.ResourceDataGet(d, req) + if err != nil { + return err + } + + project, serviceName, applicationID, err := schemautil.SplitResourceID3(d.Id()) + if err != nil { + return err + } + + _, err = client.ServiceFlinkUpdateJarApplication(ctx, project, serviceName, applicationID, req) + if err != nil { + return err + } + + return flinkJarApplicationRead(ctx, d, client) +} + +func flinkJarApplicationDelete(ctx context.Context, d *schema.ResourceData, client avngen.Client) error { + project, serviceName, applicationID, err := schemautil.SplitResourceID3(d.Id()) + if err != nil { + return err + } + + _, err = client.ServiceFlinkDeleteJarApplication(ctx, project, serviceName, applicationID) + return schemautil.OmitNotFound(err) +} diff --git a/internal/sdkprovider/service/flink/flink_jar_application_deployment.go b/internal/sdkprovider/service/flink/flink_jar_application_deployment.go new file mode 100644 index 000000000..edd87e33b --- /dev/null +++ b/internal/sdkprovider/service/flink/flink_jar_application_deployment.go @@ -0,0 +1,101 @@ +// Package flink is the package that contains the schema definitions for the Flink resources. +package flink + +import ( + "context" + "fmt" + "time" + + avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/flinkjarapplicationdeployment" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" + + "github.com/aiven/terraform-provider-aiven/internal/common" + "github.com/aiven/terraform-provider-aiven/internal/schemautil" +) + +// ResourceFlinkJarApplicationDeployment returns the schema for the Flink Jar Application Deployment resource. +func ResourceFlinkJarApplicationDeployment() *schema.Resource { + return &schema.Resource{ + Description: "Creates and manages the deployment of an Aiven for Apache Flink® application.", + CreateContext: common.WithGenClient(flinkApplicationDeploymentCreate), + ReadContext: common.WithGenClient(flinkApplicationDeploymentRead), + DeleteContext: common.WithGenClient(flinkApplicationDeploymentDelete), + Importer: &schema.ResourceImporter{ + StateContext: schema.ImportStatePassthroughContext, + }, + Timeouts: schemautil.DefaultResourceTimeouts(), + Schema: flinkJarApplicationDeploymentSchema(), + } +} + +func flinkApplicationDeploymentCreate(ctx context.Context, d *schema.ResourceData, client avngen.Client) error { + var req flinkjarapplicationdeployment.ServiceFlinkCreateJarApplicationDeploymentIn + err := schemautil.ResourceDataGet(d, &req, schemautil.RenameAliases(flinkJarApplicationDeploymentRename())) + if err != nil { + return err + } + + project := d.Get("project").(string) + serviceName := d.Get("service_name").(string) + applicationID := d.Get("application_id").(string) + r, err := client.ServiceFlinkCreateJarApplicationDeployment(ctx, project, serviceName, applicationID, &req) + if err != nil { + return err + } + + d.SetId(schemautil.BuildResourceID(project, serviceName, applicationID, r.Id)) + return flinkApplicationDeploymentRead(ctx, d, client) +} + +func flinkApplicationDeploymentDelete(ctx context.Context, d *schema.ResourceData, client avngen.Client) error { + project, serviceName, applicationID, deploymentID, err := schemautil.SplitResourceID4(d.Id()) + if err != nil { + return err + } + + // Flink Jar Application Deployment has a quite complicated state machine + // https://api.aiven.io/doc/#tag/Service:_Flink/operation/ServiceFlinkGetJarApplicationDeployment + // Retries until succeeds or exceeds the timeout + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for { + _, err := client.ServiceFlinkGetJarApplicationDeployment(ctx, project, serviceName, applicationID, deploymentID) + if avngen.IsNotFound(err) { + return nil + } + + // Must be canceled before deleted + _, err = client.ServiceFlinkCancelJarApplicationDeployment(ctx, project, serviceName, applicationID, deploymentID) + if err != nil { + // Nothing to cancel. + // Completely ignores all errors, until it gets 404 on GET request + _, _ = client.ServiceFlinkDeleteJarApplicationDeployment(ctx, project, serviceName, applicationID, deploymentID) + } + + select { + case <-ctx.Done(): + // The context itself already comes with delete timeout + return fmt.Errorf("can't delete Flink Application Deployment: %w", ctx.Err()) + case <-ticker.C: + continue + } + } +} + +func flinkApplicationDeploymentRead(ctx context.Context, d *schema.ResourceData, client avngen.Client) error { + project, serviceName, applicationID, deploymentID, err := schemautil.SplitResourceID4(d.Id()) + if err != nil { + return err + } + + rsp, err := client.ServiceFlinkGetJarApplicationDeployment(ctx, project, serviceName, applicationID, deploymentID) + if err != nil { + return err + } + + return schemautil.ResourceDataSet( + flinkJarApplicationDeploymentSchema(), d, rsp, + schemautil.RenameAliasesReverse(flinkJarApplicationDeploymentRename()), + ) +} diff --git a/internal/sdkprovider/service/flink/flink_jar_application_version.go b/internal/sdkprovider/service/flink/flink_jar_application_version.go new file mode 100644 index 000000000..f0d763d18 --- /dev/null +++ b/internal/sdkprovider/service/flink/flink_jar_application_version.go @@ -0,0 +1,226 @@ +package flink + +import ( + "context" + "crypto/sha256" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "time" + + avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/flinkjarapplicationversion" + "github.com/avast/retry-go" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" + + "github.com/aiven/terraform-provider-aiven/internal/common" + "github.com/aiven/terraform-provider-aiven/internal/schemautil" +) + +func ResourceFlinkJarApplicationVersion() *schema.Resource { + s := flinkJarApplicationVersionSchema() + s["source"] = &schema.Schema{ + Type: schema.TypeString, + Required: true, + Description: "The path to the jar file to upload.", + // Terraform required this to be set to true. + // Because the resource cannot be updated. + ForceNew: true, + DiffSuppressFunc: func(_, _, _ string, d *schema.ResourceData) bool { + // Ignores file renames. + // The checksum is used to detect changes. + // Doesn't suppress the diff for new resources. + return d.Id() != "" + }, + } + + s["source_checksum"] = &schema.Schema{ + Type: schema.TypeString, + Computed: true, + ForceNew: true, + Description: "The sha256 checksum of the jar file to upload.", + } + + return &schema.Resource{ + Description: "Creates and manages an Aiven for Apache Flink® jar application version.", + CreateContext: common.WithGenClient(flinkJarApplicationVersionCreate), + ReadContext: common.WithGenClient(flinkJarApplicationVersionRead), + DeleteContext: common.WithGenClient(flinkJarApplicationVersionDelete), + Importer: &schema.ResourceImporter{ + StateContext: schema.ImportStatePassthroughContext, + }, + Timeouts: schemautil.DefaultResourceTimeouts(), + Schema: s, + CustomizeDiff: func(_ context.Context, diff *schema.ResourceDiff, _ any) error { + sourcePath := diff.Get("source").(string) + checksum, err := filePathChecksum(sourcePath) + if err != nil { + return fmt.Errorf("failed to calculate checksum: %w", err) + } + return diff.SetNew("source_checksum", checksum) + }, + } +} + +func flinkJarApplicationVersionCreate(ctx context.Context, d *schema.ResourceData, client avngen.Client) error { + project := d.Get("project").(string) + serviceName := d.Get("service_name").(string) + applicationID := d.Get("application_id").(string) + + rsp, err := client.ServiceFlinkCreateJarApplicationVersion(ctx, project, serviceName, applicationID) + if err != nil { + return err + } + + sourcePath := d.Get("source").(string) + sourceChecksum := d.Get("source_checksum").(string) + err = uploadFile(ctx, sourcePath, sourceChecksum, *rsp.FileInfo.Url) + if err != nil { + return fmt.Errorf("failed to upload file: %w", err) + } + + d.SetId(schemautil.BuildResourceID(project, serviceName, applicationID, rsp.Id)) + + // Waits until the file is uploaded. + // Retries until the context is canceled or the file is ready. + err = retry.Do( + func() error { + v, err := client.ServiceFlinkGetJarApplicationVersion(ctx, project, serviceName, applicationID, rsp.Id) + switch { + case avngen.IsNotFound(err): + // 404 means something went completely wrong. Retrying won't help + return retry.Unrecoverable(err) + case err != nil: + // The rest is retryable + return err + case v.FileInfo == nil: + // Not sure if this is possible. File info should always be present + return fmt.Errorf("file status is not ready") + case v.FileInfo.FileStatus != flinkjarapplicationversion.FileStatusTypeReady: + return fmt.Errorf("file status is not ready: %q", v.FileInfo.FileStatus) + } + + // Nothing to retry + return nil + }, + retry.Context(ctx), + retry.Delay(time.Second*5), + ) + + if err != nil { + return fmt.Errorf("failed to wait for jar application version: %w", err) + } + + return flinkJarApplicationVersionRead(ctx, d, client) +} + +func flinkJarApplicationVersionRead(ctx context.Context, d *schema.ResourceData, client avngen.Client) error { + project, serviceName, applicationID, version, err := schemautil.SplitResourceID4(d.Id()) + if err != nil { + return err + } + + rsp, err := client.ServiceFlinkGetJarApplicationVersion(ctx, project, serviceName, applicationID, version) + if err != nil { + return schemautil.ResourceReadHandleNotFound(err, d) + } + + // This is for import. Triggers change detection. + err = d.Set("source_checksum", rsp.FileInfo.FileSha256) + if err != nil { + return err + } + + return schemautil.ResourceDataSet( + flinkJarApplicationVersionSchema(), d, rsp, + schemautil.RenameAliasesReverse(flinkJarApplicationVersionRename()), + ) +} + +func flinkJarApplicationVersionDelete(ctx context.Context, d *schema.ResourceData, client avngen.Client) error { + project, serviceName, applicationID, version, err := schemautil.SplitResourceID4(d.Id()) + if err != nil { + return err + } + + _, err = client.ServiceFlinkDeleteJarApplicationVersion(ctx, project, serviceName, applicationID, version) + return schemautil.OmitNotFound(err) +} + +func uploadFile(ctx context.Context, sourcePath, sourceChecksum, urlPath string) error { + file, err := os.Open(filepath.Clean(sourcePath)) + if err != nil { + return err + } + defer file.Close() + + size, err := fileSize(file) + if err != nil { + return err + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPut, urlPath, file) + if err != nil { + return err + } + + req.Header.Set("Content-Type", "application/java-archive") + req.Header.Set("Content-SHA256", sourceChecksum) + req.ContentLength = size + + rsp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer rsp.Body.Close() + + b, err := io.ReadAll(rsp.Body) + if err != nil { + // This is a connection error or something else, not an API error + return err + } + + if len(b) > 0 { + // This is an API error + return fmt.Errorf("%s", b) + } + return nil +} + +func fileSize(file *os.File) (int64, error) { + stat, err := file.Stat() + if err != nil { + return 0, err + } + + if stat.IsDir() { + return 0, fmt.Errorf("file is a directory") + } + + return stat.Size(), nil +} + +func filePathChecksum(filePath string) (string, error) { + file, err := os.Open(filepath.Clean(filePath)) + if err != nil { + return "", err + } + defer file.Close() + return fileChecksum(file) +} + +func fileChecksum(file *os.File) (string, error) { + h := sha256.New() + _, err := io.Copy(h, file) + if err != nil { + return "", err + } + s := fmt.Sprintf("%x", h.Sum(nil)) + _, err = file.Seek(0, 0) + if err != nil { + return "", err + } + return s, nil +} diff --git a/internal/sdkprovider/service/flink/flink_jar_application_version_test.go b/internal/sdkprovider/service/flink/flink_jar_application_version_test.go new file mode 100644 index 000000000..b8fa4af67 --- /dev/null +++ b/internal/sdkprovider/service/flink/flink_jar_application_version_test.go @@ -0,0 +1,139 @@ +package flink_test + +import ( + "context" + "fmt" + "os" + "testing" + + avngen "github.com/aiven/go-client-codegen" + "github.com/hashicorp/terraform-plugin-testing/helper/resource" + "github.com/hashicorp/terraform-plugin-testing/terraform" + "github.com/stretchr/testify/require" + + acc "github.com/aiven/terraform-provider-aiven/internal/acctest" + "github.com/aiven/terraform-provider-aiven/internal/schemautil" +) + +const jarFileURL = "https://github.com/streaming-with-flink/examples/raw/f7aa2f37d37bb3900cbea8bf6ccdc66ffac52838/download/examples-scala.jar" + +// TestAccAivenFlinkJarApplicationVersion_basic +// This test requires a jar file to run. +func TestAccAivenFlinkJarApplicationVersion_basic(t *testing.T) { + deps := acc.CommonTestDependencies(t) + _ = deps.IsBeta(true) + + jarFile := os.Getenv("AIVEN_TEST_FLINK_JAR_FILE") + if jarFile == "" { + remove, tmpFile, err := acc.DownloadTempFile(jarFileURL) + require.NoError(t, err) + jarFile = tmpFile + defer remove() + } + + project := os.Getenv("AIVEN_PROJECT_NAME") + resourceNameApp := "aiven_flink_jar_application.app" + resourceNameVersion := "aiven_flink_jar_application_version.version" + resourceNameDeployment := "aiven_flink_jar_application_deployment.deployment" + serviceName := fmt.Sprintf("test-acc-flink-%s", acc.RandStr()) + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acc.TestAccPreCheck(t) }, + ProtoV6ProviderFactories: acc.TestProtoV6ProviderFactories, + CheckDestroy: testAccCheckAivenFlinkJarApplicationVersionDestroy, + Steps: []resource.TestStep{ + { + Config: testAccFlinkJarApplicationVersionResource(project, serviceName, jarFile), + Check: resource.ComposeTestCheckFunc( + // aiven_flink_jar_application + resource.TestCheckResourceAttr(resourceNameApp, "project", project), + resource.TestCheckResourceAttr(resourceNameApp, "service_name", serviceName), + + // aiven_flink_jar_application_version + resource.TestCheckResourceAttr(resourceNameVersion, "project", project), + resource.TestCheckResourceAttr(resourceNameVersion, "service_name", serviceName), + resource.TestCheckResourceAttr(resourceNameVersion, "source", jarFile), + resource.TestCheckResourceAttrSet(resourceNameVersion, "application_id"), + resource.TestCheckResourceAttrSet(resourceNameVersion, "source_checksum"), + resource.TestCheckResourceAttr(resourceNameVersion, "file_info.0.file_status", "READY"), + + // aiven_flink_jar_application_deployment + resource.TestCheckResourceAttr(resourceNameDeployment, "project", project), + resource.TestCheckResourceAttr(resourceNameDeployment, "service_name", serviceName), + resource.TestCheckResourceAttrSet(resourceNameDeployment, "application_id"), + resource.TestCheckResourceAttrSet(resourceNameDeployment, "version_id"), + ), + }, + }, + }) +} + +func testAccCheckAivenFlinkJarApplicationVersionDestroy(s *terraform.State) error { + client, err := acc.GetTestGenAivenClient() + if err != nil { + return err + } + + ctx := context.Background() + for _, rs := range s.RootModule().Resources { + if rs.Type != "aiven_flink_jar_application_version" { + continue + } + + project, serviceName, applicationID, version, err := schemautil.SplitResourceID4(rs.Primary.ID) + if err != nil { + return err + } + + _, err = client.ServiceFlinkGetJarApplicationVersion(ctx, project, serviceName, applicationID, version) + if avngen.IsNotFound(err) { + return nil + } + + if err != nil { + return err + } + + return fmt.Errorf("flink jar application version (%s) still exists", rs.Primary.ID) + } + + return nil +} + +func testAccFlinkJarApplicationVersionResource(project, serviceName, exampleJar string) string { + return fmt.Sprintf(` +resource "aiven_flink" "service" { + project = %[1]q + service_name = %[2]q + cloud_name = "google-europe-west1" + plan = "business-4" + maintenance_window_dow = "monday" + maintenance_window_time = "04:00:00" + + flink_user_config { + custom_code = true + } +} + +resource "aiven_flink_jar_application" "app" { + project = %[1]q + service_name = %[2]q + name = "my-app-jar" + + depends_on = [aiven_flink.service] +} + +resource "aiven_flink_jar_application_version" "version" { + project = %[1]q + service_name = %[2]q + application_id = aiven_flink_jar_application.app.application_id + source = %[3]q +} + +resource "aiven_flink_jar_application_deployment" "deployment" { + project = %[1]q + service_name = %[2]q + application_id = aiven_flink_jar_application_version.version.application_id + version_id = aiven_flink_jar_application_version.version.application_version_id +} +`, project, serviceName, exampleJar) +} diff --git a/internal/sdkprovider/service/flink/schema.go b/internal/sdkprovider/service/flink/schema.go new file mode 100644 index 000000000..97c92914c --- /dev/null +++ b/internal/sdkprovider/service/flink/schema.go @@ -0,0 +1,379 @@ +// Code generated by user config generator. DO NOT EDIT. + +package flink + +import ( + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation" +) + +func flinkJarApplicationRename() map[string]string { + return map[string]string{"application_id": "id"} +} + +func flinkJarApplicationSchema() map[string]*schema.Schema { + return map[string]*schema.Schema{ + "application_id": { + Computed: true, + Description: "Application ID.", + Type: schema.TypeString, + }, + "application_versions": { + Computed: true, + Description: "JarApplicationVersions.", + Elem: &schema.Resource{Schema: map[string]*schema.Schema{ + "created_at": { + Computed: true, + Description: "The creation timestamp of this entity in ISO 8601 format, always in UTC.", + Type: schema.TypeString, + }, + "created_by": { + Computed: true, + Description: "The creator of this entity.", + Type: schema.TypeString, + }, + "file_info": { + Computed: true, + Description: "Flink JarApplicationVersion FileInfo.", + Elem: &schema.Resource{Schema: map[string]*schema.Schema{ + "file_sha256": { + Computed: true, + Description: "sha256 of the file if known.", + Type: schema.TypeString, + }, + "file_size": { + Computed: true, + Description: "The size of the file in bytes.", + Type: schema.TypeInt, + }, + "file_status": { + Computed: true, + Description: "Indicates whether the uploaded .jar file has been verified by the system and deployment ready. The possible values are `INITIAL`, `READY` and `FAILED`.", + Type: schema.TypeString, + }, + "url": { + Computed: true, + Description: "The pre-signed url of the bucket where the .jar file is uploaded. Becomes null when the JarApplicationVersion is ready or failed.", + Type: schema.TypeString, + }, + "verify_error_code": { + Computed: true, + Description: "In the case file_status is FAILED, the error code of the failure. The possible values are `1`, `2` and `3`.", + Type: schema.TypeInt, + }, + "verify_error_message": { + Computed: true, + Description: "In the case file_status is FAILED, may contain details about the failure.", + Type: schema.TypeString, + }, + }}, + Type: schema.TypeList, + }, + "id": { + Computed: true, + Description: "ApplicationVersion ID.", + Type: schema.TypeString, + }, + "version": { + Computed: true, + Description: "Version number.", + Type: schema.TypeInt, + }, + }}, + Type: schema.TypeList, + }, + "created_at": { + Computed: true, + Description: "The creation timestamp of this entity in ISO 8601 format, always in UTC.", + Type: schema.TypeString, + }, + "created_by": { + Computed: true, + Description: "The creator of this entity.", + Type: schema.TypeString, + }, + "current_deployment": { + Computed: true, + Description: "Flink JarApplicationDeployment.", + Elem: &schema.Resource{Schema: map[string]*schema.Schema{ + "created_at": { + Computed: true, + Description: "The creation timestamp of this entity in ISO 8601 format, always in UTC.", + Type: schema.TypeString, + }, + "created_by": { + Computed: true, + Description: "The creator of this entity.", + Type: schema.TypeString, + }, + "entry_class": { + Computed: true, + Description: "The fully qualified name of the entry class to pass during Flink job submission through the entryClass parameter.", + Type: schema.TypeString, + }, + "error_msg": { + Computed: true, + Description: "Error message describing what caused deployment to fail.", + Type: schema.TypeString, + }, + "id": { + Computed: true, + Description: "Deployment ID.", + Type: schema.TypeString, + }, + "job_id": { + Computed: true, + Description: "Job ID.", + Type: schema.TypeString, + }, + "last_savepoint": { + Computed: true, + Description: "Job savepoint.", + Type: schema.TypeString, + }, + "parallelism": { + Computed: true, + Description: "Reading of Flink parallel execution documentation is recommended before setting this value to other than 1. Please do not set this value higher than (total number of nodes x number_of_task_slots), or every new job created will fail.", + Type: schema.TypeInt, + }, + "program_args": { + Computed: true, + Description: "Arguments to pass during Flink job submission through the programArgsList parameter.", + Elem: &schema.Schema{Type: schema.TypeString}, + Type: schema.TypeSet, + }, + "starting_savepoint": { + Computed: true, + Description: "Job savepoint.", + Type: schema.TypeString, + }, + "status": { + Computed: true, + Description: "Deployment status. The possible values are `CANCELED`, `CANCELLING`, `CANCELLING_REQUESTED`, `CREATED`, `DELETE_REQUESTED`, `DELETING`, `FAILED`, `FAILING`, `FINISHED`, `INITIALIZING`, `RECONCILING`, `RESTARTING`, `RUNNING`, `SAVING`, `SAVING_AND_STOP`, `SAVING_AND_STOP_REQUESTED` and `SUSPENDED`.", + Type: schema.TypeString, + }, + "version_id": { + Computed: true, + Description: "ApplicationVersion ID.", + Type: schema.TypeString, + }, + }}, + Type: schema.TypeList, + }, + "name": { + Description: "Application name. Maximum length: `128`.", + Required: true, + Type: schema.TypeString, + ValidateFunc: validation.StringLenBetween(0, 128), + }, + "project": { + Description: "Project name. Changing this property forces recreation of the resource.", + ForceNew: true, + Required: true, + Type: schema.TypeString, + }, + "service_name": { + Description: "Service name. Changing this property forces recreation of the resource.", + ForceNew: true, + Required: true, + Type: schema.TypeString, + }, + "updated_at": { + Computed: true, + Description: "The update timestamp of this entity in ISO 8601 format, always in UTC.", + Type: schema.TypeString, + }, + "updated_by": { + Computed: true, + Description: "The latest updater of this entity.", + Type: schema.TypeString, + }, + } +} + +func flinkJarApplicationDeploymentRename() map[string]string { + return map[string]string{"deployment_id": "id"} +} + +func flinkJarApplicationDeploymentSchema() map[string]*schema.Schema { + return map[string]*schema.Schema{ + "application_id": { + Description: "Application Id. Changing this property forces recreation of the resource.", + ForceNew: true, + Required: true, + Type: schema.TypeString, + }, + "created_at": { + Computed: true, + Description: "The creation timestamp of this entity in ISO 8601 format, always in UTC.", + Type: schema.TypeString, + }, + "created_by": { + Computed: true, + Description: "The creator of this entity.", + Type: schema.TypeString, + }, + "deployment_id": { + Computed: true, + Description: "Deployment ID.", + Type: schema.TypeString, + }, + "entry_class": { + Computed: true, + Description: "The fully qualified name of the entry class to pass during Flink job submission through the entryClass parameter. Maximum length: `128`.", + Optional: true, + Type: schema.TypeString, + ValidateFunc: validation.StringLenBetween(1, 128), + }, + "error_msg": { + Computed: true, + Description: "Error message describing what caused deployment to fail.", + Type: schema.TypeString, + }, + "job_id": { + Computed: true, + Description: "Job ID.", + Type: schema.TypeString, + }, + "last_savepoint": { + Computed: true, + Description: "Job savepoint.", + Type: schema.TypeString, + }, + "parallelism": { + Computed: true, + Description: "Reading of Flink parallel execution documentation is recommended before setting this value to other than 1. Please do not set this value higher than (total number of nodes x number_of_task_slots), or every new job created will fail.", + Optional: true, + Type: schema.TypeInt, + ValidateFunc: validation.IntBetween(1, 128), + }, + "program_args": { + Computed: true, + Description: "Arguments to pass during Flink job submission through the programArgsList parameter.", + Elem: &schema.Schema{Type: schema.TypeString}, + MaxItems: 32, + Optional: true, + Type: schema.TypeSet, + }, + "project": { + Description: "Project name. Changing this property forces recreation of the resource.", + ForceNew: true, + Required: true, + Type: schema.TypeString, + }, + "restart_enabled": { + Description: "Specifies whether a Flink Job is restarted in case it fails. Changing this property forces recreation of the resource.", + ForceNew: true, + Optional: true, + Type: schema.TypeBool, + }, + "service_name": { + Description: "Service name. Changing this property forces recreation of the resource.", + ForceNew: true, + Required: true, + Type: schema.TypeString, + }, + "starting_savepoint": { + Computed: true, + Description: "Job savepoint. Maximum length: `2048`.", + Optional: true, + Type: schema.TypeString, + ValidateFunc: validation.StringLenBetween(1, 2048), + }, + "status": { + Computed: true, + Description: "Deployment status. The possible values are `CANCELED`, `CANCELLING`, `CANCELLING_REQUESTED`, `CREATED`, `DELETE_REQUESTED`, `DELETING`, `FAILED`, `FAILING`, `FINISHED`, `INITIALIZING`, `RECONCILING`, `RESTARTING`, `RUNNING`, `SAVING`, `SAVING_AND_STOP`, `SAVING_AND_STOP_REQUESTED` and `SUSPENDED`.", + Type: schema.TypeString, + }, + "version_id": { + Description: "ApplicationVersion ID. Maximum length: `36`. Changing this property forces recreation of the resource.", + ForceNew: true, + Required: true, + Type: schema.TypeString, + }, + } +} + +func flinkJarApplicationVersionRename() map[string]string { + return map[string]string{"application_version_id": "id"} +} + +func flinkJarApplicationVersionSchema() map[string]*schema.Schema { + return map[string]*schema.Schema{ + "application_id": { + Description: "Application Id. Changing this property forces recreation of the resource.", + ForceNew: true, + Required: true, + Type: schema.TypeString, + }, + "application_version_id": { + Computed: true, + Description: "ApplicationVersion ID.", + Type: schema.TypeString, + }, + "created_at": { + Computed: true, + Description: "The creation timestamp of this entity in ISO 8601 format, always in UTC.", + Type: schema.TypeString, + }, + "created_by": { + Computed: true, + Description: "The creator of this entity.", + Type: schema.TypeString, + }, + "file_info": { + Computed: true, + Description: "Flink JarApplicationVersion FileInfo.", + Elem: &schema.Resource{Schema: map[string]*schema.Schema{ + "file_sha256": { + Computed: true, + Description: "sha256 of the file if known.", + Type: schema.TypeString, + }, + "file_size": { + Computed: true, + Description: "The size of the file in bytes.", + Type: schema.TypeInt, + }, + "file_status": { + Computed: true, + Description: "Indicates whether the uploaded .jar file has been verified by the system and deployment ready. The possible values are `FAILED`, `INITIAL` and `READY`.", + Type: schema.TypeString, + }, + "url": { + Computed: true, + Description: "The pre-signed url of the bucket where the .jar file is uploaded. Becomes null when the JarApplicationVersion is ready or failed.", + Type: schema.TypeString, + }, + "verify_error_code": { + Computed: true, + Description: "In the case file_status is FAILED, the error code of the failure. The possible values are `1`, `2` and `3`.", + Type: schema.TypeInt, + }, + "verify_error_message": { + Computed: true, + Description: "In the case file_status is FAILED, may contain details about the failure.", + Type: schema.TypeString, + }, + }}, + Type: schema.TypeList, + }, + "project": { + Description: "Project name. Changing this property forces recreation of the resource.", + ForceNew: true, + Required: true, + Type: schema.TypeString, + }, + "service_name": { + Description: "Service name. Changing this property forces recreation of the resource.", + ForceNew: true, + Required: true, + Type: schema.TypeString, + }, + "version": { + Computed: true, + Description: "Version number.", + Type: schema.TypeInt, + }, + } +}