Skip to content

Commit

Permalink
Merge pull request #209 from infiniticio/v0.12.2
Browse files Browse the repository at this point in the history
- restore config files as data classes instead of interfaces
- restore `fromConfig` method for clients and workers
- (internal) rename InfiniticConsumer to InfiniticConsumerAsync
- improve worker logging
  • Loading branch information
geomagilles authored Dec 27, 2023
2 parents e0ee95e + a8648dc commit 238f309
Show file tree
Hide file tree
Showing 47 changed files with 5,522 additions and 713 deletions.
2 changes: 1 addition & 1 deletion buildSrc/src/main/kotlin/Ci.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ object Ci {
private const val SNAPSHOT = "-SNAPSHOT"

// base version number
private const val BASE = "0.12.1"
private const val BASE = "0.12.2"

// GitHub run number
private val githubRunNumber = System.getenv("GITHUB_RUN_NUMBER")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@
package io.infinitic.cache.config

/** Cache configuration */
interface CacheConfig {
interface CacheConfigInterface {
val cache: Cache
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@
*/
package io.infinitic.cache.config

data class CacheConfigImpl(override val cache: Cache = Cache()) : CacheConfig
data class CacheConfigImpl(override val cache: Cache = Cache()) : CacheConfigInterface
1 change: 1 addition & 0 deletions infinitic-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies {

implementation(project(":infinitic-autoclose"))

api(project(":infinitic-common"))
api(project(":infinitic-transport"))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
*/
package io.infinitic.clients

import io.infinitic.autoclose.addAutoCloseResource
import io.infinitic.autoclose.autoClose
import io.infinitic.clients.config.ClientConfig
import io.infinitic.clients.config.ClientConfigInterface
import io.infinitic.clients.dispatcher.ClientDispatcher
import io.infinitic.common.clients.messages.ClientMessage
import io.infinitic.common.proxies.ExistingWorkflowProxyHandler
Expand All @@ -34,28 +36,29 @@ import io.infinitic.common.proxies.RequestByWorkflowId
import io.infinitic.common.proxies.RequestByWorkflowTag
import io.infinitic.common.tasks.data.TaskId
import io.infinitic.common.tasks.data.TaskMeta
import io.infinitic.common.transport.InfiniticConsumer
import io.infinitic.common.transport.InfiniticConsumerAsync
import io.infinitic.common.transport.InfiniticProducerAsync
import io.infinitic.common.transport.LoggedInfiniticProducer
import io.infinitic.common.workflows.data.methodRuns.MethodRunId
import io.infinitic.common.workflows.data.workflows.WorkflowMeta
import io.infinitic.common.workflows.data.workflows.WorkflowTag
import io.infinitic.exceptions.clients.InvalidIdTagSelectionException
import io.infinitic.exceptions.clients.InvalidStubException
import io.infinitic.transport.config.TransportConfig
import io.infinitic.workflows.DeferredStatus
import org.jetbrains.annotations.TestOnly
import java.lang.reflect.Proxy
import java.util.concurrent.CompletableFuture

@Suppress("unused")
class InfiniticClient(
consumer: InfiniticConsumer,
consumerAsync: InfiniticConsumerAsync,
producerAsync: InfiniticProducerAsync
) : InfiniticClientInterface {

private val producer = LoggedInfiniticProducer(javaClass.name, producerAsync)

private val dispatcher = ClientDispatcher(javaClass.name, consumer, producer)
private val dispatcher = ClientDispatcher(javaClass.name, consumerAsync, producer)

override val name by lazy { producerAsync.name }

Expand Down Expand Up @@ -202,15 +205,37 @@ class InfiniticClient(
}

companion object {
/** Create InfiniticClient from config */
@JvmStatic
fun fromConfig(config: ClientConfigInterface): InfiniticClient = with(config) {
val transportConfig = TransportConfig(transport, pulsar)

/** Infinitic Consumer */
val consumerAsync = transportConfig.consumerAsync

/** Infinitic Producer */
val producerAsync = transportConfig.producerAsync

// apply name if it exists
name?.let { producerAsync.name = it }

/** Infinitic Client */
InfiniticClient(consumerAsync, producerAsync).also {
// close consumer with the client
it.addAutoCloseResource(consumerAsync)
}
}


/** Create InfiniticClient with config from resources directory */
@JvmStatic
fun fromConfigResource(vararg resources: String) =
ClientConfig.fromResource(*resources).client
fun fromConfigResource(vararg resources: String): InfiniticClient =
fromConfig(ClientConfig.fromResource(*resources))


/** Create InfiniticClient with config from system file */
@JvmStatic
fun fromConfigFile(vararg files: String) =
ClientConfig.fromFile(*files).client
fun fromConfigFile(vararg files: String): InfiniticClient =
fromConfig(ClientConfig.fromFile(*files))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,32 @@
*/
package io.infinitic.clients.config

import io.infinitic.clients.InfiniticClient
import io.infinitic.common.config.loadConfigFromFile
import io.infinitic.common.config.loadConfigFromResource
import io.infinitic.transport.config.TransportConfig

interface ClientConfig : TransportConfig {
import io.infinitic.pulsar.config.Pulsar
import io.infinitic.transport.config.Transport

data class ClientConfig @JvmOverloads constructor(
/** Client name */
val name: String?
override val name: String? = null,

/** Transport configuration */
override val transport: Transport = Transport.pulsar,

/** Pulsar configuration */
override val pulsar: Pulsar? = null

/** Infinitic Client */
val client: InfiniticClient
) : ClientConfigInterface {

companion object {
/** Create ClientConfig from file in file system */
@JvmStatic
fun fromFile(vararg files: String): ClientConfig =
loadConfigFromFile<ClientConfigData>(files.toList())
loadConfigFromFile<ClientConfig>(files.toList())

/** Create ClientConfig from file in resources directory */
@JvmStatic
fun fromResource(vararg resources: String): ClientConfig =
loadConfigFromResource<ClientConfigData>(resources.toList())
loadConfigFromResource<ClientConfig>(resources.toList())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* "Commons Clause" License Condition v1.0
*
* The Software is provided to you by the Licensor under the License, as defined below, subject to
* the following condition.
*
* Without limiting other conditions in the License, the grant of rights under the License will not
* include, and the License does not grant to you, the right to Sell the Software.
*
* For purposes of the foregoing, “Sell” means practicing any or all of the rights granted to you
* under the License to provide to third parties, for a fee or other consideration (including
* without limitation fees for hosting or consulting/ support services related to the Software), a
* product or service whose value derives, entirely or substantially, from the functionality of the
* Software. Any license notice or attribution required by the License must also include this
* Commons Clause License Condition notice.
*
* Software: Infinitic
*
* License: MIT License (https://opensource.org/licenses/MIT)
*
* Licensor: infinitic.io
*/
package io.infinitic.clients.config

import io.infinitic.transport.config.TransportConfigInterface

interface ClientConfigInterface : TransportConfigInterface {
/** Client name */
val name: String?
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import io.infinitic.common.exceptions.thisShouldNotHappen
import io.infinitic.workflows.SendChannel
import java.util.concurrent.CompletableFuture

internal class DeferredChannel<R : SendChannel<*>>(private val channel: R) : Deferred<R> {
class DeferredChannel<R : SendChannel<*>> internal constructor(
private val channel: R
) : Deferred<R> {

override fun cancelAsync(): CompletableFuture<Unit> {
thisShouldNotHappen()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import io.infinitic.common.exceptions.thisShouldNotHappen
import io.infinitic.common.workflows.data.channels.SignalId
import java.util.concurrent.CompletableFuture

internal class DeferredSend<R : Any?>(internal val signalId: SignalId) : Deferred<R> {
class DeferredSend<R : Any?> internal constructor(
internal val signalId: SignalId
) : Deferred<R> {

override fun cancelAsync(): CompletableFuture<Unit> {
thisShouldNotHappen()
Expand All @@ -41,7 +43,8 @@ internal class DeferredSend<R : Any?>(internal val signalId: SignalId) : Deferre
// also we do not apply the join method
// in order to send asynchronously the message
// despite the synchronous syntax: workflow.channel
@Suppress("UNCHECKED_CAST") override fun await(): R = Unit as R
@Suppress("UNCHECKED_CAST")
override fun await(): R = Unit as R

override val id: String = signalId.toString()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import io.infinitic.common.proxies.RequestByWorkflowTag
import io.infinitic.common.workflows.data.methodRuns.MethodRunId
import io.infinitic.common.workflows.data.workflows.WorkflowName

class ExistingDeferredWorkflow<R>(
class ExistingDeferredWorkflow<R> internal constructor(
internal val workflowName: WorkflowName,
internal val requestBy: RequestBy,
internal val methodName: MethodName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import io.infinitic.common.proxies.RequestByWorkflowId
import io.infinitic.common.workflows.data.workflows.WorkflowId
import io.infinitic.common.workflows.data.workflows.WorkflowName

class NewDeferredWorkflow<R>(
class NewDeferredWorkflow<R> internal constructor(
internal val workflowName: WorkflowName,
internal val methodName: MethodName,
internal val methodReturnClass: Class<R>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import io.infinitic.common.proxies.RequestByWorkflowTag
import io.infinitic.common.tasks.data.ServiceName
import io.infinitic.common.tasks.data.TaskId
import io.infinitic.common.tasks.executors.errors.MethodFailedError
import io.infinitic.common.transport.InfiniticConsumer
import io.infinitic.common.transport.InfiniticConsumerAsync
import io.infinitic.common.transport.InfiniticProducer
import io.infinitic.common.workflows.data.channels.SignalId
import io.infinitic.common.workflows.data.methodRuns.MethodRunId
Expand Down Expand Up @@ -99,9 +99,9 @@ import java.util.concurrent.CompletableFuture
import io.infinitic.common.workflows.engine.messages.RetryTasks as RetryTaskInWorkflow
import io.infinitic.common.workflows.tags.messages.RetryTasksByTag as RetryTaskInWorkflowByTag

class ClientDispatcher(
internal class ClientDispatcher(
logName: String,
private val consumer: InfiniticConsumer,
private val consumerAsync: InfiniticConsumerAsync,
private val producer: InfiniticProducer
) : ProxyDispatcher, Closeable {
private val logger = KotlinLogging.logger(logName)
Expand Down Expand Up @@ -741,7 +741,7 @@ class ClientDispatcher(
// lazily starts client consumer if not already started
synchronized(this) {
if (!isClientConsumerInitialized) {
consumer.startClientConsumerAsync(::handle, null, clientName)
consumerAsync.startClientConsumerAsync(::handle, null, clientName)
isClientConsumerInitialized = true
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import io.infinitic.common.fixtures.TestFactory
import io.infinitic.common.fixtures.later
import io.infinitic.common.tasks.executors.messages.TaskExecutorMessage
import io.infinitic.common.tasks.tags.messages.TaskTagMessage
import io.infinitic.common.transport.InfiniticConsumer
import io.infinitic.common.transport.InfiniticConsumerAsync
import io.infinitic.common.transport.InfiniticProducerAsync
import io.infinitic.common.workflows.data.channels.ChannelName
import io.infinitic.common.workflows.data.channels.ChannelType
Expand Down Expand Up @@ -128,11 +128,11 @@ private val producerAsync: InfiniticProducerAsync = mockk<InfiniticProducerAsync
every { sendAsync(capture(workflowEngineSlot), capture(delaySlot)) } answers { engineResponse() }
}

private val consumer: InfiniticConsumer = mockk<InfiniticConsumer> {
private val consumerAsync: InfiniticConsumerAsync = mockk<InfiniticConsumerAsync> {
every { startClientConsumerAsync(any(), any(), clientNameTest) } returns completed()
}

private val client = InfiniticClient(consumer, producerAsync)
private val client = InfiniticClient(consumerAsync, producerAsync)

private class ClientWorkflowTests : StringSpec(
{
Expand Down Expand Up @@ -175,7 +175,7 @@ private class ClientWorkflowTests : StringSpec(
)

// when asynchronously dispatching a workflow, the consumer should not be started
verify { consumer.startClientConsumerAsync(any(), any(), any()) wasNot called }
verify { consumerAsync.startClientConsumerAsync(any(), any(), any()) wasNot called }
}

"Should be able to dispatch a workflow with annotation" {
Expand Down Expand Up @@ -379,13 +379,13 @@ private class ClientWorkflowTests : StringSpec(
)

// when waiting for a workflow, the consumer should be started
verify { consumer.startClientConsumerAsync(any(), any(), clientNameTest) }
verify { consumerAsync.startClientConsumerAsync(any(), any(), clientNameTest) }

// restart a workflow
client.dispatch(fakeWorkflow::m3, 0, "a").await()

// the consumer should be started only once
verify { consumer.startClientConsumerAsync(any(), any(), any()) wasNot called }
verify { consumerAsync.startClientConsumerAsync(any(), any(), any()) wasNot called }
}

"Should throw a WorkflowTimedOutException when waiting for a workflow more than timeout" {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import io.infinitic.common.workflows.engine.messages.WorkflowEngineMessage
import io.infinitic.common.workflows.tags.messages.WorkflowTagMessage
import java.util.concurrent.CompletableFuture

interface InfiniticConsumer : AutoCloseable {
interface InfiniticConsumerAsync : AutoCloseable {

// Asynchronously start consumers of messages to client
fun startClientConsumerAsync(
Expand Down
Loading

0 comments on commit 238f309

Please sign in to comment.