Skip to content

Commit

Permalink
[ORCT-118] Extract find or create period (#229)
Browse files Browse the repository at this point in the history
  • Loading branch information
xsalonx authored Sep 27, 2023
1 parent 8e3dd23 commit 190a754
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 154 deletions.
71 changes: 23 additions & 48 deletions app/lib/alimonitor-services/BookkeepingService.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ const { databaseManager: {
repositories: {
RunRepository,
DetectorSubsystemRepository,
PeriodRepository,
BeamTypeRepository,
RunDetectorsRepository,
},
sequelize,
} } = require('../database/DatabaseManager.js');
const { createOrForceUpdate } = require('../services/periods/findOrUpdateOrCreatePeriod.js');

/**
* BookkeepingService used to synchronize runs
Expand Down Expand Up @@ -127,52 +127,27 @@ class BookkeepingService extends AbstractServiceSynchronizer {
const period = extractPeriod(periodName, beamType);
const { detectorsNameToId } = this;

return await BeamTypeRepository.T.findOrCreate({
where: {
name: beamType,
},
})
.then(async ([beamType, _]) => await PeriodRepository.T.findOrCreate({
where: {
name: period.name,
},
defaults: {
name: period.name,
year: period.year,
BeamTypeId: beamType.id,
},
}))
.catch((e) => {
throw new Error('Find or create period failed', {
cause: {
error: e.message,
meta: {
explicitValues: {
name: period.name,
year: period.year,
BeamTypeId: beamType.id,
},
implicitValues: {
BeamType: beamType,
},
},
},
});
})
.then(async ([period, _]) => await RunRepository.T.upsert({
PeriodId: period.id,
...run,
}))
.then(async ([run, _]) => {
const d = detectorNames?.map((detectorName, i) => ({
run_number: run.runNumber,
detector_id: detectorsNameToId[detectorName],
quality: detectorQualities[i] }));

await RunDetectorsRepository.T.bulkCreate(
d, { updateOnDuplicate: ['quality'] },
);
});
const upsertRun = async ([dbPeriod, _]) => await RunRepository.upsert({
PeriodId: dbPeriod.id,
...run,
});

const bulkCreateRunDetectors = async ([run, _]) => {
const d = detectorNames?.map((detectorName, i) => ({
run_number: run.runNumber,
detector_id: detectorsNameToId[detectorName],
quality: detectorQualities[i] }));

await RunDetectorsRepository.bulkCreate(
d, { updateOnDuplicate: ['quality'] },
);
};

const pipeline = async () => await createOrForceUpdate(period)
.then(upsertRun)
.then(bulkCreateRunDetectors);

return await sequelize.transaction(async () => await pipeline());
}

/**
Expand Down
39 changes: 6 additions & 33 deletions app/lib/alimonitor-services/MonalisaService.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@ const config = require('../config/configProvider.js');

const { databaseManager: {
repositories: {
BeamTypeRepository,
PeriodRepository,
DataPassRepository,
},
sequelize,
} } = require('../database/DatabaseManager.js');
const { findOrCreatePeriod } = require('../services/periods/findOrUpdateOrCreatePeriod.js');

class MonalisaService extends AbstractServiceSynchronizer {
constructor() {
Expand Down Expand Up @@ -93,40 +92,14 @@ class MonalisaService extends AbstractServiceSynchronizer {

async executeDbAction(dataPass) {
const { period } = dataPass;

return await BeamTypeRepository.T.findOrCreate({
where: {
name: period.beamType,
},
})
.then(async ([beamType, _]) => await PeriodRepository.T.findOrCreate({
where: {
name: period.name,
},
defaults: {
name: period.name,
year: period.year,
BeamTypeId: beamType.id,
},
}))
.catch((e) => {
throw new Error('Find or create period failed', {
cause: {
error: e.message,
meta: {
explicitValues: {
name: period.name,
year: period.year,
},
},
},
});
})
.then(async ([period, _]) => await DataPassRepository.T.upsert({
const act = async () => findOrCreatePeriod(period)
.then(async ([period, _]) => await DataPassRepository.upsert({
PeriodId: period.id,
...dataPass,
}))
.then(async ([dataPass, _]) => await this.monalisaServiceDetails.setSyncTask({ parentDataUnit: dataPass }));
.then(async ([dbDataPass, _]) => await this.monalisaServiceDetails.setSyncTask({ parentDataUnit: dbDataPass }));

return await sequelize.transaction(async (_t1) => await act());
}
}

Expand Down
99 changes: 57 additions & 42 deletions app/lib/alimonitor-services/MonalisaServiceDetails.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@

const AbstractServiceSynchronizer = require('./AbstractServiceSynchronizer.js');
const Utils = require('../utils');
const { ServicesEndpointsFormatter } = require('./helpers');
const { ServicesEndpointsFormatter, ServicesDataCommons: { PERIOD_NAME_REGEX } } = require('./helpers');
const { findOrCreatePeriod } = require('../services/periods/findOrUpdateOrCreatePeriod.js');

const { databaseManager: {
repositories: {
RunRepository,
PeriodRepository,
},
sequelize,
} } = require('../database/DatabaseManager.js');
const { extractPeriod } = require('./helpers/ServicesDataCommons.js');

class MonalisaServiceDetails extends AbstractServiceSynchronizer {
constructor() {
Expand All @@ -32,7 +32,7 @@ class MonalisaServiceDetails extends AbstractServiceSynchronizer {

this.ketpFields = {
run_no: 'runNumber',
raw_partition: 'period',
raw_partition: 'periodName',
};
}

Expand All @@ -52,59 +52,74 @@ class MonalisaServiceDetails extends AbstractServiceSynchronizer {
}

adjustDataUnit(dataPassDetails) {
return Utils.filterObject(dataPassDetails, this.ketpFields);
dataPassDetails = Utils.filterObject(dataPassDetails, this.ketpFields);
const { periodName } = dataPassDetails;
dataPassDetails.period = PERIOD_NAME_REGEX.test(periodName) ? extractPeriod(periodName) : undefined;
return dataPassDetails;
}

isDataUnitValid() {
return true;
}

async executeDbAction(dataPassDetails, forUrlMetaStore) {
const { parentDataUnit: dataPass } = forUrlMetaStore;
return (async () => {
if (/LHC[0-9]{2}[a-z]+/.test(dataPassDetails.period)) {
return await PeriodRepository.T.findOrCreate({
where: {
name: dataPassDetails.period,
},
});
const { parentDataUnit: dbDataPass } = forUrlMetaStore;

const getPresumedPeriod = async () => {
if (dataPassDetails.period) {
return await findOrCreatePeriod(dataPassDetails.period);
} else {
// eslint-disable-next-line max-len
this.logger.warn(`Incorrect period from monalisa ${dataPassDetails.period} for run ${dataPassDetails.runNumber} in data pass ${dataPass.name}`);
this.logger.warn(`Incorrect period name from monalisa ${dataPassDetails.periodName}
for run ${dataPassDetails.runNumber} in details of data pass ${dbDataPass.name}`);
return [undefined, undefined];
}
})()
.then(async ([period, _]) => {
dataPassDetails.PeriodId = period?.id;
return await RunRepository.T.findOrCreate({
where: {
runNumber: dataPassDetails.runNumber,
},
defualt: {
runNumber: dataPassDetails.runNumber,
PeriodId: dataPassDetails.PeriodId,
},
});
};

const findOrCreateRun = async ([dbPeriod, _]) => {
dataPassDetails.PeriodId = dbPeriod?.id;
return await RunRepository.findOrCreate({
where: {
runNumber: dataPassDetails.runNumber,
},
defualt: {
runNumber: dataPassDetails.runNumber,
PeriodId: dataPassDetails.PeriodId,
},
})
.catch(async (e) => {
throw new Error('Find or create run failed', {
cause: {
error: e.message,
meta: {
actualValueInDB: await RunRepository.findOne({ where: { runNumber: dataPassDetails.runNumber } }, { raw: true }),
inQueryValues: {
runNumber: dataPassDetails.runNumber,
PeriodId: dataPassDetails.PeriodId,
.catch(async (e) => {
throw new Error('Find or create run failed', {
cause: {
error: {
error: e.message,
cause: e.cause,
},
sourceValues: {
runNumber: dataPassDetails.runNumber,
periodName: dataPassDetails.period,
meta: {
actualValueInDB: await RunRepository.findOne(
{ where: { runNumber: dataPassDetails.runNumber } },
{ raw: true },
).catch((error) => `ERROR RETRIVING ADDITIONAL INFO FROM DB: ${error.message}`),

inQueryValues: {
runNumber: dataPassDetails.runNumber,
PeriodId: dataPassDetails.PeriodId,
},
sourceValues: {
runNumber: dataPassDetails.runNumber,
periodName: dataPassDetails.period,
},
},
},
},
});
});
})
.then(async ([run, _]) => await sequelize.transaction(() => run.addDataPasses(dataPass.id, { ignoreDuplicates: true })));
};

const addRunToDataPass = async ([dbRun, _]) => await dbRun.addDataPasses(dbDataPass.id, { ignoreDuplicates: true });

const pipeline = async () => await getPresumedPeriod()
.then(findOrCreateRun)
.then(addRunToDataPass);

return await pipeline();
}
}

Expand Down
38 changes: 10 additions & 28 deletions app/lib/alimonitor-services/MonalisaServiceMC.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@ const config = require('../config/configProvider.js');

const { databaseManager: {
repositories: {
BeamTypeRepository,
PeriodRepository,
SimulationPassRepository,
DataPassRepository,
RunRepository,
},
sequelize,
} } = require('../database/DatabaseManager.js');
const { findOrCreatePeriod } = require('../services/periods/findOrUpdateOrCreatePeriod.js');

class MonalisaServiceMC extends AbstractServiceSynchronizer {
constructor() {
Expand Down Expand Up @@ -114,37 +113,20 @@ class MonalisaServiceMC extends AbstractServiceSynchronizer {
requestedEvents: simulationPass.requestedEvents,
outputSize: simulationPass.outputSize,
})
.then(async ([simulationPassDBInstance, _]) => {
.then(async ([dbSimulationPass, _]) => {
await Promise.all(simulationPass.anchoredPeriods.map(async (period) =>
this.findOrCreatePeriod(period)
findOrCreatePeriod(period)
.then(async ([period, _]) => {
const periodAddPromise = simulationPassDBInstance.addPeriod(period.id, { ignoreDuplicates: true });
const dataPassPipelinePromises = this.findOrCreateAndAddDataPasses(simulationPass, simulationPassDBInstance, period);
const runsAddPipeline = this.findOrCreateAndAddRuns(simulationPass, simulationPassDBInstance, period);
const periodAddPromise = dbSimulationPass.addPeriod(period.id, { ignoreDuplicates: true });
const dataPassPipelinePromises = this.findOrCreateAndAddDataPasses(simulationPass, dbSimulationPass, period);
const runsAddPipeline = this.findOrCreateAndAddRuns(simulationPass, dbSimulationPass, period);

await Promise.all([periodAddPromise, dataPassPipelinePromises, runsAddPipeline]);
})));
});
}

async findOrCreatePeriod({ name: periodName, year: periodYear, beamType }) {
return await sequelize.transaction(async () => PeriodRepository.findOrCreate({
where: {
name: periodName,
},
defaults: {
name: periodName,
year: periodYear,
BeamTypeId: !beamType ? undefined : (await BeamTypeRepository.findOrCreate({
where: {
name: beamType,
},
}))[0]?.id,
},
}));
}

async findOrCreateAndAddDataPasses(simulationPass, simulationPassDBInstance, period) {
async findOrCreateAndAddDataPasses(simulationPass, dbSimulationPass, period) {
const promises = simulationPass.anchoredPasses
.map((passSuffix) => sequelize.transaction(
() => DataPassRepository.findOrCreate({
Expand All @@ -155,13 +137,13 @@ class MonalisaServiceMC extends AbstractServiceSynchronizer {
name: `${period.name}_${passSuffix}`,
PeriodId: period.id,
},
}).then(([dataPass, _]) => simulationPassDBInstance.addDataPass(dataPass.id,
}).then(([dataPass, _]) => dbSimulationPass.addDataPass(dataPass.id,
{ ignoreDuplicates: true })),
));
return await Promise.all(promises);
}

async findOrCreateAndAddRuns(simulationPass, simulationPassDBInstance, period) {
async findOrCreateAndAddRuns(simulationPass, dbSimulationPass, period) {
const promises = simulationPass.runs.map((runNumber) => sequelize.transaction(async () => {
const insertWithoutPeriod = simulationPass.anchoredPeriods.length > 1;
await RunRepository.findOrCreate({
Expand All @@ -174,7 +156,7 @@ class MonalisaServiceMC extends AbstractServiceSynchronizer {
},
});

return await simulationPassDBInstance.addRun(runNumber, { ignoreDuplicates: true });
return await dbSimulationPass.addRun(runNumber, { ignoreDuplicates: true });
}));

return await Promise.all(promises);
Expand Down
4 changes: 2 additions & 2 deletions app/lib/database/DatabaseManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ class DatabaseManager {
constructor() {
this.logger = new Log(DatabaseManager.name);
this.schema = 'public';
const o2rct_namespace = cls.createNamespace('o2rct-namespace');
Sequelize.useCLS(o2rct_namespace);
this.o2rct_namespace = cls.createNamespace('o2rct-namespace');
Sequelize.useCLS(this.o2rct_namespace);

this.sequelize = new Sequelize({
...config.database,
Expand Down
2 changes: 1 addition & 1 deletion app/lib/database/repositories/Repository.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class Repository {
* @return {Promise<boolean>} promise that resolves when the patch has been applied
*/
async updateOne(dbOject, patch) {
return dbOject.update(patch);
return await dbOject.update(patch);
}

/**
Expand Down
Loading

0 comments on commit 190a754

Please sign in to comment.