diff --git a/infinitic-client/src/main/kotlin/io/infinitic/clients/dispatcher/ClientDispatcher.kt b/infinitic-client/src/main/kotlin/io/infinitic/clients/dispatcher/ClientDispatcher.kt index 162a0929f..f0ebe9c48 100644 --- a/infinitic-client/src/main/kotlin/io/infinitic/clients/dispatcher/ClientDispatcher.kt +++ b/infinitic-client/src/main/kotlin/io/infinitic/clients/dispatcher/ClientDispatcher.kt @@ -61,13 +61,13 @@ import io.infinitic.common.tasks.data.TaskId import io.infinitic.common.tasks.executors.errors.MethodFailedError import io.infinitic.common.tasks.tags.messages.CompleteDelegatedTask import io.infinitic.common.transport.ClientTopic -import io.infinitic.common.transport.interfaces.InfiniticConsumer -import io.infinitic.common.transport.interfaces.InfiniticProducer import io.infinitic.common.transport.MainSubscription import io.infinitic.common.transport.ServiceTagEngineTopic import io.infinitic.common.transport.Topic import io.infinitic.common.transport.WorkflowStateCmdTopic import io.infinitic.common.transport.WorkflowTagEngineTopic +import io.infinitic.common.transport.interfaces.InfiniticConsumer +import io.infinitic.common.transport.interfaces.InfiniticProducer import io.infinitic.common.workflows.data.channels.SignalId import io.infinitic.common.workflows.data.workflowMethods.WorkflowMethodId import io.infinitic.common.workflows.data.workflows.WorkflowCancellationReason @@ -806,7 +806,7 @@ internal class ClientDispatcher( subscription = MainSubscription(ClientTopic), entity = emitterName.toString(), concurrency = 1, - process = { message, _ -> responseFlow.emit(message) }, + processor = { message, _ -> responseFlow.emit(message) }, ) // asynchronously listen launch { diff --git a/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/config/EventListenerConfig.kt b/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/config/EventListenerConfig.kt index 525f7c16e..218e09539 100644 --- a/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/config/EventListenerConfig.kt +++ b/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/config/EventListenerConfig.kt @@ -24,7 +24,7 @@ package io.infinitic.events.config import io.infinitic.cloudEvents.CloudEventListener import io.infinitic.cloudEvents.EntityListConfig -import io.infinitic.common.transport.config.LoadedBatchConfig +import io.infinitic.common.transport.config.BatchConfig import io.infinitic.common.utils.annotatedName import io.infinitic.common.utils.getInstance import io.infinitic.config.loadFromYamlFile @@ -35,7 +35,7 @@ sealed class EventListenerConfig { abstract val listener: CloudEventListener abstract val concurrency: Int abstract val subscriptionName: String? - abstract val batchConfig: LoadedBatchConfig + abstract val batchConfig: BatchConfig abstract val serviceListConfig: EntityListConfig abstract val workflowListConfig: EntityListConfig @@ -88,7 +88,7 @@ sealed class EventListenerConfig { private val disallowedWorkflows: MutableList = mutableListOf() private var serviceListRefreshSeconds: Double = 60.0 private var workflowListRefreshSeconds: Double = 60.0 - private var batchConfig = LoadedBatchConfig() + private var batchConfig = BatchConfig() fun setListener(cloudEventListener: CloudEventListener) = apply { this.listener = cloudEventListener } @@ -138,7 +138,7 @@ sealed class EventListenerConfig { apply { this.workflowListRefreshSeconds = listRefreshSeconds } fun setBatch(maxEvents: Int, maxSeconds: Double) = - apply { this.batchConfig = LoadedBatchConfig(maxEvents, maxSeconds) } + apply { this.batchConfig = BatchConfig(maxEvents, maxSeconds) } fun build(): EventListenerConfig { require(listener != null) { "${EventListenerConfig::listener.name} must not be null" } @@ -170,7 +170,7 @@ data class BuiltEventListenerConfig( override val listener: CloudEventListener, override val concurrency: Int, override val subscriptionName: String?, - override val batchConfig: LoadedBatchConfig, + override val batchConfig: BatchConfig, override val serviceListConfig: EntityListConfig, override val workflowListConfig: EntityListConfig, ) : EventListenerConfig() @@ -182,7 +182,7 @@ data class LoadedEventListenerConfig( val `class`: String, override val concurrency: Int = 1, override val subscriptionName: String? = null, - val batch: LoadedBatchConfig = LoadedBatchConfig(), + val batch: BatchConfig = BatchConfig(), val services: EntityListConfig = EntityListConfig(), val workflows: EntityListConfig = EntityListConfig() ) : EventListenerConfig() { diff --git a/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/listeners/startCloudEventListener.kt b/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/listeners/startCloudEventListener.kt index 49653eaee..928c43ac2 100644 --- a/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/listeners/startCloudEventListener.kt +++ b/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/listeners/startCloudEventListener.kt @@ -25,7 +25,7 @@ package io.infinitic.events.listeners import io.github.oshai.kotlinlogging.KLogger import io.infinitic.common.exceptions.thisShouldNotHappen import io.infinitic.common.messages.Message -import io.infinitic.common.transport.BatchConfig +import io.infinitic.common.transport.BatchProcessorConfig import io.infinitic.common.transport.consumers.Result import io.infinitic.common.transport.consumers.acknowledge import io.infinitic.common.transport.consumers.batchBy @@ -52,7 +52,7 @@ fun InfiniticConsumer.startCloudEventListener( val outChannel = Channel, TransportMessage>>() // all messages will have this batch config - val batchConfig = BatchConfig( + val batchProcessorConfig = BatchProcessorConfig( batchKey = "cloudEvent", // same for all maxMessages = config.batchConfig.maxMessages, maxDuration = config.batchConfig.maxMillis, @@ -62,7 +62,7 @@ fun InfiniticConsumer.startCloudEventListener( launch { outChannel .process(config.concurrency) { _, message -> message.deserialize() } - .batchBy { batchConfig } + .batchBy { batchProcessorConfig } .batchProcess( config.concurrency, { _, _ -> thisShouldNotHappen() }, diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/BatchConfig.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/BatchProcessorConfig.kt similarity index 94% rename from infinitic-common/src/main/kotlin/io/infinitic/common/transport/BatchConfig.kt rename to infinitic-common/src/main/kotlin/io/infinitic/common/transport/BatchProcessorConfig.kt index c41ecafc6..8d4fec579 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/BatchConfig.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/BatchProcessorConfig.kt @@ -29,12 +29,12 @@ import io.infinitic.common.data.MillisDuration * Messages with the same [batchKey] will be batched together, it typically represents a task type. * [maxMessages] and [maxDuration] are expected to be always the same for a given [batchKey] */ -data class BatchConfig( +data class BatchProcessorConfig( val batchKey: String, val maxMessages: Int, val maxDuration: MillisDuration, ) { companion object { - val NONE = BatchConfig("", 1, MillisDuration(0)) + val NONE = BatchProcessorConfig("", 1, MillisDuration(0)) } } diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/config/LoadedBatchConfig.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/config/BatchConfig.kt similarity index 97% rename from infinitic-common/src/main/kotlin/io/infinitic/common/transport/config/LoadedBatchConfig.kt rename to infinitic-common/src/main/kotlin/io/infinitic/common/transport/config/BatchConfig.kt index 742980666..64f3a2716 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/config/LoadedBatchConfig.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/config/BatchConfig.kt @@ -24,7 +24,7 @@ package io.infinitic.common.transport.config import io.infinitic.common.data.MillisDuration -data class LoadedBatchConfig( +data class BatchConfig( val maxMessages: Int = 1000, val maxSeconds: Double = 1.0 ) { diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/batchBy.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/batchBy.kt index f40d8b12d..73b270883 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/batchBy.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/batchBy.kt @@ -24,7 +24,7 @@ package io.infinitic.common.transport.consumers import io.github.oshai.kotlinlogging.KLogger -import io.infinitic.common.transport.BatchConfig +import io.infinitic.common.transport.BatchProcessorConfig import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.cancel @@ -40,14 +40,14 @@ import kotlinx.coroutines.withContext * and the messages with the same batch key are grouped together. * * @param I The type of message. - * @param getBatchConfig A suspending function that returns the batch configuration for a given message. + * @param getBatchProcessorConfig A suspending function that returns the batch configuration for a given message. * If null, messages are not batched. * @return A channel that emits batched results wrapped in a [Result] containing * either a [SingleMessage] or a [MultipleMessages] instance. */ context(CoroutineScope, KLogger) fun Channel>.batchBy( - getBatchConfig: suspend (I) -> BatchConfig?, + getBatchProcessorConfig: suspend (I) -> BatchProcessorConfig?, ): Channel>> { val callingScope: CoroutineScope = this@CoroutineScope @@ -83,7 +83,7 @@ fun Channel>.batchBy( } // Get or create a batch channel based on configuration - suspend fun getBatchingChannel(config: BatchConfig): Channel> { + suspend fun getBatchingChannel(config: BatchProcessorConfig): Channel> { // check if the channel already exists before using a lock batchingChannels[config.batchKey]?.let { return it } @@ -108,7 +108,7 @@ fun Channel>.batchBy( } if (result.isSuccess) { val batchConfig = try { - getBatchConfig(result.value()) + getBatchProcessorConfig(result.value()) } catch (e: Exception) { outputChannel.send(One(result.failure(e))) continue diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/completeProcess.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/completeProcess.kt index ab813c607..b442182da 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/completeProcess.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/completeProcess.kt @@ -24,7 +24,7 @@ package io.infinitic.common.transport.consumers import io.github.oshai.kotlinlogging.KLogger import io.infinitic.common.data.MillisInstant -import io.infinitic.common.transport.BatchConfig +import io.infinitic.common.transport.BatchProcessorConfig import io.infinitic.common.transport.interfaces.TransportMessage import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.Channel @@ -35,10 +35,10 @@ fun , M : Any> Channel>.completeProcess( deserialize: suspend (T) -> M, process: suspend (M, MillisInstant) -> Unit, beforeDlq: (suspend (M, Exception) -> Unit)? = null, - batchConfig: (suspend (M) -> BatchConfig?)? = null, + batchProcessorConfig: (suspend (M) -> BatchProcessorConfig?)? = null, batchProcess: (suspend (List, List) -> Unit)? = null ) { - require((batchConfig == null) == (batchProcess == null)) { + require((batchProcessorConfig == null) == (batchProcess == null)) { "batchConfig and batchProcess must be null or !null together" } @@ -72,7 +72,7 @@ fun , M : Any> Channel>.completeProcess( false -> this .process(concurrency) { _, message -> loggedDeserialize(message) } - .batchBy { datum -> batchConfig?.invoke(datum) } + .batchBy { datum -> batchProcessorConfig?.invoke(datum) } .batchProcess( concurrency, { message, datum -> loggedProcess(datum, message.publishTime) }, diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/startAsync.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/startAsync.kt index 94a614ba3..75a2c0fea 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/startAsync.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/startAsync.kt @@ -24,7 +24,7 @@ package io.infinitic.common.transport.consumers import io.github.oshai.kotlinlogging.KLogger import io.infinitic.common.data.MillisInstant -import io.infinitic.common.transport.BatchConfig +import io.infinitic.common.transport.BatchProcessorConfig import io.infinitic.common.transport.interfaces.TransportConsumer import io.infinitic.common.transport.interfaces.TransportMessage import kotlinx.coroutines.CoroutineScope @@ -37,7 +37,7 @@ import kotlinx.coroutines.launch * @param concurrency The number of concurrent coroutines for processing messages. * @param deserialize A suspending function to deserialize the transport message into its payload. * @param process A suspending function to process the deserialized message along with its publishing time. - * @param batchConfig An optional suspending function to configure batching of messages. + * @param batchProcessorConfig An optional suspending function to configure batching of messages. * @param batchProcess An optional suspending function to process a batch of messages. * @return A Job representing the coroutine that runs the consuming process. */ @@ -47,7 +47,7 @@ fun , M : Any> TransportConsumer.startAsync( deserialize: suspend (T) -> M, process: suspend (M, MillisInstant) -> Unit, beforeDlq: (suspend (M, Exception) -> Unit)? = null, - batchConfig: (suspend (M) -> BatchConfig?)? = null, + batchProcessorConfig: (suspend (M) -> BatchProcessorConfig?)? = null, batchProcess: (suspend (List, List) -> Unit)? = null, ): Job = launch { startConsuming() @@ -56,7 +56,7 @@ fun , M : Any> TransportConsumer.startAsync( deserialize, process, beforeDlq, - batchConfig, + batchProcessorConfig, batchProcess, ) } diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/interfaces/InfiniticConsumer.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/interfaces/InfiniticConsumer.kt index 3a12feca1..31087c0da 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/interfaces/InfiniticConsumer.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/interfaces/InfiniticConsumer.kt @@ -25,7 +25,7 @@ package io.infinitic.common.transport.interfaces import io.github.oshai.kotlinlogging.KLogger import io.infinitic.common.data.MillisInstant import io.infinitic.common.messages.Message -import io.infinitic.common.transport.BatchConfig +import io.infinitic.common.transport.BatchProcessorConfig import io.infinitic.common.transport.Subscription import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job @@ -60,10 +60,10 @@ interface InfiniticConsumer { * @param subscription The subscription from which to consume messages. * @param entity The entity associated with the consumer. * @param concurrency The number of concurrent coroutines for processing messages. - * @param process A suspending function to process the deserialized message along with its publishing time. + * @param processor A suspending function to process the deserialized message along with its publishing time. * @param beforeDlq An optional suspending function to execute before sending a message to DLQ. - * @param batchConfig An optional suspending function to configure message batching. - * @param batchProcess An optional suspending function to process batches of messages. + * @param batchProcessorConfig An optional suspending function to configure message batching. + * @param batchProcessor An optional suspending function to process batches of messages. * @return A Job representing the coroutine that runs the consuming process. */ context(CoroutineScope, KLogger) @@ -71,10 +71,10 @@ interface InfiniticConsumer { subscription: Subscription, entity: String, concurrency: Int, - process: suspend (S, MillisInstant) -> Unit, + processor: suspend (S, MillisInstant) -> Unit, beforeDlq: (suspend (S, Exception) -> Unit)? = null, - batchConfig: (suspend (S) -> BatchConfig?)? = null, - batchProcess: (suspend (List, List) -> Unit)? = null + batchProcessorConfig: (suspend (S) -> BatchProcessorConfig?)? = null, + batchProcessor: (suspend (List, List) -> Unit)? = null ): Job /** @@ -86,8 +86,8 @@ interface InfiniticConsumer { * @param concurrency The number of concurrent coroutines for processing messages. * @param process A suspending function to process the deserialized message along with its publishing time. * @param beforeDlq An optional suspending function to execute before sending a message to DLQ. - * @param batchConfig An optional suspending function to configure message batching. - * @param batchProcess An optional suspending function to process batches of messages. + * @param batchProcessorConfig An optional suspending function to configure message batching. + * @param batchProcessor An optional suspending function to process batches of messages. */ context(CoroutineScope, KLogger) suspend fun start( @@ -96,9 +96,9 @@ interface InfiniticConsumer { concurrency: Int, process: suspend (M, MillisInstant) -> Unit, beforeDlq: (suspend (M, Exception) -> Unit)? = null, - batchConfig: (suspend (M) -> BatchConfig?)? = null, - batchProcess: (suspend (List, List) -> Unit)? = null + batchProcessorConfig: (suspend (M) -> BatchProcessorConfig?)? = null, + batchProcessor: (suspend (List, List) -> Unit)? = null ) = startAsync( - subscription, entity, concurrency, process, beforeDlq, batchConfig, batchProcess, + subscription, entity, concurrency, process, beforeDlq, batchProcessorConfig, batchProcessor, ).join() } diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/utils/ClassUtil.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/utils/ClassUtil.kt index 1eee81106..efb56fb2c 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/utils/ClassUtil.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/utils/ClassUtil.kt @@ -31,7 +31,7 @@ import io.infinitic.annotations.Retry import io.infinitic.annotations.Timeout import io.infinitic.common.data.MillisDuration import io.infinitic.common.exceptions.thisShouldNotHappen -import io.infinitic.common.transport.BatchConfig +import io.infinitic.common.transport.BatchProcessorConfig import io.infinitic.exceptions.tasks.NoMethodFoundWithParameterCountException import io.infinitic.exceptions.tasks.NoMethodFoundWithParameterTypesException import io.infinitic.exceptions.tasks.TooManyMethodsFoundWithParameterCountException @@ -207,7 +207,7 @@ suspend fun Class<*>.initBatchMethods() { } } -fun Method.getBatchConfig(): BatchConfig? { +fun Method.getBatchConfig(): BatchProcessorConfig? { // Retrieve the method annotated as batch, if it exists val batchMethod = getBatchMethod() ?: return null @@ -215,7 +215,7 @@ fun Method.getBatchConfig(): BatchConfig? { val batchAnnotation = batchMethod.batch.findAnnotation(Batch::class.java) ?: thisShouldNotHappen() // Create and return an instance of MessageBatchConfig from the annotation - return BatchConfig( + return BatchProcessorConfig( batchKey = toUniqueString(), maxMessages = batchAnnotation.maxMessages, maxDuration = MillisDuration((batchAnnotation.maxSeconds * 1000).toLong()), diff --git a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/BatchByTests.kt b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/BatchByTests.kt index 067ca3b1f..0521dedb5 100644 --- a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/BatchByTests.kt +++ b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/BatchByTests.kt @@ -24,7 +24,7 @@ package io.infinitic.common.transport.consumers import io.github.oshai.kotlinlogging.KotlinLogging import io.infinitic.common.data.MillisDuration -import io.infinitic.common.transport.BatchConfig +import io.infinitic.common.transport.BatchProcessorConfig import io.kotest.assertions.throwables.shouldNotThrowAny import io.kotest.assertions.throwables.shouldThrow import io.kotest.core.spec.style.StringSpec @@ -44,12 +44,12 @@ internal class BatchByTests : StringSpec( val logger = KotlinLogging.logger {} fun getScope() = CoroutineScope(Dispatchers.IO) - fun getBatchingConfig(value: IntMessage): BatchConfig? { + fun getBatchingConfig(value: IntMessage): BatchProcessorConfig? { val i = value.value return when { i == 0 -> null - (i % 2) == 0 -> BatchConfig("even", 5, MillisDuration(1000 * 3600 * 50)) - (i % 2) == 1 -> BatchConfig("odd", 5, MillisDuration(1000 * 3600 * 50)) + (i % 2) == 0 -> BatchProcessorConfig("even", 5, MillisDuration(1000 * 3600 * 50)) + (i % 2) == 1 -> BatchProcessorConfig("odd", 5, MillisDuration(1000 * 3600 * 50)) else -> throw IllegalStateException() } } diff --git a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/fakes.kt b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/fakes.kt index a5c875eb5..4ce88e35c 100644 --- a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/fakes.kt +++ b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/fakes.kt @@ -24,7 +24,7 @@ package io.infinitic.common.transport.consumers import io.infinitic.common.data.MillisDuration import io.infinitic.common.data.MillisInstant -import io.infinitic.common.transport.BatchConfig +import io.infinitic.common.transport.BatchProcessorConfig import io.infinitic.common.transport.Topic import io.infinitic.common.transport.interfaces.TransportConsumer import io.infinitic.common.transport.interfaces.TransportMessage @@ -95,12 +95,12 @@ internal fun processBatch(batch: List, publishTimes: Lis processedList.addAll(batch.map { it.value.value }) } -internal fun batchConfig(deserialized: DeserializedIntMessage): BatchConfig? { +internal fun batchConfig(deserialized: DeserializedIntMessage): BatchProcessorConfig? { val i = deserialized.value.value return when { (i % 3) == 0 -> null - (i % 3) == 1 -> BatchConfig("1", 20, MillisDuration(1000 * 3600 * 50)) - (i % 3) == 2 -> BatchConfig("2", 20, MillisDuration(1000 * 3600 * 50)) + (i % 3) == 1 -> BatchProcessorConfig("1", 20, MillisDuration(1000 * 3600 * 50)) + (i % 3) == 2 -> BatchProcessorConfig("2", 20, MillisDuration(1000 * 3600 * 50)) else -> throw IllegalStateException() } } diff --git a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/startAsyncWithBatchTests.kt b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/startAsyncWithBatchTests.kt index 18ed4e872..06b92c7f3 100644 --- a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/startAsyncWithBatchTests.kt +++ b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/startAsyncWithBatchTests.kt @@ -25,7 +25,7 @@ package io.infinitic.common.transport.consumers import io.github.oshai.kotlinlogging.KotlinLogging import io.infinitic.common.data.MillisInstant import io.infinitic.common.fixtures.later -import io.infinitic.common.transport.BatchConfig +import io.infinitic.common.transport.BatchProcessorConfig import io.kotest.core.spec.style.StringSpec import io.kotest.matchers.collections.shouldContainAll import io.kotest.matchers.collections.shouldNotContain @@ -125,7 +125,7 @@ internal class ProcessorConsumerWithBatchTests : StringSpec( "An Error during getBatchingConfig triggers quitting, but does not prevent finishing current processing" { with(logger) { - fun batchConfigWithError(deserialized: DeserializedIntMessage): BatchConfig? = + fun batchConfigWithError(deserialized: DeserializedIntMessage): BatchProcessorConfig? = if (deserialized.value.value == 10) throw Error("Expected Error") else batchConfig(deserialized) @@ -232,7 +232,7 @@ internal class ProcessorConsumerWithBatchTests : StringSpec( "An Exception during getBatchingConfig triggers quitting, but does not prevent finishing current processing" { with(logger) { - fun batchConfigWithException(deserialized: DeserializedIntMessage): BatchConfig? = + fun batchConfigWithException(deserialized: DeserializedIntMessage): BatchProcessorConfig? = when (deserialized.value.value) { 10 -> throw Exception("Expected Exception") 20 -> throw Error("Expected Error") diff --git a/infinitic-task-executor/src/main/kotlin/io/infinitic/tasks/executor/TaskExecutor.kt b/infinitic-task-executor/src/main/kotlin/io/infinitic/tasks/executor/TaskExecutor.kt index d24af4c52..125f552e1 100644 --- a/infinitic-task-executor/src/main/kotlin/io/infinitic/tasks/executor/TaskExecutor.kt +++ b/infinitic-task-executor/src/main/kotlin/io/infinitic/tasks/executor/TaskExecutor.kt @@ -45,7 +45,7 @@ import io.infinitic.common.tasks.events.messages.TaskRetriedEvent import io.infinitic.common.tasks.events.messages.TaskStartedEvent import io.infinitic.common.tasks.executors.messages.ExecuteTask import io.infinitic.common.tasks.executors.messages.ServiceExecutorMessage -import io.infinitic.common.transport.BatchConfig +import io.infinitic.common.transport.BatchProcessorConfig import io.infinitic.common.transport.ServiceExecutorEventTopic import io.infinitic.common.transport.ServiceExecutorRetryTopic import io.infinitic.common.transport.interfaces.InfiniticProducer @@ -120,12 +120,12 @@ class TaskExecutor( } context(KLogger) - fun getBatchConfig(msg: ServiceExecutorMessage): BatchConfig? = when (msg) { + fun getBatchConfig(msg: ServiceExecutorMessage): BatchProcessorConfig? = when (msg) { is ExecuteTask -> msg.getBatchConfig() } context(KLogger) - private fun ExecuteTask.getBatchConfig(): BatchConfig? = when (isWorkflowTask()) { + private fun ExecuteTask.getBatchConfig(): BatchProcessorConfig? = when (isWorkflowTask()) { true -> thisShouldNotHappen() false -> { val (instance, method) = getInstanceAndMethod() diff --git a/infinitic-transport-inMemory/src/main/kotlin/io/infinitic/inMemory/InMemoryInfiniticConsumer.kt b/infinitic-transport-inMemory/src/main/kotlin/io/infinitic/inMemory/InMemoryInfiniticConsumer.kt index 99055896f..ee619cae5 100644 --- a/infinitic-transport-inMemory/src/main/kotlin/io/infinitic/inMemory/InMemoryInfiniticConsumer.kt +++ b/infinitic-transport-inMemory/src/main/kotlin/io/infinitic/inMemory/InMemoryInfiniticConsumer.kt @@ -25,7 +25,7 @@ package io.infinitic.inMemory import io.github.oshai.kotlinlogging.KLogger import io.infinitic.common.data.MillisInstant import io.infinitic.common.messages.Message -import io.infinitic.common.transport.BatchConfig +import io.infinitic.common.transport.BatchProcessorConfig import io.infinitic.common.transport.EventListenerSubscription import io.infinitic.common.transport.MainSubscription import io.infinitic.common.transport.Subscription @@ -73,10 +73,10 @@ class InMemoryInfiniticConsumer( subscription: Subscription, entity: String, concurrency: Int, - process: suspend (S, MillisInstant) -> Unit, + processor: suspend (S, MillisInstant) -> Unit, beforeDlq: (suspend (S, Exception) -> Unit)?, - batchConfig: (suspend (S) -> BatchConfig?)?, - batchProcess: (suspend (List, List) -> Unit)? + batchProcessorConfig: (suspend (S) -> BatchProcessorConfig?)?, + batchProcessor: (suspend (List, List) -> Unit)? ): Job { val deserialize = { message: TransportMessage -> message.deserialize() } @@ -90,10 +90,10 @@ class InMemoryInfiniticConsumer( consumers[index].startAsync( 1, deserialize, - process, + processor, beforeDlq, - batchConfig, - batchProcess, + batchProcessorConfig, + batchProcessor, ) } } @@ -105,10 +105,10 @@ class InMemoryInfiniticConsumer( consumer.startAsync( concurrency, deserialize, - process, + processor, beforeDlq, - batchConfig, - batchProcess, + batchProcessorConfig, + batchProcessor, ) } } diff --git a/infinitic-transport-pulsar/src/main/kotlin/io/infinitic/pulsar/PulsarInfiniticConsumer.kt b/infinitic-transport-pulsar/src/main/kotlin/io/infinitic/pulsar/PulsarInfiniticConsumer.kt index c46580d83..7722c4290 100644 --- a/infinitic-transport-pulsar/src/main/kotlin/io/infinitic/pulsar/PulsarInfiniticConsumer.kt +++ b/infinitic-transport-pulsar/src/main/kotlin/io/infinitic/pulsar/PulsarInfiniticConsumer.kt @@ -26,7 +26,7 @@ import io.github.oshai.kotlinlogging.KLogger import io.infinitic.common.data.MillisInstant import io.infinitic.common.messages.Envelope import io.infinitic.common.messages.Message -import io.infinitic.common.transport.BatchConfig +import io.infinitic.common.transport.BatchProcessorConfig import io.infinitic.common.transport.EventListenerSubscription import io.infinitic.common.transport.MainSubscription import io.infinitic.common.transport.Subscription @@ -109,10 +109,10 @@ class PulsarInfiniticConsumer( subscription: Subscription, entity: String, concurrency: Int, - process: suspend (S, MillisInstant) -> Unit, + processor: suspend (S, MillisInstant) -> Unit, beforeDlq: (suspend (S, Exception) -> Unit)?, - batchConfig: (suspend (S) -> BatchConfig?)?, - batchProcess: (suspend (List, List) -> Unit)? + batchProcessorConfig: (suspend (S) -> BatchProcessorConfig?)?, + batchProcessor: (suspend (List, List) -> Unit)? ): Job { val deserialize = { message: TransportMessage -> message.deserialize() } @@ -126,10 +126,10 @@ class PulsarInfiniticConsumer( consumers[index].startAsync( concurrency = 1, deserialize, - process, + processor, beforeDlq, - batchConfig, - batchProcess, + batchProcessorConfig, + batchProcessor, ) } } @@ -141,10 +141,10 @@ class PulsarInfiniticConsumer( consumer.startAsync( concurrency, deserialize, - process, + processor, beforeDlq, - batchConfig, - batchProcess, + batchProcessorConfig, + batchProcessor, ) } } diff --git a/infinitic-worker/src/main/kotlin/io/infinitic/workers/InfiniticWorker.kt b/infinitic-worker/src/main/kotlin/io/infinitic/workers/InfiniticWorker.kt index 9932f3cc4..41515788d 100644 --- a/infinitic-worker/src/main/kotlin/io/infinitic/workers/InfiniticWorker.kt +++ b/infinitic-worker/src/main/kotlin/io/infinitic/workers/InfiniticWorker.kt @@ -31,7 +31,7 @@ import io.infinitic.common.tasks.events.messages.ServiceExecutorEventMessage import io.infinitic.common.tasks.executors.messages.ExecuteTask import io.infinitic.common.tasks.executors.messages.ServiceExecutorMessage import io.infinitic.common.tasks.tags.messages.ServiceTagMessage -import io.infinitic.common.transport.BatchConfig +import io.infinitic.common.transport.BatchProcessorConfig import io.infinitic.common.transport.MainSubscription import io.infinitic.common.transport.ServiceExecutorEventTopic import io.infinitic.common.transport.ServiceExecutorRetryTopic @@ -385,7 +385,7 @@ class InfiniticWorker( beautifyLogs, ) - val process: suspend (ServiceTagMessage, MillisInstant) -> Unit = + val processor: suspend (ServiceTagMessage, MillisInstant) -> Unit = { message, publishedAt -> cloudEventLogger.log(message, publishedAt) taskTagEngine.handle(message, publishedAt) @@ -395,7 +395,7 @@ class InfiniticWorker( subscription = MainSubscription(ServiceTagEngineTopic), entity = config.serviceName, concurrency = config.concurrency, - process = process, + processor = processor, ) } } @@ -420,13 +420,13 @@ class InfiniticWorker( beautifyLogs, ) - val process: suspend (ServiceExecutorMessage, MillisInstant) -> Unit = + val processor: suspend (ServiceExecutorMessage, MillisInstant) -> Unit = { message, publishedAt -> cloudEventLogger.log(message, publishedAt) taskExecutor.process(message) } - val batchProcess: suspend (List, List) -> Unit = + val batchProcessor: suspend (List, List) -> Unit = { messages, publishedAtList -> coroutineScope { messages.zip(publishedAtList).forEach { (message, publishedAt) -> @@ -448,10 +448,10 @@ class InfiniticWorker( subscription = MainSubscription(ServiceExecutorTopic), entity = config.serviceName, concurrency = config.concurrency, - process = process, + processor = processor, beforeDlq = beforeDlq, - batchConfig = { msg -> taskExecutor.getBatchConfig(msg) }, - batchProcess = batchProcess, + batchProcessorConfig = { msg -> taskExecutor.getBatchConfig(msg) }, + batchProcessor = batchProcessor, ) } @@ -464,7 +464,7 @@ class InfiniticWorker( subscription = MainSubscription(ServiceExecutorRetryTopic), entity = config.serviceName, concurrency = config.concurrency, - process = taskRetryHandler::handle, + processor = taskRetryHandler::handle, ) } @@ -480,7 +480,7 @@ class InfiniticWorker( beautifyLogs, ) - val process: suspend (ServiceExecutorEventMessage, MillisInstant) -> Unit = + val processor: suspend (ServiceExecutorEventMessage, MillisInstant) -> Unit = { message, publishedAt -> cloudEventLogger.log(message, publishedAt) taskEventHandler.handle(message, publishedAt) @@ -490,7 +490,7 @@ class InfiniticWorker( subscription = MainSubscription(ServiceExecutorEventTopic), entity = config.serviceName, concurrency = config.concurrency, - process = process, + processor = processor, ) } @@ -515,7 +515,7 @@ class InfiniticWorker( beautifyLogs, ) - val process: suspend (WorkflowTagEngineMessage, MillisInstant) -> Unit = + val processor: suspend (WorkflowTagEngineMessage, MillisInstant) -> Unit = { message, publishedAt -> cloudEventLogger.log(message, publishedAt) workflowTagEngine.handle(message, publishedAt) @@ -525,7 +525,7 @@ class InfiniticWorker( subscription = MainSubscription(WorkflowTagEngineTopic), entity = config.workflowName, concurrency = config.concurrency, - process = process, + processor = processor, ) } } @@ -547,13 +547,13 @@ class InfiniticWorker( beautifyLogs, ) - val process: suspend (WorkflowStateEngineMessage, MillisInstant) -> Unit = + val processor: suspend (WorkflowStateEngineMessage, MillisInstant) -> Unit = { message, publishedAt -> cloudEventLogger.log(message, publishedAt) workflowStateCmdHandler.process(message, publishedAt) } - val batchProcess: suspend (List, List) -> Unit = + val batchProcessor: suspend (List, List) -> Unit = { messages, publishedAtList -> coroutineScope { messages.zip(publishedAtList).forEach { (message, publishedAt) -> @@ -563,17 +563,17 @@ class InfiniticWorker( } } - val batchConfig = config.batch?.let { - BatchConfig("workflowCmd:" + config.workflowName, it.maxMessages, it.maxMillis) + val batchProcessorConfig = config.batch?.let { + BatchProcessorConfig("workflowCmd:" + config.workflowName, it.maxMessages, it.maxMillis) } consumer.startAsync( subscription = MainSubscription(WorkflowStateCmdTopic), entity = config.workflowName, concurrency = config.concurrency, - process = process, - batchConfig = { _ -> batchConfig }, - batchProcess = batchProcess, + processor = processor, + batchProcessorConfig = { _ -> batchProcessorConfig }, + batchProcessor = batchProcessor, ) } @@ -590,13 +590,13 @@ class InfiniticWorker( beautifyLogs, ) - val process: suspend (WorkflowStateEngineMessage, MillisInstant) -> Unit = + val processor: suspend (WorkflowStateEngineMessage, MillisInstant) -> Unit = { message, publishedAt -> cloudEventLogger.log(message, publishedAt) workflowStateEngine.process(message, publishedAt) } - val batchProcess: suspend (List, List) -> Unit = + val batchProcessor: suspend (List, List) -> Unit = { messages, publishedAtList -> coroutineScope { messages.zip(publishedAtList).forEach { (message, publishedAt) -> @@ -606,17 +606,17 @@ class InfiniticWorker( } } - val batchConfig = config.batch?.let { - BatchConfig("workflowState:" + config.workflowName, it.maxMessages, it.maxMillis) + val batchProcessorConfig = config.batch?.let { + BatchProcessorConfig("workflowState:" + config.workflowName, it.maxMessages, it.maxMillis) } consumer.startAsync( subscription = MainSubscription(WorkflowStateEngineTopic), entity = config.workflowName, concurrency = config.concurrency, - process = process, - batchConfig = { _ -> batchConfig }, - batchProcess = batchProcess, + processor = processor, + batchProcessorConfig = { _ -> batchProcessorConfig }, + batchProcessor = batchProcessor, ) } @@ -629,7 +629,7 @@ class InfiniticWorker( subscription = MainSubscription(WorkflowStateTimerTopic), entity = config.workflowName, concurrency = config.concurrency, - process = workflowStateTimerHandler::process, + processor = workflowStateTimerHandler::process, ) } @@ -645,13 +645,13 @@ class InfiniticWorker( beautifyLogs, ) - val process: suspend (WorkflowStateEventMessage, MillisInstant) -> Unit = + val processor: suspend (WorkflowStateEventMessage, MillisInstant) -> Unit = { message, publishedAt -> cloudEventLogger.log(message, publishedAt) workflowStateEventHandler.process(message, publishedAt) } - val batchProcess: suspend (List, List) -> Unit = + val batchProcessor: suspend (List, List) -> Unit = { messages, publishedAtList -> coroutineScope { messages.zip(publishedAtList).forEach { (message, publishedAt) -> @@ -661,17 +661,17 @@ class InfiniticWorker( } } - val batchConfig = config.batch?.let { - BatchConfig("workflowEvent:" + config.workflowName, it.maxMessages, it.maxMillis) + val batchProcessorConfig = config.batch?.let { + BatchProcessorConfig("workflowEvent:" + config.workflowName, it.maxMessages, it.maxMillis) } consumer.startAsync( subscription = MainSubscription(WorkflowStateEventTopic), entity = config.workflowName, concurrency = config.concurrency, - process = process, - batchConfig = { _ -> batchConfig }, - batchProcess = batchProcess, + processor = processor, + batchProcessorConfig = { _ -> batchProcessorConfig }, + batchProcessor = batchProcessor, ) } @@ -695,7 +695,7 @@ class InfiniticWorker( beautifyLogs, ) - val process: suspend (ServiceExecutorMessage, MillisInstant) -> Unit = + val processor: suspend (ServiceExecutorMessage, MillisInstant) -> Unit = { message, publishedAt -> cloudEventLogger.log(message, publishedAt) workflowTaskExecutor.process(message) @@ -710,11 +710,15 @@ class InfiniticWorker( } } - val batchConfig = config.batchConfig?.let { - BatchConfig("workflowExecutor:" + config.workflowName, it.maxMessages, it.maxMillis) + val batchProcessorConfig = config.batchConfig?.let { + BatchProcessorConfig( + "workflowExecutor:" + config.workflowName, + it.maxMessages, + it.maxMillis, + ) } - val batchProcess: suspend (List, List) -> Unit = + val batchProcessor: suspend (List, List) -> Unit = { messages, publishedAtList -> coroutineScope { messages.zip(publishedAtList).forEach { (message, publishedAt) -> @@ -728,10 +732,10 @@ class InfiniticWorker( subscription = MainSubscription(WorkflowExecutorTopic), entity = config.workflowName, concurrency = config.concurrency, - process = process, + processor = processor, beforeDlq = beforeDlq, - batchConfig = { _ -> batchConfig }, - batchProcess = batchProcess, + batchProcessorConfig = { _ -> batchProcessorConfig }, + batchProcessor = batchProcessor, ) } @@ -744,7 +748,7 @@ class InfiniticWorker( subscription = MainSubscription(WorkflowExecutorRetryTopic), entity = config.workflowName, concurrency = config.concurrency, - process = taskRetryHandler::handle, + processor = taskRetryHandler::handle, ) } @@ -760,7 +764,7 @@ class InfiniticWorker( beautifyLogs, ) - val handler: suspend (ServiceExecutorEventMessage, MillisInstant) -> Unit = + val processor: suspend (ServiceExecutorEventMessage, MillisInstant) -> Unit = { message, publishedAt -> cloudEventLogger.log(message, publishedAt) workflowTaskEventHandler.handle(message, publishedAt) @@ -770,7 +774,7 @@ class InfiniticWorker( subscription = MainSubscription(WorkflowExecutorEventTopic), entity = config.workflowName, concurrency = config.concurrency, - process = handler, + processor = processor, ) } diff --git a/infinitic-worker/src/main/kotlin/io/infinitic/workers/config/WorkflowExecutorConfig.kt b/infinitic-worker/src/main/kotlin/io/infinitic/workers/config/WorkflowExecutorConfig.kt index 5ad2554c1..706a35583 100644 --- a/infinitic-worker/src/main/kotlin/io/infinitic/workers/config/WorkflowExecutorConfig.kt +++ b/infinitic-worker/src/main/kotlin/io/infinitic/workers/config/WorkflowExecutorConfig.kt @@ -22,7 +22,7 @@ */ package io.infinitic.workers.config -import io.infinitic.common.transport.config.LoadedBatchConfig +import io.infinitic.common.transport.config.BatchConfig import io.infinitic.common.utils.getInstance import io.infinitic.common.workers.config.RetryPolicy import io.infinitic.common.workers.config.UNSET_RETRY_POLICY @@ -48,7 +48,7 @@ sealed class WorkflowExecutorConfig { abstract val withRetry: WithRetry? abstract val withTimeout: WithTimeout? abstract val checkMode: WorkflowCheckMode? - abstract val batchConfig: LoadedBatchConfig? + abstract val batchConfig: BatchConfig? companion object { @JvmStatic @@ -86,7 +86,7 @@ sealed class WorkflowExecutorConfig { private var timeoutSeconds: Double? = UNSET_TIMEOUT private var withRetry: WithRetry? = WithRetry.UNSET private var checkMode: WorkflowCheckMode? = null - private var batchConfig: LoadedBatchConfig? = null + private var batchConfig: BatchConfig? = null fun setWorkflowName(workflowName: String) = apply { this.workflowName = workflowName } @@ -107,7 +107,7 @@ sealed class WorkflowExecutorConfig { apply { this.checkMode = checkMode } fun setBatch(maxMessages: Int, maxSeconds: Double) = - apply { this.batchConfig = LoadedBatchConfig(maxMessages, maxSeconds) } + apply { this.batchConfig = BatchConfig(maxMessages, maxSeconds) } fun build(): WorkflowExecutorConfig { workflowName.checkWorkflowName() @@ -145,7 +145,7 @@ data class BuiltWorkflowExecutorConfig( override var withTimeout: WithTimeout?, override var withRetry: WithRetry?, override var checkMode: WorkflowCheckMode?, - override val batchConfig: LoadedBatchConfig? + override val batchConfig: BatchConfig? ) : WorkflowExecutorConfig() /** @@ -159,7 +159,7 @@ data class LoadedWorkflowExecutorConfig( val timeoutSeconds: Double? = UNSET_TIMEOUT, var retry: RetryPolicy? = UNSET_RETRY_POLICY, override var checkMode: WorkflowCheckMode? = null, - val batch: LoadedBatchConfig? = null, + val batch: BatchConfig? = null, ) : WorkflowExecutorConfig(), WithMutableWorkflowName { private val allInstances = mutableListOf() @@ -171,7 +171,7 @@ data class LoadedWorkflowExecutorConfig( allInstances.map { { it::class.java.getInstance().getOrThrow() } } } - override val batchConfig: LoadedBatchConfig? = batch + override val batchConfig: BatchConfig? = batch init { // Needed if the workflow context is referenced within the properties of the workflow diff --git a/infinitic-worker/src/main/kotlin/io/infinitic/workers/config/WorkflowStateEngineConfig.kt b/infinitic-worker/src/main/kotlin/io/infinitic/workers/config/WorkflowStateEngineConfig.kt index 46807338e..e5825da4f 100644 --- a/infinitic-worker/src/main/kotlin/io/infinitic/workers/config/WorkflowStateEngineConfig.kt +++ b/infinitic-worker/src/main/kotlin/io/infinitic/workers/config/WorkflowStateEngineConfig.kt @@ -23,7 +23,7 @@ package io.infinitic.workers.config import io.infinitic.common.exceptions.thisShouldNotHappen -import io.infinitic.common.transport.config.LoadedBatchConfig +import io.infinitic.common.transport.config.BatchConfig import io.infinitic.config.loadFromYamlFile import io.infinitic.config.loadFromYamlResource import io.infinitic.config.loadFromYamlString @@ -34,7 +34,7 @@ data class WorkflowStateEngineConfig( override var workflowName: String = "", val concurrency: Int = 1, override var storage: StorageConfig? = null, - val batch: LoadedBatchConfig? = null + val batch: BatchConfig? = null ) : WithMutableWorkflowName, WithMutableStorage { init { require(concurrency >= 0) { "concurrency must be positive" } @@ -90,7 +90,7 @@ data class WorkflowStateEngineConfig( apply { this.storage = storage } fun setBatch(maxMessages: Int, maxSeconds: Double) = - apply { this.batch = LoadedBatchConfig(maxMessages, maxSeconds) } + apply { this.batch = BatchConfig(maxMessages, maxSeconds) } fun build(): WorkflowStateEngineConfig { workflowName.checkWorkflowName() diff --git a/infinitic-worker/src/test/kotlin/io/infinitic/workers/config/WorkflowConfigTests.kt b/infinitic-worker/src/test/kotlin/io/infinitic/workers/config/WorkflowConfigTests.kt index da888c53a..312b122f4 100644 --- a/infinitic-worker/src/test/kotlin/io/infinitic/workers/config/WorkflowConfigTests.kt +++ b/infinitic-worker/src/test/kotlin/io/infinitic/workers/config/WorkflowConfigTests.kt @@ -23,7 +23,7 @@ package io.infinitic.workers.config import com.sksamuel.hoplite.ConfigException -import io.infinitic.common.transport.config.LoadedBatchConfig +import io.infinitic.common.transport.config.BatchConfig import io.infinitic.workers.samples.WorkflowA import io.infinitic.workers.samples.WorkflowAImpl import io.infinitic.workers.samples.WorkflowAImpl_1 @@ -56,7 +56,7 @@ executor: config.name shouldBe workflowName config.executor.shouldBeInstanceOf() - config.executor!!.batchConfig shouldBe LoadedBatchConfig(100, 0.5) + config.executor!!.batchConfig shouldBe BatchConfig(100, 0.5) config.tagEngine shouldBe null config.stateEngine shouldBe null } diff --git a/infinitic-worker/src/test/kotlin/io/infinitic/workers/config/WorkflowExecutorConfigTests.kt b/infinitic-worker/src/test/kotlin/io/infinitic/workers/config/WorkflowExecutorConfigTests.kt index 3eb642cb4..534c9a1f6 100644 --- a/infinitic-worker/src/test/kotlin/io/infinitic/workers/config/WorkflowExecutorConfigTests.kt +++ b/infinitic-worker/src/test/kotlin/io/infinitic/workers/config/WorkflowExecutorConfigTests.kt @@ -23,7 +23,7 @@ package io.infinitic.workers.config import com.sksamuel.hoplite.ConfigException -import io.infinitic.common.transport.config.LoadedBatchConfig +import io.infinitic.common.transport.config.BatchConfig import io.infinitic.common.workers.config.WithExponentialBackoffRetry import io.infinitic.common.workflows.emptyWorkflowContext import io.infinitic.tasks.WithRetry @@ -152,7 +152,7 @@ retry: config.withTimeout?.getTimeoutSeconds() shouldBe 3.0 config.withRetry shouldBe withRetry config.checkMode shouldBe WorkflowCheckMode.strict - config.batchConfig shouldBe LoadedBatchConfig(100, 0.5) + config.batchConfig shouldBe BatchConfig(100, 0.5) } "class is mandatory when building WorkflowExecutorConfig from YAML" {