Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Active subscribers #22

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 32 additions & 18 deletions src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import { sendPushNotification } from './helpers/beams';
import db from './helpers/mysql';
import { sha256 } from './helpers/utils';
import { getProposal, getProposalScores } from './helpers/proposal';
import {
getActiveSubscribers,
updateActiveSubscribers,
updateSubscribers
} from './helpers/subscribers';

const delay = 5;
const interval = 15;
Expand Down Expand Up @@ -62,32 +67,42 @@ export async function sendEvent(event, to) {
event.secret = sha256(`${to}${serviceEventsSalt}`);
const headerSecret = sha256(`${to}${process.env.SERVICE_EVENTS_SALT}`);
try {
const controller = new AbortController();
setTimeout(() => controller.abort(), 10000); // Abort request after 10 seconds if there is no response
const res = await fetch(to, {
signal: controller.signal,
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authentication: headerSecret
},
body: JSON.stringify(event)
});
return res.text();
if (res.status !== 200) {
throw new Error(`${res.status} ${res.statusText}`);
}
return 'success';
} catch (error) {
console.log('[events] Error sending event data to webhook', to, JSON.stringify(error));
return;
return 'failed';
}
}

const sendEventToWebhookSubscribers = (event, subscribers) => {
Promise.allSettled(
subscribers
.filter(subscriber => [event.space, '*'].includes(subscriber.space))
.map(subscriber => sendEvent(event, subscriber.url))
)
.then(() => console.log('[events] Process event done'))
.catch(e => console.log('[events] Process event failed', e));
const sendEventToWebhookSubscribers = async event => {
const subscribers = await getActiveSubscribers(event.space);
console.log('[events] Subscribers for this event: ', subscribers.length);
if (!subscribers.length) return;
try {
const allSubscribersStatus = await Promise.all(
subscribers.map(subscriber => sendEvent(event, subscriber.url))
);
updateSubscribers(subscribers, allSubscribersStatus);
} catch (e) {
console.log('[events] sendEventToWebhookSubscribers failed', e);
}
};

async function processEvents(subscribers) {
async function processEvents() {
const ts = parseInt((Date.now() / 1e3).toFixed()) - delay;
const events = await db.queryAsync('SELECT * FROM events WHERE expire <= ?', [ts]);

Expand All @@ -108,7 +123,7 @@ async function processEvents(subscribers) {
// TODO: handle errors and retry
if (servicePushNotifications && event.event === 'proposal/start') sendPushNotification(event);
sendEventToDiscordSubscribers(event.event, proposalId);
sendEventToWebhookSubscribers(event, subscribers);
sendEventToWebhookSubscribers(event);

try {
await db.queryAsync('DELETE FROM events WHERE id = ? AND event = ? LIMIT 1', [
Expand All @@ -123,15 +138,14 @@ async function processEvents(subscribers) {
}

async function run() {
await snapshot.utils.sleep(interval * 1e3);
try {
const subscribers = await db.queryAsync('SELECT * FROM subscribers');
console.log('[events] Subscribers', subscribers.length);
await processEvents(subscribers);
await processEvents();
} catch (e) {
console.log('[events] Failed to process', e);
}
await snapshot.utils.sleep(interval * 1e3);
await run();
updateActiveSubscribers();
run();
}

if (serviceEvents) setTimeout(() => run(), interval * 1e3);
if (serviceEvents) run();
6 changes: 4 additions & 2 deletions src/helpers/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ CREATE TABLE subscribers (
owner VARCHAR(256) NOT NULL,
url TEXT NOT NULL,
space VARCHAR(256) NOT NULL,
active INT(11) NOT NULL,
created INT(11) NOT NULL,
last_active INT(11) DEFAULT 0 NOT NULL,
last_attempt INT(11) DEFAULT 0 NOT NULL,
active int DEFAULT 1 NOT NULL,
created TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
PRIMARY KEY (id),
INDEX owner (owner),
INDEX space (space),
Expand Down
44 changes: 44 additions & 0 deletions src/helpers/subscribers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import db from './mysql';

const subcribersActiveTime = 30; // days

export const getActiveSubscribers = async space => {
const subscribersQuery = `
SELECT * FROM subscribers
WHERE active = 1 AND (space LIKE CONCAT('%', ?, '%') OR space = "*")
`;
const subscribersParams = [space];
return db.queryAsync(subscribersQuery, subscribersParams);
};

export const updateSubscribers = async (subscribers, allSubscribersStatus) => {
const subscriberSuccessIds = subscribers
.filter((_subscriber, index) => allSubscribersStatus[index] === 'success')
.map(subscriber => subscriber.id);

const ts = parseInt((Date.now() / 1e3).toFixed());
let query = `
UPDATE subscribers SET last_attempt = ? WHERE id IN (?);
`;
const params = [ts, subscribers.map(subscriber => subscriber.id)];
if (subscriberSuccessIds.length) {
query += `UPDATE subscribers SET last_active = ? WHERE id IN (?);`;
params.push(ts, subscriberSuccessIds);
}
return db.queryAsync(query, params).catch(e => {
console.log('[events] updateSubscribers failed', e);
});
};

export const updateActiveSubscribers = async () => {
const ts = subcribersActiveTime * 24 * 60 * 60;
const query = `
UPDATE subscribers SET active = 0
WHERE active = 1 AND
DATEDIFF(CURRENT_TIMESTAMP, created) > ? AND
last_active + ? < last_attempt;`;
const params = [subcribersActiveTime, ts];
return db.queryAsync(query, params).catch(e => {
console.log('[events] updateActiveSubscribers failed', e);
});;
};
2 changes: 1 addition & 1 deletion src/subscriptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ export async function loadSubscriptions() {
if (!subs[sub.space]) subs[sub.space] = [];
subs[sub.space].push(sub);
});
console.log('Subscriptions', Object.keys(subs).length);
console.log('Discord Subscriptions', Object.keys(subs).length);
}