Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow flink onprem commands to work with MDS auth #3002

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions internal/flink/command.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package flink

import (
"context"
"fmt"

"github.com/spf13/cobra"

cmfsdk "github.com/confluentinc/cmf-sdk-go/v1"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/config"
"github.com/confluentinc/cli/v4/pkg/featureflags"
Expand All @@ -22,8 +25,10 @@ func New(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Command {

c := &command{pcmd.NewAuthenticatedCLICommand(cmd, prerunner)}

if !cfg.IsCloudLogin() {
// On-prem commands don't require login, so change the pre-runner to account for that.
// On-prem commands are able to run with or without login. Accordingly, set the pre-runner.
if cfg.IsOnPremLogin() {
c = &command{pcmd.NewAuthenticatedWithMDSCLICommand(cmd, prerunner)}
} else if !cfg.IsCloudLogin() {
cmd.PersistentPreRunE = prerunner.Anonymous(c.AuthenticatedCLICommand.CLICommand, false)
}

Expand Down Expand Up @@ -104,3 +109,10 @@ func addCmfFlagSet(cmd *cobra.Command) {
cmd.Flags().String("client-cert-path", "", `Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag.`)
cmd.Flags().String("certificate-authority-path", "", `Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag.`)
}

func (c *command) createContext() context.Context {
if !c.Config.IsOnPremLogin() {
return context.Background()
}
return context.WithValue(context.Background(), cmfsdk.ContextAccessToken, c.Context.GetAuthToken())
}
2 changes: 1 addition & 1 deletion internal/flink/command_application_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (c *command) applicationCreate(cmd *cobra.Command, args []string) error {
return err
}

outputApplication, err := client.CreateApplication(cmd.Context(), environment, application)
outputApplication, err := client.CreateApplication(c.createContext(), environment, application)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions internal/flink/command_application_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (c *command) applicationDelete(cmd *cobra.Command, args []string) error {
}

existenceFunc := func(name string) bool {
_, err := client.DescribeApplication(cmd.Context(), environment, name)
_, err := client.DescribeApplication(c.createContext(), environment, name)
return err == nil
}

Expand All @@ -50,7 +50,7 @@ func (c *command) applicationDelete(cmd *cobra.Command, args []string) error {
}

deleteFunc := func(name string) error {
return client.DeleteApplication(cmd.Context(), environment, name)
return client.DeleteApplication(c.createContext(), environment, name)
}

_, err = deletion.Delete(args, deleteFunc, resource.FlinkApplication)
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_application_describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (c *command) applicationDescribe(cmd *cobra.Command, args []string) error {
}

applicationName := args[0]
application, err := client.DescribeApplication(cmd.Context(), environment, applicationName)
application, err := client.DescribeApplication(c.createContext(), environment, applicationName)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_application_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (c *command) applicationList(cmd *cobra.Command, _ []string) error {
return err
}

applications, err := client.ListApplications(cmd.Context(), environment)
applications, err := client.ListApplications(c.createContext(), environment)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_application_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (c *command) applicationUpdate(cmd *cobra.Command, args []string) error {
return err
}

outputApplication, err := client.UpdateApplication(cmd.Context(), environment, application)
outputApplication, err := client.UpdateApplication(c.createContext(), environment, application)
if err != nil {
return err
}
Expand Down
11 changes: 8 additions & 3 deletions internal/flink/command_application_webui.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (c *command) applicationWebUiForward(cmd *cobra.Command, args []string) err

// Get the name of the application and check for its existence
applicationName := args[0]
_, err = cmfClient.DescribeApplication(cmd.Context(), environment, applicationName)
_, err = cmfClient.DescribeApplication(c.createContext(), environment, applicationName)
if err != nil {
return fmt.Errorf(`failed to forward web UI: %s`, err)
}
Expand All @@ -78,7 +78,7 @@ func (c *command) applicationWebUiForward(cmd *cobra.Command, args []string) err
}

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
handleRequest(w, r, url, environment, applicationName, cmfClient.APIClient.GetConfig().UserAgent, client)
c.handleRequest(w, r, url, environment, applicationName, cmfClient.APIClient.GetConfig().UserAgent, client)
})

listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
Expand All @@ -93,7 +93,7 @@ func (c *command) applicationWebUiForward(cmd *cobra.Command, args []string) err
return nil
}

