From 03323f0665d1922b7a7c2a0e132434fff01e0f1b Mon Sep 17 00:00:00 2001 From: Tyrone Groves Date: Sat, 15 Feb 2025 10:24:34 -0600 Subject: [PATCH 1/2] Implemented etcd IClusterProvider --- ProtoActor.sln | 15 ++ src/Proto.Cluster.Etcd/EtcdProvider.cs | 146 ++++++++++++++++++ src/Proto.Cluster.Etcd/EtcdProviderConfig.cs | 40 +++++ .../Proto.Cluster.Etcd.csproj | 19 +++ 4 files changed, 220 insertions(+) create mode 100644 src/Proto.Cluster.Etcd/EtcdProvider.cs create mode 100644 src/Proto.Cluster.Etcd/EtcdProviderConfig.cs create mode 100644 src/Proto.Cluster.Etcd/Proto.Cluster.Etcd.csproj diff --git a/ProtoActor.sln b/ProtoActor.sln index cf661607a2..abadc8d911 100644 --- a/ProtoActor.sln +++ b/ProtoActor.sln @@ -309,6 +309,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "template", "template", "{08 EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EndpointManagerTest", "benchmarks\EndpointManagerTest\EndpointManagerTest.csproj", "{B7258689-41D2-4284-AF93-050DD1DFEAC4}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Proto.Cluster.Etcd", "src\Proto.Cluster.Etcd\Proto.Cluster.Etcd.csproj", "{D875AADB-B827-4A61-9B33-8B71E60E9539}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -1495,6 +1497,18 @@ Global {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|x64.Build.0 = Release|Any CPU {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|x86.ActiveCfg = Release|Any CPU {B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|x86.Build.0 = Release|Any CPU + {D875AADB-B827-4A61-9B33-8B71E60E9539}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {D875AADB-B827-4A61-9B33-8B71E60E9539}.Debug|Any CPU.Build.0 = Debug|Any CPU + {D875AADB-B827-4A61-9B33-8B71E60E9539}.Debug|x64.ActiveCfg = Debug|Any CPU + {D875AADB-B827-4A61-9B33-8B71E60E9539}.Debug|x64.Build.0 = Debug|Any CPU + {D875AADB-B827-4A61-9B33-8B71E60E9539}.Debug|x86.ActiveCfg = Debug|Any CPU + {D875AADB-B827-4A61-9B33-8B71E60E9539}.Debug|x86.Build.0 = Debug|Any CPU + {D875AADB-B827-4A61-9B33-8B71E60E9539}.Release|Any CPU.ActiveCfg = Release|Any CPU + {D875AADB-B827-4A61-9B33-8B71E60E9539}.Release|Any CPU.Build.0 = Release|Any CPU + {D875AADB-B827-4A61-9B33-8B71E60E9539}.Release|x64.ActiveCfg = Release|Any CPU + {D875AADB-B827-4A61-9B33-8B71E60E9539}.Release|x64.Build.0 = Release|Any CPU + {D875AADB-B827-4A61-9B33-8B71E60E9539}.Release|x86.ActiveCfg = Release|Any CPU + {D875AADB-B827-4A61-9B33-8B71E60E9539}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -1631,6 +1645,7 @@ Global {CDCE3D4C-1BDD-460F-93B8-75123A258183} = {ADE7A14E-FFE9-4137-AC25-E2F2A82B0A8C} {087E5441-1582-4D55-8233-014C0FB06FF0} = {CDCE3D4C-1BDD-460F-93B8-75123A258183} {B7258689-41D2-4284-AF93-050DD1DFEAC4} = {0F3AB331-C042-4371-A2F0-0AFDFA13DC9F} + {D875AADB-B827-4A61-9B33-8B71E60E9539} = {3D12F5E5-9774-4D7E-8A5B-B1F64544925B} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {CD0D1E44-8118-4682-8793-6B20ABFA824C} diff --git a/src/Proto.Cluster.Etcd/EtcdProvider.cs b/src/Proto.Cluster.Etcd/EtcdProvider.cs new file mode 100644 index 0000000000..7fcc3f4471 --- /dev/null +++ b/src/Proto.Cluster.Etcd/EtcdProvider.cs @@ -0,0 +1,146 @@ +using System.Text.Json; +using dotnet_etcd; +using dotnet_etcd.interfaces; +using Etcdserverpb; +using Google.Protobuf; +using Microsoft.Extensions.Logging; +using V3Electionpb; + +namespace Proto.Cluster.Etcd; + +public class EtcdProvider : IClusterProvider +{ +#pragma warning disable CS0618 // Type or member is obsolete + private static readonly ILogger Logger = Log.CreateLogger(); +#pragma warning restore CS0618 // Type or member is obsolete + private readonly IEtcdClient _client; + private readonly EtcdProviderConfig _config; + private Cluster _cluster = null!; + private CancellationTokenSource _stoppingCts = new(); + + private string MemberKey => $"{_config.MembersKeyPrefix}/{_cluster.System.Id}"; + + public EtcdProvider(IEtcdClient client, EtcdProviderConfig config) + { + _client = client; + _config = config; + } + + public async Task StartMemberAsync(Cluster c) + { + Logger.LogInformation("Starting etcd provider"); + + _cluster = c; + _stoppingCts = CancellationTokenSource.CreateLinkedTokenSource(_cluster.System.Shutdown); + + var leaseId = await StartLeaseAndKeepAliveTaskAsync(); + await RegisterSystemAsMember(leaseId); + StartWatchForMemberChangesTask(); + + if (_config.MemberElectedHandlers.Count > 0) + StartCampaignForLeaderTask(leaseId); + + Logger.LogInformation("Started etcd provider"); + } + + public Task StartClientAsync(Cluster c) + { + Logger.LogInformation("Starting etcd provider as client"); + + _cluster = c; + _stoppingCts = CancellationTokenSource.CreateLinkedTokenSource(_cluster.System.Shutdown); + StartWatchForMemberChangesTask(); + + Logger.LogInformation("Started etcd provider as client"); + + return Task.CompletedTask; + } + + public async Task ShutdownAsync(bool graceful) + { + Logger.LogInformation("Shutting down etcd provider"); + + if (graceful) + { + await _client.DeleteAsync(MemberKey); + } + + _stoppingCts.Cancel(); + + Logger.LogInformation("Shut down etcd provider"); + } + + private async Task StartLeaseAndKeepAliveTaskAsync() + { + var leaseGrantResponse = await _client.LeaseGrantAsync(new LeaseGrantRequest { TTL = _config.LeaseTtl }, + cancellationToken: _stoppingCts.Token); + + _ = SafeTask.Run(async () => + { + await _client.LeaseKeepAlive(leaseGrantResponse.ID, _stoppingCts.Token).ConfigureAwait(false); + Logger.LogInformation("Lease keep alive stopped"); + }, _stoppingCts.Token); + + return leaseGrantResponse.ID; + } + + private async Task RegisterSystemAsMember(long leaseId) + { + var (host, port) = _cluster.System.GetAddress(); + var kinds = _cluster.GetClusterKinds(); + var memberId = _cluster.System.Id; + + var request = new PutRequest + { + Lease = leaseId, + Key = ByteString.CopyFromUtf8(MemberKey), + Value = ByteString.CopyFromUtf8(JsonSerializer.Serialize( + new { id = memberId, host, port, kinds }, _config.JsonSerializerOptions)) + }; + + await _client.PutAsync(request, cancellationToken: _cluster.System.Shutdown); + + Logger.LogDebug("Registered local system as member"); + } + + private void StartWatchForMemberChangesTask() + { + _ = SafeTask.Run(async () => + { + await _client.WatchRangeAsync(_config.MembersKeyPrefix, (WatchEvent[] _) => + { + Logger.LogDebug("Cluster membership changed"); + + var rangeResponse = _client.GetRange(_config.MembersKeyPrefix, cancellationToken: _stoppingCts.Token); + var members = rangeResponse.Kvs + .Select(kv => JsonSerializer.Deserialize(kv.Value.ToStringUtf8(), _config.JsonSerializerOptions)) + .Where(m => m != null) + .Select(m => m!).ToList(); + + _cluster.MemberList.UpdateClusterTopology(members); + }, cancellationToken: _stoppingCts.Token); + + Logger.LogDebug("Stopped watching for member changes"); + }, _stoppingCts.Token); + } + + private void StartCampaignForLeaderTask(long leaseId) + { + Logger.LogInformation("Start campaigning for leader"); + + _ = SafeTask.Run(async () => + { + var campaignRequest = new CampaignRequest + { + Name = ByteString.CopyFromUtf8(_config.CampaignKey), + Value = ByteString.CopyFromUtf8(_cluster.System.Id), + Lease = leaseId + }; + + var campaignResponse = await _client.CampaignAsync(campaignRequest, cancellationToken: _stoppingCts.Token); + Logger.LogInformation("Elected as leader with leader key {LeaderKey}", campaignResponse.Leader.Key.ToStringUtf8()); + + _config.MemberElectedHandlers.ForEach(h => h(_cluster)); + }, _stoppingCts.Token); + } +} \ No newline at end of file diff --git a/src/Proto.Cluster.Etcd/EtcdProviderConfig.cs b/src/Proto.Cluster.Etcd/EtcdProviderConfig.cs new file mode 100644 index 0000000000..facd35ae95 --- /dev/null +++ b/src/Proto.Cluster.Etcd/EtcdProviderConfig.cs @@ -0,0 +1,40 @@ +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace Proto.Cluster.Etcd; + +public record EtcdProviderConfig +{ + public int LeaseTtl { get; init; } = 5; + + public string MembersKeyPrefix { get; init; } = "cluster/members"; + + public string CampaignKey { get; init; } = "cluster/leader"; + + public string LeaderGossipKey { get; init; } = "cluster:leader"; + + public JsonSerializerOptions? JsonSerializerOptions { get; init; } = new() + { + PropertyNameCaseInsensitive = true, + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, + NumberHandling = JsonNumberHandling.AllowReadingFromString + }; + + public List> MemberElectedHandlers = new(); + + public EtcdProviderConfig WithLeaseTtl(int leaseTtl) => this with { LeaseTtl = leaseTtl }; + + public EtcdProviderConfig WithMembersKeyPrefix(string prefix) => this with { MembersKeyPrefix = prefix }; + + public EtcdProviderConfig WithCampaignKey(string key) => this with { CampaignKey = key }; + + public EtcdProviderConfig WithLeaderGossipKey(string key) => this with { LeaderGossipKey = key }; + + public EtcdProviderConfig WithJsonSerializerOptions(JsonSerializerOptions options) => this with { JsonSerializerOptions = options }; + + public EtcdProviderConfig WithElectedCallback(Action handler) + { + MemberElectedHandlers.Add(handler); + return this; + } +} \ No newline at end of file diff --git a/src/Proto.Cluster.Etcd/Proto.Cluster.Etcd.csproj b/src/Proto.Cluster.Etcd/Proto.Cluster.Etcd.csproj new file mode 100644 index 0000000000..7d489a2e30 --- /dev/null +++ b/src/Proto.Cluster.Etcd/Proto.Cluster.Etcd.csproj @@ -0,0 +1,19 @@ + + + + 10 + net6.0;net7.0;net8.0 + enable + enable + + + + + + + + + + + + From c62c393e02a0205eb310d6248b8fd93208358016 Mon Sep 17 00:00:00 2001 From: Tyrone Groves Date: Sat, 15 Feb 2025 14:13:46 -0600 Subject: [PATCH 2/2] Remove JsonSerializerOptions from EtcdProviderConfig and update serialization methods in EtcdProvider --- src/Proto.Cluster.Etcd/EtcdProvider.cs | 5 ++--- src/Proto.Cluster.Etcd/EtcdProviderConfig.cs | 9 --------- 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/src/Proto.Cluster.Etcd/EtcdProvider.cs b/src/Proto.Cluster.Etcd/EtcdProvider.cs index 7fcc3f4471..bdcb48a3c1 100644 --- a/src/Proto.Cluster.Etcd/EtcdProvider.cs +++ b/src/Proto.Cluster.Etcd/EtcdProvider.cs @@ -94,8 +94,7 @@ private async Task RegisterSystemAsMember(long leaseId) { Lease = leaseId, Key = ByteString.CopyFromUtf8(MemberKey), - Value = ByteString.CopyFromUtf8(JsonSerializer.Serialize( - new { id = memberId, host, port, kinds }, _config.JsonSerializerOptions)) + Value = ByteString.CopyFromUtf8(JsonSerializer.Serialize(new { id = memberId, host, port, kinds })) }; await _client.PutAsync(request, cancellationToken: _cluster.System.Shutdown); @@ -113,7 +112,7 @@ await _client.WatchRangeAsync(_config.MembersKeyPrefix, (WatchEvent[] _) => var rangeResponse = _client.GetRange(_config.MembersKeyPrefix, cancellationToken: _stoppingCts.Token); var members = rangeResponse.Kvs - .Select(kv => JsonSerializer.Deserialize(kv.Value.ToStringUtf8(), _config.JsonSerializerOptions)) + .Select(kv => JsonParser.Default.Parse(kv.Value.ToStringUtf8())) .Where(m => m != null) .Select(m => m!).ToList(); diff --git a/src/Proto.Cluster.Etcd/EtcdProviderConfig.cs b/src/Proto.Cluster.Etcd/EtcdProviderConfig.cs index facd35ae95..524ced45a4 100644 --- a/src/Proto.Cluster.Etcd/EtcdProviderConfig.cs +++ b/src/Proto.Cluster.Etcd/EtcdProviderConfig.cs @@ -13,13 +13,6 @@ public record EtcdProviderConfig public string LeaderGossipKey { get; init; } = "cluster:leader"; - public JsonSerializerOptions? JsonSerializerOptions { get; init; } = new() - { - PropertyNameCaseInsensitive = true, - PropertyNamingPolicy = JsonNamingPolicy.CamelCase, - NumberHandling = JsonNumberHandling.AllowReadingFromString - }; - public List> MemberElectedHandlers = new(); public EtcdProviderConfig WithLeaseTtl(int leaseTtl) => this with { LeaseTtl = leaseTtl }; @@ -30,8 +23,6 @@ public record EtcdProviderConfig public EtcdProviderConfig WithLeaderGossipKey(string key) => this with { LeaderGossipKey = key }; - public EtcdProviderConfig WithJsonSerializerOptions(JsonSerializerOptions options) => this with { JsonSerializerOptions = options }; - public EtcdProviderConfig WithElectedCallback(Action handler) { MemberElectedHandlers.Add(handler);