Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
Icelandjack committed Oct 28, 2024
1 parent 180df58 commit b025015
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 92 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 0.6.1 - Oct 2024

* Updated to `typed-protocols-0.3`.

## 0.6.0 - Sep 2024

* Remove potentially leaky continuation passing of `EKGForwarder`.
Expand Down
6 changes: 3 additions & 3 deletions cabal.project
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-- Custom repository for cardano haskell packages, see
-- ouroboros-network/CONTRIBUTING for more
repository cardano-haskell-packages
url: https://input-output-hk.github.io/cardano-haskell-packages
url: https://chap.intersectmbo.org/
secure: True
root-keys:
3e0cce471cf09815f930210f7827266fd09045445d65923e6d0238a6cd15126f
Expand All @@ -16,8 +16,8 @@ repository cardano-haskell-packages
-- Bump this if you need newer packages from Hackage

index-state:
, hackage.haskell.org 2024-09-05T18:39:40Z
, cardano-haskell-packages 2024-09-10T12:51:27Z
, hackage.haskell.org 2024-10-24T18:39:40Z
, cardano-haskell-packages 2024-10-24T07:10:59Z

packages: ./.

Expand Down
9 changes: 7 additions & 2 deletions ekg-forward.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,17 @@ library
, io-classes >= 1.4.1
, network
, ouroboros-network-api
, ouroboros-network-framework >= 0.8 && < 0.14
, ouroboros-network-framework ^>= 0.14
-- Marcin:
-- ouroboros-network-framework >= 0.8 && < 0.14
-- , singletons == 3.0
, singletons ^>= 3.0
-- typed-protocols ^>= 0.2
, serialise
, stm
, text
, time
, typed-protocols ^>= 0.1.1
, typed-protocols ^>= 0.3
, typed-protocols-cborg
, unordered-containers

Expand Down
68 changes: 47 additions & 21 deletions src/System/Metrics/Network/Forwarder.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE PackageImports #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

