Skip to content

Commit

Permalink
Ability to use the Marten IHost.ResetAllMartenDataAsync(); *with* the…
Browse files Browse the repository at this point in the history
… wolverine managed projection assignments. Closes GH-1057
  • Loading branch information
jeremydmiller committed Oct 10, 2024
1 parent bb6697f commit 45b0053
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using JasperFx.Core;
using Marten;
using MartenTests.Distribution.Support;
using Shouldly;
using Wolverine;
Expand All @@ -13,6 +14,13 @@ public basic_agent_mechanics_single_tenant(ITestOutputHelper output) : base(outp
{
}

[Fact]
public async Task can_do_the_full_marten_reset_all_data_call()
{
// It's a smoke test to fix GH-1057
await theOriginalHost.ResetAllMartenDataAsync();
}

[Fact]
public async Task find_all_known_agents()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ public async Task StopAsync(CancellationToken cancellationToken)
Status = AgentStatus.Stopped;
}

public async Task PauseAsync(CancellationToken cancellationToken)
{
var daemon = await _coordinator.DaemonForDatabase(_databaseName);
await daemon.StopAgentAsync(_shardName);
Status = AgentStatus.Paused;
}

public Uri Uri { get; }
public AgentStatus Status { get; set; } = AgentStatus.Stopped;
}
73 changes: 68 additions & 5 deletions src/Persistence/Wolverine.Marten/Distribution/ProjectionAgents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,22 @@
using Marten.Events.Daemon;
using Marten.Events.Daemon.Coordination;
using Marten.Storage;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Wolverine.Runtime;
using Wolverine.Runtime.Agents;
using AgentStatus = Wolverine.Runtime.Agents.AgentStatus;

namespace Wolverine.Marten.Distribution;

internal class ProjectionAgents : IStaticAgentFamily
internal class ProjectionAgents : IStaticAgentFamily, IProjectionCoordinator
{
public const string SchemeName = "event-subscriptions";

private readonly IDocumentStore _store;
private readonly IProjectionCoordinator _coordinator;
private ImHashMap<Uri, ProjectionAgent> _agents = ImHashMap<Uri, ProjectionAgent>.Empty;
private readonly object _agentLocker = new();

public static Uri UriFor(string databaseName, string shardName)
{
Expand All @@ -30,10 +35,10 @@ public static (string DatabaseName, string ProjectionName) Parse(Uri uri)

}

public ProjectionAgents(IDocumentStore store, IProjectionCoordinator coordinator)
public ProjectionAgents(IDocumentStore store, ILogger<ProjectionCoordinator> logger)
{
_store = store;
_coordinator = coordinator;
_coordinator = new ProjectionCoordinator(store, logger);
}

public ValueTask<IReadOnlyList<Uri>> AllKnownAgentsAsync()
Expand All @@ -51,10 +56,26 @@ private IEnumerable<Uri> allAgentUris(IReadOnlyList<IMartenDatabase> databases,
}
}
}

public ValueTask<IAgent> BuildAgentAsync(Uri uri, IWolverineRuntime wolverineRuntime)
{
return new ValueTask<IAgent>(new ProjectionAgent(uri, _coordinator));
if (_agents.TryFind(uri, out var agent))
{
return new ValueTask<IAgent>(agent);
}

lock (_agentLocker)
{
if (_agents.TryFind(uri, out agent))
{
return new ValueTask<IAgent>(agent);
}

agent = new ProjectionAgent(uri, _coordinator);
_agents = _agents.AddOrUpdate(uri, agent);
}

return new ValueTask<IAgent>(agent);
}

public async ValueTask<IReadOnlyList<Uri>> SupportedAgentsAsync()
Expand All @@ -72,4 +93,46 @@ public ValueTask EvaluateAssignmentsAsync(AssignmentGrid assignments)
}

public string Scheme => SchemeName;

Task IHostedService.StartAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}

Task IHostedService.StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}

IProjectionDaemon IProjectionCoordinator.DaemonForMainDatabase()
{
return _coordinator.DaemonForMainDatabase();
}

ValueTask<IProjectionDaemon> IProjectionCoordinator.DaemonForDatabase(string databaseIdentifier)
{
return _coordinator.DaemonForDatabase(databaseIdentifier);
}

async Task IProjectionCoordinator.PauseAsync()
{
var active = _agents.Enumerate().Select(x => x.Value)
.Where(x => x.Status == AgentStatus.Started).ToArray();

foreach (var agent in active)
{
await agent.PauseAsync(CancellationToken.None);
}
}

async Task IProjectionCoordinator.ResumeAsync()
{
var paused = _agents.Enumerate().Select(x => x.Value)
.Where(x => x.Status == AgentStatus.Paused).ToArray();

foreach (var agent in paused)
{
await agent.StartAsync(CancellationToken.None);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,9 @@ public static MartenServiceCollectionExtensions.MartenConfigurationExpression In

if (integration.UseWolverineManagedEventSubscriptionDistribution)
{
expression.Services.AddSingleton<IAgentFamily, ProjectionAgents>();
expression.Services.AddSingleton<IProjectionCoordinator, ProjectionCoordinator>();
expression.Services.AddSingleton<ProjectionAgents>();
expression.Services.AddSingleton<IAgentFamily>(s => s.GetRequiredService<ProjectionAgents>());
expression.Services.AddSingleton<IProjectionCoordinator>(s => s.GetRequiredService<ProjectionAgents>());
}

expression.Services.AddType(typeof(IDatabaseSource), typeof(MartenMessageDatabaseDiscovery),
Expand Down
3 changes: 2 additions & 1 deletion src/Wolverine/Runtime/Agents/IAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ public interface IAgent : IHostedService
public enum AgentStatus
{
Started,
Stopped
Stopped,
Paused
}

0 comments on commit 45b0053

Please sign in to comment.