Skip to content

Commit

Permalink
refactor(functions): migrate scheduler functions to v2
Browse files Browse the repository at this point in the history
  • Loading branch information
mkue committed Sep 30, 2023
1 parent 114a5a7 commit 98cc80e
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 40 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import axios from 'axios';
import * as functions from 'firebase-functions';
import { logger } from 'firebase-functions';
import { DateTime } from 'luxon';
import { FirestoreAdmin } from '../../../../shared/src/firebase/admin/FirestoreAdmin';
import { EXCHANGE_RATES_PATH, ExchangeRates, ExchangeRatesEntry } from '../../../../shared/src/types/ExchangeRates';
Expand Down Expand Up @@ -67,7 +67,7 @@ export class ExchangeRateImporter {
fetchAndStoreExchangeRates = async (dt: DateTime): Promise<ExchangeRateResponse> => {
const rates = await this.fetchExchangeRates(dt);
await this.storeExchangeRates(rates);
functions.logger.info('Ingested exchange rates');
logger.info('Ingested exchange rates');
return rates;
};
}
38 changes: 17 additions & 21 deletions functions/src/cron/exchange-rate-import/index.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,25 @@
import * as functions from 'firebase-functions';
import { logger } from 'firebase-functions';
import { onSchedule } from 'firebase-functions/lib/v2/providers/scheduler';
import { DateTime } from 'luxon';
import { ExchangeRateImporter } from './ExchangeRateImporter';

/**
* Function periodically scrapes currency exchange rates and saves them to firebase
*/
export default functions
.runWith({
timeoutSeconds: 540,
})
.pubsub.schedule('0 1 * * *')
.onRun(async () => {
const exchangeRateImporter = new ExchangeRateImporter();
const existingExchangeRates = await exchangeRateImporter.getAllExchangeRates();
for (
let timestamp = ExchangeRateImporter.startTimestamp;
timestamp <= Date.now() / 1000;
timestamp += ExchangeRateImporter.secondsInDay
) {
if (!existingExchangeRates.has(timestamp)) {
try {
await exchangeRateImporter.fetchAndStoreExchangeRates(DateTime.fromSeconds(timestamp));
} catch (error) {
functions.logger.error(`Could not ingest exchange rate`, error);
}
export default onSchedule('0 1 * * *', async () => {
const exchangeRateImporter = new ExchangeRateImporter();
const existingExchangeRates = await exchangeRateImporter.getAllExchangeRates();
for (
let timestamp = ExchangeRateImporter.startTimestamp;
timestamp <= Date.now() / 1000;
timestamp += ExchangeRateImporter.secondsInDay
) {
if (!existingExchangeRates.has(timestamp)) {
try {
await exchangeRateImporter.fetchAndStoreExchangeRates(DateTime.fromSeconds(timestamp));
} catch (error) {
logger.error(`Could not ingest exchange rate`, error);
}
}
});
}
});
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import * as functions from 'firebase-functions';
import { logger } from 'firebase-functions';
import imaps from 'imap-simple';
import _ from 'lodash';
import { Source, simpleParser } from 'mailparser';
Expand Down Expand Up @@ -40,7 +40,7 @@ export class PostFinanceBalanceImporter {

retrieveBalanceMails = async (): Promise<BankBalance[]> => {
try {
functions.logger.info('Start checking balance inbox');
logger.info('Start checking balance inbox');
const config = {
imap: {
user: POSTFINANCE_EMAIL_USER,
Expand All @@ -54,7 +54,7 @@ export class PostFinanceBalanceImporter {
};
const connection = await imaps.connect(config);
await connection.openBox('INBOX');
functions.logger.info('Connected to inbox');
logger.info('Connected to inbox');
const messages = await connection.search(this.searchCriteria, this.fetchOptions);
const balances = await Promise.all(
messages.map(async (item: any) => {
Expand All @@ -66,10 +66,10 @@ export class PostFinanceBalanceImporter {
}),
);
connection.end();
functions.logger.info('Retrieved balances');
logger.info('Retrieved balances');
return balances.flat();
} catch (error) {
functions.logger.error('Could not ingest balance mails', error);
logger.error('Could not ingest balance mails', error);
return [];
}
};
Expand All @@ -87,7 +87,7 @@ export class PostFinanceBalanceImporter {
} as BankBalance,
];
} catch {
functions.logger.info(`Could not parse email with subject ${mail.subject}`);
logger.info(`Could not parse email with subject ${mail.subject}`);
return [];
}
};
Expand Down
4 changes: 2 additions & 2 deletions functions/src/cron/postfinance-balance-import/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import * as functions from 'firebase-functions';
import { onSchedule } from 'firebase-functions/v2/scheduler';
import { PostFinanceBalanceImporter } from './PostFinanceBalanceImporter';

/**
* Function periodically connects to the gmail account where we send the postfinance balance statements,
* parses the emails and stores the current balances into firestore.
*/
export default functions.pubsub.schedule('0 * * * *').onRun(async () => {
export default onSchedule('0 * * * *', async () => {
const postFinanceImporter = new PostFinanceBalanceImporter();
const balances = await postFinanceImporter.retrieveBalanceMails();
await postFinanceImporter.storeBalances(balances);
Expand Down
12 changes: 6 additions & 6 deletions functions/src/webhooks/admin/scripts/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import * as functions from 'firebase-functions';
import { logger } from 'firebase-functions';
import { onCall } from 'firebase-functions/v2/https';
import { FirestoreAdmin } from '../../../../../shared/src/firebase/admin/FirestoreAdmin';
import { StripeEventHandler } from '../../../../../shared/src/stripe/StripeEventHandler';
Expand Down Expand Up @@ -34,27 +34,27 @@ const batchImportStripeChargesFunction = onCall(async ({ auth }) => {
const stripeBatchSize = 100; // max batch size supported by stripe
const charges = [];
try {
functions.logger.info('Querying Stripe API...');
logger.info('Querying Stripe API...');
for await (const charge of stripeEventHandler.stripe.charges.list({
expand: ['data.balance_transaction', 'data.invoice'],
limit: stripeBatchSize,
})) {
charges.push(charge);
}
functions.logger.info(`Querying stripe finished.`);
logger.info(`Querying stripe finished.`);

await Promise.all(
charges.map((charge) => {
try {
stripeEventHandler.storeCharge(charge);
} catch (error) {
functions.logger.error(error);
logger.error(error);
}
}),
);
functions.logger.info(`Ingestion finished.`);
logger.info(`Ingestion finished.`);
} catch (error) {
functions.logger.error(error);
logger.error(error);
throw error;
}
});
Expand Down
6 changes: 3 additions & 3 deletions functions/src/webhooks/stripe/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import * as functions from 'firebase-functions';
import { logger } from 'firebase-functions';
import { onRequest } from 'firebase-functions/v2/https';
import Stripe from 'stripe';
import { FirestoreAdmin } from '../../../../shared/src/firebase/admin/FirestoreAdmin';
Expand All @@ -21,12 +21,12 @@ export default onRequest(async (request, response) => {
break;
}
default: {
functions.logger.info(`Unhandled event type ${event.type}`);
logger.info(`Unhandled event type ${event.type}`);
}
}
response.send();
} catch (error) {
functions.logger.error(error);
logger.error(error);
response.status(500).send(`Webhook Error. Check the logs.`);
}
});

0 comments on commit 98cc80e

Please sign in to comment.