Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TX Submission Logic #4887

Draft
wants to merge 32 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
ccadb72
tx-submission: PeerTxState & SharedTxState
coot Nov 21, 2023
fad9fc9
tx-submission: decision logic
coot Dec 8, 2023
f96d485
tx-submission: generalised CollectPipelined
coot Feb 23, 2024
d25a918
tx-submission: registry
coot Mar 13, 2024
308b78b
tx-submission: inbound peer using tx-submission decision logic
coot Jul 15, 2024
2cc9275
ouroboros-network: tx-submission module structure
coot Jul 22, 2024
5a3352a
tx-submission: debug tracer for SharedTxState
coot Jul 30, 2024
6dd38ca
Refactor TxSubmission files and added V2 sim
bolt12 Jul 22, 2024
dc397bd
Fixed ArbTxDecisionPolicy generator
bolt12 Sep 10, 2024
38d3cec
New txSubmissionV2 simulation
bolt12 Sep 11, 2024
2ee96b9
Drop V2 in internal APIs
coot Sep 17, 2024
a83025d
Fix race condition when producing the right policy
bolt12 Sep 11, 2024
98b05fe
tx-submission: defaultTxDecisionPolicy
coot Sep 17, 2024
38e552f
Added test that checks tx multiplicities
bolt12 Sep 11, 2024
cfdea9e
Integrates txSubmissionV2 in testnet diffusion sim
bolt12 Sep 11, 2024
ef42d57
tx-submission: added DebugTxLogic tracer
coot Sep 16, 2024
547c6ab
tx-submission: label TVars in tests
coot Sep 16, 2024
fb125d5
tx-submission: use strict STM in tests
coot Sep 16, 2024
2261c27
tx-submission: refactored test
coot Sep 17, 2024
e6aacf6
tx-submission: put common types in one place
coot Sep 17, 2024
24771e7
tx-submission: compile with ghc < 9.10
coot Sep 18, 2024
76181ea
Update CHaP and hackage
bolt12 Sep 18, 2024
f98ae26
Move TxDecisionPolicy to MiniProtocolParamenters
bolt12 Sep 19, 2024
ffa9189
Added EnableNewTxSubmissionProtocol flag
bolt12 Sep 19, 2024
07c00d8
tx-submission: verify tx sizes
coot Sep 23, 2024
5a865fa
Deriving Eq Show from EnableNewTxSubmissionProtocol
bolt12 Sep 25, 2024
b472f0d
Send TraceTxSubmissionProcessed for the new TX submission
karknu Oct 2, 2024
067adaa
Add duration to TxInboundAddedToMempool
karknu Oct 5, 2024
99d7f30
tx-submission: working ranking of peers
karknu Oct 4, 2024
6bff577
tx-submission: TX peer ranking
karknu Oct 14, 2024
7a33b77
fixup! tx-submission: TX peer ranking
coot Feb 6, 2025
6a1bdd9
WIP: bind responder threads to the lower cores
coot Feb 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions network-mux/demo/mux-demo.hs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ serverWorker bearer = do
putStrLn $ "Result: " ++ show result
Mx.stop mux

