Skip to content

Commit

Permalink
WIP receive
Browse files Browse the repository at this point in the history
  • Loading branch information
tomjaguarpaw committed Sep 30, 2024
1 parent 6410df4 commit 339fcf4
Showing 1 changed file with 159 additions and 1 deletion.
160 changes: 159 additions & 1 deletion bluefin-internal/src/Bluefin/Internal/Examples.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import Bluefin.Internal.Pipes
import qualified Bluefin.Internal.Pipes as P
import Control.Exception (IOException)
import qualified Control.Exception
import Control.Monad (forever, unless, when)
import Control.Monad (forever, replicateM_, unless, when)
import Control.Monad.IO.Class (liftIO)
import Data.Foldable (for_)
import Data.Monoid (Any (Any, getAny))
Expand Down Expand Up @@ -169,6 +169,164 @@ example3_ = runEff $ \io -> do

forEach formattedLines $ \line -> effIO io (putStrLn line)

receiveList ::
(e :> es) =>
[a] ->
IOE e ->
(forall e1. Receive a e1 -> Eff (e1 :& es) ()) ->
Eff es ()
receiveList l io k = evalState l $ \s -> do
withJump $ \done ->
bracket
(pure ())
(\() -> effIO io (putStrLn "Released"))
$ \() -> do
feedEach (useImplWithin k) $ do
(x, xs) <-
get s >>= \case
[] -> jumpTo done
x : xs -> pure (x, xs)
put s xs
pure x

takeRec ::
(e3 :> es) =>
Int ->
(forall e. Receive a e -> Eff (e :& es) ()) ->
Receive a e3 ->
Eff es ()
takeRec n k rec =
withJump $ \done -> evalState n $ \s -> feedEach (useImplWithin k) $ do
s' <- get s
if s' <= 0
then jumpTo done
else do
modify s (subtract 1)
receive rec

mapRec ::
(e :> es) =>
(a -> b) ->
(forall e1. Receive b e1 -> Eff (e1 :& es) ()) ->
Receive a e ->
Eff es ()
mapRec f = traverseRec (pure . f)

traverseRec ::
(e :> es) =>
(a -> Eff es b) ->
(forall e1. Receive b e1 -> Eff (e1 :& es) ()) ->
Receive a e ->
Eff es ()
traverseRec f k rec = forEach k $ \() -> do
r <- receive rec
f r

receiveUsage ::
(e3 :> es, e2 :> es) =>
IOE e3 ->
(forall e. Receive () e -> Eff (e :& es) ()) ->
Receive Int e2 ->
Eff es ()
receiveUsage io x = do
mapRec (* 11) $
mapRec (subtract 1) $
takeRec 3 $
traverseRec (effIO io . print) $
useImplWithin x

receiveExample :: IO ()
receiveExample = runEff $ \io -> do
receiveList [1 :: Int ..] io $ receiveUsage io $ \rec -> do
replicateM_ 5 (receive rec)

receiveStreamExample :: IO (Either String String)
receiveStreamExample = runEff $ \io -> do
try $ \ex -> do
receiveStream
( \r ->
bracket
(effIO io (putStrLn "Starting 2"))
(\_ -> effIO io (putStrLn "Leaving 2"))
$ \_ -> do
for_ [1 :: Int .. 100] $ \n -> do
b' <- receive r
effIO
io
( putStrLn
("Received body " ++ show b' ++ " at time " ++ show n)
)
pure "Receiver finished first"
)
( \y -> bracket
(effIO io (putStrLn "Starting 1"))
(\_ -> effIO io (putStrLn "Leaving 1"))
$ \_ -> do
for_ [1 :: Int .. 10] $ \n -> do
effIO io (putStrLn ("Sending " ++ show n))
yield y n
when (n > 5) $ do
effIO io (putStrLn "Aborting...")
throw ex "Aborted"

pure "Yielder finished first"
)

connectExample :: IO (Either String String)
connectExample = runEff $ \io -> do
try $ \ex -> do
connectCoroutines
( \y -> bracket
(effIO io (putStrLn "Starting 1"))
(\_ -> effIO io (putStrLn "Leaving 1"))
$ \_ -> do
for_ [1 :: Int .. 10] $ \n -> do
effIO io (putStrLn ("Sending " ++ show n))
yield y n
when (n > 5) $ do
effIO io (putStrLn "Aborting...")
throw ex "Aborted"

pure "Yielder finished first"
)
( \binit r ->
bracket
(effIO io (putStrLn "Starting 2"))
(\_ -> effIO io (putStrLn "Leaving 2"))
$ \_ -> do
effIO io (putStrLn ("Received intial " ++ show binit))
for_ [1 :: Int .. 100] $ \n -> do
b' <- receive r
effIO
io
( putStrLn
("Received body " ++ show b' ++ " at time " ++ show n)
)
pure "Receiver finished first"
)

zipCoroutinesExample :: IO ()
zipCoroutinesExample = runEff $ \io -> do
let m1 y = do
r <- yieldCoroutine y 1
evalState r $ \rs -> do
for_ [1 .. 10 :: Int] $ \i -> do
r' <- get rs
r'' <- yieldCoroutine y (r' + i)
put rs r''

let m2 y = do
r <- yieldCoroutine y 1
evalState r $ \rs -> do
for_ [1 .. 5 :: Int] $ \i -> do
r' <- get rs
r'' <- yieldCoroutine y (r' - i)
put rs r''

forEach (\c -> zipCoroutines c m1 m2) $ \i@(i1, i2) -> do
effIO io (print i)
pure (i1 + i2)

-- Count the number of (strictly) positives and (strictly) negatives
-- in a list, unless we see a zero, in which case we bail with an
-- error message.
Expand Down

0 comments on commit 339fcf4

Please sign in to comment.