Skip to content

Commit

Permalink
WIP: Working ranking of peers
Browse files Browse the repository at this point in the history
DeltaQ metrics is only available for our warm and hot peers that also
have us as hot. So a fraction of all downstream clients will have a
metric.

This change the ranking of peers to use simple scoring system. Deliver a
new TX before in time before it gets into the block gives you one point.
Delivering a TXs thats already in the mempool, is invalid, or fail
because it was included in a recent blocks gives you a penalty.
  • Loading branch information
karknu committed Oct 8, 2024
1 parent a9cd34d commit a3de87b
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,7 @@ applications debugTracer txSubmissionInboundTracer txSubmissionInboundDebug node
them $ \api -> do
let server = txSubmissionInboundV2
txSubmissionInboundTracer
(getMempoolReader mempool)
(getMempoolWriter mempool)
api
labelThisThread "TxSubmissionServer"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ runTxSubmission tracer tracerTxLogic state txDecisionPolicy = do
(getMempoolReader inboundMempool)
addr $ \api -> do
let server = txSubmissionInboundV2 verboseTracer
(getMempoolReader inboundMempool)
(getMempoolWriter inboundMempool)
api
runPipelinedPeerWithLimits
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,9 @@ mkArbPeerTxState mempoolHasTxFun txIdsInflight unacked txMaskMap =
requestedTxIdsInflight,
requestedTxsInflight,
requestedTxsInflightSize,
unknownTxs }
unknownTxs,
rejectedTxs = 0,
fetchedTxs = Set.empty }
(Set.fromList $ Map.elems inflightMap)
bufferedMap
where
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import Control.Exception (assert)
import Control.Monad (unless)
import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer, traceWith)

Expand Down Expand Up @@ -314,13 +315,18 @@ txSubmissionInbound tracer (NumTxIdsToAck maxUnacked) mpReader mpWriter _version
traceWith tracer $
TraceTxSubmissionCollected collected

!start <- getMonotonicTime
txidsAccepted <- mempoolAddTxs txsReady

!end <- getMonotonicTime
let duration = diffTime end start
traceWith tracer $
TraceTxInboundAddedToMempool txidsAccepted duration
let !accepted = length txidsAccepted

traceWith tracer $ TraceTxSubmissionProcessed ProcessedTxCount {
ptxcAccepted = accepted
, ptxcRejected = collected - accepted
, ptxcScore = 0 -- This implementatin does not track score
}

continueWithStateM (serverIdle n) st {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,27 +59,40 @@ makeDecisions
, Map peeraddr (TxDecision txid tx)
)
makeDecisions policy SharedDecisionContext {
sdcPeerGSV = peerGSV,
sdcPeerGSV = _peerGSV,
sdcSharedTxState = st
}
= fn
. pickTxsToDownload policy st
. orderByDeltaQ peerGSV
. orderByRejections
where
fn :: forall a.
(a, [(peeraddr, TxDecision txid tx)])
-> (a, Map peeraddr (TxDecision txid tx))
fn (a, as) = (a, Map.fromList as)


-- | Order peers by how useful the TXs they have provided are.
--
-- TXs delivered late will fail to apply because they where included in
-- a recently adopted block. Peers can race against each other by setting
-- `txInflightMultiplicity` to > 1.
--
-- TODO: Should not depend on plain `peeraddr` as a tie breaker.
orderByRejections :: Map peeraddr (PeerTxState txid tx)
-> [ (peeraddr, PeerTxState txid tx)]
orderByRejections =
sortOn (\(_peeraddr, ps) -> rejectedTxs ps)
. Map.toList

