Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix commission rate data corruption #144

Merged
12 commits merged into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
using System.Threading;
using System.Threading.Tasks;
using Application.Aggregates.Contract.Configurations;
using Application.Aggregates.Contract.Entities;
using Application.Aggregates.Contract.Jobs;
using Application.Aggregates.Contract.Observability;
using Application.Api.GraphQL.EfCore;
using Application.Observability;
using Application.Configurations;
using Application.Jobs;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;
Expand All @@ -17,29 +19,29 @@ namespace Application.Aggregates.Contract.BackgroundServices;
/// </summary>
internal class ContractNodeImportBackgroundService : BackgroundService
{
private readonly IContractJobFinder _jobFinder;
private readonly IJobFinder<IContractJob> _jobFinder;
private readonly IDbContextFactory<GraphQlDbContext> _dbContextFactory;
private readonly IContractRepositoryFactory _repositoryFactory;
private readonly IContractNodeClient _client;
private readonly ContractHealthCheck _healthCheck;
private readonly JobHealthCheck _jobHealthCheck;
private readonly FeatureFlagOptions _featureFlags;
private readonly ContractAggregateOptions _options;
private readonly ILogger _logger;

public ContractNodeImportBackgroundService(
IContractJobFinder jobFinder,
IJobFinder<IContractJob> jobFinder,
IDbContextFactory<GraphQlDbContext> dbContextFactory,
IContractRepositoryFactory repositoryFactory,
IContractNodeClient client,
IOptions<ContractAggregateOptions> options,
ContractHealthCheck healthCheck,
JobHealthCheck jobHealthCheck,
IOptions<FeatureFlagOptions> featureFlagsOptions)
{
_jobFinder = jobFinder;
_dbContextFactory = dbContextFactory;
_repositoryFactory = repositoryFactory;
_client = client;
_healthCheck = healthCheck;
_jobHealthCheck = jobHealthCheck;
_featureFlags = featureFlagsOptions.Value;
_options = options.Value;
_logger = Log.ForContext<ContractNodeImportBackgroundService>();
Expand Down Expand Up @@ -67,7 +69,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
catch (Exception e)
{
_logger.Fatal(e, $"{nameof(ContractNodeImportBackgroundService)} stopped due to exception.");
_healthCheck.AddUnhealthyJobWithMessage(nameof(ContractNodeImportBackgroundService), "Stopped due to exception.");
_jobHealthCheck.AddUnhealthyJobWithMessage(nameof(ContractNodeImportBackgroundService), "Stopped due to exception.");
_logger.Fatal(e, $"{nameof(ContractNodeImportBackgroundService)} stopped due to exception.");
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using Application.Configurations;

namespace Application.Aggregates.Contract.Configurations;

public class ContractAggregateOptions
Expand All @@ -8,8 +10,8 @@ public class ContractAggregateOptions
/// Done as dictionary such that it can be changed from configurations. Key is unique identifier of job and
/// it defined within the jobs class.
/// </summary>
public IDictionary<string, ContractAggregateJobOptions> Jobs { get; set; } =
new Dictionary<string, ContractAggregateJobOptions>();
public IDictionary<string, JobOptions> Jobs { get; set; } =
new Dictionary<string, JobOptions>();
/// <summary>
/// Delay which is used by the node importer between validation if all jobs has succeeded.
/// </summary>
Expand Down
14 changes: 7 additions & 7 deletions backend/Application/Aggregates/Contract/Entities/ContractJob.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
using Application.Jobs;

namespace Application.Aggregates.Contract.Entities;

/// <summary>
/// Jobs related to smart contracts, which has successfully executed.
/// </summary>
public sealed class ContractJob
public sealed class ContractJob : IJobEntity
{
public string Job { get; set; } = null!;
public DateTimeOffset CreatedAt { get; init; } = DateTime.UtcNow;
public string Job { get; init; } = null!;

public DateTimeOffset CreatedAt { get; } = DateTime.UtcNow;

/// <summary>
/// Needed for EF Core
/// </summary>
private ContractJob()
public ContractJob()
{}

public ContractJob(string job)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
using Application.Aggregates.Contract.BackgroundServices;
using Application.Aggregates.Contract.Configurations;
using Application.Aggregates.Contract.Entities;
using Application.Aggregates.Contract.Jobs;
using Application.Aggregates.Contract.Observability;
using Application.Jobs;
using Application.Observability;
using Dapper;
using HotChocolate.Execution.Configuration;
using Microsoft.Extensions.Configuration;
Expand All @@ -23,18 +25,9 @@ public static void AddContractAggregate(this IServiceCollection collection, ICon
collection.AddTransient<IContractNodeClient, ContractNodeClient>();

collection.AddContractJobs();
collection.AddObservability();

AddDapperTypeHandlers();
}

private static void AddObservability(this IServiceCollection collection)
{
collection.AddSingleton<ContractHealthCheck>();
collection.AddHealthChecks()
.AddCheck<ContractHealthCheck>("Contract", HealthStatus.Unhealthy)
.ForwardToPrometheus();
}

/// <summary>
/// Used by <see cref="Dapper"/> to specify custom mappings of types.
Expand All @@ -52,23 +45,21 @@ internal static IRequestExecutorBuilder AddContractGraphQlConfigurations(this IR
builder
.AddType<Entities.Contract.ContractQuery>()
.AddTypeExtension<Entities.Contract.ContractExtensions>()
.AddType<Entities.ModuleReferenceEvent.ModuleReferenceEventQuery>()
.AddTypeExtension<Entities.ModuleReferenceEvent.ModuleReferenceEventExtensions>()
.AddTypeExtension<Entities.ModuleReferenceContractLinkEvent.ModuleReferenceContractLinkEventExtensions>();
.AddType<ModuleReferenceEvent.ModuleReferenceEventQuery>()
.AddTypeExtension<ModuleReferenceEvent.ModuleReferenceEventExtensions>()
.AddTypeExtension<ModuleReferenceContractLinkEvent.ModuleReferenceContractLinkEventExtensions>();
return builder;
}

/// <summary>
/// Background service which executes all jobs related to Smart Contracts.
///
/// When new is implemented they should be added to the <see cref="ContractJobsBackgroundService"/>.
/// </summary>
private static void AddContractJobs(this IServiceCollection collection)
{
collection.AddHostedService<ContractJobsBackgroundService>();
collection.AddTransient<IContractJobFinder, ContractJobFinder>();

collection.AddSingleton<IContractJobRepository, ContractJobRepository>();
collection.AddHostedService<JobsBackgroundService<IContractJob, ContractJob>>();
collection.AddTransient<IJobFinder<IContractJob>, JobFinder<IContractJob, ContractJob>>();
collection.AddSingleton<IJobRepository<ContractJob>, JobRepository<ContractJob>>();

collection.AddTransient<IContractJob, ParallelBatchBlockHeightJob<InitialContractAggregateCatchUpJob>>();
collection.AddTransient<InitialContractAggregateCatchUpJob>();
collection.AddTransient<IContractJob, InitialModuleSourceCatchup>();
Expand Down
23 changes: 0 additions & 23 deletions backend/Application/Aggregates/Contract/Jobs/ContractJobFinder.cs

This file was deleted.

20 changes: 3 additions & 17 deletions backend/Application/Aggregates/Contract/Jobs/IContractJob.cs
Original file line number Diff line number Diff line change
@@ -1,24 +1,10 @@
using System.Threading;
using System.Threading.Tasks;
using Application.Jobs;

namespace Application.Aggregates.Contract.Jobs;

/// <summary>
/// Interfaces which should be used for all jobs relevant for
/// Smart Contracts.
/// </summary>
public interface IContractJob
{
Task StartImport(CancellationToken token);
/// <summary>
/// This returns a unique identifier of the job.
///
/// WARNING: changing this could result in already executed jobs rerunning.
/// </summary>
string GetUniqueIdentifier();

/// <summary>
/// Returns if import from node should await job execution.
/// </summary>
bool ShouldNodeImportAwait();
}
public interface IContractJob : IJob
{}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
using Application.Aggregates.Contract.Configurations;
using Application.Aggregates.Contract.Exceptions;
using Application.Aggregates.Contract.Observability;
using Application.Aggregates.Contract.Resilience;
using Application.Aggregates.Contract.Types;
using Application.Api.GraphQL.Accounts;
using Application.Api.GraphQL.Transactions;
using Application.Resilience;
using Microsoft.Extensions.Options;

namespace Application.Aggregates.Contract.Jobs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
using Application.Aggregates.Contract.Configurations;
using Application.Aggregates.Contract.Dto;
using Application.Aggregates.Contract.Entities;
using Application.Aggregates.Contract.Resilience;
using Application.Api.GraphQL;
using Application.Api.GraphQL.EfCore;
using Application.Api.GraphQL.Transactions;
using Application.Configurations;
using Application.Observability;
using Application.Resilience;
using Dapper;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
Expand All @@ -24,7 +25,7 @@ public sealed class InitialContractEventDeserializationFieldsCatchUpJob : IState
private readonly IDbContextFactory<GraphQlDbContext> _contextFactory;
private readonly ILogger _logger;
private readonly ContractAggregateOptions _contractAggregateOptions;
private readonly ContractAggregateJobOptions _jobOptions;
private readonly JobOptions _jobOptions;

public InitialContractEventDeserializationFieldsCatchUpJob(
IDbContextFactory<GraphQlDbContext> contextFactory,
Expand All @@ -35,7 +36,7 @@ IOptions<ContractAggregateOptions> options
_logger = Log.ForContext<InitialContractEventDeserializationFieldsCatchUpJob>();
_contractAggregateOptions = options.Value;
var gotJobOptions = _contractAggregateOptions.Jobs.TryGetValue(GetUniqueIdentifier(), out var jobOptions);
_jobOptions = gotJobOptions ? jobOptions! : new ContractAggregateJobOptions();
_jobOptions = gotJobOptions ? jobOptions! : new JobOptions();
}

/// <inheritdoc/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
using System.Threading.Tasks;
using Application.Aggregates.Contract.Configurations;
using Application.Aggregates.Contract.Entities;
using Application.Aggregates.Contract.Resilience;
using Application.Api.GraphQL.EfCore;
using Application.Api.GraphQL.Transactions;
using Application.Configurations;
using Application.Observability;
using Application.Resilience;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;

Expand All @@ -21,7 +22,7 @@ public sealed class InitialContractRejectEventDeserializationFieldsCatchUpJob :
private readonly IDbContextFactory<GraphQlDbContext> _contextFactory;
private readonly ILogger _logger;
private readonly ContractAggregateOptions _contractAggregateOptions;
private readonly ContractAggregateJobOptions _jobOptions;
private readonly JobOptions _jobOptions;

public InitialContractRejectEventDeserializationFieldsCatchUpJob(
IDbContextFactory<GraphQlDbContext> contextFactory,
Expand All @@ -32,7 +33,7 @@ IOptions<ContractAggregateOptions> options
_logger = Log.ForContext<InitialContractRejectEventDeserializationFieldsCatchUpJob>();
_contractAggregateOptions = options.Value;
var gotJobOptions = _contractAggregateOptions.Jobs.TryGetValue(GetUniqueIdentifier(), out var jobOptions);
_jobOptions = gotJobOptions ? jobOptions! : new ContractAggregateJobOptions();
_jobOptions = gotJobOptions ? jobOptions! : new JobOptions();
}

/// <inheritdoc/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
using Application.Aggregates.Contract.Configurations;
using Application.Aggregates.Contract.Entities;
using Application.Aggregates.Contract.Observability;
using Application.Aggregates.Contract.Resilience;
using Application.Api.GraphQL.EfCore;
using Application.Configurations;
using Application.Observability;
using Application.Resilience;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Serilog.Context;
Expand All @@ -16,10 +17,10 @@ public class InitialModuleSourceCatchup : IContractJob
{
private readonly IContractNodeClient _client;
private readonly IDbContextFactory<GraphQlDbContext> _dbContextFactory;
private readonly ContractHealthCheck _healthCheck;
private readonly JobHealthCheck _jobHealthCheck;
private readonly ILogger _logger;
private readonly ContractAggregateOptions _contractAggregateOptions;
private readonly ContractAggregateJobOptions _jobOptions;
private readonly JobOptions _jobOptions;

/// <summary>
/// WARNING - Do not change this if job already executed on environment, since it will trigger rerun of job.
Expand All @@ -30,15 +31,15 @@ public InitialModuleSourceCatchup(
IContractNodeClient client,
IDbContextFactory<GraphQlDbContext> dbContextFactory,
IOptions<ContractAggregateOptions> options,
ContractHealthCheck healthCheck)
JobHealthCheck jobHealthCheck)
{
_client = client;
_dbContextFactory = dbContextFactory;
_healthCheck = healthCheck;
_jobHealthCheck = jobHealthCheck;
_logger = Log.ForContext<InitialModuleSourceCatchup>();
_contractAggregateOptions = options.Value;
var gotJobOptions = _contractAggregateOptions.Jobs.TryGetValue(GetUniqueIdentifier(), out var jobOptions);
_jobOptions = gotJobOptions ? jobOptions! : new ContractAggregateJobOptions();
_jobOptions = gotJobOptions ? jobOptions! : new JobOptions();
}

private async Task<IList<string>> GetModuleReferences()
Expand Down Expand Up @@ -99,7 +100,7 @@ public async Task StartImport(CancellationToken token)
}
catch (Exception e)
{
_healthCheck.AddUnhealthyJobWithMessage(GetUniqueIdentifier(), "Job stopped due to exception.");
_jobHealthCheck.AddUnhealthyJobWithMessage(GetUniqueIdentifier(), "Job stopped due to exception.");
_logger.Fatal(e, $"{GetUniqueIdentifier()} stopped due to exception.");
throw;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Threading.Tasks;
using Application.Aggregates.Contract.Configurations;
using Application.Aggregates.Contract.Observability;
using Application.Configurations;
using Application.Observability;
using Microsoft.Extensions.Options;
using Serilog.Context;
Expand All @@ -19,24 +20,24 @@ namespace Application.Aggregates.Contract.Jobs;
internal sealed class ParallelBatchBlockHeightJob<TStatelessJob> : IContractJob where TStatelessJob : IStatelessBlockHeightJobs
{

private readonly ContractHealthCheck _healthCheck;
private readonly JobHealthCheck _jobHealthCheck;
private readonly IStatelessBlockHeightJobs _statelessJob;
private readonly ILogger _logger;
private readonly ContractAggregateOptions _contractAggregateOptions;
private readonly ContractAggregateJobOptions _jobOptions;
private readonly JobOptions _jobOptions;

public ParallelBatchBlockHeightJob(
TStatelessJob statelessJob,
IOptions<ContractAggregateOptions> options,
ContractHealthCheck healthCheck
JobHealthCheck jobHealthCheck
)
{
_statelessJob = statelessJob;
_healthCheck = healthCheck;
_jobHealthCheck = jobHealthCheck;
_logger = Log.ForContext<ParallelBatchBlockHeightJob<TStatelessJob>>();
_contractAggregateOptions = options.Value;
var gotJobOptions = _contractAggregateOptions.Jobs.TryGetValue(GetUniqueIdentifier(), out var jobOptions);
_jobOptions = gotJobOptions ? jobOptions! : new ContractAggregateJobOptions();
_jobOptions = gotJobOptions ? jobOptions! : new JobOptions();
}

/// <inheritdoc/>
Expand Down Expand Up @@ -86,7 +87,7 @@ public async Task StartImport(CancellationToken token)
catch (Exception e)
{
_logger.Fatal(e, $"{GetUniqueIdentifier()} stopped due to exception.");
_healthCheck.AddUnhealthyJobWithMessage(GetUniqueIdentifier(), "Database import job stopped due to exception.");
_jobHealthCheck.AddUnhealthyJobWithMessage(GetUniqueIdentifier(), "Database import job stopped due to exception.");
throw;
}
}
Expand Down
Loading