Skip to content

Commit

Permalink
feat: Waku Sync Protocol
Browse files Browse the repository at this point in the history
Co-authored-by: Ivan FB <[email protected]>
  • Loading branch information
SionoiS and Ivansete-status committed Mar 7, 2024
1 parent 161a10e commit a31d630
Show file tree
Hide file tree
Showing 11 changed files with 715 additions and 6 deletions.
5 changes: 5 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,8 @@
branch = master
path = vendor/nim-results
url = https://github.com/arnetheduck/nim-results.git
[submodule "vendor/negentropy"]
ignore = untracked
path = vendor/negentropy
url = https://github.com/waku-org/negentropy.git
branch = feat/c-wrapper
18 changes: 12 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ else # "variables.mk" was included. Business as usual until the end of this file
##########
## Main ##
##########
.PHONY: all test update clean
.PHONY: all test update clean negentropy

# default target, because it's the first one that doesn't start with '.'
all: | wakunode2 example2 chat2 chat2bridge libwaku
all: | negentropy wakunode2 example2 chat2 chat2bridge libwaku

test: | testcommon testwaku

Expand All @@ -46,7 +46,7 @@ update: | update-common
rm -rf waku.nims && \
$(MAKE) waku.nims $(HANDLE_OUTPUT)

clean:
clean: | negentropy-clean
rm -rf build

# must be included after the default target
Expand Down Expand Up @@ -182,19 +182,19 @@ testcommon: | build deps
.PHONY: testwaku wakunode2 testwakunode2 example2 chat2 chat2bridge

# install anvil only for the testwaku target
testwaku: | build deps anvil librln
testwaku: | build deps anvil librln negentropy
echo -e $(BUILD_MSG) "build/$@" && \
$(ENV_SCRIPT) nim test -d:os=$(shell uname) $(NIM_PARAMS) waku.nims

wakunode2: | build deps librln
wakunode2: | build deps librln negentropy
echo -e $(BUILD_MSG) "build/$@" && \
$(ENV_SCRIPT) nim wakunode2 $(NIM_PARAMS) waku.nims

benchmarks: | build deps librln
echo -e $(BUILD_MSG) "build/$@" && \
$(ENV_SCRIPT) nim benchmarks $(NIM_PARAMS) waku.nims

testwakunode2: | build deps librln
testwakunode2: | build deps librln negentropy
echo -e $(BUILD_MSG) "build/$@" && \
$(ENV_SCRIPT) nim testwakunode2 $(NIM_PARAMS) waku.nims

Expand Down Expand Up @@ -335,3 +335,9 @@ release-notes:
sed -E 's@#([0-9]+)@[#\1](https://github.com/waku-org/nwaku/issues/\1)@g'
# I could not get the tool to replace issue ids with links, so using sed for now,
# asked here: https://github.com/bvieira/sv4git/discussions/101
negentropy:
$(MAKE) -C vendor/negentropy/cpp && \
cp vendor/negentropy/cpp/libnegentropy.so ./
negentropy-clean:
$(MAKE) -C vendor/negentropy/cpp clean && \
rm libnegentropy.so
29 changes: 29 additions & 0 deletions tests/waku_sync/sync_utils.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{.used.}

import
std/options,
chronos,
chronicles,
libp2p/crypto/crypto

import
../../../waku/[
node/peer_manager,
waku_core,
waku_sync,
],
../testlib/[
common,
wakucore
]

proc newTestWakuSync*(switch: Switch, handler: WakuSyncCallback): Future[WakuSync] {.async.} =
const DefaultFrameSize = 153600
let
peerManager = PeerManager.new(switch)
proto = WakuSync.new(peerManager, DefaultFrameSize, 2.seconds, some(handler))

proto.start()
switch.mount(proto)

return proto
4 changes: 4 additions & 0 deletions tests/waku_sync/test_all.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{.used.}

import
./test_protocol
137 changes: 137 additions & 0 deletions tests/waku_sync/test_protocol.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
{.used.}

import
std/options,
testutils/unittests,
chronos,
chronicles,
libp2p/crypto/crypto,
stew/byteutils
from std/os import sleep

import
../../../waku/[
common/paging,
node/peer_manager,
waku_core,
waku_core/message/digest,
waku_sync,
waku_sync/raw_bindings,
],
../testlib/[
common,
wakucore
],
./sync_utils


suite "Waku Sync - Protocol Tests":

asyncTest "test c integration":
let
s1 = Storage.new()
s2 = Storage.new()
ng1 = Negentropy.new(s1,10000)
ng2 = Negentropy.new(s2,10000)

let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic)
let msgHash: WakuMessageHash = computeMessageHash(pubsubTopic=DefaultPubsubTopic, msg1)

check:
s1.insert(msg1.timestamp, msgHash).isOk()
s2.insert(msg1.timestamp, msgHash).isOk()

let msg2 = fakeWakuMessage(contentTopic=DefaultContentTopic)
let msgHash2: WakuMessageHash = computeMessageHash(pubsubTopic=DefaultPubsubTopic, msg2)

check:
s2.insert(msg2.timestamp, msgHash2).isOk()

let ng1_q1 = ng1.initiate()
check:
ng1_q1.isOk()

let ng2_q1 = ng2.serverReconcile(ng1_q1.get())
check:
ng2_q1.isOk()

var
haveHashes: seq[WakuMessageHash]
needHashes: seq[WakuMessageHash]
let ng1_q2 = ng1.clientReconcile(ng2_q1.get(), haveHashes, needHashes)

check:
needHashes.len() == 1
haveHashes.len() == 0
ng1_q2.isOk()
needHashes[0] == msgHash2

check:
s1.erase(msg1.timestamp, msgHash).isOk()

asyncTest "sync 2 nodes different hashes":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()

