Skip to content

Commit

Permalink
Allow flink onprem commands to work with MDS auth
Browse files Browse the repository at this point in the history
  • Loading branch information
vsantwana committed Jan 15, 2025
1 parent caab8cd commit 3070a82
Show file tree
Hide file tree
Showing 15 changed files with 99 additions and 54 deletions.
21 changes: 18 additions & 3 deletions internal/flink/command.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
package flink

import (
"context"
"fmt"

"github.com/spf13/cobra"

cmfsdk "github.com/confluentinc/cmf-sdk-go/v1"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/config"
"github.com/confluentinc/cli/v4/pkg/featureflags"
)

type command struct {
*pcmd.AuthenticatedCLICommand
isMDSAuth bool
}

func New(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Command {
Expand All @@ -20,11 +24,15 @@ func New(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Command {
Short: "Manage Apache Flink.",
}

c := &command{pcmd.NewAuthenticatedCLICommand(cmd, prerunner)}
c := &command{pcmd.NewAuthenticatedCLICommand(cmd, prerunner), false}

// On-prem commands are able to run with or without login. Accordingly, set the pre-runner.
if !cfg.IsCloudLogin() {
// On-prem commands don't require login, so change the pre-runner to account for that.
cmd.PersistentPreRunE = prerunner.Anonymous(c.AuthenticatedCLICommand.CLICommand, false)
if cfg.IsOnPremLogin() {
c = &command{pcmd.NewAuthenticatedWithMDSCLICommand(cmd, prerunner), true}
} else {
cmd.PersistentPreRunE = prerunner.Anonymous(c.AuthenticatedCLICommand.CLICommand, false)
}
}

// Cloud Specific Commands
Expand Down Expand Up @@ -104,3 +112,10 @@ func addCmfFlagSet(cmd *cobra.Command) {
cmd.Flags().String("client-cert-path", "", `Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag.`)
cmd.Flags().String("certificate-authority-path", "", `Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag.`)
}

func (c *command) createContext() context.Context {
if !c.isMDSAuth {
return context.Background()
}
return context.WithValue(context.Background(), cmfsdk.ContextAccessToken, c.Context.GetAuthToken())
}
2 changes: 1 addition & 1 deletion internal/flink/command_application_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (c *command) applicationCreate(cmd *cobra.Command, args []string) error {
return err
}

outputApplication, err := client.CreateApplication(cmd.Context(), environment, application)
outputApplication, err := client.CreateApplication(c.createContext(), environment, application)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions internal/flink/command_application_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (c *command) applicationDelete(cmd *cobra.Command, args []string) error {
}

existenceFunc := func(name string) bool {
_, err := client.DescribeApplication(cmd.Context(), environment, name)
_, err := client.DescribeApplication(c.createContext(), environment, name)
return err == nil
}

Expand All @@ -50,7 +50,7 @@ func (c *command) applicationDelete(cmd *cobra.Command, args []string) error {
}

deleteFunc := func(name string) error {
return client.DeleteApplication(cmd.Context(), environment, name)
return client.DeleteApplication(c.createContext(), environment, name)
}

_, err = deletion.Delete(args, deleteFunc, resource.FlinkApplication)
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_application_describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (c *command) applicationDescribe(cmd *cobra.Command, args []string) error {
}

applicationName := args[0]
application, err := client.DescribeApplication(cmd.Context(), environment, applicationName)
application, err := client.DescribeApplication(c.createContext(), environment, applicationName)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_application_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (c *command) applicationList(cmd *cobra.Command, _ []string) error {
return err
}

applications, err := client.ListApplications(cmd.Context(), environment)
applications, err := client.ListApplications(c.createContext(), environment)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_application_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (c *command) applicationUpdate(cmd *cobra.Command, args []string) error {
return err
}

outputApplication, err := client.UpdateApplication(cmd.Context(), environment, application)
outputApplication, err := client.UpdateApplication(c.createContext(), environment, application)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_application_webui.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (c *command) applicationWebUiForward(cmd *cobra.Command, args []string) err

// Get the name of the application and check for its existence
applicationName := args[0]
_, err = cmfClient.DescribeApplication(cmd.Context(), environment, applicationName)
_, err = cmfClient.DescribeApplication(c.createContext(), environment, applicationName)
if err != nil {
return fmt.Errorf(`failed to forward web UI: %s`, err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_environment_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (c *command) environmentCreate(cmd *cobra.Command, args []string) error {
}
postEnvironment.KubernetesNamespace = kubernetesNamespace

outputEnvironment, err := client.CreateEnvironment(cmd.Context(), postEnvironment)
outputEnvironment, err := client.CreateEnvironment(c.createContext(), postEnvironment)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions internal/flink/command_environment_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (c *command) environmentDelete(cmd *cobra.Command, args []string) error {
}

existenceFunc := func(name string) bool {
_, err := client.DescribeEnvironment(cmd.Context(), name)
_, err := client.DescribeEnvironment(c.createContext(), name)
return err == nil
}

Expand All @@ -44,7 +44,7 @@ func (c *command) environmentDelete(cmd *cobra.Command, args []string) error {
}

deleteFunc := func(name string) error {
return client.DeleteEnvironment(cmd.Context(), name)
return client.DeleteEnvironment(c.createContext(), name)
}

_, err = deletion.Delete(args, deleteFunc, resource.FlinkEnvironment)
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_environment_describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (c *command) environmentDescribe(cmd *cobra.Command, args []string) error {

// Get the name of the environment to be retrieved
environmentName := args[0]
environment, err := client.DescribeEnvironment(cmd.Context(), environmentName)
environment, err := client.DescribeEnvironment(c.createContext(), environmentName)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_environment_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (c *command) environmentList(cmd *cobra.Command, _ []string) error {
return err
}

environments, err := client.ListEnvironments(cmd.Context())
environments, err := client.ListEnvironments(c.createContext())
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_environment_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (c *command) environmentUpdate(cmd *cobra.Command, args []string) error {
environment.FlinkApplicationDefaults = defaultsParsed
}

outputEnvironment, err := client.UpdateEnvironment(cmd.Context(), environment)
outputEnvironment, err := client.UpdateEnvironment(c.createContext(), environment)
if err != nil {
return err
}
Expand Down
5 changes: 5 additions & 0 deletions test/fixtures/output/flink/environment/list-cloud.golden
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Error: you must log out of Confluent Cloud to use this command

Suggestions:
Log out with `confluent logout`.

73 changes: 35 additions & 38 deletions test/flink_onprem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,20 @@ import (
"github.com/confluentinc/cli/v4/pkg/auth"
)

// Runs the integration test with login = "" and login = "onprem"
func runIntegrationTestsWithMultipleAuth(s *CLITestSuite, tests []CLITest) {
for _, test := range tests {
test.login = ""
s.T().Setenv("LOGIN_TYPE", "")
s.runIntegrationTest(test)

test.name = test.args + "-onprem"
test.login = "onprem"
s.T().Setenv("LOGIN_TYPE", "onprem")
s.runIntegrationTest(test)
}
}

func (s *CLITestSuite) TestFlinkApplicationList() {
tests := []CLITest{
// failure scenarios
Expand All @@ -17,9 +31,7 @@ func (s *CLITestSuite) TestFlinkApplicationList() {
{args: "flink application list --environment default --output human", fixture: "flink/application/list-human.golden"},
}

for _, test := range tests {
s.runIntegrationTest(test)
}
runIntegrationTestsWithMultipleAuth(s, tests)
}

func (s *CLITestSuite) TestFlinkApplicationDelete() {
Expand All @@ -35,9 +47,7 @@ func (s *CLITestSuite) TestFlinkApplicationDelete() {
{args: "flink application delete --environment delete-test default-application-1 non-existent --force", fixture: "flink/application/delete-mixed.golden", exitCode: 1},
}

for _, test := range tests {
s.runIntegrationTest(test)
}
runIntegrationTestsWithMultipleAuth(s, tests)
}

func (s *CLITestSuite) TestFlinkEnvironmentList() {
Expand All @@ -47,9 +57,7 @@ func (s *CLITestSuite) TestFlinkEnvironmentList() {
{args: "flink environment list --output human", fixture: "flink/environment/list-human.golden"},
}

for _, test := range tests {
s.runIntegrationTest(test)
}
runIntegrationTestsWithMultipleAuth(s, tests)
}

func (s *CLITestSuite) TestFlinkEnvironmentDelete() {
Expand All @@ -63,9 +71,7 @@ func (s *CLITestSuite) TestFlinkEnvironmentDelete() {
{args: "flink environment delete default non-existent --force", fixture: "flink/environment/delete-mixed.golden", exitCode: 1},
}

for _, test := range tests {
s.runIntegrationTest(test)
}
runIntegrationTestsWithMultipleAuth(s, tests)
}

func (s *CLITestSuite) TestFlinkApplicationCreate() {
Expand All @@ -81,9 +87,7 @@ func (s *CLITestSuite) TestFlinkApplicationCreate() {
{args: "flink application create --environment default test/fixtures/input/flink/application/create-new.json --output human", fixture: "flink/application/create-with-human.golden"},
}

for _, test := range tests {
s.runIntegrationTest(test)
}
runIntegrationTestsWithMultipleAuth(s, tests)
}

func (s *CLITestSuite) TestFlinkApplicationUpdate() {
Expand All @@ -100,9 +104,7 @@ func (s *CLITestSuite) TestFlinkApplicationUpdate() {
{args: "flink application update --environment default test/fixtures/input/flink/application/update-successful.json --output human", fixture: "flink/application/update-with-human.golden"},
}

for _, test := range tests {
s.runIntegrationTest(test)
}
runIntegrationTestsWithMultipleAuth(s, tests)
}

func (s *CLITestSuite) TestFlinkEnvironmentCreate() {
Expand All @@ -118,9 +120,7 @@ func (s *CLITestSuite) TestFlinkEnvironmentCreate() {
{args: "flink environment create default", fixture: "flink/environment/create-no-namespace.golden", exitCode: 1},
}

for _, test := range tests {
s.runIntegrationTest(test)
}
runIntegrationTestsWithMultipleAuth(s, tests)
}

func (s *CLITestSuite) TestFlinkEnvironmentUpdate() {
Expand All @@ -135,9 +135,7 @@ func (s *CLITestSuite) TestFlinkEnvironmentUpdate() {
{args: "flink environment update get-failure", fixture: "flink/environment/update-get-failure.golden", exitCode: 1},
}

for _, test := range tests {
s.runIntegrationTest(test)
}
runIntegrationTestsWithMultipleAuth(s, tests)
}

func (s *CLITestSuite) TestFlinkEnvironmentDescribe() {
Expand All @@ -151,9 +149,7 @@ func (s *CLITestSuite) TestFlinkEnvironmentDescribe() {
{args: "flink environment describe", fixture: "flink/environment/describe-no-environment.golden", exitCode: 1},
}

for _, test := range tests {
s.runIntegrationTest(test)
}
runIntegrationTestsWithMultipleAuth(s, tests)
}

func (s *CLITestSuite) TestFlinkApplicationDescribe() {
Expand All @@ -169,9 +165,7 @@ func (s *CLITestSuite) TestFlinkApplicationDescribe() {
{args: "flink application describe default-application-1", fixture: "flink/application/describe-no-environment.golden", exitCode: 1},
}

for _, test := range tests {
s.runIntegrationTest(test)
}
runIntegrationTestsWithMultipleAuth(s, tests)
}

func (s *CLITestSuite) TestFlinkApplicationWebUiForward() {
Expand All @@ -182,14 +176,17 @@ func (s *CLITestSuite) TestFlinkApplicationWebUiForward() {
{args: "flink --url dummy-url application web-ui-forward non-existent --environment default", fixture: "flink/application/forward-nonexistent-application.golden", exitCode: 1},
{args: "flink --url dummy-url application web-ui-forward default-application-1 --environment non-existent", fixture: "flink/application/forward-nonexistent-environment.golden", exitCode: 1},
{args: "flink --url dummy-url application web-ui-forward get-failure --environment default", fixture: "flink/application/forward-get-failure.golden", exitCode: 1},
{name: "no-url-set", args: "flink application web-ui-forward --environment does-not-matter missing-applications", fixture: "flink/application/url-missing.golden", exitCode: 1},
}

for _, test := range tests {
if test.name == "no-url-set" {
// unset the environment variable
os.Unsetenv(auth.ConfluentPlatformCmfURL)
}
s.runIntegrationTest(test)
}
runIntegrationTestsWithMultipleAuth(s, tests)

noUrlSetTest := CLITest{name: "no-url-set", args: "flink application web-ui-forward --environment does-not-matter missing-applications", fixture: "flink/application/url-missing.golden", exitCode: 1}
// unset the environment variable
os.Unsetenv(auth.ConfluentPlatformCmfURL)
s.runIntegrationTest(noUrlSetTest)
}

func (s *CLITestSuite) TestFlinkOnPremWithCloudLogin() {
test := CLITest{args: "flink environment list --output json", fixture: "flink/environment/list-cloud.golden", login: "cloud", exitCode: 1}
s.runIntegrationTest(test)
}
Loading

0 comments on commit 3070a82

Please sign in to comment.