diff --git a/internal/flink/command.go b/internal/flink/command.go index 3319746d6b..67e0a54a3f 100644 --- a/internal/flink/command.go +++ b/internal/flink/command.go @@ -1,10 +1,13 @@ package flink import ( + "context" "fmt" "github.com/spf13/cobra" + cmfsdk "github.com/confluentinc/cmf-sdk-go/v1" + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" "github.com/confluentinc/cli/v4/pkg/config" "github.com/confluentinc/cli/v4/pkg/featureflags" @@ -22,8 +25,10 @@ func New(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Command { c := &command{pcmd.NewAuthenticatedCLICommand(cmd, prerunner)} - if !cfg.IsCloudLogin() { - // On-prem commands don't require login, so change the pre-runner to account for that. + // On-prem commands are able to run with or without login. Accordingly, set the pre-runner. + if cfg.IsOnPremLogin() { + c = &command{pcmd.NewAuthenticatedWithMDSCLICommand(cmd, prerunner)} + } else if !cfg.IsCloudLogin() { cmd.PersistentPreRunE = prerunner.Anonymous(c.AuthenticatedCLICommand.CLICommand, false) } @@ -104,3 +109,10 @@ func addCmfFlagSet(cmd *cobra.Command) { cmd.Flags().String("client-cert-path", "", `Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag.`) cmd.Flags().String("certificate-authority-path", "", `Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag.`) } + +func (c *command) createContext() context.Context { + if !c.Config.IsOnPremLogin() { + return context.Background() + } + return context.WithValue(context.Background(), cmfsdk.ContextAccessToken, c.Context.GetAuthToken()) +} diff --git a/internal/flink/command_application_create.go b/internal/flink/command_application_create.go index de4f508eef..7b85880e6f 100644 --- a/internal/flink/command_application_create.go +++ b/internal/flink/command_application_create.go @@ -66,7 +66,7 @@ func (c *command) applicationCreate(cmd *cobra.Command, args []string) error { return err } - outputApplication, err := client.CreateApplication(cmd.Context(), environment, application) + outputApplication, err := client.CreateApplication(c.createContext(), environment, application) if err != nil { return err } diff --git a/internal/flink/command_application_delete.go b/internal/flink/command_application_delete.go index 1b26f77827..516d987123 100644 --- a/internal/flink/command_application_delete.go +++ b/internal/flink/command_application_delete.go @@ -37,7 +37,7 @@ func (c *command) applicationDelete(cmd *cobra.Command, args []string) error { } existenceFunc := func(name string) bool { - _, err := client.DescribeApplication(cmd.Context(), environment, name) + _, err := client.DescribeApplication(c.createContext(), environment, name) return err == nil } @@ -50,7 +50,7 @@ func (c *command) applicationDelete(cmd *cobra.Command, args []string) error { } deleteFunc := func(name string) error { - return client.DeleteApplication(cmd.Context(), environment, name) + return client.DeleteApplication(c.createContext(), environment, name) } _, err = deletion.Delete(args, deleteFunc, resource.FlinkApplication) diff --git a/internal/flink/command_application_describe.go b/internal/flink/command_application_describe.go index d05bc289b6..dabb698ba1 100644 --- a/internal/flink/command_application_describe.go +++ b/internal/flink/command_application_describe.go @@ -36,7 +36,7 @@ func (c *command) applicationDescribe(cmd *cobra.Command, args []string) error { } applicationName := args[0] - application, err := client.DescribeApplication(cmd.Context(), environment, applicationName) + application, err := client.DescribeApplication(c.createContext(), environment, applicationName) if err != nil { return err } diff --git a/internal/flink/command_application_list.go b/internal/flink/command_application_list.go index 9b695ee564..42c50c5139 100644 --- a/internal/flink/command_application_list.go +++ b/internal/flink/command_application_list.go @@ -35,7 +35,7 @@ func (c *command) applicationList(cmd *cobra.Command, _ []string) error { return err } - applications, err := client.ListApplications(cmd.Context(), environment) + applications, err := client.ListApplications(c.createContext(), environment) if err != nil { return err } diff --git a/internal/flink/command_application_update.go b/internal/flink/command_application_update.go index 7777567218..6cde9dde16 100644 --- a/internal/flink/command_application_update.go +++ b/internal/flink/command_application_update.go @@ -66,7 +66,7 @@ func (c *command) applicationUpdate(cmd *cobra.Command, args []string) error { return err } - outputApplication, err := client.UpdateApplication(cmd.Context(), environment, application) + outputApplication, err := client.UpdateApplication(c.createContext(), environment, application) if err != nil { return err } diff --git a/internal/flink/command_application_webui.go b/internal/flink/command_application_webui.go index f112899573..be9eb136ac 100644 --- a/internal/flink/command_application_webui.go +++ b/internal/flink/command_application_webui.go @@ -62,7 +62,7 @@ func (c *command) applicationWebUiForward(cmd *cobra.Command, args []string) err // Get the name of the application and check for its existence applicationName := args[0] - _, err = cmfClient.DescribeApplication(cmd.Context(), environment, applicationName) + _, err = cmfClient.DescribeApplication(c.createContext(), environment, applicationName) if err != nil { return fmt.Errorf(`failed to forward web UI: %s`, err) } @@ -78,7 +78,7 @@ func (c *command) applicationWebUiForward(cmd *cobra.Command, args []string) err } http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - handleRequest(w, r, url, environment, applicationName, cmfClient.APIClient.GetConfig().UserAgent, client) + c.handleRequest(w, r, url, environment, applicationName, cmfClient.APIClient.GetConfig().UserAgent, client) }) listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) @@ -93,7 +93,7 @@ func (c *command) applicationWebUiForward(cmd *cobra.Command, args []string) err return nil } -func handleRequest(userResponseWriter http.ResponseWriter, userRequest *http.Request, url, environmentName, applicationName, userAgent string, client *http.Client) { +func (c *command) handleRequest(userResponseWriter http.ResponseWriter, userRequest *http.Request, url, environmentName, applicationName, userAgent string, client *http.Client) { body, err := io.ReadAll(userRequest.Body) if err != nil { http.Error(userResponseWriter, fmt.Sprintf("Failed to read request body: %s", err), http.StatusInternalServerError) @@ -109,6 +109,11 @@ func handleRequest(userResponseWriter http.ResponseWriter, userRequest *http.Req reqToCmf.Header = userRequest.Header reqToCmf.Header.Set("x-confluent-cli-version", userAgent) + if c.Config.IsOnPremLogin() { + accessToken := c.Context.GetAuthToken() + reqToCmf.Header.Set("Authorization", fmt.Sprintf("Bearer %s", accessToken)) + } + resFromCmf, err := client.Do(reqToCmf) if err != nil { http.Error(userResponseWriter, fmt.Sprintf("failed to forward the request: %s", err), http.StatusInternalServerError) diff --git a/internal/flink/command_environment_create.go b/internal/flink/command_environment_create.go index c9adfe89f4..7bdf4a97b4 100644 --- a/internal/flink/command_environment_create.go +++ b/internal/flink/command_environment_create.go @@ -86,7 +86,7 @@ func (c *command) environmentCreate(cmd *cobra.Command, args []string) error { } postEnvironment.KubernetesNamespace = kubernetesNamespace - outputEnvironment, err := client.CreateEnvironment(cmd.Context(), postEnvironment) + outputEnvironment, err := client.CreateEnvironment(c.createContext(), postEnvironment) if err != nil { return err } diff --git a/internal/flink/command_environment_delete.go b/internal/flink/command_environment_delete.go index 111a6905d6..52e05f0b91 100644 --- a/internal/flink/command_environment_delete.go +++ b/internal/flink/command_environment_delete.go @@ -31,7 +31,7 @@ func (c *command) environmentDelete(cmd *cobra.Command, args []string) error { } existenceFunc := func(name string) bool { - _, err := client.DescribeEnvironment(cmd.Context(), name) + _, err := client.DescribeEnvironment(c.createContext(), name) return err == nil } @@ -44,7 +44,7 @@ func (c *command) environmentDelete(cmd *cobra.Command, args []string) error { } deleteFunc := func(name string) error { - return client.DeleteEnvironment(cmd.Context(), name) + return client.DeleteEnvironment(c.createContext(), name) } _, err = deletion.Delete(args, deleteFunc, resource.FlinkEnvironment) diff --git a/internal/flink/command_environment_describe.go b/internal/flink/command_environment_describe.go index 9d24fc62e0..448ec5a815 100644 --- a/internal/flink/command_environment_describe.go +++ b/internal/flink/command_environment_describe.go @@ -33,7 +33,7 @@ func (c *command) environmentDescribe(cmd *cobra.Command, args []string) error { // Get the name of the environment to be retrieved environmentName := args[0] - environment, err := client.DescribeEnvironment(cmd.Context(), environmentName) + environment, err := client.DescribeEnvironment(c.createContext(), environmentName) if err != nil { return err } diff --git a/internal/flink/command_environment_list.go b/internal/flink/command_environment_list.go index 488562641a..09f54ede4f 100644 --- a/internal/flink/command_environment_list.go +++ b/internal/flink/command_environment_list.go @@ -28,7 +28,7 @@ func (c *command) environmentList(cmd *cobra.Command, _ []string) error { return err } - environments, err := client.ListEnvironments(cmd.Context()) + environments, err := client.ListEnvironments(c.createContext()) if err != nil { return err } diff --git a/internal/flink/command_environment_update.go b/internal/flink/command_environment_update.go index 3ff31d3573..9e7b3972c2 100644 --- a/internal/flink/command_environment_update.go +++ b/internal/flink/command_environment_update.go @@ -75,7 +75,7 @@ func (c *command) environmentUpdate(cmd *cobra.Command, args []string) error { environment.FlinkApplicationDefaults = defaultsParsed } - outputEnvironment, err := client.UpdateEnvironment(cmd.Context(), environment) + outputEnvironment, err := client.UpdateEnvironment(c.createContext(), environment) if err != nil { return err } diff --git a/test/fixtures/output/flink/environment/list-cloud.golden b/test/fixtures/output/flink/environment/list-cloud.golden new file mode 100644 index 0000000000..cd4fa6ba24 --- /dev/null +++ b/test/fixtures/output/flink/environment/list-cloud.golden @@ -0,0 +1,5 @@ +Error: you must log out of Confluent Cloud to use this command + +Suggestions: + Log out with `confluent logout`. + diff --git a/test/flink_onprem_test.go b/test/flink_onprem_test.go index fab286760f..d6970faf5e 100644 --- a/test/flink_onprem_test.go +++ b/test/flink_onprem_test.go @@ -6,6 +6,20 @@ import ( "github.com/confluentinc/cli/v4/pkg/auth" ) +// Runs the integration test with login = "" and login = "onprem" +func runIntegrationTestsWithMultipleAuth(s *CLITestSuite, tests []CLITest) { + for _, test := range tests { + test.login = "" + s.T().Setenv("LOGIN_TYPE", "") + s.runIntegrationTest(test) + + test.name = test.args + "-onprem" + test.login = "onprem" + s.T().Setenv("LOGIN_TYPE", "onprem") + s.runIntegrationTest(test) + } +} + func (s *CLITestSuite) TestFlinkApplicationList() { tests := []CLITest{ // failure scenarios @@ -17,9 +31,7 @@ func (s *CLITestSuite) TestFlinkApplicationList() { {args: "flink application list --environment default --output human", fixture: "flink/application/list-human.golden"}, } - for _, test := range tests { - s.runIntegrationTest(test) - } + runIntegrationTestsWithMultipleAuth(s, tests) } func (s *CLITestSuite) TestFlinkApplicationDelete() { @@ -35,9 +47,7 @@ func (s *CLITestSuite) TestFlinkApplicationDelete() { {args: "flink application delete --environment delete-test default-application-1 non-existent --force", fixture: "flink/application/delete-mixed.golden", exitCode: 1}, } - for _, test := range tests { - s.runIntegrationTest(test) - } + runIntegrationTestsWithMultipleAuth(s, tests) } func (s *CLITestSuite) TestFlinkEnvironmentList() { @@ -47,9 +57,7 @@ func (s *CLITestSuite) TestFlinkEnvironmentList() { {args: "flink environment list --output human", fixture: "flink/environment/list-human.golden"}, } - for _, test := range tests { - s.runIntegrationTest(test) - } + runIntegrationTestsWithMultipleAuth(s, tests) } func (s *CLITestSuite) TestFlinkEnvironmentDelete() { @@ -63,9 +71,7 @@ func (s *CLITestSuite) TestFlinkEnvironmentDelete() { {args: "flink environment delete default non-existent --force", fixture: "flink/environment/delete-mixed.golden", exitCode: 1}, } - for _, test := range tests { - s.runIntegrationTest(test) - } + runIntegrationTestsWithMultipleAuth(s, tests) } func (s *CLITestSuite) TestFlinkApplicationCreate() { @@ -81,9 +87,7 @@ func (s *CLITestSuite) TestFlinkApplicationCreate() { {args: "flink application create --environment default test/fixtures/input/flink/application/create-new.json --output human", fixture: "flink/application/create-with-human.golden"}, } - for _, test := range tests { - s.runIntegrationTest(test) - } + runIntegrationTestsWithMultipleAuth(s, tests) } func (s *CLITestSuite) TestFlinkApplicationUpdate() { @@ -100,9 +104,7 @@ func (s *CLITestSuite) TestFlinkApplicationUpdate() { {args: "flink application update --environment default test/fixtures/input/flink/application/update-successful.json --output human", fixture: "flink/application/update-with-human.golden"}, } - for _, test := range tests { - s.runIntegrationTest(test) - } + runIntegrationTestsWithMultipleAuth(s, tests) } func (s *CLITestSuite) TestFlinkEnvironmentCreate() { @@ -118,9 +120,7 @@ func (s *CLITestSuite) TestFlinkEnvironmentCreate() { {args: "flink environment create default", fixture: "flink/environment/create-no-namespace.golden", exitCode: 1}, } - for _, test := range tests { - s.runIntegrationTest(test) - } + runIntegrationTestsWithMultipleAuth(s, tests) } func (s *CLITestSuite) TestFlinkEnvironmentUpdate() { @@ -135,9 +135,7 @@ func (s *CLITestSuite) TestFlinkEnvironmentUpdate() { {args: "flink environment update get-failure", fixture: "flink/environment/update-get-failure.golden", exitCode: 1}, } - for _, test := range tests { - s.runIntegrationTest(test) - } + runIntegrationTestsWithMultipleAuth(s, tests) } func (s *CLITestSuite) TestFlinkEnvironmentDescribe() { @@ -151,9 +149,7 @@ func (s *CLITestSuite) TestFlinkEnvironmentDescribe() { {args: "flink environment describe", fixture: "flink/environment/describe-no-environment.golden", exitCode: 1}, } - for _, test := range tests { - s.runIntegrationTest(test) - } + runIntegrationTestsWithMultipleAuth(s, tests) } func (s *CLITestSuite) TestFlinkApplicationDescribe() { @@ -169,9 +165,7 @@ func (s *CLITestSuite) TestFlinkApplicationDescribe() { {args: "flink application describe default-application-1", fixture: "flink/application/describe-no-environment.golden", exitCode: 1}, } - for _, test := range tests { - s.runIntegrationTest(test) - } + runIntegrationTestsWithMultipleAuth(s, tests) } func (s *CLITestSuite) TestFlinkApplicationWebUiForward() { @@ -182,14 +176,17 @@ func (s *CLITestSuite) TestFlinkApplicationWebUiForward() { {args: "flink --url dummy-url application web-ui-forward non-existent --environment default", fixture: "flink/application/forward-nonexistent-application.golden", exitCode: 1}, {args: "flink --url dummy-url application web-ui-forward default-application-1 --environment non-existent", fixture: "flink/application/forward-nonexistent-environment.golden", exitCode: 1}, {args: "flink --url dummy-url application web-ui-forward get-failure --environment default", fixture: "flink/application/forward-get-failure.golden", exitCode: 1}, - {name: "no-url-set", args: "flink application web-ui-forward --environment does-not-matter missing-applications", fixture: "flink/application/url-missing.golden", exitCode: 1}, } - for _, test := range tests { - if test.name == "no-url-set" { - // unset the environment variable - os.Unsetenv(auth.ConfluentPlatformCmfURL) - } - s.runIntegrationTest(test) - } + runIntegrationTestsWithMultipleAuth(s, tests) + + noUrlSetTest := CLITest{name: "no-url-set", args: "flink application web-ui-forward --environment does-not-matter missing-applications", fixture: "flink/application/url-missing.golden", exitCode: 1} + // unset the environment variable + os.Unsetenv(auth.ConfluentPlatformCmfURL) + s.runIntegrationTest(noUrlSetTest) +} + +func (s *CLITestSuite) TestFlinkOnPremWithCloudLogin() { + test := CLITest{args: "flink environment list --output json", fixture: "flink/environment/list-cloud.golden", login: "cloud", exitCode: 1} + s.runIntegrationTest(test) } diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index b15121bf67..d0142b2541 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "net/http" + "os" "strings" "testing" "time" @@ -111,6 +112,25 @@ func createEnvironment(name string, namespace string) cmfsdk.Environment { } } +// Helper function to check that the login type is either empty or onprem, and if it's onprem, +// that the headers are correct. +func handleLoginType(t *testing.T, r *http.Request) { + loginType := os.Getenv("LOGIN_TYPE") + + // Depending on the login type, we need to check if the headers are correct + if loginType == "" { + return + } + if loginType != "onprem" { + require.Fail(t, "LOGIN_TYPE besides onprem and not-logged in are not allowed - the test has a bug.") + return + } + + authValue := r.Header.Get("Authorization") + require.NotEqual(t, "", authValue) + require.True(t, strings.HasPrefix(authValue, "Bearer ")) +} + // There are a number of request and responses for each path depending on the test case. // We assume the following set of existing environments and applications as already existing: // default: default-application-1, default-application-2 @@ -126,6 +146,8 @@ func createEnvironment(name string, namespace string) cmfsdk.Environment { // Used to list, create and update environments. func handleCmfEnvironments(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + handleLoginType(t, r) + switch r.Method { case http.MethodGet: page := r.URL.Query().Get("page") @@ -185,6 +207,8 @@ func handleCmfEnvironments(t *testing.T) http.HandlerFunc { // Used by describe and delete. func handleCmfEnvironment(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + handleLoginType(t, r) + environment := mux.Vars(r)["environment"] switch r.Method { case http.MethodGet: @@ -225,6 +249,8 @@ func handleCmfEnvironment(t *testing.T) http.HandlerFunc { // Used by list, create and update applications. func handleCmfApplications(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + handleLoginType(t, r) + vars := mux.Vars(r) environment := vars["environment"] switch r.Method { @@ -305,6 +331,8 @@ func handleCmfApplications(t *testing.T) http.HandlerFunc { // Used by describe and delete applications. func handleCmfApplication(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + handleLoginType(t, r) + vars := mux.Vars(r) environment := vars["environment"] application := vars["application"]