From f69e264ee9aab41b3f5f28359b3d675cb07eb3be Mon Sep 17 00:00:00 2001 From: StanR Date: Tue, 2 Jul 2024 01:38:14 +0500 Subject: [PATCH] Add separate very-long-running job for all users updates --- backend/Mutualify/Contracts/StatsContract.cs | 2 +- backend/Mutualify/Jobs/UserAllUpdateJob.cs | 116 +++++++++++++++++++ backend/Mutualify/Jobs/UserUpdateJob.cs | 5 +- backend/Mutualify/Program.cs | 1 + backend/Mutualify/Services/UsersService.cs | 8 +- 5 files changed, 125 insertions(+), 7 deletions(-) create mode 100644 backend/Mutualify/Jobs/UserAllUpdateJob.cs diff --git a/backend/Mutualify/Contracts/StatsContract.cs b/backend/Mutualify/Contracts/StatsContract.cs index 15aa332..d47e768 100644 --- a/backend/Mutualify/Contracts/StatsContract.cs +++ b/backend/Mutualify/Contracts/StatsContract.cs @@ -4,7 +4,7 @@ public class StatsContract { public int RegisteredCount { get; set; } public long RelationCount { get; set; } + public long UserCount { get; set; } public int LastDayRegisteredCount { get; set; } public int EligibleForUpdateCount { get; set; } - public int EligibleForUserUpdateCount { get; set; } } diff --git a/backend/Mutualify/Jobs/UserAllUpdateJob.cs b/backend/Mutualify/Jobs/UserAllUpdateJob.cs new file mode 100644 index 0000000..5ab4c3a --- /dev/null +++ b/backend/Mutualify/Jobs/UserAllUpdateJob.cs @@ -0,0 +1,116 @@ + +using Hangfire; +using Hangfire.Server; +using Microsoft.EntityFrameworkCore; +using Mutualify.Database; +using Mutualify.Services.Interfaces; + +namespace Mutualify.Jobs; +public interface IUserAllUpdateJob +{ + Task Run(PerformContext context, CancellationToken token); +} + +/// +/// Apparently there are ~400k users in the database so updating them every day is not really possible +/// +public class UserAllUpdateJob : IUserAllUpdateJob +{ + private readonly DatabaseContext _databaseContext; + private readonly IUsersService _usersService; + private readonly ILogger _logger; + + private const double _interval = 5; // seconds + + private static bool _isRunning = false; + private static DateTime _lastStartDate; + + public UserAllUpdateJob(IUsersService usersService, ILogger logger, DatabaseContext databaseContext) + { + _usersService = usersService; + _logger = logger; + _databaseContext = databaseContext; + } + + [DisableConcurrentExecution(timeoutInSeconds: 60 * 60 * 24 * 14)] + public async Task Run(PerformContext context, CancellationToken token) + { + var jobId = context.BackgroundJob.Id; + + using var _ = _logger.BeginScope("UserAllUpdateJob"); + + _logger.LogInformation("[{JobId}] Starting all users update job...", jobId); + + if (_isRunning && _lastStartDate.AddDays(1) > DateTime.Now) + { + _logger.LogInformation("[{JobId}] Job is already running, abort!", jobId); + return; + } + + _isRunning = true; + _lastStartDate = DateTime.Now; + + // since we might be running for a month shouldn't this be updated at some point? + var userUpdateQueue = await _databaseContext.Users.AsNoTracking() + .Where(x => x.UpdatedAt == null || x.UpdatedAt < DateTime.UtcNow.AddDays(-14)) + .Select(x => x.Id) + .ToListAsync(cancellationToken: token); + + for (var i = 0; i < userUpdateQueue.Count; i++) + { + token.ThrowIfCancellationRequested(); + + var userId = userUpdateQueue[i]; + var startTime = DateTime.Now; + + try + { +#if !DEBUG + if (i % 1000 == 0) + { +#endif + _logger.LogInformation("[{JobId}] ({Current}/{Total}) Updating {Id}...", jobId, i + 1, + userUpdateQueue.Count, userId); +#if !DEBUG + } +#endif + + await _usersService.Update(userId, false); + } + catch (AggregateException e) + { + if (e.InnerException is HttpRequestException) + { + // don't fail on HttpRequestExceptions, just keep going + continue; + } + + _isRunning = false; + + throw; + } + catch (DbUpdateConcurrencyException) { } // don't fail on HttpRequestExceptions or DbUpdateConcurrencyException, just keep going + catch (HttpRequestException) { } + catch (OperationCanceledException) + { + _logger.LogWarning("[{JobId}] All users update job has been cancelled!", jobId); + + _isRunning = false; + return; + } + finally + { + var endTime = DateTime.Now; + + // yea its not really accurate but we don't need precision, just run it approximately every X seconds + var elapsed = endTime - startTime; + var timeout = elapsed.TotalSeconds < _interval ? _interval - (int) elapsed.TotalSeconds : 0; + + await Task.Delay((int)(timeout * 1000), token); + } + } + + _isRunning = false; + _logger.LogInformation("[{JobId}] Finished all users update job", jobId); + } +} diff --git a/backend/Mutualify/Jobs/UserUpdateJob.cs b/backend/Mutualify/Jobs/UserUpdateJob.cs index 92b53ff..6f7932e 100644 --- a/backend/Mutualify/Jobs/UserUpdateJob.cs +++ b/backend/Mutualify/Jobs/UserUpdateJob.cs @@ -14,7 +14,7 @@ public class UserUpdateJob : IUserUpdateJob private readonly IUsersService _usersService; private readonly ILogger _logger; - private const int _interval = 2; // seconds + private const double _interval = 2; // seconds private static bool _isRunning = false; private static DateTime _lastStartDate; @@ -48,6 +48,7 @@ public async Task Run(PerformContext context, CancellationToken token) .Where(x=> x.UpdatedAt == null || x.UpdatedAt < DateTime.UtcNow.AddDays(-1)) .OrderByDescending(x=> x.FollowerCount) .Select(x => x.Id) + .Take(15000) // see UserAllUpdateJob.cs .ToListAsync(cancellationToken: token); for (var i = 0; i < userUpdateQueue.Count; i++) @@ -100,7 +101,7 @@ public async Task Run(PerformContext context, CancellationToken token) var elapsed = endTime - startTime; var timeout = elapsed.TotalSeconds < _interval ? _interval - (int) elapsed.TotalSeconds : 0; - await Task.Delay(timeout * 1000, token); + await Task.Delay((int)(timeout * 1000), token); } } diff --git a/backend/Mutualify/Program.cs b/backend/Mutualify/Program.cs index 26b0977..2ce9c52 100644 --- a/backend/Mutualify/Program.cs +++ b/backend/Mutualify/Program.cs @@ -236,6 +236,7 @@ static Task UnauthorizedRedirect(RedirectContext co RecurringJob.AddOrUpdate("user-relations-update", x => x.Run(null!, CancellationToken.None), Cron.Daily(12)); RecurringJob.AddOrUpdate("users-update", x => x.Run(null!, CancellationToken.None), Cron.Daily()); +RecurringJob.AddOrUpdate("users-update-all", x => x.Run(null!, CancellationToken.None), Cron.Monthly(3)); //BackgroundJob.Enqueue(x => x.Run(null!, JobCancellationToken.Null)); try diff --git a/backend/Mutualify/Services/UsersService.cs b/backend/Mutualify/Services/UsersService.cs index be41bb4..5d070c6 100644 --- a/backend/Mutualify/Services/UsersService.cs +++ b/backend/Mutualify/Services/UsersService.cs @@ -55,16 +55,15 @@ public async Task GetStats() var relationsUpdateEligible = await _databaseContext.Tokens.AsNoTracking() .CountAsync(); - var userUpdateEligible = await _databaseContext.Users.AsNoTracking() - .Where(x => x.UpdatedAt == null || x.UpdatedAt < DateTime.UtcNow.AddDays(-1)) - .CountAsync(); + var userCount = await _databaseContext.Users.AsNoTracking() + .LongCountAsync(); return new StatsContract { RegisteredCount = registeredUsers, RelationCount = relationCount, + UserCount = userCount, EligibleForUpdateCount = relationsUpdateEligible, - EligibleForUserUpdateCount = userUpdateEligible, LastDayRegisteredCount = lastDayRegistered }; } @@ -138,6 +137,7 @@ public async Task Update(int userId, bool useTokens) if (osuUser is null) { + _logger.LogError("User {User} doesn't exist according to API!", userId); return; }