Skip to content

Commit

Permalink
db: add optional ValidateKey Comparer func
Browse files Browse the repository at this point in the history
Add a new optional ValidateKey func to the Comparer type. There are many
instances where we reconstitute or rewrite user keys. When the Comparer
provides a way to assert that a key is well structured, we can catch any
malformed keys closer to the moment of corruption.

The AssertComparer is updated to make use of ValidateKey, and the CheckComparer
func is updated to ensure that all the keys it constructs validate if
ValidateKey is provided. Additionally, the metamorphic key generator is updated
to validate new keys at generation time.

Fixes #3825.
  • Loading branch information
jbowens committed Jan 27, 2025
1 parent 303c885 commit 1157615
Show file tree
Hide file tree
Showing 12 changed files with 127 additions and 40 deletions.
21 changes: 12 additions & 9 deletions cockroachkvs/cockroachkvs.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ var Comparer = base.Comparer{
return append(append(dst, a...), 0)
},

ValidateKey: validateEngineKey,

Name: "cockroach_comparator",
}

Expand Down Expand Up @@ -266,7 +268,7 @@ func Split(key []byte) int {
return 0
}
if invariants.Enabled {
checkEngineKey(key)
validateEngineKey.MustValidate(key)
}
// Last byte is the version length + 1 when there is a version, else it is
// 0.
Expand All @@ -281,8 +283,8 @@ func Compare(a, b []byte) int {
return cmp.Compare(len(a), len(b))
}
if invariants.Enabled {
checkEngineKey(a)
checkEngineKey(b)
validateEngineKey.MustValidate(a)
validateEngineKey.MustValidate(b)
}
aSuffixLen := int(a[len(a)-1])
aSuffixStart := len(a) - aSuffixLen
Expand Down Expand Up @@ -343,8 +345,8 @@ func Equal(a, b []byte) bool {
return len(a) == len(b)
}
if invariants.Enabled {
checkEngineKey(a)
checkEngineKey(b)
validateEngineKey.MustValidate(a)
validateEngineKey.MustValidate(b)
}
aSuffixLen := int(a[len(a)-1])
aSuffixStart := len(a) - aSuffixLen
Expand Down Expand Up @@ -983,16 +985,17 @@ func (ks *cockroachKeySeeker) MaterializeUserKeyWithSyntheticSuffix(
//go:linkname memmove runtime.memmove
func memmove(to, from unsafe.Pointer, n uintptr)

func checkEngineKey(k []byte) {
var validateEngineKey base.ValidateKey = func(k []byte) error {
if len(k) == 0 {
panic(errors.AssertionFailedf("empty key"))
return errors.AssertionFailedf("empty key")
}
if int(k[len(k)-1]) >= len(k) {
panic(errors.AssertionFailedf("malformed key terminator byte: %x", k))
return errors.AssertionFailedf("malformed key terminator byte: %x", k)
}
if k[len(k)-1] == 1 {
panic(errors.AssertionFailedf("invalid key terminator byte 1"))
return errors.AssertionFailedf("invalid key terminator byte 1 in key: %x", k)
}
return nil
}

// FormatKey returns a formatter for the user key.
Expand Down
4 changes: 2 additions & 2 deletions cockroachkvs/cockroachkvs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ func parseUserKey(userKeyStr string) []byte {
}
}
k := append(append([]byte(roachKey), 0), parseVersion(versionStr)...)
checkEngineKey(k)
validateEngineKey.MustValidate(k)
return k
}

