Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented etcd IClusterProvider #2150

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions ProtoActor.sln
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "template", "template", "{08
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EndpointManagerTest", "benchmarks\EndpointManagerTest\EndpointManagerTest.csproj", "{B7258689-41D2-4284-AF93-050DD1DFEAC4}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Proto.Cluster.Etcd", "src\Proto.Cluster.Etcd\Proto.Cluster.Etcd.csproj", "{D875AADB-B827-4A61-9B33-8B71E60E9539}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -1495,6 +1497,18 @@ Global
{B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|x64.Build.0 = Release|Any CPU
{B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|x86.ActiveCfg = Release|Any CPU
{B7258689-41D2-4284-AF93-050DD1DFEAC4}.Release|x86.Build.0 = Release|Any CPU
{D875AADB-B827-4A61-9B33-8B71E60E9539}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D875AADB-B827-4A61-9B33-8B71E60E9539}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D875AADB-B827-4A61-9B33-8B71E60E9539}.Debug|x64.ActiveCfg = Debug|Any CPU
{D875AADB-B827-4A61-9B33-8B71E60E9539}.Debug|x64.Build.0 = Debug|Any CPU
{D875AADB-B827-4A61-9B33-8B71E60E9539}.Debug|x86.ActiveCfg = Debug|Any CPU
{D875AADB-B827-4A61-9B33-8B71E60E9539}.Debug|x86.Build.0 = Debug|Any CPU
{D875AADB-B827-4A61-9B33-8B71E60E9539}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D875AADB-B827-4A61-9B33-8B71E60E9539}.Release|Any CPU.Build.0 = Release|Any CPU
{D875AADB-B827-4A61-9B33-8B71E60E9539}.Release|x64.ActiveCfg = Release|Any CPU
{D875AADB-B827-4A61-9B33-8B71E60E9539}.Release|x64.Build.0 = Release|Any CPU
{D875AADB-B827-4A61-9B33-8B71E60E9539}.Release|x86.ActiveCfg = Release|Any CPU
{D875AADB-B827-4A61-9B33-8B71E60E9539}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -1631,6 +1645,7 @@ Global
{CDCE3D4C-1BDD-460F-93B8-75123A258183} = {ADE7A14E-FFE9-4137-AC25-E2F2A82B0A8C}
{087E5441-1582-4D55-8233-014C0FB06FF0} = {CDCE3D4C-1BDD-460F-93B8-75123A258183}
{B7258689-41D2-4284-AF93-050DD1DFEAC4} = {0F3AB331-C042-4371-A2F0-0AFDFA13DC9F}
{D875AADB-B827-4A61-9B33-8B71E60E9539} = {3D12F5E5-9774-4D7E-8A5B-B1F64544925B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {CD0D1E44-8118-4682-8793-6B20ABFA824C}
Expand Down
145 changes: 145 additions & 0 deletions src/Proto.Cluster.Etcd/EtcdProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
using System.Text.Json;
using dotnet_etcd;
using dotnet_etcd.interfaces;
using Etcdserverpb;
using Google.Protobuf;
using Microsoft.Extensions.Logging;
using V3Electionpb;

namespace Proto.Cluster.Etcd;

public class EtcdProvider : IClusterProvider
{
#pragma warning disable CS0618 // Type or member is obsolete
private static readonly ILogger Logger = Log.CreateLogger<EtcdProvider>();
#pragma warning restore CS0618 // Type or member is obsolete
private readonly IEtcdClient _client;
private readonly EtcdProviderConfig _config;
private Cluster _cluster = null!;
private CancellationTokenSource _stoppingCts = new();

private string MemberKey => $"{_config.MembersKeyPrefix}/{_cluster.System.Id}";

public EtcdProvider(IEtcdClient client, EtcdProviderConfig config)
{
_client = client;
_config = config;
}

public async Task StartMemberAsync(Cluster c)
{
Logger.LogInformation("Starting etcd provider");

_cluster = c;
_stoppingCts = CancellationTokenSource.CreateLinkedTokenSource(_cluster.System.Shutdown);

var leaseId = await StartLeaseAndKeepAliveTaskAsync();
await RegisterSystemAsMember(leaseId);
StartWatchForMemberChangesTask();

if (_config.MemberElectedHandlers.Count > 0)
StartCampaignForLeaderTask(leaseId);

Logger.LogInformation("Started etcd provider");
}

public Task StartClientAsync(Cluster c)
{
Logger.LogInformation("Starting etcd provider as client");

_cluster = c;
_stoppingCts = CancellationTokenSource.CreateLinkedTokenSource(_cluster.System.Shutdown);
StartWatchForMemberChangesTask();

Logger.LogInformation("Started etcd provider as client");

return Task.CompletedTask;
}

public async Task ShutdownAsync(bool graceful)
{
Logger.LogInformation("Shutting down etcd provider");

if (graceful)
{
await _client.DeleteAsync(MemberKey);
}

_stoppingCts.Cancel();

Logger.LogInformation("Shut down etcd provider");
}

private async Task<long> StartLeaseAndKeepAliveTaskAsync()
{
var leaseGrantResponse = await _client.LeaseGrantAsync(new LeaseGrantRequest { TTL = _config.LeaseTtl },
cancellationToken: _stoppingCts.Token);

_ = SafeTask.Run(async () =>
{
await _client.LeaseKeepAlive(leaseGrantResponse.ID, _stoppingCts.Token).ConfigureAwait(false);
Logger.LogInformation("Lease keep alive stopped");
}, _stoppingCts.Token);

return leaseGrantResponse.ID;
}

private async Task RegisterSystemAsMember(long leaseId)
{
var (host, port) = _cluster.System.GetAddress();
var kinds = _cluster.GetClusterKinds();
var memberId = _cluster.System.Id;

var request = new PutRequest
{
Lease = leaseId,
Key = ByteString.CopyFromUtf8(MemberKey),
Value = ByteString.CopyFromUtf8(JsonSerializer.Serialize(new { id = memberId, host, port, kinds }))
};

await _client.PutAsync(request, cancellationToken: _cluster.System.Shutdown);

Logger.LogDebug("Registered local system as member");
}

private void StartWatchForMemberChangesTask()
{
_ = SafeTask.Run(async () =>
{
await _client.WatchRangeAsync(_config.MembersKeyPrefix, (WatchEvent[] _) =>
{
Logger.LogDebug("Cluster membership changed");

var rangeResponse = _client.GetRange(_config.MembersKeyPrefix, cancellationToken: _stoppingCts.Token);
var members = rangeResponse.Kvs
.Select(kv => JsonParser.Default.Parse<Member>(kv.Value.ToStringUtf8()))
.Where(m => m != null)
.Select(m => m!).ToList();

_cluster.MemberList.UpdateClusterTopology(members);
}, cancellationToken: _stoppingCts.Token);

Logger.LogDebug("Stopped watching for member changes");
}, _stoppingCts.Token);
}

private void StartCampaignForLeaderTask(long leaseId)
{
Logger.LogInformation("Start campaigning for leader");

_ = SafeTask.Run(async () =>
{
var campaignRequest = new CampaignRequest
{
Name = ByteString.CopyFromUtf8(_config.CampaignKey),
Value = ByteString.CopyFromUtf8(_cluster.System.Id),
Lease = leaseId
};

var campaignResponse = await _client.CampaignAsync(campaignRequest, cancellationToken: _stoppingCts.Token);
Logger.LogInformation("Elected as leader with leader key {LeaderKey}", campaignResponse.Leader.Key.ToStringUtf8());

_config.MemberElectedHandlers.ForEach(h => h(_cluster));
}, _stoppingCts.Token);
}
}
31 changes: 31 additions & 0 deletions src/Proto.Cluster.Etcd/EtcdProviderConfig.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using System.Text.Json;
using System.Text.Json.Serialization;

namespace Proto.Cluster.Etcd;

public record EtcdProviderConfig
{
public int LeaseTtl { get; init; } = 5;

public string MembersKeyPrefix { get; init; } = "cluster/members";

public string CampaignKey { get; init; } = "cluster/leader";

public string LeaderGossipKey { get; init; } = "cluster:leader";

public List<Action<Cluster>> MemberElectedHandlers = new();

public EtcdProviderConfig WithLeaseTtl(int leaseTtl) => this with { LeaseTtl = leaseTtl };

public EtcdProviderConfig WithMembersKeyPrefix(string prefix) => this with { MembersKeyPrefix = prefix };

public EtcdProviderConfig WithCampaignKey(string key) => this with { CampaignKey = key };

public EtcdProviderConfig WithLeaderGossipKey(string key) => this with { LeaderGossipKey = key };

public EtcdProviderConfig WithElectedCallback(Action<Cluster> handler)
{
MemberElectedHandlers.Add(handler);
return this;
}
}
19 changes: 19 additions & 0 deletions src/Proto.Cluster.Etcd/Proto.Cluster.Etcd.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<LangVersion>10</LangVersion>
<TargetFrameworks>net6.0;net7.0;net8.0</TargetFrameworks>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="dotnet-etcd" Version="7.2.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Proto.Actor\Proto.Actor.csproj" />
<ProjectReference Include="..\Proto.Cluster\Proto.Cluster.csproj" />
</ItemGroup>

</Project>