// Source: get_iter.go (forked from cockroachdb/pebble).
// Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.
package pebble
import (
"context"
"fmt"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
)
// getIter is an internal iterator used to perform gets. It iterates through
// the values for a particular key, level by level. It is not a general purpose
// internalIterator, but specialized for Get operations so that it loads data
// lazily.
type getIter struct {
// comparer supplies the Equal and Compare functions applied to user keys.
comparer *Comparer
// newIters opens the point and range-deletion iterators for an sstable.
newIters tableNewIters
// snapshot is the sequence number used for visibility checks on point keys
// and range deletions.
snapshot uint64
// iterOpts is passed to newIters; its level field is overwritten before each
// sstable is opened.
iterOpts IterOptions
// key is the user key being looked up.
key []byte
// prefix is passed alongside key when seeking each newly opened iterator
// with SeekPrefixGE.
prefix []byte
// iter is the point iterator for the source currently being scanned. It is
// nil before the first source is opened and after a source is closed.
iter internalIterator
// level is the next LSM level to open. While it is 0, the remaining L0
// sublevels in l0 are visited first.
level int
// batch, if non-nil, is visited before every other source and is set to nil
// once its iterator has been opened.
batch *Batch
// mem holds the flushables still to be visited; the last element is visited
// first and then removed from the slice.
mem flushableList
// l0 holds the L0 sublevels still to be visited; the last element is visited
// first and then removed from the slice.
l0 []manifest.LevelSlice
// version provides the per-level file metadata (version.Levels).
version *version
// iterKV is the most recent key-value pair returned by iter.
iterKV *base.InternalKV
// tombstoned and tombstonedSeqNum track whether the key has been deleted by
// a range delete tombstone. The first visible (at getIter.snapshot) range
// deletion encountered transitions tombstoned to true. The tombstonedSeqNum
// field is updated to hold the sequence number of the tombstone.
tombstoned bool
tombstonedSeqNum uint64
// err is the error reported by Error and Close.
err error
}
// TODO(sumeer): CockroachDB code doesn't use getIter, but, for completeness,
// make this implement InternalIteratorWithStats.
// getIter implements the base.InternalIterator interface. The blank-identifier
// assignment below makes the compiler verify that.
var _ base.InternalIterator = (*getIter)(nil)
// String summarizes the iterator's progress: how many L0 sublevels and
// memtables remain to be visited, and the current value of the level cursor.
func (g *getIter) String() string {
	remainingL0 := len(g.l0)
	remainingMem := len(g.mem)
	return fmt.Sprintf("len(l0)=%d, len(mem)=%d, level=%d", remainingL0, remainingMem, g.level)
}
// SeekGE is not supported by getIter; calling it always panics.
func (g *getIter) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV {
panic("pebble: SeekGE unimplemented")
}
// SeekPrefixGE forwards to SeekPrefixGEStrict, which is unimplemented and
// panics.
func (g *getIter) SeekPrefixGE(prefix, key []byte, flags base.SeekGEFlags) *base.InternalKV {
return g.SeekPrefixGEStrict(prefix, key, flags)
}
// SeekPrefixGEStrict is not supported by getIter; calling it always panics.
// The panic message names SeekPrefixGE, which delegates to this method.
func (g *getIter) SeekPrefixGEStrict(prefix, key []byte, flags base.SeekGEFlags) *base.InternalKV {
panic("pebble: SeekPrefixGE unimplemented")
}
// SeekLT is not supported by getIter; calling it always panics.
func (g *getIter) SeekLT(key []byte, flags base.SeekLTFlags) *base.InternalKV {
panic("pebble: SeekLT unimplemented")
}
// First is equivalent to Next: it returns whatever Next returns. On a getIter
// with no open source iterator, that means opening the first source and
// seeking it to g.key.
func (g *getIter) First() *base.InternalKV {
return g.Next()
}
// Last is not supported by getIter; calling it always panics.
func (g *getIter) Last() *base.InternalKV {
panic("pebble: Last unimplemented")
}
// Next returns the next internal KV whose user key equals g.key, that is
// visible at g.snapshot, and that is not shadowed by a range deletion. It
// returns nil when there is no such entry or when an error occurred; callers
// distinguish the two through Error.
func (g *getIter) Next() *base.InternalKV {
	// A source is already open: step it first. The following entry in the
	// same source can still be relevant (eg, MERGE keys written in the
	// presence of an LSM snapshot).
	//
	// NB: This step cannot be folded into the loop below, because a freshly
	// opened source must be positioned with a seek rather than a Next.
	if g.iter != nil {
		g.iterKV = g.iter.Next()
		if stepErr := g.iter.Error(); stepErr != nil {
			g.err = stepErr
			return nil
		}
	}

	for {
		// No source is open: open the next one and seek it to the key.
		if g.iter == nil {
			if !g.initializeNextIterator() {
				return nil
			}
			g.iterKV = g.iter.SeekPrefixGE(g.prefix, g.key, base.SeekGEFlagsNone)
			continue
		}

		if kv := g.iterKV; kv != nil {
			// A range tombstone newer than this entry covers the key. Rather
			// than surface a point or range deletion, close the source and
			// report that there is nothing to return.
			if g.tombstoned && kv.SeqNum() < g.tombstonedSeqNum {
				g.err = g.iter.Close()
				g.iter = nil
				return nil
			}

			if g.comparer.Equal(g.key, kv.K.UserKey) {
				if kv.Visible(g.snapshot, base.InternalKeySeqNumMax) {
					return kv
				}
				// Not visible at the get's snapshot. The same source may
				// still hold an older, visible entry for this user key.
				g.iterKV = g.iter.Next()
				continue
			}
		}

		// The source is exhausted or positioned past the desired key. Close
		// it; the next pass through the loop opens the following memtable /
		// level.
		g.err = g.iter.Close()
		g.iter = nil
		if g.err != nil {
			return nil
		}
	}
}
// Prev is not supported by getIter; calling it always panics.
func (g *getIter) Prev() *base.InternalKV {
panic("pebble: Prev unimplemented")
}
// NextPrefix is not supported by getIter; calling it always panics.
func (g *getIter) NextPrefix([]byte) *base.InternalKV {
panic("pebble: NextPrefix unimplemented")
}
// Error returns the error currently recorded on the iterator (g.err), or nil
// if none has been recorded.
func (g *getIter) Error() error {
return g.err
}
// Close releases the currently open source iterator, if any, and returns the
// iterator's recorded error. An error from closing the source is recorded
// only when no earlier error is already present.
func (g *getIter) Close() error {
	if g.iter == nil {
		return g.err
	}
	closeErr := g.iter.Close()
	g.iter = nil
	if g.err == nil && closeErr != nil {
		g.err = closeErr
	}
	return g.err
}
// SetBounds is not supported by getIter; calling it always panics.
func (g *getIter) SetBounds(lower, upper []byte) {
panic("pebble: SetBounds unimplemented")
}
// SetContext is a no-op: the supplied context is ignored.
func (g *getIter) SetContext(_ context.Context) {}
// initializeNextIterator opens the point iterator for the next source that
// may contain g.key and stores it in g.iter. Sources are visited in this
// order: the batch (if any), the entries of g.mem from last to first, the
// entries of g.l0 from last to first, and then the non-empty levels of
// g.version starting at g.level. For each source the matching range deletion
// iterator is handed to maybeSetTombstone.
//
// It returns true when g.iter has been set to a new iterator. It returns
// false when there is nothing further to visit (all sources are exhausted, or
// a range tombstone found in an earlier source already covers the key) or when
// an error occurred, in which case g.err is set.
func (g *getIter) initializeNextIterator() (ok bool) {
	// A batch's keys shadow all other keys, so we visit the batch first.
	if g.batch != nil {
		if g.batch.index == nil {
			g.err = ErrNotIndexed
			g.iterKV = nil
			return false
		}
		// g.iter is assigned before the tombstone check, so a failure below
		// leaves the iterator reachable for Close to release.
		g.iter = g.batch.newInternalIter(nil)
		if !g.maybeSetTombstone(g.batch.newRangeDelIter(nil,
			// Get always reads the entirety of the batch's history, so no
			// batch keys should be filtered.
			base.InternalKeySeqNumMax,
		)) {
			return false
		}
		g.batch = nil
		return true
	}

	// If we're trying to initialize the next level of the iterator stack but
	// have a tombstone from a previous level, it is guaranteed to delete keys
	// in lower levels. This key is deleted.
	if g.tombstoned {
		return false
	}

	// Create iterators from memtables from newest to oldest.
	if n := len(g.mem); n > 0 {
		m := g.mem[n-1]
		g.iter = m.newIter(nil)
		if !g.maybeSetTombstone(m.newRangeDelIter(nil)) {
			return false
		}
		g.mem = g.mem[:n-1]
		return true
	}

	// Visit each sublevel of L0 individually, so that we only need to read
	// at most one file per sublevel.
	if g.level == 0 {
		// Create iterators from L0 from newest to oldest.
		if n := len(g.l0); n > 0 {
			files := g.l0[n-1].Iter()
			g.l0 = g.l0[:n-1]
			// NOTE(review): n is the slice length before the pop, i.e. one
			// more than the index of the sublevel just taken — confirm
			// whether L0Sublevel(n-1) was intended.
			iter, rangeDelIter, err := g.getSSTableIterators(files, manifest.L0Sublevel(n))
			if err != nil {
				g.err = firstError(g.err, err)
				return false
			}
			if !g.maybeSetTombstone(rangeDelIter) {
				// The point iterator has not been stored in g.iter yet, so
				// Close would never see it. Release it here to avoid a leak.
				if iter != nil {
					g.err = firstError(g.err, iter.Close())
				}
				return false
			}
			g.iter = iter
			return true
		}
		// We've exhausted all the sublevels of L0. Progress to L1.
		g.level++
	}

	for g.level < numLevels {
		if g.version.Levels[g.level].Empty() {
			g.level++
			continue
		}
		// Open the next level of the LSM.
		iter, rangeDelIter, err := g.getSSTableIterators(g.version.Levels[g.level].Iter(), manifest.Level(g.level))
		if err != nil {
			g.err = firstError(g.err, err)
			return false
		}
		if !g.maybeSetTombstone(rangeDelIter) {
			// As above: the point iterator is not yet owned by g.iter, so it
			// must be closed here.
			if iter != nil {
				g.err = firstError(g.err, iter.Close())
			}
			return false
		}
		g.level++
		g.iter = iter
		return true
	}
	// We've exhausted all levels of the LSM.
	return false
}
// getSSTableIterators returns a point iterator and a range deletion iterator
// for the sstable in files that overlaps with the key g.key. Pebble does not
// split user keys across adjacent sstables within a level, ensuring that at
// most one sstable overlaps g.key.
//
// When no file can contain g.key, it returns emptyIter with a nil range
// deletion iterator and a nil error. When opening the sstable fails, it
// returns emptyIter, nil and the error. As a side effect, g.iterOpts.level is
// set to level before the sstable is opened.
func (g *getIter) getSSTableIterators(
	files manifest.LevelIterator, level manifest.Level,
) (internalIterator, keyspan.FragmentIterator, error) {
	pointFiles := files.Filter(manifest.KeyTypePoint)
	// meta is the file containing the first point key ≥ g.key, if any.
	meta := pointFiles.SeekGE(g.comparer.Compare, g.key)
	switch {
	case meta == nil:
		// No file at or after the key.
		return emptyIter, nil, nil
	case !meta.HasPointKeys:
		return emptyIter, nil, nil
	case g.comparer.Compare(meta.SmallestPointKey.UserKey, g.key) > 0:
		// The file's point keys begin after the key.
		return emptyIter, nil, nil
	}

	// meta may contain point (or range deletion) keys relevant to g.key.
	g.iterOpts.level = level
	opened, err := g.newIters(context.Background(), meta, &g.iterOpts, internalIterOpts{}, iterPointKeys|iterRangeDeletions)
	if err != nil {
		return emptyIter, nil, err
	}
	return opened.Point(), opened.RangeDeletion(), nil
}
// maybeSetTombstone updates g.tombstoned[SeqNum] to reflect the presence of a
// range deletion covering g.key, if there are any. It returns true if
// successful, or false if an error occurred and the caller should abort
// iteration; in the latter case g.err holds the error.
//
// A nil rangeDelIter is accepted and leaves the tombstone state untouched. A
// non-nil rangeDelIter is always closed before returning, on both the success
// and the error path.
func (g *getIter) maybeSetTombstone(rangeDelIter keyspan.FragmentIterator) (ok bool) {
	if rangeDelIter == nil {
		// Nothing to do.
		return true
	}
	// Find the range deletion that covers the sought key, if any.
	t, err := keyspan.Get(g.comparer.Compare, rangeDelIter, g.key)
	if err != nil {
		g.err = firstError(g.err, err)
		// This function closes the iterator on success, so it must also do so
		// on failure; otherwise the iterator would leak.
		g.err = firstError(g.err, rangeDelIter.Close())
		return false
	}
	// Find the most recent visible range deletion's sequence number. We only
	// care about the most recent range deletion that's visible because it's the
	// "most powerful."
	//
	// This reads t before the iterator is closed.
	g.tombstonedSeqNum, g.tombstoned = t.LargestVisibleSeqNum(g.snapshot)
	if g.err = firstError(g.err, rangeDelIter.Close()); g.err != nil {
		return false
	}
	return true
}