From 22a44c1114534a1de162225afacefef1e3626f01 Mon Sep 17 00:00:00 2001
From: David White
Date: Tue, 29 Oct 2024 10:29:37 +0000
Subject: [PATCH] Feature - add chain size header to streamchain (#1416)

* Latest changes

* Tidy up

* Add tests, remove unneeded lock removals
---
 ZelBack/src/services/fluxService.js   | 61 +++++++++++++-------
 ZelBack/src/services/serviceHelper.js | 43 ++++++++++++++
 package.json                          |  2 +-
 tests/unit/fluxService.test.js        | 80 +++++++++++++++++++++++----
 4 files changed, 156 insertions(+), 30 deletions(-)

diff --git a/ZelBack/src/services/fluxService.js b/ZelBack/src/services/fluxService.js
index 8a02f9481..9b26e45e7 100644
--- a/ZelBack/src/services/fluxService.js
+++ b/ZelBack/src/services/fluxService.js
@@ -27,7 +27,7 @@ const dockerService = require('./dockerService');
 
 // for streamChain endpoint
 const zlib = require('node:zlib');
-const tar = require('tar-fs');
+const tar = require('tar/create');
 // use non promises stream for node 14.x compatibility
 // const stream = require('node:stream/promises');
 const stream = require('node:stream');
@@ -37,6 +37,13 @@ const stream = require('node:stream');
  */
let lock = false;
 
+/**
+ * For testing
+ */
+function getStreamLock() {
+  return lock;
+}
+
 /**
  * For testing
  */
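
For context on the dependency swap above: tar-fs packs a directory with pack(dir, { entries }), while node-tar builds an equivalent readable stream with create() (also exported as c; the patch's require('tar/create') subpath exposes the same function). A minimal comparison sketch, not part of the patch, with an illustrative base path:

// Comparison sketch only; not part of the patch.
const tarFs = require('tar-fs'); // old dependency
const tar = require('tar'); // new dependency

const base = '/home/someuser/.flux'; // illustrative path
const folders = ['determ_zelnodes', 'chainstate', 'blocks'];

// tar-fs: pack `base`, restricted to the listed entries
const oldStream = tarFs.pack(base, { entries: folders });

// node-tar: archive the same folders, resolved relative to `cwd`
const newStream = tar.create({ cwd: base }, folders);

// Either stream can then be piped onwards, e.g. newStream.pipe(res)
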
@@ -1601,17 +1608,16 @@ async function streamChain(req, res) {
   lock = true;
 
   /**
-   * Use the remote address here, don't need to worry about x-forwarded-for headers as
-   * we only allow the local network. Also, using the remote address is fine as FluxOS
-   * won't confirm if the upstream is natting behind a private address. I.e public
-   * connections coming in via a private address. (Flux websockets need the remote address
-   * or they think there is only one inbound connnection)
-   */
+  * Use the remote address here, don't need to worry about x-forwarded-for headers as
+  * we only allow the local network. Also, using the remote address is fine as FluxOS
+  * won't confirm if the upstream is natting behind a private address. I.e. public
+  * connections coming in via a private address. (Flux websockets need the remote address
+  * or they think there is only one inbound connection)
+  */
   let ip = req.socket.remoteAddress;
   if (!ip) {
     res.statusMessage = 'Socket closed.';
     res.status(400).end();
-    lock = false;
     return;
   }
 
@@ -1621,7 +1627,6 @@ async function streamChain(req, res) {
   if (!serviceHelper.isPrivateAddress(ip)) {
     res.statusMessage = 'Request must be from an address on the same private network as the host.';
     res.status(403).end();
-    lock = false;
     return;
   }
 
@@ -1630,10 +1635,11 @@ async function streamChain(req, res) {
   const homeDir = os.homedir();
   const base = path.join(homeDir, '.flux');
 
+  // the order can matter when streaming the chain live, as the level DBs can be volatile
   const folders = [
-    'blocks',
-    'chainstate',
     'determ_zelnodes',
+    'chainstate',
+    'blocks',
   ];
 
   const folderPromises = folders.map(async (f) => {
@@ -1651,7 +1657,6 @@ async function streamChain(req, res) {
   if (!chainExists) {
     res.statusMessage = 'Unable to find chain at $HOME/.flux';
     res.status(500).end();
-    lock = false;
     return;
   }
 
@@ -1669,7 +1674,6 @@ async function streamChain(req, res) {
   if (!safe && compress) {
     res.statusMessage = 'Unable to compress blockchain in unsafe mode, it will corrupt new db.';
     res.status(422).end();
-    lock = false;
     return;
   }
 
@@ -1681,15 +1685,33 @@ async function streamChain(req, res) {
   if (safe && fluxdRunning) {
     res.statusMessage = 'Flux daemon still running, unable to clone blockchain.';
     res.status(503).end();
-    lock = false;
     return;
   }
 
+  const infoPromises = folders.map(
+    (f) => serviceHelper.dirInfo(path.join(base, f), { padFiles: 512 }),
+  );
+  const info = await Promise.all(infoPromises).catch(() => [{ count: 0, size: 0 }]);
+
+  const { count, size } = info.reduce((prev, current) => (
+    { count: prev.count + current.count, size: prev.size + current.size }), { count: 0, size: 0 });
+
+  const tarHeaderSize = count * 512;
+  // if we got an error getting the size it will be 0; set eof to 0 as well so totalSize ends up 0
+  const tarEof = size ? 512 * 2 : 0;
+
+  const totalSize = size + tarHeaderSize + tarEof;
+
+  // We can't set this as the actual content length, as it can change slightly during transfer.
+  // However, we need to set some size so that the image installer can get a rough idea of
+  // how long the transfer will take.
+  res.setHeader('Approx-Content-Length', totalSize.toString());
+
   const workflow = [];
 
-  workflow.push(tar.pack(base, {
-    entries: folders,
-  }));
+  const readStream = tar.create({ cwd: base }, folders);
+
+  workflow.push(readStream);
 
   if (compress) {
     log.info('Compression requested... adding gzip. This can be 10-20x slower than sending uncompressed');
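
The Approx-Content-Length value computed above is plain tar arithmetic: each file costs one 512-byte header, its data is padded up to a 512-byte boundary (which is what the padFiles: 512 option to dirInfo accounts for), and the archive ends with two 512-byte zero blocks. Directory entries and any extended headers are ignored, which is part of why the value is only approximate. A standalone sketch of the same estimate, using made-up file sizes:

// Standalone sketch of the estimate behind Approx-Content-Length.
// The file sizes below are made-up example values.
const BLOCK = 512;

const exampleFileSizes = [1048576, 300, 7000]; // bytes on disk

const padded = exampleFileSizes
  .map((size) => Math.ceil(size / BLOCK) * BLOCK) // data blocks are padded to 512 bytes
  .reduce((sum, size) => sum + size, 0);

const headers = exampleFileSizes.length * BLOCK; // one 512-byte header per entry
const eof = BLOCK * 2; // archive ends with two zero blocks

const approxContentLength = padded + headers + eof;
// e.g. res.setHeader('Approx-Content-Length', approxContentLength.toString());
console.log(approxContentLength); // 1048576 + 512 + 7168 + 1536 + 1024 = 1058816
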
@@ -1744,7 +1766,6 @@ module.exports = {
   hardUpdateFlux,
   installFluxWatchTower,
   isStaticIPapi,
-  lockStreamLock,
   rebuildHome,
   reindexDaemon,
   restartBenchmark,
@@ -1761,11 +1782,13 @@ module.exports = {
   tailFluxErrorLog,
   tailFluxInfoLog,
   tailFluxWarnLog,
-  unlockStreamLock,
   updateBenchmark,
   updateDaemon,
   updateFlux,
   // Exports for testing purposes
   fluxLog,
+  getStreamLock,
+  lockStreamLock,
   tailFluxLog,
+  unlockStreamLock,
 };
diff --git a/ZelBack/src/services/serviceHelper.js b/ZelBack/src/services/serviceHelper.js
index c6f1a6fad..32057407e 100644
--- a/ZelBack/src/services/serviceHelper.js
+++ b/ZelBack/src/services/serviceHelper.js
@@ -1,4 +1,6 @@
 const util = require('node:util');
+const path = require('node:path');
+const fs = require('node:fs/promises');
 const execFile = util.promisify(require('node:child_process').execFile);
 
 const axios = require('axios').default;
@@ -451,6 +453,46 @@ function minVersionSatisfy(targetVersion, minimumVersion) {
   return true;
 }
 
+/**
+ * Recursively sums the size of a directory and its children, in bytes
+ * @param {string} dir The directory we want the size of
+ * @param {{padFiles?: number}} options If padFiles is set, each file size is rounded up to a multiple of it
+ * @returns {Promise<{count: number, size: number}>}
+ */
+async function dirInfo(dir, options = {}) {
+  const padFiles = options.padFiles || null;
+
+  const files = await fs.readdir(dir, { withFileTypes: true });
+
+  const pathPromises = files.map(async (file) => {
+    const targetpath = path.join(dir, file.name);
+
+    if (file.isDirectory()) return dirInfo(targetpath, options);
+
+    if (file.isFile()) {
+      const { size } = await fs.stat(targetpath);
+
+      return size;
+    }
+
+    return 0;
+  });
+
+  const paths = await Promise.all(pathPromises);
+
+  const response = paths.flat(Infinity).reduce((prev, current) => {
+    // the paths are either a number, i.e. a file, or a directory, with a count and aggregate size
+    const { count, size } = typeof current === 'number' ? { count: 1, size: current } : current;
+
+    // we only pad if it's a file (a dir's contents have already been padded)
+    const padding = padFiles && typeof current === 'number' ? (padFiles - (size % padFiles)) % padFiles : 0;
+
+    return { count: prev.count + count, size: prev.size + size + padding };
+  }, { count: 0, size: 0 });
+
+  return response;
+}
+
 module.exports = {
   axiosGet,
   axiosPost,
@@ -458,6 +500,7 @@ module.exports = {
   axiosInstance,
   delay,
   deleteLoginPhrase,
+  dirInfo,
   dockerBufferToString,
   ensureBoolean,
   ensureNumber,
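
A quick usage sketch of the dirInfo helper added above (the path and console output are illustrative only): without options it returns the raw file count and byte total; with padFiles set, each file's size is rounded up to a multiple of that block size, which is how streamChain estimates the tar payload.

// Illustrative usage only; the path below is an example.
const serviceHelper = require('./ZelBack/src/services/serviceHelper');

async function example() {
  // Raw file count and aggregate size, in bytes
  const raw = await serviceHelper.dirInfo('/home/someuser/.flux/chainstate');
  // -> { count: <number of files>, size: <sum of file sizes> }

  // Same folder, with each file rounded up to a 512-byte boundary
  const padded = await serviceHelper.dirInfo('/home/someuser/.flux/chainstate', { padFiles: 512 });

  console.log(raw.count === padded.count); // true - padding changes size, not count
  console.log(padded.size >= raw.size); // true
}

example().catch(console.error);
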
diff --git a/package.json b/package.json
index e303767f4..fac210a36 100644
--- a/package.json
+++ b/package.json
@@ -117,7 +117,7 @@
     "socket.io": "~4.7.2",
     "splitargs": "~0.0.7",
     "store": "~2.0.12",
-    "tar-fs": "~3.0.6",
+    "tar": "^7.4.3",
     "ws": "~7.5.9",
     "xterm": "~5.1.0",
     "zeltrezjs": "~2.12.0",
diff --git a/tests/unit/fluxService.test.js b/tests/unit/fluxService.test.js
index 7ecd18e5e..f7d9bc9eb 100644
--- a/tests/unit/fluxService.test.js
+++ b/tests/unit/fluxService.test.js
@@ -5,7 +5,7 @@ const path = require('node:path');
 const { Readable, Writable } = require('node:stream');
 const zlib = require('node:zlib');
 
-const tar = require('tar-fs');
+const tar = require('tar/create');
 
 const chai = require('chai');
 const chaiAsPromised = require('chai-as-promised');
@@ -2879,19 +2879,21 @@ describe('fluxService tests', () => {
 
   describe('streamChain tests', () => {
     let osStub;
-    let fsStub;
+    let statStub;
+    let readdirStub;
     let blockchainInfoStub;
     let tarPackStub;
 
     beforeEach(() => {
       osStub = sinon.stub(os, 'homedir');
-      fsStub = sinon.stub(fs, 'stat');
+      statStub = sinon.stub(fs, 'stat');
+      readdirStub = sinon.stub(fs, 'readdir');
+
       blockchainInfoStub = sinon.stub(daemonServiceBlockchainRpcs, 'getBlockchainInfo');
-      tarPackStub = sinon.stub(tar, 'pack');
+      tarPackStub = sinon.stub(tar, 'create');
     });
 
     afterEach(() => {
-      fluxService.unlockStreamLock();
       sinon.restore();
     });
 
@@ -2902,8 +2904,10 @@ describe('fluxService tests', () => {
       await fluxService.streamChain(null, res);
 
       expect(res.statusMessage).to.equal('Streaming of chain already in progress, server busy.');
+      expect(fluxService.getStreamLock()).to.equal(true);
       sinon.assert.calledWithExactly(res.status, 503);
       sinon.assert.calledOnce(res.end);
+      fluxService.unlockStreamLock();
     });
 
     it('should lock if no other streams are in progress', async () => {
@@ -2938,7 +2942,7 @@ describe('fluxService tests', () => {
 
       osStub.returns('/home/testuser');
 
-      fsStub.rejects(new Error("Test block dir doesn't exist"));
+      statStub.rejects(new Error("Test block dir doesn't exist"));
 
       await fluxService.streamChain(req, res);
 
@@ -2952,7 +2956,7 @@ describe('fluxService tests', () => {
       const req = { socket: { remoteAddress: '10.20.30.40' }, body: { unsafe: true, compress: true } };
 
       osStub.returns('/home/testuser');
-      fsStub.resolves({ isDirectory: () => true });
+      statStub.resolves({ isDirectory: () => true });
 
       await fluxService.streamChain(req, res);
 
@@ -2966,7 +2970,7 @@ describe('fluxService tests', () => {
       const req = { socket: { remoteAddress: '10.20.30.40' } };
 
       osStub.returns('/home/testuser');
-      fsStub.resolves({ isDirectory: () => true });
+      statStub.resolves({ isDirectory: () => true });
       blockchainInfoStub.resolves({ status: 'success', blocks: 1635577 });
 
       await fluxService.streamChain(req, res);
@@ -2976,6 +2980,59 @@ describe('fluxService tests', () => {
       sinon.assert.calledOnce(res.end);
     });
 
+    it('should set Approx-Content-Length response header with expected value', async () => {
+      const received = [];
+
+      const req = { socket: { remoteAddress: '10.20.30.40' } };
+
+      const res = new Writable({
+        write(chunk, encoding, done) {
+          received.push(chunk.toString());
+          done();
+        },
+      });
+
+      res.setHeader = sinon.stub();
+
+      let count = 0;
+      const readable = new Readable({
+        read() {
+          this.push('test');
+          if (count === 3) this.push(null);
+          count += 1;
+        },
+      });
+
+      osStub.returns('/home/testuser');
+
+      const createFile = (name) => ({
+        name,
+        isDirectory: () => false,
+        isFile: () => true,
+      });
+
+      const folderCount = 3;
+      const testFileSize = 1048576;
+      const testFiles = [...Array(50).keys()].map((x) => createFile(x.toString()));
+      const headerSize = testFiles.length * 512 * folderCount;
+      const eof = 1024;
+      const totalFileSize = testFiles.length * testFileSize * folderCount;
+      const expectedSize = headerSize + totalFileSize + eof;
+
+      statStub.resolves({
+        isDirectory: () => true,
+        size: testFileSize,
+      });
+
+      readdirStub.resolves(testFiles);
+
+      blockchainInfoStub.resolves({ status: 'error', data: { code: 'ECONNREFUSED' } });
+      tarPackStub.returns(readable);
+
+      await fluxService.streamChain(req, res);
+      sinon.assert.calledWithExactly(res.setHeader, 'Approx-Content-Length', expectedSize.toString());
+    });
+
     it('should stream chain uncompressed when no compression requested', async () => {
       const received = [];
 
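For reference, the expectedSize asserted in the new test above works out as follows: readdir is stubbed to return the same 50 file entries for each of the 3 chain folders, fs.stat reports 1048576 bytes per file (already a multiple of 512, so no padding is added), and the estimate then adds one 512-byte header per file plus the 1024-byte end-of-archive marker:

// Working the stubbed numbers through the estimate (values from the test above).
const folderCount = 3; // determ_zelnodes, chainstate, blocks
const filesPerFolder = 50; // readdirStub resolves 50 entries per folder
const testFileSize = 1048576; // statStub reports 1 MiB per file; 1048576 % 512 === 0, so no padding

const headerSize = filesPerFolder * 512 * folderCount; // 76,800
const totalFileSize = filesPerFolder * testFileSize * folderCount; // 157,286,400
const eof = 512 * 2; // 1,024

console.log(headerSize + totalFileSize + eof); // 157,364,224
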
@@ -2988,6 +3045,8 @@ describe('fluxService tests', () => {
         },
       });
 
+      res.setHeader = sinon.stub();
+
       let count = 0;
       const readable = new Readable({
         read() {
@@ -2998,7 +3057,7 @@ describe('fluxService tests', () => {
       });
 
       osStub.returns('/home/testuser');
-      fsStub.resolves({ isDirectory: () => true });
+      statStub.resolves({ isDirectory: () => true });
       blockchainInfoStub.resolves({ status: 'error', data: { code: 'ECONNREFUSED' } });
       tarPackStub.returns(readable);
 
@@ -3012,6 +3071,7 @@ describe('fluxService tests', () => {
       const req = { socket: { remoteAddress: '10.20.30.40' }, body: { compress: true } };
 
       const res = zlib.createGunzip();
+      res.setHeader = sinon.stub();
 
       res.on('data', (data) => {
         // this gets all data in buffer
@@ -3030,7 +3090,7 @@ describe('fluxService tests', () => {
       });
 
       osStub.returns('/home/testuser');
-      fsStub.resolves({ isDirectory: () => true });
+      statStub.resolves({ isDirectory: () => true });
       blockchainInfoStub.resolves({ status: 'error', data: { code: 'ECONNREFUSED' } });
       tarPackStub.returns(readable);