Skip to content

Commit

Permalink
Refactor producer
Browse files Browse the repository at this point in the history
  • Loading branch information
geomagilles committed Jan 18, 2024
1 parent 49a69c1 commit 3dfecd4
Show file tree
Hide file tree
Showing 23 changed files with 111 additions and 254 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ data object DelayedWorkflowEngineTopic : WorkflowTopic<WorkflowEngineMessage>()

data object WorkflowEventsTopic : WorkflowTopic<WorkflowEventMessage>()

data object WorkflowServiceExecutorTopic : WorkflowTopic<ServiceExecutorMessage>()
data object WorkflowTaskExecutorTopic : WorkflowTopic<ServiceExecutorMessage>()

data object DelayedWorkflowServiceExecutorTopic : WorkflowTopic<ServiceExecutorMessage>()
data object DelayedWorkflowTaskExecutorTopic : WorkflowTopic<ServiceExecutorMessage>()

data object WorkflowServiceEventsTopic : WorkflowTopic<ServiceEventMessage>()
data object WorkflowTaskEventsTopic : WorkflowTopic<ServiceEventMessage>()


/**
Expand All @@ -108,7 +108,7 @@ data object WorkflowServiceEventsTopic : WorkflowTopic<ServiceEventMessage>()
val Topic<*>.isDelayed
get() = when (this) {
DelayedWorkflowEngineTopic,
DelayedWorkflowServiceExecutorTopic,
DelayedWorkflowTaskExecutorTopic,
DelayedServiceExecutorTopic
-> true

Expand All @@ -125,7 +125,21 @@ val Topic<*>.isDelayed
val <S : Message> Topic<S>.withoutDelay
get() = when (this) {
DelayedWorkflowEngineTopic -> WorkflowEngineTopic
DelayedWorkflowServiceExecutorTopic -> WorkflowServiceExecutorTopic
DelayedWorkflowTaskExecutorTopic -> WorkflowTaskExecutorTopic
DelayedServiceExecutorTopic -> ServiceExecutorTopic
else -> this
} as Topic<S>

/**
* Returns a [Topic] relative to workflowTask
*
* @return The [Topic] without delay.
*/
@Suppress("UNCHECKED_CAST")
val <S : Message> Topic<S>.forWorkflow
get() = when (this) {
ServiceExecutorTopic -> WorkflowTaskExecutorTopic
DelayedServiceExecutorTopic -> DelayedWorkflowTaskExecutorTopic
ServiceEventsTopic -> WorkflowTaskEventsTopic
else -> this
} as Topic<S>
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,7 @@ package io.infinitic.common.transport

import io.infinitic.common.data.MillisDuration
import io.infinitic.common.messages.Message
import io.infinitic.common.tasks.events.messages.ServiceEventMessage
import io.infinitic.common.tasks.executors.messages.ServiceExecutorMessage
import io.infinitic.common.tasks.tags.messages.ServiceTagMessage
import io.infinitic.common.topics.Topic
import io.infinitic.common.workflows.engine.events.WorkflowEventMessage