Mx.run nullTracer mux bearer
Mx.run nullTracer 1 mux bearer
where
ptcls :: [MiniProtocolInfo ResponderMode]
ptcls = [ MiniProtocolInfo {
Expand Down Expand Up @@ -192,7 +192,7 @@ clientWorker bearer n msg = do
putStrLn $ "Result: " ++ show result
Mx.stop mux

Mx.run nullTracer mux bearer
Mx.run nullTracer 0 mux bearer
where
ptcls :: [MiniProtocolInfo Mx.InitiatorMode]
ptcls = [ MiniProtocolInfo {
Expand Down
53 changes: 44 additions & 9 deletions network-mux/src/Control/Concurrent/JobPool.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module Control.Concurrent.JobPool
, Job (..)
, withJobPool
, forkJob
, forkJobOn
, readSize
, readGroupSize
, waitForJob
Expand All @@ -29,6 +30,9 @@ import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork (MonadThread (..))
import Control.Monad.Class.MonadThrow

import Control.Concurrent (getNumCapabilities)
import System.IO.Unsafe (unsafePerformIO)

-- | JobPool allows to submit asynchronous jobs, wait for their completion or
-- cancel. Jobs are grouped, each group can be cancelled separately.
--
Expand Down Expand Up @@ -69,16 +73,18 @@ withJobPool =
jobs <- readTVarIO jobsVar
mapM_ uninterruptibleCancel jobs

forkJob :: forall group m a.
( MonadAsync m, MonadMask m
, Ord group
)
=> JobPool group m a
-> Job group m a
-> m ()
forkJob JobPool{jobsVar, completionQueue} (Job action handler group label) =

forkJob' :: forall group m a.
( MonadAsync m, MonadMask m
, Ord group
)
=> (m () -> m (Async m ()))
-> JobPool group m a
-> Job group m a
-> m ()
forkJob' doFork JobPool{jobsVar, completionQueue} (Job action handler group label) =
mask $ \restore -> do
jobAsync <- async $ do
jobAsync <- doFork $ do
tid <- myThreadId
io tid restore
`onException`
Expand All @@ -104,6 +110,35 @@ forkJob JobPool{jobsVar, completionQueue} (Job action handler group label) =
restore action
atomically $ writeTQueue completionQueue res



forkJob :: forall group m a.
( MonadAsync m, MonadMask m
, Ord group
)
=> JobPool group m a
-> Job group m a
-> m ()
forkJob = forkJob' async


forkJobOn :: forall group m a.
( MonadAsync m, MonadMask m
, Ord group
)
=> Int
-> JobPool group m a
-> Job group m a
-> m ()
forkJobOn c = forkJob' (asyncOn limitCapability)
where
limitCapability :: Int
limitCapability =
-- TODO: add `getNumCapabilities` to `MonadFork`
let sysCap = unsafePerformIO getNumCapabilities in
c `mod` (max 1 $ sysCap - 2)


readSize :: MonadSTM m => JobPool group m a -> STM m Int
readSize JobPool{jobsVar} = Map.size <$> readTVar jobsVar

Expand Down
13 changes: 8 additions & 5 deletions network-mux/src/Network/Mux.hs
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,11 @@ run :: forall m mode.
, MonadMask m
)
=> Tracer m Trace
-> Int
-> Mux mode m
-> Bearer m
-> m ()
run tracer Mux {muxMiniProtocols, muxControlCmdQueue, muxStatus} bearer@Bearer {name} = do
run tracer peerHash Mux {muxMiniProtocols, muxControlCmdQueue, muxStatus} bearer@Bearer{name} = do
egressQueue <- atomically $ newTBQueue 100

-- label shared variables
Expand All @@ -231,7 +232,8 @@ run tracer Mux {muxMiniProtocols, muxControlCmdQueue, muxStatus} bearer@Bearer {
-- Wait for someone to shut us down by calling muxStop or an error.
-- Outstanding jobs are shut down Upon completion of withJobPool.
withTimeoutSerial $ \timeout ->
monitor tracer
monitor peerHash
tracer
timeout
jobpool
egressQueue
Expand Down Expand Up @@ -375,14 +377,15 @@ monitor :: forall mode m.
, Alternative (STM m)
, MonadThrow (STM m)
)
=> Tracer m Trace
=> Int
-> Tracer m Trace
-> TimeoutFn m
-> JobPool.JobPool Group m JobResult
-> EgressQueue m
-> StrictTQueue m (ControlCmd mode m)
-> StrictTVar m Status
-> m ()
monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
monitor peerHash tracer timeout jobpool egressQueue cmdQueue muxStatus =
go (MonitorCtx Map.empty)
where
go :: MonitorCtx m mode -> m ()
Expand Down Expand Up @@ -451,7 +454,7 @@ monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
ptclAction) -> do
traceWith tracer (TraceStartEagerly miniProtocolNum
(protocolDirEnum miniProtocolDir))
JobPool.forkJob jobpool $
JobPool.forkJobOn peerHash jobpool $
miniProtocolJob
tracer
egressQueue
Expand Down
44 changes: 22 additions & 22 deletions network-mux/test/Test/Mux.hs
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,8 @@ prop_mux_snd_recv (DummyRun messages) = ioProperty $ do

serverMux <- Mx.new [serverApp]

withAsync (Mx.run clientTracer clientMux clientBearer) $ \clientAsync ->
withAsync (Mx.run serverTracer serverMux serverBearer) $ \serverAsync -> do
withAsync (Mx.run clientTracer 0 clientMux clientBearer) $ \clientAsync ->
withAsync (Mx.run serverTracer 0 serverMux serverBearer) $ \serverAsync -> do

r <- step clientMux clientApp serverMux serverApp messages
Mx.stop serverMux
Expand Down Expand Up @@ -434,10 +434,10 @@ prop_mux_snd_recv_bi (DummyRun messages) = ioProperty $ do


clientMux <- Mx.new clientApps
clientAsync <- async $ Mx.run clientTracer clientMux clientBearer
clientAsync <- async $ Mx.run clientTracer 0 clientMux clientBearer

serverMux <- Mx.new serverApps
serverAsync <- async $ Mx.run serverTracer serverMux serverBearer
serverAsync <- async $ Mx.run serverTracer 1 serverMux serverBearer

r <- step clientMux clientApps serverMux serverApps messages
Mx.stop clientMux
Expand Down Expand Up @@ -541,7 +541,7 @@ prop_mux_snd_recv_compat messages = ioProperty $ do
)

-- Wait for the first MuxApplication to finish, then stop the mux.
withAsync (Mx.run clientTracer clientMux clientBearer) $ \aid -> do
withAsync (Mx.run clientTracer 0 clientMux clientBearer) $ \aid -> do
_ <- atomically res
Mx.stop clientMux
wait aid
Expand All @@ -559,7 +559,7 @@ prop_mux_snd_recv_compat messages = ioProperty $ do
)

