From a31d630802cad7f8b1f7a4c4a930e0776fc631ba Mon Sep 17 00:00:00 2001 From: SionoiS Date: Tue, 6 Feb 2024 16:41:05 -0500 Subject: [PATCH] feat: Waku Sync Protocol Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> --- .gitmodules | 5 + Makefile | 18 +- tests/waku_sync/sync_utils.nim | 29 +++ tests/waku_sync/test_all.nim | 4 + tests/waku_sync/test_protocol.nim | 137 ++++++++++++ vendor/negentropy | 1 + waku/node/waku_node.nim | 25 +++ waku/waku_api/rest/admin/handlers.nim | 10 + waku/waku_sync.nim | 10 + waku/waku_sync/protocol.nim | 195 +++++++++++++++++ waku/waku_sync/raw_bindings.nim | 287 ++++++++++++++++++++++++++ 11 files changed, 715 insertions(+), 6 deletions(-) create mode 100644 tests/waku_sync/sync_utils.nim create mode 100644 tests/waku_sync/test_all.nim create mode 100644 tests/waku_sync/test_protocol.nim create mode 160000 vendor/negentropy create mode 100644 waku/waku_sync.nim create mode 100644 waku/waku_sync/protocol.nim create mode 100644 waku/waku_sync/raw_bindings.nim diff --git a/.gitmodules b/.gitmodules index 69b4fe2cab..ffd222ee65 100644 --- a/.gitmodules +++ b/.gitmodules @@ -164,3 +164,8 @@ branch = master path = vendor/nim-results url = https://github.com/arnetheduck/nim-results.git +[submodule "vendor/negentropy"] + ignore = untracked + path = vendor/negentropy + url = https://github.com/waku-org/negentropy.git + branch = feat/c-wrapper \ No newline at end of file diff --git a/Makefile b/Makefile index 2125f5d85c..3126e3c730 100644 --- a/Makefile +++ b/Makefile @@ -32,10 +32,10 @@ else # "variables.mk" was included. Business as usual until the end of this file ########## ## Main ## ########## -.PHONY: all test update clean +.PHONY: all test update clean negentropy # default target, because it's the first one that doesn't start with '.' -all: | wakunode2 example2 chat2 chat2bridge libwaku +all: | negentropy wakunode2 example2 chat2 chat2bridge libwaku test: | testcommon testwaku @@ -46,7 +46,7 @@ update: | update-common rm -rf waku.nims && \ $(MAKE) waku.nims $(HANDLE_OUTPUT) -clean: +clean: | negentropy-clean rm -rf build # must be included after the default target @@ -182,11 +182,11 @@ testcommon: | build deps .PHONY: testwaku wakunode2 testwakunode2 example2 chat2 chat2bridge # install anvil only for the testwaku target -testwaku: | build deps anvil librln +testwaku: | build deps anvil librln negentropy echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim test -d:os=$(shell uname) $(NIM_PARAMS) waku.nims -wakunode2: | build deps librln +wakunode2: | build deps librln negentropy echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim wakunode2 $(NIM_PARAMS) waku.nims @@ -194,7 +194,7 @@ benchmarks: | build deps librln echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim benchmarks $(NIM_PARAMS) waku.nims -testwakunode2: | build deps librln +testwakunode2: | build deps librln negentropy echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim testwakunode2 $(NIM_PARAMS) waku.nims @@ -335,3 +335,9 @@ release-notes: sed -E 's@#([0-9]+)@[#\1](https://github.com/waku-org/nwaku/issues/\1)@g' # I could not get the tool to replace issue ids with links, so using sed for now, # asked here: https://github.com/bvieira/sv4git/discussions/101 +negentropy: + $(MAKE) -C vendor/negentropy/cpp && \ + cp vendor/negentropy/cpp/libnegentropy.so ./ +negentropy-clean: + $(MAKE) -C vendor/negentropy/cpp clean && \ + rm libnegentropy.so diff --git a/tests/waku_sync/sync_utils.nim b/tests/waku_sync/sync_utils.nim new file mode 100644 index 0000000000..c277a0d1e2 --- /dev/null +++ b/tests/waku_sync/sync_utils.nim @@ -0,0 +1,29 @@ +{.used.} + +import + std/options, + chronos, + chronicles, + libp2p/crypto/crypto + +import + ../../../waku/[ + node/peer_manager, + waku_core, + waku_sync, + ], + ../testlib/[ + common, + wakucore + ] + +proc newTestWakuSync*(switch: Switch, handler: WakuSyncCallback): Future[WakuSync] {.async.} = + const DefaultFrameSize = 153600 + let + peerManager = PeerManager.new(switch) + proto = WakuSync.new(peerManager, DefaultFrameSize, 2.seconds, some(handler)) + + proto.start() + switch.mount(proto) + + return proto \ No newline at end of file diff --git a/tests/waku_sync/test_all.nim b/tests/waku_sync/test_all.nim new file mode 100644 index 0000000000..178e9277ea --- /dev/null +++ b/tests/waku_sync/test_all.nim @@ -0,0 +1,4 @@ +{.used.} + +import + ./test_protocol diff --git a/tests/waku_sync/test_protocol.nim b/tests/waku_sync/test_protocol.nim new file mode 100644 index 0000000000..cccd3b13f9 --- /dev/null +++ b/tests/waku_sync/test_protocol.nim @@ -0,0 +1,137 @@ +{.used.} + +import + std/options, + testutils/unittests, + chronos, + chronicles, + libp2p/crypto/crypto, + stew/byteutils +from std/os import sleep + +import + ../../../waku/[ + common/paging, + node/peer_manager, + waku_core, + waku_core/message/digest, + waku_sync, + waku_sync/raw_bindings, + ], + ../testlib/[ + common, + wakucore + ], + ./sync_utils + + +suite "Waku Sync - Protocol Tests": + + asyncTest "test c integration": + let + s1 = Storage.new() + s2 = Storage.new() + ng1 = Negentropy.new(s1,10000) + ng2 = Negentropy.new(s2,10000) + + let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic) + let msgHash: WakuMessageHash = computeMessageHash(pubsubTopic=DefaultPubsubTopic, msg1) + + check: + s1.insert(msg1.timestamp, msgHash).isOk() + s2.insert(msg1.timestamp, msgHash).isOk() + + let msg2 = fakeWakuMessage(contentTopic=DefaultContentTopic) + let msgHash2: WakuMessageHash = computeMessageHash(pubsubTopic=DefaultPubsubTopic, msg2) + + check: + s2.insert(msg2.timestamp, msgHash2).isOk() + + let ng1_q1 = ng1.initiate() + check: + ng1_q1.isOk() + + let ng2_q1 = ng2.serverReconcile(ng1_q1.get()) + check: + ng2_q1.isOk() + + var + haveHashes: seq[WakuMessageHash] + needHashes: seq[WakuMessageHash] + let ng1_q2 = ng1.clientReconcile(ng2_q1.get(), haveHashes, needHashes) + + check: + needHashes.len() == 1 + haveHashes.len() == 0 + ng1_q2.isOk() + needHashes[0] == msgHash2 + + check: + s1.erase(msg1.timestamp, msgHash).isOk() + + asyncTest "sync 2 nodes different hashes": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + let serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() + let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic) + let msg2 = fakeWakuMessage(contentTopic=DefaultContentTopic) + + let protoHandler:WakuSyncCallback = proc(hashes: seq[WakuMessageHash]) {.async: (raises: []), closure, gcsafe.} = + debug "Received needHashes from peer:", len = hashes.len + for hash in hashes: + debug "Hash received from peer:", hash=hash.to0xHex() + + let + server = await newTestWakuSync(serverSwitch, handler=protoHandler) + client = await newTestWakuSync(clientSwitch, handler=protoHandler) + server.ingessMessage(DefaultPubsubTopic, msg1) + client.ingessMessage(DefaultPubsubTopic, msg1) + server.ingessMessage(DefaultPubsubTopic, msg2) + + var hashes = await client.sync(serverPeerInfo) + require (hashes.isOk()) + check: + hashes.value.len == 1 + hashes.value[0] == computeMessageHash(pubsubTopic=DefaultPubsubTopic, msg2) + #Assuming message is fetched from peer +#[ client.ingessMessage(DefaultPubsubTopic, msg2) + sleep(1000) + hashes = await client.sync(serverPeerInfo) + require (hashes.isOk()) + check: + hashes.value.len == 0 ]# + + asyncTest "sync 2 nodes same hashes": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + let serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() + let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic) + let msg2 = fakeWakuMessage(contentTopic=DefaultContentTopic) + + let protoHandler:WakuSyncCallback = proc(hashes: seq[WakuMessageHash]) {.async: (raises: []), closure, gcsafe.} = + debug "Received needHashes from peer:", len = hashes.len + for hash in hashes: + debug "Hash received from peer:", hash=hash.to0xHex() + + let + server = await newTestWakuSync(serverSwitch, handler=protoHandler) + client = await newTestWakuSync(clientSwitch, handler=protoHandler) + server.ingessMessage(DefaultPubsubTopic, msg1) + client.ingessMessage(DefaultPubsubTopic, msg1) + server.ingessMessage(DefaultPubsubTopic, msg2) + client.ingessMessage(DefaultPubsubTopic, msg2) + + let hashes = await client.sync(serverPeerInfo) + assert hashes.isOk(), $hashes.error + check: + hashes.value.len == 0 diff --git a/vendor/negentropy b/vendor/negentropy new file mode 160000 index 0000000000..1a59da6c32 --- /dev/null +++ b/vendor/negentropy @@ -0,0 +1 @@ +Subproject commit 1a59da6c32605bb5ae7c495c66dac3367d4bc560 diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 91c0494419..f77ba199be 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -37,6 +37,7 @@ import ../waku_filter_v2/client as filter_client, ../waku_filter_v2/subscriptions as filter_subscriptions, ../waku_metadata, + ../waku_sync, ../waku_lightpush/client as lightpush_client, ../waku_lightpush/common, ../waku_lightpush/protocol, @@ -97,6 +98,7 @@ type wakuLightpushClient*: WakuLightPushClient wakuPeerExchange*: WakuPeerExchange wakuMetadata*: WakuMetadata + wakuSync*: WakuSync enr*: enr.Record libp2pPing*: Ping rng*: ref rand.HmacDrbgContext @@ -182,6 +184,22 @@ proc connectToNodes*(node: WakuNode, nodes: seq[RemotePeerInfo] | seq[string], s # NOTE Connects to the node without a give protocol, which automatically creates streams for relay await peer_manager.connectToNodes(node.peerManager, nodes, source=source) +## Waku Sync + +proc mountWakuSync*(node: WakuNode): Result[void, string] = + if not node.wakuSync.isNil(): + return err("Waku sync already mounted, skipping") + + let sync = WakuSync.new(node.peerManager)#TODO add the callback and the options + + node.wakuSync = sync + + let catchRes = catch: node.switch.mount(node.wakuSync, protocolMatcher(WakuSyncCodec)) + if catchRes.isErr(): + return err(catchRes.error.msg) + + return ok() + ## Waku Metadata proc mountMetadata*(node: WakuNode, clusterId: uint32): Result[void, string] = @@ -237,10 +255,17 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = await node.wakuArchive.handleMessage(topic, msg) + proc syncHandler(topic: PubsubTopic, msg: WakuMessage) = + if node.wakuSync.isNil(): + return + + node.wakuSync.ingessMessage(topic, msg) + let defaultHandler = proc(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = await traceHandler(topic, msg) await filterHandler(topic, msg) await archiveHandler(topic, msg) + syncHandler(topic, msg) discard node.wakuRelay.subscribe(topic, defaultHandler) diff --git a/waku/waku_api/rest/admin/handlers.nim b/waku/waku_api/rest/admin/handlers.nim index ca85b61c63..103740e597 100644 --- a/waku/waku_api/rest/admin/handlers.nim +++ b/waku/waku_api/rest/admin/handlers.nim @@ -19,6 +19,7 @@ import ../../../waku_lightpush/common, ../../../waku_relay, ../../../waku_node, + ../../../waku_sync, ../../../node/peer_manager, ../responses, ../serdes, @@ -89,6 +90,15 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = connected: it.connectedness == Connectedness.Connected)) tuplesToWakuPeers(peers, lightpushPeers) + if not node.wakuSync.isNil(): + # Map WakuStore peers to WakuPeers and add to return list + let syncPeers = node.peerManager.peerStore + .peers(WakuSyncCodec) + .mapIt((multiaddr: constructMultiaddrStr(it), + protocol: WakuSyncCodec, + connected: it.connectedness == Connectedness.Connected)) + tuplesToWakuPeers(peers, syncPeers) + let resp = RestApiResponse.jsonResponse(peers, status=Http200) if resp.isErr(): error "An error ocurred while building the json respose: ", error=resp.error diff --git a/waku/waku_sync.nim b/waku/waku_sync.nim new file mode 100644 index 0000000000..dc60abeaee --- /dev/null +++ b/waku/waku_sync.nim @@ -0,0 +1,10 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + ./waku_sync/protocol + +export + protocol diff --git a/waku/waku_sync/protocol.nim b/waku/waku_sync/protocol.nim new file mode 100644 index 0000000000..c9fecab3ef --- /dev/null +++ b/waku/waku_sync/protocol.nim @@ -0,0 +1,195 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/options, + stew/results, + chronicles, + chronos, + metrics, + libp2p/protocols/protocol, + libp2p/stream/connection, + libp2p/crypto/crypto, + eth/p2p/discoveryv5/enr +import + ../common/nimchronos, + ../common/enr, + ../waku_core, + ../waku_enr, + ../node/peer_manager/peer_manager, + ./raw_bindings + +logScope: + topics = "waku sync" + +const WakuSyncCodec* = "/vac/waku/sync/1.0.0" +const DefaultFrameSize = 153600 # using a random number for now +const DefaultSyncInterval = 60.minutes + +type + #TODO maybe add the remote peer info? + WakuSyncCallback* = proc(hashes: seq[WakuMessageHash]) {.async: (raises: []), closure, gcsafe.} + + WakuSync* = ref object of LPProtocol + storage: Storage + negentropy: Negentropy + peerManager: PeerManager + maxFrameSize: int # Not sure if this should be protocol defined or not... + syncInterval: Duration + callback: Option[WakuSyncCallback] + periodicSyncFut: Future[void] + +proc ingessMessage*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage) = + if msg.ephemeral: + return + + let msgHash: WakuMessageHash = computeMessageHash(pubsubTopic, msg) + debug "inserting message into storage ", hash=msgHash + + if self.storage.insert(msg.timestamp, msgHash).isErr(): + debug "failed to insert message ", hash=msgHash.toHex() + +proc serverReconciliation(self: WakuSync, payload: NegentropyPayload): Result[NegentropyPayload, string] = + return self.negentropy.serverReconcile(payload) + +proc clientReconciliation( + self: WakuSync, payload: NegentropyPayload, + haveHashes: var seq[WakuMessageHash], + needHashes: var seq[WakuMessageHash], + ): Result[Option[NegentropyPayload], string] = + return self.negentropy.clientReconcile(payload, haveHashes, needHashes) + +proc intitialization(self: WakuSync): Future[Result[NegentropyPayload, string]] {.async.} = + return self.negentropy.initiate() + +proc request(self: WakuSync, conn: Connection): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} = + let payload = (await self.intitialization()).valueOr: + return err(error) + + debug "sending request to server", payload = toHex(seq[byte](payload)) + + let writeRes = catch: await conn.writeLP(seq[byte](payload)) + if writeRes.isErr(): + return err(writeRes.error.msg) + + var + haveHashes: seq[WakuMessageHash] # What to do with haves ??? + needHashes: seq[WakuMessageHash] + + while true: + let readRes = catch: await conn.readLp(self.maxFrameSize) + + let buffer: seq[byte] = readRes.valueOr: + return err(error.msg) + + debug "Received Sync request from peer", payload = toHex(buffer) + + let request = NegentropyPayload(buffer) + + let responseOpt = self.clientReconciliation(request, haveHashes, needHashes).valueOr: + return err(error) + + let response = responseOpt.valueOr: + debug "Closing connection, sync session is done" + await conn.close() + break + + debug "Sending Sync response to peer", payload = toHex(seq[byte](response)) + + let writeRes = catch: await conn.writeLP(seq[byte](response)) + if writeRes.isErr(): + return err(writeRes.error.msg) + + return ok(needHashes) + +proc sync*(self: WakuSync): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} = + let peer: RemotePeerInfo = self.peerManager.selectPeer(WakuSyncCodec).valueOr: + return err("No suitable peer found for sync") + + let conn: Connection = (await self.peerManager.dialPeer(peer, WakuSyncCodec)).valueOr: + return err("Cannot establish sync connection") + + let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr: + return err("Sync request error: " & error) + + ok(hashes) + +proc sync*(self: WakuSync, peer: RemotePeerInfo): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} = + let conn: Connection = (await self.peerManager.dialPeer(peer, WakuSyncCodec)).valueOr: + return err("Cannot establish sync connection") + + let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr: + return err("Sync request error: " & error) + + ok(hashes) + +proc initProtocolHandler(self: WakuSync) = + proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = + # Not sure if this works as I think it does... + while not conn.isClosed: + let requestRes = catch: await conn.readLp(self.maxFrameSize) + + let buffer = requestRes.valueOr: + error "Connection reading error", error=error.msg + return + + let request = NegentropyPayload(buffer) + + let response = self.serverReconciliation(request).valueOr: + error "Reconciliation error", error=error + return + + let writeRes= catch: await conn.writeLP(seq[byte](response)) + if writeRes.isErr(): + error "Connection write error", error=writeRes.error.msg + return + + self.handler = handle + self.codec = WakuSyncCodec + +proc new*(T: type WakuSync, + peerManager: PeerManager, + maxFrameSize: int = DefaultFrameSize, + syncInterval: Duration = DefaultSyncInterval, + callback: Option[WakuSyncCallback] = none(WakuSyncCallback) +): T = + let storage = Storage.new() + let negentropy = Negentropy.new(storage, uint64(maxFrameSize)) + + let sync = WakuSync( + storage: storage, + negentropy: negentropy, + peerManager: peerManager, + maxFrameSize: maxFrameSize, + syncInterval: syncInterval, + callback: callback + ) + + sync.initProtocolHandler() + + info "Created WakuSync protocol" + + return sync + +proc periodicSync(self: WakuSync) {.async.} = + while true: + await sleepAsync(self.syncInterval) + + let hashes = (await self.sync()).valueOr: + error "periodic sync error", error = error + continue + + let callback = self.callback.valueOr: + continue + + await callback(hashes) + +proc start*(self: WakuSync) = + self.started = true + + self.periodicSyncFut = self.periodicSync() + +proc stopWait*(self: WakuSync) {.async.} = + await self.periodicSyncFut.cancelAndWait() \ No newline at end of file diff --git a/waku/waku_sync/raw_bindings.nim b/waku/waku_sync/raw_bindings.nim new file mode 100644 index 0000000000..cd019a0fb6 --- /dev/null +++ b/waku/waku_sync/raw_bindings.nim @@ -0,0 +1,287 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +from os import DirSep + +import + std/[strutils], + chronicles, + std/options, + stew/[results, byteutils] +import + ../waku_core/message + +const negentropyPath = currentSourcePath.rsplit(DirSep, 1)[0] & DirSep & ".." & DirSep & ".." & DirSep & "vendor" & DirSep & "negentropy" & DirSep & "cpp" & DirSep + +{.link: negentropyPath & "libnegentropy.so".} + +const NEGENTROPY_HEADER = negentropyPath & "negentropy_wrapper.h" + +#[ proc StringtoBytes(data: cstring): seq[byte] = + let size = data.len() + + var bytes = newSeq[byte](size) + copyMem(bytes[0].addr, data[0].unsafeAddr, size) + + return bytes ]# + +type Buffer* = object + len*: uint64 + `ptr`*: ptr uint8 + +type BindingResult* = object + output: Buffer + have_ids_len: uint + need_ids_len: uint + have_ids: ptr Buffer + need_ids: ptr Buffer + +proc toWakuMessageHash(buffer: Buffer): WakuMessageHash = + assert buffer.len == 32 + + var hash: WakuMessageHash + + copyMem(hash[0].addr, buffer.ptr, 32) + + return hash + +proc toBuffer*(x: openArray[byte]): Buffer = + ## converts the input to a Buffer object + ## the Buffer object is used to communicate data with the rln lib + var temp = @x + let baseAddr = cast[pointer](x) + let output = Buffer(`ptr`: cast[ptr uint8](baseAddr), len: uint64(temp.len)) + return output + +proc BufferToBytes(buffer: ptr Buffer, len: Option[uint64] = none(uint64)):seq[byte] = + var bufLen: uint64 + if isNone(len): + bufLen = buffer.len + else: + bufLen = len.get() + if bufLen == 0: + return @[] + debug "length of buffer is",len=bufLen + let bytes = newSeq[byte](bufLen) + copyMem(bytes[0].unsafeAddr, buffer.ptr, bufLen) + return bytes + +proc toBufferSeq(buffLen: uint, buffPtr: ptr Buffer): seq[Buffer] = + var uncheckedArr = cast[ptr UncheckedArray[Buffer]](buffPtr) + var mySequence = newSeq[Buffer](buffLen) + for i in 0..buffLen-1: + mySequence[i] = uncheckedArr[i] + return mySequence + +### Storage ### + +type + Storage* = distinct pointer + +proc storage_init(db_path:cstring, name: cstring): Storage{. header: NEGENTROPY_HEADER, importc: "storage_new".} + +# https://github.com/hoytech/negentropy/blob/6e1e6083b985adcdce616b6bb57b6ce2d1a48ec1/cpp/negentropy/storage/btree/core.h#L163 +proc raw_insert(storage: Storage, timestamp: uint64, id: ptr Buffer): bool {.header: NEGENTROPY_HEADER, importc: "storage_insert".} + +# https://github.com/hoytech/negentropy/blob/6e1e6083b985adcdce616b6bb57b6ce2d1a48ec1/cpp/negentropy/storage/btree/core.h#L300 +proc raw_erase(storage: Storage, timestamp: uint64, id: ptr Buffer): bool {.header: NEGENTROPY_HEADER, importc: "storage_erase".} + +### Negentropy ### + +type + Negentropy* = distinct pointer + +# https://github.com/hoytech/negentropy/blob/6e1e6083b985adcdce616b6bb57b6ce2d1a48ec1/cpp/negentropy.h#L42 +proc constructNegentropy(storage: Storage, frameSizeLimit: uint64): Negentropy {.header: NEGENTROPY_HEADER, importc: "negentropy_new".} + +# https://github.com/hoytech/negentropy/blob/6e1e6083b985adcdce616b6bb57b6ce2d1a48ec1/cpp/negentropy.h#L46 +proc raw_initiate(negentropy: Negentropy, output: ptr Buffer): int {.header: NEGENTROPY_HEADER, importc: "negentropy_initiate".} + +# https://github.com/hoytech/negentropy/blob/6e1e6083b985adcdce616b6bb57b6ce2d1a48ec1/cpp/negentropy.h#L58 +proc raw_setInitiator(negentropy: Negentropy) {.header: NEGENTROPY_HEADER, importc: "negentropy_setinitiator".} + +# https://github.com/hoytech/negentropy/blob/6e1e6083b985adcdce616b6bb57b6ce2d1a48ec1/cpp/negentropy.h#L62 +proc raw_reconcile(negentropy: Negentropy, query: ptr Buffer, output: ptr Buffer): int {.header: NEGENTROPY_HEADER, importc: "reconcile".} +#[ +type + ReconcileCallback = proc(have_ids: ptr Buffer, have_ids_len:uint64, need_ids: ptr Buffer, need_ids_len:uint64, output: ptr Buffer, outptr: var ptr cchar) {.cdecl, raises: [], gcsafe.}# {.header: NEGENTROPY_HEADER, importc: "reconcile_cbk".} + ]# + +# https://github.com/hoytech/negentropy/blob/6e1e6083b985adcdce616b6bb57b6ce2d1a48ec1/cpp/negentropy.h#L69 +#proc raw_reconcile(negentropy: pointer, query: ptr Buffer, cbk: ReconcileCallback, output: ptr cchar): int {.header: NEGENTROPY_HEADER, importc: "reconcile_with_ids".} + +proc raw_reconcile(negentropy: Negentropy, query: ptr Buffer, r: ptr BindingResult){.header: NEGENTROPY_HEADER, importc: "reconcile_with_ids_no_cbk".} + +proc free_result(r: ptr BindingResult){.header: NEGENTROPY_HEADER, importc: "free_result".} + +### Wrappings ### + +type + NegentropyPayload* = distinct seq[byte] + +proc new*(T: type Storage): T = + #TODO db name and path + let storage = storage_init("", "") + + #TODO error handling + + return storage + +proc erase*(storage: Storage, id: int64, hash: WakuMessageHash): Result[void, string] = + let cString = toBuffer(hash) + + let res = raw_erase(storage, uint64(id), cString.unsafeAddr) + + #TODO error handling + + if res: + return ok() + else: + return err("erase error") + +proc insert*(storage: Storage, id: int64, hash: WakuMessageHash): Result[void, string] = + var buffer = toBuffer(hash) + var bufPtr = addr(buffer) + let res = raw_insert(storage, uint64(id), bufPtr) + + #TODO error handling + + if res: + return ok() + else: + return err("insert error") + +proc new*(T: type Negentropy, storage: Storage, frameSizeLimit: uint64): T = + let negentropy = constructNegentropy(storage, frameSizeLimit) + + #TODO error handling + + return negentropy + +proc initiate*(negentropy: Negentropy): Result[NegentropyPayload, string] = + ## Client inititate a sync session with a server by sending a payload + + var output:seq[byte] = newSeq[byte](153600) #TODO: Optimize this using callback to avoid huge alloc + var outBuffer: Buffer = toBuffer(output) + let outLen: int = raw_initiate(negentropy, outBuffer.unsafeAddr) + let bytes: seq[byte] = BufferToBytes(addr(outBuffer), some(uint64(outLen))) + + debug "received return from initiate", len=outLen + + return ok(NegentropyPayload(bytes)) + +proc setInitiator*(negentropy: Negentropy) = + raw_setInitiator(negentropy) + +proc serverReconcile*( + negentropy: Negentropy, + query: NegentropyPayload, + ): Result[NegentropyPayload, string] = + ## Server response to a negentropy payload. + ## Always return an answer. + + let queryBuf = toBuffer(seq[byte](query)) + var queryBufPtr = queryBuf.unsafeAddr #TODO: Figure out why addr(buffer) throws error + var output:seq[byte] = newSeq[byte](153600) #TODO: Optimize this using callback to avoid huge alloc + var outBuffer: Buffer = toBuffer(output) + + let outLen: int = raw_reconcile(negentropy, queryBufPtr, outBuffer.unsafeAddr) + debug "received return from raw_reconcile", len=outLen + + let outputBytes: seq[byte] = BufferToBytes(addr(outBuffer), some(uint64(outLen))) + debug "outputBytes len", len=outputBytes.len + + #TODO error handling + + return ok(NegentropyPayload(outputBytes)) + +proc clientReconcile*( + negentropy: Negentropy, + query: NegentropyPayload, + haveIds: var seq[WakuMessageHash], + needIds: var seq[WakuMessageHash], + ): Result[Option[NegentropyPayload], string] = + ## Client response to a negentropy payload. + ## May return an answer, if not the sync session done. + + let cQuery = toBuffer(seq[byte](query)) + + var myResult {.noinit.}: BindingResult = BindingResult() + myResult.have_ids_len = 0 + myResult.need_ids_len = 0 + var myResultPtr = addr myResult + + raw_reconcile(negentropy, cQuery.unsafeAddr, myResultPtr) + + if myResultPtr == nil: + return err("ERROR from raw_reconcile!") + + let output = BufferToBytes(addr myResult.output) + + var + have_hashes: seq[Buffer] + need_hashes: seq[Buffer] + + if myResult.have_ids_len > 0: + have_hashes = toBufferSeq(myResult.have_ids_len, myResult.have_ids) + if myResult.need_ids_len > 0: + need_hashes = toBufferSeq(myResult.need_ids_len, myResult.need_ids) + + debug "have and need hashes ",have_count=have_hashes.len, need_count=need_hashes.len + + for i in 0..have_hashes.len - 1: + var hash = toWakuMessageHash(have_hashes[i]) + debug "have hashes ", index=i, hash=hash.to0xHex() + haveIds.add(hash) + + for i in 0..need_hashes.len - 1: + var hash = toWakuMessageHash(need_hashes[i]) + debug "need hashes ", index=i, hash=hash.to0xHex() + needIds.add(hash) + + +#[ Callback Approach, to be uncommented later during optimization phase + var + cppHaveIds: cstringArray = allocCStringArray([]) + cppNeedIds: cstringArray = allocCStringArray([]) + haveIdsLen: uint + needIdsLen: uint + output: seq[byte] = newSeq[byte](1) #TODO: fix this hack. + + + let handler:ReconcileCallback = proc(have_ids: ptr Buffer, have_ids_len:uint64, need_ids: ptr Buffer, + need_ids_len:uint64, outBuffer: ptr Buffer, outptr: var ptr cchar) {.cdecl, raises: [], gcsafe.} = + debug "ReconcileCallback: Received needHashes from client:", len = need_ids_len , outBufLen=outBuffer.len + if outBuffer.len > 0: + let ret = BufferToBytes(outBuffer) + outptr = cast[ptr cchar](ret[0].unsafeAddr) + + try: + let ret = raw_reconcile(negentropy, cQuery.unsafeAddr, handler, cast[ptr cchar](output[0].unsafeAddr)) + if ret != 0: + error "failed to reconcile" + return + except Exception as e: + error "exception raised from raw_reconcile", error=e.msg ]# + + +#[ debug "haveIdsLen", len=haveIdsLen + + for ele in cstringArrayToSeq(cppHaveIds, haveIdsLen): + haveIds.add(toWakuMessageHash(ele)) + + for ele in cstringArrayToSeq(cppNeedIds, needIdsLen): + needIds.add(toWakuMessageHash(ele)) + + deallocCStringArray(cppHaveIds) + deallocCStringArray(cppNeedIds) ]# + free_result(myResultPtr) + + debug "return " , output=output + + if output.len < 1: + return ok(none(NegentropyPayload)) + + return ok(some(NegentropyPayload(output))) \ No newline at end of file