Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Nwaku Sync #2403

Merged
merged 7 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,8 @@
url = https://github.com/nim-lang/db_connector.git
ignore = untracked
branch = master
[submodule "vendor/negentropy"]
ignore = untracked
path = vendor/negentropy
url = https://github.com/waku-org/negentropy.git
branch = master
3 changes: 3 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ RUN apk add --no-cache libgcc pcre-dev libpq-dev
# Fix for 'Error loading shared library libpcre.so.3: No such file or directory'
RUN ln -s /usr/lib/libpcre.so /usr/lib/libpcre.so.3

# Fix for 'Error loading shared library libnegentropy.so: No such file or directory'
ADD ./libnegentropy.so ./

# Copy to separate location to accomodate different MAKE_TARGET values
COPY --from=nim-build /app/build/$MAKE_TARGET /usr/local/bin/

Expand Down
19 changes: 19 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -411,3 +411,22 @@ release-notes:
sed -E 's@#([0-9]+)@[#\1](https://github.com/waku-org/nwaku/issues/\1)@g'
# I could not get the tool to replace issue ids with links, so using sed for now,
# asked here: https://github.com/bvieira/sv4git/discussions/101

######################
### NEGENTROPY ###
######################
.PHONY: negentropy

## Pass libnegentropy to linker.
NIM_PARAMS := $(NIM_PARAMS) --passL:./libnegentropy.so

deps: | negentropy

clean: | negentropy-clean

negentropy:
SionoiS marked this conversation as resolved.
Show resolved Hide resolved
$(MAKE) -C vendor/negentropy/cpp && \
cp vendor/negentropy/cpp/libnegentropy.so ./
negentropy-clean:
$(MAKE) -C vendor/negentropy/cpp clean && \
rm libnegentropy.so
3 changes: 3 additions & 0 deletions docker/binaries/Dockerfile.bn.amd64
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ RUN apt-get update &&\
# Fix for 'Error loading shared library libpcre.so.3: No such file or directory'
RUN ln -s /usr/lib/libpcre.so /usr/lib/libpcre.so.3

# Fix for 'Error loading shared library libnegentropy.so: No such file or directory'
ADD ./libnegentropy.so ./

# Copy to separate location to accomodate different MAKE_TARGET values
ADD ./build/$MAKE_TARGET /usr/local/bin/

Expand Down
4 changes: 1 addition & 3 deletions examples/publisher.nim
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
let
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[]).get()
ip = parseIpAddress("0.0.0.0")
flags = CapabilitiesBitfield.init(
lightpush = false, filter = false, store = false, relay = true
)
flags = CapabilitiesBitfield.init(relay = true)

var enrBuilder = EnrBuilder.init(nodeKey)

Expand Down
4 changes: 1 addition & 3 deletions examples/subscriber.nim
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
let
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
ip = parseIpAddress("0.0.0.0")
flags = CapabilitiesBitfield.init(
lightpush = false, filter = false, store = false, relay = true
)
flags = CapabilitiesBitfield.init(relay = true)

var enrBuilder = EnrBuilder.init(nodeKey)

Expand Down
188 changes: 188 additions & 0 deletions tests/node/test_wakunode_sync.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
{.used.}

import std/net, testutils/unittests, chronos, libp2p/crypto/crypto

import
../../waku/
[node/waku_node, node/peer_manager, waku_core, waku_store, waku_archive, waku_sync],
../waku_store/store_utils,
../waku_archive/archive_utils,
../testlib/[wakucore, wakunode, testasync]

suite "Store Sync - End to End":
var server {.threadvar.}: WakuNode
var client {.threadvar.}: WakuNode

asyncSetup:
let timeOrigin = now()

let messages =
@[
fakeWakuMessage(@[byte 00], ts = ts(-90, timeOrigin)),
fakeWakuMessage(@[byte 01], ts = ts(-80, timeOrigin)),
fakeWakuMessage(@[byte 02], ts = ts(-70, timeOrigin)),
fakeWakuMessage(@[byte 03], ts = ts(-60, timeOrigin)),
fakeWakuMessage(@[byte 04], ts = ts(-50, timeOrigin)),
fakeWakuMessage(@[byte 05], ts = ts(-40, timeOrigin)),
fakeWakuMessage(@[byte 06], ts = ts(-30, timeOrigin)),
fakeWakuMessage(@[byte 07], ts = ts(-20, timeOrigin)),
fakeWakuMessage(@[byte 08], ts = ts(-10, timeOrigin)),
fakeWakuMessage(@[byte 09], ts = ts(00, timeOrigin)),
]

let
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()

server = newTestWakuNode(serverKey, IPv4_any(), Port(0))
client = newTestWakuNode(clientKey, IPv4_any(), Port(0))

let serverArchiveDriver = newArchiveDriverWithMessages(DefaultPubsubTopic, messages)
let clientArchiveDriver = newArchiveDriverWithMessages(DefaultPubsubTopic, messages)

let mountServerArchiveRes = server.mountArchive(serverArchiveDriver)
let mountClientArchiveRes = client.mountArchive(clientArchiveDriver)

assert mountServerArchiveRes.isOk()
assert mountClientArchiveRes.isOk()

await server.mountStore()
await client.mountStore()

client.mountStoreClient()
server.mountStoreClient()

