diff --git a/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/EventStoreClientWrapper.kt b/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/EventStoreClientWrapper.kt index 9fa8dbc..f36e8b6 100644 --- a/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/EventStoreClientWrapper.kt +++ b/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/EventStoreClientWrapper.kt @@ -8,41 +8,34 @@ class EventStoreClientWrapper(private val eventStoreDBClient: EventStoreDBClient suspend fun readStream( streamDescriptor: StreamDescriptor, readStreamOptions: ReadStreamOptions, - ): com.github.connorwyatt.common.eventstore.EventStoreClientWrapper.ReadResult { + ): ReadResult { val readResult = try { eventStoreDBClient .readStream(streamDescriptor.streamName, readStreamOptions) .await() } catch (exception: Exception) { - return com.github.connorwyatt.common.eventstore.EventStoreClientWrapper.ReadResult - .Failure(exception) + return ReadResult.Failure(exception) } - return com.github.connorwyatt.common.eventstore.EventStoreClientWrapper.ReadResult.Success( - readResult.events, - readResult.lastStreamPosition - ) + return ReadResult.Success(readResult.events, readResult.lastStreamPosition) } suspend fun appendToStream( streamDescriptor: StreamDescriptor, options: AppendToStreamOptions, events: List, - ): com.github.connorwyatt.common.eventstore.EventStoreClientWrapper.WriteResult { + ): WriteResult { val writeResult = try { eventStoreDBClient .appendToStream(streamDescriptor.streamName, options, *events.toTypedArray()) .await() } catch (exception: Exception) { - return com.github.connorwyatt.common.eventstore.EventStoreClientWrapper.WriteResult - .Failure(exception) + return WriteResult.Failure(exception) } - return com.github.connorwyatt.common.eventstore.EventStoreClientWrapper.WriteResult.Success( - writeResult.logPosition.commitUnsigned - ) + return WriteResult.Success(writeResult.logPosition.commitUnsigned) } fun subscribeToStream( @@ -77,18 +70,14 @@ class EventStoreClientWrapper(private val eventStoreDBClient: EventStoreDBClient } sealed interface ReadResult { - data class Success(val events: List, val streamPosition: Long) : - com.github.connorwyatt.common.eventstore.EventStoreClientWrapper.ReadResult + data class Success(val events: List, val streamPosition: Long) : ReadResult - data class Failure(val exception: Exception) : - com.github.connorwyatt.common.eventstore.EventStoreClientWrapper.ReadResult + data class Failure(val exception: Exception) : ReadResult } sealed interface WriteResult { - data class Success(val streamPosition: Long) : - com.github.connorwyatt.common.eventstore.EventStoreClientWrapper.WriteResult + data class Success(val streamPosition: Long) : WriteResult - data class Failure(val exception: Exception) : - com.github.connorwyatt.common.eventstore.EventStoreClientWrapper.WriteResult + data class Failure(val exception: Exception) : WriteResult } } diff --git a/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/aggregates/AggregatesRepository.kt b/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/aggregates/AggregateRepository.kt similarity index 56% rename from eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/aggregates/AggregatesRepository.kt rename to eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/aggregates/AggregateRepository.kt index 3201864..26c22a4 100644 --- a/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/aggregates/AggregatesRepository.kt +++ b/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/aggregates/AggregateRepository.kt @@ -1,11 +1,11 @@ package com.github.connorwyatt.common.eventstore.aggregates -import com.github.connorwyatt.common.eventstore.events.EventsRepository +import com.github.connorwyatt.common.eventstore.events.EventRepository import com.github.connorwyatt.common.eventstore.streams.StreamDescriptor import kotlin.reflect.KClass -class AggregatesRepository( - private val eventsRepository: EventsRepository, +class AggregateRepository( + private val eventRepository: EventRepository, private val aggregateMap: AggregateMap ) { suspend fun load(clazz: KClass, id: String): TAggregate { @@ -14,7 +14,7 @@ class AggregatesRepository( val aggregate = constructor.invoke(id) - val events = eventsRepository.readStream(streamDescriptor) + val events = eventRepository.readStream(streamDescriptor) aggregate.applyEvents(events) @@ -34,10 +34,31 @@ class AggregatesRepository( val streamDescriptor = StreamDescriptor.Origin(category, aggregate.id) - eventsRepository.appendToStream( + eventRepository.appendToStream( streamDescriptor, aggregate.unsavedEvents, aggregate.latestSavedEventVersion() ) } + + suspend fun usingAggregate( + clazz: KClass, + id: String, + block: suspend TAggregate.() -> TReturn, + shouldSave: Boolean = false + ) { + val aggregate = load(clazz, id) + + block.invoke(aggregate) + + if (shouldSave) save(aggregate) + } + + suspend inline fun usingAggregate( + id: String, + noinline block: suspend TAggregate.() -> TReturn, + shouldSave: Boolean = false + ) { + return usingAggregate(TAggregate::class, id, block, shouldSave) + } } diff --git a/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/eventhandlers/EventStoreSubscriptionsManager.kt b/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/eventhandlers/EventStoreSubscriptionManager.kt similarity index 94% rename from eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/eventhandlers/EventStoreSubscriptionsManager.kt rename to eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/eventhandlers/EventStoreSubscriptionManager.kt index 30a92ba..45c62cf 100644 --- a/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/eventhandlers/EventStoreSubscriptionsManager.kt +++ b/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/eventhandlers/EventStoreSubscriptionManager.kt @@ -10,9 +10,8 @@ import kotlinx.coroutines.Job import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking -class EventStoreSubscriptionsManager( - private val eventStoreClientWrapper: - com.github.connorwyatt.common.eventstore.EventStoreClientWrapper, +class EventStoreSubscriptionManager( + private val eventStoreClientWrapper: EventStoreClientWrapper, private val eventHandlers: Set, private val eventHandlerMap: EventHandlerMap, private val resolvedEventMapper: ResolvedEventMapper diff --git a/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/events/EventsRepository.kt b/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/events/EventRepository.kt similarity index 92% rename from eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/events/EventsRepository.kt rename to eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/events/EventRepository.kt index 0449b92..c79e67e 100644 --- a/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/events/EventsRepository.kt +++ b/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/events/EventRepository.kt @@ -2,7 +2,7 @@ package com.github.connorwyatt.common.eventstore.events import com.github.connorwyatt.common.eventstore.streams.StreamDescriptor -interface EventsRepository { +interface EventRepository { suspend fun readStream(streamDescriptor: StreamDescriptor): List> suspend fun appendToStream( diff --git a/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/events/EventStoreEventsRepository.kt b/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/events/EventStoreEventRepository.kt similarity index 78% rename from eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/events/EventStoreEventsRepository.kt rename to eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/events/EventStoreEventRepository.kt index 34c5a0a..b964f53 100644 --- a/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/events/EventStoreEventsRepository.kt +++ b/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/events/EventStoreEventRepository.kt @@ -7,20 +7,19 @@ import com.eventstore.dbclient.ReadStreamOptions import com.github.connorwyatt.common.eventstore.EventStoreClientWrapper import com.github.connorwyatt.common.eventstore.streams.StreamDescriptor -class EventStoreEventsRepository( - private val eventStoreClient: com.github.connorwyatt.common.eventstore.EventStoreClientWrapper, +class EventStoreEventRepository( + private val eventStoreClient: EventStoreClientWrapper, private val eventMap: EventMap, private val resolvedEventMapper: ResolvedEventMapper -) : EventsRepository { +) : EventRepository { override suspend fun readStream( streamDescriptor: StreamDescriptor ): List> { val result = eventStoreClient.readStream(streamDescriptor, readStreamOptions) return when (result) { - is com.github.connorwyatt.common.eventstore.EventStoreClientWrapper.ReadResult.Failure -> - emptyList() - is com.github.connorwyatt.common.eventstore.EventStoreClientWrapper.ReadResult.Success -> + is EventStoreClientWrapper.ReadResult.Failure -> emptyList() + is EventStoreClientWrapper.ReadResult.Success -> result.events.map(resolvedEventMapper::map) } } @@ -48,12 +47,7 @@ class EventStoreEventsRepository( val result = eventStoreClient.appendToStream(streamDescriptor, options, eventDataList) - if ( - result - is - com.github.connorwyatt.common.eventstore.EventStoreClientWrapper.WriteResult.Failure - ) - throw result.exception + if (result is EventStoreClientWrapper.WriteResult.Failure) throw result.exception } companion object { diff --git a/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/events/InMemoryEventsRepository.kt b/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/events/InMemoryEventRepository.kt similarity index 99% rename from eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/events/InMemoryEventsRepository.kt rename to eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/events/InMemoryEventRepository.kt index 1ba559b..d6ef560 100644 --- a/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/events/InMemoryEventsRepository.kt +++ b/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/events/InMemoryEventRepository.kt @@ -10,12 +10,12 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.time.withTimeout -class InMemoryEventsRepository( +class InMemoryEventRepository( private val clock: Clock, private val eventMap: EventMap, private val eventHandlerMap: EventHandlerMap, private val eventHandlers: Set, -) : EventsRepository { +) : EventRepository { private var streams = emptyMap>>() private val streamUpdateMutex = Mutex() private val eventPropagationCoroutineScope = CoroutineScope(Dispatchers.Default) diff --git a/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/kodein/EventStoreDependenciesModule.kt b/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/kodein/EventStoreDependenciesModule.kt index 2924bd1..02407aa 100644 --- a/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/kodein/EventStoreDependenciesModule.kt +++ b/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/kodein/EventStoreDependenciesModule.kt @@ -6,28 +6,34 @@ import com.github.connorwyatt.common.eventstore.EventStoreClientWrapper import com.github.connorwyatt.common.eventstore.aggregates.Aggregate import com.github.connorwyatt.common.eventstore.aggregates.AggregateMap import com.github.connorwyatt.common.eventstore.aggregates.AggregateMapDefinition -import com.github.connorwyatt.common.eventstore.aggregates.AggregatesRepository +import com.github.connorwyatt.common.eventstore.aggregates.AggregateRepository import com.github.connorwyatt.common.eventstore.configuration.EventStoreConfiguration import com.github.connorwyatt.common.eventstore.eventhandlers.EventHandler import com.github.connorwyatt.common.eventstore.eventhandlers.EventHandlerDefinition import com.github.connorwyatt.common.eventstore.eventhandlers.EventHandlerMap -import com.github.connorwyatt.common.eventstore.eventhandlers.EventStoreSubscriptionsManager +import com.github.connorwyatt.common.eventstore.eventhandlers.EventStoreSubscriptionManager import com.github.connorwyatt.common.eventstore.events.EventMap import com.github.connorwyatt.common.eventstore.events.EventMapDefinition -import com.github.connorwyatt.common.eventstore.events.EventStoreEventsRepository -import com.github.connorwyatt.common.eventstore.events.EventsRepository -import com.github.connorwyatt.common.eventstore.events.InMemoryEventsRepository +import com.github.connorwyatt.common.eventstore.events.EventRepository +import com.github.connorwyatt.common.eventstore.events.EventStoreEventRepository +import com.github.connorwyatt.common.eventstore.events.InMemoryEventRepository import com.github.connorwyatt.common.eventstore.events.ResolvedEventMapper -import org.kodein.di.* +import org.kodein.di.DI +import org.kodein.di.bindProvider +import org.kodein.di.bindProviderOf +import org.kodein.di.bindSet +import org.kodein.di.bindSingleton +import org.kodein.di.bindSingletonOf +import org.kodein.di.new fun eventStoreDependenciesModule(eventStoreConfiguration: EventStoreConfiguration): DI.Module = DI.Module(name = ::eventStoreDependenciesModule.name) { - bindSingletonOf(::AggregatesRepository) + bindSingletonOf(::AggregateRepository) bindSingletonOf(::AggregateMap) bindSet>() if (!eventStoreConfiguration.useInMemoryEventStore) { - bindProvider { new(::EventStoreEventsRepository) } + bindProvider { new(::EventStoreEventRepository) } bindSingleton { val settings = EventStoreDBConnectionString.parseOrThrow( @@ -38,10 +44,10 @@ fun eventStoreDependenciesModule(eventStoreConfiguration: EventStoreConfiguratio EventStoreDBClient.create(settings) } bindProviderOf(::EventStoreClientWrapper) - bindSingletonOf(::EventStoreSubscriptionsManager) + bindSingletonOf(::EventStoreSubscriptionManager) bindProviderOf(::ResolvedEventMapper) } else { - bindSingleton { new(::InMemoryEventsRepository) } + bindSingleton { new(::InMemoryEventRepository) } } bindSingletonOf(::EventMap) diff --git a/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/ktor/ApplicationExt.kt b/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/ktor/ApplicationExt.kt index 485b1ec..4a4f156 100644 --- a/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/ktor/ApplicationExt.kt +++ b/eventstore/src/main/kotlin/com/github/connorwyatt/common/eventstore/ktor/ApplicationExt.kt @@ -1,21 +1,21 @@ package com.github.connorwyatt.common.eventstore.ktor import com.github.connorwyatt.common.eventstore.configuration.EventStoreConfiguration -import com.github.connorwyatt.common.eventstore.eventhandlers.EventStoreSubscriptionsManager -import com.github.connorwyatt.common.eventstore.events.EventsRepository -import com.github.connorwyatt.common.eventstore.events.InMemoryEventsRepository +import com.github.connorwyatt.common.eventstore.eventhandlers.EventStoreSubscriptionManager +import com.github.connorwyatt.common.eventstore.events.EventRepository +import com.github.connorwyatt.common.eventstore.events.InMemoryEventRepository import io.ktor.server.application.* import org.kodein.di.instance import org.kodein.di.ktor.closestDI fun Application.configureEventStore(eventStoreConfiguration: EventStoreConfiguration) { if (!eventStoreConfiguration.useInMemoryEventStore) { - val eventStoreSubscriptionsManager by closestDI().instance() + val eventStoreSubscriptionManager by closestDI().instance() - eventStoreSubscriptionsManager.start() + eventStoreSubscriptionManager.start() } - val eventsRepository by closestDI().instance() + val eventRepository by closestDI().instance() - (eventsRepository as? InMemoryEventsRepository)?.run { startEventPropagation() } + (eventRepository as? InMemoryEventRepository)?.run { startEventPropagation() } } diff --git a/http/build.gradle.kts b/http/build.gradle.kts index 39a704d..35a359e 100644 --- a/http/build.gradle.kts +++ b/http/build.gradle.kts @@ -5,5 +5,4 @@ dependencies { implementation(libraries.kodein.di) implementation(libraries.kotlinx.serialization.json) - implementation(libraries.ktor.client.cio) } diff --git a/http/src/main/kotlin/com/github/connorwyatt/common/http/HttpDependenciesModule.kt b/http/src/main/kotlin/com/github/connorwyatt/common/http/HttpDependenciesModule.kt index a13f85f..cb74afd 100644 --- a/http/src/main/kotlin/com/github/connorwyatt/common/http/HttpDependenciesModule.kt +++ b/http/src/main/kotlin/com/github/connorwyatt/common/http/HttpDependenciesModule.kt @@ -1,10 +1,10 @@ package com.github.connorwyatt.common.http import io.ktor.client.* -import io.ktor.client.engine.cio.* -import org.kodein.di.* +import org.kodein.di.DI +import org.kodein.di.bindProvider const val DEFAULT_JSON_HTTP_CLIENT_TAG = "DEFAULT_JSON_HTTP_CLIENT" val httpDependenciesModule by - DI.Module { bindProvider(tag = DEFAULT_JSON_HTTP_CLIENT_TAG) { HttpClient(CIO) } } + DI.Module { bindProvider(tag = DEFAULT_JSON_HTTP_CLIENT_TAG) { HttpClient() } } diff --git a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/CommandEnvelope.kt b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/CommandEnvelope.kt index 598e6a6..5825635 100644 --- a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/CommandEnvelope.kt +++ b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/CommandEnvelope.kt @@ -1,3 +1,3 @@ package com.github.connorwyatt.common.rabbitmq -data class CommandEnvelope(val command: com.github.connorwyatt.common.rabbitmq.Command) +data class CommandEnvelope(val command: Command) diff --git a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/CommandMap.kt b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/CommandMap.kt index 500916f..a9700b7 100644 --- a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/CommandMap.kt +++ b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/CommandMap.kt @@ -2,14 +2,12 @@ package com.github.connorwyatt.common.rabbitmq import kotlin.reflect.KClass -internal class CommandMap( - private val definitions: Set -) { +internal class CommandMap(private val definitions: Set) { init { checkForDuplicates() } - fun typeFor(clazz: KClass) = + fun typeFor(clazz: KClass) = definitions.singleOrNull { it.clazz == clazz }?.type ?: throw Exception("Could not find Command type for class (${clazz.simpleName}).") diff --git a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/CommandMapDefinition.kt b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/CommandMapDefinition.kt index f91e4f9..542f4c4 100644 --- a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/CommandMapDefinition.kt +++ b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/CommandMapDefinition.kt @@ -2,7 +2,4 @@ package com.github.connorwyatt.common.rabbitmq import kotlin.reflect.KClass -internal class CommandMapDefinition( - val type: String, - val clazz: KClass -) +internal class CommandMapDefinition(val type: String, val clazz: KClass) diff --git a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/bus/CommandBus.kt b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/bus/CommandBus.kt index c7edeb0..9b33835 100644 --- a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/bus/CommandBus.kt +++ b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/bus/CommandBus.kt @@ -3,7 +3,7 @@ package com.github.connorwyatt.common.rabbitmq.bus import com.github.connorwyatt.common.rabbitmq.CommandEnvelope interface CommandBus { - suspend fun send(commandEnvelope: com.github.connorwyatt.common.rabbitmq.CommandEnvelope) + suspend fun send(commandEnvelope: CommandEnvelope) - suspend fun send(commandEnvelopes: List) + suspend fun send(commandEnvelopes: List) } diff --git a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/bus/InMemoryCommandBus.kt b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/bus/InMemoryCommandBus.kt index c4df23c..c144631 100644 --- a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/bus/InMemoryCommandBus.kt +++ b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/bus/InMemoryCommandBus.kt @@ -14,18 +14,13 @@ import kotlinx.coroutines.time.withTimeout class InMemoryCommandBus internal constructor(private val commandHandlerRouter: CommandHandlerRouter) : CommandBus { private val commandPropagationCoroutineScope = CoroutineScope(Dispatchers.Default) - private val commandPropagationChannel = - Channel() + private val commandPropagationChannel = Channel() - override suspend fun send( - commandEnvelope: com.github.connorwyatt.common.rabbitmq.CommandEnvelope - ) { + override suspend fun send(commandEnvelope: CommandEnvelope) { send(listOf(commandEnvelope)) } - override suspend fun send( - commandEnvelopes: List - ) { + override suspend fun send(commandEnvelopes: List) { commandEnvelopes.forEach { commandEnvelope -> enqueueCommandForPropagation(commandEnvelope) } @@ -48,17 +43,13 @@ internal constructor(private val commandHandlerRouter: CommandHandlerRouter) : C } } - private suspend fun enqueueCommandForPropagation( - commandEnvelope: com.github.connorwyatt.common.rabbitmq.CommandEnvelope - ) { + private suspend fun enqueueCommandForPropagation(commandEnvelope: CommandEnvelope) { commandPropagationCoroutineScope.launch { commandPropagationChannel.send(commandEnvelope.command) } } - private suspend fun propagateCommandToHandler( - command: com.github.connorwyatt.common.rabbitmq.Command - ) { + private suspend fun propagateCommandToHandler(command: Command) { commandHandlerRouter.handle(command) } } diff --git a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/bus/RabbitMQCommandBus.kt b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/bus/RabbitMQCommandBus.kt index 6cfd92d..79d5d08 100644 --- a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/bus/RabbitMQCommandBus.kt +++ b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/bus/RabbitMQCommandBus.kt @@ -8,32 +8,27 @@ import com.github.connorwyatt.common.rabbitmq.routing.RoutingKeyUtilities import com.rabbitmq.client.AMQP import com.rabbitmq.client.Connection import kotlin.reflect.KClass -import kotlinx.serialization.* -import kotlinx.serialization.json.* +import kotlinx.serialization.InternalSerializationApi +import kotlinx.serialization.json.Json +import kotlinx.serialization.serializer internal class RabbitMQCommandBus( private val connection: Connection, private val exchangeName: String, - private val commandMap: com.github.connorwyatt.common.rabbitmq.CommandMap, + private val commandMap: CommandMap, private val commandRoutingRules: CommandRoutingRules ) : CommandBus { - override suspend fun send( - commandEnvelope: com.github.connorwyatt.common.rabbitmq.CommandEnvelope - ) { + override suspend fun send(commandEnvelope: CommandEnvelope) { send(listOf(commandEnvelope)) } - override suspend fun send( - commandEnvelopes: List - ) { + override suspend fun send(commandEnvelopes: List) { connection.createChannel().use { channel -> try { channel.txSelect() commandEnvelopes.forEach { commandEnvelope -> // TODO: Fix this. - val commandClass = - commandEnvelope.command::class - as KClass + val commandClass = commandEnvelope.command::class as KClass @OptIn(InternalSerializationApi::class) val serializer = commandClass.serializer() diff --git a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/commandhandlers/CommandHandler.kt b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/commandhandlers/CommandHandler.kt index 17d6f79..b97c27b 100644 --- a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/commandhandlers/CommandHandler.kt +++ b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/commandhandlers/CommandHandler.kt @@ -4,35 +4,29 @@ import com.github.connorwyatt.common.rabbitmq.Command import kotlin.reflect.KClass abstract class CommandHandler { - private var handlers = - mapOf< - KClass, - suspend (com.github.connorwyatt.common.rabbitmq.Command) -> Unit - >() + private var handlers = mapOf, suspend (Command) -> Unit>() - protected fun handle( + protected fun handle( commandClass: KClass, handler: suspend (TCommand) -> Unit ) { @Suppress("UNCHECKED_CAST") handlers = handlers.plus( - commandClass as KClass to - handler as suspend (com.github.connorwyatt.common.rabbitmq.Command) -> Unit + commandClass as KClass to handler as suspend (Command) -> Unit ) } - protected inline fun handle( + protected inline fun handle( noinline handler: suspend (TCommand) -> Unit ) { handle(TCommand::class, handler) } - internal suspend fun handleCommand( - command: com.github.connorwyatt.common.rabbitmq.Command, - ) = getHandlerOrThrow(command).invoke(command) + internal suspend fun handleCommand(command: Command) = + getHandlerOrThrow(command).invoke(command) - private fun getHandlerOrThrow(command: com.github.connorwyatt.common.rabbitmq.Command) = + private fun getHandlerOrThrow(command: Command) = (handlers[command::class] ?: throw Exception( "CommandHandler has no handler for command ${command::class.simpleName}" diff --git a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/commandhandlers/CommandHandlerDefinition.kt b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/commandhandlers/CommandHandlerDefinition.kt index 3005439..8beebfd 100644 --- a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/commandhandlers/CommandHandlerDefinition.kt +++ b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/commandhandlers/CommandHandlerDefinition.kt @@ -2,9 +2,9 @@ package com.github.connorwyatt.common.rabbitmq.commandhandlers import com.github.connorwyatt.common.rabbitmq.Command import kotlin.reflect.KClass -import org.kodein.di.* +import org.kodein.di.DirectDI internal class CommandHandlerDefinition( - val clazz: KClass, + val clazz: KClass, val creator: DirectDI.() -> CommandHandler ) diff --git a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/commandhandlers/CommandHandlerMap.kt b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/commandhandlers/CommandHandlerMap.kt index 46a323f..2e29660 100644 --- a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/commandhandlers/CommandHandlerMap.kt +++ b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/commandhandlers/CommandHandlerMap.kt @@ -8,7 +8,7 @@ internal class CommandHandlerMap(private val definitions: Set) = + internal fun creatorFor(clazz: KClass) = definitions.singleOrNull { it.clazz == clazz }?.creator ?: throw Exception( "Could not find CommandHandler creator for class (${clazz.simpleName})." diff --git a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/commandhandlers/CommandHandlerRouter.kt b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/commandhandlers/CommandHandlerRouter.kt index 3cfe6c1..a210775 100644 --- a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/commandhandlers/CommandHandlerRouter.kt +++ b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/commandhandlers/CommandHandlerRouter.kt @@ -3,11 +3,8 @@ package com.github.connorwyatt.common.rabbitmq.commandhandlers import com.github.connorwyatt.common.rabbitmq.Command import kotlin.reflect.KClass -internal class CommandHandlerRouter( - private val factory: - (KClass) -> CommandHandler -) { - internal suspend fun handle(command: com.github.connorwyatt.common.rabbitmq.Command) { +internal class CommandHandlerRouter(private val factory: (KClass) -> CommandHandler) { + internal suspend fun handle(command: Command) { val commandClass = command::class val commandHandler = factory(commandClass) commandHandler.handleCommand(command) diff --git a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/commandhandlers/RabbitMQSubscriptionsManager.kt b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/commandhandlers/RabbitMQSubscriptionManager.kt similarity index 86% rename from rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/commandhandlers/RabbitMQSubscriptionsManager.kt rename to rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/commandhandlers/RabbitMQSubscriptionManager.kt index 478e09b..e21138f 100644 --- a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/commandhandlers/RabbitMQSubscriptionsManager.kt +++ b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/commandhandlers/RabbitMQSubscriptionManager.kt @@ -11,14 +11,15 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking -import kotlinx.serialization.* -import kotlinx.serialization.json.* +import kotlinx.serialization.InternalSerializationApi +import kotlinx.serialization.json.Json +import kotlinx.serialization.serializer -internal class RabbitMQSubscriptionsManager( +internal class RabbitMQSubscriptionManager( private val exchangeName: String, private val connection: Connection, private val commandQueueList: CommandQueueList, - private val commandMap: com.github.connorwyatt.common.rabbitmq.CommandMap, + private val commandMap: CommandMap, private val commandHandlerRouter: CommandHandlerRouter ) { private val coroutineScope = CoroutineScope(Dispatchers.Default) @@ -52,9 +53,7 @@ internal class RabbitMQSubscriptionsManager( } } - private fun deserializeCommand( - message: Delivery - ): com.github.connorwyatt.common.rabbitmq.Command { + private fun deserializeCommand(message: Delivery): Command { @OptIn(InternalSerializationApi::class) val serializer = commandMap.classFor(message.properties.type).serializer() return Json.decodeFromString(serializer, message.body.decodeToString()) diff --git a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/kodein/DIBuilderExt.kt b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/kodein/DIBuilderExt.kt index f524866..5a83f38 100644 --- a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/kodein/DIBuilderExt.kt +++ b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/kodein/DIBuilderExt.kt @@ -7,25 +7,25 @@ import com.github.connorwyatt.common.rabbitmq.commandhandlers.CommandHandlerDefi import com.github.connorwyatt.common.rabbitmq.queues.CommandQueueDefinition import com.github.connorwyatt.common.rabbitmq.routing.CommandRoutingRulesBuilder import kotlin.reflect.KClass -import org.kodein.di.* +import org.kodein.di.DI +import org.kodein.di.DirectDI +import org.kodein.di.bindSingleton +import org.kodein.di.inBindSet +import org.kodein.di.singleton fun DI.Builder.bindCommandQueueDefinition(queueName: String) { inBindSet { add { singleton { CommandQueueDefinition(queueName) } } } } -fun DI.Builder.bindCommandDefinition( - type: String, - clazz: KClass -) { +fun DI.Builder.bindCommandDefinition(type: String, clazz: KClass) { inBindSet { add { singleton { CommandMapDefinition(type, clazz) } } } } -inline fun DI.Builder - .bindCommandDefinition(type: String) { +inline fun DI.Builder.bindCommandDefinition(type: String) { bindCommandDefinition(type, TCommand::class) } -fun DI.Builder.bindCommandHandler( +fun DI.Builder.bindCommandHandler( constructor: DirectDI.() -> CommandHandler, clazz: KClass ) { @@ -34,8 +34,9 @@ fun DI.Builder.bindC } } -inline fun DI.Builder - .bindCommandHandler(noinline constructor: DirectDI.() -> CommandHandler) { +inline fun DI.Builder.bindCommandHandler( + noinline constructor: DirectDI.() -> CommandHandler +) { bindCommandHandler(constructor, TCommand::class) } diff --git a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/kodein/RabbitMQDependenciesModule.kt b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/kodein/RabbitMQDependenciesModule.kt index 7e59952..af8d101 100644 --- a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/kodein/RabbitMQDependenciesModule.kt +++ b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/kodein/RabbitMQDependenciesModule.kt @@ -8,7 +8,7 @@ import com.github.connorwyatt.common.rabbitmq.bus.RabbitMQCommandBus import com.github.connorwyatt.common.rabbitmq.commandhandlers.CommandHandlerDefinition import com.github.connorwyatt.common.rabbitmq.commandhandlers.CommandHandlerMap import com.github.connorwyatt.common.rabbitmq.commandhandlers.CommandHandlerRouter -import com.github.connorwyatt.common.rabbitmq.commandhandlers.RabbitMQSubscriptionsManager +import com.github.connorwyatt.common.rabbitmq.commandhandlers.RabbitMQSubscriptionManager import com.github.connorwyatt.common.rabbitmq.configuration.RabbitMQConfiguration import com.github.connorwyatt.common.rabbitmq.queues.CommandQueueCreator import com.github.connorwyatt.common.rabbitmq.queues.CommandQueueDefinition @@ -41,7 +41,7 @@ fun rabbitMQDependenciesModule(rabbitMQConfiguration: RabbitMQConfiguration): DI } bindSingleton { - RabbitMQSubscriptionsManager( + RabbitMQSubscriptionManager( rabbitMQConfiguration.exchangeName, instance(), instance(), diff --git a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/ktor/ApplicationExt.kt b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/ktor/ApplicationExt.kt index 13168b3..18021c6 100644 --- a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/ktor/ApplicationExt.kt +++ b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/ktor/ApplicationExt.kt @@ -2,7 +2,7 @@ package com.github.connorwyatt.common.rabbitmq.ktor import com.github.connorwyatt.common.rabbitmq.bus.CommandBus import com.github.connorwyatt.common.rabbitmq.bus.InMemoryCommandBus -import com.github.connorwyatt.common.rabbitmq.commandhandlers.RabbitMQSubscriptionsManager +import com.github.connorwyatt.common.rabbitmq.commandhandlers.RabbitMQSubscriptionManager import com.github.connorwyatt.common.rabbitmq.configuration.RabbitMQConfiguration import com.github.connorwyatt.common.rabbitmq.queues.CommandQueueCreator import io.ktor.server.application.* @@ -17,8 +17,8 @@ fun Application.configureRabbitMQ(rabbitMQConfiguration: RabbitMQConfiguration) commandQueueCreator.createQueues() if (!rabbitMQConfiguration.useInMemoryRabbitMQ) { - val rabbitMQSubscriptionsManager by closestDI().instance() - rabbitMQSubscriptionsManager.start() + val rabbitMQSubscriptionManager by closestDI().instance() + rabbitMQSubscriptionManager.start() } val commandBus by closestDI().instance() diff --git a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/routing/CommandRoutingRules.kt b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/routing/CommandRoutingRules.kt index 4fcc2ec..bfd0d31 100644 --- a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/routing/CommandRoutingRules.kt +++ b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/routing/CommandRoutingRules.kt @@ -5,8 +5,7 @@ import kotlin.reflect.KClass internal data class CommandRoutingRules( private val defaultQueueName: String, - private val rules: Map, String> + private val rules: Map, String> ) { - fun queueFor(clazz: KClass): String = - rules[clazz] ?: defaultQueueName + fun queueFor(clazz: KClass): String = rules[clazz] ?: defaultQueueName } diff --git a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/routing/CommandRoutingRulesBuilder.kt b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/routing/CommandRoutingRulesBuilder.kt index 2bbb518..e6dc7d0 100644 --- a/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/routing/CommandRoutingRulesBuilder.kt +++ b/rabbitmq/src/main/kotlin/com/github/connorwyatt/common/rabbitmq/routing/CommandRoutingRulesBuilder.kt @@ -5,8 +5,7 @@ import kotlin.reflect.KClass class CommandRoutingRulesBuilder internal constructor() { private var defaultQueueName: String? = null - private var rules = - emptyMap, String>() + private var rules = emptyMap, String>() fun defaultQueue(queueName: String) { if (defaultQueueName != null) { @@ -16,10 +15,7 @@ class CommandRoutingRulesBuilder internal constructor() { defaultQueueName = queueName } - fun queueFor( - clazz: KClass, - queueName: String - ) { + fun queueFor(clazz: KClass, queueName: String) { if (rules.containsKey(clazz)) { throw Exception("Already registered queue for ${clazz.simpleName}.") } @@ -27,9 +23,7 @@ class CommandRoutingRulesBuilder internal constructor() { rules = rules.plus(clazz to queueName) } - inline fun queueFor( - queueName: String - ) { + inline fun queueFor(queueName: String) { queueFor(TCommand::class, queueName) } diff --git a/settings.gradle.kts b/settings.gradle.kts index eed9680..b2b114c 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -74,7 +74,6 @@ dependencyResolutionManagement { library("kotlinx-serialization-json", "org.jetbrains.kotlinx", "kotlinx-serialization-json").version( kotlinxSerializationVersion ) - library("ktor-client-cio", "io.ktor", "ktor-client-cio").version(ktorVersion) library("ktor-client-core", "io.ktor", "ktor-client-core").version(ktorVersion) library("ktor-serialization-kotlinx-json", "io.ktor", "ktor-serialization-kotlinx-json").version( ktorVersion