-
Notifications
You must be signed in to change notification settings - Fork 11
/
merge.go
181 lines (162 loc) · 5.08 KB
/
merge.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
package zenodb
import (
"fmt"
"io"
"io/ioutil"
"os"
"github.com/golang/snappy"
"github.com/getlantern/bytemap"
"github.com/getlantern/errors"
"github.com/getlantern/goexpr"
"github.com/getlantern/golog"
"github.com/getlantern/wal"
"github.com/getlantern/zenodb/common"
"github.com/getlantern/zenodb/core"
"github.com/getlantern/zenodb/encoding"
"github.com/getlantern/zenodb/sql"
)
var (
emptyOffset = make(wal.Offset, wal.OffsetSize)
)
// FilterAndMerge merges the specified inFiles into the given outFile, where
// inFiles and outFiles are all valid filestore files. The schema is based on
// the named table. If whereClause is specified, rows are filtered by comparing
// them to the whereClause. The merge is performed as a disk-based merge in
// order to use minimal memory. If shouldSort is true, the output will be sorted
// by key.
func (db *DB) FilterAndMerge(table string, whereClause string, shouldSort bool, outFile string, inFiles ...string) error {
	tbl := db.getTable(table)
	if tbl == nil {
		return errors.New("Table %v not found", table)
	}
	return tbl.filterAndMerge(whereClause, shouldSort, outFile, inFiles)
}
// FileInfo returns information about the given data file: the WAL offsets by
// source, the raw fields string, and the parsed fields.
func FileInfo(inFile string) (offsetsBySource common.OffsetsBySource, fieldsString string, fields core.Fields, err error) {
	fs := &fileStore{filename: inFile}
	var file *os.File
	file, err = os.OpenFile(fs.filename, os.O_RDONLY, 0)
	if err != nil {
		err = errors.New("Unable to open filestore at %v: %v", fs.filename, err)
		return
	}
	defer file.Close()
	// Data files are snappy-compressed on disk; info reads the header through
	// a decompressing reader.
	return fs.info(snappy.NewReader(file))
}
// Check checks all of the given inFiles for readability and returns errors
// for all files that are in error.
func Check(inFiles ...string) map[string]error {
	// Per-file work lives in checkFile so that each file's deferred Close
	// runs as soon as that file is done. The original inlined body deferred
	// Close inside the loop, which kept every file descriptor open until
	// Check returned (a leak when checking many files).
	errs := make(map[string]error)
	for _, inFile := range inFiles {
		if err := checkFile(inFile); err != nil {
			errs[inFile] = err
		}
	}
	return errs
}

// checkFile opens a single filestore file, validates its header via info, and
// drains the remaining snappy-compressed contents to verify readability. It
// returns nil if the file is fully readable.
func checkFile(inFile string) error {
	fs := &fileStore{
		filename: inFile,
		t: &table{
			log: golog.LoggerFor("check"),
		},
	}
	file, err := os.OpenFile(fs.filename, os.O_RDONLY, 0)
	if err != nil {
		return fmt.Errorf("Unable to open filestore at %v: %v", fs.filename, err)
	}
	defer file.Close()
	r := snappy.NewReader(file)
	if _, _, _, err = fs.info(r); err != nil {
		return err
	}
	// Drain the rest of the file to make sure every byte is readable. The
	// byte count is reported even when the copy fails partway through.
	n, copyErr := io.Copy(ioutil.Discard, r)
	fmt.Printf("Read %d uncompressed bytes from %v\n", n, inFile)
	if copyErr != nil {
		return fmt.Errorf("%v after %d bytes read", copyErr, n)
	}
	return nil
}
// CheckTable checks the given data file for the given table to make sure it's readable
func (db *DB) CheckTable(table string, filename string) error {
	tbl := db.getTable(table)
	if tbl == nil {
		return errors.New("Table %v not found", table)
	}
	fs := &fileStore{
		t:        tbl,
		fields:   tbl.fields,
		filename: filename,
	}
	// Iterate every row, counting as we go; any iteration failure is
	// reported along with how far we got.
	rowCount := 0
	_, err := fs.iterate(tbl.fields, nil, true, false, func(key bytemap.ByteMap, columns []encoding.Sequence, raw []byte) (bool, error) {
		rowCount++
		return true, nil
	})
	if err != nil {
		return errors.New("Encountered error after reading %d rows: %v", rowCount, err)
	}
	fmt.Printf("Read %d rows\n", rowCount)
	return nil
}
// filterAndMerge is the per-table implementation behind DB.FilterAndMerge.
//
// NOTE(review): this is currently a no-op stub. The entire merge
// implementation below is commented out pending multi-source offset support,
// yet the function returns nil (success) — callers of FilterAndMerge will
// believe the merge succeeded even though outFile is never created or
// written. Consider returning a "not implemented" error until the TODO is
// resolved — confirm with callers before changing the return value.
func (t *table) filterAndMerge(whereClause string, shouldSort bool, outFile string, inFiles []string) error {
	// TODO: make this work with multi-source offsets
	// okayToReuseBuffers := false
	// rawOkay := false
	// filter, err := whereFor(whereClause)
	// if err != nil {
	//	return err
	// }
	// // Find highest offset amongst all infiles
	// var offset wal.Offset
	// for _, inFile := range inFiles {
	//	nextOffset, _, offsetErr := readWALOffset(inFile)
	//	if offsetErr != nil {
	//		return errors.New("Unable to read WAL offset from %v: %v", inFile, offsetErr)
	//	}
	//	if nextOffset.After(offset) {
	//		offset = nextOffset
	//	}
	// }
	// // Create output file
	// out, err := os.OpenFile(outFile, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
	// if err != nil {
	//	return errors.New("Unable to create outFile at %v: %v", outFile, err)
	// }
	// defer out.Close()
	// fso := &fileStore{
	//	t:      t,
	//	fields: t.fields,
	// }
	// cout, err := fso.createOutWriter(out, t.fields, offset, shouldSort)
	// if err != nil {
	//	return errors.New("Unable to create out writer for %v: %v", outFile, err)
	// }
	// defer cout.Close()
	// truncateBefore := t.truncateBefore()
	// for _, inFile := range inFiles {
	//	fs := &fileStore{
	//		t:        t,
	//		fields:   t.fields,
	//		filename: inFile,
	//	}
	//	_, err = fs.iterate(t.fields, nil, okayToReuseBuffers, rawOkay, func(key bytemap.ByteMap, columns []encoding.Sequence, raw []byte) (bool, error) {
	//		_, writeErr := fs.doWrite(cout, t.fields, filter, truncateBefore, shouldSort, key, columns, raw)
	//		return true, writeErr
	//	})
	//	if err != nil {
	//		return errors.New("Error iterating on %v: %v", inFile, err)
	//	}
	// }
	return nil
}
// whereFor parses whereClause into a goexpr.Expr by embedding it in a
// throwaway SELECT statement and extracting the parsed WHERE expression.
// An empty clause yields a nil expression and no error.
func whereFor(whereClause string) (goexpr.Expr, error) {
	if whereClause == "" {
		return nil, nil
	}
	query, err := sql.Parse(fmt.Sprintf("SELECT * FROM thetable WHERE %v", whereClause))
	if err != nil {
		return nil, fmt.Errorf("Unable to process where clause %v: %v", whereClause, err)
	}
	return query.Where, nil
}