Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
Icelandjack committed Oct 25, 2024
1 parent 180df58 commit 5ed7fe3
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 93 deletions.
6 changes: 3 additions & 3 deletions cabal.project
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-- Custom repository for cardano haskell packages, see
-- ouroboros-network/CONTRIBUTING for more
repository cardano-haskell-packages
url: https://input-output-hk.github.io/cardano-haskell-packages
url: https://chap.intersectmbo.org/
secure: True
root-keys:
3e0cce471cf09815f930210f7827266fd09045445d65923e6d0238a6cd15126f
Expand All @@ -16,8 +16,8 @@ repository cardano-haskell-packages
-- Bump this if you need newer packages from Hackage

index-state:
, hackage.haskell.org 2024-09-05T18:39:40Z
, cardano-haskell-packages 2024-09-10T12:51:27Z
, hackage.haskell.org 2024-10-24T18:39:40Z
, cardano-haskell-packages 2024-10-24T07:10:59Z

packages: ./.

Expand Down
9 changes: 7 additions & 2 deletions ekg-forward.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,17 @@ library
, io-classes >= 1.4.1
, network
, ouroboros-network-api
, ouroboros-network-framework >= 0.8 && < 0.14
, ouroboros-network-framework ^>= 0.14
-- Marcin:
-- ouroboros-network-framework >= 0.8 && < 0.14
-- , singletons == 3.0
, singletons ^>= 3.0
-- typed-protocols ^>= 0.2
, serialise
, stm
, text
, time
, typed-protocols ^>= 0.1.1
, typed-protocols ^>= 0.3
, typed-protocols-cborg
, unordered-containers

Expand Down
56 changes: 39 additions & 17 deletions src/System/Metrics/Network/Forwarder.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE PackageImports #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}

