diff --git a/docs/design-principles/0050-domain-driven-design.md b/docs/design-principles/0050-domain-driven-design.md index fa7bc2f9..a07a2902 100644 --- a/docs/design-principles/0050-domain-driven-design.md +++ b/docs/design-principles/0050-domain-driven-design.md @@ -961,37 +961,3 @@ public sealed class VehicleManagers : ValueObjectBase Unlike root aggregates and entities, there is no explicit way to verify the invariants of a constructed value object. The only invariants that need verifying are when the value object is constructed and then all data is required to be validated. This is always performed in the `Create` class factory. - -### Event Notifications - -In the design of most distributed systems of the nature of this system (or, of systems that are expected to evolve into distributed systems later), it is common to decouple each of the subdomains from each other. De-coupling effectively is absolutely vital to allowing the system to change, grow and evolve over time. Lack of effective de-coupling (at the technical level) is the main reason most software systems devolve into big-balls-of-mud, simply because of coupling. - -There are several techniques for de-coupling your subdomains, including: separating layers, using ports and adapters, starting with a modular monoliths and decomposing it into microservices later etc. - -Another one of these techniques is the use of Event-Driven Architecture (EDA), where change in communicated within and across boundaries. - -EDA relies on the fact that your system will emit "domain events", that it can share both within specific bounded contexts (as "domain events"), and externally to other systems (as "integration events". - -> When sharing events within a bounded context (or within the same process) the process can remain consistent, we call these "domain events". -> -> When sharing events across bounded contexts (or across processes and hosts) these events are called "integration events". - -In SaaStack: - -1. We use "domain events" to communicate changes (within the Domain Layer) and within all aggregates and entities. Regardless of whether we are using event sourcing for persistence or not. -2. We publish all "domain events" whenever the state of any aggregate is saved in any repository, via the `EventSourcingDddCommandStore` or via the `SnapshottingDddCommandStore`. -3. We treat "domain events" and "integration events" slightly differently: - 1. "domain events" are published synchronously and handled synchronously after the aggregate is saved, and are always consistent. - 2. "integration events" are published synchronously, but are expected to be handled asynchronously (by a message broker) and be eventually consistent. - -> We assume that all "domain events" are only ever published to other subdomains that are in the same "bounded context" and thus, also in the same host process. When this is not true, for example, if subdomains of the same bounded context are split into separate host processes, then these subdomains will need to communicate with "integration events" instead, and they will be eventually consistent. - -The synchronous publication of all "domain events" is handled automatically by the `IEventNotifyingStoreNotificationRelay` (after events have first been projected by the `IEventNotifyingStoreProjectionRelay`). - -![Eventing](../images/Persistence-Eventing.png) - -Domain events are published synchronously (round-robin) one at a time: - -1. First, to all registered `IDomainEventNotificationConsumer` consumers. These consumers can fail and report back errors that are captured synchronously. -2. Then to all registered `IIntegrationEventNotificationTranslator` translators, that have the option to translate o domain event into an integration event, or not. This translation can also fail, and report back errors that are captured synchronously. -3. Finally, if the translator translates a domain event into an integration event it is then published to the `IEventNotificationMessageBroker` that should send the integration event to some external message broker, who will deliver it asynchronous to external consumers. This can also fail, and report back errors that are captured synchronously diff --git a/docs/design-principles/0070-persistence.md b/docs/design-principles/0070-persistence.md index 7ed94be2..12b40d62 100644 --- a/docs/design-principles/0070-persistence.md +++ b/docs/design-principles/0070-persistence.md @@ -264,44 +264,4 @@ This is an example overview of how both these persistence flows work for the `Bo Regardless of the chosen persistence scheme, when the state of any aggregate is saved, it will yield some change events (most recently raised domain events, since the last rehydration) that can and will be relayed to other components in the system to drive Read Model Projections (Event Sourcing) and Notifications (Event Driven Architecture). -Read Models are typically always used within the same sub-domain or by other sub-domains running in the same process. Notifications are designed to be transmitted to remote subdomains or to other external systems - via an asynchronous "event broker". - -Projections and Notifications must be "consistent" with the update of the aggregates that produce the change events. This requires reliable implementations (e.g. Outbox Pattern) and they must guarantee delivery of the change events in order (e.g. FIFO Queues). If either of these technical requirements cannot be guaranteed then there is a high probability (when the system comes under load or stress) that downstream consumers of these change events will be permanently out of date, affecting data integrity of dependent systems. - -> By default, both mechanisms of updating read models and sending notifications should be done reliably and asynchronously after a source aggregate is changed, such that the collective system is eventually consistent. This asynchronous update (typically expected to take anywhere between ~100ms-500ms) means that read model data and consumers of notifications can be immediately out of date with the subdomains that update their source aggregates. -> -> When this update process is synchronous (and in-process), that part of the system will achieve 100% consistency, which is convenient, but this is not a true reality for when the system is eventually split up and has become a distributed system. (this is the goal of all modular monoliths). In distributed systems that are eventually consistent, API clients are required to employ different strategies to handle this eventual consistency, which are disruptive to switch to later when a monolithic backend becomes distributed. -> -> For example, if a client calls a command API and then after receiving a response, immediately calls a query API that would include the changed data, the queried data may have not yet been updated yet. This is one reason why commands should return changed data synchronously in their responses, to help clients predict changed data. -> -> Because of this constraint, it is better to start the modular monolith on an eventually consistent model rather than start a fully consistent model since these client strategies should to be established sooner rather than being later re-engineered. - -The following diagram illustrates the "logical" process that is triggered when an aggregate state is updated. - -![Persistence-Eventing](../images/Persistence-Eventing.png) - -> The implementation details of this "logical" process can be different depending on the specific "relay" mechanisms in place. - -#### Read Model Projections - -Read model "projections" are a mechanism to produce (and keep up to date) one (or more) "read models", which are typically "records" that represent the latest state of the aggregates and entities, in event sourced persistent schemes. These read models are typically used directly by CQRS queries. - -> Note: In snapshotting persistence schemes, "read models" are the exact same as the "write models", they share the same data. However, there is no "write model" in an event source scheme, and write models cannot be queried. - -One major advantage of producing "read models" is that they are all built from the historical stream of events. This means that we can have several of them at the same time, containing different data, and all coherent with each other, unlike what is possible with snapshotting stores. - -Another advantage of this scheme is that we can build several "denormalized" sets of records (e.g. in a relational database) that are optimized for specific queries - no longer requiring complex joins to give desired results. - -Another advantage (only available to event-sourced persistence scheme) is that we can rebuild any read model at any time, in any way we like, and never lose any historical data. Read models then become temporary and disposable. All the source data is in the event streams. The advantage here is that when the software changes and the queried data needs changing, we can use any of the historical data that already exists in the aggregate event streams to rebuild different data in completely new read models. - -> This capability is impossible in snapshotting persistence schemes. - -#### Notifications - -Notifications are the mechanism by which subdomains can communicate to other subdomains or to other systems about what is happening in the source subdomain. Particularly necessary in micros-services deployments. This is normally done in distributed systems with a message broker of some kind (i.e., a queue, a message bus, or a message broker). - -Change events raised through notifications are not expected to be coupled to consuming systems, so they can be mapped to more granular or coarse events. - -Consumers of notifications must register to receive notifications. - -A "notification registration" consists of a producer and a consumer. The producer translates the source `IDomainEvent` to an appropriate `IDomainEvent` to share outside the source component, and then that event is relayed to the consumer to handle. +See [Eventing](0170-eventing.md) for more details diff --git a/docs/design-principles/0170-eventing.md b/docs/design-principles/0170-eventing.md new file mode 100644 index 00000000..5c1fbaf5 --- /dev/null +++ b/docs/design-principles/0170-eventing.md @@ -0,0 +1,106 @@ +# Eventing + +Eventing is the gateway to Event Driven Architecture (EDA), but it starts in the Domain Layer of the software. + +## Design Principles + +1. We want all DDD aggregates to utilize domain events to drive use cases, irrespective of whether we are sourcing/capturing their current state from events (a.k.a Event Sourcing) or sourcing/capturing their current state from data snapshots. +2. For [event-sourcing persistence schemes](0070-persistence.md) (that are by definition "write models" only), we need to build associated "read models" so that we can query domain aggregates. +3. We want the flexibility to change our "read models" at any time, as the software changes, and ideally not have lost any data. +4. We may want denormalized data models to query for maximum efficiency. +5. We want to de-couple subdomains from each other as much as possible. Even for subdomains that are highly-coupled to begin with (e.g., `EndUsers` and `Organizations` and `Subscriptions`. +6. We want to deploy certain groups of subdomains into separate hosts and split the modular monolith into many APIs (i.e., micros-services) later in the lifecycle of the product, but not have to re-engineer flows to accommodate those changes. +7. We want the flexibility to make changes to key use cases in the product, without changing multiple subdomains at the same time. +8. We want to be able to communicate across process boundaries without coupling the processes. + +## Implementation + +In the design of most distributed systems, of the nature of this system or of systems that are expected to evolve into distributed systems later, it is common to decouple each of the subdomains from each other. De-coupling effectively is absolutely vital to allowing the system to change, grow, and evolve over time. + +Lack of effective de-coupling (at the technical level) is the main reason most software systems devolve into big-balls-of-mud, simply because of the coupling of many components to many other components often striving for maximum data and code reuse. + +There are several techniques for de-coupling your subdomains, including: separating layers, using ports and adapters, starting with a modular monoliths and decomposing it into microservices later etc. + +Another one of these techniques is the use of Event-Driven Architecture (EDA), where change is communicated within process boundaries, and across process boundaries. + +EDA relies on the fact that your system will emit "domain events" that it can share both within specific bounded contexts (as "domain events"), and externally to other systems (as "integration events"). + +> When sharing events within a bounded context (or within the same process), the process can remain consistent. We call these "domain events." +> +> When events are shared across bounded contexts (or across processes and hosts), they are called "integration events." + +In SaaStack: + +1. We use "domain events" to signal changes (within the Domain Layer) and within all aggregates and entities. Regardless of whether we are using event sourcing for persistence or not. +2. We publish all "domain events" whenever the state of any aggregate is saved in any repository via the `EventSourcingDddCommandStore` or via the `SnapshottingDddCommandStore`. +3. We treat "domain events" and "integration events" slightly differently: + 1. "domain events" are published synchronously and handled synchronously after the aggregate is saved, and are always consistent. + 2. "integration events" are published synchronously, but are expected to be handled asynchronously (by a message broker) and be eventually consistent. + +> We assume that all "domain events" are only ever published to other subdomains that are in the same "bounded context" and, thus, we also assume the bounded context in deployed in the same host process. When this is not true, for example, if subdomains of the same bounded context are split into separate host processes, then these subdomains will need to communicate with "integration events" instead, and they will necessarily become "eventually consistent" with each other. + +![Eventing](../images/Persistence-Eventing.png) + +The diagram above illustrates the "logical" process that is triggered when an aggregate state is updated. + +> The implementation details of this "logical" process can be different depending on the specific "relay" mechanisms in place. + +### Consistency + +Read model projections and notifications (one or more of them) are created by publishing "domain events" raised by aggregates from a specific subdomain. + +In a typical stateless API, this publication and subsequent updating of read models and notifications can occur synchronously or asynchronously with respect to the aggregates producing the "domain events" in a specific subdomain. + +Projections and Notifications should try to be "consistent" as possible with updating the aggregates that produce the change events as much as possible, although "eventually consistent" is also possible, and likely in highly distributed systems. + +Consistency requires "reliable" implementations (e.g., Outbox Pattern), and these implementations must guarantee the delivery of the "domain events" in order (e.g., FIFO Queues). If either of these technical requirements cannot be guaranteed then there is a high probability that when the system comes under load or stress, that downstream consumers of these change events will be permanently out of date, affecting the data integrity of downstream dependent systems. + +Eventual consistency can cause problems for clients who are making changes in synchronous "commands" and then shortly after, expecting that changed data in subsequent "queries". + +> By default, the mechanism of updating read models should be done reliably and asynchronously after a source aggregate is changed, such that the collective system is eventually consistent. This asynchronous update (typically expected to take anywhere between ~100ms-500ms) means that read model data and consumers of notifications can be "immediately" out of date with the subdomains that update their source aggregates. +> +> When this update process is synchronous (and in-process), that part of the system will achieve 100% consistency, which is convenient, but this is not a true reality for when the system is eventually split up and becomes a distributed system. (as is the goal of all modular monoliths). In distributed systems that are eventually consistent, API clients are required to employ different strategies to handle this eventual consistency, which are disruptive when switching later when a monolithic backend becomes distributed. +> +> For example, if a client calls a command API and then after receiving a response, immediately calls a query API that would include the changed data, the queried data may have not yet been updated yet. This is one reason why commands can return changed data synchronously in their responses, to help clients predict the changed data, and avoid the subsequent querying of it. +> +> Because of this constraint, it is better to start the modular monolith on an eventually consistent model rather than start a fully consistent model since these client strategies should be established sooner rather than later being re-engineered. + +### Read Model Projections + +A "Read Model" is the term used to describe data that is readable, usually by a query of some kind. + +A "Projection" is a word used in databases to "project" (verb) data from one container into a specific view of that data. + +> For example, in SQL databases, all `SELECT` statements represent projections of data into an output. + +Read Model Projections are essentially views of data produced by projecting "domain events" onto a data container. In most cases, projecting data into the rows of tables of a database, optimized for fast querying. + +Read model "projections" are a mechanism to produce (and keep up to date) one (or more) "read models", which are typically "records" that represent the latest state of the aggregates and entities in event-sourced persistent schemes. These read models are typically used directly by CQRS queries to retrieve the latest state of the system. + +> Note: In snapshotting persistence schemes, the "read models" already exist, and are the exact same as the "write models"; they share the exact same data. However, there is no "read model" in an event source scheme, and "write models" cannot be efficiently queried. + +One major advantage of producing "read models" is that they are all built from the historical stream of events. This means that we can have several of them at the same time, containing different data and all coherent with each other, unlike what is possible with snapshotting stores, with one and only one read model possible. + +Another advantage of having read models is that we can build several "denormalized" sets of records at the same time (e.g. in a relational database) that are optimized for specific queries - no longer requiring complex joins to give desired results. + +Another advantage (only available to event-sourced persistence scheme) is that we can rebuild any read model at any time, in any way we like, and never lose any historical data. Now that this is possible, read models become temporary and disposable (since we can rebuild them in any representation at any time). All the source data is in the event streams! The advantage here is that when the software changes and the queried data needs changing, we can use any of the historical data that already exists in the aggregate event streams to rebuild different data in completely new read models. No longer do you need to protect the one and only read model that defines the state of the system. + +> This capability is impossible in snapshotting persistence schemes. + +### Event Notifications + +Notifications are the mechanism by which subdomains can communicate to other subdomains (or to other processes) about what is happening in the source subdomain. This means that a source subdomain does not have to directly instruct another [dependent] target subdomain to update its state, when the source subdomain state changes. Typically, this is done by a direct synchronous method/API call. Now the target domain can simply react to the appearance of a "domain event" from the source subdomain, and take appropriate action. The coupling of the method/API call is gone. + +> This is particularly useful when you have highly inter-dependent subdomains, that require that their data be in sync with each other (i.e., `EndUser` memberships with `Organizations`. + +This characteristic is particularly necessary in distributed deployments, where direct calls are HTTP calls, requiring both the source and target subdomains to be responsive to each other. + +Instead, this decoupling via "integration events" would normally done in distributed systems with a message broker of some kind (i.e., a queue, a message bus, etc.). + +The synchronous publication of all "domain events" is handled automatically by the `IEventNotifyingStoreNotificationRelay` (after events have first been projected by the `IEventNotifyingStoreProjectionRelay`). + +Domain events are published synchronously (round-robin) one at a time: + +1. First, to all registered `IDomainEventNotificationConsumer` consumers. These consumers can fail and report back errors that are captured synchronously. +2. Then to all registered `IIntegrationEventNotificationTranslator` translators, that have the option to translate a "domain event" into an "integration event" or not. This translation can also fail, and report back errors that are captured synchronously. +3. Finally, if the translator translates a "domain event" into an "integration event" it is then published to the `IEventNotificationMessageBroker` that should send the "integration event" to some external message broker, who will deliver it asynchronous to external consumers. This can also fail, and report back errors that are captured synchronously. diff --git a/docs/design-principles/README.md b/docs/design-principles/README.md index dab6bccf..7f53fe97 100644 --- a/docs/design-principles/README.md +++ b/docs/design-principles/README.md @@ -10,6 +10,7 @@ * [Domain Driven Design](0050-domain-driven-design.md) how to design your aggregates, and domains * [Dependency Injection](0060-dependency-injection.md) how you implement DI * [Persistence](0070-persistence.md) how you design your repository layer, and promote domain events +* [Eventing](0170-eventing.md) how we implement Eventing and enable Event Driven Architecture * [Ports and Adapters](0080-ports-and-adapters.md) how we keep infrastructure components at arm's length, and testable, and how we integrate with any 3rd party system * [Authentication and Authorization](0090-authentication-authorization.md) how we authenticate and authorize users * [Email Delivery](0100-email-delivery.md) how we send emails and deliver them asynchronously and reliably @@ -17,4 +18,5 @@ * [Feature Flagging](0120-feature-flagging.md) how we enable and disable features at runtime * [Multi-Tenancy](0130-multitenancy.md) how we support multiple tenants in our system (both logical and physical infrastructure) * [Developer Tooling](0140-developer-tooling.md) all the tooling that is included in this codebase to help developers use this codebase effectively, and consistently -* [User Lifecycle](0160-user-lifecycle.md) how are users managed on the platform, and the relationship to their organizations \ No newline at end of file +* [User Lifecycle](0160-user-lifecycle.md) how are users managed on the platform, and the relationship to their organizations +* \ No newline at end of file diff --git a/docs/images/Persistence-Eventing.png b/docs/images/Persistence-Eventing.png index e52e1515..58459992 100644 Binary files a/docs/images/Persistence-Eventing.png and b/docs/images/Persistence-Eventing.png differ diff --git a/docs/images/Sources.pptx b/docs/images/Sources.pptx index 198e8733..68ea7bcc 100644 Binary files a/docs/images/Sources.pptx and b/docs/images/Sources.pptx differ diff --git a/src/Common.UnitTests/ErrorSpec.cs b/src/Common.UnitTests/ErrorSpec.cs index 0b643959..9505e6d0 100644 --- a/src/Common.UnitTests/ErrorSpec.cs +++ b/src/Common.UnitTests/ErrorSpec.cs @@ -1,5 +1,6 @@ using FluentAssertions; using Xunit; +using Environment = System.Environment; namespace Common.UnitTests; @@ -63,4 +64,45 @@ public void WhenWrapOnAnyErrorWithMessageAgain_ThenReturnsNewError() result.Message.Should() .Be($"anothermessage{Environment.NewLine}\tamessage{Environment.NewLine}\taruleviolation"); } + + [Fact] + public void WhenWrapWithCodeOnNoErrorWithNoMessage_ThenReturnsNewError() + { + var result = Error.NoError.Wrap(ErrorCode.Unexpected, ""); + + result.Code.Should().Be(ErrorCode.Unexpected); + result.Message.Should().Be(Error.NoErrorMessage); + } + + [Fact] + public void WhenWrapWithCodeOnNoErrorWithMessage_ThenReturnsNewError() + { + var result = Error.NoError.Wrap(ErrorCode.Unexpected, "amessage"); + + result.Code.Should().Be(ErrorCode.Unexpected); + result.Message.Should().Be($"{nameof(ErrorCode.NoError)}: amessage"); + } + + [Fact] + public void WhenWrapWithCodeOnAnyErrorWithMessage_ThenReturnsNewError() + { + var result = Error.RuleViolation("aruleviolation") + .Wrap(ErrorCode.Unexpected, "amessage"); + + result.Code.Should().Be(ErrorCode.Unexpected); + result.Message.Should().Be($"{nameof(ErrorCode.RuleViolation)}: amessage{Environment.NewLine}\taruleviolation"); + } + + [Fact] + public void WhenWrapWithCodeOnAnyErrorWithMessageAgain_ThenReturnsNewError() + { + var result = Error.RuleViolation("aruleviolation") + .Wrap(ErrorCode.EntityExists, "amessage") + .Wrap(ErrorCode.Unexpected, "anothermessage"); + + result.Code.Should().Be(ErrorCode.Unexpected); + result.Message.Should() + .Be( + $"{nameof(ErrorCode.EntityExists)}: anothermessage{Environment.NewLine}\t{nameof(ErrorCode.RuleViolation)}: amessage{Environment.NewLine}\taruleviolation"); + } } \ No newline at end of file diff --git a/src/Common/Error.cs b/src/Common/Error.cs index 97bdc6c3..348acc57 100644 --- a/src/Common/Error.cs +++ b/src/Common/Error.cs @@ -42,6 +42,21 @@ public Error Wrap(string message) ? $"{message}{Environment.NewLine}\t{Message}" : message); } + + /// + /// Wraps the existing message within the specified message, for the specified code + /// + public Error Wrap(ErrorCode code, string message) + { + if (message.HasNoValue()) + { + return new Error(code, Message); + } + + return new Error(code, Message.HasValue() && Message != NoErrorMessage + ? $"{Code}: {message}{Environment.NewLine}\t{Message}" + : $"{Code}: {message}"); + } #endif /// diff --git a/src/Infrastructure.Eventing.Common.UnitTests/Notifications/EventNotificationNotifierSpec.cs b/src/Infrastructure.Eventing.Common.UnitTests/Notifications/EventNotificationNotifierSpec.cs index af83b570..04b9c821 100644 --- a/src/Infrastructure.Eventing.Common.UnitTests/Notifications/EventNotificationNotifierSpec.cs +++ b/src/Infrastructure.Eventing.Common.UnitTests/Notifications/EventNotificationNotifierSpec.cs @@ -1,5 +1,6 @@ using Application.Persistence.Interfaces; using Common; +using Common.Extensions; using Domain.Common; using Domain.Common.Extensions; using Domain.Common.ValueObjects; @@ -26,7 +27,7 @@ public EventNotificationNotifierSpec() var recorder = new Mock(); var changeEventTypeMigrator = new ChangeEventTypeMigrator(); _registration = new Mock(); - _registration.Setup(p => p.IntegrationEventTranslator.RootAggregateType) + _registration.Setup(reg => reg.IntegrationEventTranslator.RootAggregateType) .Returns(typeof(string)); _registration.Setup(p => p.IntegrationEventTranslator.TranslateAsync(It.IsAny(), It.IsAny())) @@ -37,7 +38,7 @@ public EventNotificationNotifierSpec() _messageBroker = new Mock(); _messageBroker.Setup(mb => mb.PublishAsync(It.IsAny(), It.IsAny())) .ReturnsAsync(Result.Ok); - _registration.Setup(p => p.DomainEventConsumers) + _registration.Setup(reg => reg.DomainEventConsumers) .Returns([_domainConsumer.Object]); var registrations = new List { _registration.Object }; _notifier = new EventNotificationNotifier(recorder.Object, changeEventTypeMigrator, registrations, @@ -74,14 +75,15 @@ await _notifier.WriteEventStreamAsync("astreamname", [], _messageBroker.Verify(mb => mb.PublishAsync(It.IsAny(), It.IsAny()), Times.Never); _registration.Verify( - p => p.IntegrationEventTranslator.TranslateAsync(It.IsAny(), It.IsAny()), + reg => reg.IntegrationEventTranslator.TranslateAsync(It.IsAny(), + It.IsAny()), Times.Never); } [Fact] public async Task WhenWriteEventStreamAndNoRegisteredConsumers_ThenReturns() { - _registration.Setup(p => p.IntegrationEventTranslator.RootAggregateType) + _registration.Setup(reg => reg.IntegrationEventTranslator.RootAggregateType) .Returns(typeof(string)); var result = await _notifier.WriteEventStreamAsync("astreamname", [ @@ -104,7 +106,8 @@ public async Task WhenWriteEventStreamAndNoRegisteredConsumers_ThenReturns() _messageBroker.Verify(mb => mb.PublishAsync(It.IsAny(), It.IsAny()), Times.Never); _registration.Verify( - p => p.IntegrationEventTranslator.TranslateAsync(It.IsAny(), It.IsAny()), + reg => reg.IntegrationEventTranslator.TranslateAsync(It.IsAny(), + It.IsAny()), Times.Never); } @@ -153,7 +156,7 @@ public async Task WhenWriteEventStreamAndTranslatorDoesNotTranslateEvent_ThenOnl ], CancellationToken.None); result.Should().BeSuccess(); - _registration.Verify(p => p.IntegrationEventTranslator.TranslateAsync(It.Is(e => + _registration.Verify(reg => reg.IntegrationEventTranslator.TranslateAsync(It.Is(e => e.RootId == "aneventid" ), It.IsAny())); _domainConsumer.Verify(c => c.NotifyAsync(It.Is(ce => @@ -186,7 +189,7 @@ public async Task WhenWriteEventStreamWithSingleEvent_ThenNotifiesBothDomainAndI ], CancellationToken.None); result.Should().BeSuccess(); - _registration.Verify(p => p.IntegrationEventTranslator.TranslateAsync(It.Is(e => + _registration.Verify(reg => reg.IntegrationEventTranslator.TranslateAsync(It.Is(e => e.RootId == "aneventid" ), It.IsAny())); _domainConsumer.Verify(c => c.NotifyAsync(It.Is(ce => @@ -244,7 +247,7 @@ public async Task WhenWriteEventStreamWithMultipleEvents_ThenNotifiesBothDomainA ], CancellationToken.None); result.Should().BeSuccess(); - _registration.Verify(p => p.IntegrationEventTranslator.TranslateAsync(It.Is(e => + _registration.Verify(reg => reg.IntegrationEventTranslator.TranslateAsync(It.Is(e => e.RootId == "aneventid1" ), It.IsAny())); _domainConsumer.Verify(c => c.NotifyAsync(It.Is(e => @@ -253,7 +256,7 @@ public async Task WhenWriteEventStreamWithMultipleEvents_ThenNotifiesBothDomainA _messageBroker.Verify(mb => mb.PublishAsync(It.Is(e => e.RootId == "aneventid1" ), It.IsAny())); - _registration.Verify(p => p.IntegrationEventTranslator.TranslateAsync(It.Is(e => + _registration.Verify(reg => reg.IntegrationEventTranslator.TranslateAsync(It.Is(e => e.RootId == "aneventid2" ), It.IsAny())); _domainConsumer.Verify(c => c.NotifyAsync(It.Is(e => @@ -297,16 +300,92 @@ public async Task WhenWriteEventStreamAndDomainConsumerReturnsError_ThenStopsAnd } ], CancellationToken.None); - result.Should().BeError(ErrorCode.RuleViolation, "amessage"); + result.Should().BeError(ErrorCode.Unexpected, Error.RuleViolation("amessage") + .Wrap(Resources.EventNotificationNotifier_ConsumerError.Format("IDomainEventNotificationConsumerProxy", + "aneventid", typeof(TestDomainEvent).AssemblyQualifiedName!)).ToString()); _domainConsumer.Verify(c => c.NotifyAsync(It.Is(e => e.RootId == "aneventid" ), It.IsAny())); _registration.Verify( - p => p.IntegrationEventTranslator.TranslateAsync(It.IsAny(), It.IsAny()), + reg => reg.IntegrationEventTranslator.TranslateAsync(It.IsAny(), + It.IsAny()), Times.Never); _messageBroker.Verify(mb => mb.PublishAsync(It.IsAny(), It.IsAny()), Times.Never); + } + [Fact] + public async Task WhenWriteEventStreamAndIntegrationTranslatorReturnsError_ThenStopsAndReturnsError() + { + _registration.Setup(p => + p.IntegrationEventTranslator.TranslateAsync(It.IsAny(), It.IsAny())) + .ReturnsAsync((IDomainEvent domainEvent, CancellationToken _) => + new TestIntegrationEvent(domainEvent.RootId).ToOptional()); + _registration.Setup(reg => + reg.IntegrationEventTranslator.TranslateAsync(It.IsAny(), It.IsAny())) + .ReturnsAsync(Error.RuleViolation("amessage")); + + var result = await _notifier.WriteEventStreamAsync("astreamname", [ + new EventStreamChangeEvent + { + Id = "anid1", + RootAggregateType = nameof(String), + Data = new TestDomainEvent { RootId = "aneventid" }.ToEventJson(), + Version = 0, + Metadata = new EventMetadata(typeof(TestDomainEvent).AssemblyQualifiedName!), + EventType = null!, + LastPersistedAtUtc = default, + StreamName = null! + } + ], CancellationToken.None); + + result.Should().BeError(ErrorCode.Unexpected, Error.RuleViolation("amessage") + .Wrap(Resources.EventNotificationNotifier_TranslatorError.Format( + "IIntegrationEventNotificationTranslatorProxy", + "aneventid", typeof(TestDomainEvent).AssemblyQualifiedName!)).ToString()); + _domainConsumer.Verify(c => c.NotifyAsync(It.Is(e => + e.RootId == "aneventid" + ), It.IsAny())); + _registration.Verify( + reg => reg.IntegrationEventTranslator.TranslateAsync(It.IsAny(), + It.IsAny())); + _messageBroker.Verify(mb => mb.PublishAsync(It.IsAny(), It.IsAny()), + Times.Never); } + [Fact] + public async Task WhenWriteEventStreamAndMessageBrokerReturnsError_ThenStopsAndReturnsError() + { + _registration.Setup(p => + p.IntegrationEventTranslator.TranslateAsync(It.IsAny(), It.IsAny())) + .ReturnsAsync((IDomainEvent domainEvent, CancellationToken _) => + new TestIntegrationEvent(domainEvent.RootId).ToOptional()); + _messageBroker.Setup(mb => mb.PublishAsync(It.IsAny(), It.IsAny())) + .ReturnsAsync(Error.RuleViolation("amessage")); + + var result = await _notifier.WriteEventStreamAsync("astreamname", [ + new EventStreamChangeEvent + { + Id = "anid1", + RootAggregateType = nameof(String), + Data = new TestDomainEvent { RootId = "aneventid" }.ToEventJson(), + Version = 0, + Metadata = new EventMetadata(typeof(TestDomainEvent).AssemblyQualifiedName!), + EventType = null!, + LastPersistedAtUtc = default, + StreamName = null! + } + ], CancellationToken.None); + + result.Should().BeError(ErrorCode.Unexpected, Error.RuleViolation("amessage") + .Wrap(Resources.EventNotificationNotifier_MessageBrokerError.Format("IEventNotificationMessageBrokerProxy", + "aneventid", typeof(TestDomainEvent).AssemblyQualifiedName!)).ToString()); + _domainConsumer.Verify(c => c.NotifyAsync(It.Is(e => + e.RootId == "aneventid" + ), It.IsAny())); + _registration.Verify( + reg => reg.IntegrationEventTranslator.TranslateAsync(It.IsAny(), + It.IsAny())); + _messageBroker.Verify(mb => mb.PublishAsync(It.IsAny(), It.IsAny())); + } } \ No newline at end of file diff --git a/src/Infrastructure.Eventing.Common/Notifications/EventNotificationNotifier.cs b/src/Infrastructure.Eventing.Common/Notifications/EventNotificationNotifier.cs index 32dd14f4..0ada82d1 100644 --- a/src/Infrastructure.Eventing.Common/Notifications/EventNotificationNotifier.cs +++ b/src/Infrastructure.Eventing.Common/Notifications/EventNotificationNotifier.cs @@ -101,7 +101,7 @@ private async Task> RelayEventStreamToAllConsumersInOrderAsync( var @event = deserialized.Value; var domainEventsRelayed = - await RelayDomainEventToAllConsumersAsync(registration, @event, cancellationToken); + await RelayDomainEventToAllConsumersAsync(registration, changeEvent, @event, cancellationToken); if (!domainEventsRelayed.IsSuccessful) { return domainEventsRelayed.Error; @@ -120,18 +120,23 @@ private async Task> RelayEventStreamToAllConsumersInOrderAsync( private static async Task> RelayDomainEventToAllConsumersAsync( IEventNotificationRegistration registration, - IDomainEvent @event, CancellationToken cancellationToken) + EventStreamChangeEvent changeEvent, IDomainEvent @event, CancellationToken cancellationToken) { if (registration.DomainEventConsumers.HasNone()) { return Result.Ok; } - var results = await Task.WhenAll(registration.DomainEventConsumers - .Select(consumer => consumer.NotifyAsync(@event, cancellationToken))); - if (results.Any(r => !r.IsSuccessful)) + foreach (var consumer in registration.DomainEventConsumers) { - return results.First(r => !r.IsSuccessful).Error; + var result = await consumer.NotifyAsync(@event, cancellationToken); + if (!result.IsSuccessful) + { + return result.Error + .Wrap(ErrorCode.Unexpected, Resources.EventNotificationNotifier_ConsumerError.Format( + consumer.GetType().Name, @event.RootId, + changeEvent.Metadata.Fqn)); + } } return Result.Ok; @@ -144,9 +149,10 @@ private async Task> RelayIntegrationEventToBrokerAsync( var published = await registration.IntegrationEventTranslator.TranslateAsync(@event, cancellationToken); if (!published.IsSuccessful) { - return published.Error.Wrap(Resources.EventNotificationNotifier_ProducerError.Format( + return published.Error.Wrap(ErrorCode.Unexpected, + Resources.EventNotificationNotifier_TranslatorError.Format( registration.IntegrationEventTranslator.GetType().Name, - @event, changeEvent.Metadata.Fqn)); + @event.RootId, changeEvent.Metadata.Fqn)); } var publishedEvent = published.Value; @@ -162,7 +168,9 @@ private async Task> RelayIntegrationEventToBrokerAsync( var brokered = await _messageBroker.PublishAsync(integrationEvent, cancellationToken); if (!brokered.IsSuccessful) { - return brokered.Error; + return brokered.Error.Wrap(ErrorCode.Unexpected, + Resources.EventNotificationNotifier_MessageBrokerError.Format( + _messageBroker.GetType().Name, @event.RootId, changeEvent.Metadata.Fqn)); } return Result.Ok; diff --git a/src/Infrastructure.Eventing.Common/Projections/ReadModelProjector.cs b/src/Infrastructure.Eventing.Common/Projections/ReadModelProjector.cs index 08cf7bc6..f0bd4abc 100644 --- a/src/Infrastructure.Eventing.Common/Projections/ReadModelProjector.cs +++ b/src/Infrastructure.Eventing.Common/Projections/ReadModelProjector.cs @@ -110,17 +110,21 @@ private static async Task> ProjectEventAsync(IReadModelProjection var projected = await projection.ProjectEventAsync(@event, cancellationToken); if (!projected.IsSuccessful) { - return projected.Error.Wrap(Resources.ReadModelProjector_ProjectionError_HandlerError.Format( - projection.GetType().Name, - changeEvent.Id, changeEvent.Metadata.Fqn)); + return projected.Error.Wrap(ErrorCode.Unexpected, + Resources.ReadModelProjector_ProjectionError_HandlerError.Format( + projection.GetType().Name, + changeEvent.Id, changeEvent.Metadata.Fqn)); } +#if TESTINGONLY if (!projected.Value) { + //Note: this is for local development and testing only to ensure all events are configured return Error.Unexpected(Resources.ReadModelProjector_ProjectionError_MissingHandler.Format( projection.GetType().Name, changeEvent.Id, changeEvent.Metadata.Fqn)); } +#endif return Result.Ok; } diff --git a/src/Infrastructure.Eventing.Common/Resources.Designer.cs b/src/Infrastructure.Eventing.Common/Resources.Designer.cs index 035041c7..b41f5880 100644 --- a/src/Infrastructure.Eventing.Common/Resources.Designer.cs +++ b/src/Infrastructure.Eventing.Common/Resources.Designer.cs @@ -60,11 +60,29 @@ internal Resources() { } /// - /// Looks up a localized string similar to The producer '{0}' failed to handle the domain event '{1}' with event type '{2}'. Aborting notifications. + /// Looks up a localized string similar to The consumer '{0}' failed to handle the domain event '{1}' with event type '{2}'. Aborting notifications. /// - internal static string EventNotificationNotifier_ProducerError { + internal static string EventNotificationNotifier_ConsumerError { get { - return ResourceManager.GetString("EventNotificationNotifier_ProducerError", resourceCulture); + return ResourceManager.GetString("EventNotificationNotifier_ConsumerError", resourceCulture); + } + } + + /// + /// Looks up a localized string similar to The message broker '{0}' failed to handle the integration event '{1}' with event type '{2}'. Aborting notifications. + /// + internal static string EventNotificationNotifier_MessageBrokerError { + get { + return ResourceManager.GetString("EventNotificationNotifier_MessageBrokerError", resourceCulture); + } + } + + /// + /// Looks up a localized string similar to The translator '{0}' failed to handle the domain event '{1}' with event type '{2}'. Aborting notifications. + /// + internal static string EventNotificationNotifier_TranslatorError { + get { + return ResourceManager.GetString("EventNotificationNotifier_TranslatorError", resourceCulture); } } diff --git a/src/Infrastructure.Eventing.Common/Resources.resx b/src/Infrastructure.Eventing.Common/Resources.resx index 2976d494..b49d6317 100644 --- a/src/Infrastructure.Eventing.Common/Resources.resx +++ b/src/Infrastructure.Eventing.Common/Resources.resx @@ -36,7 +36,13 @@ The event stream {0} is at checkpoint '{1}', but new events are at version {2}. Perhaps some event history is missing? - - The producer '{0}' failed to handle the domain event '{1}' with event type '{2}'. Aborting notifications + + The translator '{0}' failed to handle the domain event '{1}' with event type '{2}'. Aborting notifications + + + The consumer '{0}' failed to handle the domain event '{1}' with event type '{2}'. Aborting notifications + + + The message broker '{0}' failed to handle the integration event '{1}' with event type '{2}'. Aborting notifications \ No newline at end of file diff --git a/src/Infrastructure.Hosting.Common/ApplicationServices/Eventing/Notifications/InProcessSynchronousNotificationRelay.cs b/src/Infrastructure.Hosting.Common/ApplicationServices/Eventing/Notifications/InProcessSynchronousNotificationRelay.cs index d35413f9..f804b367 100644 --- a/src/Infrastructure.Hosting.Common/ApplicationServices/Eventing/Notifications/InProcessSynchronousNotificationRelay.cs +++ b/src/Infrastructure.Hosting.Common/ApplicationServices/Eventing/Notifications/InProcessSynchronousNotificationRelay.cs @@ -10,15 +10,17 @@ namespace Infrastructure.Hosting.Common.ApplicationServices.Eventing.Notificatio /// Defines an in-process service that subscribes to one or more /// instances, listens to them raise change events, and relays them to listening consumers synchronously. /// -public class InProcessSynchronousNotificationRelay : EventStreamHandlerBase, +public sealed class InProcessSynchronousNotificationRelay : EventStreamHandlerBase, IEventNotifyingStoreNotificationRelay { + private readonly IEventNotificationNotifier _notifier; + public InProcessSynchronousNotificationRelay(IRecorder recorder, IEventSourcedChangeEventMigrator migrator, IEventNotificationMessageBroker messageBroker, IEnumerable registrations, params IEventNotifyingStore[] eventingStores) : base(recorder, eventingStores) { - Notifier = new EventNotificationNotifier(recorder, migrator, registrations.ToList(), messageBroker); + _notifier = new EventNotificationNotifier(recorder, migrator, registrations.ToList(), messageBroker); } protected override void Dispose(bool disposing) @@ -26,15 +28,13 @@ protected override void Dispose(bool disposing) base.Dispose(disposing); if (disposing) { - (Notifier as IDisposable)?.Dispose(); + (_notifier as IDisposable)?.Dispose(); } } - public IEventNotificationNotifier Notifier { get; } - protected override async Task> HandleStreamEventsAsync(string streamName, List eventStream, CancellationToken cancellationToken) { - return await Notifier.WriteEventStreamAsync(streamName, eventStream, cancellationToken); + return await _notifier.WriteEventStreamAsync(streamName, eventStream, cancellationToken); } } \ No newline at end of file diff --git a/src/Infrastructure.Hosting.Common/ApplicationServices/Eventing/Projections/InProcessSynchronousProjectionRelay.cs b/src/Infrastructure.Hosting.Common/ApplicationServices/Eventing/Projections/InProcessSynchronousProjectionRelay.cs index 82f1feb3..5c78345d 100644 --- a/src/Infrastructure.Hosting.Common/ApplicationServices/Eventing/Projections/InProcessSynchronousProjectionRelay.cs +++ b/src/Infrastructure.Hosting.Common/ApplicationServices/Eventing/Projections/InProcessSynchronousProjectionRelay.cs @@ -11,13 +11,15 @@ namespace Infrastructure.Hosting.Common.ApplicationServices.Eventing.Projections /// instances, listens to them raise change events, and relays them to /// registered read model projections synchronously. /// -public class InProcessSynchronousProjectionRelay : EventStreamHandlerBase, IEventNotifyingStoreProjectionRelay +public sealed class InProcessSynchronousProjectionRelay : EventStreamHandlerBase, IEventNotifyingStoreProjectionRelay { + private readonly IReadModelProjector _projector; + public InProcessSynchronousProjectionRelay(IRecorder recorder, IEventSourcedChangeEventMigrator migrator, IProjectionCheckpointRepository checkpointStore, IEnumerable projections, params IEventNotifyingStore[] eventingStores) : base(recorder, eventingStores) { - Projector = new ReadModelProjector(recorder, checkpointStore, migrator, projections.ToArray()); + _projector = new ReadModelProjector(recorder, checkpointStore, migrator, projections.ToArray()); } protected override void Dispose(bool disposing) @@ -25,15 +27,13 @@ protected override void Dispose(bool disposing) base.Dispose(disposing); if (disposing) { - (Projector as IDisposable)?.Dispose(); + (_projector as IDisposable)?.Dispose(); } } - public IReadModelProjector Projector { get; } - protected override async Task> HandleStreamEventsAsync(string streamName, List eventStream, CancellationToken cancellationToken) { - return await Projector.WriteEventStreamAsync(streamName, eventStream, cancellationToken); + return await _projector.WriteEventStreamAsync(streamName, eventStream, cancellationToken); } } \ No newline at end of file diff --git a/src/Infrastructure.Persistence.Common.UnitTests/Extensions/EventNotifyingStoreExtensionsSpec.cs b/src/Infrastructure.Persistence.Common.UnitTests/Extensions/EventNotifyingStoreExtensionsSpec.cs index df99ae9f..6aefe275 100644 --- a/src/Infrastructure.Persistence.Common.UnitTests/Extensions/EventNotifyingStoreExtensionsSpec.cs +++ b/src/Infrastructure.Persistence.Common.UnitTests/Extensions/EventNotifyingStoreExtensionsSpec.cs @@ -34,7 +34,7 @@ public async Task WhenSaveAndPublishEventsAsyncAndAggregateHasNoChanges_ThenRetu .Returns(new Result, Error>(new List())); var wasCalled = false; - var result = await store.Object.SaveAndPublishEventsAsync(aggregate.Object, + var result = await store.Object.SaveAndPublishChangesAsync(aggregate.Object, OnEventStreamChanged, (_, _, _) => { wasCalled = true; @@ -61,7 +61,7 @@ public async Task WhenSaveAndPublishEventsAsyncAndAggregateHasChangesButSaveFail })); var wasCalled = false; - var result = await store.Object.SaveAndPublishEventsAsync(aggregate.Object, + var result = await store.Object.SaveAndPublishChangesAsync(aggregate.Object, OnEventStreamChanged, (_, _, _) => { wasCalled = true; @@ -90,15 +90,17 @@ public async Task WhenSaveAndPublishEventsAsyncAndAggregateHasChangesButPublishF _failHandler = true; var wasCalled = false; - var result = await store.Object.SaveAndPublishEventsAsync(aggregate.Object, + var result = await store.Object.SaveAndPublishChangesAsync(aggregate.Object, OnEventStreamChanged, (_, _, _) => { wasCalled = true; return Task.FromResult>("aname"); }, CancellationToken.None); - result.Should().BeError(ErrorCode.RuleViolation, - $"{Resources.EventSourcingDddCommandStore_PublishFailed.Format("aname")}{Environment.NewLine}\tamessage"); + result.Should().BeError(ErrorCode.Unexpected, + Error.RuleViolation( + $"{Resources.EventSourcingDddCommandStore_PublishFailed.Format("aname")}{Environment.NewLine}\tamessage") + .ToString()); wasCalled.Should().BeTrue(); aggregate.Verify(a => a.GetChanges()); aggregate.Verify(a => a.ClearChanges()); @@ -123,7 +125,7 @@ public async Task WhenSaveAndPublishEventsAsyncAndAggregateHasChanges_ThenSavesA })); var wasCalled = false; - var result = await store.Object.SaveAndPublishEventsAsync(aggregate.Object, + var result = await store.Object.SaveAndPublishChangesAsync(aggregate.Object, OnEventStreamChanged, (_, _, _) => { wasCalled = true; diff --git a/src/Infrastructure.Persistence.Common/EventSourcingDddCommandStore.cs b/src/Infrastructure.Persistence.Common/EventSourcingDddCommandStore.cs index 7e9c3c58..29fe2105 100644 --- a/src/Infrastructure.Persistence.Common/EventSourcingDddCommandStore.cs +++ b/src/Infrastructure.Persistence.Common/EventSourcingDddCommandStore.cs @@ -99,7 +99,7 @@ public async Task> SaveAsync(TAggregateRoot aggregate, Cancellatio return Error.EntityExists(Resources.IEventSourcingDddCommandStore_SaveWithAggregateIdMissing); } - var published = await this.SaveAndPublishEventsAsync(aggregate, OnEventStreamChanged, + var published = await this.SaveAndPublishChangesAsync(aggregate, OnEventStreamChanged, (root, changedEvents, token) => _eventStore.AddEventsAsync(_entityName, root.Id.Value, changedEvents, token), cancellationToken); if (!published.IsSuccessful) diff --git a/src/Infrastructure.Persistence.Common/Extensions/EventNotifyingStoreExtensions.cs b/src/Infrastructure.Persistence.Common/Extensions/EventNotifyingStoreExtensions.cs index 7ec12c1c..9bb6a1f1 100644 --- a/src/Infrastructure.Persistence.Common/Extensions/EventNotifyingStoreExtensions.cs +++ b/src/Infrastructure.Persistence.Common/Extensions/EventNotifyingStoreExtensions.cs @@ -9,9 +9,9 @@ namespace Infrastructure.Persistence.Common.Extensions; public static class EventNotifyingStoreExtensions { /// - /// Saves and then publishes all events from the aggregate root to any listeners + /// Saves and then publishes all events from the aggregate root to any event handlers /// - public static async Task> SaveAndPublishEventsAsync(this IEventNotifyingStore store, + public static async Task> SaveAndPublishChangesAsync(this IEventNotifyingStore store, TAggregateRoot aggregate, EventStreamChangedAsync? eventHandler, Func, CancellationToken, Task>> onSave, CancellationToken cancellationToken) @@ -39,7 +39,7 @@ public static async Task> SaveAndPublishEventsAsync> SaveAndPublishEventsAsync> PublishChangesAsync(IEventNotifyingStore store, + EventStreamChangedAsync? eventHandler, + IEnumerable changes, string streamName, CancellationToken cancellationToken) + { + if (eventHandler.NotExists()) + { + return Result.Ok; + } + + var changeEvents = changes + .Select(changeEvent => ToChangeEvent(changeEvent, streamName)) + .ToList(); + + return await NotifyEventHandlers(store, eventHandler, streamName, changeEvents, cancellationToken); + } + private static EventStreamChangeEvent ToChangeEvent(EventSourcedChangeEvent changeEvent, string streamName) { return new EventStreamChangeEvent @@ -63,31 +79,26 @@ private static EventStreamChangeEvent ToChangeEvent(EventSourcedChangeEvent chan }; } - private static async Task> PublishChangeEventsAsync(IEventNotifyingStore store, - EventStreamChangedAsync? eventHandler, - IEnumerable changes, string streamName, CancellationToken cancellationToken) - { - if (eventHandler.NotExists()) - { - return Result.Ok; - } - - var changeEvents = changes - .Select(changeEvent => ToChangeEvent(changeEvent, streamName)) - .ToList(); - - return await PublishToListeners(store, eventHandler, streamName, changeEvents, cancellationToken); - } - - private static async Task> PublishToListeners(IEventNotifyingStore store, + private static async Task> NotifyEventHandlers(IEventNotifyingStore store, EventStreamChangedAsync eventHandler, string streamName, IReadOnlyList changeEvents, CancellationToken cancellationToken) { var args = new EventStreamChangedArgs(changeEvents); eventHandler.Invoke(store, args, cancellationToken); var completed = await args.CompleteAsync(); - return !completed.IsSuccessful - ? completed.Error.Wrap(Resources.EventSourcingDddCommandStore_PublishFailed.Format(streamName)) - : Result.Ok; + + if (completed.IsSuccessful) + { + return Result.Ok; + } + + var error = completed.Error; + if (error.Code == ErrorCode.Unexpected) + { + return error; + } + + return completed.Error.Wrap(ErrorCode.Unexpected, + Resources.EventSourcingDddCommandStore_PublishFailed.Format(streamName)); } } \ No newline at end of file diff --git a/src/Infrastructure.Persistence.Common/SnapshottingDddCommandStore.cs b/src/Infrastructure.Persistence.Common/SnapshottingDddCommandStore.cs index 30dea8f7..036ee2a4 100644 --- a/src/Infrastructure.Persistence.Common/SnapshottingDddCommandStore.cs +++ b/src/Infrastructure.Persistence.Common/SnapshottingDddCommandStore.cs @@ -219,7 +219,7 @@ public async Task> UpsertAsync(TAggregateR return updated; } - var published = await this.SaveAndPublishEventsAsync(aggregate, OnEventStreamChanged, (_, _, _) => + var published = await this.SaveAndPublishChangesAsync(aggregate, OnEventStreamChanged, (_, _, _) => { var aggregateName = $"{_containerName}_{entity.Id}"; return Task.FromResult>(aggregateName);