Skip to content

Commit

Permalink
Supported decimal data type in IDataStore, and fixed error messages i…
Browse files Browse the repository at this point in the history
…n all persistence adapters
  • Loading branch information
jezzsantos committed Aug 18, 2024
1 parent 3042b31 commit 29f4bcd
Show file tree
Hide file tree
Showing 26 changed files with 354 additions and 284 deletions.
3 changes: 2 additions & 1 deletion src/Domain.Shared.UnitTests/EmailAddressSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public void WhenCreate_ThenCreated()
[Fact]
public void WhenEqualAndCaseVariantEmail_ThenReturnsTrue()
{
var result = EmailAddress.Create("[email protected]").Equals(EmailAddress.Create("[email protected]"));
var result = EmailAddress.Create("[email protected]").Value
.Equals(EmailAddress.Create("[email protected]").Value);

result.Should().BeTrue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using Common;
using Common.Configuration;
using Common.Extensions;
using Common.Recording;
using Infrastructure.Persistence.AWS.Extensions;
using Infrastructure.Persistence.Interfaces;
using JetBrains.Annotations;
Expand Down Expand Up @@ -161,7 +160,8 @@ public async Task<Result<Error>> DestroyAllAsync(string topicName, CancellationT
}
catch (Exception ex)
{
_recorder.Crash(null, CrashLevel.NonCritical, ex, "Failed to delete topic: {Topic}", topicArn);
_recorder.TraceError(null,
ex, "Failed to delete topic: {Topic}", topicArn);
return ex.ToError(ErrorCode.Unexpected);
}

Expand Down Expand Up @@ -214,8 +214,7 @@ public async Task<Result<bool, Error>> ReceiveSingleAsync(string topicName, stri
}
catch (Exception ex)
{
_recorder.Crash(null,
CrashLevel.NonCritical,
_recorder.TraceError(null,
ex, "Failed to handle last message from topic: {Topic} for subscription: {Subscription}",
topicName, subscriptionName);
return ex.ToError(ErrorCode.Unexpected);
Expand Down Expand Up @@ -243,8 +242,7 @@ public async Task<Result<Error>> SendAsync(string topicName, string message, Can
}
catch (Exception ex)
{
_recorder.Crash(null,
CrashLevel.NonCritical,
_recorder.TraceError(null,
ex, "Failed to send message: {Message} to the topic: {Topic}", message, topicName);
return ex.ToError(ErrorCode.Unexpected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using Common;
using Common.Configuration;
using Common.Extensions;
using Common.Recording;
using Infrastructure.Persistence.AWS.Extensions;
using Infrastructure.Persistence.Interfaces;
using Task = System.Threading.Tasks.Task;
Expand Down Expand Up @@ -77,8 +76,7 @@ public async Task<Result<long, Error>> CountAsync(string queueName, Cancellation
}
catch (Exception ex)
{
_recorder.Crash(null,
CrashLevel.NonCritical,
_recorder.TraceError(null,
ex, "Failed to retrieve attributes from queue: {Queue}", queueUrl);
return ex.ToError(ErrorCode.Unexpected);
}
Expand All @@ -103,8 +101,7 @@ public async Task<Result<Error>> DestroyAllAsync(string queueName, CancellationT
}
catch (Exception ex)
{
_recorder.Crash(null,
CrashLevel.NonCritical,
_recorder.TraceError(null,
ex, "Failed to delete queue: {Queue}", queueUrl);
return ex.ToError(ErrorCode.Unexpected);
}
Expand Down Expand Up @@ -152,8 +149,7 @@ public async Task<Result<bool, Error>> PopSingleAsync(string queueName,
{
await ReturnMessageToQueueForNextPopAsync(queueUrl, queueMessage, cancellationToken);

_recorder.Crash(null,
CrashLevel.NonCritical,
_recorder.TraceError(null,
ex, "Failed to handle last message: {MessageId} from queue: {Queue}", queueMessage.MessageId,
queueUrl);
return ex.ToError(ErrorCode.Unexpected);
Expand Down Expand Up @@ -190,7 +186,7 @@ public async Task<Result<Error>> PushAsync(string queueName, string message, Can
}
catch (Exception ex)
{
_recorder.Crash(null, CrashLevel.NonCritical,
_recorder.TraceError(null,
ex, "Failed to push message: {Message} to queue: {Queue}", message, queueUrl);
return ex.ToError(ErrorCode.Unexpected);
}
Expand Down Expand Up @@ -246,8 +242,7 @@ public async Task<Result<QueueIdentifiers, Error>> CreateQueueAsync(string queue
}
catch (Exception ex)
{
_recorder.Crash(null,
CrashLevel.NonCritical,
_recorder.TraceError(null,
ex, "Failed to create queue: {Queue}", sanitizedQueueName);
return ex.ToError(ErrorCode.Unexpected);
}
Expand Down Expand Up @@ -324,8 +319,7 @@ private async Task<Result<QueueIdentifiers, Error>> CreateDeadLetterQueueAsync(s
}
catch (Exception ex)
{
_recorder.Crash(null,
CrashLevel.NonCritical,
_recorder.TraceError(null,
ex, "Failed to create dead-letter queue: {Queue}", sanitizedQueueName);
return ex.ToError(ErrorCode.Unexpected);
}
Expand All @@ -351,7 +345,7 @@ private async Task<Result<Optional<Message>, Error>> GetNextMessageAsync(string
}
catch (Exception ex)
{
_recorder.Crash(null, CrashLevel.NonCritical,
_recorder.TraceError(null,
ex, "Failed to POP last message from queue: {Queue}", queueUrl);
return Error.EntityNotFound();
}
Expand All @@ -372,8 +366,8 @@ private async Task MarkMessageAsHandledAsync(string queueUrl, Message message,
}
catch (Exception ex)
{
_recorder.Crash(null, CrashLevel.NonCritical, ex,
"Failed to remove last message: {MessageId} from queue: {Queue}", message, queueUrl);
_recorder.TraceError(null,
ex, "Failed to remove last message: {MessageId} from queue: {Queue}", message, queueUrl);
}
}

Expand All @@ -386,8 +380,7 @@ private async Task ReturnMessageToQueueForNextPopAsync(string queueUrl, Message
}
catch (Exception ex)
{
_recorder.Crash(null,
CrashLevel.NonCritical,
_recorder.TraceError(null,
ex, "Failed to return the current message: {MessageId} to the queue: {Queue}", message.MessageId,
queueUrl);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using Common;
using Common.Configuration;
using Common.Extensions;
using Common.Recording;
using Infrastructure.Persistence.Azure.Extensions;
using Infrastructure.Persistence.Interfaces;
using JetBrains.Annotations;
Expand Down Expand Up @@ -118,8 +117,7 @@ public async Task<Result<bool, Error>> ReceiveSingleAsync(string topicName, stri
}
catch (Exception ex)
{
_recorder.Crash(null,
CrashLevel.NonCritical,
_recorder.TraceError(null,
ex, "Failed to receive message from topic: {Topic} for subscription: {Subscription}",
topicName, subscriptionName);
return ex.ToError(ErrorCode.Unexpected);
Expand All @@ -141,8 +139,7 @@ public async Task<Result<bool, Error>> ReceiveSingleAsync(string topicName, stri
{
await receiver.AbandonMessageAsync(topicMessage, null, cancellationToken);

_recorder.Crash(null,
CrashLevel.NonCritical,
_recorder.TraceError(null,
ex, "Failed to handle last message: {MessageId} from topic: {Topic} for subscription: {Subscription}",
topicMessage.MessageId, topicName, subscriptionName);
return ex.ToError(ErrorCode.Unexpected);
Expand Down Expand Up @@ -173,8 +170,7 @@ public async Task<Result<Error>> SendAsync(string topicName, string message, Can
}
catch (Exception ex)
{
_recorder.Crash(null,
CrashLevel.NonCritical,
_recorder.TraceError(null,
ex, "Failed to send message: {Message} to the topic: {Topic}", message, topicName);
return ex.ToError(ErrorCode.Unexpected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using Common;
using Common.Configuration;
using Common.Extensions;
using Common.Recording;
using Infrastructure.Persistence.Azure.Extensions;
using Infrastructure.Persistence.Interfaces;
using JetBrains.Annotations;
Expand Down Expand Up @@ -111,8 +110,7 @@ public async Task<Result<bool, Error>> PopSingleAsync(string queueName,
{
await ReturnMessageToQueueForNextPopAsync(queue, queueMessage, cancellationToken);

_recorder.Crash(null,
CrashLevel.NonCritical,
_recorder.TraceError(null,
ex, "Failed to handle last message: {MessageId} from queue: {Queue}", queueMessage.MessageId,
queue.Name);
return ex.ToError(ErrorCode.Unexpected);
Expand Down Expand Up @@ -140,14 +138,14 @@ public async Task<Result<Error>> PushAsync(string queueName, string message, Can
}
catch (RequestFailedException ex)
{
_recorder.Crash(null, CrashLevel.NonCritical,
_recorder.TraceError(null,
ex, "Failed to push message: {Message} to queue: {Queue}. Error was: {ErrorCode}", message,
queue.Name, ex.ErrorCode ?? "none");
return ex.ToError(ErrorCode.Unexpected);
}
catch (Exception ex)
{
_recorder.Crash(null, CrashLevel.NonCritical,
_recorder.TraceError(null,
ex, "Failed to push message: {Message} to queue: {Queue}", message, queue.Name);
return ex.ToError(ErrorCode.Unexpected);
}
Expand All @@ -171,15 +169,14 @@ private async Task<Result<Optional<QueueMessage>, Error>> GetNextMessageAsync(Qu
}
catch (RequestFailedException ex)
{
_recorder.Crash(null,
CrashLevel.NonCritical,
_recorder.TraceError(null,
ex, "Failed to POP last message from queue: {Queue}. Error was: {ErrorCode}", queue.Name,
ex.ErrorCode ?? "none");
return Error.EntityNotFound();
}
catch (Exception ex)
{
_recorder.Crash(null, CrashLevel.NonCritical,
_recorder.TraceError(null,
ex, "Failed to POP last message from queue: {Queue}", queue.Name);
return Error.EntityNotFound();
}
Expand All @@ -194,16 +191,14 @@ private async Task MarkMessageAsHandledAsync(QueueClient queue, QueueMessage mes
}
catch (RequestFailedException ex)
{
_recorder.Crash(null,
CrashLevel.NonCritical,
_recorder.TraceError(null,
ex, "Failed to remove last message: {MessageId} from queue: {Queue}. Error was: {ErrorCode}",
message.MessageId,
queue.Name, ex.ErrorCode ?? "none");
}
catch (Exception ex)
{
_recorder.Crash(null,
CrashLevel.NonCritical,
_recorder.TraceError(null,
ex, "Failed to remove last message: {MessageId} from queue: {Queue}", message.MessageId,
queue.Name);
}
Expand All @@ -219,15 +214,13 @@ await queue.UpdateMessageAsync(message.MessageId, message.PopReceipt, visibility
}
catch (RequestFailedException ex)
{
_recorder.Crash(null,
CrashLevel.NonCritical,
_recorder.TraceError(null,
ex, "Failed to return the current message: {MessageId} to the queue: {Queue}. Error was: {ErrorCode}",
message.MessageId, queue.Name, ex.ErrorCode ?? "none");
}
catch (Exception ex)
{
_recorder.Crash(null,
CrashLevel.NonCritical,
_recorder.TraceError(null,
ex, "Failed to return the current message: {MessageId} to the queue: {Queue}", message.MessageId,
queue.Name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace Infrastructure.Persistence.Common.ApplicationServices;

public partial class InProcessInMemStore : IBlobStore
partial class InProcessInMemStore : IBlobStore
{
private readonly Dictionary<string, Dictionary<string, HydrationProperties>> _blobs = new();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

namespace Infrastructure.Persistence.Common.ApplicationServices;

public partial class InProcessInMemStore : IDataStore
partial class InProcessInMemStore : IDataStore
{
private readonly Dictionary<string, Dictionary<string, HydrationProperties>> _documents = new();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

namespace Infrastructure.Persistence.Common.ApplicationServices;

public partial class InProcessInMemStore : IEventStore
partial class InProcessInMemStore : IEventStore
{
private readonly Dictionary<string, Dictionary<string, HydrationProperties>> _events = new();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace Infrastructure.Persistence.Common.ApplicationServices;

public partial class InProcessInMemStore : IMessageBusStore, IMessageBusStoreTrigger
partial class InProcessInMemStore : IMessageBusStore, IMessageBusStoreTrigger
{
private readonly Dictionary<string, SubscriptionPosition> _subscriptions = new();
private readonly Dictionary<string, Dictionary<long, HydrationProperties>> _topics = new();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace Infrastructure.Persistence.Common.ApplicationServices;

public partial class InProcessInMemStore : IQueueStore, IQueueStoreTrigger
partial class InProcessInMemStore : IQueueStore, IQueueStoreTrigger
{
private readonly Dictionary<string, Dictionary<string, HydrationProperties>> _queues = new();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace Infrastructure.Persistence.Common.ApplicationServices;

/// <summary>
/// Provides a combined store that persists all data to memory, in the current process
/// Note: Should NEVER be used in production systems, and useful only in the current process.
/// </summary>
[ExcludeFromCodeCoverage]
public sealed partial class InProcessInMemStore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

namespace Infrastructure.Persistence.Common.ApplicationServices;

public partial class LocalMachineJsonFileStore : IBlobStore
partial class LocalMachineJsonFileStore : IBlobStore
{
private const string BlobStoreContainerName = "Blobs";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

namespace Infrastructure.Persistence.Common.ApplicationServices;

public partial class LocalMachineJsonFileStore : IDataStore
partial class LocalMachineJsonFileStore : IDataStore
{
public const string NullToken = @"null";
private const string DocumentStoreContainerName = "Documents";
Expand All @@ -34,8 +34,6 @@ public Task<Result<long, Error>> CountAsync(string containerName, CancellationTo
containerName.ThrowIfNotValuedParameter(nameof(containerName),
Resources.AnyStore_MissingContainerName);

containerName.ThrowIfNotValuedParameter(nameof(containerName));

var container = EnsureContainer(GetDocumentStoreContainerPath(containerName));

return Task.FromResult<Result<long, Error>>(container.Count);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@

namespace Infrastructure.Persistence.Common.ApplicationServices;

public partial class LocalMachineJsonFileStore : IEventStore
partial class LocalMachineJsonFileStore : IEventStore
{
private const string EventStoreContainerName = "Events";
private static string? _cachedContainerName;

public async Task<Result<string, Error>> AddEventsAsync(string entityName, string entityId,
List<EventSourcedChangeEvent> events, CancellationToken cancellationToken)
Expand Down Expand Up @@ -80,10 +80,10 @@ private static string GetEventStoreContainerPath(string containerName, string? e
{
if (entityId.HasValue())
{
return $"{EventStoreContainerName}/{containerName}/{entityId}";
return $"{DetermineEventStoreContainerName()}/{containerName}/{entityId}";
}

return $"{EventStoreContainerName}/{containerName}";
return $"{DetermineEventStoreContainerName()}/{containerName}";
}

private async Task<Optional<EventStoreEntity>> GetLatestEventAsync(string entityName, string entityId,
Expand Down Expand Up @@ -133,5 +133,15 @@ private static string GetEventStreamName(string entityName, string entityId)
{
return $"{entityName}_{entityId}";
}

private static string DetermineEventStoreContainerName()
{
if (_cachedContainerName.HasNoValue())
{
_cachedContainerName = typeof(EventStoreEntity).GetEntityNameSafe();
}

return _cachedContainerName;
}
}
#endif
Loading

0 comments on commit 29f4bcd

Please sign in to comment.