module System.Metrics.Network.Forwarder
( connectToAcceptor
Expand All @@ -18,6 +20,7 @@ import qualified Data.ByteString.Lazy as LBS
import qualified Data.Text as T
import Data.Void (Void)
import qualified Network.Socket as Socket
import Control.Monad (void)
import Ouroboros.Network.Context (MinimalInitiatorContext, ResponderContext)
import Ouroboros.Network.Driver.Simple (runPeer)
import Ouroboros.Network.Driver.Limits (ProtocolTimeLimits)
Expand All @@ -38,7 +41,7 @@ import Ouroboros.Network.Protocol.Handshake.Version (acceptableVersion
import Ouroboros.Network.Snocket (MakeBearer, Snocket,
localAddressFromPath, localSnocket, socketSnocket,
makeLocalBearer, makeSocketBearer)
import Ouroboros.Network.Socket (HandshakeCallbacks (..), connectToNode, nullNetworkConnectTracers)
import Ouroboros.Network.Socket (HandshakeCallbacks (..), ConnectToArgs (..), connectToNode, nullNetworkConnectTracers)
import qualified System.Metrics as EKG

import System.Metrics.Configuration (ForwarderConfiguration (..), HowToConnect (..))
Expand All @@ -63,8 +66,18 @@ connectToAcceptor config@ForwarderConfiguration{..} ekgStore = withIOManager $ \
address = Socket.addrAddress acceptorAddr
doConnectToAcceptor snocket makeSocketBearer mempty address timeLimitsHandshake app

-- connectToNode
-- :: Snocket IO fd0 addr0
-- -> MakeBearer IO fd0
-- -> ConnectToArgs fd0 addr0 vNumber0 vData0
-- -> (fd0 -> IO ())
-- -> Ouroboros.Network.Protocol.Handshake.Version.Versions vNumber0 vData0 (Ouroboros.Network.Mux.OuroborosApplicationWithMinimalCtx muxMode0 addr0 LBS.ByteString IO a0 b0)
-- -> Maybe addr0
-- -> addr0
-- -> IO (Either GHC.Exception.Type.SomeException (Either a0 b0))
doConnectToAcceptor
:: Snocket IO fd addr
:: forall fd addr. ()
=> Snocket IO fd addr
-> MakeBearer IO fd
-> (fd -> IO ()) -- ^ configure socket
-> addr
Expand All @@ -75,21 +88,30 @@ doConnectToAcceptor
LBS.ByteString IO () Void
-> IO ()
doConnectToAcceptor snocket makeBearer configureSocket address timeLimits app =
connectToNode
snocket
makeBearer
configureSocket
unversionedHandshakeCodec
timeLimits
unversionedProtocolDataCodec
nullNetworkConnectTracers
(HandshakeCallbacks acceptableVersion queryVersion)
(simpleSingletonVersions
UnversionedProtocol
UnversionedProtocolData
app)
Nothing
address

let
connectToArgs :: ConnectToArgs fd addr vNumber0 vData0
connectToArgs = ConnectToArgs undefined undefined undefined undefined undefined

-- connectToArgs :: Ouroboros.Network.Socket.ConnectToArgs fd0 addr0 vNumber0 vData0

void do
connectToNode
snocket
makeBearer

configureSocket -- undefined --
(simpleSingletonVersions
UnversionedProtocol
UnversionedProtocolData
app)
-- undefined -- unversionedHandshakeCodec
-- undefined -- timeLimits
-- undefined -- unversionedProtocolDataCodec
-- nullNetworkConnectTracers
-- (HandshakeCallbacks acceptableVersion queryVersion)
Nothing
address

forwarderApp
:: ForwarderConfiguration
Expand Down
14 changes: 8 additions & 6 deletions src/System/Metrics/Protocol/Acceptor.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ module System.Metrics.Protocol.Acceptor (
, ekgAcceptorPeer
) where

import Network.TypedProtocol.Core (Peer (..), PeerHasAgency (..),
PeerRole (..))
import Network.TypedProtocol.Peer.Client
-- import Network.TypedProtocol.Core (PeerRole (..))
-- import Network.TypedProtocol.Peer (Peer (..))
-- import qualified Network.TypedProtocol.Core as Core

import System.Metrics.Protocol.Type

Expand All @@ -38,14 +40,14 @@ data EKGAcceptor req resp m a where
ekgAcceptorPeer
:: Monad m
=> EKGAcceptor req resp m a
-> Peer (EKGForward req resp) 'AsClient 'StIdle m a
-> Client (EKGForward req resp) 'NonPipelined 'StIdle m a
ekgAcceptorPeer = \case
SendMsgReq req next ->
-- Send our message (request for the new metrics from the forwarder).
Yield (ClientAgency TokIdle) (MsgReq req) $
Yield (MsgReq req) $
-- We're now into the 'StBusy' state, and now we'll wait for a reply
-- from the forwarder.
Await (ServerAgency TokBusy) $ \(MsgResp resp) ->
Await $ \(MsgResp resp) ->
Effect $
ekgAcceptorPeer <$> next resp

Expand All @@ -55,4 +57,4 @@ ekgAcceptorPeer = \case
-- 'done', with a return value.
Effect $ do
r <- getResult
return $ Yield (ClientAgency TokIdle) MsgDone (Done TokDone r)
return $ Yield MsgDone (Done r)
40 changes: 22 additions & 18 deletions src/System/Metrics/Protocol/Codec.hs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
{-# OPTIONS_GHC -Winaccessible-code #-}
{-# OPTIONS_GHC -Werror #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

module System.Metrics.Protocol.Codec (
codecEKGForward
Expand All @@ -14,12 +18,13 @@ import Codec.CBOR.Read (DeserialiseFailure)
import Control.Monad.Class.MonadST (MonadST)
import qualified Data.ByteString.Lazy as LBS
import Text.Printf (printf)
import Network.TypedProtocol.Codec.CBOR (Codec, PeerHasAgency (..),
PeerRole (..), SomeMessage (..),
mkCodecCborLazyBS)
import Network.TypedProtocol.Core
import Network.TypedProtocol.Codec (Codec, SomeMessage (..))
import Network.TypedProtocol.Codec.CBOR (mkCodecCborLazyBS)

import System.Metrics.Protocol.Type


codecEKGForward
:: forall req resp m.
(MonadST m)
Expand All @@ -34,47 +39,46 @@ codecEKGForward encodeReq decodeReq
mkCodecCborLazyBS encode decode
where
-- Encode messages.
encode :: forall (pr :: PeerRole)
(st :: EKGForward req resp)
encode :: forall (st :: EKGForward req resp)
(st' :: EKGForward req resp).
PeerHasAgency pr st
-> Message (EKGForward req resp) st st'
Message (EKGForward req resp) st st'
-> CBOR.Encoding

encode (ClientAgency TokIdle) (MsgReq req) =
encode (MsgReq req) =
CBOR.encodeListLen 2
<> CBOR.encodeWord 0
<> encodeReq req

encode (ClientAgency TokIdle) MsgDone =
encode MsgDone =
CBOR.encodeListLen 1
<> CBOR.encodeWord 1

encode (ServerAgency TokBusy) (MsgResp resp) =
encode (MsgResp resp) =
CBOR.encodeListLen 2
<> CBOR.encodeWord 1
<> encodeResp resp

-- Decode messages
decode :: forall (pr :: PeerRole)
(st :: EKGForward req resp) s.
PeerHasAgency pr st
decode :: forall (st :: EKGForward req resp) s.
ActiveState st
=> StateToken st
-> CBOR.Decoder s (SomeMessage st)
decode stok = do
len <- CBOR.decodeListLen
key <- CBOR.decodeWord
case (key, len, stok) of
(0, 2, ClientAgency TokIdle) ->
(0, 2, SingIdle) ->
SomeMessage . MsgReq <$> decodeReq

(1, 1, ClientAgency TokIdle) ->
(1, 1, SingIdle) ->
return $ SomeMessage MsgDone

(1, 2, ServerAgency TokBusy) ->
(1, 2, SingBusy) ->
SomeMessage . MsgResp <$> decodeResp

-- Failures per protocol state
(_, _, ClientAgency TokIdle) ->
(_, _, SingIdle) ->
fail (printf "codecEKGForward (%s) unexpected key (%d, %d)" (show stok) key len)
(_, _, ServerAgency TokBusy) ->
(_, _, SingBusy) ->
fail (printf "codecEKGForward (%s) unexpected key (%d, %d)" (show stok) key len)
(_, _, SingDone) -> notActiveState stok
59 changes: 40 additions & 19 deletions src/System/Metrics/Protocol/Forwarder.hs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
{-# options_ghc -w #-}
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}

module System.Metrics.Protocol.Forwarder (
EKGForwarder (..)
, ekgForwarderPeer
) where

import Network.TypedProtocol.Core (Peer (..), PeerHasAgency (..),
PeerRole (..))
import Network.TypedProtocol.Peer.Server

import System.Metrics.Protocol.Type

Expand All @@ -34,20 +36,39 @@ data EKGForwarder req resp m a = EKGForwarder {
ekgForwarderPeer
:: Monad m
=> EKGForwarder req resp m a
-> Peer (EKGForward req resp) 'AsServer 'StIdle m a
ekgForwarderPeer EKGForwarder{..} = go
where
go =
-- In the 'StIdle' state the forwarder is awaiting a request message
-- from the acceptor.
Await (ClientAgency TokIdle) $ \case
-- The acceptor sent us a request for new metrics, so now we're
-- in the 'StBusy' state which means it's the forwarder's turn to send
-- a reply.
MsgReq req -> Effect $ do
resp <- recvMsgReq req
return $ Yield (ServerAgency TokBusy) (MsgResp resp) go

-- The acceptor sent the done transition, so we're in the 'StDone' state
-- so all we can do is stop using 'done', with a return value.
MsgDone -> Effect $ Done TokDone <$> recvMsgDone
-> Server (EKGForward req resp) 'NonPipelined 'StIdle m a
ekgForwarderPeer EKGForwarder{..} =
-- In the 'StIdle' state the forwarder is awaiting a request message
-- from the acceptor.
Await $ \case
-- The acceptor sent us a request for new metrics, so now we're
-- in the 'StBusy' state which means it's the forwarder's turn to send
-- a reply.
MsgReq req -> Effect $ do
(resp, next) <- recvMsgReq req
return $ Yield (MsgResp resp) (ekgForwarderPeer next)

-- The acceptor sent the done transition, so we're in the 'StDone' state
-- so all we can do is stop using 'done', with a return value.
MsgDone -> Effect $ Done <$> recvMsgDone
-- ekgForwarderPeer
-- :: forall m req resp a. ()
-- => Monad m
-- => EKGForwarder req resp m a
-- -> Server (EKGForward req resp) 'NonPipelined 'StIdle m a
-- ekgForwarderPeer EKGForwarder{..} = go
-- where
-- go :: Peer (EKGForward req resp) 'AsServer 'NonPipelined 'StIdle m a
-- go =
-- -- In the 'StIdle' state the forwarder is awaiting a request message
-- -- from the acceptor.
-- Await Core.ReflClientAgency \case
-- MsgReq req -> Effect do
-- resp <- recvMsgReq req
-- return $ Yield Core.ReflServerAgency (MsgResp resp) go

-- -- The acceptor sent the done transition, so we're in the 'StDone' state
-- -- so all we can do is stop using 'done', with a return value.
-- MsgDone -> Effect $ Done Core.ReflNobodyAgency <$> recvMsgDone


Loading

0 comments on commit 5ed7fe3

Please sign in to comment.