Skip to content

Commit

Permalink
Implementing workflow-cmd and workflow-engine events - WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
geomagilles committed Jan 23, 2024
1 parent 012fe00 commit d8c13e4
Show file tree
Hide file tree
Showing 33 changed files with 895 additions and 162 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ class InfiniticClient(
): CompletableFuture<Unit> =
when (val handler = getProxyHandler(stub)) {
is ExistingWorkflowProxyHandler -> {
val taskName =
val serviceName =
taskClass?.let {
// Use NewTaskProxyHandler in case of use of @Name annotation
NewServiceProxyHandler(it, setOf(), TaskMeta()) { dispatcher }.serviceName
Expand All @@ -199,7 +199,7 @@ class InfiniticClient(
dispatcher.retryTaskAsync(
workflowName = handler.workflowName,
requestBy = handler.requestBy,
serviceName = taskName,
serviceName = serviceName,
taskStatus = taskStatus,
taskId = taskId?.let { TaskId(it) },
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ internal class ClientDispatcher(
workflowName = workflowName,
workflowId = workflowId,
emitterName = emitterName,
parentWorkflowId = null,
parentWorkflowName = null,
parentWorkflowMethodId = null,
emittedAt = null,
)
// synchronously sent the message to get errors
Expand Down Expand Up @@ -286,6 +289,9 @@ internal class ClientDispatcher(
workflowMethodId = workflowMethodId,
workflowName = workflowName,
workflowId = requestBy.workflowId,
parentWorkflowId = null,
parentWorkflowName = null,
parentWorkflowMethodId = null,
emitterName = emitterName,
emittedAt = null,
)
Expand All @@ -297,7 +303,9 @@ internal class ClientDispatcher(
workflowName = workflowName,
workflowTag = requestBy.workflowTag,
reason = WorkflowCancellationReason.CANCELED_BY_CLIENT,
emitterWorkflowId = null,
parentWorkflowId = null,
parentWorkflowName = null,
parentWorkflowMethodId = null,
emitterName = emitterName,
emittedAt = null,
)
Expand All @@ -316,6 +324,9 @@ internal class ClientDispatcher(
workflowName = workflowName,
workflowId = requestBy.workflowId,
emitterName = emitterName,
parentWorkflowId = null,
parentWorkflowName = null,
parentWorkflowMethodId = null,
emittedAt = null,
)
msg.sendToAsync(WorkflowCmdTopic)
Expand All @@ -325,6 +336,9 @@ internal class ClientDispatcher(
val msg = RetryWorkflowTaskByTag(
workflowName = workflowName,
workflowTag = requestBy.workflowTag,
parentWorkflowId = null,
parentWorkflowName = null,
parentWorkflowMethodId = null,
emitterName = emitterName,
emittedAt = null,
)
Expand All @@ -346,6 +360,9 @@ internal class ClientDispatcher(
workflowName = workflowName,
workflowId = requestBy.workflowId,
emitterName = emitterName,
parentWorkflowId = null,
parentWorkflowName = null,
parentWorkflowMethodId = null,
emittedAt = null,
)
msg.sendToAsync(WorkflowCmdTopic)
Expand All @@ -357,6 +374,9 @@ internal class ClientDispatcher(
workflowTag = requestBy.workflowTag,
workflowMethodId = workflowMethodId,
emitterName = emitterName,
parentWorkflowId = null,
parentWorkflowName = null,
parentWorkflowMethodId = null,
emittedAt = null,
)
msg.sendToAsync(WorkflowTagTopic)
Expand All @@ -381,6 +401,9 @@ internal class ClientDispatcher(
taskId = taskId,
taskStatus = taskStatus,
serviceName = serviceName,
parentWorkflowId = null,
parentWorkflowName = null,
parentWorkflowMethodId = null,
emittedAt = null,
)
msg.sendToAsync(WorkflowCmdTopic)
Expand All @@ -394,6 +417,9 @@ internal class ClientDispatcher(
taskStatus = taskStatus,
serviceName = serviceName,
emitterName = emitterName,
parentWorkflowId = null,
parentWorkflowName = null,
parentWorkflowMethodId = null,
emittedAt = null,
)
msg.sendToAsync(WorkflowTagTopic)
Expand Down Expand Up @@ -700,6 +726,9 @@ internal class ClientDispatcher(
channelTypes = handler.channelTypes,
workflowName = handler.workflowName,
workflowId = (handler.requestBy as RequestByWorkflowId).workflowId,
parentWorkflowId = null,
parentWorkflowName = null,
parentWorkflowMethodId = null,
emitterName = emitterName,
emittedAt = null,
)
Expand All @@ -715,6 +744,8 @@ internal class ClientDispatcher(
signalData = handler.signalData,
channelTypes = handler.channelTypes,
parentWorkflowId = null,
parentWorkflowName = null,
parentWorkflowMethodId = null,
emitterName = emitterName,
emittedAt = null,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,9 @@ internal class InfiniticClientTests : StringSpec(
workflowName = WorkflowName(FakeWorkflow::class.java.name),
workflowId = msg.workflowId,
emitterName = emitterNameTest,
parentWorkflowId = null,
parentWorkflowName = null,
parentWorkflowMethodId = null,
emittedAt = null,
)

Expand Down Expand Up @@ -466,6 +469,9 @@ internal class InfiniticClientTests : StringSpec(
channelTypes = ChannelType.allFrom(String::class.java),
workflowName = WorkflowName(FakeWorkflow::class.java.name),
workflowId = WorkflowId(id),
parentWorkflowId = null,
parentWorkflowName = null,
parentWorkflowMethodId = null,
emitterName = emitterNameTest,
emittedAt = null,
)
Expand All @@ -486,6 +492,9 @@ internal class InfiniticClientTests : StringSpec(
channelTypes = ChannelType.allFrom(String::class.java),
workflowName = WorkflowName(FakeWorkflow::class.java.name),
workflowId = WorkflowId(id),
parentWorkflowId = null,
parentWorkflowName = null,
parentWorkflowMethodId = null,
emitterName = emitterNameTest,
emittedAt = null,
)
Expand All @@ -505,6 +514,8 @@ internal class InfiniticClientTests : StringSpec(
signalData = SignalData.from("a"),
channelTypes = ChannelType.allFrom(String::class.java),
parentWorkflowId = null,
parentWorkflowName = null,
parentWorkflowMethodId = null,
emitterName = emitterNameTest,
emittedAt = null,
)
Expand All @@ -525,6 +536,8 @@ internal class InfiniticClientTests : StringSpec(
signalData = SignalData.from("a"),
channelTypes = ChannelType.allFrom(String::class.java),
parentWorkflowId = null,
parentWorkflowName = null,
parentWorkflowMethodId = null,
emitterName = emitterNameTest,
emittedAt = null,
)
Expand All @@ -550,6 +563,9 @@ internal class InfiniticClientTests : StringSpec(
),
workflowName = WorkflowName(FakeWorkflow::class.java.name),
workflowId = WorkflowId(id),
parentWorkflowId = null,
parentWorkflowName = null,
parentWorkflowMethodId = null,
emitterName = emitterNameTest,
emittedAt = null,
)
Expand All @@ -575,6 +591,9 @@ internal class InfiniticClientTests : StringSpec(
),
workflowName = WorkflowName(FakeWorkflow::class.java.name),
workflowId = WorkflowId(id),
parentWorkflowId = null,
parentWorkflowName = null,
parentWorkflowMethodId = null,
emitterName = emitterNameTest,
emittedAt = null,
)
Expand All @@ -593,6 +612,9 @@ internal class InfiniticClientTests : StringSpec(
serviceName = null,
workflowName = WorkflowName(FakeWorkflow::class.java.name),
workflowId = WorkflowId(id),
parentWorkflowId = null,
parentWorkflowName = null,
parentWorkflowMethodId = null,
emitterName = emitterNameTest,
emittedAt = null,
)
Expand All @@ -611,6 +633,9 @@ internal class InfiniticClientTests : StringSpec(
serviceName = null,
workflowName = WorkflowName(FakeWorkflow::class.java.name),
workflowId = WorkflowId(id),
parentWorkflowId = null,
parentWorkflowName = null,
parentWorkflowMethodId = null,
emitterName = emitterNameTest,
emittedAt = null,
)
Expand All @@ -627,6 +652,9 @@ internal class InfiniticClientTests : StringSpec(
workflowMethodId = null,
workflowName = WorkflowName(FakeWorkflow::class.java.name),
workflowId = WorkflowId(id),
parentWorkflowId = null,
parentWorkflowName = null,
parentWorkflowMethodId = null,
emitterName = emitterNameTest,
emittedAt = null,
)
Expand All @@ -643,6 +671,9 @@ internal class InfiniticClientTests : StringSpec(
workflowMethodId = null,
workflowName = WorkflowName(FakeWorkflow::class.java.name),
workflowId = WorkflowId(id),
parentWorkflowId = null,
parentWorkflowName = null,
parentWorkflowMethodId = null,
emitterName = emitterNameTest,
emittedAt = null,
)
Expand Down Expand Up @@ -709,6 +740,9 @@ internal class InfiniticClientTests : StringSpec(
workflowName = WorkflowName(FakeWorkflow::class.java.name),
workflowId = WorkflowId(id),
emitterName = emitterNameTest,
parentWorkflowId = null,
parentWorkflowName = null,
parentWorkflowMethodId = null,
emittedAt = null,
)
}
Expand All @@ -725,6 +759,9 @@ internal class InfiniticClientTests : StringSpec(
workflowMethodId = null,
workflowName = WorkflowName(FakeWorkflow::class.java.name),
workflowId = WorkflowId(id),
parentWorkflowId = null,
parentWorkflowName = null,
parentWorkflowMethodId = null,
emitterName = emitterNameTest,
emittedAt = null,
)
Expand All @@ -741,7 +778,9 @@ internal class InfiniticClientTests : StringSpec(
workflowName = WorkflowName(FakeWorkflow::class.java.name),
workflowTag = WorkflowTag(tag),
reason = WorkflowCancellationReason.CANCELED_BY_CLIENT,
emitterWorkflowId = null,
parentWorkflowId = null,
parentWorkflowName = null,
parentWorkflowMethodId = null,
emitterName = emitterNameTest,
emittedAt = null,
)
Expand All @@ -759,7 +798,9 @@ internal class InfiniticClientTests : StringSpec(
workflowName = WorkflowName(FakeWorkflow::class.java.name),
workflowTag = WorkflowTag("foo"),
reason = WorkflowCancellationReason.CANCELED_BY_CLIENT,
emitterWorkflowId = null,
parentWorkflowId = null,
parentWorkflowName = null,
parentWorkflowMethodId = null,
emitterName = emitterNameTest,
emittedAt = null,
)
Expand All @@ -778,6 +819,9 @@ internal class InfiniticClientTests : StringSpec(
workflowName = WorkflowName(FakeWorkflow::class.java.name),
workflowId = WorkflowId(deferred.id),
emitterName = emitterNameTest,
parentWorkflowId = null,
parentWorkflowName = null,
parentWorkflowMethodId = null,
emittedAt = null,
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,20 @@
*/
package io.infinitic.common.tasks.data

import com.github.avrokotlin.avro4k.Avro
import com.github.avrokotlin.avro4k.AvroDefault
import com.github.avrokotlin.avro4k.AvroNamespace
import io.infinitic.common.data.ReturnValue
import io.infinitic.common.data.methods.MethodName
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable

@Serializable
@AvroNamespace("io.infinitic.tasks.data")
data class TaskReturnValue(
val taskId: TaskId,
@SerialName("taskName") val serviceName: ServiceName,
val taskMeta: TaskMeta,
val returnValue: ReturnValue
val taskId: TaskId,
@SerialName("taskName") val serviceName: ServiceName,
@AvroDefault(Avro.NULL) val taskMethodName: MethodName?,
val taskMeta: TaskMeta,
val returnValue: ReturnValue
)
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,5 @@ object TaskTagSerializer : KSerializer<TaskTag> {

override fun deserialize(decoder: Decoder) = TaskTag(decoder.decodeString())
}

val Set<TaskTag>.set get() = map { it.tag }.toSet()
Loading

0 comments on commit d8c13e4

Please sign in to comment.