diff --git a/src/NexusMods.EventSourcing.RocksDB/RocksDBEventStore.cs b/src/NexusMods.EventSourcing.RocksDB/RocksDBEventStore.cs index 9c3d7793..f13b0937 100644 --- a/src/NexusMods.EventSourcing.RocksDB/RocksDBEventStore.cs +++ b/src/NexusMods.EventSourcing.RocksDB/RocksDBEventStore.cs @@ -94,9 +94,9 @@ public override void EventsForEntity(EntityId entityId, TIngester ing iterator.SeekToFirst(); while (iterator.Valid()) { - var key = iterator.GetKeySpan(); + var key = iterator.GetKeySpan().SliceFast(16); var txId = TransactionId.From(key); - var evt = _db.Get(key[16..], _deserializer, _eventsColumn); + var evt = _db.Get(key, _deserializer, _eventsColumn); if (!ingester.Ingest(txId, evt)) break; iterator.Next(); } @@ -113,7 +113,7 @@ public override TransactionId GetSnapshot(TransactionId asOf, EntityId entityId, BinaryPrimitives.WriteUInt64BigEndian(startKey.SliceFast(16), 0); Span endKey = stackalloc byte[24]; entityId.TryWriteBytes(endKey); - BinaryPrimitives.WriteUInt64BigEndian(endKey.SliceFast(16), asOf.Value + 1); + BinaryPrimitives.WriteUInt64BigEndian(endKey.SliceFast(16), asOf.Value); var options = new ReadOptions(); unsafe @@ -122,25 +122,26 @@ public override TransactionId GetSnapshot(TransactionId asOf, EntityId entityId, { fixed (byte* endKeyPtr = endKey) { - options.SetIterateUpperBound(endKeyPtr, 24); - options.SetIterateLowerBound(startKeyPtr, 24); + //options.SetIterateUpperBound(endKeyPtr, 24); + //options.SetIterateLowerBound(startKeyPtr, 24); using var iterator = _db.NewIterator(_snapshotColumn, options); - iterator.SeekToLast(); + // Iterators are top end exclusive, so we need to seek to the last item before the asOf + iterator.SeekForPrev(endKeyPtr, 24); while (iterator.Valid()) { var key = iterator.GetKeySpan(); - var txId = TransactionId.From(key); - var evt = _db.Get(key[16..], _eventsColumn); + var txId = TransactionId.From(key.SliceFast(16)); + var snapshotData = iterator.GetValueSpan(); - if (evt == null) + if (snapshotData.Length == 0) { loadedAttributes = Array.Empty<(IAttribute, IAccumulator)>(); loadedDefinition = default!; return TransactionId.Min; } - if (!DeserializeSnapshot(out var foundDefinition, out var foundAttributes, evt)) + if (!DeserializeSnapshot(out var foundDefinition, out var foundAttributes, snapshotData)) { loadedAttributes = Array.Empty<(IAttribute, IAccumulator)>(); loadedDefinition = default!; diff --git a/src/NexusMods.EventSourcing/EntityContext.cs b/src/NexusMods.EventSourcing/EntityContext.cs index 6bc5c417..cc46fa99 100644 --- a/src/NexusMods.EventSourcing/EntityContext.cs +++ b/src/NexusMods.EventSourcing/EntityContext.cs @@ -79,7 +79,7 @@ private Dictionary LoadAccumulators(EntityId id) foreach (var (attr, accumulator) in values) snapshot.Add(attr, accumulator); - store.SetSnapshot(asOf, id, snapshot); + store.SetSnapshot(ingester.LastTransactionId, id, snapshot); } return values; diff --git a/src/NexusMods.EventSourcing/EntityContextIngester.cs b/src/NexusMods.EventSourcing/EntityContextIngester.cs index 67a5bb67..b2ce94ad 100644 --- a/src/NexusMods.EventSourcing/EntityContextIngester.cs +++ b/src/NexusMods.EventSourcing/EntityContextIngester.cs @@ -7,11 +7,20 @@ namespace NexusMods.EventSourcing; public class EntityContextIngester(Dictionary values, EntityId id) : IEventContext, IEventIngester { + /// + /// The number of events processed by this ingester. + /// public int ProcessedEvents = 0; + /// + /// The last transaction id processed by this ingester. + /// + public TransactionId LastTransactionId = TransactionId.Min; + /// - public bool Ingest(TransactionId _, IEvent @event) + public bool Ingest(TransactionId txId, IEvent @event) { + LastTransactionId = txId; ProcessedEvents++; @event.Apply(this); return true;