Skip to content

Commit

Permalink
Merge pull request #1430 from RunOnFlux/development
Browse files Browse the repository at this point in the history
v5.37.0
  • Loading branch information
TheTrunk authored Jan 15, 2025
2 parents 5f84890 + 53c288d commit 29571e0
Show file tree
Hide file tree
Showing 7 changed files with 237 additions and 57 deletions.
3 changes: 3 additions & 0 deletions ZelBack/src/routes.js
Original file line number Diff line number Diff line change
Expand Up @@ -1221,6 +1221,9 @@ module.exports = (app) => {
app.post('/daemon/getaddressmempool', (req, res) => {
daemonServiceAddressRpcs.getAddressMempool(req, res);
});
app.get('/flux/streamchainpreparation', (req, res) => {
fluxService.streamChainPreparation(req, res);
});
app.post('/flux/streamchain', (req, res) => {
fluxService.streamChain(req, res);
});
Expand Down
76 changes: 44 additions & 32 deletions ZelBack/src/services/appsService.js
Original file line number Diff line number Diff line change
Expand Up @@ -4869,37 +4869,49 @@ async function getUserBlockedRepositores() {
/**
* Check secrets, if they are being used return exception
* @param {string} appName App name.
* @param {object} appSpecs App specifications.
* @param {boolean} registration informs if it's an app registration or not.
*/
async function checkAppSecrets(appName, appComponentSpecs, registration = false) {
* @param {object} appComponentSpecs App specifications.
* @param {string} appOwner owner Id of the app.
*/
async function checkAppSecrets(appName, appComponentSpecs, appOwner) {
// Normalize PGP secrets string
const normalizePGP = (pgpMessage) => {
if (!pgpMessage) return '';
return pgpMessage.replace(/\s+/g, '').replace(/\\n/g, '').trim();
};

const appComponentSecrets = normalizePGP(appComponentSpecs.secrets);

// Database connection
const db = dbHelper.databaseConnection();
const database = db.db(config.database.appsglobal.database);
const query = {};
const projection = { projection: { _id: 0 } };
const results = await dbHelper.findInDatabase(database, globalAppsInformation, query, projection);
let foundSecretsWithSameAppName = false;
let foundSecretsWithDifferentAppName = false;
// Query permanent app messages
const appsQuery = {
$and: [
{ 'appSpecifications.version': 7 },
{ 'appSpecifications.nodes': { $exists: true, $ne: [] } },
],
};

const permanentAppMessages = await dbHelper.findInDatabase(database, globalAppsMessages, appsQuery, projection);

const processedSecrets = new Set();
// eslint-disable-next-line no-restricted-syntax
for (const app of results) {
if (app.version >= 7 && app.nodes.length > 0) {
// eslint-disable-next-line no-restricted-syntax
for (const component of app.compose) {
if (component.secrets.length > 0 && JSON.stringify(component.secrets.replace(/(\r\n|\n|\r)/gm, '').replace(/\\/g, '')) === JSON.stringify(appComponentSpecs.secrets.replace(/(\r\n|\n|\r)/gm, '').replace(/\\/g, ''))) {
if (registration) {
throw new Error(`Provided component ${component.name} secrets are not valid`);
} else if (app.name !== appName) {
foundSecretsWithDifferentAppName = true;
} else if (app.name === appName) {
foundSecretsWithSameAppName = true;
}
}
for (const message of permanentAppMessages) {
// eslint-disable-next-line no-restricted-syntax
for (const component of message.appSpecifications.compose.filter((comp) => comp.secrets)) {
const normalizedComponentSecret = normalizePGP(component.secrets);
// eslint-disable-next-line no-continue
if (processedSecrets.has(normalizedComponentSecret)) continue;
processedSecrets.add(normalizedComponentSecret);

if (normalizedComponentSecret === appComponentSecrets && message.appSpecifications.owner !== appOwner) {
throw new Error(
`Component '${appComponentSpecs.name}' secrets are not valid - registered already with different app owner').`,
);
}
}
}
if (!registration && foundSecretsWithDifferentAppName && !foundSecretsWithSameAppName) {
throw new Error('Provided component(s) secrets are not valid');
}
}

/**
Expand Down Expand Up @@ -7340,9 +7352,9 @@ async function registerAppGlobalyApi(req, res) {
if (appSpecFormatted.version >= 7 && appSpecFormatted.nodes.length > 0) {
// eslint-disable-next-line no-restricted-syntax
for (const appComponent of appSpecFormatted.compose) {
if (appComponent.secrets.length > 0) {
if (appComponent.secrets) {
// eslint-disable-next-line no-await-in-loop
await checkAppSecrets(appSpecFormatted.name, appComponent, true);
await checkAppSecrets(appSpecFormatted.name, appComponent, appSpecFormatted.owner);
}
}
}
Expand Down Expand Up @@ -7472,9 +7484,9 @@ async function updateAppGlobalyApi(req, res) {
if (appSpecFormatted.version >= 7 && appSpecFormatted.nodes.length > 0) {
// eslint-disable-next-line no-restricted-syntax
for (const appComponent of appSpecFormatted.compose) {
if (appComponent.secrets.length > 0) {
if (appComponent.secrets) {
// eslint-disable-next-line no-await-in-loop
await checkAppSecrets(appSpecFormatted.name, appComponent, false);
await checkAppSecrets(appSpecFormatted.name, appComponent, appSpecFormatted.owner);
}
}
}
Expand Down Expand Up @@ -10628,9 +10640,9 @@ async function verifyAppRegistrationParameters(req, res) {
if (appSpecFormatted.version >= 7 && appSpecFormatted.nodes.length > 0) {
// eslint-disable-next-line no-restricted-syntax
for (const appComponent of appSpecFormatted.compose) {
if (appComponent.secrets.length > 0) {
if (appComponent.secrets) {
// eslint-disable-next-line no-await-in-loop
await checkAppSecrets(appSpecFormatted.name, appComponent, true);
await checkAppSecrets(appSpecFormatted.name, appComponent, appSpecFormatted.owner);
}
}
}
Expand Down Expand Up @@ -10685,9 +10697,9 @@ async function verifyAppUpdateParameters(req, res) {
if (appSpecFormatted.version >= 7 && appSpecFormatted.nodes.length > 0) {
// eslint-disable-next-line no-restricted-syntax
for (const appComponent of appSpecFormatted.compose) {
if (appComponent.secrets.length > 0) {
if (appComponent.secrets) {
// eslint-disable-next-line no-await-in-loop
await checkAppSecrets(appSpecFormatted.name, appComponent, false);
await checkAppSecrets(appSpecFormatted.name, appComponent, appSpecFormatted.owner);
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions ZelBack/src/services/explorerService.js
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,8 @@ async function initiateBlockProcessor(restoreDatabase, deepRestore, reindexOrRes
await databaseGlobal.collection(config.database.appsglobal.collections.appsMessages).createIndex({ 'appSpecifications.name': 1 }, { name: 'query for getting app message based on zelapp specs name' });
await databaseGlobal.collection(config.database.appsglobal.collections.appsMessages).createIndex({ 'appSpecifications.owner': 1 }, { name: 'query for getting app message based on zelapp specs owner' });
await databaseGlobal.collection(config.database.appsglobal.collections.appsMessages).createIndex({ 'appSpecifications.repotag': 1 }, { name: 'query for getting app message based on image' });
await databaseGlobal.collection(config.database.appsglobal.collections.appsMessages).createIndex({ 'appSpecifications.version': 1 }, { name: 'query for getting app message based on version' });
await databaseGlobal.collection(config.database.appsglobal.collections.appsMessages).createIndex({ 'appSpecifications.nodes': 1 }, { name: 'query for getting app message based on nodes' });
await databaseGlobal.collection(config.database.appsglobal.collections.appsInformation).createIndex({ name: 1 }, { name: 'query for getting zelapp based on zelapp specs name' });
await databaseGlobal.collection(config.database.appsglobal.collections.appsInformation).createIndex({ owner: 1 }, { name: 'query for getting zelapp based on zelapp specs owner' });
await databaseGlobal.collection(config.database.appsglobal.collections.appsInformation).createIndex({ repotag: 1 }, { name: 'query for getting zelapp based on image' });
Expand Down
185 changes: 169 additions & 16 deletions ZelBack/src/services/fluxService.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ const isArcane = Boolean(process.env.FLUXOS_PATH);
*/
let lock = false;

/**
* Stream chain prep lock, so only one request at a time
*/
let prepLock = false;

/**
* If fluxd needs to be restarted
*/
let daemonStartRequired = false;

/**
* For testing
*/
Expand Down Expand Up @@ -1562,6 +1572,147 @@ async function restartFluxOS(req, res) {
res.json(response);
}

/*
* @param {Request} req HTTP request
* @param {Response} res HTTP response
* @returns {Promise<void>}
*/
async function streamChainPreparation(req, res) {
if (lock || prepLock) {
res.statusMessage = 'Streaming of chain already in progress, server busy.';
res.status(503).end();
return;
}

/**
* Use the remote address here, don't need to worry about x-forwarded-for headers as
* we only allow the local network. Also, using the remote address is fine as FluxOS
* won't confirm if the upstream is natting behind a private address. I.e public
* connections coming in via a private address. (Flux websockets need the remote address
* or they think there is only one inbound connnection)
*/
try {
prepLock = true;

let ip = req.socket.remoteAddress;
if (!ip) {
res.statusMessage = 'Socket closed.';
res.status(400).end();
return;
}

// convert from IPv4-mapped IPv6 address format to straight IPv4 (from socket)
ip = ip.replace(/^.*:/, ''); // this is greedy, so will remove ::ffff:

if (!serviceHelper.isPrivateAddress(ip)) {
res.statusMessage = 'Request must be from an address on the same private network as the host.';
res.status(403).end();
return;
}

log.info(`Stream chain preparation request received from: ${ip}`);

// Check if local daemon is synced
const urlExplorerA = 'https://explorer.runonflux.io/api/status?q=getInfo';
const urlExplorerB = 'https://explorer.flux.zelcore.io/api/status?q=getInfo';

const axiosConfig = {
timeout: 5000,
};

const { status: blockCountStatus, data: blockCount } = await daemonServiceBlockchainRpcs.getBlockCount();

if (blockCountStatus !== 'success') {
res.statusMessage = 'Error getting blockCount from local Flux Daemon.';
res.status(503).end();
return;
}

const explorerResponse = await Promise.race([
serviceHelper.axiosGet(urlExplorerA, axiosConfig),
serviceHelper.axiosGet(urlExplorerB, axiosConfig),
]).catch(() => null);

if (!explorerResponse || !explorerResponse?.data?.info?.blocks) {
res.statusMessage = 'Error getting Flux Explorer Height.';
res.status(503).end();
return;
}

if (blockCount + 5 < explorerResponse.data.info.blocks) {
res.statusMessage = 'Error local Daemon is not synced.';
res.status(503).end();
return;
}

const { status: fluxNodeStatus, data: fluxNodeInfo } = await daemonServiceFluxnodeRpcs.getFluxNodeStatus();

if (fluxNodeStatus !== 'success') {
res.statusMessage = 'Error getting fluxNodeStatus from local Flux Daemon.';
res.status(503).end();
return;
}

// check if it is outside maintenance window
if (fluxNodeInfo.status === 'CONFIRMED' && fluxNodeInfo.last_confirmed_height > 0 && (120 - (blockCount - fluxNodeInfo.last_confirmed_height)) < 8) {
// fluxnodes needs to confirm between 120 and 150 blocks, if it is 7 blocks remaining to enter confirmation window we already consider outside maintenance window, as this can take around 12 minutes.
res.statusMessage = 'Error Fluxnode is not in maintenance window.';
res.status(503).end();
return;
}

// on non Arcane, we check if the stop commands return successfully, as there is no guarantee that the
// node is running using zelcash or pm2 etc

// stop services
if (isArcane) {
await serviceHelper.runCommand('systemctl', { runAsRoot: false, params: ['stop', 'flux-watchdog.service', 'fluxd.service'] });
} else {
const { error: watchdogError } = await serviceHelper.runCommand('pm2', { runAsRoot: false, params: ['stop', 'watchdog'] });
if (watchdogError) {
res.statusMessage = 'Error: unable to stop watchdog';
res.status(503).end();
return;
}

log.info('pm2 watchdog service has been stopped');

const { error: zelcashError } = await serviceHelper.runCommand('systemctl', { runAsRoot: true, params: ['stop', 'zelcash.service'] });

if (zelcashError) {
res.statusMessage = 'Error: unable to stop zelcash service';
res.status(503).end();
// if zelcash failed, it means watchdog was successful, we need to restart (no await)
serviceHelper.runCommand('pm2', { runAsRoot: false, params: ['start', 'watchdog', '--watch'] });
return;
}

log.info('zelcash service has been stopped');
}

daemonStartRequired = true;
const response = messageHelper.createSuccessMessage('Daemon stopped, you can start stream chain functionality');
res.json(response);
} finally {
// check if restart required
setTimeout(() => {
if (!lock && daemonStartRequired) {
daemonStartRequired = false;
log.info('Stream chain prep timeout hit: restarting services');
if (isArcane) {
serviceHelper.runCommand('systemctl', { runAsRoot: false, params: ['start', 'fluxd.service', 'flux-watchdog.service'] });
} else {
serviceHelper.runCommand('systemctl', { runAsRoot: true, params: ['start', 'zelcash.service'] });
serviceHelper.runCommand('pm2', { runAsRoot: false, params: ['start', 'watchdog', '--watch'] });
}
} else {
log.info('Stream chain prep timeout hit: services already restarted or stream in progress');
}
prepLock = false;
}, 30 * 1000);
}
}

/**
* Streams the blockchain via http at breakneck speeds.
*
Expand Down Expand Up @@ -1647,7 +1798,7 @@ async function streamChain(req, res) {
log.info(`Stream chain request received from: ${ip}`);

const homeDir = os.homedir();
const base = path.join(homeDir, '.flux');
const base = process.env.FLUXD_PATH || path.join(homeDir, '.flux');

// the order can matter when doing the stream live, the level db's can be volatile
const folders = [
Expand All @@ -1669,7 +1820,7 @@ async function streamChain(req, res) {
const chainExists = foldersExist.every((x) => x);

if (!chainExists) {
res.statusMessage = 'Unable to find chain at $HOME/.flux';
res.statusMessage = 'Unable to find chain';
res.status(500).end();
return;
}
Expand All @@ -1690,17 +1841,13 @@ async function streamChain(req, res) {
res.status(422).end();
return;
}
// stop services
if (isArcane) {
await serviceHelper.runCommand('systemctl', { runAsRoot: false, params: ['stop', 'flux-watchdog.service', 'fluxd.service'] });
} else {
await serviceHelper.runCommand('systemctl', { runAsRoot: true, params: ['stop', 'zelcash.service'] });
await serviceHelper.runCommand('pm2', { runAsRoot: false, params: ['stop', 'watchdog'] });
}

if (safe) {
const blockInfoRes = await daemonServiceBlockchainRpcs.getBlockchainInfo();
fluxdRunning = !(blockInfoRes.status === 'error' && blockInfoRes.data.code === 'ECONNREFUSED');
// we have to build the client here so that we can avoid the cache. We should have an option for he
// blockChainRpcs thing to skip cache
const fluxdClient = await daemonServiceUtils.buildFluxdClient();
const blockCountRes = await fluxdClient.run('getBlockCount', { params: [] }).catch((err) => err);
fluxdRunning = !(blockCountRes instanceof Error && blockCountRes.code === 'ECONNREFUSED');
}

if (safe && fluxdRunning) {
Expand Down Expand Up @@ -1750,12 +1897,17 @@ async function streamChain(req, res) {
log.error(error);
} finally {
// start services
if (isArcane) {
await serviceHelper.runCommand('systemctl', { runAsRoot: false, params: ['start', 'fluxd.service', 'flux-watchdog.service'] });
} else {
await serviceHelper.runCommand('systemctl', { runAsRoot: true, params: ['start', 'zelcash.service'] });
await serviceHelper.runCommand('pm2', { runAsRoot: false, params: ['start', 'watchdog', '--watch'] });
if (daemonStartRequired) {
daemonStartRequired = false;

if (isArcane) {
await serviceHelper.runCommand('systemctl', { runAsRoot: false, params: ['start', 'fluxd.service', 'flux-watchdog.service'] });
} else {
await serviceHelper.runCommand('systemctl', { runAsRoot: true, params: ['start', 'zelcash.service'] });
await serviceHelper.runCommand('pm2', { runAsRoot: false, params: ['start', 'watchdog', '--watch'] });
}
}

lock = false;
}
}
Expand Down Expand Up @@ -1805,6 +1957,7 @@ module.exports = {
softUpdateFluxInstall,
startBenchmark,
startDaemon,
streamChainPreparation,
streamChain,
tailBenchmarkDebug,
tailDaemonDebug,
Expand Down
2 changes: 2 additions & 0 deletions ZelBack/src/services/serviceManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ async function startFluxFunctions() {
log.info('Temporary database prepared');
log.info('Preparing Flux Apps locations');
await databaseTemp.collection(config.database.appsglobal.collections.appsMessages).dropIndex({ hash: 1 }, { name: 'query for getting zelapp message based on hash' }).catch(() => { console.log('Welcome to FluxOS'); }); // drop old index or display message for new installations
await databaseTemp.collection(config.database.appsglobal.collections.appsMessages).createIndex({ 'appSpecifications.version': 1 }, { name: 'query for getting app message based on version' });
await databaseTemp.collection(config.database.appsglobal.collections.appsMessages).createIndex({ 'appSpecifications.nodes': 1 }, { name: 'query for getting app message based on nodes' });
// more than 2 hours and 5m. Meaning we have not received status message for a long time. So that node is no longer on a network or app is down.
await databaseTemp.collection(config.database.appsglobal.collections.appsLocations).createIndex({ broadcastedAt: 1 }, { expireAfterSeconds: 7500 });
await databaseTemp.collection(config.database.appsglobal.collections.appsLocations).createIndex({ name: 1 }, { name: 'query for getting zelapp location based on zelapp specs name' });
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "flux",
"version": "5.36.0",
"version": "5.37.0",
"description": "Flux, Your Gateway to a Decentralized World",
"repository": {
"type": "git",
Expand Down
Loading

0 comments on commit 29571e0

Please sign in to comment.