Skip to content

Commit

Permalink
periodic sync & peer manager
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS committed Feb 7, 2024
1 parent e3949c0 commit 3635cfc
Showing 1 changed file with 51 additions and 10 deletions.
61 changes: 51 additions & 10 deletions waku/waku_sync/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,24 @@ import
../common/nimchronos,
../common/enr,
../waku_core,
../waku_enr
../waku_enr,
../node/peer_manager/peer_manager

logScope:
topics = "waku sync"

const WakuSyncCodec* = "/vac/waku/sync/1.0.0"
const DefaultFrameSize = 153600 # using a random number for now
const DefaultSyncInterval = 60.minutes

type
WakuSyncCallback* = proc(hashes: seq[WakuMessageHash]) {.closure, gcsafe, raises: [].}

WakuSync* = ref object of LPProtocol
peerManager: PeerManager
maxFrameSize: int # Not sure if this should be protocol defined or not...
syncInterval: Duration
callback: Option[WakuSyncCallback]

proc serverReconciliation(self: WakuSync, message: seq[byte]): Future[Result[seq[byte], string]] {.async.} =
#TODO compute the payload
Expand All @@ -40,7 +48,7 @@ proc clientReconciliation(self: WakuSync, message: seq[byte], hashes: var seq[Wa

let payload: seq[byte] = @[0]

# TODO update the hashes if needed
# TODO update the hashes

ok(some(payload))

Expand All @@ -51,10 +59,7 @@ proc intitialization(self: WakuSync): Future[Result[seq[byte], string]] {.async.

ok(payload)

# Alternatively request could stay internal
# but WakuSync would have to access the switch to dial peers.

proc request*(self: WakuSync, conn: Connection): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} =
proc request(self: WakuSync, conn: Connection): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} =
let request = (await self.intitialization()).valueOr:
return err(error)

Expand Down Expand Up @@ -85,7 +90,19 @@ proc request*(self: WakuSync, conn: Connection): Future[Result[seq[WakuMessageHa

return ok(hashes)

proc initProtocolHandler*(self: WakuSync) =
proc sync*(self: WakuSync): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} =
let peer = self.peerManager.selectPeer(WakuSyncCodec).valueOr:
return err("No suitable peer found for sync")

let conn = (await self.peerManager.dialPeer(peer, WakuSyncCodec)).valueOr:
return err("Cannot establish sync connection")

let hashes = (await self.request(conn)).valueOr:
return err("Sync request error: " & error)

ok(hashes)

proc initProtocolHandler(self: WakuSync) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
while not conn.isClosed: # Not sure if this works as I think it does...
let requestRes = catch: await conn.readLp(self.maxFrameSize)
Expand All @@ -99,18 +116,42 @@ proc initProtocolHandler*(self: WakuSync) =

let writeRes = catch: await conn.writeLP(response)
if writeRes.isErr():
error "Response decoding error", error=writeRes.error.msg
error "Connection write error", error=writeRes.error.msg
return

self.handler = handle
self.codec = WakuSyncCodec

proc new*(T: type WakuSync): T =
let sync = WakuSync()
proc new*(T: type WakuSync,
peerManager: PeerManager,
maxFrameSize: int = DefaultFrameSize,
syncInterval: Duration = DefaultSyncInterval,
callback: Option[WakuSyncCallback] = none(WakuSyncCallback)
): T =
let sync = WakuSync(peerManager, maxFrameSize, syncInterval, callback)

sync.initProtocolHandler()

info "Created WakuSync protocol"

return sync

proc periodicSync(self: WakuSync) {.async.} =
while self.started and self.callback.isSome():
await sleepAsync(self.syncInterval)

let hashes = (await self.sync()).valueOr:
continue

let callback = self.callback.get()

callback(hashes)

proc start*(self: WakuSync) =
self.started = true

asyncSpawn self.periodicSync()

proc stop*(self: WakuSync) =
self.started = false

0 comments on commit 3635cfc

Please sign in to comment.