Skip to content

Commit

Permalink
load the local state on download
Browse files Browse the repository at this point in the history
  • Loading branch information
MKPLKN committed Sep 3, 2024
1 parent 97dadc6 commit b5721c1
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 25 deletions.
79 changes: 60 additions & 19 deletions lib/monitor.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
const ReadyResource = require('ready-resource')
const debounce = require('debounceify')
const safetyCatch = require('safety-catch')

module.exports = class Monitor extends ReadyResource {
constructor (drive, opts = {}) {
Expand All @@ -7,20 +9,24 @@ module.exports = class Monitor extends ReadyResource {
this.blobs = null
this.name = opts.name
this.entry = opts.entry
this.isDownload = opts.download === true

this._boundOnAppend = this._onAppend.bind(this)
this._boundUpdateStats = this._updateStats.bind(this)
this._boundOnAppend = debounce(this._onAppend.bind(this))
this._boundOnUpload = this._onUpload.bind(this)
this._boundOnDownload = this._onDownload.bind(this)
this.drive.on('close', () => this.close())

// Updated on each upload/download event
this.stats = {
startTime: 0,
percentage: 0,
peersCount: 0,
speed: null,
blocks: null,
bytes: null,
totalBytes: null,
totalBlocks: null
totalBytes: null, // local + bytes loaded during monitoring
monitoringBytes: null, // bytes loaded during monitoring
targetBytes: null,
targetBlocks: null
}
}

Expand All @@ -29,16 +35,26 @@ module.exports = class Monitor extends ReadyResource {
this.blobs = await this.drive.getBlobs()
this.entry = await this.drive.entry(this.name)
if (this.entry) this._setEntryInfo()

// load the local state for the file.
// upload is a bit more tricky
if (this.entry && this.isDownload) {
await this._loadLocalState().catch(safetyCatch).finally(() => {
this._calculateStats()
this.emit('update')
})
}

// Handlers
this.blobs.core.on('append', this._boundOnAppend)
this.blobs.core.on('upload', this._boundUpdateStats)
this.blobs.core.on('download', this._boundUpdateStats)
this.blobs.core.on('upload', this._boundOnUpload)
this.blobs.core.on('download', this._boundOnDownload)
}

async _close () {
this.blobs.core.off('append', this._boundOnAppend)
this.blobs.core.off('upload', this._boundUpdateStats)
this.blobs.core.off('download', this._boundUpdateStats)
this.blobs.core.off('upload', this._boundOnUpload)
this.blobs.core.off('download', this._boundOnDownload)
}

async _onAppend () {
Expand All @@ -49,25 +65,50 @@ module.exports = class Monitor extends ReadyResource {
}

_setEntryInfo () {
if (this.stats.totalBytes || this.stats.totalBlocks) return
this.stats.totalBytes = this.entry.value.blob.byteLength
this.stats.totalBlocks = this.entry.value.blob.blockLength
if (this.stats.targetBytes || this.stats.targetBlocks) return
this.stats.targetBytes = this.entry.value.blob.byteLength
this.stats.targetBlocks = this.entry.value.blob.blockLength
this.stats.blockOffset = this.entry.value.blob.blockOffset
this.stats.byteOffset = this.entry.value.blob.byteOffset
}

async _onUpload (index, bytes, from) {
this._updateStats(index, bytes, from)
}

async _onDownload (index, bytes, from) {
this._updateStats(index, bytes, from)
}

async _loadLocalState () {
// TODO: think this will only work if its linear
const stream = this.blobs.createReadStream(this.entry.value.blob, { wait: false })
for await (const bytes of stream) {
this.stats.totalBytes += bytes.length
this.stats.blocks++
}
}

_updateStats (index, bytes) {
_updateStats (index, bytes, from) {
if (!this.entry || this.closing) return
if (!isWithinRange(index, this.entry)) return
if (!this.stats.startTime) this.stats.startTime = Date.now()

this.stats.peersCount = from.replicator.peers.length
this.stats.blocks++
this.stats.bytes += bytes
this.stats.percentage = Number(((this.stats.bytes / this.stats.totalBytes) * 100).toFixed(2))
this.stats.monitoringBytes += bytes
this.stats.totalBytes += bytes

this._calculateStats()
this.emit('update')
}

_calculateStats () {
if (!this.stats.startTime) this.stats.startTime = Date.now()
this.stats.percentage = Number(((this.stats.totalBytes / this.stats.targetBytes) * 100).toFixed(2))
const timeElapsed = (Date.now() - this.stats.startTime) / 1000
if (timeElapsed > 0) {
this.stats.speed = Math.floor(this.stats.bytes / timeElapsed) // Speed in bytes/sec
this.stats.speed = Math.floor(this.stats.monitoringBytes / timeElapsed) // Speed in bytes/sec
}

this.emit('update')
}
}

Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
},
"homepage": "https://github.com/holepunchto/hyperdrive#readme",
"dependencies": {
"debounceify": "^1.1.0",
"hyperbee": "^2.11.1",
"hyperblobs": "^2.3.0",
"hypercore": "^10.33.0",
Expand Down
44 changes: 38 additions & 6 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1586,9 +1586,9 @@ test('upload/download can be monitored', async (t) => {
const expectedBytes = [bytes, 65536]
monitor.on('update', () => {
t.is(monitor.stats.blocks, expectedBlocks.pop())
t.is(monitor.stats.bytes, expectedBytes.pop())
t.is(monitor.stats.totalBlocks, 2)
t.is(monitor.stats.totalBytes, bytes)
t.is(monitor.stats.monitoringBytes, expectedBytes.pop())
t.is(monitor.stats.targetBlocks, 2)
t.is(monitor.stats.targetBytes, bytes)
})
}

Expand All @@ -1602,15 +1602,47 @@ test('upload/download can be monitored', async (t) => {
const expectedBytes = [bytes, 65536]
monitor.on('update', () => {
t.is(monitor.stats.blocks, expectedBlocks.pop())
t.is(monitor.stats.bytes, expectedBytes.pop())
t.is(monitor.stats.totalBlocks, 2)
t.is(monitor.stats.totalBytes, bytes)
t.is(monitor.stats.monitoringBytes, expectedBytes.pop())
t.is(monitor.stats.targetBlocks, 2)
t.is(monitor.stats.targetBytes, bytes)
})
}

await mirror.drive.get(file)
})

test('monitor loads the local state on download', async (t) => {
t.plan(3)
const { corestore, drive, swarm, mirror } = await testenv(t.teardown)
swarm.on('connection', (conn) => corestore.replicate(conn))
swarm.join(drive.discoveryKey, { server: true, client: false })
await swarm.flush()

mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn))
mirror.swarm.join(drive.discoveryKey, { server: false, client: true })
await mirror.swarm.flush()

const file = '/example.md'
const bytes = 1234
const buffer = Buffer.alloc(bytes, '0')
await drive.put(file, buffer)

observe()
async function observe () {
for await (const _ of mirror.drive.watch()) { /* eslint-disable-line */
await mirror.drive.get(file)
// Start monitoring after we've downloaded the file
const monitor = mirror.drive.monitor(file, { download: true })
monitor.on('update', () => {
t.is(monitor.stats.percentage, 100)
t.is(monitor.stats.totalBytes, bytes)
t.is(monitor.stats.targetBytes, bytes)
})
await monitor.ready()
}
}
})

async function testenv (teardown) {
const corestore = new Corestore(RAM)
await corestore.ready()
Expand Down

0 comments on commit b5721c1

Please sign in to comment.