From 97316648af816b64889f87d509c12511af973582 Mon Sep 17 00:00:00 2001 From: Timothy Baldridge Date: Thu, 11 Jan 2024 15:31:03 -0700 Subject: [PATCH] More WIP --- .../IAttribute.cs | 6 ++ .../IEventStore.cs | 18 ++-- .../ISnapshotEventIngester.cs | 13 ++- .../TransactionId.cs | 10 +++ .../InMemoryEventStore.cs | 88 +++++++++++++------ 5 files changed, 98 insertions(+), 37 deletions(-) diff --git a/src/NexusMods.EventSourcing.Abstractions/IAttribute.cs b/src/NexusMods.EventSourcing.Abstractions/IAttribute.cs index b4df07a7..b6392465 100644 --- a/src/NexusMods.EventSourcing.Abstractions/IAttribute.cs +++ b/src/NexusMods.EventSourcing.Abstractions/IAttribute.cs @@ -16,6 +16,12 @@ public interface IAttribute /// The name of the attribute, needs to be unique in a given entity but not unique across entities. /// public string Name { get; } + + /// + /// Create an abstract accumulator for the attribute + /// + /// + public IAccumulator CreateAccumulator(); } /// diff --git a/src/NexusMods.EventSourcing.Abstractions/IEventStore.cs b/src/NexusMods.EventSourcing.Abstractions/IEventStore.cs index d59e35a7..951754be 100644 --- a/src/NexusMods.EventSourcing.Abstractions/IEventStore.cs +++ b/src/NexusMods.EventSourcing.Abstractions/IEventStore.cs @@ -23,18 +23,22 @@ public interface IEventStore /// The ingester to handle the events /// If true, plays the events in reverse /// - public void EventsForEntity(EntityId entityId, TIngester ingester) + public void EventsForEntity(EntityId entityId, TIngester ingester, TransactionId fromId, TransactionId toId) where TIngester : IEventIngester; /// - /// Replays the most recent snapshot for the given entity id, if one exists, then - /// replays every event. + /// Gets the most recent snapshot for an entity that was taken before asOf. If no snapshot is found + /// the TransactionId will be default, otherwise it will be the transaction id of the snapshot. If + /// the snapshot's entity revision is not equal to the revision parameter, the snapshot is invalid + /// and default will be returned. /// + /// /// - /// - /// - public void EventsAndSnapshotForEntity(EntityId entityId, TIngester ingester) - where TIngester : ISnapshotEventIngester; + /// + /// + /// + public TransactionId GetSnapshot(TransactionId asOf, EntityId entityId, ushort revision, + out (IAttribute Attribute, IAccumulator Accumulator)[] loadedAttributes); /// /// Sets the snapshot for the given entity id and transaction id. diff --git a/src/NexusMods.EventSourcing.Abstractions/ISnapshotEventIngester.cs b/src/NexusMods.EventSourcing.Abstractions/ISnapshotEventIngester.cs index 768fbb88..211fb34f 100644 --- a/src/NexusMods.EventSourcing.Abstractions/ISnapshotEventIngester.cs +++ b/src/NexusMods.EventSourcing.Abstractions/ISnapshotEventIngester.cs @@ -5,10 +5,15 @@ /// public interface ISnapshotEventIngester : IEventIngester { + /// - /// This method will be called for each attribute snapshotted, before the normal event ingestion is called + /// Ingests a snapshot of an entity, a false return value means the snapshot is invalid + /// and the entity should be rebuilt from scratch (by replaying all events) /// - /// - /// - public void IngestSnapshotAttribute(string attributeName, IAttribute attribute); + /// + /// + /// + public bool IngestSnapshot(EntityDefinition definition, + (IAttribute Attribute, IAccumulator Accumulator)[] attributes); + } diff --git a/src/NexusMods.EventSourcing.Abstractions/TransactionId.cs b/src/NexusMods.EventSourcing.Abstractions/TransactionId.cs index 46962058..051597cb 100644 --- a/src/NexusMods.EventSourcing.Abstractions/TransactionId.cs +++ b/src/NexusMods.EventSourcing.Abstractions/TransactionId.cs @@ -13,6 +13,16 @@ namespace NexusMods.EventSourcing.Abstractions; public readonly partial struct TransactionId { + /// + /// The lowest possible (and default constructed) transaction id. + /// + public static readonly TransactionId Min = new(0); + + /// + /// The highest possible transaction id. + /// + public static readonly TransactionId Max = new(ulong.MaxValue); + /// /// Get the next transaction id. /// diff --git a/tests/NexusMods.EventSourcing.TestModel/InMemoryEventStore.cs b/tests/NexusMods.EventSourcing.TestModel/InMemoryEventStore.cs index 6214273f..0ce2ba44 100644 --- a/tests/NexusMods.EventSourcing.TestModel/InMemoryEventStore.cs +++ b/tests/NexusMods.EventSourcing.TestModel/InMemoryEventStore.cs @@ -1,4 +1,5 @@ using System.Buffers; +using System.Buffers.Binary; using NexusMods.EventSourcing.Abstractions; using NexusMods.EventSourcing.Abstractions.Serialization; using Reloaded.Memory.Extensions; @@ -15,11 +16,13 @@ public class InMemoryEventStore : IEventStore private readonly IVariableSizeSerializer _stringSerializer; private readonly PooledMemoryBufferWriter _writer; private readonly ISerializationRegistry _serializationRegistry; + private readonly IVariableSizeSerializer _entityDefinitionSerializer; public InMemoryEventStore(TSerializer serializer, ISerializationRegistry serializationRegistry) { _serializer = serializer; _stringSerializer = (serializationRegistry.GetSerializer(typeof(string)) as IVariableSizeSerializer)!; + _entityDefinitionSerializer = (serializationRegistry.GetSerializer(typeof(EntityDefinition)) as IVariableSizeSerializer)!; _serializationRegistry = serializationRegistry; _writer = new PooledMemoryBufferWriter(); } @@ -48,7 +51,7 @@ public TransactionId Add(T entity) where T : IEvent } - public void EventsForEntity(EntityId entityId, TIngester ingester) + public void EventsForEntity(EntityId entityId, TIngester ingester, TransactionId fromId, TransactionId toId) where TIngester : IEventIngester { if (!_events.TryGetValue(entityId, out var events)) @@ -56,56 +59,89 @@ public void EventsForEntity(EntityId entityId, TIngester ingester) foreach (var data in events) { + if (data.TxId < fromId) continue; + if (data.TxId > toId) break; + var @event = _serializer.Deserialize(data.Data)!; if (!ingester.Ingest(data.TxId, @event)) break; } } - public void EventsAndSnapshotForEntity(TransactionId asOf, EntityId entityId, TIngester ingester) where TIngester : ISnapshotEventIngester + public TransactionId EventsAndSnapshotForEntity(TransactionId asOf, EntityId entityId, ushort revision, + out (IAttribute Attribute, IAccumulator Accumulator)[] loadedAttributes) { if (!_snapshots.TryGetValue(entityId, out var snapshots)) - return; + { + loadedAttributes = Array.Empty<(IAttribute, IAccumulator)>(); + return default; + } var startPoint = snapshots.LastOrDefault(s => s.Key <= asOf); - if (startPoint != default) + if (startPoint.Value == default) { - var snapshot = startPoint.Value.AsSpanFast(); - - while (snapshot.Length > 0) - { - var attributeName = _stringSerializer.Deserialize(snapshot, out var read); - snapshot = snapshot.SliceFast(read); - var accumulator = _serializationRegistry.GetAccumulator(attributeName); - accumulator.ReadFrom(snapshot, _serializationRegistry, out read); - snapshot = snapshot.Slice(read); - ingester.IngestSnapshot(attributeName, accumulator); - } - + loadedAttributes = Array.Empty<(IAttribute, IAccumulator)>(); + return default; } - foreach (var (txId, data) in snapshots.) + var snapshot = (ReadOnlySpan)startPoint.Value.AsSpanFast(); + var offset = _entityDefinitionSerializer.Deserialize(snapshot, out var entityDefinition); + + if (entityDefinition.Revision != revision) { - var @event = _serializer.Deserialize(data)!; - if (!ingester.Ingest(txId, @event)) break; + loadedAttributes = Array.Empty<(IAttribute, IAccumulator)>(); + return default; } - if (!_events.TryGetValue(entityId, out var events)) - return; + snapshot = snapshot.SliceFast(offset); - foreach (var data in events) + var numberOfAttrs = BinaryPrimitives.ReadUInt16BigEndian(snapshot); + snapshot = snapshot.SliceFast(sizeof(ushort)); + + var results = GC.AllocateUninitializedArray<(IAttribute, IAccumulator)>(numberOfAttrs); + + if (!EntityStructureRegistry.TryGetAttributes(entityDefinition.Type, out var attributes)) + throw new Exception("Entity definition does not match the current structure registry."); + + for (var i = 0; i < numberOfAttrs; i++) { - var @event = _serializer.Deserialize(data.Data)!; - if (!ingester.Ingest(data.TxId, @event)) break; + var read = _stringSerializer.Deserialize(snapshot, out var attributeName); + snapshot = snapshot.SliceFast(read); + + if (!attributes.TryGetValue(attributeName, out var attribute)) + throw new Exception("Entity definition does not match the current structure registry."); + + var accumulator = attribute.CreateAccumulator(); + + accumulator.ReadFrom(ref snapshot, _serializationRegistry); + snapshot = snapshot.SliceFast(read); + + results[i] = (attribute, accumulator); } + + loadedAttributes = results; + return startPoint.Key; } - public void SetSnapshot(TransactionId txId, EntityId id, IEnumerable<(string AttributeName, IAccumulator Accumulator)> attributes) + public void SetSnapshot(TransactionId txId, EntityId id, IDictionary attributes) { _writer.Reset(); - foreach (var (attributeName, accumulator) in attributes) + // Snapshot starts with the type attribute value + var typeAccumulator = attributes[IEntity.TypeAttribute]; + typeAccumulator.WriteTo(_writer, _serializationRegistry); + + var sizeSpan = _writer.GetSpan(sizeof(ushort)); + BinaryPrimitives.WriteUInt16BigEndian(sizeSpan, (ushort) attributes.Count); + _writer.Advance(sizeof(ushort)); + + + // And then each attribute in any order + foreach (var (attribute, accumulator) in attributes) { + if (attribute == IEntity.TypeAttribute) continue; + + var attributeName = attribute.Name; _stringSerializer.Serialize(attributeName, _writer); accumulator.WriteTo(_writer, _serializationRegistry); }