From e55678832d6793d681e9adfa1da7ab9532d06a87 Mon Sep 17 00:00:00 2001 From: Gilles Barbier Date: Sat, 2 Nov 2024 19:21:45 +0100 Subject: [PATCH] WIP: implementing batch receiving --- .../infinitic/clients/InfiniticClientTests.kt | 5 ++- .../listenToServiceExecutorTopics.kt | 18 +++++----- .../listenToWorkflowExecutorTopics.kt | 18 +++++----- .../listeners/listenToWorkflowStateTopics.kt | 18 +++++----- .../listeners/startCloudEventListener.kt | 24 ++++++++++--- .../common/transport/Subscriptions.kt | 4 --- .../io/infinitic/common/transport/Topic.kt | 12 ------- .../common/transport/config/BatchConfig.kt | 4 +-- .../common/transport/consumers/startAsync.kt | 16 +++++---- .../transport/consumers/startBatching.kt | 10 +++--- .../transport/consumers/startConsuming.kt | 18 ++++++++-- .../transport/interfaces/InfiniticConsumer.kt | 16 +++++++-- .../transport/interfaces/InfiniticProducer.kt | 4 --- .../transport/interfaces/TransportConsumer.kt | 7 ++++ .../io/infinitic/common/utils/ClassUtil.kt | 2 +- .../transport/consumers/BatchByTests.kt | 5 +-- .../transport/consumers/StartBatchingTests.kt | 4 +-- .../consumers/StartConsumingTests.kt | 6 ++-- .../common/transport/consumers/fakes.kt | 2 ++ .../transport/consumers/startAsyncTests.kt | 10 +++--- .../consumers/startAsyncWithBatchTests.kt | 10 +++++- .../InMemoryInfiniticConsumerTests.kt | 1 + .../pulsar/PulsarInfiniticConsumer.kt | 16 +++++++-- .../pulsar/client/InfiniticPulsarClient.kt | 13 +++++++ .../consumers/PulsarTransportConsumer.kt | 6 ++++ .../client/PulsarInfiniticClientTests.kt | 3 ++ .../pulsar/consumers/ConsumerTests.kt | 8 ++--- .../io/infinitic/workers/InfiniticWorker.kt | 35 +++++++++++++++---- .../workers/config/ServiceExecutorConfig.kt | 22 +++++++++--- 29 files changed, 217 insertions(+), 100 deletions(-) diff --git a/infinitic-client/src/test/kotlin/io/infinitic/clients/InfiniticClientTests.kt b/infinitic-client/src/test/kotlin/io/infinitic/clients/InfiniticClientTests.kt index 56a967b4e..a279d1188 100644 --- a/infinitic-client/src/test/kotlin/io/infinitic/clients/InfiniticClientTests.kt +++ b/infinitic-client/src/test/kotlin/io/infinitic/clients/InfiniticClientTests.kt @@ -163,7 +163,7 @@ internal val mockedConsumer = mockk { coEvery { with(capture(scopeSlot)) { with(capture(loggerSlot)) { - startAsync(any>(), "$clientNameTest", 1, any(), any()) + startAsync(any>(), "$clientNameTest", null, 1, any(), any()) } } } answers { @@ -236,6 +236,7 @@ internal class InfiniticClientTests : StringSpec( mockedConsumer.startAsync( MainSubscription(ClientTopic), "$clientNameTest", + null, 1, any(), any(), @@ -525,6 +526,7 @@ internal class InfiniticClientTests : StringSpec( mockedConsumer.startAsync( MainSubscription(ClientTopic), "$clientNameTest", + null, 1, any(), any(), @@ -543,6 +545,7 @@ internal class InfiniticClientTests : StringSpec( mockedConsumer.startAsync( MainSubscription(ClientTopic), "$clientNameTest", + null, 1, any(), any(), diff --git a/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/listeners/listenToServiceExecutorTopics.kt b/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/listeners/listenToServiceExecutorTopics.kt index eb7aa03cd..a98824d26 100644 --- a/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/listeners/listenToServiceExecutorTopics.kt +++ b/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/listeners/listenToServiceExecutorTopics.kt @@ -25,15 +25,16 @@ package io.infinitic.events.listeners import io.github.oshai.kotlinlogging.KLogger import io.infinitic.common.messages.Message import io.infinitic.common.tasks.data.ServiceName -import io.infinitic.common.transport.interfaces.InfiniticConsumer import io.infinitic.common.transport.ServiceExecutorEventTopic import io.infinitic.common.transport.ServiceExecutorRetryTopic import io.infinitic.common.transport.ServiceExecutorTopic import io.infinitic.common.transport.SubscriptionType -import io.infinitic.common.transport.interfaces.TransportMessage +import io.infinitic.common.transport.config.BatchConfig import io.infinitic.common.transport.consumers.Result import io.infinitic.common.transport.consumers.startConsuming import io.infinitic.common.transport.create +import io.infinitic.common.transport.interfaces.InfiniticConsumer +import io.infinitic.common.transport.interfaces.TransportMessage import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job import kotlinx.coroutines.channels.Channel @@ -42,6 +43,7 @@ import kotlinx.coroutines.launch context(CoroutineScope, KLogger) internal fun InfiniticConsumer.listenToServiceExecutorTopics( serviceName: ServiceName, + batchConfig: BatchConfig?, subscriptionName: String?, outChannel: Channel, TransportMessage>>, ): Job = launch { @@ -51,22 +53,22 @@ internal fun InfiniticConsumer.listenToServiceExecutorTopics( ServiceExecutorTopic, subscriptionName, ) - buildConsumer(serviceExecutorSubscription, serviceName.toString()) - .startConsuming(outChannel) + buildConsumer(serviceExecutorSubscription, serviceName.toString(), batchConfig) + .startConsuming(batchConfig != null, outChannel) // Send messages from ServiceExecutorEventTopic to inChannel val serviceExecutorEventSubscription = SubscriptionType.EVENT_LISTENER.create( ServiceExecutorEventTopic, subscriptionName, ) - buildConsumer(serviceExecutorEventSubscription, serviceName.toString()) - .startConsuming(outChannel) + buildConsumer(serviceExecutorEventSubscription, serviceName.toString(), batchConfig) + .startConsuming(batchConfig != null, outChannel) // Send messages from ServiceExecutorRetryTopic to inChannel val serviceExecutorRetrySubscription = SubscriptionType.EVENT_LISTENER.create( ServiceExecutorRetryTopic, subscriptionName, ) - buildConsumer(serviceExecutorRetrySubscription, serviceName.toString()) - .startConsuming(outChannel) + buildConsumer(serviceExecutorRetrySubscription, serviceName.toString(), batchConfig) + .startConsuming(batchConfig != null, outChannel) } diff --git a/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/listeners/listenToWorkflowExecutorTopics.kt b/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/listeners/listenToWorkflowExecutorTopics.kt index 534d3de11..213cc27e6 100644 --- a/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/listeners/listenToWorkflowExecutorTopics.kt +++ b/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/listeners/listenToWorkflowExecutorTopics.kt @@ -24,15 +24,16 @@ package io.infinitic.events.listeners import io.github.oshai.kotlinlogging.KLogger import io.infinitic.common.messages.Message -import io.infinitic.common.transport.interfaces.InfiniticConsumer import io.infinitic.common.transport.SubscriptionType -import io.infinitic.common.transport.interfaces.TransportMessage import io.infinitic.common.transport.WorkflowExecutorEventTopic import io.infinitic.common.transport.WorkflowExecutorRetryTopic import io.infinitic.common.transport.WorkflowExecutorTopic +import io.infinitic.common.transport.config.BatchConfig import io.infinitic.common.transport.consumers.Result import io.infinitic.common.transport.consumers.startConsuming import io.infinitic.common.transport.create +import io.infinitic.common.transport.interfaces.InfiniticConsumer +import io.infinitic.common.transport.interfaces.TransportMessage import io.infinitic.common.workflows.data.workflows.WorkflowName import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job @@ -42,6 +43,7 @@ import kotlinx.coroutines.launch context(CoroutineScope, KLogger) internal fun InfiniticConsumer.listenToWorkflowExecutorTopics( workflowName: WorkflowName, + batchConfig: BatchConfig?, subscriptionName: String?, outChannel: Channel, TransportMessage>>, ): Job = launch { @@ -51,22 +53,22 @@ internal fun InfiniticConsumer.listenToWorkflowExecutorTopics( WorkflowExecutorTopic, subscriptionName, ) - buildConsumer(workflowExecutorSubscription, workflowName.toString()) - .startConsuming(outChannel) + buildConsumer(workflowExecutorSubscription, workflowName.toString(), batchConfig) + .startConsuming(batchConfig != null, outChannel) // Send messages from WorkflowExecutorEventTopic to inChannel val workflowExecutorEventSubscription = SubscriptionType.EVENT_LISTENER.create( WorkflowExecutorEventTopic, subscriptionName, ) - buildConsumer(workflowExecutorEventSubscription, workflowName.toString()) - .startConsuming(outChannel) + buildConsumer(workflowExecutorEventSubscription, workflowName.toString(), batchConfig) + .startConsuming(batchConfig != null, outChannel) // Send messages from WorkflowExecutorRetryTopic to inChannel val workflowExecutorRetrySubscription = SubscriptionType.EVENT_LISTENER.create( WorkflowExecutorRetryTopic, subscriptionName, ) - buildConsumer(workflowExecutorRetrySubscription, workflowName.toString()) - .startConsuming(outChannel) + buildConsumer(workflowExecutorRetrySubscription, workflowName.toString(), batchConfig) + .startConsuming(batchConfig != null, outChannel) } diff --git a/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/listeners/listenToWorkflowStateTopics.kt b/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/listeners/listenToWorkflowStateTopics.kt index 588ef06ee..65356f704 100644 --- a/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/listeners/listenToWorkflowStateTopics.kt +++ b/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/listeners/listenToWorkflowStateTopics.kt @@ -24,15 +24,16 @@ package io.infinitic.events.listeners import io.github.oshai.kotlinlogging.KLogger import io.infinitic.common.messages.Message -import io.infinitic.common.transport.interfaces.InfiniticConsumer import io.infinitic.common.transport.SubscriptionType -import io.infinitic.common.transport.interfaces.TransportMessage import io.infinitic.common.transport.WorkflowStateCmdTopic import io.infinitic.common.transport.WorkflowStateEngineTopic import io.infinitic.common.transport.WorkflowStateEventTopic +import io.infinitic.common.transport.config.BatchConfig import io.infinitic.common.transport.consumers.Result import io.infinitic.common.transport.consumers.startConsuming import io.infinitic.common.transport.create +import io.infinitic.common.transport.interfaces.InfiniticConsumer +import io.infinitic.common.transport.interfaces.TransportMessage import io.infinitic.common.workflows.data.workflows.WorkflowName import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job @@ -42,6 +43,7 @@ import kotlinx.coroutines.launch context(CoroutineScope, KLogger) internal fun InfiniticConsumer.listenToWorkflowStateTopics( workflowName: WorkflowName, + batchConfig: BatchConfig?, subscriptionName: String?, outChannel: Channel, TransportMessage>>, ): Job = launch { @@ -51,22 +53,22 @@ internal fun InfiniticConsumer.listenToWorkflowStateTopics( WorkflowStateCmdTopic, subscriptionName, ) - buildConsumer(workflowStateCmdSubscription, workflowName.toString()) - .startConsuming(outChannel) + buildConsumer(workflowStateCmdSubscription, workflowName.toString(), batchConfig) + .startConsuming(batchConfig != null, outChannel) // Send messages from WorkflowStateEngineTopic to inChannel val workflowStateEngineSubscription = SubscriptionType.EVENT_LISTENER.create( WorkflowStateEngineTopic, subscriptionName, ) - buildConsumer(workflowStateEngineSubscription, workflowName.toString()) - .startConsuming(outChannel) + buildConsumer(workflowStateEngineSubscription, workflowName.toString(), batchConfig) + .startConsuming(batchConfig != null, outChannel) // Send messages from WorkflowStateEventTopic to inChannel val workflowStateEventSubscription = SubscriptionType.EVENT_LISTENER.create( WorkflowStateEventTopic, subscriptionName, ) - buildConsumer(workflowStateEventSubscription, workflowName.toString()) - .startConsuming(outChannel) + buildConsumer(workflowStateEventSubscription, workflowName.toString(), batchConfig) + .startConsuming(batchConfig != null, outChannel) } diff --git a/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/listeners/startCloudEventListener.kt b/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/listeners/startCloudEventListener.kt index 928c43ac2..27066dec9 100644 --- a/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/listeners/startCloudEventListener.kt +++ b/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/listeners/startCloudEventListener.kt @@ -23,6 +23,7 @@ package io.infinitic.events.listeners import io.github.oshai.kotlinlogging.KLogger +import io.infinitic.common.data.MillisDuration import io.infinitic.common.exceptions.thisShouldNotHappen import io.infinitic.common.messages.Message import io.infinitic.common.transport.BatchProcessorConfig @@ -55,7 +56,7 @@ fun InfiniticConsumer.startCloudEventListener( val batchProcessorConfig = BatchProcessorConfig( batchKey = "cloudEvent", // same for all maxMessages = config.batchConfig.maxMessages, - maxDuration = config.batchConfig.maxMillis, + maxDuration = MillisDuration(config.batchConfig.maxMillis), ) // Launch the processing of outChannel @@ -86,15 +87,30 @@ fun InfiniticConsumer.startCloudEventListener( resources.refreshServiceListAsync(config) { serviceName -> info { "EventListener starts listening Service $serviceName" } - listenToServiceExecutorTopics(serviceName, config.subscriptionName, outChannel) + listenToServiceExecutorTopics( + serviceName, + config.batchConfig, + config.subscriptionName, + outChannel, + ) } // Listen workflow topics, for each workflow found resources.refreshWorkflowListAsync(config) { workflowName -> info { "EventListener starts listening Workflow $workflowName" } - listenToWorkflowExecutorTopics(workflowName, config.subscriptionName, outChannel) + listenToWorkflowExecutorTopics( + workflowName, + config.batchConfig, + config.subscriptionName, + outChannel, + ) - listenToWorkflowStateTopics(workflowName, config.subscriptionName, outChannel) + listenToWorkflowStateTopics( + workflowName, + config.batchConfig, + config.subscriptionName, + outChannel, + ) } } diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/Subscriptions.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/Subscriptions.kt index a5855b0f9..701df05e2 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/Subscriptions.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/Subscriptions.kt @@ -30,10 +30,6 @@ sealed class Subscription { } data class MainSubscription(override val topic: Topic) : Subscription() { - init { - if (topic.acceptDelayed) require(!withKey) { "Keyed subscription are forbidden for topics accepting delayed messages" } - } - override val withKey get() = when (topic) { WorkflowTagEngineTopic, diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/Topic.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/Topic.kt index 94805ff0d..a19506046 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/Topic.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/Topic.kt @@ -141,18 +141,6 @@ val Topic<*>.isTimer else -> false } -/** - * Property indicating whether a topic type receives delayed message - */ -val Topic<*>.acceptDelayed - get() = when (this) { - WorkflowStateTimerTopic -> true - WorkflowExecutorRetryTopic -> true - ServiceExecutorRetryTopic -> true - - else -> false - } - /** * @return The [Topic] without delay. * If the current [Topic] is a delayed topic, it returns the corresponding non-delayed [Topic]. diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/config/BatchConfig.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/config/BatchConfig.kt index 64f3a2716..74a503b7f 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/config/BatchConfig.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/config/BatchConfig.kt @@ -22,8 +22,6 @@ */ package io.infinitic.common.transport.config -import io.infinitic.common.data.MillisDuration - data class BatchConfig( val maxMessages: Int = 1000, val maxSeconds: Double = 1.0 @@ -33,5 +31,5 @@ data class BatchConfig( require(maxSeconds > 0) { error("'${::maxSeconds.name}' must be > 0, but was $maxSeconds") } } - val maxMillis = MillisDuration((maxSeconds * 1000).toLong()) + val maxMillis = (maxSeconds * 1000).toLong() } diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/startAsync.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/startAsync.kt index 75a2c0fea..ba1ed5b91 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/startAsync.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/startAsync.kt @@ -25,6 +25,7 @@ package io.infinitic.common.transport.consumers import io.github.oshai.kotlinlogging.KLogger import io.infinitic.common.data.MillisInstant import io.infinitic.common.transport.BatchProcessorConfig +import io.infinitic.common.transport.config.BatchConfig import io.infinitic.common.transport.interfaces.TransportConsumer import io.infinitic.common.transport.interfaces.TransportMessage import kotlinx.coroutines.CoroutineScope @@ -36,27 +37,28 @@ import kotlinx.coroutines.launch * * @param concurrency The number of concurrent coroutines for processing messages. * @param deserialize A suspending function to deserialize the transport message into its payload. - * @param process A suspending function to process the deserialized message along with its publishing time. + * @param processor A suspending function to process the deserialized message along with its publishing time. * @param batchProcessorConfig An optional suspending function to configure batching of messages. - * @param batchProcess An optional suspending function to process a batch of messages. + * @param batchProcessor An optional suspending function to process a batch of messages. * @return A Job representing the coroutine that runs the consuming process. */ context(CoroutineScope, KLogger) fun , M : Any> TransportConsumer.startAsync( + batchReceivingConfig: BatchConfig?, concurrency: Int, deserialize: suspend (T) -> M, - process: suspend (M, MillisInstant) -> Unit, + processor: suspend (M, MillisInstant) -> Unit, beforeDlq: (suspend (M, Exception) -> Unit)? = null, batchProcessorConfig: (suspend (M) -> BatchProcessorConfig?)? = null, - batchProcess: (suspend (List, List) -> Unit)? = null, + batchProcessor: (suspend (List, List) -> Unit)? = null, ): Job = launch { - startConsuming() + startConsuming(batchReceivingConfig != null) .completeProcess( concurrency, deserialize, - process, + processor, beforeDlq, batchProcessorConfig, - batchProcess, + batchProcessor, ) } diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/startBatching.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/startBatching.kt index 2aab60b85..be8d09360 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/startBatching.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/startBatching.kt @@ -75,15 +75,15 @@ fun Channel.startBatching( * or the specified timeout duration elapses. * * @param maxMessages The maximum number of messages to include in the batch. - * @param timeout The maximum duration (in milliseconds) to wait for messages before returning the batch. + * @param timeoutMillis The maximum duration (in milliseconds) to wait for messages before returning the batch. * @return A BatchResult containing the collected messages and a flag indicating if the channel is still open. */ @OptIn(ExperimentalCoroutinesApi::class) -private suspend fun Channel.batchWithTimeout( +suspend fun Channel.batchWithTimeout( maxMessages: Int, - timeout: Long + timeoutMillis: Long ): BatchResult { - val endTime = System.currentTimeMillis() + timeout + val endTime = System.currentTimeMillis() + timeoutMillis // isActive becomes false after timeout var isActive = true // isOpen becomes false if the channel is closed @@ -117,7 +117,7 @@ private suspend fun Channel.batchWithTimeout( * @param messages The list of messages collected in the batch. * @param isOpen A flag indicating if the source channel is still open after collecting the batch. */ -private data class BatchResult( +data class BatchResult( val messages: List, val isOpen: Boolean, ) diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/startConsuming.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/startConsuming.kt index 2cba861dc..2bfa2e2b2 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/startConsuming.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/consumers/startConsuming.kt @@ -43,6 +43,7 @@ import kotlin.coroutines.cancellation.CancellationException */ context(CoroutineScope, KLogger) fun , M> TransportConsumer.startConsuming( + batchReceiving: Boolean, channel: Channel, TransportMessage>> = Channel(), ): Channel> { debug { "startConsuming: starting producing on channel ${channel.hashCode()} from ${this@startConsuming.name}" } @@ -53,10 +54,21 @@ fun , M> TransportConsumer.startConsuming( trace { "startConsuming: producer added to consuming channel ${channel.hashCode()}" } while (isActive) { try { - val msg = receive().also { - trace { "consuming: received $it from ${this@startConsuming.name}" } + when (batchReceiving) { + true -> { + batchReceive().forEach { + trace { "consuming: received $it from ${this@startConsuming.name}" } + channel.send(Result.success(it, it)) + } + } + + false -> { + receive().let { + trace { "consuming: received $it from ${this@startConsuming.name}" } + channel.send(Result.success(it, it)) + } + } } - channel.send(Result.success(msg, msg)) } catch (e: CancellationException) { // do nothing, will exit if calling scope is not active anymore } catch (e: Exception) { diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/interfaces/InfiniticConsumer.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/interfaces/InfiniticConsumer.kt index 31087c0da..ba2b8324a 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/interfaces/InfiniticConsumer.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/interfaces/InfiniticConsumer.kt @@ -27,6 +27,7 @@ import io.infinitic.common.data.MillisInstant import io.infinitic.common.messages.Message import io.infinitic.common.transport.BatchProcessorConfig import io.infinitic.common.transport.Subscription +import io.infinitic.common.transport.config.BatchConfig import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job @@ -45,6 +46,7 @@ interface InfiniticConsumer { suspend fun buildConsumers( subscription: Subscription, entity: String, + batchConfig: BatchConfig?, occurrence: Int? ): List>> @@ -52,7 +54,8 @@ interface InfiniticConsumer { suspend fun buildConsumer( subscription: Subscription, entity: String, - ): TransportConsumer> = buildConsumers(subscription, entity, null).first() + batchConfig: BatchConfig? + ): TransportConsumer> = buildConsumers(subscription, entity, batchConfig, null).first() /** * Starts asynchronous processing of messages for a given subscription. @@ -70,6 +73,7 @@ interface InfiniticConsumer { suspend fun startAsync( subscription: Subscription, entity: String, + batchConsumerConfig: BatchConfig? = null, concurrency: Int, processor: suspend (S, MillisInstant) -> Unit, beforeDlq: (suspend (S, Exception) -> Unit)? = null, @@ -93,12 +97,20 @@ interface InfiniticConsumer { suspend fun start( subscription: Subscription, entity: String, + batchConsumerConfig: BatchConfig? = null, concurrency: Int, process: suspend (M, MillisInstant) -> Unit, beforeDlq: (suspend (M, Exception) -> Unit)? = null, batchProcessorConfig: (suspend (M) -> BatchProcessorConfig?)? = null, batchProcessor: (suspend (List, List) -> Unit)? = null ) = startAsync( - subscription, entity, concurrency, process, beforeDlq, batchProcessorConfig, batchProcessor, + subscription, + entity, + batchConsumerConfig, + concurrency, + process, + beforeDlq, + batchProcessorConfig, + batchProcessor, ).join() } diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/interfaces/InfiniticProducer.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/interfaces/InfiniticProducer.kt index 095935464..77a198259 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/interfaces/InfiniticProducer.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/interfaces/InfiniticProducer.kt @@ -23,12 +23,10 @@ package io.infinitic.common.transport.interfaces import io.infinitic.common.data.MillisDuration -import io.infinitic.common.exceptions.thisShouldNotHappen import io.infinitic.common.messages.Message import io.infinitic.common.tasks.events.messages.ServiceExecutorEventMessage import io.infinitic.common.tasks.executors.messages.ServiceExecutorMessage import io.infinitic.common.transport.Topic -import io.infinitic.common.transport.acceptDelayed import io.infinitic.common.transport.forWorkflow import io.infinitic.common.transport.withoutDelay @@ -57,8 +55,6 @@ interface InfiniticProducer { topic: Topic, after: MillisDuration = MillisDuration(0) ) { - require(after <= 0 || topic.acceptDelayed) { thisShouldNotHappen("Trying to send to $topic with a delay $after") } - // Switch to workflow-related topics for workflowTasks val t = when (this) { is ServiceExecutorMessage -> if (isWorkflowTask()) topic.forWorkflow else topic diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/interfaces/TransportConsumer.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/interfaces/TransportConsumer.kt index 45bc5c6bc..f93180aca 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/transport/interfaces/TransportConsumer.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/transport/interfaces/TransportConsumer.kt @@ -30,6 +30,13 @@ interface TransportConsumer> { */ suspend fun receive(): T + /** + * Receives a batch of transport messages from the consumer asynchronously. + * + * @return A list of messages of type [T] received from the transport. + */ + suspend fun batchReceive(): List + /** * Defines the maximum number of times a message can be redelivered * when processing messages from a transport consumer. diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/utils/ClassUtil.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/utils/ClassUtil.kt index efb56fb2c..15934aad6 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/utils/ClassUtil.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/utils/ClassUtil.kt @@ -196,7 +196,7 @@ private val batchMethodMutex = Mutex() fun Method.getBatchMethod(): BatchMethod? = batchMethodCache[this] -suspend fun Class<*>.initBatchMethods() { +suspend fun Class<*>.initBatchProcessorMethods() { // Retrieve the list of BatchMethod for the class val batchMethodList = getBatchMethods() // Update the cache with all methods of the class diff --git a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/BatchByTests.kt b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/BatchByTests.kt index 0521dedb5..07ccbf240 100644 --- a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/BatchByTests.kt +++ b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/BatchByTests.kt @@ -57,7 +57,7 @@ internal class BatchByTests : StringSpec( "should be able to batch by max message, up to scope cancellation" { with(logger) { val scope = getScope() - val channel = with(scope) { IntConsumer().startConsuming() } + val channel = with(scope) { IntConsumer().startConsuming(false) } val outputChannel = with(scope) { channel.batchBy(::getBatchingConfig) } @@ -86,11 +86,12 @@ internal class BatchByTests : StringSpec( "should be able to batch by max duration, up to scope cancellation" { class SlowConsumer : IntConsumer() { override suspend fun receive() = super.receive().also { delay(70) } + override suspend fun batchReceive() = super.batchReceive().also { delay(70) } } with(logger) { val scope = getScope() - val channel = with(scope) { SlowConsumer().startConsuming() } + val channel = with(scope) { SlowConsumer().startConsuming(false) } val outputChannel = Channel>>() channel.startBatching(5, 100, outputChannel) diff --git a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/StartBatchingTests.kt b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/StartBatchingTests.kt index 821c7da9f..5dc395694 100644 --- a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/StartBatchingTests.kt +++ b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/StartBatchingTests.kt @@ -47,7 +47,7 @@ internal class StartBatchingTests : StringSpec( "should be able to batch by max message, up to scope cancellation" { with(logger) { val scope = getScope() - val channel = with(scope) { IntConsumer().startConsuming() } + val channel = with(scope) { IntConsumer().startConsuming(false) } val outputChannel = Channel>>() channel.startBatching(5, 100, outputChannel) @@ -84,7 +84,7 @@ internal class StartBatchingTests : StringSpec( with(logger) { val scope = getScope() - val channel = with(scope) { SlowConsumer().startConsuming() } + val channel = with(scope) { SlowConsumer().startConsuming(false) } val outputChannel = Channel>>() channel.startBatching(5, 100, outputChannel) diff --git a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/StartConsumingTests.kt b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/StartConsumingTests.kt index 59f4a9d23..8a115f59a 100644 --- a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/StartConsumingTests.kt +++ b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/StartConsumingTests.kt @@ -42,7 +42,7 @@ internal class StartConsumingTests : StringSpec( "Consumer should consume up to scope cancellation" { with(logger) { val scope = getScope() - val channel = with(scope) { IntConsumer().startConsuming() } + val channel = with(scope) { IntConsumer().startConsuming(false) } // while no error shouldNotThrowAny { repeat(100) { channel.receive() } } @@ -64,7 +64,7 @@ internal class StartConsumingTests : StringSpec( with(logger) { val scope = getScope() - val channel = with(scope) { ErrorConsumer().startConsuming() } + val channel = with(scope) { ErrorConsumer().startConsuming(false) } // while no error shouldNotThrowAny { repeat(98) { channel.receive() } } @@ -90,7 +90,7 @@ internal class StartConsumingTests : StringSpec( with(logger) { val scope = getScope() - val channel = with(scope) { ExceptionConsumer().startConsuming() } + val channel = with(scope) { ExceptionConsumer().startConsuming(false) } shouldNotThrowAny { repeat(100) { channel.receive() } } scope.isActive shouldBe true diff --git a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/fakes.kt b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/fakes.kt index 4ce88e35c..eb9776a2a 100644 --- a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/fakes.kt +++ b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/fakes.kt @@ -74,6 +74,8 @@ internal open class IntConsumer : TransportConsumer { override suspend fun receive() = IntMessage(counter.incrementAndGet()) .also { receivedList.add(it.value) } + override suspend fun batchReceive(): List = listOf(receive(), receive()) + override val maxRedeliveryCount = 1 override val name: String = this.toString() } diff --git a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/startAsyncTests.kt b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/startAsyncTests.kt index 7cb9bbc49..2b0e19362 100644 --- a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/startAsyncTests.kt +++ b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/startAsyncTests.kt @@ -58,7 +58,7 @@ internal class ProcessorConsumerTests : StringSpec( with(scope) { consumer - .startAsync(3, ::deserialize, ::process) + .startAsync(null, 3, ::deserialize, ::process) .join() } receivedList.size shouldBeGreaterThan 0 @@ -75,7 +75,7 @@ internal class ProcessorConsumerTests : StringSpec( with(getScope()) { consumer - .startAsync(3, ::deserializeWitError, ::process) + .startAsync(null, 3, ::deserializeWitError, ::process) .join() } @@ -98,7 +98,7 @@ internal class ProcessorConsumerTests : StringSpec( with(getScope()) { consumer - .startAsync(3, ::deserialize, ::processWithError) + .startAsync(null, 3, ::deserialize, ::processWithError) .join() } @@ -119,7 +119,7 @@ internal class ProcessorConsumerTests : StringSpec( with(getScope()) { consumer - .startAsync(3, ::deserializeWitError, ::process) + .startAsync(null, 3, ::deserializeWitError, ::process) .join() } @@ -142,7 +142,7 @@ internal class ProcessorConsumerTests : StringSpec( with(getScope()) { consumer - .startAsync(3, ::deserialize, ::processWithException) + .startAsync(null, 3, ::deserialize, ::processWithException) .join() } diff --git a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/startAsyncWithBatchTests.kt b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/startAsyncWithBatchTests.kt index 06b92c7f3..1114a8df1 100644 --- a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/startAsyncWithBatchTests.kt +++ b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/startAsyncWithBatchTests.kt @@ -61,7 +61,7 @@ internal class ProcessorConsumerWithBatchTests : StringSpec( with(scope) { consumer - .startAsync(3, ::deserialize, ::process, null, ::batchConfig, ::processBatch) + .startAsync(null, 3, ::deserialize, ::process, null, ::batchConfig, ::processBatch) .join() } receivedList.size shouldBeGreaterThan 0 @@ -77,6 +77,7 @@ internal class ProcessorConsumerWithBatchTests : StringSpec( with(getScope()) { consumer .startAsync( + null, 3, ::deserializeWitError, ::process, @@ -106,6 +107,7 @@ internal class ProcessorConsumerWithBatchTests : StringSpec( with(getScope()) { consumer .startAsync( + null, 3, ::deserialize, ::processWithError, @@ -132,6 +134,7 @@ internal class ProcessorConsumerWithBatchTests : StringSpec( with(getScope()) { consumer .startAsync( + null, 3, ::deserialize, ::process, @@ -158,6 +161,7 @@ internal class ProcessorConsumerWithBatchTests : StringSpec( with(getScope()) { consumer .startAsync( + null, 3, ::deserialize, ::process, @@ -185,6 +189,7 @@ internal class ProcessorConsumerWithBatchTests : StringSpec( with(getScope()) { consumer .startAsync( + null, 3, ::deserializeWithException, ::process, @@ -215,6 +220,7 @@ internal class ProcessorConsumerWithBatchTests : StringSpec( with(getScope()) { consumer .startAsync( + null, 3, ::deserialize, ::processWithException, @@ -242,6 +248,7 @@ internal class ProcessorConsumerWithBatchTests : StringSpec( with(getScope()) { consumer .startAsync( + null, 3, ::deserialize, ::process, @@ -273,6 +280,7 @@ internal class ProcessorConsumerWithBatchTests : StringSpec( with(getScope()) { consumer .startAsync( + null, 3, ::deserialize, ::process, diff --git a/infinitic-transport-inMemory/src/test/kotlin/io/infinitic/inMemory/InMemoryInfiniticConsumerTests.kt b/infinitic-transport-inMemory/src/test/kotlin/io/infinitic/inMemory/InMemoryInfiniticConsumerTests.kt index fab674ffc..bd0a70889 100644 --- a/infinitic-transport-inMemory/src/test/kotlin/io/infinitic/inMemory/InMemoryInfiniticConsumerTests.kt +++ b/infinitic-transport-inMemory/src/test/kotlin/io/infinitic/inMemory/InMemoryInfiniticConsumerTests.kt @@ -74,6 +74,7 @@ class InMemoryInfiniticConsumerTests : StringSpec( consumer.startAsync( MainSubscription(ServiceExecutorTopic), serviceName.toString(), + null, 10, ::process, ) diff --git a/infinitic-transport-pulsar/src/main/kotlin/io/infinitic/pulsar/PulsarInfiniticConsumer.kt b/infinitic-transport-pulsar/src/main/kotlin/io/infinitic/pulsar/PulsarInfiniticConsumer.kt index 7722c4290..c4f60940c 100644 --- a/infinitic-transport-pulsar/src/main/kotlin/io/infinitic/pulsar/PulsarInfiniticConsumer.kt +++ b/infinitic-transport-pulsar/src/main/kotlin/io/infinitic/pulsar/PulsarInfiniticConsumer.kt @@ -30,6 +30,7 @@ import io.infinitic.common.transport.BatchProcessorConfig import io.infinitic.common.transport.EventListenerSubscription import io.infinitic.common.transport.MainSubscription import io.infinitic.common.transport.Subscription +import io.infinitic.common.transport.config.BatchConfig import io.infinitic.common.transport.consumers.startAsync import io.infinitic.common.transport.interfaces.InfiniticConsumer import io.infinitic.common.transport.interfaces.TransportMessage @@ -60,6 +61,7 @@ class PulsarInfiniticConsumer( override suspend fun buildConsumers( subscription: Subscription, entity: String, + batchConfig: BatchConfig?, occurrence: Int? ): List> { // Retrieve the name of the topic and of the DLQ topic @@ -79,6 +81,7 @@ class PulsarInfiniticConsumer( subscriptionNameDlq = subscription.nameDLQ, subscriptionType = subscription.type, consumerName = consumerName, + batchConfig = batchConfig, ).onSuccess { trace { "Consumer '${consumerName}' created for $topicName" } } @@ -101,13 +104,15 @@ class PulsarInfiniticConsumer( override suspend fun buildConsumer( subscription: Subscription, entity: String, + batchConfig: BatchConfig? ): PulsarTransportConsumer = - buildConsumers(subscription, entity, null).first() + buildConsumers(subscription, entity, batchConfig, null).first() context(CoroutineScope, KLogger) override suspend fun startAsync( subscription: Subscription, entity: String, + batchConsumerConfig: BatchConfig?, concurrency: Int, processor: suspend (S, MillisInstant) -> Unit, beforeDlq: (suspend (S, Exception) -> Unit)?, @@ -120,10 +125,11 @@ class PulsarInfiniticConsumer( return when (subscription.withKey) { true -> { // multiple consumers with unique processing - val consumers = buildConsumers(subscription, entity, concurrency) + val consumers = buildConsumers(subscription, entity, batchConsumerConfig, concurrency) launch { repeat(concurrency) { index -> consumers[index].startAsync( + batchConsumerConfig, concurrency = 1, deserialize, processor, @@ -137,8 +143,9 @@ class PulsarInfiniticConsumer( false -> { // unique consumer with parallel processing - val consumer = buildConsumer(subscription, entity) + val consumer = buildConsumer(subscription, entity, batchConsumerConfig) consumer.startAsync( + batchConsumerConfig, concurrency, deserialize, processor, @@ -196,12 +203,14 @@ class PulsarInfiniticConsumer( subscriptionNameDlq: String, subscriptionType: SubscriptionType, consumerName: String, + batchConfig: BatchConfig?, ): Result> { val consumerDef = InfiniticPulsarClient.ConsumerDef( topic = topic, subscriptionName = subscriptionName, // MUST be the same for all instances! subscriptionType = subscriptionType, consumerName = consumerName, + batchReceivingConfig = batchConfig, pulsarConsumerConfig = pulsarConsumerConfig, ) val consumerDefDlq = topicDlq?.let { @@ -210,6 +219,7 @@ class PulsarInfiniticConsumer( subscriptionName = subscriptionNameDlq, // MUST be the same for all instances! subscriptionType = SubscriptionType.Shared, consumerName = "$consumerName-dlq", + batchReceivingConfig = batchConfig, pulsarConsumerConfig = pulsarConsumerConfig, ) } diff --git a/infinitic-transport-pulsar/src/main/kotlin/io/infinitic/pulsar/client/InfiniticPulsarClient.kt b/infinitic-transport-pulsar/src/main/kotlin/io/infinitic/pulsar/client/InfiniticPulsarClient.kt index 16bba6d41..3e90546b4 100644 --- a/infinitic-transport-pulsar/src/main/kotlin/io/infinitic/pulsar/client/InfiniticPulsarClient.kt +++ b/infinitic-transport-pulsar/src/main/kotlin/io/infinitic/pulsar/client/InfiniticPulsarClient.kt @@ -25,12 +25,14 @@ package io.infinitic.pulsar.client import io.github.oshai.kotlinlogging.KotlinLogging import io.infinitic.common.messages.Envelope import io.infinitic.common.messages.Message +import io.infinitic.common.transport.config.BatchConfig import io.infinitic.pulsar.config.PulsarConsumerConfig import io.infinitic.pulsar.config.PulsarProducerConfig import io.infinitic.pulsar.schemas.schemaDefinition import kotlinx.coroutines.future.await import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock +import org.apache.pulsar.client.api.BatchReceivePolicy import org.apache.pulsar.client.api.BatcherBuilder import org.apache.pulsar.client.api.Consumer import org.apache.pulsar.client.api.DeadLetterPolicy @@ -230,6 +232,7 @@ class InfiniticPulsarClient(private val pulsarClient: PulsarClient) { subscriptionName, subscriptionType, consumerName, + batchConfig, consumerConfig ) = consumerDef @@ -241,6 +244,15 @@ class InfiniticPulsarClient(private val pulsarClient: PulsarClient) { .consumerName(consumerName) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + // Batch Receive Policy + batchConfig?.let { + builder.batchReceivePolicy( + BatchReceivePolicy.builder() + .maxNumMessages(it.maxMessages) + .timeout((it.maxSeconds * 1000).toInt(), TimeUnit.MILLISECONDS) + .build(), + ) + } // Dead Letter Queue consumerDefDlq?.let { builder @@ -384,6 +396,7 @@ class InfiniticPulsarClient(private val pulsarClient: PulsarClient) { val subscriptionName: String, val subscriptionType: SubscriptionType, val consumerName: String, + val batchReceivingConfig: BatchConfig?, val pulsarConsumerConfig: PulsarConsumerConfig, ) diff --git a/infinitic-transport-pulsar/src/main/kotlin/io/infinitic/pulsar/consumers/PulsarTransportConsumer.kt b/infinitic-transport-pulsar/src/main/kotlin/io/infinitic/pulsar/consumers/PulsarTransportConsumer.kt index 0787194a2..40c40549a 100644 --- a/infinitic-transport-pulsar/src/main/kotlin/io/infinitic/pulsar/consumers/PulsarTransportConsumer.kt +++ b/infinitic-transport-pulsar/src/main/kotlin/io/infinitic/pulsar/consumers/PulsarTransportConsumer.kt @@ -41,5 +41,11 @@ class PulsarTransportConsumer( return PulsarTransportMessage(pulsarMessage, pulsarConsumer, topic, maxRedeliveryCount) } + override suspend fun batchReceive(): List> { + val pulsarMessages = pulsarConsumer.batchReceiveAsync().await() + + return pulsarMessages.map { PulsarTransportMessage(it, pulsarConsumer, topic, maxRedeliveryCount) } + } + override val name: String = pulsarConsumer.consumerName } diff --git a/infinitic-transport-pulsar/src/test/kotlin/io/infinitic/pulsar/client/PulsarInfiniticClientTests.kt b/infinitic-transport-pulsar/src/test/kotlin/io/infinitic/pulsar/client/PulsarInfiniticClientTests.kt index f45e117b9..917854276 100644 --- a/infinitic-transport-pulsar/src/test/kotlin/io/infinitic/pulsar/client/PulsarInfiniticClientTests.kt +++ b/infinitic-transport-pulsar/src/test/kotlin/io/infinitic/pulsar/client/PulsarInfiniticClientTests.kt @@ -183,6 +183,7 @@ class PulsarInfiniticClientTests : subscriptionName = randomSubscriptionName, // MUST be the same for all instances! subscriptionType = randomSubscriptionType, consumerName = randomConsumerName, + batchReceivingConfig = null, pulsarConsumerConfig = randomConfig, ) @@ -239,6 +240,7 @@ class PulsarInfiniticClientTests : subscriptionName = "subscriptionName", subscriptionType = SubscriptionType.Shared, consumerName = "consumerName", + batchReceivingConfig = null, pulsarConsumerConfig = randomConfig, ) val consumerDefDlq = InfiniticPulsarClient.ConsumerDef( @@ -246,6 +248,7 @@ class PulsarInfiniticClientTests : subscriptionName = randomSubscriptionName, subscriptionType = randomSubscriptionType, consumerName = randomConsumerName, + batchReceivingConfig = null, pulsarConsumerConfig = randomConfig, ) diff --git a/infinitic-transport-pulsar/src/test/kotlin/io/infinitic/pulsar/consumers/ConsumerTests.kt b/infinitic-transport-pulsar/src/test/kotlin/io/infinitic/pulsar/consumers/ConsumerTests.kt index 12de57ac0..4a2f4d838 100644 --- a/infinitic-transport-pulsar/src/test/kotlin/io/infinitic/pulsar/consumers/ConsumerTests.kt +++ b/infinitic-transport-pulsar/src/test/kotlin/io/infinitic/pulsar/consumers/ConsumerTests.kt @@ -152,7 +152,7 @@ class ConsumerTests : StringSpec( try { with(scope) { - consumer.start(subscription, entity, 1, handler, null) + consumer.start(subscription, entity, null, 1, handler, null) } } catch (e: CancellationException) { // do nothing @@ -200,7 +200,7 @@ class ConsumerTests : StringSpec( try { with(scope) { - consumer.start(subscription, entity, 100, handler, null) + consumer.start(subscription, entity, null, 100, handler, null) } } catch (e: CancellationException) { // do nothing @@ -245,7 +245,7 @@ class ConsumerTests : StringSpec( try { with(scope) { - consumer.start(subscription, entity, 1, handler, null) + consumer.start(subscription, entity, null, 1, handler, null) } } catch (e: CancellationException) { // do nothing @@ -287,7 +287,7 @@ class ConsumerTests : StringSpec( try { val job = with(scope) { - consumer.startAsync(subscription, entity, 100, process, null) + consumer.startAsync(subscription, entity, null, 100, process, null) } // on the consumer created, we send the messages // to avoid that the first consumer up captures all keys right-away diff --git a/infinitic-worker/src/main/kotlin/io/infinitic/workers/InfiniticWorker.kt b/infinitic-worker/src/main/kotlin/io/infinitic/workers/InfiniticWorker.kt index 41515788d..71fbdabb0 100644 --- a/infinitic-worker/src/main/kotlin/io/infinitic/workers/InfiniticWorker.kt +++ b/infinitic-worker/src/main/kotlin/io/infinitic/workers/InfiniticWorker.kt @@ -25,6 +25,7 @@ package io.infinitic.workers import io.github.oshai.kotlinlogging.KLogger import io.github.oshai.kotlinlogging.KotlinLogging import io.infinitic.clients.InfiniticClient +import io.infinitic.common.data.MillisDuration import io.infinitic.common.data.MillisInstant import io.infinitic.common.messages.Message import io.infinitic.common.tasks.events.messages.ServiceExecutorEventMessage @@ -71,7 +72,7 @@ import io.infinitic.workers.config.WorkflowConfig import io.infinitic.workers.config.WorkflowExecutorConfig import io.infinitic.workers.config.WorkflowStateEngineConfig import io.infinitic.workers.config.WorkflowTagEngineConfig -import io.infinitic.workers.config.initBatchMethods +import io.infinitic.workers.config.initBatchProcessorMethods import io.infinitic.workers.registry.ExecutorRegistry import io.infinitic.workflows.Workflow import io.infinitic.workflows.engine.WorkflowStateCmdHandler @@ -406,7 +407,7 @@ class InfiniticWorker( logServiceExecutorStart(config) // init batch methods for current factory - config.initBatchMethods() + config.initBatchProcessorMethods() // TASK-EXECUTOR val jobExecutor = with(TaskExecutor.logger) { @@ -447,6 +448,7 @@ class InfiniticWorker( consumer.startAsync( subscription = MainSubscription(ServiceExecutorTopic), entity = config.serviceName, + batchConsumerConfig = config.batch, concurrency = config.concurrency, processor = processor, beforeDlq = beforeDlq, @@ -463,6 +465,7 @@ class InfiniticWorker( consumer.startAsync( subscription = MainSubscription(ServiceExecutorRetryTopic), entity = config.serviceName, + batchConsumerConfig = config.batch, concurrency = config.concurrency, processor = taskRetryHandler::handle, ) @@ -489,6 +492,7 @@ class InfiniticWorker( consumer.startAsync( subscription = MainSubscription(ServiceExecutorEventTopic), entity = config.serviceName, + batchConsumerConfig = config.batch, concurrency = config.concurrency, processor = processor, ) @@ -564,12 +568,17 @@ class InfiniticWorker( } val batchProcessorConfig = config.batch?.let { - BatchProcessorConfig("workflowCmd:" + config.workflowName, it.maxMessages, it.maxMillis) + BatchProcessorConfig( + "workflowCmd:" + config.workflowName, + it.maxMessages, + MillisDuration(it.maxMillis), + ) } consumer.startAsync( subscription = MainSubscription(WorkflowStateCmdTopic), entity = config.workflowName, + batchConsumerConfig = config.batch, concurrency = config.concurrency, processor = processor, batchProcessorConfig = { _ -> batchProcessorConfig }, @@ -607,12 +616,17 @@ class InfiniticWorker( } val batchProcessorConfig = config.batch?.let { - BatchProcessorConfig("workflowState:" + config.workflowName, it.maxMessages, it.maxMillis) + BatchProcessorConfig( + "workflowState:" + config.workflowName, + it.maxMessages, + MillisDuration(it.maxMillis), + ) } consumer.startAsync( subscription = MainSubscription(WorkflowStateEngineTopic), entity = config.workflowName, + batchConsumerConfig = config.batch, concurrency = config.concurrency, processor = processor, batchProcessorConfig = { _ -> batchProcessorConfig }, @@ -628,6 +642,7 @@ class InfiniticWorker( consumer.startAsync( subscription = MainSubscription(WorkflowStateTimerTopic), entity = config.workflowName, + batchConsumerConfig = config.batch, concurrency = config.concurrency, processor = workflowStateTimerHandler::process, ) @@ -662,12 +677,17 @@ class InfiniticWorker( } val batchProcessorConfig = config.batch?.let { - BatchProcessorConfig("workflowEvent:" + config.workflowName, it.maxMessages, it.maxMillis) + BatchProcessorConfig( + "workflowEvent:" + config.workflowName, + it.maxMessages, + MillisDuration(it.maxMillis), + ) } consumer.startAsync( subscription = MainSubscription(WorkflowStateEventTopic), entity = config.workflowName, + batchConsumerConfig = config.batch, concurrency = config.concurrency, processor = processor, batchProcessorConfig = { _ -> batchProcessorConfig }, @@ -714,7 +734,7 @@ class InfiniticWorker( BatchProcessorConfig( "workflowExecutor:" + config.workflowName, it.maxMessages, - it.maxMillis, + MillisDuration(it.maxMillis), ) } @@ -731,6 +751,7 @@ class InfiniticWorker( consumer.startAsync( subscription = MainSubscription(WorkflowExecutorTopic), entity = config.workflowName, + batchConsumerConfig = config.batchConfig, concurrency = config.concurrency, processor = processor, beforeDlq = beforeDlq, @@ -746,6 +767,7 @@ class InfiniticWorker( consumer.startAsync( subscription = MainSubscription(WorkflowExecutorRetryTopic), + batchConsumerConfig = config.batchConfig, entity = config.workflowName, concurrency = config.concurrency, processor = taskRetryHandler::handle, @@ -773,6 +795,7 @@ class InfiniticWorker( consumer.startAsync( subscription = MainSubscription(WorkflowExecutorEventTopic), entity = config.workflowName, + batchConsumerConfig = config.batchConfig, concurrency = config.concurrency, processor = processor, ) diff --git a/infinitic-worker/src/main/kotlin/io/infinitic/workers/config/ServiceExecutorConfig.kt b/infinitic-worker/src/main/kotlin/io/infinitic/workers/config/ServiceExecutorConfig.kt index 41ce9aedd..455a2efe4 100644 --- a/infinitic-worker/src/main/kotlin/io/infinitic/workers/config/ServiceExecutorConfig.kt +++ b/infinitic-worker/src/main/kotlin/io/infinitic/workers/config/ServiceExecutorConfig.kt @@ -22,8 +22,9 @@ */ package io.infinitic.workers.config +import io.infinitic.common.transport.config.BatchConfig import io.infinitic.common.utils.getInstance -import io.infinitic.common.utils.initBatchMethods +import io.infinitic.common.utils.initBatchProcessorMethods import io.infinitic.common.workers.config.RetryPolicy import io.infinitic.common.workers.config.UNSET_RETRY_POLICY import io.infinitic.config.loadFromYamlFile @@ -36,8 +37,8 @@ private typealias ServiceFactory = () -> Any internal const val UNSET_TIMEOUT = Double.MAX_VALUE -suspend fun ServiceExecutorConfig.initBatchMethods() { - factory()::class.java.initBatchMethods() +suspend fun ServiceExecutorConfig.initBatchProcessorMethods() { + factory()::class.java.initBatchProcessorMethods() } @Suppress("unused") @@ -47,6 +48,11 @@ sealed class ServiceExecutorConfig { abstract val factory: ServiceFactory abstract val concurrency: Int + /** + * Configuration settings for batch message when consuming + */ + abstract val batch: BatchConfig? + /** * WithRetry instance for this executor * Set to WithRetry.UNSET if not defined @@ -96,6 +102,7 @@ sealed class ServiceExecutorConfig { private var concurrency: Int = 1 private var timeoutSeconds: Double? = UNSET_TIMEOUT private var withRetry: WithRetry? = WithRetry.UNSET + private var batchConfig: BatchConfig? = null fun setServiceName(name: String) = apply { this.serviceName = name } @@ -112,6 +119,10 @@ sealed class ServiceExecutorConfig { fun withRetry(retry: WithRetry) = apply { this.withRetry = retry } + fun setBatch(maxMessages: Int, maxSeconds: Double) { + apply { this.batchConfig = BatchConfig(maxMessages, maxSeconds) } + } + fun build(): ServiceExecutorConfig { serviceName.checkServiceName() require(factory != null) { "${::factory.name} must not be null" } @@ -124,13 +135,14 @@ sealed class ServiceExecutorConfig { concurrency, withRetry, timeoutSeconds.withTimeout, + batchConfig ) } } } /** - * ServiceExecutorConfig built from builder + * ServiceExecutorConfig built by builder */ data class BuiltServiceExecutorConfig( override val serviceName: String, @@ -138,6 +150,7 @@ data class BuiltServiceExecutorConfig( override val concurrency: Int, override val withRetry: WithRetry?, override val withTimeout: WithTimeout?, + override val batch: BatchConfig? ) : ServiceExecutorConfig() /** @@ -149,6 +162,7 @@ data class LoadedServiceExecutorConfig( override val concurrency: Int = 1, val timeoutSeconds: Double? = UNSET_TIMEOUT, val retry: RetryPolicy? = UNSET_RETRY_POLICY, + override val batch: BatchConfig? = null ) : ServiceExecutorConfig(), WithMutableServiceName { override val factory: () -> Any = { `class`.getInstance().getOrThrow() }