Skip to content

Commit

Permalink
Renamed all instances and references of BatchConfig to BatchProcessor…
Browse files Browse the repository at this point in the history
…Config
  • Loading branch information
geomagilles committed Nov 2, 2024
1 parent a9cd673 commit c8a24ef
Show file tree
Hide file tree
Showing 21 changed files with 140 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ import io.infinitic.common.tasks.data.TaskId
import io.infinitic.common.tasks.executors.errors.MethodFailedError
import io.infinitic.common.tasks.tags.messages.CompleteDelegatedTask
import io.infinitic.common.transport.ClientTopic
import io.infinitic.common.transport.interfaces.InfiniticConsumer
import io.infinitic.common.transport.interfaces.InfiniticProducer
import io.infinitic.common.transport.MainSubscription
import io.infinitic.common.transport.ServiceTagEngineTopic
import io.infinitic.common.transport.Topic
import io.infinitic.common.transport.WorkflowStateCmdTopic
import io.infinitic.common.transport.WorkflowTagEngineTopic
import io.infinitic.common.transport.interfaces.InfiniticConsumer
import io.infinitic.common.transport.interfaces.InfiniticProducer
import io.infinitic.common.workflows.data.channels.SignalId
import io.infinitic.common.workflows.data.workflowMethods.WorkflowMethodId
import io.infinitic.common.workflows.data.workflows.WorkflowCancellationReason
Expand Down Expand Up @@ -806,7 +806,7 @@ internal class ClientDispatcher(
subscription = MainSubscription(ClientTopic),
entity = emitterName.toString(),
concurrency = 1,
process = { message, _ -> responseFlow.emit(message) },
processor = { message, _ -> responseFlow.emit(message) },
)
// asynchronously listen
launch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ package io.infinitic.events.config