let mountServerSync = await server.mountWakuSync(
maxFrameSize = 0, syncInterval = 1.hours, relayJitter = 0.seconds
)
let mountClientSync = await client.mountWakuSync(
maxFrameSize = 0, syncInterval = 2.milliseconds, relayJitter = 0.seconds
)

assert mountServerSync.isOk(), mountServerSync.error
assert mountClientSync.isOk(), mountClientSync.error

# messages are retreived when mounting Waku sync
# but based on interval so this is needed for client only
for msg in messages:
client.wakuSync.messageIngress(DefaultPubsubTopic, msg)

await allFutures(server.start(), client.start())

let serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()
let clientRemotePeerInfo = client.peerInfo.toRemotePeerInfo()

client.peerManager.addServicePeer(serverRemotePeerInfo, WakuSyncCodec)
server.peerManager.addServicePeer(clientRemotePeerInfo, WakuSyncCodec)

client.peerManager.addServicePeer(serverRemotePeerInfo, WakuStoreCodec)
server.peerManager.addServicePeer(clientRemotePeerInfo, WakuStoreCodec)

asyncTeardown:
# prevent premature channel shutdown
await sleepAsync(10.milliseconds)

await allFutures(client.stop(), server.stop())

asyncTest "no message set differences":
check:
client.wakuSync.storageSize() == server.wakuSync.storageSize()

await sleepAsync(10.milliseconds)

check:
client.wakuSync.storageSize() == server.wakuSync.storageSize()

asyncTest "client message set differences":
let msg = fakeWakuMessage(@[byte 10])

client.wakuSync.messageIngress(DefaultPubsubTopic, msg)
await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg)

check:
client.wakuSync.storageSize() != server.wakuSync.storageSize()

await sleepAsync(10.milliseconds)

check:
client.wakuSync.storageSize() == server.wakuSync.storageSize()

asyncTest "server message set differences":
let msg = fakeWakuMessage(@[byte 10])

server.wakuSync.messageIngress(DefaultPubsubTopic, msg)
await server.wakuArchive.handleMessage(DefaultPubsubTopic, msg)

check:
client.wakuSync.storageSize() != server.wakuSync.storageSize()

await sleepAsync(10.milliseconds)

check:
client.wakuSync.storageSize() == server.wakuSync.storageSize()

suite "Waku Sync - Pruning":
var server {.threadvar.}: WakuNode
var client {.threadvar.}: WakuNode

asyncSetup:
let
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()

server = newTestWakuNode(serverKey, IPv4_any(), Port(0))
client = newTestWakuNode(clientKey, IPv4_any(), Port(0))

let serverArchiveDriver = newSqliteArchiveDriver()
let clientArchiveDriver = newSqliteArchiveDriver()

let mountServerArchiveRes = server.mountArchive(serverArchiveDriver)
let mountClientArchiveRes = client.mountArchive(clientArchiveDriver)

assert mountServerArchiveRes.isOk()
assert mountClientArchiveRes.isOk()

await server.mountStore()
await client.mountStore()

client.mountStoreClient()
server.mountStoreClient()

let mountServerSync = await server.mountWakuSync(
maxFrameSize = 0,
relayJitter = 0.seconds,
syncRange = 1.hours,
syncInterval = 5.minutes,
)
let mountClientSync = await client.mountWakuSync(
maxFrameSize = 0,
syncRange = 10.milliseconds,
syncInterval = 10.milliseconds,
relayJitter = 0.seconds,
)

assert mountServerSync.isOk(), mountServerSync.error
assert mountClientSync.isOk(), mountClientSync.error

await allFutures(server.start(), client.start())

asyncTeardown:
await sleepAsync(10.milliseconds)

await allFutures(client.stop(), server.stop())

asyncTest "pruning":
for _ in 0 ..< 4:
for _ in 0 ..< 10:
let msg = fakeWakuMessage()
client.wakuSync.messageIngress(DefaultPubsubTopic, msg)
await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg)

server.wakuSync.messageIngress(DefaultPubsubTopic, msg)
await server.wakuArchive.handleMessage(DefaultPubsubTopic, msg)

await sleepAsync(10.milliseconds)

check:
client.wakuSync.storageSize() == 10
server.wakuSync.storageSize() == 40
37 changes: 37 additions & 0 deletions tests/waku_sync/sync_utils.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
{.used.}

import std/options, chronos, chronicles, libp2p/crypto/crypto

import waku/[node/peer_manager, waku_core, waku_sync], ../testlib/wakucore

proc newTestWakuSync*(
switch: Switch,
transfer: Option[TransferCallback] = none(TransferCallback),
prune: Option[PruneCallback] = none(PruneCallback),
interval: Duration = DefaultSyncInterval,
): Future[WakuSync] {.async.} =
let peerManager = PeerManager.new(switch)

let fakePruneCallback = proc(
pruneStart: Timestamp, pruneStop: Timestamp, cursor: Option[WakuMessageHash]
): Future[
Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string]
] {.async: (raises: []), closure.} =
return ok((@[], none(WakuMessageHash)))

let res = await WakuSync.new(
peerManager = peerManager,
relayJitter = 0.seconds,
syncInterval = interval,
wakuArchive = nil,
wakuStoreClient = nil,
pruneCallback = some(fakePruneCallback),
transferCallback = none(TransferCallback),
)

let proto = res.get()

proto.start()
switch.mount(proto)

return proto
3 changes: 3 additions & 0 deletions tests/waku_sync/test_all.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{.used.}

import ./test_protocol, ./test_bindings
Loading
Loading