Skip to content

Commit

Permalink
Move model saving to engine definition
Browse files Browse the repository at this point in the history
Fix model cleaning hanfire storage
  • Loading branch information
johnml1135 committed Feb 5, 2024
1 parent 86910b3 commit 54c2a35
Show file tree
Hide file tree
Showing 19 changed files with 134 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,21 @@ private static IMachineBuilder AddHangfireBuildJobRunner(this IMachineBuilder bu
return builder;
}

private static MongoStorageOptions GetMongoStorageOptions()
{
var mongoStorageOptions = new MongoStorageOptions
{
MigrationOptions = new MongoMigrationOptions
{
MigrationStrategy = new MigrateMongoMigrationStrategy(),
BackupStrategy = new CollectionMongoBackupStrategy()
},
CheckConnection = true,
CheckQueuedJobsStrategy = CheckQueuedJobsStrategy.TailNotificationsCollection,
};
return mongoStorageOptions;
}

public static IMachineBuilder AddMongoHangfireJobClient(
this IMachineBuilder builder,
string? connectionString = null
Expand All @@ -164,19 +179,7 @@ public static IMachineBuilder AddMongoHangfireJobClient(
c.SetDataCompatibilityLevel(CompatibilityLevel.Version_170)
.UseSimpleAssemblyNameTypeSerializer()
.UseRecommendedSerializerSettings()
.UseMongoStorage(
connectionString,
new MongoStorageOptions
{
MigrationOptions = new MongoMigrationOptions
{
MigrationStrategy = new MigrateMongoMigrationStrategy(),
BackupStrategy = new CollectionMongoBackupStrategy()
},
CheckConnection = true,
CheckQueuedJobsStrategy = CheckQueuedJobsStrategy.TailNotificationsCollection,
}
)
.UseMongoStorage(connectionString, GetMongoStorageOptions())
.UseFilter(new AutomaticRetryAttribute { Attempts = 0 })
);
builder.Services.AddHealthChecks().AddCheck<HangfireHealthCheck>(name: "Hangfire");
Expand Down Expand Up @@ -208,7 +211,7 @@ public static IMachineBuilder AddHangfireJobServer(

builder.Services.AddHangfireServer(o =>
{
o.Queues = queues.ToArray();
o.Queues = [.. queues];
});
return builder;
}
Expand Down Expand Up @@ -402,6 +405,20 @@ public static IMachineBuilder AddBuildJobService(this IMachineBuilder builder)
return builder;
}

public static IMachineBuilder AddModelCleanupJob(this IMachineBuilder builder, string? connectionString = null)
{
connectionString ??= builder.Configuration?.GetConnectionString("Hangfire");
if (connectionString is null)
throw new InvalidOperationException("Hangfire connection string is required");

var mongoClientSettings = MongoClientSettings.FromUrl(new MongoUrl(connectionString));
JobStorage.Current = new MongoStorage(mongoClientSettings, "recurring_job", GetMongoStorageOptions());
builder.Services.AddSingleton<ICleanupOldModelsJob, CleanupOldModelsJob>();
RecurringJobOptions options = new() { TimeZone = TimeZoneInfo.Utc };
RecurringJob.AddOrUpdate<ICleanupOldModelsJob>("Cleanup-job", x => x.RunAsync(), Cron.Daily, options);
return builder;
}

private static IMachineBuilder AddBuildJobService(this IMachineBuilder builder, BuildJobOptions options)
{
builder.Services.AddScoped<IBuildJobService, BuildJobService>();
Expand Down
1 change: 1 addition & 0 deletions src/SIL.Machine.AspNetCore/Models/TranslationEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ public class TranslationEngine : IEntity
public string EngineId { get; set; } = default!;
public string SourceLanguage { get; set; } = default!;
public string TargetLanguage { get; set; } = default!;
public bool IsModelRetrievable { get; set; } = false;
public int BuildRevision { get; set; }
public Build? CurrentBuild { get; set; }
}
4 changes: 1 addition & 3 deletions src/SIL.Machine.AspNetCore/Services/CleanupJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@ public class CleanupOldModelsJob(ISharedFileService sharedFileService) : ICleanu
private List<string> _filesPreviouslyMarkedForDeletion = [];
private readonly List<string> _filesNewlyMarkedForDeletion = [];

private static readonly string MODEL_DIRECTORY = "models/";

