From 4b2cf89be5a16c3db3389a969cb7b18ed5049393 Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Fri, 28 Feb 2025 16:58:18 +0800
Subject: [PATCH] fix

---
 logservice/eventstore/event_store.go | 30 +++---------------------------
 1 file changed, 3 insertions(+), 27 deletions(-)

diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go
index c60ca4a6..805960b4 100644
--- a/logservice/eventstore/event_store.go
+++ b/logservice/eventstore/event_store.go
@@ -24,7 +24,6 @@ import (
 	"time"
 
 	"github.com/cockroachdb/pebble"
-	"github.com/klauspost/compress/zstd"
 	"github.com/pingcap/log"
 	"github.com/pingcap/ticdc/heartbeatpb"
 	"github.com/pingcap/ticdc/logservice/logpuller"
@@ -171,9 +170,6 @@ type eventStore struct {
 		// use table id as the key is to share data between spans not completely the same in the future.
 		tableToDispatchers map[int64]map[common.DispatcherID]bool
 	}
-
-	encoder *zstd.Encoder
-	decoder *zstd.Decoder
 }
 
 const (
@@ -196,16 +192,6 @@ func New(
 		log.Panic("fail to remove path")
 	}
 
-	// Create the zstd encoder
-	encoder, err := zstd.NewWriter(nil)
-	if err != nil {
-		log.Panic("Failed to create zstd encoder", zap.Error(err))
-	}
-
-	decoder, err := zstd.NewReader(nil)
-	if err != nil {
-		log.Panic("Failed to create zstd decoder", zap.Error(err))
-	}
 	store := &eventStore{
 		pdClock:   pdClock,
 		subClient: subClient,
@@ -215,8 +201,6 @@
 		writeTaskPools: make([]*writeTaskPool, 0, dbCount),
 
 		gcManager: newGCManager(),
-		encoder:   encoder,
-		decoder:   decoder,
 	}
 
 	// create a write task pool per db instance
@@ -267,13 +251,10 @@ func (p *writeTaskPool) run(ctx context.Context) {
 			if !ok {
 				return
 			}
-			log.Info("get write task", zap.Int("eventCount", len(events)))
 			p.store.writeEvents(p.db, events)
-			time.Sleep(500 * time.Millisecond)
 			for i := range events {
 				events[i].callback()
 			}
-			log.Info("get write task done", zap.Int("eventCount", len(events)))
 			buffer = buffer[:0]
 		}
 	}
@@ -631,7 +612,6 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com
 		startTs:  dataRange.StartTs,
 		endTs:    dataRange.EndTs,
 		rowCount: 0,
-		decoder:  e.decoder,
 	}, nil
 }
 
@@ -686,7 +666,6 @@ func (e *eventStore) writeEvents(db *pebble.DB, events []eventWithCallback) erro
 		for _, kv := range event.kvs {
 			key := EncodeKey(uint64(event.subID), event.tableID, &kv)
 			value := kv.Encode()
-			// compressedValue := e.encoder.EncodeAll(value, nil)
 			if err := batch.Set(key, value, pebble.NoSync); err != nil {
 				log.Panic("failed to update pebble batch", zap.Error(err))
 			}
@@ -698,7 +677,6 @@ func (e *eventStore) writeEvents(db *pebble.DB, events []eventWithCallback) erro
 	metrics.EventStoreWriteBytes.Add(float64(batch.Len()))
 	start := time.Now()
 	err := batch.Commit(pebble.NoSync)
-	log.Info("write events", zap.Int("kvCount", kvCount), zap.Duration("duration", time.Since(start)))
 	metrics.EventStoreWriteDurationHistogram.Observe(float64(time.Since(start).Milliseconds()) / 1000)
 	return err
 }
@@ -722,7 +700,6 @@ type eventStoreIter struct {
 	startTs  uint64
 	endTs    uint64
 	rowCount int64
-	decoder  *zstd.Decoder
 }
 
 func (iter *eventStoreIter) Next() (*common.RawKVEntry, bool, error) {
 	...
 	}
 
 	value := iter.innerIter.Value()
-	// decompressedValue, err := iter.decoder.DecodeAll(value, nil)
-	// if err != nil {
-	// 	log.Panic("failed to decompress value", zap.Error(err))
-	// }
+	// rawKV holds a reference into the byte slice, so we need to copy it here
+	copiedValue := make([]byte, len(value))
+	copy(copiedValue, value)
 	rawKV := &common.RawKVEntry{}
 	rawKV.Decode(value)
 	metrics.EventStoreScanBytes.Add(float64(len(value)))
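
Note (not part of the patch): the copy added in `Next()` follows the usual pebble iterator rule — the slice returned by `Iterator.Value()` points into pebble's internal buffer and stays valid only until the iterator is repositioned or closed, so any data that must outlive the current iteration step needs its own copy. Below is a minimal, standalone sketch of that copy-before-advance pattern; the package and the `collectValues` helper are invented for illustration and are not part of event_store.go.

```go
package example

import "github.com/cockroachdb/pebble"

// collectValues drains an already-open pebble iterator and returns owned
// copies of every value. iter.Value() exposes pebble's reusable buffer,
// which is only valid until the next positioning call (Next, SeekGE, Close),
// so each value is copied before the loop advances — the same reason the
// patch copies `value` inside eventStoreIter.Next().
func collectValues(iter *pebble.Iterator) ([][]byte, error) {
	var out [][]byte
	for iter.First(); iter.Valid(); iter.Next() {
		v := iter.Value()
		copied := make([]byte, len(v)) // allocate an owned buffer
		copy(copied, v)                // detach from the iterator's buffer
		out = append(out, copied)
	}
	return out, iter.Error()
}
```

The same reasoning is why `copiedValue` is created in the patch: it is the buffer that can safely be kept after the scan moves past the current key, whereas `value` may be overwritten on the next iterator call.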