-- | Order peers by `DeltaQ`.
--
orderByDeltaQ :: forall peeraddr txid tx.
_orderByDeltaQ :: forall peeraddr txid tx.
Ord peeraddr
=> Map peeraddr PeerGSV
-> Map peeraddr (PeerTxState txid tx)
-> [(peeraddr, PeerTxState txid tx)]
orderByDeltaQ dq =
_orderByDeltaQ dq =
sortOn (\(peeraddr, _) ->
gsvRequestResponseDuration
(Map.findWithDefault defaultGSV peeraddr dq)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,5 @@ defaultTxDecisionPolicy =
maxUnacknowledgedTxIds = 10, -- must be the same as txSubmissionMaxUnacked
txsSizeInflightPerPeer = max_TX_SIZE * 6,
maxTxsSizeInflight = max_TX_SIZE * 20,
txInflightMultiplicity = 1
txInflightMultiplicity = 2
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import Data.Foldable (traverse_
, foldl'
#endif
)
import Data.Functor (void)
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe (fromMaybe)
Expand Down Expand Up @@ -75,8 +76,14 @@ data PeerTxAPI m txid tx = PeerTxAPI {
-- ^ requested txids
-> Map txid tx
-- ^ received txs
-> m ()
-> m (),
-- ^ handle received txs

countRejectedTxs :: Int
-> m Int,

consumeFetchedTxs :: Set txid
-> m (Set txid)
}


Expand Down Expand Up @@ -123,7 +130,9 @@ withPeer tracer
( TxChannels { txChannelMap = txChannelMap' }
, PeerTxAPI { readTxDecision = takeMVar chann',
handleReceivedTxIds,
handleReceivedTxs }
handleReceivedTxs,
countRejectedTxs,
consumeFetchedTxs }
)

atomically $ modifyTVar sharedStateVar registerPeer
Expand Down Expand Up @@ -151,7 +160,9 @@ withPeer tracer
requestedTxsInflightSize = 0,
requestedTxsInflight = Set.empty,
unacknowledgedTxIds = StrictSeq.empty,
unknownTxs = Set.empty }
unknownTxs = Set.empty,
rejectedTxs = 0,
fetchedTxs = Set.empty }
peerTxStates
}