interface InfiniticProducer {
/**
Expand All @@ -40,79 +36,4 @@ interface InfiniticProducer {
topic: Topic<T>,
after: MillisDuration = MillisDuration(0)
)

/**
* Synchronously send a message to a client
*
* @param message the message to send
*/
//suspend fun sendToClient(message: ClientMessage)

/**
* Synchronously send a message to a workflow tag engine
*
* @param message the message to send
*/
//suspend fun sendToWorkflowTag(message: WorkflowTagMessage)

/**
* Synchronously send a message to a workflow-cmd
*
* @param message the message to send
*/
//suspend fun sendToWorkflowCmd(message: WorkflowEngineMessage)

/**
* Synchronously send a message to a workflow-engine
*
* @param message the message to send
*/
//suspend fun sendToWorkflowEngine(message: WorkflowEngineMessage)

// suspend fun sendToWorkflowEngineAfter(
// message: WorkflowEngineMessage,
// after: MillisDuration
// )

/**
* Synchronously send a message to workflow-events
*
* @param message the message to send
*/
suspend fun sendToWorkflowEvents(message: WorkflowEventMessage)

/**
* Synchronously send a message to a task-tag
*
* @param message the message to send
*/
suspend fun sendToServiceTag(message: ServiceTagMessage)

/**
* Synchronously send a message to a task-executor
*
* @param message the message to send
*/
suspend fun sendToServiceExecutor(message: ServiceExecutorMessage)

/**
* Synchronously send a message to a task-executor
*
* @param message the message to send
* @param after the delay before sending the message
*/
suspend fun sendToServiceExecutorAfter(
message: ServiceExecutorMessage,
after: MillisDuration
)


/**
* Synchronously send a message to task-events
*
* @param message the message to send to the task result handler.
*/
suspend fun sendToTaskEvents(
message: ServiceEventMessage
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ package io.infinitic.common.transport

import io.infinitic.common.data.MillisDuration
import io.infinitic.common.messages.Message
import io.infinitic.common.tasks.events.messages.ServiceEventMessage
import io.infinitic.common.tasks.executors.messages.ServiceExecutorMessage
import io.infinitic.common.topics.Topic
import io.infinitic.common.topics.forWorkflow
import io.infinitic.common.topics.isDelayed
import io.infinitic.common.topics.withoutDelay
import java.util.concurrent.CompletableFuture
Expand Down Expand Up @@ -60,9 +63,16 @@ interface InfiniticProducerAsync {
): CompletableFuture<Unit> {
require(after <= 0 || topic.isDelayed) { "Trying to send to $topic with a delay $after" }

// Switch to workflow-related topics for workflowTasks
val t = when (this) {
is ServiceExecutorMessage -> if (isWorkflowTask()) topic.forWorkflow else topic
is ServiceEventMessage -> if (isWorkflowTask()) topic.forWorkflow else topic
else -> topic
}

return when {
after <= 0 -> internalSendToAsync(this, topic.withoutDelay, MillisDuration(0))
else -> internalSendToAsync(this, topic, after)
after <= 0 -> internalSendToAsync(this, t.withoutDelay, MillisDuration(0))
else -> internalSendToAsync(this, t, after)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,14 @@ package io.infinitic.common.transport
import io.github.oshai.kotlinlogging.KotlinLogging
import io.infinitic.common.data.MillisDuration
import io.infinitic.common.messages.Message
import io.infinitic.common.tasks.events.messages.ServiceEventMessage
import io.infinitic.common.tasks.executors.messages.ServiceExecutorMessage
import io.infinitic.common.tasks.tags.messages.ServiceTagMessage
import io.infinitic.common.topics.DelayedServiceExecutorTopic
import io.infinitic.common.topics.DelayedWorkflowServiceExecutorTopic
import io.infinitic.common.topics.ServiceEventsTopic
import io.infinitic.common.topics.ServiceExecutorTopic
import io.infinitic.common.topics.ServiceTagTopic
import io.infinitic.common.topics.Topic
import io.infinitic.common.topics.WorkflowEventsTopic
import io.infinitic.common.topics.WorkflowServiceEventsTopic
import io.infinitic.common.topics.WorkflowServiceExecutorTopic
import io.infinitic.common.workflows.engine.events.WorkflowEventMessage
import kotlinx.coroutines.future.await

class LoggedInfiniticProducer(
logName: String,
private val producerAsync: InfiniticProducerAsync,
) : InfiniticProducer {

private val logger = KotlinLogging.logger(logName)

lateinit var id: String
Expand All @@ -63,101 +52,6 @@ class LoggedInfiniticProducer(
logTrace(this)
}

// override suspend fun sendToClient(message: ClientMessage) {
// logDebug(message)
// with(producerAsync) { message.sendToAsync(ClientTopic) }.await()
// logTrace(message)
// }

// override suspend fun sendToWorkflowTag(message: WorkflowTagMessage) {
// logDebug(message)
// with(producerAsync) { message.sendToAsync(WorkflowTagTopic) }.await()
// logTrace(message)
// }

// override suspend fun sendToWorkflowCmd(message: WorkflowEngineMessage) {
// logDebug(message)
// with(producerAsync) { message.sendToAsync(WorkflowCmdTopic) }.await()
// logTrace(message)
// }

// override suspend fun sendToWorkflowEngine(
// message: WorkflowEngineMessage,
// ) {
// logDebug(message)
// with(producerAsync) { message.sendToAsync(WorkflowEngineTopic) }.await()
// logTrace(message)
// }

// override suspend fun sendToWorkflowEngineAfter(
// message: WorkflowEngineMessage,
// after: MillisDuration
// ) {
// logDebug(message, after)
// with(producerAsync) { message.sendToAsync(DelayedWorkflowEngineTopic, after) }.await()
// logTrace(message)
// }

override suspend fun sendToWorkflowEvents(message: WorkflowEventMessage) {
logDebug(message)
with(producerAsync) { message.sendToAsync(WorkflowEventsTopic) }.await()
logTrace(message)
}

override suspend fun sendToServiceTag(message: ServiceTagMessage) {
logDebug(message)
with(producerAsync) { message.sendToAsync(ServiceTagTopic) }.await()
logTrace(message)
}

override suspend fun sendToServiceExecutor(message: ServiceExecutorMessage) {
logDebug(message)
with(producerAsync) {
when (message.isWorkflowTask()) {
true -> message.sendToAsync(WorkflowServiceExecutorTopic)
false -> message.sendToAsync(ServiceExecutorTopic)
}
}.await()
logTrace(message)
}

override suspend fun sendToServiceExecutorAfter(
message: ServiceExecutorMessage,
after: MillisDuration
) {
logDebug(message, after)

when (message.isWorkflowTask()) {
true -> when (after > 0) {
true -> with(producerAsync) {
message.sendToAsync(DelayedWorkflowServiceExecutorTopic, after)
}.await()

false -> sendToServiceExecutor(message)
}

false -> when (after > 0) {
true -> with(producerAsync) {
message.sendToAsync(DelayedServiceExecutorTopic, after)
}.await()

false -> sendToServiceExecutor(message)
}
}
logTrace(message)
}

override suspend fun sendToTaskEvents(message: ServiceEventMessage) {
logDebug(message)
with(producerAsync) {
when (message.isWorkflowTask()) {
true -> message.sendToAsync(WorkflowServiceEventsTopic)
false -> message.sendToAsync(ServiceEventsTopic)
}
}.await()
logTrace(message)
}

private fun logDebug(message: Message, after: MillisDuration? = null) {
logger.debug {
val idStr = if (::id.isInitialized) "Id $id - " else ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
*/
package io.infinitic.dashboard.panels.infrastructure

import io.infinitic.common.topics.WorkflowServiceExecutorTopic
import io.infinitic.common.topics.WorkflowTaskExecutorTopic
import io.infinitic.dashboard.Infinitic
import io.infinitic.dashboard.panels.infrastructure.requests.Loading
import org.apache.pulsar.common.policies.data.PartitionedTopicStats
Expand All @@ -42,7 +42,7 @@ data class AllWorkflowsState(

override fun getPartitionedStats(name: String): Result<PartitionedTopicStats?> {
val topic =
with(Infinitic.pulsarResources) { WorkflowServiceExecutorTopic.fullName(name) }
with(Infinitic.pulsarResources) { WorkflowTaskExecutorTopic.fullName(name) }

return Infinitic.pulsarResources.admin.getPartitionedTopicStats(topic)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import io.infinitic.common.tasks.events.messages.TaskFailedEvent
import io.infinitic.common.tasks.events.messages.TaskRetriedEvent
import io.infinitic.common.tasks.events.messages.TaskStartedEvent
import io.infinitic.common.topics.ClientTopic
import io.infinitic.common.topics.ServiceTagTopic
import io.infinitic.common.topics.WorkflowEngineTopic
import io.infinitic.common.transport.InfiniticProducerAsync
import io.infinitic.common.transport.LoggedInfiniticProducer
Expand Down Expand Up @@ -100,7 +101,7 @@ class TaskEventHandler(producerAsync: InfiniticProducerAsync) {
}
// remove tags
msg.getEventsForTag(emitterName).forEach {
launch { producer.sendToServiceTag(it) }
launch { with(producer) { it.sendTo(ServiceTagTopic) } }
}
}
// If we are dealing with a workflowTask, we ensure that new commands are dispatched only AFTER
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import io.infinitic.common.tasks.events.messages.TaskRetriedEvent
import io.infinitic.common.tasks.events.messages.TaskStartedEvent
import io.infinitic.common.tasks.executors.messages.ExecuteTask
import io.infinitic.common.tasks.executors.messages.ServiceExecutorMessage
import io.infinitic.common.topics.DelayedServiceExecutorTopic
import io.infinitic.common.topics.ServiceEventsTopic
import io.infinitic.common.transport.InfiniticProducerAsync
import io.infinitic.common.transport.LoggedInfiniticProducer
import io.infinitic.common.utils.getCheckMode
Expand Down Expand Up @@ -197,8 +199,7 @@ class TaskExecutor(

private suspend fun sendTaskStarted(msg: ExecuteTask) {
val event = TaskStartedEvent.from(msg, emitterName)

producer.sendToTaskEvents(event)
with(producer) { event.sendTo(ServiceEventsTopic) }
}

suspend fun sendTaskFailed(
Expand All @@ -212,8 +213,7 @@ class TaskExecutor(
description?.let { msg.logError(cause, it) }

val event = TaskFailedEvent.from(msg, emitterName, cause, meta)

producer.sendToTaskEvents(event)
with(producer) { event.sendTo(ServiceEventsTopic) }
}

private suspend fun sendRetryTask(
Expand All @@ -225,13 +225,11 @@ class TaskExecutor(
msg.logWarn(cause) { "Retrying in $delay" }

val executeTask = ExecuteTask.retryFrom(msg, emitterName, cause, meta)

producer.sendToServiceExecutorAfter(executeTask, delay)
with(producer) { executeTask.sendTo(DelayedServiceExecutorTopic, delay) }

// once sent, we publish the event
val event = TaskRetriedEvent.from(msg, emitterName, cause, delay, meta)

producer.sendToTaskEvents(event)
with(producer) { event.sendTo(ServiceEventsTopic) }
}

private suspend fun sendTaskCompleted(
Expand All @@ -240,8 +238,7 @@ class TaskExecutor(
meta: MutableMap<String, ByteArray>
) {
val event = TaskCompletedEvent.from(msg, emitterName, value, meta)

producer.sendToTaskEvents(event)
with(producer) { event.sendTo(ServiceEventsTopic) }
}

private fun parse(msg: ExecuteTask): TaskCommand {
Expand Down
Loading

0 comments on commit 3dfecd4

Please sign in to comment.