diff --git a/src/Proto.Cluster/Cluster.cs b/src/Proto.Cluster/Cluster.cs index 88e52ba539..080a91224a 100644 --- a/src/Proto.Cluster/Cluster.cs +++ b/src/Proto.Cluster/Cluster.cs @@ -5,7 +5,6 @@ // ----------------------------------------------------------------------- using System; using System.Collections.Generic; -using System.Collections.Immutable; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -15,6 +14,7 @@ using Proto.Cluster.Identity; using Proto.Cluster.Metrics; using Proto.Cluster.PubSub; +using Proto.Cluster.Singleton; using Proto.Extensions; using Proto.Remote; @@ -43,11 +43,14 @@ public Cluster(ActorSystem system, ClusterConfig config) Gossip = new Gossiper(this); PidCache = new PidCache(); PubSub = new PubSubManager(this); + Singleton = new SingletonManager(this); SubscribeToTopologyEvents(); } public PubSubManager PubSub { get; } + + public SingletonManager Singleton { get; } public static ILogger Logger { get; } = Log.CreateLogger(); diff --git a/src/Proto.Cluster/Gossip/GossipActor.cs b/src/Proto.Cluster/Gossip/GossipActor.cs index 11182408f2..e5060ae43e 100644 --- a/src/Proto.Cluster/Gossip/GossipActor.cs +++ b/src/Proto.Cluster/Gossip/GossipActor.cs @@ -66,6 +66,8 @@ private Task OnGossipRequest(IContext context, GossipRequest gossipRequest) { context.System.EventStream.Publish(update); } + + context.System.EventStream.Publish(new Gossip(newState)); _state = newState; CheckConsensus(context); diff --git a/src/Proto.Cluster/Gossip/Gossiper.cs b/src/Proto.Cluster/Gossip/Gossiper.cs index 788df10263..d58dbc81c1 100644 --- a/src/Proto.Cluster/Gossip/Gossiper.cs +++ b/src/Proto.Cluster/Gossip/Gossiper.cs @@ -13,16 +13,6 @@ namespace Proto.Cluster.Gossip { - public record GossipUpdate(string MemberId, string Key, Any Value, long SequenceNumber); - public record GetGossipStateRequest(string Key); - - public record GetGossipStateResponse(ImmutableDictionary State); - - public record SetGossipStateKey(string Key, IMessage Value); - - public record SendGossipStateRequest; - public record SendGossipStateResponse; - public class Gossiper { public const string GossipActorName = "gossip"; diff --git a/src/Proto.Cluster/Gossip/Messages.cs b/src/Proto.Cluster/Gossip/Messages.cs new file mode 100644 index 0000000000..6589480ae4 --- /dev/null +++ b/src/Proto.Cluster/Gossip/Messages.cs @@ -0,0 +1,24 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2015-2021 Asynkron AB All rights reserved +// +// ----------------------------------------------------------------------- +using System; +using System.Collections.Immutable; +using Google.Protobuf; +using Google.Protobuf.WellKnownTypes; + +namespace Proto.Cluster.Gossip +{ + public record GossipUpdate(string MemberId, string Key, Any Value, long SequenceNumber); + + public record Gossip(GossipState State); + public record GetGossipStateRequest(string Key); + + public record GetGossipStateResponse(ImmutableDictionary State); + + public record SetGossipStateKey(string Key, IMessage Value); + + public record SendGossipStateRequest; + public record SendGossipStateResponse; +} \ No newline at end of file diff --git a/src/Proto.Cluster/Singleton/SingletonManager.cs b/src/Proto.Cluster/Singleton/SingletonManager.cs new file mode 100644 index 0000000000..496bb12fc9 --- /dev/null +++ b/src/Proto.Cluster/Singleton/SingletonManager.cs @@ -0,0 +1,65 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2015-2021 Asynkron AB All rights reserved +// +// ----------------------------------------------------------------------- +using System.Collections.Concurrent; +using System.Collections.Immutable; +using System.Linq; +using Grpc.Core; +using Proto.Cluster.Gossip; +using Proto.Utils; + +namespace Proto.Cluster.Singleton +{ + public class SingletonManager + { + private readonly Cluster _cluster; + private ImmutableHashSet _tracked = ImmutableHashSet.Empty; + private readonly object _lock = new(); + + public SingletonManager(Cluster cluster) + { + _cluster = cluster; + + cluster.System.EventStream.Subscribe(g => { + var tracked = _tracked; + var existingKeys = ( + from member in g.State.Members + from entry in member.Value.Values + where entry.Key.StartsWith("Singleton-") + select entry.Key) + //.Distinct() + .ToImmutableHashSet(); + + var missing = tracked.Except(existingKeys); + + //iterate over all missing actors + //send a touch message to them to activate + foreach (var m in missing) + { + _ = cluster.RequestAsync("", "", new Touch(), CancellationTokens.FromSeconds(5)); + } + } + ); + } + + public void Track(ClusterIdentity identity) + { + lock (_lock) + { + _tracked = _tracked.Add(Key(identity)); + } + } + + public void Untrack(ClusterIdentity identity) + { + lock (_lock) + { + _tracked = _tracked.Remove(Key(identity)); + } + } + + private static string Key(ClusterIdentity identity) => "Singleton-" + identity.ToDiagnosticString(); + } +} \ No newline at end of file