import io.infinitic.cloudEvents.CloudEventListener
import io.infinitic.cloudEvents.EntityListConfig
import io.infinitic.common.transport.config.LoadedBatchConfig
import io.infinitic.common.transport.config.BatchConfig
import io.infinitic.common.utils.annotatedName
import io.infinitic.common.utils.getInstance
import io.infinitic.config.loadFromYamlFile
Expand All @@ -35,7 +35,7 @@ sealed class EventListenerConfig {
abstract val listener: CloudEventListener
abstract val concurrency: Int
abstract val subscriptionName: String?
abstract val batchConfig: LoadedBatchConfig
abstract val batchConfig: BatchConfig
abstract val serviceListConfig: EntityListConfig
abstract val workflowListConfig: EntityListConfig

Expand Down Expand Up @@ -88,7 +88,7 @@ sealed class EventListenerConfig {
private val disallowedWorkflows: MutableList<String> = mutableListOf()
private var serviceListRefreshSeconds: Double = 60.0
private var workflowListRefreshSeconds: Double = 60.0
private var batchConfig = LoadedBatchConfig()
private var batchConfig = BatchConfig()

fun setListener(cloudEventListener: CloudEventListener) =
apply { this.listener = cloudEventListener }
Expand Down Expand Up @@ -138,7 +138,7 @@ sealed class EventListenerConfig {
apply { this.workflowListRefreshSeconds = listRefreshSeconds }

fun setBatch(maxEvents: Int, maxSeconds: Double) =
apply { this.batchConfig = LoadedBatchConfig(maxEvents, maxSeconds) }
apply { this.batchConfig = BatchConfig(maxEvents, maxSeconds) }

fun build(): EventListenerConfig {
require(listener != null) { "${EventListenerConfig::listener.name} must not be null" }
Expand Down Expand Up @@ -170,7 +170,7 @@ data class BuiltEventListenerConfig(
override val listener: CloudEventListener,
override val concurrency: Int,
override val subscriptionName: String?,
override val batchConfig: LoadedBatchConfig,
override val batchConfig: BatchConfig,
override val serviceListConfig: EntityListConfig,
override val workflowListConfig: EntityListConfig,
) : EventListenerConfig()
Expand All @@ -182,7 +182,7 @@ data class LoadedEventListenerConfig(
val `class`: String,
override val concurrency: Int = 1,
override val subscriptionName: String? = null,
val batch: LoadedBatchConfig = LoadedBatchConfig(),
val batch: BatchConfig = BatchConfig(),
val services: EntityListConfig = EntityListConfig(),
val workflows: EntityListConfig = EntityListConfig()
) : EventListenerConfig() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ package io.infinitic.events.listeners
import io.github.oshai.kotlinlogging.KLogger
import io.infinitic.common.exceptions.thisShouldNotHappen
import io.infinitic.common.messages.Message
import io.infinitic.common.transport.BatchConfig
import io.infinitic.common.transport.BatchProcessorConfig
import io.infinitic.common.transport.consumers.Result
import io.infinitic.common.transport.consumers.acknowledge
import io.infinitic.common.transport.consumers.batchBy
Expand All @@ -52,7 +52,7 @@ fun InfiniticConsumer.startCloudEventListener(
val outChannel = Channel<Result<TransportMessage<Message>, TransportMessage<Message>>>()

// all messages will have this batch config
val batchConfig = BatchConfig(
val batchProcessorConfig = BatchProcessorConfig(
batchKey = "cloudEvent", // same for all
maxMessages = config.batchConfig.maxMessages,
maxDuration = config.batchConfig.maxMillis,
Expand All @@ -62,7 +62,7 @@ fun InfiniticConsumer.startCloudEventListener(
launch {
outChannel
.process(config.concurrency) { _, message -> message.deserialize() }
.batchBy { batchConfig }
.batchBy { batchProcessorConfig }
.batchProcess(
config.concurrency,
{ _, _ -> thisShouldNotHappen() },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ import io.infinitic.common.data.MillisDuration
* Messages with the same [batchKey] will be batched together, it typically represents a task type.
* [maxMessages] and [maxDuration] are expected to be always the same for a given [batchKey]
*/
data class BatchConfig(
data class BatchProcessorConfig(
val batchKey: String,
val maxMessages: Int,
val maxDuration: MillisDuration,
) {
companion object {
val NONE = BatchConfig("", 1, MillisDuration(0))
val NONE = BatchProcessorConfig("", 1, MillisDuration(0))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ package io.infinitic.common.transport.config

import io.infinitic.common.data.MillisDuration

data class LoadedBatchConfig(
data class BatchConfig(
val maxMessages: Int = 1000,
val maxSeconds: Double = 1.0
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
package io.infinitic.common.transport.consumers

import io.github.oshai.kotlinlogging.KLogger
import io.infinitic.common.transport.BatchConfig
import io.infinitic.common.transport.BatchProcessorConfig
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.cancel
Expand All @@ -40,14 +40,14 @@ import kotlinx.coroutines.withContext
* and the messages with the same batch key are grouped together.
*
* @param I The type of message.
* @param getBatchConfig A suspending function that returns the batch configuration for a given message.
* @param getBatchProcessorConfig A suspending function that returns the batch configuration for a given message.
* If null, messages are not batched.
* @return A channel that emits batched results wrapped in a [Result] containing
* either a [SingleMessage] or a [MultipleMessages] instance.
*/
context(CoroutineScope, KLogger)
fun <M : Any, I> Channel<Result<M, I>>.batchBy(
getBatchConfig: suspend (I) -> BatchConfig?,
getBatchProcessorConfig: suspend (I) -> BatchProcessorConfig?,
): Channel<OneOrMany<Result<M, I>>> {
val callingScope: CoroutineScope = this@CoroutineScope

Expand Down Expand Up @@ -83,7 +83,7 @@ fun <M : Any, I> Channel<Result<M, I>>.batchBy(
}

// Get or create a batch channel based on configuration
suspend fun getBatchingChannel(config: BatchConfig): Channel<Result<M, I>> {
suspend fun getBatchingChannel(config: BatchProcessorConfig): Channel<Result<M, I>> {
// check if the channel already exists before using a lock
batchingChannels[config.batchKey]?.let { return it }

Expand All @@ -108,7 +108,7 @@ fun <M : Any, I> Channel<Result<M, I>>.batchBy(
}
if (result.isSuccess) {
val batchConfig = try {
getBatchConfig(result.value())
getBatchProcessorConfig(result.value())
} catch (e: Exception) {
outputChannel.send(One(result.failure(e)))
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ package io.infinitic.common.transport.consumers

import io.github.oshai.kotlinlogging.KLogger
import io.infinitic.common.data.MillisInstant
import io.infinitic.common.transport.BatchConfig
import io.infinitic.common.transport.BatchProcessorConfig
import io.infinitic.common.transport.interfaces.TransportMessage
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
Expand All @@ -35,10 +35,10 @@ fun <T : TransportMessage<M>, M : Any> Channel<Result<T, T>>.completeProcess(
deserialize: suspend (T) -> M,
process: suspend (M, MillisInstant) -> Unit,
beforeDlq: (suspend (M, Exception) -> Unit)? = null,
batchConfig: (suspend (M) -> BatchConfig?)? = null,
batchProcessorConfig: (suspend (M) -> BatchProcessorConfig?)? = null,
batchProcess: (suspend (List<M>, List<MillisInstant>) -> Unit)? = null
) {
require((batchConfig == null) == (batchProcess == null)) {
require((batchProcessorConfig == null) == (batchProcess == null)) {
"batchConfig and batchProcess must be null or !null together"
}

Expand Down Expand Up @@ -72,7 +72,7 @@ fun <T : TransportMessage<M>, M : Any> Channel<Result<T, T>>.completeProcess(

false -> this
.process(concurrency) { _, message -> loggedDeserialize(message) }
.batchBy { datum -> batchConfig?.invoke(datum) }
.batchBy { datum -> batchProcessorConfig?.invoke(datum) }
.batchProcess(
concurrency,
{ message, datum -> loggedProcess(datum, message.publishTime) },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ package io.infinitic.common.transport.consumers

import io.github.oshai.kotlinlogging.KLogger
import io.infinitic.common.data.MillisInstant
import io.infinitic.common.transport.BatchConfig
import io.infinitic.common.transport.BatchProcessorConfig
import io.infinitic.common.transport.interfaces.TransportConsumer
import io.infinitic.common.transport.interfaces.TransportMessage
import kotlinx.coroutines.CoroutineScope
Expand All @@ -37,7 +37,7 @@ import kotlinx.coroutines.launch
* @param concurrency The number of concurrent coroutines for processing messages.
* @param deserialize A suspending function to deserialize the transport message into its payload.
* @param process A suspending function to process the deserialized message along with its publishing time.
* @param batchConfig An optional suspending function to configure batching of messages.
* @param batchProcessorConfig An optional suspending function to configure batching of messages.
* @param batchProcess An optional suspending function to process a batch of messages.
* @return A Job representing the coroutine that runs the consuming process.
*/
Expand All @@ -47,7 +47,7 @@ fun <T : TransportMessage<M>, M : Any> TransportConsumer<T>.startAsync(
deserialize: suspend (T) -> M,
process: suspend (M, MillisInstant) -> Unit,
beforeDlq: (suspend (M, Exception) -> Unit)? = null,
batchConfig: (suspend (M) -> BatchConfig?)? = null,
batchProcessorConfig: (suspend (M) -> BatchProcessorConfig?)? = null,
batchProcess: (suspend (List<M>, List<MillisInstant>) -> Unit)? = null,
): Job = launch {
startConsuming()
Expand All @@ -56,7 +56,7 @@ fun <T : TransportMessage<M>, M : Any> TransportConsumer<T>.startAsync(
deserialize,
process,
beforeDlq,
batchConfig,
batchProcessorConfig,
batchProcess,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ package io.infinitic.common.transport.interfaces
import io.github.oshai.kotlinlogging.KLogger
import io.infinitic.common.data.MillisInstant
import io.infinitic.common.messages.Message
import io.infinitic.common.transport.BatchConfig
import io.infinitic.common.transport.BatchProcessorConfig
import io.infinitic.common.transport.Subscription
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
Expand Down Expand Up @@ -60,21 +60,21 @@ interface InfiniticConsumer {
* @param subscription The subscription from which to consume messages.
* @param entity The entity associated with the consumer.
* @param concurrency The number of concurrent coroutines for processing messages.
* @param process A suspending function to process the deserialized message along with its publishing time.
* @param processor A suspending function to process the deserialized message along with its publishing time.
* @param beforeDlq An optional suspending function to execute before sending a message to DLQ.
* @param batchConfig An optional suspending function to configure message batching.
* @param batchProcess An optional suspending function to process batches of messages.
* @param batchProcessorConfig An optional suspending function to configure message batching.
* @param batchProcessor An optional suspending function to process batches of messages.
* @return A Job representing the coroutine that runs the consuming process.
*/
context(CoroutineScope, KLogger)
suspend fun <S : Message> startAsync(
subscription: Subscription<S>,
entity: String,
concurrency: Int,
process: suspend (S, MillisInstant) -> Unit,
processor: suspend (S, MillisInstant) -> Unit,
beforeDlq: (suspend (S, Exception) -> Unit)? = null,
batchConfig: (suspend (S) -> BatchConfig?)? = null,
batchProcess: (suspend (List<S>, List<MillisInstant>) -> Unit)? = null
batchProcessorConfig: (suspend (S) -> BatchProcessorConfig?)? = null,
batchProcessor: (suspend (List<S>, List<MillisInstant>) -> Unit)? = null
): Job

/**
Expand All @@ -86,8 +86,8 @@ interface InfiniticConsumer {
* @param concurrency The number of concurrent coroutines for processing messages.
* @param process A suspending function to process the deserialized message along with its publishing time.
* @param beforeDlq An optional suspending function to execute before sending a message to DLQ.
* @param batchConfig An optional suspending function to configure message batching.
* @param batchProcess An optional suspending function to process batches of messages.
* @param batchProcessorConfig An optional suspending function to configure message batching.
* @param batchProcessor An optional suspending function to process batches of messages.
*/
context(CoroutineScope, KLogger)
suspend fun <M : Message> start(
Expand All @@ -96,9 +96,9 @@ interface InfiniticConsumer {
concurrency: Int,
process: suspend (M, MillisInstant) -> Unit,
beforeDlq: (suspend (M, Exception) -> Unit)? = null,
batchConfig: (suspend (M) -> BatchConfig?)? = null,
batchProcess: (suspend (List<M>, List<MillisInstant>) -> Unit)? = null
batchProcessorConfig: (suspend (M) -> BatchProcessorConfig?)? = null,
batchProcessor: (suspend (List<M>, List<MillisInstant>) -> Unit)? = null
) = startAsync(
subscription, entity, concurrency, process, beforeDlq, batchConfig, batchProcess,
subscription, entity, concurrency, process, beforeDlq, batchProcessorConfig, batchProcessor,
).join()
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import io.infinitic.annotations.Retry
import io.infinitic.annotations.Timeout
import io.infinitic.common.data.MillisDuration
import io.infinitic.common.exceptions.thisShouldNotHappen
import io.infinitic.common.transport.BatchConfig
import io.infinitic.common.transport.BatchProcessorConfig
import io.infinitic.exceptions.tasks.NoMethodFoundWithParameterCountException
import io.infinitic.exceptions.tasks.NoMethodFoundWithParameterTypesException
import io.infinitic.exceptions.tasks.TooManyMethodsFoundWithParameterCountException
Expand Down Expand Up @@ -207,15 +207,15 @@ suspend fun Class<*>.initBatchMethods() {
}
}

fun Method.getBatchConfig(): BatchConfig? {
fun Method.getBatchConfig(): BatchProcessorConfig? {
// Retrieve the method annotated as batch, if it exists
val batchMethod = getBatchMethod() ?: return null

// Find the @Batch annotation on this method
val batchAnnotation = batchMethod.batch.findAnnotation(Batch::class.java) ?: thisShouldNotHappen()

// Create and return an instance of MessageBatchConfig from the annotation
return BatchConfig(
return BatchProcessorConfig(
batchKey = toUniqueString(),
maxMessages = batchAnnotation.maxMessages,
maxDuration = MillisDuration((batchAnnotation.maxSeconds * 1000).toLong()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ package io.infinitic.common.transport.consumers

import io.github.oshai.kotlinlogging.KotlinLogging
import io.infinitic.common.data.MillisDuration
import io.infinitic.common.transport.BatchConfig
import io.infinitic.common.transport.BatchProcessorConfig
import io.kotest.assertions.throwables.shouldNotThrowAny
import io.kotest.assertions.throwables.shouldThrow
import io.kotest.core.spec.style.StringSpec
Expand All @@ -44,12 +44,12 @@ internal class BatchByTests : StringSpec(
val logger = KotlinLogging.logger {}
fun getScope() = CoroutineScope(Dispatchers.IO)

fun getBatchingConfig(value: IntMessage): BatchConfig? {
fun getBatchingConfig(value: IntMessage): BatchProcessorConfig? {
val i = value.value
return when {
i == 0 -> null
(i % 2) == 0 -> BatchConfig("even", 5, MillisDuration(1000 * 3600 * 50))
(i % 2) == 1 -> BatchConfig("odd", 5, MillisDuration(1000 * 3600 * 50))
(i % 2) == 0 -> BatchProcessorConfig("even", 5, MillisDuration(1000 * 3600 * 50))
(i % 2) == 1 -> BatchProcessorConfig("odd", 5, MillisDuration(1000 * 3600 * 50))
else -> throw IllegalStateException()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ package io.infinitic.common.transport.consumers

import io.infinitic.common.data.MillisDuration
import io.infinitic.common.data.MillisInstant
import io.infinitic.common.transport.BatchConfig
import io.infinitic.common.transport.BatchProcessorConfig
import io.infinitic.common.transport.Topic
import io.infinitic.common.transport.interfaces.TransportConsumer
import io.infinitic.common.transport.interfaces.TransportMessage
Expand Down Expand Up @@ -95,12 +95,12 @@ internal fun processBatch(batch: List<DeserializedIntMessage>, publishTimes: Lis
processedList.addAll(batch.map { it.value.value })
}

internal fun batchConfig(deserialized: DeserializedIntMessage): BatchConfig? {
internal fun batchConfig(deserialized: DeserializedIntMessage): BatchProcessorConfig? {
val i = deserialized.value.value
return when {
(i % 3) == 0 -> null
(i % 3) == 1 -> BatchConfig("1", 20, MillisDuration(1000 * 3600 * 50))
(i % 3) == 2 -> BatchConfig("2", 20, MillisDuration(1000 * 3600 * 50))
(i % 3) == 1 -> BatchProcessorConfig("1", 20, MillisDuration(1000 * 3600 * 50))
(i % 3) == 2 -> BatchProcessorConfig("2", 20, MillisDuration(1000 * 3600 * 50))
else -> throw IllegalStateException()
}
}
Expand Down
Loading

0 comments on commit c8a24ef

Please sign in to comment.