Skip to content

Commit

Permalink
WIP: implementing batch receiving
Browse files Browse the repository at this point in the history
  • Loading branch information
geomagilles committed Nov 2, 2024
1 parent 89cffde commit e556788
Show file tree
Hide file tree
Showing 29 changed files with 217 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ internal val mockedConsumer = mockk<InMemoryInfiniticConsumer> {
coEvery {
with(capture(scopeSlot)) {
with(capture(loggerSlot)) {
startAsync(any<Subscription<*>>(), "$clientNameTest", 1, any(), any())
startAsync(any<Subscription<*>>(), "$clientNameTest", null, 1, any(), any())
}
}
} answers {
Expand Down Expand Up @@ -236,6 +236,7 @@ internal class InfiniticClientTests : StringSpec(
mockedConsumer.startAsync(
MainSubscription(ClientTopic),
"$clientNameTest",
null,
1,
any(),
any(),
Expand Down Expand Up @@ -525,6 +526,7 @@ internal class InfiniticClientTests : StringSpec(
mockedConsumer.startAsync(
MainSubscription(ClientTopic),
"$clientNameTest",
null,
1,
any(),
any(),
Expand All @@ -543,6 +545,7 @@ internal class InfiniticClientTests : StringSpec(
mockedConsumer.startAsync(
MainSubscription(ClientTopic),
"$clientNameTest",
null,
1,
any(),
any(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@ package io.infinitic.events.listeners
import io.github.oshai.kotlinlogging.KLogger
import io.infinitic.common.messages.Message
import io.infinitic.common.tasks.data.ServiceName
import io.infinitic.common.transport.interfaces.InfiniticConsumer
import io.infinitic.common.transport.ServiceExecutorEventTopic
import io.infinitic.common.transport.ServiceExecutorRetryTopic
import io.infinitic.common.transport.ServiceExecutorTopic
import io.infinitic.common.transport.SubscriptionType
import io.infinitic.common.transport.interfaces.TransportMessage
import io.infinitic.common.transport.config.BatchConfig
import io.infinitic.common.transport.consumers.Result
import io.infinitic.common.transport.consumers.startConsuming
import io.infinitic.common.transport.create
import io.infinitic.common.transport.interfaces.InfiniticConsumer
import io.infinitic.common.transport.interfaces.TransportMessage
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
Expand All @@ -42,6 +43,7 @@ import kotlinx.coroutines.launch
context(CoroutineScope, KLogger)
internal fun InfiniticConsumer.listenToServiceExecutorTopics(
serviceName: ServiceName,
batchConfig: BatchConfig?,
subscriptionName: String?,
outChannel: Channel<Result<TransportMessage<Message>, TransportMessage<Message>>>,
): Job = launch {
Expand All @@ -51,22 +53,22 @@ internal fun InfiniticConsumer.listenToServiceExecutorTopics(
ServiceExecutorTopic,
subscriptionName,
)
buildConsumer(serviceExecutorSubscription, serviceName.toString())
.startConsuming(outChannel)
buildConsumer(serviceExecutorSubscription, serviceName.toString(), batchConfig)
.startConsuming(batchConfig != null, outChannel)

// Send messages from ServiceExecutorEventTopic to inChannel
val serviceExecutorEventSubscription = SubscriptionType.EVENT_LISTENER.create(
ServiceExecutorEventTopic,
subscriptionName,
)
buildConsumer(serviceExecutorEventSubscription, serviceName.toString())
.startConsuming(outChannel)
buildConsumer(serviceExecutorEventSubscription, serviceName.toString(), batchConfig)
.startConsuming(batchConfig != null, outChannel)

// Send messages from ServiceExecutorRetryTopic to inChannel
val serviceExecutorRetrySubscription = SubscriptionType.EVENT_LISTENER.create(
ServiceExecutorRetryTopic,
subscriptionName,
)
buildConsumer(serviceExecutorRetrySubscription, serviceName.toString())
.startConsuming(outChannel)
buildConsumer(serviceExecutorRetrySubscription, serviceName.toString(), batchConfig)
.startConsuming(batchConfig != null, outChannel)
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@ package io.infinitic.events.listeners

import io.github.oshai.kotlinlogging.KLogger
import io.infinitic.common.messages.Message
import io.infinitic.common.transport.interfaces.InfiniticConsumer
import io.infinitic.common.transport.SubscriptionType
import io.infinitic.common.transport.interfaces.TransportMessage
import io.infinitic.common.transport.WorkflowExecutorEventTopic
import io.infinitic.common.transport.WorkflowExecutorRetryTopic
import io.infinitic.common.transport.WorkflowExecutorTopic
import io.infinitic.common.transport.config.BatchConfig
import io.infinitic.common.transport.consumers.Result
import io.infinitic.common.transport.consumers.startConsuming
import io.infinitic.common.transport.create
import io.infinitic.common.transport.interfaces.InfiniticConsumer
import io.infinitic.common.transport.interfaces.TransportMessage
import io.infinitic.common.workflows.data.workflows.WorkflowName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
Expand All @@ -42,6 +43,7 @@ import kotlinx.coroutines.launch
context(CoroutineScope, KLogger)
internal fun InfiniticConsumer.listenToWorkflowExecutorTopics(
workflowName: WorkflowName,
batchConfig: BatchConfig?,
subscriptionName: String?,
outChannel: Channel<Result<TransportMessage<Message>, TransportMessage<Message>>>,
): Job = launch {
Expand All @@ -51,22 +53,22 @@ internal fun InfiniticConsumer.listenToWorkflowExecutorTopics(
WorkflowExecutorTopic,
subscriptionName,
)
buildConsumer(workflowExecutorSubscription, workflowName.toString())
.startConsuming(outChannel)
buildConsumer(workflowExecutorSubscription, workflowName.toString(), batchConfig)
.startConsuming(batchConfig != null, outChannel)

// Send messages from WorkflowExecutorEventTopic to inChannel
val workflowExecutorEventSubscription = SubscriptionType.EVENT_LISTENER.create(
WorkflowExecutorEventTopic,
subscriptionName,
)
buildConsumer(workflowExecutorEventSubscription, workflowName.toString())
.startConsuming(outChannel)
buildConsumer(workflowExecutorEventSubscription, workflowName.toString(), batchConfig)
.startConsuming(batchConfig != null, outChannel)

// Send messages from WorkflowExecutorRetryTopic to inChannel
val workflowExecutorRetrySubscription = SubscriptionType.EVENT_LISTENER.create(
WorkflowExecutorRetryTopic,
subscriptionName,
)
buildConsumer(workflowExecutorRetrySubscription, workflowName.toString())
.startConsuming(outChannel)
buildConsumer(workflowExecutorRetrySubscription, workflowName.toString(), batchConfig)
.startConsuming(batchConfig != null, outChannel)
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@ package io.infinitic.events.listeners

import io.github.oshai.kotlinlogging.KLogger
import io.infinitic.common.messages.Message
import io.infinitic.common.transport.interfaces.InfiniticConsumer
import io.infinitic.common.transport.SubscriptionType
import io.infinitic.common.transport.interfaces.TransportMessage
import io.infinitic.common.transport.WorkflowStateCmdTopic
import io.infinitic.common.transport.WorkflowStateEngineTopic
import io.infinitic.common.transport.WorkflowStateEventTopic
import io.infinitic.common.transport.config.BatchConfig
import io.infinitic.common.transport.consumers.Result
import io.infinitic.common.transport.consumers.startConsuming
import io.infinitic.common.transport.create
import io.infinitic.common.transport.interfaces.InfiniticConsumer
import io.infinitic.common.transport.interfaces.TransportMessage
import io.infinitic.common.workflows.data.workflows.WorkflowName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
Expand All @@ -42,6 +43,7 @@ import kotlinx.coroutines.launch
context(CoroutineScope, KLogger)
internal fun InfiniticConsumer.listenToWorkflowStateTopics(
workflowName: WorkflowName,
batchConfig: BatchConfig?,
subscriptionName: String?,
outChannel: Channel<Result<TransportMessage<Message>, TransportMessage<Message>>>,
): Job = launch {
Expand All @@ -51,22 +53,22 @@ internal fun InfiniticConsumer.listenToWorkflowStateTopics(
WorkflowStateCmdTopic,
subscriptionName,
)
buildConsumer(workflowStateCmdSubscription, workflowName.toString())
.startConsuming(outChannel)
buildConsumer(workflowStateCmdSubscription, workflowName.toString(), batchConfig)
.startConsuming(batchConfig != null, outChannel)

// Send messages from WorkflowStateEngineTopic to inChannel
val workflowStateEngineSubscription = SubscriptionType.EVENT_LISTENER.create(
WorkflowStateEngineTopic,
subscriptionName,
)
buildConsumer(workflowStateEngineSubscription, workflowName.toString())
.startConsuming(outChannel)
buildConsumer(workflowStateEngineSubscription, workflowName.toString(), batchConfig)
.startConsuming(batchConfig != null, outChannel)

// Send messages from WorkflowStateEventTopic to inChannel
val workflowStateEventSubscription = SubscriptionType.EVENT_LISTENER.create(
WorkflowStateEventTopic,
subscriptionName,
)
buildConsumer(workflowStateEventSubscription, workflowName.toString())
.startConsuming(outChannel)
buildConsumer(workflowStateEventSubscription, workflowName.toString(), batchConfig)
.startConsuming(batchConfig != null, outChannel)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package io.infinitic.events.listeners

import io.github.oshai.kotlinlogging.KLogger
import io.infinitic.common.data.MillisDuration
import io.infinitic.common.exceptions.thisShouldNotHappen
import io.infinitic.common.messages.Message
import io.infinitic.common.transport.BatchProcessorConfig
Expand Down Expand Up @@ -55,7 +56,7 @@ fun InfiniticConsumer.startCloudEventListener(
val batchProcessorConfig = BatchProcessorConfig(
batchKey = "cloudEvent", // same for all
maxMessages = config.batchConfig.maxMessages,
maxDuration = config.batchConfig.maxMillis,
maxDuration = MillisDuration(config.batchConfig.maxMillis),
)

// Launch the processing of outChannel
Expand Down Expand Up @@ -86,15 +87,30 @@ fun InfiniticConsumer.startCloudEventListener(
resources.refreshServiceListAsync(config) { serviceName ->
info { "EventListener starts listening Service $serviceName" }

listenToServiceExecutorTopics(serviceName, config.subscriptionName, outChannel)
listenToServiceExecutorTopics(
serviceName,
config.batchConfig,
config.subscriptionName,
outChannel,
)
}

// Listen workflow topics, for each workflow found
resources.refreshWorkflowListAsync(config) { workflowName ->
info { "EventListener starts listening Workflow $workflowName" }

listenToWorkflowExecutorTopics(workflowName, config.subscriptionName, outChannel)
listenToWorkflowExecutorTopics(
workflowName,
config.batchConfig,
config.subscriptionName,
outChannel,
)

listenToWorkflowStateTopics(workflowName, config.subscriptionName, outChannel)
listenToWorkflowStateTopics(
workflowName,
config.batchConfig,
config.subscriptionName,
outChannel,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ sealed class Subscription<S : Message> {
}

data class MainSubscription<S : Message>(override val topic: Topic<S>) : Subscription<S>() {
init {
if (topic.acceptDelayed) require(!withKey) { "Keyed subscription are forbidden for topics accepting delayed messages" }
}

override val withKey
get() = when (topic) {
WorkflowTagEngineTopic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,18 +141,6 @@ val Topic<*>.isTimer
else -> false
}

/**
* Property indicating whether a topic type receives delayed message
*/
val Topic<*>.acceptDelayed
get() = when (this) {
WorkflowStateTimerTopic -> true
WorkflowExecutorRetryTopic -> true
ServiceExecutorRetryTopic -> true

else -> false
}

/**
* @return The [Topic] without delay.
* If the current [Topic] is a delayed topic, it returns the corresponding non-delayed [Topic].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
*/
package io.infinitic.common.transport.config

import io.infinitic.common.data.MillisDuration

data class BatchConfig(
val maxMessages: Int = 1000,
val maxSeconds: Double = 1.0
Expand All @@ -33,5 +31,5 @@ data class BatchConfig(
require(maxSeconds > 0) { error("'${::maxSeconds.name}' must be > 0, but was $maxSeconds") }
}

val maxMillis = MillisDuration((maxSeconds * 1000).toLong())
val maxMillis = (maxSeconds * 1000).toLong()
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package io.infinitic.common.transport.consumers
import io.github.oshai.kotlinlogging.KLogger
import io.infinitic.common.data.MillisInstant
import io.infinitic.common.transport.BatchProcessorConfig
import io.infinitic.common.transport.config.BatchConfig
import io.infinitic.common.transport.interfaces.TransportConsumer
import io.infinitic.common.transport.interfaces.TransportMessage
import kotlinx.coroutines.CoroutineScope
Expand All @@ -36,27 +37,28 @@ import kotlinx.coroutines.launch
*
* @param concurrency The number of concurrent coroutines for processing messages.
* @param deserialize A suspending function to deserialize the transport message into its payload.
* @param process A suspending function to process the deserialized message along with its publishing time.
* @param processor A suspending function to process the deserialized message along with its publishing time.
* @param batchProcessorConfig An optional suspending function to configure batching of messages.
* @param batchProcess An optional suspending function to process a batch of messages.
* @param batchProcessor An optional suspending function to process a batch of messages.
* @return A Job representing the coroutine that runs the consuming process.
*/
context(CoroutineScope, KLogger)
fun <T : TransportMessage<M>, M : Any> TransportConsumer<T>.startAsync(
batchReceivingConfig: BatchConfig?,
concurrency: Int,
deserialize: suspend (T) -> M,
process: suspend (M, MillisInstant) -> Unit,
processor: suspend (M, MillisInstant) -> Unit,
beforeDlq: (suspend (M, Exception) -> Unit)? = null,
batchProcessorConfig: (suspend (M) -> BatchProcessorConfig?)? = null,
batchProcess: (suspend (List<M>, List<MillisInstant>) -> Unit)? = null,
batchProcessor: (suspend (List<M>, List<MillisInstant>) -> Unit)? = null,
): Job = launch {
startConsuming()
startConsuming(batchReceivingConfig != null)
.completeProcess(
concurrency,
deserialize,
process,
processor,
beforeDlq,
batchProcessorConfig,
batchProcess,
batchProcessor,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,15 @@ fun <S> Channel<S>.startBatching(
* or the specified timeout duration elapses.
*
* @param maxMessages The maximum number of messages to include in the batch.
* @param timeout The maximum duration (in milliseconds) to wait for messages before returning the batch.
* @param timeoutMillis The maximum duration (in milliseconds) to wait for messages before returning the batch.
* @return A BatchResult containing the collected messages and a flag indicating if the channel is still open.
*/
@OptIn(ExperimentalCoroutinesApi::class)
private suspend fun <S> Channel<S>.batchWithTimeout(
suspend fun <S> Channel<S>.batchWithTimeout(
maxMessages: Int,
timeout: Long
timeoutMillis: Long
): BatchResult<S> {
val endTime = System.currentTimeMillis() + timeout
val endTime = System.currentTimeMillis() + timeoutMillis
// isActive becomes false after timeout
var isActive = true
// isOpen becomes false if the channel is closed
Expand Down Expand Up @@ -117,7 +117,7 @@ private suspend fun <S> Channel<S>.batchWithTimeout(
* @param messages The list of messages collected in the batch.
* @param isOpen A flag indicating if the source channel is still open after collecting the batch.
*/
private data class BatchResult<S>(
data class BatchResult<S>(
val messages: List<S>,
val isOpen: Boolean,
)
Loading

0 comments on commit e556788

Please sign in to comment.