Skip to content

Commit

Permalink
- Fixes remaining logging issues (#143)
Browse files Browse the repository at this point in the history
- Fixes #142
  • Loading branch information
alexeyzimarev authored Oct 6, 2022
1 parent e587d9b commit 608feda
Show file tree
Hide file tree
Showing 19 changed files with 115 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public async ValueTask Consume(IMessageConsumeContext context) {
return;
}

var tasks = _eventHandlers.Select(Handle);
var tasks = _eventHandlers.Select(handler => Handle(handler));
await tasks.WhenAll().NoContext();
}
catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace Eventuous.Subscriptions.Context;
/// <summary>
/// Context type that allows to decouple subscriptions from the actual message processing
/// </summary>
public class DelayedAckConsumeContext : WrappedConsumeContext {
public class AsyncConsumeContext : WrappedConsumeContext {
readonly Acknowledge _acknowledge;
readonly Fail _fail;

Expand All @@ -28,7 +28,7 @@ public class DelayedAckConsumeContext : WrappedConsumeContext {
/// <param name="inner">The original message context</param>
/// <param name="acknowledge">Function to ACK the message</param>
/// <param name="fail">Function to NACK the message in case of failure</param>
public DelayedAckConsumeContext(IMessageConsumeContext inner, Acknowledge acknowledge, Fail fail) : base(inner) {
public AsyncConsumeContext(IMessageConsumeContext inner, Acknowledge acknowledge, Fail fail) : base(inner) {
inner.LogContext ??= Logger.Current;
_acknowledge = acknowledge;
_fail = fail;
Expand Down
16 changes: 10 additions & 6 deletions src/Core/src/Eventuous.Subscriptions/EventSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ protected EventSubscription(T options, ConsumePipe consumePipe, ILoggerFactory?
Pipe = Ensure.NotNull(consumePipe);
EventSerializer = options.EventSerializer ?? DefaultEventSerializer.Instance;
Options = options;
Log = Logger.CreateContext(options.SubscriptionId);
Log = Logger.CreateContext(options.SubscriptionId, loggerFactory);
}

OnSubscribed? _onSubscribed;
Expand Down Expand Up @@ -82,9 +82,9 @@ protected async ValueTask Handler(IMessageConsumeContext context) {
)
: null;

Logger.Current = Log;
var delayed = context is DelayedAckConsumeContext;
if (!delayed) activity?.Start();
Logger.Current ??= Log;
var isAsync = context is AsyncConsumeContext;
if (!isAsync) activity?.Start();

Log.MessageReceived(context);

Expand All @@ -93,7 +93,7 @@ protected async ValueTask Handler(IMessageConsumeContext context) {
if (activity != null) {
context.ParentContext = activity.Context;

if (delayed) {
if (isAsync) {
context.Items.AddItem(ContextItemKeys.Activity, activity);
}
}
Expand All @@ -102,6 +102,10 @@ protected async ValueTask Handler(IMessageConsumeContext context) {
}
else {
context.Ignore(SubscriptionId);
if (isAsync) {
var asyncContext = context as AsyncConsumeContext;
await asyncContext!.Acknowledge().NoContext();
}
}

if (context.WasIgnored() && activity != null) activity.ActivityTraceFlags = ActivityTraceFlags.None;
Expand All @@ -127,7 +131,7 @@ protected async ValueTask Handler(IMessageConsumeContext context) {
}
}

if (!delayed) activity?.Dispose();
if (!isAsync) activity?.Dispose();
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ protected EventSubscriptionWithCheckpoint(
);
}

static bool PipelineIsAsync(ConsumePipe pipe) => pipe.RegisteredFilters.Any(x => x is AsyncHandlingFilter);

// It's not ideal, but for now if there's any filter added on top of the default one,
// we won't add the concurrent filter, so it won't clash with any custom setup
static ConsumePipe ConfigurePipe(ConsumePipe pipe, int concurrencyLimit)
=> pipe.RegisteredFilters.All(x => x is not ConcurrentFilter)
? pipe.AddFilterFirst(new ConcurrentFilter((uint)concurrencyLimit))
: pipe;
=> PipelineIsAsync(pipe)
? pipe
: pipe.AddFilterFirst(new AsyncHandlingFilter((uint)concurrencyLimit));

protected EventPosition? LastProcessed { get; set; }
protected CheckpointCommitHandler CheckpointCommitHandler { get; }
Expand All @@ -42,8 +44,8 @@ static ConsumePipe ConfigurePipe(ConsumePipe pipe, int concurrencyLimit)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected async ValueTask HandleInternal(IMessageConsumeContext context) {
try {
Logger.Configure(Options.SubscriptionId, LoggerFactory);
var ctx = new DelayedAckConsumeContext(context, Ack, Nack);
Logger.Current = Log;
var ctx = new AsyncConsumeContext(context, Ack, Nack);
await Handler(ctx).NoContext();
}
catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@

namespace Eventuous.Subscriptions.Filters;

public sealed class ConcurrentFilter : ConsumeFilter<DelayedAckConsumeContext>, IAsyncDisposable {
public sealed class AsyncHandlingFilter : ConsumeFilter<AsyncConsumeContext>, IAsyncDisposable {
readonly ConcurrentChannelWorker<WorkerTask> _worker;

public ConcurrentFilter(uint concurrencyLimit, uint bufferSize = 10) {
public AsyncHandlingFilter(uint concurrencyLimit, uint bufferSize = 10) {
var capacity = (int)(concurrencyLimit * bufferSize);

var options = new BoundedChannelOptions(capacity) {
Expand All @@ -37,7 +37,7 @@ static async ValueTask DelayedConsume(WorkerTask workerTask, CancellationToken c
Logger.Current = ctx.LogContext;

try {
await workerTask.Next.Value.Send(ctx, workerTask.Next).NoContext();
await workerTask.Filter.Value.Send(ctx, workerTask.Filter.Next).NoContext();

if (ctx.HasFailed()) {
var exception = ctx.HandlingResults.GetException();
Expand All @@ -52,24 +52,24 @@ static async ValueTask DelayedConsume(WorkerTask workerTask, CancellationToken c
if (!ctx.HandlingResults.IsPending()) await ctx.Acknowledge().NoContext();
}
catch (TaskCanceledException) {
ctx.Ignore<ConcurrentFilter>();
ctx.Ignore<AsyncHandlingFilter>();
}
catch (Exception e) {
ctx.LogContext.MessageHandlingFailed(nameof(ConcurrentFilter), workerTask.Context, e);
ctx.LogContext.MessageHandlingFailed(nameof(AsyncHandlingFilter), workerTask.Context, e);
activity?.SetActivityStatus(ActivityStatus.Error(e));
await ctx.Fail(e).NoContext();
}

if (activity != null && ctx.WasIgnored()) activity.ActivityTraceFlags = ActivityTraceFlags.None;
}

protected override ValueTask Send(DelayedAckConsumeContext context, LinkedListNode<IConsumeFilter>? next) {
protected override ValueTask Send(AsyncConsumeContext context, LinkedListNode<IConsumeFilter>? next) {
if (next == null) throw new InvalidOperationException("Concurrent context must have a next filer");

return _worker.Write(new WorkerTask(context, next), context.CancellationToken);
}

record struct WorkerTask(DelayedAckConsumeContext Context, LinkedListNode<IConsumeFilter> Next);
record struct WorkerTask(AsyncConsumeContext Context, LinkedListNode<IConsumeFilter> Filter);

public ValueTask DisposeAsync() {
// Logger.Configure(_subscriptionId, _loggerFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@

namespace Eventuous.Subscriptions.Filters;

public sealed class PartitioningFilter : ConsumeFilter<DelayedAckConsumeContext>, IAsyncDisposable {
public sealed class PartitioningFilter : ConsumeFilter<AsyncConsumeContext>, IAsyncDisposable {
readonly GetPartitionHash _getHash;
readonly GetPartitionKey _partitioner;
readonly ConcurrentFilter[] _filters;
readonly AsyncHandlingFilter[] _filters;
readonly int _partitionCount;

public PartitioningFilter(
Expand All @@ -26,11 +26,11 @@ public PartitioningFilter(
_partitioner = partitioner ?? (ctx => ctx.Stream);

_filters = Enumerable.Range(0, _partitionCount)
.Select(_ => new ConcurrentFilter(1))
.Select(_ => new AsyncHandlingFilter(1))
.ToArray();
}

protected override ValueTask Send(DelayedAckConsumeContext context, LinkedListNode<IConsumeFilter>? next) {
protected override ValueTask Send(AsyncConsumeContext context, LinkedListNode<IConsumeFilter>? next) {
var partitionKey = _partitioner(context);
var hash = _getHash(partitionKey);
var partition = hash % _partitionCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ protected override async ValueTask Send(
)
: Activity.Current;

if (activity?.IsAllDataRequested == true && context is DelayedAckConsumeContext delayedAckContext) {
if (activity?.IsAllDataRequested == true && context is AsyncConsumeContext delayedAckContext) {
activity.SetContextTags(context)?.SetTag(TelemetryTags.Eventuous.Partition, delayedAckContext.PartitionId);
}

Expand Down
2 changes: 1 addition & 1 deletion src/Core/src/Eventuous.Subscriptions/Logging/Logger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public static void Configure(string subscriptionId, ILoggerFactory? loggerFactor
Current = CreateContext(subscriptionId, loggerFactory);
}

public static LogContext CreateContext(string subscriptionId, ILoggerFactory? loggerFactory = null)
public static LogContext CreateContext(string subscriptionId, ILoggerFactory? loggerFactory)
=> new(subscriptionId, loggerFactory ?? NullLoggerFactory.Instance);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,33 @@ namespace Eventuous.Subscriptions.Logging;

public static class LoggingExtensions {
public static void MessageReceived(this LogContext log, IMessageConsumeContext context)
=> log.DebugLog?.Log("Received {MessageType} from {Stream}", context.MessageType, context.Stream);
=> log.TraceLog?.Log(
"Received {MessageType} from {Stream}:{Position} seq {Sequence}",
context.MessageType,
context.Stream,
context.GlobalPosition,
context.Sequence
);

public static void MessageHandled(this LogContext log, string handlerType, IBaseConsumeContext context)
=> log.DebugLog?.Log("{Handler} handled {MessageType}", handlerType, context.MessageType);
=> log.DebugLog?.Log(
"{Handler} handled {MessageType} {Stream}:{Position} seq {Sequence}",
handlerType,
context.MessageType,
context.Stream,
context.GlobalPosition,
context.Sequence
);

public static void MessageIgnored(this LogContext log, string handlerType, IBaseConsumeContext context)
=> log.DebugLog?.Log("{Handler} ignored {MessageType}", handlerType, context.MessageType);
=> log.DebugLog?.Log(
"{Handler} ignored {MessageType} {Stream}:{Position} seq {Sequence}",
handlerType,
context.MessageType,
context.Stream,
context.GlobalPosition,
context.Sequence
);

public static void MessageTypeNotFound<T>(this LogContext log)
=> log.WarnLog?.Log("Message type {MessageType} not registered in the type map", typeof(T).Name);
Expand Down
8 changes: 8 additions & 0 deletions src/Core/src/Eventuous/TypeMap.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public static class TypeMap {
public static void AddType<T>(string name) => Instance.AddType<T>(name);

static void AddType(Type type, string name) => Instance.AddType(type, name);

public static void RemoveType<T>() => Instance.RemoveType<T>();

public static bool IsTypeRegistered<T>() => Instance.IsTypeRegistered<T>();

Expand Down Expand Up @@ -92,6 +94,12 @@ internal void AddType(Type type, string name) {
Log.TypeMapRegistered(type.Name, name);
}

public void RemoveType<T>() {
var name = GetTypeName<T>();
_reverseMap.Remove(name);
_map.Remove(typeof(T));
}

public bool IsTypeRegistered<T>() => _map.ContainsKey(typeof(T));

public void RegisterKnownEventTypes(params Assembly[] assembliesWithEvents) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using Eventuous.Subscriptions.Context;
using Eventuous.Subscriptions.Diagnostics;
using Eventuous.Subscriptions.Filters;
using Eventuous.Subscriptions.Logging;

namespace Eventuous.EventStore.Subscriptions;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,35 @@ public async Task SubscribeAndProduceMany() {

await Handler.Validate(5.Seconds());
await Stop();

CheckpointStore.Last.Position.Should().Be(count - 1);
}
}

[Fact]
public async Task SubscribeAndProduceManyWithIgnored() {
const int count = 10;

var testEvents = Generate().ToList();

Handler.AssertThat().Exactly(count, x => testEvents.Contains(x));

TypeMap.AddType<UnknownEvent>("ignored");
await Producer.Produce(Stream, testEvents, new Metadata());

await Start();
TypeMap.RemoveType<UnknownEvent>();
await Handler.Validate(5.Seconds());
await Stop();

CheckpointStore.Last.Position.Should().Be((ulong)(testEvents.Count - 1));

IEnumerable<object> Generate() {
for (var i = 0; i < count; i++) {
yield return new TestEvent(Auto.Create<string>(), i);
yield return new UnknownEvent(Auto.Create<string>(), i);
}
}
}

record UnknownEvent(string Data, int Number);
}
5 changes: 3 additions & 2 deletions src/Gateway/src/Eventuous.Gateway/GatewayHandler.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Eventuous.Subscriptions.Context;
using Eventuous.Subscriptions.Filters;

namespace Eventuous.Gateway;

Expand All @@ -22,8 +23,8 @@ public override async ValueTask<EventHandlingStatus> HandleEvent(IMessageConsume

AcknowledgeProduce? onAck = null;

if (context is DelayedAckConsumeContext delayed) {
onAck = _ => delayed.Acknowledge();
if (context is AsyncConsumeContext asyncContext) {
onAck = _ => asyncContext.Acknowledge();
}

var grouped = shovelMessages.GroupBy(x => x.TargetStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ public override async ValueTask<EventHandlingStatus> HandleEvent(IMessageConsume

AcknowledgeProduce? onAck = null;

if (context is DelayedAckConsumeContext delayed) {
onAck = _ => delayed.Acknowledge();
if (context is AsyncConsumeContext asyncContext) {
onAck = _ => asyncContext.Acknowledge();
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ namespace Eventuous.Projections.MongoDB;

public partial class MongoOperationBuilder<TEvent, T> where T : ProjectedDocument where TEvent : class {
public class UpdateOneBuilder : UpdateBuilder<UpdateOneBuilder>, IMongoProjectorBuilder {
public UpdateOneBuilder IdFromStream(GetDocumentIdFromStream getId)
=> Id(x => getId(x.Stream));
public UpdateOneBuilder IdFromStream(GetDocumentIdFromStream getId) => Id(x => getId(x.Stream));

public UpdateOneBuilder Id(GetDocumentIdFromContext<TEvent> getId) {
_filter.Id(getId);
Expand All @@ -29,7 +28,9 @@ ProjectTypedEvent<T, TEvent> IMongoProjectorBuilder.Build()
await collection
.UpdateOneAsync(
_filter.GetFilter(ctx),
update.Set(x => x.Position, ctx.StreamPosition),
update
.Set(x => x.StreamPosition, ctx.StreamPosition)
.Set(x => x.Position, ctx.GlobalPosition),
options,
token
);
Expand All @@ -46,11 +47,12 @@ ProjectTypedEvent<T, TEvent> IMongoProjectorBuilder.Build()
var update = await GetUpdate(ctx, Builders<T>.Update).NoContext();

await collection.UpdateManyAsync(
_filter.GetFilter(ctx),
update.Set(x => x.Position, ctx.StreamPosition),
options,
token
).NoContext();
_filter.GetFilter(ctx),
update.Set(x => x.Position, ctx.StreamPosition),
options,
token
)
.NoContext();
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ namespace Eventuous.Projections.MongoDB.Tools;
public abstract record Document(string Id);

public abstract record ProjectedDocument(string Id) : Document(Id) {
public ulong StreamPosition { get; init; }
public ulong Position { get; init; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public PostgresAllStreamSubscription(
PostgresAllStreamSubscriptionOptions options,
ICheckpointStore checkpointStore,
ConsumePipe consumePipe,
ILoggerFactory? loggerFactory = null
ILoggerFactory? loggerFactory
) : base(getConnection, options, checkpointStore, consumePipe, loggerFactory) { }

protected override NpgsqlCommand PrepareCommand(NpgsqlConnection connection, long start) {
Expand Down
Loading

0 comments on commit 608feda

Please sign in to comment.