From 608feda348426bb4b74086c4b5933dd17405fad5 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Thu, 6 Oct 2022 10:40:40 +0200 Subject: [PATCH] - Fixes remaining logging issues (#143) - Fixes #142 --- .../Consumers/DefaultConsumer.cs | 2 +- ...nsumeContext.cs => AsyncConsumeContext.cs} | 4 +-- .../EventSubscription.cs | 16 ++++++---- .../EventSubscriptionWithCheckpoint.cs | 12 ++++--- ...urrentFilter.cs => AsyncHandlingFilter.cs} | 14 ++++---- .../Filters/PartitioningFilter.cs | 8 ++--- .../Filters/TracingFilter.cs | 2 +- .../Eventuous.Subscriptions/Logging/Logger.cs | 2 +- .../Logging/SubscriptionLogging.cs | 26 +++++++++++++-- src/Core/src/Eventuous/TypeMap.cs | 8 +++++ .../Subscriptions/AllStreamSubscription.cs | 1 - ...PublishAndSubscribeManyPartitionedTests.cs | 32 +++++++++++++++++-- .../src/Eventuous.Gateway/GatewayHandler.cs | 5 +-- .../GatewayHandlerWithOptions.cs | 4 +-- .../Operations/UpdateBuilder.cs | 18 ++++++----- .../Tools/Document.cs | 1 + .../PostgresAllStreamSubscription.cs | 2 +- .../Subscriptions/PostgresSubscriptionBase.cs | 3 +- .../Subscriptions/RabbitMqSubscription.cs | 4 +-- 19 files changed, 115 insertions(+), 49 deletions(-) rename src/Core/src/Eventuous.Subscriptions/Context/{DelayedAckConsumeContext.cs => AsyncConsumeContext.cs} (90%) rename src/Core/src/Eventuous.Subscriptions/Filters/{ConcurrentFilter.cs => AsyncHandlingFilter.cs} (80%) diff --git a/src/Core/src/Eventuous.Subscriptions/Consumers/DefaultConsumer.cs b/src/Core/src/Eventuous.Subscriptions/Consumers/DefaultConsumer.cs index a52b2969..268f596c 100644 --- a/src/Core/src/Eventuous.Subscriptions/Consumers/DefaultConsumer.cs +++ b/src/Core/src/Eventuous.Subscriptions/Consumers/DefaultConsumer.cs @@ -18,7 +18,7 @@ public async ValueTask Consume(IMessageConsumeContext context) { return; } - var tasks = _eventHandlers.Select(Handle); + var tasks = _eventHandlers.Select(handler => Handle(handler)); await tasks.WhenAll().NoContext(); } catch (Exception e) { diff --git a/src/Core/src/Eventuous.Subscriptions/Context/DelayedAckConsumeContext.cs b/src/Core/src/Eventuous.Subscriptions/Context/AsyncConsumeContext.cs similarity index 90% rename from src/Core/src/Eventuous.Subscriptions/Context/DelayedAckConsumeContext.cs rename to src/Core/src/Eventuous.Subscriptions/Context/AsyncConsumeContext.cs index 2c79de17..3fb3a420 100644 --- a/src/Core/src/Eventuous.Subscriptions/Context/DelayedAckConsumeContext.cs +++ b/src/Core/src/Eventuous.Subscriptions/Context/AsyncConsumeContext.cs @@ -18,7 +18,7 @@ namespace Eventuous.Subscriptions.Context; /// /// Context type that allows to decouple subscriptions from the actual message processing /// -public class DelayedAckConsumeContext : WrappedConsumeContext { +public class AsyncConsumeContext : WrappedConsumeContext { readonly Acknowledge _acknowledge; readonly Fail _fail; @@ -28,7 +28,7 @@ public class DelayedAckConsumeContext : WrappedConsumeContext { /// The original message context /// Function to ACK the message /// Function to NACK the message in case of failure - public DelayedAckConsumeContext(IMessageConsumeContext inner, Acknowledge acknowledge, Fail fail) : base(inner) { + public AsyncConsumeContext(IMessageConsumeContext inner, Acknowledge acknowledge, Fail fail) : base(inner) { inner.LogContext ??= Logger.Current; _acknowledge = acknowledge; _fail = fail; diff --git a/src/Core/src/Eventuous.Subscriptions/EventSubscription.cs b/src/Core/src/Eventuous.Subscriptions/EventSubscription.cs index cca7225d..995d217b 100644 --- a/src/Core/src/Eventuous.Subscriptions/EventSubscription.cs +++ b/src/Core/src/Eventuous.Subscriptions/EventSubscription.cs @@ -36,7 +36,7 @@ protected EventSubscription(T options, ConsumePipe consumePipe, ILoggerFactory? Pipe = Ensure.NotNull(consumePipe); EventSerializer = options.EventSerializer ?? DefaultEventSerializer.Instance; Options = options; - Log = Logger.CreateContext(options.SubscriptionId); + Log = Logger.CreateContext(options.SubscriptionId, loggerFactory); } OnSubscribed? _onSubscribed; @@ -82,9 +82,9 @@ protected async ValueTask Handler(IMessageConsumeContext context) { ) : null; - Logger.Current = Log; - var delayed = context is DelayedAckConsumeContext; - if (!delayed) activity?.Start(); + Logger.Current ??= Log; + var isAsync = context is AsyncConsumeContext; + if (!isAsync) activity?.Start(); Log.MessageReceived(context); @@ -93,7 +93,7 @@ protected async ValueTask Handler(IMessageConsumeContext context) { if (activity != null) { context.ParentContext = activity.Context; - if (delayed) { + if (isAsync) { context.Items.AddItem(ContextItemKeys.Activity, activity); } } @@ -102,6 +102,10 @@ protected async ValueTask Handler(IMessageConsumeContext context) { } else { context.Ignore(SubscriptionId); + if (isAsync) { + var asyncContext = context as AsyncConsumeContext; + await asyncContext!.Acknowledge().NoContext(); + } } if (context.WasIgnored() && activity != null) activity.ActivityTraceFlags = ActivityTraceFlags.None; @@ -127,7 +131,7 @@ protected async ValueTask Handler(IMessageConsumeContext context) { } } - if (!delayed) activity?.Dispose(); + if (!isAsync) activity?.Dispose(); } [MethodImpl(MethodImplOptions.AggressiveInlining)] diff --git a/src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs b/src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs index 02d84257..1cfc2780 100644 --- a/src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs +++ b/src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs @@ -28,12 +28,14 @@ protected EventSubscriptionWithCheckpoint( ); } + static bool PipelineIsAsync(ConsumePipe pipe) => pipe.RegisteredFilters.Any(x => x is AsyncHandlingFilter); + // It's not ideal, but for now if there's any filter added on top of the default one, // we won't add the concurrent filter, so it won't clash with any custom setup static ConsumePipe ConfigurePipe(ConsumePipe pipe, int concurrencyLimit) - => pipe.RegisteredFilters.All(x => x is not ConcurrentFilter) - ? pipe.AddFilterFirst(new ConcurrentFilter((uint)concurrencyLimit)) - : pipe; + => PipelineIsAsync(pipe) + ? pipe + : pipe.AddFilterFirst(new AsyncHandlingFilter((uint)concurrencyLimit)); protected EventPosition? LastProcessed { get; set; } protected CheckpointCommitHandler CheckpointCommitHandler { get; } @@ -42,8 +44,8 @@ static ConsumePipe ConfigurePipe(ConsumePipe pipe, int concurrencyLimit) [MethodImpl(MethodImplOptions.AggressiveInlining)] protected async ValueTask HandleInternal(IMessageConsumeContext context) { try { - Logger.Configure(Options.SubscriptionId, LoggerFactory); - var ctx = new DelayedAckConsumeContext(context, Ack, Nack); + Logger.Current = Log; + var ctx = new AsyncConsumeContext(context, Ack, Nack); await Handler(ctx).NoContext(); } catch (Exception e) { diff --git a/src/Core/src/Eventuous.Subscriptions/Filters/ConcurrentFilter.cs b/src/Core/src/Eventuous.Subscriptions/Filters/AsyncHandlingFilter.cs similarity index 80% rename from src/Core/src/Eventuous.Subscriptions/Filters/ConcurrentFilter.cs rename to src/Core/src/Eventuous.Subscriptions/Filters/AsyncHandlingFilter.cs index e4728d1a..80a202ab 100644 --- a/src/Core/src/Eventuous.Subscriptions/Filters/ConcurrentFilter.cs +++ b/src/Core/src/Eventuous.Subscriptions/Filters/AsyncHandlingFilter.cs @@ -10,10 +10,10 @@ namespace Eventuous.Subscriptions.Filters; -public sealed class ConcurrentFilter : ConsumeFilter, IAsyncDisposable { +public sealed class AsyncHandlingFilter : ConsumeFilter, IAsyncDisposable { readonly ConcurrentChannelWorker _worker; - public ConcurrentFilter(uint concurrencyLimit, uint bufferSize = 10) { + public AsyncHandlingFilter(uint concurrencyLimit, uint bufferSize = 10) { var capacity = (int)(concurrencyLimit * bufferSize); var options = new BoundedChannelOptions(capacity) { @@ -37,7 +37,7 @@ static async ValueTask DelayedConsume(WorkerTask workerTask, CancellationToken c Logger.Current = ctx.LogContext; try { - await workerTask.Next.Value.Send(ctx, workerTask.Next).NoContext(); + await workerTask.Filter.Value.Send(ctx, workerTask.Filter.Next).NoContext(); if (ctx.HasFailed()) { var exception = ctx.HandlingResults.GetException(); @@ -52,10 +52,10 @@ static async ValueTask DelayedConsume(WorkerTask workerTask, CancellationToken c if (!ctx.HandlingResults.IsPending()) await ctx.Acknowledge().NoContext(); } catch (TaskCanceledException) { - ctx.Ignore(); + ctx.Ignore(); } catch (Exception e) { - ctx.LogContext.MessageHandlingFailed(nameof(ConcurrentFilter), workerTask.Context, e); + ctx.LogContext.MessageHandlingFailed(nameof(AsyncHandlingFilter), workerTask.Context, e); activity?.SetActivityStatus(ActivityStatus.Error(e)); await ctx.Fail(e).NoContext(); } @@ -63,13 +63,13 @@ static async ValueTask DelayedConsume(WorkerTask workerTask, CancellationToken c if (activity != null && ctx.WasIgnored()) activity.ActivityTraceFlags = ActivityTraceFlags.None; } - protected override ValueTask Send(DelayedAckConsumeContext context, LinkedListNode? next) { + protected override ValueTask Send(AsyncConsumeContext context, LinkedListNode? next) { if (next == null) throw new InvalidOperationException("Concurrent context must have a next filer"); return _worker.Write(new WorkerTask(context, next), context.CancellationToken); } - record struct WorkerTask(DelayedAckConsumeContext Context, LinkedListNode Next); + record struct WorkerTask(AsyncConsumeContext Context, LinkedListNode Filter); public ValueTask DisposeAsync() { // Logger.Configure(_subscriptionId, _loggerFactory); diff --git a/src/Core/src/Eventuous.Subscriptions/Filters/PartitioningFilter.cs b/src/Core/src/Eventuous.Subscriptions/Filters/PartitioningFilter.cs index 17c18c77..96a717c8 100644 --- a/src/Core/src/Eventuous.Subscriptions/Filters/PartitioningFilter.cs +++ b/src/Core/src/Eventuous.Subscriptions/Filters/PartitioningFilter.cs @@ -7,10 +7,10 @@ namespace Eventuous.Subscriptions.Filters; -public sealed class PartitioningFilter : ConsumeFilter, IAsyncDisposable { +public sealed class PartitioningFilter : ConsumeFilter, IAsyncDisposable { readonly GetPartitionHash _getHash; readonly GetPartitionKey _partitioner; - readonly ConcurrentFilter[] _filters; + readonly AsyncHandlingFilter[] _filters; readonly int _partitionCount; public PartitioningFilter( @@ -26,11 +26,11 @@ public PartitioningFilter( _partitioner = partitioner ?? (ctx => ctx.Stream); _filters = Enumerable.Range(0, _partitionCount) - .Select(_ => new ConcurrentFilter(1)) + .Select(_ => new AsyncHandlingFilter(1)) .ToArray(); } - protected override ValueTask Send(DelayedAckConsumeContext context, LinkedListNode? next) { + protected override ValueTask Send(AsyncConsumeContext context, LinkedListNode? next) { var partitionKey = _partitioner(context); var hash = _getHash(partitionKey); var partition = hash % _partitionCount; diff --git a/src/Core/src/Eventuous.Subscriptions/Filters/TracingFilter.cs b/src/Core/src/Eventuous.Subscriptions/Filters/TracingFilter.cs index 70fefb41..84fa4576 100644 --- a/src/Core/src/Eventuous.Subscriptions/Filters/TracingFilter.cs +++ b/src/Core/src/Eventuous.Subscriptions/Filters/TracingFilter.cs @@ -34,7 +34,7 @@ protected override async ValueTask Send( ) : Activity.Current; - if (activity?.IsAllDataRequested == true && context is DelayedAckConsumeContext delayedAckContext) { + if (activity?.IsAllDataRequested == true && context is AsyncConsumeContext delayedAckContext) { activity.SetContextTags(context)?.SetTag(TelemetryTags.Eventuous.Partition, delayedAckContext.PartitionId); } diff --git a/src/Core/src/Eventuous.Subscriptions/Logging/Logger.cs b/src/Core/src/Eventuous.Subscriptions/Logging/Logger.cs index 098d37a0..9dd36fa0 100644 --- a/src/Core/src/Eventuous.Subscriptions/Logging/Logger.cs +++ b/src/Core/src/Eventuous.Subscriptions/Logging/Logger.cs @@ -28,7 +28,7 @@ public static void Configure(string subscriptionId, ILoggerFactory? loggerFactor Current = CreateContext(subscriptionId, loggerFactory); } - public static LogContext CreateContext(string subscriptionId, ILoggerFactory? loggerFactory = null) + public static LogContext CreateContext(string subscriptionId, ILoggerFactory? loggerFactory) => new(subscriptionId, loggerFactory ?? NullLoggerFactory.Instance); } diff --git a/src/Core/src/Eventuous.Subscriptions/Logging/SubscriptionLogging.cs b/src/Core/src/Eventuous.Subscriptions/Logging/SubscriptionLogging.cs index a9c6109b..ff5e7d70 100644 --- a/src/Core/src/Eventuous.Subscriptions/Logging/SubscriptionLogging.cs +++ b/src/Core/src/Eventuous.Subscriptions/Logging/SubscriptionLogging.cs @@ -7,13 +7,33 @@ namespace Eventuous.Subscriptions.Logging; public static class LoggingExtensions { public static void MessageReceived(this LogContext log, IMessageConsumeContext context) - => log.DebugLog?.Log("Received {MessageType} from {Stream}", context.MessageType, context.Stream); + => log.TraceLog?.Log( + "Received {MessageType} from {Stream}:{Position} seq {Sequence}", + context.MessageType, + context.Stream, + context.GlobalPosition, + context.Sequence + ); public static void MessageHandled(this LogContext log, string handlerType, IBaseConsumeContext context) - => log.DebugLog?.Log("{Handler} handled {MessageType}", handlerType, context.MessageType); + => log.DebugLog?.Log( + "{Handler} handled {MessageType} {Stream}:{Position} seq {Sequence}", + handlerType, + context.MessageType, + context.Stream, + context.GlobalPosition, + context.Sequence + ); public static void MessageIgnored(this LogContext log, string handlerType, IBaseConsumeContext context) - => log.DebugLog?.Log("{Handler} ignored {MessageType}", handlerType, context.MessageType); + => log.DebugLog?.Log( + "{Handler} ignored {MessageType} {Stream}:{Position} seq {Sequence}", + handlerType, + context.MessageType, + context.Stream, + context.GlobalPosition, + context.Sequence + ); public static void MessageTypeNotFound(this LogContext log) => log.WarnLog?.Log("Message type {MessageType} not registered in the type map", typeof(T).Name); diff --git a/src/Core/src/Eventuous/TypeMap.cs b/src/Core/src/Eventuous/TypeMap.cs index 95b111ca..3025218b 100644 --- a/src/Core/src/Eventuous/TypeMap.cs +++ b/src/Core/src/Eventuous/TypeMap.cs @@ -26,6 +26,8 @@ public static class TypeMap { public static void AddType(string name) => Instance.AddType(name); static void AddType(Type type, string name) => Instance.AddType(type, name); + + public static void RemoveType() => Instance.RemoveType(); public static bool IsTypeRegistered() => Instance.IsTypeRegistered(); @@ -92,6 +94,12 @@ internal void AddType(Type type, string name) { Log.TypeMapRegistered(type.Name, name); } + public void RemoveType() { + var name = GetTypeName(); + _reverseMap.Remove(name); + _map.Remove(typeof(T)); + } + public bool IsTypeRegistered() => _map.ContainsKey(typeof(T)); public void RegisterKnownEventTypes(params Assembly[] assembliesWithEvents) { diff --git a/src/EventStore/src/Eventuous.EventStore/Subscriptions/AllStreamSubscription.cs b/src/EventStore/src/Eventuous.EventStore/Subscriptions/AllStreamSubscription.cs index 23753380..10dda57a 100644 --- a/src/EventStore/src/Eventuous.EventStore/Subscriptions/AllStreamSubscription.cs +++ b/src/EventStore/src/Eventuous.EventStore/Subscriptions/AllStreamSubscription.cs @@ -6,7 +6,6 @@ using Eventuous.Subscriptions.Context; using Eventuous.Subscriptions.Diagnostics; using Eventuous.Subscriptions.Filters; -using Eventuous.Subscriptions.Logging; namespace Eventuous.EventStore.Subscriptions; diff --git a/src/EventStore/test/Eventuous.Tests.EventStore/PublishAndSubscribeManyPartitionedTests.cs b/src/EventStore/test/Eventuous.Tests.EventStore/PublishAndSubscribeManyPartitionedTests.cs index c16e66d6..5c54225f 100644 --- a/src/EventStore/test/Eventuous.Tests.EventStore/PublishAndSubscribeManyPartitionedTests.cs +++ b/src/EventStore/test/Eventuous.Tests.EventStore/PublishAndSubscribeManyPartitionedTests.cs @@ -29,7 +29,35 @@ public async Task SubscribeAndProduceMany() { await Handler.Validate(5.Seconds()); await Stop(); - + CheckpointStore.Last.Position.Should().Be(count - 1); } -} \ No newline at end of file + + [Fact] + public async Task SubscribeAndProduceManyWithIgnored() { + const int count = 10; + + var testEvents = Generate().ToList(); + + Handler.AssertThat().Exactly(count, x => testEvents.Contains(x)); + + TypeMap.AddType("ignored"); + await Producer.Produce(Stream, testEvents, new Metadata()); + + await Start(); + TypeMap.RemoveType(); + await Handler.Validate(5.Seconds()); + await Stop(); + + CheckpointStore.Last.Position.Should().Be((ulong)(testEvents.Count - 1)); + + IEnumerable Generate() { + for (var i = 0; i < count; i++) { + yield return new TestEvent(Auto.Create(), i); + yield return new UnknownEvent(Auto.Create(), i); + } + } + } + + record UnknownEvent(string Data, int Number); +} diff --git a/src/Gateway/src/Eventuous.Gateway/GatewayHandler.cs b/src/Gateway/src/Eventuous.Gateway/GatewayHandler.cs index 8f2639d1..d227759f 100644 --- a/src/Gateway/src/Eventuous.Gateway/GatewayHandler.cs +++ b/src/Gateway/src/Eventuous.Gateway/GatewayHandler.cs @@ -1,4 +1,5 @@ using Eventuous.Subscriptions.Context; +using Eventuous.Subscriptions.Filters; namespace Eventuous.Gateway; @@ -22,8 +23,8 @@ public override async ValueTask HandleEvent(IMessageConsume AcknowledgeProduce? onAck = null; - if (context is DelayedAckConsumeContext delayed) { - onAck = _ => delayed.Acknowledge(); + if (context is AsyncConsumeContext asyncContext) { + onAck = _ => asyncContext.Acknowledge(); } var grouped = shovelMessages.GroupBy(x => x.TargetStream); diff --git a/src/Gateway/src/Eventuous.Gateway/GatewayHandlerWithOptions.cs b/src/Gateway/src/Eventuous.Gateway/GatewayHandlerWithOptions.cs index 32438483..234b89ce 100644 --- a/src/Gateway/src/Eventuous.Gateway/GatewayHandlerWithOptions.cs +++ b/src/Gateway/src/Eventuous.Gateway/GatewayHandlerWithOptions.cs @@ -30,8 +30,8 @@ public override async ValueTask HandleEvent(IMessageConsume AcknowledgeProduce? onAck = null; - if (context is DelayedAckConsumeContext delayed) { - onAck = _ => delayed.Acknowledge(); + if (context is AsyncConsumeContext asyncContext) { + onAck = _ => asyncContext.Acknowledge(); } try { diff --git a/src/Mongo/src/Eventuous.Projections.MongoDB/Operations/UpdateBuilder.cs b/src/Mongo/src/Eventuous.Projections.MongoDB/Operations/UpdateBuilder.cs index 3a1cbc8b..fee5492a 100644 --- a/src/Mongo/src/Eventuous.Projections.MongoDB/Operations/UpdateBuilder.cs +++ b/src/Mongo/src/Eventuous.Projections.MongoDB/Operations/UpdateBuilder.cs @@ -9,8 +9,7 @@ namespace Eventuous.Projections.MongoDB; public partial class MongoOperationBuilder where T : ProjectedDocument where TEvent : class { public class UpdateOneBuilder : UpdateBuilder, IMongoProjectorBuilder { - public UpdateOneBuilder IdFromStream(GetDocumentIdFromStream getId) - => Id(x => getId(x.Stream)); + public UpdateOneBuilder IdFromStream(GetDocumentIdFromStream getId) => Id(x => getId(x.Stream)); public UpdateOneBuilder Id(GetDocumentIdFromContext getId) { _filter.Id(getId); @@ -29,7 +28,9 @@ ProjectTypedEvent IMongoProjectorBuilder.Build() await collection .UpdateOneAsync( _filter.GetFilter(ctx), - update.Set(x => x.Position, ctx.StreamPosition), + update + .Set(x => x.StreamPosition, ctx.StreamPosition) + .Set(x => x.Position, ctx.GlobalPosition), options, token ); @@ -46,11 +47,12 @@ ProjectTypedEvent IMongoProjectorBuilder.Build() var update = await GetUpdate(ctx, Builders.Update).NoContext(); await collection.UpdateManyAsync( - _filter.GetFilter(ctx), - update.Set(x => x.Position, ctx.StreamPosition), - options, - token - ).NoContext(); + _filter.GetFilter(ctx), + update.Set(x => x.Position, ctx.StreamPosition), + options, + token + ) + .NoContext(); } ); } diff --git a/src/Mongo/src/Eventuous.Projections.MongoDB/Tools/Document.cs b/src/Mongo/src/Eventuous.Projections.MongoDB/Tools/Document.cs index fa825431..dd53d0c9 100644 --- a/src/Mongo/src/Eventuous.Projections.MongoDB/Tools/Document.cs +++ b/src/Mongo/src/Eventuous.Projections.MongoDB/Tools/Document.cs @@ -3,5 +3,6 @@ namespace Eventuous.Projections.MongoDB.Tools; public abstract record Document(string Id); public abstract record ProjectedDocument(string Id) : Document(Id) { + public ulong StreamPosition { get; init; } public ulong Position { get; init; } } \ No newline at end of file diff --git a/src/Postgres/src/Eventuous.Postgresql/Subscriptions/PostgresAllStreamSubscription.cs b/src/Postgres/src/Eventuous.Postgresql/Subscriptions/PostgresAllStreamSubscription.cs index bf51a140..60c7d37e 100644 --- a/src/Postgres/src/Eventuous.Postgresql/Subscriptions/PostgresAllStreamSubscription.cs +++ b/src/Postgres/src/Eventuous.Postgresql/Subscriptions/PostgresAllStreamSubscription.cs @@ -17,7 +17,7 @@ public PostgresAllStreamSubscription( PostgresAllStreamSubscriptionOptions options, ICheckpointStore checkpointStore, ConsumePipe consumePipe, - ILoggerFactory? loggerFactory = null + ILoggerFactory? loggerFactory ) : base(getConnection, options, checkpointStore, consumePipe, loggerFactory) { } protected override NpgsqlCommand PrepareCommand(NpgsqlConnection connection, long start) { diff --git a/src/Postgres/src/Eventuous.Postgresql/Subscriptions/PostgresSubscriptionBase.cs b/src/Postgres/src/Eventuous.Postgresql/Subscriptions/PostgresSubscriptionBase.cs index ecee1eac..e80dc636 100644 --- a/src/Postgres/src/Eventuous.Postgresql/Subscriptions/PostgresSubscriptionBase.cs +++ b/src/Postgres/src/Eventuous.Postgresql/Subscriptions/PostgresSubscriptionBase.cs @@ -6,8 +6,8 @@ using Eventuous.Subscriptions; using Eventuous.Subscriptions.Checkpoints; using Eventuous.Subscriptions.Context; -using Eventuous.Subscriptions.Diagnostics; using Eventuous.Subscriptions.Filters; +using Eventuous.Subscriptions.Logging; using Microsoft.Extensions.Logging; using Npgsql; @@ -98,6 +98,7 @@ async Task PollingQuery(ulong? position, CancellationToken cancellationToken) { protected abstract long MoveStart(PersistedEvent evt); IMessageConsumeContext ToConsumeContext(PersistedEvent evt, CancellationToken cancellationToken) { + Logger.Current = Log; var data = DeserializeData( ContentType, evt.MessageType, diff --git a/src/RabbitMq/src/Eventuous.RabbitMq/Subscriptions/RabbitMqSubscription.cs b/src/RabbitMq/src/Eventuous.RabbitMq/Subscriptions/RabbitMqSubscription.cs index 3ca29cd7..bb889a3e 100644 --- a/src/RabbitMq/src/Eventuous.RabbitMq/Subscriptions/RabbitMqSubscription.cs +++ b/src/RabbitMq/src/Eventuous.RabbitMq/Subscriptions/RabbitMqSubscription.cs @@ -52,7 +52,7 @@ public RabbitMqSubscription( ) : base( Ensure.NotNull(options), - consumePipe.AddFilterFirst(new ConcurrentFilter(options.ConcurrencyLimit * 10)), + consumePipe.AddFilterFirst(new AsyncHandlingFilter(options.ConcurrencyLimit * 10)), loggerFactory ) { _failureHandler = options.FailureHandler ?? DefaultEventFailureHandler; @@ -135,7 +135,7 @@ async Task HandleReceived(object sender, BasicDeliverEventArgs received) { Logger.Current = Log; try { var ctx = CreateContext(sender, received).WithItem(ReceivedMessageKey, received); - await Handler(new DelayedAckConsumeContext(ctx, Ack, Nack)).NoContext(); + await Handler(new AsyncConsumeContext(ctx, Ack, Nack)).NoContext(); } catch (Exception) { // This won't stop the subscription, but the reader will be gone. Not sure how to solve this one.