Skip to content

Commit

Permalink
Feature - add chain size header to streamchain (#1416)
Browse files Browse the repository at this point in the history
* Latest changes

* Tidy up

* Add tests, remove unneeded lock removals
  • Loading branch information
MorningLightMountain713 authored Oct 29, 2024
1 parent 2bcf622 commit 22a44c1
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 30 deletions.
61 changes: 42 additions & 19 deletions ZelBack/src/services/fluxService.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const dockerService = require('./dockerService');

// for streamChain endpoint
const zlib = require('node:zlib');
const tar = require('tar-fs');
const tar = require('tar/create');
// use non promises stream for node 14.x compatibility
// const stream = require('node:stream/promises');
const stream = require('node:stream');
Expand All @@ -37,6 +37,13 @@ const stream = require('node:stream');
*/
let lock = false;

/**
 * Test helper — reports whether the chain-stream lock is currently held.
 * @returns {boolean} Current value of the module-level `lock` flag.
 */
function getStreamLock() {
  return lock;
}

/**
* For testing
*/
Expand Down Expand Up @@ -1601,17 +1608,16 @@ async function streamChain(req, res) {
lock = true;

/**
* Use the remote address here, don't need to worry about x-forwarded-for headers as
* we only allow the local network. Also, using the remote address is fine as FluxOS
* won't confirm if the upstream is natting behind a private address. I.e public
* connections coming in via a private address. (Flux websockets need the remote address
* or they think there is only one inbound connection)
*/
* Use the remote address here, don't need to worry about x-forwarded-for headers as
* we only allow the local network. Also, using the remote address is fine as FluxOS
* won't confirm if the upstream is natting behind a private address. I.e public
* connections coming in via a private address. (Flux websockets need the remote address
* or they think there is only one inbound connection)
*/
let ip = req.socket.remoteAddress;
if (!ip) {
res.statusMessage = 'Socket closed.';
res.status(400).end();
lock = false;
return;
}

Expand All @@ -1621,7 +1627,6 @@ async function streamChain(req, res) {
if (!serviceHelper.isPrivateAddress(ip)) {
res.statusMessage = 'Request must be from an address on the same private network as the host.';
res.status(403).end();
lock = false;
return;
}

Expand All @@ -1630,10 +1635,11 @@ async function streamChain(req, res) {
const homeDir = os.homedir();
const base = path.join(homeDir, '.flux');

// the order can matter when doing the stream live, the level db's can be volatile
const folders = [
'blocks',
'chainstate',
'determ_zelnodes',
'chainstate',
'blocks',
];

const folderPromises = folders.map(async (f) => {
Expand All @@ -1651,7 +1657,6 @@ async function streamChain(req, res) {
if (!chainExists) {
res.statusMessage = 'Unable to find chain at $HOME/.flux';
res.status(500).end();
lock = false;
return;
}

Expand All @@ -1669,7 +1674,6 @@ async function streamChain(req, res) {
if (!safe && compress) {
res.statusMessage = 'Unable to compress blockchain in unsafe mode, it will corrupt new db.';
res.status(422).end();
lock = false;
return;
}

Expand All @@ -1681,15 +1685,33 @@ async function streamChain(req, res) {
if (safe && fluxdRunning) {
res.statusMessage = 'Flux daemon still running, unable to clone blockchain.';
res.status(503).end();
lock = false;
return;
}

const infoPromises = folders.map(
(f) => serviceHelper.dirInfo(path.join(base, f), { padFiles: 512 }),
);
const info = await Promise.all(infoPromises).catch(() => [{ count: 0, size: 0 }]);

const { count, size } = info.reduce((prev, current) => (
{ count: prev.count + current.count, size: prev.size + current.size }), { count: 0, size: 0 });

const tarHeaderSize = count * 512;
// if we get an error getting size, just set eof to 0, which will make totalSize 0
const tarEof = size ? 512 * 2 : 0;

const totalSize = size + tarHeaderSize + tarEof;

// We can't set this as the actual content length. As it can change slightly during transfer.
// However we need to set some size, so that the image installer can get a rough idea
// of how long the transfer will take.
res.setHeader('Approx-Content-Length', totalSize.toString());

const workflow = [];

workflow.push(tar.pack(base, {
entries: folders,
}));
const readStream = tar.create({ cwd: base }, folders);

workflow.push(readStream);

if (compress) {
log.info('Compression requested... adding gzip. This can be 10-20x slower than sending uncompressed');
Expand Down Expand Up @@ -1744,7 +1766,6 @@ module.exports = {
hardUpdateFlux,
installFluxWatchTower,
isStaticIPapi,
lockStreamLock,
rebuildHome,
reindexDaemon,
restartBenchmark,
Expand All @@ -1761,11 +1782,13 @@ module.exports = {
tailFluxErrorLog,
tailFluxInfoLog,
tailFluxWarnLog,
unlockStreamLock,
updateBenchmark,
updateDaemon,
updateFlux,
// Exports for testing purposes
fluxLog,
getStreamLock,
lockStreamLock,
tailFluxLog,
unlockStreamLock,
};
43 changes: 43 additions & 0 deletions ZelBack/src/services/serviceHelper.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
const util = require('node:util');
const path = require('node:path');
const fs = require('node:fs/promises');
const execFile = util.promisify(require('node:child_process').execFile);

const axios = require('axios').default;
Expand Down Expand Up @@ -451,13 +453,54 @@ function minVersionSatisfy(targetVersion, minimumVersion) {
return true;
}

/**
* Recursively sum size of directory and children, in bytes
* @param {string} dir The directory we want the size of
* @param {{padFiles?: number}} options If the files are to be padded to size
* @returns {Promise<number>}
*/
/**
 * Recursively sums the size of a directory and its children, in bytes.
 * @param {string} dir The directory we want the size of
 * @param {{padFiles?: number}} options If `padFiles` is set, each file's size is
 * rounded UP to the next multiple of `padFiles` bytes (e.g. 512 to estimate the
 * space files occupy as tar data blocks)
 * @returns {Promise<{count: number, size: number}>} Total file count and aggregate size
 */
async function dirInfo(dir, options = {}) {
  const padFiles = options.padFiles || null;

  const files = await fs.readdir(dir, { withFileTypes: true });

  const pathPromises = files.map(async (file) => {
    const targetpath = path.join(dir, file.name);

    if (file.isDirectory()) return dirInfo(targetpath, options);

    if (file.isFile()) {
      const { size } = await fs.stat(targetpath);

      return size;
    }

    // sockets, fifos, symlinks etc contribute nothing
    return 0;
  });

  const paths = await Promise.all(pathPromises);

  const response = paths.flat(Infinity).reduce((prev, current) => {
    // each entry is either a number (a single file's size) or a directory
    // summary object with an aggregate { count, size }
    const isFile = typeof current === 'number';
    const { count, size } = isFile ? { count: 1, size: current } : current;

    // only pad individual files (directory results were already padded during
    // recursion); round the file size up to the next multiple of padFiles
    const padding = padFiles && isFile ? (padFiles - (size % padFiles)) % padFiles : 0;

    return { count: prev.count + count, size: prev.size + size + padding };
  }, { count: 0, size: 0 });

  return response;
}

module.exports = {
axiosGet,
axiosPost,
commandStringToArray,
axiosInstance,
delay,
deleteLoginPhrase,
dirInfo,
dockerBufferToString,
ensureBoolean,
ensureNumber,
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@
"socket.io": "~4.7.2",
"splitargs": "~0.0.7",
"store": "~2.0.12",
"tar-fs": "~3.0.6",
"tar": "^7.4.3",
"ws": "~7.5.9",
"xterm": "~5.1.0",
"zeltrezjs": "~2.12.0",
Expand Down
80 changes: 70 additions & 10 deletions tests/unit/fluxService.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const path = require('node:path');
const { Readable, Writable } = require('node:stream');
const zlib = require('node:zlib');

const tar = require('tar-fs');
const tar = require('tar/create');

const chai = require('chai');
const chaiAsPromised = require('chai-as-promised');
Expand Down Expand Up @@ -2879,19 +2879,21 @@ describe('fluxService tests', () => {

describe('streamChain tests', () => {
let osStub;
let fsStub;
let statStub;
let readdirStub;
let blockchainInfoStub;
let tarPackStub;

beforeEach(() => {
osStub = sinon.stub(os, 'homedir');
fsStub = sinon.stub(fs, 'stat');
statStub = sinon.stub(fs, 'stat');
readdirStub = sinon.stub(fs, 'readdir');

blockchainInfoStub = sinon.stub(daemonServiceBlockchainRpcs, 'getBlockchainInfo');
tarPackStub = sinon.stub(tar, 'pack');
tarPackStub = sinon.stub(tar, 'create');
});

afterEach(() => {
fluxService.unlockStreamLock();
sinon.restore();
});

Expand All @@ -2902,8 +2904,10 @@ describe('fluxService tests', () => {
await fluxService.streamChain(null, res);

expect(res.statusMessage).to.equal('Streaming of chain already in progress, server busy.');
expect(fluxService.getStreamLock()).to.equal(true);
sinon.assert.calledWithExactly(res.status, 503);
sinon.assert.calledOnce(res.end);
fluxService.unlockStreamLock();
});

it('should lock if no other streams are in progress', async () => {
Expand Down Expand Up @@ -2938,7 +2942,7 @@ describe('fluxService tests', () => {

osStub.returns('/home/testuser');

fsStub.rejects(new Error("Test block dir doesn't exist"));
statStub.rejects(new Error("Test block dir doesn't exist"));

await fluxService.streamChain(req, res);

Expand All @@ -2952,7 +2956,7 @@ describe('fluxService tests', () => {
const req = { socket: { remoteAddress: '10.20.30.40' }, body: { unsafe: true, compress: true } };

osStub.returns('/home/testuser');
fsStub.resolves({ isDirectory: () => true });
statStub.resolves({ isDirectory: () => true });

await fluxService.streamChain(req, res);

Expand All @@ -2966,7 +2970,7 @@ describe('fluxService tests', () => {
const req = { socket: { remoteAddress: '10.20.30.40' } };

osStub.returns('/home/testuser');
fsStub.resolves({ isDirectory: () => true });
statStub.resolves({ isDirectory: () => true });
blockchainInfoStub.resolves({ status: 'success', blocks: 1635577 });

await fluxService.streamChain(req, res);
Expand All @@ -2976,6 +2980,59 @@ describe('fluxService tests', () => {
sinon.assert.calledOnce(res.end);
});

it('should set Approx-Content-Length response header with expected value', async () => {
  // collects whatever is streamed to the response; not asserted on here, but
  // the stream must be consumed so streamChain runs to completion
  const received = [];

  const req = { socket: { remoteAddress: '10.20.30.40' } };

  const res = new Writable({
    write(chunk, encoding, done) {
      received.push(chunk.toString());
      done();
    },
  });

  // streamChain calls res.setHeader('Approx-Content-Length', ...)
  res.setHeader = sinon.stub();

  // fake tar stream: pushes four 'test' chunks, then ends
  let count = 0;
  const readable = new Readable({
    read() {
      this.push('test');
      if (count === 3) this.push(null);
      count += 1;
    },
  });

  osStub.returns('/home/testuser');

  // fake Dirent-like entries for the stubbed fs.readdir
  const createFile = (name) => ({
    name,
    isDirectory: () => false,
    isFile: () => true,
  });

  // streamChain walks 3 chain folders; each appears to contain 50 files of
  // 1048576 bytes. Expected total = one 512-byte tar header per file
  // + padded file data + 1024-byte tar EOF.
  const folderCount = 3;
  const testFileSize = 1048576;
  const testFiles = [...Array(50).keys()].map((x) => createFile(x.toString()));
  const headerSize = testFiles.length * 512 * folderCount;
  const eof = 1024;
  const totalFileSize = testFiles.length * testFileSize * folderCount;
  const expectedSize = headerSize + totalFileSize + eof;

  statStub.resolves({
    isDirectory: () => true,
    size: testFileSize,
  });

  readdirStub.resolves(testFiles);

  // daemon unreachable -> treated as not running, so the clone may proceed
  blockchainInfoStub.resolves({ status: 'error', data: { code: 'ECONNREFUSED' } });
  tarPackStub.returns(readable);

  await fluxService.streamChain(req, res);
  sinon.assert.calledWithExactly(res.setHeader, 'Approx-Content-Length', expectedSize.toString());
});

it('should stream chain uncompressed when no compression requested', async () => {
const received = [];

Expand All @@ -2988,6 +3045,8 @@ describe('fluxService tests', () => {
},
});

res.setHeader = sinon.stub();

let count = 0;
const readable = new Readable({
read() {
Expand All @@ -2998,7 +3057,7 @@ describe('fluxService tests', () => {
});

osStub.returns('/home/testuser');
fsStub.resolves({ isDirectory: () => true });
statStub.resolves({ isDirectory: () => true });
blockchainInfoStub.resolves({ status: 'error', data: { code: 'ECONNREFUSED' } });
tarPackStub.returns(readable);

Expand All @@ -3012,6 +3071,7 @@ describe('fluxService tests', () => {
const req = { socket: { remoteAddress: '10.20.30.40' }, body: { compress: true } };

const res = zlib.createGunzip();
res.setHeader = sinon.stub();

res.on('data', (data) => {
// this gets all data in buffer
Expand All @@ -3030,7 +3090,7 @@ describe('fluxService tests', () => {
});

osStub.returns('/home/testuser');
fsStub.resolves({ isDirectory: () => true });
statStub.resolves({ isDirectory: () => true });
blockchainInfoStub.resolves({ status: 'error', data: { code: 'ECONNREFUSED' } });
tarPackStub.returns(readable);

Expand Down

0 comments on commit 22a44c1

Please sign in to comment.