Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Akka.Streams.Kafka.Testkit to support other specs #263

Draft
wants to merge 2 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Akka.Streams.Kafka.sln
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventHub.Consumer", "exampl
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Streams.Kafka.Benchmark", "src\Akka.Streams.Kafka.Benchmark\Akka.Streams.Kafka.Benchmark.csproj", "{1BF52F4E-93FA-40C4-8985-C21D779C7997}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Streams.Kafka.Testkit", "src\Akka.Streams.Kafka.Testkit\Akka.Streams.Kafka.Testkit.csproj", "{49F2E094-BB98-41D8-9B3D-CF6557DCC420}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -62,6 +64,10 @@ Global
{1BF52F4E-93FA-40C4-8985-C21D779C7997}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1BF52F4E-93FA-40C4-8985-C21D779C7997}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1BF52F4E-93FA-40C4-8985-C21D779C7997}.Release|Any CPU.Build.0 = Release|Any CPU
{49F2E094-BB98-41D8-9B3D-CF6557DCC420}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{49F2E094-BB98-41D8-9B3D-CF6557DCC420}.Debug|Any CPU.Build.0 = Debug|Any CPU
{49F2E094-BB98-41D8-9B3D-CF6557DCC420}.Release|Any CPU.ActiveCfg = Release|Any CPU
{49F2E094-BB98-41D8-9B3D-CF6557DCC420}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
25 changes: 25 additions & 0 deletions src/Akka.Streams.Kafka.Testkit/Akka.Streams.Kafka.Testkit.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\common.props" />

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<LangVersion>8.0</LangVersion>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Akka.Streams.TestKit" Version="$(AkkaVersion)" />
<PackageReference Include="Akka.TestKit.Xunit2" Version="$(AkkaVersion)" />
<PackageReference Include="Docker.DotNet" Version="3.125.5" />
<PackageReference Include="DotNet.Testcontainers" Version="1.6.0-beta.2028" />
<PackageReference Include="FakeItEasy" Version="7.2.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Akka.Streams.Kafka\Akka.Streams.Kafka.csproj" />
</ItemGroup>

<ItemGroup>
<None Remove="Resources\reference.conf" />
<EmbeddedResource Include="Resources\reference.conf" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using Akka.Streams.Kafka.Messages;
using Akka.Streams.Kafka.Stages.Consumers;

