Skip to content

Commit

Permalink
db: rework mergingIter and levelIter synthetic keys
Browse files Browse the repository at this point in the history
The merging iterator and the level iterator are more tightly coupled than
desired. Most of this coupling is a consequence of range deletions. The merging
iterator is responsible for applying range deletions across levels and requires
that the level iterator not progress to a new file until the file's range
deletions are no longer relevant to other levels. It does this by interleaving
"synthetic" keys. When these keys reach the top of the heap, it signals that
all other levels have progressed to a point where the current file's range
deletions are no longer relevant.

Previously, knowledge of which keys were interleaved "synthetic" keys was
propagated indirectly—outside the internal iterator interface—through a pointer
to a levelIterBoundaryContext struct with boolean fields. This commit instead
always interleaves synthetic keys as range deletion exclusive sentinels. The
merging iterator can infer which keys are synthetic by calling
InternalKey.IsExclusiveSentinel. Propagating this information directly through
the internal iterator interface is more direct and understandable.

Beyond needing to know which keys are synthetic, the merging iterator also must
know when user-imposed iteration bounds have been reached. Ordinarily when
bounds have been reached, internal iterators become exhausted. Since the
levelIter's file's range deletions may still be relevant, it cannot and it
interleaves a "synthetic" key at the iteration bound. When this key reaches the
top of the merging iterator's heap, there are no more keys within bounds and
the merging iterator is exhausted. To make this determination, we add a simple
key comparison. The use of a key comparison is expected to be acceptable,
because it's only performed when a synthetic key reaches the top of the heap.
This additional key comparison should ultimately be eliminated when cockroachdb#2863 is
complete.

Informs cockroachdb#2863.
  • Loading branch information
