diff --git a/.gitignore b/.gitignore index 0f002b5144..3c227bee64 100644 --- a/.gitignore +++ b/.gitignore @@ -55,7 +55,7 @@ go.work.sum .LSOverride # Icon must end with two \r -Icon +Icon # Thumbnails ._* diff --git a/internal/flink/command_statement.go b/internal/flink/command_statement.go index 4d8af68248..0f46b12c47 100644 --- a/internal/flink/command_statement.go +++ b/internal/flink/command_statement.go @@ -31,7 +31,6 @@ func (c *command) newStatementCommand() *cobra.Command { cmd.AddCommand(c.newStatementDescribeCommand()) cmd.AddCommand(c.newStatementExceptionCommand()) cmd.AddCommand(c.newStatementListCommand()) - cmd.AddCommand(c.newStatementResumeCommand()) cmd.AddCommand(c.newStatementStopCommand()) cmd.AddCommand(c.newStatementUpdateCommand()) diff --git a/internal/flink/command_statement_resume.go b/internal/flink/command_statement_resume.go deleted file mode 100644 index b294a73867..0000000000 --- a/internal/flink/command_statement_resume.go +++ /dev/null @@ -1,100 +0,0 @@ -package flink - -import ( - "fmt" - - "github.com/spf13/cobra" - - flinkgatewayv1 "github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway/v1" - - pcmd "github.com/confluentinc/cli/v4/pkg/cmd" - "github.com/confluentinc/cli/v4/pkg/examples" - "github.com/confluentinc/cli/v4/pkg/output" - "github.com/confluentinc/cli/v4/pkg/resource" -) - -func (c *command) newStatementResumeCommand() *cobra.Command { - cmd := &cobra.Command{ - Use: "resume ", - Short: "Resume a Flink SQL statement.", - Args: cobra.ExactArgs(1), - ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStatementArgs), - RunE: c.statementResume, - Example: examples.BuildExampleString( - examples.Example{ - Text: `Request to resume statement "my-statement" with original principal id and compute pool.`, - Code: "confluent flink statement resume my-statement", - }, - examples.Example{ - Text: `Request to resume statement "my-statement" with service account "sa-123456".`, - Code: "confluent flink statement resume my-statement --principal sa-123456", - }, - examples.Example{ - Text: `Request to resume statement "my-statement" with user account "u-987654".`, - Code: "confluent flink statement resume my-statement --principal u-987654", - }, - examples.Example{ - Text: `Request to resume statement "my-statement" and move to compute pool "lfcp-123456".`, - Code: "confluent flink statement resume my-statement --compute-pool lfcp-123456", - }, - examples.Example{ - Text: `Request to resume statement "my-statement" with service account "sa-123456" and move to compute pool "lfcp-123456".`, - Code: "confluent flink statement resume my-statement --principal sa-123456 --compute-pool lfcp-123456", - }, - ), - } - - c.addPrincipalFlag(cmd) - c.addComputePoolFlag(cmd) - pcmd.AddCloudFlag(cmd) - pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand) - pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) - pcmd.AddContextFlag(cmd, c.CLICommand) - - return cmd -} - -func (c *command) statementResume(cmd *cobra.Command, args []string) error { - environmentId, err := c.Context.EnvironmentId() - if err != nil { - return err - } - - client, err := c.GetFlinkGatewayClient(false) - if err != nil { - return err - } - - statement, err := client.GetStatement(environmentId, args[0], c.Context.GetCurrentOrganization()) - if err != nil { - return err - } - - // Support resume a Flink statement with a different principal and/or compute-pool - principal, err := cmd.Flags().GetString("principal") - if err != nil { - return err - } - if principal != "" { - statement.Spec.SetPrincipal(principal) - } - - computePool, err := cmd.Flags().GetString("compute-pool") - if err != nil { - return err - } - if computePool != "" { - statement.Spec.SetComputePoolId(computePool) - } - - statement.Spec.Stopped = flinkgatewayv1.PtrBool(false) - - // the UPDATE statement is an async API - // An accepted response 202 doesn't necessarily mean the UPDATE will be successful/complete - if err := client.UpdateStatement(environmentId, args[0], c.Context.GetCurrentOrganization(), statement); err != nil { - return fmt.Errorf("failed to resume %s \"%s\": %w", resource.FlinkStatement, args[0], err) - } - - output.Printf(c.Config.EnableColor, "Requested to resume %s \"%s\".\n", resource.FlinkStatement, args[0]) - return nil -} diff --git a/internal/flink/command_statement_update.go b/internal/flink/command_statement_update.go index ce1011f011..122f134409 100644 --- a/internal/flink/command_statement_update.go +++ b/internal/flink/command_statement_update.go @@ -28,25 +28,9 @@ func (c *command) newStatementUpdateCommand() *cobra.Command { Code: "confluent flink statement update my-statement --compute-pool lfcp-123456", }, examples.Example{ - Text: `Request to resume statement "my-statement" with original principal id and compute pool.`, + Text: `Request to resume statement "my-statement".`, Code: "confluent flink statement update my-statement --stopped=false", }, - examples.Example{ - Text: `Request to resume statement "my-statement" with service account "sa-123456".`, - Code: "confluent flink statement update my-statement --stopped=false --principal sa-123456", - }, - examples.Example{ - Text: `Request to resume statement "my-statement" with user account "u-987654".`, - Code: "confluent flink statement update my-statement --stopped=false --principal u-987654", - }, - examples.Example{ - Text: `Request to resume statement "my-statement" and move to compute pool "lfcp-123456".`, - Code: "confluent flink statement update my-statement --stopped=false --compute-pool lfcp-123456", - }, - examples.Example{ - Text: `Request to resume statement "my-statement" with service account "sa-123456" and move to compute pool "lfcp-123456".`, - Code: "confluent flink statement update my-statement --stopped=false --principal sa-123456 --compute-pool lfcp-123456", - }, examples.Example{ Text: `Request to stop statement "my-statement".`, Code: "confluent flink statement update my-statement --stopped=true", @@ -56,7 +40,7 @@ func (c *command) newStatementUpdateCommand() *cobra.Command { c.addPrincipalFlag(cmd) c.addComputePoolFlag(cmd) - cmd.Flags().Bool("stopped", false, "Request to stop or resume the statement.") + cmd.Flags().Bool("stopped", false, "Request to stop the statement.") pcmd.AddCloudFlag(cmd) pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand) pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) @@ -136,10 +120,8 @@ func (c *command) statementUpdate(cmd *cobra.Command, args []string) error { statement.Spec.SetStopped(stopped) } - // the UPDATE statement is an async API - // An accepted response 202 doesn't necessarily mean the UPDATE will be successful/complete if err := client.UpdateStatement(environmentId, args[0], c.Context.GetCurrentOrganization(), statement); err != nil { - return fmt.Errorf("failed to update %s \"%s\": %w", resource.FlinkStatement, args[0], err) + return err } output.Printf(c.Config.EnableColor, "Requested to update %s \"%s\".\n", resource.FlinkStatement, args[0]) diff --git a/test/fixtures/output/flink/statement/help.golden b/test/fixtures/output/flink/statement/help.golden index 0128ad7ead..e0f1ff58c8 100644 --- a/test/fixtures/output/flink/statement/help.golden +++ b/test/fixtures/output/flink/statement/help.golden @@ -9,7 +9,6 @@ Available Commands: describe Describe a Flink SQL statement. exception Manage Flink SQL statement exceptions. list List Flink SQL statements. - resume Resume a Flink SQL statement. stop Stop a Flink SQL statement. update Update a Flink SQL statement. diff --git a/test/fixtures/output/flink/statement/resume-help.golden b/test/fixtures/output/flink/statement/resume-help.golden deleted file mode 100644 index d9a5507677..0000000000 --- a/test/fixtures/output/flink/statement/resume-help.golden +++ /dev/null @@ -1,38 +0,0 @@ -Resume a Flink SQL statement. - -Usage: - confluent flink statement resume [flags] - -Examples: -Request to resume statement "my-statement" with original principal id and compute pool. - - $ confluent flink statement resume my-statement - -Request to resume statement "my-statement" with service account "sa-123456". - - $ confluent flink statement resume my-statement --principal sa-123456 - -Request to resume statement "my-statement" with user account "u-987654". - - $ confluent flink statement resume my-statement --principal u-987654 - -Request to resume statement "my-statement" and move to compute pool "lfcp-123456". - - $ confluent flink statement resume my-statement --compute-pool lfcp-123456 - -Request to resume statement "my-statement" with service account "sa-123456" and move to compute pool "lfcp-123456". - - $ confluent flink statement resume my-statement --principal sa-123456 --compute-pool lfcp-123456 - -Flags: - --principal string A user or service account the statement runs as. - --compute-pool string Flink compute pool ID. - --cloud string Specify the cloud provider as "aws", "azure", or "gcp". - --region string Cloud region for Flink (use "confluent flink region list" to see all). - --environment string Environment ID. - --context string CLI context name. - -Global Flags: - -h, --help Show help for this command. - --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. - -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). diff --git a/test/fixtures/output/flink/statement/resume-invalid-compute-pool.golden b/test/fixtures/output/flink/statement/resume-invalid-compute-pool.golden deleted file mode 100644 index 033251b8f2..0000000000 --- a/test/fixtures/output/flink/statement/resume-invalid-compute-pool.golden +++ /dev/null @@ -1 +0,0 @@ -Error: failed to resume Flink SQL statement "my-statement": logical compute pool=lfcp-654321 not found diff --git a/test/fixtures/output/flink/statement/resume-invalid-principal.golden b/test/fixtures/output/flink/statement/resume-invalid-principal.golden deleted file mode 100644 index 97e7545fbb..0000000000 --- a/test/fixtures/output/flink/statement/resume-invalid-principal.golden +++ /dev/null @@ -1 +0,0 @@ -Error: failed to resume Flink SQL statement "my-statement": Bad Request diff --git a/test/fixtures/output/flink/statement/resume-valid.golden b/test/fixtures/output/flink/statement/resume-valid.golden deleted file mode 100644 index 295dd74014..0000000000 --- a/test/fixtures/output/flink/statement/resume-valid.golden +++ /dev/null @@ -1 +0,0 @@ -Requested to resume Flink SQL statement "my-statement". diff --git a/test/fixtures/output/flink/statement/update-help.golden b/test/fixtures/output/flink/statement/update-help.golden index 79353da849..06a3d2bdf9 100644 --- a/test/fixtures/output/flink/statement/update-help.golden +++ b/test/fixtures/output/flink/statement/update-help.golden @@ -12,26 +12,10 @@ Request to move "my-statement" to compute pool "lfcp-123456". $ confluent flink statement update my-statement --compute-pool lfcp-123456 -Request to resume statement "my-statement" with original principal id and compute pool. +Request to resume statement "my-statement". $ confluent flink statement update my-statement --stopped=false -Request to resume statement "my-statement" with service account "sa-123456". - - $ confluent flink statement update my-statement --stopped=false --principal sa-123456 - -Request to resume statement "my-statement" with user account "u-987654". - - $ confluent flink statement update my-statement --stopped=false --principal u-987654 - -Request to resume statement "my-statement" and move to compute pool "lfcp-123456". - - $ confluent flink statement update my-statement --stopped=false --compute-pool lfcp-123456 - -Request to resume statement "my-statement" with service account "sa-123456" and move to compute pool "lfcp-123456". - - $ confluent flink statement update my-statement --stopped=false --principal sa-123456 --compute-pool lfcp-123456 - Request to stop statement "my-statement". $ confluent flink statement update my-statement --stopped=true @@ -39,7 +23,7 @@ Request to stop statement "my-statement". Flags: --principal string A user or service account the statement runs as. --compute-pool string Flink compute pool ID. - --stopped Request to stop or resume the statement. + --stopped Request to stop the statement. --cloud string Specify the cloud provider as "aws", "azure", or "gcp". --region string Cloud region for Flink (use "confluent flink region list" to see all). --environment string Environment ID. diff --git a/test/fixtures/output/flink/statement/update-invalid-compute-pool.golden b/test/fixtures/output/flink/statement/update-invalid-compute-pool.golden deleted file mode 100644 index 730db8775a..0000000000 --- a/test/fixtures/output/flink/statement/update-invalid-compute-pool.golden +++ /dev/null @@ -1 +0,0 @@ -Error: failed to update Flink SQL statement "my-statement": logical compute pool=lfcp-654321 not found diff --git a/test/fixtures/output/flink/statement/update-invalid-principal.golden b/test/fixtures/output/flink/statement/update-invalid-principal.golden deleted file mode 100644 index c30b4121cc..0000000000 --- a/test/fixtures/output/flink/statement/update-invalid-principal.golden +++ /dev/null @@ -1 +0,0 @@ -Error: failed to update Flink SQL statement "my-statement": Bad Request diff --git a/test/flink_test.go b/test/flink_test.go index 64bfbcedfd..c35420ce92 100644 --- a/test/flink_test.go +++ b/test/flink_test.go @@ -224,20 +224,9 @@ func (s *CLITestSuite) TestFlinkStatement() { {args: "flink statement list --cloud aws --region eu-west-1 --compute-pool lfcp-nonexistent", fixture: "flink/statement/list-cp-not-found.golden", exitCode: 1}, {args: "flink statement list --cloud aws --region eu-west-2 --compute-pool lfcp-123456", fixture: "flink/statement/list-cp-incorrect-region.golden", exitCode: 1}, {args: "flink statement stop my-statement --cloud aws --region eu-west-1", fixture: "flink/statement/stop.golden"}, - {args: "flink statement resume my-statement --cloud aws --region eu-west-1", fixture: "flink/statement/resume-valid.golden"}, - {args: "flink statement resume my-statement --cloud aws --region eu-west-1 --principal u-123456", fixture: "flink/statement/resume-valid.golden"}, - {args: "flink statement resume my-statement --cloud aws --region eu-west-1 --compute-pool lfcp-123456", fixture: "flink/statement/resume-valid.golden"}, - {args: "flink statement resume my-statement --cloud aws --region eu-west-1 --principal u-123456 --compute-pool lfcp-123456", fixture: "flink/statement/resume-valid.golden"}, - {args: "flink statement resume my-statement --cloud aws --region eu-west-1 --principal sa-654321", fixture: "flink/statement/resume-invalid-principal.golden", exitCode: 1}, - {args: "flink statement resume my-statement --cloud aws --region eu-west-1 --compute-pool lfcp-654321", fixture: "flink/statement/resume-invalid-compute-pool.golden", exitCode: 1}, {args: "flink statement update my-statement --cloud aws --region eu-west-1 --compute-pool lfcp-123456", fixture: "flink/statement/update-compute-pool.golden"}, - {args: "flink statement update my-statement --cloud aws --region eu-west-1 --principal u-123456", fixture: "flink/statement/update-principal.golden"}, + {args: "flink statement update my-statement --cloud aws --region eu-west-1 --principal sa-123456", fixture: "flink/statement/update-principal.golden"}, {args: "flink statement update my-statement --cloud aws --region eu-west-1 --stopped=false", fixture: "flink/statement/update-stopped.golden"}, - {args: "flink statement update my-statement --cloud aws --region eu-west-1 --stopped=false --principal u-123456", fixture: "flink/statement/update-stopped.golden"}, - {args: "flink statement update my-statement --cloud aws --region eu-west-1 --stopped=false --compute-pool lfcp-123456", fixture: "flink/statement/update-stopped.golden"}, - {args: "flink statement update my-statement --cloud aws --region eu-west-1 --stopped=false --compute-pool lfcp-123456 --principal u-123456", fixture: "flink/statement/update-stopped.golden"}, - {args: "flink statement update my-statement --cloud aws --region eu-west-1 --stopped=false --compute-pool lfcp-654321", fixture: "flink/statement/update-invalid-compute-pool.golden", exitCode: 1}, - {args: "flink statement update my-statement --cloud aws --region eu-west-1 --stopped=false --principal u-654321", fixture: "flink/statement/update-invalid-principal.golden", exitCode: 1}, } for _, test := range tests { diff --git a/test/test-server/flink_gateway_router.go b/test/test-server/flink_gateway_router.go index f245a5af73..56128bb735 100644 --- a/test/test-server/flink_gateway_router.go +++ b/test/test-server/flink_gateway_router.go @@ -2,7 +2,6 @@ package testserver import ( "encoding/json" - "fmt" "net/http" "strings" "testing" @@ -15,11 +14,6 @@ import ( flinkgatewayv1 "github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway/v1" ) -const ( - validFlinkStatementPrincipalId = "u-123456" - validFlinkStatementComputePoolId = "lfcp-123456" -) - var flinkGatewayRoutes = []route{ {"/sql/v1/organizations/{organization_id}/environments/{environment}/statements", handleSqlEnvironmentsEnvironmentStatements}, {"/sql/v1/organizations/{organization_id}/environments/{environment}/statements/{statement}", handleSqlEnvironmentsEnvironmentStatementsStatement}, @@ -129,7 +123,6 @@ func handleSqlEnvironmentsEnvironmentConnectionsConnection(t *testing.T) http.Ha } } -// Handler for "/sql/v1/organizations/{organization_id}/environments/{environment_id}/statements" func handleSqlEnvironmentsEnvironmentStatements(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { switch r.Method { @@ -138,7 +131,7 @@ func handleSqlEnvironmentsEnvironmentStatements(t *testing.T) http.HandlerFunc { Name: flinkgatewayv1.PtrString("11111111-1111-1111-1"), Spec: &flinkgatewayv1.SqlV1StatementSpec{ Statement: flinkgatewayv1.PtrString("CREATE TABLE test;"), - ComputePoolId: flinkgatewayv1.PtrString(validFlinkStatementComputePoolId), + ComputePoolId: flinkgatewayv1.PtrString("lfcp-123456"), }, Status: &flinkgatewayv1.SqlV1StatementStatus{ Phase: "COMPLETED", @@ -153,7 +146,7 @@ func handleSqlEnvironmentsEnvironmentStatements(t *testing.T) http.HandlerFunc { Name: flinkgatewayv1.PtrString("22222222-2222-2222-2"), Spec: &flinkgatewayv1.SqlV1StatementSpec{ Statement: flinkgatewayv1.PtrString("CREATE TABLE test;"), - ComputePoolId: flinkgatewayv1.PtrString(validFlinkStatementComputePoolId), + ComputePoolId: flinkgatewayv1.PtrString("lfcp-123456"), }, Status: &flinkgatewayv1.SqlV1StatementStatus{ Phase: "COMPLETED", @@ -174,7 +167,7 @@ func handleSqlEnvironmentsEnvironmentStatements(t *testing.T) http.HandlerFunc { require.NoError(t, err) statement.Metadata = &flinkgatewayv1.StatementObjectMeta{CreatedAt: flinkgatewayv1.PtrTime(time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC))} - statement.Spec.ComputePoolId = flinkgatewayv1.PtrString(validFlinkStatementComputePoolId) + statement.Spec.ComputePoolId = flinkgatewayv1.PtrString("lfcp-123456") statement.Status = &flinkgatewayv1.SqlV1StatementStatus{Phase: "PENDING"} err = json.NewEncoder(w).Encode(statement) @@ -202,22 +195,7 @@ func handleSqlEnvironmentsEnvironmentStatementExceptions(t *testing.T) http.Hand } } -// Handler for "/sql/v1/organizations/{organization_id}/environments/{environment_id}/statements/{statement_name}" func handleSqlEnvironmentsEnvironmentStatementsStatement(t *testing.T) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - switch r.Method { - case http.MethodGet: - handleStatementGet(t)(w, r) - case http.MethodPut: - handleStatementUpdate(t)(w, r) - case http.MethodDelete: - w.WriteHeader(http.StatusNoContent) - return - } - } -} - -func handleStatementGet(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { statement := flinkgatewayv1.SqlV1Statement{ Name: flinkgatewayv1.PtrString(mux.Vars(r)["statement"]), @@ -227,8 +205,8 @@ func handleStatementGet(t *testing.T) http.HandlerFunc { "sql.current-catalog": "default", "sql.current-database": "my-cluster", }, - ComputePoolId: flinkgatewayv1.PtrString(validFlinkStatementComputePoolId), - Principal: flinkgatewayv1.PtrString(validFlinkStatementPrincipalId), + ComputePoolId: flinkgatewayv1.PtrString("lfcp-123456"), + Principal: flinkgatewayv1.PtrString("u-123456"), }, Status: &flinkgatewayv1.SqlV1StatementStatus{ Phase: "COMPLETED", @@ -245,41 +223,3 @@ func handleStatementGet(t *testing.T) http.HandlerFunc { require.NoError(t, err) } } - -func handleStatementUpdate(t *testing.T) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - req := new(flinkgatewayv1.SqlV1Statement) - err := json.NewDecoder(r.Body).Decode(req) - require.NoError(t, err) - - stopped := req.Spec.GetStopped() - principal := req.Spec.GetPrincipal() - computePool := req.Spec.GetComputePoolId() - - // Handle the stop case, principal and computerPool shouldn't matter - if stopped { - w.WriteHeader(http.StatusAccepted) - err = json.NewEncoder(w).Encode(flinkgatewayv1.NewSqlV1Statement()) - require.NoError(t, err) - return - } - - // Handle the resume case: invalid principal ID - if principal != "" && principal != validFlinkStatementPrincipalId { - w.WriteHeader(http.StatusBadRequest) - err = writeError(w, "Bad Request") - require.NoError(t, err) - return - } - - // Handle the resume case: invalid compute pool - if computePool != "" && computePool != validFlinkStatementComputePoolId { - w.WriteHeader(http.StatusBadRequest) - err = writeError(w, fmt.Sprintf("logical compute pool=%s not found", computePool)) - require.NoError(t, err) - return - } - - w.WriteHeader(http.StatusAccepted) - } -}