Skip to content

Commit

Permalink
cleaning
Browse files Browse the repository at this point in the history
  • Loading branch information
geomagilles committed Nov 4, 2024
1 parent e17ba8d commit 984a551
Show file tree
Hide file tree
Showing 9 changed files with 24 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import io.infinitic.common.data.MillisDuration
import io.infinitic.common.exceptions.thisShouldNotHappen
import io.infinitic.common.messages.Message
import io.infinitic.common.transport.BatchProcessorConfig
import io.infinitic.common.transport.config.maxMillis
import io.infinitic.common.transport.consumers.Result
import io.infinitic.common.transport.consumers.acknowledge
import io.infinitic.common.transport.consumers.batchBy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ data class BatchConfig(
require(maxMessages > 0) { error("'${::maxMessages.name}' must be > 0, but was $maxMessages") }
require(maxSeconds > 0) { error("'${::maxSeconds.name}' must be > 0, but was $maxSeconds") }
}

val maxMillis = (maxSeconds * 1000).toLong()
}

fun BatchConfig?.normalized(concurrency: Int) =
Expand All @@ -47,3 +45,6 @@ fun BatchConfig?.normalized(key: String, concurrency: Int = 1) = this?.let {
MillisDuration(maxMillis),
)
}

val BatchConfig.maxMillis: Long
get() = (maxSeconds * 1000).toLong()
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ interface InfiniticProducerFactory {
/**
* Retrieves an `InfiniticProducer` based on the provided batch producing configuration.
*/
fun getProducer(batchProducingConfig: BatchConfig?): InfiniticProducer
fun getProducer(batchSendingConfig: BatchConfig?): InfiniticProducer

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class InMemoryInfiniticProducerFactory(
this.name = name
}

override fun getProducer(batchProducingConfig: BatchConfig?) = InMemoryInfiniticProducer(
override fun getProducer(batchSendingConfig: BatchConfig?) = InMemoryInfiniticProducer(
mainChannels,
eventListenerChannels,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package io.infinitic.inMemory.consumers
import io.infinitic.common.messages.Message
import io.infinitic.common.transport.Topic
import io.infinitic.common.transport.config.BatchConfig
import io.infinitic.common.transport.config.maxMillis
import io.infinitic.common.transport.consumers.batchWithTimeout
import io.infinitic.common.transport.interfaces.TransportConsumer
import kotlinx.coroutines.channels.Channel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class PulsarInfiniticConsumerFactory(
subscriptionNameDlq = subscription.nameDLQ,
subscriptionType = subscription.type,
consumerName = consumerName,
batchConfig = batchReceivingConfig,
batchReceivingConfig = batchReceivingConfig,
).onSuccess {
trace { "Consumer '${consumerName}' created for $topicName" }
}
Expand Down Expand Up @@ -134,14 +134,14 @@ class PulsarInfiniticConsumerFactory(
subscriptionNameDlq: String,
subscriptionType: SubscriptionType,
consumerName: String,
batchConfig: BatchConfig?,
batchReceivingConfig: BatchConfig?,
): Result<Consumer<S>> {
val consumerDef = InfiniticPulsarClient.ConsumerDef(
topic = topic,
subscriptionName = subscriptionName, // MUST be the same for all instances!
subscriptionType = subscriptionType,
consumerName = consumerName,
batchReceivingConfig = batchConfig,
batchReceivingConfig = batchReceivingConfig,
pulsarConsumerConfig = pulsarConsumerConfig,
)
val consumerDefDlq = topicDlq?.let {
Expand All @@ -150,7 +150,7 @@ class PulsarInfiniticConsumerFactory(
subscriptionName = subscriptionNameDlq, // MUST be the same for all instances!
subscriptionType = SubscriptionType.Shared,
consumerName = "$consumerName-dlq",
batchReceivingConfig = batchConfig,
batchReceivingConfig = batchReceivingConfig,
pulsarConsumerConfig = pulsarConsumerConfig,
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class PulsarInfiniticProducer(
private val client: InfiniticPulsarClient,
private val pulsarProducerConfig: PulsarProducerConfig,
private val pulsarResources: PulsarResources,
private val batchConfig: BatchConfig?
private val batchSendingConfig: BatchConfig?
) : InfiniticProducer {

override val emitterName by lazy { EmitterName(client.name) }
Expand Down Expand Up @@ -81,7 +81,7 @@ class PulsarInfiniticProducer(
val envelope = topic.envelope(message)

// get cached producer or create it
val producer = getProducer(topic, message.entity(), envelope::class, key, batchConfig)
val producer = getProducer(topic, message.entity(), envelope::class, key, batchSendingConfig)
.getOrElse { return CompletableFuture.failedFuture(it) }

logger.trace { "Sending${if (after > 0) " after $after ms" else ""} to topic '${producer.topic}' with key '$key': '$envelope'" }
Expand Down Expand Up @@ -115,7 +115,7 @@ class PulsarInfiniticProducer(
entity: String,
envelopeKClass: KClass<out Envelope<out T>>,
key: String?,
batchConfig: BatchConfig?
batchSendingConfig: BatchConfig?
): Result<Producer<Envelope<out T>>> {
val topicFullName = with(pulsarResources) {
topic.forEntity(
Expand All @@ -128,7 +128,7 @@ class PulsarInfiniticProducer(
return client.getProducer(
topicFullName,
envelopeKClass,
batchConfig,
batchSendingConfig,
pulsarProducerConfig,
key,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ class PulsarInfiniticProducerFactory(
suggestedName = name
}

override fun getProducer(batchProducingConfig: BatchConfig?): PulsarInfiniticProducer {
override fun getProducer(batchSendingConfig: BatchConfig?): PulsarInfiniticProducer {
// init client name
runBlocking { getName() }

return PulsarInfiniticProducer(
client,
pulsarProducerConfig,
pulsarResources,
batchProducingConfig,
batchSendingConfig,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import io.github.oshai.kotlinlogging.KotlinLogging
import io.infinitic.common.messages.Envelope
import io.infinitic.common.messages.Message
import io.infinitic.common.transport.config.BatchConfig
import io.infinitic.common.transport.config.maxMillis
import io.infinitic.pulsar.config.PulsarConsumerConfig
import io.infinitic.pulsar.config.PulsarProducerConfig
import io.infinitic.pulsar.schemas.schemaDefinition
Expand Down Expand Up @@ -104,7 +105,7 @@ class InfiniticPulsarClient(private val pulsarClient: PulsarClient) {
fun <T : Message> getProducer(
topic: String,
schemaKClass: KClass<out Envelope<out T>>,
batchConfig: BatchConfig?,
batchSendingConfig: BatchConfig?,
pulsarProducerConfig: PulsarProducerConfig,
key: String? = null,
): Result<Producer<Envelope<out T>>> {
Expand All @@ -113,7 +114,7 @@ class InfiniticPulsarClient(private val pulsarClient: PulsarClient) {
@Suppress("UNCHECKED_CAST")
Result.success(
producers.computeIfAbsent(topic) {
createProducer(topic, schemaKClass, batchConfig, pulsarProducerConfig, key)
createProducer(topic, schemaKClass, pulsarProducerConfig, batchSendingConfig, key)
} as Producer<Envelope<out T>>,
)
} catch (e: PulsarClientException) {
Expand All @@ -125,8 +126,8 @@ class InfiniticPulsarClient(private val pulsarClient: PulsarClient) {
private fun createProducer(
topic: String,
schemaKClass: KClass<out Envelope<out Message>>,
batchConfig: BatchConfig?,
pulsarProducerConfig: PulsarProducerConfig,
batchSendingConfig: BatchConfig?,
key: String? = null,
): Producer<Envelope<out Message>> {
// otherwise create it
Expand Down Expand Up @@ -219,16 +220,14 @@ class InfiniticPulsarClient(private val pulsarClient: PulsarClient) {
}

// if batchConfig is defined, it replaces above settings
batchConfig?.also {
batchSendingConfig?.also {
logInfo { "Producer batchConfig=$it" }
batchingMaxMessages(it.maxMessages)
batchingMaxPublishDelay(it.maxMillis, TimeUnit.MILLISECONDS)
enableBatching(true)
}
}



@Suppress("UNCHECKED_CAST")
return builder.create() as Producer<Envelope<out Message>>
}
Expand All @@ -251,7 +250,7 @@ class InfiniticPulsarClient(private val pulsarClient: PulsarClient) {
subscriptionName,
subscriptionType,
consumerName,
batchConfig,
batchReceivingConfig,
consumerConfig
) = consumerDef

Expand Down Expand Up @@ -383,7 +382,7 @@ class InfiniticPulsarClient(private val pulsarClient: PulsarClient) {
}

// Batch Receive Policy
batchConfig?.also {
batchReceivingConfig?.also {
logInfo { "Subscription $subscriptionName: batchConfig=$it" }
batchReceivePolicy(
BatchReceivePolicy.builder()
Expand Down

0 comments on commit 984a551

Please sign in to comment.