Expand All @@ -657,7 +657,7 @@ func parseVersion(versionStr string) []byte {
if len(ret) != 14 && len(ret) != 10 {
panic(fmt.Sprintf("expected 10 or 14-length ret got %d", len(ret)))
}
checkEngineKey(ret)
validateEngineKey.MustValidate(ret)
// TODO(jackson): Refactor to allow us to generate a suffix without a
// sentinel byte rather than stripping it like this.
return ret[1:]
Expand Down
2 changes: 1 addition & 1 deletion cockroachkvs/key_schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestKeySchema_RandomKeys(t *testing.T) {
if n := len(kv.K.UserKey); n > len(keys[k]) {
t.Fatalf("key %q is longer than original key %q", kv.K.UserKey, keys[k])
}
checkEngineKey(kv.K.UserKey)
validateEngineKey.MustValidate(kv.K.UserKey)

// We write keys[k] as the value too, so check that it's verbatim equal.
value, callerOwned, err := kv.V.Value(valBuf)
Expand Down
8 changes: 4 additions & 4 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1500,7 +1500,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
Err: err,
}
if err == nil {
validateVersionEdit(ve, d.opts.Experimental.KeyValidationFunc, d.opts.Comparer.FormatKey, d.opts.Logger)
validateVersionEdit(ve, d.opts.Comparer.ValidateKey, d.opts.Comparer.FormatKey, d.opts.Logger)
for i := range ve.NewFiles {
e := &ve.NewFiles[i]
info.Output = append(info.Output, e.Meta.TableInfo())
Expand Down Expand Up @@ -2356,7 +2356,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {

info.Duration = d.timeNow().Sub(startTime)
if err == nil {
validateVersionEdit(ve, d.opts.Experimental.KeyValidationFunc, d.opts.Comparer.FormatKey, d.opts.Logger)
validateVersionEdit(ve, d.opts.Comparer.ValidateKey, d.opts.Comparer.FormatKey, d.opts.Logger)
err = func() error {
var err error
d.mu.versions.logLock()
Expand Down Expand Up @@ -3241,10 +3241,10 @@ func (d *DB) newCompactionOutput(
// validateVersionEdit validates that start and end keys across new and deleted
// files in a versionEdit pass the given validation function.
func validateVersionEdit(
ve *versionEdit, validateFn func([]byte) error, format base.FormatKey, logger Logger,
ve *versionEdit, vk base.ValidateKey, format base.FormatKey, logger Logger,
) {
validateKey := func(f *manifest.FileMetadata, key []byte) {
if err := validateFn(key); err != nil {
if err := vk.Validate(key); err != nil {
logger.Fatalf("pebble: version edit validation failed (key=%s file=%s): %v", format(key), f, err)
}
}
Expand Down
4 changes: 4 additions & 0 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ func ingestValidateKey(opts *Options, key *InternalKey) error {
return base.CorruptionErrorf("pebble: external sstable has non-zero seqnum: %s",
key.Pretty(opts.Comparer.FormatKey))
}
if err := opts.Comparer.ValidateKey.Validate(key.UserKey); err != nil {
return base.CorruptionErrorf("pebble: external sstable has corrupted key: %s, %w",
key.Pretty(opts.Comparer.FormatKey), err)
}
return nil
}

Expand Down
69 changes: 62 additions & 7 deletions internal/base/comparer.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ type Comparer struct {
// FormatValue is optional.
FormatValue FormatValue

// ValidateKey is an optional function that determines whether a key is
// valid according to this Comparer's key encoding.
ValidateKey ValidateKey

// Name is the name of the comparer.
//
// The on-disk format stores the comparer name, and opening a database with a
Expand Down Expand Up @@ -439,13 +443,51 @@ func MakeAssertComparer(c Comparer) Comparer {
ComparePointSuffixes: c.ComparePointSuffixes,
CompareRangeSuffixes: c.CompareRangeSuffixes,
AbbreviatedKey: c.AbbreviatedKey,
Separator: c.Separator,
Successor: c.Successor,
ImmediateSuccessor: c.ImmediateSuccessor,
FormatKey: c.FormatKey,
Split: c.Split,
FormatValue: c.FormatValue,
Name: c.Name,
Separator: func(dst, a, b []byte) []byte {
ret := c.Separator(dst, a, b)
// The Separator func must return a valid key.
c.ValidateKey.MustValidate(ret)
return ret
},
Successor: func(dst, a []byte) []byte {
ret := c.Successor(dst, a)
// The Successor func must return a valid key.
c.ValidateKey.MustValidate(ret)
return ret
},
ImmediateSuccessor: func(dst, a []byte) []byte {
ret := c.ImmediateSuccessor(dst, a)
// The ImmediateSuccessor func must return a valid key.
c.ValidateKey.MustValidate(ret)
return ret
},
FormatKey: c.FormatKey,
Split: c.Split,
FormatValue: c.FormatValue,
ValidateKey: c.ValidateKey,
Name: c.Name,
}
}

// ValidateKey is the signature of a function that reports whether a key is
// well-formed under a particular key encoding. It returns nil when the
// provided key is a valid, full user key and a non-nil error otherwise.
// Implementations must be careful to not mutate the provided key.
type ValidateKey func([]byte) error

// Validate runs the validation func against the provided user key and returns
// its result. A nil ValidateKey performs no validation: Validate returns nil
// for every key.
func (v ValidateKey) Validate(key []byte) error {
	if v != nil {
		return v(key)
	}
	return nil
}

// MustValidate validates the provided user key and panics with the validation
// error if the key is invalid. Because it is built on Validate, it is a no-op
// when the ValidateKey func is nil.
func (v ValidateKey) MustValidate(key []byte) {
	err := v.Validate(key)
	if err == nil {
		return
	}
	panic(err)
}

Expand Down Expand Up @@ -489,6 +531,13 @@ func CheckComparer(c *Comparer, prefixes [][]byte, suffixes [][]byte) error {
for i := 1; i < len(suffixes); i++ {
a := slices.Concat(p, suffixes[i-1])
b := slices.Concat(p, suffixes[i])
if err := c.ValidateKey.Validate(a); err != nil {
return err
}
if err := c.ValidateKey.Validate(b); err != nil {
return err
}

// Make sure the Compare function agrees with ComparePointSuffixes.
if cmp := c.Compare(a, b); cmp > 0 {
return errors.Errorf("Compare(%s, %s)=%d, expected <= 0", c.FormatKey(a), c.FormatKey(b), cmp)
Expand All @@ -500,9 +549,15 @@ func CheckComparer(c *Comparer, prefixes [][]byte, suffixes [][]byte) error {
for _, ap := range prefixes {
for _, as := range suffixes {
a := slices.Concat(ap, as)
if err := c.ValidateKey.Validate(a); err != nil {
return err
}
for _, bp := range prefixes {
for _, bs := range suffixes {
b := slices.Concat(bp, bs)
if err := c.ValidateKey.Validate(b); err != nil {
return err
}
result := c.Compare(a, b)
if (result == 0) != c.Equal(a, b) {
return errors.Errorf("Equal(%s, %s) doesn't agree with Compare", c.FormatKey(a), c.FormatKey(b))
Expand Down
16 changes: 15 additions & 1 deletion internal/testkeys/testkeys.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strconv"
"strings"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
)

Expand Down Expand Up @@ -106,7 +107,20 @@ var Comparer = &base.Comparer{
return append(append(dst, a...), 0x00)
},
Split: split,
Name: "pebble.internal.testkeys",
ValidateKey: func(k []byte) error {
// Ensure that if the key has a suffix, it's a valid integer
// (potentially modulo a faux synthetic bit suffix).
k = bytes.TrimSuffix(k, ignoreTimestampSuffix)
i := split(k)
if i == len(k) {
return nil
}
if _, err := parseUintBytes(k[i+1:], 10, 64); err != nil {
return errors.Wrapf(err, "invalid key %q", k)
}
return nil
},
Name: "pebble.internal.testkeys",
}

// The comparator is similar to the one in Cockroach; when the prefixes are
Expand Down
2 changes: 2 additions & 0 deletions metamorphic/key_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ func newKeyManager(numInstances int, kf KeyFormat) *keyManager {
// addNewKey adds the given key to the key manager for global key tracking.
// Returns false iff this is not a new key.
func (k *keyManager) addNewKey(key []byte) bool {
k.kf.Comparer.ValidateKey.MustValidate(key)
if k.globalKeysMap[string(key)] {
return false
}
Expand All @@ -409,6 +410,7 @@ func (k *keyManager) addNewKey(key []byte) bool {
// getOrInit returns the keyMeta for the (objID, key) pair, if it exists, else
// allocates, initializes and returns a new value.
func (k *keyManager) getOrInit(id objID, key []byte) *keyMeta {
k.kf.Comparer.ValidateKey.MustValidate(key)
objKeys := k.objKeyMeta(id)
m, ok := objKeys.keys[string(key)]
if ok {
Expand Down
16 changes: 0 additions & 16 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,19 +654,6 @@ type Options struct {
// limited by runtime.GOMAXPROCS.
FileCacheShards int

// KeyValidationFunc is a function to validate a user key in an SSTable.
//
// Currently, this function is used to validate the smallest and largest
// keys in an SSTable undergoing compaction. In this case, returning an
// error from the validation function will result in a panic at runtime,
// given that there is rarely any way of recovering from malformed keys
// present in compacted files. By default, validation is not performed.
//
// Additional use-cases may be added in the future.
//
// NOTE: callers should take care to not mutate the key being validated.
KeyValidationFunc func(userKey []byte) error

// ValidateOnIngest schedules validation of sstables after they have
// been ingested.
//
Expand Down Expand Up @@ -1165,9 +1152,6 @@ func (o *Options) EnsureDefaults() *Options {
if o.Experimental.CompactionLimiter == nil {
o.Experimental.CompactionLimiter = &base.DefaultCompactionLimiter{}
}
if o.Experimental.KeyValidationFunc == nil {
o.Experimental.KeyValidationFunc = func([]byte) error { return nil }
}
if o.KeySchema == "" && len(o.KeySchemas) == 0 {
ks := colblk.DefaultKeySchema(o.Comparer, 16 /* bundleSize */)
o.KeySchema = ks.Name
Expand Down
4 changes: 4 additions & 0 deletions sstable/rowblk/rowblk_rewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/bytealloc"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/sstable/block"
)

Expand Down Expand Up @@ -78,6 +79,9 @@ func (r *Rewriter) RewriteSuffixes(
// in the block, which includes the 1-byte prefix. This is fine since bw
// also does not know about the prefix and will preserve it in bw.add.
v := kv.InPlaceValue()
if invariants.Enabled && invariants.Sometimes(10) {
r.comparer.ValidateKey.MustValidate(r.scratchKey.UserKey)
}
r.writer.Add(r.scratchKey, v)
if start.UserKey == nil {
// Copy the first key.
Expand Down
6 changes: 6 additions & 0 deletions sstable/rowblk_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1186,6 +1186,12 @@ func (w *RawRowWriter) indexEntrySep(
} else {
sep = prevKey.Separator(w.compare, w.separator, dataBlockBuf.sepScratch[:0], key)
}
if invariants.Enabled && invariants.Sometimes(25) {
if w.compare(prevKey.UserKey, sep.UserKey) > 0 {
panic(errors.AssertionFailedf("prevKey.UserKey > sep.UserKey: %s > %s",
prevKey.UserKey, sep.UserKey))
}
}
return sep
}

Expand Down
15 changes: 15 additions & 0 deletions sstable/suffix_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ func rewriteDataBlocksInParallel(
if err != nil {
return err
}
if err := r.Comparer.ValidateKey.Validate(output[i].start.UserKey); err != nil {
return err
}
if err := r.Comparer.ValidateKey.Validate(output[i].end.UserKey); err != nil {
return err
}
compressedBuf = compressedBuf[:cap(compressedBuf)]
finished := block.CompressAndChecksum(&compressedBuf, outputBlock, opts.Compression, &checksummer)
output[i].physical = finished.CloneWithByteAlloc(&blockAlloc)
Expand Down Expand Up @@ -219,6 +225,12 @@ func rewriteRangeKeyBlockToWriter(r *Reader, w RawWriter, from, to []byte) error
if !s.Valid() {
break
}
if err := r.Comparer.ValidateKey.Validate(s.Start); err != nil {
return err
}
if err := r.Comparer.ValidateKey.Validate(s.End); err != nil {
return err
}
for i := range s.Keys {
if s.Keys[i].Kind() != base.InternalKeyKindRangeKeySet {
return errBadKind
Expand Down Expand Up @@ -328,6 +340,9 @@ func RewriteKeySuffixesViaWriter(
if err != nil {
return nil, err
}
if invariants.Enabled && invariants.Sometimes(10) {
r.Comparer.ValidateKey.MustValidate(scratch.UserKey)
}
if err := w.AddWithForceObsolete(scratch, val, false); err != nil {
return nil, err
}
Expand Down

0 comments on commit 1157615

Please sign in to comment.