Skip to content

Commit

Permalink
debugging snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
halgari committed Jan 11, 2024
1 parent 92ec2d9 commit d3fab7b
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 19 deletions.
3 changes: 2 additions & 1 deletion src/NexusMods.EventSourcing.Abstractions/IEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public void EventsForEntity<TIngester>(EntityId entityId, TIngester ingester)
/// <param name="revision"></param>
/// <param name="loadedAttributes"></param>
/// <returns></returns>
public TransactionId GetSnapshot(TransactionId asOf, EntityId entityId, ushort revision,
public TransactionId GetSnapshot(TransactionId asOf, EntityId entityId,
out IAccumulator loadedDefinition,
out (IAttribute Attribute, IAccumulator Accumulator)[] loadedAttributes);

/// <summary>
Expand Down
20 changes: 11 additions & 9 deletions src/NexusMods.EventSourcing.RocksDB/RocksDBEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,7 @@ public void EventsForEntity<TIngester>(EntityId entityId, TIngester ingester, Tr
throw new NotImplementedException();
}

public TransactionId GetSnapshot(TransactionId asOf, EntityId entityId, ushort revision,
out (IAttribute Attribute, IAccumulator Accumulator)[] loadedAttributes)
{
throw new NotImplementedException();
}

public void SetSnapshot(TransactionId txId, EntityId id, IDictionary<IAttribute, IAccumulator> attributes)
{
throw new NotImplementedException();
}

public void EventsForEntity<TIngester>(EntityId entityId, TIngester ingester) where TIngester : IEventIngester
{
Expand Down Expand Up @@ -115,4 +106,15 @@ public void EventsForEntity<TIngester>(EntityId entityId, TIngester ingester) wh
}
}
}

public TransactionId GetSnapshot(TransactionId asOf, EntityId entityId, out IAccumulator loadedDefinition,
out (IAttribute Attribute, IAccumulator Accumulator)[] loadedAttributes)
{
throw new NotImplementedException();
}

public void SetSnapshot(TransactionId txId, EntityId id, IDictionary<IAttribute, IAccumulator> attributes)
{
throw new NotImplementedException();
}
}
24 changes: 23 additions & 1 deletion src/NexusMods.EventSourcing/EntityContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ namespace NexusMods.EventSourcing;

