-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchans.go
202 lines (165 loc) · 3.98 KB
/
chans.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
package chans
import (
"context"
"golang.org/x/exp/constraints"
"math/big"
)
var oneBig = big.NewInt(1)
// Concat produces a channel containing all items from all channels concatenated in the order they are passed.
func Concat[T any](channels ...<-chan T) <-chan T {
newCh := make(chan T)
go func() {
defer close(newCh)
for _, ch := range channels {
for v := range ch {
newCh <- v
}
}
}()
return newCh
}
// Empty returns an empty chanel
func Empty[T any]() <-chan T {
newCh := make(chan T)
close(newCh)
return newCh
}
// Filter returns a new channel that only contains elements that match the predicate. Runs until source channel is closed
func Filter[T any](source <-chan T, predicate func(T) bool) <-chan T {
out := make(chan T, cap(source))
go func() {
defer close(out)
for x := range source {
if predicate(x) {
out <- x
}
}
}()
return out
}
// Flatten flattens a chan of chans into a chan of elements
func Flatten[T any](source <-chan <-chan T) <-chan T {
out := make(chan T)
go func() {
defer close(out)
for c := range source {
for element := range c {
out <- element
}
}
}()
return out
}
// ForEach performs the given function on every element in the channel. Runs until source channel is closed
func ForEach[T any](source <-chan T, f func(T)) {
go func() {
for x := range source {
f(x)
}
}()
}
// FromValues creates a ComposableChannel from a list of values
func FromValues[T any](values ...T) <-chan T {
newCh := make(chan T, len(values))
defer close(newCh)
for _, v := range values {
newCh <- v
}
return newCh
}
// Map maps a channel of T to a channel of S. Runs until source channel is closed.
// By default, output channel has the same capacity as the source channel.
// Desired capacity of the output channel can be specified via an optional argument.
func Map[T, S any](source <-chan T, f func(T) S, capacity ...int) <-chan S {
var out chan S
if len(capacity) > 0 {
out = make(chan S, capacity[0])
} else {
out = make(chan S)
}
go func() {
defer close(out)
for x := range source {
out <- f(x)
}
}()
return out
}
// Pop gets the next element from the given channel. Returns false if the context expires before an element is available.
func Pop[T any](ctx context.Context, c <-chan T) (T, bool) {
if ctx.Err() != nil {
return *new(T), false
}
select {
case <-ctx.Done():
return *new(T), false
case x := <-c:
return x, true
}
}
// Push adds the given element to the given channel. Returns false if the context expires before the channel unblocks.
func Push[T any](ctx context.Context, c chan<- T, x T) bool {
if ctx.Err() != nil {
return false
}
select {
case <-ctx.Done():
return false
case c <- x:
return true
}
}
// Range creates a channel that inclusively contains all values from `from` to `to`.
func Range[T constraints.Integer](from, to T) <-chan T {
if from > to {
return Empty[T]()
}
newCh := make(chan T)
go func() {
defer close(newCh)
for i := from; i <= to; i++ {
newCh <- i
}
}()
return newCh
}
// RangeBig creates a channel that inclusively contains all values from `from` to `to`.
func RangeBig(from, to *big.Int) <-chan *big.Int {
if from.Cmp(to) > 0 {
return Empty[*big.Int]()
}
newCh := make(chan *big.Int)
go func() {
defer close(newCh)
for i := (&big.Int{}).Set(from); i.Cmp(to) <= 0; i.Add(i, oneBig) {
newCh <- (&big.Int{}).Set(i)
}
}()
return newCh
}
// DrainOpen enumerates all items from the channel and discards them.
// Returns number of items drained as soon as channel is empty.
func DrainOpen[T any](channel <-chan T) int {
drained := 0
for {
select {
case _, ok := <-channel:
if ok {
drained++
} else {
return drained
}
default:
return drained
}
}
}
// Drain enumerates all items from the channel and discards them.
// Blocks until the channel is closed and returns number of items drained.
func Drain[T any](channel <-chan T) int {
drained := 0
for range channel {
drained++
}
return drained
}