await allFutures(serverSwitch.start(), clientSwitch.start())

let serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo()
let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic)
let msg2 = fakeWakuMessage(contentTopic=DefaultContentTopic)

let protoHandler:WakuSyncCallback = proc(hashes: seq[WakuMessageHash]) {.async: (raises: []), closure, gcsafe.} =
debug "Received needHashes from peer:", len = hashes.len
for hash in hashes:
debug "Hash received from peer:", hash=hash.to0xHex()

let
server = await newTestWakuSync(serverSwitch, handler=protoHandler)
client = await newTestWakuSync(clientSwitch, handler=protoHandler)
server.ingessMessage(DefaultPubsubTopic, msg1)
client.ingessMessage(DefaultPubsubTopic, msg1)
server.ingessMessage(DefaultPubsubTopic, msg2)

var hashes = await client.sync(serverPeerInfo)
require (hashes.isOk())
check:
hashes.value.len == 1
hashes.value[0] == computeMessageHash(pubsubTopic=DefaultPubsubTopic, msg2)
#Assuming message is fetched from peer
#[ client.ingessMessage(DefaultPubsubTopic, msg2)
sleep(1000)
hashes = await client.sync(serverPeerInfo)
require (hashes.isOk())
check:
hashes.value.len == 0 ]#

asyncTest "sync 2 nodes same hashes":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()

await allFutures(serverSwitch.start(), clientSwitch.start())

let serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo()
let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic)
let msg2 = fakeWakuMessage(contentTopic=DefaultContentTopic)

let protoHandler:WakuSyncCallback = proc(hashes: seq[WakuMessageHash]) {.async: (raises: []), closure, gcsafe.} =
debug "Received needHashes from peer:", len = hashes.len
for hash in hashes:
debug "Hash received from peer:", hash=hash.to0xHex()

let
server = await newTestWakuSync(serverSwitch, handler=protoHandler)
client = await newTestWakuSync(clientSwitch, handler=protoHandler)
server.ingessMessage(DefaultPubsubTopic, msg1)
client.ingessMessage(DefaultPubsubTopic, msg1)
server.ingessMessage(DefaultPubsubTopic, msg2)
client.ingessMessage(DefaultPubsubTopic, msg2)

let hashes = await client.sync(serverPeerInfo)
assert hashes.isOk(), $hashes.error
check:
hashes.value.len == 0
1 change: 1 addition & 0 deletions vendor/negentropy
Submodule negentropy added at 1a59da
25 changes: 25 additions & 0 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import
../waku_filter_v2/client as filter_client,
../waku_filter_v2/subscriptions as filter_subscriptions,
../waku_metadata,
../waku_sync,
../waku_lightpush/client as lightpush_client,
../waku_lightpush/common,
../waku_lightpush/protocol,
Expand Down Expand Up @@ -97,6 +98,7 @@ type
wakuLightpushClient*: WakuLightPushClient
wakuPeerExchange*: WakuPeerExchange
wakuMetadata*: WakuMetadata
wakuSync*: WakuSync
enr*: enr.Record
libp2pPing*: Ping
rng*: ref rand.HmacDrbgContext
Expand Down Expand Up @@ -182,6 +184,22 @@ proc connectToNodes*(node: WakuNode, nodes: seq[RemotePeerInfo] | seq[string], s
# NOTE Connects to the node without a give protocol, which automatically creates streams for relay
await peer_manager.connectToNodes(node.peerManager, nodes, source=source)

## Waku Sync

proc mountWakuSync*(node: WakuNode): Result[void, string] =
if not node.wakuSync.isNil():
return err("Waku sync already mounted, skipping")

let sync = WakuSync.new(node.peerManager)#TODO add the callback and the options

node.wakuSync = sync

let catchRes = catch: node.switch.mount(node.wakuSync, protocolMatcher(WakuSyncCodec))
if catchRes.isErr():
return err(catchRes.error.msg)

return ok()

## Waku Metadata

proc mountMetadata*(node: WakuNode, clusterId: uint32): Result[void, string] =
Expand Down Expand Up @@ -237,10 +255,17 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
await node.wakuArchive.handleMessage(topic, msg)


proc syncHandler(topic: PubsubTopic, msg: WakuMessage) =
if node.wakuSync.isNil():
return

node.wakuSync.ingessMessage(topic, msg)

let defaultHandler = proc(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
await traceHandler(topic, msg)
await filterHandler(topic, msg)
await archiveHandler(topic, msg)
syncHandler(topic, msg)

discard node.wakuRelay.subscribe(topic, defaultHandler)

Expand Down
10 changes: 10 additions & 0 deletions waku/waku_api/rest/admin/handlers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import
../../../waku_lightpush/common,
../../../waku_relay,
../../../waku_node,
../../../waku_sync,
../../../node/peer_manager,
../responses,
../serdes,
Expand Down Expand Up @@ -89,6 +90,15 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
connected: it.connectedness == Connectedness.Connected))
tuplesToWakuPeers(peers, lightpushPeers)

if not node.wakuSync.isNil():
# Map WakuStore peers to WakuPeers and add to return list
let syncPeers = node.peerManager.peerStore
.peers(WakuSyncCodec)
.mapIt((multiaddr: constructMultiaddrStr(it),
protocol: WakuSyncCodec,
connected: it.connectedness == Connectedness.Connected))
tuplesToWakuPeers(peers, syncPeers)

let resp = RestApiResponse.jsonResponse(peers, status=Http200)
if resp.isErr():
error "An error ocurred while building the json respose: ", error=resp.error
Expand Down
10 changes: 10 additions & 0 deletions waku/waku_sync.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import
./waku_sync/protocol

export
protocol
Loading

0 comments on commit a31d630

Please sign in to comment.