diff --git a/src/NexusMods.EventSourcing.Abstractions/IAttribute.cs b/src/NexusMods.EventSourcing.Abstractions/IAttribute.cs
index b4df07a7..b6392465 100644
--- a/src/NexusMods.EventSourcing.Abstractions/IAttribute.cs
+++ b/src/NexusMods.EventSourcing.Abstractions/IAttribute.cs
@@ -16,6 +16,12 @@ public interface IAttribute
/// The name of the attribute, needs to be unique in a given entity but not unique across entities.
///
public string Name { get; }
+
+ ///
+ /// Create an abstract accumulator for the attribute
+ ///
+ ///
+ public IAccumulator CreateAccumulator();
}
///
diff --git a/src/NexusMods.EventSourcing.Abstractions/IEventStore.cs b/src/NexusMods.EventSourcing.Abstractions/IEventStore.cs
index d59e35a7..951754be 100644
--- a/src/NexusMods.EventSourcing.Abstractions/IEventStore.cs
+++ b/src/NexusMods.EventSourcing.Abstractions/IEventStore.cs
@@ -23,18 +23,22 @@ public interface IEventStore
/// The ingester to handle the events
/// If true, plays the events in reverse
///
- public void EventsForEntity(EntityId entityId, TIngester ingester)
+ public void EventsForEntity(EntityId entityId, TIngester ingester, TransactionId fromId, TransactionId toId)
where TIngester : IEventIngester;
///
- /// Replays the most recent snapshot for the given entity id, if one exists, then
- /// replays every event.
+ /// Gets the most recent snapshot for an entity that was taken before asOf. If no snapshot is found
+ /// the TransactionId will be default, otherwise it will be the transaction id of the snapshot. If
+ /// the snapshot's entity revision is not equal to the revision parameter, the snapshot is invalid
+ /// and default will be returned.
///
+ ///
///
- ///
- ///
- public void EventsAndSnapshotForEntity(EntityId entityId, TIngester ingester)
- where TIngester : ISnapshotEventIngester;
+ ///
+ ///
+ ///
+ public TransactionId GetSnapshot(TransactionId asOf, EntityId entityId, ushort revision,
+ out (IAttribute Attribute, IAccumulator Accumulator)[] loadedAttributes);
///
/// Sets the snapshot for the given entity id and transaction id.
diff --git a/src/NexusMods.EventSourcing.Abstractions/ISnapshotEventIngester.cs b/src/NexusMods.EventSourcing.Abstractions/ISnapshotEventIngester.cs
index 768fbb88..211fb34f 100644
--- a/src/NexusMods.EventSourcing.Abstractions/ISnapshotEventIngester.cs
+++ b/src/NexusMods.EventSourcing.Abstractions/ISnapshotEventIngester.cs
@@ -5,10 +5,15 @@
///
public interface ISnapshotEventIngester : IEventIngester
{
+
///
- /// This method will be called for each attribute snapshotted, before the normal event ingestion is called
+ /// Ingests a snapshot of an entity, a false return value means the snapshot is invalid
+ /// and the entity should be rebuilt from scratch (by replaying all events)
///
- ///
- ///
- public void IngestSnapshotAttribute(string attributeName, IAttribute attribute);
+ ///
+ ///
+ ///
+ public bool IngestSnapshot(EntityDefinition definition,
+ (IAttribute Attribute, IAccumulator Accumulator)[] attributes);
+
}
diff --git a/src/NexusMods.EventSourcing.Abstractions/TransactionId.cs b/src/NexusMods.EventSourcing.Abstractions/TransactionId.cs
index 46962058..051597cb 100644
--- a/src/NexusMods.EventSourcing.Abstractions/TransactionId.cs
+++ b/src/NexusMods.EventSourcing.Abstractions/TransactionId.cs
@@ -13,6 +13,16 @@ namespace NexusMods.EventSourcing.Abstractions;
public readonly partial struct TransactionId
{
+ ///
+ /// The lowest possible (and default constructed) transaction id.
+ ///
+ public static readonly TransactionId Min = new(0);
+
+ ///
+ /// The highest possible transaction id.
+ ///
+ public static readonly TransactionId Max = new(ulong.MaxValue);
+
///
/// Get the next transaction id.
///
diff --git a/tests/NexusMods.EventSourcing.TestModel/InMemoryEventStore.cs b/tests/NexusMods.EventSourcing.TestModel/InMemoryEventStore.cs
index 6214273f..0ce2ba44 100644
--- a/tests/NexusMods.EventSourcing.TestModel/InMemoryEventStore.cs
+++ b/tests/NexusMods.EventSourcing.TestModel/InMemoryEventStore.cs
@@ -1,4 +1,5 @@
using System.Buffers;
+using System.Buffers.Binary;
using NexusMods.EventSourcing.Abstractions;
using NexusMods.EventSourcing.Abstractions.Serialization;
using Reloaded.Memory.Extensions;
@@ -15,11 +16,13 @@ public class InMemoryEventStore : IEventStore
private readonly IVariableSizeSerializer _stringSerializer;
private readonly PooledMemoryBufferWriter _writer;
private readonly ISerializationRegistry _serializationRegistry;
+ private readonly IVariableSizeSerializer _entityDefinitionSerializer;
public InMemoryEventStore(TSerializer serializer, ISerializationRegistry serializationRegistry)
{
_serializer = serializer;
_stringSerializer = (serializationRegistry.GetSerializer(typeof(string)) as IVariableSizeSerializer)!;
+ _entityDefinitionSerializer = (serializationRegistry.GetSerializer(typeof(EntityDefinition)) as IVariableSizeSerializer)!;
_serializationRegistry = serializationRegistry;
_writer = new PooledMemoryBufferWriter();
}
@@ -48,7 +51,7 @@ public TransactionId Add(T entity) where T : IEvent
}
- public void EventsForEntity(EntityId entityId, TIngester ingester)
+ public void EventsForEntity(EntityId entityId, TIngester ingester, TransactionId fromId, TransactionId toId)
where TIngester : IEventIngester
{
if (!_events.TryGetValue(entityId, out var events))
@@ -56,56 +59,89 @@ public void EventsForEntity(EntityId entityId, TIngester ingester)
foreach (var data in events)
{
+ if (data.TxId < fromId) continue;
+ if (data.TxId > toId) break;
+
var @event = _serializer.Deserialize(data.Data)!;
if (!ingester.Ingest(data.TxId, @event)) break;
}
}
- public void EventsAndSnapshotForEntity(TransactionId asOf, EntityId entityId, TIngester ingester) where TIngester : ISnapshotEventIngester
+ public TransactionId EventsAndSnapshotForEntity(TransactionId asOf, EntityId entityId, ushort revision,
+ out (IAttribute Attribute, IAccumulator Accumulator)[] loadedAttributes)
{
if (!_snapshots.TryGetValue(entityId, out var snapshots))
- return;
+ {
+ loadedAttributes = Array.Empty<(IAttribute, IAccumulator)>();
+ return default;
+ }
var startPoint = snapshots.LastOrDefault(s => s.Key <= asOf);
- if (startPoint != default)
+ if (startPoint.Value == default)
{
- var snapshot = startPoint.Value.AsSpanFast();
-
- while (snapshot.Length > 0)
- {
- var attributeName = _stringSerializer.Deserialize(snapshot, out var read);
- snapshot = snapshot.SliceFast(read);
- var accumulator = _serializationRegistry.GetAccumulator(attributeName);
- accumulator.ReadFrom(snapshot, _serializationRegistry, out read);
- snapshot = snapshot.Slice(read);
- ingester.IngestSnapshot(attributeName, accumulator);
- }
-
+ loadedAttributes = Array.Empty<(IAttribute, IAccumulator)>();
+ return default;
}
- foreach (var (txId, data) in snapshots.)
+ var snapshot = (ReadOnlySpan)startPoint.Value.AsSpanFast();
+ var offset = _entityDefinitionSerializer.Deserialize(snapshot, out var entityDefinition);
+
+ if (entityDefinition.Revision != revision)
{
- var @event = _serializer.Deserialize(data)!;
- if (!ingester.Ingest(txId, @event)) break;
+ loadedAttributes = Array.Empty<(IAttribute, IAccumulator)>();
+ return default;
}
- if (!_events.TryGetValue(entityId, out var events))
- return;
+ snapshot = snapshot.SliceFast(offset);
- foreach (var data in events)
+ var numberOfAttrs = BinaryPrimitives.ReadUInt16BigEndian(snapshot);
+ snapshot = snapshot.SliceFast(sizeof(ushort));
+
+ var results = GC.AllocateUninitializedArray<(IAttribute, IAccumulator)>(numberOfAttrs);
+
+ if (!EntityStructureRegistry.TryGetAttributes(entityDefinition.Type, out var attributes))
+ throw new Exception("Entity definition does not match the current structure registry.");
+
+ for (var i = 0; i < numberOfAttrs; i++)
{
- var @event = _serializer.Deserialize(data.Data)!;
- if (!ingester.Ingest(data.TxId, @event)) break;
+ var read = _stringSerializer.Deserialize(snapshot, out var attributeName);
+ snapshot = snapshot.SliceFast(read);
+
+ if (!attributes.TryGetValue(attributeName, out var attribute))
+ throw new Exception("Entity definition does not match the current structure registry.");
+
+ var accumulator = attribute.CreateAccumulator();
+
+ accumulator.ReadFrom(ref snapshot, _serializationRegistry);
+ snapshot = snapshot.SliceFast(read);
+
+ results[i] = (attribute, accumulator);
}
+
+ loadedAttributes = results;
+ return startPoint.Key;
}
- public void SetSnapshot(TransactionId txId, EntityId id, IEnumerable<(string AttributeName, IAccumulator Accumulator)> attributes)
+ public void SetSnapshot(TransactionId txId, EntityId id, IDictionary attributes)
{
_writer.Reset();
- foreach (var (attributeName, accumulator) in attributes)
+ // Snapshot starts with the type attribute value
+ var typeAccumulator = attributes[IEntity.TypeAttribute];
+ typeAccumulator.WriteTo(_writer, _serializationRegistry);
+
+ var sizeSpan = _writer.GetSpan(sizeof(ushort));
+ BinaryPrimitives.WriteUInt16BigEndian(sizeSpan, (ushort) attributes.Count);
+ _writer.Advance(sizeof(ushort));
+
+
+ // And then each attribute in any order
+ foreach (var (attribute, accumulator) in attributes)
{
+ if (attribute == IEntity.TypeAttribute) continue;
+
+ var attributeName = attribute.Name;
_stringSerializer.Serialize(attributeName, _writer);
accumulator.WriteTo(_writer, _serializationRegistry);
}