Skip to content

Commit

Permalink
use lz4
Browse files Browse the repository at this point in the history
  • Loading branch information
lidezhu committed Feb 27, 2025
1 parent d91a931 commit 38b8e19
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 3 deletions.
15 changes: 13 additions & 2 deletions logservice/eventstore/event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,13 @@ func (e *eventStore) writeEvents(db *pebble.DB, events []eventWithCallback) erro
for _, kv := range event.kvs {
key := EncodeKey(uint64(event.subID), event.tableID, &kv)
value := kv.Encode()
if err := batch.Set(key, value, pebble.NoSync); err != nil {
compressedValue, err := compressData(value)
if err != nil {
log.Panic("failed to compress data", zap.Error(err))
}
ratio := float64(len(value)) / float64(len(compressedValue))
metrics.EventStoreCompressRatio.Set(ratio)
if err := batch.Set(key, compressedValue, pebble.NoSync); err != nil {
log.Panic("failed to update pebble batch", zap.Error(err))
}
}
Expand Down Expand Up @@ -715,8 +721,13 @@ func (iter *eventStoreIter) Next() (*common.RawKVEntry, bool, error) {
}

value := iter.innerIter.Value()
decompressedValue, err := decompressData(value)
if err != nil {
log.Panic("failed to decompress value", zap.Error(err))
}
metrics.EventStoreScanBytes.Add(float64(len(decompressedValue)))
rawKV := &common.RawKVEntry{}
rawKV.Decode(value)
rawKV.Decode(decompressedValue)
isNewTxn := false
if iter.prevCommitTs == 0 || (rawKV.StartTs != iter.prevStartTs || rawKV.CRTs != iter.prevCommitTs) {
isNewTxn = true
Expand Down
48 changes: 47 additions & 1 deletion logservice/eventstore/pebble.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package eventstore

import (
"bytes"
"fmt"
"math"
"sync"

"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/bloom"
"github.com/pierrec/lz4"
"github.com/pingcap/log"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -52,7 +55,12 @@ func newPebbleOptions(dbNum int) *pebble.Options {
l.FilterPolicy = bloom.FilterPolicy(10)
l.FilterType = pebble.TableFilter
l.TargetFileSize = 32 << 20 // 32 MB
l.Compression = pebble.SnappyCompression
if i == 0 {
// level 0 with no compression for better write performance
l.Compression = pebble.NoCompression
} else {
l.Compression = pebble.SnappyCompression
}
l.EnsureDefaults()
}
opts.Levels[6].FilterPolicy = nil
Expand All @@ -77,3 +85,41 @@ func createPebbleDBs(rootDir string, dbNum int) []*pebble.DB {
}
return dbs
}

var bufferPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}

func compressData(data []byte) ([]byte, error) {
buf := bufferPool.Get().(*bytes.Buffer)
defer bufferPool.Put(buf)
buf.Reset()

zw := lz4.NewWriter(buf)
_, err := zw.Write(data)
if err != nil {
return nil, err
}

if err := zw.Close(); err != nil {
return nil, err
}

return buf.Bytes(), nil
}

func decompressData(compressed []byte) ([]byte, error) {
buf := bufferPool.Get().(*bytes.Buffer)
defer bufferPool.Put(buf)
buf.Reset()

zr := lz4.NewReader(bytes.NewReader(compressed))
_, err := buf.ReadFrom(zr)
if err != nil {
return nil, err
}

return buf.Bytes(), nil
}

0 comments on commit 38b8e19

Please sign in to comment.