diff --git a/waku/waku_sync/protocol.nim b/waku/waku_sync/protocol.nim index 1574630b32..e1cbc8e8f1 100644 --- a/waku/waku_sync/protocol.nim +++ b/waku/waku_sync/protocol.nim @@ -17,16 +17,24 @@ import ../common/nimchronos, ../common/enr, ../waku_core, - ../waku_enr + ../waku_enr, + ../node/peer_manager/peer_manager logScope: topics = "waku sync" const WakuSyncCodec* = "/vac/waku/sync/1.0.0" +const DefaultFrameSize = 153600 # using a random number for now +const DefaultSyncInterval = 60.minutes type + WakuSyncCallback* = proc(hashes: seq[WakuMessageHash]) {.closure, gcsafe, raises: [].} + WakuSync* = ref object of LPProtocol + peerManager: PeerManager maxFrameSize: int # Not sure if this should be protocol defined or not... + syncInterval: Duration + callback: Option[WakuSyncCallback] proc serverReconciliation(self: WakuSync, message: seq[byte]): Future[Result[seq[byte], string]] {.async.} = #TODO compute the payload @@ -40,7 +48,7 @@ proc clientReconciliation(self: WakuSync, message: seq[byte], hashes: var seq[Wa let payload: seq[byte] = @[0] - # TODO update the hashes if needed + # TODO update the hashes ok(some(payload)) @@ -51,10 +59,7 @@ proc intitialization(self: WakuSync): Future[Result[seq[byte], string]] {.async. ok(payload) -# Alternatively request could stay internal -# but WakuSync would have to access the switch to dial peers. - -proc request*(self: WakuSync, conn: Connection): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} = +proc request(self: WakuSync, conn: Connection): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} = let request = (await self.intitialization()).valueOr: return err(error) @@ -85,7 +90,19 @@ proc request*(self: WakuSync, conn: Connection): Future[Result[seq[WakuMessageHa return ok(hashes) -proc initProtocolHandler*(self: WakuSync) = +proc sync*(self: WakuSync): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} = + let peer = self.peerManager.selectPeer(WakuSyncCodec).valueOr: + return err("No suitable peer found for sync") + + let conn = (await self.peerManager.dialPeer(peer, WakuSyncCodec)).valueOr: + return err("Cannot establish sync connection") + + let hashes = (await self.request(conn)).valueOr: + return err("Sync request error: " & error) + + ok(hashes) + +proc initProtocolHandler(self: WakuSync) = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = while not conn.isClosed: # Not sure if this works as I think it does... let requestRes = catch: await conn.readLp(self.maxFrameSize) @@ -99,14 +116,19 @@ proc initProtocolHandler*(self: WakuSync) = let writeRes = catch: await conn.writeLP(response) if writeRes.isErr(): - error "Response decoding error", error=writeRes.error.msg + error "Connection write error", error=writeRes.error.msg return self.handler = handle self.codec = WakuSyncCodec -proc new*(T: type WakuSync): T = - let sync = WakuSync() +proc new*(T: type WakuSync, + peerManager: PeerManager, + maxFrameSize: int = DefaultFrameSize, + syncInterval: Duration = DefaultSyncInterval, + callback: Option[WakuSyncCallback] = none(WakuSyncCallback) +): T = + let sync = WakuSync(peerManager, maxFrameSize, syncInterval, callback) sync.initProtocolHandler() @@ -114,3 +136,22 @@ proc new*(T: type WakuSync): T = return sync +proc periodicSync(self: WakuSync) {.async.} = + while self.started and self.callback.isSome(): + await sleepAsync(self.syncInterval) + + let hashes = (await self.sync()).valueOr: + continue + + let callback = self.callback.get() + + callback(hashes) + +proc start*(self: WakuSync) = + self.started = true + + asyncSpawn self.periodicSync() + +proc stop*(self: WakuSync) = + self.started = false +