Skip to content

Commit

Permalink
Added job to export Workforce Data to GCS (#1490)
Browse files Browse the repository at this point in the history
  • Loading branch information
hortha authored Sep 2, 2024
1 parent 43f1df9 commit 4732cb7
Show file tree
Hide file tree
Showing 13 changed files with 361 additions and 2 deletions.
4 changes: 3 additions & 1 deletion TeachingRecordSystem/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
<PackageVersion Include="Faker.Net" Version="2.0.154" />
<PackageVersion Include="FakeXrmEasy.v9" Version="3.5.0" />
<PackageVersion Include="FluentValidation.AspNetCore" Version="11.3.0" />
<PackageVersion Include="Google.Cloud.Storage.V1" Version="4.10.0" />
<PackageVersion Include="GovUk.OneLogin.AspNetCore" Version="0.3.1" />
<PackageVersion Include="GovukNotify" Version="6.1.0" />
<PackageVersion Include="GovUk.Frontend.AspNetCore" Version="2.0.1" />
Expand Down Expand Up @@ -67,6 +68,7 @@
<PackageVersion Include="OpenIddict.AspNetCore" Version="5.2.0" />
<PackageVersion Include="OpenIddict.EntityFrameworkCore" Version="5.2.0" />
<PackageVersion Include="Optional" Version="4.0.0" />
<PackageVersion Include="Parquet.Net" Version="4.24.0" />
<PackageVersion Include="PdfSharpCore" Version="1.3.62" />
<PackageVersion Include="Polly.Core" Version="8.2.1" />
<PackageVersion Include="prometheus-net.AspNetCore" Version="8.2.1" />
Expand Down Expand Up @@ -96,4 +98,4 @@
<PackageVersion Include="Xunit.DependencyInjection" Version="8.9.1" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.8.2" />
</ItemGroup>
</Project>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using TeachingRecordSystem.Core.Services.WorkforceData;

namespace TeachingRecordSystem.Core.Jobs;

public class ExportWorkforceDataJob(WorkforceDataExporter workforceDataExporter)
{
public async Task ExecuteAsync(CancellationToken cancellationToken)
{
await workforceDataExporter.Export(cancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ public static IHostApplicationBuilder AddBackgroundJobs(this IHostApplicationBui
job => job.ExecuteAsync(CancellationToken.None),
Cron.Never);

recurringJobManager.AddOrUpdate<ExportWorkforceDataJob>(
nameof(ExportWorkforceDataJob),
job => job.ExecuteAsync(CancellationToken.None),
Cron.Never);

return Task.CompletedTask;
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
using Google.Cloud.Storage.V1;

namespace TeachingRecordSystem.Core.Services.WorkforceData.Google;

public interface IStorageClientProvider
{
ValueTask<StorageClient> GetStorageClientAsync();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Google.Cloud.Storage.V1;
using Microsoft.Extensions.Options;

namespace TeachingRecordSystem.Core.Services.WorkforceData.Google;

public class OptionsStorageClientProvider : IStorageClientProvider
{
private readonly IOptions<WorkforceDataExportOptions> _optionsAccessor;

public OptionsStorageClientProvider(IOptions<WorkforceDataExportOptions> optionsAccessor)
{
ArgumentNullException.ThrowIfNull(optionsAccessor);
_optionsAccessor = optionsAccessor;
}

public ValueTask<StorageClient> GetStorageClientAsync()
{
var configuredClient = _optionsAccessor.Value.StorageClient ??
throw new InvalidOperationException($"No {nameof(WorkforceDataExportOptions.StorageClient)} has been configured.");

return new ValueTask<StorageClient>(configuredClient);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using TeachingRecordSystem.Core.Services.Establishments.Tps;
using TeachingRecordSystem.Core.Services.WorkforceData.Google;

namespace TeachingRecordSystem.Core.Services.WorkforceData;

Expand All @@ -11,6 +13,9 @@ public static IServiceCollection AddWorkforceData(this IServiceCollection servic
services.AddSingleton<TpsCsvExtractFileImporter>();
services.AddSingleton<TpsCsvExtractProcessor>();
services.AddSingleton<TpsEstablishmentRefresher>();
services.AddSingleton<IConfigureOptions<WorkforceDataExportOptions>, WorkforceDataExportConfigureOptions>();
services.AddSingleton<IStorageClientProvider, OptionsStorageClientProvider>();
services.AddSingleton<WorkforceDataExporter>();

return services;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using System.Text.Json;
using Google.Apis.Auth.OAuth2;
using Google.Cloud.Storage.V1;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Options;

namespace TeachingRecordSystem.Core.Services.WorkforceData;

internal class WorkforceDataExportConfigureOptions : IConfigureOptions<WorkforceDataExportOptions>
{
private readonly IConfiguration _configuration;

public WorkforceDataExportConfigureOptions(IConfiguration configuration)
{
_configuration = configuration;
}

public void Configure(WorkforceDataExportOptions options)
{
ArgumentNullException.ThrowIfNull(options);

var section = _configuration.GetSection("WorkforceDataExport");

options.BucketName = section["BucketName"]!;
var credentialsJson = section["CredentialsJson"];

if (!string.IsNullOrEmpty(credentialsJson))
{
var credentialsJsonDoc = JsonDocument.Parse(credentialsJson);

if (credentialsJsonDoc.RootElement.TryGetProperty("private_key", out _))
{
var creds = GoogleCredential.FromJson(credentialsJson);
options.StorageClient = StorageClient.Create(creds);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
namespace TeachingRecordSystem.Core.Services.WorkforceData;

public record WorkforceDataExportItem
{
public required Guid TpsEmploymentId { get; init; }
public required Guid PersonId { get; init; }
public required string Trn { get; init; }
public required Guid EstablishmentId { get; init; }
public required string EstablishmentSource { get; init; }
public required int? EstablishmentUrn { get; init; }
public required string LocalAuthorityCode { get; init; }
public required string? EstablishmentNumber { get; init; }
public required string EstablishmentName { get; init; }
public required DateOnly StartDate { get; init; }
public required DateOnly? EndDate { get; init; }
public required DateOnly LastKnownTpsEmployedDate { get; init; }
public required string EmploymentType { get; init; }
public required bool WithdrawalConfirmed { get; init; }
public required DateOnly LastExtractDate { get; init; }
public required string Key { get; init; }
public required string NationalInsuranceNumber { get; init; }
public required string? PersonPostcode { get; init; }
public required DateTime CreatedOn { get; init; }
public required DateTime UpdatedOn { get; init; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System.Diagnostics.CodeAnalysis;
using Google.Cloud.Storage.V1;

namespace TeachingRecordSystem.Core.Services.WorkforceData;

public class WorkforceDataExportOptions
{
public StorageClient? StorageClient { get; set; }
[DisallowNull]
public string? BucketName { get; set; }

[MemberNotNull(nameof(BucketName))]
internal void ValidateOptions()
{
if (BucketName is null)
{
throw new InvalidOperationException($"{nameof(BucketName)} has not been configured.");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
using Microsoft.Extensions.Options;
using Parquet.Serialization;
using Parquet.Utils;
using TeachingRecordSystem.Core.DataStore.Postgres;
using TeachingRecordSystem.Core.Services.WorkforceData.Google;

namespace TeachingRecordSystem.Core.Services.WorkforceData;

public class WorkforceDataExporter(
IClock clock,
IDbContextFactory<TrsDbContext> dbContextFactory,
IOptions<WorkforceDataExportOptions> optionsAccessor,
IStorageClientProvider storageClientProvider)
{
public async Task Export(CancellationToken cancellationToken)
{
using var dbContext = dbContextFactory.CreateDbContext();
dbContext.Database.SetCommandTimeout(300);

FormattableString querySql =
$"""
SELECT
t.tps_employment_id,
t.person_id,
p.trn,
e.establishment_id,
s.name establishment_source,
e.urn establishment_urn,
e.la_code local_authority_code,
e.establishment_number,
e.establishment_name,
t.start_date,
t.end_date,
t.last_known_tps_employed_date,
CASE
WHEN t.employment_type = 0 THEN 'FT'
WHEN t.employment_type = 1 THEN 'PTR'
WHEN t.employment_type = 2 THEN 'PTI'
WHEN t.employment_type = 3 THEN 'PT'
END employment_type,
t.withdrawal_confirmed,
t.last_extract_date,
t.key,
t.national_insurance_number,
t.person_postcode,
t.created_on,
t.updated_on
FROM
tps_employments t
JOIN
persons p ON p.person_id = t.person_id
JOIN
establishments e ON e.establishment_id = t.establishment_id
JOIN
establishment_sources s ON s.establishment_source_id = e.establishment_source_id
""";

var fileDateTime = clock.UtcNow.ToString("yyyyMMddHHmm");
var tempDirectory = Path.Combine(Path.GetTempPath(), $"workforce_data_{fileDateTime}");
Directory.CreateDirectory(tempDirectory);

var i = 0;
var fileNumber = 0;
var itemsToExport = new List<WorkforceDataExportItem>();
await foreach (var item in dbContext.Database.SqlQuery<WorkforceDataExportItem>(querySql).AsAsyncEnumerable())
{
i++;
itemsToExport.Add(item);

if (i % 50000 == 0)
{
fileNumber++;
await ParquetSerializer.SerializeAsync(itemsToExport, Path.Combine(tempDirectory, $"workforce_data_{fileDateTime}_{fileNumber}.parquet"));
itemsToExport.Clear();
}
}

if (itemsToExport.Count > 0)
{
fileNumber++;
await ParquetSerializer.SerializeAsync(itemsToExport, Path.Combine(tempDirectory, $"workforce_data_{fileDateTime}_{fileNumber}.parquet"));
itemsToExport.Clear();
}

using var stream = new MemoryStream();
var merger = new FileMerger(new DirectoryInfo(tempDirectory));
await merger.MergeFilesAsync(stream);
await UploadFile(stream, $"workforce_data_{clock.UtcNow:yyyyMMddHHmm}.parquet", cancellationToken);
Directory.Delete(tempDirectory, true);
}

private async Task UploadFile(Stream stream, string fileName, CancellationToken cancellationToken)
{
var storageClient = await storageClientProvider.GetStorageClientAsync();
var options = optionsAccessor.Value;
options.ValidateOptions();

await storageClient.UploadObjectAsync(options.BucketName, fileName, null, stream, cancellationToken: cancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
<PackageReference Include="DistributedLock.Azure" />
<PackageReference Include="DistributedLock.FileSystem" />
<PackageReference Include="EFCore.NamingConventions" />
<PackageReference Include="Google.Cloud.Storage.V1" />
<PackageReference Include="GovukNotify" />
<PackageReference Include="Hangfire.Core" />
<PackageReference Include="Hangfire.NetCore" />
Expand Down Expand Up @@ -159,6 +160,7 @@
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" />
<PackageReference Include="OpenIddict.EntityFrameworkCore" />
<PackageReference Include="Optional" />
<PackageReference Include="Parquet.Net" />
<PackageReference Include="PdfSharpCore" />
<PackageReference Include="Polly.Core" />
<PackageReference Include="Scrutor" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

namespace TeachingRecordSystem.Core.Tests.Services.WorkforceData;

public class TpsCsvExtractProcessorTests
public class TpsCsvExtractProcessorTests : IAsyncLifetime
{
public TpsCsvExtractProcessorTests(
DbFixture dbFixture,
Expand Down Expand Up @@ -515,6 +515,10 @@ public async Task BackfillNinoAndPersonPostcodeInEmploymentHistory_WhenCalledWit
Assert.Equal(memberPostcode, updatedPersonEmployment.PersonPostcode);
}

public Task InitializeAsync() => Task.CompletedTask;

public Task DisposeAsync() => DbFixture.DbHelper.ClearData();

private DbFixture DbFixture { get; }

private TestData TestData { get; }
Expand Down
Loading

0 comments on commit 4732cb7

Please sign in to comment.