TransactionalFlow and TransactionalSink support (#86)
* Major update of the producers core to match the Alpakka codebase and add features

* Added FlowWithContext implementation

* Implemented source/flow with context integration test

* Fixed producer stage reports handling

* Optimized test timing

* Fixed sample compilation errors

* Implemented transactional source

* Implemented transactional producer sink and flow

* Added tests for the pending transactions support (failing, so skipped)

* Added InternalApi markers to the not-yet-working APIs
IgorFedchenko authored and Aaronontheweb committed Nov 12, 2019
1 parent 21fd138 commit ce80944
Showing 21 changed files with 1,046 additions and 36 deletions.
@@ -0,0 +1,67 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.Kafka.Dsl;
using Akka.Streams.Kafka.Helpers;
using Akka.Streams.Kafka.Messages;
using Akka.Streams.Kafka.Settings;
using Confluent.Kafka;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Streams.Kafka.Tests.Integration
{
public class TransactionalIntegrationTests : KafkaIntegrationTests
{
public TransactionalIntegrationTests(ITestOutputHelper output, KafkaFixture fixture)
: base(nameof(TransactionalIntegrationTests), output, fixture)
{
}

[Fact(Skip = "Missing producer transactions support, see https://github.com/akkadotnet/Akka.Streams.Kafka/issues/85")]
public async Task Transactional_source_with_sink_Should_work()
{
var settings = CreateConsumerSettings<string>(CreateGroup(1));
var sourceTopic = CreateTopic(1);
var targetTopic = CreateTopic(2);
var transactionalId = Guid.NewGuid().ToString();
const int totalMessages = 10;

var control = KafkaConsumer.TransactionalSource(settings, Subscriptions.Topics(sourceTopic))
.Via(Business<TransactionalMessage<Null, string>>())
.Select(message =>
{
return ProducerMessage.Single(
new ProducerRecord<Null, string>(targetTopic, message.Record.Key, message.Record.Value),
passThrough: message.PartitionOffset);
})
.ToMaterialized(KafkaProducer.TransactionalSink(ProducerSettings, transactionalId), Keep.Both)
.MapMaterializedValue(DrainingControl<NotUsed>.Create)
.Run(Materializer);

var consumer = ConsumeStrings(targetTopic, totalMessages);

await ProduceStrings(sourceTopic, Enumerable.Range(1, totalMessages), ProducerSettings);

AssertTaskCompletesWithin(TimeSpan.FromSeconds(totalMessages), consumer.IsShutdown);
AssertTaskCompletesWithin(TimeSpan.FromSeconds(totalMessages), control.DrainAndShutdown());

consumer.DrainAndShutdown().Result.Should().HaveCount(totalMessages);
}

private Flow<T, T, NotUsed> Business<T>() => Flow.Create<T>();

private DrainingControl<IImmutableList<ConsumeResult<Null, string>>> ConsumeStrings(string topic, int count)
{
return KafkaConsumer.PlainSource(CreateConsumerSettings<string>(CreateGroup(1)), Subscriptions.Topics(topic))
.Take(count)
.ToMaterialized(Sink.Seq<ConsumeResult<Null, string>>(), Keep.Both)
.MapMaterializedValue(DrainingControl<IImmutableList<ConsumeResult<Null, string>>>.Create)
.Run(Materializer);
}
}
}
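
For context, CreateConsumerSettings, CreateTopic, CreateGroup, ProduceStrings and AssertTaskCompletesWithin are helpers inherited from the KafkaIntegrationTests base class. A minimal sketch of what such consumer settings typically look like, assuming the standard Akka.Streams.Kafka builders and a hypothetical fixture endpoint property:

// Sketch only, not the base class implementation: null-keyed, UTF-8 string values.
var consumerSettings = ConsumerSettings<Null, string>
    .Create(Sys, null, Deserializers.Utf8)       // Sys: the test's ActorSystem
    .WithBootstrapServers(fixture.KafkaAddress)  // hypothetical fixture property
    .WithGroupId(group)
    .WithProperty("auto.offset.reset", "earliest");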
10 changes: 10 additions & 0 deletions src/Akka.Streams.Kafka/Dsl/KafkaConsumer.cs
@@ -143,6 +143,7 @@ public static Source<ConsumeResult<K, V>, IControl> AtMostOnceSource<K, V>(Consu
return Source.FromGraph(new CommittableSubSourceStage<K, V>(settings, subscription, metadataFromRecord));
}

/// <summary>
/// The <see cref="PlainPartitionedManualOffsetSource{K,V}"/> is similar to <see cref="PlainPartitionedSource{K,V}"/>
/// but allows the use of an offset store outside of Kafka, while retaining the automatic partition assignment.
/// When a topic-partition is assigned to a consumer, the <see cref="getOffsetsOnAssign"/>
@@ -164,5 +165,14 @@ public static Source<ConsumeResult<K, V>, IControl> AtMostOnceSource<K, V>(Consu
new Option<Func<IImmutableSet<TopicPartition>, Task<IImmutableSet<TopicPartitionOffset>>>>(getOffsetsOnAssign),
onRevoke));
}

/// <summary>
/// Transactional source to set up a stream for exactly-once semantics (EoS) when processing Kafka messages. To enable EoS it is
/// necessary to use the <see cref="KafkaProducer.TransactionalSink{K,V}"/> or <see cref="KafkaProducer.TransactionalFlow{K,V}"/> (for passthrough).
/// </summary>
public static Source<TransactionalMessage<K, V>, IControl> TransactionalSource<K, V>(ConsumerSettings<K, V> settings, ISubscription subscription)
{
return Source.FromGraph(new TransactionalSourceStage<K, V>(settings, subscription));
}
}
}
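
A hedged usage sketch (topic names, settings, and materializer are assumed): the PartitionOffset of each message must be threaded through as the pass-through value so the transactional producer can commit the consumed offset inside the transaction, exactly as the integration test above does.

// Sketch: consume transactionally, produce to another topic, commit offsets in the transaction.
var completion = KafkaConsumer.TransactionalSource(consumerSettings, Subscriptions.Topics("source-topic"))
    .Select(message => ProducerMessage.Single(
        new ProducerRecord<Null, string>("target-topic", message.Record.Key, message.Record.Value),
        passThrough: message.PartitionOffset))
    .RunWith(KafkaProducer.TransactionalSink(producerSettings, "transactional-id-1"), materializer);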
57 changes: 51 additions & 6 deletions src/Akka.Streams.Kafka/Dsl/KafkaProducer.cs
@@ -90,9 +90,7 @@ public static Flow<IEnvelope<TKey, TValue, TPassThrough>, IResults<TKey, TValue,
closeProducerOnStop: true))
.SelectAsync(settings.Parallelism, x => x);

- return string.IsNullOrEmpty(settings.DispatcherId)
-     ? flow
-     : flow.WithAttributes(ActorAttributes.CreateDispatcher(settings.DispatcherId));
+ return FlowWithDispatcher(settings, flow);
}

/// <summary>
Expand Down Expand Up @@ -133,9 +131,7 @@ public static Flow<IEnvelope<TKey, TValue, TPassThrough>, IResults<TKey, TValue,
customProducerProvider: () => producer))
.SelectAsync(settings.Parallelism, x => x);

- return string.IsNullOrEmpty(settings.DispatcherId)
-     ? flow
-     : flow.WithAttributes(ActorAttributes.CreateDispatcher(settings.DispatcherId));
+ return FlowWithDispatcher(settings, flow);
}


@@ -213,5 +209,54 @@ public static FlowWithContext<C, IEnvelope<K, V, NotUsed>, C, IResults<K, V, C>,
collapseContext: (env, c) => env.WithPassThrough(c),
extractContext: res => res.PassThrough);
}

/// <summary>
/// API IS FOR INTERNAL USAGE: see https://github.com/akkadotnet/Akka.Streams.Kafka/issues/85
///
/// Publish records to Kafka topics and then continue the flow. The flow can only be used with a <see cref="KafkaConsumer.TransactionalSource{K,V}"/> that
/// emits a <see cref="TransactionalMessage{K,V}"/>. The flow requires a unique `transactional.id` across all app
/// instances. The flow will override producer properties to enable Kafka exactly-once transactional support.
/// </summary>
[InternalApi]
public static Flow<IEnvelope<K, V, GroupTopicPartitionOffset>, IResults<K, V, GroupTopicPartitionOffset>, NotUsed> TransactionalFlow<K, V>(
ProducerSettings<K, V> setting,
string transactionalId)
{
if (string.IsNullOrEmpty(transactionalId))
throw new ArgumentException("You must define a Transactional id");

var transactionalSettings = setting
.WithProperty("enable.idempotence", "true")
.WithProperty("transactional.id", transactionalId)
.WithProperty("max.in.flight.requests.per.connection", "1");

var flow = Flow.FromGraph(new TransactionalProducerStage<K, V, GroupTopicPartitionOffset>(closeProducerOnStop: true, settings: transactionalSettings))
.SelectAsync(transactionalSettings.Parallelism, message => message);

return FlowWithDispatcher(transactionalSettings, flow);
}

/// <summary>
/// API IS FOR INTERNAL USAGE: see https://github.com/akkadotnet/Akka.Streams.Kafka/issues/85
///
/// Sink that is aware of the <see cref="TransactionalMessage{K,V}.PartitionOffset"/> from a <see cref="KafkaConsumer.TransactionalSource{K,V}"/>.
/// It will initialize and begin a transaction, produce the records, and commit the consumer offsets as part of that transaction.
/// </summary>
[InternalApi]
public static Sink<IEnvelope<K, V, GroupTopicPartitionOffset>, Task> TransactionalSink<K, V>(
ProducerSettings<K, V> settings,
string transactionalId)
{
return TransactionalFlow(settings, transactionalId).ToMaterialized(Sink.Ignore<IResults<K, V, GroupTopicPartitionOffset>>(), Keep.Right);
}

private static Flow<IEnvelope<K, V, TPassThrough>, IResults<K, V, TPassThrough>, NotUsed> FlowWithDispatcher<K, V, TPassThrough>(
ProducerSettings<K, V> settings,
Flow<IEnvelope<K, V, TPassThrough>, IResults<K, V, TPassThrough>, NotUsed> flow)
{
return string.IsNullOrEmpty(settings.DispatcherId)
? flow
: flow.WithAttributes(ActorAttributes.CreateDispatcher(settings.DispatcherId));
}
}
}
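
The three WithProperty overrides in TransactionalFlow are the producer settings Kafka requires for transactions; a sketch of equivalent settings built by hand, assuming the standard builders, an assumed endpoint, and a transactionalId variable in scope:

// Sketch: what transactionalSettings amounts to after TransactionalFlow's overrides.
var producerSettings = ProducerSettings<Null, string>
    .Create(system, null, Serializers.Utf8)
    .WithBootstrapServers("localhost:9092")                       // assumed endpoint
    .WithProperty("enable.idempotence", "true")                   // idempotent producer
    .WithProperty("transactional.id", transactionalId)            // must be unique across app instances
    .WithProperty("max.in.flight.requests.per.connection", "1");  // preserve ordering on retries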
7 changes: 7 additions & 0 deletions src/Akka.Streams.Kafka/Extensions/ObjectExtensions.cs
@@ -1,3 +1,5 @@
using Akka.Streams.Util;
using Akka.Util;
using Newtonsoft.Json;

namespace Akka.Streams.Kafka.Extensions
@@ -11,5 +13,10 @@ public static string ToJson(this object obj)
{
return JsonConvert.SerializeObject(obj);
}

/// <summary>
/// Wraps an object into an option
/// </summary>
public static Option<T> AsOption<T>(this T obj) => new Option<T>(obj);
}
}
16 changes: 16 additions & 0 deletions src/Akka.Streams.Kafka/Extensions/OptionExtensions.cs
@@ -0,0 +1,16 @@
using Akka.Streams.Util;
using Akka.Util;

namespace Akka.Streams.Kafka.Extensions
{
public static class OptionExtensions
{
/// <summary>
/// Gets the option value, if any; otherwise returns the provided default value
/// </summary>
public static T GetOrElse<T>(this Option<T> option, T defaultValue)
{
return option.HasValue ? option.Value : defaultValue;
}
}
}
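
A small usage sketch of the two extensions together, assuming Akka.Util's Option<T>:

var some = 5.AsOption();                 // Option<int> wrapping 5
var a = some.GetOrElse(0);               // 5
var b = Option<int>.None.GetOrElse(42);  // 42: falls back to the provided default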
12 changes: 11 additions & 1 deletion src/Akka.Streams.Kafka/Helpers/Control.cs
@@ -93,7 +93,17 @@ private DrainingControl(IControl control, Task<T> streamCompletion)
/// one, so that the stream can be stopped in a controlled way without losing
/// commits.
/// </summary>
- public static DrainingControl<T> Create(Tuple<IControl, Task<T>> tuple) => new DrainingControl<T>(tuple.Item1, tuple.Item2);
+ public static DrainingControl<T> Create((IControl, Task<T>) tuple) => new DrainingControl<T>(tuple.Item1, tuple.Item2);

/// <summary>
/// Combine control and a stream completion signal materialized values into
/// one, so that the stream can be stopped in a controlled way without losing
/// commits.
/// </summary>
public static DrainingControl<NotUsed> Create((IControl, Task) tuple)
{
return new DrainingControl<NotUsed>(tuple.Item1, tuple.Item2.ContinueWith(t => NotUsed.Instance, TaskContinuationOptions.NotOnFaulted));
}
}

/// <summary>
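
These ValueTuple overloads are what let the tests above write MapMaterializedValue(DrainingControl<NotUsed>.Create) directly over a Keep.Both materialized pair. A usage sketch, with settings and materializer assumed:

// Sketch: Keep.Both materializes (IControl, Task), which the new overload collapses.
var control = KafkaConsumer.TransactionalSource(settings, Subscriptions.Topics("topic"))
    .ToMaterialized(Sink.Ignore<TransactionalMessage<Null, string>>(), Keep.Both)
    .MapMaterializedValue(DrainingControl<NotUsed>.Create)
    .Run(materializer);
await control.DrainAndShutdown(); // stops the stream without losing commits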
43 changes: 43 additions & 0 deletions src/Akka.Streams.Kafka/Helpers/PartitionEventHandler.cs
@@ -88,4 +88,47 @@ public void OnStop(IImmutableSet<TopicPartition> topicPartitions, RestrictedCons
{
}
}

/// <summary>
/// Chains two partition event handlers, invoking them one after the other
/// </summary>
/// <typeparam name="K">Key type</typeparam>
/// <typeparam name="V">Value type</typeparam>
internal class PartitionAssignedHandlersChain<K, V> : IPartitionEventHandler<K, V>
{
private readonly IPartitionEventHandler<K, V> _handler1;
private readonly IPartitionEventHandler<K, V> _handler2;

/// <summary>
/// PartitionAssignedHandlersChain
/// </summary>
/// <param name="handler1">First handler in chain</param>
/// <param name="handler2">Second handler in chain</param>
public PartitionAssignedHandlersChain(IPartitionEventHandler<K, V> handler1, IPartitionEventHandler<K, V> handler2)
{
_handler1 = handler1;
_handler2 = handler2;
}

/// <inheritdoc />
public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, RestrictedConsumer<K, V> consumer)
{
_handler1.OnRevoke(revokedTopicPartitions, consumer);
_handler2.OnRevoke(revokedTopicPartitions, consumer);
}

/// <inheritdoc />
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, RestrictedConsumer<K, V> consumer)
{
_handler1.OnAssign(assignedTopicPartitions, consumer);
_handler2.OnAssign(assignedTopicPartitions, consumer);
}

/// <inheritdoc />
public void OnStop(IImmutableSet<TopicPartition> topicPartitions, RestrictedConsumer<K, V> consumer)
{
_handler1.OnStop(topicPartitions, consumer);
_handler2.OnStop(topicPartitions, consumer);
}
}
}
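
Since the chain type is internal, the following is a conceptual sketch rather than public API usage; offsetStoreHandler and loggingHandler are hypothetical IPartitionEventHandler implementations:

// Conceptual sketch: both handlers observe every partition event, in constructor order.
IPartitionEventHandler<Null, string> chained =
    new PartitionAssignedHandlersChain<Null, string>(offsetStoreHandler, loggingHandler);
// chained.OnAssign(partitions, consumer) invokes
//   offsetStoreHandler.OnAssign(partitions, consumer), then
//   loggingHandler.OnAssign(partitions, consumer)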
46 changes: 46 additions & 0 deletions src/Akka.Streams.Kafka/Messages/CommittedMarker.cs
@@ -0,0 +1,46 @@
using System.Collections.Immutable;
using System.Threading.Tasks;
using Akka.Streams.Kafka.Stages.Consumers;
using Confluent.Kafka;

namespace Akka.Streams.Kafka.Messages
{
/// <summary>
/// Committed marker
/// </summary>
internal interface ICommittedMarker
{
/// <summary>
/// Marks offsets as already committed
/// </summary>
Task Committed(IImmutableDictionary<TopicPartition, OffsetAndMetadata> offsets);

/// <summary>
/// Marks the commit as failed
/// </summary>
void Failed();
}

/// <summary>
/// Used by <see cref="TransactionalMessageBuilder{K,V}"/>
/// </summary>
internal sealed class PartitionOffsetCommittedMarker : GroupTopicPartitionOffset
{
/// <summary>
/// Committed marker
/// </summary>
public ICommittedMarker CommittedMarker { get; }

public PartitionOffsetCommittedMarker(string groupId, string topic, int partition, Offset offset, ICommittedMarker committedMarker)
: base(groupId, topic, partition, offset)
{
CommittedMarker = committedMarker;
}

public PartitionOffsetCommittedMarker(GroupTopicPartition groupTopicPartition, Offset offset, ICommittedMarker committedMarker)
: base(groupTopicPartition, offset)
{
CommittedMarker = committedMarker;
}
}
}
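
ICommittedMarker is internal plumbing; a hypothetical no-op implementation, shown only to illustrate the contract described in the summaries above:

// Hypothetical sketch: Committed marks offsets as already committed, Failed marks a commit failure.
internal sealed class NoOpCommittedMarker : ICommittedMarker
{
    public Task Committed(IImmutableDictionary<TopicPartition, OffsetAndMetadata> offsets)
        => Task.CompletedTask;

    public void Failed() { }
}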
@@ -6,7 +6,7 @@ namespace Akka.Streams.Kafka.Messages
/// <summary>
/// Offset position for a groupId, topic, partition.
/// </summary>
- public sealed class GroupTopicPartitionOffset : IEquatable<GroupTopicPartitionOffset>
+ public class GroupTopicPartitionOffset : IEquatable<GroupTopicPartitionOffset>
{
/// <summary>
/// GroupTopicPartitionOffset
30 changes: 30 additions & 0 deletions src/Akka.Streams.Kafka/Messages/TransactionalMessage.cs
@@ -0,0 +1,30 @@
using Akka.Streams.Kafka.Dsl;
using Confluent.Kafka;

namespace Akka.Streams.Kafka.Messages
{
/// <summary>
/// Output element of <see cref="KafkaConsumer.TransactionalSource{K,V}"/>
/// The offset is automatically committed by the producer as part of the transaction
/// </summary>
public sealed class TransactionalMessage<K, V>
{
/// <summary>
/// TransactionalMessage
/// </summary>
public TransactionalMessage(ConsumeResult<K, V> record, GroupTopicPartitionOffset partitionOffset)
{
Record = record;
PartitionOffset = partitionOffset;
}

/// <summary>
/// Consumed record
/// </summary>
public ConsumeResult<K, V> Record { get; }
/// <summary>
/// Partition offset
/// </summary>
public GroupTopicPartitionOffset PartitionOffset { get; }
}
}