Skip to content

Commit

Permalink
NEW FEATURE: Task Batch processing
Browse files Browse the repository at this point in the history
  • Loading branch information
geomagilles committed Sep 29, 2024
1 parent aa2256b commit bc16c33
Show file tree
Hide file tree
Showing 18 changed files with 366 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ interface InfiniticConsumer {
entity: String,
handler: suspend (S, MillisInstant) -> Unit,
beforeDlq: (suspend (S?, Exception) -> Unit)?,
concurrency: Int
concurrency: Int,
getBatchConfig: (suspend (S) -> Result<MessageBatchConfig?>)? = null,
handlerBatch: (suspend (List<S>) -> Unit)? = null
): Job

/**
Expand All @@ -70,6 +72,16 @@ interface InfiniticConsumer {
entity: String,
handler: suspend (S, MillisInstant) -> Unit,
beforeDlq: (suspend (S?, Exception) -> Unit)?,
concurrency: Int
): Unit = startAsync(subscription, entity, handler, beforeDlq, concurrency).join()
concurrency: Int,
getBatchConfig: (suspend (S) -> Result<MessageBatchConfig?>)? = null,
handlerBatch: (suspend (List<S>) -> Unit)? = null
): Unit = startAsync(
subscription,
entity,
handler,
beforeDlq,
concurrency,
getBatchConfig,
handlerBatch,
).join()
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ abstract class AbstractConsumerProcessor<S : TransportMessage, D : Any>(
/**
* Processes a deserialized message.
*/
protected suspend fun process(message: DeserializedMessage<S, D>) {
protected suspend fun processSingle(message: DeserializedMessage<S, D>) {
try {
process(message.deserialized, message.transportMessage.publishTime)
acknowledge(message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class ConsumerSharedProcessor<S : TransportMessage, D : Any>(
deserialize: suspend (S) -> D,
process: suspend (D, MillisInstant) -> Unit,
beforeNegativeAcknowledgement: (suspend (S, D?, Exception) -> Unit)?,
private val assessBatching: ((D) -> Result<MessageBatchConfig?>)? = null,
private val getBatchConfig: (suspend (D) -> Result<MessageBatchConfig?>)? = null,
private val processBatch: (suspend (List<D>) -> Unit)? = null
) : AbstractConsumerProcessor<S, D>(
consumer,
Expand All @@ -58,8 +58,8 @@ class ConsumerSharedProcessor<S : TransportMessage, D : Any>(
beforeNegativeAcknowledgement,
) {
init {
require((processBatch == null) == (assessBatching == null)) {
"assessBatching and processBatch should be defined together or not at all"
require((processBatch == null) == (getBatchConfig == null)) {
"${::getBatchConfig.name} and processBatch should be defined together or not at all"
}
}

Expand All @@ -82,7 +82,7 @@ class ConsumerSharedProcessor<S : TransportMessage, D : Any>(
.receiveAsFlow()
.deserialize(concurrency)
.collect { message ->
val batchConfigResult = assessBatching?.let { it(message.deserialized) }
val batchConfigResult = getBatchConfig?.let { it(message.deserialized) }
batchConfigResult
?.onFailure {
// assessBatching has the responsibility to tell
Expand Down Expand Up @@ -122,7 +122,7 @@ class ConsumerSharedProcessor<S : TransportMessage, D : Any>(
for (message in this) {
withContext(NonCancellable) {
when (message) {
is MessageSingle<S, D> -> process(message.deserialized)
is MessageSingle<S, D> -> processSingle(message.deserialized)
is MessageBatch<S, D> -> processBatch(message.deserialized)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class ConsumerUniqueProcessor<S : TransportMessage, D : Any>(
.receiveAsFlow()
.collect { message ->
withContext(NonCancellable) {
tryDeserialize(message)?.let { process(it) }
tryDeserialize(message)?.let { processSingle(it) }
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import io.github.oshai.kotlinlogging.KLogger
import io.infinitic.common.data.MillisInstant
import io.infinitic.common.messages.Message
import io.infinitic.common.transport.InfiniticConsumer
import io.infinitic.common.transport.MessageBatchConfig
import io.infinitic.common.transport.Subscription
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
Expand All @@ -41,7 +42,9 @@ class LoggedInfiniticConsumer(
entity: String,
handler: suspend (S, MillisInstant) -> Unit,
beforeDlq: (suspend (S?, Exception) -> Unit)?,
concurrency: Int
concurrency: Int,
getBatchConfig: (suspend (S) -> Result<MessageBatchConfig?>)?,
handlerBatch: (suspend (List<S>) -> Unit)?
): Job {
val loggedHandler: suspend (S, MillisInstant) -> Unit = { message, instant ->
logger.debug { formatLog(message.id(), "Processing:", message) }
Expand All @@ -64,6 +67,8 @@ class LoggedInfiniticConsumer(
loggedHandler,
loggedBeforeDlq,
concurrency,
getBatchConfig,
handlerBatch,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,19 @@ import java.lang.reflect.WildcardType
import kotlin.reflect.javaType
import kotlin.reflect.typeOf

/**
* Represents a pair of methods: a single method and its corresponding batch method.
*
* @property single The single method that processes a single item.
* @property batch The batch method that processes multiple items.
* @property constructor The constructor used to create instances for the batch method's parameter, if applicable.
*/
class BatchMethod(
val single: Method,
val batch: Method,
val constructor: Constructor<*>?,
)

/**
* Retrieves a map associating single method with their batch methods for the class.
*
Expand All @@ -43,16 +56,14 @@ import kotlin.reflect.typeOf
* @return a map where the key is the batch method and the value is the corresponding single method.
* @throws Exception if a corresponding single method is not found or multiple corresponding single methods are found.
*/
fun Class<*>.getBatchMethods(): Map<Method, Method> =
fun Class<*>.getBatchMethods(): List<BatchMethod> =
methods.filter {
// get all batched methods
it.findAnnotation(Batch::class.java) != null
}.associateByOrThrow { batchMethod ->
// for this batch method, find all single method with the same annotated name
methods.filter { singleMethod ->
(batchMethod.annotatedName == singleMethod.annotatedName) &&
(singleMethod.findAnnotation(Batch::class.java) == null) &&
batchMethod.isBatchedOf(singleMethod)
batchMethod.isBatchedOf(singleMethod)
}.also { candidates ->
// we should not have a batch method without an associated single method
if (candidates.isEmpty()) throw Exception(
Expand All @@ -64,8 +75,25 @@ fun Class<*>.getBatchMethods(): Map<Method, Method> =
"found Corresponding to @Batch method $name:${batchMethod.name}",
)
}.first()
}.map { methods ->
val singleMethod = methods.key
val batchMethod = methods.value
BatchMethod(
singleMethod, batchMethod,
batchMethod.parameters[0].parameterizedType.getComponentType()!!
.getConstructor(singleMethod.parameters.map { it.parameterizedType }),
)
}

fun BatchMethod.getArgs(args: List<List<Any?>>): List<Any?> = when (constructor) {
null -> args.map { it.first() }
else -> args.map { constructor.newInstance(*it.toTypedArray()) }
}

/**
* Associates the elements of the given [Iterable] by a key selected from each element or throws an
* [Exception] if any two elements would have the same key.
*/
private fun Iterable<Method>.associateByOrThrow(keySelector: (Method) -> Method): Map<Method, Method> {
val map = mutableMapOf<Method, Method>()
for (element in this) {
Expand All @@ -83,31 +111,32 @@ private fun Iterable<Method>.associateByOrThrow(keySelector: (Method) -> Method)
}

private fun Method.isBatchedOf(method: Method): Boolean =
hasBatchParametersOf(method) && hasBatchReturnValueOf(method)
(annotatedName == method.annotatedName) &&
(method.findAnnotation(Batch::class.java) == null) &&
hasBatchParameterTypesOf(method) &&
hasBatchReturnTypeOf(method)

// Comparing the parameters
private fun Method.hasBatchParametersOf(method: Method): Boolean {
private fun Method.hasBatchParameterTypesOf(method: Method): Boolean {
if (parameters.size != 1) throw Exception(
"A @Batch method must have exactly one parameter that is a collection or an array. " +
"The @Batch method ${declaringClass.name}::$name has ${parameters.size} parameters",
)
val parameter = parameters[0]
val type: Type = parameter.parameterizedType
val elementType = type.getComponentType()?.also { println("batch: elementType = ${it.typeName}") }
val type: Type = parameters[0].parameterizedType
val elementType = type.getComponentType()
?: throw Exception(
"A @Batch method must have exactly one parameter that is a collection or an array. " +
"A @Batch method must have exactly one parameter that is a List. " +
"But for the @Batch method ${declaringClass.name}::$name this type is $type",
)
val singleTypes = method.parameters.map { it.parameterizedType }
return elementType.isSameThan(singleTypes)
}

// Comparing the return value
private fun Method.hasBatchReturnValueOf(method: Method): Boolean {
private fun Method.hasBatchReturnTypeOf(method: Method): Boolean {
if (genericReturnType.isVoid()) return method.genericReturnType.isVoid()

val returnElementType = genericReturnType.getComponentType()
?.also { println("batch: returnElementType = ${it.typeName}") }
?: throw Exception(
"A @Batch method must have a return type that is a collection or an array. " +
"But for the @Batch method ${declaringClass.name}::$name this type is $genericReturnType",
Expand All @@ -120,10 +149,7 @@ private fun Type.isArray() = (this is Class<*> && isArray) || (this is GenericAr
// Is type a Collection? We exclude Set that does not preserve the elements
private fun Type.isList(): Boolean {
if (this !is ParameterizedType) return false
return when (val rawType = this.rawType) {
is Class<*> -> List::class.java.isAssignableFrom(rawType)
else -> false
}
return rawType == List::class.java
}

private fun Type.isObject(): Boolean = if (this is Class<*>) when {
Expand Down Expand Up @@ -178,20 +204,23 @@ private val primitiveToWrapperMap = mapOf<Type, Class<*>>(
private fun Type.isVoid() = (primitiveToWrapperMap[this] ?: this) == java.lang.Void::class.java

private fun Type.isSameThan(type: Type) =
((primitiveToWrapperMap[this] ?: this) == (primitiveToWrapperMap[type] ?: type)).also {
println("$this == $type -> $it")
}
((primitiveToWrapperMap[this] ?: this) == (primitiveToWrapperMap[type] ?: type))

private fun Type.isSameThan(types: List<Type>): Boolean {
return when {
isObject() -> (this as Class<*>).getConstructorWith(types) != null
else -> when (types.size) {
1 -> (this.isSameThan(types.first())).also { println("$this == ${types.first()} -> $it") }
1 -> (this.isSameThan(types.first()))
else -> false
}
}
}

private fun Type.getConstructor(types: List<Type>): Constructor<*>? = when (isObject()) {
true -> (this as Class<*>).getConstructorWith(types)
false -> null
}

private fun <S> Class<S>.getConstructorWith(types: List<Type>): Constructor<S>? {
val constructors = declaredConstructors.filter { constructor ->
(constructor.parameterTypes.size == types.size) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,22 +189,22 @@ val Method.annotatedName: String
/**
* Get the batchMethod of a method (if any)
*/
internal val batchMethodCache = mutableMapOf<Method, Method?>()
internal val batchMethodCache = mutableMapOf<Method, BatchMethod?>()

private val batchMethodMutex = Mutex()

suspend fun Method.getBatchMethod(): Method? {
suspend fun Method.getBatchMethod(): BatchMethod? {
if (batchMethodCache.containsKey(this)) {
// Return immediately if the cache already contains the current method
return batchMethodCache[this]
}
// Lock access to the cache to avoid race conditions
return batchMethodMutex.withLock {
// Retrieve the map of batch methods for the declaring class
val batchMethodMap = declaringClass.getBatchMethods()
val batchMethodList = declaringClass.getBatchMethods()
// Update the cache with all methods of the declaring class
declaringClass.methods.forEach { method ->
batchMethodCache[method] = batchMethodMap[method]
batchMethodCache[method] = batchMethodList.firstOrNull { it.single == method }
}
// Return the corresponding value for the current method now in the cache
batchMethodCache[this]
Expand All @@ -216,7 +216,7 @@ suspend fun Method.getBatchConfig(): MessageBatchConfig? {
val batchMethod = getBatchMethod() ?: return null

// Find the @Batch annotation on this method
val batchAnnotation = batchMethod.findAnnotation(Batch::class.java) ?: thisShouldNotHappen()
val batchAnnotation = batchMethod.batch.findAnnotation(Batch::class.java) ?: thisShouldNotHappen()

// Create and return an instance of MessageBatchConfig from the annotation
return MessageBatchConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Set;

// 1 parameter - Batched method with Collection parameter
@SuppressWarnings("ALL")
class FooBatch1 {
public String bar(int p) {
return Integer.toString(p);
Expand Down
Loading

0 comments on commit bc16c33

Please sign in to comment.