From e92d4e5c0889d4e5585980369f40e64940a0f1ef Mon Sep 17 00:00:00 2001 From: Evan Danaher Date: Fri, 17 Feb 2023 10:07:48 -0500 Subject: [PATCH] Use a recurring timer instead of an interval for processJobs. This allows us to delay the next run if the queries take longer than 5 seconds. --- lib/agenda/index.ts | 2 +- lib/agenda/start.ts | 6 +++--- lib/agenda/stop.ts | 4 ++-- lib/utils/process-jobs.ts | 38 +++++++++++++++++++++++++++++++++++--- 4 files changed, 41 insertions(+), 9 deletions(-) diff --git a/lib/agenda/index.ts b/lib/agenda/index.ts index 21b7c2b44..79eba5508 100644 --- a/lib/agenda/index.ts +++ b/lib/agenda/index.ts @@ -100,7 +100,7 @@ class Agenda extends EventEmitter { _mdb!: MongoDb; _collection!: Collection; _nextScanAt: any; - _processInterval: any; + _processTimer: any; cancel!: typeof cancel; close!: typeof close; diff --git a/lib/agenda/start.ts b/lib/agenda/start.ts index 2c4805c20..c4e2b1f3b 100644 --- a/lib/agenda/start.ts +++ b/lib/agenda/start.ts @@ -12,7 +12,7 @@ const debug = createDebugger("agenda:start"); * @returns resolves if db set beforehand, returns undefined otherwise */ export const start = async function (this: Agenda): Promise { - if (this._processInterval) { + if (this._processTimer) { debug("Agenda.start was already called, ignoring"); return this._ready; } @@ -22,8 +22,8 @@ export const start = async function (this: Agenda): Promise { "Agenda.start called, creating interval to call processJobs every [%dms]", this._processEvery ); - this._processInterval = setInterval( - processJobs.bind(this), + this._processTimer = setTimer( + processJobsOnTimer.bind(this), this._processEvery ); process.nextTick(processJobs.bind(this)); diff --git a/lib/agenda/stop.ts b/lib/agenda/stop.ts index 9fa8e98df..d13c604de 100644 --- a/lib/agenda/stop.ts +++ b/lib/agenda/stop.ts @@ -43,7 +43,7 @@ export const stop = async function (this: Agenda): Promise { }; debug("Agenda.stop called, clearing interval for processJobs()"); - clearInterval(this._processInterval); - this._processInterval = undefined; + clearTimeout(this._processTimeout); + this._processTimeout = undefined; return _unlockJobs(); }; diff --git a/lib/utils/process-jobs.ts b/lib/utils/process-jobs.ts index fdf506670..cb7275ecc 100644 --- a/lib/utils/process-jobs.ts +++ b/lib/utils/process-jobs.ts @@ -5,6 +5,38 @@ import { Agenda } from "../agenda"; const debug = createDebugger("agenda:internal:processJobs"); +/** + * Timer to run process methods for jobs + * + * Use this rather than setInterval to ensure that at most one processJobs is running at a time. + */ + +export const processJobsOnTimer = async function ( + this: Agenda, +): any { + // NOTE: There is the potential for a bit of drift if this job runs late. + // But it's internal to the process, so likely it's reasonable. We could + // store a time on `this` and add 5 seconds every time, but this is simpler + // and less invasive. + // TODO: Is this right in typescript? + const lastRun = new Date(); + + // Actually check for jobs to run + processJobs().bind(this) + + // Compute the remaining time for the next run, subtracting out the time that passed since we started. + // Unless we already missed it; then schedule it right now. + const delayToNextRun = this._processEvery - ((new Date()) - lastRun); + if (delayToNextRun < 0) + delayToNextRun = 0; + + // And schedule ourselves to run again at the appropriate time. + this._processTimer = setTimer( + processJobsOnTimer.bind(this), + this._processEvery + ); +} + /** * Process methods for jobs * @param {Job} extraJob job to run immediately @@ -20,8 +52,8 @@ export const processJobs = async function ( ); // Make sure an interval has actually been set // Prevents race condition with 'Agenda.stop' and already scheduled run - if (!this._processInterval) { - debug("no _processInterval set when calling processJobs, returning"); + if (!this._processTimer) { + debug("no _processTimer set when calling processJobs, returning"); return; } @@ -283,7 +315,7 @@ export const processJobs = async function ( * @returns {undefined} */ function runOrRetry() { - if (self._processInterval) { + if (self._processTimer) { // @todo: We should check if job exists const job = jobQueue.pop()!; const jobDefinition = definitions[job.attrs.name];