Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add removeFeed(name|key) #56

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
152 changes: 130 additions & 22 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ function Multifeed (storage, opts) {

// random-access-storage wrapper that wraps all hypercores in a directory
// structures. (dir/0, dir/1, ...)
this._dirs = {}
this._max_dir = -1
this._storage = function (dir) {
return function (name) {
var s = storage
Expand Down Expand Up @@ -92,6 +94,51 @@ Multifeed.prototype._addFeed = function (feed, name) {
this._forwardLiveFeedAnnouncements(feed, name)
}

Multifeed.prototype.removeFeed = function (nameOrKey, cb) {
if (typeof cb !== 'function') cb = function noop () {}

var self = this

var feed = null
var name = null
var key = null

if (nameOrKey in self._feeds) {
name = nameOrKey
feed = self._feeds[name]
key = feed.key.toString('hex')
} else {
key = nameOrKey
feed = self._feedKeyToFeed[key]
name = Object.keys(self._feeds).find(key => self._feeds[key] === feed)
}

delete self._feeds[name]
delete self._feedKeyToFeed[key]

// Remove from dirs index
Object.keys(self._dirs).forEach((dir) => {
if (self._dirs[dir] === feed) {
delete self._dirs[dir]
}
})

// Remove from mux offering
self._streams.forEach((mux) => {
var idx = mux._localOffer.indexOf(key)
if (idx !== -1) {
mux._localOffer.splice(idx, 1)
}
})

self.writerLock(function (release) {
feed.destroyStorage(function (err) {
if (err) return self.emit('error', err)
self._updateStorageIndex(function () { release(cb) })
})
})
}

Multifeed.prototype.ready = function (cb) {
this._ready(cb)
}
Expand Down Expand Up @@ -133,32 +180,52 @@ function _close (cb) {
})
}

Multifeed.prototype._updateStorageIndex = function (cb) {
if (typeof cb !== 'function') cb = function noop () {}

var self = this

var dirs = Object.keys(self._dirs).join(',')
self._max_dir = Math.max.apply(null, Object.keys(self._dirs).map(Number))

var st = self._storage('index')('dirs')
writeStringToStorage(dirs, st, cb)
}

Multifeed.prototype._loadFeeds = function (cb) {
var self = this

// Hypercores are stored starting at 0 and incrementing by 1. A failed read
// at position 0 implies non-existance of the hypercore.
// Hypercores are stored via an index file in numbers directories. If no index
// is found, the structure is assumed to be legacy which starts at 0 and
// increments by 1. A failed read at position 0 implies non-existance of the
// hypercore and if legacy means the end of loading.
var doneOnErr = true
var nextDir = function (dir) { return dir + 1 }

var pending = 1
function next (n) {
var storage = self._storage('' + n)
function next (dir) {
if (!dir && typeof dir !== 'number') return done()

var storage = self._storage('' + dir)
var st = storage('key')
st.read(0, 4, function (err) {
if (err) return done() // means there are no more feeds to read
debug(self._id + ' [INIT] loading feed #' + n)
if (err && doneOnErr) return done() // means there are no more feeds to read
debug(self._id + ' [INIT] loading feed #' + dir)
pending++
var feed = self._hypercore(storage, self._opts)
process.nextTick(next, n + 1)
process.nextTick(next, nextDir(dir))

feed.ready(function () {
readStringFromStorage(storage('localname'), function (err, name) {
if (!err && name) {
self._addFeed(feed, name)
} else {
self._addFeed(feed, String(n))
self._addFeed(feed, String(dir))
}
self._dirs[dir] = feed
st.close(function (err) {
if (err) return done(err)
debug(self._id + ' [INIT] loaded feed #' + n)
debug(self._id + ' [INIT] loaded feed #' + dir)
done()
})
})
Expand All @@ -174,7 +241,30 @@ Multifeed.prototype._loadFeeds = function (cb) {
if (!--pending) cb()
}

next(0)
var indexSt = self._storage('index')('dirs')

readStringFromStorage(indexSt, function (err, dirs) {
if (err) {
next(0)
} else {
doneOnErr = false
dirs = dirs ? dirs.split(',') : []

// Update max dir on load
self._max_dir = Math.max.apply(null, dirs.map(Number).concat(self._max_dir))

nextDir = function (dir) {
var idx = dirs.indexOf(dir)
if (idx < dirs.length - 1) {
return dirs[idx + 1]
} else {
return ''
}
}

next(dirs[0])
}
})
}

Multifeed.prototype.writer = function (name, opts, cb) {
Expand All @@ -201,10 +291,10 @@ Multifeed.prototype.writer = function (name, opts, cb) {
debug(self._id + ' [WRITER] creating new writer: ' + name)

self.writerLock(function (release) {
var len = Object.keys(self._feeds).length
var storage = self._storage('' + len)
var dir = self._max_dir + 1
var storage = self._storage('' + dir)

var idx = name || String(len)
var idx = name || String(dir)

var nameStore = storage('localname')
writeStringToStorage(idx, nameStore, function (err) {
Expand All @@ -224,9 +314,12 @@ Multifeed.prototype.writer = function (name, opts, cb) {

feed.ready(function () {
self._addFeed(feed, String(idx))
release(function () {
if (err) cb(err)
else cb(null, feed, idx)
self._dirs[dir] = feed
self._updateStorageIndex(function (err) {
release(function () {
if (err) cb(err)
else cb(null, feed, idx)
})
})
})
})
Expand Down Expand Up @@ -332,7 +425,7 @@ Multifeed.prototype.replicate = function (isInitiator, opts) {
return !Number.isNaN(parseInt(key, 16)) && key.length === 64
})

var numFeeds = Object.keys(self._feeds).length
var numFeeds = self._max_dir + 1
var keyId = numFeeds
filtered.forEach(function (key) {
var feeds = values(self._feeds).filter(function (feed) {
Expand All @@ -355,6 +448,8 @@ Multifeed.prototype.replicate = function (isInitiator, opts) {
}
feed.ready(function () {
self._addFeed(feed, myKey)
self._dirs[myKey] = feed
self._updateStorageIndex()
keyId++
debug(self._id + ' [REPLICATION] succeeded in creating new local hypercore, key=' + key.toString('hex'))
if (!--pending) cb()
Expand All @@ -378,12 +473,25 @@ Multifeed.prototype._forwardLiveFeedAnnouncements = function (feed, name) {
})
}

// TODO: what if the new data is shorter than the old data? things will break!
function writeStringToStorage (string, storage, cb) {
var buf = Buffer.from(string, 'utf8')
storage.write(0, buf, function (err) {
storage.close(function (err2) {
cb(err || err2)
function writeBuffer () {
var buf = Buffer.from(string, 'utf8')
storage.write(0, buf, function (err) {
storage.close(function (err2) {
cb(err || err2)
})
})
}

// Check if data already exists
storage.stat(function (err, stat) {
if (err) return writeBuffer()

var len = stat.size
storage.del(0, len, function (err) {
if (err) return cb(err)

writeBuffer()
})
})
}
Expand Down
Loading