Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v5.37.0 #1430

Merged
merged 39 commits into from
Jan 15, 2025
Merged

v5.37.0 #1430

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
4056eb9
check if daemon is synced if not stream api will return error
Cabecinha84 Jan 9, 2025
bad1341
adding timeout to axios calls
Cabecinha84 Jan 9, 2025
e6a09b2
create streamChainPreparation function that is responsable for checki…
Cabecinha84 Jan 9, 2025
8f0d0d4
Await result for Promise.race for explorer block count
MorningLightMountain713 Jan 9, 2025
c134940
Fix responses
MorningLightMountain713 Jan 9, 2025
3ef3adb
Add daemonStart variable to better control restarts
MorningLightMountain713 Jan 9, 2025
5d03ab5
Remove comments
MorningLightMountain713 Jan 9, 2025
18647b1
Move prepLock disable to end of timeout
MorningLightMountain713 Jan 9, 2025
ef55987
Move daemonStart to before services are stopped
MorningLightMountain713 Jan 9, 2025
f8b4912
Only start services in stream chain if necessary
MorningLightMountain713 Jan 9, 2025
ad90ae1
Move start daemon var after services have been stopped
MorningLightMountain713 Jan 9, 2025
921f7c4
build our own client to avoid cache
MorningLightMountain713 Jan 10, 2025
d078f27
Add error checking for non Arcane stop commands
MorningLightMountain713 Jan 10, 2025
ac4f67e
Fix missing check for stream chain timeout
MorningLightMountain713 Jan 10, 2025
aa0df0e
Adjust log message
MorningLightMountain713 Jan 10, 2025
2e54a3b
Update streamChain tests to use fluxdClient
MorningLightMountain713 Jan 10, 2025
60ffd72
Update fluxd path for Arcane
MorningLightMountain713 Jan 10, 2025
9881e08
fix adding return
Cabecinha84 Jan 13, 2025
3c38fec
check maintenance window on streamChainPreparation
Cabecinha84 Jan 13, 2025
6b92228
clear copy paste typo
Cabecinha84 Jan 13, 2025
4985a57
update considered maintenance window
Cabecinha84 Jan 14, 2025
7c4c41d
update checkAppSecrets
Cabecinha84 Jan 14, 2025
45e66ad
bump version
Cabecinha84 Jan 14, 2025
d43b70c
add logs
Cabecinha84 Jan 14, 2025
244aa83
fix
Cabecinha84 Jan 14, 2025
8ecd2f3
logs
Cabecinha84 Jan 14, 2025
44d4dbc
fix check
Cabecinha84 Jan 14, 2025
c0bf904
test
Cabecinha84 Jan 14, 2025
2330026
test
Cabecinha84 Jan 14, 2025
9a7ac9d
test
Cabecinha84 Jan 14, 2025
8b76740
test
Cabecinha84 Jan 14, 2025
6f55aab
fix test
Cabecinha84 Jan 14, 2025
3e87031
test
Cabecinha84 Jan 14, 2025
3c53b08
[FIX] checkAppSecrets
XK4MiLX Jan 14, 2025
55d8fdd
eslint fix
Cabecinha84 Jan 14, 2025
965f86e
[FIX] checkAppSecrets
XK4MiLX Jan 14, 2025
17da9fa
fix eslint
Cabecinha84 Jan 14, 2025
d44ae24
code clean up and optimizations
Cabecinha84 Jan 14, 2025
53c288d
simplify
Cabecinha84 Jan 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ZelBack/src/routes.js
Original file line number Diff line number Diff line change
Expand Up @@ -1221,6 +1221,9 @@ module.exports = (app) => {
app.post('/daemon/getaddressmempool', (req, res) => {
daemonServiceAddressRpcs.getAddressMempool(req, res);
});
app.get('/flux/streamchainpreparation', (req, res) => {
fluxService.streamChainPreparation(req, res);
});
app.post('/flux/streamchain', (req, res) => {
fluxService.streamChain(req, res);
});
Expand Down
76 changes: 44 additions & 32 deletions ZelBack/src/services/appsService.js
Original file line number Diff line number Diff line change
Expand Up @@ -4869,37 +4869,49 @@ async function getUserBlockedRepositores() {
/**
* Check secrets, if they are being used return exception
* @param {string} appName App name.
* @param {object} appSpecs App specifications.
* @param {boolean} registration informs if it's an app registration or not.
*/
async function checkAppSecrets(appName, appComponentSpecs, registration = false) {
* @param {object} appComponentSpecs App specifications.
* @param {string} appOwner owner Id of the app.
*/
async function checkAppSecrets(appName, appComponentSpecs, appOwner) {
// Normalize PGP secrets string
const normalizePGP = (pgpMessage) => {
if (!pgpMessage) return '';
return pgpMessage.replace(/\s+/g, '').replace(/\\n/g, '').trim();
};

const appComponentSecrets = normalizePGP(appComponentSpecs.secrets);

// Database connection
const db = dbHelper.databaseConnection();
const database = db.db(config.database.appsglobal.database);
const query = {};
const projection = { projection: { _id: 0 } };
const results = await dbHelper.findInDatabase(database, globalAppsInformation, query, projection);
let foundSecretsWithSameAppName = false;
let foundSecretsWithDifferentAppName = false;
// Query permanent app messages
const appsQuery = {
$and: [
{ 'appSpecifications.version': 7 },
{ 'appSpecifications.nodes': { $exists: true, $ne: [] } },
],
};

const permanentAppMessages = await dbHelper.findInDatabase(database, globalAppsMessages, appsQuery, projection);

const processedSecrets = new Set();
// eslint-disable-next-line no-restricted-syntax
for (const app of results) {
if (app.version >= 7 && app.nodes.length > 0) {
// eslint-disable-next-line no-restricted-syntax
for (const component of app.compose) {
if (component.secrets.length > 0 && JSON.stringify(component.secrets.replace(/(\r\n|\n|\r)/gm, '').replace(/\\/g, '')) === JSON.stringify(appComponentSpecs.secrets.replace(/(\r\n|\n|\r)/gm, '').replace(/\\/g, ''))) {
if (registration) {
throw new Error(`Provided component ${component.name} secrets are not valid`);
} else if (app.name !== appName) {
foundSecretsWithDifferentAppName = true;
} else if (app.name === appName) {
foundSecretsWithSameAppName = true;
}
}
for (const message of permanentAppMessages) {
// eslint-disable-next-line no-restricted-syntax
for (const component of message.appSpecifications.compose.filter((comp) => comp.secrets)) {
const normalizedComponentSecret = normalizePGP(component.secrets);
// eslint-disable-next-line no-continue
if (processedSecrets.has(normalizedComponentSecret)) continue;
processedSecrets.add(normalizedComponentSecret);

if (normalizedComponentSecret === appComponentSecrets && message.appSpecifications.owner !== appOwner) {
throw new Error(
`Component '${appComponentSpecs.name}' secrets are not valid - registered already with different app owner').`,
);
}
}
}
if (!registration && foundSecretsWithDifferentAppName && !foundSecretsWithSameAppName) {
throw new Error('Provided component(s) secrets are not valid');
}
}

/**
Expand Down Expand Up @@ -7340,9 +7352,9 @@ async function registerAppGlobalyApi(req, res) {
if (appSpecFormatted.version >= 7 && appSpecFormatted.nodes.length > 0) {
// eslint-disable-next-line no-restricted-syntax
for (const appComponent of appSpecFormatted.compose) {
if (appComponent.secrets.length > 0) {
if (appComponent.secrets) {
// eslint-disable-next-line no-await-in-loop
await checkAppSecrets(appSpecFormatted.name, appComponent, true);
await checkAppSecrets(appSpecFormatted.name, appComponent, appSpecFormatted.owner);
}
}
}
Expand Down Expand Up @@ -7472,9 +7484,9 @@ async function updateAppGlobalyApi(req, res) {
if (appSpecFormatted.version >= 7 && appSpecFormatted.nodes.length > 0) {
// eslint-disable-next-line no-restricted-syntax
for (const appComponent of appSpecFormatted.compose) {
if (appComponent.secrets.length > 0) {
if (appComponent.secrets) {
// eslint-disable-next-line no-await-in-loop
await checkAppSecrets(appSpecFormatted.name, appComponent, false);
await checkAppSecrets(appSpecFormatted.name, appComponent, appSpecFormatted.owner);
}
}
}
Expand Down Expand Up @@ -10628,9 +10640,9 @@ async function verifyAppRegistrationParameters(req, res) {
if (appSpecFormatted.version >= 7 && appSpecFormatted.nodes.length > 0) {
// eslint-disable-next-line no-restricted-syntax
for (const appComponent of appSpecFormatted.compose) {
if (appComponent.secrets.length > 0) {
if (appComponent.secrets) {
// eslint-disable-next-line no-await-in-loop
await checkAppSecrets(appSpecFormatted.name, appComponent, true);
await checkAppSecrets(appSpecFormatted.name, appComponent, appSpecFormatted.owner);
}
}
}
Expand Down Expand Up @@ -10685,9 +10697,9 @@ async function verifyAppUpdateParameters(req, res) {
if (appSpecFormatted.version >= 7 && appSpecFormatted.nodes.length > 0) {
// eslint-disable-next-line no-restricted-syntax
for (const appComponent of appSpecFormatted.compose) {
if (appComponent.secrets.length > 0) {
if (appComponent.secrets) {
// eslint-disable-next-line no-await-in-loop
await checkAppSecrets(appSpecFormatted.name, appComponent, false);
await checkAppSecrets(appSpecFormatted.name, appComponent, appSpecFormatted.owner);
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions ZelBack/src/services/explorerService.js
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,8 @@ async function initiateBlockProcessor(restoreDatabase, deepRestore, reindexOrRes
await databaseGlobal.collection(config.database.appsglobal.collections.appsMessages).createIndex({ 'appSpecifications.name': 1 }, { name: 'query for getting app message based on zelapp specs name' });
await databaseGlobal.collection(config.database.appsglobal.collections.appsMessages).createIndex({ 'appSpecifications.owner': 1 }, { name: 'query for getting app message based on zelapp specs owner' });
await databaseGlobal.collection(config.database.appsglobal.collections.appsMessages).createIndex({ 'appSpecifications.repotag': 1 }, { name: 'query for getting app message based on image' });
await databaseGlobal.collection(config.database.appsglobal.collections.appsMessages).createIndex({ 'appSpecifications.version': 1 }, { name: 'query for getting app message based on version' });
await databaseGlobal.collection(config.database.appsglobal.collections.appsMessages).createIndex({ 'appSpecifications.nodes': 1 }, { name: 'query for getting app message based on nodes' });
await databaseGlobal.collection(config.database.appsglobal.collections.appsInformation).createIndex({ name: 1 }, { name: 'query for getting zelapp based on zelapp specs name' });
await databaseGlobal.collection(config.database.appsglobal.collections.appsInformation).createIndex({ owner: 1 }, { name: 'query for getting zelapp based on zelapp specs owner' });
await databaseGlobal.collection(config.database.appsglobal.collections.appsInformation).createIndex({ repotag: 1 }, { name: 'query for getting zelapp based on image' });
Expand Down
185 changes: 169 additions & 16 deletions ZelBack/src/services/fluxService.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ const isArcane = Boolean(process.env.FLUXOS_PATH);
*/
let lock = false;

/**
* Stream chain prep lock, so only one request at a time
*/
let prepLock = false;

/**
* If fluxd needs to be restarted
*/
let daemonStartRequired = false;

/**
* For testing
*/
Expand Down Expand Up @@ -1562,6 +1572,147 @@ async function restartFluxOS(req, res) {
res.json(response);
}

/*
* @param {Request} req HTTP request
* @param {Response} res HTTP response
* @returns {Promise<void>}
*/
async function streamChainPreparation(req, res) {
if (lock || prepLock) {
res.statusMessage = 'Streaming of chain already in progress, server busy.';
res.status(503).end();
return;
}

/**
* Use the remote address here, don't need to worry about x-forwarded-for headers as
* we only allow the local network. Also, using the remote address is fine as FluxOS
* won't confirm if the upstream is natting behind a private address. I.e public
* connections coming in via a private address. (Flux websockets need the remote address
* or they think there is only one inbound connnection)
*/
try {
prepLock = true;

let ip = req.socket.remoteAddress;
if (!ip) {
res.statusMessage = 'Socket closed.';
res.status(400).end();
return;
}

// convert from IPv4-mapped IPv6 address format to straight IPv4 (from socket)
ip = ip.replace(/^.*:/, ''); // this is greedy, so will remove ::ffff:

if (!serviceHelper.isPrivateAddress(ip)) {
res.statusMessage = 'Request must be from an address on the same private network as the host.';
res.status(403).end();
return;
}

log.info(`Stream chain preparation request received from: ${ip}`);

// Check if local daemon is synced
const urlExplorerA = 'https://explorer.runonflux.io/api/status?q=getInfo';
const urlExplorerB = 'https://explorer.flux.zelcore.io/api/status?q=getInfo';

const axiosConfig = {
timeout: 5000,
};

const { status: blockCountStatus, data: blockCount } = await daemonServiceBlockchainRpcs.getBlockCount();

if (blockCountStatus !== 'success') {
res.statusMessage = 'Error getting blockCount from local Flux Daemon.';
res.status(503).end();
return;
}

const explorerResponse = await Promise.race([
serviceHelper.axiosGet(urlExplorerA, axiosConfig),
serviceHelper.axiosGet(urlExplorerB, axiosConfig),
]).catch(() => null);

if (!explorerResponse || !explorerResponse?.data?.info?.blocks) {
res.statusMessage = 'Error getting Flux Explorer Height.';
res.status(503).end();
return;
}

if (blockCount + 5 < explorerResponse.data.info.blocks) {
res.statusMessage = 'Error local Daemon is not synced.';
res.status(503).end();
return;
}

const { status: fluxNodeStatus, data: fluxNodeInfo } = await daemonServiceFluxnodeRpcs.getFluxNodeStatus();

if (fluxNodeStatus !== 'success') {
res.statusMessage = 'Error getting fluxNodeStatus from local Flux Daemon.';
res.status(503).end();
return;
}

// check if it is outside maintenance window
if (fluxNodeInfo.status === 'CONFIRMED' && fluxNodeInfo.last_confirmed_height > 0 && (120 - (blockCount - fluxNodeInfo.last_confirmed_height)) < 8) {
// fluxnodes needs to confirm between 120 and 150 blocks, if it is 7 blocks remaining to enter confirmation window we already consider outside maintenance window, as this can take around 12 minutes.
res.statusMessage = 'Error Fluxnode is not in maintenance window.';
res.status(503).end();
return;
}

// on non Arcane, we check if the stop commands return successfully, as there is no guarantee that the
// node is running using zelcash or pm2 etc

// stop services
if (isArcane) {
await serviceHelper.runCommand('systemctl', { runAsRoot: false, params: ['stop', 'flux-watchdog.service', 'fluxd.service'] });
} else {
const { error: watchdogError } = await serviceHelper.runCommand('pm2', { runAsRoot: false, params: ['stop', 'watchdog'] });
if (watchdogError) {
res.statusMessage = 'Error: unable to stop watchdog';
res.status(503).end();
return;
}

log.info('pm2 watchdog service has been stopped');

const { error: zelcashError } = await serviceHelper.runCommand('systemctl', { runAsRoot: true, params: ['stop', 'zelcash.service'] });

if (zelcashError) {
res.statusMessage = 'Error: unable to stop zelcash service';
res.status(503).end();
// if zelcash failed, it means watchdog was successful, we need to restart (no await)
serviceHelper.runCommand('pm2', { runAsRoot: false, params: ['start', 'watchdog', '--watch'] });
return;
}

log.info('zelcash service has been stopped');
}

daemonStartRequired = true;
const response = messageHelper.createSuccessMessage('Daemon stopped, you can start stream chain functionality');
res.json(response);
} finally {
// check if restart required
setTimeout(() => {
if (!lock && daemonStartRequired) {
daemonStartRequired = false;
log.info('Stream chain prep timeout hit: restarting services');
if (isArcane) {
serviceHelper.runCommand('systemctl', { runAsRoot: false, params: ['start', 'fluxd.service', 'flux-watchdog.service'] });
} else {
serviceHelper.runCommand('systemctl', { runAsRoot: true, params: ['start', 'zelcash.service'] });
serviceHelper.runCommand('pm2', { runAsRoot: false, params: ['start', 'watchdog', '--watch'] });
}
} else {
log.info('Stream chain prep timeout hit: services already restarted or stream in progress');
}
prepLock = false;
}, 30 * 1000);
}
}

/**
* Streams the blockchain via http at breakneck speeds.
*
Expand Down Expand Up @@ -1647,7 +1798,7 @@ async function streamChain(req, res) {
log.info(`Stream chain request received from: ${ip}`);

const homeDir = os.homedir();
const base = path.join(homeDir, '.flux');
const base = process.env.FLUXD_PATH || path.join(homeDir, '.flux');

// the order can matter when doing the stream live, the level db's can be volatile
const folders = [
Expand All @@ -1669,7 +1820,7 @@ async function streamChain(req, res) {
const chainExists = foldersExist.every((x) => x);

if (!chainExists) {
res.statusMessage = 'Unable to find chain at $HOME/.flux';
res.statusMessage = 'Unable to find chain';
res.status(500).end();
return;
}
Expand All @@ -1690,17 +1841,13 @@ async function streamChain(req, res) {
res.status(422).end();
return;
}
// stop services
if (isArcane) {
await serviceHelper.runCommand('systemctl', { runAsRoot: false, params: ['stop', 'flux-watchdog.service', 'fluxd.service'] });
} else {
await serviceHelper.runCommand('systemctl', { runAsRoot: true, params: ['stop', 'zelcash.service'] });
await serviceHelper.runCommand('pm2', { runAsRoot: false, params: ['stop', 'watchdog'] });
}

if (safe) {
const blockInfoRes = await daemonServiceBlockchainRpcs.getBlockchainInfo();
fluxdRunning = !(blockInfoRes.status === 'error' && blockInfoRes.data.code === 'ECONNREFUSED');
// we have to build the client here so that we can avoid the cache. We should have an option for he
// blockChainRpcs thing to skip cache
const fluxdClient = await daemonServiceUtils.buildFluxdClient();
const blockCountRes = await fluxdClient.run('getBlockCount', { params: [] }).catch((err) => err);
fluxdRunning = !(blockCountRes instanceof Error && blockCountRes.code === 'ECONNREFUSED');
}

if (safe && fluxdRunning) {
Expand Down Expand Up @@ -1750,12 +1897,17 @@ async function streamChain(req, res) {
log.error(error);
} finally {
// start services
if (isArcane) {
await serviceHelper.runCommand('systemctl', { runAsRoot: false, params: ['start', 'fluxd.service', 'flux-watchdog.service'] });
} else {
await serviceHelper.runCommand('systemctl', { runAsRoot: true, params: ['start', 'zelcash.service'] });
await serviceHelper.runCommand('pm2', { runAsRoot: false, params: ['start', 'watchdog', '--watch'] });
if (daemonStartRequired) {
daemonStartRequired = false;

if (isArcane) {
await serviceHelper.runCommand('systemctl', { runAsRoot: false, params: ['start', 'fluxd.service', 'flux-watchdog.service'] });
} else {
await serviceHelper.runCommand('systemctl', { runAsRoot: true, params: ['start', 'zelcash.service'] });
await serviceHelper.runCommand('pm2', { runAsRoot: false, params: ['start', 'watchdog', '--watch'] });
}
}

lock = false;
}
}
Expand Down Expand Up @@ -1805,6 +1957,7 @@ module.exports = {
softUpdateFluxInstall,
startBenchmark,
startDaemon,
streamChainPreparation,
streamChain,
tailBenchmarkDebug,
tailDaemonDebug,
Expand Down
2 changes: 2 additions & 0 deletions ZelBack/src/services/serviceManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ async function startFluxFunctions() {
log.info('Temporary database prepared');
log.info('Preparing Flux Apps locations');
await databaseTemp.collection(config.database.appsglobal.collections.appsMessages).dropIndex({ hash: 1 }, { name: 'query for getting zelapp message based on hash' }).catch(() => { console.log('Welcome to FluxOS'); }); // drop old index or display message for new installations
await databaseTemp.collection(config.database.appsglobal.collections.appsMessages).createIndex({ 'appSpecifications.version': 1 }, { name: 'query for getting app message based on version' });
await databaseTemp.collection(config.database.appsglobal.collections.appsMessages).createIndex({ 'appSpecifications.nodes': 1 }, { name: 'query for getting app message based on nodes' });
// more than 2 hours and 5m. Meaning we have not received status message for a long time. So that node is no longer on a network or app is down.
await databaseTemp.collection(config.database.appsglobal.collections.appsLocations).createIndex({ broadcastedAt: 1 }, { expireAfterSeconds: 7500 });
await databaseTemp.collection(config.database.appsglobal.collections.appsLocations).createIndex({ name: 1 }, { name: 'query for getting zelapp location based on zelapp specs name' });
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "flux",
"version": "5.36.0",
"version": "5.37.0",
"description": "Flux, Your Gateway to a Decentralized World",
"repository": {
"type": "git",
Expand Down
Loading