Skip to content

Commit

Permalink
Persistence.cs use callback with index
Browse files Browse the repository at this point in the history
  • Loading branch information
codrut51 committed Oct 8, 2024
1 parent bf49315 commit 8058d48
Show file tree
Hide file tree
Showing 34 changed files with 39 additions and 84 deletions.
3 changes: 0 additions & 3 deletions benchmarks/GossipBenchmark/Node1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,9 @@
using Proto.Cluster;
using Proto.Cluster.Consul;
using Proto.Cluster.Partition;
using Proto.Cluster.Seed;
using Proto.Cluster.SeedNode.Redis;
using Proto.Context;
using Proto.Remote;
using Proto.Remote.GrpcNet;
using StackExchange.Redis;
using static Proto.CancellationTokens;
using ProtosReflection = ClusterHelloWorld.Messages.ProtosReflection;

Expand Down
3 changes: 0 additions & 3 deletions benchmarks/GossipBenchmark/Node2/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,9 @@
using Proto.Cluster;
using Proto.Cluster.Consul;
using Proto.Cluster.Partition;
using Proto.Cluster.Seed;
using Proto.Cluster.SeedNode.Redis;
using Proto.Context;
using Proto.Remote;
using Proto.Remote.GrpcNet;
using StackExchange.Redis;
using static System.Threading.Tasks.Task;
using ProtosReflection = ClusterHelloWorld.Messages.ProtosReflection;

Expand Down
2 changes: 0 additions & 2 deletions benchmarks/GossipDecoder/Program.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// See https://aka.ms/new-console-template for more information

using System.Runtime.InteropServices.JavaScript;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using Proto.Cluster;

Console.WriteLine("Hello, World!");
Expand Down
3 changes: 0 additions & 3 deletions benchmarks/SkyriseMini/Client/ProtoActorExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
using System.IO.Compression;
using Grpc.Net.Client;
using Grpc.Net.Compression;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
Expand Down
6 changes: 1 addition & 5 deletions benchmarks/SkyriseMini/Server/ProtoActorExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
using System.IO.Compression;
using Grpc.Net.Client;
using Grpc.Net.Compression;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Proto;
using Proto.Cluster;
using Proto.Cluster.Consul;
using Proto.Cluster.Partition;
using Proto.Cluster.PartitionActivator;
using Proto.DependencyInjection;
using Proto.Remote;
using Proto.Remote.GrpcNet;
Expand Down
2 changes: 0 additions & 2 deletions examples/ClusterK8sGrains/Node1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
using static Proto.CancellationTokens;
using ProtosReflection = ClusterHelloWorld.Messages.ProtosReflection;
using System.Runtime.Loader;
using Microsoft.Extensions.Configuration;
using Extensions = Proto.Remote.GrpcNet.Extensions;

// Hook SIGTERM to a cancel token to know when k8s is shutting us down
// hostBuilder should be used in production
Expand Down
1 change: 0 additions & 1 deletion examples/ClusterK8sGrains/Node2/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
using System.Threading.Tasks;
using System.Threading;
using ClusterHelloWorld.Messages;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Proto;
using Proto.Cluster;
Expand Down
4 changes: 2 additions & 2 deletions examples/Patterns/Saga/InMemoryProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ public class InMemoryProvider : IProvider
public Task<(object Snapshot, long Index)> GetSnapshotAsync(string actorName) =>
Task.FromResult(((object)default(Snapshot), 0L));

public Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object> callback)
public Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object, long> callback)
{
if (Events.TryGetValue(actorName, out var events))
{
foreach (var e in events.Where(e => e.Key >= indexStart && e.Key <= indexEnd))
{
callback(e.Value);
callback(e.Value, e.Key);
}
}

Expand Down
5 changes: 0 additions & 5 deletions examples/Proto.Cluster.Dashboard.Host/Program.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
using Google.Protobuf.WellKnownTypes;
using MudBlazor.Services;
using Proto;
using Proto.Cluster;
using Proto.Cluster.Dashboard;
using Proto.Cluster.Partition;
using Proto.Cluster.Seed;
using Proto.Remote;
using Proto.Remote.GrpcNet;
using Proto.Remote.HealthChecks;

var builder = WebApplication.CreateBuilder(args);
Expand Down
1 change: 0 additions & 1 deletion examples/remotechannels/Client/Program.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Threading.Tasks;
using Common;
using Proto;
using Proto.Remote;
Expand Down
4 changes: 0 additions & 4 deletions src/Proto.Actor/Configuration/ActorSystemConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,9 @@
// -----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Diagnostics;
using JetBrains.Annotations;
using Microsoft.Extensions.Logging;
using Proto.Context;
using Proto.Extensions;
using System.Text.Json;
using System.Text.Json.Serialization;

// ReSharper disable once CheckNamespace
Expand Down
2 changes: 0 additions & 2 deletions src/Proto.Actor/Extensions/IActorSystemExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Threading;
using System.Threading.Tasks;
using Proto.Diagnostics;

namespace Proto.Extensions;
Expand Down
2 changes: 0 additions & 2 deletions src/Proto.Actor/Process/Process.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

// ReSharper disable once CheckNamespace

using System;
using System.Threading.Tasks;
using Proto.Mailbox;

namespace Proto;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
Expand Down
1 change: 0 additions & 1 deletion src/Proto.Cluster/Partition/PartitionPlacementActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down
1 change: 0 additions & 1 deletion src/Proto.Cluster/Seed/FixedServerSeedNodeDiscovery.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Threading.Tasks;
using Proto.Cluster.SingleNode;

namespace Proto.Cluster.Seed;

Expand Down
2 changes: 0 additions & 2 deletions src/Proto.Cluster/Seed/SeedNodeClusterProviderOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
// </copyright>
// -----------------------------------------------------------------------

using System.Collections.Immutable;

namespace Proto.Cluster.Seed;

public record SeedNodeClusterProviderOptions(ISeedNodeDiscovery Discovery);
2 changes: 0 additions & 2 deletions src/Proto.Cluster/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Proto.Cluster;
using Proto.Cluster.Identity;
using Proto.Cluster.Partition;
using Proto.Cluster.Seed;
using Proto.DependencyInjection;
using Proto.Remote.GrpcNet;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

Expand Down
6 changes: 3 additions & 3 deletions src/Proto.Persistence.Couchbase/CouchbaseProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public CouchbaseProvider(IBucket bucket)
_bucket = bucket;
}

public Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object> callback)
public Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object, long> callback)
{
var query = GenerateGetEventsQuery(actorName, indexStart, indexEnd);

Expand Down Expand Up @@ -105,7 +105,7 @@ private string GenerateGetEventsQuery(string actorName, long indexStart, long in
$"AND b.eventIndex <= {indexEnd} " +
"ORDER BY b.eventIndex ASC";

private async Task<long> ExecuteGetEventsQueryAsync(string query, Action<object> callback)
private async Task<long> ExecuteGetEventsQueryAsync(string query, Action<object, long> callback)
{
var req = QueryRequest.Create(query);

Expand All @@ -119,7 +119,7 @@ private async Task<long> ExecuteGetEventsQueryAsync(string query, Action<object>

foreach (var @event in events)
{
callback(@event.Data);
callback(@event.Data, @event.EventIndex);
}

return events.LastOrDefault()?.EventIndex ?? -1;
Expand Down
11 changes: 6 additions & 5 deletions src/Proto.Persistence.DynamoDB/DynamoDBProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public DynamoDBProvider(IAmazonDynamoDB dynamoDBClient, DynamoDBProviderOptions
_snapshotsTable = Table.LoadTable(dynamoDBClient, options.SnapshotsTableName, DynamoDBEntryConversion.V2);
}

public async Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object> callback)
public async Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object, long> callback)
{
var config = new QueryOperationConfig { ConsistentRead = true };
config.Filter.AddCondition(_options.EventsTableHashKey, QueryOperator.Equal, actorName);
Expand All @@ -52,8 +52,9 @@ public async Task<long> GetEventsAsync(string actorName, long indexStart, long i

foreach (var doc in results)
{
callback(GetData(doc));
lastIndex++;
var (@event, index) = GetData(doc);
callback(@event, index);
lastIndex=index;
}

if (query.IsDone)
Expand All @@ -64,14 +65,14 @@ public async Task<long> GetEventsAsync(string actorName, long indexStart, long i

return lastIndex;

object GetData(Document doc)
(object @event, long index) GetData(Document doc)
{
var dataTypeE = doc.GetValueOrThrow(_options.EventsTableDataTypeKey);
var dataE = doc.GetValueOrThrow(_options.EventsTableDataKey);

var dataType = Type.GetType(dataTypeE.AsString());

return _dynamoDBContext.FromDocumentDynamic(dataE.AsDocument(), dataType);
return (_dynamoDBContext.FromDocumentDynamic(dataE.AsDocument(), dataType), dataE.AsLong());
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/Proto.Persistence.Marten/MartenProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public MartenProvider(IDocumentStore store)
_store = store;
}

public async Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object> callback)
public async Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object, long> callback)
{
var session = _store.IdentitySession();
await using var _ = session.ConfigureAwait(false);
Expand All @@ -27,7 +27,7 @@ public async Task<long> GetEventsAsync(string actorName, long indexStart, long i

foreach (var @event in events)
{
callback(@event.Data);
callback(@event.Data, @event.Index);
}

return events.LastOrDefault()?.Index ?? -1;
Expand Down
4 changes: 2 additions & 2 deletions src/Proto.Persistence.MongoDB/MongoDBProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public MongoDBProvider(IMongoDatabase mongoDB)

private IMongoCollection<Snapshot> SnapshotCollection => _mongoDB.GetCollection<Snapshot>("snapshots");

public async Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object> callback)
public async Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object, long> callback)
{
var sort = Builders<Event>.Sort.Ascending("EventIndex");

Expand All @@ -36,7 +36,7 @@ public async Task<long> GetEventsAsync(string actorName, long indexStart, long i

foreach (var @event in events)
{
callback(@event.Data);
callback(@event.Data, @event.EventIndex);
}

return events.Any() ? events.Last().EventIndex : -1;
Expand Down
4 changes: 2 additions & 2 deletions src/Proto.Persistence.RavenDB/RavenDBProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public RavenDBProvider(IDocumentStore store)
SetupIndexes();
}

public async Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object> callback)
public async Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object, long> callback)
{
using var session = _store.OpenAsyncSession();

Expand All @@ -30,7 +30,7 @@ public async Task<long> GetEventsAsync(string actorName, long indexStart, long i

foreach (var @event in events)
{
callback(@event.Data);
callback(@event.Data, @event.Index);
}

return events.LastOrDefault()?.Index ?? -1;
Expand Down
4 changes: 2 additions & 2 deletions src/Proto.Persistence.SqlServer/SqlServerProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ await ExecuteNonQueryAsync(
CreateParameter("SnapshotIndex", BigInt, inclusiveToIndex)
);

public async Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object> callback)
public async Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object, long> callback)
{
await using var connection = new SqlConnection(_connectionString);

Expand All @@ -107,7 +107,7 @@ public async Task<long> GetEventsAsync(string actorName, long indexStart, long i
{
lastIndex = (long)eventReader["EventIndex"];

callback(JsonConvert.DeserializeObject<object>(eventReader["EventData"].ToString(), AutoTypeSettings));
callback(JsonConvert.DeserializeObject<object>(eventReader["EventData"].ToString(), AutoTypeSettings), lastIndex);
}

return lastIndex;
Expand Down
7 changes: 4 additions & 3 deletions src/Proto.Persistence.Sqlite/SqliteProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public async Task DeleteSnapshotsAsync(string actorName, long inclusiveToIndex)
await deleteCommand.ExecuteNonQueryAsync().ConfigureAwait(false);
}

public async Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object> callback)
public async Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object, long> callback)
{
using var connection = new SqliteConnection(ConnectionString);

Expand All @@ -92,9 +92,10 @@ public async Task<long> GetEventsAsync(string actorName, long indexStart, long i

while (await reader.ReadAsync().ConfigureAwait(false))
{
indexes.Add(Convert.ToInt64(reader["EventIndex"]));
var index = Convert.ToInt64(reader["EventIndex"]);
indexes.Add(index);

callback(JsonConvert.DeserializeObject<object>(reader["EventData"].ToString(), AutoTypeSettings));
callback(JsonConvert.DeserializeObject<object>(reader["EventData"].ToString(), AutoTypeSettings), index);
}

return indexes.Any() ? indexes.LastOrDefault() : -1;
Expand Down
2 changes: 1 addition & 1 deletion src/Proto.Persistence/IProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public interface IEventStore
/// <param name="indexEnd">Index of the last event to get (inclusive)</param>
/// <param name="callback">A callback which should be called for each read event, in the order the events are stored</param>
/// <returns>Index of the last read event or -1 if none</returns>
Task<long> GetEventsAsync(string actorId, long indexStart, long indexEnd, Action<object> callback);
Task<long> GetEventsAsync(string actorId, long indexStart, long indexEnd, Action<object, long> callback);

/// <summary>
/// Writes an event to event stream of particular actor
Expand Down
14 changes: 7 additions & 7 deletions src/Proto.Persistence/Persistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,10 @@ await _eventStore.GetEventsAsync(
_actorId,
fromEventIndex,
long.MaxValue,
@event =>
(@event, index) =>
{
Index++;
_applyEvent?.Invoke(new RecoverEvent(@event, Index));
Index = index;
_applyEvent?.Invoke(new RecoverEvent(@event, index));
}
).ConfigureAwait(false);
}
Expand All @@ -260,10 +260,10 @@ await _eventStore.GetEventsAsync(
_actorId,
fromIndex,
toIndex,
@event =>
(@event, index) =>
{
_applyEvent?.Invoke(new ReplayEvent(@event, Index));
Index++;
_applyEvent?.Invoke(new ReplayEvent(@event, index));
Index=index;
}
).ConfigureAwait(false);
}
Expand Down Expand Up @@ -339,7 +339,7 @@ private class ManualSnapshots : ISnapshotStrategy
private class NoEventStore : IEventStore
{
public Task<long>
GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object> callback) =>
GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object, long> callback) =>
Task.FromResult(-1L);

public Task<long> PersistEventAsync(string actorName, long index, object @event) => Task.FromResult(0L);
Expand Down
Loading

0 comments on commit 8058d48

Please sign in to comment.