-
Notifications
You must be signed in to change notification settings - Fork 1
/
colreader.go
68 lines (60 loc) · 1.29 KB
/
colreader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package giashard
import (
"log"
"path/filepath"
)
// read columns of compressed files containing lines
type ColumnReader struct {
cols []string
readers []*LineReader
}
// make new column reader for the given directory, which is assumed to have
// files name c1.gz, c2.gz, ... for each element of cols
func NewColumnReader(dir string, cols ...string) (r *ColumnReader, err error) {
readers := make([]*LineReader, 0, len(cols))
for _, c := range cols {
lr, err := NewLineReader(filepath.Join(dir, c + ".gz"))
if err != nil {
for _, lr := range readers {
if e := lr.Close(); e != nil {
log.Print(e)
}
}
return nil, err
}
readers = append(readers, lr)
}
r = &ColumnReader{cols, readers}
return
}
// close the underlying readers
func (r *ColumnReader)Close() (err error) {
for _, lr := range r.readers {
if e := lr.Close(); e != nil {
err = e
}
}
return
}
func (r *ColumnReader)Rows() (ch chan map[string][]byte) {
ch = make(chan map[string][]byte)
srcs := make([]chan []byte, 0, len(r.cols))
for _, lr := range r.readers {
srcs = append(srcs, lr.Lines())
}
go func() {
for {
m := make(map[string][]byte)
for i, c := range(r.cols) {
v, ok := <- srcs[i]
if !ok {
close(ch)
return
}
m[c] = v
}
ch <- m
}
}()
return ch
}