Skip to content

Commit

Permalink
Consumer API (Synchronization): save encrypted payload in the databas…
Browse files Browse the repository at this point in the history
…e instead of in the blob storage (#450)

* chore: generate db migration to add encrypted payload column to the datawallet modification table

* feat: change EncryptedPayload from ignored to property persisted in the db

* chore: update compiled models

* feat: add LoadEncryptedPayload method

* feat: add SynchronizationDbContextSeeder

* feat: add SynchronizationDbContextSeeder class

* feat: remove saving encrypted content to blob storage

* chore: remove blob storage configuration

* chore: update check if blob reference is present

* chore: update handler instantiation

* chore: fix formatting

* chore: put back blob storage configuration for testing purposes

* feat: remove fetching payloads from blob storage

* chore: remove unnecessary method arguments

* chore: update handler instantiation

* chore: remove unused directives

* chore: remove empty lines

* chore: remove blob storage configuration

* chore: remove BlobStorageOptions default values and make them required

* feat: add blob storage configuration only if the blob storage options are present
chore: remove BlobStorageOptions default value

* chore: remove the [Required] annotation from BlobStorageConfiguration

* feat: only add blob storage options if the configuration is present

* ci: trigger pipelines

* chore: revert BlobStorageOptions properties to non required due to failing tests

* chore: fix copy/paste error

* chore: fix copy/paste error

* chore: remove unnecessary properties

* chore: remove unnecessary line and remove the unnecessary async keywords

* chore: add pagination

* chore: update code so that all items are migrated and set page size to 500

* fix: handle modifications with missing payload which is also missing from the blob storage

* fix: track number of entries whose payload is missing from the storage in order to skip them
chore: increase page size to 500

* refactor: performance optimizations

* refactor: use string.IsNullOrWhiteSpace

---------

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: Timo Notheisen <[email protected]>
  • Loading branch information
3 people authored Dec 20, 2023
1 parent 9224fc0 commit 5de8243
Show file tree
Hide file tree
Showing 27 changed files with 3,055 additions and 876 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public static void AddBlobStorage(this IServiceCollection services, BlobStorageO

public class BlobStorageOptions
{
public string ConnectionInfo { get; set; } = string.Empty;
public string Container { get; set; } = string.Empty;
public string CloudProvider { get; set; } = string.Empty;
public string ConnectionInfo { get; set; }
public string Container { get; set; }
public string CloudProvider { get; set; }
}
4 changes: 3 additions & 1 deletion ConsumerApi/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ static WebApplication CreateApp(string[] args)
.SeedDbContext<QuotasDbContext, QuotasDbContextSeeder>()
.SeedDbContext<RelationshipsDbContext, RelationshipsDbContextSeeder>()
.SeedDbContext<TokensDbContext, TokensDbContextSeeder>()
.SeedDbContext<MessagesDbContext, MessagesDbContextSeeder>();
.SeedDbContext<MessagesDbContext, MessagesDbContextSeeder>()
.SeedDbContext<SynchronizationDbContext, SynchronizationDbContextSeeder>();

foreach (var module in app.Services.GetRequiredService<IEnumerable<AbstractModule>>())
{
Expand All @@ -133,6 +134,7 @@ static void ConfigureServices(IServiceCollection services, IConfiguration config
services.AddTransient<RelationshipsDbContextSeeder>();
services.AddTransient<TokensDbContextSeeder>();
services.AddTransient<MessagesDbContextSeeder>();
services.AddTransient<SynchronizationDbContextSeeder>();

services
.AddModule<ChallengesModule>(configuration)
Expand Down
139 changes: 139 additions & 0 deletions ConsumerApi/SynchronizationDbContextSeeder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
using System.Text.Json;
using Backbone.BuildingBlocks.API.Extensions;
using Backbone.BuildingBlocks.Application.Abstractions.Exceptions;
using Backbone.BuildingBlocks.Application.Abstractions.Infrastructure.Persistence.BlobStorage;
using Backbone.Modules.Synchronization.Application.Infrastructure;
using Backbone.Modules.Synchronization.Domain.Entities;
using Backbone.Modules.Synchronization.Infrastructure.Persistence.Database;
using Backbone.Tooling.Extensions;
using Microsoft.Extensions.Options;

namespace Backbone.ConsumerApi;

public class SynchronizationDbContextSeeder : IDbSeeder<SynchronizationDbContext>
{
private const int PAGE_SIZE = 500;

private readonly IBlobStorage? _blobStorage;
private readonly string? _blobRootFolder;
private readonly ILogger<SynchronizationDbContextSeeder> _logger;
private int _numberOfModificationsWithoutPayload;

public SynchronizationDbContextSeeder(IServiceProvider serviceProvider, ILogger<SynchronizationDbContextSeeder> logger)
{
_blobStorage = serviceProvider.GetService<IBlobStorage>();
_blobRootFolder = serviceProvider.GetService<IOptions<BlobOptions>>()!.Value.RootFolder;
_logger = logger;
}

public async Task SeedAsync(SynchronizationDbContext context)
{
await FillEncryptedPayloadColumnFromBlobStorage(context);
}

private async Task FillEncryptedPayloadColumnFromBlobStorage(SynchronizationDbContext context)
{
// _blobRootFolder is null when blob storage configuration is not provided, meaning the content of database entries should not be loaded from blob storage
if (_blobRootFolder == null)
return;

var hasMorePages = true;


while (hasMorePages)
{
var modificationsWithoutEncryptedPayload = context.DatawalletModifications
.Where(m => m.EncryptedPayload == null)
.OrderBy(m => m.Index)
.Skip(_numberOfModificationsWithoutPayload)
.Take(PAGE_SIZE)
.ToList();

var blobReferences = modificationsWithoutEncryptedPayload
.Where(m => !string.IsNullOrWhiteSpace(m.BlobReference))
.Select(m => m.BlobReference)
.ToList();

var blobsFromReferences = await FindBlobsByReferences(blobReferences);

await FillPayloads(context, modificationsWithoutEncryptedPayload, blobsFromReferences);

await context.SaveChangesAsync();

hasMorePages = modificationsWithoutEncryptedPayload.Count != 0;
}
}

private async Task<Dictionary<string, Dictionary<long, byte[]>>> FindBlobsByReferences(IEnumerable<string> blobReferences)
{
var blobs = await Task.WhenAll(blobReferences.Select(async r =>
{
try
{
var blobFromReference = await _blobStorage!.FindAsync(_blobRootFolder!, r);
return new KeyValuePair<string, byte[]?>(r, blobFromReference);
}
catch (NotFoundException)
{
return new KeyValuePair<string, byte[]?>(r, null);
}
}));

var deserialized = blobs
.Where(b => b.Value != null)
.Select(b => new KeyValuePair<string, Dictionary<long, byte[]>>(b.Key, JsonSerializer.Deserialize<Dictionary<long, byte[]>>(b.Value!)!))
.ToDictionary(b => b.Key, b => b.Value);

return deserialized;
}

private async Task FillPayloads(SynchronizationDbContext context, List<DatawalletModification> modifications, Dictionary<string, Dictionary<long, byte[]>> blobsFromReferences)
{
await Task.WhenAll(modifications.Select(async m => await FillPayload(context, m, blobsFromReferences)));
}

private async Task FillPayload(SynchronizationDbContext context, DatawalletModification modification, Dictionary<string, Dictionary<long, byte[]>> blobsFromReferences)
{
var hadContent = await FillPayload(modification, blobsFromReferences);

if (hadContent)
context.DatawalletModifications.Update(modification);
else
Interlocked.Increment(ref _numberOfModificationsWithoutPayload);
}

private async Task<bool> FillPayload(DatawalletModification modification, Dictionary<string, Dictionary<long, byte[]>> blobsFromReferences)
{
if (string.IsNullOrWhiteSpace(modification.BlobReference))
{
// fill via blob id
try
{
var blobContent = await _blobStorage!.FindAsync(_blobRootFolder!, modification.Id);
modification.LoadEncryptedPayload(blobContent);
}
catch (NotFoundException)
{
_logger.LogInformation("Blob with Id '{id}' not found. As the encrypted payload of a datawallet modification is not required, this is probably not an error.", modification.Id);
return false;
}
}

// fill via blob reference
if (!blobsFromReferences.TryGetValue(modification.BlobReference, out var blob))
{
_logger.LogError("Blob with reference '{blobReference}' not found.", modification.BlobReference);
return false;
}

if (!blob.TryGetValue(modification.Index, out var payload))
{
_logger.LogInformation("Blob with Id '{id}' not found in blob reference. As the encrypted payload of a datawallet modification is not required, this is probably not an error.", modification.Id);
return false;
}

modification.LoadEncryptedPayload(payload);

return true;
}
}
17 changes: 6 additions & 11 deletions ConsumerApi/appsettings.override.json
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,13 @@
}
},
"Synchronization": {
"Infrastructure": {
"SqlDatabase": {
"Provider": "Postgres",
"ConnectionString": "User ID=synchronization;Password=Passw0rd;Server=postgres;Port=5432;Database=enmeshed;" // postgres
// "ConnectionString": "Server=ms-sql-server;Database=enmeshed;User Id=synchronization;Password=Passw0rd;TrustServerCertificate=True" // sqlserver
},
"BlobStorage": {
"CloudProvider": "Azure",
"ConnectionInfo": "",
"ContainerName": ""
}
"Infrastructure": {
"SqlDatabase": {
"Provider": "Postgres",
"ConnectionString": "User ID=synchronization;Password=Passw0rd;Server=postgres;Port=5432;Database=enmeshed;" // postgres
// "ConnectionString": "Server=ms-sql-server;Database=enmeshed;User Id=synchronization;Password=Passw0rd;TrustServerCertificate=True" // sqlserver
}
}
},
"Tokens": {
"Infrastructure": {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
using System.Text.Json;
using AutoMapper;
using AutoMapper;
using Backbone.BuildingBlocks.Application.Abstractions.Exceptions;
using Backbone.BuildingBlocks.Application.Abstractions.Infrastructure.EventBus;
using Backbone.BuildingBlocks.Application.Abstractions.Infrastructure.Persistence.BlobStorage;
using Backbone.BuildingBlocks.Application.Abstractions.Infrastructure.UserContext;
using Backbone.BuildingBlocks.Application.Extensions;
using Backbone.DevelopmentKit.Identity.ValueObjects;
Expand All @@ -12,7 +10,6 @@
using Backbone.Modules.Synchronization.Domain.Entities;
using MediatR;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using static Backbone.Modules.Synchronization.Domain.Entities.Datawallet;

namespace Backbone.Modules.Synchronization.Application.Datawallets.Commands.PushDatawalletModifications;
Expand All @@ -21,8 +18,6 @@ public class Handler : IRequestHandler<PushDatawalletModificationsCommand, PushD
{
private readonly DeviceId _activeDevice;
private readonly IdentityAddress _activeIdentity;
private readonly IBlobStorage _blobStorage;
private readonly BlobOptions _blobOptions;
private readonly ISynchronizationDbContext _dbContext;
private readonly IEventBus _eventBus;
private readonly IMapper _mapper;
Expand All @@ -34,12 +29,10 @@ public class Handler : IRequestHandler<PushDatawalletModificationsCommand, PushD
private DatawalletModification[] _modifications;
private PushDatawalletModificationsResponse _response;

public Handler(ISynchronizationDbContext dbContext, IUserContext userContext, IMapper mapper, IBlobStorage blobStorage, IOptions<BlobOptions> blobOptions, IEventBus eventBus)
public Handler(ISynchronizationDbContext dbContext, IUserContext userContext, IMapper mapper, IEventBus eventBus)
{
_dbContext = dbContext;
_mapper = mapper;
_blobStorage = blobStorage;
_blobOptions = blobOptions.Value;
_eventBus = eventBus;
_activeIdentity = userContext.GetAddress();
_activeDevice = userContext.GetDeviceId();
Expand Down Expand Up @@ -133,16 +126,6 @@ private async Task Save(DatawalletModification[] modifications, string blobName)
{
await _dbContext.Set<DatawalletModification>().AddRangeAsync(modifications, _cancellationToken);

var payloads = modifications
.Where(newModification => newModification.EncryptedPayload != null)
.ToDictionary(m => m.Index, m => m.EncryptedPayload);

var blobContent = JsonSerializer.SerializeToUtf8Bytes(payloads);

_blobStorage.Add(_blobOptions.RootFolder, blobName, blobContent);

await _blobStorage.SaveAsync();

try
{
await _dbContext.SaveChangesAsync(_cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,32 +1,24 @@
using System.Text.Json;
using AutoMapper;
using AutoMapper;
using Backbone.BuildingBlocks.Application.Abstractions.Exceptions;
using Backbone.BuildingBlocks.Application.Abstractions.Infrastructure.Persistence.BlobStorage;
using Backbone.BuildingBlocks.Application.Abstractions.Infrastructure.UserContext;
using Backbone.DevelopmentKit.Identity.ValueObjects;
using Backbone.Modules.Synchronization.Application.Datawallets.DTOs;
using Backbone.Modules.Synchronization.Application.Infrastructure;
using Backbone.Modules.Synchronization.Domain.Entities;
using Backbone.Tooling.Extensions;
using MediatR;
using Microsoft.Extensions.Options;

namespace Backbone.Modules.Synchronization.Application.Datawallets.Queries.GetModifications;

public class Handler : IRequestHandler<GetModificationsQuery, GetModificationsResponse>
{
private readonly IdentityAddress _activeIdentity;
private readonly IBlobStorage _blobStorage;
private readonly ISynchronizationDbContext _dbContext;
private readonly IMapper _mapper;
private readonly BlobOptions _blobOptions;

public Handler(ISynchronizationDbContext dbContext, IMapper mapper, IUserContext userContext, IBlobStorage blobStorage, IOptions<BlobOptions> blobOptions)
public Handler(ISynchronizationDbContext dbContext, IMapper mapper, IUserContext userContext)
{
_dbContext = dbContext;
_mapper = mapper;
_blobStorage = blobStorage;
_blobOptions = blobOptions.Value;
_activeIdentity = userContext.GetAddress();
}

Expand All @@ -41,59 +33,23 @@ public async Task<GetModificationsResponse> Handle(GetModificationsQuery request

var dbPaginationResult = await _dbContext.GetDatawalletModifications(_activeIdentity, request.LocalIndex, request.PaginationFilter, cancellationToken);

var dtos = await MapToDtos(dbPaginationResult.ItemsOnPage);
var dtos = MapToDtos(dbPaginationResult.ItemsOnPage);

return new GetModificationsResponse(dtos, request.PaginationFilter, dbPaginationResult.TotalNumberOfItems);
}

private async Task<List<DatawalletModificationDTO>> MapToDtos(IEnumerable<DatawalletModification> modifications)
private List<DatawalletModificationDTO> MapToDtos(IEnumerable<DatawalletModification> modifications)
{
var datawalletModifications = modifications as DatawalletModification[] ?? modifications.ToArray();

var blobReferences = datawalletModifications.Where(m => !m.BlobReference.IsNullOrEmpty()).Select(m => m.BlobReference).Distinct();
var blobs = await Task.WhenAll(blobReferences.Select(r =>
{
try
{
return _blobStorage.FindAsync(_blobOptions.RootFolder, r);
}
catch (NotFoundException)
{
throw new Exception($"Blob with reference '{r}' not found.");
}
}));
var mappingTasks = datawalletModifications.Select(MapToDto);

var payloads = blobs
.Select(b => JsonSerializer.Deserialize<Dictionary<long, byte[]>>(b))
.SelectMany(b => b)
.ToDictionary(b => b.Key, b => b.Value);

var mappingTasks = datawalletModifications.Select(m => MapToDto(m, payloads));

return (await Task.WhenAll(mappingTasks)).ToList();
return mappingTasks.ToList();
}

private async Task<DatawalletModificationDTO> MapToDto(DatawalletModification modification, Dictionary<long, byte[]> payloads)
private DatawalletModificationDTO MapToDto(DatawalletModification modification)
{
var dto = _mapper.Map<DatawalletModificationDTO>(modification);

if (modification.BlobReference.IsNullOrEmpty())
{
try
{
dto.EncryptedPayload = await _blobStorage.FindAsync(_blobOptions.RootFolder, modification.Id);
}
catch (NotFoundException)
{
// blob not found means that there is no payload for this modification
}
}
else
{
payloads.TryGetValue(modification.Index, out var payload);
dto.EncryptedPayload = payload;
}

return dto;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ public class InfrastructureConfiguration
[Required]
public SqlDatabaseConfiguration SqlDatabase { get; set; } = new();

[Required]
public BlobStorageConfiguration BlobStorage { get; set; } = new();
public BlobStorageConfiguration? BlobStorage { get; set; }

public class BlobStorageConfiguration
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,17 @@ public override void ConfigureServices(IServiceCollection services, IConfigurati
options.DbOptions.Provider = parsedConfiguration.Infrastructure.SqlDatabase.Provider;
options.DbOptions.DbConnectionString = parsedConfiguration.Infrastructure.SqlDatabase.ConnectionString;

options.BlobStorageOptions.CloudProvider = parsedConfiguration.Infrastructure.BlobStorage.CloudProvider;
options.BlobStorageOptions.ConnectionInfo = parsedConfiguration.Infrastructure.BlobStorage.ConnectionInfo;
options.BlobStorageOptions.Container =
parsedConfiguration.Infrastructure.BlobStorage.ContainerName.IsNullOrEmpty()
? "synchronization"
: parsedConfiguration.Infrastructure.BlobStorage.ContainerName;
if (parsedConfiguration.Infrastructure.BlobStorage != null)
{
options.BlobStorageOptions = new()
{
CloudProvider = parsedConfiguration.Infrastructure.BlobStorage.CloudProvider,
ConnectionInfo = parsedConfiguration.Infrastructure.BlobStorage.ConnectionInfo,
Container = parsedConfiguration.Infrastructure.BlobStorage.ContainerName.IsNullOrEmpty()
? "synchronization"
: parsedConfiguration.Infrastructure.BlobStorage.ContainerName
};
}
});

services.AddApplication();
Expand Down
Loading

0 comments on commit 5de8243

Please sign in to comment.