module System.Metrics.Network.Forwarder
( connectToAcceptor
Expand All @@ -18,27 +21,28 @@ import qualified Data.ByteString.Lazy as LBS
import qualified Data.Text as T
import Data.Void (Void)
import qualified Network.Socket as Socket
import Network.TypedProtocol.Codec
import Control.Monad (void)
import Ouroboros.Network.Context (MinimalInitiatorContext, ResponderContext)
import Ouroboros.Network.Driver.Simple (runPeer)
import Ouroboros.Network.Driver.Limits (ProtocolTimeLimits)
import Ouroboros.Network.IOManager (withIOManager)
import Ouroboros.Network.Mux (MiniProtocol (..), MiniProtocolCb (..),
MiniProtocolLimits (..), MiniProtocolNum (..),
MuxMode (..), OuroborosApplication (..),
RunMiniProtocol (..),
import Ouroboros.Network.Mux (MiniProtocol (..), MiniProtocolCb (..), MiniProtocolLimits (..), MiniProtocolNum (..),
MuxMode (..), OuroborosApplication (..), RunMiniProtocol (..),
miniProtocolLimits, miniProtocolNum, miniProtocolRun)
import Ouroboros.Network.Protocol.Handshake.Codec (noTimeLimitsHandshake,
import Ouroboros.Network.Protocol.Handshake.Codec (VersionDataCodec, noTimeLimitsHandshake,
timeLimitsHandshake)
import Ouroboros.Network.Protocol.Handshake.Type (Handshake)
import Ouroboros.Network.Protocol.Handshake.Unversioned (UnversionedProtocol (..),
UnversionedProtocolData (..),
unversionedHandshakeCodec,
unversionedProtocolDataCodec)
import Ouroboros.Network.Protocol.Handshake.Version (acceptableVersion, queryVersion, simpleSingletonVersions)

import Ouroboros.Network.Protocol.Handshake.Version (Versions, acceptableVersion, queryVersion, simpleSingletonVersions)
import Ouroboros.Network.Snocket (MakeBearer, Snocket,
localAddressFromPath, localSnocket, socketSnocket,
makeLocalBearer, makeSocketBearer)
import Ouroboros.Network.Socket (HandshakeCallbacks (..), connectToNode, nullNetworkConnectTracers)
import Ouroboros.Network.Socket (NetworkConnectTracers(..), HandshakeCallbacks (..), ConnectToArgs (..), connectToNode, nullNetworkConnectTracers)
import qualified System.Metrics as EKG

import System.Metrics.Configuration (ForwarderConfiguration (..), HowToConnect (..))
Expand All @@ -64,7 +68,8 @@ connectToAcceptor config@ForwarderConfiguration{..} ekgStore = withIOManager $ \
doConnectToAcceptor snocket makeSocketBearer mempty address timeLimitsHandshake app

doConnectToAcceptor
:: Snocket IO fd addr
:: forall fd addr. ()
=> Snocket IO fd addr
-> MakeBearer IO fd
-> (fd -> IO ()) -- ^ configure socket
-> addr
Expand All @@ -75,21 +80,42 @@ doConnectToAcceptor
LBS.ByteString IO () Void
-> IO ()
doConnectToAcceptor snocket makeBearer configureSocket address timeLimits app =
connectToNode
snocket
makeBearer
configureSocket
unversionedHandshakeCodec
timeLimits
unversionedProtocolDataCodec
nullNetworkConnectTracers
(HandshakeCallbacks acceptableVersion queryVersion)
(simpleSingletonVersions

let
connectToArgs :: ConnectToArgs fd addr UnversionedProtocol UnversionedProtocolData
connectToArgs = ConnectToArgs
{ ctaHandshakeCodec = unversionedHandshakeCodec
:: Codec (Handshake UnversionedProtocol Term) CBOR.DeserialiseFailure IO LBS.ByteString
, ctaHandshakeTimeLimits = timeLimits
:: ProtocolTimeLimits (Handshake UnversionedProtocol Term)
, ctaVersionDataCodec = unversionedProtocolDataCodec
:: VersionDataCodec Term UnversionedProtocol UnversionedProtocolData
, ctaConnectTracers = nullNetworkConnectTracers
:: NetworkConnectTracers addr UnversionedProtocol
, ctaHandshakeCallbacks = HandshakeCallbacks acceptableVersion queryVersion
:: HandshakeCallbacks UnversionedProtocolData
}

versions :: Versions UnversionedProtocol UnversionedProtocolData (OuroborosApplication 'InitiatorMode (MinimalInitiatorContext addr) (ResponderContext addr) LBS.ByteString IO () Void)
versions = simpleSingletonVersions
UnversionedProtocol
UnversionedProtocolData
app)
Nothing
address
app

localAddress :: Maybe addr
remoteAddress :: addr
(localAddress, remoteAddress) = (Nothing, address)

in
void do
connectToNode @'InitiatorMode @UnversionedProtocol
snocket
makeBearer
connectToArgs
configureSocket
versions
localAddress
remoteAddress

forwarderApp
:: ForwarderConfiguration
Expand Down
12 changes: 5 additions & 7 deletions src/System/Metrics/Protocol/Acceptor.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ module System.Metrics.Protocol.Acceptor (
, ekgAcceptorPeer
) where

import Network.TypedProtocol.Core (Peer (..), PeerHasAgency (..),
PeerRole (..))

import Network.TypedProtocol.Peer.Client
import System.Metrics.Protocol.Type

-- | Please note that the acceptor is a server from the __networking__
Expand All @@ -38,14 +36,14 @@ data EKGAcceptor req resp m a where
ekgAcceptorPeer
:: Monad m
=> EKGAcceptor req resp m a
-> Peer (EKGForward req resp) 'AsClient 'StIdle m a
-> Client (EKGForward req resp) 'NonPipelined 'StIdle m a
ekgAcceptorPeer = \case
SendMsgReq req next ->
-- Send our message (request for the new metrics from the forwarder).
Yield (ClientAgency TokIdle) (MsgReq req) $
Yield (MsgReq req) $
-- We're now into the 'StBusy' state, and now we'll wait for a reply
-- from the forwarder.
Await (ServerAgency TokBusy) $ \(MsgResp resp) ->
Await $ \(MsgResp resp) ->
Effect $
ekgAcceptorPeer <$> next resp

Expand All @@ -55,4 +53,4 @@ ekgAcceptorPeer = \case
-- 'done', with a return value.
Effect $ do
r <- getResult
return $ Yield (ClientAgency TokIdle) MsgDone (Done TokDone r)
return $ Yield MsgDone (Done r)
40 changes: 22 additions & 18 deletions src/System/Metrics/Protocol/Codec.hs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
{-# OPTIONS_GHC -Winaccessible-code #-}
{-# OPTIONS_GHC -Werror #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

module System.Metrics.Protocol.Codec (
codecEKGForward
Expand All @@ -14,12 +18,13 @@ import Codec.CBOR.Read (DeserialiseFailure)
import Control.Monad.Class.MonadST (MonadST)
import qualified Data.ByteString.Lazy as LBS
import Text.Printf (printf)
import Network.TypedProtocol.Codec.CBOR (Codec, PeerHasAgency (..),
PeerRole (..), SomeMessage (..),
mkCodecCborLazyBS)
import Network.TypedProtocol.Core
import Network.TypedProtocol.Codec (Codec, SomeMessage (..))
import Network.TypedProtocol.Codec.CBOR (mkCodecCborLazyBS)

import System.Metrics.Protocol.Type


codecEKGForward
:: forall req resp m.
(MonadST m)
Expand All @@ -34,47 +39,46 @@ codecEKGForward encodeReq decodeReq
mkCodecCborLazyBS encode decode
where
-- Encode messages.
encode :: forall (pr :: PeerRole)
(st :: EKGForward req resp)
encode :: forall (st :: EKGForward req resp)
(st' :: EKGForward req resp).
PeerHasAgency pr st
-> Message (EKGForward req resp) st st'
Message (EKGForward req resp) st st'
-> CBOR.Encoding

encode (ClientAgency TokIdle) (MsgReq req) =
encode (MsgReq req) =
CBOR.encodeListLen 2
<> CBOR.encodeWord 0
<> encodeReq req

encode (ClientAgency TokIdle) MsgDone =
encode MsgDone =
CBOR.encodeListLen 1
<> CBOR.encodeWord 1

encode (ServerAgency TokBusy) (MsgResp resp) =
encode (MsgResp resp) =
CBOR.encodeListLen 2
<> CBOR.encodeWord 1
<> encodeResp resp

-- Decode messages
decode :: forall (pr :: PeerRole)
(st :: EKGForward req resp) s.
PeerHasAgency pr st
decode :: forall (st :: EKGForward req resp) s.
ActiveState st
=> StateToken st
-> CBOR.Decoder s (SomeMessage st)
decode stok = do
len <- CBOR.decodeListLen
key <- CBOR.decodeWord
case (key, len, stok) of
(0, 2, ClientAgency TokIdle) ->
(0, 2, SingIdle) ->
SomeMessage . MsgReq <$> decodeReq

(1, 1, ClientAgency TokIdle) ->
(1, 1, SingIdle) ->
return $ SomeMessage MsgDone

(1, 2, ServerAgency TokBusy) ->
(1, 2, SingBusy) ->
SomeMessage . MsgResp <$> decodeResp

-- Failures per protocol state
(_, _, ClientAgency TokIdle) ->
(_, _, SingIdle) ->
fail (printf "codecEKGForward (%s) unexpected key (%d, %d)" (show stok) key len)
(_, _, ServerAgency TokBusy) ->
(_, _, SingBusy) ->
fail (printf "codecEKGForward (%s) unexpected key (%d, %d)" (show stok) key len)
(_, _, SingDone) -> notActiveState stok
43 changes: 32 additions & 11 deletions src/System/Metrics/Protocol/Forwarder.hs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
{-# options_ghc -w #-}
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}

module System.Metrics.Protocol.Forwarder (
EKGForwarder (..)
, ekgForwarderPeer
) where

import Network.TypedProtocol.Core (Peer (..), PeerHasAgency (..),
PeerRole (..))
import Network.TypedProtocol.Peer.Server
import Network.TypedProtocol.Core

import System.Metrics.Protocol.Type

Expand All @@ -31,23 +34,41 @@ data EKGForwarder req resp m a = EKGForwarder {
-- | Interpret a particular action sequence into the server side of the
-- 'EKGForward' protocol.
--

-- ekgForwarderPeer
-- :: Monad m
-- => EKGForwarder req resp m a
-- -> Server (EKGForward req resp) 'NonPipelined 'StIdle m a
-- ekgForwarderPeer EKGForwarder{..} =
-- -- In the 'StIdle' state the forwarder is awaiting a request message
-- -- from the acceptor.
-- Await $ \case
-- -- The acceptor sent us a request for new metrics, so now we're
-- -- in the 'StBusy' state which means it's the forwarder's turn to send
-- -- a reply.
-- MsgReq req -> Effect $ do
-- (resp, next) <- recvMsgReq req
-- return $ Yield (MsgResp resp) (ekgForwarderPeer next)

-- -- The acceptor sent the done transition, so we're in the 'StDone' state
-- -- so all we can do is stop using 'done', with a return value.
-- MsgDone -> Effect $ Done <$> recvMsgDone
ekgForwarderPeer
:: Monad m
:: forall m req resp a. ()
=> Monad m
=> EKGForwarder req resp m a
-> Peer (EKGForward req resp) 'AsServer 'StIdle m a
-> Server (EKGForward req resp) 'NonPipelined 'StIdle m a
ekgForwarderPeer EKGForwarder{..} = go
where
go :: Server (EKGForward req resp) 'NonPipelined 'StIdle m a
go =
-- In the 'StIdle' state the forwarder is awaiting a request message
-- from the acceptor.
Await (ClientAgency TokIdle) $ \case
-- The acceptor sent us a request for new metrics, so now we're
-- in the 'StBusy' state which means it's the forwarder's turn to send
-- a reply.
MsgReq req -> Effect $ do
Await \case
MsgReq req -> Effect do
resp <- recvMsgReq req
return $ Yield (ServerAgency TokBusy) (MsgResp resp) go
return $ Yield (MsgResp resp) go

-- The acceptor sent the done transition, so we're in the 'StDone' state
-- so all we can do is stop using 'done', with a return value.
MsgDone -> Effect $ Done TokDone <$> recvMsgDone
MsgDone -> Effect $ Done <$> recvMsgDone
Loading

0 comments on commit b025015

Please sign in to comment.