Skip to content

Commit

Permalink
Process task in batch if a batch config is provided, even if no batch…
Browse files Browse the repository at this point in the history
… method is provided
  • Loading branch information
geomagilles committed Nov 4, 2024
1 parent dc6200f commit e17ba8d
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,17 @@ class TaskExecutor(
executeTasksMap.map { (serviceNameAndMethodName, executeTasks) ->
when (serviceNameAndMethodName.first.isWorkflowTask()) {
// for workflow tasks, we run them in parallel
true -> executeTasks.map { async { it.process() } }
true -> executeTasks.forEach { launch { it.process() } }
// for services, we use the batched method
false -> async { executeTasks.process() }
false -> {
val (_, serviceMethod) = executeTasks.first().getInstanceAndMethod()
when (serviceMethod.getBatchMethod()) {
// there is no batch method, we just proceed with the task one by one
null -> executeTasks.forEach { launch { it.process() } }
// process using the batch method.
else -> launch { executeTasks.process() }
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,14 +451,21 @@ class InfiniticWorker(
}
}

val batchProcessorConfig = config.batch?.normalized(
"taskExecutor:" + config.serviceName,
config.concurrency,
)

consumerFactory.startAsync(
subscription = MainSubscription(ServiceExecutorTopic),
entity = config.serviceName,
batchReceivingConfig = config.batch,
concurrency = config.concurrency,
processor = processor,
beforeDlq = beforeDlq,
batchProcessorConfig = { msg -> taskExecutor.getBatchConfig(msg) },
batchProcessorConfig = { msg ->
taskExecutor.getBatchConfig(msg) ?: batchProcessorConfig
},
batchProcessor = batchProcessor,
)
}
Expand Down

0 comments on commit e17ba8d

Please sign in to comment.