Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
lidezhu committed Feb 28, 2025
1 parent db93023 commit 4b2cf89
Showing 1 changed file with 3 additions and 27 deletions.
30 changes: 3 additions & 27 deletions logservice/eventstore/event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"time"

"github.com/cockroachdb/pebble"
"github.com/klauspost/compress/zstd"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/logservice/logpuller"
Expand Down Expand Up @@ -171,9 +170,6 @@ type eventStore struct {
// use table id as the key is to share data between spans not completely the same in the future.
tableToDispatchers map[int64]map[common.DispatcherID]bool
}

encoder *zstd.Encoder
decoder *zstd.Decoder
}

const (
Expand All @@ -196,16 +192,6 @@ func New(
log.Panic("fail to remove path")
}

// Create the zstd encoder
encoder, err := zstd.NewWriter(nil)
if err != nil {
log.Panic("Failed to create zstd encoder", zap.Error(err))
}

decoder, err := zstd.NewReader(nil)
if err != nil {
log.Panic("Failed to create zstd decoder", zap.Error(err))
}
store := &eventStore{
pdClock: pdClock,
subClient: subClient,
Expand All @@ -215,8 +201,6 @@ func New(
writeTaskPools: make([]*writeTaskPool, 0, dbCount),

gcManager: newGCManager(),
encoder: encoder,
decoder: decoder,
}

// create a write task pool per db instance
Expand Down Expand Up @@ -267,13 +251,10 @@ func (p *writeTaskPool) run(ctx context.Context) {
if !ok {
return
}
log.Info("get write task", zap.Int("eventCount", len(events)))
p.store.writeEvents(p.db, events)
time.Sleep(500 * time.Millisecond)
for i := range events {
events[i].callback()
}
log.Info("get write task done", zap.Int("eventCount", len(events)))
buffer = buffer[:0]
}
}
Expand Down Expand Up @@ -631,7 +612,6 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com
startTs: dataRange.StartTs,
endTs: dataRange.EndTs,
rowCount: 0,
decoder: e.decoder,
}, nil
}

Expand Down Expand Up @@ -686,7 +666,6 @@ func (e *eventStore) writeEvents(db *pebble.DB, events []eventWithCallback) erro
for _, kv := range event.kvs {
key := EncodeKey(uint64(event.subID), event.tableID, &kv)
value := kv.Encode()
// compressedValue := e.encoder.EncodeAll(value, nil)
if err := batch.Set(key, value, pebble.NoSync); err != nil {
log.Panic("failed to update pebble batch", zap.Error(err))
}
Expand All @@ -698,7 +677,6 @@ func (e *eventStore) writeEvents(db *pebble.DB, events []eventWithCallback) erro
metrics.EventStoreWriteBytes.Add(float64(batch.Len()))
start := time.Now()
err := batch.Commit(pebble.NoSync)
log.Info("write events", zap.Int("kvCount", kvCount), zap.Duration("duration", time.Since(start)))
metrics.EventStoreWriteDurationHistogram.Observe(float64(time.Since(start).Milliseconds()) / 1000)
return err
}
Expand All @@ -722,7 +700,6 @@ type eventStoreIter struct {
startTs uint64
endTs uint64
rowCount int64
decoder *zstd.Decoder
}

func (iter *eventStoreIter) Next() (*common.RawKVEntry, bool, error) {
Expand All @@ -734,10 +711,9 @@ func (iter *eventStoreIter) Next() (*common.RawKVEntry, bool, error) {
}

value := iter.innerIter.Value()
// decompressedValue, err := iter.decoder.DecodeAll(value, nil)
// if err != nil {
// log.Panic("failed to decompress value", zap.Error(err))
// }
// rawKV need reference in the byte slice, so we need copy it here
copiedValue := make([]byte, len(value))
copy(copiedValue, value)
rawKV := &common.RawKVEntry{}
rawKV.Decode(value)
metrics.EventStoreScanBytes.Add(float64(len(value)))
Expand Down

0 comments on commit 4b2cf89

Please sign in to comment.