Skip to content

Commit

Permalink
add optional isolation.Level config (#62)
Browse files Browse the repository at this point in the history
  • Loading branch information
l139533 authored Oct 25, 2024
1 parent d676596 commit 52ea038
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 9 deletions.
3 changes: 2 additions & 1 deletion api/src/main/kotlin/no/nav/kafkamanager/domain/AppConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ data class TopicConfig(
val name: String,
val location: TopicLocation,
val keyDeserializerType: DeserializerType,
val valueDeserializerType: DeserializerType
val valueDeserializerType: DeserializerType,
val isolationLevel: String?
)

enum class TopicLocation {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import no.nav.kafkamanager.utils.DTOMappers.toKafkaRecordHeader
import no.nav.kafkamanager.utils.KafkaPropertiesFactory.createAivenConsumerProperties
import no.nav.kafkamanager.utils.KafkaPropertiesFactory.createOnPremConsumerProperties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerConfig.DEFAULT_ISOLATION_LEVEL
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
Expand Down Expand Up @@ -129,17 +130,19 @@ class KafkaAdminService(
private fun createPropertiesForTopic(consumerGroupId: String?, topicConfig: TopicConfig): Properties {
val keyDesType = topicConfig.keyDeserializerType
val valueDesType = topicConfig.valueDeserializerType
val isolationLevel = topicConfig.isolationLevel ?: DEFAULT_ISOLATION_LEVEL

val properties = when (topicConfig.location) {
TopicLocation.ON_PREM -> createOnPremConsumerProperties(
environmentProperties.onPremKafkaBrokersUrl,
systemUserCredentialsSupplier.get(),
environmentProperties.onPremSchemaRegistryUrl,
keyDesType,
valueDesType
valueDesType,
isolationLevel
)

TopicLocation.AIVEN -> createAivenConsumerProperties(keyDesType, valueDesType)
TopicLocation.AIVEN -> createAivenConsumerProperties(keyDesType, valueDesType, isolationLevel)
}

if (consumerGroupId != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ import no.nav.common.utils.EnvironmentUtils.getRequiredProperty
import no.nav.kafkamanager.controller.KafkaAdminController
import no.nav.kafkamanager.domain.DeserializerType
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
import org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
import org.apache.kafka.clients.consumer.ConsumerConfig.*
import org.apache.kafka.common.serialization.*
import java.util.*

Expand All @@ -21,11 +20,13 @@ object KafkaPropertiesFactory {
credentials: Credentials,
schemaRegistryUrl: String?,
keyDeserializerType: DeserializerType,
valueDeserializerType: DeserializerType
valueDeserializerType: DeserializerType,
isolationLevel: String
): Properties {
val builder = KafkaPropertiesBuilder.consumerBuilder()
.withBaseProperties()
.withProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, KafkaAdminController.MAX_KAFKA_RECORDS)
.withProp(ISOLATION_LEVEL_CONFIG, isolationLevel)
.withProp(MAX_POLL_RECORDS_CONFIG, KafkaAdminController.MAX_KAFKA_RECORDS)
.withBrokerUrl(kafkaBrokerUrl)
.withOnPremAuth(credentials.username, credentials.password)
.withProp(KEY_DESERIALIZER_CLASS_CONFIG, findDeserializer(keyDeserializerType).name)
Expand All @@ -42,17 +43,19 @@ object KafkaPropertiesFactory {

fun createAivenConsumerProperties(
keyDeserializerType: DeserializerType,
valueDeserializerType: DeserializerType
valueDeserializerType: DeserializerType,
isolationLevel: String
): Properties {
val schemaRegistryUrl = getRequiredProperty(KAFKA_SCHEMA_REGISTRY)
val schemaRegistryUsername = getRequiredProperty(KAFKA_SCHEMA_REGISTRY_USER)
val schemaRegistryPassword = getRequiredProperty(KAFKA_SCHEMA_REGISTRY_PASSWORD)

return KafkaPropertiesBuilder.consumerBuilder()
.withBaseProperties()
.withProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, KafkaAdminController.MAX_KAFKA_RECORDS)
.withProp(MAX_POLL_RECORDS_CONFIG, KafkaAdminController.MAX_KAFKA_RECORDS)
.withAivenBrokerUrl()
.withAivenAuth()
.withProp(ISOLATION_LEVEL_CONFIG, isolationLevel)
.withProp(KEY_DESERIALIZER_CLASS_CONFIG, findDeserializer(keyDeserializerType).name)
.withProp(VALUE_DESERIALIZER_CLASS_CONFIG, findDeserializer(valueDeserializerType).name)
.withProp(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)
Expand Down

0 comments on commit 52ea038

Please sign in to comment.