public async Task RunAsync()
{
var files = await SharedFileService.ListFilesAsync(MODEL_DIRECTORY);
var files = await SharedFileService.ListFilesAsync(ISharedFileService.ModelDirectory);
// split name by underscore into engineID and buildRevision
Dictionary<string, int> modelsByEngineId = [];
foreach (string file in files)
Expand Down
12 changes: 3 additions & 9 deletions src/SIL.Machine.AspNetCore/Services/HangfireHealthCheck.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
namespace SIL.Machine.AspNetCore.Services;

public class HangfireHealthCheck : IHealthCheck
public class HangfireHealthCheck(JobStorage jobStorage, IOptions<BackgroundJobServerOptions> options) : IHealthCheck
{
private readonly JobStorage _jobStorage;
private readonly IOptions<BackgroundJobServerOptions> _options;

public HangfireHealthCheck(JobStorage jobStorage, IOptions<BackgroundJobServerOptions> options)
{
_jobStorage = jobStorage;
_options = options;
}
private readonly JobStorage _jobStorage = jobStorage;
private readonly IOptions<BackgroundJobServerOptions> _options = options;

public Task<HealthCheckResult> CheckHealthAsync(
HealthCheckContext context,
Expand Down
2 changes: 1 addition & 1 deletion src/SIL.Machine.AspNetCore/Services/IFileStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Task<IReadOnlyCollection<string>> ListFilesAsync(

Task<Stream> OpenWriteAsync(string path, CancellationToken cancellationToken = default);

Task<string> GetPresignedUrlAsync(string path, CancellationToken cancellationToken = default);
Task<string> GetPresignedUrlAsync(string path, int minutesToExpire, CancellationToken cancellationToken = default);

Task DeleteAsync(string path, bool recurse = false, CancellationToken cancellationToken = default);
}
4 changes: 3 additions & 1 deletion src/SIL.Machine.AspNetCore/Services/ISharedFileService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

public interface ISharedFileService
{
public const string ModelDirectory = "models/";

Uri GetBaseUri();

Uri GetResolvedUri(string path);

Task<Uri> GetPresignedUrlAsync(string path);
Task<Uri> GetPresignedUrlAsync(string path, int minutesToExpire);

Task<IReadOnlyCollection<string>> ListFilesAsync(
string path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Task CreateAsync(
string? engineName,
string sourceLanguage,
string targetLanguage,
bool isModelRetrievable = false,
CancellationToken cancellationToken = default
);
Task DeleteAsync(string engineId, CancellationToken cancellationToken = default);
Expand Down
6 changes: 5 additions & 1 deletion src/SIL.Machine.AspNetCore/Services/InMemoryStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@ public Task<IReadOnlyCollection<string>> ListFilesAsync(
);
}

public Task<string> GetPresignedUrlAsync(string path, CancellationToken cancellationToken = default)
public Task<string> GetPresignedUrlAsync(
string path,
int minutesToExpire,
CancellationToken cancellationToken = default
)
{
return Task.FromResult(path);
}
Expand Down
6 changes: 5 additions & 1 deletion src/SIL.Machine.AspNetCore/Services/LocalStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ public Task<IReadOnlyCollection<string>> ListFilesAsync(
);
}

public Task<string> GetPresignedUrlAsync(string path, CancellationToken cancellationToken = default)
public Task<string> GetPresignedUrlAsync(
string path,
int minutesToExpire,
CancellationToken cancellationToken = default
)
{
return Task.FromResult(path);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public async Task<string> CreateJobScriptAsync(
+ $" 'shared_file_uri': '{baseUri}',\n"
+ $" 'shared_file_folder': '{folder}',\n"
+ (buildOptions is not null ? $" 'build_options': '''{buildOptions}''',\n" : "")
+ (engine.IsModelRetrievable ? $" 'save_model': '{engineId}_{engine.BuildRevision + 1}',\n" : "")
+ $" 'clearml': True\n"
+ "}\n"
+ "run(args)\n";
Expand Down
51 changes: 41 additions & 10 deletions src/SIL.Machine.AspNetCore/Services/NmtEngineService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@ ISharedFileService sharedFileService

public TranslationEngineType Type => TranslationEngineType.Nmt;

private const int MinutesToExpire = 60;

public async Task CreateAsync(
string engineId,
string? engineName,
string sourceLanguage,
string targetLanguage,
bool isModelRetrievable = false,
CancellationToken cancellationToken = default
)
{
Expand All @@ -43,12 +46,13 @@ await _engines.InsertAsync(
{
EngineId = engineId,
SourceLanguage = sourceLanguage,
TargetLanguage = targetLanguage
TargetLanguage = targetLanguage,
IsModelRetrievable = isModelRetrievable
},
cancellationToken
);
await _buildJobService.CreateEngineAsync(
new[] { BuildJobType.Cpu, BuildJobType.Gpu },
[BuildJobType.Cpu, BuildJobType.Gpu],
engineId,
engineName,
cancellationToken
Expand Down Expand Up @@ -115,16 +119,35 @@ public async Task<ModelPresignedUrl> GetModelPresignedUrlAsync(
CancellationToken cancellationToken = default
)
{
var files = await _sharedFileService.ListFilesAsync($"models/", cancellationToken: cancellationToken);
// find latest file that start with the engineId
var latestFile = files.Where(f => f.StartsWith(engineId)).OrderByDescending(f => f).FirstOrDefault();
if (latestFile is null)
throw new FileNotFoundException("No built, saved model found for engine.", engineId);
string buildRevision = latestFile.Split('_').Last();
TranslationEngine engine = await GetEngineAsync(engineId, cancellationToken);
if (!engine.IsModelRetrievable)
throw new InvalidOperationException(
"The model cannot be downloaded. "
+ "To enable downloading the model, recreate the engine with IsModelRetrievable property to true."
);
if (engine.BuildRevision == 0)
throw new InvalidOperationException("The engine has not been built yet.");
string filename = $"{engineId}_{engine.BuildRevision}.tar.gz";
bool fileExists = await _sharedFileService.ExistsAsync(
ISharedFileService.ModelDirectory + filename,
cancellationToken
);
if (!fileExists)
throw new FileNotFoundException(
$"The model should exist to be downloaded but is not there for BuildRevision {engine.BuildRevision}."
);
var modelInfo = new ModelPresignedUrl
{
PresignedUrl = (await _sharedFileService.GetPresignedUrlAsync($"models/{latestFile}")).ToString(),
BuildRevision = buildRevision
PresignedUrl = (
await _sharedFileService.GetPresignedUrlAsync(
ISharedFileService.ModelDirectory + filename,
MinutesToExpire
)
).ToString(),
BuildRevision = engine.BuildRevision,
UrlExpirationTime = DateTime
.UtcNow.AddMinutes(MinutesToExpire)
.ToString("yyyy-MM-ddTHH\\:mm\\:ss.fffffffzzz", CultureInfo.InvariantCulture)
};
return modelInfo;
}
Expand Down Expand Up @@ -178,4 +201,12 @@ private async Task CancelBuildJobAsync(string engineId, CancellationToken cancel
if (buildId is not null && jobState is BuildJobState.None)
await _platformService.BuildCanceledAsync(buildId, CancellationToken.None);
}

private async Task<TranslationEngine> GetEngineAsync(string engineId, CancellationToken cancellationToken)
{
TranslationEngine? engine = await _engines.GetAsync(e => e.EngineId == engineId, cancellationToken);
if (engine is null)
throw new InvalidOperationException($"The engine {engineId} does not exist.");
return engine;
}
}
5 changes: 5 additions & 0 deletions src/SIL.Machine.AspNetCore/Services/NmtTrainBuildJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ await PipInstallModuleAsync(
+ $" 'trg_lang': '{ConvertLanguageTag(engine.TargetLanguage)}',\n"
+ $" 'shared_file_uri': '{_sharedFileService.GetBaseUri()}',\n"
+ (buildOptions is not null ? $" 'build_options': '''{buildOptions}''',\n" : "")
+ (
engine.IsModelRetrievable
? $" 'save_model': '{engine.Id}_{engine.BuildRevision + 1}',\n"
: ""
)
+ $" 'clearml': False\n"
+ "}\n"
+ "run(args)\n"
Expand Down
8 changes: 6 additions & 2 deletions src/SIL.Machine.AspNetCore/Services/S3FileStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,19 @@ public async Task<IReadOnlyCollection<string>> ListFilesAsync(
return response.S3Objects.Select(s3Obj => s3Obj.Key[_basePath.Length..]).ToList();
}

public Task<string> GetPresignedUrlAsync(string path, CancellationToken cancellationToken = default)
public Task<string> GetPresignedUrlAsync(
string path,
int minutesToExpire,
CancellationToken cancellationToken = default
)
{
return Task.FromResult(
_client.GetPreSignedURL(
new GetPreSignedUrlRequest
{
BucketName = _bucketName,
Key = _basePath + Normalize(path),
Expires = DateTime.UtcNow.AddMinutes(60)
Expires = DateTime.UtcNow.AddMinutes(minutesToExpire)
}
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ await engineService.CreateAsync(
request.HasEngineName ? request.EngineName : null,
request.SourceLanguage,
request.TargetLanguage,
request.IsModelRetrievable,
context.CancellationToken
);
return Empty;
Expand Down Expand Up @@ -132,7 +133,8 @@ ServerCallContext context
return new GetModelPresignedUrlResponse
{
PresignedUrl = modelPresignedUrl.PresignedUrl,
BuildRevision = modelPresignedUrl.BuildRevision
BuildRevision = modelPresignedUrl.BuildRevision,
UrlExpirationTime = modelPresignedUrl.UrlExpirationTime
};
}

Expand Down
4 changes: 2 additions & 2 deletions src/SIL.Machine.AspNetCore/Services/SharedFileService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ public Uri GetResolvedUri(string path)
return new Uri(_baseUri, path);
}

public async Task<Uri> GetPresignedUrlAsync(string path)
public async Task<Uri> GetPresignedUrlAsync(string path, int minutesToExpire)
{
string presignedUrl = path;
if (_baseUri is not null)
if (_baseUri.Scheme == "s3")
presignedUrl = await _fileStorage.GetPresignedUrlAsync(path);
presignedUrl = await _fileStorage.GetPresignedUrlAsync(path, minutesToExpire);
var url = GetResolvedUri(presignedUrl);
return url;
}
Expand Down
Loading

0 comments on commit 54c2a35

Please sign in to comment.