Skip to content

Commit

Permalink
mounter(ticdc): calculate row level checksum for timestamp by using U…
Browse files Browse the repository at this point in the history
…TC time zone (#10564) (#10646)

close #10573
  • Loading branch information
ti-chi-bot authored Apr 28, 2024
1 parent 3ab4763 commit 47417d7
Showing 1 changed file with 48 additions and 39 deletions.
87 changes: 48 additions & 39 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,6 @@ type mounter struct {
// they should not be nil after decoding at least one event in the row format v2.
decoder *rowcodec.DatumMapDecoder
preDecoder *rowcodec.DatumMapDecoder

// encoder is used to calculate the checksum.
encoder *rowcodec.Encoder
}

// NewMounter creates a mounter
Expand All @@ -107,8 +104,6 @@ func NewMounter(schemaStorage SchemaStorage,
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
tz: tz,
integrity: integrity,

encoder: &rowcodec.Encoder{},
}
}

Expand Down Expand Up @@ -398,14 +393,40 @@ func datum2Column(
return cols, rawCols, columnInfos, rowColumnInfos, nil
}

// calculateChecksum computes the checksum of a row from its column
// definitions and raw datums, using the mounter's time zone (m.tz).
//
// columnInfos and rawColumns are read as parallel slices: the datum at
// index i is paired with columnInfos[i]. The resulting pairs are sorted by
// column ID before rowcodec.RowData.Checksum is invoked on them.
//
// It returns 0 and a traced error if the checksum calculation fails.
func (m *mounter) calculateChecksum(
	columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum,
) (uint32, error) {
	// Pair every column definition with a pointer to its raw datum.
	cols := make([]rowcodec.ColData, 0, len(rawColumns))
	for i := range columnInfos {
		cols = append(cols, rowcodec.ColData{
			ColumnInfo: columnInfos[i],
			Datum:      &rawColumns[i],
		})
	}

	// Order the pairs by ascending column ID.
	sort.Slice(cols, func(a, b int) bool {
		return cols[a].ID < cols[b].ID
	})

	row := rowcodec.RowData{Cols: cols, Data: make([]byte, 0)}
	sum, err := row.Checksum(m.tz)
	if err != nil {
		return 0, errors.Trace(err)
	}
	return sum, nil
}

// returns an error if calculating the checksum fails.
// returns false if the checksum does not match.
// returns true, along with the matched checksum, if the checksum matches.
func (m *mounter) verifyChecksum(
columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum, isPreRow bool,
) (uint32, int, bool, error) {
) (uint32, bool, error) {
if !m.integrity.Enabled() {
return 0, 0, true, nil
return 0, true, nil
}

var decoder *rowcodec.DatumMapDecoder
Expand All @@ -415,68 +436,50 @@ func (m *mounter) verifyChecksum(
decoder = m.decoder
}
if decoder == nil {
return 0, 0, false, errors.New("cannot found the decoder to get the checksum")
return 0, false, errors.New("cannot found the decoder to get the checksum")
}

version := decoder.ChecksumVersion()
// if the checksum cannot be found, the upstream TiDB checksum is not enabled,
// so return matched as true to skip checking the event.
first, ok := decoder.GetChecksum()
if !ok {
return 0, version, true, nil
}

columns := make([]rowcodec.ColData, 0, len(rawColumns))
for idx, col := range columnInfos {
columns = append(columns, rowcodec.ColData{
ColumnInfo: col,
Datum: &rawColumns[idx],
})
}
sort.Slice(columns, func(i, j int) bool {
return columns[i].ID < columns[j].ID
})
calculator := rowcodec.RowData{
Cols: columns,
Data: make([]byte, 0),
return 0, true, nil
}

checksum, err := calculator.Checksum(m.tz)
checksum, err := m.calculateChecksum(columnInfos, rawColumns)
if err != nil {
log.Error("failed to calculate the checksum", zap.Error(err))
return 0, version, false, errors.Trace(err)
log.Error("failed to calculate the checksum", zap.Uint32("first", first), zap.Error(err))
return 0, false, errors.Trace(err)
}

// the first checksum matched; this is the most common case.
if checksum == first {
log.Debug("checksum matched",
zap.Uint32("checksum", checksum), zap.Uint32("first", first))
return checksum, version, true, nil
return checksum, true, nil
}

extra, ok := decoder.GetExtraChecksum()
if !ok {
log.Error("cannot found the extra checksum, the first checksum mismatched",
zap.Uint32("checksum", checksum),
zap.Uint32("first", first),
zap.Uint32("extra", extra))
return checksum, version,
false, errors.New("cannot found the extra checksum from the event")
zap.Uint32("first", first))
return checksum, false, nil
}

if checksum == extra {
log.Debug("extra checksum matched, this may happen the upstream TiDB is during the DDL"+
"execution phase",
zap.Uint32("checksum", checksum),
zap.Uint32("extra", extra))
return checksum, version, true, nil
return checksum, true, nil
}

log.Error("checksum mismatch",
zap.Uint32("checksum", checksum),
zap.Uint32("first", first),
zap.Uint32("extra", extra))
return checksum, version, false, nil
return checksum, false, nil
}

func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, dataSize int64) (*model.RowChangedEvent, model.RowChangedDatums, error) {
Expand All @@ -493,6 +496,12 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
corrupted bool
)

if m.decoder != nil {
checksumVersion = m.decoder.ChecksumVersion()
} else if m.preDecoder != nil {
checksumVersion = m.preDecoder.ChecksumVersion()
}

// Decode previous columns.
var (
preCols []*model.Column
Expand All @@ -507,7 +516,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
return nil, rawRow, errors.Trace(err)
}

preChecksum, checksumVersion, matched, err = m.verifyChecksum(columnInfos, preRawCols, true)
preChecksum, matched, err = m.verifyChecksum(columnInfos, preRawCols, true)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
Expand Down Expand Up @@ -536,7 +545,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
return nil, rawRow, errors.Trace(err)
}

current, checksumVersion, matched, err = m.verifyChecksum(columnInfos, rawCols, false)
current, matched, err = m.verifyChecksum(columnInfos, rawCols, false)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
Expand Down

0 comments on commit 47417d7

Please sign in to comment.