Skip to content

Commit

Permalink
Refactor trySpawningGlobalApplication (#1421)
Browse files Browse the repository at this point in the history
* test aggregate function

* fix

* refactor testTrySpawningGlobalApplication, no more random stuff

* improvements

* fix typo

* Fix plus logs
  • Loading branch information
Cabecinha84 authored Nov 15, 2024
1 parent be386d2 commit 802538b
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 87 deletions.
167 changes: 80 additions & 87 deletions ZelBack/src/services/appsService.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ testingAppserver = httpShutdown(testingAppserver);

const GlobalAppsSpawnLRUoptions = {
max: 2000,
ttl: 1000 * 60 * 60 * 2, // 2 hours
maxAge: 1000 * 60 * 60 * 2, // 2 hours
ttl: 1000 * 60 * 60 * 6, // 6 hours
maxAge: 1000 * 60 * 60 * 6, // 6 hours
};
const shortCache = {
max: 500,
Expand Down Expand Up @@ -9045,81 +9045,80 @@ async function trySpawningGlobalApplication() {
throw new Error('Unable to detect Flux IP address');
}

// get all the applications list names
const globalAppNamesLocation = await getAllGlobalApplications(['name', 'geolocation', 'nodes']);
// get all the applications list names missing instances
const pipeline = [
{
$lookup: {
from: 'zelappslocation',
localField: 'name',
foreignField: 'name',
as: 'locations',
},
},
{
$addFields: {
actual: { $size: '$locations.name' },
},
},
{
$match: {
$expr: { $lt: ['$actual', { $ifNull: ['$instances', 3] }] },
},
},
{
$project: {
_id: 0,
name: '$name',
actual: '$actual',
required: '$instances',
nodes: { $ifNull: ['$nodes', []] },
geolocation: { $ifNull: ['$geolocation', []] },
},
},
{ $sort: { name: 1 } },
];

const db = dbHelper.databaseConnection();
const database = db.db(config.database.appsglobal.database);
log.info('Checking for apps that are missing instances on the network.');
let globalAppNamesLocation = await dbHelper.aggregateInDatabase(database, globalAppsInformation, pipeline);
const numberOfGlobalApps = globalAppNamesLocation.length;
if (!numberOfGlobalApps) {
log.info('No installable application found');
await serviceHelper.delay(config.fluxapps.installation.delay * 1000);
await serviceHelper.delay(5 * 60 * 1000);
trySpawningGlobalApplication();
return;
}
log.info(`Found ${numberOfGlobalApps} that are missing instances on the network.`);

const installDelay = config.fluxapps.installation.delay * 1000;
let probLn = Math.log(2 + numberOfGlobalApps); // from ln(2) -> ln(2 + x)
const adjustedDelay = installDelay / probLn;

let appToRun;
// highly favor application that is targetting our node
// get my collateral
const myCollateral = await generalService.obtainNodeCollateralInformation();
// get my ip address
// filter apps only those that include my ip or my collateral
const scopedApps = globalAppNamesLocation.filter((app) => app.nodes && (app.nodes.includes(myIP) || app.nodes.includes(`${myCollateral.txhash}:${myCollateral.txindex}`)));
const scopedAppsToRun = scopedApps.filter((app) => !trySpawningGlobalAppCache.has(app.name));
// check if this app was already evaluated
const numberOfScopedAppsToRun = scopedAppsToRun.length;
if (numberOfScopedAppsToRun) { // some app should be prioritized on our node
const appToRunNumber = Math.floor((Math.random() * numberOfScopedAppsToRun));
appToRun = scopedAppsToRun[appToRunNumber].name;
}

if (appToRun) { // ensure higher rate spawning for scoped apps
probLn *= 2;
}
// if all ok Check hashes comparison if its out turn to start the app. higher than 1% probability.
const randomNumber = Math.floor((Math.random() * (config.fluxapps.installation.probability / probLn))); // higher probability for more apps on network
if (randomNumber !== 0) {
log.info('Other Flux nodes are evaluating application installation');
await serviceHelper.delay(adjustedDelay);
trySpawningGlobalApplication();
return;
let appToRun = null;
let minInstances = null;
let appFromAppsToBeCheckedLater = false;
const appIndex = appsToBeCheckedLater.findIndex((app) => app.timeToCheck >= Date.now());
if (appIndex >= 0) {
appToRun = appsToBeCheckedLater[appIndex].appName;
minInstances = appsToBeCheckedLater[appIndex].required;
appsToBeCheckedLater.splice(appIndex, 1);
appFromAppsToBeCheckedLater = true;
} else {
const myNodeLocation = nodeFullGeolocation();
globalAppNamesLocation = globalAppNamesLocation.filter((app) => (app.geolocation.length === 0 || app.geolocation.find((loc) => `ac${myNodeLocation}`.startsWith(loc)))
|| (app.nodes.length === 0 || app.nodes.find((ip) => ip === myIP)));
log.info(`Found ${globalAppNamesLocation.length} apps that are missing instances on the network and can be selected to try to spawn on my node.`);
// eslint-disable-next-line no-restricted-syntax
for (const appToRunAux of globalAppNamesLocation) {
if (!trySpawningGlobalAppCache.has(appToRunAux.name) && !appsToBeCheckedLater.includes((app) => app.appName === appToRunAux.name)) {
appToRun = appToRunAux.name;
minInstances = appToRunAux.required;
log.info(`Application ${appToRun} selected to try to spawn. Reported as been running in ${appToRunAux.actual} instances and ${appToRunAux.required} are required.`);
break;
}
}
}

let appFromAppsToBeCheckedLater = false;
// no scoped applicaton, run some global app
if (!appToRun) {
// pick a random one
const appToRunNumber = Math.floor((Math.random() * numberOfGlobalApps));
appToRun = globalAppNamesLocation[appToRunNumber].name;

// force switch to run a geo restricted app
if (appToRunNumber % 5 === 0) { // every 5th run we are forcing application instalation that is in the nodes geolocation, esnuring highly geolocated apps spawn fast enough
// get this node location
const myNodeLocation = nodeFullGeolocation();
const appsInMyLocation = globalAppNamesLocation.filter((apps) => apps.geolocation && apps.geolocation.find((loc) => `ac${myNodeLocation}`.startsWith(loc)));
if (appsInMyLocation.length) {
const numberOfMyNodeGeoApps = appsInMyLocation.length;
const randomGeoAppNumber = Math.floor((Math.random() * numberOfMyNodeGeoApps));
// install geo location restricted app instead
appToRun = appsInMyLocation[randomGeoAppNumber].name;
}
} else if (appToRunNumber % 9 === 0) { // we will be checking every few runs if there are apps on appsToBeCheckedLater to be installed that previously passed all checks but were prioritize to be installed on lower tier nodes
const appIndex = appsToBeCheckedLater.findIndex((app) => app.timeToCheck >= Date.now());
if (appIndex >= 0) {
appToRun = appsToBeCheckedLater[appIndex].appName;
appsToBeCheckedLater.splice(appIndex, 1);
appFromAppsToBeCheckedLater = true;
}
}
}

// Check if App was checked in the last 2 hours.
// This is a small help because random can be getting the same app over and over
if (trySpawningGlobalAppCache.has(appToRun)) {
log.info(`App ${appToRun} was already evaluated in the last 2 hours.`);
const delay = numberOfGlobalApps < 20 ? config.fluxapps.installation.delay * 1000 : config.fluxapps.installation.delay * 1000 * 0.1;
await serviceHelper.delay(delay);
log.info('No app currently to be processed');
await serviceHelper.delay(5 * 60 * 1000);
trySpawningGlobalApplication();
return;
}
Expand All @@ -9132,7 +9131,7 @@ async function trySpawningGlobalApplication() {
// check if app not running on this device
if (runningAppList.find((document) => document.ip.includes(adjustedIP))) {
log.info(`Application ${appToRun} is reported as already running on this Flux IP`);
await serviceHelper.delay(adjustedDelay);
await serviceHelper.delay(5 * 60 * 1000);
trySpawningGlobalApplication();
return;
}
Expand All @@ -9143,7 +9142,7 @@ async function trySpawningGlobalApplication() {
}
if (runningApps.data.find((app) => app.Names[0].slice(5) === appToRun)) {
log.info(`${appToRun} application is already running on this Flux`);
await serviceHelper.delay(adjustedDelay);
await serviceHelper.delay(5 * 60 * 1000);
trySpawningGlobalApplication();
return;
}
Expand All @@ -9154,15 +9153,6 @@ async function trySpawningGlobalApplication() {
throw new Error(`Specifications for application ${appToRun} were not found!`);
}

// check if app is installed on the number of instances requested
let minInstances = appSpecifications.instances || config.fluxapps.minimumInstances; // introduced in v3 of apps specs
if (runningAppList.length >= minInstances) {
log.info(`Application ${appToRun} is already spawned on ${runningAppList.length} instances`);
await serviceHelper.delay(adjustedDelay);
trySpawningGlobalApplication();
return;
}

// eslint-disable-next-line no-restricted-syntax
const dbopen = dbHelper.databaseConnection();
const appsDatabase = dbopen.db(config.database.appslocal.database);
Expand All @@ -9180,7 +9170,7 @@ async function trySpawningGlobalApplication() {
const appExists = apps.find((app) => app.name === appSpecifications.name);
if (appExists) { // double checked in installation process.
log.info(`Application ${appSpecifications.name} is already installed`);
await serviceHelper.delay(adjustedDelay);
await serviceHelper.delay(5 * 60 * 1000);
trySpawningGlobalApplication();
return;
}
Expand Down Expand Up @@ -9213,17 +9203,17 @@ async function trySpawningGlobalApplication() {
const portsPubliclyAvailable = await checkInstallingAppPortAvailable(appPorts);
if (portsPubliclyAvailable === false) {
log.error(`Some of application ports of ${appSpecifications.name} are not available publicly. Installation aborted.`);
await serviceHelper.delay(adjustedDelay);
await serviceHelper.delay(5 * 60 * 1000);
trySpawningGlobalApplication();
return;
}

// double check if app is installed on the number of instances requested
runningAppList = await getRunningAppList(appToRun);
minInstances = appSpecifications.instances || config.fluxapps.minimumInstances; // introduced in v3 of apps specs
if (runningAppList.length >= minInstances) {
log.info(`Application ${appToRun} is already spawned on ${runningAppList.length} instances`);
await serviceHelper.delay(adjustedDelay);
trySpawningGlobalAppCache.delete(appToRun);
await serviceHelper.delay(5 * 60 * 1000);
trySpawningGlobalApplication();
return;
}
Expand All @@ -9241,7 +9231,7 @@ async function trySpawningGlobalApplication() {
const sameIpRangeNode = runningAppList.find((location) => location.ip.includes(myIpWithoutPort.substring(0, lastIndex)));
if (sameIpRangeNode) {
log.info(`Application ${appToRun} uses syncthing and it is already spawned on Fluxnode with same ip range`);
await serviceHelper.delay(adjustedDelay);
await serviceHelper.delay(5 * 60 * 1000);
trySpawningGlobalApplication();
return;
}
Expand All @@ -9262,7 +9252,7 @@ async function trySpawningGlobalApplication() {
if (component.repotag === componentToInstall.repotag && componentToInstall.repotag.startsWith('presearch/node')) { // applies to presearch specifically
log.info(`${componentToInstall.repotag} Image is already running on this Flux`);
// eslint-disable-next-line no-await-in-loop
await serviceHelper.delay(adjustedDelay);
await serviceHelper.delay(5 * 60 * 1000);
trySpawningGlobalApplication();
return;
}
Expand All @@ -9281,6 +9271,7 @@ async function trySpawningGlobalApplication() {
const appToCheck = {
timeToCheck: Date.now() + 1.5 * 60 * 60 * 1000,
appName: appToRun,
required: minInstances,
};
log.info(`App ${appToRun} specs are from cumulus, will check in 1.5h if instances are still missing`);
appsToBeCheckedLater.push(appToCheck);
Expand All @@ -9289,6 +9280,7 @@ async function trySpawningGlobalApplication() {
const appToCheck = {
timeToCheck: Date.now() + 1 * 60 * 60 * 1000,
appName: appToRun,
required: minInstances,
};
log.info(`App ${appToRun} specs are from nimbus, will check in 1h if instances are still missing`);
appsToBeCheckedLater.push(appToCheck);
Expand All @@ -9297,6 +9289,7 @@ async function trySpawningGlobalApplication() {
const appToCheck = {
timeToCheck: Date.now() + 0.75 * 60 * 60 * 1000,
appName: appToRun,
required: minInstances,
};
log.info(`App ${appToRun} specs are from cumulus, will check in 45m if instances are still missing`);
appsToBeCheckedLater.push(appToCheck);
Expand All @@ -9322,20 +9315,20 @@ async function trySpawningGlobalApplication() {
await fluxCommunicationMessagesSender.broadcastMessageToOutgoing(appRemovedMessage);
await serviceHelper.delay(500);
await fluxCommunicationMessagesSender.broadcastMessageToIncoming(appRemovedMessage);
await serviceHelper.delay(adjustedDelay);
await serviceHelper.delay(5 * 60 * 1000);
trySpawningGlobalApplication();
return;
}

// double check if app is installed in more of the instances requested
runningAppList = await getRunningAppList(appToRun);
minInstances = appSpecifications.instances || config.fluxapps.minimumInstances; // introduced in v3 of apps specs
if (runningAppList.length > minInstances) {
log.info(`Application ${appToRun} is already spawned on ${runningAppList.length} instances, will unninstall it`);
trySpawningGlobalAppCache.delete(appToRun);
removeAppLocally(appSpecifications.name, null, true, null, true).catch((error) => log.error(error));
}

await serviceHelper.delay(10 * config.fluxapps.installation.delay * 1000);
await serviceHelper.delay(5 * 60 * 1000);
log.info('Reinitiating possible app installation');
trySpawningGlobalApplication();
} catch (error) {
Expand Down
15 changes: 15 additions & 0 deletions ZelBack/src/services/dbHelper.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,20 @@ async function findInDatabase(database, collection, query, projection) {
return results;
}

/**
 * Runs an aggregation pipeline against a collection and resolves with all
 * of the documents the pipeline produces.
 *
 * @param {object} database Database handle; must expose `collection(name)`.
 * @param {string} collection Name of the collection to aggregate over.
 * @param {object[]} pipeline Aggregation stages, handed to `aggregate()` as-is.
 *
 * @returns {Promise<object[]>} Every document yielded by the pipeline, as an array.
 */
async function aggregateInDatabase(database, collection, pipeline) {
  const cursor = database.collection(collection).aggregate(pipeline);
  // The enclosing async function adopts this promise, so callers still
  // receive the fully materialised array (or the rejection) when awaiting.
  return cursor.toArray();
}

/**
* Returns document from the DB based on the query and the projection.
*
Expand Down Expand Up @@ -266,4 +280,5 @@ module.exports = {
collectionStats,
closeDbConnection,
insertManyToDatabase,
aggregateInDatabase,
};

0 comments on commit 802538b

Please sign in to comment.