Skip to content

Commit

Permalink
Merge branch '7.9.x' into master by pbadani
Browse files Browse the repository at this point in the history
  • Loading branch information
ConfluentSemaphore committed Dec 9, 2024
2 parents 8982a63 + 1ec51d4 commit 3c4604d
Show file tree
Hide file tree
Showing 22 changed files with 294 additions and 30 deletions.
4 changes: 0 additions & 4 deletions ksqldb-api-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<!-- Skip ITs temporarily -->
<skipITs>true</skipITs>
</configuration>
</plugin>
</plugins>
</build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public class RestClientIntegrationTest {
.withProperty("ksql.verticle.instances", EVENT_LOOP_POOL_SIZE)
.withProperty("ksql.worker.pool.size", WORKER_POOL_SIZE)
.withProperty(KsqlConfig.KSQL_HEADERS_COLUMNS_ENABLED, true)
.withProperty(KsqlConfig.KSQL_UDF_SECURITY_MANAGER_ENABLED, false)
.build();

@ClassRule
Expand Down
4 changes: 0 additions & 4 deletions ksqldb-cli/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<!-- Skip ITs temporarily -->
<skipITs>true</skipITs>
</configuration>
</plugin>
</plugins>
</build>
Expand Down
32 changes: 31 additions & 1 deletion ksqldb-cli/src/main/java/io/confluent/ksql/Ksql.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@
import io.confluent.ksql.cli.console.OutputFormat;
import io.confluent.ksql.properties.PropertiesUtil;
import io.confluent.ksql.rest.client.KsqlRestClient;
import io.confluent.ksql.security.AuthType;
import io.confluent.ksql.security.BasicCredentials;
import io.confluent.ksql.security.Credentials;
import io.confluent.ksql.security.CredentialsFactory;
import io.confluent.ksql.security.KsqlClientConfig;
import io.confluent.ksql.util.ErrorMessageUtil;
import io.confluent.ksql.util.KsqlException;
import java.io.Console;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -144,13 +148,39 @@ private KsqlRestClient buildClient(
final Map<String, String> localProps = stripClientSideProperties(configProps);
final Map<String, String> clientProps = PropertiesUtil.applyOverrides(configProps, systemProps);
final String server = options.getServer();
final Optional<Credentials> creds = options.getUserNameAndPassword();
final Optional<Credentials> creds = getCredentials();
final Optional<BasicCredentials> ccloudApiKey = options.getCCloudApiKey();

return clientBuilder.build(
server, localProps, clientProps, creds, ccloudApiKey);
}

private Optional<Credentials> getCredentials() {
options.validateCredentials();

AuthType authType = AuthType.NONE;
final String userName = options.getUserName();
final String password = options.getPassword();
final String token = options.getToken();

final Map<String, Object> configProps = new HashMap<>();
if ((userName != null && !userName.isEmpty())
&& (password != null && !password.isEmpty())) {
authType = AuthType.BASIC;
configProps.put(KsqlClientConfig.KSQL_BASIC_AUTH_USERNAME, userName);
configProps.put(KsqlClientConfig.KSQL_BASIC_AUTH_PASSWORD, password);
} else if (token != null && !token.isEmpty()) {
authType = AuthType.STATIC_TOKEN;
configProps.put(KsqlClientConfig.BEARER_AUTH_TOKEN_CONFIG, token);
}

return Optional.ofNullable(CredentialsFactory.createCredentials(authType))
.map(credentials -> {
credentials.configure(configProps);
return credentials;
});
}

private static Map<String, String> stripClientSideProperties(final Map<String, String> props) {
return PropertiesUtil.filterByKey(props, NOT_CLIENT_SIDE_CONFIG);
}
Expand Down
34 changes: 28 additions & 6 deletions ksqldb-cli/src/main/java/io/confluent/ksql/cli/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public class Options {
private static final String CONFIGURATION_FILE_OPTION_NAME = "--config-file";
private static final String CONFLUENT_API_KEY_OPTION = "--confluent-api-key";
private static final String CONFLUENT_API_SECRET_OPTION = "--confluent-api-secret";
private static final String CONFLUENT_TOKEN_OPTION = "--token";
private static final String CONFLUENT_TOKEN_SHORT_OPTION = "-t";

// Only here so that the help message generated by Help.help() is accurate
@Inject
Expand Down Expand Up @@ -161,6 +163,13 @@ public class Options {
+ " flag")
private String ccloudApiSecret = "";

@Option(
name = {CONFLUENT_TOKEN_OPTION, CONFLUENT_TOKEN_SHORT_OPTION},
description =
"If your KSQL server is configured for authentication, "
+ "then provide the confluent token retrieved from confluent cli here.")
private String token = "";

public static Options parse(final String...args) throws IOException {
final SingleCommand<Options> optionsParser = SingleCommand.singleCommand(Options.class);

Expand Down Expand Up @@ -231,6 +240,25 @@ public String getPassword() {
}

public Optional<Credentials> getUserNameAndPassword() {
validateCredentials();

if (userName.isEmpty()) {
return Optional.empty();
}

return Optional.of(BasicCredentials.of(userName, password));
}

public String getToken() {
return token;
}

public void validateCredentials() {
// Ensure only one of username/password or token is provided
if (!(userName.isEmpty() || password.isEmpty()) && !token.isEmpty()) {
throw new ConfigException("Only one of username/password or token can be provided");
}

if (userName.isEmpty() != password.isEmpty()) {
throw new ConfigException(
"You must specify both a username and a password. If you don't want to use an "
Expand All @@ -240,12 +268,6 @@ public Optional<Credentials> getUserNameAndPassword() {
+ PASSWORD_OPTION
+ " flags on the command line");
}

if (userName.isEmpty()) {
return Optional.empty();
}

return Optional.of(BasicCredentials.of(userName, password));
}

public Optional<BasicCredentials> getCCloudApiKey() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,83 @@ public void shouldNotTrimPasswords() {
is(" "));
}

@Test
public void shouldThrowExceptionWhenUsernamePasswordAndTokenAreProvided() {
// Given:
final Options options = parse("-u", "joe", "-p", "pp", "-t", "token");

// When:
final Exception e = assertThrows(
ConfigException.class,
options::validateCredentials
);

// Then:
assertThat(e.getMessage(),
containsString("Only one of username/password or token can be provided"));
}

@Test
public void shouldAcceptUsernamePasswordWhenNoTokenProvided() {
// Given:
final Options options = parse("-u", "joe", "-p", "pp");

// When:
options.validateCredentials();

// Then: No exception thrown
assertThat(options.getUserNameAndPassword().isPresent(), is(true));
assertThat(options.getUserName(), is("joe"));
assertThat(options.getPassword(), is("pp"));

assertThat(options.getToken(), is(""));
}

@Test
public void shouldAcceptTokenIfNoUsernamePasswordProvided() {
// Given:
final Options options = parse("-t", "abcdef");

// When:
options.validateCredentials();

// Then: No exception thrown
assertThat(options.getUserNameAndPassword().isPresent(), is(false));
assertThat(options.getUserName(), is(""));
assertThat(options.getPassword(), is(""));

assertThat(options.getToken(), is("abcdef"));
}

@Test
public void shouldThrowConfigExceptionIfEitherUsernameOrPasswordProvidedWithToken() {
// Given:
Options options = parse("-u","joe","-t", "abcdef");

// When:
Exception e = assertThrows(
ConfigException.class,
options::validateCredentials
);

// Then:
assertThat(e.getMessage(),
containsString("You must specify both a username and a password."));

// Given:
options = parse("-p","ee","-t", "abcdef");

// When:
e = assertThrows(
ConfigException.class,
options::validateCredentials
);

// Then:
assertThat(e.getMessage(),
containsString("You must specify both a username and a password."));
}

@Test
public void shouldThrowConfigExceptionIfOnlyApiKeyIsProvided() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@
public enum AuthType {
BASIC,
OAUTHBEARER,
STATIC_TOKEN,
NONE
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.security;

import io.confluent.ksql.security.oauth.OAuthBearerCredentials;
import io.confluent.ksql.security.oauth.StaticTokenCredentials;

@SuppressWarnings("checkstyle:HideUtilityClassConstructor")
public class CredentialsFactory {
Expand All @@ -25,6 +26,8 @@ public static Credentials createCredentials(final AuthType authType) {
return new BasicCredentials();
case OAUTHBEARER:
return new OAuthBearerCredentials();
case STATIC_TOKEN:
return new StaticTokenCredentials();
default:
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ public final class KsqlClientConfig extends AbstractConfig {
public static final String KSQL_BASIC_AUTH_USERNAME = "ksql.auth.basic.username";
public static final String KSQL_BASIC_AUTH_PASSWORD = "ksql.auth.basic.password";

//OAuth AUTHORIZATION SERVER related configs
// Static token related configs
public static final String BEARER_AUTH_TOKEN_CONFIG = "bearer.auth.token";

// OAuth AUTHORIZATION SERVER related configs
public static final String BEARER_AUTH_TOKEN_ENDPOINT_URL = "bearer.auth.issuer.endpoint.url";
public static final String BEARER_AUTH_CLIENT_ID = "bearer.auth.client.id";
public static final String BEARER_AUTH_CLIENT_SECRET = "bearer.auth.client.secret";
Expand All @@ -37,7 +40,7 @@ public final class KsqlClientConfig extends AbstractConfig {
public static final String BEARER_AUTH_SUB_CLAIM_NAME_DEFAULT =
SaslConfigs.DEFAULT_SASL_OAUTHBEARER_SUB_CLAIM_NAME;

//OAuth config related to token cache
// OAuth config related to token cache
public static final String BEARER_AUTH_CACHE_EXPIRY_BUFFER_SECONDS =
"bearer.auth.cache.expiry.buffer.seconds";
public static final short BEARER_AUTH_CACHE_EXPIRY_BUFFER_SECONDS_DEFAULT = 300;
Expand Down Expand Up @@ -67,6 +70,12 @@ private KsqlClientConfig(final Map<String, String> configs) {
"",
ConfigDef.Importance.MEDIUM,
"The password for the KSQL server"
).define(
BEARER_AUTH_TOKEN_CONFIG,
ConfigDef.Type.PASSWORD,
"",
ConfigDef.Importance.MEDIUM,
"The static bearer token for the IDP Authorization server"
).define(
BEARER_AUTH_TOKEN_ENDPOINT_URL,
ConfigDef.Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2024 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.security.oauth;

import io.confluent.ksql.security.Credentials;
import io.confluent.ksql.security.KsqlClientConfig;
import java.util.Map;
import org.apache.kafka.common.config.ConfigException;

public class StaticTokenCredentials implements Credentials {

private String token;

@Override
public String getAuthHeader() {
return "Bearer " + this.token;
}

@Override
public void validateConfigs(final Map<String, ?> configs) throws ConfigException {
final String token = (String) configs
.get(KsqlClientConfig.BEARER_AUTH_TOKEN_CONFIG);
if (token == null || token.isEmpty()) {
throw new ConfigException("Cannot configure StaticTokenCredentials without "
+ "a proper token.");
}
}

@Override
public void configure(final Map<String, ?> configs) {
validateConfigs(configs);
this.token = (String) configs
.get(KsqlClientConfig.BEARER_AUTH_TOKEN_CONFIG);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.junit.jupiter.api.Assertions.assertNull;

import io.confluent.ksql.security.oauth.OAuthBearerCredentials;
import io.confluent.ksql.security.oauth.StaticTokenCredentials;
import org.junit.Test;

public class CredentialsFactoryTest {
Expand All @@ -37,6 +38,13 @@ public void testCreateOAuthBearerCredentials() {
"Should return an instance of OAuthBearerCredentials");
}

@Test
public void testCreateStaticTokenCredentials() {
Credentials credentials = CredentialsFactory.createCredentials(AuthType.STATIC_TOKEN);
assertInstanceOf(StaticTokenCredentials.class, credentials,
"Should return an instance of StaticTokenCredentials");
}

@Test
public void testCreateCredentialsWithUnsupportedAuthType() {
Credentials credentials = CredentialsFactory.createCredentials(AuthType.NONE);
Expand Down
Loading

0 comments on commit 3c4604d

Please sign in to comment.