diff --git a/.gitmodules b/.gitmodules index 57ddea4d42..a2bc64ffbb 100644 --- a/.gitmodules +++ b/.gitmodules @@ -169,3 +169,8 @@ url = https://github.com/nim-lang/db_connector.git ignore = untracked branch = master +[submodule "vendor/negentropy"] + ignore = untracked + path = vendor/negentropy + url = https://github.com/waku-org/negentropy.git + branch = master \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 2667af3968..8a5eee7624 100644 --- a/Dockerfile +++ b/Dockerfile @@ -46,6 +46,9 @@ RUN apk add --no-cache libgcc pcre-dev libpq-dev # Fix for 'Error loading shared library libpcre.so.3: No such file or directory' RUN ln -s /usr/lib/libpcre.so /usr/lib/libpcre.so.3 +# Fix for 'Error loading shared library libnegentropy.so: No such file or directory' +ADD ./libnegentropy.so ./ + # Copy to separate location to accomodate different MAKE_TARGET values COPY --from=nim-build /app/build/$MAKE_TARGET /usr/local/bin/ diff --git a/Makefile b/Makefile index eeb44c71a5..7a6cb3af65 100644 --- a/Makefile +++ b/Makefile @@ -411,3 +411,22 @@ release-notes: sed -E 's@#([0-9]+)@[#\1](https://github.com/waku-org/nwaku/issues/\1)@g' # I could not get the tool to replace issue ids with links, so using sed for now, # asked here: https://github.com/bvieira/sv4git/discussions/101 + +###################### +### NEGENTROPY ### +###################### +.PHONY: negentropy + +## Pass libnegentropy to linker. +NIM_PARAMS := $(NIM_PARAMS) --passL:./libnegentropy.so + +deps: | negentropy + +clean: | negentropy-clean + +negentropy: + $(MAKE) -C vendor/negentropy/cpp && \ + cp vendor/negentropy/cpp/libnegentropy.so ./ +negentropy-clean: + $(MAKE) -C vendor/negentropy/cpp clean && \ + rm libnegentropy.so \ No newline at end of file diff --git a/docker/binaries/Dockerfile.bn.amd64 b/docker/binaries/Dockerfile.bn.amd64 index 8b810e3a10..d7fbbca667 100644 --- a/docker/binaries/Dockerfile.bn.amd64 +++ b/docker/binaries/Dockerfile.bn.amd64 @@ -19,6 +19,9 @@ RUN apt-get update &&\ # Fix for 'Error loading shared library libpcre.so.3: No such file or directory' RUN ln -s /usr/lib/libpcre.so /usr/lib/libpcre.so.3 +# Fix for 'Error loading shared library libnegentropy.so: No such file or directory' +ADD ./libnegentropy.so ./ + # Copy to separate location to accomodate different MAKE_TARGET values ADD ./build/$MAKE_TARGET /usr/local/bin/ diff --git a/examples/publisher.nim b/examples/publisher.nim index e26a476edc..fa646536c9 100644 --- a/examples/publisher.nim +++ b/examples/publisher.nim @@ -46,9 +46,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = let nodeKey = crypto.PrivateKey.random(Secp256k1, rng[]).get() ip = parseIpAddress("0.0.0.0") - flags = CapabilitiesBitfield.init( - lightpush = false, filter = false, store = false, relay = true - ) + flags = CapabilitiesBitfield.init(relay = true) var enrBuilder = EnrBuilder.init(nodeKey) diff --git a/examples/subscriber.nim b/examples/subscriber.nim index 477f90982e..2cab3a731e 100644 --- a/examples/subscriber.nim +++ b/examples/subscriber.nim @@ -44,9 +44,7 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} = let nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] ip = parseIpAddress("0.0.0.0") - flags = CapabilitiesBitfield.init( - lightpush = false, filter = false, store = false, relay = true - ) + flags = CapabilitiesBitfield.init(relay = true) var enrBuilder = EnrBuilder.init(nodeKey) diff --git a/tests/node/test_wakunode_sync.nim b/tests/node/test_wakunode_sync.nim new file mode 100644 index 0000000000..0ffb3a8a61 --- /dev/null +++ b/tests/node/test_wakunode_sync.nim @@ -0,0 +1,188 @@ +{.used.} + +import std/net, testutils/unittests, chronos, libp2p/crypto/crypto + +import + ../../waku/ + [node/waku_node, node/peer_manager, waku_core, waku_store, waku_archive, waku_sync], + ../waku_store/store_utils, + ../waku_archive/archive_utils, + ../testlib/[wakucore, wakunode, testasync] + +suite "Store Sync - End to End": + var server {.threadvar.}: WakuNode + var client {.threadvar.}: WakuNode + + asyncSetup: + let timeOrigin = now() + + let messages = + @[ + fakeWakuMessage(@[byte 00], ts = ts(-90, timeOrigin)), + fakeWakuMessage(@[byte 01], ts = ts(-80, timeOrigin)), + fakeWakuMessage(@[byte 02], ts = ts(-70, timeOrigin)), + fakeWakuMessage(@[byte 03], ts = ts(-60, timeOrigin)), + fakeWakuMessage(@[byte 04], ts = ts(-50, timeOrigin)), + fakeWakuMessage(@[byte 05], ts = ts(-40, timeOrigin)), + fakeWakuMessage(@[byte 06], ts = ts(-30, timeOrigin)), + fakeWakuMessage(@[byte 07], ts = ts(-20, timeOrigin)), + fakeWakuMessage(@[byte 08], ts = ts(-10, timeOrigin)), + fakeWakuMessage(@[byte 09], ts = ts(00, timeOrigin)), + ] + + let + serverKey = generateSecp256k1Key() + clientKey = generateSecp256k1Key() + + server = newTestWakuNode(serverKey, IPv4_any(), Port(0)) + client = newTestWakuNode(clientKey, IPv4_any(), Port(0)) + + let serverArchiveDriver = newArchiveDriverWithMessages(DefaultPubsubTopic, messages) + let clientArchiveDriver = newArchiveDriverWithMessages(DefaultPubsubTopic, messages) + + let mountServerArchiveRes = server.mountArchive(serverArchiveDriver) + let mountClientArchiveRes = client.mountArchive(clientArchiveDriver) + + assert mountServerArchiveRes.isOk() + assert mountClientArchiveRes.isOk() + + await server.mountStore() + await client.mountStore() + + client.mountStoreClient() + server.mountStoreClient() + + let mountServerSync = await server.mountWakuSync( + maxFrameSize = 0, syncInterval = 1.hours, relayJitter = 0.seconds + ) + let mountClientSync = await client.mountWakuSync( + maxFrameSize = 0, syncInterval = 2.milliseconds, relayJitter = 0.seconds + ) + + assert mountServerSync.isOk(), mountServerSync.error + assert mountClientSync.isOk(), mountClientSync.error + + # messages are retreived when mounting Waku sync + # but based on interval so this is needed for client only + for msg in messages: + client.wakuSync.messageIngress(DefaultPubsubTopic, msg) + + await allFutures(server.start(), client.start()) + + let serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo() + let clientRemotePeerInfo = client.peerInfo.toRemotePeerInfo() + + client.peerManager.addServicePeer(serverRemotePeerInfo, WakuSyncCodec) + server.peerManager.addServicePeer(clientRemotePeerInfo, WakuSyncCodec) + + client.peerManager.addServicePeer(serverRemotePeerInfo, WakuStoreCodec) + server.peerManager.addServicePeer(clientRemotePeerInfo, WakuStoreCodec) + + asyncTeardown: + # prevent premature channel shutdown + await sleepAsync(10.milliseconds) + + await allFutures(client.stop(), server.stop()) + + asyncTest "no message set differences": + check: + client.wakuSync.storageSize() == server.wakuSync.storageSize() + + await sleepAsync(10.milliseconds) + + check: + client.wakuSync.storageSize() == server.wakuSync.storageSize() + + asyncTest "client message set differences": + let msg = fakeWakuMessage(@[byte 10]) + + client.wakuSync.messageIngress(DefaultPubsubTopic, msg) + await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg) + + check: + client.wakuSync.storageSize() != server.wakuSync.storageSize() + + await sleepAsync(10.milliseconds) + + check: + client.wakuSync.storageSize() == server.wakuSync.storageSize() + + asyncTest "server message set differences": + let msg = fakeWakuMessage(@[byte 10]) + + server.wakuSync.messageIngress(DefaultPubsubTopic, msg) + await server.wakuArchive.handleMessage(DefaultPubsubTopic, msg) + + check: + client.wakuSync.storageSize() != server.wakuSync.storageSize() + + await sleepAsync(10.milliseconds) + + check: + client.wakuSync.storageSize() == server.wakuSync.storageSize() + +suite "Waku Sync - Pruning": + var server {.threadvar.}: WakuNode + var client {.threadvar.}: WakuNode + + asyncSetup: + let + serverKey = generateSecp256k1Key() + clientKey = generateSecp256k1Key() + + server = newTestWakuNode(serverKey, IPv4_any(), Port(0)) + client = newTestWakuNode(clientKey, IPv4_any(), Port(0)) + + let serverArchiveDriver = newSqliteArchiveDriver() + let clientArchiveDriver = newSqliteArchiveDriver() + + let mountServerArchiveRes = server.mountArchive(serverArchiveDriver) + let mountClientArchiveRes = client.mountArchive(clientArchiveDriver) + + assert mountServerArchiveRes.isOk() + assert mountClientArchiveRes.isOk() + + await server.mountStore() + await client.mountStore() + + client.mountStoreClient() + server.mountStoreClient() + + let mountServerSync = await server.mountWakuSync( + maxFrameSize = 0, + relayJitter = 0.seconds, + syncRange = 1.hours, + syncInterval = 5.minutes, + ) + let mountClientSync = await client.mountWakuSync( + maxFrameSize = 0, + syncRange = 10.milliseconds, + syncInterval = 10.milliseconds, + relayJitter = 0.seconds, + ) + + assert mountServerSync.isOk(), mountServerSync.error + assert mountClientSync.isOk(), mountClientSync.error + + await allFutures(server.start(), client.start()) + + asyncTeardown: + await sleepAsync(10.milliseconds) + + await allFutures(client.stop(), server.stop()) + + asyncTest "pruning": + for _ in 0 ..< 4: + for _ in 0 ..< 10: + let msg = fakeWakuMessage() + client.wakuSync.messageIngress(DefaultPubsubTopic, msg) + await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg) + + server.wakuSync.messageIngress(DefaultPubsubTopic, msg) + await server.wakuArchive.handleMessage(DefaultPubsubTopic, msg) + + await sleepAsync(10.milliseconds) + + check: + client.wakuSync.storageSize() == 10 + server.wakuSync.storageSize() == 40 diff --git a/tests/waku_sync/sync_utils.nim b/tests/waku_sync/sync_utils.nim new file mode 100644 index 0000000000..be0e44d7e5 --- /dev/null +++ b/tests/waku_sync/sync_utils.nim @@ -0,0 +1,37 @@ +{.used.} + +import std/options, chronos, chronicles, libp2p/crypto/crypto + +import waku/[node/peer_manager, waku_core, waku_sync], ../testlib/wakucore + +proc newTestWakuSync*( + switch: Switch, + transfer: Option[TransferCallback] = none(TransferCallback), + prune: Option[PruneCallback] = none(PruneCallback), + interval: Duration = DefaultSyncInterval, +): Future[WakuSync] {.async.} = + let peerManager = PeerManager.new(switch) + + let fakePruneCallback = proc( + pruneStart: Timestamp, pruneStop: Timestamp, cursor: Option[WakuMessageHash] + ): Future[ + Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string] + ] {.async: (raises: []), closure.} = + return ok((@[], none(WakuMessageHash))) + + let res = await WakuSync.new( + peerManager = peerManager, + relayJitter = 0.seconds, + syncInterval = interval, + wakuArchive = nil, + wakuStoreClient = nil, + pruneCallback = some(fakePruneCallback), + transferCallback = none(TransferCallback), + ) + + let proto = res.get() + + proto.start() + switch.mount(proto) + + return proto diff --git a/tests/waku_sync/test_all.nim b/tests/waku_sync/test_all.nim new file mode 100644 index 0000000000..b5801e4acb --- /dev/null +++ b/tests/waku_sync/test_all.nim @@ -0,0 +1,3 @@ +{.used.} + +import ./test_protocol, ./test_bindings diff --git a/tests/waku_sync/test_bindings.nim b/tests/waku_sync/test_bindings.nim new file mode 100644 index 0000000000..f2099ea50a --- /dev/null +++ b/tests/waku_sync/test_bindings.nim @@ -0,0 +1,141 @@ +{.used.} + +import std/[options], testutils/unittests, chronos, libp2p/crypto/crypto, std/random + +import + ../../waku/ + [node/peer_manager, waku_core, waku_core/message/digest, waku_sync/raw_bindings], + ../testlib/[wakucore], + ./sync_utils + +random.randomize() + +#TODO clean this up + +suite "Bindings": + var storage {.threadvar.}: NegentropyStorage + var messages {.threadvar.}: seq[(WakuMessageHash, WakuMessage)] + + setup: + let storageRes = NegentropyStorage.new() + assert storageRes.isOk(), $storageRes.error + storage = storageRes.get() + + messages = @[] + for _ in 0 ..< 10: + let msg = fakeWakuMessage() + let hash = computeMessageHash(DefaultPubsubTopic, msg) + messages.add((hash, msg)) + + teardown: + storage.delete() + + test "storage insert": + check: + storage.len() == 0 + + let insRes = storage.insert(messages[0][1].timestamp, messages[0][0]) + + assert insRes.isOk(), $insRes.error + + check: + storage.len() == 1 + + test "storage erase": + let insRes = storage.insert(messages[0][1].timestamp, messages[0][0]) + assert insRes.isOk(), $insRes.error + + check: + storage.len() == 1 + + var delRes = storage.erase(messages[0][1].timestamp, messages[0][0]) + assert delRes.isOk() + + check: + storage.len() == 0 + + delRes = storage.erase(messages[0][1].timestamp, messages[0][0]) + assert delRes.isErr() + + check: + storage.len() == 0 + + test "subrange": + for (hash, msg) in messages: + let insRes = storage.insert(msg.timestamp, hash) + assert insRes.isOk(), $insRes.error + + check: + storage.len() == 10 + + let subrangeRes = NegentropySubRangeStorage.new(storage) + assert subrangeRes.isOk(), subrangeRes.error + let subrange = subrangeRes.get() + + check: + subrange.len() == 10 + + #[ test "storage memory size": + for (hash, msg) in messages: + let insRes = storage.insert(msg.timestamp, hash) + assert insRes.isOk(), $insRes.error + + check: + storage.len() == 10 + + for (hash, msg) in messages: + let delRes = storage.erase(msg.timestamp, hash) + assert delRes.isOk(), $delRes.error + + check: + storage.len() == 0 + + #TODO validate that the occupied memory didn't grow. ]# + + test "reconcile server differences": + for (hash, msg) in messages: + let insRes = storage.insert(msg.timestamp, hash) + assert insRes.isOk(), $insRes.error + + let clientNegentropyRes = Negentropy.new(storage, 0) + + let storageRes = NegentropyStorage.new() + assert storageRes.isOk(), $storageRes.error + let serverStorage = storageRes.get() + + for (hash, msg) in messages: + let insRes = serverStorage.insert(msg.timestamp, hash) + assert insRes.isOk(), $insRes.error + + # the extra msg + let msg = fakeWakuMessage() + let hash = computeMessageHash(DefaultPubsubTopic, msg) + let insRes = serverStorage.insert(msg.timestamp, hash) + assert insRes.isOk(), $insRes.error + + let serverNegentropyRes = Negentropy.new(serverStorage, 0) + + assert clientNegentropyRes.isOk(), $clientNegentropyRes.error + assert serverNegentropyRes.isOk(), $serverNegentropyRes.error + + let clientNegentropy = clientNegentropyRes.get() + let serverNegentropy = serverNegentropyRes.get() + + let initRes = clientNegentropy.initiate() + assert initRes.isOk(), $initRes.error + let init = initRes.get() + + let reconRes = serverNegentropy.serverReconcile(init) + assert reconRes.isOk(), $reconRes.error + let srecon = reconRes.get() + + var + haves: seq[WakuMessageHash] + needs: seq[WakuMessageHash] + let creconRes = clientNegentropy.clientReconcile(srecon, haves, needs) + assert creconRes.isOk(), $creconRes.error + let reconOpt = creconRes.get() + + check: + reconOpt.isNone() + needs[0] == hash diff --git a/tests/waku_sync/test_protocol.nim b/tests/waku_sync/test_protocol.nim new file mode 100644 index 0000000000..c203471fb2 --- /dev/null +++ b/tests/waku_sync/test_protocol.nim @@ -0,0 +1,374 @@ +{.used.} + +import + std/[options, sets], + testutils/unittests, + chronos, + chronicles, + libp2p/crypto/crypto, + stew/byteutils, + std/random + +import + ../../waku/[ + node/peer_manager, + waku_core, + waku_core/message/digest, + waku_sync, + waku_sync/raw_bindings, + ], + ../testlib/[wakucore, testasync], + ./sync_utils + +random.randomize() + +suite "Waku Sync": + var serverSwitch {.threadvar.}: Switch + var clientSwitch {.threadvar.}: Switch + + var server {.threadvar.}: WakuSync + var client {.threadvar.}: WakuSync + + var serverPeerInfo {.threadvar.}: Option[RemotePeerInfo] + + asyncSetup: + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + server = await newTestWakuSync(serverSwitch) + client = await newTestWakuSync(clientSwitch) + + serverPeerInfo = some(serverSwitch.peerInfo.toRemotePeerInfo()) + + asyncTeardown: + await sleepAsync(10.milliseconds) + + await allFutures(server.stop(), client.stop()) + await allFutures(serverSwitch.stop(), clientSwitch.stop()) + + suite "Protocol": + asyncTest "sync 2 nodes both empty": + let hashes = await client.storeSynchronization(serverPeerInfo) + assert hashes.isOk(), hashes.error + check: + hashes.value[0].len == 0 + + asyncTest "sync 2 nodes empty client full server": + let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic) + let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic) + let msg3 = fakeWakuMessage(contentTopic = DefaultContentTopic) + + server.messageIngress(DefaultPubsubTopic, msg1) + server.messageIngress(DefaultPubsubTopic, msg2) + server.messageIngress(DefaultPubsubTopic, msg3) + + var hashes = await client.storeSynchronization(serverPeerInfo) + + assert hashes.isOk(), hashes.error + check: + hashes.value[0].len == 3 + computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg1) in hashes.value[0] + computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2) in hashes.value[0] + computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg3) in hashes.value[0] + + asyncTest "sync 2 nodes full client empty server": + let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic) + let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic) + let msg3 = fakeWakuMessage(contentTopic = DefaultContentTopic) + + client.messageIngress(DefaultPubsubTopic, msg1) + client.messageIngress(DefaultPubsubTopic, msg2) + client.messageIngress(DefaultPubsubTopic, msg3) + + var hashes = await client.storeSynchronization(serverPeerInfo) + assert hashes.isOk(), hashes.error + check: + hashes.value[0].len == 0 + + asyncTest "sync 2 nodes different hashes": + let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic) + let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic) + + server.messageIngress(DefaultPubsubTopic, msg1) + client.messageIngress(DefaultPubsubTopic, msg1) + server.messageIngress(DefaultPubsubTopic, msg2) + + var syncRes = await client.storeSynchronization(serverPeerInfo) + + check: + syncRes.isOk() + + var hashes = syncRes.get() + + check: + hashes[0].len == 1 + hashes[0][0] == computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2) + + #Assuming message is fetched from peer + client.messageIngress(DefaultPubsubTopic, msg2) + + syncRes = await client.storeSynchronization(serverPeerInfo) + + check: + syncRes.isOk() + + hashes = syncRes.get() + + check: + hashes[0].len == 0 + + asyncTest "sync 2 nodes same hashes": + let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic) + let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic) + + server.messageIngress(DefaultPubsubTopic, msg1) + client.messageIngress(DefaultPubsubTopic, msg1) + server.messageIngress(DefaultPubsubTopic, msg2) + client.messageIngress(DefaultPubsubTopic, msg2) + + let hashes = await client.storeSynchronization(serverPeerInfo) + assert hashes.isOk(), $hashes.error + check: + hashes.value[0].len == 0 + + asyncTest "sync 2 nodes 100K msgs": + var i = 0 + let msgCount = 100000 + var diffIndex = rand(msgCount) + var diffMsg: WakuMessage + while i < msgCount: + let msg = fakeWakuMessage(contentTopic = DefaultContentTopic) + if i != diffIndex: + client.messageIngress(DefaultPubsubTopic, msg) + else: + diffMsg = msg + server.messageIngress(DefaultPubsubTopic, msg) + i += 1 + + let hashes = await client.storeSynchronization(serverPeerInfo) + assert hashes.isOk(), $hashes.error + + check: + hashes.value[0].len == 1 + hashes.value[0][0] == computeMessageHash(DefaultPubsubTopic, diffMsg) + + asyncTest "sync 2 nodes 100K msgs 10K diffs": + var i = 0 + let msgCount = 100000 + var diffCount = 10000 + + var diffMsgHashes: seq[WakuMessageHash] + var randIndexes: seq[int] + while i < diffCount: + let randInt = rand(msgCount) + if randInt in randIndexes: + continue + randIndexes.add(randInt) + i += 1 + + i = 0 + var tmpDiffCnt = diffCount + while i < msgCount: + let msg = fakeWakuMessage(contentTopic = DefaultContentTopic) + if tmpDiffCnt > 0 and i in randIndexes: + diffMsgHashes.add(computeMessageHash(DefaultPubsubTopic, msg)) + tmpDiffCnt = tmpDiffCnt - 1 + else: + client.messageIngress(DefaultPubsubTopic, msg) + + server.messageIngress(DefaultPubsubTopic, msg) + i += 1 + + let hashes = await client.storeSynchronization(serverPeerInfo) + assert hashes.isOk(), $hashes.error + + check: + hashes.value[0].len == diffCount + toHashSet(hashes.value[0]) == toHashSet(diffMsgHashes) + + asyncTest "sync 3 nodes 2 client 1 server": + ## Setup + let client2Switch = newTestSwitch() + await client2Switch.start() + let client2 = await newTestWakuSync(client2Switch) + + let msgCount = 10000 + var i = 0 + + while i < msgCount: + i += 1 + let msg = fakeWakuMessage(contentTopic = DefaultContentTopic) + if i mod 2 == 0: + client2.messageIngress(DefaultPubsubTopic, msg) + else: + client.messageIngress(DefaultPubsubTopic, msg) + server.messageIngress(DefaultPubsubTopic, msg) + + let fut1 = client.storeSynchronization(serverPeerInfo) + let fut2 = client2.storeSynchronization(serverPeerInfo) + waitFor allFutures(fut1, fut2) + + let hashes1 = fut1.read() + let hashes2 = fut2.read() + + assert hashes1.isOk(), $hashes1.error + assert hashes2.isOk(), $hashes2.error + + check: + hashes1.value[0].len == int(msgCount / 2) + hashes2.value[0].len == int(msgCount / 2) + + await client2.stop() + await client2Switch.stop() + + asyncTest "sync 6 nodes varying sync diffs": + ## Setup + let + client2Switch = newTestSwitch() + client3Switch = newTestSwitch() + client4Switch = newTestSwitch() + client5Switch = newTestSwitch() + + await allFutures( + client2Switch.start(), + client3Switch.start(), + client4Switch.start(), + client5Switch.start(), + ) + + let + client2 = await newTestWakuSync(client2Switch) + client3 = await newTestWakuSync(client3Switch) + client4 = await newTestWakuSync(client4Switch) + client5 = await newTestWakuSync(client5Switch) + + let msgCount = 100000 + var i = 0 + + while i < msgCount: + let msg = fakeWakuMessage(contentTopic = DefaultContentTopic) + if i < msgCount - 1: + client.messageIngress(DefaultPubsubTopic, msg) + if i < msgCount - 10: + client2.messageIngress(DefaultPubsubTopic, msg) + if i < msgCount - 100: + client3.messageIngress(DefaultPubsubTopic, msg) + if i < msgCount - 1000: + client4.messageIngress(DefaultPubsubTopic, msg) + if i < msgCount - 10000: + client5.messageIngress(DefaultPubsubTopic, msg) + server.messageIngress(DefaultPubsubTopic, msg) + i += 1 + + var timeBefore = getNowInNanosecondTime() + let hashes1 = await client.storeSynchronization(serverPeerInfo) + var timeAfter = getNowInNanosecondTime() + var syncTime = (timeAfter - timeBefore) + debug "sync time in seconds", msgsTotal = msgCount, diff = 1, syncTime = syncTime + assert hashes1.isOk(), $hashes1.error + check: + hashes1.value[0].len == 1 + + timeBefore = getNowInNanosecondTime() + let hashes2 = await client2.storeSynchronization(serverPeerInfo) + timeAfter = getNowInNanosecondTime() + syncTime = (timeAfter - timeBefore) + debug "sync time in seconds", msgsTotal = msgCount, diff = 10, syncTime = syncTime + assert hashes2.isOk(), $hashes2.error + check: + hashes2.value[0].len == 10 + + timeBefore = getNowInNanosecondTime() + let hashes3 = await client3.storeSynchronization(serverPeerInfo) + timeAfter = getNowInNanosecondTime() + syncTime = (timeAfter - timeBefore) + debug "sync time in seconds", + msgsTotal = msgCount, diff = 100, syncTime = syncTime + assert hashes3.isOk(), $hashes3.error + check: + hashes3.value[0].len == 100 + + timeBefore = getNowInNanosecondTime() + let hashes4 = await client4.storeSynchronization(serverPeerInfo) + timeAfter = getNowInNanosecondTime() + syncTime = (timeAfter - timeBefore) + debug "sync time in seconds", + msgsTotal = msgCount, diff = 1000, syncTime = syncTime + assert hashes4.isOk(), $hashes4.error + check: + hashes4.value[0].len == 1000 + + timeBefore = getNowInNanosecondTime() + let hashes5 = await client5.storeSynchronization(serverPeerInfo) + timeAfter = getNowInNanosecondTime() + syncTime = (timeAfter - timeBefore) + debug "sync time in seconds", + msgsTotal = msgCount, diff = 10000, syncTime = syncTime + assert hashes5.isOk(), $hashes5.error + check: + hashes5.value[0].len == 10000 + + await allFutures(client2.stop(), client3.stop(), client4.stop(), client5.stop()) + await allFutures( + client2Switch.stop(), + client3Switch.stop(), + client4Switch.stop(), + client5Switch.stop(), + ) + + asyncTest "sync 3 nodes cyclic": + let + node1Switch = newTestSwitch() + node2Switch = newTestSwitch() + node3Switch = newTestSwitch() + + await allFutures(node1Switch.start(), node2Switch.start(), node3Switch.start()) + + let node1PeerInfo = some(node1Switch.peerInfo.toRemotePeerInfo()) + let node2PeerInfo = some(node2Switch.peerInfo.toRemotePeerInfo()) + let node3PeerInfo = some(node3Switch.peerInfo.toRemotePeerInfo()) + + let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic) + let hash1 = computeMessageHash(DefaultPubsubTopic, msg1) + let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic) + let hash2 = computeMessageHash(DefaultPubsubTopic, msg2) + let msg3 = fakeWakuMessage(contentTopic = DefaultContentTopic) + let hash3 = computeMessageHash(DefaultPubsubTopic, msg3) + + let + node1 = await newTestWakuSync(node1Switch) + node2 = await newTestWakuSync(node2Switch) + node3 = await newTestWakuSync(node3Switch) + + node1.messageIngress(DefaultPubsubTopic, msg1) + node2.messageIngress(DefaultPubsubTopic, msg1) + node2.messageIngress(DefaultPubsubTopic, msg2) + node3.messageIngress(DefaultPubsubTopic, msg3) + + let f1 = node1.storeSynchronization(node2PeerInfo) + let f2 = node2.storeSynchronization(node3PeerInfo) + let f3 = node3.storeSynchronization(node1PeerInfo) + + waitFor allFutures(f1, f2, f3) + + let hashes1 = f1.read() + let hashes2 = f2.read() + let hashes3 = f3.read() + + assert hashes1.isOk(), hashes1.error + assert hashes2.isOk(), hashes2.error + assert hashes3.isOk(), hashes3.error + + check: + hashes1.get()[0].len == 1 + hashes2.get()[0].len == 1 + hashes3.get()[0].len == 1 + + hashes1.get()[0][0] == hash2 + hashes2.get()[0][0] == hash3 + hashes3.get()[0][0] == hash1 + + await allFutures(node1.stop(), node2.stop(), node3.stop()) + await allFutures(node1Switch.stop(), node2Switch.stop(), node3Switch.stop()) diff --git a/vendor/negentropy b/vendor/negentropy new file mode 160000 index 0000000000..311a21a22b --- /dev/null +++ b/vendor/negentropy @@ -0,0 +1 @@ +Subproject commit 311a21a22bdb6d80e5c4ba5e3d2f550e0062b2cb diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index 4556eb56a2..6995a9a8ea 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -385,6 +385,40 @@ type WakuNodeConf* = object name: "store-resume" .}: bool + ## Sync config + storeSync* {. + desc: "Enable store sync protocol: true|false", + defaultValue: false, + name: "store-sync" + .}: bool + + storeSyncInterval* {. + desc: "Interval between store sync attempts. In seconds.", + defaultValue: 3600, # 1 hours + name: "store-sync-interval" + .}: int64 + + storeSyncRange* {. + desc: "Amount of time to sync. In seconds.", + defaultValue: 300, # 5 minutes + name: "store-sync-range" + .}: int64 + + storeSyncRelayJitter* {. + hidden, + desc: "Time offset to account for message propagation jitter. In seconds.", + defaultValue: 20, + name: "store-sync-relay-jitter" + .}: int64 + + storeSyncMaxPayloadSize* {. + hidden, + desc: + "Max size in bytes of the inner negentropy payload. Cannot be less than 5K, 0 is unlimited.", + defaultValue: 0, + name: "store-sync-max-payload-size" + .}: int64 + ## Filter config filter* {. desc: "Enable filter protocol: true|false", defaultValue: false, name: "filter" diff --git a/waku/factory/internal_config.nim b/waku/factory/internal_config.nim index c284982310..0d2b6ef5b3 100644 --- a/waku/factory/internal_config.nim +++ b/waku/factory/internal_config.nim @@ -142,6 +142,7 @@ proc networkConfiguration*(conf: WakuNodeConf, clientId: string): NetConfigResul filter = conf.filter, store = conf.store, relay = conf.relay, + sync = conf.storeSync, ) # Resolve and use DNS domain IP diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index ec26480bbb..91407e9d89 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -295,6 +295,17 @@ proc setupProtocols( if conf.store and conf.storeResume: node.setupStoreResume() + if conf.storeSync: + ( + await node.mountWakuSync( + int(conf.storeSyncMaxPayloadSize), + conf.storeSyncRange.seconds(), + conf.storeSyncInterval.seconds(), + conf.storeSyncRelayJitter.seconds(), + ) + ).isOkOr: + return err("failed to mount waku sync protocol: " & $error) + # NOTE Must be mounted after relay if conf.lightpush: try: diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index b60e2c8185..d4e5ff74e6 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -39,6 +39,7 @@ import ../waku_filter_v2/client as filter_client, ../waku_filter_v2/subscriptions as filter_subscriptions, ../waku_metadata, + ../waku_sync, ../waku_lightpush/client as lightpush_client, ../waku_lightpush/common, ../waku_lightpush/protocol, @@ -104,6 +105,7 @@ type wakuPeerExchange*: WakuPeerExchange wakuMetadata*: WakuMetadata wakuSharding*: Sharding + wakuSync*: WakuSync enr*: enr.Record libp2pPing*: Ping rng*: ref rand.HmacDrbgContext @@ -194,6 +196,42 @@ proc connectToNodes*( # NOTE Connects to the node without a give protocol, which automatically creates streams for relay await peer_manager.connectToNodes(node.peerManager, nodes, source = source) +## Waku Sync + +proc mountWakuSync*( + node: WakuNode, + maxFrameSize: int = DefaultMaxFrameSize, + syncRange: timer.Duration = DefaultSyncRange, + syncInterval: timer.Duration = DefaultSyncInterval, + relayJitter: Duration = DefaultGossipSubJitter, +): Future[Result[void, string]] {.async.} = + if not node.wakuSync.isNil(): + return err("already mounted") + + node.wakuSync = ( + await WakuSync.new( + peerManager = node.peerManager, + maxFrameSize = maxFrameSize, + syncRange = syncRange, + syncInterval = syncInterval, + relayJitter = relayJitter, + wakuArchive = node.wakuArchive, + wakuStoreClient = node.wakuStoreClient, + ) + ).valueOr: + return err("initialization failed: " & error) + + let catchable = catch: + node.switch.mount(node.wakuSync, protocolMatcher(WakuSyncCodec)) + + if catchable.isErr(): + return err("switch mounting failed: " & catchable.error.msg) + + if node.started: + node.wakuSync.start() + + return ok() + ## Waku Metadata proc mountMetadata*(node: WakuNode, clusterId: uint32): Result[void, string] = @@ -258,12 +296,19 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = await node.wakuArchive.handleMessage(topic, msg) + proc syncHandler(topic: PubsubTopic, msg: WakuMessage) {.async.} = + if node.wakuSync.isNil(): + return + + node.wakuSync.messageIngress(topic, msg) + let defaultHandler = proc( topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = await traceHandler(topic, msg) await filterHandler(topic, msg) await archiveHandler(topic, msg) + await syncHandler(topic, msg) discard node.wakuRelay.subscribe(topic, defaultHandler) @@ -1286,6 +1331,9 @@ proc start*(node: WakuNode) {.async.} = if not node.wakuStoreResume.isNil(): await node.wakuStoreResume.start() + if not node.wakuSync.isNil(): + node.wakuSync.start() + ## The switch uses this mapper to update peer info addrs ## with announced addrs after start let addressMapper = proc( diff --git a/waku/waku_api/rest/admin/handlers.nim b/waku/waku_api/rest/admin/handlers.nim index 435d19de25..bf73bf0168 100644 --- a/waku/waku_api/rest/admin/handlers.nim +++ b/waku/waku_api/rest/admin/handlers.nim @@ -16,6 +16,7 @@ import ../../../waku_relay, ../../../waku_peer_exchange, ../../../waku_node, + ../../../waku_sync, ../../../node/peer_manager, ../responses, ../serdes, @@ -101,6 +102,18 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = ) tuplesToWakuPeers(peers, pxPeers) + if not node.wakuSync.isNil(): + # Map WakuSync peers to WakuPeers and add to return list + let syncPeers = node.peerManager.peerStore.peers(WakuSyncCodec).mapIt( + ( + multiaddr: constructMultiaddrStr(it), + protocol: WakuSyncCodec, + connected: it.connectedness == Connectedness.Connected, + origin: it.origin, + ) + ) + tuplesToWakuPeers(peers, syncPeers) + let resp = RestApiResponse.jsonResponse(peers, status = Http200) if resp.isErr(): error "An error ocurred while building the json respose: ", error = resp.error diff --git a/waku/waku_core/time.nim b/waku/waku_core/time.nim index c8aa563558..43bb0be489 100644 --- a/waku/waku_core/time.nim +++ b/waku/waku_core/time.nim @@ -33,3 +33,16 @@ template nanosecondTime*(collector: Gauge, body: untyped) = metrics.set(collector, nowInUnixFloat() - start) else: body + +# Unused yet. Kept for future use in Waku Sync. +#[ proc timestampInSeconds*(time: Timestamp): Timestamp = + let timeStr = $time + var timestamp: Timestamp = time + + if timeStr.len() > 16: + timestamp = Timestamp(time div Timestamp(1_000_000_000)) + elif timeStr.len() < 16 and timeStr.len() > 13: + timestamp = Timestamp(time div Timestamp(1_000_000)) + elif timeStr.len() > 10: + timestamp = Timestamp(time div Timestamp(1000)) + return timestamp ]# diff --git a/waku/waku_enr/capabilities.nim b/waku/waku_enr/capabilities.nim index caec9d969b..b6492dda11 100644 --- a/waku/waku_enr/capabilities.nim +++ b/waku/waku_enr/capabilities.nim @@ -18,8 +18,11 @@ type Store = 1 Filter = 2 Lightpush = 3 + Sync = 4 -func init*(T: type CapabilitiesBitfield, lightpush, filter, store, relay: bool): T = +func init*( + T: type CapabilitiesBitfield, lightpush, filter, store, relay, sync: bool = false +): T = ## Creates an waku2 ENR flag bit field according to RFC 31 (https://rfc.vac.dev/spec/31/) var bitfield: uint8 if relay: @@ -30,6 +33,8 @@ func init*(T: type CapabilitiesBitfield, lightpush, filter, store, relay: bool): bitfield.setBit(2) if lightpush: bitfield.setBit(3) + if sync: + bitfield.setBit(4) CapabilitiesBitfield(bitfield) func init*(T: type CapabilitiesBitfield, caps: varargs[Capabilities]): T = diff --git a/waku/waku_store/client.nim b/waku/waku_store/client.nim index aa3f6629dd..d6d53a9c7c 100644 --- a/waku/waku_store/client.nim +++ b/waku/waku_store/client.nim @@ -48,7 +48,7 @@ proc sendStoreRequest( return ok(res) proc query*( - self: WakuStoreClient, request: StoreQueryRequest, peer: RemotePeerInfo + self: WakuStoreClient, request: StoreQueryRequest, peer: RemotePeerInfo | PeerId ): Future[StoreQueryResult] {.async, gcsafe.} = if request.paginationCursor.isSome() and request.paginationCursor.get() == EmptyCursor: return err(StoreError(kind: ErrorCode.BAD_REQUEST, cause: "invalid cursor")) diff --git a/waku/waku_sync.nim b/waku/waku_sync.nim new file mode 100644 index 0000000000..61cb2df4e4 --- /dev/null +++ b/waku/waku_sync.nim @@ -0,0 +1,5 @@ +{.push raises: [].} + +import ./waku_sync/protocol, ./waku_sync/common + +export common, protocol diff --git a/waku/waku_sync/codec.nim b/waku/waku_sync/codec.nim new file mode 100644 index 0000000000..6e3d9bd63b --- /dev/null +++ b/waku/waku_sync/codec.nim @@ -0,0 +1,57 @@ +{.push raises: [].} + +import std/options, stew/arrayops +import ../common/protobuf, ../waku_core, ./common + +proc encode*(req: SyncPayload): ProtoBuffer = + var pb = initProtoBuffer() + + if req.syncRange.isSome(): + pb.write3(31, req.syncRange.get()[0]) + pb.write3(32, req.syncRange.get()[1]) + + if req.frameSize.isSome(): + pb.write3(33, req.frameSize.get()) + + if req.negentropy.len > 0: + pb.write3(1, req.negentropy) + + if req.hashes.len > 0: + for hash in req.hashes: + pb.write3(20, hash) + + return pb + +proc decode*(T: type SyncPayload, buffer: seq[byte]): ProtobufResult[T] = + var req = SyncPayload() + let pb = initProtoBuffer(buffer) + + var rangeStart: uint64 + var rangeEnd: uint64 + if ?pb.getField(31, rangeStart) and ?pb.getField(32, rangeEnd): + req.syncRange = some((rangeStart, rangeEnd)) + else: + req.syncRange = none((uint64, uint64)) + + var frame: uint64 + if ?pb.getField(33, frame): + req.frameSize = some(frame) + else: + req.frameSize = none(uint64) + + var negentropy: seq[byte] + if ?pb.getField(1, negentropy): + req.negentropy = negentropy + else: + req.negentropy = @[] + + var buffer: seq[seq[byte]] + if not ?pb.getRepeatedField(20, buffer): + req.hashes = @[] + else: + req.hashes = newSeqOfCap[WakuMessageHash](buffer.len) + for buf in buffer: + let msg: WakuMessageHash = fromBytes(buf) + req.hashes.add(msg) + + return ok(req) diff --git a/waku/waku_sync/common.nim b/waku/waku_sync/common.nim new file mode 100644 index 0000000000..4796ebf79d --- /dev/null +++ b/waku/waku_sync/common.nim @@ -0,0 +1,32 @@ +{.push raises: [].} + +import std/[options], chronos, libp2p/peerId +import ../waku_core + +const + DefaultSyncInterval*: Duration = 5.minutes + DefaultSyncRange*: Duration = 1.hours + RetryDelay*: Duration = 30.seconds + WakuSyncCodec* = "/vac/waku/sync/1.0.0" + DefaultMaxFrameSize* = 1048576 # 1 MiB + DefaultGossipSubJitter*: Duration = 20.seconds + +type + TransferCallback* = proc( + hashes: seq[WakuMessageHash], peerId: PeerId + ): Future[Result[void, string]] {.async: (raises: []), closure.} + + PruneCallback* = proc( + startTime: Timestamp, endTime: Timestamp, cursor = none(WakuMessageHash) + ): Future[ + Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string] + ] {.async: (raises: []), closure.} + + SyncPayload* = object + syncRange*: Option[(uint64, uint64)] + + frameSize*: Option[uint64] + + negentropy*: seq[byte] # negentropy protocol payload + + hashes*: seq[WakuMessageHash] diff --git a/waku/waku_sync/protocol.nim b/waku/waku_sync/protocol.nim new file mode 100644 index 0000000000..0a5e6e49d8 --- /dev/null +++ b/waku/waku_sync/protocol.nim @@ -0,0 +1,520 @@ +{.push raises: [].} + +import + std/[options, sugar, sequtils], + stew/byteutils, + results, + chronicles, + chronos, + metrics, + libp2p/utility, + libp2p/protocols/protocol, + libp2p/stream/connection, + libp2p/crypto/crypto, + eth/p2p/discoveryv5/enr +import + ../common/nimchronos, + ../common/enr, + ../waku_core, + ../waku_archive, + ../waku_store/[client, common], + ../waku_enr, + ../node/peer_manager/peer_manager, + ./raw_bindings, + ./common, + ./session + +logScope: + topics = "waku sync" + +type WakuSync* = ref object of LPProtocol + storage: NegentropyStorage + maxFrameSize: int # Negentropy param to limit the size of payloads + + peerManager: PeerManager + + syncInterval: timer.Duration # Time between each syncronisation attempt + syncRange: timer.Duration # Amount of time in the past to sync + relayJitter: Duration # Amount of time since the present to ignore when syncing + transferCallBack: Option[TransferCallback] # Callback for message transfers. + + pruneCallBack: Option[PruneCallBack] # Callback with the result of the archive query + pruneStart: Timestamp # Last pruning start timestamp + pruneOffset: timer.Duration # Offset to prune a bit more than necessary. + + periodicSyncFut: Future[void] + periodicPruneFut: Future[void] + +proc storageSize*(self: WakuSync): int = + self.storage.len + +proc messageIngress*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage) = + if msg.ephemeral: + return + + let msgHash: WakuMessageHash = computeMessageHash(pubsubTopic, msg) + + trace "inserting message into waku sync storage ", + msg_hash = msgHash.to0xHex(), timestamp = msg.timestamp + + self.storage.insert(msg.timestamp, msgHash).isOkOr: + error "failed to insert message ", msg_hash = msgHash.to0xHex(), error = $error + +proc messageIngress*( + self: WakuSync, pubsubTopic: PubsubTopic, msgHash: WakuMessageHash, msg: WakuMessage +) = + if msg.ephemeral: + return + + trace "inserting message into waku sync storage ", + msg_hash = msgHash.to0xHex(), timestamp = msg.timestamp + + if self.storage.insert(msg.timestamp, msgHash).isErr(): + error "failed to insert message ", msg_hash = msgHash.to0xHex() + +proc calculateRange( + jitter: Duration = 20.seconds, syncRange: Duration = 1.hours +): (int64, int64) = + ## Calculates the start and end time of a sync session + + var now = getNowInNanosecondTime() + + # Because of message jitter inherent to Relay protocol + now -= jitter.nanos + + let syncRange = syncRange.nanos + + let syncStart = now - syncRange + let syncEnd = now + + return (syncStart, syncEnd) + +proc request( + self: WakuSync, conn: Connection +): Future[Result[seq[WakuMessageHash], string]] {.async.} = + let (syncStart, syncEnd) = calculateRange(self.relayJitter) + + let initialized = + ?clientInitialize(self.storage, conn, self.maxFrameSize, syncStart, syncEnd) + + debug "sync session initialized", + client = self.peerManager.switch.peerInfo.peerId, + server = conn.peerId, + frameSize = self.maxFrameSize, + timeStart = syncStart, + timeEnd = syncEnd + + var hashes: seq[WakuMessageHash] + var reconciled = initialized + + while true: + let sent = ?await reconciled.send() + + trace "sync payload sent", + client = self.peerManager.switch.peerInfo.peerId, + server = conn.peerId, + payload = reconciled.payload + + let received = ?await sent.listenBack() + + trace "sync payload received", + client = self.peerManager.switch.peerInfo.peerId, + server = conn.peerId, + payload = received.payload + + reconciled = (?received.clientReconcile(hashes)).valueOr: + let completed = error # Result[Reconciled, Completed] + + ?await completed.clientTerminate() + + debug "sync session ended gracefully", + client = self.peerManager.switch.peerInfo.peerId, server = conn.peerId + + return ok(hashes) + + continue + +proc storeSynchronization*( + self: WakuSync, peerInfo: Option[RemotePeerInfo] = none(RemotePeerInfo) +): Future[Result[(seq[WakuMessageHash], RemotePeerInfo), string]] {.async.} = + let peer = peerInfo.valueOr: + self.peerManager.selectPeer(WakuSyncCodec).valueOr: + return err("No suitable peer found for sync") + + let connOpt = await self.peerManager.dialPeer(peer, WakuSyncCodec) + + let conn: Connection = connOpt.valueOr: + return err("Cannot establish sync connection") + + let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr: + error "sync session ended", + server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId, error + + return err("Sync request error: " & error) + + return ok((hashes, peer)) + +proc handleSyncSession( + self: WakuSync, conn: Connection +): Future[Result[seq[WakuMessageHash], string]] {.async.} = + let (syncStart, syncEnd) = calculateRange(self.relayJitter) + + let initialized = + ?serverInitialize(self.storage, conn, self.maxFrameSize, syncStart, syncEnd) + + var sent = initialized + + while true: + let received = ?await sent.listenBack() + + trace "sync payload received", + server = self.peerManager.switch.peerInfo.peerId, + client = conn.peerId, + payload = received.payload + + let reconciled = (?received.serverReconcile()).valueOr: + let completed = error # Result[Reconciled, Completed] + + let hashes = await completed.serverTerminate() + + return ok(hashes) + + sent = ?await reconciled.send() + + trace "sync payload sent", + server = self.peerManager.switch.peerInfo.peerId, + client = conn.peerId, + payload = reconciled.payload + + continue + +proc initProtocolHandler(self: WakuSync) = + proc handle(conn: Connection, proto: string) {.async, closure.} = + debug "sync session requested", + server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId + + let hashes = (await self.handleSyncSession(conn)).valueOr: + debug "sync session ended", + server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId, error + + #TODO send error code and desc to client + return + + if hashes.len > 0 and self.transferCallBack.isSome(): + let callback = self.transferCallBack.get() + + (await callback(hashes, conn.peerId)).isOkOr: + error "transfer callback failed", error = $error + + debug "sync session ended gracefully", + server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId + + self.handler = handle + self.codec = WakuSyncCodec + +proc createPruneCallback( + self: WakuSync, wakuArchive: WakuArchive +): Result[PruneCallBack, string] = + if wakuArchive.isNil(): + return err ("waku archive unavailable") + + let callback: PruneCallback = proc( + pruneStart: Timestamp, pruneStop: Timestamp, cursor: Option[WakuMessageHash] + ): Future[ + Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string] + ] {.async: (raises: []), closure.} = + let archiveCursor = + if cursor.isSome(): + some(cursor.get()) + else: + none(ArchiveCursor) + + let query = ArchiveQuery( + includeData: true, + cursor: archiveCursor, + startTime: some(pruneStart), + endTime: some(pruneStop), + pageSize: 100, + ) + + let catchable = catch: + await wakuArchive.findMessages(query) + + if catchable.isErr(): + return err("archive error: " & catchable.error.msg) + + let res = catchable.get() + let response = res.valueOr: + return err("archive error: " & $error) + + let elements = collect(newSeq): + for (hash, msg) in response.hashes.zip(response.messages): + (hash, msg.timestamp) + + let cursor = response.cursor + + return ok((elements, cursor)) + + return ok(callback) + +proc createTransferCallback( + self: WakuSync, wakuArchive: WakuArchive, wakuStoreClient: WakuStoreClient +): Result[TransferCallback, string] = + if wakuArchive.isNil(): + return err("waku archive unavailable") + + if wakuStoreClient.isNil(): + return err("waku store client unavailable") + + let callback: TransferCallback = proc( + hashes: seq[WakuMessageHash], peerId: PeerId + ): Future[Result[void, string]] {.async: (raises: []), closure.} = + var query = StoreQueryRequest() + query.includeData = true + query.messageHashes = hashes + query.paginationLimit = some(uint64(100)) + + while true: + let catchable = catch: + await wakuStoreClient.query(query, peerId) + + if catchable.isErr(): + return err("store client error: " & catchable.error.msg) + + let res = catchable.get() + let response = res.valueOr: + return err("store client error: " & $error) + + query.paginationCursor = response.paginationCursor + + for kv in response.messages: + let handleRes = catch: + await wakuArchive.handleMessage(kv.pubsubTopic.get(), kv.message.get()) + + if handleRes.isErr(): + error "message transfer failed", error = handleRes.error.msg + # Messages can be synced next time since they are not added to storage yet. + continue + + self.messageIngress(kv.pubsubTopic.get(), kv.messageHash, kv.message.get()) + + if query.paginationCursor.isNone(): + break + + return ok() + + return ok(callback) + +proc initFillStorage( + self: WakuSync, wakuArchive: WakuArchive +): Future[Result[void, string]] {.async.} = + if wakuArchive.isNil(): + return err("waku archive unavailable") + + let endTime = getNowInNanosecondTime() + let starTime = endTime - self.syncRange.nanos + + var query = ArchiveQuery( + includeData: true, + cursor: none(ArchiveCursor), + startTime: some(starTime), + endTime: some(endTime), + pageSize: 100, + ) + + while true: + let response = (await wakuArchive.findMessages(query)).valueOr: + return err($error) + + for i in 0 ..< response.hashes.len: + let hash = response.hashes[i] + let topic = response.topics[i] + let msg = response.messages[i] + + self.messageIngress(topic, hash, msg) + + if response.cursor.isNone(): + break + + query.cursor = response.cursor + + return ok() + +proc new*( + T: type WakuSync, + peerManager: PeerManager, + maxFrameSize: int = DefaultMaxFrameSize, + syncRange: timer.Duration = DefaultSyncRange, + syncInterval: timer.Duration = DefaultSyncInterval, + relayJitter: Duration = DefaultGossipSubJitter, + wakuArchive: WakuArchive, + wakuStoreClient: WakuStoreClient, + pruneCallback: Option[PruneCallback] = none(PruneCallback), + transferCallback: Option[TransferCallback] = none(TransferCallback), +): Future[Result[T, string]] {.async.} = + let storage = NegentropyStorage.new().valueOr: + return err("negentropy storage creation failed") + + var sync = WakuSync( + storage: storage, + peerManager: peerManager, + maxFrameSize: maxFrameSize, + syncInterval: syncInterval, + syncRange: syncRange, + relayJitter: relayJitter, + pruneOffset: syncInterval div 100, + ) + + sync.initProtocolHandler() + + sync.pruneCallBack = pruneCallback + + if sync.pruneCallBack.isNone(): + let res = sync.createPruneCallback(wakuArchive) + + if res.isErr(): + error "pruning callback creation error", error = res.error + else: + sync.pruneCallBack = some(res.get()) + + sync.transferCallBack = transferCallback + + if sync.transferCallBack.isNone(): + let res = sync.createTransferCallback(wakuArchive, wakuStoreClient) + + if res.isErr(): + error "transfer callback creation error", error = res.error + else: + sync.transferCallBack = some(res.get()) + + let res = await sync.initFillStorage(wakuArchive) + if res.isErr(): + warn "will not sync messages before this point in time", error = res.error + + info "WakuSync protocol initialized" + + return ok(sync) + +proc periodicSync(self: WakuSync, callback: TransferCallback) {.async.} = + debug "periodic sync initialized", interval = $self.syncInterval + + while true: # infinite loop + await sleepAsync(self.syncInterval) + + debug "periodic sync started" + + var + hashes: seq[WakuMessageHash] + peer: RemotePeerInfo + tries = 3 + + while true: + let res = (await self.storeSynchronization()).valueOr: + # we either try again or log an error and break + if tries > 0: + tries -= 1 + await sleepAsync(RetryDelay) + continue + else: + error "sync failed", error = $error + break + + hashes = res[0] + peer = res[1] + break + + if hashes.len > 0: + tries = 3 + while true: + (await callback(hashes, peer.peerId)).isOkOr: + # we either try again or log an error and break + if tries > 0: + tries -= 1 + await sleepAsync(RetryDelay) + continue + else: + error "transfer callback failed", error = $error + break + + break + + debug "periodic sync done", hashSynced = hashes.len + + continue + +proc periodicPrune(self: WakuSync, callback: PruneCallback) {.async.} = + debug "periodic prune initialized", interval = $self.syncInterval + + # Default T minus 60m + self.pruneStart = getNowInNanosecondTime() - self.syncRange.nanos + + await sleepAsync(self.syncInterval) + + # Default T minus 55m + var pruneStop = getNowInNanosecondTime() - self.syncRange.nanos + + while true: # infinite loop + await sleepAsync(self.syncInterval) + + debug "periodic prune started", + startTime = self.pruneStart - self.pruneOffset.nanos, + endTime = pruneStop, + storageSize = self.storage.len + + var (elements, cursor) = + (newSeq[(WakuMessageHash, Timestamp)](0), none(WakuMessageHash)) + + var tries = 3 + while true: + (elements, cursor) = ( + await callback(self.pruneStart - self.pruneOffset.nanos, pruneStop, cursor) + ).valueOr: + # we either try again or log an error and break + if tries > 0: + tries -= 1 + await sleepAsync(RetryDelay) + continue + else: + error "pruning callback failed", error = $error + break + + if elements.len == 0: + # no elements to remove, stop + break + + for (hash, timestamp) in elements: + self.storage.erase(timestamp, hash).isOkOr: + error "storage erase failed", + timestamp = timestamp, msg_hash = hash.to0xHex(), error = $error + continue + + if cursor.isNone(): + # no more pages, stop + break + + self.pruneStart = pruneStop + pruneStop = getNowInNanosecondTime() - self.syncRange.nanos + + debug "periodic prune done", storageSize = self.storage.len + + continue + +proc start*(self: WakuSync) = + self.started = true + + if self.transferCallBack.isSome() and self.syncInterval > ZeroDuration: + self.periodicSyncFut = self.periodicSync(self.transferCallBack.get()) + + if self.pruneCallBack.isSome() and self.syncInterval > ZeroDuration: + self.periodicPruneFut = self.periodicPrune(self.pruneCallBack.get()) + + info "WakuSync protocol started" + +proc stopWait*(self: WakuSync) {.async.} = + if self.transferCallBack.isSome() and self.syncInterval > ZeroDuration: + await self.periodicSyncFut.cancelAndWait() + + if self.pruneCallBack.isSome() and self.syncInterval > ZeroDuration: + await self.periodicPruneFut.cancelAndWait() + + info "WakuSync protocol stopped" diff --git a/waku/waku_sync/raw_bindings.nim b/waku/waku_sync/raw_bindings.nim new file mode 100644 index 0000000000..e96898d961 --- /dev/null +++ b/waku/waku_sync/raw_bindings.nim @@ -0,0 +1,501 @@ +{.push raises: [].} + +from os import DirSep + +import std/[strutils], chronicles, std/options, stew/byteutils, confutils, results +import ../waku_core/message + +const negentropyPath = + currentSourcePath.rsplit(DirSep, 1)[0] & DirSep & ".." & DirSep & ".." & DirSep & + "vendor" & DirSep & "negentropy" & DirSep & "cpp" & DirSep + +{.link: negentropyPath & "libnegentropy.so".} + +const NEGENTROPY_HEADER = negentropyPath & "negentropy_wrapper.h" + +logScope: + topics = "waku sync" + +type Buffer = object + len*: uint64 + `ptr`*: ptr uint8 + +type BindingResult = object + output: Buffer + have_ids_len: uint + need_ids_len: uint + have_ids: ptr Buffer + need_ids: ptr Buffer + error: cstring + +proc toWakuMessageHash(buffer: Buffer): WakuMessageHash = + assert buffer.len == 32 + + var hash: WakuMessageHash + + copyMem(hash[0].addr, buffer.ptr, 32) + + return hash + +proc toBuffer(x: openArray[byte]): Buffer = + ## converts the input to a Buffer object + ## the Buffer object is used to communicate data with the rln lib + var temp = @x + let baseAddr = cast[pointer](x) + let output = Buffer(`ptr`: cast[ptr uint8](baseAddr), len: uint64(temp.len)) + return output + +proc bufferToBytes(buffer: ptr Buffer, len: Option[uint64] = none(uint64)): seq[byte] = + var bufLen: uint64 + if isNone(len): + bufLen = buffer.len + else: + bufLen = len.get() + if bufLen == 0: + return @[] + trace "length of buffer is", len = bufLen + let bytes = newSeq[byte](bufLen) + copyMem(bytes[0].unsafeAddr, buffer.ptr, bufLen) + return bytes + +proc toBufferSeq(buffLen: uint, buffPtr: ptr Buffer): seq[Buffer] = + var uncheckedArr = cast[ptr UncheckedArray[Buffer]](buffPtr) + var mySequence = newSeq[Buffer](buffLen) + for i in 0 .. buffLen - 1: + mySequence[i] = uncheckedArr[i] + return mySequence + +### Storage ### + +type NegentropyStorage* = distinct pointer + +# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L27 +proc storage_init( + db_path: cstring, name: cstring +): NegentropyStorage {.header: NEGENTROPY_HEADER, importc: "storage_new".} + +# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L41 +proc raw_insert( + storage: NegentropyStorage, timestamp: uint64, id: ptr Buffer +): bool {.header: NEGENTROPY_HEADER, importc: "storage_insert".} + +# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L43 +proc raw_erase( + storage: NegentropyStorage, timestamp: uint64, id: ptr Buffer +): bool {.header: NEGENTROPY_HEADER, importc: "storage_erase".} + +# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L29 +proc free( + storage: NegentropyStorage +) {.header: NEGENTROPY_HEADER, importc: "storage_delete".} + +# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L31 +proc size( + storage: NegentropyStorage +): cint {.header: NEGENTROPY_HEADER, importc: "storage_size".} + +### Negentropy ### + +type RawNegentropy* = distinct pointer + +# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L33 +proc constructNegentropy( + storage: NegentropyStorage, frameSizeLimit: uint64 +): RawNegentropy {.header: NEGENTROPY_HEADER, importc: "negentropy_new".} + +# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L37 +proc raw_initiate( + negentropy: RawNegentropy, r: ptr BindingResult +): int {.header: NEGENTROPY_HEADER, importc: "negentropy_initiate".} + +# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L39 +proc raw_setInitiator( + negentropy: RawNegentropy +) {.header: NEGENTROPY_HEADER, importc: "negentropy_setinitiator".} + +# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L45 +proc raw_reconcile( + negentropy: RawNegentropy, query: ptr Buffer, r: ptr BindingResult +): int {.header: NEGENTROPY_HEADER, importc: "reconcile".} + +# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L51 +proc raw_reconcile_with_ids( + negentropy: RawNegentropy, query: ptr Buffer, r: ptr BindingResult +): int {.header: NEGENTROPY_HEADER, importc: "reconcile_with_ids_no_cbk".} + +# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L35 +proc free( + negentropy: RawNegentropy +) {.header: NEGENTROPY_HEADER, importc: "negentropy_delete".} + +# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L53 +proc free_result( + r: ptr BindingResult +) {.header: NEGENTROPY_HEADER, importc: "free_result".} + +### SubRange ### + +type NegentropySubRangeStorage* = distinct pointer + +# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L57 +proc subrange_init( + storage: NegentropyStorage, startTimestamp: uint64, endTimestamp: uint64 +): NegentropySubRangeStorage {.header: NEGENTROPY_HEADER, importc: "subrange_new".} + +# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L59 +proc free( + subrange: NegentropySubRangeStorage +) {.header: NEGENTROPY_HEADER, importc: "subrange_delete".} + +# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L31 +proc size( + subrange: NegentropySubRangeStorage +): cint {.header: NEGENTROPY_HEADER, importc: "subrange_size".} + +### Negentropy with NegentropySubRangeStorage ### + +type RawNegentropySubRange = distinct pointer + +# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L61 +proc constructNegentropyWithSubRange( + subrange: NegentropySubRangeStorage, frameSizeLimit: uint64 +): RawNegentropySubRange {. + header: NEGENTROPY_HEADER, importc: "negentropy_subrange_new" +.} + +# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L65 +proc raw_initiate_subrange( + negentropy: RawNegentropySubRange, r: ptr BindingResult +): int {.header: NEGENTROPY_HEADER, importc: "negentropy_subrange_initiate".} + +# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L67 +proc raw_reconcile_subrange( + negentropy: RawNegentropySubRange, query: ptr Buffer, r: ptr BindingResult +): int {.header: NEGENTROPY_HEADER, importc: "reconcile_subrange".} + +# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L69 +proc raw_reconcile_with_ids_subrange( + negentropy: RawNegentropySubRange, query: ptr Buffer, r: ptr BindingResult +): int {.header: NEGENTROPY_HEADER, importc: "reconcile_with_ids_subrange_no_cbk".} + +# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L63 +proc free( + negentropy: RawNegentropySubRange +) {.header: NEGENTROPY_HEADER, importc: "negentropy_subrange_delete".} + +### Wrappings ### + +### Storage ### + +proc `==`*(a: NegentropyStorage, b: pointer): bool {.borrow.} + +proc new*(T: type NegentropyStorage): Result[T, string] = + #TODO db name and path + let storage = storage_init("", "") + + #[ TODO: Uncomment once we move to lmdb + if storage == nil: + return err("storage initialization failed") ]# + return ok(storage) + +proc delete*(storage: NegentropyStorage) = + storage.free() + +proc erase*( + storage: NegentropyStorage, id: int64, hash: WakuMessageHash +): Result[void, string] = + var buffer = toBuffer(hash) + var bufPtr = addr(buffer) + let res = raw_erase(storage, uint64(id), bufPtr) + + #TODO error handling once we move to lmdb + + if res: + return ok() + else: + return err("erase error") + +proc insert*( + storage: NegentropyStorage, id: int64, hash: WakuMessageHash +): Result[void, string] = + var buffer = toBuffer(hash) + var bufPtr = addr(buffer) + let res = raw_insert(storage, uint64(id), bufPtr) + + #TODO error handling once we move to lmdb + + if res: + return ok() + else: + return err("insert error") + +proc len*(storage: NegentropyStorage): int = + int(storage.size) + +### SubRange ### + +proc `==`*(a: NegentropySubRangeStorage, b: pointer): bool {.borrow.} + +proc new*( + T: type NegentropySubRangeStorage, + storage: NegentropyStorage, + startTime: uint64 = uint64.low, + endTime: uint64 = uint64.high, +): Result[T, string] = + let subrange = subrange_init(storage, startTime, endTime) + + #[ TODO: Uncomment once we move to lmdb + if storage == nil: + return err("storage initialization failed") ]# + return ok(subrange) + +proc delete*(subrange: NegentropySubRangeStorage) = + subrange.free() + +proc len*(subrange: NegentropySubRangeStorage): int = + int(subrange.size) + +### Interface ### + +type + Negentropy* = ref object of RootObj + + NegentropyWithSubRange = ref object of Negentropy + inner: RawNegentropySubRange + + NegentropyWithStorage = ref object of Negentropy + inner: RawNegentropy + + NegentropyPayload* = distinct seq[byte] + +method delete*(self: Negentropy) {.base, gcsafe.} = + discard + +method initiate*(self: Negentropy): Result[NegentropyPayload, string] {.base.} = + discard + +method serverReconcile*( + self: Negentropy, query: NegentropyPayload +): Result[NegentropyPayload, string] {.base.} = + discard + +method clientReconcile*( + self: Negentropy, + query: NegentropyPayload, + haves: var seq[WakuMessageHash], + needs: var seq[WakuMessageHash], +): Result[Option[NegentropyPayload], string] {.base.} = + discard + +### Impl. ### + +proc new*( + T: type Negentropy, + storage: NegentropyStorage | NegentropySubRangeStorage, + frameSizeLimit: int, +): Result[T, string] = + if storage is NegentropyStorage: + let raw_negentropy = + constructNegentropy(NegentropyStorage(storage), uint64(frameSizeLimit)) + + let negentropy = NegentropyWithStorage(inner: raw_negentropy) + + return ok(negentropy) + elif storage is NegentropySubRangeStorage: + let raw_negentropy = constructNegentropyWithSubRange( + NegentropySubRangeStorage(storage), uint64(frameSizeLimit) + ) + + let negentropy = NegentropyWithSubRange(inner: raw_negentropy) + + return ok(negentropy) + +method delete*(self: NegentropyWithSubRange) = + self.inner.free() + +method initiate*(self: NegentropyWithSubRange): Result[NegentropyPayload, string] = + ## Client inititate a sync session with a server by sending a payload + var myResult {.noinit.}: BindingResult = BindingResult() + var myResultPtr = addr myResult + + let ret = self.inner.raw_initiate_subrange(myResultPtr) + if ret < 0 or myResultPtr == nil: + error "negentropy initiate failed with code ", code = ret + return err("negentropy already initiated!") + let bytes: seq[byte] = bufferToBytes(addr(myResultPtr.output)) + free_result(myResultPtr) + trace "received return from initiate", len = myResultPtr.output.len + + return ok(NegentropyPayload(bytes)) + +method serverReconcile*( + self: NegentropyWithSubRange, query: NegentropyPayload +): Result[NegentropyPayload, string] = + ## Server response to a negentropy payload. + ## Always return an answer. + + let queryBuf = toBuffer(seq[byte](query)) + var queryBufPtr = queryBuf.unsafeAddr #TODO: Figure out why addr(buffer) throws error + var myResult {.noinit.}: BindingResult = BindingResult() + var myResultPtr = addr myResult + + let ret = self.inner.raw_reconcile_subrange(queryBufPtr, myResultPtr) + if ret < 0: + error "raw_reconcile failed with code ", code = ret + return err($myResultPtr.error) + trace "received return from raw_reconcile", len = myResultPtr.output.len + + let outputBytes: seq[byte] = bufferToBytes(addr(myResultPtr.output)) + trace "outputBytes len", len = outputBytes.len + free_result(myResultPtr) + + return ok(NegentropyPayload(outputBytes)) + +method clientReconcile*( + self: NegentropyWithSubRange, + query: NegentropyPayload, + haves: var seq[WakuMessageHash], + needs: var seq[WakuMessageHash], +): Result[Option[NegentropyPayload], string] = + ## Client response to a negentropy payload. + ## May return an answer, if not the sync session done. + + let cQuery = toBuffer(seq[byte](query)) + + var myResult {.noinit.}: BindingResult = BindingResult() + myResult.have_ids_len = 0 + myResult.need_ids_len = 0 + var myResultPtr = addr myResult + + let ret = self.inner.raw_reconcile_with_ids_subrange(cQuery.unsafeAddr, myResultPtr) + if ret < 0: + error "raw_reconcile failed with code ", code = ret + return err($myResultPtr.error) + + let output = bufferToBytes(addr myResult.output) + + var + have_hashes: seq[Buffer] + need_hashes: seq[Buffer] + + if myResult.have_ids_len > 0: + have_hashes = toBufferSeq(myResult.have_ids_len, myResult.have_ids) + if myResult.need_ids_len > 0: + need_hashes = toBufferSeq(myResult.need_ids_len, myResult.need_ids) + + trace "have and need hashes ", + have_count = have_hashes.len, need_count = need_hashes.len + + for i in 0 .. have_hashes.len - 1: + var hash = toWakuMessageHash(have_hashes[i]) + trace "have hashes ", index = i, msg_hash = hash.to0xHex() + haves.add(hash) + + for i in 0 .. need_hashes.len - 1: + var hash = toWakuMessageHash(need_hashes[i]) + trace "need hashes ", index = i, msg_hash = hash.to0xHex() + needs.add(hash) + + trace "return ", output = output, len = output.len + + free_result(myResultPtr) + + if output.len < 1: + return ok(none(NegentropyPayload)) + + return ok(some(NegentropyPayload(output))) + +method delete*(self: NegentropyWithStorage) = + self.inner.free() + +method initiate*(self: NegentropyWithStorage): Result[NegentropyPayload, string] = + ## Client inititate a sync session with a server by sending a payload + var myResult {.noinit.}: BindingResult = BindingResult() + var myResultPtr = addr myResult + + let ret = self.inner.raw_initiate(myResultPtr) + if ret < 0 or myResultPtr == nil: + error "negentropy initiate failed with code ", code = ret + return err("negentropy already initiated!") + let bytes: seq[byte] = bufferToBytes(addr(myResultPtr.output)) + free_result(myResultPtr) + trace "received return from initiate", len = myResultPtr.output.len + + return ok(NegentropyPayload(bytes)) + +method serverReconcile*( + self: NegentropyWithStorage, query: NegentropyPayload +): Result[NegentropyPayload, string] = + ## Server response to a negentropy payload. + ## Always return an answer. + + let queryBuf = toBuffer(seq[byte](query)) + var queryBufPtr = queryBuf.unsafeAddr #TODO: Figure out why addr(buffer) throws error + var myResult {.noinit.}: BindingResult = BindingResult() + var myResultPtr = addr myResult + + let ret = self.inner.raw_reconcile(queryBufPtr, myResultPtr) + if ret < 0: + error "raw_reconcile failed with code ", code = ret + return err($myResultPtr.error) + trace "received return from raw_reconcile", len = myResultPtr.output.len + + let outputBytes: seq[byte] = bufferToBytes(addr(myResultPtr.output)) + trace "outputBytes len", len = outputBytes.len + free_result(myResultPtr) + + return ok(NegentropyPayload(outputBytes)) + +method clientReconcile*( + self: NegentropyWithStorage, + query: NegentropyPayload, + haves: var seq[WakuMessageHash], + needs: var seq[WakuMessageHash], +): Result[Option[NegentropyPayload], string] = + ## Client response to a negentropy payload. + ## May return an answer, if not the sync session done. + + let cQuery = toBuffer(seq[byte](query)) + + var myResult {.noinit.}: BindingResult = BindingResult() + myResult.have_ids_len = 0 + myResult.need_ids_len = 0 + var myResultPtr = addr myResult + + let ret = self.inner.raw_reconcile_with_ids(cQuery.unsafeAddr, myResultPtr) + if ret < 0: + error "raw_reconcile failed with code ", code = ret + return err($myResultPtr.error) + + let output = bufferToBytes(addr myResult.output) + + var + have_hashes: seq[Buffer] + need_hashes: seq[Buffer] + + if myResult.have_ids_len > 0: + have_hashes = toBufferSeq(myResult.have_ids_len, myResult.have_ids) + if myResult.need_ids_len > 0: + need_hashes = toBufferSeq(myResult.need_ids_len, myResult.need_ids) + + trace "have and need hashes ", + have_count = have_hashes.len, need_count = need_hashes.len + + for i in 0 .. have_hashes.len - 1: + var hash = toWakuMessageHash(have_hashes[i]) + trace "have hashes ", index = i, msg_hash = hash.to0xHex() + haves.add(hash) + + for i in 0 .. need_hashes.len - 1: + var hash = toWakuMessageHash(need_hashes[i]) + trace "need hashes ", index = i, msg_hash = hash.to0xHex() + needs.add(hash) + + trace "return ", output = output, len = output.len + + free_result(myResultPtr) + + if output.len < 1: + return ok(none(NegentropyPayload)) + + return ok(some(NegentropyPayload(output))) diff --git a/waku/waku_sync/session.nim b/waku/waku_sync/session.nim new file mode 100644 index 0000000000..ff6d741efe --- /dev/null +++ b/waku/waku_sync/session.nim @@ -0,0 +1,240 @@ +{.push raises: [].} + +import std/options, results, chronos, libp2p/stream/connection + +import + ../common/nimchronos, + ../common/protobuf, + ../waku_core, + ./raw_bindings, + ./common, + ./codec + +#TODO add states for protocol negotiation + +### Type State ### + +type ClientSync* = object + haveHashes: seq[WakuMessageHash] + +type ServerSync* = object + +# T is either ClientSync or ServerSync + +type Reconciled*[T] = object + sync: T + negentropy: Negentropy + connection: Connection + frameSize: int + payload*: SyncPayload + +type Sent*[T] = object + sync: T + negentropy: Negentropy + connection: Connection + frameSize: int + +type Received*[T] = object + sync: T + negentropy: Negentropy + connection: Connection + frameSize: int + payload*: SyncPayload + +type Completed*[T] = object + sync: T + negentropy: Negentropy + connection: Connection + haveHashes: seq[WakuMessageHash] + +### State Transition ### + +proc clientInitialize*( + store: NegentropyStorage, + conn: Connection, + frameSize = DefaultMaxFrameSize, + start = int64.low, + `end` = int64.high, +): Result[Reconciled[ClientSync], string] = + let subrange = ?NegentropySubRangeStorage.new(store, uint64(start), uint64(`end`)) + + let negentropy = ?Negentropy.new(subrange, frameSize) + + let negentropyPayload = ?negentropy.initiate() + + let payload = SyncPayload(negentropy: seq[byte](negentropyPayload)) + + let sync = ClientSync() + + return ok( + Reconciled[ClientSync]( + sync: sync, + negentropy: negentropy, + connection: conn, + frameSize: frameSize, + payload: payload, + ) + ) + +proc serverInitialize*( + store: NegentropyStorage, + conn: Connection, + frameSize = DefaultMaxFrameSize, + syncStart = int64.low, + syncEnd = int64.high, +): Result[Sent[ServerSync], string] = + let subrange = + ?NegentropySubRangeStorage.new(store, uint64(syncStart), uint64(syncEnd)) + + let negentropy = ?Negentropy.new(subrange, frameSize) + + let sync = ServerSync() + + return ok( + Sent[ServerSync]( + sync: sync, negentropy: negentropy, connection: conn, frameSize: frameSize + ) + ) + +proc send*[T](self: Reconciled[T]): Future[Result[Sent[T], string]] {.async.} = + let writeRes = catch: + await self.connection.writeLP(self.payload.encode().buffer) + + if writeRes.isErr(): + return err("send connection write error: " & writeRes.error.msg) + + return ok( + Sent[T]( + sync: self.sync, + negentropy: self.negentropy, + connection: self.connection, + frameSize: self.frameSize, + ) + ) + +proc listenBack*[T](self: Sent[T]): Future[Result[Received[T], string]] {.async.} = + let readRes = catch: + await self.connection.readLp(-1) + + let buffer: seq[byte] = + if readRes.isOk(): + readRes.get() + else: + return err("listenBack connection read error: " & readRes.error.msg) + + # can't otherwise the compiler complains + #let payload = SyncPayload.decode(buffer).valueOr: + #return err($error) + + let decodeRes = SyncPayload.decode(buffer) + + let payload = + if decodeRes.isOk(): + decodeRes.get() + else: + let decodeError: ProtobufError = decodeRes.error + let errMsg = $decodeError + return err("listenBack decoding error: " & errMsg) + + return ok( + Received[T]( + sync: self.sync, + negentropy: self.negentropy, + connection: self.connection, + frameSize: self.frameSize, + payload: payload, + ) + ) + +# Aliasing for readability +type ContinueOrCompleted[T] = Result[Reconciled[T], Completed[T]] +type Continue[T] = Reconciled[T] + +proc clientReconcile*( + self: Received[ClientSync], needHashes: var seq[WakuMessageHash] +): Result[ContinueOrCompleted[ClientSync], string] = + var haves = self.sync.haveHashes + + let responseOpt = + ?self.negentropy.clientReconcile( + NegentropyPayload(self.payload.negentropy), haves, needHashes + ) + + let sync = ClientSync(haveHashes: haves) + + let response = responseOpt.valueOr: + let res = ContinueOrCompleted[ClientSync].err( + Completed[ClientSync]( + sync: sync, negentropy: self.negentropy, connection: self.connection + ) + ) + + return ok(res) + + let payload = SyncPayload(negentropy: seq[byte](response)) + + let res = ContinueOrCompleted[ClientSync].ok( + Continue[ClientSync]( + sync: sync, + negentropy: self.negentropy, + connection: self.connection, + frameSize: self.frameSize, + payload: payload, + ) + ) + + return ok(res) + +proc serverReconcile*( + self: Received[ServerSync] +): Result[ContinueOrCompleted[ServerSync], string] = + if self.payload.negentropy.len == 0: + let res = ContinueOrCompleted[ServerSync].err( + Completed[ServerSync]( + sync: self.sync, + negentropy: self.negentropy, + connection: self.connection, + haveHashes: self.payload.hashes, + ) + ) + + return ok(res) + + let response = + ?self.negentropy.serverReconcile(NegentropyPayload(self.payload.negentropy)) + + let payload = SyncPayload(negentropy: seq[byte](response)) + + let res = ContinueOrCompleted[ServerSync].ok( + Continue[ServerSync]( + sync: self.sync, + negentropy: self.negentropy, + connection: self.connection, + frameSize: self.frameSize, + payload: payload, + ) + ) + + return ok(res) + +proc clientTerminate*( + self: Completed[ClientSync] +): Future[Result[void, string]] {.async.} = + let payload = SyncPayload(hashes: self.sync.haveHashes) + + let writeRes = catch: + await self.connection.writeLp(payload.encode().buffer) + + if writeRes.isErr(): + return err("clientTerminate connection write error: " & writeRes.error.msg) + + self.negentropy.delete() + + return ok() + +proc serverTerminate*( + self: Completed[ServerSync] +): Future[seq[WakuMessageHash]] {.async.} = + self.negentropy.delete() + + return self.haveHashes diff --git a/waku/waku_sync/storage_manager.nim b/waku/waku_sync/storage_manager.nim new file mode 100644 index 0000000000..528da2b64c --- /dev/null +++ b/waku/waku_sync/storage_manager.nim @@ -0,0 +1,76 @@ +# Unused yet. Kept for future use. +#[ when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import std/[times, tables, options], chronicles, chronos, stew/results + +import ./raw_bindings, ../waku_core/time + +logScope: + topics = "waku sync" + +type WakuSyncStorageManager* = ref object + storages: OrderedTable[string, Storage] + # Map of dateTime and Storage objects. DateTime is of the format YYYYMMDDHH + maxHours: int64 + +proc new*( + T: type WakuSyncStorageManager, + hoursToStore: times.Duration = initDuration(minutes = 120), +): T = + return WakuSyncStorageManager(maxHours: hoursToStore.inHours) + +proc getRecentStorage*(self: WakuSyncStorageManager): Result[Option[Storage], string] = + if self.storages.len() == 0: + return ok(none(Storage)) + var storageToFetch: Storage + #is there a more effective way to fetch last element? + for k, storage in self.storages: + storageToFetch = storage + + return ok(some(storageToFetch)) + +proc deleteOldestStorage*(self: WakuSyncStorageManager) = + var storageToDelete: Storage + var time: string + #is there a more effective way to fetch first element? + for k, storage in self.storages: + storageToDelete = storage + time = k + break + + if self.storages.pop(time, storageToDelete): + delete(storageToDelete) + +proc retrieveStorage*( + self: WakuSyncStorageManager, time: Timestamp +): Result[Option[Storage], string] = + var timestamp: Timestamp + if time == 0: + timestamp = timestampInSeconds(getNowInNanosecondTime()) + debug "timestamp not provided, using now to fetch storage", timestamp = timestamp + else: + timestamp = timestampInSeconds(time) + let tsTime = times.fromUnix(timestamp) + let dateTime = times.format(tsTime, "yyyyMMddHH", utc()) + + var storage: Storage = self.storages.getOrDefault(dateTime) + if storage == nil: + #create a new storage + # TODO: May need synchronization?? + # Limit number of storages to configured duration + let hours = self.storages.len() + if hours == self.maxHours: + #Need to delete oldest storage at this point, but what if that is being synced? + self.deleteOldestStorage() + info "number of storages reached, deleting the oldest" + info "creating a new storage for ", time = dateTime + storage = Storage.new().valueOr: + error "storage creation failed" + return err(error) + self.storages[dateTime] = storage + + return ok(some(storage)) + ]#