diff --git a/README.md b/README.md index 85a6977..e8071ce 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,10 @@ A wrapper for Serilog sinks that asynchronously emits events in batches, useful when logging to a slow and/or remote target. +> [!IMPORTANT] +> Serilog 4.x and later versions support batching natively. New projects should use Serilog's `IBatchedLogEventSink` and +> `WriteTo.Sink(IBatchedLogEventSink)`, not this package which is now only maintained for compatibility reasons. + ### Getting started Sinks that, for performance reasons, need to emit events in batches, can be implemented using `PeriodicBatchingSink` diff --git a/src/Serilog.Sinks.PeriodicBatching/Serilog.Sinks.PeriodicBatching.csproj b/src/Serilog.Sinks.PeriodicBatching/Serilog.Sinks.PeriodicBatching.csproj index 76b2170..6142513 100644 --- a/src/Serilog.Sinks.PeriodicBatching/Serilog.Sinks.PeriodicBatching.csproj +++ b/src/Serilog.Sinks.PeriodicBatching/Serilog.Sinks.PeriodicBatching.csproj @@ -2,10 +2,15 @@ Buffer batches of log events to be flushed asynchronously. - 4.1.2 + 5.0.0 Serilog Contributors - net462 - $(TargetFrameworks);netstandard2.0;net6.0 + Serilog Contributors + + net471;net462 + + $(TargetFrameworks);net8.0;net6.0;netstandard2.0 true Serilog serilog;batching;timer @@ -21,16 +26,12 @@ snupkg - + $(DefineConstants);FEATURE_ASYNCDISPOSABLE - - - - - + diff --git a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/FailureAwareBatchScheduler.cs b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/FailureAwareBatchScheduler.cs deleted file mode 100644 index 8b6366f..0000000 --- a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/FailureAwareBatchScheduler.cs +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright © Serilog Contributors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -namespace Serilog.Sinks.PeriodicBatching; - -/// -/// Manages reconnection period and transient fault response for . -/// During normal operation an object of this type will simply echo the configured batch transmission -/// period. When availability fluctuates, the class tracks the number of failed attempts, each time -/// increasing the interval before reconnection is attempted (up to a set maximum) and at predefined -/// points indicating that either the current batch, or entire waiting queue, should be dropped. This -/// Serves two purposes - first, a loaded receiver may need a temporary reduction in traffic while coming -/// back online. Second, the sender needs to account for both bad batches (the first fault response) and -/// also overproduction (the second, queue-dropping response). In combination these should provide a -/// reasonable delivery effort but ultimately protect the sender from memory exhaustion. -/// -class FailureAwareBatchScheduler -{ - static readonly TimeSpan MinimumBackoffPeriod = TimeSpan.FromSeconds(5); - static readonly TimeSpan MaximumBackoffInterval = TimeSpan.FromMinutes(10); - - const int FailuresBeforeDroppingBatch = 8; - const int FailuresBeforeDroppingQueue = 10; - - readonly TimeSpan _period; - - int _failuresSinceSuccessfulBatch; - - public FailureAwareBatchScheduler(TimeSpan period) - { - if (period < TimeSpan.Zero) - throw new ArgumentOutOfRangeException(nameof(period), "The batching period must be a positive timespan."); - - _period = period; - } - - public void MarkSuccess() - { - _failuresSinceSuccessfulBatch = 0; - } - - public void MarkFailure() - { - ++_failuresSinceSuccessfulBatch; - } - - public TimeSpan NextInterval - { - get - { - // Available, and first failure, just try the batch interval - if (_failuresSinceSuccessfulBatch <= 1) return _period; - - // Second failure, start ramping up the interval - first 2x, then 4x, ... - var backoffFactor = Math.Pow(2, _failuresSinceSuccessfulBatch - 1); - - // If the period is ridiculously short, give it a boost so we get some - // visible backoff. - var backoffPeriod = Math.Max(_period.Ticks, MinimumBackoffPeriod.Ticks); - - // The "ideal" interval - var backedOff = (long) (backoffPeriod * backoffFactor); - - // Capped to the maximum interval - var cappedBackoff = Math.Min(MaximumBackoffInterval.Ticks, backedOff); - - // Unless that's shorter than the period, in which case we'll just apply the period - var actual = Math.Max(_period.Ticks, cappedBackoff); - - return TimeSpan.FromTicks(actual); - } - } - - public bool ShouldDropBatch => _failuresSinceSuccessfulBatch >= FailuresBeforeDroppingBatch; - - public bool ShouldDropQueue => _failuresSinceSuccessfulBatch >= FailuresBeforeDroppingQueue; -} \ No newline at end of file diff --git a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/LegacyBatchedSinkAdapter.cs b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/LegacyBatchedSinkAdapter.cs new file mode 100644 index 0000000..a114fbd --- /dev/null +++ b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/LegacyBatchedSinkAdapter.cs @@ -0,0 +1,53 @@ +using Serilog.Events; + +namespace Serilog.Sinks.PeriodicBatching; + +sealed class LegacyBatchedSinkAdapter: Core.IBatchedLogEventSink, IDisposable +#if FEATURE_ASYNCDISPOSABLE + , IAsyncDisposable +#endif +{ + readonly IBatchedLogEventSink _inner; + readonly bool _dispose; + + public LegacyBatchedSinkAdapter(IBatchedLogEventSink inner, bool dispose) + { + _inner = inner; + _dispose = dispose; + } + + public Task EmitBatchAsync(IReadOnlyCollection batch) + { + return _inner.EmitBatchAsync(batch); + } + + public Task OnEmptyBatchAsync() + { + return _inner.OnEmptyBatchAsync(); + } + + public void Dispose() + { + if (!_dispose) + return; + + (_inner as IDisposable)?.Dispose(); + } + +#if FEATURE_ASYNCDISPOSABLE + public async ValueTask DisposeAsync() + { + if (!_dispose) + return; + + if (_inner is IAsyncDisposable asyncDisposable) + { + await asyncDisposable.DisposeAsync(); + } + else + { + Dispose(); + } + } +#endif +} diff --git a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs index fc04155..6ef723e 100644 --- a/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs +++ b/src/Serilog.Sinks.PeriodicBatching/Sinks/PeriodicBatching/PeriodicBatchingSink.cs @@ -12,9 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -using System.Threading.Channels; +using Serilog.Configuration; using Serilog.Core; -using Serilog.Debugging; using Serilog.Events; // ReSharper disable UnusedParameter.Global, ConvertIfStatementToConditionalTernaryExpression, MemberCanBePrivate.Global, UnusedMember.Global, VirtualMemberNeverOverridden.Global, ClassWithVirtualMembersNeverInherited.Global, SuspiciousTypeConversion.Global @@ -29,27 +28,8 @@ public class PeriodicBatchingSink : ILogEventSink, IDisposable, IBatchedLogEvent , IAsyncDisposable #endif { - // Buffers events from the write- to the read side. - readonly Channel _queue; - - // These fields are used by the write side to signal shutdown. - // A mutex is required because the queue writer `Complete()` call is not idempotent and will throw if - // called multiple times, e.g. via multiple `Dispose()` calls on this sink. - readonly object _stateLock = new(); - // Needed because the read loop needs to observe shutdown even when the target batched (remote) sink is - // unable to accept events (preventing the queue from being drained and completion being observed). - readonly CancellationTokenSource _shutdownSignal = new(); - // The write side can wait on this to ensure shutdown has completed. - readonly Task _runLoop; - - // Used only by the read side. - readonly IBatchedLogEventSink _targetSink = null!; - readonly int _batchSizeLimit; - readonly bool _eagerlyEmitFirstEvent; - readonly FailureAwareBatchScheduler _batchScheduler; - readonly Queue _currentBatch = new(); - readonly Task _waitForShutdownSignal; - Task? _cachedWaitToRead; + readonly ILogEventSink _targetSink; + readonly bool _inheritanceApi; /// /// Constant used with legacy constructor to indicate that the internal queue shouldn't be limited. @@ -57,6 +37,30 @@ public class PeriodicBatchingSink : ILogEventSink, IDisposable, IBatchedLogEvent [Obsolete("Implement `IBatchedLogEventSink` and use the `PeriodicBatchingSinkOptions` constructor.")] public const int NoQueueLimit = -1; + static ILogEventSink CreateSink( + IBatchedLogEventSink batchedLogEventSink, + bool disposeBatchedSink, + PeriodicBatchingSinkOptions options) + { + if (options == null) throw new ArgumentNullException(nameof(options)); + if (options.BatchSizeLimit <= 0) + throw new ArgumentOutOfRangeException(nameof(options), "The batch size limit must be greater than zero."); + if (options.Period <= TimeSpan.Zero) + throw new ArgumentOutOfRangeException(nameof(options), "The period must be greater than zero."); + + var adaptedOptions = new BatchingOptions + { + BatchSizeLimit = options.BatchSizeLimit, + QueueLimit = options.QueueLimit, + BufferingTimeLimit = options.Period, + EagerlyEmitFirstEvent = options.EagerlyEmitFirstEvent + }; + + var adapter = new LegacyBatchedSinkAdapter(batchedLogEventSink, disposeBatchedSink); + + return LoggerSinkConfiguration.CreateSink(lc => lc.Sink(adapter, adaptedOptions)); + } + /// /// Construct a . /// @@ -65,9 +69,8 @@ public class PeriodicBatchingSink : ILogEventSink, IDisposable, IBatchedLogEvent /// it will dispose this object if possible. /// Options controlling behavior of the sink. public PeriodicBatchingSink(IBatchedLogEventSink batchedSink, PeriodicBatchingSinkOptions options) - : this(options) { - _targetSink = batchedSink ?? throw new ArgumentNullException(nameof(batchedSink)); + _targetSink = CreateSink(batchedSink, true, options); } /// @@ -80,15 +83,15 @@ public PeriodicBatchingSink(IBatchedLogEventSink batchedSink, PeriodicBatchingSi /// The time to wait between checking for event batches. [Obsolete("Implement `IBatchedLogEventSink` and use the `PeriodicBatchingSinkOptions` constructor.")] protected PeriodicBatchingSink(int batchSizeLimit, TimeSpan period) - : this(new PeriodicBatchingSinkOptions + { + _inheritanceApi = true; + _targetSink = CreateSink(this, false, new PeriodicBatchingSinkOptions { BatchSizeLimit = batchSizeLimit, Period = period, EagerlyEmitFirstEvent = true, QueueLimit = null - }) - { - _targetSink = this; + }); } /// @@ -102,39 +105,15 @@ protected PeriodicBatchingSink(int batchSizeLimit, TimeSpan period) /// Maximum number of events in the queue - use for an unbounded queue. [Obsolete("Implement `IBatchedLogEventSink` and use the `PeriodicBatchingSinkOptions` constructor.")] protected PeriodicBatchingSink(int batchSizeLimit, TimeSpan period, int queueLimit) - : this(new PeriodicBatchingSinkOptions + { + _inheritanceApi = true; + _targetSink = CreateSink(this, false, new PeriodicBatchingSinkOptions { BatchSizeLimit = batchSizeLimit, Period = period, EagerlyEmitFirstEvent = true, QueueLimit = queueLimit == NoQueueLimit ? null : queueLimit - }) - { - _targetSink = this; - } - - PeriodicBatchingSink(PeriodicBatchingSinkOptions options) - { - if (options == null) throw new ArgumentNullException(nameof(options)); - if (options.BatchSizeLimit <= 0) - throw new ArgumentOutOfRangeException(nameof(options), "The batch size limit must be greater than zero."); - if (options.Period <= TimeSpan.Zero) - throw new ArgumentOutOfRangeException(nameof(options), "The period must be greater than zero."); - - _batchSizeLimit = options.BatchSizeLimit; - _queue = options.QueueLimit is { } limit - ? Channel.CreateBounded(new BoundedChannelOptions(limit) { SingleReader = true }) - : Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true }); - _batchScheduler = new FailureAwareBatchScheduler(options.Period); - _eagerlyEmitFirstEvent = options.EagerlyEmitFirstEvent; - _waitForShutdownSignal = Task.Delay(Timeout.InfiniteTimeSpan, _shutdownSignal.Token) - .ContinueWith(e => e.Exception, TaskContinuationOptions.OnlyOnFaulted); - - // The conditional here is no longer required in .NET 8+ (dotnet/runtime#82912) - using (ExecutionContext.IsFlowSuppressed() ? (IDisposable?)null : ExecutionContext.SuppressFlow()) - { - _runLoop = Task.Run(LoopAsync); - } + }); } /// @@ -151,142 +130,8 @@ protected PeriodicBatchingSink(int batchSizeLimit, TimeSpan period, int queueLim public void Emit(LogEvent logEvent) { if (logEvent == null) throw new ArgumentNullException(nameof(logEvent)); - - if (_shutdownSignal.IsCancellationRequested) - return; - - _queue.Writer.TryWrite(logEvent); - } - - async Task LoopAsync() - { - var isEagerBatch = _eagerlyEmitFirstEvent; - do - { - // Code from here through to the `try` block is expected to be infallible. It's structured this way because - // any failure modes within it haven't been accounted for in the rest of the sink design, and would need - // consideration in order for the sink to function robustly (i.e. to avoid hot/infinite looping). - - var fillBatch = Task.Delay(_batchScheduler.NextInterval); - do - { - while (_currentBatch.Count < _batchSizeLimit && - !_shutdownSignal.IsCancellationRequested && - _queue.Reader.TryRead(out var next) && - CanInclude(next)) - { - _currentBatch.Enqueue(next); - } - } while ((_currentBatch.Count < _batchSizeLimit && !isEagerBatch || _currentBatch.Count == 0) && - !_shutdownSignal.IsCancellationRequested && - await TryWaitToReadAsync(_queue.Reader, fillBatch, _shutdownSignal.Token).ConfigureAwait(false)); - - try - { - if (_currentBatch.Count == 0) - { - await _targetSink.OnEmptyBatchAsync().ConfigureAwait(false); - } - else - { - isEagerBatch = false; - - await _targetSink.EmitBatchAsync(_currentBatch).ConfigureAwait(false); - - _currentBatch.Clear(); - _batchScheduler.MarkSuccess(); - } - } - catch (Exception ex) - { - WriteToSelfLog("failed emitting a batch", ex); - _batchScheduler.MarkFailure(); - - if (_batchScheduler.ShouldDropBatch) - { - WriteToSelfLog("dropping the current batch"); - _currentBatch.Clear(); - } - - if (_batchScheduler.ShouldDropQueue) - { - WriteToSelfLog("dropping all queued events"); - - // Not ideal, uses some CPU capacity unnecessarily and doesn't complete in bounded time. The goal is - // to reduce memory pressure on the client if the server is offline for extended periods. May be - // worth reviewing and possibly abandoning this. - while (_queue.Reader.TryRead(out _) && !_shutdownSignal.IsCancellationRequested) { } - } - - // Wait out the remainder of the batch fill time so that we don't overwhelm the server. With each - // successive failure the interval will increase. Needs special handling so that we don't need to - // make `fillBatch` cancellable (and thus fallible). - await Task.WhenAny(fillBatch, _waitForShutdownSignal).ConfigureAwait(false); - } - } - while (!_shutdownSignal.IsCancellationRequested); - - // At this point: - // - The sink is being disposed - // - The queue has been completed - // - The queue may or may not be empty - // - The waiting batch may or may not be empty - // - The target sink may or may not be accepting events - - // Try flushing the rest of the queue, but bail out on any failure. Shutdown time is unbounded, but it - // doesn't make sense to pick an arbitrary limit - a future version might add a new option to control this. - try - { - while (_queue.Reader.TryPeek(out _)) - { - while (_currentBatch.Count < _batchSizeLimit && - _queue.Reader.TryRead(out var next)) - { - _currentBatch.Enqueue(next); - } - - if (_currentBatch.Count != 0) - { - await _targetSink.EmitBatchAsync(_currentBatch).ConfigureAwait(false); - _currentBatch.Clear(); - } - } - } - catch (Exception ex) - { - WriteToSelfLog("failed emitting a batch during shutdown; dropping remaining queued events", ex); - } - } - - // Wait until `reader` has items to read. Returns `false` if the `timeout` task completes, or if the reader is cancelled. - async Task TryWaitToReadAsync(ChannelReader reader, Task timeout, CancellationToken cancellationToken) - { - var waitToRead = _cachedWaitToRead ?? reader.WaitToReadAsync(cancellationToken).AsTask(); - _cachedWaitToRead = null; - - var completed = await Task.WhenAny(timeout, waitToRead).ConfigureAwait(false); - - // Avoid unobserved task exceptions in the cancellation and failure cases. Note that we may not end up observing - // read task cancellation exceptions during shutdown, may be some room to improve. - if (completed is { Exception: not null, IsCanceled: false }) - { - WriteToSelfLog($"could not read from queue: {completed.Exception}"); - } - - if (completed == timeout) - { - // Dropping references to `waitToRead` will cause it and some supporting objects to leak; disposing it - // will break the channel and cause future attempts to read to fail. So, we cache and reuse it next time - // around the loop. - - _cachedWaitToRead = waitToRead; - return false; - } - - if (waitToRead.Status is not TaskStatus.RanToCompletion) - return false; - - return await waitToRead; + if (!_inheritanceApi || CanInclude(logEvent)) + _targetSink.Emit(logEvent); } /// @@ -331,83 +176,21 @@ public void Dispose() protected virtual void Dispose(bool disposing) { if (!disposing) return; - - SignalShutdown(); - - try - { - _runLoop.Wait(); - } - catch (Exception ex) - { - // E.g. the task was canceled before ever being run, or internally failed and threw - // an unexpected exception. - WriteToSelfLog("caught exception during disposal", ex); - } - - if (ReferenceEquals(_targetSink, this)) - { - // The sink is being used in the obsolete inheritance-based mode. - return; - } - - (_targetSink as IDisposable)?.Dispose(); + ((IDisposable)_targetSink).Dispose(); + GC.SuppressFinalize(this); } #if FEATURE_ASYNCDISPOSABLE /// public async ValueTask DisposeAsync() { - SignalShutdown(); - - try - { - await _runLoop.ConfigureAwait(false); - } - catch (Exception ex) - { - // E.g. the task was canceled before ever being run, or internally failed and threw - // an unexpected exception. - WriteToSelfLog("caught exception during async disposal", ex); - } - - if (ReferenceEquals(_targetSink, this)) - { - // The sink is being used in the obsolete inheritance-based mode. Old sinks won't - // override something like `DisposeAsyncCore()`; we just forward to the synchronous - // `Dispose()` method to ensure whatever cleanup they do still occurs. + await ((IAsyncDisposable)_targetSink).DisposeAsync(); + if (_inheritanceApi) Dispose(true); - return; - } - - if (_targetSink is IAsyncDisposable asyncDisposable) - await asyncDisposable.DisposeAsync().ConfigureAwait(false); - else - (_targetSink as IDisposable)?.Dispose(); GC.SuppressFinalize(this); } #endif - - void SignalShutdown() - { - lock (_stateLock) - { - if (!_shutdownSignal.IsCancellationRequested) - { - // Relies on synchronization via `_stateLock`: once the writer is completed, subsequent attempts to - // complete it will throw. - _queue.Writer.Complete(); - _shutdownSignal.Cancel(); - } - } - } - - void WriteToSelfLog(string message, Exception? exception = null) - { - var ex = exception != null ? $"{Environment.NewLine}{exception}" : ""; - SelfLog.WriteLine($"PeriodicBatchingSink ({_targetSink}): {message}{ex}"); - } /// /// Determine whether a queued log event should be included in the batch. If diff --git a/test/Serilog.Sinks.PeriodicBatching.Tests/FailureAwareBatchSchedulerTests.cs b/test/Serilog.Sinks.PeriodicBatching.Tests/FailureAwareBatchSchedulerTests.cs deleted file mode 100644 index 2ac40e8..0000000 --- a/test/Serilog.Sinks.PeriodicBatching.Tests/FailureAwareBatchSchedulerTests.cs +++ /dev/null @@ -1,105 +0,0 @@ -using System.Globalization; -using Xunit; - -namespace Serilog.Sinks.PeriodicBatching.Tests; - -public class FailureAwareBatchSchedulerTests -{ - static TimeSpan Period => TimeSpan.FromSeconds(2); - FailureAwareBatchScheduler Scheduler { get; } = new(Period); - - [Fact] - public void WhenNoFailuresHaveOccurredTheInitialIntervalIsUsed() - { - Assert.Equal(Period, Scheduler.NextInterval); - } - - [Fact] - public void WhenOneFailureHasOccurredTheInitialIntervalIsUsed() - { - Scheduler.MarkFailure(); - Assert.Equal(Period, Scheduler.NextInterval); - } - - [Fact] - public void WhenTwoFailuresHaveOccurredTheIntervalBacksOff() - { - Scheduler.MarkFailure(); - Scheduler.MarkFailure(); - Assert.Equal(TimeSpan.FromSeconds(10), Scheduler.NextInterval); - } - - [Fact] - public void WhenABatchSucceedsTheStatusResets() - { - Scheduler.MarkFailure(); - Scheduler.MarkFailure(); - Scheduler.MarkSuccess(); - Assert.Equal(Period, Scheduler.NextInterval); - } - - [Fact] - public void WhenThreeFailuresHaveOccurredTheIntervalBacksOff() - { - Scheduler.MarkFailure(); - Scheduler.MarkFailure(); - Scheduler.MarkFailure(); - Assert.Equal(TimeSpan.FromSeconds(20), Scheduler.NextInterval); - Assert.False(Scheduler.ShouldDropBatch); - } - - [Fact] - public void WhenEightFailuresHaveOccurredTheIntervalBacksOffAndBatchIsDropped() - { - for (var i = 0; i < 8; ++i) - { - Assert.False(Scheduler.ShouldDropBatch); - Scheduler.MarkFailure(); - } - Assert.Equal(TimeSpan.FromMinutes(10), Scheduler.NextInterval); - Assert.True(Scheduler.ShouldDropBatch); - Assert.False(Scheduler.ShouldDropQueue); - } - - [Fact] - public void WhenTenFailuresHaveOccurredTheQueueIsDropped() - { - for (var i = 0; i < 10; ++i) - { - Assert.False(Scheduler.ShouldDropQueue); - Scheduler.MarkFailure(); - } - Assert.True(Scheduler.ShouldDropQueue); - } - - [Fact] - public void AtTheDefaultIntervalRetriesForTenMinutesBeforeDroppingBatch() - { - var cumulative = TimeSpan.Zero; - do - { - Scheduler.MarkFailure(); - - if (!Scheduler.ShouldDropBatch) - cumulative += Scheduler.NextInterval; - } while (!Scheduler.ShouldDropBatch); - - Assert.False(Scheduler.ShouldDropQueue); - Assert.Equal(TimeSpan.Parse("00:10:32", CultureInfo.InvariantCulture), cumulative); - } - - [Fact] - public void AtTheDefaultIntervalRetriesForThirtyMinutesBeforeDroppingQueue() - { - var cumulative = TimeSpan.Zero; - do - { - Scheduler.MarkFailure(); - - if (!Scheduler.ShouldDropQueue) - cumulative += Scheduler.NextInterval; - } while (!Scheduler.ShouldDropQueue); - - Assert.Equal(TimeSpan.Parse("00:30:32", CultureInfo.InvariantCulture), cumulative); - } -}