Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add assume role support #45

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/main/java/io/kafbat/ui/serde/glue/GlueSerde.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import software.amazon.awssdk.services.glue.model.GetSchemaVersionResponse;
import software.amazon.awssdk.services.glue.model.SchemaId;
import software.amazon.awssdk.services.glue.model.SchemaVersionNumber;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;

public class GlueSerde implements Serde {

Expand Down Expand Up @@ -88,6 +90,7 @@ public void configure(PropertyResolver serdeProperties,
serdeProperties.getProperty("region", String.class)
.orElseThrow(() -> new IllegalArgumentException("region not provided for GlueSerde")),
serdeProperties.getProperty("endpoint", String.class).orElse(null),
serdeProperties.getProperty("roleArn", String.class).orElse(null),
serdeProperties.getProperty("registry", String.class)
.orElseThrow(() -> new IllegalArgumentException("registry not provided for GlueSerde")),
serdeProperties.getProperty("keySchemaNameTemplate", String.class)
Expand All @@ -114,6 +117,7 @@ public void configure(PropertyResolver serdeProperties,
void configure(AwsCredentialsProvider credentialsProvider,
String region,
@Nullable String endpoint,
@Nullable String roleArn,
String registryName,
@Nullable String keySchemaNameTemplate,
String valueSchemaNameTemplate,
Expand Down Expand Up @@ -160,6 +164,19 @@ static AwsCredentialsProvider createCredentialsProvider(PropertyResolver serdePr
.orElseGet(() -> () -> AwsBasicCredentials.create(awsAccessKey.get(), awsSecretKey.get()));
}

Optional<String> roleArn = serdeProperties.getProperty("roleArn", String.class);
if (roleArn.isPresent()) {
return StsAssumeRoleCredentialsProvider.builder()
.refreshRequest(b -> b.roleArn(roleArn.get())
.roleSessionName("kafbat-ui-" + UUID.randomUUID()))
.stsClient(StsClient.builder()
.credentialsProvider(DefaultCredentialsProvider.create())
.region(Region.of(serdeProperties.getProperty("region", String.class)
.orElseThrow(() -> new IllegalArgumentException("region required for assume role"))))
.build())
.build();
}

Optional<String> profileName = serdeProperties.getProperty("awsProfileName", String.class);
Optional<String> profileFile = serdeProperties.getProperty("awsProfileFile", String.class);
if (profileName.isPresent() || profileFile.isPresent()) {
Expand Down
9 changes: 8 additions & 1 deletion src/test/java/io/kafbat/ui/serde/glue/GlueSerdeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ class GlueSerdeTest {

private static final String REGISTRY_NAME = "kui-glue-serde-test-registry";

private static final Network NETWORK = Network.newNetwork();
private static final KafkaContainer KAFKA = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:7.2.1")).withNetwork(Network.SHARED);
DockerImageName.parse("confluentinc/cp-kafka:7.2.1")).withNetwork(NETWORK);

private static GlueClient GLUE_CLIENT;

Expand Down Expand Up @@ -115,6 +116,7 @@ static void checkCredsResolving() {
@AfterAll
static void tearDown() {
KAFKA.close();
NETWORK.close();
try {
GLUE_CLIENT.deleteRegistry(
DeleteRegistryRequest.builder().registryId(
Expand Down Expand Up @@ -194,6 +196,7 @@ private <T> void checkDeserializerIsCompatibleWithKafkaLibrarySerializer(
DefaultCredentialsProvider.create(),
REGION,
null,
null,
REGISTRY_NAME,
null,
"%s",
Expand Down Expand Up @@ -228,6 +231,7 @@ private <T> void checkSerializerIsCompatibleWithKafkaLibraryDeserializer(
DefaultCredentialsProvider.create(),
REGION,
null,
null,
REGISTRY_NAME,
null,
"%s",
Expand Down Expand Up @@ -403,6 +407,7 @@ void canSerializeAndCanDeserializeCheckTopicSchemaMappingMap() {
DefaultCredentialsProvider.create(),
REGION,
null,
null,
REGISTRY_NAME,
null,
"%s",
Expand Down Expand Up @@ -457,6 +462,7 @@ void canSerializeAndCanDeserializeUsesTopicKVTemplateToFindSchemas() {
DefaultCredentialsProvider.create(),
REGION,
null,
null,
REGISTRY_NAME,
"%s-key",
"%s-value",
Expand Down Expand Up @@ -484,6 +490,7 @@ void canDeserializeReturnsTrueForAnyTopicIfSchemaExistenceCheckIsDisabled() {
DefaultCredentialsProvider.create(),
REGION,
null,
null,
REGISTRY_NAME,
"%s-key",
"%s-value",
Expand Down
Loading