namespace Akka.Streams.Kafka.Tests.TestKit.Internal
namespace Akka.Streams.Kafka.Testkit
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
{
public static class ConsumerResultFactory
{
Expand Down
55 changes: 55 additions & 0 deletions src/Akka.Streams.Kafka.Testkit/Dsl/ConsumerControlFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.Kafka.Helpers;

namespace Akka.Streams.Kafka.Testkit.Dsl
{
public static class ConsumerControlFactory
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Helper functions for consumer controls

{
public static Source<TA, IControl> AttachControl<TA, TB>(Source<TA, TB> source)
=> source.ViaMaterialized(ControlFlow<TA>(), Keep.Right);

public static Flow<TA, TA, IControl> ControlFlow<TA>()
=> Flow.Create<TA>()
.ViaMaterialized(KillSwitches.Single<TA>(), Keep.Right)
.MapMaterializedValue(Control);

public static IControl Control(IKillSwitch killSwitch)
=> new FakeControl(killSwitch);

public class FakeControl : IControl
{
private readonly IKillSwitch _killSwitch;
private readonly TaskCompletionSource<Done> _shutdownPromise;

public FakeControl(IKillSwitch killSwitch)
{
_killSwitch = killSwitch;
_shutdownPromise = new TaskCompletionSource<Done>();
}

public Task Stop()
{
_killSwitch.Shutdown();
_shutdownPromise.SetResult(Done.Instance);
return _shutdownPromise.Task;
}

public Task Shutdown()
{
_killSwitch.Shutdown();
_shutdownPromise.SetResult(Done.Instance);
return _shutdownPromise.Task;
}

public Task IsShutdown => _shutdownPromise.Task;

public Task<TResult> DrainAndShutdown<TResult>(Task<TResult> streamCompletion)
{
_killSwitch.Shutdown();
_shutdownPromise.SetResult(Done.Instance);
return Task.FromResult(default(TResult));
}
}
}
}
177 changes: 177 additions & 0 deletions src/Akka.Streams.Kafka.Testkit/Dsl/KafkaSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor.Setup;
using Akka.Streams.Dsl;
using Akka.Streams.Kafka.Dsl;
using Akka.Streams.Kafka.Helpers;
using Akka.Streams.Kafka.Messages;
using Akka.Streams.Kafka.Settings;
using Akka.Streams.Kafka.Testkit.Internal;
using Akka.Streams.TestKit;
using Akka.Util;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Xunit;
using Xunit.Abstractions;
using Config = Akka.Configuration.Config;

namespace Akka.Streams.Kafka.Testkit.Dsl
{
public abstract class KafkaSpec : KafkaTestKit, IAsyncLifetime
{
protected KafkaSpec(string config, string actorSystemName = null, ITestOutputHelper output = null) : base(config, actorSystemName, output)
{
}

protected KafkaSpec(Config config, string actorSystemName = null, ITestOutputHelper output = null) : base(config, actorSystemName, output)
{
}

protected KafkaSpec(ActorSystemSetup config, string actorSystemName = null, ITestOutputHelper output = null) : base(config, actorSystemName, output)
{
}

protected IProducer<string, string> TestProducer { get; private set; }


public virtual Task InitializeAsync()
{
TestProducer = ProducerDefaults().CreateKafkaProducer();
SetUpAdminClient();
return Task.CompletedTask;
}

public virtual Task DisposeAsync()
{
TestProducer?.Dispose();
CleanUpAdminClient();
Shutdown();
return Task.CompletedTask;
}

protected void Sleep(TimeSpan time, string msg)
{
Log.Debug($"Sleeping {time}: {msg}");
Thread.Sleep(time);
}

protected List<T> AwaitMultiple<T>(TimeSpan timeout, IEnumerable<Task<T>> tasks)
{
var completedTasks = new List<Task<T>>();
using (var cts = new CancellationTokenSource(timeout))
{
var waitingTasks = tasks.ToList();
while (waitingTasks.Count > 0)
{
var anyTask = Task.WhenAny(waitingTasks);
try
{
anyTask.Wait(cts.Token);
}
catch (Exception e)
{
throw new Exception($"AwaitMultiple failed. Exception: {e.Message}", e);
}

var completedTask = anyTask.Result;
waitingTasks.Remove(completedTask);
completedTasks.Add(completedTask);
}
}

return completedTasks.Select(t => t.Result).ToList();
}

protected TimeSpan SleepAfterProduce => TimeSpan.FromSeconds(4);

protected void AwaitProduce(IEnumerable<Task<Done>> tasks)
{
AwaitMultiple(TimeSpan.FromSeconds(4), tasks);
Sleep(SleepAfterProduce, "to be sure producing has happened");
}

protected readonly Partition Partition0 = new Partition(0);

// Not implemented
[Obsolete("Kafka DescribeCluster API isn't supported by the .NET driver")]
protected void WaitUntilCluster(Func<object, bool> predicate)
=> Checks.WaitUntilCluster(Settings.ClusterTimeout, Settings.CheckInterval, AdminClient, predicate, Log);

protected void WaitUntilConsumerGroup(string groupId, Func<GroupInfo, bool> predicate)
=> Checks.WaitUntilConsumerGroup(
groupId: groupId,
timeout: Settings.ConsumerGroupTimeout,
sleepInBetween: Settings.CheckInterval,
adminClient: AdminClient,
predicate: predicate,
log: Log);

protected void WaitUntilConsumerSummary(string groupId, Func<List<GroupMemberInfo>, bool> predicate)
=> WaitUntilConsumerGroup(groupId, info =>
{
return info.State == "Stable" && Try<bool>.From(() => predicate(info.Members)).OrElse(false).Success.Value;
});

protected ImmutableList<string> CreateTopics(IEnumerable<int> topics)
=> CreateTopicsAsync(topics).Result;

protected async Task<ImmutableList<string>> CreateTopicsAsync(IEnumerable<int> topics)
{
var topicNames = topics.Select(CreateTopicName).ToImmutableList();
var configs = new Dictionary<string, string>();
var newTopics = topicNames.Select(topic =>
new TopicSpecification
{
Name = topic,
NumPartitions = 1,
ReplicationFactor = 1,
Configs = configs
});
await AdminClient.CreateTopicsAsync(
topics: newTopics,
options: new CreateTopicsOptions {RequestTimeout = TimeSpan.FromSeconds(10)});
return topicNames;
}

protected void PeriodicalCheck<T>(string description, int maxTries, TimeSpan sleepInBetween, Func<T> data, Func<T, bool> predicate)
=> Checks.PeriodicalCheck(description, new TimeSpan(sleepInBetween.Ticks * maxTries), sleepInBetween, data, predicate, Log);

/// <summary>
/// Produce messages to topic using specified range and return a Future so the caller can synchronize consumption.
/// </summary>
protected Task Produce(string topic, IEnumerable<int> range, int? partition = null)
=> ProduceString(topic, range.Select(i => i.ToString()), partition);

protected Task ProduceString(string topic, IEnumerable<string> range, int? partition = null)
{
partition ??= Partition0;
return Source.From(range)
// NOTE: If no partition is specified but a key is present a partition will be chosen
// using a hash of the key. If neither key nor partition is present a partition
// will be assigned in a round-robin fashion.
.Select(n => new ProducerRecord<string, string>(topic, partition, DefaultKey, n))
.RunWith(KafkaProducer.PlainSink(ProducerDefaults().WithProducer(TestProducer)), Sys.Materializer());
}

protected Task ProduceTimestamped(string topic, IEnumerable<(int, long)> timestampedRange)
=> Source.From(timestampedRange)
.Select( tuple =>
{
var (n, ts) = tuple;
return new ProducerRecord<string, string>(topic, Partition0, ts, DefaultKey, n.ToString());
})
.RunWith(KafkaProducer.PlainSink(ProducerDefaults().WithProducer(TestProducer)), Sys.Materializer());

protected (IControl, TestSubscriber.Probe<string>) CreateProbe(
ConsumerSettings<string, string> consumerSettings,
string[] topics)
=> KafkaConsumer.PlainSource(consumerSettings, Subscriptions.Topics(topics))
.Select(s => s.Message.Value)
.ToMaterialized(this.SinkProbe<string>(), Keep.Both)
.Run(Sys.Materializer());
}
}
91 changes: 91 additions & 0 deletions src/Akka.Streams.Kafka.Testkit/Extensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams.Implementation;
using Akka.Util.Internal;

namespace Akka.Streams.Kafka.Testkit
{
public static class Extensions
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved from Kafka.Test

{
public static async Task WithTimeout(this Task task, TimeSpan timeout)
{
using (var cts = new CancellationTokenSource())
{
var timeoutTask = Task.Delay(timeout, cts.Token);
var completed = await Task.WhenAny(task, timeoutTask);
if (completed == timeoutTask)
throw new OperationCanceledException("Operation timed out");
else
cts.Cancel();
}
}

public static List<List<T>> Grouped<T>(this IEnumerable<T> messages, int size)
{
var groups = new List<List<T>>();
var list = new List<T>();
var index = 0;
foreach (var message in messages)
{
list.Add(message);
if(index != 0 && index % size == 0)
{
groups.Add(list);
list = new List<T>();
}

index++;
}
if(list.Count > 0)
groups.Add(list);
return groups;
}

public static void AssertAllStagesStopped(this Akka.TestKit.Xunit2.TestKit spec, Action block, IMaterializer materializer)
{
AssertAllStagesStopped(spec, () =>
{
block();
return NotUsed.Instance;
}, materializer);
}

public static T AssertAllStagesStopped<T>(this Akka.TestKit.Xunit2.TestKit spec, Func<T> block, IMaterializer materializer)
{
if (!(materializer is ActorMaterializerImpl impl))
return block();

var probe = spec.CreateTestProbe(impl.System);
probe.Send(impl.Supervisor, StreamSupervisor.StopChildren.Instance);
probe.ExpectMsg<StreamSupervisor.StoppedChildren>();
var result = block();

probe.Within(TimeSpan.FromSeconds(5), () =>
{
IImmutableSet<IActorRef> children = ImmutableHashSet<IActorRef>.Empty;
try
{
probe.AwaitAssert(() =>
{
impl.Supervisor.Tell(StreamSupervisor.GetChildren.Instance, probe.Ref);
children = probe.ExpectMsg<StreamSupervisor.Children>().Refs;
if (children.Count != 0)
throw new Exception($"expected no StreamSupervisor children, but got {children.Aggregate("", (s, @ref) => s + @ref + ", ")}");
});
}
catch
{
children.ForEach(c=>c.Tell(StreamSupervisor.PrintDebugDump.Instance));
throw;
}
});

return result;
}
}
}
Loading