Skip to content

Commit

Permalink
More WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
halgari committed Jan 11, 2024
1 parent 91d56d3 commit 9731664
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 37 deletions.
6 changes: 6 additions & 0 deletions src/NexusMods.EventSourcing.Abstractions/IAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ public interface IAttribute
/// The name of the attribute, needs to be unique in a given entity but not unique across entities.
/// </summary>
public string Name { get; }

/// <summary>
/// Create an abstract accumulator for the attribute
/// </summary>
/// <returns></returns>
public IAccumulator CreateAccumulator();
}

/// <summary>
Expand Down
18 changes: 11 additions & 7 deletions src/NexusMods.EventSourcing.Abstractions/IEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,22 @@ public interface IEventStore
/// <param name="ingester">The ingester to handle the events</param>
/// <param name="reverse">If true, plays the events in reverse</param>
/// <typeparam name="TIngester"></typeparam>
public void EventsForEntity<TIngester>(EntityId entityId, TIngester ingester)
public void EventsForEntity<TIngester>(EntityId entityId, TIngester ingester, TransactionId fromId, TransactionId toId)
where TIngester : IEventIngester;

/// <summary>
/// Replays the most recent snapshot for the given entity id, if one exists, then
/// replays every event.
/// Gets the most recent snapshot for an entity that was taken before asOf. If no snapshot is found
/// the TransactionId will be default, otherwise it will be the transaction id of the snapshot. If
/// the snapshot's entity revision is not equal to the revision parameter, the snapshot is invalid
/// and default will be returned.
/// </summary>
/// <param name="asOf"></param>
/// <param name="entityId"></param>
/// <param name="ingester"></param>
/// <typeparam name="TIngester"></typeparam>
public void EventsAndSnapshotForEntity<TIngester>(EntityId entityId, TIngester ingester)
where TIngester : ISnapshotEventIngester;
/// <param name="revision"></param>
/// <param name="loadedAttributes"></param>
/// <returns></returns>
public TransactionId GetSnapshot(TransactionId asOf, EntityId entityId, ushort revision,
out (IAttribute Attribute, IAccumulator Accumulator)[] loadedAttributes);

/// <summary>
/// Sets the snapshot for the given entity id and transaction id.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@
/// </summary>
public interface ISnapshotEventIngester : IEventIngester
{

/// <summary>
/// This method will be called for each attribute snapshotted, before the normal event ingestion is called
/// Ingests a snapshot of an entity, a false return value means the snapshot is invalid
/// and the entity should be rebuilt from scratch (by replaying all events)
/// </summary>
/// <param name="attributeName"></param>
/// <param name="attribute"></param>
public void IngestSnapshotAttribute(string attributeName, IAttribute attribute);
/// <param name="definition"></param>
/// <param name="attributes"></param>
/// <returns></returns>
public bool IngestSnapshot(EntityDefinition definition,
(IAttribute Attribute, IAccumulator Accumulator)[] attributes);

}
10 changes: 10 additions & 0 deletions src/NexusMods.EventSourcing.Abstractions/TransactionId.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ namespace NexusMods.EventSourcing.Abstractions;
public readonly partial struct TransactionId
{

/// <summary>
/// The lowest possible (and default constructed) transaction id.
/// </summary>
public static readonly TransactionId Min = new(0);

/// <summary>
/// The highest possible transaction id.
/// </summary>
public static readonly TransactionId Max = new(ulong.MaxValue);

/// <summary>
/// Get the next transaction id.
/// </summary>
Expand Down
88 changes: 62 additions & 26 deletions tests/NexusMods.EventSourcing.TestModel/InMemoryEventStore.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Buffers;
using System.Buffers.Binary;
using NexusMods.EventSourcing.Abstractions;
using NexusMods.EventSourcing.Abstractions.Serialization;
using Reloaded.Memory.Extensions;
Expand All @@ -15,11 +16,13 @@ public class InMemoryEventStore<TSerializer> : IEventStore
private readonly IVariableSizeSerializer<string> _stringSerializer;
private readonly PooledMemoryBufferWriter _writer;
private readonly ISerializationRegistry _serializationRegistry;
private readonly IVariableSizeSerializer<EntityDefinition> _entityDefinitionSerializer;

public InMemoryEventStore(TSerializer serializer, ISerializationRegistry serializationRegistry)
{
_serializer = serializer;
_stringSerializer = (serializationRegistry.GetSerializer(typeof(string)) as IVariableSizeSerializer<string>)!;
_entityDefinitionSerializer = (serializationRegistry.GetSerializer(typeof(EntityDefinition)) as IVariableSizeSerializer<EntityDefinition>)!;
_serializationRegistry = serializationRegistry;
_writer = new PooledMemoryBufferWriter();
}
Expand Down Expand Up @@ -48,64 +51,97 @@ public TransactionId Add<T>(T entity) where T : IEvent
}


