Skip to content

Commit

Permalink
Fix snapshotting in RocksDBEventStore.cs
Browse files Browse the repository at this point in the history
  • Loading branch information
halgari committed Jan 13, 2024
1 parent be9f369 commit bc14d67
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 12 deletions.
21 changes: 11 additions & 10 deletions src/NexusMods.EventSourcing.RocksDB/RocksDBEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ public override void EventsForEntity<TIngester>(EntityId entityId, TIngester ing
iterator.SeekToFirst();
while (iterator.Valid())
{
var key = iterator.GetKeySpan();
var key = iterator.GetKeySpan().SliceFast(16);
var txId = TransactionId.From(key);
var evt = _db.Get(key[16..], _deserializer, _eventsColumn);
var evt = _db.Get(key, _deserializer, _eventsColumn);
if (!ingester.Ingest(txId, evt)) break;
iterator.Next();
}
Expand All @@ -113,7 +113,7 @@ public override TransactionId GetSnapshot(TransactionId asOf, EntityId entityId,
BinaryPrimitives.WriteUInt64BigEndian(startKey.SliceFast(16), 0);
Span<byte> endKey = stackalloc byte[24];
entityId.TryWriteBytes(endKey);
BinaryPrimitives.WriteUInt64BigEndian(endKey.SliceFast(16), asOf.Value + 1);
BinaryPrimitives.WriteUInt64BigEndian(endKey.SliceFast(16), asOf.Value);

var options = new ReadOptions();
unsafe
Expand All @@ -122,25 +122,26 @@ public override TransactionId GetSnapshot(TransactionId asOf, EntityId entityId,
{
fixed (byte* endKeyPtr = endKey)
{
options.SetIterateUpperBound(endKeyPtr, 24);
options.SetIterateLowerBound(startKeyPtr, 24);
//options.SetIterateUpperBound(endKeyPtr, 24);
//options.SetIterateLowerBound(startKeyPtr, 24);
using var iterator = _db.NewIterator(_snapshotColumn, options);

iterator.SeekToLast();
// Iterators are top end exclusive, so we need to seek to the last item before the asOf
iterator.SeekForPrev(endKeyPtr, 24);
while (iterator.Valid())
{
var key = iterator.GetKeySpan();
var txId = TransactionId.From(key);
var evt = _db.Get(key[16..], _eventsColumn);
var txId = TransactionId.From(key.SliceFast(16));
var snapshotData = iterator.GetValueSpan();

if (evt == null)
if (snapshotData.Length == 0)
{
loadedAttributes = Array.Empty<(IAttribute, IAccumulator)>();
loadedDefinition = default!;
return TransactionId.Min;
}

if (!DeserializeSnapshot(out var foundDefinition, out var foundAttributes, evt))
if (!DeserializeSnapshot(out var foundDefinition, out var foundAttributes, snapshotData))
{
loadedAttributes = Array.Empty<(IAttribute, IAccumulator)>();
loadedDefinition = default!;
Expand Down
2 changes: 1 addition & 1 deletion src/NexusMods.EventSourcing/EntityContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ private Dictionary<IAttribute, IAccumulator> LoadAccumulators(EntityId id)
foreach (var (attr, accumulator) in values)
snapshot.Add(attr, accumulator);

store.SetSnapshot(asOf, id, snapshot);
store.SetSnapshot(ingester.LastTransactionId, id, snapshot);
}

return values;
Expand Down
11 changes: 10 additions & 1 deletion src/NexusMods.EventSourcing/EntityContextIngester.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,20 @@ namespace NexusMods.EventSourcing;

public class EntityContextIngester(Dictionary<IAttribute, IAccumulator> values, EntityId id) : IEventContext, IEventIngester
{
/// <summary>
/// The number of events processed by this ingester.
/// </summary>
public int ProcessedEvents = 0;

/// <summary>
/// The last transaction id processed by this ingester.
/// </summary>
public TransactionId LastTransactionId = TransactionId.Min;

/// <inheritdoc />
public bool Ingest(TransactionId _, IEvent @event)
public bool Ingest(TransactionId txId, IEvent @event)
{
LastTransactionId = txId;
ProcessedEvents++;
@event.Apply(this);
return true;
Expand Down

0 comments on commit bc14d67

Please sign in to comment.