Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregate repository methods #9

Merged
merged 4 commits into from
Mar 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,41 +8,34 @@ class EventStoreClientWrapper(private val eventStoreDBClient: EventStoreDBClient
suspend fun readStream(
streamDescriptor: StreamDescriptor,
readStreamOptions: ReadStreamOptions,
): com.github.connorwyatt.common.eventstore.EventStoreClientWrapper.ReadResult {
): ReadResult {
val readResult =
try {
eventStoreDBClient
.readStream(streamDescriptor.streamName, readStreamOptions)
.await()
} catch (exception: Exception) {
return com.github.connorwyatt.common.eventstore.EventStoreClientWrapper.ReadResult
.Failure(exception)
return ReadResult.Failure(exception)
}

return com.github.connorwyatt.common.eventstore.EventStoreClientWrapper.ReadResult.Success(
readResult.events,
readResult.lastStreamPosition
)
return ReadResult.Success(readResult.events, readResult.lastStreamPosition)
}

suspend fun appendToStream(
streamDescriptor: StreamDescriptor,
options: AppendToStreamOptions,
events: List<EventData>,
): com.github.connorwyatt.common.eventstore.EventStoreClientWrapper.WriteResult {
): WriteResult {
val writeResult =
try {
eventStoreDBClient
.appendToStream(streamDescriptor.streamName, options, *events.toTypedArray())
.await()
} catch (exception: Exception) {
return com.github.connorwyatt.common.eventstore.EventStoreClientWrapper.WriteResult
.Failure(exception)
return WriteResult.Failure(exception)
}

return com.github.connorwyatt.common.eventstore.EventStoreClientWrapper.WriteResult.Success(
writeResult.logPosition.commitUnsigned
)
return WriteResult.Success(writeResult.logPosition.commitUnsigned)
}

fun subscribeToStream(
Expand Down Expand Up @@ -77,18 +70,14 @@ class EventStoreClientWrapper(private val eventStoreDBClient: EventStoreDBClient
}

sealed interface ReadResult {
data class Success(val events: List<ResolvedEvent>, val streamPosition: Long) :
com.github.connorwyatt.common.eventstore.EventStoreClientWrapper.ReadResult
data class Success(val events: List<ResolvedEvent>, val streamPosition: Long) : ReadResult

data class Failure(val exception: Exception) :
com.github.connorwyatt.common.eventstore.EventStoreClientWrapper.ReadResult
data class Failure(val exception: Exception) : ReadResult
}

sealed interface WriteResult {
data class Success(val streamPosition: Long) :
com.github.connorwyatt.common.eventstore.EventStoreClientWrapper.WriteResult
data class Success(val streamPosition: Long) : WriteResult

data class Failure(val exception: Exception) :
com.github.connorwyatt.common.eventstore.EventStoreClientWrapper.WriteResult
data class Failure(val exception: Exception) : WriteResult
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package com.github.connorwyatt.common.eventstore.aggregates

import com.github.connorwyatt.common.eventstore.events.EventsRepository
import com.github.connorwyatt.common.eventstore.events.EventRepository
import com.github.connorwyatt.common.eventstore.streams.StreamDescriptor
import kotlin.reflect.KClass

class AggregatesRepository(
private val eventsRepository: EventsRepository,
class AggregateRepository(
private val eventRepository: EventRepository,
private val aggregateMap: AggregateMap
) {
suspend fun <TAggregate : Aggregate> load(clazz: KClass<TAggregate>, id: String): TAggregate {
Expand All @@ -14,7 +14,7 @@ class AggregatesRepository(

val aggregate = constructor.invoke(id)

val events = eventsRepository.readStream(streamDescriptor)
val events = eventRepository.readStream(streamDescriptor)

aggregate.applyEvents(events)

Expand All @@ -34,10 +34,31 @@ class AggregatesRepository(

val streamDescriptor = StreamDescriptor.Origin(category, aggregate.id)

eventsRepository.appendToStream(
eventRepository.appendToStream(
streamDescriptor,
aggregate.unsavedEvents,
aggregate.latestSavedEventVersion()
)
}

suspend fun <TAggregate : Aggregate, TReturn> usingAggregate(
clazz: KClass<TAggregate>,
id: String,
block: suspend TAggregate.() -> TReturn,
shouldSave: Boolean = false
) {
val aggregate = load(clazz, id)

block.invoke(aggregate)

if (shouldSave) save(aggregate)
}

suspend inline fun <reified TAggregate : Aggregate, TReturn> usingAggregate(
id: String,
noinline block: suspend TAggregate.() -> TReturn,
shouldSave: Boolean = false
) {
return usingAggregate(TAggregate::class, id, block, shouldSave)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

class EventStoreSubscriptionsManager(
private val eventStoreClientWrapper:
com.github.connorwyatt.common.eventstore.EventStoreClientWrapper,
class EventStoreSubscriptionManager(
private val eventStoreClientWrapper: EventStoreClientWrapper,
private val eventHandlers: Set<EventHandler>,
private val eventHandlerMap: EventHandlerMap,
private val resolvedEventMapper: ResolvedEventMapper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.github.connorwyatt.common.eventstore.events

import com.github.connorwyatt.common.eventstore.streams.StreamDescriptor

interface EventsRepository {
interface EventRepository {
suspend fun readStream(streamDescriptor: StreamDescriptor): List<EventEnvelope<out Event>>

suspend fun appendToStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,19 @@ import com.eventstore.dbclient.ReadStreamOptions
import com.github.connorwyatt.common.eventstore.EventStoreClientWrapper
import com.github.connorwyatt.common.eventstore.streams.StreamDescriptor

class EventStoreEventsRepository(
private val eventStoreClient: com.github.connorwyatt.common.eventstore.EventStoreClientWrapper,
class EventStoreEventRepository(
private val eventStoreClient: EventStoreClientWrapper,
private val eventMap: EventMap,
private val resolvedEventMapper: ResolvedEventMapper
) : EventsRepository {
) : EventRepository {
override suspend fun readStream(
streamDescriptor: StreamDescriptor
): List<EventEnvelope<out Event>> {
val result = eventStoreClient.readStream(streamDescriptor, readStreamOptions)

return when (result) {
is com.github.connorwyatt.common.eventstore.EventStoreClientWrapper.ReadResult.Failure ->
emptyList()
is com.github.connorwyatt.common.eventstore.EventStoreClientWrapper.ReadResult.Success ->
is EventStoreClientWrapper.ReadResult.Failure -> emptyList()
is EventStoreClientWrapper.ReadResult.Success ->
result.events.map(resolvedEventMapper::map)
}
}
Expand Down Expand Up @@ -48,12 +47,7 @@ class EventStoreEventsRepository(

val result = eventStoreClient.appendToStream(streamDescriptor, options, eventDataList)

if (
result
is
com.github.connorwyatt.common.eventstore.EventStoreClientWrapper.WriteResult.Failure
)
throw result.exception
if (result is EventStoreClientWrapper.WriteResult.Failure) throw result.exception
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.time.withTimeout

class InMemoryEventsRepository(
class InMemoryEventRepository(
private val clock: Clock,
private val eventMap: EventMap,
private val eventHandlerMap: EventHandlerMap,
private val eventHandlers: Set<EventHandler>,
) : EventsRepository {
) : EventRepository {
private var streams = emptyMap<StreamDescriptor, List<EventEnvelope<out Event>>>()
private val streamUpdateMutex = Mutex()
private val eventPropagationCoroutineScope = CoroutineScope(Dispatchers.Default)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,34 @@ import com.github.connorwyatt.common.eventstore.EventStoreClientWrapper
import com.github.connorwyatt.common.eventstore.aggregates.Aggregate
import com.github.connorwyatt.common.eventstore.aggregates.AggregateMap
import com.github.connorwyatt.common.eventstore.aggregates.AggregateMapDefinition
import com.github.connorwyatt.common.eventstore.aggregates.AggregatesRepository
import com.github.connorwyatt.common.eventstore.aggregates.AggregateRepository
import com.github.connorwyatt.common.eventstore.configuration.EventStoreConfiguration
import com.github.connorwyatt.common.eventstore.eventhandlers.EventHandler
import com.github.connorwyatt.common.eventstore.eventhandlers.EventHandlerDefinition
import com.github.connorwyatt.common.eventstore.eventhandlers.EventHandlerMap
import com.github.connorwyatt.common.eventstore.eventhandlers.EventStoreSubscriptionsManager
import com.github.connorwyatt.common.eventstore.eventhandlers.EventStoreSubscriptionManager
import com.github.connorwyatt.common.eventstore.events.EventMap
import com.github.connorwyatt.common.eventstore.events.EventMapDefinition
import com.github.connorwyatt.common.eventstore.events.EventStoreEventsRepository
import com.github.connorwyatt.common.eventstore.events.EventsRepository
import com.github.connorwyatt.common.eventstore.events.InMemoryEventsRepository
import com.github.connorwyatt.common.eventstore.events.EventRepository
import com.github.connorwyatt.common.eventstore.events.EventStoreEventRepository
import com.github.connorwyatt.common.eventstore.events.InMemoryEventRepository
import com.github.connorwyatt.common.eventstore.events.ResolvedEventMapper
import org.kodein.di.*
import org.kodein.di.DI
import org.kodein.di.bindProvider
import org.kodein.di.bindProviderOf
import org.kodein.di.bindSet
import org.kodein.di.bindSingleton
import org.kodein.di.bindSingletonOf
import org.kodein.di.new

fun eventStoreDependenciesModule(eventStoreConfiguration: EventStoreConfiguration): DI.Module =
DI.Module(name = ::eventStoreDependenciesModule.name) {
bindSingletonOf(::AggregatesRepository)
bindSingletonOf(::AggregateRepository)
bindSingletonOf(::AggregateMap)
bindSet<AggregateMapDefinition<Aggregate>>()

if (!eventStoreConfiguration.useInMemoryEventStore) {
bindProvider<EventsRepository> { new(::EventStoreEventsRepository) }
bindProvider<EventRepository> { new(::EventStoreEventRepository) }
bindSingleton<EventStoreDBClient> {
val settings =
EventStoreDBConnectionString.parseOrThrow(
Expand All @@ -38,10 +44,10 @@ fun eventStoreDependenciesModule(eventStoreConfiguration: EventStoreConfiguratio
EventStoreDBClient.create(settings)
}
bindProviderOf(::EventStoreClientWrapper)
bindSingletonOf(::EventStoreSubscriptionsManager)
bindSingletonOf(::EventStoreSubscriptionManager)
bindProviderOf(::ResolvedEventMapper)
} else {
bindSingleton<EventsRepository> { new(::InMemoryEventsRepository) }
bindSingleton<EventRepository> { new(::InMemoryEventRepository) }
}

bindSingletonOf(::EventMap)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
package com.github.connorwyatt.common.eventstore.ktor

import com.github.connorwyatt.common.eventstore.configuration.EventStoreConfiguration
import com.github.connorwyatt.common.eventstore.eventhandlers.EventStoreSubscriptionsManager
import com.github.connorwyatt.common.eventstore.events.EventsRepository
import com.github.connorwyatt.common.eventstore.events.InMemoryEventsRepository
import com.github.connorwyatt.common.eventstore.eventhandlers.EventStoreSubscriptionManager
import com.github.connorwyatt.common.eventstore.events.EventRepository
import com.github.connorwyatt.common.eventstore.events.InMemoryEventRepository
import io.ktor.server.application.*
import org.kodein.di.instance
import org.kodein.di.ktor.closestDI

fun Application.configureEventStore(eventStoreConfiguration: EventStoreConfiguration) {
if (!eventStoreConfiguration.useInMemoryEventStore) {
val eventStoreSubscriptionsManager by closestDI().instance<EventStoreSubscriptionsManager>()
val eventStoreSubscriptionManager by closestDI().instance<EventStoreSubscriptionManager>()

eventStoreSubscriptionsManager.start()
eventStoreSubscriptionManager.start()
}

val eventsRepository by closestDI().instance<EventsRepository>()
val eventRepository by closestDI().instance<EventRepository>()

(eventsRepository as? InMemoryEventsRepository)?.run { startEventPropagation() }
(eventRepository as? InMemoryEventRepository)?.run { startEventPropagation() }
}
1 change: 0 additions & 1 deletion http/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,4 @@ dependencies {

implementation(libraries.kodein.di)
implementation(libraries.kotlinx.serialization.json)
implementation(libraries.ktor.client.cio)
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.github.connorwyatt.common.http

import io.ktor.client.*
import io.ktor.client.engine.cio.*
import org.kodein.di.*
import org.kodein.di.DI
import org.kodein.di.bindProvider

const val DEFAULT_JSON_HTTP_CLIENT_TAG = "DEFAULT_JSON_HTTP_CLIENT"

val httpDependenciesModule by
DI.Module { bindProvider(tag = DEFAULT_JSON_HTTP_CLIENT_TAG) { HttpClient(CIO) } }
DI.Module { bindProvider(tag = DEFAULT_JSON_HTTP_CLIENT_TAG) { HttpClient() } }
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package com.github.connorwyatt.common.rabbitmq

data class CommandEnvelope(val command: com.github.connorwyatt.common.rabbitmq.Command)
data class CommandEnvelope(val command: Command)
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package com.github.connorwyatt.common.rabbitmq

import kotlin.reflect.KClass

internal class CommandMap(
private val definitions: Set<com.github.connorwyatt.common.rabbitmq.CommandMapDefinition>
) {
internal class CommandMap(private val definitions: Set<CommandMapDefinition>) {
init {
checkForDuplicates()
}

fun typeFor(clazz: KClass<out com.github.connorwyatt.common.rabbitmq.Command>) =
fun typeFor(clazz: KClass<out Command>) =
definitions.singleOrNull { it.clazz == clazz }?.type
?: throw Exception("Could not find Command type for class (${clazz.simpleName}).")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,4 @@ package com.github.connorwyatt.common.rabbitmq

import kotlin.reflect.KClass

internal class CommandMapDefinition(
val type: String,
val clazz: KClass<out com.github.connorwyatt.common.rabbitmq.Command>
)
internal class CommandMapDefinition(val type: String, val clazz: KClass<out Command>)
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.github.connorwyatt.common.rabbitmq.bus
import com.github.connorwyatt.common.rabbitmq.CommandEnvelope

interface CommandBus {
suspend fun send(commandEnvelope: com.github.connorwyatt.common.rabbitmq.CommandEnvelope)
suspend fun send(commandEnvelope: CommandEnvelope)

suspend fun send(commandEnvelopes: List<com.github.connorwyatt.common.rabbitmq.CommandEnvelope>)
suspend fun send(commandEnvelopes: List<CommandEnvelope>)
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,13 @@ import kotlinx.coroutines.time.withTimeout
class InMemoryCommandBus
internal constructor(private val commandHandlerRouter: CommandHandlerRouter) : CommandBus {
private val commandPropagationCoroutineScope = CoroutineScope(Dispatchers.Default)
private val commandPropagationChannel =
Channel<com.github.connorwyatt.common.rabbitmq.Command>()
private val commandPropagationChannel = Channel<Command>()

override suspend fun send(
commandEnvelope: com.github.connorwyatt.common.rabbitmq.CommandEnvelope
) {
override suspend fun send(commandEnvelope: CommandEnvelope) {
send(listOf(commandEnvelope))
}

override suspend fun send(
commandEnvelopes: List<com.github.connorwyatt.common.rabbitmq.CommandEnvelope>
) {
override suspend fun send(commandEnvelopes: List<CommandEnvelope>) {
commandEnvelopes.forEach { commandEnvelope ->
enqueueCommandForPropagation(commandEnvelope)
}
Expand All @@ -48,17 +43,13 @@ internal constructor(private val commandHandlerRouter: CommandHandlerRouter) : C
}
}

private suspend fun enqueueCommandForPropagation(
commandEnvelope: com.github.connorwyatt.common.rabbitmq.CommandEnvelope
) {
private suspend fun enqueueCommandForPropagation(commandEnvelope: CommandEnvelope) {
commandPropagationCoroutineScope.launch {
commandPropagationChannel.send(commandEnvelope.command)
}
}

private suspend fun propagateCommandToHandler(
command: com.github.connorwyatt.common.rabbitmq.Command
) {
private suspend fun propagateCommandToHandler(command: Command) {
commandHandlerRouter.handle(command)
}
}
Loading
Loading