-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPipes.fold
100 lines (67 loc) · 2.01 KB
/
Pipes.fold
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
{-
     +---------+     +---------+     +---------+
     |         |     |         |     |         |
... --> node0 --> i --> node1 --> o --> node2 --> ...
     |         |     |         |     |         |
     +----|----+     +----|----+     +----|----+
          v               v               v
         ...              r              ...
-}
-- Core Types
-- A pipe parameterised by its input type `i`, output type `o` and result
-- type `r`. A pipe is always in exactly one of three states.
data Pipe (i, o, r) =
-- An output value of type `o` paired with the rest of the pipe.
| Yield (o, Pipe (i, o, r))
-- A continuation that takes an input of type `i` and gives the rest of the pipe.
| Await (i -> Pipe (i, o, r))
-- A finished pipe carrying its result of type `r`.
| Ready r
-- Type Synonyms
-- An `Effect` neither awaits nor yields: both its input and output types
-- are `Void`, so only the result type `r` remains.
type Effect r = Pipe (Void, Void, r)
-- A `Producer` can only `yield`: its input type is `Void`.
type Producer (o, r) = Pipe (Void, o, r)
-- A `Consumer` can only `await`: its output type is `Void`.
type Consumer (i, r) = Pipe (i, Void, r)
-- Pipe Monad
-- Monad instance for pipes. `pure` wraps a value as a finished pipe, and
-- `p >>= f` runs the pipe `p` and passes its final result to the
-- continuation `f`, preserving every yield and await of `p` along the way.
instance Pipe :: Monad
-- A pipe that is immediately ready with the given value.
def pure a = Ready a
-- Keep the yielded value and bind into the remainder of the pipe.
def Yield (b, n) >>= f = Yield (b, n >>= f)
-- Keep awaiting; bind into whatever pipe the supplied input produces.
| Await k >>= f = Await (a -> k a >>= f)
-- The pipe has finished: hand its result to the continuation.
| Ready r >>= f = f r
end
-- Creation
-- A pipe that is already finished, with the unit value as its result.
def empty = Ready ()
-- A pipe that yields `b` once and then finishes with the unit value.
def yield b = Yield (b, Ready ())
-- A pipe that awaits a single input and finishes with that input as its result.
def await = Await (b -> Ready b)
-- Category Instance
-- Identity pipe: awaits a value, yields it unchanged, and repeats. It is
-- defined recursively and never becomes `Ready` on its own.
def id = Await (a -> Yield (a, id))
-- Compose a downstream pipe `d` with an upstream pipe `u`: values yielded by
-- `u` are fed to the awaits of `d`. Evaluation is driven by `d`; `u` is only
-- advanced when `d` is awaiting. The composition finishes as soon as either
-- side is ready (upstream only being inspected while downstream awaits).
def compose d u =
case (d, u)
-- Downstream is ready: the composition terminates with its result.
| (Ready r, _) -> Ready r
-- Downstream is yielding: emit its output before touching upstream.
| (Yield (b, d'), _) -> Yield (b, compose d' u)
-- Downstream awaits and upstream yields: feed the value to downstream.
| (Await k, Yield (b, u')) -> compose (k b) u'
-- Both sides await: the composition awaits upstream's input while
-- downstream stays suspended on its own await.
| (Await _, Await g) -> Await (a -> compose d (g a))
-- Upstream is ready: the composition terminates with upstream's result.
| (Await _, Ready r) -> Ready r
end
-- Infix composition
-- Right-to-left composition: `n1` is the downstream pipe, `n2` the upstream.
def (n1 <<< n2) = compose n1 n2
-- Left-to-right composition: `n2` is the upstream pipe, `n1` the downstream.
def (n2 >>> n1) = compose n1 n2
-- Helper Operations
-- Pipe `node` into a single `await`, i.e. `compose await node`. Per the
-- cases of `compose`, the result is ready with the first value `node`
-- yields, or with `node`'s own result if it finishes without yielding.
def await_from node =
node >>> await
-- Run a pipe until it finishes or needs input. Returns `Some r` when the
-- pipe is ready with result `r`, and `None` as soon as the pipe awaits.
-- Yielded values are discarded.
-- NOTE(review): this uses `match` while `compose` and `next` use `case` —
-- confirm both keywords are accepted.
def run n =
match n
| Ready r -> Some r
-- No input can be supplied here, so an awaiting pipe cannot make progress.
| Await k -> None
-- Drop the yielded value and continue with the rest of the pipe.
| Yield (a, n') -> run n'
end
-- Step a pipe once. Returns `None` if the pipe is finished (its result is
-- dropped), or `Some (a, s')` with the yielded value and the remaining pipe.
-- Fails if the pipe is awaiting input.
def next n =
case n
| Ready _ -> None
| Yield (a, s') -> Some (a, s')
| Await k -> fail "stream requires more input."
end
-- NOTE(review): second definition of `next`, with the same three cases as
-- the `case`-based definition directly above, written in clause form. It
-- mixes a `def` head with `->` arms; confirm whether this is valid syntax
-- and whether one of the two definitions should be dropped.
def next Ready _ -> None
| Yield (a, s') -> Some (a, s')
| Await k -> fail "stream requires more input."
end