Skip to content

Commit

Permalink
Key batch (#265)
Browse files Browse the repository at this point in the history
New feature: 
- can add a batchKey into task meta, this batchKey is also in Task context
- events are now processed by batch

Improvment:
If task has a batch Key but no @Batch implementation, a warning is thrown

* BREAKING CHANGES
For consistency on CloudEvents, "start" command is replaced by "dispatched", "ended" event is replaced by "completed"
  • Loading branch information
geomagilles authored Oct 14, 2024
1 parent b2e69ff commit 8b63e2f
Show file tree
Hide file tree
Showing 104 changed files with 3,124 additions and 2,519 deletions.
2 changes: 1 addition & 1 deletion buildSrc/src/main/kotlin/Libs.kt
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ object Libs {
}

object Pulsar {
const val version = "3.0.6"
const val version = "3.0.7"
const val client = "org.apache.pulsar:pulsar-client:$version"
const val clientAdmin = "org.apache.pulsar:pulsar-client-admin:$version"
const val functions = "org.apache.pulsar:pulsar-functions-api:$version"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ import io.infinitic.common.proxies.RequestByWorkflowId
import io.infinitic.common.proxies.RequestByWorkflowTag
import io.infinitic.common.tasks.data.ServiceName
import io.infinitic.common.tasks.data.TaskId
import io.infinitic.common.transport.logged.LoggedInfiniticConsumer
import io.infinitic.common.transport.logged.LoggedInfiniticProducer
import io.infinitic.common.transport.logged.LoggedInfiniticResources
import io.infinitic.common.transport.InfiniticConsumer
import io.infinitic.common.transport.InfiniticProducer
import io.infinitic.common.transport.InfiniticResources
import io.infinitic.common.utils.annotatedName
import io.infinitic.common.workflows.data.workflowMethods.WorkflowMethodId
import io.infinitic.common.workflows.data.workflows.WorkflowMeta
Expand All @@ -63,16 +63,12 @@ class InfiniticClient(
val config: InfiniticClientConfigInterface
) : InfiniticClientInterface {

private val resources by lazy {
LoggedInfiniticResources(logger, config.transport.resources)
}
private val consumer by lazy {
LoggedInfiniticConsumer(logger, config.transport.consumer)
}
private val producer by lazy {
LoggedInfiniticProducer(logger, config.transport.producer).apply {
config.name?.let { setSuggestedName(it) }
}
private val resources: InfiniticResources by lazy { config.transport.resources }

private val consumer: InfiniticConsumer by lazy { config.transport.consumer }

private val producer: InfiniticProducer by lazy {
config.transport.producer.apply { config.name?.let { setSuggestedName(it) } }
}

private val shutdownGracePeriodSeconds = config.transport.shutdownGracePeriodSeconds
Expand All @@ -82,7 +78,7 @@ class InfiniticClient(
// Scope used to asynchronously send message, and also to consumes messages
internal val clientScope = CoroutineScope(Dispatchers.IO)

private val dispatcher by lazy { ClientDispatcher(clientScope, consumer, producer, logger) }
private val dispatcher by lazy { ClientDispatcher(clientScope, consumer, producer) }

override suspend fun getName() = producer.getName()

Expand Down Expand Up @@ -202,20 +198,21 @@ class InfiniticClient(


/** get ids of a stub, associated to a specific tag */
override fun <T : Any> getIds(stub: T): Set<String> =
when (val handler = getProxyHandler(stub)) {
is ExistingWorkflowProxyHandler -> when (handler.requestBy) {
is RequestByWorkflowTag -> dispatcher.getWorkflowIdsByTag(
handler.workflowName,
(handler.requestBy as RequestByWorkflowTag).workflowTag,
)

is RequestByWorkflowId -> throw InvalidIdTagSelectionException("$stub")
}

else -> throw InvalidStubException("$stub")
override fun <T : Any> getIds(stub: T): Set<String> = runBlocking {
when (val handler = getProxyHandler(stub)) {
is ExistingWorkflowProxyHandler -> when (handler.requestBy) {
is RequestByWorkflowTag -> dispatcher.getWorkflowIdsByTag(
handler.workflowName,
(handler.requestBy as RequestByWorkflowTag).workflowTag,
)

is RequestByWorkflowId -> throw InvalidIdTagSelectionException("$stub")
}

else -> throw InvalidStubException("$stub")
}
}

override fun <R> startAsync(invoke: () -> R): CompletableFuture<Deferred<R>> {
val handler = ProxyHandler.async(invoke) ?: throw InvalidStubException()

Expand Down Expand Up @@ -270,7 +267,7 @@ class InfiniticClient(

companion object {

private val logger = KotlinLogging.logger {}
internal val logger = KotlinLogging.logger {}

/** Create InfiniticClient with config from resources directory */
@JvmStatic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class DeferredChannel<R : SendChannel<*>> internal constructor(
thisShouldNotHappen()
}

override fun await(): R = channel
override suspend fun await(): R = channel

override val id: String
get() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class DeferredSend<R : Any?> internal constructor(
// in order to send asynchronously the message
// despite the synchronous syntax: workflow.channel
@Suppress("UNCHECKED_CAST")
override fun await(): R = Unit as R
override suspend fun await(): R = Unit as R

override val id: String = signalId.toString()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class ExistingDeferredWorkflow<R> internal constructor(
// this method retries workflowTask (unique for a workflow instance)
override fun retryAsync() = dispatcher.retryWorkflowTaskAsync(workflowName, requestBy)

override fun await(): R = dispatcher.awaitExistingWorkflow(this, true)
override suspend fun await(): R = dispatcher.awaitExistingWorkflow(this, true)

override val id by lazy {
when (requestBy) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class NewDeferredWorkflow<R> internal constructor(
override fun retryAsync() =
dispatcher.retryWorkflowTaskAsync(workflowName, RequestByWorkflowId(workflowId))

override fun await(): R = dispatcher.awaitNewWorkflow(this, true)
override suspend fun await(): R = dispatcher.awaitNewWorkflow(this, true)

override val id: String = workflowId.toString()

Expand Down
Loading

0 comments on commit 8b63e2f

Please sign in to comment.