diff --git a/src/imports/data/data.js b/src/imports/data/data.js index 7f0c414..5df91e5 100644 --- a/src/imports/data/data.js +++ b/src/imports/data/data.js @@ -47,6 +47,7 @@ import { renderTemplate } from '../template'; import { convertStringOfFieldsSeparatedByCommaIntoObjectToFind } from '../utils/convertStringOfFieldsSeparatedByCommaIntoObjectToFind'; import { randomId } from '../utils/random'; import { errorReturn, successReturn } from '../utils/return'; +import { handleTransactionError, retryMongoTransaction } from '../utils/transaction'; import populateDetailFields from './populateDetailFields/fromArray'; export async function getNextUserFromQueue({ authTokenId, document, queueId, contextUser }) { @@ -494,7 +495,7 @@ export async function create({ authTokenId, document, data, contextUser, upsert, const dbSession = client.startSession({ defaultTransactionOptions: TRANSACTION_OPTIONS }); try { - const transactionResult = await dbSession.withTransaction(async function createTransaction() { + const transactionResult = await retryMongoTransaction(() => dbSession.withTransaction(async function createTransaction() { tracingSpan?.addEvent('Processing login'); const processLoginResult = await processCollectionLogin({ meta: metaObject, data }); if (processLoginResult.success === false) { @@ -810,6 +811,8 @@ export async function create({ authTokenId, document, data, contextUser, upsert, tracingSpan?.addEvent('Record inserted', { insertedId: insertResult.insertedId }); } } catch (e) { + await handleTransactionError(e, dbSession); + logger.error(e, `Error on insert ${MetaObject.Namespace.ns}.${document}: ${e.message}`); tracingSpan?.addEvent('Error on insert', { error: e.message }); tracingSpan?.setAttribute({ error: e.message }); @@ -892,6 +895,8 @@ export async function create({ authTokenId, document, data, contextUser, upsert, try { await Konsistent.processChangeSync(document, 'create', user, { newRecord: resultRecord }, dbSession); } catch (e) { + await handleTransactionError(e, dbSession); + tracingSpan?.addEvent('Error on sync Konsistent', { error: e.message }); logger.error(e, `Error on sync Konsistent ${document}: ${e.message}`); await dbSession.abortTransaction(); @@ -902,7 +907,7 @@ export async function create({ authTokenId, document, data, contextUser, upsert, } return errorReturn(`[${document}] Error on insert, there is no affected record`); - }); + })); if (transactionResult != null && transactionResult.success != null) { tracingSpan?.addEvent('Operation result', omit(transactionResult, ['data'])); @@ -1038,11 +1043,14 @@ export async function update({ authTokenId, document, data, contextUser, tracing } } + let isRetry = false; const originals = {}; const dbSession = client.startSession({ defaultTransactionOptions: TRANSACTION_OPTIONS }); + try { - const transactionResult = await dbSession.withTransaction(async function updateTransaction() { + const transactionResult = await retryMongoTransaction(() => dbSession.withTransaction(async function updateTransaction() { tracingSpan?.addEvent('Processing login'); + const processLoginResult = await processCollectionLogin({ meta: metaObject, data }); if (processLoginResult.success === false) { return processLoginResult; @@ -1108,7 +1116,7 @@ export async function update({ authTokenId, document, data, contextUser, tracing } // outdateRecords are records that user are trying to update but they are out of date - if (metaObject.ignoreUpdatedAt !== true) { + if (metaObject.ignoreUpdatedAt !== true && isRetry === false) { const outdateRecords = data.ids.filter(id => { const record = originals[id._id]; if (record == null) { @@ -1159,6 +1167,7 @@ export async function update({ authTokenId, document, data, contextUser, tracing } } + isRetry = true; const emailsToSend = []; const updateResults = await BluebirdPromise.mapSeries(existsRecords, async record => { @@ -1302,10 +1311,13 @@ export async function update({ authTokenId, document, data, contextUser, tracing try { tracingSpan?.addEvent('Updating record', { filter, updateOperation }); await collection.updateOne(filter, updateOperation, { session: dbSession }); + return successReturn({ _id: record._id, ...bodyData }); } catch (e) { - logger.error(e, `Error on update ${MetaObject.Namespace.ns}.${document}: ${e.message}`); - tracingSpan?.addEvent('Error on update', { error: e.message }); + await handleTransactionError(e, dbSession); + + logger.error(e, `Error updating record ${MetaObject.Namespace.ns}.${document}: ${e.message}`); + tracingSpan?.addEvent('Error updating record', { error: e.message }); tracingSpan?.setAttribute({ error: e.message }); if (e.code === 11000) { @@ -1401,9 +1413,10 @@ export async function update({ authTokenId, document, data, contextUser, tracing try { await Konsistent.processChangeSync(document, 'update', user, { originalRecord, newRecord }, dbSession); } catch (e) { + await handleTransactionError(e, dbSession); + logger.error(e, `Error on processIncomingChange ${document}: ${e.message}`); tracingSpan?.addEvent('Error on Konsistent', { error: e.message }); - await dbSession.abortTransaction(); return errorReturn(`[${document}] Error on Konsistent: ${e.message}`); } @@ -1437,7 +1450,7 @@ export async function update({ authTokenId, document, data, contextUser, tracing // Full is the full affected documents, Changed are the changed props only return successReturn({ full: responseData, changed: updateResults.map(r => r.data) }); } - }); + })); if (transactionResult != null && transactionResult.success != null) { tracingSpan?.addEvent('Operation result', omit(transactionResult, ['data'])); diff --git a/src/imports/utils/transaction.ts b/src/imports/utils/transaction.ts new file mode 100644 index 0000000..6cb4dae --- /dev/null +++ b/src/imports/utils/transaction.ts @@ -0,0 +1,54 @@ +import { ClientSession, MongoServerError } from 'mongodb'; +import { logger } from './logger'; + +const ERROR_CODES = ['TemporarilyUnavailableException', 'WriteConflictException', 'WriteConflict']; + +/** + * Retries a MongoDB transaction after it encounters a write conflict error. + * https://stackoverflow.com/a/78659965/11068174 + * + * @param fn - The function to retry. + * @param retries - The number of retries to attempt. + * @returns The result of the function. + */ +export async function retryMongoTransaction Promise>(fn: T, retries: number = 13) { + let lastError: Error | undefined; + let isRetry = false; + + while (retries-- > 0) { + try { + return await fn(isRetry); + } catch (error) { + isRetry = true; + lastError = error as Error; + const mongoError = error as MongoServerError; + if (mongoError.type === 'MongoServerError') { + logger.debug(`MongoServerError ${mongoError.message} - ${mongoError.codeName}`); + } + if (ERROR_CODES.includes(mongoError.codeName ?? '')) { + logger.debug(`${mongoError.codeName}, retrying transaction ${retries}x - ${lastError.message}`); + await new Promise(resolve => setTimeout(resolve, 1000)); + continue; + } + + throw error; + } + } + + throw lastError; +} + +/** + * Handles error during a transaction. If it is a expected transaction error, throw it so it can be retried, otherwise do nothing. + * If a session is provided, it will be aborted. + * @param error - The error to handle. + * @param session - The session to abort. + */ +export async function handleTransactionError(error: unknown, session?: ClientSession) { + await session?.abortTransaction(); + + if (ERROR_CODES.includes((error as MongoServerError).codeName ?? '')) { + logger.error(`handleTransaction ${(error as MongoServerError).codeName}`); + throw error; + } +}