func handleRequest(userResponseWriter http.ResponseWriter, userRequest *http.Request, url, environmentName, applicationName, userAgent string, client *http.Client) {
func (c *command) handleRequest(userResponseWriter http.ResponseWriter, userRequest *http.Request, url, environmentName, applicationName, userAgent string, client *http.Client) {
body, err := io.ReadAll(userRequest.Body)
if err != nil {
http.Error(userResponseWriter, fmt.Sprintf("Failed to read request body: %s", err), http.StatusInternalServerError)
Expand All @@ -109,6 +109,11 @@ func handleRequest(userResponseWriter http.ResponseWriter, userRequest *http.Req
reqToCmf.Header = userRequest.Header
reqToCmf.Header.Set("x-confluent-cli-version", userAgent)

if c.Config.IsOnPremLogin() {
accessToken := c.Context.GetAuthToken()
reqToCmf.Header.Set("Authorization", fmt.Sprintf("Bearer %s", accessToken))
}

resFromCmf, err := client.Do(reqToCmf)
if err != nil {
http.Error(userResponseWriter, fmt.Sprintf("failed to forward the request: %s", err), http.StatusInternalServerError)
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_environment_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (c *command) environmentCreate(cmd *cobra.Command, args []string) error {
}
postEnvironment.KubernetesNamespace = kubernetesNamespace

outputEnvironment, err := client.CreateEnvironment(cmd.Context(), postEnvironment)
outputEnvironment, err := client.CreateEnvironment(c.createContext(), postEnvironment)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions internal/flink/command_environment_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (c *command) environmentDelete(cmd *cobra.Command, args []string) error {
}

existenceFunc := func(name string) bool {
_, err := client.DescribeEnvironment(cmd.Context(), name)
_, err := client.DescribeEnvironment(c.createContext(), name)
return err == nil
}

Expand All @@ -44,7 +44,7 @@ func (c *command) environmentDelete(cmd *cobra.Command, args []string) error {
}

deleteFunc := func(name string) error {
return client.DeleteEnvironment(cmd.Context(), name)
return client.DeleteEnvironment(c.createContext(), name)
}

_, err = deletion.Delete(args, deleteFunc, resource.FlinkEnvironment)
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_environment_describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (c *command) environmentDescribe(cmd *cobra.Command, args []string) error {

// Get the name of the environment to be retrieved
environmentName := args[0]
environment, err := client.DescribeEnvironment(cmd.Context(), environmentName)
environment, err := client.DescribeEnvironment(c.createContext(), environmentName)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_environment_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (c *command) environmentList(cmd *cobra.Command, _ []string) error {
return err
}

environments, err := client.ListEnvironments(cmd.Context())
environments, err := client.ListEnvironments(c.createContext())
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_environment_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (c *command) environmentUpdate(cmd *cobra.Command, args []string) error {
environment.FlinkApplicationDefaults = defaultsParsed
}

outputEnvironment, err := client.UpdateEnvironment(cmd.Context(), environment)
outputEnvironment, err := client.UpdateEnvironment(c.createContext(), environment)
if err != nil {
return err
}
Expand Down
5 changes: 5 additions & 0 deletions test/fixtures/output/flink/environment/list-cloud.golden
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Error: you must log out of Confluent Cloud to use this command

Suggestions:
Log out with `confluent logout`.

73 changes: 35 additions & 38 deletions test/flink_onprem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,20 @@ import (
"github.com/confluentinc/cli/v4/pkg/auth"
)

// Runs the integration test with login = "" and login = "onprem"
func runIntegrationTestsWithMultipleAuth(s *CLITestSuite, tests []CLITest) {
for _, test := range tests {
test.login = ""
s.T().Setenv("LOGIN_TYPE", "")
s.runIntegrationTest(test)

test.name = test.args + "-onprem"
test.login = "onprem"
s.T().Setenv("LOGIN_TYPE", "onprem")
s.runIntegrationTest(test)
}
}

func (s *CLITestSuite) TestFlinkApplicationList() {
tests := []CLITest{
// failure scenarios
Expand All @@ -17,9 +31,7 @@ func (s *CLITestSuite) TestFlinkApplicationList() {
{args: "flink application list --environment default --output human", fixture: "flink/application/list-human.golden"},
}

for _, test := range tests {
s.runIntegrationTest(test)
}
runIntegrationTestsWithMultipleAuth(s, tests)
}

func (s *CLITestSuite) TestFlinkApplicationDelete() {
Expand All @@ -35,9 +47,7 @@ func (s *CLITestSuite) TestFlinkApplicationDelete() {
{args: "flink application delete --environment delete-test default-application-1 non-existent --force", fixture: "flink/application/delete-mixed.golden", exitCode: 1},
}

for _, test := range tests {
s.runIntegrationTest(test)
}
runIntegrationTestsWithMultipleAuth(s, tests)
}

func (s *CLITestSuite) TestFlinkEnvironmentList() {
Expand All @@ -47,9 +57,7 @@ func (s *CLITestSuite) TestFlinkEnvironmentList() {
{args: "flink environment list --output human", fixture: "flink/environment/list-human.golden"},
}

for _, test := range tests {
s.runIntegrationTest(test)
}
runIntegrationTestsWithMultipleAuth(s, tests)
}

func (s *CLITestSuite) TestFlinkEnvironmentDelete() {
Expand All @@ -63,9 +71,7 @@ func (s *CLITestSuite) TestFlinkEnvironmentDelete() {
{args: "flink environment delete default non-existent --force", fixture: "flink/environment/delete-mixed.golden", exitCode: 1},
}

for _, test := range tests {
s.runIntegrationTest(test)
}
runIntegrationTestsWithMultipleAuth(s, tests)
}

func (s *CLITestSuite) TestFlinkApplicationCreate() {
Expand All @@ -81,9 +87,7 @@ func (s *CLITestSuite) TestFlinkApplicationCreate() {
{args: "flink application create --environment default test/fixtures/input/flink/application/create-new.json --output human", fixture: "flink/application/create-with-human.golden"},
}

for _, test := range tests {
s.runIntegrationTest(test)
}
runIntegrationTestsWithMultipleAuth(s, tests)
}

func (s *CLITestSuite) TestFlinkApplicationUpdate() {
Expand All @@ -100,9 +104,7 @@ func (s *CLITestSuite) TestFlinkApplicationUpdate() {
{args: "flink application update --environment default test/fixtures/input/flink/application/update-successful.json --output human", fixture: "flink/application/update-with-human.golden"},
}

for _, test := range tests {
s.runIntegrationTest(test)
}
runIntegrationTestsWithMultipleAuth(s, tests)
}

func (s *CLITestSuite) TestFlinkEnvironmentCreate() {
Expand All @@ -118,9 +120,7 @@ func (s *CLITestSuite) TestFlinkEnvironmentCreate() {
{args: "flink environment create default", fixture: "flink/environment/create-no-namespace.golden", exitCode: 1},
}

for _, test := range tests {
s.runIntegrationTest(test)
}
runIntegrationTestsWithMultipleAuth(s, tests)
}

func (s *CLITestSuite) TestFlinkEnvironmentUpdate() {
Expand All @@ -135,9 +135,7 @@ func (s *CLITestSuite) TestFlinkEnvironmentUpdate() {
{args: "flink environment update get-failure", fixture: "flink/environment/update-get-failure.golden", exitCode: 1},
}

for _, test := range tests {
s.runIntegrationTest(test)
}
runIntegrationTestsWithMultipleAuth(s, tests)
}

func (s *CLITestSuite) TestFlinkEnvironmentDescribe() {
Expand All @@ -151,9 +149,7 @@ func (s *CLITestSuite) TestFlinkEnvironmentDescribe() {
{args: "flink environment describe", fixture: "flink/environment/describe-no-environment.golden", exitCode: 1},
}

for _, test := range tests {
s.runIntegrationTest(test)
}
runIntegrationTestsWithMultipleAuth(s, tests)
}

func (s *CLITestSuite) TestFlinkApplicationDescribe() {
Expand All @@ -169,9 +165,7 @@ func (s *CLITestSuite) TestFlinkApplicationDescribe() {
{args: "flink application describe default-application-1", fixture: "flink/application/describe-no-environment.golden", exitCode: 1},
}

for _, test := range tests {
s.runIntegrationTest(test)
}
runIntegrationTestsWithMultipleAuth(s, tests)
}

func (s *CLITestSuite) TestFlinkApplicationWebUiForward() {
Expand All @@ -182,14 +176,17 @@ func (s *CLITestSuite) TestFlinkApplicationWebUiForward() {
{args: "flink --url dummy-url application web-ui-forward non-existent --environment default", fixture: "flink/application/forward-nonexistent-application.golden", exitCode: 1},
{args: "flink --url dummy-url application web-ui-forward default-application-1 --environment non-existent", fixture: "flink/application/forward-nonexistent-environment.golden", exitCode: 1},
{args: "flink --url dummy-url application web-ui-forward get-failure --environment default", fixture: "flink/application/forward-get-failure.golden", exitCode: 1},
{name: "no-url-set", args: "flink application web-ui-forward --environment does-not-matter missing-applications", fixture: "flink/application/url-missing.golden", exitCode: 1},
}

for _, test := range tests {
if test.name == "no-url-set" {
// unset the environment variable
os.Unsetenv(auth.ConfluentPlatformCmfURL)
}
s.runIntegrationTest(test)
}
runIntegrationTestsWithMultipleAuth(s, tests)

noUrlSetTest := CLITest{name: "no-url-set", args: "flink application web-ui-forward --environment does-not-matter missing-applications", fixture: "flink/application/url-missing.golden", exitCode: 1}
// unset the environment variable
os.Unsetenv(auth.ConfluentPlatformCmfURL)
s.runIntegrationTest(noUrlSetTest)
}

func (s *CLITestSuite) TestFlinkOnPremWithCloudLogin() {
test := CLITest{args: "flink environment list --output json", fixture: "flink/environment/list-cloud.golden", login: "cloud", exitCode: 1}
s.runIntegrationTest(test)
}
Loading