From 802538bd8763dfd23f08e6fbd965a7b1610d86c1 Mon Sep 17 00:00:00 2001 From: Cabecinha84 <42519726+Cabecinha84@users.noreply.github.com> Date: Fri, 15 Nov 2024 12:49:11 +0000 Subject: [PATCH] Refactor trySpawningGlobalApplication (#1421) * test aggregate function * fix * refactor testTrySpawningGlobalApplication, no more random stuff * improvements * fix typo * Fix plus logs --- ZelBack/src/services/appsService.js | 167 +++++++++++++--------------- ZelBack/src/services/dbHelper.js | 15 +++ 2 files changed, 95 insertions(+), 87 deletions(-) diff --git a/ZelBack/src/services/appsService.js b/ZelBack/src/services/appsService.js index eb84a27aa..bf47c9755 100644 --- a/ZelBack/src/services/appsService.js +++ b/ZelBack/src/services/appsService.js @@ -70,8 +70,8 @@ testingAppserver = httpShutdown(testingAppserver); const GlobalAppsSpawnLRUoptions = { max: 2000, - ttl: 1000 * 60 * 60 * 2, // 2 hours - maxAge: 1000 * 60 * 60 * 2, // 2 hours + ttl: 1000 * 60 * 60 * 6, // 6 hours + maxAge: 1000 * 60 * 60 * 6, // 6 hours }; const shortCache = { max: 500, @@ -9045,81 +9045,80 @@ async function trySpawningGlobalApplication() { throw new Error('Unable to detect Flux IP address'); } - // get all the applications list names - const globalAppNamesLocation = await getAllGlobalApplications(['name', 'geolocation', 'nodes']); + // get all the applications list names missing instances + const pipeline = [ + { + $lookup: { + from: 'zelappslocation', + localField: 'name', + foreignField: 'name', + as: 'locations', + }, + }, + { + $addFields: { + actual: { $size: '$locations.name' }, + }, + }, + { + $match: { + $expr: { $lt: ['$actual', { $ifNull: ['$instances', 3] }] }, + }, + }, + { + $project: { + _id: 0, + name: '$name', + actual: '$actual', + required: '$instances', + nodes: { $ifNull: ['$nodes', []] }, + geolocation: { $ifNull: ['$geolocation', []] }, + }, + }, + { $sort: { name: 1 } }, + ]; + + const db = dbHelper.databaseConnection(); + const database = db.db(config.database.appsglobal.database); + log.info('Checking for apps that are missing instances on the network.'); + let globalAppNamesLocation = await dbHelper.aggregateInDatabase(database, globalAppsInformation, pipeline); const numberOfGlobalApps = globalAppNamesLocation.length; if (!numberOfGlobalApps) { log.info('No installable application found'); - await serviceHelper.delay(config.fluxapps.installation.delay * 1000); + await serviceHelper.delay(5 * 60 * 1000); trySpawningGlobalApplication(); return; } + log.info(`Found ${numberOfGlobalApps} that are missing instances on the network.`); - const installDelay = config.fluxapps.installation.delay * 1000; - let probLn = Math.log(2 + numberOfGlobalApps); // from ln(2) -> ln(2 + x) - const adjustedDelay = installDelay / probLn; - - let appToRun; - // highly favor application that is targetting our node - // get my collateral - const myCollateral = await generalService.obtainNodeCollateralInformation(); - // get my ip address - // filter apps only those that include my ip or my collateral - const scopedApps = globalAppNamesLocation.filter((app) => app.nodes && (app.nodes.includes(myIP) || app.nodes.includes(`${myCollateral.txhash}:${myCollateral.txindex}`))); - const scopedAppsToRun = scopedApps.filter((app) => !trySpawningGlobalAppCache.has(app.name)); - // check if this app was already evaluated - const numberOfScopedAppsToRun = scopedAppsToRun.length; - if (numberOfScopedAppsToRun) { // some app should be prioritized on our node - const appToRunNumber = Math.floor((Math.random() * numberOfScopedAppsToRun)); - appToRun = scopedAppsToRun[appToRunNumber].name; - } - - if (appToRun) { // ensure higher rate spawning for scoped apps - probLn *= 2; - } - // if all ok Check hashes comparison if its out turn to start the app. higher than 1% probability. - const randomNumber = Math.floor((Math.random() * (config.fluxapps.installation.probability / probLn))); // higher probability for more apps on network - if (randomNumber !== 0) { - log.info('Other Flux nodes are evaluating application installation'); - await serviceHelper.delay(adjustedDelay); - trySpawningGlobalApplication(); - return; + let appToRun = null; + let minInstances = null; + let appFromAppsToBeCheckedLater = false; + const appIndex = appsToBeCheckedLater.findIndex((app) => app.timeToCheck >= Date.now()); + if (appIndex >= 0) { + appToRun = appsToBeCheckedLater[appIndex].appName; + minInstances = appsToBeCheckedLater[appIndex].required; + appsToBeCheckedLater.splice(appIndex, 1); + appFromAppsToBeCheckedLater = true; + } else { + const myNodeLocation = nodeFullGeolocation(); + globalAppNamesLocation = globalAppNamesLocation.filter((app) => (app.geolocation.length === 0 || app.geolocation.find((loc) => `ac${myNodeLocation}`.startsWith(loc))) + || (app.nodes.length === 0 || app.nodes.find((ip) => ip === myIP))); + log.info(`Found ${globalAppNamesLocation.length} apps that are missing instances on the network and can be selected to try to spawn on my node.`); + // eslint-disable-next-line no-restricted-syntax + for (const appToRunAux of globalAppNamesLocation) { + if (!trySpawningGlobalAppCache.has(appToRunAux.name) && !appsToBeCheckedLater.includes((app) => app.appName === appToRunAux.name)) { + appToRun = appToRunAux.name; + minInstances = appToRunAux.required; + log.info(`Application ${appToRun} selected to try to spawn. Reported as been running in ${appToRunAux.actual} instances and ${appToRunAux.required} are required.`); + break; + } + } } - let appFromAppsToBeCheckedLater = false; - // no scoped applicaton, run some global app if (!appToRun) { - // pick a random one - const appToRunNumber = Math.floor((Math.random() * numberOfGlobalApps)); - appToRun = globalAppNamesLocation[appToRunNumber].name; - - // force switch to run a geo restricted app - if (appToRunNumber % 5 === 0) { // every 5th run we are forcing application instalation that is in the nodes geolocation, esnuring highly geolocated apps spawn fast enough - // get this node location - const myNodeLocation = nodeFullGeolocation(); - const appsInMyLocation = globalAppNamesLocation.filter((apps) => apps.geolocation && apps.geolocation.find((loc) => `ac${myNodeLocation}`.startsWith(loc))); - if (appsInMyLocation.length) { - const numberOfMyNodeGeoApps = appsInMyLocation.length; - const randomGeoAppNumber = Math.floor((Math.random() * numberOfMyNodeGeoApps)); - // install geo location restricted app instead - appToRun = appsInMyLocation[randomGeoAppNumber].name; - } - } else if (appToRunNumber % 9 === 0) { // we will be checking every few runs if there are apps on appsToBeCheckedLater to be installed that previously passed all checks but were prioritize to be installed on lower tier nodes - const appIndex = appsToBeCheckedLater.findIndex((app) => app.timeToCheck >= Date.now()); - if (appIndex >= 0) { - appToRun = appsToBeCheckedLater[appIndex].appName; - appsToBeCheckedLater.splice(appIndex, 1); - appFromAppsToBeCheckedLater = true; - } - } - } - - // Check if App was checked in the last 2 hours. - // This is a small help because random can be getting the same app over and over - if (trySpawningGlobalAppCache.has(appToRun)) { - log.info(`App ${appToRun} was already evaluated in the last 2 hours.`); - const delay = numberOfGlobalApps < 20 ? config.fluxapps.installation.delay * 1000 : config.fluxapps.installation.delay * 1000 * 0.1; - await serviceHelper.delay(delay); + log.info('No app currently to be processed'); + await serviceHelper.delay(5 * 60 * 1000); trySpawningGlobalApplication(); return; } @@ -9132,7 +9131,7 @@ async function trySpawningGlobalApplication() { // check if app not running on this device if (runningAppList.find((document) => document.ip.includes(adjustedIP))) { log.info(`Application ${appToRun} is reported as already running on this Flux IP`); - await serviceHelper.delay(adjustedDelay); + await serviceHelper.delay(5 * 60 * 1000); trySpawningGlobalApplication(); return; } @@ -9143,7 +9142,7 @@ async function trySpawningGlobalApplication() { } if (runningApps.data.find((app) => app.Names[0].slice(5) === appToRun)) { log.info(`${appToRun} application is already running on this Flux`); - await serviceHelper.delay(adjustedDelay); + await serviceHelper.delay(5 * 60 * 1000); trySpawningGlobalApplication(); return; } @@ -9154,15 +9153,6 @@ async function trySpawningGlobalApplication() { throw new Error(`Specifications for application ${appToRun} were not found!`); } - // check if app is installed on the number of instances requested - let minInstances = appSpecifications.instances || config.fluxapps.minimumInstances; // introduced in v3 of apps specs - if (runningAppList.length >= minInstances) { - log.info(`Application ${appToRun} is already spawned on ${runningAppList.length} instances`); - await serviceHelper.delay(adjustedDelay); - trySpawningGlobalApplication(); - return; - } - // eslint-disable-next-line no-restricted-syntax const dbopen = dbHelper.databaseConnection(); const appsDatabase = dbopen.db(config.database.appslocal.database); @@ -9180,7 +9170,7 @@ async function trySpawningGlobalApplication() { const appExists = apps.find((app) => app.name === appSpecifications.name); if (appExists) { // double checked in installation process. log.info(`Application ${appSpecifications.name} is already installed`); - await serviceHelper.delay(adjustedDelay); + await serviceHelper.delay(5 * 60 * 1000); trySpawningGlobalApplication(); return; } @@ -9213,17 +9203,17 @@ async function trySpawningGlobalApplication() { const portsPubliclyAvailable = await checkInstallingAppPortAvailable(appPorts); if (portsPubliclyAvailable === false) { log.error(`Some of application ports of ${appSpecifications.name} are not available publicly. Installation aborted.`); - await serviceHelper.delay(adjustedDelay); + await serviceHelper.delay(5 * 60 * 1000); trySpawningGlobalApplication(); return; } // double check if app is installed on the number of instances requested runningAppList = await getRunningAppList(appToRun); - minInstances = appSpecifications.instances || config.fluxapps.minimumInstances; // introduced in v3 of apps specs if (runningAppList.length >= minInstances) { log.info(`Application ${appToRun} is already spawned on ${runningAppList.length} instances`); - await serviceHelper.delay(adjustedDelay); + trySpawningGlobalAppCache.delete(appToRun); + await serviceHelper.delay(5 * 60 * 1000); trySpawningGlobalApplication(); return; } @@ -9241,7 +9231,7 @@ async function trySpawningGlobalApplication() { const sameIpRangeNode = runningAppList.find((location) => location.ip.includes(myIpWithoutPort.substring(0, lastIndex))); if (sameIpRangeNode) { log.info(`Application ${appToRun} uses syncthing and it is already spawned on Fluxnode with same ip range`); - await serviceHelper.delay(adjustedDelay); + await serviceHelper.delay(5 * 60 * 1000); trySpawningGlobalApplication(); return; } @@ -9262,7 +9252,7 @@ async function trySpawningGlobalApplication() { if (component.repotag === componentToInstall.repotag && componentToInstall.repotag.startsWith('presearch/node')) { // applies to presearch specifically log.info(`${componentToInstall.repotag} Image is already running on this Flux`); // eslint-disable-next-line no-await-in-loop - await serviceHelper.delay(adjustedDelay); + await serviceHelper.delay(5 * 60 * 1000); trySpawningGlobalApplication(); return; } @@ -9281,6 +9271,7 @@ async function trySpawningGlobalApplication() { const appToCheck = { timeToCheck: Date.now() + 1.5 * 60 * 60 * 1000, appName: appToRun, + required: minInstances, }; log.info(`App ${appToRun} specs are from cumulus, will check in 1.5h if instances are still missing`); appsToBeCheckedLater.push(appToCheck); @@ -9289,6 +9280,7 @@ async function trySpawningGlobalApplication() { const appToCheck = { timeToCheck: Date.now() + 1 * 60 * 60 * 1000, appName: appToRun, + required: minInstances, }; log.info(`App ${appToRun} specs are from nimbus, will check in 1h if instances are still missing`); appsToBeCheckedLater.push(appToCheck); @@ -9297,6 +9289,7 @@ async function trySpawningGlobalApplication() { const appToCheck = { timeToCheck: Date.now() + 0.75 * 60 * 60 * 1000, appName: appToRun, + required: minInstances, }; log.info(`App ${appToRun} specs are from cumulus, will check in 45m if instances are still missing`); appsToBeCheckedLater.push(appToCheck); @@ -9322,20 +9315,20 @@ async function trySpawningGlobalApplication() { await fluxCommunicationMessagesSender.broadcastMessageToOutgoing(appRemovedMessage); await serviceHelper.delay(500); await fluxCommunicationMessagesSender.broadcastMessageToIncoming(appRemovedMessage); - await serviceHelper.delay(adjustedDelay); + await serviceHelper.delay(5 * 60 * 1000); trySpawningGlobalApplication(); return; } // double check if app is installed in more of the instances requested runningAppList = await getRunningAppList(appToRun); - minInstances = appSpecifications.instances || config.fluxapps.minimumInstances; // introduced in v3 of apps specs if (runningAppList.length > minInstances) { log.info(`Application ${appToRun} is already spawned on ${runningAppList.length} instances, will unninstall it`); + trySpawningGlobalAppCache.delete(appToRun); removeAppLocally(appSpecifications.name, null, true, null, true).catch((error) => log.error(error)); } - await serviceHelper.delay(10 * config.fluxapps.installation.delay * 1000); + await serviceHelper.delay(5 * 60 * 1000); log.info('Reinitiating possible app installation'); trySpawningGlobalApplication(); } catch (error) { diff --git a/ZelBack/src/services/dbHelper.js b/ZelBack/src/services/dbHelper.js index 643147f44..285f69466 100644 --- a/ZelBack/src/services/dbHelper.js +++ b/ZelBack/src/services/dbHelper.js @@ -85,6 +85,20 @@ async function findInDatabase(database, collection, query, projection) { return results; } +/** + * Returns array of documents from the DB based on pipeline aggregate. + * + * @param {string} database + * @param {string} collection + * @param {object} pipeline + * + * @returns array + */ +async function aggregateInDatabase(database, collection, pipeline) { + const results = await database.collection(collection).aggregate(pipeline).toArray(); + return results; +} + /** * Returns document from the DB based on the query and the projection. * @@ -266,4 +280,5 @@ module.exports = { collectionStats, closeDbConnection, insertManyToDatabase, + aggregateInDatabase, };