diff --git a/infinitic-client/src/main/kotlin/io/infinitic/clients/InfiniticClient.kt b/infinitic-client/src/main/kotlin/io/infinitic/clients/InfiniticClient.kt index acbc4d443..78c2090ee 100644 --- a/infinitic-client/src/main/kotlin/io/infinitic/clients/InfiniticClient.kt +++ b/infinitic-client/src/main/kotlin/io/infinitic/clients/InfiniticClient.kt @@ -80,9 +80,9 @@ class InfiniticClient( private var isClosed: AtomicBoolean = AtomicBoolean(false) // Scope used to asynchronously send message, and also to consumes messages - private val clientScope = CoroutineScope(Dispatchers.IO) + internal val clientScope = CoroutineScope(Dispatchers.IO) - private val dispatcher by lazy { ClientDispatcher(clientScope, consumer, producer) } + private val dispatcher by lazy { ClientDispatcher(clientScope, consumer, producer, logger) } override suspend fun getName() = producer.getName() diff --git a/infinitic-client/src/main/kotlin/io/infinitic/clients/dispatcher/ClientDispatcher.kt b/infinitic-client/src/main/kotlin/io/infinitic/clients/dispatcher/ClientDispatcher.kt index 16563c9e7..6f7424065 100644 --- a/infinitic-client/src/main/kotlin/io/infinitic/clients/dispatcher/ClientDispatcher.kt +++ b/infinitic-client/src/main/kotlin/io/infinitic/clients/dispatcher/ClientDispatcher.kt @@ -22,6 +22,7 @@ */ package io.infinitic.clients.dispatcher +import io.github.oshai.kotlinlogging.KLogger import io.infinitic.clients.Deferred import io.infinitic.clients.deferred.DeferredChannel import io.infinitic.clients.deferred.DeferredSend @@ -95,24 +96,30 @@ import io.infinitic.exceptions.clients.InvalidChannelUsageException import io.infinitic.exceptions.clients.MultipleCustomIdException import io.infinitic.workflows.DeferredStatus import io.infinitic.workflows.SendChannel +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.future.future +import kotlinx.coroutines.async +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.jetbrains.annotations.TestOnly import java.lang.reflect.Method import java.util.concurrent.CompletableFuture +import java.util.concurrent.CompletionException import java.util.concurrent.atomic.AtomicBoolean import io.infinitic.common.workflows.engine.messages.RetryTasks as RetryTaskInWorkflow import io.infinitic.common.workflows.tags.messages.RetryTasksByTag as RetryTaskInWorkflowByTag +import kotlinx.coroutines.Deferred as CoroutineDeferred internal class ClientDispatcher( private val clientScope: CoroutineScope, private val consumer: InfiniticConsumer, - private val producer: InfiniticProducer + private val producer: InfiniticProducer, + private val logger: KLogger ) : ProxyDispatcher { // Name of the client - private val emitterName by lazy { clientScope.future { EmitterName(producer.getName()) }.join() } + private val emitterName by lazy { runBlocking { EmitterName(producer.getName()) } } // This as requester private val clientRequester by lazy { ClientRequester(clientName = ClientName.from(emitterName)) } @@ -134,21 +141,19 @@ internal class ClientDispatcher( sendTo(topic) } - private fun T.sendToAsync(topic: Topic) = clientScope.future { - sendTo(topic) - } - // Utility to get access to last deferred internal fun getLastDeferred(): Deferred<*>? = localLastDeferred.get() // asynchronous call: dispatch(stub::method)(*args) fun dispatchAsync(handler: ProxyHandler<*>): CompletableFuture> = - when (handler) { - is NewWorkflowProxyHandler -> handler.dispatchMethodAsync() - is ExistingWorkflowProxyHandler -> handler.dispatchMethodAsync() - is ChannelProxyHandler -> handler.dispatchSignalAsync() - is NewServiceProxyHandler -> thisShouldNotHappen() - is ExistingServiceProxyHandler -> thisShouldNotHappen() + clientScope.runAsync { + when (handler) { + is NewWorkflowProxyHandler -> handler.dispatchMethod() + is ExistingWorkflowProxyHandler -> handler.dispatchMethod() + is ChannelProxyHandler -> handler.dispatchSignal() + is NewServiceProxyHandler -> thisShouldNotHappen() + is ExistingServiceProxyHandler -> thisShouldNotHappen() + } } // synchronous call: stub.method(*args) @@ -156,32 +161,42 @@ internal class ClientDispatcher( when (handler) { is NewWorkflowProxyHandler -> handler.dispatchMethodAndWait() is ExistingWorkflowProxyHandler -> handler.dispatchMethodAndWait() - is ChannelProxyHandler -> handler.dispatchSignalAndWait() + is ChannelProxyHandler -> handler.dispatchSignal().await() is ExistingServiceProxyHandler -> thisShouldNotHappen() is NewServiceProxyHandler -> thisShouldNotHappen() } - internal fun awaitNewWorkflow( + private suspend fun awaitNewWorkflowAsync( deferred: NewDeferredWorkflow, clientWaiting: Boolean - ): T = awaitWorkflow( + ): CoroutineDeferred = awaitWorkflowAsync( deferred.workflowName, deferred.workflowId, - deferred.method, null, deferred.methodTimeout, deferred.dispatchTime, clientWaiting, ) - internal fun awaitExistingWorkflow( + internal fun awaitNewWorkflow( + deferred: NewDeferredWorkflow, + clientWaiting: Boolean + ): T = clientScope.run { + awaitNewWorkflowAsync(deferred, clientWaiting).await().getValue( + deferred.workflowName, + deferred.workflowId, + deferred.method, + WorkflowMethodId.from(deferred.workflowId), + ) + } + + private suspend fun awaitExistingWorkflowAsync( deferred: ExistingDeferredWorkflow, clientWaiting: Boolean - ): T = when (deferred.requestBy) { - is RequestByWorkflowId -> awaitWorkflow( + ): CoroutineDeferred = when (deferred.requestBy) { + is RequestByWorkflowId -> awaitWorkflowAsync( deferred.workflowName, deferred.requestBy.workflowId, - deferred.method, deferred.workflowMethodId, deferred.methodTimeout, deferred.dispatchTime, @@ -191,16 +206,38 @@ internal class ClientDispatcher( is RequestByWorkflowTag -> TODO() } + internal fun awaitExistingWorkflow( + deferred: ExistingDeferredWorkflow, + clientWaiting: Boolean + ): T = clientScope.run { + when (deferred.requestBy) { + is RequestByWorkflowId -> awaitWorkflowAsync( + deferred.workflowName, + deferred.requestBy.workflowId, + deferred.workflowMethodId, + deferred.methodTimeout, + deferred.dispatchTime, + clientWaiting, + ).await().getValue( + deferred.workflowName, + deferred.requestBy.workflowId, + deferred.method, + deferred.workflowMethodId, + ) as T + + is RequestByWorkflowTag -> TODO() + } + } + // wait for the completion of a method - private fun awaitWorkflow( + private suspend fun awaitWorkflowAsync( workflowName: WorkflowName, workflowId: WorkflowId, - workflowMethod: Method, workflowMethodId: WorkflowMethodId?, methodTimeout: MillisDuration?, dispatchTime: Long, clientWaiting: Boolean - ): T { + ): CoroutineDeferred { val methodId = workflowMethodId ?: WorkflowMethodId.from(workflowId) // calculate timeout from now @@ -209,7 +246,7 @@ internal class ClientDispatcher( ?.let { if (it < 0) 0 else it } ?: Long.MAX_VALUE - // lazily starts client consumer if not already started and waits + // lazily starts listener val waiting = awaitAsync(timeout) { it is MethodMessage && it.workflowId == workflowId && it.workflowMethodId == methodId } @@ -225,159 +262,125 @@ internal class ClientDispatcher( emittedAt = null, ) // synchronously sent the message to get errors - waitWorkflow.sendToAsync(WorkflowStateCmdTopic).join() + waitWorkflow.sendTo(WorkflowStateCmdTopic) } // Get result - val workflowResult = waiting.join() - - @Suppress("UNCHECKED_CAST") - return when (workflowResult) { - is MethodTimedOut, null -> { - throw WorkflowTimedOutException( - workflowName = workflowName.toString(), - workflowId = workflowId.toString(), - workflowMethodName = workflowMethod.name, - workflowMethodId = workflowMethodId?.toString(), - ) - } - - is MethodCompleted -> workflowMethod.decodeReturnValue(workflowResult.methodReturnValue) as T - - is MethodCanceled -> throw WorkflowCanceledException( - workflowName = workflowName.toString(), - workflowId = workflowId.toString(), - workflowMethodName = workflowMethod.name, - workflowMethodId = workflowResult.workflowMethodId.toString(), - ) - - is MethodFailed -> throw WorkflowFailedException.from( - MethodFailedError( - workflowName = workflowName, - workflowMethodName = MethodName(workflowMethod.name), - workflowId = workflowId, - workflowMethodId = workflowResult.workflowMethodId, - deferredError = workflowResult.cause, - ), - ) - - is MethodUnknown -> throw WorkflowUnknownException( - workflowName = workflowName.toString(), - workflowId = workflowId.toString(), - workflowMethodName = workflowMethod.name, - workflowMethodId = workflowMethodId?.toString(), - ) - - else -> thisShouldNotHappen("Unexpected ${workflowResult::class}") - } + return waiting } fun cancelWorkflowAsync( workflowName: WorkflowName, requestBy: RequestBy, workflowMethodId: WorkflowMethodId?, - ): CompletableFuture = when (requestBy) { - is RequestByWorkflowId -> { - val msg = CancelWorkflow( - cancellationReason = WorkflowCancellationReason.CANCELED_BY_CLIENT, - workflowMethodId = workflowMethodId, - workflowName = workflowName, - workflowId = requestBy.workflowId, - requester = clientRequester, - emitterName = emitterName, - emittedAt = null, - ) - msg.sendToAsync(WorkflowStateCmdTopic) - } + ): CompletableFuture = clientScope.runAsync { + when (requestBy) { + is RequestByWorkflowId -> { + val msg = CancelWorkflow( + cancellationReason = WorkflowCancellationReason.CANCELED_BY_CLIENT, + workflowMethodId = workflowMethodId, + workflowName = workflowName, + workflowId = requestBy.workflowId, + requester = clientRequester, + emitterName = emitterName, + emittedAt = null, + ) + msg.sendTo(WorkflowStateCmdTopic) + } - is RequestByWorkflowTag -> { - val msg = CancelWorkflowByTag( - workflowName = workflowName, - workflowTag = requestBy.workflowTag, - reason = WorkflowCancellationReason.CANCELED_BY_CLIENT, - requester = clientRequester, - emitterName = emitterName, - emittedAt = null, - ) - msg.sendToAsync(WorkflowTagEngineTopic) - } + is RequestByWorkflowTag -> { + val msg = CancelWorkflowByTag( + workflowName = workflowName, + workflowTag = requestBy.workflowTag, + reason = WorkflowCancellationReason.CANCELED_BY_CLIENT, + requester = clientRequester, + emitterName = emitterName, + emittedAt = null, + ) + msg.sendTo(WorkflowTagEngineTopic) + } - else -> thisShouldNotHappen() + else -> thisShouldNotHappen() + } } fun retryWorkflowTaskAsync( workflowName: WorkflowName, requestBy: RequestBy - ): CompletableFuture = when (requestBy) { - is RequestByWorkflowId -> { - val msg = RetryWorkflowTask( - workflowName = workflowName, - workflowId = requestBy.workflowId, - requester = clientRequester, - emitterName = emitterName, - emittedAt = null, - ) - msg.sendToAsync(WorkflowStateCmdTopic) - } + ): CompletableFuture = clientScope.runAsync { + when (requestBy) { + is RequestByWorkflowId -> { + val msg = RetryWorkflowTask( + workflowName = workflowName, + workflowId = requestBy.workflowId, + requester = clientRequester, + emitterName = emitterName, + emittedAt = null, + ) + msg.sendTo(WorkflowStateCmdTopic) + } - is RequestByWorkflowTag -> { - val msg = RetryWorkflowTaskByTag( - workflowName = workflowName, - workflowTag = requestBy.workflowTag, - requester = clientRequester, - emitterName = emitterName, - emittedAt = null, - ) - msg.sendToAsync(WorkflowTagEngineTopic) - } + is RequestByWorkflowTag -> { + val msg = RetryWorkflowTaskByTag( + workflowName = workflowName, + workflowTag = requestBy.workflowTag, + requester = clientRequester, + emitterName = emitterName, + emittedAt = null, + ) + msg.sendTo(WorkflowTagEngineTopic) + } - else -> thisShouldNotHappen() + else -> thisShouldNotHappen() + } } fun completeTaskAsync( serviceName: ServiceName, taskId: TaskId, returnValue: MethodReturnValue - ): CompletableFuture { + ): CompletableFuture = clientScope.runAsync { val msg = CompleteDelegatedTask( serviceName = serviceName, taskId = taskId, returnValue = returnValue, emitterName = emitterName, ) - return msg.sendToAsync(ServiceTagEngineTopic) + msg.sendTo(ServiceTagEngineTopic) } fun completeTimersAsync( workflowName: WorkflowName, requestBy: RequestBy, workflowMethodId: WorkflowMethodId? - ): CompletableFuture = when (requestBy) { - is RequestByWorkflowId -> { - val msg = CompleteTimers( - workflowMethodId = workflowMethodId, - workflowName = workflowName, - workflowId = requestBy.workflowId, - requester = clientRequester, - emitterName = emitterName, - emittedAt = null, - ) - msg.sendToAsync(WorkflowStateCmdTopic) - } + ): CompletableFuture = clientScope.runAsync { + when (requestBy) { + is RequestByWorkflowId -> { + val msg = CompleteTimers( + workflowMethodId = workflowMethodId, + workflowName = workflowName, + workflowId = requestBy.workflowId, + requester = clientRequester, + emitterName = emitterName, + emittedAt = null, + ) + msg.sendTo(WorkflowStateCmdTopic) + } - is RequestByWorkflowTag -> { - val msg = CompleteTimersByTag( - workflowName = workflowName, - workflowTag = requestBy.workflowTag, - workflowMethodId = workflowMethodId, - requester = clientRequester, - emitterName = emitterName, - emittedAt = null, - ) - msg.sendToAsync(WorkflowTagEngineTopic) - } + is RequestByWorkflowTag -> { + val msg = CompleteTimersByTag( + workflowName = workflowName, + workflowTag = requestBy.workflowTag, + workflowMethodId = workflowMethodId, + requester = clientRequester, + emitterName = emitterName, + emittedAt = null, + ) + msg.sendTo(WorkflowTagEngineTopic) + } - else -> thisShouldNotHappen() + else -> thisShouldNotHappen() + } } fun retryTaskAsync( @@ -386,43 +389,45 @@ internal class ClientDispatcher( taskId: TaskId?, taskStatus: DeferredStatus?, serviceName: ServiceName? - ): CompletableFuture = when (requestBy) { - is RequestByWorkflowId -> { - val msg = RetryTaskInWorkflow( - workflowName = workflowName, - workflowId = requestBy.workflowId, - emitterName = emitterName, - taskId = taskId, - taskStatus = taskStatus, - serviceName = serviceName, - requester = clientRequester, - emittedAt = null, - ) - msg.sendToAsync(WorkflowStateCmdTopic) - } + ): CompletableFuture = clientScope.runAsync { + when (requestBy) { + is RequestByWorkflowId -> { + val msg = RetryTaskInWorkflow( + workflowName = workflowName, + workflowId = requestBy.workflowId, + emitterName = emitterName, + taskId = taskId, + taskStatus = taskStatus, + serviceName = serviceName, + requester = clientRequester, + emittedAt = null, + ) + msg.sendTo(WorkflowStateCmdTopic) + } - is RequestByWorkflowTag -> { - val msg = RetryTaskInWorkflowByTag( - workflowName = workflowName, - workflowTag = requestBy.workflowTag, - taskId = taskId, - taskStatus = taskStatus, - serviceName = serviceName, - requester = clientRequester, - emitterName = emitterName, - emittedAt = null, - ) - msg.sendToAsync(WorkflowTagEngineTopic) - } + is RequestByWorkflowTag -> { + val msg = RetryTaskInWorkflowByTag( + workflowName = workflowName, + workflowTag = requestBy.workflowTag, + taskId = taskId, + taskStatus = taskStatus, + serviceName = serviceName, + requester = clientRequester, + emitterName = emitterName, + emittedAt = null, + ) + msg.sendTo(WorkflowTagEngineTopic) + } - else -> thisShouldNotHappen() + else -> thisShouldNotHappen() + } } fun getWorkflowIdsByTag( workflowName: WorkflowName, workflowTag: WorkflowTag - ): Set { + ): Set = clientScope.run { // lazily starts client consumer if not already started and waits val waiting = awaitAsync { (it is WorkflowIdsByTag) && @@ -438,44 +443,55 @@ internal class ClientDispatcher( ) // synchronously sent the message to get errors - msg.sendToAsync(WorkflowTagEngineTopic).join() + msg.sendTo(WorkflowTagEngineTopic) - val workflowIdsByTag = waiting.join() as WorkflowIdsByTag + val workflowIdsByTag = waiting.await() as WorkflowIdsByTag - return workflowIdsByTag.workflowIds.map { it.toString() }.toSet() + workflowIdsByTag.workflowIds.map { it.toString() }.toSet() } - // asynchronous call: dispatch(stub::method)(*args) - @Suppress("UNCHECKED_CAST") - private fun NewWorkflowProxyHandler<*>.dispatchMethodAsync(): CompletableFuture> = + private suspend fun NewWorkflowProxyHandler<*>.dispatchMethod(): Deferred = when (isChannelGetter()) { true -> throw InvalidChannelUsageException() false -> { - val deferredWorkflow = newDeferredWorkflow( + @Suppress("UNCHECKED_CAST") + newDeferredWorkflow( workflowName, method, method.returnType as Class, getTimeout(), - ) - dispatchMethodAsync(deferredWorkflow, false) + ).also { + dispatchMethod(it, false) + } } } // synchronous call: stub.method(*args) - @Suppress("UNCHECKED_CAST") private fun NewWorkflowProxyHandler<*>.dispatchMethodAndWait(): R = when (isChannelGetter()) { true -> throw InvalidChannelUsageException() false -> { + @Suppress("UNCHECKED_CAST") val deferredWorkflow = newDeferredWorkflow( workflowName, method, method.returnType as Class, getTimeout(), ) - dispatchMethodAsync(deferredWorkflow, true) - awaitNewWorkflow(deferredWorkflow, false) + clientScope.run { + val future = awaitNewWorkflowAsync(deferredWorkflow, false) + + // synchronously send the message to get errors + dispatchMethod(deferredWorkflow, true) + + future.await().getValue( + workflowName, + deferredWorkflow.workflowId, + deferredWorkflow.method, + WorkflowMethodId.from(deferredWorkflow.workflowId), + ) as R + } } } @@ -488,10 +504,10 @@ internal class ClientDispatcher( // store in ThreadLocal to be used in ::getDeferred .also { localLastDeferred.set(it) } - private fun NewWorkflowProxyHandler<*>.dispatchMethodAsync( + private suspend fun NewWorkflowProxyHandler<*>.dispatchMethod( deferred: NewDeferredWorkflow, clientWaiting: Boolean, - ): CompletableFuture> { + ) { // it's important to build those objects out of the coroutine scope // otherwise the handler's value could be changed if reused val customIds = workflowTags.filter { it.isCustomId() } @@ -500,17 +516,19 @@ internal class ClientDispatcher( // no customId tag provided 0 -> { // first, we send all tags in parallel - val futures = workflowTags.map { - AddTagToWorkflow( - workflowName = deferred.workflowName, - workflowTag = it, - workflowId = deferred.workflowId, - emitterName = emitterName, - emittedAt = null, - ).sendToAsync(WorkflowTagEngineTopic) + coroutineScope { + workflowTags.map { + launch { + AddTagToWorkflow( + workflowName = deferred.workflowName, + workflowTag = it, + workflowId = deferred.workflowId, + emitterName = emitterName, + emittedAt = null, + ).sendTo(WorkflowTagEngineTopic) + } + } } - CompletableFuture.allOf(*futures.toTypedArray()).join() - // dispatch workflow message val dispatchWorkflow = DispatchWorkflow( workflowName = deferred.workflowName, @@ -525,10 +543,9 @@ internal class ClientDispatcher( emitterName = emitterName, emittedAt = null, ) - // workflow message is dispatched after tags // to avoid a potential race condition if the engine remove tags - dispatchWorkflow.sendToAsync(WorkflowStateCmdTopic).thenApply { deferred } + dispatchWorkflow.sendTo(WorkflowStateCmdTopic) } // a customId tag was provided 1 -> { @@ -548,32 +565,34 @@ internal class ClientDispatcher( emitterName = emitterName, emittedAt = null, ) - dispatchWorkflowByCustomId.sendToAsync(WorkflowTagEngineTopic).thenApply { deferred } + dispatchWorkflowByCustomId.sendTo(WorkflowTagEngineTopic) } // more than 1 customId tag were provided else -> throw MultipleCustomIdException } } + // asynchronous call: dispatch(stub::method)(*args) @Suppress("UNCHECKED_CAST") - private fun ExistingWorkflowProxyHandler<*>.dispatchMethodAsync(): CompletableFuture> = + private suspend fun ExistingWorkflowProxyHandler<*>.dispatchMethod(): Deferred = when (isChannelGetter()) { true -> { // special case of getting a channel from a workflow val channel = ChannelProxyHandler>(this).stub() - CompletableFuture.completedFuture(DeferredChannel(channel) as Deferred) + DeferredChannel(channel) as Deferred } false -> { - val deferred = existingDeferredWorkflow( + existingDeferredWorkflow( workflowName, requestBy, method, method.returnType as Class, getTimeout(), - ) - dispatchMethodAsync(deferred, false).thenApply { deferred } + ).also { + dispatchMethod(it, false) + } } } @@ -583,7 +602,6 @@ internal class ClientDispatcher( when (isChannelGetter()) { true -> { // special case of getting a channel from a workflow - @Suppress("UNCHECKED_CAST") ChannelProxyHandler>(this).stub() as R } @@ -595,10 +613,20 @@ internal class ClientDispatcher( method.returnType as Class, getTimeout(), ) - // synchronously sent the message to get errors - dispatchMethodAsync(deferred, true).join() - awaitExistingWorkflow(deferred, false) + clientScope.run { + val future = awaitExistingWorkflowAsync(deferred, false) + + // send the message synchronously to get errors + dispatchMethod(deferred, true) + + future.await().getValue( + deferred.workflowName, + deferred.requestBy.workflowId!!, + deferred.method, + deferred.workflowMethodId, + ) as R + } } } @@ -618,10 +646,10 @@ internal class ClientDispatcher( ) // store in ThreadLocal to be used in ::getDeferred .also { localLastDeferred.set(it) } - private fun ExistingWorkflowProxyHandler<*>.dispatchMethodAsync( + private suspend fun ExistingWorkflowProxyHandler<*>.dispatchMethod( deferred: ExistingDeferredWorkflow, clientWaiting: Boolean, - ): CompletableFuture = when (deferred.requestBy) { + ) = when (deferred.requestBy) { is RequestByWorkflowId -> { val dispatchMethod = DispatchMethod( workflowName = deferred.workflowName, @@ -635,7 +663,7 @@ internal class ClientDispatcher( emitterName = emitterName, emittedAt = null, ) - dispatchMethod.sendToAsync(WorkflowStateCmdTopic) + dispatchMethod.sendTo(WorkflowStateCmdTopic) } is RequestByWorkflowTag -> { @@ -652,33 +680,20 @@ internal class ClientDispatcher( emitterName = emitterName, emittedAt = null, ) - dispatchMethodByTag.sendToAsync(WorkflowTagEngineTopic) + dispatchMethodByTag.sendTo(WorkflowTagEngineTopic) } } // asynchronous call: dispatch(stub.channel::send, signal) - private fun ChannelProxyHandler<*>.dispatchSignalAsync(): CompletableFuture> { - val deferredSend = deferredSend() - - return dispatchSignalAsync(deferredSend).thenApply { deferredSend } - } - - // synchronous call: stub.channel.send(signal) - private fun ChannelProxyHandler<*>.dispatchSignalAndWait(): S { - val deferredSend = deferredSend() + private fun ChannelProxyHandler<*>.dispatchSignal() = + deferredSend().also { dispatchSignal(it) } - // dispatch signal synchronously - dispatchSignalAsync(deferredSend).join() - - return deferredSend.await() - } - - private fun ChannelProxyHandler<*>.dispatchSignalAsync( + private fun ChannelProxyHandler<*>.dispatchSignal( deferredSend: DeferredSend<*>, - ): CompletableFuture { + ) = clientScope.run { if (annotatedMethodName.toString() != SendChannel<*>::send.name) thisShouldNotHappen() - return when (requestBy) { + when (requestBy) { is RequestByWorkflowId -> { val sendSignal = SendSignal( channelName = channelName, @@ -691,7 +706,7 @@ internal class ClientDispatcher( emittedAt = null, requester = clientRequester, ) - sendSignal.sendToAsync(WorkflowStateCmdTopic) + sendSignal.sendTo(WorkflowStateCmdTopic) } is RequestByWorkflowTag -> { @@ -707,61 +722,124 @@ internal class ClientDispatcher( emittedAt = null, requester = clientRequester, ) - sendSignalByTag.sendToAsync(WorkflowTagEngineTopic) + sendSignalByTag.sendTo(WorkflowTagEngineTopic) } else -> thisShouldNotHappen() } } - private fun deferredSend(): DeferredSend { - val deferredSend = DeferredSend(SignalId()) - - // store in ThreadLocal to be used in ::getLastDeferred - localLastDeferred.set(deferredSend) - - return deferredSend - } + private fun deferredSend(): DeferredSend = + DeferredSend(SignalId()).also { + // store in ThreadLocal to be used in ::getLastDeferred + localLastDeferred.set(it) + } private fun ProxyHandler<*>.getTimeout(): MillisDuration? = timeoutInMillisDuration.getOrElse { throw IllegalStateException("Unable to retrieve Timeout info when dispatching $method", it) } - private fun startListeningAsync(): CompletableFuture = clientScope.future { - try { - consumer.start( - subscription = MainSubscription(ClientTopic), - entity = emitterName.toString(), - handler = { message, _ -> responseFlow.emit(message) }, - beforeDlq = null, - concurrency = 1, + @Suppress("UNCHECKED_CAST") + private fun ClientMessage?.getValue( + workflowName: WorkflowName, + workflowId: WorkflowId, + workflowMethod: Method, + workflowMethodId: WorkflowMethodId?, + ) = when (this) { + is MethodCompleted -> workflowMethod.decodeReturnValue(methodReturnValue) as T + + is MethodTimedOut, null -> { + throw WorkflowTimedOutException( + workflowName = workflowName.toString(), + workflowId = workflowId.toString(), + workflowMethodName = workflowMethod.name, + workflowMethodId = workflowMethodId?.toString(), ) - } catch (e: Exception) { - // all subsequent calls to await will fail and trigger this exception - responseFlow.emitThrowable(e) - throw e } + + is MethodCanceled -> throw WorkflowCanceledException( + workflowName = workflowName.toString(), + workflowId = workflowId.toString(), + workflowMethodName = workflowMethod.name, + workflowMethodId = workflowMethodId.toString(), + ) + + is MethodFailed -> throw WorkflowFailedException.from( + MethodFailedError( + workflowName = workflowName, + workflowMethodName = MethodName(workflowMethod.name), + workflowId = workflowId, + workflowMethodId = workflowMethodId, + deferredError = cause, + ), + ) + + is MethodUnknown -> throw WorkflowUnknownException( + workflowName = workflowName.toString(), + workflowId = workflowId.toString(), + workflowMethodName = workflowMethod.name, + workflowMethodId = workflowMethodId?.toString(), + ) + + else -> thisShouldNotHappen("Unexpected $this") } - private fun awaitAsync( - timeout: Long = Long.MAX_VALUE, - predicate: suspend (ClientMessage) -> Boolean - ): CompletableFuture = clientScope.future { - await(timeout, predicate) + private suspend fun startListeningAsync() { + if (hasClientConsumerStarted.compareAndSet(false, true)) { + logger.info { "Starting consumer client for client $emitterName" } + // synchronously make sure that the consumer is created and started + val listenerJob = with(clientScope) { + consumer.startAsync( + subscription = MainSubscription(ClientTopic), + entity = emitterName.toString(), + handler = { message, _ -> responseFlow.emit(message) }, + beforeDlq = null, + concurrency = 1, + ) + } + // asynchronously listen + clientScope.launch { + try { + listenerJob.join() + } catch (e: Exception) { + // all subsequent calls to await will fail and trigger this exception + responseFlow.emitThrowable(e) + throw e + } + } + } } - private fun await( + private suspend fun awaitAsync( timeout: Long = Long.MAX_VALUE, predicate: suspend (ClientMessage) -> Boolean - ): ClientMessage? = runBlocking { - if (hasClientConsumerStarted.compareAndSet(false, true)) { - // asynchronously starts client consumer if not already started - startListeningAsync() + ): CoroutineDeferred { + // make sure the client is listening + startListeningAsync() + + return clientScope.async { + responseFlow.first(timeout) { predicate(it) } } + } + + private fun CoroutineScope.runAsync(block: suspend () -> S): CompletableFuture = + CompletableFuture().also { + launch { + try { + it.complete(block()) + } catch (e: CancellationException) { + it.cancel(false) + } catch (e: Throwable) { + it.completeExceptionally(e) + } + } + } - // immediately wait for the message that matches the predicate - responseFlow.first(timeout) { predicate(it) } + private fun CoroutineScope.run(block: suspend () -> S): S = try { + runAsync(block).join() + } catch (e: CompletionException) { + throw e.cause ?: e } companion object { diff --git a/infinitic-client/src/test/kotlin/io/infinitic/clients/InfiniticClientTests.kt b/infinitic-client/src/test/kotlin/io/infinitic/clients/InfiniticClientTests.kt index 99775989d..672bb5d4b 100644 --- a/infinitic-client/src/test/kotlin/io/infinitic/clients/InfiniticClientTests.kt +++ b/infinitic-client/src/test/kotlin/io/infinitic/clients/InfiniticClientTests.kt @@ -91,13 +91,15 @@ import io.infinitic.transport.config.InMemoryTransportConfig import io.kotest.assertions.throwables.shouldThrow import io.kotest.core.spec.style.StringSpec import io.kotest.matchers.shouldBe -import io.mockk.Runs +import io.mockk.clearAllMocks import io.mockk.coEvery import io.mockk.coVerify import io.mockk.every -import io.mockk.just import io.mockk.mockk import io.mockk.slot +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import java.util.concurrent.CopyOnWriteArrayList private val taskTagSlots = CopyOnWriteArrayList() @@ -105,9 +107,11 @@ private val workflowTagSlots = CopyOnWriteArrayList() private val taskSlot = slot() private val workflowCmdSlot = slot() private val delaySlot = slot() +private val scopeSlot = slot() private val clientNameTest = ClientName("clientTest") private val emitterNameTest = EmitterName("clientTest") + private suspend fun tagResponse() { workflowTagSlots.forEach { if (it is GetWorkflowIdsByTag) { @@ -138,7 +142,9 @@ private suspend fun engineResponse() { } internal val mockedProducer = mockk { - coEvery { getName() } returns "$clientNameTest" + coEvery { + getName() + } returns "$clientNameTest" coEvery { internalSendTo(capture(taskTagSlots), ServiceTagEngineTopic) } answers { } @@ -151,7 +157,13 @@ internal val mockedProducer = mockk { } internal val mockedConsumer = mockk { - coEvery { start(any>(), "$clientNameTest", any(), any(), any()) } just Runs + coEvery { + with(capture(scopeSlot)) { + startAsync(any>(), "$clientNameTest", any(), any(), 1) + } + } answers { + scopeSlot.captured.launch { delay(Long.MAX_VALUE) } + } } internal val mockedTransport = mockk { @@ -179,6 +191,7 @@ internal class InfiniticClientTests : StringSpec( val fakeWorkflowWithTags = client.newWorkflow(FakeWorkflow::class.java, tags = tags) beforeTest { + scopeSlot.clear() delaySlot.clear() taskTagSlots.clear() taskSlot.clear() @@ -186,6 +199,10 @@ internal class InfiniticClientTests : StringSpec( workflowCmdSlot.clear() } + afterTest { + clearAllMocks(answers = false) + } + "Should be able to dispatch a workflow" { // when val deferred = client.dispatch(fakeWorkflow::m0) @@ -207,13 +224,15 @@ internal class InfiniticClientTests : StringSpec( // when asynchronously dispatching a workflow, the consumer should not be started coVerify(exactly = 0) { - mockedConsumer.start( - MainSubscription(ClientTopic), - "$clientNameTest", - any(), - any(), - 1, - ) + with(client.clientScope) { + mockedConsumer.start( + MainSubscription(ClientTopic), + "$clientNameTest", + any(), + any(), + 1, + ) + } } } @@ -422,13 +441,15 @@ internal class InfiniticClientTests : StringSpec( // when waiting for a workflow, the consumer should be started coVerify { - mockedConsumer.start( - MainSubscription(ClientTopic), - "$clientNameTest", - any(), - any(), - 1, - ) + with(client.clientScope) { + mockedConsumer.startAsync( + MainSubscription(ClientTopic), + "$clientNameTest", + any(), + any(), + 1, + ) + } } // restart a workflow @@ -436,13 +457,15 @@ internal class InfiniticClientTests : StringSpec( // the consumer should be started only once coVerify(exactly = 1) { - mockedConsumer.start( - MainSubscription(ClientTopic), - "$clientNameTest", - any(), - any(), - 1, - ) + with(client.clientScope) { + mockedConsumer.startAsync( + MainSubscription(ClientTopic), + "$clientNameTest", + any(), + any(), + 1, + ) + } } }