Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db: fix cases where SeekPrefixGE prefix doesn't match key #3845

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions internal/base/comparer.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ func (s Split) Prefix(k []byte) []byte {
return k[:i:i]
}

// HasPrefix returns true if the given key has the given prefix.
func (s Split) HasPrefix(prefix, key []byte) bool {
i := s(key)
return bytes.Equal(prefix, key[:i:i])
}

// DefaultSplit is a trivial implementation of Split which always returns the
// full key.
var DefaultSplit Split = func(key []byte) int { return len(key) }
Expand Down
11 changes: 3 additions & 8 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1476,23 +1476,18 @@ func (i *Iterator) SeekPrefixGE(key []byte) bool {
}
// Make a copy of the prefix so that modifications to the key after
// SeekPrefixGE returns does not affect the stored prefix.
if cap(i.prefixOrFullSeekKey) < prefixLen {
i.prefixOrFullSeekKey = make([]byte, prefixLen)
} else {
i.prefixOrFullSeekKey = i.prefixOrFullSeekKey[:prefixLen]
}
i.hasPrefix = true
copy(i.prefixOrFullSeekKey, keyPrefix)
i.prefixOrFullSeekKey = append(i.prefixOrFullSeekKey[:0], keyPrefix...)

if lowerBound := i.opts.GetLowerBound(); lowerBound != nil && i.cmp(key, lowerBound) < 0 {
if p := i.comparer.Split.Prefix(lowerBound); !bytes.Equal(i.prefixOrFullSeekKey, p) {
if !i.comparer.Split.HasPrefix(i.prefixOrFullSeekKey, lowerBound) {
i.err = errors.New("pebble: SeekPrefixGE supplied with key outside of lower bound")
i.iterValidityState = IterExhausted
return false
}
key = lowerBound
} else if upperBound := i.opts.GetUpperBound(); upperBound != nil && i.cmp(key, upperBound) > 0 {
if p := i.comparer.Split.Prefix(upperBound); !bytes.Equal(i.prefixOrFullSeekKey, p) {
if !i.comparer.Split.HasPrefix(i.prefixOrFullSeekKey, upperBound) {
i.err = errors.New("pebble: SeekPrefixGE supplied with key outside of upper bound")
i.iterValidityState = IterExhausted
return false
Expand Down
3 changes: 3 additions & 0 deletions level_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,9 @@ func (l *levelIter) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV
}

func (l *levelIter) SeekPrefixGE(prefix, key []byte, flags base.SeekGEFlags) *base.InternalKV {
if invariants.Enabled && !l.split.HasPrefix(prefix, key) {
panic(fmt.Sprintf("invalid prefix %q for key %q", prefix, key))
}
if invariants.Enabled && l.lower != nil && l.cmp(key, l.lower) < 0 {
panic(errors.AssertionFailedf("levelIter SeekGE to key %q violates lower bound %q", key, l.lower))
}
Expand Down
43 changes: 28 additions & 15 deletions merging_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,11 +524,9 @@ func (m *mergingIter) nextEntry(l *mergingIterLevel, succKey []byte) error {
// P2. Care is taken to avoid ever advancing the iterator beyond the current
// prefix. If nextEntry is ever invoked while we're already beyond the
// current prefix, we're violating the invariant.
if invariants.Enabled && m.prefix != nil {
if p := m.split.Prefix(l.iterKV.K.UserKey); !bytes.Equal(m.prefix, p) {
m.logger.Fatalf("mergingIter: prefix violation: nexting beyond prefix %q; existing heap root %q\n%s",
m.prefix, l.iterKV, debug.Stack())
}
if invariants.Enabled && m.prefix != nil && !m.split.HasPrefix(m.prefix, l.iterKV.K.UserKey) {
m.logger.Fatalf("mergingIter: prefix violation: nexting beyond prefix %q; existing heap root %q\n%s",
m.prefix, l.iterKV, debug.Stack())
}

oldTopLevel := l.index
Expand Down Expand Up @@ -905,6 +903,10 @@ func (m *mergingIter) findPrevEntry() *base.InternalKV {
//
// If an error occurs, seekGE returns the error without setting m.err.
func (m *mergingIter) seekGE(key []byte, level int, flags base.SeekGEFlags) error {
if invariants.Enabled && m.lower != nil && m.heap.cmp(key, m.lower) < 0 {
m.logger.Fatalf("mergingIter: lower bound violation: %s < %s\n%s", key, m.lower, debug.Stack())
}

// When seeking, we can use tombstones to adjust the key we seek to on each
// level. Consider the series of range tombstones:
//
Expand Down Expand Up @@ -957,15 +959,11 @@ func (m *mergingIter) seekGE(key []byte, level int, flags base.SeekGEFlags) erro
}

for ; level < len(m.levels); level++ {
if invariants.Enabled && m.lower != nil && m.heap.cmp(key, m.lower) < 0 {
m.logger.Fatalf("mergingIter: lower bound violation: %s < %s\n%s", key, m.lower, debug.Stack())
}

l := &m.levels[level]
if m.prefix != nil {
l.iterKV = l.iter.SeekPrefixGE(m.prefix, key, flags)
if l.iterKV != nil {
if !bytes.Equal(m.prefix, m.split.Prefix(l.iterKV.K.UserKey)) {
if !m.split.HasPrefix(m.prefix, l.iterKV.K.UserKey) {
// Prevent keys without a matching prefix from being added to the heap by setting
// iterKey and iterValue to their zero values before calling initMinHeap.
l.iterKV = nil
Expand Down Expand Up @@ -999,7 +997,21 @@ func (m *mergingIter) seekGE(key []byte, level int, flags base.SeekGEFlags) erro
// Based on the containment condition tombstone.End > key, so
// the assignment to key results in a monotonically
// non-decreasing key across iterations of this loop.
//
if m.prefix != nil && !m.split.HasPrefix(m.prefix, l.tombstone.End) {
// Any keys with m.prefix on subsequent levels are under the tombstone.
// We still need to perform the seeks, in case the next seek uses
// the TrySeekUsingNext flag.
for level++; level < len(m.levels); level++ {
l := &m.levels[level]
if kv := l.iter.SeekPrefixGE(m.prefix, key, flags); kv == nil {
if err := l.iter.Error(); err != nil {
return err
}
}
l.iterKV = nil
}
break
}
// The adjustment of key here can only move it to a larger key.
// Since the caller of seekGE guaranteed that the original key
// was greater than or equal to m.lower, the new key will
Expand Down Expand Up @@ -1037,17 +1049,18 @@ func (m *mergingIter) SeekPrefixGE(prefix, key []byte, flags base.SeekGEFlags) *
func (m *mergingIter) SeekPrefixGEStrict(
prefix, key []byte, flags base.SeekGEFlags,
) *base.InternalKV {
if invariants.Enabled && !m.split.HasPrefix(prefix, key) {
panic(fmt.Sprintf("invalid prefix %q for key %q", prefix, key))
}
m.prefix = prefix
m.err = m.seekGE(key, 0 /* start level */, flags)
if m.err != nil {
return nil
}

iterKV := m.findNextEntry()
if invariants.Enabled && iterKV != nil {
if !bytes.Equal(m.prefix, m.split.Prefix(iterKV.K.UserKey)) {
m.logger.Fatalf("mergingIter: prefix violation: returning key %q without prefix %q\n", iterKV, m.prefix)
}
if invariants.Enabled && iterKV != nil && !m.split.HasPrefix(m.prefix, iterKV.K.UserKey) {
m.logger.Fatalf("mergingIter: prefix violation: returning key %q without prefix %q\n", iterKV, m.prefix)
}
return iterKV
}
Expand Down
37 changes: 29 additions & 8 deletions sstable/reader_iter_single_lvl.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,13 @@ type singleLevelIterator[D any, PD block.DataBlockIterator[D]] struct {
// present, should be used for prefix seeks or not. In some cases it is
// beneficial to skip a filter block even if it exists (eg. if probability of
// a match is high).
useFilterBlock bool
lastBloomFilterMatched bool
useFilterBlock bool

// didNotPositionOnLastSeekGE is set to true if we completed a call to SeekGE
// or SeekPrefixGE without positioning the iterator internally. If this flag
// is set, the TrySeekUsingNext optimization is disabled on the next seek.
// This happens for example when the bloom filter excludes a prefix.
didNotPositionOnLastSeekGE bool

transforms IterTransforms

Expand Down Expand Up @@ -665,6 +670,11 @@ func (i *singleLevelIterator[D, PD]) SeekGE(key []byte, flags base.SeekGEFlags)
key = i.lower
}
}
if i.didNotPositionOnLastSeekGE {
// Iterator is not positioned based on last seek.
flags = flags.DisableTrySeekUsingNext()
i.didNotPositionOnLastSeekGE = false
}

if flags.TrySeekUsingNext() {
// The i.exhaustedBounds comparison indicates that the upper bound was
Expand Down Expand Up @@ -817,6 +827,12 @@ func (i *singleLevelIterator[D, PD]) SeekPrefixGE(
// TODO(bananabrick): We can optimize away this check for the level iter
// if necessary.
if i.cmp(key, i.lower) < 0 {
if !i.reader.Split.HasPrefix(prefix, i.lower) {
i.err = nil // clear any cached iteration error
// Disable the TrySeekUsingNext optimization next time.
i.didNotPositionOnLastSeekGE = true
return nil
}
key = i.lower
}
}
Expand All @@ -826,18 +842,22 @@ func (i *singleLevelIterator[D, PD]) SeekPrefixGE(
func (i *singleLevelIterator[D, PD]) seekPrefixGE(
prefix, key []byte, flags base.SeekGEFlags,
) (kv *base.InternalKV) {
if invariants.Enabled && !i.reader.Split.HasPrefix(prefix, key) {
panic(fmt.Sprintf("invalid prefix %q for key %q", prefix, key))
}
if i.didNotPositionOnLastSeekGE {
// Iterator is not positioned based on last seek.
flags = flags.DisableTrySeekUsingNext()
i.didNotPositionOnLastSeekGE = false
}

// NOTE: prefix is only used for bloom filter checking and not later work in
// this method. Hence, we can use the existing iterator position if the last
// SeekPrefixGE did not fail bloom filter matching.

err := i.err
i.err = nil // clear cached iteration error
if i.useFilterBlock {
if !i.lastBloomFilterMatched {
// Iterator is not positioned based on last seek.
flags = flags.DisableTrySeekUsingNext()
}
i.lastBloomFilterMatched = false
// Check prefix bloom filter.
var mayContain bool
mayContain, i.err = i.bloomFilterMayContain(prefix)
Expand All @@ -848,9 +868,10 @@ func (i *singleLevelIterator[D, PD]) seekPrefixGE(
// since the caller was allowed to call Next when SeekPrefixGE returned
// nil. This is no longer allowed.
PD(&i.data).Invalidate()
// Disable the TrySeekUsingNext optimization next time.
i.didNotPositionOnLastSeekGE = true
return nil
}
i.lastBloomFilterMatched = true
}
if flags.TrySeekUsingNext() {
// The i.exhaustedBounds comparison indicates that the upper bound was
Expand Down
29 changes: 19 additions & 10 deletions sstable/reader_iter_two_lvl.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ type twoLevelIterator[D any, PD block.DataBlockIterator[D]] struct {
// useFilterBlock controls whether we consult the bloom filter in the
// twoLevelIterator code. Note that secondLevel.useFilterBlock is always
// false - any filtering happens at the top level.
useFilterBlock bool
lastBloomFilterMatched bool
useFilterBlock bool
// didNotPositionOnLastSeekGE is set to true if we completed a call to SeekGE
// or SeekPrefixGE without positioning the iterator internally. If this flag
// is set, the TrySeekUsingNext optimization is disabled on the next seek.
// This happens for example when the bloom filter excludes a prefix.
didNotPositionOnLastSeekGE bool
}

var _ Iterator = (*twoLevelIterator[rowblk.Iter, *rowblk.Iter])(nil)
Expand Down Expand Up @@ -371,9 +375,19 @@ func (i *twoLevelIterator[D, PD]) SeekPrefixGE(
// TODO(bananabrick): We can optimize away this check for the level iter
// if necessary.
if i.secondLevel.cmp(key, i.secondLevel.lower) < 0 {
if !i.secondLevel.reader.Split.HasPrefix(prefix, i.secondLevel.lower) {
i.secondLevel.err = nil // clear any cached iteration error
// Disable the TrySeekUsingNext optimization next time.
i.didNotPositionOnLastSeekGE = true
return nil
}
key = i.secondLevel.lower
}
}
if i.didNotPositionOnLastSeekGE {
flags = flags.DisableTrySeekUsingNext()
i.didNotPositionOnLastSeekGE = false
}

// NOTE: prefix is only used for bloom filter checking and not later work in
// this method. Hence, we can use the existing iterator position if the last
Expand All @@ -385,8 +399,7 @@ func (i *twoLevelIterator[D, PD]) SeekPrefixGE(
// The twoLevelIterator could be already exhausted. Utilize that when
// trySeekUsingNext is true. See the comment about data-exhausted, PGDE, and
// bounds-exhausted near the top of the file.
filterUsedAndDidNotMatch := i.useFilterBlock && !i.lastBloomFilterMatched
if flags.TrySeekUsingNext() && !filterUsedAndDidNotMatch &&
if flags.TrySeekUsingNext() &&
(i.secondLevel.exhaustedBounds == +1 || (PD(&i.secondLevel.data).IsDataInvalidated() && i.secondLevel.index.IsDataInvalidated())) &&
err == nil {
// Already exhausted, so return nil.
Expand All @@ -395,11 +408,6 @@ func (i *twoLevelIterator[D, PD]) SeekPrefixGE(

// Check prefix bloom filter.
if i.useFilterBlock {
if !i.lastBloomFilterMatched {
// Iterator is not positioned based on last seek.
flags = flags.DisableTrySeekUsingNext()
}
i.lastBloomFilterMatched = false
var mayContain bool
mayContain, i.secondLevel.err = i.secondLevel.bloomFilterMayContain(prefix)
if i.secondLevel.err != nil || !mayContain {
Expand All @@ -409,9 +417,10 @@ func (i *twoLevelIterator[D, PD]) SeekPrefixGE(
// since the caller was allowed to call Next when SeekPrefixGE returned
// nil. This is no longer allowed.
PD(&i.secondLevel.data).Invalidate()
// Disable the TrySeekUsingNext optimization next time.
i.didNotPositionOnLastSeekGE = true
return nil
}
i.lastBloomFilterMatched = true
}

// Bloom filter matches.
Expand Down
10 changes: 10 additions & 0 deletions sstable/testdata/virtual_reader_iter
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ bounds: [dd#5,SET-ddd#6,SET]
# Check lower bound enforcement during SeekPrefixGE.
iter
seek-prefix-ge d
----
.

iter
seek-prefix-ge dd
next
next
----
Expand Down Expand Up @@ -292,6 +297,11 @@ bounds: [dd#5,SET-ddd#6,SET]
# Check lower bound enforcement during SeekPrefixGE.
iter
seek-prefix-ge d
----
.

iter
seek-prefix-ge dd
next
next
----
Expand Down
Loading