-
Notifications
You must be signed in to change notification settings - Fork 1
/
readchan.go
137 lines (110 loc) · 2.76 KB
/
readchan.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
// Package readchan provides methods for iterating over an io.Reader by block
// or line, and reading the results via a channel.
package readchan
import (
"bufio"
"context"
"io"
"sync"
)
// MaxScanTokenSize starts out equal to bufio.MaxScanTokenSize.
//
// NOTE(review): nothing in this file reads this variable; presumably it was
// meant to bound the token size used by the Lines scanner. Confirm before
// relying on it to change scanning behavior.
var MaxScanTokenSize = bufio.MaxScanTokenSize
// A Chunk contains the []byte Data and error from the previous Read operation.
// The Data []byte slice is safe for local use until Done is called returning
// the Chunk to the pool.
type Chunk struct {
// Data holds the bytes produced by one read. Reads slices it down to the
// byte count returned by Read; Lines fills it with a copy of the scanned
// line.
Data []byte
// Err is the error returned by the Read call in Reads, or the scanner
// error delivered by Lines in its final Chunk. Lines sets it to nil on
// every Chunk that carries a line.
Err error
// pool is the sync.Pool this Chunk was allocated from; Done puts the
// Chunk back into it.
pool *sync.Pool
}
// Done returns the Chunk to a pool to be reused for subsequent Reads.
//
// The Chunk and its Data must not be touched after Done is called, because a
// later read may hand the same Chunk out again.
//
// Done is a no-op on a nil Chunk (which is what a receive from the closed
// channel yields) and on a Chunk that has no pool, such as a zero value
// constructed by the caller.
func (c *Chunk) Done() {
	if c == nil || c.pool == nil {
		return
	}
	c.pool.Put(c)
}
// Reads returns a channel that will send a Chunk for every Read on r.
//
// The maxSize argument sets the allocated capacity of each []byte, and Reads
// panics if it is not positive. Reads will buffer readAhead number of Chunks
// in the channel as soon as they are available; a negative readAhead is
// treated as 1.
//
// Each Chunk carries the bytes and the error of exactly one Read call. The
// first Chunk with a non-nil Err (io.EOF included) is the last one sent, and
// the channel is closed afterwards.
//
// Canceling the context will cause the Reads loop to return and the channel
// to be closed, but it cannot interrupt pending Read calls on r.
func Reads(ctx context.Context, r io.Reader, maxSize, readAhead int) <-chan *Chunk {
	if maxSize <= 0 {
		panic("invalid max buffer size")
	}
	if readAhead < 0 {
		readAhead = 1
	}

	pool := sync.Pool{}
	pool.New = func() interface{} {
		return &Chunk{
			Data: make([]byte, maxSize),
			pool: &pool,
		}
	}

	readChan := make(chan *Chunk, readAhead)

	go func() {
		defer close(readChan)

		for {
			// select chooses at random when both the send and ctx.Done()
			// are ready, so check for cancellation explicitly before
			// starting another Read.
			if ctx.Err() != nil {
				return
			}

			chunk := pool.Get().(*Chunk)

			// A recycled Chunk still has the length of its previous read
			// (or whatever the consumer left in Data). Restore the full
			// buffer, otherwise every later Read would be capped at that
			// shorter length, down to a zero-length buffer.
			if cap(chunk.Data) < maxSize {
				chunk.Data = make([]byte, maxSize)
			} else {
				chunk.Data = chunk.Data[:maxSize]
			}

			n, err := r.Read(chunk.Data)
			chunk.Data = chunk.Data[:n]
			chunk.Err = err

			select {
			case readChan <- chunk:
			case <-ctx.Done():
				return
			}

			// The Chunk carrying the error has been delivered; stop.
			if err != nil {
				return
			}
		}
	}()

	return readChan
}
// Lines returns a channel that will send a Chunk for every line read from r.
// Lines are read via a bufio.Scanner, and do not include the newline
// characters.
//
// The readAhead argument determines the buffer size for the channel, which
// will be filled as soon as data is available; a negative readAhead is
// treated as 1.
//
// If the scanner stops with an error, one final Chunk with nil Data and that
// error in Err is sent before the channel is closed. A clean end of input
// produces no extra Chunk: the channel is simply closed.
//
// Canceling the context will cause the Lines scanner loop to return and the
// channel to be closed, but it cannot interrupt pending Read calls on r.
func Lines(ctx context.Context, r io.Reader, readAhead int) <-chan *Chunk {
	if readAhead < 0 {
		readAhead = 1
	}

	pool := sync.Pool{}
	pool.New = func() interface{} {
		return &Chunk{
			pool: &pool,
		}
	}

	readChan := make(chan *Chunk, readAhead)
	scanner := bufio.NewScanner(r)

	go func() {
		defer close(readChan)

		for {
			// select chooses at random when both the send and ctx.Done()
			// are ready, so check for cancellation explicitly before
			// scanning another line.
			if ctx.Err() != nil {
				return
			}
			if !scanner.Scan() {
				break
			}

			chunk := pool.Get().(*Chunk)
			chunk.Err = nil
			// scanner.Bytes() is only valid until the next Scan, so copy
			// the line into the Chunk's own buffer, reusing its capacity
			// when the Chunk was recycled.
			chunk.Data = append(chunk.Data[:0], scanner.Bytes()...)

			select {
			case readChan <- chunk:
			case <-ctx.Done():
				return
			}
		}

		err := scanner.Err()
		if err == nil {
			return
		}

		chunk := pool.Get().(*Chunk)
		chunk.Data = nil
		chunk.Err = err

		// Honor cancellation here as well: a bare send would block forever
		// (leaking this goroutine) once the consumer has stopped receiving
		// and the channel buffer is full.
		select {
		case readChan <- chunk:
		case <-ctx.Done():
		}
	}()

	return readChan
}