Commit 23dc3a9: make relayed messages non priority (don't use an explicit queue for priority msgs)
diegomrsantos committed Feb 1, 2024
1 parent 5594bcb commit 23dc3a9
Showing 5 changed files with 110 additions and 43 deletions.
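In short: each PubSubPeer now owns two queues. High-priority sends (the node's own publishes and control replies) are started immediately and only their completion futures are tracked, while relayed messages are parked in a non-priority queue that a per-peer background task drains whenever no high-priority send is still in flight. Below is a standalone sketch of that discipline, using chronos (the async runtime nim-libp2p builds on); SendQueue, sendNow, and drain are illustrative names, not the library's API.

import std/deques
import chronos

type SendQueue = ref object
  priority: Deque[Future[void]]      # futures of already-started high-priority sends
  nonPriority: AsyncQueue[seq[byte]] # raw bytes parked until priority traffic drains

proc sendNow(data: seq[byte]) {.async.} =
  echo "sent ", data.len, " bytes"   # stands in for the real socket write

proc send(q: SendQueue, data: seq[byte], isHighPriority: bool) {.async.} =
  if isHighPriority:
    # start the send immediately; keep the future so others can wait on it
    q.priority.addLast(sendNow(data))
  else:
    # defer: the drain task picks this up once priority traffic is done
    await q.nonPriority.addLast(data)

proc drain(q: SendQueue) {.async.} =
  while true:
    let data = await q.nonPriority.popFirst()
    while q.priority.len > 0:        # let every pending priority send finish first
      await q.priority[0]
      discard q.priority.popFirst()
    await sendNow(data)

when isMainModule:
  let q = SendQueue(priority: initDeque[Future[void]](),
                    nonPriority: newAsyncQueue[seq[byte]]())
  asyncSpawn q.drain()
  waitFor q.send(@[1'u8, 2, 3], isHighPriority = false)
  waitFor q.send(@[4'u8, 5], isHighPriority = true)
  waitFor sleepAsync(10.milliseconds) # the 2-byte send wins despite being queued later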
libp2p/errors.nim (4 changes: 2 additions & 2 deletions)
@@ -45,8 +45,8 @@ macro checkFutures*[F](futs: seq[F], exclude: untyped = []): untyped =
         debug "A future has failed, enable trace logging for details", error=exc.name
         trace "Exception details", msg=exc.msg
 
-proc allFuturesThrowing*[T](args: varargs[Future[T]]): Future[void] =
-  var futs: seq[Future[T]]
+proc allFuturesThrowing*[F: FutureBase](args: varargs[F]): Future[void] =
+  var futs: seq[F]
   for fut in args:
     futs &= fut
   proc call() {.async.} =
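A brief usage sketch of the relaxed constraint: F: FutureBase lets the helper bind to FutureBase itself, or to any chronos future subtype, which the old varargs[Future[T]] signature could not express. The import path is the module patched above; job is an illustrative helper.

import chronos
import libp2p/errors

proc job(n: int): Future[void] {.async.} =
  await sleepAsync(n.milliseconds)

# F binds to FutureBase here; under the old signature this call would not
# compile, because FutureBase does not match any Future[T] instantiation.
waitFor allFuturesThrowing(FutureBase(job(1)), FutureBase(job(2)))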
libp2p/protocols/pubsub/gossipsub.nim (13 changes: 9 additions & 4 deletions)
@@ -220,6 +220,8 @@ method unsubscribePeer*(g: GossipSub, peer: PeerId) =
     for topic, info in stats[].topicInfos.mpairs:
       info.firstMessageDeliveries = 0
 
+  pubSubPeer.stopSendNonPriorityTask()
+
   procCall FloodSub(g).unsubscribePeer(peer)
 
 proc handleSubscribe*(g: GossipSub,
@@ -303,7 +305,10 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
     trace "sending control message", msg = shortLog(respControl), peer
     g.send(
       peer,
-      RPCMsg(control: some(respControl), messages: messages))
+      RPCMsg(control: some(respControl)), true)
+    g.send(
+      peer,
+      RPCMsg(messages: messages), false)
 
 proc validateAndRelay(g: GossipSub,
                       msg: Message,
@@ -370,7 +375,7 @@ proc validateAndRelay(g: GossipSub,
 
   # In theory, if topics are the same in all messages, we could batch - we'd
   # also have to be careful to only include validated messages
-  g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
+  g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), false)
   trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
   for topic in msg.topicIds:
     if topic notin g.topics: continue
@@ -441,7 +446,7 @@ method rpcHandler*(g: GossipSub,
   peer.recvObservers(rpcMsg)
 
   if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0:
-    g.send(peer, RPCMsg(pong: rpcMsg.ping))
+    g.send(peer, RPCMsg(pong: rpcMsg.ping), true)
     peer.pingBudget.dec
   for i in 0..<min(g.topicsHigh, rpcMsg.subscriptions.len):
     template sub: untyped = rpcMsg.subscriptions[i]
@@ -655,7 +660,7 @@ method publish*(g: GossipSub,
 
   g.mcache.put(msgId, msg)
 
-  g.broadcast(peers, RPCMsg(messages: @[msg]))
+  g.broadcast(peers, RPCMsg(messages: @[msg]), true)
 
   if g.knownTopics.contains(topic):
     libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = [topic])
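The gossipsub call sites above encode one policy: traffic the remote peer is actively waiting on (control replies, pongs) and the node's own publishes are sent with isHighPriority = true, while relayed messages and IWANT-requested payloads are queued with false. A compact restatement of that policy, as a sketch rather than library code:

type MsgClass = enum
  ControlReply   # GRAFT/PRUNE/IHAVE/IWANT responses in handleControl
  PongReply      # pong reply in rpcHandler
  OwnPublish     # broadcast in publish
  RelayedMsg     # broadcast in validateAndRelay
  IWantPayload   # second send in handleControl

func isHighPriority(c: MsgClass): bool =
  case c
  of ControlReply, PongReply, OwnPublish: true
  of RelayedMsg, IWantPayload: false

assert isHighPriority(OwnPublish)
assert not isHighPriority(RelayedMsg)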
libp2p/protocols/pubsub/pubsub.nim (11 changes: 6 additions & 5 deletions)
@@ -138,17 +138,18 @@ method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} =
 
   libp2p_pubsub_peers.set(p.peers.len.int64)
 
-proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) {.raises: [].} =
+proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool = false) {.raises: [].} =
   ## Attempt to send `msg` to remote peer
   ##
 
   trace "sending pubsub message to peer", peer, msg = shortLog(msg)
-  peer.send(msg, p.anonymize)
+  asyncSpawn peer.send(msg, p.anonymize, isHighPriority)
 
 proc broadcast*(
   p: PubSub,
   sendPeers: auto, # Iterable[PubSubPeer]
-  msg: RPCMsg) {.raises: [].} =
+  msg: RPCMsg,
+  isHighPriority: bool = false) {.raises: [].} =
   ## Attempt to send `msg` to the given peers
 
   let npeers = sendPeers.len.int64
@@ -195,12 +196,12 @@ proc broadcast*(
 
   if anyIt(sendPeers, it.hasObservers):
     for peer in sendPeers:
-      p.send(peer, msg)
+      p.send(peer, msg, isHighPriority)
   else:
     # Fast path that only encodes message once
    let encoded = encodeRpcMsg(msg, p.anonymize)
     for peer in sendPeers:
-      asyncSpawn peer.sendEncoded(encoded)
+      asyncSpawn peer.sendEncoded(encoded, isHighPriority)
 
 proc sendSubs*(p: PubSub,
                peer: PubSubPeer,
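In the fast path above the RPC is encoded once and the same bytes are fanned out to every peer, so only the queuing decision varies per send. A minimal self-contained sketch of that shape, with illustrative names standing in for encodeRpcMsg and PubSubPeer.sendEncoded:

import chronos

proc sendEncoded(peer: string, encoded: seq[byte],
                 isHighPriority: bool) {.async.} =
  echo peer, " <- ", encoded.len, " bytes, highPriority=", isHighPriority

proc broadcast(peers: seq[string], payload: seq[byte],
               isHighPriority = false) =
  let encoded = payload              # encodeRpcMsg would run once, here
  for peer in peers:
    asyncSpawn peer.sendEncoded(encoded, isHighPriority)

broadcast(@["peerA", "peerB"], @[1'u8, 2, 3], isHighPriority = true)
waitFor sleepAsync(10.milliseconds)  # give the spawned sends a tick to run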
libp2p/protocols/pubsub/pubsubpeer.nim (113 changes: 87 additions & 26 deletions)
@@ -19,7 +19,8 @@ import rpc/[messages, message, protobuf],
        ../../stream/connection,
        ../../crypto/crypto,
        ../../protobuf/minprotobuf,
-       ../../utility
+       ../../utility,
+       ../../utils/future
 
 export peerid, connection, deques

@@ -31,6 +32,9 @@ when defined(libp2p_expensive_metrics):
   declareCounter(libp2p_pubsub_skipped_received_messages, "number of received skipped messages", labels = ["id"])
   declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"])
 
+declareGauge(libp2p_gossipsub_priority_queue_size, "the number of messages in the priority queue", labels = ["id"])
+declareGauge(libp2p_gossipsub_non_priority_queue_size, "the number of messages in the non-priority queue", labels = ["id"])
+
 type
   PeerRateLimitError* = object of CatchableError

@@ -49,6 +53,11 @@
   DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init
   OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].}
 
+  RpcMessageQueue* = ref object
+    sendPriorityQueue: Deque[Future[void]]
+    nonPriorityQueue: AsyncQueue[seq[byte]]
+    sendNonPriorityTask: Future[void]
+
   PubSubPeer* = ref object of RootObj
     getConn*: GetConn # callback to establish a new send connection
     onEvent*: OnEvent # Connectivity updates for peer
@@ -70,6 +79,8 @@
     behaviourPenalty*: float64 # the eventual penalty score
     overheadRateLimitOpt*: Opt[TokenBucket]
 
+    rpcmessagequeue: RpcMessageQueue
+
   RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void]
     {.gcsafe, raises: [].}

@@ -82,6 +93,16 @@ when defined(libp2p_agents_metrics):
     #so we have to read the parent's short agent..
     p.sendConn.getWrapped().shortAgent
 
+proc getAgent*(peer: PubSubPeer): string =
+  return
+    when defined(libp2p_agents_metrics):
+      if peer.shortAgent.len > 0:
+        peer.shortAgent
+      else:
+        "unknown"
+    else:
+      "unknown"
+
 func hash*(p: PubSubPeer): Hash =
   p.peerId.hash

@@ -227,17 +248,7 @@ template sendMetrics(msg: RPCMsg): untyped =
       # metrics
       libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t])
 
-proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.async.} =
-  doAssert(not isNil(p), "pubsubpeer nil!")
-
-  if msg.len <= 0:
-    debug "empty message, skipping", p, msg = shortLog(msg)
-    return
-
-  if msg.len > p.maxMessageSize:
-    info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
-    return
-
+proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} =
   if p.sendConn == nil:
     # Wait for a send conn to be setup. `connectOnce` will
     # complete this even if the sendConn setup failed
@@ -262,6 +273,27 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.async.} =
 
   await conn.close() # This will clean up the send connection
 
+proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {.async.} =
+  doAssert(not isNil(p), "pubsubpeer nil!")
+
+  if msg.len <= 0:
+    debug "empty message, skipping", p, msg = shortLog(msg)
+    return
+
+  if msg.len > p.maxMessageSize:
+    info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
+    return
+
+  if isHighPriority:
+    p.rpcmessagequeue.sendPriorityQueue.addLast(p.sendMsg(msg))
+    when defined(libp2p_expensive_metrics):
+      libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId])
+  else:
+    await p.rpcmessagequeue.nonPriorityQueue.addLast(msg)
+    when defined(libp2p_expensive_metrics):
+      libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
+  trace "message queued", p, msg = shortLog(msg)
+
 iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
   ## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances.
   ## Each new `RPCMsg` accumulates Messages until reaching the specified `maxSize`. If a single Message
@@ -297,7 +329,7 @@ iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize:
   else:
     trace "message too big to send", peer, rpcMsg = shortLog(currentRPCMsg)
 
-proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =
+proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool = false) {.async.} =
   # When sending messages, we take care to re-encode them with the right
   # anonymization flag to ensure that we're not penalized for sending invalid
   # or malicious data on the wire - in particular, re-encoding protects against
@@ -317,11 +349,11 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =
 
   if encoded.len > p.maxMessageSize and msg.messages.len > 1:
     for encodedSplitMsg in splitRPCMsg(p, msg, p.maxMessageSize, anonymize):
-      asyncSpawn p.sendEncoded(encodedSplitMsg)
+      await p.sendEncoded(encodedSplitMsg, isHighPriority)
   else:
     # If the message size is within limits, send it as is
     trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
-    asyncSpawn p.sendEncoded(encoded)
+    await p.sendEncoded(encoded, isHighPriority)
 
 proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
   for sentIHave in p.sentIHaves.mitems():
@@ -330,6 +362,43 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
       return true
   return false
 
+proc clearSendPriorityQueue(p: PubSubPeer) =
+  while p.rpcmessagequeue.sendPriorityQueue.len > 0 and p.rpcmessagequeue.sendPriorityQueue[0].finished:
+    when defined(libp2p_expensive_metrics):
+      libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId])
+    discard p.rpcmessagequeue.sendPriorityQueue.popFirst()
+
+proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
+  while true:
+    let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst()
+    while p.rpcmessagequeue.sendPriorityQueue.len > 0:
+      await p.rpcmessagequeue.sendPriorityQueue[0]
+      p.clearSendPriorityQueue()
+    when defined(libp2p_expensive_metrics):
+      libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
+    await p.sendMsg(msg)
+
+proc startSendNonPriorityTask(p: PubSubPeer) =
+  debug "starting sendNonPriorityTask", p
+  if p.rpcmessagequeue.sendNonPriorityTask.isNil:
+    p.rpcmessagequeue.sendNonPriorityTask = p.sendNonPriorityTask()
+
+proc stopSendNonPriorityTask*(p: PubSubPeer) =
+  if not p.rpcmessagequeue.sendNonPriorityTask.isNil:
+    debug "stopping sendNonPriorityTask", p
+    p.rpcmessagequeue.sendNonPriorityTask.cancel()
+    p.rpcmessagequeue.sendNonPriorityTask = nil
+    p.rpcmessagequeue.sendPriorityQueue.clear()
+    p.rpcmessagequeue.nonPriorityQueue.clear()
+    libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
+    libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
+
+proc new(T: typedesc[RpcMessageQueue]): T =
+  return T(
+    sendPriorityQueue: initDeque[Future[void]](),
+    nonPriorityQueue: newAsyncQueue[seq[byte]](),
+  )
+
 proc new*(
   T: typedesc[PubSubPeer],
   peerId: PeerId,
@@ -346,17 +415,9 @@ proc new*(
     peerId: peerId,
     connectedFut: newFuture[void](),
     maxMessageSize: maxMessageSize,
-    overheadRateLimitOpt: overheadRateLimitOpt
+    overheadRateLimitOpt: overheadRateLimitOpt,
+    rpcmessagequeue: RpcMessageQueue.new(),
   )
   result.sentIHaves.addFirst(default(HashSet[MessageId]))
   result.heDontWants.addFirst(default(HashSet[MessageId]))
-
-proc getAgent*(peer: PubSubPeer): string =
-  return
-    when defined(libp2p_agents_metrics):
-      if peer.shortAgent.len > 0:
-        peer.shortAgent
-      else:
-        "unknown"
-    else:
-      "unknown"
+  result.startSendNonPriorityTask()
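A subtlety in the new sendEncoded: a high-priority send is started at enqueue time, and sendPriorityQueue stores the already-running future, so it acts as a completion tracker rather than a buffer. clearSendPriorityQueue only pops finished entries, and the non-priority task blocks on the oldest unfinished send before forwarding anything. A minimal demonstration of the resulting ordering, assuming chronos; slowSend and the variable names are illustrative:

import std/deques
import chronos

var inFlight = initDeque[Future[void]]()

proc slowSend(tag: string) {.async.} =
  await sleepAsync(5.milliseconds)
  echo "sent: ", tag

proc sendLater(tag: string) {.async.} =
  # non-priority: wait until every in-flight priority send has finished
  while inFlight.len > 0:
    await inFlight[0]
    discard inFlight.popFirst()
  await slowSend(tag)

# high priority: the send is already running; we only remember its future
inFlight.addLast(slowSend("priority"))
waitFor sendLater("relayed")   # prints "sent: priority", then "sent: relayed"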
tests/pubsub/testgossipsub.nim (12 changes: 6 additions & 6 deletions)
@@ -509,12 +509,12 @@ suite "GossipSub":
     var gossip1: GossipSub = GossipSub(nodes[0])
     var gossip2: GossipSub = GossipSub(nodes[1])
 
-    check:
-      "foobar" in gossip1.gossipsub
-      "foobar" in gossip2.gossipsub
-      gossip1.mesh.hasPeerId("foobar", gossip2.peerInfo.peerId)
-      not gossip1.fanout.hasPeerId("foobar", gossip2.peerInfo.peerId)
-      gossip2.mesh.hasPeerId("foobar", gossip1.peerInfo.peerId)
+    checkExpiring:
+      "foobar" in gossip1.gossipsub and
+      "foobar" in gossip2.gossipsub and
+      gossip1.mesh.hasPeerId("foobar", gossip2.peerInfo.peerId) and
+      not gossip1.fanout.hasPeerId("foobar", gossip2.peerInfo.peerId) and
+      gossip2.mesh.hasPeerId("foobar", gossip1.peerInfo.peerId) and
       not gossip2.fanout.hasPeerId("foobar", gossip1.peerInfo.peerId)
 
     await allFuturesThrowing(
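The test change follows from the send path becoming asynchronous: queued and spawned sends mean mesh state need not be settled at the instant a one-shot check runs, so the test switches to checkExpiring, the nim-libp2p test helper that re-evaluates a condition until a timeout (hence the conditions joined into one expression with and). A rough sketch of what such a poll-until-deadline helper can look like, assuming chronos; checkEventually and its internals are illustrative, not the helper's actual implementation:

import chronos

template checkEventually(timeout: Duration, cond: untyped) =
  proc poll(): Future[bool] {.async.} =
    let deadline = Moment.now() + timeout
    while Moment.now() < deadline:
      if cond: return true
      await sleepAsync(10.milliseconds)
    return false
  doAssert waitFor poll()

var ready = false
proc flip() {.async.} =
  await sleepAsync(20.milliseconds)
  ready = true

asyncSpawn flip()
checkEventually(1.seconds, ready)   # passes once flip() has run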
