Skip to content

Commit

Permalink
functional sms notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
omranjamal committed Oct 3, 2024
1 parent b97cbce commit 4ec6dcb
Show file tree
Hide file tree
Showing 11 changed files with 487 additions and 4 deletions.
2 changes: 2 additions & 0 deletions apps/jonogon-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
"@t3-oss/env-core": "0.11.0",
"@trpc/server": "10.45.2",
"bufferutil": "4.0.8",
"bull": "^4.16.3",
"cors": "2.8.5",
"dedent": "^1.5.3",
"es-toolkit": "1.15.1",
"express": "4.19.2",
"firebase-admin": "12.4.0",
Expand Down
26 changes: 26 additions & 0 deletions apps/jonogon-core/src/api/queues/index.mts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import {milestoneDetectionQueue} from '../../services/queues/milestoneDetectionQueue.mjs';
import {notificationsSchedulerQueue} from '../../services/queues/notificationsSchedulerQueue.mjs';

export async function initQueues() {
await milestoneDetectionQueue.add(
{},
{
jobId: 'detect-milestone',
repeat: {
// 6pm every day
cron: '0 18 * * *',
},
},
);

await notificationsSchedulerQueue.add(
{},
{
jobId: 'aggregate-notifications',
repeat: {
// 8pm every day
cron: '0 20 * * *',
},
},
);
}
2 changes: 2 additions & 0 deletions apps/jonogon-core/src/api/trpc/index.mts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ export const router = t.router;
export const middleware = t.middleware;

export const publicProcedure = t.procedure;

export const createCallerFactory = t.createCallerFactory;
3 changes: 2 additions & 1 deletion apps/jonogon-core/src/api/trpc/procedures/comments/crud.mts
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,8 @@ export const createComment = protectedProcedure
// notify all the other users who commented on the same comment
const otherCommenters = await ctx.services.postgresQueryBuilder
.selectFrom('comments')
.select(['created_by'])
.select('created_by')
.distinct()
.where('parent_id', '=', `${input.parent_id}`)
.where('id', '<>', `${created.id}`)
.execute();
Expand Down
2 changes: 1 addition & 1 deletion apps/jonogon-core/src/db/postgres/types.mts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ export interface Notifications {
meta: Json | null;
petition_id: Int8 | null;
reply_comment_id: Int8 | null;
type: string | null;
type: string;
user_id: Int8;
vote_id: Int8 | null;
}
Expand Down
11 changes: 11 additions & 0 deletions apps/jonogon-core/src/index.mts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import {registerWSHandlers} from './api/websocket/index.mjs';
import {logger} from './logger.mjs';
import {createServices} from './services.mjs';
import cors from 'cors';
import {initQueues} from './api/queues/index.mjs';
import {processMilestoneDetectionQueue} from './services/queues/milestoneDetectionQueue.mjs';
import {processNotificationsSchedulerQueue} from './services/queues/notificationsSchedulerQueue.mjs';
import {processSmsNotificationDispatchQueue} from './services/queues/smsNotificationDispatchQueue.mjs';

const services = await createServices();

Expand Down Expand Up @@ -42,4 +46,11 @@ server.listen(env.PORT, '0.0.0.0', () => {
});
});

// QUEUES
await initQueues();

processMilestoneDetectionQueue(services);
processNotificationsSchedulerQueue(services);
processSmsNotificationDispatchQueue(services);

export {TAppRouter} from './api/trpc/routers/index.mjs';
57 changes: 57 additions & 0 deletions apps/jonogon-core/src/services/queues/milestoneDetectionQueue.mts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import Queue from 'bull';
import {env} from '../../env.mjs';
import {TServices} from '../../services.mjs';
import {appRouter} from '../../api/trpc/routers/index.mjs';

export const milestoneDetectionQueue = new Queue<{}>(
'milestone_detection_queue',
env.REDIS_CONNECTION_URL,
);

