Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fluffy: Portal subnetwork peer ban list #3007

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions fluffy/network/beacon/beacon_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,8 @@ proc validateContent(

debug "Received offered content validated successfully", srcNodeId, contentKey
else:
if srcNodeId.isSome():
n.portalProtocol.banNode(srcNodeId.get(), NodeBanDurationOfferFailedValidation)
debug "Received offered content failed validation",
srcNodeId, contentKey, error = validation.error
return false
Expand Down
13 changes: 13 additions & 0 deletions fluffy/network/history/history_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ proc getVerifiedBlockHeader*(
return Opt.none(Header)

header = validateCanonicalHeaderBytes(headerContent.content, id, n.accumulator).valueOr:
n.portalProtocol.banNode(
headerContent.receivedFrom.id, NodeBanDurationContentLookupFailedValidation
)
warn "Validation of block header failed",
error = error, node = headerContent.receivedFrom.record.toURI()
continue
Expand Down Expand Up @@ -191,6 +194,9 @@ proc getBlockBody*(
return Opt.none(BlockBody)

body = validateBlockBodyBytes(bodyContent.content, header).valueOr:
n.portalProtocol.banNode(
bodyContent.receivedFrom.id, NodeBanDurationContentLookupFailedValidation
)
warn "Validation of block body failed",
error, node = bodyContent.receivedFrom.record.toURI()
continue
Expand Down Expand Up @@ -265,7 +271,11 @@ proc getReceipts*(
receiptsContent = (await n.portalProtocol.contentLookup(contentKey, contentId)).valueOr:
debug "Failed fetching receipts from the network"
return Opt.none(seq[Receipt])

receipts = validateReceiptsBytes(receiptsContent.content, header.receiptsRoot).valueOr:
n.portalProtocol.banNode(
receiptsContent.receivedFrom.id, NodeBanDurationContentLookupFailedValidation
)
warn "Validation of receipts failed",
error, node = receiptsContent.receivedFrom.record.toURI()
continue
Expand Down Expand Up @@ -380,6 +390,9 @@ proc validateContent(

debug "Received offered content validated successfully", srcNodeId, contentKey
else:
if srcNodeId.isSome():
n.portalProtocol.banNode(srcNodeId.get(), NodeBanDurationOfferFailedValidation)

debug "Received offered content failed validation",
srcNodeId, contentKey, error = res.error
return false
Expand Down
9 changes: 8 additions & 1 deletion fluffy/network/state/state_network.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Fluffy
# Copyright (c) 2021-2024 Status Research & Development GmbH
# Copyright (c) 2021-2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
Expand Down Expand Up @@ -109,6 +109,9 @@ proc getContent(
continue

validateRetrieval(key, contentValue).isOkOr:
n.portalProtocol.banNode(
lookupRes.receivedFrom.id, NodeBanDurationContentLookupFailedValidation
)
error "Validation of retrieved state content failed"
continue

Expand Down Expand Up @@ -243,6 +246,10 @@ proc processContentLoop(n: StateNetwork) {.async: (raises: []).} =
debug "Received offered content validated successfully",
srcNodeId, contentKeyBytes
else:
if srcNodeId.isSome():
n.portalProtocol.banNode(
srcNodeId.get(), NodeBanDurationOfferFailedValidation
)
state_network_offers_failed.inc(labelValues = [$n.portalProtocol.protocolId])
error "Received offered content failed validation",
srcNodeId, contentKeyBytes, error = offerRes.error()
Expand Down
69 changes: 56 additions & 13 deletions fluffy/network/wire/portal_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ const
## value in milliseconds
initialLookups = 1 ## Amount of lookups done when populating the routing table

## Ban durations for banned nodes in the routing table
NodeBanDurationInvalidResponse = 15.minutes
NodeBanDurationNoResponse = 5.minutes
NodeBanDurationContentLookupFailedValidation* = 60.minutes
NodeBanDurationOfferFailedValidation* = 60.minutes

type
ToContentIdHandler* =
proc(contentKey: ContentKeyByteList): results.Opt[ContentId] {.raises: [], gcsafe.}
Expand Down Expand Up @@ -284,6 +290,12 @@ func getProtocolId*(
of PortalSubnetwork.transactionGossip:
[portalPrefix, 0x4F]

template banNode*(p: PortalProtocol, nodeId: NodeId, period: chronos.Duration) =
p.routingTable.banNode(nodeId, period)

template isBanned*(p: PortalProtocol, nodeId: NodeId): bool =
p.routingTable.isBanned(nodeId)

func `$`(id: PortalProtocolId): string =
id.toHex()

Expand All @@ -299,8 +311,10 @@ func getNode*(p: PortalProtocol, id: NodeId): Opt[Node] =
func localNode*(p: PortalProtocol): Node =
p.baseProtocol.localNode

func neighbours*(p: PortalProtocol, id: NodeId, seenOnly = false): seq[Node] =
p.routingTable.neighbours(id = id, seenOnly = seenOnly)
template neighbours*(
p: PortalProtocol, id: NodeId, k: int = BUCKET_SIZE, seenOnly = false
): seq[Node] =
p.routingTable.neighbours(id, k, seenOnly)

func distance(p: PortalProtocol, a, b: NodeId): UInt256 =
p.routingTable.distance(a, b)
Expand Down Expand Up @@ -437,7 +451,7 @@ proc handleFindContent(
# Node does not have the content, or content is not even in radius,
# send closest neighbours to the requested content id.
let
closestNodes = p.routingTable.neighbours(NodeId(contentId), seenOnly = true)
closestNodes = p.neighbours(NodeId(contentId), seenOnly = true)
enrs = truncateEnrs(closestNodes, maxPayloadSize, enrOverhead)
portal_content_enrs_packed.observe(enrs.len().int64, labelValues = [$p.protocolId])

Expand Down Expand Up @@ -514,6 +528,12 @@ proc messageHandler(

let p = PortalProtocol(protocol)

if p.isBanned(srcId):
# The sender of the message is in the temporary node ban list
# so we don't process the message
debug "Ignoring message from banned node", srcId, srcUdpAddress
return @[]

let decoded = decodeMessage(request)
if decoded.isOk():
let message = decoded.get()
Expand Down Expand Up @@ -702,6 +722,9 @@ proc recordsFromBytes(rawRecords: List[ByteList[2048], 32]): PortalResult[seq[Re
proc ping*(
p: PortalProtocol, dst: Node
): Future[PortalResult[PongMessage]] {.async: (raises: [CancelledError]).} =
if p.isBanned(dst.id):
return err("destination node is banned")

let pongResponse = await p.pingImpl(dst)

if pongResponse.isOk():
Expand All @@ -724,12 +747,16 @@ proc ping*(
proc findNodes*(
p: PortalProtocol, dst: Node, distances: seq[uint16]
): Future[PortalResult[seq[Node]]] {.async: (raises: [CancelledError]).} =
if p.isBanned(dst.id):
return err("destination node is banned")

let nodesMessage = await p.findNodesImpl(dst, List[uint16, 256](distances))
if nodesMessage.isOk():
let records = recordsFromBytes(nodesMessage.get().enrs)
if records.isOk():
# TODO: distance function is wrong here for state, fix + tests
return ok(verifyNodesRecords(records.get(), dst, enrsResultLimit, distances))
let res = verifyNodesRecords(records.get(), dst, enrsResultLimit, distances)
return ok(res.filterIt(not p.isBanned(it.id)))
else:
return err(records.error)
else:
Expand All @@ -742,6 +769,9 @@ proc findContent*(
node = dst
contentKey

if p.isBanned(dst.id):
return err("destination node is banned")

let contentMessageResponse = await p.findContentImpl(dst, contentKey)

if contentMessageResponse.isOk():
Expand Down Expand Up @@ -876,6 +906,9 @@ proc offer(
contentKeys.len().int64, labelValues = [$p.protocolId]
)

if p.isBanned(o.dst.id):
return err("destination node is banned")

let acceptMessageResponse = await p.offerImpl(o.dst, contentKeys)

if acceptMessageResponse.isOk():
Expand Down Expand Up @@ -1029,7 +1062,7 @@ proc lookup*(
## target. Maximum value for n is `BUCKET_SIZE`.
# `closestNodes` holds the k closest nodes to target found, sorted by distance
# Unvalidated nodes are used for requests as a form of validation.
var closestNodes = p.routingTable.neighbours(target, BUCKET_SIZE, seenOnly = false)
var closestNodes = p.neighbours(target, BUCKET_SIZE, seenOnly = false)

var asked, seen = HashSet[NodeId]()
asked.incl(p.localNode.id) # No need to ask our own node
Expand Down Expand Up @@ -1131,7 +1164,7 @@ proc contentLookup*(
## target.
# `closestNodes` holds the k closest nodes to target found, sorted by distance
# Unvalidated nodes are used for requests as a form of validation.
var closestNodes = p.routingTable.neighbours(targetId, BUCKET_SIZE, seenOnly = false)
var closestNodes = p.neighbours(targetId, BUCKET_SIZE, seenOnly = false)

# Shuffling the order of the nodes in order to not always hit the same node
# first for the same request.
Expand Down Expand Up @@ -1204,7 +1237,8 @@ proc contentLookup*(
of Nodes:
let maybeRadius = p.radiusCache.get(content.src.id)
if maybeRadius.isSome() and
p.inRange(content.src.id, maybeRadius.unsafeGet(), targetId):
p.inRange(content.src.id, maybeRadius.unsafeGet(), targetId) and
not p.isBanned(content.src.id):
# Only return nodes which may be interested in content.
# No need to check for duplicates in nodesWithoutContent
# as requests are never made two times to the same node.
Expand Down Expand Up @@ -1257,7 +1291,7 @@ proc traceContentLookup*(
# Need to use a system clock and not the mono clock for this.
let startedAtMs = int64(times.epochTime() * 1000)

var closestNodes = p.routingTable.neighbours(targetId, BUCKET_SIZE, seenOnly = false)
var closestNodes = p.neighbours(targetId, BUCKET_SIZE, seenOnly = false)
# Shuffling the order of the nodes in order to not always hit the same node
# first for the same request.
p.baseProtocol.rng[].shuffle(closestNodes)
Expand Down Expand Up @@ -1348,7 +1382,8 @@ proc traceContentLookup*(

let maybeRadius = p.radiusCache.get(content.src.id)
if maybeRadius.isSome() and
p.inRange(content.src.id, maybeRadius.unsafeGet(), targetId):
p.inRange(content.src.id, maybeRadius.unsafeGet(), targetId) and
not p.isBanned(content.src.id):
# Only return nodes which may be interested in content.
# No need to check for duplicates in nodesWithoutContent
# as requests are never made two times to the same node.
Expand All @@ -1362,7 +1397,7 @@ proc traceContentLookup*(
metadata["0x" & $n.id] = NodeMetadata(enr: n.record, distance: dist)
respondedWith.add(n.id)

if not seen.containsOrIncl(n.id):
if not seen.containsOrIncl(n.id) and not p.isBanned(n.id):
discard p.addNode(n)
# If it wasn't seen before, insert node while remaining sorted
closestNodes.insert(
Expand Down Expand Up @@ -1455,7 +1490,7 @@ proc query*(
## This will take k nodes from the routing table closest to target and
## query them for nodes closest to target. If there are less than k nodes in
## the routing table, nodes returned by the first queries will be used.
var queryBuffer = p.routingTable.neighbours(target, k, seenOnly = false)
var queryBuffer = p.neighbours(target, k, seenOnly = false)

var asked, seen = HashSet[NodeId]()
asked.incl(p.localNode.id) # No need to ask our own node
Expand Down Expand Up @@ -1543,8 +1578,7 @@ proc neighborhoodGossip*(
# table, but at the same time avoid unnecessary node lookups.
# It might still cause issues in data getting propagated in a wider id range.

let closestLocalNodes =
p.routingTable.neighbours(NodeId(contentId), k = 16, seenOnly = true)
let closestLocalNodes = p.neighbours(NodeId(contentId), BUCKET_SIZE, seenOnly = true)

var gossipNodes: seq[Node]
for node in closestLocalNodes:
Expand Down Expand Up @@ -1748,6 +1782,9 @@ proc refreshLoop(p: PortalProtocol) {.async: (raises: []).} =
trace "Discovered nodes in random target query", nodes = randomQuery.len
debug "Total nodes in routing table", total = p.routingTable.len()

# Remove the expired bans from routing table to limit memory usage
p.routingTable.cleanupExpiredBans()

await sleepAsync(refreshInterval)
except CancelledError:
trace "refreshLoop canceled"
Expand Down Expand Up @@ -1800,6 +1837,12 @@ proc resolve*(
if id == p.localNode.id:
return Opt.some(p.localNode)

# No point in trying to resolve a banned node because it won't exist in the
# routing table and it will be filtered out of any respones in the lookup call
if p.isBanned(id):
debug "Not resolving banned node", nodeId = id
return Opt.none(Node)

let node = p.getNode(id)
if node.isSome():
let nodesMessage = await p.findNodes(node.get(), @[0'u16])
Expand Down
9 changes: 9 additions & 0 deletions fluffy/rpc/rpc_portal_state_api.nim
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ proc installPortalStateApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) =
of Content:
let valueBytes = foundContentResult.content
validateRetrieval(key, valueBytes).isOkOr:
p.banNode(node.id, NodeBanDurationContentLookupFailedValidation)
raise invalidValueErr()

let res = ContentInfo(
Expand Down Expand Up @@ -97,6 +98,10 @@ proc installPortalStateApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) =
valueBytes = contentLookupResult.content

validateRetrieval(key, valueBytes).isOkOr:
p.banNode(
contentLookupResult.receivedFrom.id,
NodeBanDurationContentLookupFailedValidation,
)
raise invalidValueErr()
p.storeContent(keyBytes, contentId, valueBytes, cacheContent = true)

Expand Down Expand Up @@ -132,6 +137,10 @@ proc installPortalStateApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) =
raise contentNotFoundErrWithTrace(data)

validateRetrieval(key, valueBytes).isOkOr:
if res.trace.receivedFrom.isSome():
p.banNode(
res.trace.receivedFrom.get(), NodeBanDurationContentLookupFailedValidation
)
raise invalidValueErr()
p.storeContent(keyBytes, contentId, valueBytes, cacheContent = true)

Expand Down
Loading