Skip to content

Commit

Permalink
Version 1.0.9 (#16)
Browse files Browse the repository at this point in the history
* Handle EventSendException - enqueue individually as either sent or unsent.

* Correct changelog spelling/formatting issue.

* Update version number.
  • Loading branch information
chullybun authored May 12, 2022
1 parent 36238e4 commit 0a8e237
Show file tree
Hide file tree
Showing 12 changed files with 126 additions and 59 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@

Represents the **NuGet** versions.

## v1.0.9
- *Enhancement:* Updated the `EventOoutboxEnqueueBase` to handle new `EventSendException` and enqueue each individual message as either sent or unsent within the outbox.
- *Fixed:* Updated to `CoreEx` version `1.0.5`.

## v1.0.8
- *Fixed:* Updated to `CoreEx` version `1.0.3`.

## v1.0.7
- *Fixed:* Previous version v1.0.6 fix was incorrect; Data import order should not have been reversed. This preivous change has been corrected.
- *Fixed:* Previous version `1.0.6` fix was incorrect; Data import order should not have been reversed. This previous change has been corrected.

## v1.0.6
- *Fixed:* [Issue 12](https://github.com/Avanade/DbEx/issues/12) fixed. Data import order has been reversed.
Expand Down
12 changes: 6 additions & 6 deletions src/DbEx/Console/MigratorConsoleBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace DbEx.Console
/// </summary>
/// <remarks>The standard console command-line arguments/options can be controlled via the constructor using the <see cref="SupportedOptions"/> flags. Additional capabilities can be added by inherting and overridding the
/// <see cref="OnBeforeExecute(CommandLineApplication)"/>, <see cref="OnValidation(ValidationContext)"/> and <see cref="OnMigrateAsync"/>. Changes to the console output can be achieved by overridding
/// <see cref="OnWriteMasthead"/>, <see cref="OnWriteHeader"/>, <see cref="OnWriteArgs(MigratorConsoleArgs)"/> and <see cref="OnWriteFooter(long)"/>.
/// <see cref="OnWriteMasthead"/>, <see cref="OnWriteHeader"/>, <see cref="OnWriteArgs(MigratorConsoleArgs)"/> and <see cref="OnWriteFooter(double)"/>.
/// <para>The underlying command line parsing is provided by <see href="https://natemcmaster.github.io/CommandLineUtils/"/>.</para></remarks>
public abstract class MigratorConsoleBase
{
Expand Down Expand Up @@ -58,7 +58,7 @@ public abstract class MigratorConsoleBase
protected ILogger? Logger => Args.Logger;

/// <summary>
/// Indicates whether to bypass standard execution of <see cref="OnWriteMasthead"/>, <see cref="OnWriteHeader"/>, <see cref="OnWriteArgs(MigratorConsoleArgs)"/> and <see cref="OnWriteFooter(long)"/>.
/// Indicates whether to bypass standard execution of <see cref="OnWriteMasthead"/>, <see cref="OnWriteHeader"/>, <see cref="OnWriteArgs(MigratorConsoleArgs)"/> and <see cref="OnWriteFooter(double)"/>.
/// </summary>
protected bool BypassOnWrites { get; set; }

Expand Down Expand Up @@ -273,7 +273,7 @@ private async Task<int> RunRunawayAsync() /* Method name inspired by: Slade - Ru
// Write footer and exit successfully.
sw.Stop();
if (!BypassOnWrites)
OnWriteFooter(sw.ElapsedMilliseconds);
OnWriteFooter(sw.Elapsed.TotalMilliseconds);

return 0;
}
Expand Down Expand Up @@ -345,11 +345,11 @@ public static void WriteStandardizedArgs(MigratorConsoleArgs args)
/// <summary>
/// Invoked to write the footer information to the <see cref="Logger"/>.
/// </summary>
/// <param name="elapsedMilliseconds">The elapsed execution time in milliseconds.</param>
protected virtual void OnWriteFooter(long elapsedMilliseconds)
/// <param name="totalMilliseconds">The elapsed execution time in milliseconds.</param>
protected virtual void OnWriteFooter(double totalMilliseconds)
{
Logger?.LogInformation("{Content}", string.Empty);
Logger?.LogInformation("{Content}", $"{AppName} Complete. [{elapsedMilliseconds}ms]");
Logger?.LogInformation("{Content}", $"{AppName} Complete. [{totalMilliseconds}ms]");
Logger?.LogInformation("{Content}", string.Empty);
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/DbEx/DbEx.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<RootNamespace>DbEx</RootNamespace>
<Version>1.0.8</Version>
<Version>1.0.9</Version>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<Authors>DbEx Developers</Authors>
<Company>Avanade</Company>
Expand Down Expand Up @@ -89,7 +89,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="CoreEx" Version="1.0.3" />
<PackageReference Include="CoreEx" Version="1.0.5" />
<PackageReference Include="dbup-sqlserver" Version="4.5.0" />
<PackageReference Include="Microsoft.Data.SqlClient" Version="4.1.0" />
<PackageReference Include="OnRamp" Version="1.0.5" />
Expand Down
2 changes: 1 addition & 1 deletion src/DbEx/Migration/DatabaseMigratorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ protected async Task<bool> CommandExecuteAsync(string title, Func<Task<bool>> ac

sw.Stop();
Logger.LogInformation("{Content}", string.Empty);
Logger.LogInformation("{Content}", $"Complete. [{sw.ElapsedMilliseconds}ms{summary?.Invoke() ?? string.Empty}]");
Logger.LogInformation("{Content}", $"Complete. [{sw.Elapsed.TotalMilliseconds}ms{summary?.Invoke() ?? string.Empty}]");
return true;
}
catch (Exception ex)
Expand Down
12 changes: 6 additions & 6 deletions src/DbEx/Migration/SqlServer/SqlServerMigrator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ protected override async Task<bool> DatabaseSchemaAsync(List<DatabaseMigrationSc
var sor = SqlServerObjectReader.Read(script.Name, sr, KnownSchemaObjectTypes, SchemaOrder.ToArray());
if (!sor.IsValid)
{
Logger.LogError($"SQL script '{script.Name}' is not valid: {sor.ErrorMessage}");
Logger.LogError("{Message}", $"SQL script '{script.Name}' is not valid: {sor.ErrorMessage}");
return false;
}

Expand Down Expand Up @@ -122,14 +122,14 @@ protected override async Task<bool> DatabaseDataAsync(IDatabase database, List<D

foreach (var table in dataTables)
{
Logger.LogInformation(string.Empty);
Logger.LogInformation($"---- Executing {table.Schema}.{table.Name} SQL:");
Logger.LogInformation("");
Logger.LogInformation("{Message}", $"---- Executing {table.Schema}.{table.Name} SQL:");

var sql = _codeGen.Generate(table);
Logger.LogInformation(sql);
Logger.LogInformation("{Message}", sql);

var rows = await database.SqlStatement(sql).ScalarAsync<int>().ConfigureAwait(false);
Logger.LogInformation($"Result: {rows} rows affected.");
Logger.LogInformation("{Message}", $"Result: {rows} rows affected.");
}

return true;
Expand All @@ -154,7 +154,7 @@ public override async Task<DatabaseUpgradeResult> ExecuteScriptsAsync(IEnumerabl
if (dur.Successful || dur.ErrorScript != null || i >= MaxRetries)
return dur;

Logger.LogWarning($" Possible transient error (will try again in 500ms): {dur.Error.Message}");
Logger.LogWarning("{Message}", $" Possible transient error (will try again in 500ms): {dur.Error.Message}");
await Task.Delay(500).ConfigureAwait(false);
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/DbEx/SqlServer/EventOutboxDequeueBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
namespace DbEx.SqlServer
{
/// <summary>
/// Provides the base <see cref="EventSendData"/> <see cref="IDatabase">database</see> <i>outbox dequeue</i> and corresponding <see cref="IEventSender.SendAsync(EventSendData[])"/>.
/// Provides the base <see cref="EventSendData"/> <see cref="IDatabase">database</see> <i>outbox dequeue</i> and corresponding <see cref="IEventSender.SendAsync(IEnumerable{EventSendData}, CancellationToken)"/>.
/// </summary>
public abstract class EventOutboxDequeueBase : IDatabaseMapper<EventSendData>
{
Expand Down Expand Up @@ -105,13 +105,13 @@ public async Task<int> DequeueAndSendAsync(int maxDequeueSize = 50, string? part
return 0;
}

Logger.LogInformation("{EventCount} event(s) were dequeued. [Elapsed={Elapsed}ms]", events.Count(), sw.ElapsedMilliseconds);
Logger.LogInformation("{EventCount} event(s) were dequeued. [Elapsed={Elapsed}ms]", events.Count(), sw.Elapsed.TotalMilliseconds);

// Send the events.
sw = Stopwatch.StartNew();
await EventSender.SendAsync(events.ToArray()).ConfigureAwait(false);
await EventSender.SendAsync(events.ToArray(), cancellationToken).ConfigureAwait(false);
sw.Stop();
Logger.LogInformation("{EventCount} event(s) were sent successfully. [Sender={Sender}, Elapsed={Elapsed}ms]", events.Count(), EventSender.GetType().Name, sw.ElapsedMilliseconds);
Logger.LogInformation("{EventCount} event(s) were sent successfully. [Sender={Sender}, Elapsed={Elapsed}ms]", events.Count(), EventSender.GetType().Name, sw.Elapsed.TotalMilliseconds);

// Commit the transaction.
txn.Complete();
Expand Down
44 changes: 31 additions & 13 deletions src/DbEx/SqlServer/EventOutboxEnqueueBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
using System.Data;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace DbEx.SqlServer
{
/// <summary>
/// Provides the base <see cref="EventSendData"/> <see cref="IDatabase">database</see> <i>outbox enqueue</i> <see cref="IEventSender.SendAsync(EventSendData[])"/>.
/// Provides the base <see cref="EventSendData"/> <see cref="IDatabase">database</see> <i>outbox enqueue</i> <see cref="IEventSender.SendAsync(IEnumerable{EventSendData}, CancellationToken)"/>.
/// </summary>
/// <remarks>By default the events are first sent/enqueued to the datatbase outbox, then a secondary out-of-process dequeues and sends. This can however introduce unwanted latency depending on the frequency in which the secondary process
/// performs the dequeue and send, as this is essentially a polling-based operation. To improve (minimize) latency, the primary <see cref="IEventSender"/> can be specified using <see cref="SetPrimaryEventSender(IEventSender)"/>. This will
Expand Down Expand Up @@ -86,42 +87,58 @@ public void SetPrimaryEventSender(IEventSender eventSender)
/// <inheritdoc/>
/// </summary>
/// <param name="events"><inheritdoc/></param>
/// <param name="cancellationToken"><inheritdoc/></param>
/// <remarks>Executes the <see cref="EnqueueStoredProcedure"/> to <i>send / enqueue</i> the <paramref name="events"/> to the underlying database outbox tables.</remarks>
public async Task SendAsync(params EventSendData[] events)
public async Task SendAsync(IEnumerable<EventSendData> events, CancellationToken cancellationToken = default)
{
if (events == null || !events.Any())
return;

Stopwatch sw = Stopwatch.StartNew();
var setEventsAsDequeued = _eventSender != null;
if (setEventsAsDequeued)
var unsentEvents = new List<EventSendData>(events);

if (_eventSender != null)
{
try
{
await _eventSender!.SendAsync(events).ConfigureAwait(false);
await _eventSender!.SendAsync(events, cancellationToken).ConfigureAwait(false);
sw.Stop();
unsentEvents.Clear();
Logger.LogDebug("{EventCount} event(s) were sent successfully; will be forwarded (sent/enqueued) to the datatbase outbox as sent. [Sender={Sender}, Elapsed={Elapsed}ms]",
events.Count(), _eventSender.GetType().Name, sw.Elapsed.TotalMilliseconds);
}
catch (EventSendException esex)
{
sw.Stop();
Logger.LogDebug("{EventCount} event(s) were sent successfully. [Sender={Sender}, Elapsed={Elapsed}ms]", events.Length, _eventSender.GetType().Name, sw.ElapsedMilliseconds);
Logger.LogWarning(esex, "{UnsentCount} of {EventCount} event(s) were unable to be sent successfully; will be forwarded (sent/enqueued) to the datatbase outbox for an out-of-process send: {ErrorMessage} [Sender={Sender}, Elapsed={Elapsed}ms]",
esex.NotSentEvents?.Count() ?? unsentEvents.Count, events.Count(), esex.Message, _eventSender!.GetType().Name, sw.Elapsed.TotalMilliseconds);

if (esex.NotSentEvents != null)
unsentEvents = esex.NotSentEvents.ToList();
}
catch (Exception ex)
{
sw.Stop();
setEventsAsDequeued = false;
Logger.LogWarning(ex, "{EventCount} event(s) were unable to be sent successfully; will be forwarded (sent/enqueued) to the datatbase outbox for an out-of-process send: {ErrorMessage} [Sender={Sender}, Elapsed={Elapsed}ms]",
events.Length, ex.Message, _eventSender!.GetType().Name, sw.ElapsedMilliseconds);
events.Count(), ex.Message, _eventSender!.GetType().Name, sw.Elapsed.TotalMilliseconds);
}
}

sw = Stopwatch.StartNew();
await Database.StoredProcedure(EnqueueStoredProcedure, p => p.Param("@SetEventsAsDequeued", setEventsAsDequeued).AddTableValuedParameter("@EventList", CreateTableValuedParameter(events))).NonQueryAsync().ConfigureAwait(false);
await Database.StoredProcedure(EnqueueStoredProcedure, p => p.AddTableValuedParameter("@EventList", CreateTableValuedParameter(events, unsentEvents))).NonQueryAsync().ConfigureAwait(false);
sw.Stop();
Logger.LogDebug("{EventCount} event(s) were enqueued. [Sender={Sender}, SetEventsAsDequeued={SetAsDequeued}, Elapsed={Elapsed}ms]", events.Length, GetType().Name, setEventsAsDequeued, sw.ElapsedMilliseconds);
Logger.LogDebug("{EventCount} event(s) were enqueued; {SuccessCount} as sent, {ErrorCount} to be sent. [Sender={Sender}, Elapsed={Elapsed}ms]",
events.Count(), events.Count() - unsentEvents.Count, unsentEvents.Count, GetType().Name, sw.Elapsed.TotalMilliseconds);
}

/// <inheritdoc/>
public TableValuedParameter CreateTableValuedParameter(IEnumerable<EventSendData> list)
/// <summary>
/// Creates the TVP from the list.
/// </summary>
private TableValuedParameter CreateTableValuedParameter(IEnumerable<EventSendData> list, IEnumerable<EventSendData> unsentList)
{
var dt = new DataTable();
dt.Columns.Add(EventIdColumnName, typeof(string));
dt.Columns.Add("EventDequeued", typeof(bool));
dt.Columns.Add(nameof(EventSendData.Destination), typeof(string));
dt.Columns.Add(nameof(EventSendData.Subject), typeof(string));
dt.Columns.Add(nameof(EventSendData.Action), typeof(string));
Expand All @@ -139,7 +156,8 @@ public TableValuedParameter CreateTableValuedParameter(IEnumerable<EventSendData
foreach (var item in list)
{
var attributes = item.Attributes == null || item.Attributes.Count == 0 ? new BinaryData(Array.Empty<byte>()) : JsonSerializer.Default.SerializeToBinaryData(item.Attributes);
tvp.AddRow(item.Id, item.Destination ?? DefaultDestination ?? throw new InvalidOperationException($"The {nameof(DefaultDestination)} must have a non-null value."),
tvp.AddRow(item.Id, !unsentList.Contains(item),
item.Destination ?? DefaultDestination ?? throw new InvalidOperationException($"The {nameof(DefaultDestination)} must have a non-null value."),
item.Subject, item.Action, item.Type, item.Source, item.Timestamp, item.CorrelationId, item.TenantId,
item.PartitionKey ?? DefaultPartitionKey ?? throw new InvalidOperationException($"The {nameof(DefaultPartitionKey)} must have a non-null value."),
item.ETag, attributes.ToArray(), item.Data?.ToArray());
Expand Down
14 changes: 6 additions & 8 deletions src/DbEx/Templates/SqlServer/SpEventOutboxEnqueue_sql.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,14 @@ BEGIN
@dequeuedDate DATETIME

SET @enqueuedDate = SYSUTCDATETIME()
IF (@SetEventsAsDequeued = 1)
BEGIN
SET @dequeuedDate = @enqueuedDate
END
SET @dequeuedDate = @enqueuedDate

-- Enqueued outbox resultant identifier.
DECLARE @enqueuedId TABLE([{{OutboxTable}}Id] BIGINT)

-- Cursor output variables.
DECLARE @eventId NVARCHAR(127),
@eventDequeued BIT,
@destination NVARCHAR(127),
@subject NVARCHAR(511),
@action NVARCHAR(255),
Expand All @@ -45,18 +43,18 @@ BEGIN

-- Declare, open, and fetch first event from cursor.
DECLARE c CURSOR FORWARD_ONLY
FOR SELECT [EventId], [Destination], [Subject], [Action], [Type], [Source], [Timestamp], [CorrelationId], [TenantId], [PartitionKey], [ETag], [Attributes], [Data] FROM @EventList
FOR SELECT [EventId], [EventDequeued], [Destination], [Subject], [Action], [Type], [Source], [Timestamp], [CorrelationId], [TenantId], [PartitionKey], [ETag], [Attributes], [Data] FROM @EventList

OPEN c
FETCH NEXT FROM c INTO @eventId, @destination, @subject, @action, @type, @source, @timestamp, @correlationId, @tenantId, @partitionKey, @etag, @attributes, @data
FETCH NEXT FROM c INTO @eventId, @eventDequeued, @destination, @subject, @action, @type, @source, @timestamp, @correlationId, @tenantId, @partitionKey, @etag, @attributes, @data

-- Iterate the event(s).
WHILE @@FETCH_STATUS = 0
BEGIN
-- Enqueue event into outbox
INSERT INTO [{{OutboxSchema}}].[{{OutboxTable}}] ([EnqueuedDate], [PartitionKey], [Destination], [DequeuedDate])
OUTPUT inserted.{{OutboxTable}}Id INTO @enqueuedId
VALUES (@enqueuedDate, @partitionKey, @destination, @dequeuedDate)
VALUES (@enqueuedDate, @partitionKey, @destination, CASE WHEN @eventDequeued IS NULL OR @eventDequeued = 0 THEN NULL ELSE @dequeuedDate END)

SELECT @{{camel OutboxTable}}Id = [{{OutboxTable}}Id] FROM @enqueuedId

Expand Down Expand Up @@ -95,7 +93,7 @@ BEGIN
)

-- Fetch the next event from the cursor.
FETCH NEXT FROM c INTO @eventId, @destination, @subject, @action, @type, @source, @timestamp, @correlationId, @tenantId, @partitionKey, @etag, @attributes, @data
FETCH NEXT FROM c INTO @eventId, @eventDequeued, @destination, @subject, @action, @type, @source, @timestamp, @correlationId, @tenantId, @partitionKey, @etag, @attributes, @data
END

-- Close the cursor.
Expand Down
3 changes: 2 additions & 1 deletion src/DbEx/Templates/SqlServer/UdtEventOutbox_sql.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ CREATE TYPE [{{OutboxSchema}}].[udt{{OutboxTable}}List] AS TABLE (
*/

[EventId] NVARCHAR(127),
[Destination] NVARCHAR(127) NULL,
[EventDequeued] BIT NULL, -- Indicates whether to set the event as dequeued; i.e. already sent/processed.
[Destination] NVARCHAR(127) NULL, -- For example, queue/topic name.
[Subject] NVARCHAR(511) NULL,
[Action] NVARCHAR(255) NULL,
[Type] NVARCHAR(1023) NULL,
Expand Down
Loading

0 comments on commit 0a8e237

Please sign in to comment.