export function processMilestoneDetectionQueue(services: TServices) {
const caller = appRouter.createCaller({services});

milestoneDetectionQueue.process(async (job) => {
const firstPagePetitions = await caller.petitions.list({
sort: 'votes',
order: 'desc',
filter: 'request',
page: 0,
});

const top5 = firstPagePetitions.data.slice(0, 5);

const topPetitionIDs = top5.map((petition) => `${petition.data.id}`);
const topPetitionSet = new Set<string>(topPetitionIDs);

const notifications = await services.postgresQueryBuilder
.selectFrom('notifications')
.select(['petition_id'])
.where('type', '=', 'top')
.where('petition_id', 'in', topPetitionIDs)
.execute();

notifications.forEach((notification) => {
topPetitionSet.delete(`${notification.petition_id}`);
});

const nextNotifications = [...topPetitionSet]
.map((petition_id) => {
return top5.find(
(petition) => petition.data.id === petition_id,
);
})
.filter((petition) => !!petition)
.map((petition) => ({
type: 'top',
petition_id: petition.data.id,
user_id: petition.data.created_by.id,
}));

await services.postgresQueryBuilder
.insertInto('notifications')
.values(nextNotifications)
.returning(['id'])
.execute();
});
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import Queue from 'bull';
import {env} from '../../env.mjs';
import {TServices} from '../../services.mjs';
import {smsNotificationDispatchQueue} from './smsNotificationDispatchQueue.mjs';

export const notificationsSchedulerQueue = new Queue<{}>(
'notification_aggregator_queue',
env.REDIS_CONNECTION_URL,
);

export function processNotificationsSchedulerQueue(services: TServices) {
notificationsSchedulerQueue.process(async (job) => {
const result = await services.postgresQueryBuilder
.selectFrom('notifications')
.select('user_id')
.distinct()
.where(
'created_at',
'>=',
new Date(Date.now() - 24 * 60 * 60 * 1000),
)
.execute();

await smsNotificationDispatchQueue.addBulk(
result.map((result) => ({
data: {
user_id: result.user_id,
},
})),
);
});
}
220 changes: 220 additions & 0 deletions apps/jonogon-core/src/services/queues/smsNotificationDispatchQueue.mts
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
import Queue from 'bull';
import {env} from '../../env.mjs';
import {TServices} from '../../services.mjs';
import {groupBy, countBy} from 'es-toolkit';
import {decrypt} from '../../lib/crypto/encryption.mjs';
import {deriveKey} from '../../lib/crypto/keys.mjs';

export const smsNotificationDispatchQueue = new Queue<{user_id: string}>(
'sms_notification_dispatch_queue',
{
redis: env.REDIS_CONNECTION_URL,
limiter: {
max: 120,
duration: 60_000,
},
},
);

export function processSmsNotificationDispatchQueue(services: TServices) {
smsNotificationDispatchQueue.process(async (job) => {
const user = await services.postgresQueryBuilder
.selectFrom('users')
.select([
'id',
'phone_number_encryption_iv',
'phone_number_encryption_key_salt',
'encrypted_phone_number',
])
.where('id', '=', job.data.user_id)
.executeTakeFirst();

if (!user) {
return;
}

const result = await services.postgresQueryBuilder
.selectFrom('notifications')
.where('user_id', '=', job.data.user_id)
.where('actor_user_id', '<>', job.data.user_id)
.selectAll()
.execute();

const grouped = groupBy(result, (item) => item.type);

// PETITION COMMENTS
const postCommentIDs = new Set<string>();

for (const notification of grouped.comment ?? []) {
if (notification.comment_id) {
postCommentIDs.add(notification.comment_id);
}
}

for (const notification of grouped.reply_to_someones_comment ?? []) {
if (notification.reply_comment_id) {
postCommentIDs.add(notification.reply_comment_id);
}
}

const petitionCommentCount = postCommentIDs.size;

// COMMENT REPLIES
const commentReplyIDs = new Set<string>();

for (const notification of grouped.reply ?? []) {
if (notification.reply_comment_id) {
commentReplyIDs.add(notification.reply_comment_id);
}
}

const commentReplyCount = commentReplyIDs.size;

// COMMENT VOTES
const commentVoteIDs = new Set<string>();

for (const notification of grouped.reply_vote ?? []) {
if (notification.comment_vote_id) {
commentVoteIDs.add(notification.comment_vote_id);
}
}

for (const notification of grouped.comment_vote ?? []) {
if (notification.comment_vote_id) {
commentVoteIDs.add(notification.comment_vote_id);
}
}

const commentVoteCount = commentVoteIDs.size;

// PETITION VOTES
const petitionVoteIDs = new Set<string>();

for (const notification of grouped.vote ?? []) {
if (notification.vote_id) {
petitionVoteIDs.add(notification.vote_id);
}
}

const petitionVoteCount = petitionVoteIDs.size;

// PETITION MODERATION
const petitionStatus: {
[petition_id: string]: 'approved' | 'rejected' | 'formalized';
} = {};

for (const notification of grouped.petition_approved ?? []) {
if (notification.petition_id) {
petitionStatus[notification.petition_id] = 'approved';
}
}

for (const notification of grouped.petition_rejected ?? []) {
if (notification.petition_id) {
petitionStatus[notification.petition_id] = 'rejected';
}
}

for (const notification of grouped.petition_formalized ?? []) {
if (notification.petition_id) {
petitionStatus[notification.petition_id] = 'formalized';
}
}

const petitionStatusCounts = countBy(
Object.values(petitionStatus),
(item) => item,
);

const approvedPetitionCount = petitionStatusCounts.approved ?? 0;
const rejectedPetitionCount = petitionStatusCounts.rejected ?? 0;
const formalizedPetitionCount = petitionStatusCounts.formalized ?? 0;

// PETITION MILESTONES
const topPetitionIDs = new Set<string>();

for (const notification of grouped.top ?? []) {
if (notification.petition_id) {
topPetitionIDs.add(notification.petition_id);
}
}

const topPetitionCount = topPetitionIDs.size;

let message = `Your https://jonogon.org in the last 24 hours:\n`;

if (topPetitionCount > 0) {
if (topPetitionCount === 1) {
message += '- 1 petition is in top 5\n';
} else {
message += `- ${topPetitionCount} petitions are top 5\n`;
}
}

if (petitionVoteCount > 0) {
if (petitionVoteCount === 1) {
message += '- 1 vote on a petition\n';
} else {
message += `- ${petitionVoteCount} votes on your petitions\n`;
}
}

if (petitionCommentCount > 0) {
if (petitionCommentCount === 1) {
message += '- 1 new comment on a petition\n';
} else {
message += `- ${petitionCommentCount} new comments across your petitions\n`;
}
}

if (commentReplyCount > 0) {
if (commentReplyCount === 1) {
message += '- 1 reply to a comment\n';
} else {
message += `- ${commentReplyCount} replies to your comments\n`;
}
}

if (commentVoteCount > 0) {
if (commentVoteCount === 1) {
message += '- 1 vote on a comment\n';
} else {
message += `- ${commentVoteCount} votes on your comments\n`;
}
}

if (approvedPetitionCount > 0) {
if (approvedPetitionCount === 1) {
message += '- 1 petition was approved\n';
} else {
message += `- ${approvedPetitionCount} petitions were approved\n`;
}
}

if (rejectedPetitionCount > 0) {
if (rejectedPetitionCount === 1) {
message += '- 1 petition was rejected\n';
} else {
message += `- ${rejectedPetitionCount} petitions were rejected\n`;
}
}

if (formalizedPetitionCount > 0) {
if (formalizedPetitionCount === 1) {
message += '- 1 petition was formalized\n';
} else {
message += `- ${formalizedPetitionCount} petitions were formalized\n`;
}
}

const key = await deriveKey(
env.COMMON_ENCRYPTION_SECRET,
user.phone_number_encryption_key_salt,
);

const iv = Buffer.from(user.phone_number_encryption_iv, 'base64');
const number = decrypt(key, iv, user.encrypted_phone_number);

await services.smsService.sendSMS(number, message);
});
}
Loading

0 comments on commit 4ec6dcb

Please sign in to comment.