public class EntityContext(IEventStore store) : IEntityContext
{
public const int MaxEventsBeforeSnapshotting = 250;

private TransactionId asOf = TransactionId.From(0);
private object _lock = new();

Expand Down Expand Up @@ -58,8 +60,28 @@ private Dictionary<IAttribute, IAccumulator> GetAccumulators(EntityId id)
private Dictionary<IAttribute, IAccumulator> LoadAccumulators(EntityId id)
{
var values = new Dictionary<IAttribute, IAccumulator>();

var snapshotTxId = store.GetSnapshot(asOf, id, out var loadedDefinition, out var loadedAttributes);

if (snapshotTxId != TransactionId.Min)
{
values.Add(IEntity.TypeAttribute, loadedDefinition);
foreach (var (attr, accumulator) in loadedAttributes)
values.Add(attr, accumulator);
}

var ingester = new EntityContextIngester(values, id);
store.EventsForEntity(id, ingester);
store.EventsForEntity(id, ingester, snapshotTxId, asOf);

if (ingester.ProcessedEvents > MaxEventsBeforeSnapshotting)
{
var snapshot = new Dictionary<IAttribute, IAccumulator>();
foreach (var (attr, accumulator) in values)
snapshot.Add(attr, accumulator);

store.SetSnapshot(asOf, id, snapshot);
}

return values;
}

Expand Down
5 changes: 4 additions & 1 deletion src/NexusMods.EventSourcing/EntityContextIngester.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@

namespace NexusMods.EventSourcing;

public struct EntityContextIngester(Dictionary<IAttribute, IAccumulator> values, EntityId id) : IEventContext, IEventIngester
public class EntityContextIngester(Dictionary<IAttribute, IAccumulator> values, EntityId id) : IEventContext, IEventIngester
{
public int ProcessedEvents = 0;

/// <inheritdoc />
public bool Ingest(TransactionId _, IEvent @event)
{
ProcessedEvents++;
@event.Apply(this);
return true;
}
Expand Down
24 changes: 17 additions & 7 deletions tests/NexusMods.EventSourcing.TestModel/InMemoryEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ public class InMemoryEventStore<TSerializer> : IEventStore
private readonly IVariableSizeSerializer<string> _stringSerializer;
private readonly PooledMemoryBufferWriter _writer;
private readonly ISerializationRegistry _serializationRegistry;
private readonly IVariableSizeSerializer<EntityDefinition> _entityDefinitionSerializer;
private readonly IFixedSizeSerializer<EntityDefinition> _entityDefinitionSerializer;

public InMemoryEventStore(TSerializer serializer, ISerializationRegistry serializationRegistry)
{
_serializer = serializer;
_stringSerializer = (serializationRegistry.GetSerializer(typeof(string)) as IVariableSizeSerializer<string>)!;
_entityDefinitionSerializer = (serializationRegistry.GetSerializer(typeof(EntityDefinition)) as IVariableSizeSerializer<EntityDefinition>)!;
_entityDefinitionSerializer = (serializationRegistry.GetSerializer(typeof(EntityDefinition)) as IFixedSizeSerializer<EntityDefinition>)!;
_serializationRegistry = serializationRegistry;
_writer = new PooledMemoryBufferWriter();
}
Expand Down Expand Up @@ -67,33 +67,42 @@ public void EventsForEntity<TIngester>(EntityId entityId, TIngester ingester, Tr
}
}

public TransactionId GetSnapshot(TransactionId asOf, EntityId entityId, ushort revision,
public TransactionId GetSnapshot(TransactionId asOf, EntityId entityId, out IAccumulator loadedDefinition,
out (IAttribute Attribute, IAccumulator Accumulator)[] loadedAttributes)
{
if (!_snapshots.TryGetValue(entityId, out var snapshots))
{
loadedAttributes = Array.Empty<(IAttribute, IAccumulator)>();
return default;
loadedDefinition = default!;
return TransactionId.Min;
}

var startPoint = snapshots.LastOrDefault(s => s.Key <= asOf);

if (startPoint.Value == default)
{
loadedAttributes = Array.Empty<(IAttribute, IAccumulator)>();
loadedDefinition = default!;
return default;
}

var snapshot = (ReadOnlySpan<byte>)startPoint.Value.AsSpanFast();
var offset = _entityDefinitionSerializer.Deserialize(snapshot, out var entityDefinition);
var entityDefinition = _entityDefinitionSerializer.Deserialize(snapshot.SliceFast(0, 18));

var typeAccumulator = IEntity.TypeAttribute.CreateAccumulator();
typeAccumulator.ReadFrom(ref snapshot, _serializationRegistry);


var appDefinition = EntityStructureRegistry.GetDefinitionByUUID(entityDefinition.UUID);

if (entityDefinition.Revision != revision)
if (entityDefinition.Revision != appDefinition.Revision)
{
loadedAttributes = Array.Empty<(IAttribute, IAccumulator)>();
loadedDefinition = default!;
return default;
}

snapshot = snapshot.SliceFast(offset);
snapshot = snapshot.SliceFast(18);

var numberOfAttrs = BinaryPrimitives.ReadUInt16BigEndian(snapshot);
snapshot = snapshot.SliceFast(sizeof(ushort));
Expand All @@ -120,6 +129,7 @@ public TransactionId GetSnapshot(TransactionId asOf, EntityId entityId, ushort r
}

loadedAttributes = results;
loadedDefinition = typeAccumulator;
return startPoint.Key;
}

Expand Down
29 changes: 29 additions & 0 deletions tests/NexusMods.EventSourcing.Tests/AEventStoreTest.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Microsoft.VisualStudio.TestPlatform.ObjectModel.DataCollection;
using NexusMods.EventSourcing.Abstractions;
using NexusMods.EventSourcing.TestModel.Events;
using NexusMods.EventSourcing.TestModel.Model;
Expand All @@ -11,8 +12,11 @@ public abstract class AEventStoreTest<T> where T : IEventStore
public AEventStoreTest(T store)
{
Store = store;
Context = new EntityContext(store);
}

public EntityContext Context { get; set; }

[Fact]
public void CanGetAndReturnEvents()
{
Expand All @@ -33,6 +37,31 @@ public void CanGetAndReturnEvents()
}
}

[Fact]
public void CanGetSnapshots()
{
var id = EntityId<Loadout>.NewId();

Context.Add(new CreateLoadout(id, "Test"));

for (var i = 0; i < 1024; i++)
{
Context.Add(new RenameLoadout(id, $"Test {i}"));
}

Context.EmptyCaches();
var loadout = Context.Get(id);

loadout.Should().NotBeNull();
loadout.Name.Should().Be("Test 1023");


var snapshotId = Store.GetSnapshot(TransactionId.Max, id.Value, out var definition, out var attributes);

snapshotId.Should().Be(TransactionId.From(1024));

}

private class EventAccumulator : IEventIngester
{
public List<IEvent> Events { get; } = new();
Expand Down

0 comments on commit d3fab7b

Please sign in to comment.