Skip to content

Commit

Permalink
refactored into rateLimit
Browse files Browse the repository at this point in the history
  • Loading branch information
f-w committed Oct 27, 2024
1 parent aae50f4 commit ebac98d
Showing 1 changed file with 44 additions and 24 deletions.
68 changes: 44 additions & 24 deletions src/utils/bullMQ/test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { Queue, QueueEvents, Worker } from 'bullmq';
import { promisify } from 'util';
export const wait = promisify(setTimeout);

const myQueue = new Queue('foo', {
connection: {
Expand All @@ -15,32 +17,50 @@ const myQueue = new Queue('foo', {
},
});

async function addJobs() {
for (let i = 0; i < 10; i++) {
async function theWork() {
console.log({ count: i, time: new Date() });
}
const queueEvents = new QueueEvents('foo');
const queuedID = [];
// IMPORTANT: place queueEvents.on before myQueue.add
queueEvents.on('completed', ({ jobId }) => {
if (!j?.id) {
queuedID.push(jobId);
return;
}
if (jobId !== j?.id) {
return;
function rateLimit(queueName: string, fn: (...args: any[]) => Promise<any>) {
return async function (...args: any[]): Promise<any> {
return new Promise(async (resolve, reject) => {
const queueEvents = new QueueEvents(queueName);
const queuedID = [];
// IMPORTANT: place queueEvents.on before myQueue.add
queueEvents.on('completed', async ({ jobId }) => {
if (!j?.id) {
queuedID.push(jobId);
return;
}
if (jobId !== j?.id) {
return;
}
try {
resolve(await fn.apply(this, args));
} catch (ex) {
reject(ex);
}
queueEvents.close();
});
const j = await myQueue.add('myJobName', undefined);
// extra guard in case queueEvents.on is called before j is assigned.
if (queuedID.indexOf(j.id) >= 0) {
try {
resolve(await fn.apply(this, args));
} catch (ex) {
reject(ex);
}
}
theWork();
queueEvents.close();
});
const j = await myQueue.add('myJobName', {
count: i,
});
// extra guard in case queueEvents.on is called before j is assigned.
if (queuedID.indexOf(j.id) >= 0) {
theWork();
}
};
}

async function theWork(count) {
await wait(100);
console.log({ count, time: new Date() });
}

async function addJobs() {
for (let i = 0; i < 10; i++) {
rateLimit('foo', theWork)(i);
// non-rateLimited counterpart
// theWork(i);
}
}

Expand Down

0 comments on commit ebac98d

Please sign in to comment.