public void EventsForEntity<TIngester>(EntityId entityId, TIngester ingester)
public void EventsForEntity<TIngester>(EntityId entityId, TIngester ingester, TransactionId fromId, TransactionId toId)
where TIngester : IEventIngester
{
if (!_events.TryGetValue(entityId, out var events))
return;

foreach (var data in events)
{
if (data.TxId < fromId) continue;
if (data.TxId > toId) break;

var @event = _serializer.Deserialize(data.Data)!;
if (!ingester.Ingest(data.TxId, @event)) break;
}
}

public void EventsAndSnapshotForEntity<TIngester>(TransactionId asOf, EntityId entityId, TIngester ingester) where TIngester : ISnapshotEventIngester
public TransactionId EventsAndSnapshotForEntity(TransactionId asOf, EntityId entityId, ushort revision,
out (IAttribute Attribute, IAccumulator Accumulator)[] loadedAttributes)
{
if (!_snapshots.TryGetValue(entityId, out var snapshots))
return;
{
loadedAttributes = Array.Empty<(IAttribute, IAccumulator)>();
return default;
}

var startPoint = snapshots.LastOrDefault(s => s.Key <= asOf);

if (startPoint != default)
if (startPoint.Value == default)
{
var snapshot = startPoint.Value.AsSpanFast();

while (snapshot.Length > 0)
{
var attributeName = _stringSerializer.Deserialize(snapshot, out var read);
snapshot = snapshot.SliceFast(read);
var accumulator = _serializationRegistry.GetAccumulator(attributeName);
accumulator.ReadFrom(snapshot, _serializationRegistry, out read);
snapshot = snapshot.Slice(read);
ingester.IngestSnapshot(attributeName, accumulator);
}

loadedAttributes = Array.Empty<(IAttribute, IAccumulator)>();
return default;
}

foreach (var (txId, data) in snapshots.)
var snapshot = (ReadOnlySpan<byte>)startPoint.Value.AsSpanFast();
var offset = _entityDefinitionSerializer.Deserialize(snapshot, out var entityDefinition);

if (entityDefinition.Revision != revision)
{
var @event = _serializer.Deserialize(data)!;
if (!ingester.Ingest(txId, @event)) break;
loadedAttributes = Array.Empty<(IAttribute, IAccumulator)>();
return default;
}

if (!_events.TryGetValue(entityId, out var events))
return;
snapshot = snapshot.SliceFast(offset);

foreach (var data in events)
var numberOfAttrs = BinaryPrimitives.ReadUInt16BigEndian(snapshot);
snapshot = snapshot.SliceFast(sizeof(ushort));

var results = GC.AllocateUninitializedArray<(IAttribute, IAccumulator)>(numberOfAttrs);

if (!EntityStructureRegistry.TryGetAttributes(entityDefinition.Type, out var attributes))
throw new Exception("Entity definition does not match the current structure registry.");

for (var i = 0; i < numberOfAttrs; i++)
{
var @event = _serializer.Deserialize(data.Data)!;
if (!ingester.Ingest(data.TxId, @event)) break;
var read = _stringSerializer.Deserialize(snapshot, out var attributeName);
snapshot = snapshot.SliceFast(read);

if (!attributes.TryGetValue(attributeName, out var attribute))
throw new Exception("Entity definition does not match the current structure registry.");

var accumulator = attribute.CreateAccumulator();

accumulator.ReadFrom(ref snapshot, _serializationRegistry);
snapshot = snapshot.SliceFast(read);

results[i] = (attribute, accumulator);
}

loadedAttributes = results;
return startPoint.Key;
}

public void SetSnapshot(TransactionId txId, EntityId id, IEnumerable<(string AttributeName, IAccumulator Accumulator)> attributes)
public void SetSnapshot(TransactionId txId, EntityId id, IDictionary<IAttribute, IAccumulator> attributes)
{
_writer.Reset();

foreach (var (attributeName, accumulator) in attributes)
// Snapshot starts with the type attribute value
var typeAccumulator = attributes[IEntity.TypeAttribute];
typeAccumulator.WriteTo(_writer, _serializationRegistry);

var sizeSpan = _writer.GetSpan(sizeof(ushort));
BinaryPrimitives.WriteUInt16BigEndian(sizeSpan, (ushort) attributes.Count);
_writer.Advance(sizeof(ushort));


// And then each attribute in any order
foreach (var (attribute, accumulator) in attributes)
{
if (attribute == IEntity.TypeAttribute) continue;

var attributeName = attribute.Name;
_stringSerializer.Serialize(attributeName, _writer);
accumulator.WriteTo(_writer, _serializationRegistry);
}
Expand Down

0 comments on commit 9731664

Please sign in to comment.