Expand Down Expand Up @@ -210,8 +221,43 @@ withPeer tracer
-> Map txid tx
-- ^ received txs
-> m ()
handleReceivedTxs txids txs =
handleReceivedTxs txids txs = do
void $ atomically $ modifyTVar sharedStateVar addFethed
collectTxs tracer sharedStateVar peeraddr txids txs
where
addFethed :: SharedTxState peeraddr txid tx
-> SharedTxState peeraddr txid tx
addFethed st@SharedTxState { peerTxStates } =
let peerTxStates' = Map.update (\ps -> Just $! ps { fetchedTxs = Set.union (fetchedTxs ps) txids }) peeraddr peerTxStates in
st {peerTxStates = peerTxStates' }

countRejectedTxs :: Int
-> m Int
countRejectedTxs n = atomically $ do
modifyTVar sharedStateVar cntRejects
st <- readTVar sharedStateVar
case Map.lookup peeraddr (peerTxStates st) of
Nothing -> error "missing peer updated"
Just ps -> return $ rejectedTxs ps
where
cntRejects :: SharedTxState peeraddr txid tx
-> SharedTxState peeraddr txid tx
cntRejects st@SharedTxState { peerTxStates } =
let peerTxStates' = Map.update (\ps -> Just $! ps { rejectedTxs = min 42 (max (-42) (rejectedTxs ps + n)) }) peeraddr peerTxStates in
st {peerTxStates = peerTxStates'}

consumeFetchedTxs :: Set txid
-> m (Set txid)
consumeFetchedTxs otxids = atomically $ do
st <- readTVar sharedStateVar
case Map.lookup peeraddr (peerTxStates st) of
Nothing -> error "missing peer in consumeFetchedTxs"
Just ps -> do
let o = Set.intersection (fetchedTxs ps) otxids
r = Set.difference (fetchedTxs ps) otxids
st' = st { peerTxStates = Map.update (\ps' -> Just $! ps' { fetchedTxs = r }) peeraddr (peerTxStates st) }
writeTVar sharedStateVar st'
return o


decisionLogicThread
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import Control.Tracer (Tracer, traceWith)

import Network.TypedProtocol.Pipelined

import Control.Monad (unless)
import Control.Monad (unless, when)
import Ouroboros.Network.Protocol.TxSubmission2.Server
import Ouroboros.Network.TxSubmission.Inbound.Registry (PeerTxAPI (..))
import Ouroboros.Network.TxSubmission.Inbound.Types
import Ouroboros.Network.TxSubmission.Mempool.Reader

-- | Flag to enable/disable the usage of the new tx submission protocol
--
Expand All @@ -48,19 +49,25 @@ txSubmissionInboundV2
, Ord txid
)
=> Tracer m (TraceTxSubmissionInbound txid tx)
-> TxSubmissionMempoolReader txid tx idx m
-> TxSubmissionMempoolWriter txid tx idx m
-> PeerTxAPI m txid tx
-> TxSubmissionServerPipelined txid tx m ()
txSubmissionInboundV2
tracer
TxSubmissionMempoolReader{
mempoolGetSnapshot
}
TxSubmissionMempoolWriter {
txId,
mempoolAddTxs
}
PeerTxAPI {
readTxDecision,
handleReceivedTxIds,
handleReceivedTxs
handleReceivedTxs,
countRejectedTxs,
consumeFetchedTxs
}
=
TxSubmissionServerPipelined serverIdle
Expand All @@ -73,23 +80,52 @@ txSubmissionInboundV2
<- readTxDecision
traceWith tracer (TraceTxInboundDecision txd)

!start <- getMonotonicTime
txidsAccepted <- mempoolAddTxs txs
!end <- getMonotonicTime
let duration = diffTime end start

traceWith tracer $
TraceTxInboundAddedToMempool txidsAccepted duration

let !collected = length txs
let !accepted = length txidsAccepted
traceWith tracer $
TraceTxSubmissionCollected collected

traceWith tracer $ TraceTxSubmissionProcessed ProcessedTxCount {
ptxcAccepted = accepted
, ptxcRejected = collected - accepted
}
mpSnapshot <- atomically mempoolGetSnapshot
let receivedL = [ (txId tx, tx) | tx <- txs ]
fetchedSet <- consumeFetchedTxs (Set.fromList (map fst receivedL))

-- Only attempt to add TXs if we actually has fetched some.
when (not $ Set.null fetchedSet) $ do
let fetched = filter
(\(txid, _) -> Set.member txid fetchedSet)
receivedL
fetchedS = Set.fromList $ map fst fetched

-- Note that checking if the mempool contains a TX before
-- spending several ms attempting to add it to the pool has
-- been judged immoral.
let fresh = filter
(\(txid, _) -> not $ mempoolHasTx mpSnapshot txid)
receivedL

!start <- getMonotonicTime
txidsAccepted <- mempoolAddTxs $ map snd fresh
!end <- getMonotonicTime
let duration = diffTime end start

let acceptedS = Set.fromList txidsAccepted
acceptedFetched = Set.intersection fetchedS acceptedS
!accepted = Set.size acceptedFetched
!rejected = Set.size fetchedS - accepted

traceWith tracer $
TraceTxInboundAddedToMempool txidsAccepted duration
traceWith tracer $
TraceTxSubmissionCollected collected

-- Accepted TXs are discounted from rejected.
--
-- The number of rejected TXs may be too high.
-- The reason for that is that any peer which has downloaded a
-- TX is permitted to add TXs for all TXids hit has offered.
-- This is done to preserve TX ordering.
!s <- countRejectedTxs (rejected - accepted) -- accepted TXs are discounted
traceWith tracer $ TraceTxSubmissionProcessed ProcessedTxCount {
ptxcAccepted = accepted
, ptxcRejected = rejected
, ptxcScore = s
}

-- TODO:
-- We can update the state so that other `tx-submission` servers will
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,11 @@ data PeerTxState txid tx = PeerTxState {
-- since that could potentially lead to corrupting the node, not being
-- able to download a `tx` which is needed & available from other nodes.
--
unknownTxs :: !(Set txid)
unknownTxs :: !(Set txid),

rejectedTxs :: !Int,

fetchedTxs :: !(Set txid)
}
deriving (Eq, Show, Generic)

Expand Down Expand Up @@ -259,6 +263,7 @@ data ProcessedTxCount = ProcessedTxCount {
ptxcAccepted :: Int
-- | Just rejected this many transactions.
, ptxcRejected :: Int
, ptxcScore :: Int
}
deriving (Eq, Show)

Expand Down

0 comments on commit a3de87b

Please sign in to comment.