jbowens committed Apr 24, 2024
1 parent 16950aa commit a28f336
Show file tree
Hide file tree
Showing 9 changed files with 153 additions and 272 deletions.
1 change: 0 additions & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1468,7 +1468,6 @@ func (i *Iterator) constructPointIter(

li.init(ctx, i.opts, &i.comparer, i.newIters, files, level, internalOpts)
li.initRangeDel(&mlevels[mlevelsIndex].rangeDelIter)
li.initBoundaryContext(&mlevels[mlevelsIndex].levelIterBoundaryContext)
li.initCombinedIterState(&i.lazyCombinedIter.combinedIterState)
mlevels[mlevelsIndex].levelIter = li
mlevels[mlevelsIndex].iter = invalidating.MaybeWrapIfInvariants(li)
Expand Down
86 changes: 41 additions & 45 deletions level_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ import (
type simpleMergingIterLevel struct {
iter internalIterator
rangeDelIter keyspan.FragmentIterator
levelIterBoundaryContext

iterKV *base.InternalKV
tombstone *keyspan.Span
Expand Down Expand Up @@ -129,8 +128,7 @@ func (m *simpleMergingIter) step() bool {
item := &m.heap.items[0]
l := &m.levels[item.index]
// Sentinels are not relevant for this point checking.
if !l.isIgnorableBoundaryKey && !item.key.IsExclusiveSentinel() &&
item.key.Visible(m.snapshot, base.InternalKeySeqNumMax) {
if !item.key.IsExclusiveSentinel() && item.key.Visible(m.snapshot, base.InternalKeySeqNumMax) {
// This is a visible point key.
if !m.handleVisiblePoint(item, l) {
return false
Expand All @@ -144,50 +142,50 @@ func (m *simpleMergingIter) step() bool {

// Step to the next point.
l.iterKV = l.iter.Next()
if !l.isIgnorableBoundaryKey {
if l.iterKV != nil {
// Check point keys in an sstable are ordered. Although not required, we check
// for memtables as well. A subtle check here is that successive sstables of
// L1 and higher levels are ordered. This happens when levelIter moves to the
// next sstable in the level, in which case item.key is previous sstable's
// last point key.
if base.InternalCompare(m.heap.cmp, item.key, l.iterKV.K) >= 0 {
m.err = errors.Errorf("out of order keys %s >= %s in %s",
item.key.Pretty(m.formatKey), l.iterKV.K.Pretty(m.formatKey), l.iter)
return false
}
item.key = base.InternalKey{
Trailer: l.iterKV.K.Trailer,
UserKey: append(item.key.UserKey[:0], l.iterKV.K.UserKey...),
}
item.value = l.iterKV.V
if m.heap.len() > 1 {
m.heap.fix(0)
}
} else {
m.err = l.iter.Close()
l.iter = nil
m.heap.pop()
}
if m.err != nil {
if l.iterKV == nil {
m.err = errors.CombineErrors(l.iter.Error(), l.iter.Close())
l.iter = nil
m.heap.pop()
} else if !l.iterKV.K.IsExclusiveSentinel() {
// Check point keys in an sstable are ordered. Although not required, we check
// for memtables as well. A subtle check here is that successive sstables of
// L1 and higher levels are ordered. This happens when levelIter moves to the
// next sstable in the level, in which case item.key is previous sstable's
// last point key.
if base.InternalCompare(m.heap.cmp, item.key, l.iterKV.K) >= 0 {
m.err = errors.Errorf("out of order keys %s >= %s in %s",
item.key.Pretty(m.formatKey), l.iterKV.K.Pretty(m.formatKey), l.iter)
return false
}
if m.heap.len() == 0 {
// Last record was a MERGE record.
if m.valueMerger != nil {
var closer io.Closer
_, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
if m.err == nil && closer != nil {
m.err = closer.Close()
}
if m.err != nil {
m.err = errors.Wrapf(m.err, "merge processing error on key %s in %s",
item.key.Pretty(m.formatKey), m.lastIterMsg)
}
m.valueMerger = nil
item.key = base.InternalKey{
Trailer: l.iterKV.K.Trailer,
UserKey: append(item.key.UserKey[:0], l.iterKV.K.UserKey...),
}
item.value = l.iterKV.V
if m.heap.len() > 1 {
m.heap.fix(0)
}
}
if m.err != nil {
return false
}
if m.heap.len() == 0 {
// If m.valueMerger != nil, the last record was a MERGE record.
if m.valueMerger != nil {
var closer io.Closer
var err error
_, closer, err = m.valueMerger.Finish(true /* includesBase */)
if closer != nil {
err = errors.CombineErrors(err, closer.Close())
}
return false
if err != nil {
m.err = errors.CombineErrors(m.err,
errors.Wrapf(err, "merge processing error on key %s in %s",
item.key.Pretty(m.formatKey), m.lastIterMsg))
}
m.valueMerger = nil
}
return false
}
m.positionRangeDels()
return true
Expand Down Expand Up @@ -639,7 +637,6 @@ func checkLevelsInternal(c *checkConfig) (err error) {
li.init(context.Background(), iterOpts, c.comparer, c.newIters, manifestIter,
manifest.L0Sublevel(sublevel), internalIterOpts{})
li.initRangeDel(&mlevelAlloc[0].rangeDelIter)
li.initBoundaryContext(&mlevelAlloc[0].levelIterBoundaryContext)
mlevelAlloc[0].iter = li
mlevelAlloc = mlevelAlloc[1:]
}
Expand All @@ -653,7 +650,6 @@ func checkLevelsInternal(c *checkConfig) (err error) {
li.init(context.Background(), iterOpts, c.comparer, c.newIters,
current.Levels[level].Iter(), manifest.Level(level), internalIterOpts{})
li.initRangeDel(&mlevelAlloc[0].rangeDelIter)
li.initBoundaryContext(&mlevelAlloc[0].levelIterBoundaryContext)
mlevelAlloc[0].iter = li
mlevelAlloc = mlevelAlloc[1:]
}
Expand Down
Loading

0 comments on commit a28f336

Please sign in to comment.