Skip to content

Commit

Permalink
Implemented etcd IClusterProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
tgroves-tgiapps committed Feb 15, 2025
1 parent ace44fd commit 03323f0
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 0 deletions.
15 changes: 15 additions & 0 deletions ProtoActor.sln
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "template", "template", "{08
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EndpointManagerTest", "benchmarks\EndpointManagerTest\EndpointManagerTest.csproj", "{B7258689-41D2-4284-AF93-050DD1DFEAC4}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Proto.Cluster.Etcd", "src\Proto.Cluster.Etcd\Proto.Cluster.Etcd.csproj", "{D875AADB-B827-4A61-9B33-8B71E60E9539}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -1495,6 +1497,18 @@ Global
{B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|x64.Build.0 = Release|Any CPU
{B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|x86.ActiveCfg = Release|Any CPU
{B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|x86.Build.0 = Release|Any CPU
{D875AADB-B827-4A61-9B33-8B71E60E9539}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D875AADB-B827-4A61-9B33-8B71E60E9539}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D875AADB-B827-4A61-9B33-8B71E60E9539}.Debug|x64.ActiveCfg = Debug|Any CPU
{D875AADB-B827-4A61-9B33-8B71E60E9539}.Debug|x64.Build.0 = Debug|Any CPU
{D875AADB-B827-4A61-9B33-8B71E60E9539}.Debug|x86.ActiveCfg = Debug|Any CPU
{D875AADB-B827-4A61-9B33-8B71E60E9539}.Debug|x86.Build.0 = Debug|Any CPU
{D875AADB-B827-4A61-9B33-8B71E60E9539}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D875AADB-B827-4A61-9B33-8B71E60E9539}.Release|Any CPU.Build.0 = Release|Any CPU
{D875AADB-B827-4A61-9B33-8B71E60E9539}.Release|x64.ActiveCfg = Release|Any CPU
{D875AADB-B827-4A61-9B33-8B71E60E9539}.Release|x64.Build.0 = Release|Any CPU
{D875AADB-B827-4A61-9B33-8B71E60E9539}.Release|x86.ActiveCfg = Release|Any CPU
{D875AADB-B827-4A61-9B33-8B71E60E9539}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -1631,6 +1645,7 @@ Global
{CDCE3D4C-1BDD-460F-93B8-75123A258183} = {ADE7A14E-FFE9-4137-AC25-E2F2A82B0A8C}
{087E5441-1582-4D55-8233-014C0FB06FF0} = {CDCE3D4C-1BDD-460F-93B8-75123A258183}
{B7258689-41D2-4284-AF93-050DD1DFEAC4} = {0F3AB331-C042-4371-A2F0-0AFDFA13DC9F}
{D875AADB-B827-4A61-9B33-8B71E60E9539} = {3D12F5E5-9774-4D7E-8A5B-B1F64544925B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {CD0D1E44-8118-4682-8793-6B20ABFA824C}
Expand Down
146 changes: 146 additions & 0 deletions src/Proto.Cluster.Etcd/EtcdProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
using System.Text.Json;
using dotnet_etcd;
using dotnet_etcd.interfaces;
using Etcdserverpb;
using Google.Protobuf;
using Microsoft.Extensions.Logging;
using V3Electionpb;

namespace Proto.Cluster.Etcd;

public class EtcdProvider : IClusterProvider
{
#pragma warning disable CS0618 // Type or member is obsolete
private static readonly ILogger Logger = Log.CreateLogger<EtcdProvider>();
#pragma warning restore CS0618 // Type or member is obsolete
private readonly IEtcdClient _client;
private readonly EtcdProviderConfig _config;
private Cluster _cluster = null!;
private CancellationTokenSource _stoppingCts = new();

private string MemberKey => $"{_config.MembersKeyPrefix}/{_cluster.System.Id}";

public EtcdProvider(IEtcdClient client, EtcdProviderConfig config)
{
_client = client;
_config = config;
}

public async Task StartMemberAsync(Cluster c)
{
Logger.LogInformation("Starting etcd provider");

_cluster = c;
_stoppingCts = CancellationTokenSource.CreateLinkedTokenSource(_cluster.System.Shutdown);

var leaseId = await StartLeaseAndKeepAliveTaskAsync();
await RegisterSystemAsMember(leaseId);
StartWatchForMemberChangesTask();

if (_config.MemberElectedHandlers.Count > 0)
StartCampaignForLeaderTask(leaseId);

Logger.LogInformation("Started etcd provider");
}

public Task StartClientAsync(Cluster c)
{
Logger.LogInformation("Starting etcd provider as client");

_cluster = c;
_stoppingCts = CancellationTokenSource.CreateLinkedTokenSource(_cluster.System.Shutdown);
StartWatchForMemberChangesTask();

Logger.LogInformation("Started etcd provider as client");

return Task.CompletedTask;
}

public async Task ShutdownAsync(bool graceful)
{
Logger.LogInformation("Shutting down etcd provider");

if (graceful)
{
await _client.DeleteAsync(MemberKey);
}

_stoppingCts.Cancel();

Logger.LogInformation("Shut down etcd provider");
}

private async Task<long> StartLeaseAndKeepAliveTaskAsync()
{
var leaseGrantResponse = await _client.LeaseGrantAsync(new LeaseGrantRequest { TTL = _config.LeaseTtl },
cancellationToken: _stoppingCts.Token);

_ = SafeTask.Run(async () =>
{
await _client.LeaseKeepAlive(leaseGrantResponse.ID, _stoppingCts.Token).ConfigureAwait(false);
Logger.LogInformation("Lease keep alive stopped");
}, _stoppingCts.Token);

return leaseGrantResponse.ID;
}

private async Task RegisterSystemAsMember(long leaseId)
{
var (host, port) = _cluster.System.GetAddress();
var kinds = _cluster.GetClusterKinds();
var memberId = _cluster.System.Id;

var request = new PutRequest
{
Lease = leaseId,
Key = ByteString.CopyFromUtf8(MemberKey),
Value = ByteString.CopyFromUtf8(JsonSerializer.Serialize(
new { id = memberId, host, port, kinds }, _config.JsonSerializerOptions))
};

await _client.PutAsync(request, cancellationToken: _cluster.System.Shutdown);

Logger.LogDebug("Registered local system as member");
}

private void StartWatchForMemberChangesTask()
{
_ = SafeTask.Run(async () =>
{
await _client.WatchRangeAsync(_config.MembersKeyPrefix, (WatchEvent[] _) =>
{
Logger.LogDebug("Cluster membership changed");

var rangeResponse = _client.GetRange(_config.MembersKeyPrefix, cancellationToken: _stoppingCts.Token);
var members = rangeResponse.Kvs
.Select(kv => JsonSerializer.Deserialize<Member>(kv.Value.ToStringUtf8(), _config.JsonSerializerOptions))
.Where(m => m != null)
.Select(m => m!).ToList();

_cluster.MemberList.UpdateClusterTopology(members);
}, cancellationToken: _stoppingCts.Token);

Logger.LogDebug("Stopped watching for member changes");
}, _stoppingCts.Token);
}

private void StartCampaignForLeaderTask(long leaseId)
{
Logger.LogInformation("Start campaigning for leader");

_ = SafeTask.Run(async () =>
{
var campaignRequest = new CampaignRequest
{
Name = ByteString.CopyFromUtf8(_config.CampaignKey),
Value = ByteString.CopyFromUtf8(_cluster.System.Id),
Lease = leaseId
};

var campaignResponse = await _client.CampaignAsync(campaignRequest, cancellationToken: _stoppingCts.Token);
Logger.LogInformation("Elected as leader with leader key {LeaderKey}", campaignResponse.Leader.Key.ToStringUtf8());

_config.MemberElectedHandlers.ForEach(h => h(_cluster));
}, _stoppingCts.Token);
}
}
40 changes: 40 additions & 0 deletions src/Proto.Cluster.Etcd/EtcdProviderConfig.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using System.Text.Json;
using System.Text.Json.Serialization;

namespace Proto.Cluster.Etcd;

public record EtcdProviderConfig
{
public int LeaseTtl { get; init; } = 5;

public string MembersKeyPrefix { get; init; } = "cluster/members";

public string CampaignKey { get; init; } = "cluster/leader";

public string LeaderGossipKey { get; init; } = "cluster:leader";

public JsonSerializerOptions? JsonSerializerOptions { get; init; } = new()
{
PropertyNameCaseInsensitive = true,
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
NumberHandling = JsonNumberHandling.AllowReadingFromString
};

public List<Action<Cluster>> MemberElectedHandlers = new();

public EtcdProviderConfig WithLeaseTtl(int leaseTtl) => this with { LeaseTtl = leaseTtl };

public EtcdProviderConfig WithMembersKeyPrefix(string prefix) => this with { MembersKeyPrefix = prefix };

public EtcdProviderConfig WithCampaignKey(string key) => this with { CampaignKey = key };

public EtcdProviderConfig WithLeaderGossipKey(string key) => this with { LeaderGossipKey = key };

public EtcdProviderConfig WithJsonSerializerOptions(JsonSerializerOptions options) => this with { JsonSerializerOptions = options };

public EtcdProviderConfig WithElectedCallback(Action<Cluster> handler)
{
MemberElectedHandlers.Add(handler);
return this;
}
}
19 changes: 19 additions & 0 deletions src/Proto.Cluster.Etcd/Proto.Cluster.Etcd.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<LangVersion>10</LangVersion>
<TargetFrameworks>net6.0;net7.0;net8.0</TargetFrameworks>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="dotnet-etcd" Version="7.2.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Proto.Actor\Proto.Actor.csproj" />
<ProjectReference Include="..\Proto.Cluster\Proto.Cluster.csproj" />
</ItemGroup>

</Project>

0 comments on commit 03323f0

Please sign in to comment.