-- Wait for the first MuxApplication to finish, then stop the mux.
withAsync (Mx.run serverTracer serverMux serverBearer) $ \aid -> do
withAsync (Mx.run serverTracer 1 serverMux serverBearer) $ \aid -> do
_ <- atomically res
Mx.stop serverMux
wait aid
Expand Down Expand Up @@ -719,7 +719,7 @@ runMuxApplication initApps initBearer respApps respBearer = do
respMux <- Mx.new $ map (\(pn,_) ->
MiniProtocolInfo (Mx.MiniProtocolNum pn) Mx.ResponderDirectionOnly defaultMiniProtocolLimits)
respApps'
respAsync <- async $ Mx.run serverTracer respMux respBearer
respAsync <- async $ Mx.run serverTracer 1 respMux respBearer
getRespRes <- sequence [ Mx.runMiniProtocol
respMux
(Mx.MiniProtocolNum pn)
Expand All @@ -732,7 +732,7 @@ runMuxApplication initApps initBearer respApps respBearer = do
initMux <- Mx.new $ map (\(pn,_) ->
MiniProtocolInfo (Mx.MiniProtocolNum pn) Mx.InitiatorDirectionOnly defaultMiniProtocolLimits)
initApps'
initAsync <- async $ Mx.run clientTracer initMux initBearer
initAsync <- async $ Mx.run clientTracer 0 initMux initBearer
getInitRes <- sequence [ Mx.runMiniProtocol
initMux
(Mx.MiniProtocolNum pn)
Expand Down Expand Up @@ -952,17 +952,17 @@ prop_mux_starvation (Uneven response0 response1) =
}

serverMux <- Mx.new [serverApp2, serverApp3]
serverMux_aid <- async $ Mx.run serverTracer serverMux serverBearer
serverMux_aid <- async $ Mx.run serverTracer 0 serverMux serverBearer
serverRes2 <- Mx.runMiniProtocol serverMux (miniProtocolNum serverApp2) (miniProtocolDir serverApp2)
Mx.StartOnDemand server_short
serverRes3 <- Mx.runMiniProtocol serverMux (miniProtocolNum serverApp3) (miniProtocolDir serverApp3)
Mx.StartOnDemand server_long

clientMux <- Mx.new [clientApp2, clientApp3]
clientMux_aid <- async $ Mx.run (clientTracer <> headerTracer) clientMux clientBearer
clientRes2 <- Mx.runMiniProtocol clientMux (Mx.miniProtocolNum clientApp2) (Mx.miniProtocolDir clientApp2)
clientMux_aid <- async $ Mx.run (clientTracer <> headerTracer) 1 clientMux clientBearer
clientRes2 <- Mx.runMiniProtocol clientMux (miniProtocolNum clientApp2) (miniProtocolDir clientApp2)
Mx.StartEagerly client_short
clientRes3 <- Mx.runMiniProtocol clientMux (Mx.miniProtocolNum clientApp3) (Mx.miniProtocolDir clientApp3)
clientRes3 <- Mx.runMiniProtocol clientMux (miniProtocolNum clientApp3) (miniProtocolDir clientApp3)
Mx.StartEagerly client_long


Expand Down Expand Up @@ -1157,7 +1157,7 @@ prop_demux_sdu a = do
serverRes <- Mx.runMiniProtocol serverMux (Mx.miniProtocolNum serverApp) (Mx.miniProtocolDir serverApp)
Mx.StartEagerly server_mp

said <- async $ Mx.run serverTracer serverMux serverBearer
said <- async $ Mx.run serverTracer 1 serverMux serverBearer
return (server_r, said, serverRes, serverMux)

-- Server that expects to receive a specific ByteString.
Expand Down Expand Up @@ -1432,7 +1432,7 @@ prop_mux_restart_m (DummyRestartingInitiatorApps apps) = do
let minis = map (appToInfo Mx.InitiatorDirectionOnly . fst) apps

mux <- Mx.new minis
mux_aid <- async $ Mx.run nullTracer mux bearer
mux_aid <- async $ Mx.run nullTracer 0 mux bearer
getRes <- sequence [ Mx.runMiniProtocol
mux
(daNum $ fst app)
Expand Down Expand Up @@ -1479,7 +1479,7 @@ prop_mux_restart_m (DummyRestartingResponderApps rapps) = do
minis = map (appToInfo Mx.ResponderDirectionOnly) apps

mux <- Mx.new minis
mux_aid <- async $ Mx.run nullTracer mux bearer
mux_aid <- async $ Mx.run nullTracer 1 mux bearer
getRes <- sequence [ Mx.runMiniProtocol
mux
(daNum $ fst app)
Expand Down Expand Up @@ -1528,7 +1528,7 @@ prop_mux_restart_m (DummyRestartingInitiatorResponderApps rapps) = do
respMinis = map (appToInfo Mx.ResponderDirection) apps

mux <- Mx.new $ initMinis ++ respMinis
mux_aid <- async $ Mx.run nullTracer mux bearer
mux_aid <- async $ Mx.run nullTracer 1 mux bearer
getInitRes <- sequence [ Mx.runMiniProtocol
mux
(daNum $ fst app)
Expand Down Expand Up @@ -1603,7 +1603,7 @@ prop_mux_start_m bearer _ checkRes (DummyInitiatorApps apps) runTime = do
minRunTime = minimum $ runTime : (map daRunTime $ filter (\app -> daAction app == DummyAppFail) apps)

mux <- Mx.new minis
mux_aid <- async $ Mx.run nullTracer mux bearer
mux_aid <- async $ Mx.run nullTracer 0 mux bearer
killer <- async $ (threadDelay runTime) >> Mx.stop mux
getRes <- sequence [ Mx.runMiniProtocol
mux
Expand All @@ -1624,7 +1624,7 @@ prop_mux_start_m bearer trigger checkRes (DummyResponderApps apps) runTime = do
minRunTime = minimum $ runTime : (map (\a -> daRunTime a + daStartAfter a) $ filter (\app -> daAction app == DummyAppFail) apps)

mux <- Mx.new minis
mux_aid <- async $ Mx.run verboseTracer mux bearer
mux_aid <- async $ Mx.run verboseTracer 0 mux bearer
getRes <- sequence [ Mx.runMiniProtocol
mux
(daNum app)
Expand All @@ -1650,7 +1650,7 @@ prop_mux_start_m bearer _trigger _checkRes (DummyResponderAppsKillMux apps) runT
let minis = map (appToInfo Mx.ResponderDirectionOnly) apps

mux <- Mx.new minis
mux_aid <- async $ Mx.run verboseTracer mux bearer
mux_aid <- async $ Mx.run verboseTracer 1 mux bearer
getRes <- sequence [ Mx.runMiniProtocol
mux
(daNum app)
Expand All @@ -1673,7 +1673,7 @@ prop_mux_start_m bearer trigger checkRes (DummyInitiatorResponderApps apps) runT
minRunTime = minimum $ runTime : (map (\a -> daRunTime a) $ filter (\app -> daAction app == DummyAppFail) apps)

mux <- Mx.new $ initMinis ++ respMinis
mux_aid <- async $ Mx.run verboseTracer mux bearer
mux_aid <- async $ Mx.run verboseTracer 0 mux bearer
getInitRes <- sequence [ Mx.runMiniProtocol
mux
(daNum app)
Expand Down Expand Up @@ -1835,7 +1835,7 @@ close_experiment
])
Mx.stop $ \mux ->
withNetworkCtx clientCtx $ \clientBearer ->
withAsync (Mx.run ((Client,) `contramap` muxTracer) mux clientBearer) $ \_muxAsync ->
withAsync (Mx.run ((Client,) `contramap` muxTracer) 0 mux clientBearer) $ \_muxAsync ->
Mx.runMiniProtocol
mux miniProtocolNum
Mx.InitiatorDirectionOnly Mx.StartEagerly
Expand All @@ -1853,7 +1853,7 @@ close_experiment
])
Mx.stop $ \mux ->
withNetworkCtx serverCtx $ \serverBearer ->
withAsync (Mx.run ((Server,) `contramap` muxTracer) mux serverBearer) $ \_muxAsync -> do
withAsync (Mx.run ((Server,) `contramap` muxTracer) 0 mux serverBearer) $ \_muxAsync -> do
Mx.runMiniProtocol
mux miniProtocolNum
Mx.ResponderDirectionOnly Mx.StartOnDemand
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import Quiet (Quiet (..))
newtype SizeInBytes = SizeInBytes { getSizeInBytes :: Word32 }
deriving (Eq, Ord)
deriving Show via Quiet SizeInBytes
deriving Bounded via Word32
deriving Enum via Word32
deriving Num via Word32
deriving Real via Word32
Expand Down
10 changes: 8 additions & 2 deletions ouroboros-network-framework/demo/connection-manager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import Control.Tracer (Tracer (..), contramap, nullTracer, traceWith)
import Data.ByteString.Lazy (ByteString)
import Data.Either (partitionEithers)
import Data.Functor (($>))
import Data.Hashable (Hashable)
import Data.List.NonEmpty (NonEmpty (..))
import Data.Typeable (Typeable)

Expand Down Expand Up @@ -77,6 +78,7 @@ import Ouroboros.Network.Server.RateLimiting (AcceptedConnectionsLimit (..))
import Ouroboros.Network.Server2 qualified as Server
import Ouroboros.Network.Snocket (Snocket, socketSnocket)
import Ouroboros.Network.Snocket qualified as Snocket
import Ouroboros.Network.Socket () -- Hashable SockAddr
import Ouroboros.Network.Util.ShowProxy


Expand Down Expand Up @@ -174,7 +176,10 @@ withBidirectionalConnectionManager
:: forall peerAddr socket m a.
( ConnectionManagerMonad m

, Ord peerAddr, Show peerAddr, Typeable peerAddr
, Hashable peerAddr
, Ord peerAddr
, Show peerAddr
, Typeable peerAddr

-- debugging
, MonadFix m
Expand Down Expand Up @@ -441,7 +446,8 @@ runInitiatorProtocols
--
bidirectionalExperiment
:: forall peerAddr socket.
( Ord peerAddr
( Hashable peerAddr
, Ord peerAddr
, Show peerAddr
, Typeable peerAddr
, Eq peerAddr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ library testlib
cborg,
containers,
contra-tracer,
hashable,
io-classes,
io-sim,
network-mux,
Expand Down Expand Up @@ -331,6 +332,7 @@ executable demo-connection-manager
base >=4.14 && <4.21,
bytestring,
contra-tracer,
hashable,
io-classes,
network,
network-mux,
Expand Down
Loading