From 190a754849b45266995552b2db426230cb82ea1e Mon Sep 17 00:00:00 2001 From: xsalonx <65893715+xsalonx@users.noreply.github.com> Date: Wed, 27 Sep 2023 11:14:40 +0200 Subject: [PATCH] [ORCT-118] Extract find or create period (#229) --- .../alimonitor-services/BookkeepingService.js | 71 ++++-------- .../alimonitor-services/MonalisaService.js | 39 +------ .../MonalisaServiceDetails.js | 99 ++++++++++------- .../alimonitor-services/MonalisaServiceMC.js | 38 ++----- app/lib/database/DatabaseManager.js | 4 +- app/lib/database/repositories/Repository.js | 2 +- .../periods/findOrUpdateOrCreatePeriod.js | 103 ++++++++++++++++++ 7 files changed, 202 insertions(+), 154 deletions(-) create mode 100644 app/lib/services/periods/findOrUpdateOrCreatePeriod.js diff --git a/app/lib/alimonitor-services/BookkeepingService.js b/app/lib/alimonitor-services/BookkeepingService.js index 6611c03d2..a2f29a05e 100644 --- a/app/lib/alimonitor-services/BookkeepingService.js +++ b/app/lib/alimonitor-services/BookkeepingService.js @@ -21,11 +21,11 @@ const { databaseManager: { repositories: { RunRepository, DetectorSubsystemRepository, - PeriodRepository, - BeamTypeRepository, RunDetectorsRepository, }, + sequelize, } } = require('../database/DatabaseManager.js'); +const { createOrForceUpdate } = require('../services/periods/findOrUpdateOrCreatePeriod.js'); /** * BookkeepingService used to synchronize runs @@ -127,52 +127,27 @@ class BookkeepingService extends AbstractServiceSynchronizer { const period = extractPeriod(periodName, beamType); const { detectorsNameToId } = this; - return await BeamTypeRepository.T.findOrCreate({ - where: { - name: beamType, - }, - }) - .then(async ([beamType, _]) => await PeriodRepository.T.findOrCreate({ - where: { - name: period.name, - }, - defaults: { - name: period.name, - year: period.year, - BeamTypeId: beamType.id, - }, - })) - .catch((e) => { - throw new Error('Find or create period failed', { - cause: { - error: e.message, - meta: { - explicitValues: { - name: period.name, - year: period.year, - BeamTypeId: beamType.id, - }, - implicitValues: { - BeamType: beamType, - }, - }, - }, - }); - }) - .then(async ([period, _]) => await RunRepository.T.upsert({ - PeriodId: period.id, - ...run, - })) - .then(async ([run, _]) => { - const d = detectorNames?.map((detectorName, i) => ({ - run_number: run.runNumber, - detector_id: detectorsNameToId[detectorName], - quality: detectorQualities[i] })); - - await RunDetectorsRepository.T.bulkCreate( - d, { updateOnDuplicate: ['quality'] }, - ); - }); + const upsertRun = async ([dbPeriod, _]) => await RunRepository.upsert({ + PeriodId: dbPeriod.id, + ...run, + }); + + const bulkCreateRunDetectors = async ([run, _]) => { + const d = detectorNames?.map((detectorName, i) => ({ + run_number: run.runNumber, + detector_id: detectorsNameToId[detectorName], + quality: detectorQualities[i] })); + + await RunDetectorsRepository.bulkCreate( + d, { updateOnDuplicate: ['quality'] }, + ); + }; + + const pipeline = async () => await createOrForceUpdate(period) + .then(upsertRun) + .then(bulkCreateRunDetectors); + + return await sequelize.transaction(async () => await pipeline()); } /** diff --git a/app/lib/alimonitor-services/MonalisaService.js b/app/lib/alimonitor-services/MonalisaService.js index e758c8c56..86ee456ed 100644 --- a/app/lib/alimonitor-services/MonalisaService.js +++ b/app/lib/alimonitor-services/MonalisaService.js @@ -27,12 +27,11 @@ const config = require('../config/configProvider.js'); const { databaseManager: { repositories: { - BeamTypeRepository, - PeriodRepository, DataPassRepository, }, sequelize, } } = require('../database/DatabaseManager.js'); +const { findOrCreatePeriod } = require('../services/periods/findOrUpdateOrCreatePeriod.js'); class MonalisaService extends AbstractServiceSynchronizer { constructor() { @@ -93,40 +92,14 @@ class MonalisaService extends AbstractServiceSynchronizer { async executeDbAction(dataPass) { const { period } = dataPass; - - return await BeamTypeRepository.T.findOrCreate({ - where: { - name: period.beamType, - }, - }) - .then(async ([beamType, _]) => await PeriodRepository.T.findOrCreate({ - where: { - name: period.name, - }, - defaults: { - name: period.name, - year: period.year, - BeamTypeId: beamType.id, - }, - })) - .catch((e) => { - throw new Error('Find or create period failed', { - cause: { - error: e.message, - meta: { - explicitValues: { - name: period.name, - year: period.year, - }, - }, - }, - }); - }) - .then(async ([period, _]) => await DataPassRepository.T.upsert({ + const act = async () => findOrCreatePeriod(period) + .then(async ([period, _]) => await DataPassRepository.upsert({ PeriodId: period.id, ...dataPass, })) - .then(async ([dataPass, _]) => await this.monalisaServiceDetails.setSyncTask({ parentDataUnit: dataPass })); + .then(async ([dbDataPass, _]) => await this.monalisaServiceDetails.setSyncTask({ parentDataUnit: dbDataPass })); + + return await sequelize.transaction(async (_t1) => await act()); } } diff --git a/app/lib/alimonitor-services/MonalisaServiceDetails.js b/app/lib/alimonitor-services/MonalisaServiceDetails.js index 49b284947..060d3f708 100644 --- a/app/lib/alimonitor-services/MonalisaServiceDetails.js +++ b/app/lib/alimonitor-services/MonalisaServiceDetails.js @@ -15,15 +15,15 @@ const AbstractServiceSynchronizer = require('./AbstractServiceSynchronizer.js'); const Utils = require('../utils'); -const { ServicesEndpointsFormatter } = require('./helpers'); +const { ServicesEndpointsFormatter, ServicesDataCommons: { PERIOD_NAME_REGEX } } = require('./helpers'); +const { findOrCreatePeriod } = require('../services/periods/findOrUpdateOrCreatePeriod.js'); const { databaseManager: { repositories: { RunRepository, - PeriodRepository, }, - sequelize, } } = require('../database/DatabaseManager.js'); +const { extractPeriod } = require('./helpers/ServicesDataCommons.js'); class MonalisaServiceDetails extends AbstractServiceSynchronizer { constructor() { @@ -32,7 +32,7 @@ class MonalisaServiceDetails extends AbstractServiceSynchronizer { this.ketpFields = { run_no: 'runNumber', - raw_partition: 'period', + raw_partition: 'periodName', }; } @@ -52,7 +52,10 @@ class MonalisaServiceDetails extends AbstractServiceSynchronizer { } adjustDataUnit(dataPassDetails) { - return Utils.filterObject(dataPassDetails, this.ketpFields); + dataPassDetails = Utils.filterObject(dataPassDetails, this.ketpFields); + const { periodName } = dataPassDetails; + dataPassDetails.period = PERIOD_NAME_REGEX.test(periodName) ? extractPeriod(periodName) : undefined; + return dataPassDetails; } isDataUnitValid() { @@ -60,51 +63,63 @@ class MonalisaServiceDetails extends AbstractServiceSynchronizer { } async executeDbAction(dataPassDetails, forUrlMetaStore) { - const { parentDataUnit: dataPass } = forUrlMetaStore; - return (async () => { - if (/LHC[0-9]{2}[a-z]+/.test(dataPassDetails.period)) { - return await PeriodRepository.T.findOrCreate({ - where: { - name: dataPassDetails.period, - }, - }); + const { parentDataUnit: dbDataPass } = forUrlMetaStore; + + const getPresumedPeriod = async () => { + if (dataPassDetails.period) { + return await findOrCreatePeriod(dataPassDetails.period); } else { - // eslint-disable-next-line max-len - this.logger.warn(`Incorrect period from monalisa ${dataPassDetails.period} for run ${dataPassDetails.runNumber} in data pass ${dataPass.name}`); + this.logger.warn(`Incorrect period name from monalisa ${dataPassDetails.periodName} + for run ${dataPassDetails.runNumber} in details of data pass ${dbDataPass.name}`); return [undefined, undefined]; } - })() - .then(async ([period, _]) => { - dataPassDetails.PeriodId = period?.id; - return await RunRepository.T.findOrCreate({ - where: { - runNumber: dataPassDetails.runNumber, - }, - defualt: { - runNumber: dataPassDetails.runNumber, - PeriodId: dataPassDetails.PeriodId, - }, - }); + }; + + const findOrCreateRun = async ([dbPeriod, _]) => { + dataPassDetails.PeriodId = dbPeriod?.id; + return await RunRepository.findOrCreate({ + where: { + runNumber: dataPassDetails.runNumber, + }, + defualt: { + runNumber: dataPassDetails.runNumber, + PeriodId: dataPassDetails.PeriodId, + }, }) - .catch(async (e) => { - throw new Error('Find or create run failed', { - cause: { - error: e.message, - meta: { - actualValueInDB: await RunRepository.findOne({ where: { runNumber: dataPassDetails.runNumber } }, { raw: true }), - inQueryValues: { - runNumber: dataPassDetails.runNumber, - PeriodId: dataPassDetails.PeriodId, + .catch(async (e) => { + throw new Error('Find or create run failed', { + cause: { + error: { + error: e.message, + cause: e.cause, }, - sourceValues: { - runNumber: dataPassDetails.runNumber, - periodName: dataPassDetails.period, + meta: { + actualValueInDB: await RunRepository.findOne( + { where: { runNumber: dataPassDetails.runNumber } }, + { raw: true }, + ).catch((error) => `ERROR RETRIVING ADDITIONAL INFO FROM DB: ${error.message}`), + + inQueryValues: { + runNumber: dataPassDetails.runNumber, + PeriodId: dataPassDetails.PeriodId, + }, + sourceValues: { + runNumber: dataPassDetails.runNumber, + periodName: dataPassDetails.period, + }, }, }, - }, + }); }); - }) - .then(async ([run, _]) => await sequelize.transaction(() => run.addDataPasses(dataPass.id, { ignoreDuplicates: true }))); + }; + + const addRunToDataPass = async ([dbRun, _]) => await dbRun.addDataPasses(dbDataPass.id, { ignoreDuplicates: true }); + + const pipeline = async () => await getPresumedPeriod() + .then(findOrCreateRun) + .then(addRunToDataPass); + + return await pipeline(); } } diff --git a/app/lib/alimonitor-services/MonalisaServiceMC.js b/app/lib/alimonitor-services/MonalisaServiceMC.js index 75f638b4e..be0f101fb 100644 --- a/app/lib/alimonitor-services/MonalisaServiceMC.js +++ b/app/lib/alimonitor-services/MonalisaServiceMC.js @@ -20,14 +20,13 @@ const config = require('../config/configProvider.js'); const { databaseManager: { repositories: { - BeamTypeRepository, - PeriodRepository, SimulationPassRepository, DataPassRepository, RunRepository, }, sequelize, } } = require('../database/DatabaseManager.js'); +const { findOrCreatePeriod } = require('../services/periods/findOrUpdateOrCreatePeriod.js'); class MonalisaServiceMC extends AbstractServiceSynchronizer { constructor() { @@ -114,37 +113,20 @@ class MonalisaServiceMC extends AbstractServiceSynchronizer { requestedEvents: simulationPass.requestedEvents, outputSize: simulationPass.outputSize, }) - .then(async ([simulationPassDBInstance, _]) => { + .then(async ([dbSimulationPass, _]) => { await Promise.all(simulationPass.anchoredPeriods.map(async (period) => - this.findOrCreatePeriod(period) + findOrCreatePeriod(period) .then(async ([period, _]) => { - const periodAddPromise = simulationPassDBInstance.addPeriod(period.id, { ignoreDuplicates: true }); - const dataPassPipelinePromises = this.findOrCreateAndAddDataPasses(simulationPass, simulationPassDBInstance, period); - const runsAddPipeline = this.findOrCreateAndAddRuns(simulationPass, simulationPassDBInstance, period); + const periodAddPromise = dbSimulationPass.addPeriod(period.id, { ignoreDuplicates: true }); + const dataPassPipelinePromises = this.findOrCreateAndAddDataPasses(simulationPass, dbSimulationPass, period); + const runsAddPipeline = this.findOrCreateAndAddRuns(simulationPass, dbSimulationPass, period); await Promise.all([periodAddPromise, dataPassPipelinePromises, runsAddPipeline]); }))); }); } - async findOrCreatePeriod({ name: periodName, year: periodYear, beamType }) { - return await sequelize.transaction(async () => PeriodRepository.findOrCreate({ - where: { - name: periodName, - }, - defaults: { - name: periodName, - year: periodYear, - BeamTypeId: !beamType ? undefined : (await BeamTypeRepository.findOrCreate({ - where: { - name: beamType, - }, - }))[0]?.id, - }, - })); - } - - async findOrCreateAndAddDataPasses(simulationPass, simulationPassDBInstance, period) { + async findOrCreateAndAddDataPasses(simulationPass, dbSimulationPass, period) { const promises = simulationPass.anchoredPasses .map((passSuffix) => sequelize.transaction( () => DataPassRepository.findOrCreate({ @@ -155,13 +137,13 @@ class MonalisaServiceMC extends AbstractServiceSynchronizer { name: `${period.name}_${passSuffix}`, PeriodId: period.id, }, - }).then(([dataPass, _]) => simulationPassDBInstance.addDataPass(dataPass.id, + }).then(([dataPass, _]) => dbSimulationPass.addDataPass(dataPass.id, { ignoreDuplicates: true })), )); return await Promise.all(promises); } - async findOrCreateAndAddRuns(simulationPass, simulationPassDBInstance, period) { + async findOrCreateAndAddRuns(simulationPass, dbSimulationPass, period) { const promises = simulationPass.runs.map((runNumber) => sequelize.transaction(async () => { const insertWithoutPeriod = simulationPass.anchoredPeriods.length > 1; await RunRepository.findOrCreate({ @@ -174,7 +156,7 @@ class MonalisaServiceMC extends AbstractServiceSynchronizer { }, }); - return await simulationPassDBInstance.addRun(runNumber, { ignoreDuplicates: true }); + return await dbSimulationPass.addRun(runNumber, { ignoreDuplicates: true }); })); return await Promise.all(promises); diff --git a/app/lib/database/DatabaseManager.js b/app/lib/database/DatabaseManager.js index 7d62db52f..0ee510530 100644 --- a/app/lib/database/DatabaseManager.js +++ b/app/lib/database/DatabaseManager.js @@ -27,8 +27,8 @@ class DatabaseManager { constructor() { this.logger = new Log(DatabaseManager.name); this.schema = 'public'; - const o2rct_namespace = cls.createNamespace('o2rct-namespace'); - Sequelize.useCLS(o2rct_namespace); + this.o2rct_namespace = cls.createNamespace('o2rct-namespace'); + Sequelize.useCLS(this.o2rct_namespace); this.sequelize = new Sequelize({ ...config.database, diff --git a/app/lib/database/repositories/Repository.js b/app/lib/database/repositories/Repository.js index c5212aeaa..d17e359c6 100644 --- a/app/lib/database/repositories/Repository.js +++ b/app/lib/database/repositories/Repository.js @@ -89,7 +89,7 @@ class Repository { * @return {Promise} promise that resolves when the patch has been applied */ async updateOne(dbOject, patch) { - return dbOject.update(patch); + return await dbOject.update(patch); } /** diff --git a/app/lib/services/periods/findOrUpdateOrCreatePeriod.js b/app/lib/services/periods/findOrUpdateOrCreatePeriod.js new file mode 100644 index 000000000..2b5453b2f --- /dev/null +++ b/app/lib/services/periods/findOrUpdateOrCreatePeriod.js @@ -0,0 +1,103 @@ +/** + * @license + * Copyright CERN and copyright holders of ALICE O2. This software is + * distributed under the terms of the GNU General Public License v3 (GPL + * Version 3), copied verbatim in the file "COPYING". + * + * See http://alice-o2.web.cern.ch/license for full licensing information. + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. + */ + +const { databaseManager: { + repositories: { + PeriodRepository, + BeamTypeRepository, + }, +} } = require('../../database/DatabaseManager'); + +/** + * Find or create beam type + * @param {String|undefined} beamType beam type e.g. p-p, p-Pb, ... + * @returns {[SequelizeBeamType, boolean]|[undefined, undefined]} result of sequelize.Model.findOrCreate + */ +const findOrCreateBeamType = async (beamType) => ! beamType ? [undefined, undefined] : + await BeamTypeRepository.findOrCreate({ + where: { + name: beamType, + }, + }) + .catch((e) => { + throw new Error('Find or create beam type failed', { + cause: { + error: { + error: e.message, + cause: e.original, + }, + meta: { + explicitValues: { + name: beamType, + }, + }, + }, + }); + }); + +const periodErrorHandlerFactory = ({ name, year, beamType, BeamTypeId }) => (e) => { + throw new Error('Find/Upsert or create period with given beam type failed', { + cause: { + error: { + error: e.message, + cause: e.original, + }, + meta: { + explicitValues: { + name, + year, + BeamTypeId, + }, + implicitValues: { + BeamType: beamType, + }, + }, + }, + }); +}; + +/** + * Find or create period with given parameters + * @param {Period} period as {name, year, beamType} + * @returns {[SequelizePeriod, boolean]} result of sequelize.Model.findOrCreate + */ +const findOrCreatePeriod = async (period) => + await findOrCreateBeamType(period.beamType) + .then(async ([dbBeamType, _]) => await PeriodRepository.findOrCreate({ + where: { + name: period.name, + }, + defaults: { + name: period.name, + year: period.year, + BeamTypeId: dbBeamType?.id, + }, + })) + .catch(periodErrorHandlerFactory(period)); + +/** + * Upsert or create period with given parameters + * @param {Period} period as {name, year, beamType} + * @returns {[SequelizePeriod, boolean]} result of sequelize.Model.findOrCreate + */ +const createOrForceUpdate = async (period) => { + const { year } = period; + return await findOrCreatePeriod(period) + .then(async ([dbPeriod, _]) => [dbPeriod, await PeriodRepository.updateOne(dbPeriod, { year })]) + .catch(periodErrorHandlerFactory(period)); +}; + +module.exports = { + findOrCreatePeriod, + createOrForceUpdate, +};