diff --git a/application-vanilla/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventSourcingSnapshottingAggregateExtension.kt b/application-vanilla/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventSourcingSnapshottingAggregateExtension.kt index de36f985..1f4025b9 100644 --- a/application-vanilla/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventSourcingSnapshottingAggregateExtension.kt +++ b/application-vanilla/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventSourcingSnapshottingAggregateExtension.kt @@ -16,10 +16,11 @@ package com.fraktalio.fmodel.application +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.* /** - * Extension function - Handles the command message of type [C] + * Extension function - Handles the command message of type [C] by the snapshotting, event sourced aggregate. * * @param command Command message of type [C] * @return State of type [S] @@ -51,7 +52,7 @@ fun I.handle(command: C): Flow where I : StateComputation, in which [V] is the type of the Version (optimistic locking) @@ -90,3 +91,87 @@ suspend fun I.handleOptimistically(command: C): Flow> } emitAll(newEvents) } + + +/** + * Extension function - Handles the command message of type [C] by the snapshotting, orchestrating event sourced aggregate + * + * + * @param command Command message of type [C] + * @return State of type [S] + * + * @author Иван Дугалић / Ivan Dugalic / @idugalic + */ +@ExperimentalCoroutinesApi +fun I.handle(command: C): Flow where I : StateOrchestratingComputation, + I : EventOrchestratingComputation, + I : StateRepository, + I : EventSnapshottingRepository = + flow { + // 1. Fetch the latest state snapshot or NULL + val latestSnapshotState = command.fetchState() + // 2. Fetch the latest events, since the latest state snapshot + val latestEvents = command.fetchEvents(latestSnapshotState).toList() + // 3. Compute the current state, based on the latest state snapshot and the latest events + val currentState = latestEvents.fold(latestSnapshotState ?: initialState) { s, e -> evolve(s, e) } + // 4. Compute the new events, based on the latest events, latest snapshot state and the command, and save it + val newEvents = latestEvents.asFlow() + .computeNewEventsByOrchestrating(command, latestSnapshotState) { it.fetchEvents(latestSnapshotState) } + .save() + // 5. Compute the new state, based on the current state and the command and save it conditionally + with(currentState.computeNewState(command)) { + if (shouldCreateNewSnapshot(latestSnapshotState)) { + save() + } + } + emitAll(newEvents) + } + +/** + * Extension function - Handles the command message of type [C] by the snapshotting, locking, orchestrating event sourced aggregate, optimistically + * + * @param command Command message of type [C] + * @return State of type [Pair]<[S], [V]>, in which [V] is the type of the Version (optimistic locking) + * + * @author Иван Дугалић / Ivan Dugalic / @idugalic + */ +@ExperimentalCoroutinesApi +suspend fun I.handleOptimistically(command: C): Flow> where I : StateOrchestratingComputation, + I : EventOrchestratingComputation, + I : StateLockingRepository, + I : EventSnapshottingLockingRepository = + flow { + // 1. Fetch the latest state snapshot or NULL + val (latestSnapshotState, latestSnapshotVersion) = command.fetchState() + // 2. Fetch the latest events, since the latest state snapshot + val latestEvents = command.fetchEvents(Pair(latestSnapshotState, latestSnapshotVersion)).toList() + // 3. Compute the current state, based on the latest state snapshot and the latest events + val currentState = latestEvents.fold(latestSnapshotState ?: initialState) { s, e -> evolve(s, e.first) } + // 4. Get the latest event version + val latestEventVersion = latestEvents.map { it.second }.lastOrNull() + // 5. Compute the new events, based on the latest events, latest snapshot state and the command, and save it + val newEvents = latestEvents.asFlow() + .map { it.first } + .computeNewEventsByOrchestrating(command, latestSnapshotState) { + it.fetchEvents( + Pair( + latestSnapshotState, + latestSnapshotVersion + ) + ).map { pair -> pair.first } + } + .save(latestEventVersion) + // 6. Get the new snapshot version = the last/latest event version + val newSnapshotVersion = newEvents.map { it.second }.lastOrNull() + // 7. Compute the new state, based on the current state and the command and save it conditionally + with(currentState.computeNewState(command)) { + if (shouldCreateNewSnapshot( + latestSnapshotState, + latestSnapshotVersion, + newSnapshotVersion + ) + ) + save(latestSnapshotVersion, newSnapshotVersion) + } + emitAll(newEvents) + } \ No newline at end of file diff --git a/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventSourcingAggregate.kt b/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventSourcingAggregate.kt index 3da75762..cd97ef6d 100644 --- a/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventSourcingAggregate.kt +++ b/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventSourcingAggregate.kt @@ -40,21 +40,26 @@ interface EventComputation : IDecider { */ interface EventOrchestratingComputation : ISaga, IDecider { @ExperimentalCoroutinesApi - fun Flow.computeNewEventsByOrchestrating(command: C, fetchEvents: (C) -> Flow): Flow = flow { - val currentState = fold(initialState) { s, e -> evolve(s, e) } - var resultingEvents = decide(command, currentState) + fun Flow.computeNewEventsByOrchestrating(command: C, fetchEvents: (C) -> Flow): Flow = + computeNewEventsByOrchestrating(command, initialState, fetchEvents) - resultingEvents - .flatMapConcat { react(it) } - .onEach { c -> - val newEvents = flowOf(fetchEvents(c), resultingEvents) - .flattenConcat() - .computeNewEventsByOrchestrating(c, fetchEvents) - resultingEvents = flowOf(resultingEvents, newEvents).flattenConcat() - }.collect() + @ExperimentalCoroutinesApi + fun Flow.computeNewEventsByOrchestrating(command: C, latestSnapshot: S?, fetchEvents: (C) -> Flow): Flow = + flow { + val currentState = fold(latestSnapshot ?: initialState) { s, e -> evolve(s, e) } + var resultingEvents = decide(command, currentState) - emitAll(resultingEvents) - } + resultingEvents + .flatMapConcat { react(it) } + .onEach { c -> + val newEvents = flowOf(fetchEvents(c), resultingEvents) + .flattenConcat() + .computeNewEventsByOrchestrating(c, latestSnapshot, fetchEvents) + resultingEvents = flowOf(resultingEvents, newEvents).flattenConcat() + }.collect() + + emitAll(resultingEvents) + } } /** diff --git a/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventSourcingSnapshottingAggregate.kt b/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventSourcingSnapshottingAggregate.kt index 0e95f46b..6d992609 100644 --- a/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventSourcingSnapshottingAggregate.kt +++ b/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventSourcingSnapshottingAggregate.kt @@ -17,6 +17,7 @@ package com.fraktalio.fmodel.application import com.fraktalio.fmodel.domain.IDecider +import com.fraktalio.fmodel.domain.ISaga /** @@ -69,6 +70,59 @@ interface EventSourcingSnapshottingLockingAggregate : EventSnapshottingLockingRepository, StateLockingRepository +/** + * Event sourced, snapshotting, orchestrating aggregate is using/delegating a `decider` of type [IDecider]<[C], [S], [E]> / [EventOrchestratingComputation]<[C], [S], [E]> to handle commands and produce events. + * In order to handle the command, aggregate needs to fetch the current state (represented as a list of events) via [EventSnapshottingRepository.fetchEvents] function, and then delegate the command to the `decider` which can produce new event(s) as a result. + * If the `decider` is combined out of many deciders via `combine` function, an optional `saga` of type [ISaga] could be used to react on new events and send new commands to the 'decider` recursively, in single transaction. + * This behaviour is formalized in [EventOrchestratingComputation]. + * Produced events are then stored via [EventSnapshottingRepository.save] suspending function. + * + * Additionally, Event sourcing aggregate enables `snapshotting` mechanism by using [StateOrchestratingComputation] and [StateRepository] interfaces to store and fetch the current state of the aggregate from time to time, removing the need to always fetch the full list of events. + * + * [EventSourcingSnapshottingOrchestratingAggregate] extends [EventOrchestratingComputation], [StateOrchestratingComputation], [StateRepository] and [EventSnapshottingRepository] interfaces, + * clearly communicating that it is composed out of these behaviours. + * + * @param C Commands of type [C] that this aggregate can handle + * @param S Aggregate state of type [S] + * @param E Events of type [E] that this aggregate can publish + * + * @author Иван Дугалић / Ivan Dugalic / @idugalic + */ +interface EventSourcingSnapshottingOrchestratingAggregate : + EventOrchestratingComputation, + StateOrchestratingComputation, + EventSnapshottingRepository, + StateRepository + +/** + * Event sourced, snapshotting, orchestrating and locking aggregate is using/delegating a `decider` of type [IDecider]<[C], [S], [E]> / [EventOrchestratingComputation]<[C], [S], [E]> to handle commands and produce events. + * In order to handle the command, aggregate needs to fetch the current state (represented as a list of events) via [EventSnapshottingLockingRepository.fetchEvents] function, and then delegate the command to the `decider` which can produce new event(s) as a result. + * If the `decider` is combined out of many deciders via `combine` function, an optional `saga` of type [ISaga]/[EventOrchestratingComputation] could be used to react on new events and send new commands to the 'decider` recursively, in single transaction. + * This behaviour is formalized in [EventOrchestratingComputation]. + * Produced events are then stored via [EventSnapshottingLockingRepository.save] suspending function. + * + * Locking Orchestrating Event sourcing aggregate enables `optimistic locking` mechanism more explicitly. + * If you fetch events from a storage, the application records the `version` number of that event stream. + * You can append new events, but only if the `version` number in the storage has not changed. + * If there is a `version` mismatch, it means that someone else has added the event(s) before you did. + * + * [EventSourcingSnapshottingLockingOrchestratingAggregate] extends [EventOrchestratingComputation], [StateOrchestratingComputation], [StateLockingRepository] and [EventLockingRepository] interfaces, + * clearly communicating that it is composed out of these behaviours. + * + * @param C Commands of type [C] that this aggregate can handle + * @param S Aggregate state of type [S] + * @param E Events of type [E] that this aggregate can publish + * @param V Version + * + * @author Иван Дугалић / Ivan Dugalic / @idugalic + */ +interface EventSourcingSnapshottingLockingOrchestratingAggregate : + EventOrchestratingComputation, + StateOrchestratingComputation, + EventSnapshottingLockingRepository, + StateLockingRepository + + /** * Event Sourced Snapshotting aggregate constructor-like function. * @@ -118,3 +172,58 @@ fun EventSourcingSnapshottingLockingAggregate( EventSnapshottingLockingRepository by eventRepository, StateLockingRepository by stateRepository, IDecider by decider {} + +/** + * Event Sourced Snapshotting Orchestrating aggregate constructor-like function. + * + * The Delegation pattern has proven to be a good alternative to implementation inheritance, and Kotlin supports it natively requiring zero boilerplate code. + * + * @param C Commands of type [C] that this aggregate can handle + * @param S Aggregate state of type [S] + * @param E Events of type [E] that are used internally to build/fold new state + * @param decider A decider component of type [IDecider]<[C], [S], [E]> + * @param saga A saga component of type [ISaga]<[E], [C]> + * @param eventRepository An aggregate event repository of type [EventSnapshottingRepository]<[C], [S], [E]> + * @return An object/instance of type [EventSourcingSnapshottingOrchestratingAggregate]<[C], [S], [E]> + * + * @author Иван Дугалић / Ivan Dugalic / @idugalic + */ +fun EventSourcingSnapshottingOrchestratingAggregate( + decider: IDecider, + eventRepository: EventSnapshottingRepository, + stateRepository: StateRepository, + saga: ISaga +): EventSourcingSnapshottingOrchestratingAggregate = + object : EventSourcingSnapshottingOrchestratingAggregate, + EventSnapshottingRepository by eventRepository, + StateRepository by stateRepository, + IDecider by decider, + ISaga by saga {} + +/** + * Event Sourced Snapshotting Locking Orchestrating aggregate constructor-like function. + * + * The Delegation pattern has proven to be a good alternative to implementation inheritance, and Kotlin supports it natively requiring zero boilerplate code. + * + * @param C Commands of type [C] that this aggregate can handle + * @param S Aggregate state of type [S] + * @param E Events of type [E] that are used internally to build/fold new state + * @param V Version + * @param decider A decider component of type [IDecider]<[C], [S], [E]> + * @param saga A saga component of type [ISaga]<[E], [C]> + * @param eventRepository An aggregate event repository of type [EventSnapshottingLockingRepository]<[C], [S], [E], [V]> + * @return An object/instance of type [EventSourcingSnapshottingLockingOrchestratingAggregate]<[C], [S], [E], [V]> + * + * @author Иван Дугалић / Ivan Dugalic / @idugalic + */ +fun EventSourcingSnapshottingLockingOrchestratingAggregate( + decider: IDecider, + eventRepository: EventSnapshottingLockingRepository, + stateRepository: StateLockingRepository, + saga: ISaga +): EventSourcingSnapshottingLockingOrchestratingAggregate = + object : EventSourcingSnapshottingLockingOrchestratingAggregate, + EventSnapshottingLockingRepository by eventRepository, + StateLockingRepository by stateRepository, + IDecider by decider, + ISaga by saga {} \ No newline at end of file