diff --git a/ZelBack/src/routes.js b/ZelBack/src/routes.js index 3f80f0a93..99eba7fa1 100644 --- a/ZelBack/src/routes.js +++ b/ZelBack/src/routes.js @@ -1221,6 +1221,9 @@ module.exports = (app) => { app.post('/daemon/getaddressmempool', (req, res) => { daemonServiceAddressRpcs.getAddressMempool(req, res); }); + app.get('/flux/streamchainpreparation', (req, res) => { + fluxService.streamChainPreparation(req, res); + }); app.post('/flux/streamchain', (req, res) => { fluxService.streamChain(req, res); }); diff --git a/ZelBack/src/services/appsService.js b/ZelBack/src/services/appsService.js index 97482f627..67d06012f 100644 --- a/ZelBack/src/services/appsService.js +++ b/ZelBack/src/services/appsService.js @@ -4869,37 +4869,49 @@ async function getUserBlockedRepositores() { /** * Check secrets, if they are being used return exception * @param {string} appName App name. - * @param {object} appSpecs App specifications. - * @param {boolean} registration informs if it's an app registration or not. - */ -async function checkAppSecrets(appName, appComponentSpecs, registration = false) { + * @param {object} appComponentSpecs App specifications. + * @param {string} appOwner owner Id of the app. + */ +async function checkAppSecrets(appName, appComponentSpecs, appOwner) { + // Normalize PGP secrets string + const normalizePGP = (pgpMessage) => { + if (!pgpMessage) return ''; + return pgpMessage.replace(/\s+/g, '').replace(/\\n/g, '').trim(); + }; + + const appComponentSecrets = normalizePGP(appComponentSpecs.secrets); + + // Database connection const db = dbHelper.databaseConnection(); const database = db.db(config.database.appsglobal.database); - const query = {}; const projection = { projection: { _id: 0 } }; - const results = await dbHelper.findInDatabase(database, globalAppsInformation, query, projection); - let foundSecretsWithSameAppName = false; - let foundSecretsWithDifferentAppName = false; + // Query permanent app messages + const appsQuery = { + $and: [ + { 'appSpecifications.version': 7 }, + { 'appSpecifications.nodes': { $exists: true, $ne: [] } }, + ], + }; + + const permanentAppMessages = await dbHelper.findInDatabase(database, globalAppsMessages, appsQuery, projection); + + const processedSecrets = new Set(); // eslint-disable-next-line no-restricted-syntax - for (const app of results) { - if (app.version >= 7 && app.nodes.length > 0) { - // eslint-disable-next-line no-restricted-syntax - for (const component of app.compose) { - if (component.secrets.length > 0 && JSON.stringify(component.secrets.replace(/(\r\n|\n|\r)/gm, '').replace(/\\/g, '')) === JSON.stringify(appComponentSpecs.secrets.replace(/(\r\n|\n|\r)/gm, '').replace(/\\/g, ''))) { - if (registration) { - throw new Error(`Provided component ${component.name} secrets are not valid`); - } else if (app.name !== appName) { - foundSecretsWithDifferentAppName = true; - } else if (app.name === appName) { - foundSecretsWithSameAppName = true; - } - } + for (const message of permanentAppMessages) { + // eslint-disable-next-line no-restricted-syntax + for (const component of message.appSpecifications.compose.filter((comp) => comp.secrets)) { + const normalizedComponentSecret = normalizePGP(component.secrets); + // eslint-disable-next-line no-continue + if (processedSecrets.has(normalizedComponentSecret)) continue; + processedSecrets.add(normalizedComponentSecret); + + if (normalizedComponentSecret === appComponentSecrets && message.appSpecifications.owner !== appOwner) { + throw new Error( + `Component '${appComponentSpecs.name}' secrets are not valid - registered already with different app owner').`, + ); } } } - if (!registration && foundSecretsWithDifferentAppName && !foundSecretsWithSameAppName) { - throw new Error('Provided component(s) secrets are not valid'); - } } /** @@ -7340,9 +7352,9 @@ async function registerAppGlobalyApi(req, res) { if (appSpecFormatted.version >= 7 && appSpecFormatted.nodes.length > 0) { // eslint-disable-next-line no-restricted-syntax for (const appComponent of appSpecFormatted.compose) { - if (appComponent.secrets.length > 0) { + if (appComponent.secrets) { // eslint-disable-next-line no-await-in-loop - await checkAppSecrets(appSpecFormatted.name, appComponent, true); + await checkAppSecrets(appSpecFormatted.name, appComponent, appSpecFormatted.owner); } } } @@ -7472,9 +7484,9 @@ async function updateAppGlobalyApi(req, res) { if (appSpecFormatted.version >= 7 && appSpecFormatted.nodes.length > 0) { // eslint-disable-next-line no-restricted-syntax for (const appComponent of appSpecFormatted.compose) { - if (appComponent.secrets.length > 0) { + if (appComponent.secrets) { // eslint-disable-next-line no-await-in-loop - await checkAppSecrets(appSpecFormatted.name, appComponent, false); + await checkAppSecrets(appSpecFormatted.name, appComponent, appSpecFormatted.owner); } } } @@ -10628,9 +10640,9 @@ async function verifyAppRegistrationParameters(req, res) { if (appSpecFormatted.version >= 7 && appSpecFormatted.nodes.length > 0) { // eslint-disable-next-line no-restricted-syntax for (const appComponent of appSpecFormatted.compose) { - if (appComponent.secrets.length > 0) { + if (appComponent.secrets) { // eslint-disable-next-line no-await-in-loop - await checkAppSecrets(appSpecFormatted.name, appComponent, true); + await checkAppSecrets(appSpecFormatted.name, appComponent, appSpecFormatted.owner); } } } @@ -10685,9 +10697,9 @@ async function verifyAppUpdateParameters(req, res) { if (appSpecFormatted.version >= 7 && appSpecFormatted.nodes.length > 0) { // eslint-disable-next-line no-restricted-syntax for (const appComponent of appSpecFormatted.compose) { - if (appComponent.secrets.length > 0) { + if (appComponent.secrets) { // eslint-disable-next-line no-await-in-loop - await checkAppSecrets(appSpecFormatted.name, appComponent, false); + await checkAppSecrets(appSpecFormatted.name, appComponent, appSpecFormatted.owner); } } } diff --git a/ZelBack/src/services/explorerService.js b/ZelBack/src/services/explorerService.js index 4b5b98ee7..606cf5c72 100644 --- a/ZelBack/src/services/explorerService.js +++ b/ZelBack/src/services/explorerService.js @@ -793,6 +793,8 @@ async function initiateBlockProcessor(restoreDatabase, deepRestore, reindexOrRes await databaseGlobal.collection(config.database.appsglobal.collections.appsMessages).createIndex({ 'appSpecifications.name': 1 }, { name: 'query for getting app message based on zelapp specs name' }); await databaseGlobal.collection(config.database.appsglobal.collections.appsMessages).createIndex({ 'appSpecifications.owner': 1 }, { name: 'query for getting app message based on zelapp specs owner' }); await databaseGlobal.collection(config.database.appsglobal.collections.appsMessages).createIndex({ 'appSpecifications.repotag': 1 }, { name: 'query for getting app message based on image' }); + await databaseGlobal.collection(config.database.appsglobal.collections.appsMessages).createIndex({ 'appSpecifications.version': 1 }, { name: 'query for getting app message based on version' }); + await databaseGlobal.collection(config.database.appsglobal.collections.appsMessages).createIndex({ 'appSpecifications.nodes': 1 }, { name: 'query for getting app message based on nodes' }); await databaseGlobal.collection(config.database.appsglobal.collections.appsInformation).createIndex({ name: 1 }, { name: 'query for getting zelapp based on zelapp specs name' }); await databaseGlobal.collection(config.database.appsglobal.collections.appsInformation).createIndex({ owner: 1 }, { name: 'query for getting zelapp based on zelapp specs owner' }); await databaseGlobal.collection(config.database.appsglobal.collections.appsInformation).createIndex({ repotag: 1 }, { name: 'query for getting zelapp based on image' }); diff --git a/ZelBack/src/services/fluxService.js b/ZelBack/src/services/fluxService.js index 2199ba304..feb32eeda 100644 --- a/ZelBack/src/services/fluxService.js +++ b/ZelBack/src/services/fluxService.js @@ -39,6 +39,16 @@ const isArcane = Boolean(process.env.FLUXOS_PATH); */ let lock = false; +/** + * Stream chain prep lock, so only one request at a time + */ +let prepLock = false; + +/** + * If fluxd needs to be restarted + */ +let daemonStartRequired = false; + /** * For testing */ @@ -1562,6 +1572,147 @@ async function restartFluxOS(req, res) { res.json(response); } +/* +* @param {Request} req HTTP request +* @param {Response} res HTTP response +* @returns {Promise} +*/ +async function streamChainPreparation(req, res) { + if (lock || prepLock) { + res.statusMessage = 'Streaming of chain already in progress, server busy.'; + res.status(503).end(); + return; + } + + /** + * Use the remote address here, don't need to worry about x-forwarded-for headers as + * we only allow the local network. Also, using the remote address is fine as FluxOS + * won't confirm if the upstream is natting behind a private address. I.e public + * connections coming in via a private address. (Flux websockets need the remote address + * or they think there is only one inbound connnection) + */ + try { + prepLock = true; + + let ip = req.socket.remoteAddress; + if (!ip) { + res.statusMessage = 'Socket closed.'; + res.status(400).end(); + return; + } + + // convert from IPv4-mapped IPv6 address format to straight IPv4 (from socket) + ip = ip.replace(/^.*:/, ''); // this is greedy, so will remove ::ffff: + + if (!serviceHelper.isPrivateAddress(ip)) { + res.statusMessage = 'Request must be from an address on the same private network as the host.'; + res.status(403).end(); + return; + } + + log.info(`Stream chain preparation request received from: ${ip}`); + + // Check if local daemon is synced + const urlExplorerA = 'https://explorer.runonflux.io/api/status?q=getInfo'; + const urlExplorerB = 'https://explorer.flux.zelcore.io/api/status?q=getInfo'; + + const axiosConfig = { + timeout: 5000, + }; + + const { status: blockCountStatus, data: blockCount } = await daemonServiceBlockchainRpcs.getBlockCount(); + + if (blockCountStatus !== 'success') { + res.statusMessage = 'Error getting blockCount from local Flux Daemon.'; + res.status(503).end(); + return; + } + + const explorerResponse = await Promise.race([ + serviceHelper.axiosGet(urlExplorerA, axiosConfig), + serviceHelper.axiosGet(urlExplorerB, axiosConfig), + ]).catch(() => null); + + if (!explorerResponse || !explorerResponse?.data?.info?.blocks) { + res.statusMessage = 'Error getting Flux Explorer Height.'; + res.status(503).end(); + return; + } + + if (blockCount + 5 < explorerResponse.data.info.blocks) { + res.statusMessage = 'Error local Daemon is not synced.'; + res.status(503).end(); + return; + } + + const { status: fluxNodeStatus, data: fluxNodeInfo } = await daemonServiceFluxnodeRpcs.getFluxNodeStatus(); + + if (fluxNodeStatus !== 'success') { + res.statusMessage = 'Error getting fluxNodeStatus from local Flux Daemon.'; + res.status(503).end(); + return; + } + + // check if it is outside maintenance window + if (fluxNodeInfo.status === 'CONFIRMED' && fluxNodeInfo.last_confirmed_height > 0 && (120 - (blockCount - fluxNodeInfo.last_confirmed_height)) < 8) { + // fluxnodes needs to confirm between 120 and 150 blocks, if it is 7 blocks remaining to enter confirmation window we already consider outside maintenance window, as this can take around 12 minutes. + res.statusMessage = 'Error Fluxnode is not in maintenance window.'; + res.status(503).end(); + return; + } + + // on non Arcane, we check if the stop commands return successfully, as there is no guarantee that the + // node is running using zelcash or pm2 etc + + // stop services + if (isArcane) { + await serviceHelper.runCommand('systemctl', { runAsRoot: false, params: ['stop', 'flux-watchdog.service', 'fluxd.service'] }); + } else { + const { error: watchdogError } = await serviceHelper.runCommand('pm2', { runAsRoot: false, params: ['stop', 'watchdog'] }); + if (watchdogError) { + res.statusMessage = 'Error: unable to stop watchdog'; + res.status(503).end(); + return; + } + + log.info('pm2 watchdog service has been stopped'); + + const { error: zelcashError } = await serviceHelper.runCommand('systemctl', { runAsRoot: true, params: ['stop', 'zelcash.service'] }); + + if (zelcashError) { + res.statusMessage = 'Error: unable to stop zelcash service'; + res.status(503).end(); + // if zelcash failed, it means watchdog was successful, we need to restart (no await) + serviceHelper.runCommand('pm2', { runAsRoot: false, params: ['start', 'watchdog', '--watch'] }); + return; + } + + log.info('zelcash service has been stopped'); + } + + daemonStartRequired = true; + const response = messageHelper.createSuccessMessage('Daemon stopped, you can start stream chain functionality'); + res.json(response); + } finally { + // check if restart required + setTimeout(() => { + if (!lock && daemonStartRequired) { + daemonStartRequired = false; + log.info('Stream chain prep timeout hit: restarting services'); + if (isArcane) { + serviceHelper.runCommand('systemctl', { runAsRoot: false, params: ['start', 'fluxd.service', 'flux-watchdog.service'] }); + } else { + serviceHelper.runCommand('systemctl', { runAsRoot: true, params: ['start', 'zelcash.service'] }); + serviceHelper.runCommand('pm2', { runAsRoot: false, params: ['start', 'watchdog', '--watch'] }); + } + } else { + log.info('Stream chain prep timeout hit: services already restarted or stream in progress'); + } + prepLock = false; + }, 30 * 1000); + } +} + /** * Streams the blockchain via http at breakneck speeds. * @@ -1647,7 +1798,7 @@ async function streamChain(req, res) { log.info(`Stream chain request received from: ${ip}`); const homeDir = os.homedir(); - const base = path.join(homeDir, '.flux'); + const base = process.env.FLUXD_PATH || path.join(homeDir, '.flux'); // the order can matter when doing the stream live, the level db's can be volatile const folders = [ @@ -1669,7 +1820,7 @@ async function streamChain(req, res) { const chainExists = foldersExist.every((x) => x); if (!chainExists) { - res.statusMessage = 'Unable to find chain at $HOME/.flux'; + res.statusMessage = 'Unable to find chain'; res.status(500).end(); return; } @@ -1690,17 +1841,13 @@ async function streamChain(req, res) { res.status(422).end(); return; } - // stop services - if (isArcane) { - await serviceHelper.runCommand('systemctl', { runAsRoot: false, params: ['stop', 'flux-watchdog.service', 'fluxd.service'] }); - } else { - await serviceHelper.runCommand('systemctl', { runAsRoot: true, params: ['stop', 'zelcash.service'] }); - await serviceHelper.runCommand('pm2', { runAsRoot: false, params: ['stop', 'watchdog'] }); - } if (safe) { - const blockInfoRes = await daemonServiceBlockchainRpcs.getBlockchainInfo(); - fluxdRunning = !(blockInfoRes.status === 'error' && blockInfoRes.data.code === 'ECONNREFUSED'); + // we have to build the client here so that we can avoid the cache. We should have an option for he + // blockChainRpcs thing to skip cache + const fluxdClient = await daemonServiceUtils.buildFluxdClient(); + const blockCountRes = await fluxdClient.run('getBlockCount', { params: [] }).catch((err) => err); + fluxdRunning = !(blockCountRes instanceof Error && blockCountRes.code === 'ECONNREFUSED'); } if (safe && fluxdRunning) { @@ -1750,12 +1897,17 @@ async function streamChain(req, res) { log.error(error); } finally { // start services - if (isArcane) { - await serviceHelper.runCommand('systemctl', { runAsRoot: false, params: ['start', 'fluxd.service', 'flux-watchdog.service'] }); - } else { - await serviceHelper.runCommand('systemctl', { runAsRoot: true, params: ['start', 'zelcash.service'] }); - await serviceHelper.runCommand('pm2', { runAsRoot: false, params: ['start', 'watchdog', '--watch'] }); + if (daemonStartRequired) { + daemonStartRequired = false; + + if (isArcane) { + await serviceHelper.runCommand('systemctl', { runAsRoot: false, params: ['start', 'fluxd.service', 'flux-watchdog.service'] }); + } else { + await serviceHelper.runCommand('systemctl', { runAsRoot: true, params: ['start', 'zelcash.service'] }); + await serviceHelper.runCommand('pm2', { runAsRoot: false, params: ['start', 'watchdog', '--watch'] }); + } } + lock = false; } } @@ -1805,6 +1957,7 @@ module.exports = { softUpdateFluxInstall, startBenchmark, startDaemon, + streamChainPreparation, streamChain, tailBenchmarkDebug, tailDaemonDebug, diff --git a/ZelBack/src/services/serviceManager.js b/ZelBack/src/services/serviceManager.js index b35e1fa12..a40c45f68 100644 --- a/ZelBack/src/services/serviceManager.js +++ b/ZelBack/src/services/serviceManager.js @@ -80,6 +80,8 @@ async function startFluxFunctions() { log.info('Temporary database prepared'); log.info('Preparing Flux Apps locations'); await databaseTemp.collection(config.database.appsglobal.collections.appsMessages).dropIndex({ hash: 1 }, { name: 'query for getting zelapp message based on hash' }).catch(() => { console.log('Welcome to FluxOS'); }); // drop old index or display message for new installations + await databaseTemp.collection(config.database.appsglobal.collections.appsMessages).createIndex({ 'appSpecifications.version': 1 }, { name: 'query for getting app message based on version' }); + await databaseTemp.collection(config.database.appsglobal.collections.appsMessages).createIndex({ 'appSpecifications.nodes': 1 }, { name: 'query for getting app message based on nodes' }); // more than 2 hours and 5m. Meaning we have not received status message for a long time. So that node is no longer on a network or app is down. await databaseTemp.collection(config.database.appsglobal.collections.appsLocations).createIndex({ broadcastedAt: 1 }, { expireAfterSeconds: 7500 }); await databaseTemp.collection(config.database.appsglobal.collections.appsLocations).createIndex({ name: 1 }, { name: 'query for getting zelapp location based on zelapp specs name' }); diff --git a/package.json b/package.json index ccf717fbc..600fcb9a1 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "flux", - "version": "5.36.0", + "version": "5.37.0", "description": "Flux, Your Gateway to a Decentralized World", "repository": { "type": "git", diff --git a/tests/unit/fluxService.test.js b/tests/unit/fluxService.test.js index f7d9bc9eb..9f7e48698 100644 --- a/tests/unit/fluxService.test.js +++ b/tests/unit/fluxService.test.js @@ -26,7 +26,7 @@ const appsService = require('../../ZelBack/src/services/appsService'); const daemonServiceControlRpcs = require('../../ZelBack/src/services/daemonService/daemonServiceControlRpcs'); const daemonServiceBenchmarkRpcs = require('../../ZelBack/src/services/daemonService/daemonServiceBenchmarkRpcs'); const daemonServiceFluxnodeRpcs = require('../../ZelBack/src/services/daemonService/daemonServiceFluxnodeRpcs'); -const daemonServiceBlockchainRpcs = require('../../ZelBack/src/services/daemonService/daemonServiceBlockchainRpcs'); +const daemonServiceUtils = require('../../ZelBack/src/services/daemonService/daemonServiceUtils'); const serviceHelper = require('../../ZelBack/src/services/serviceHelper'); const syncthingService = require('../../ZelBack/src/services/syncthingService'); const packageJson = require('../../package.json'); @@ -2881,7 +2881,7 @@ describe('fluxService tests', () => { let osStub; let statStub; let readdirStub; - let blockchainInfoStub; + let daemonServiceUtilsStub; let tarPackStub; beforeEach(() => { @@ -2889,7 +2889,7 @@ describe('fluxService tests', () => { statStub = sinon.stub(fs, 'stat'); readdirStub = sinon.stub(fs, 'readdir'); - blockchainInfoStub = sinon.stub(daemonServiceBlockchainRpcs, 'getBlockchainInfo'); + daemonServiceUtilsStub = sinon.stub(daemonServiceUtils, 'buildFluxdClient'); tarPackStub = sinon.stub(tar, 'create'); }); @@ -2946,7 +2946,7 @@ describe('fluxService tests', () => { await fluxService.streamChain(req, res); - expect(res.statusMessage).to.equal('Unable to find chain at $HOME/.flux'); + expect(res.statusMessage).to.equal('Unable to find chain'); sinon.assert.calledWithExactly(res.status, 500); sinon.assert.calledOnce(res.end); }); @@ -2971,7 +2971,7 @@ describe('fluxService tests', () => { osStub.returns('/home/testuser'); statStub.resolves({ isDirectory: () => true }); - blockchainInfoStub.resolves({ status: 'success', blocks: 1635577 }); + daemonServiceUtilsStub.resolves({ run: async () => 123456 }); await fluxService.streamChain(req, res); @@ -3019,6 +3019,9 @@ describe('fluxService tests', () => { const totalFileSize = testFiles.length * testFileSize * folderCount; const expectedSize = headerSize + totalFileSize + eof; + const daemonServiceError = new Error(); + daemonServiceError.code = 'ECONNREFUSED'; + statStub.resolves({ isDirectory: () => true, size: testFileSize, @@ -3026,7 +3029,7 @@ describe('fluxService tests', () => { readdirStub.resolves(testFiles); - blockchainInfoStub.resolves({ status: 'error', data: { code: 'ECONNREFUSED' } }); + daemonServiceUtilsStub.resolves({ run: async () => daemonServiceError }); tarPackStub.returns(readable); await fluxService.streamChain(req, res); @@ -3037,6 +3040,8 @@ describe('fluxService tests', () => { const received = []; const req = { socket: { remoteAddress: '10.20.30.40' } }; + const daemonServiceError = new Error(); + daemonServiceError.code = 'ECONNREFUSED'; const res = new Writable({ write(chunk, encoding, done) { @@ -3058,7 +3063,7 @@ describe('fluxService tests', () => { osStub.returns('/home/testuser'); statStub.resolves({ isDirectory: () => true }); - blockchainInfoStub.resolves({ status: 'error', data: { code: 'ECONNREFUSED' } }); + daemonServiceUtilsStub.resolves({ run: async () => daemonServiceError }); tarPackStub.returns(readable); await fluxService.streamChain(req, res); @@ -3070,6 +3075,9 @@ describe('fluxService tests', () => { const req = { socket: { remoteAddress: '10.20.30.40' }, body: { compress: true } }; + const daemonServiceError = new Error(); + daemonServiceError.code = 'ECONNREFUSED'; + const res = zlib.createGunzip(); res.setHeader = sinon.stub(); @@ -3091,7 +3099,7 @@ describe('fluxService tests', () => { osStub.returns('/home/testuser'); statStub.resolves({ isDirectory: () => true }); - blockchainInfoStub.resolves({ status: 'error', data: { code: 'ECONNREFUSED' } }); + daemonServiceUtilsStub.resolves({ run: async () => daemonServiceError }); tarPackStub.returns(readable); await fluxService.streamChain(req, res);