Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Sonar to Hypercore v10 and Hyperswarm v4 #87

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 38 additions & 33 deletions packages/core/lib/collection.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class PeerMap extends EventEmitter {
const feeds = []
for (const feed of peer.feeds) {
feeds.push({
key: feed.feed.key.toString('hex'),
key: feed.key.toString('hex'),
stats: feed.stats
})
}
Expand All @@ -58,29 +58,26 @@ class PeerMap extends EventEmitter {
return list
}

add (peer) {
const remotePublicKey = peer.stream.stream.remotePublicKey.toString('hex')
add (peer, feed) {
const remotePublicKey = peer.remotePublicKey.toString('hex')
// this.log.debug('add peer', feed.key.toString('hex').slice(0, 8), remotePublicKey.slice(0, 9))
const info = {
remotePublicKey,
remoteAddress: peer.stream.stream.remoteAddress
remotePublicKey
}
if (!this.peers.has(remotePublicKey)) {
const feeds = [peer]
const feeds = new Set()
this.peers.set(remotePublicKey, { info, feeds })
this.emit('add', info)
} else {
const existing = this.peers.get(remotePublicKey)
existing.feeds = existing.feeds.filter(info => info.feed.key.toString('hex') !== peer.feed.key.toString('hex'))
existing.feeds.push(peer)
}
this.peers.get(remotePublicKey).feeds.add(feed)
}

remove (peer) {
const remotePublicKey = peer.stream.stream.remotePublicKey
remove (peer, feed) {
const remotePublicKey = peer.remotePublicKey
if (!this.peers.has(remotePublicKey)) return
const existing = this.peers.get(remotePublicKey)
existing.feeds = peer.feeds.filter(info => info.feed.key.toString('hex') !== peer.feed.key.toString('hex'))
if (!existing.feeds.length) {
existing.feeds.delete(feed)
if (!existing.feeds.size) {
this.peers.delete(remotePublicKey)
this.emit('remove', existing.info)
}
Expand Down Expand Up @@ -310,14 +307,14 @@ class Collection extends Nanoresource {
}

async configure (configuration = {}, save = true) {
let promise
let discovery
if (configuration.share !== false) {
promise = this._workspace.network.configure(this.discoveryKey, {
discovery = this._workspace.sdk.join(this.discoveryKey, {
announce: true,
lookup: true
})
} else {
promise = this._workspace.network.configure(this.discoveryKey, {
discovery = this._workspace.sdk.leave(this.discoveryKey, {
announce: false,
lookup: false
})
Expand Down Expand Up @@ -575,6 +572,8 @@ class Collection extends Nanoresource {
}
const self = this
cb = maybeCallback(cb)
// setTimeout(cb, 100)
// return cb.promise

// const timeout = setTimeout(() => {
// cb(new Error('Sync timeout')
Expand All @@ -599,7 +598,6 @@ class Collection extends Nanoresource {

function syncKappa () {
self._kappa.ready(views, () => {
// clearTimeout(timeout)
cb()
})
}
Expand Down Expand Up @@ -635,6 +633,7 @@ class Collection extends Nanoresource {
status (cb) {
const feeds = this.feeds().map(feed => feedStatus(this, feed))
const kappa = this._kappa.getState()
// TODO v10: Port.
const network = this._workspace.network.status(this.discoveryKey)
const config = this.getConfig() || DEFAULT_CONFIG
const status = {
Expand All @@ -648,7 +647,7 @@ class Collection extends Nanoresource {
length: this.length,
feeds,
kappa,
network,
// network,
config,
peers: this._peers.list()
}
Expand Down Expand Up @@ -888,17 +887,15 @@ class Collection extends Nanoresource {
})
// wait a tick
await new Promise(resolve => process.nextTick(resolve))
await new Promise(resolve => {
this._indexer.close(resolve)
})
await this._indexer.close()
this.emit('close')
this._eventStream.destroy()
}

async _initFeed (keyOrName, info = {}, opts = {}) {
// Open the feed if it's not yet opened
// if (this._feeds.has(keyOrName)) return this._feeds.get(keyOrName)
const feed = this._workspace.Hypercore(keyOrName)
const feed = await this._workspace.Hypercore(keyOrName)
if (!feed.opened) await feed.ready()
const hkey = feed.key.toString('hex')
// Recheck if feed is opened (if it was opened by name first)
Expand All @@ -915,12 +912,16 @@ class Collection extends Nanoresource {
feed.on('remote-update', () => {
this.emit('remote-update', feed)
})
feed.on('peer-add', peer => {
// console.log('peer-open', keyOrName, peer)
this._peers.add(peer, feed)
})
feed.on('peer-open', peer => {
// console.log('peer-open', keyOrName, peer)
this._peers.add(peer)
this._peers.add(peer, feed)
})
feed.on('peer-remove', peer => {
this._peers.remove(peer)
this._peers.remove(peer, feed)
})

// If the feed is unknown add it to the local feed store.
Expand Down Expand Up @@ -956,13 +957,16 @@ class Collection extends Nanoresource {

// Look for the feed in the swarm if added by myself
if (!opts.origin || opts.origin === this.localKey.toString('hex')) {
const _networkPromise = this._workspace.network.configure(
feed.discoveryKey,
{
announce: true,
lookup: true
}
)
if (!this._workspace.closing) {
await this._workspace.sdk.join(feed.discoveryKey).flushed()
}
// const _networkPromise = this._workspace.network.configure(
// feed.discoveryKey,
// {
// announce: true,
// lookup: true
// }
// )
}

this.emit('feed', feed, info)
Expand Down Expand Up @@ -1341,7 +1345,8 @@ function feedStatus (collection, feed) {
type: info.type
}
if (feed.opened) {
status.downloadedVersions = feed.downloaded(0, feed.length)
// TODO: Port to v10
// status.downloadedVersions = feed.downloaded(0, feed.length)
status.stats = feed.stats
}
return status
Expand Down
7 changes: 7 additions & 0 deletions packages/core/lib/sdk.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module.exports = async function createSDK (opts = {}) {
opts.autoJoin = false
delete opts.swarm
const { create } = await import('hyper-sdk')
const sdk = await create(opts)
return sdk
}
6 changes: 1 addition & 5 deletions packages/core/lib/version-cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,7 @@ module.exports = class VersionCache {

async _getVersion (id, key, seq, getOpts = {}) {
const feed = this.corestore.get({ key })
const rawVersion = await new Promise((resolve, reject) => {
feed.get(seq, getOpts, (err, version) =>
err ? reject(err) : resolve(version)
)
})
const rawVersion = await feed.get(seq, getOpts)
const mappedVersion = this.map(rawVersion, {
key: key.toString('hex'),
seq
Expand Down
5 changes: 3 additions & 2 deletions packages/core/lib/workspace-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ const {
} = require('nanoresource-promise/emitter')
const { promisify } = require('util')
const mkdirp = promisify(require('mkdirp-classic'))
const DatSDK = require('hyper-sdk')
const createSDK = require('./sdk')
// const DatSDK = require('hyper-sdk')
const p = require('path')
const RAF = require('random-access-file')
const RAM = require('random-access-memory')
Expand Down Expand Up @@ -62,7 +63,7 @@ module.exports = class WorkspaceManager extends Nanoresource {
} else {
sdkOpts.storage = file => RAF(this.storagePath('cores/' + file))
}
this.sdk = await DatSDK(sdkOpts)
this.sdk = await createSDK(sdkOpts)
this.ownSDK = true
} else {
this.sdk = this.opts.sdk
Expand Down
45 changes: 33 additions & 12 deletions packages/core/lib/workspace.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const DatSDK = require('hyper-sdk')
// const DatSDK = require('hyper-sdk')
// TODO: Think about using the hyperspace daemon :-)
// const DatSDK = require('hyper-sdk/hyperspace')
const RAF = require('random-access-file')
Expand All @@ -17,6 +17,7 @@ const {
const { createLogger } = require('@arsonar/common')

const Collection = require('./collection')
const createSDK = require('./sdk')
const LevelMap = require('./utils/level-map')
const {
defaultStoragePath,
Expand Down Expand Up @@ -58,11 +59,19 @@ module.exports = class Workspace extends Nanoresource {
}

get corestore () {
return this._sdk._corestore
return this._sdk.corestore
}

get network () {
return this._sdk._swarm
return this._sdk.swarm
}

get swarm () {
return this._sdk.swarm
}

get sdk () {
return this._sdk
}

get persist () {
Expand All @@ -84,9 +93,9 @@ module.exports = class Workspace extends Nanoresource {
if (this._opts.persist === false) {
sdkOpts.storage = RAM
} else {
sdkOpts.storage = file => RAF(this.storagePath('cores/' + file))
sdkOpts.storage = file => new RAF(this.storagePath('cores/' + file))
}
this._sdk = await DatSDK(sdkOpts)
this._sdk = await createSDK(sdkOpts)
this._ownSDK = true
} else {
this._sdk = this._opts.sdk
Expand Down Expand Up @@ -129,23 +138,33 @@ module.exports = class Workspace extends Nanoresource {
}

async _close () {
this._closing = true
if (!this.opened) await this.open()
await new Promise(resolve => process.nextTick(resolve))
const promises = this.collections().map(c => c.close())
await Promise.all(promises)
await this._workspaceInfo.close()
await this._collectionInfo.close()
if (this._ownSDK) {
await this._sdk.swarm.destroy()
await this._sdk.corestore.close()
}
try {
await this._leveldb.close()
} catch (err) {}
if (this._ownSDK) {
await this._sdk.close()
}
// if (this._ownSDK) {
// await this._sdk.close()
// }
this._sdk = null
// this._sdk = null
// this._leveldb = null
// this._collections = new Map()
this.emit('close')
this._closed = true
}

get closing () {
return this._closing || this._closed
}

async status () {
Expand Down Expand Up @@ -200,7 +219,7 @@ module.exports = class Workspace extends Nanoresource {
}

async _nameToKey (keyOrName) {
const core = this.Hypercore(keyOrName)
const core = await this.Hypercore(keyOrName)
await core.ready()
return core.key
}
Expand Down Expand Up @@ -322,15 +341,17 @@ module.exports = class Workspace extends Nanoresource {
return subdb
}

Hypercore (keyOrName, opts = {}) {
async Hypercore (keyOrName, opts = {}) {
if (this.closing) throw new Error('Workspace is closed or closing.')
if (opts.announce === undefined) {
opts.announce = false
}
if (opts.lookup === undefined) {
opts.lookup = false
}
const core = this._sdk.Hypercore(keyOrName, opts)
core.setMaxListeners(128)
await this.ready()
const core = await this._sdk.get(keyOrName, opts)
// core.setMaxListeners(128)
return core
}

Expand Down
20 changes: 11 additions & 9 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,20 @@
"@arsonar/common": "^0.6.9",
"@arsonar/plugin-relations": "^0.6.11",
"@arsonar/plugin-search": "^0.6.11",
"@frando/kappa-core": "^6.0.0",
"@frando/kappa-core": "https://github.com/Frando/kappa-core#v10",
"@frando/level-live": "^1.1.0",
"base32": "^0.0.6",
"charwise": "^3.0.1",
"codecs": "^2.1.0",
"dat-encoding": "^5.0.1",
"debug": "^4.1.1",
"fast-bitfield": "^1.2.2",
"hyper-sdk": "^3.0.9",
"hyperblobs": "^1.1.2",
"hypercore-crypto": "^2.1.1",
"hyper-sdk": "https://github.com/RangerMauve/hyper-sdk.git#hypercore10",
"hyperblobs": "^2",
"hypercore-crypto": "^3.2",
"inspect-custom-symbol": "^1.1.1",
"js-yaml": "^3.14.0",
"kappa-sparse-indexer": "^0.7.4",
"kappa-sparse-indexer": "https://github.com/Frando/kappa-sparse-indexer#849b69a",
"level": "^6.0.1",
"level-mem": "^5.0.1",
"lru-cache": "^6.0.0",
Expand All @@ -46,8 +46,8 @@
"pretty-hash": "^1.0.1",
"protocol-buffers-encodings": "^1.1.1",
"pump": "^3.0.0",
"random-access-file": "^2.1.4",
"random-access-memory": "^3.1.1",
"random-access-file": "^4.0.0",
"random-access-memory": "^6",
"randombytes": "^2.1.0",
"range-parser": "^1.2.1",
"rimraf": "^3.0.2",
Expand All @@ -60,8 +60,10 @@
"unordered-materialized-kv": "^1.3.0"
},
"devDependencies": {
"@hyperswarm/dht": "^4.0.1",
"hyperspace": "^3.15.0",
"@hyperswarm/dht": "^6",
"@hyperswarm/testnet": "^3.1.0",
"brittle": "^3.1.1",
"nonsynchronous": "^1.2.0",
"tape": "^5.0",
"temporary-directory": "^1.0.2",
"webnet": "^1.0.0",
Expand Down
3 changes: 2 additions & 1 deletion packages/core/test/basic.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ tape('batch and query', async t => {
await cleanup()
})

tape('share and unshare workspace', async t => {
// TODO v10: Port swarm status
tape.skip('share and unshare workspace', async t => {
const { cleanup, workspace } = await createOne()
const collection = await workspace.createCollection('default')
const config = await collection.getConfig()
Expand Down
Loading