From b3f65600982217d5a1fcbb879c7596151f018172 Mon Sep 17 00:00:00 2001 From: CharlesCheung96 Date: Mon, 3 Jun 2024 19:15:48 +0800 Subject: [PATCH 01/20] add output-raw-change-event para --- cdc/model/sink.go | 40 ++++------ cdc/model/sink_test.go | 17 +++- .../sinkmanager/table_sink_wrapper_test.go | 4 +- .../dmlsink/blackhole/black_hole_dml_sink.go | 4 +- .../cloudstorage/cloud_storage_dml_sink.go | 25 +++--- cdc/sink/dmlsink/event.go | 2 +- cdc/sink/dmlsink/event_sink.go | 6 +- cdc/sink/dmlsink/mq/kafka_dml_sink.go | 5 +- cdc/sink/dmlsink/mq/mq_dml_sink.go | 23 +++--- cdc/sink/dmlsink/mq/pulsar_dml_sink.go | 5 +- cdc/sink/dmlsink/txn/txn_dml_sink.go | 4 +- cdc/sink/tablesink/table_sink_impl.go | 2 +- cdc/sink/tablesink/table_sink_impl_test.go | 4 +- pkg/config/sink.go | 78 ++++++++++++++++--- pkg/errors/cdc_errors.go | 7 +- 15 files changed, 147 insertions(+), 79 deletions(-) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 217df3ae825..32571e0d327 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -272,7 +272,7 @@ func (r *RedoLog) GetCommitTs() Ts { } // TrySplitAndSortUpdateEvent redo log do nothing -func (r *RedoLog) TrySplitAndSortUpdateEvent(_ string) error { +func (r *RedoLog) TrySplitAndSortUpdateEvent(_ string, _ bool) error { return nil } @@ -444,7 +444,7 @@ func (r *RowChangedEvent) GetCommitTs() uint64 { } // TrySplitAndSortUpdateEvent do nothing -func (r *RowChangedEvent) TrySplitAndSortUpdateEvent(_ string) error { +func (r *RowChangedEvent) TrySplitAndSortUpdateEvent(_ string, _ bool) error { return nil } @@ -1140,10 +1140,19 @@ func (t *SingleTableTxn) GetPhysicalTableID() int64 { } // TrySplitAndSortUpdateEvent split update events if unique key is updated -func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string) error { - if !t.shouldSplitUpdateEvent(scheme) { +func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string, outputRawChangeEvent bool) error { + if sink.IsMySQLCompatibleScheme(scheme) || outputRawChangeEvent { + // For MySQL Sink, all update events will be split into insert and delete at the puller side + // according to whether the changefeed is in safemode. We don't split update event here(in sink) + // since there may be OOM issues. For more information, ref https://github.com/tikv/tikv/issues/17062. + // + // For the Kafka and Storage sink, the outputRawChangeEvent parameter is introduced to control + // split behavior. TiCDC only output original change event if outputRawChangeEvent is true. return nil } + + // Try to split update events for the Kafka and Storage sink if outputRawChangeEvent is false. + // Note it is only for backward compatibility, and we should remove this logic in the future. newRows, err := trySplitAndSortUpdateEvent(t.Rows) if err != nil { return errors.Trace(err) @@ -1152,21 +1161,6 @@ func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string) error { return nil } -// Whether split a single update event into delete and insert events? -// -// For the MySQL Sink, we don't split any update event. -// This may cause error like "duplicate entry" when sink to the downstream. -// This kind of error will cause the changefeed to restart, -// and then the related update rows will be splitted to insert and delete at puller side. -// -// For the Kafka and Storage sink, always split a single unique key changed update event, since: -// 1. Avro and CSV does not output the previous column values for the update event, so it would -// cause consumer missing data if the unique key changed event is not split. 
-// 2. Index-Value Dispatcher cannot work correctly if the unique key changed event isn't split. -func (t *SingleTableTxn) shouldSplitUpdateEvent(sinkScheme string) bool { - return !sink.IsMySQLCompatibleScheme(sinkScheme) -} - // trySplitAndSortUpdateEvent try to split update events if unique key is updated // returns true if some updated events is split func trySplitAndSortUpdateEvent( @@ -1176,8 +1170,7 @@ func trySplitAndSortUpdateEvent( split := false for _, e := range events { if e == nil { - log.Warn("skip emit nil event", - zap.Any("event", e)) + log.Warn("skip emit nil event", zap.Any("event", e)) continue } @@ -1187,8 +1180,7 @@ func trySplitAndSortUpdateEvent( // begin; insert into t (id) values (1); delete from t where id=1; commit; // Just ignore these row changed events. if colLen == 0 && preColLen == 0 { - log.Warn("skip emit empty row event", - zap.Any("event", e)) + log.Warn("skip emit empty row event", zap.Any("event", e)) continue } @@ -1222,7 +1214,7 @@ func isNonEmptyUniqueOrHandleCol(col *ColumnData, tableInfo *TableInfo) bool { // ShouldSplitUpdateEvent determines if the split event is needed to align the old format based on // whether the handle key column or unique key has been modified. -// If is modified, we need to use splitUpdateEvent to split the update event into a delete and an insert event. +// If is modified, we need to use splitUpdateEvent to split the update event into a delete and an insert event. func ShouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool { // nil event will never be split. if updateEvent == nil { diff --git a/cdc/model/sink_test.go b/cdc/model/sink_test.go index cd285b3fff3..f77e0f2dff0 100644 --- a/cdc/model/sink_test.go +++ b/cdc/model/sink_test.go @@ -604,21 +604,32 @@ func TestTxnTrySplitAndSortUpdateEvent(t *testing.T) { Rows: []*RowChangedEvent{ukUpdatedEvent}, } - err := txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme) + outputRawChangeEvent := true + notOutputRawChangeEvent := false + err := txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme, outputRawChangeEvent) + require.NoError(t, err) + require.Len(t, txn.Rows, 1) + err = txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme, notOutputRawChangeEvent) require.NoError(t, err) require.Len(t, txn.Rows, 2) txn = &SingleTableTxn{ Rows: []*RowChangedEvent{ukUpdatedEvent}, } - err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme) + err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, outputRawChangeEvent) + require.NoError(t, err) + require.Len(t, txn.Rows, 1) + err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, notOutputRawChangeEvent) require.NoError(t, err) require.Len(t, txn.Rows, 1) txn2 := &SingleTableTxn{ Rows: []*RowChangedEvent{ukUpdatedEvent, ukUpdatedEvent}, } - err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme) + err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, outputRawChangeEvent) + require.NoError(t, err) + require.Len(t, txn2.Rows, 2) + err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, notOutputRawChangeEvent) require.NoError(t, err) require.Len(t, txn2.Rows, 2) } diff --git a/cdc/processor/sinkmanager/table_sink_wrapper_test.go b/cdc/processor/sinkmanager/table_sink_wrapper_test.go index 2a986ec133f..f94f889f8a5 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper_test.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper_test.go @@ -52,8 +52,8 @@ func (m *mockSink) WriteEvents(events ...*dmlsink.CallbackableEvent[*model.RowCh return nil } -func (m *mockSink) Scheme() string { - return sink.BlackHoleScheme +func (m *mockSink) SchemeOption() 
(string, bool) { + return sink.BlackHoleScheme, false } func (m *mockSink) GetEvents() []*dmlsink.CallbackableEvent[*model.RowChangedEvent] { diff --git a/cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go b/cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go index be470b6d357..642a4e818bd 100644 --- a/cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go +++ b/cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go @@ -48,8 +48,8 @@ func (s *DMLSink) WriteEvents(rows ...*dmlsink.CallbackableEvent[*model.RowChang } // Scheme return the scheme of the sink. -func (s *DMLSink) Scheme() string { - return sink.BlackHoleScheme +func (s *DMLSink) SchemeOption() (string, bool) { + return sink.BlackHoleScheme, true } // Close do nothing. diff --git a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go index e1dca4941af..d3c034f990a 100644 --- a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go +++ b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go @@ -37,6 +37,7 @@ import ( "github.com/pingcap/tiflow/pkg/sink/codec/builder" "github.com/pingcap/tiflow/pkg/sink/codec/common" putil "github.com/pingcap/tiflow/pkg/util" + utilPkg "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -72,8 +73,9 @@ type eventFragment struct { // messages to individual dmlWorkers. // The dmlWorkers will write the encoded messages to external storage in parallel between different tables. type DMLSink struct { - changefeedID model.ChangeFeedID - scheme string + changefeedID model.ChangeFeedID + scheme string + outputRawChangeEvent bool // last sequence number lastSeqNum uint64 // encodingWorkers defines a group of workers for encoding events. @@ -144,13 +146,14 @@ func NewDMLSink(ctx context.Context, wgCtx, wgCancel := context.WithCancel(ctx) s := &DMLSink{ - changefeedID: changefeedID, - scheme: strings.ToLower(sinkURI.Scheme), - encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency), - workers: make([]*dmlWorker, cfg.WorkerCount), - statistics: metrics.NewStatistics(wgCtx, changefeedID, sink.TxnSink), - cancel: wgCancel, - dead: make(chan struct{}), + changefeedID: changefeedID, + scheme: strings.ToLower(sinkURI.Scheme), + outputRawChangeEvent: utilPkg.GetOrZero(replicaConfig.Sink.CloudStorageConfig.OutputRawChangeEvent), + encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency), + workers: make([]*dmlWorker, cfg.WorkerCount), + statistics: metrics.NewStatistics(wgCtx, changefeedID, sink.TxnSink), + cancel: wgCancel, + dead: make(chan struct{}), } s.alive.msgCh = chann.NewAutoDrainChann[eventFragment]() @@ -296,6 +299,6 @@ func (s *DMLSink) Dead() <-chan struct{} { } // Scheme returns the sink scheme. -func (s *DMLSink) Scheme() string { - return s.scheme +func (s *DMLSink) SchemeOption() (string, bool) { + return s.scheme, s.outputRawChangeEvent } diff --git a/cdc/sink/dmlsink/event.go b/cdc/sink/dmlsink/event.go index 8df85cab249..7126be06bbe 100644 --- a/cdc/sink/dmlsink/event.go +++ b/cdc/sink/dmlsink/event.go @@ -23,7 +23,7 @@ type TableEvent interface { // GetCommitTs returns the commit timestamp of the event. GetCommitTs() uint64 // TrySplitAndSortUpdateEvent split the update to delete and insert if the unique key is updated - TrySplitAndSortUpdateEvent(scheme string) error + TrySplitAndSortUpdateEvent(scheme string, outputRawChangeEvent bool) error } // CallbackFunc is the callback function for callbackable event. 
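
A minimal sketch, assuming the SchemeOption()/TrySplitAndSortUpdateEvent pair introduced above, of how a table sink is expected to wire the two together; backendSink, event and the plain errors.Trace wrapper are illustrative placeholders, not code from this patch:

	// The concrete sink reports its scheme and whether it wants raw change events.
	scheme, outputRawChangeEvent := backendSink.SchemeOption()
	// MySQL-compatible schemes and raw-event mode keep update rows untouched;
	// otherwise updates that modify a handle or unique key are split into a
	// delete followed by an insert before the events reach the encoder.
	if err := event.TrySplitAndSortUpdateEvent(scheme, outputRawChangeEvent); err != nil {
		return errors.Trace(err)
	}
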
diff --git a/cdc/sink/dmlsink/event_sink.go b/cdc/sink/dmlsink/event_sink.go index 76a179ecc9c..660ed190019 100644 --- a/cdc/sink/dmlsink/event_sink.go +++ b/cdc/sink/dmlsink/event_sink.go @@ -18,10 +18,8 @@ type EventSink[E TableEvent] interface { // WriteEvents writes events to the sink. // This is an asynchronously and thread-safe method. WriteEvents(events ...*CallbackableEvent[E]) error - - // Scheme returns the sink scheme. - Scheme() string - + // SchemeOption returns the sink scheme and whether the sink should output raw change event. + SchemeOption() (scheme string, outputRawChangeEvent bool) // Close closes the sink. Can be called with `WriteEvents` concurrently. Close() // The EventSink meets internal errors and has been dead already. diff --git a/cdc/sink/dmlsink/mq/kafka_dml_sink.go b/cdc/sink/dmlsink/mq/kafka_dml_sink.go index 3841c2a7298..04208088d21 100644 --- a/cdc/sink/dmlsink/mq/kafka_dml_sink.go +++ b/cdc/sink/dmlsink/mq/kafka_dml_sink.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tiflow/pkg/sink/codec/builder" "github.com/pingcap/tiflow/pkg/sink/kafka" tiflowutil "github.com/pingcap/tiflow/pkg/util" + utilPkg "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) @@ -122,8 +123,8 @@ func NewKafkaDMLSink( metricsCollector := factory.MetricsCollector(tiflowutil.RoleProcessor, adminClient) dmlProducer := producerCreator(ctx, changefeedID, asyncProducer, metricsCollector, errCh, failpointCh) encoderGroup := codec.NewEncoderGroup(replicaConfig.Sink, encoderBuilder, changefeedID) - s := newDMLSink(ctx, changefeedID, dmlProducer, adminClient, topicManager, - eventRouter, trans, encoderGroup, protocol, scheme, errCh) + s := newDMLSink(ctx, changefeedID, dmlProducer, adminClient, topicManager, eventRouter, trans, encoderGroup, + protocol, scheme, utilPkg.GetOrZero(replicaConfig.Sink.KafkaConfig.OutputRawChangeEvent), errCh) log.Info("DML sink producer created", zap.String("namespace", changefeedID.Namespace), zap.String("changefeedID", changefeedID.ID)) diff --git a/cdc/sink/dmlsink/mq/mq_dml_sink.go b/cdc/sink/dmlsink/mq/mq_dml_sink.go index e6d2105b69f..5bed1893625 100644 --- a/cdc/sink/dmlsink/mq/mq_dml_sink.go +++ b/cdc/sink/dmlsink/mq/mq_dml_sink.go @@ -70,7 +70,8 @@ type dmlSink struct { wg sync.WaitGroup dead chan struct{} - scheme string + scheme string + outputRawChangeEvent bool } func newDMLSink( @@ -84,6 +85,7 @@ func newDMLSink( encoderGroup codec.EncoderGroup, protocol config.Protocol, scheme string, + outputRawChangeEvent bool, errCh chan error, ) *dmlSink { ctx, cancel := context.WithCancelCause(ctx) @@ -91,13 +93,14 @@ func newDMLSink( worker := newWorker(changefeedID, protocol, producer, encoderGroup, statistics) s := &dmlSink{ - id: changefeedID, - protocol: protocol, - adminClient: adminClient, - ctx: ctx, - cancel: cancel, - dead: make(chan struct{}), - scheme: scheme, + id: changefeedID, + protocol: protocol, + adminClient: adminClient, + ctx: ctx, + cancel: cancel, + dead: make(chan struct{}), + scheme: scheme, + outputRawChangeEvent: outputRawChangeEvent, } s.alive.transformer = transformer s.alive.eventRouter = eventRouter @@ -236,6 +239,6 @@ func (s *dmlSink) Dead() <-chan struct{} { } // Scheme returns the scheme of this sink. 
-func (s *dmlSink) Scheme() string { - return s.scheme +func (s *dmlSink) SchemeOption() (string, bool) { + return s.scheme, s.outputRawChangeEvent } diff --git a/cdc/sink/dmlsink/mq/pulsar_dml_sink.go b/cdc/sink/dmlsink/mq/pulsar_dml_sink.go index 089f521fd37..faa1d031ee7 100644 --- a/cdc/sink/dmlsink/mq/pulsar_dml_sink.go +++ b/cdc/sink/dmlsink/mq/pulsar_dml_sink.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tiflow/pkg/sink/codec/builder" pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" tiflowutil "github.com/pingcap/tiflow/pkg/util" + utilPkg "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) @@ -123,8 +124,8 @@ func NewPulsarDMLSink( encoderGroup := codec.NewEncoderGroup(replicaConfig.Sink, encoderBuilder, changefeedID) - s := newDMLSink(ctx, changefeedID, p, nil, topicManager, - eventRouter, trans, encoderGroup, protocol, scheme, errCh) + s := newDMLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, trans, encoderGroup, + protocol, scheme, utilPkg.GetOrZero(replicaConfig.Sink.PulsarConfig.OutputRawChangeEvent), errCh) return s, nil } diff --git a/cdc/sink/dmlsink/txn/txn_dml_sink.go b/cdc/sink/dmlsink/txn/txn_dml_sink.go index 964d11b6eca..f5b2cb78edb 100644 --- a/cdc/sink/dmlsink/txn/txn_dml_sink.go +++ b/cdc/sink/dmlsink/txn/txn_dml_sink.go @@ -180,6 +180,6 @@ func (s *dmlSink) Dead() <-chan struct{} { return s.dead } -func (s *dmlSink) Scheme() string { - return s.scheme +func (s *dmlSink) SchemeOption() (string, bool) { + return s.scheme, false } diff --git a/cdc/sink/tablesink/table_sink_impl.go b/cdc/sink/tablesink/table_sink_impl.go index 167f30fc948..7e6b7d7154c 100644 --- a/cdc/sink/tablesink/table_sink_impl.go +++ b/cdc/sink/tablesink/table_sink_impl.go @@ -138,7 +138,7 @@ func (e *EventTableSink[E, P]) UpdateResolvedTs(resolvedTs model.ResolvedTs) err resolvedCallbackableEvents := make([]*dmlsink.CallbackableEvent[E], 0, len(resolvedEvents)) for _, ev := range resolvedEvents { - if err := ev.TrySplitAndSortUpdateEvent(e.backendSink.Scheme()); err != nil { + if err := ev.TrySplitAndSortUpdateEvent(e.backendSink.SchemeOption()); err != nil { return SinkInternalError{err} } // We have to record the event ID for the callback. diff --git a/cdc/sink/tablesink/table_sink_impl_test.go b/cdc/sink/tablesink/table_sink_impl_test.go index 87fd1af4abc..b7c55d38f52 100644 --- a/cdc/sink/tablesink/table_sink_impl_test.go +++ b/cdc/sink/tablesink/table_sink_impl_test.go @@ -50,8 +50,8 @@ func (m *mockEventSink) Dead() <-chan struct{} { return m.dead } -func (m *mockEventSink) Scheme() string { - return sink.BlackHoleScheme +func (m *mockEventSink) SchemeOption() (string, bool) { + return sink.BlackHoleScheme, false } // acknowledge the txn events by call the callback function. diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 44e1a3dbcd3..df7533dd2d2 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -434,6 +434,9 @@ type KafkaConfig struct { CodecConfig *CodecConfig `toml:"codec-config" json:"codec-config,omitempty"` LargeMessageHandle *LargeMessageHandleConfig `toml:"large-message-handle" json:"large-message-handle,omitempty"` GlueSchemaRegistryConfig *GlueSchemaRegistryConfig `toml:"glue-schema-registry-config" json:"glue-schema-registry-config"` + + // OutputRawChangeEvent controls whether to split the update pk/uk events. 
+ OutputRawChangeEvent *bool `toml:"output-raw-change-event" json:"output-raw-change-event,omitempty"` } // MaskSensitiveData masks sensitive data in KafkaConfig @@ -588,6 +591,9 @@ type PulsarConfig struct { // and 'type' always use 'client_credentials' OAuth2 *OAuth2 `toml:"oauth2" json:"oauth2,omitempty"` + // OutputRawChangeEvent controls whether to split the update pk/uk events. + OutputRawChangeEvent *bool `toml:"output-raw-change-event" json:"output-raw-change-event,omitempty"` + // BrokerURL is used to configure service brokerUrl for the Pulsar service. // This parameter is a part of the `sink-uri`. Internal use only. BrokerURL string `toml:"-" json:"-"` @@ -653,6 +659,9 @@ type CloudStorageConfig struct { FileExpirationDays *int `toml:"file-expiration-days" json:"file-expiration-days,omitempty"` FileCleanupCronSpec *string `toml:"file-cleanup-cron-spec" json:"file-cleanup-cron-spec,omitempty"` FlushConcurrency *int `toml:"flush-concurrency" json:"flush-concurrency,omitempty"` + + // OutputRawChangeEvent controls whether to split the update pk/uk events. + OutputRawChangeEvent *bool `toml:"output-raw-change-event" json:"output-raw-change-event,omitempty"` } func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { @@ -796,21 +805,68 @@ func (s *SinkConfig) validateAndAdjustSinkURI(sinkURI *url.URL) error { return err } - // Adjust that protocol is compatible with the scheme. For testing purposes, - // any protocol should be legal for blackhole. - if sink.IsMQScheme(sinkURI.Scheme) || sink.IsStorageScheme(sinkURI.Scheme) { - _, err := ParseSinkProtocolFromString(util.GetOrZero(s.Protocol)) - if err != nil { - return err - } - } else if sink.IsMySQLCompatibleScheme(sinkURI.Scheme) && s.Protocol != nil { + log.Info("succeed to parse parameter from sink uri", + zap.String("protocol", util.GetOrZero(s.Protocol)), + zap.String("txnAtomicity", string(util.GetOrZero(s.TxnAtomicity)))) + + // Check that protocol config is compatible with the scheme. + if sink.IsMySQLCompatibleScheme(sinkURI.Scheme) && s.Protocol != nil { return cerror.ErrSinkURIInvalid.GenWithStackByArgs(fmt.Sprintf("protocol %s "+ "is incompatible with %s scheme", util.GetOrZero(s.Protocol), sinkURI.Scheme)) } + // For testing purposes, any protocol should be legal for blackhole. + if sink.IsMQScheme(sinkURI.Scheme) || sink.IsStorageScheme(sinkURI.Scheme) { + s.ValidateProtocol(sinkURI.Scheme) + } - log.Info("succeed to parse parameter from sink uri", - zap.String("protocol", util.GetOrZero(s.Protocol)), - zap.String("txnAtomicity", string(util.GetOrZero(s.TxnAtomicity)))) + return nil +} + +// ValidateOutputRawChangeEvent validates the output-raw-change-event configuration. 
+func (s *SinkConfig) ValidateProtocol(scheme string) error { + protocol, err := ParseSinkProtocolFromString(util.GetOrZero(s.Protocol)) + if err != nil { + return err + } + outputOldValue := false + switch protocol { + case ProtocolOpen: + if s.OpenProtocol != nil { + outputOldValue = s.OpenProtocol.OutputOldValue + } + case ProtocolDebezium: + if s.Debezium != nil { + outputOldValue = s.Debezium.OutputOldValue + } + case ProtocolCsv: + if s.CSVConfig != nil { + outputOldValue = s.CSVConfig.OutputOldValue + } + case ProtocolAvro: + outputOldValue = false + default: + return nil + } + + outputRawChangeEvent := false + switch scheme { + case sink.KafkaScheme, sink.KafkaSSLScheme: + if s.KafkaConfig != nil { + outputRawChangeEvent = util.GetOrZero(s.KafkaConfig.OutputRawChangeEvent) + } + case sink.PulsarScheme, sink.PulsarSSLScheme: + if s.PulsarConfig != nil { + outputRawChangeEvent = util.GetOrZero(s.PulsarConfig.OutputRawChangeEvent) + } + default: + if sink.IsStorageScheme(scheme) && s.CloudStorageConfig != nil { + outputRawChangeEvent = util.GetOrZero(s.CloudStorageConfig.OutputRawChangeEvent) + } + } + + if outputRawChangeEvent && !outputOldValue { + return cerror.ErrIncompatibleSinkConfig.GenWithStack("output-raw-change-event is true, but output-old-value is false") + } return nil } diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go index e3a2fada8b5..6e055b10d4a 100644 --- a/pkg/errors/cdc_errors.go +++ b/pkg/errors/cdc_errors.go @@ -316,8 +316,7 @@ var ( errors.RFCCodeText("CDC:ErrSinkURIInvalid"), ) ErrIncompatibleSinkConfig = errors.Normalize( - "incompatible configuration in sink uri(%s) and config file(%s), "+ - "please try to update the configuration only through sink uri", + "incompatible configuration '%s'", errors.RFCCodeText("CDC:ErrIncompatibleSinkConfig"), ) ErrSinkUnknownProtocol = errors.Normalize( @@ -400,6 +399,10 @@ var ( "sink config invalid", errors.RFCCodeText("CDC:ErrSinkInvalidConfig"), ) + ErrSinkIncompatibleConfig = errors.Normalize( + "incompatible configuration %s", + errors.RFCCodeText("CDC:ErrSinkIncompatibleConfig"), + ) ErrCraftCodecInvalidData = errors.Normalize( "craft codec invalid data", errors.RFCCodeText("CDC:ErrCraftCodecInvalidData"), From fc0432f9252f9c076bd4e3b0ab36176091f9de75 Mon Sep 17 00:00:00 2001 From: CharlesCheung96 Date: Tue, 4 Jun 2024 13:38:47 +0800 Subject: [PATCH 02/20] fix log --- pkg/config/sink.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/pkg/config/sink.go b/pkg/config/sink.go index df7533dd2d2..3990cfa1c3c 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -134,8 +134,6 @@ type SinkConfig struct { // DispatchRules is only available when the downstream is MQ. DispatchRules []*DispatchRule `toml:"dispatchers" json:"dispatchers,omitempty"` - // CSVConfig is only available when the downstream is Storage. - CSVConfig *CSVConfig `toml:"csv" json:"csv,omitempty"` ColumnSelectors []*ColumnSelector `toml:"column-selectors" json:"column-selectors,omitempty"` // SchemaRegistry is only available when the downstream is MQ using avro protocol. @@ -194,9 +192,10 @@ type SinkConfig struct { // Debezium only. Whether schema should be excluded in the output. DebeziumDisableSchema *bool `toml:"debezium-disable-schema" json:"debezium-disable-schema,omitempty"` + // CSVConfig is only available when the downstream is Storage. 
+ CSVConfig *CSVConfig `toml:"csv" json:"csv,omitempty"` // OpenProtocol related configurations OpenProtocol *OpenProtocolConfig `toml:"open" json:"open,omitempty"` - // DebeziumConfig related configurations Debezium *DebeziumConfig `toml:"debezium" json:"debezium,omitempty"` } @@ -816,9 +815,8 @@ func (s *SinkConfig) validateAndAdjustSinkURI(sinkURI *url.URL) error { } // For testing purposes, any protocol should be legal for blackhole. if sink.IsMQScheme(sinkURI.Scheme) || sink.IsStorageScheme(sinkURI.Scheme) { - s.ValidateProtocol(sinkURI.Scheme) + return s.ValidateProtocol(sinkURI.Scheme) } - return nil } @@ -865,7 +863,10 @@ func (s *SinkConfig) ValidateProtocol(scheme string) error { } if outputRawChangeEvent && !outputOldValue { - return cerror.ErrIncompatibleSinkConfig.GenWithStack("output-raw-change-event is true, but output-old-value is false") + // TODO: return error if we do not need to keep backward compatibility. + log.Warn(fmt.Sprintf("TiCDC will not split the update pk/uk events if output-raw-change-event is true(scheme: %s).", scheme) + + fmt.Sprintf("It is not recommended to set output-old-value to false(protocol: %s) in this case.", protocol) + + "Otherwise, there may be data consistency issues in update pk/uk scenarios due to lack of old value information.") } return nil } From 0edd61ed5fa319fc75bbfe2a39e0a32bd835c7a2 Mon Sep 17 00:00:00 2001 From: CharlesCheung96 Date: Tue, 4 Jun 2024 15:19:44 +0800 Subject: [PATCH 03/20] add csv test --- .../conf/changefeed1.toml | 27 ++++++++++ .../conf/changefeed2.toml | 26 ++++++++++ .../conf/diff_config.toml | 29 +++++++++++ .../csv_storage_update_pk/data/prepare.sql | 24 +++++++++ .../csv_storage_update_pk/data/run.sql | 40 ++++++++++++++ .../csv_storage_update_pk/run.sh | 52 +++++++++++++++++++ 6 files changed, 198 insertions(+) create mode 100644 tests/integration_tests/csv_storage_update_pk/conf/changefeed1.toml create mode 100644 tests/integration_tests/csv_storage_update_pk/conf/changefeed2.toml create mode 100644 tests/integration_tests/csv_storage_update_pk/conf/diff_config.toml create mode 100644 tests/integration_tests/csv_storage_update_pk/data/prepare.sql create mode 100644 tests/integration_tests/csv_storage_update_pk/data/run.sql create mode 100644 tests/integration_tests/csv_storage_update_pk/run.sh diff --git a/tests/integration_tests/csv_storage_update_pk/conf/changefeed1.toml b/tests/integration_tests/csv_storage_update_pk/conf/changefeed1.toml new file mode 100644 index 00000000000..aa0b633e7c9 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk/conf/changefeed1.toml @@ -0,0 +1,27 @@ +# Case 1: default configuration where `csv.output-old-value=false` and `sink.cloud-storage-config.output-raw-change-event=false` +# TiCDC split update pk/uk events into delete and insert events. + +[filter] +rules = ['test.*'] + +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.cloud-storage-config] +output-raw-change-event = false + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. 
+include-commit-ts = true +# Whether to output the value before the row data changes. The default value is false. +output-old-value = false \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk/conf/changefeed2.toml b/tests/integration_tests/csv_storage_update_pk/conf/changefeed2.toml new file mode 100644 index 00000000000..7575640c83b --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk/conf/changefeed2.toml @@ -0,0 +1,26 @@ +# Case 2: Split all update events into delete and insert events. + +[filter] +rules = ['test.*'] + +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' +# Whether to output the value before the row data changes. The default value is false. +output-old-value = true + +[sink.cloud-storage-config] +output-raw-change-event = true + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk/conf/diff_config.toml b/tests/integration_tests/csv_storage_update_pk/conf/diff_config.toml new file mode 100644 index 00000000000..4a265cbeda9 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. + +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/tidb_cdc_test/csv_storage_basic/sync_diff/output" + + source-instances = ["mysql1"] + + target-instance = "tidb0" + + target-check-tables = ["test.?*"] + +[data-sources] +[data-sources.mysql1] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + +[data-sources.tidb0] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/integration_tests/csv_storage_update_pk/data/prepare.sql b/tests/integration_tests/csv_storage_update_pk/data/prepare.sql new file mode 100644 index 00000000000..02cdf938288 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk/data/prepare.sql @@ -0,0 +1,24 @@ +drop database if exists `test`; +create database `test`; +use `test`; + +CREATE TABLE `update_pk` ( + `id` int NOT NULL, + `pad` varchar(100) NOT NULL, + PRIMARY KEY (`id`) +); +INSERT INTO `update_pk` (`id`, `pad`) VALUES (1, 'example1'), (2, 'example2'); +INSERT INTO `update_pk` (`id`, `pad`) VALUES (10, 'example10'), (20, 'example20'); +INSERT INTO `update_pk` (`id`, `pad`) VALUES (100, 'example100'); +INSERT INTO `update_pk` (`id`, `pad`) VALUES (1000, 'example1000'); + +CREATE TABLE `update_uk` ( + `id` int NOT NULL, + `uk` int NOT NULL, + `pad` varchar(100) NOT NULL, + PRIMARY KEY (`id`) + UNIQUE KEY `uk` (`uk`) +); +INSERT INTO `update_pk` (`id`, `uk`, `pad`) VALUES (1, 1, 'example1'), (2, 2, 'example2'); +INSERT INTO `update_pk` (`id`, `uk`, `pad`) VALUES (100, 100, 'example100'); +INSERT INTO `update_pk` (`id`, `uk`, `pad`) VALUES (1000, 1000, 'example1000'); \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk/data/run.sql 
b/tests/integration_tests/csv_storage_update_pk/data/run.sql new file mode 100644 index 00000000000..beec9e00928 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk/data/run.sql @@ -0,0 +1,40 @@ +USE `test`; + +-- update_pk -- + +BEGIN; -- Note: multi-row exchange with order dependency +UPDATE update_pk SET id = 3 WHERE id = 1; +UPDATE update_pk SET id = 1 WHERE id = 2; +UPDATE update_pk SET id = 2 WHERE id = 3; +COMMIT; + +BEGIN; -- Note: multi-row update with no order dependency +UPDATE update_pk SET id = 30 WHERE id = 10; +UPDATE update_pk SET id = 40 WHERE id = 20; +COMMIT; + +BEGIN; -- Single row update +UPDATE update_pk SET id = 200 WHERE id = 100; +COMMIT; + +-- Normal update +UPDATE update_pk SET pad='example1001' WHERE id = 1000; + +-- update_uk -- +BEGIN; -- Note: multi-row exchange with order dependency +UPDATE update_uk SET uk = 3 WHERE uk = 1; +UPDATE update_uk SET uk = 1 WHERE uk = 2; +UPDATE update_uk SET uk = 2 WHERE uk = 3; +COMMIT; + +BEGIN; -- Note: multi-row update with no order dependency +UPDATE update_uk SET uk = 30 WHERE uk = 10; +UPDATE update_uk SET uk = 40 WHERE uk = 20; +COMMIT; + +BEGIN; -- Single row update +UPDATE update_uk SET uk = 200 WHERE uk = 100; +COMMIT; + +-- Normal update +UPDATE update_uk SET pad='example1001' WHERE uk = 1000; \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk/run.sh b/tests/integration_tests/csv_storage_update_pk/run.sh new file mode 100644 index 00000000000..16e22eddc4c --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk/run.sh @@ -0,0 +1,52 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +# function run_consumer() { +# SINK_URI=$1 +# CONFIG_FILE=$2 + +# # run_sql "DROP DATABASE IF EXISTS test;create database `test`;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} +# run_storage_consumer $WORK_DIR $SINK_URI1 $CUR/conf/changefeed1.toml "" +# sleep 8 +# check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100 +# } + +function run() { + if [ "$SINK_TYPE" != "storage" ]; then + return + fi + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + start_tidb_cluster --workdir $WORK_DIR + cd $WORK_DIR + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + SINK_URI1="file://$WORK_DIR/storage_test/changefeed1?flush-interval=5s" + run_cdc_cli changefeed create --sink-uri="$SINK_URI1" --config=$CUR/conf/changefeed1.toml -c "changefeed1" + SINK_URI2="file://$WORK_DIR/storage_test/changefeed2?flush-interval=5s" + run_cdc_cli changefeed create --sink-uri="$SINK_URI1" --config=$CUR/conf/changefeed2.toml -c "changefeed2" + + run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + sleep 6 + run_sql_file $CUR/data/run.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + run_storage_consumer $WORK_DIR $SINK_URI1 $CUR/conf/changefeed1.toml "" + sleep 8 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100 + + run_storage_consumer $WORK_DIR $SINK_URI1 $CUR/conf/changefeed2.toml "" + sleep 8 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100 +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! 
>>>>>>" From f3db300e16fbd96b84837491e2efa5be7c540695 Mon Sep 17 00:00:00 2001 From: CharlesCheung96 Date: Tue, 4 Jun 2024 16:10:09 +0800 Subject: [PATCH 04/20] fix panic --- .../cloudstorage/cloud_storage_dml_sink.go | 3 +- cdc/sink/dmlsink/mq/kafka_dml_sink.go | 3 +- cdc/sink/dmlsink/mq/pulsar_dml_sink.go | 3 +- pkg/config/sink.go | 35 ++++++++++++++----- pkg/sink/pulsar/config.go | 1 + 5 files changed, 30 insertions(+), 15 deletions(-) diff --git a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go index d3c034f990a..938be24392e 100644 --- a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go +++ b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go @@ -37,7 +37,6 @@ import ( "github.com/pingcap/tiflow/pkg/sink/codec/builder" "github.com/pingcap/tiflow/pkg/sink/codec/common" putil "github.com/pingcap/tiflow/pkg/util" - utilPkg "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -148,7 +147,7 @@ func NewDMLSink(ctx context.Context, s := &DMLSink{ changefeedID: changefeedID, scheme: strings.ToLower(sinkURI.Scheme), - outputRawChangeEvent: utilPkg.GetOrZero(replicaConfig.Sink.CloudStorageConfig.OutputRawChangeEvent), + outputRawChangeEvent: replicaConfig.Sink.CloudStorageConfig.GetOutputRawChangeEvent(), encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency), workers: make([]*dmlWorker, cfg.WorkerCount), statistics: metrics.NewStatistics(wgCtx, changefeedID, sink.TxnSink), diff --git a/cdc/sink/dmlsink/mq/kafka_dml_sink.go b/cdc/sink/dmlsink/mq/kafka_dml_sink.go index 04208088d21..a0b0c4193cc 100644 --- a/cdc/sink/dmlsink/mq/kafka_dml_sink.go +++ b/cdc/sink/dmlsink/mq/kafka_dml_sink.go @@ -31,7 +31,6 @@ import ( "github.com/pingcap/tiflow/pkg/sink/codec/builder" "github.com/pingcap/tiflow/pkg/sink/kafka" tiflowutil "github.com/pingcap/tiflow/pkg/util" - utilPkg "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) @@ -124,7 +123,7 @@ func NewKafkaDMLSink( dmlProducer := producerCreator(ctx, changefeedID, asyncProducer, metricsCollector, errCh, failpointCh) encoderGroup := codec.NewEncoderGroup(replicaConfig.Sink, encoderBuilder, changefeedID) s := newDMLSink(ctx, changefeedID, dmlProducer, adminClient, topicManager, eventRouter, trans, encoderGroup, - protocol, scheme, utilPkg.GetOrZero(replicaConfig.Sink.KafkaConfig.OutputRawChangeEvent), errCh) + protocol, scheme, replicaConfig.Sink.KafkaConfig.GetOutputRawChangeEvent(), errCh) log.Info("DML sink producer created", zap.String("namespace", changefeedID.Namespace), zap.String("changefeedID", changefeedID.ID)) diff --git a/cdc/sink/dmlsink/mq/pulsar_dml_sink.go b/cdc/sink/dmlsink/mq/pulsar_dml_sink.go index faa1d031ee7..f9934b59147 100644 --- a/cdc/sink/dmlsink/mq/pulsar_dml_sink.go +++ b/cdc/sink/dmlsink/mq/pulsar_dml_sink.go @@ -33,7 +33,6 @@ import ( "github.com/pingcap/tiflow/pkg/sink/codec/builder" pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" tiflowutil "github.com/pingcap/tiflow/pkg/util" - utilPkg "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) @@ -125,7 +124,7 @@ func NewPulsarDMLSink( encoderGroup := codec.NewEncoderGroup(replicaConfig.Sink, encoderBuilder, changefeedID) s := newDMLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, trans, encoderGroup, - protocol, scheme, utilPkg.GetOrZero(replicaConfig.Sink.PulsarConfig.OutputRawChangeEvent), errCh) + protocol, scheme, pConfig.GetOutputRawChangeEvent(), errCh) return s, nil } diff --git a/pkg/config/sink.go b/pkg/config/sink.go 
index 3990cfa1c3c..b9d69bde151 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -438,6 +438,13 @@ type KafkaConfig struct { OutputRawChangeEvent *bool `toml:"output-raw-change-event" json:"output-raw-change-event,omitempty"` } +func (k *KafkaConfig) GetOutputRawChangeEvent() bool { + if k == nil || k.OutputRawChangeEvent == nil { + return false + } + return *k.OutputRawChangeEvent +} + // MaskSensitiveData masks sensitive data in KafkaConfig func (k *KafkaConfig) MaskSensitiveData() { k.SASLPassword = aws.String("******") @@ -600,6 +607,14 @@ type PulsarConfig struct { SinkURI *url.URL `toml:"-" json:"-"` } +// GetOutputRawChangeEvent returns the value of OutputRawChangeEvent +func (p *PulsarConfig) GetOutputRawChangeEvent() bool { + if p == nil || p.OutputRawChangeEvent == nil { + return false + } + return *p.OutputRawChangeEvent +} + // MaskSensitiveData masks sensitive data in PulsarConfig func (c *PulsarConfig) MaskSensitiveData() { if c.AuthenticationToken != nil { @@ -663,6 +678,14 @@ type CloudStorageConfig struct { OutputRawChangeEvent *bool `toml:"output-raw-change-event" json:"output-raw-change-event,omitempty"` } +// GetOutputRawChangeEvent returns the value of OutputRawChangeEvent +func (c *CloudStorageConfig) GetOutputRawChangeEvent() bool { + if c == nil || c.OutputRawChangeEvent == nil { + return false + } + return *c.OutputRawChangeEvent +} + func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { if err := s.validateAndAdjustSinkURI(sinkURI); err != nil { return err @@ -849,17 +872,11 @@ func (s *SinkConfig) ValidateProtocol(scheme string) error { outputRawChangeEvent := false switch scheme { case sink.KafkaScheme, sink.KafkaSSLScheme: - if s.KafkaConfig != nil { - outputRawChangeEvent = util.GetOrZero(s.KafkaConfig.OutputRawChangeEvent) - } + outputRawChangeEvent = s.KafkaConfig.GetOutputRawChangeEvent() case sink.PulsarScheme, sink.PulsarSSLScheme: - if s.PulsarConfig != nil { - outputRawChangeEvent = util.GetOrZero(s.PulsarConfig.OutputRawChangeEvent) - } + outputRawChangeEvent = s.PulsarConfig.GetOutputRawChangeEvent() default: - if sink.IsStorageScheme(scheme) && s.CloudStorageConfig != nil { - outputRawChangeEvent = util.GetOrZero(s.CloudStorageConfig.OutputRawChangeEvent) - } + outputRawChangeEvent = s.CloudStorageConfig.GetOutputRawChangeEvent() } if outputRawChangeEvent && !outputOldValue { diff --git a/pkg/sink/pulsar/config.go b/pkg/sink/pulsar/config.go index 6859fd2e561..edfbef27acd 100644 --- a/pkg/sink/pulsar/config.go +++ b/pkg/sink/pulsar/config.go @@ -67,6 +67,7 @@ func NewPulsarConfig(sinkURI *url.URL, pulsarConfig *config.PulsarConfig) (*conf c.SinkURI = sinkURI c.BrokerURL = sinkURI.Scheme + "://" + sinkURI.Host + c.OutputRawChangeEvent = pulsarConfig.OutputRawChangeEvent if pulsarConfig == nil { log.L().Debug("new pulsar config", zap.Any("config", c)) From cd87e9334fcf15d2f813baa9744818de545bf037 Mon Sep 17 00:00:00 2001 From: CharlesCheung96 Date: Tue, 4 Jun 2024 18:42:20 +0800 Subject: [PATCH 05/20] fix it --- .../csv_storage_update_pk/conf/changefeed2.toml | 6 +++--- .../csv_storage_update_pk/data/prepare.sql | 9 +++++---- tests/integration_tests/csv_storage_update_pk/run.sh | 4 ++-- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/tests/integration_tests/csv_storage_update_pk/conf/changefeed2.toml b/tests/integration_tests/csv_storage_update_pk/conf/changefeed2.toml index 7575640c83b..2fcfaa33387 100644 --- a/tests/integration_tests/csv_storage_update_pk/conf/changefeed2.toml +++ 
b/tests/integration_tests/csv_storage_update_pk/conf/changefeed2.toml @@ -9,8 +9,6 @@ protocol = "csv" terminator = "\n" # Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. date-separator = 'day' -# Whether to output the value before the row data changes. The default value is false. -output-old-value = true [sink.cloud-storage-config] output-raw-change-event = true @@ -23,4 +21,6 @@ quote = '"' # Representation of null values in CSV files, the default value is '\N' null = '\N' # Include commit-ts in the row data. The default value is false. -include-commit-ts = true \ No newline at end of file +include-commit-ts = true +# Whether to output the value before the row data changes. The default value is false. +output-old-value = true \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk/data/prepare.sql b/tests/integration_tests/csv_storage_update_pk/data/prepare.sql index 02cdf938288..d1c848c4a7b 100644 --- a/tests/integration_tests/csv_storage_update_pk/data/prepare.sql +++ b/tests/integration_tests/csv_storage_update_pk/data/prepare.sql @@ -16,9 +16,10 @@ CREATE TABLE `update_uk` ( `id` int NOT NULL, `uk` int NOT NULL, `pad` varchar(100) NOT NULL, - PRIMARY KEY (`id`) + PRIMARY KEY (`id`), UNIQUE KEY `uk` (`uk`) ); -INSERT INTO `update_pk` (`id`, `uk`, `pad`) VALUES (1, 1, 'example1'), (2, 2, 'example2'); -INSERT INTO `update_pk` (`id`, `uk`, `pad`) VALUES (100, 100, 'example100'); -INSERT INTO `update_pk` (`id`, `uk`, `pad`) VALUES (1000, 1000, 'example1000'); \ No newline at end of file +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (1, 1, 'example1'), (2, 2, 'example2'); +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (10, 10, 'example10'), (20, 20, 'example20'); +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (100, 100, 'example100'); +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (1000, 1000, 'example1000'); \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk/run.sh b/tests/integration_tests/csv_storage_update_pk/run.sh index 16e22eddc4c..547c8c8cb66 100644 --- a/tests/integration_tests/csv_storage_update_pk/run.sh +++ b/tests/integration_tests/csv_storage_update_pk/run.sh @@ -31,7 +31,7 @@ function run() { SINK_URI1="file://$WORK_DIR/storage_test/changefeed1?flush-interval=5s" run_cdc_cli changefeed create --sink-uri="$SINK_URI1" --config=$CUR/conf/changefeed1.toml -c "changefeed1" SINK_URI2="file://$WORK_DIR/storage_test/changefeed2?flush-interval=5s" - run_cdc_cli changefeed create --sink-uri="$SINK_URI1" --config=$CUR/conf/changefeed2.toml -c "changefeed2" + run_cdc_cli changefeed create --sink-uri="$SINK_URI2" --config=$CUR/conf/changefeed2.toml -c "changefeed2" run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} sleep 6 @@ -41,7 +41,7 @@ function run() { sleep 8 check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100 - run_storage_consumer $WORK_DIR $SINK_URI1 $CUR/conf/changefeed2.toml "" + run_storage_consumer $WORK_DIR $SINK_URI2 $CUR/conf/changefeed2.toml "" sleep 8 check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100 } From 7180ff53a9c2cc16904c99d08f4d083c0c278068 Mon Sep 17 00:00:00 2001 From: CharlesCheung96 Date: Tue, 4 Jun 2024 19:02:50 +0800 Subject: [PATCH 06/20] fix --- cdc/model/sink.go | 7 +++++++ tests/integration_tests/csv_storage_update_pk/run.sh | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 32571e0d327..7d12b07d9a0 100644 --- 
a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -1257,6 +1257,13 @@ func SplitUpdateEvent( // NOTICE: clean up pre cols for insert event. insertEvent.PreColumns = nil + log.Debug("split update event", zap.Uint64("startTs", updateEvent.StartTs), + zap.Uint64("commitTs", updateEvent.CommitTs), + zap.String("schema", updateEvent.TableInfo.TableName.Schema), + zap.String("table", updateEvent.TableInfo.GetTableName()), + zap.Any("preCols", updateEvent.PreColumns), + zap.Any("cols", updateEvent.Columns)) + return &deleteEvent, &insertEvent, nil } diff --git a/tests/integration_tests/csv_storage_update_pk/run.sh b/tests/integration_tests/csv_storage_update_pk/run.sh index 547c8c8cb66..e922c389d8f 100644 --- a/tests/integration_tests/csv_storage_update_pk/run.sh +++ b/tests/integration_tests/csv_storage_update_pk/run.sh @@ -34,7 +34,7 @@ function run() { run_cdc_cli changefeed create --sink-uri="$SINK_URI2" --config=$CUR/conf/changefeed2.toml -c "changefeed2" run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} - sleep 6 + sleep 10 run_sql_file $CUR/data/run.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_storage_consumer $WORK_DIR $SINK_URI1 $CUR/conf/changefeed1.toml "" From 661f128999738394c3b7e4345d74eaaa8141a7c2 Mon Sep 17 00:00:00 2001 From: CharlesCheung96 Date: Tue, 4 Jun 2024 22:17:52 +0800 Subject: [PATCH 07/20] fix apiv2 --- cdc/api/v2/model.go | 51 +++++++++++-------- .../cloudstorage/cloud_storage_dml_sink.go | 2 +- .../csv_storage_update_pk/data/prepare.sql | 14 ++--- 3 files changed, 39 insertions(+), 28 deletions(-) diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 5bb6e356870..6a224ca2ee3 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -317,6 +317,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( BasicPassword: c.Sink.PulsarConfig.BasicPassword, AuthTLSCertificatePath: c.Sink.PulsarConfig.AuthTLSCertificatePath, AuthTLSPrivateKeyPath: c.Sink.PulsarConfig.AuthTLSPrivateKeyPath, + OutputRawChangeEvent: c.Sink.PulsarConfig.OutputRawChangeEvent, } if c.Sink.PulsarConfig.OAuth2 != nil { pulsarConfig.OAuth2 = &config.OAuth2{ @@ -402,6 +403,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( CodecConfig: codeConfig, LargeMessageHandle: largeMessageHandle, GlueSchemaRegistryConfig: glueSchemaRegistryConfig, + OutputRawChangeEvent: c.Sink.KafkaConfig.OutputRawChangeEvent, } } var mysqlConfig *config.MySQLConfig @@ -427,13 +429,14 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( var cloudStorageConfig *config.CloudStorageConfig if c.Sink.CloudStorageConfig != nil { cloudStorageConfig = &config.CloudStorageConfig{ - WorkerCount: c.Sink.CloudStorageConfig.WorkerCount, - FlushInterval: c.Sink.CloudStorageConfig.FlushInterval, - FileSize: c.Sink.CloudStorageConfig.FileSize, - OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID, - FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays, - FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec, - FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency, + WorkerCount: c.Sink.CloudStorageConfig.WorkerCount, + FlushInterval: c.Sink.CloudStorageConfig.FlushInterval, + FileSize: c.Sink.CloudStorageConfig.FileSize, + OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID, + FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays, + FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec, + FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency, + OutputRawChangeEvent: 
c.Sink.CloudStorageConfig.OutputRawChangeEvent, } } var debeziumConfig *config.DebeziumConfig @@ -666,6 +669,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { CodecConfig: codeConfig, LargeMessageHandle: largeMessageHandle, GlueSchemaRegistryConfig: glueSchemaRegistryConfig, + OutputRawChangeEvent: cloned.Sink.KafkaConfig.OutputRawChangeEvent, } } var mysqlConfig *MySQLConfig @@ -708,6 +712,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { BasicPassword: cloned.Sink.PulsarConfig.BasicPassword, AuthTLSCertificatePath: cloned.Sink.PulsarConfig.AuthTLSCertificatePath, AuthTLSPrivateKeyPath: cloned.Sink.PulsarConfig.AuthTLSPrivateKeyPath, + OutputRawChangeEvent: cloned.Sink.PulsarConfig.OutputRawChangeEvent, } if cloned.Sink.PulsarConfig.OAuth2 != nil { pulsarConfig.OAuth2 = &PulsarOAuth2{ @@ -722,13 +727,14 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { var cloudStorageConfig *CloudStorageConfig if cloned.Sink.CloudStorageConfig != nil { cloudStorageConfig = &CloudStorageConfig{ - WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount, - FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval, - FileSize: cloned.Sink.CloudStorageConfig.FileSize, - OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID, - FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays, - FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec, - FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency, + WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount, + FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval, + FileSize: cloned.Sink.CloudStorageConfig.FileSize, + OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID, + FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays, + FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec, + FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency, + OutputRawChangeEvent: cloned.Sink.CloudStorageConfig.OutputRawChangeEvent, } } var debeziumConfig *DebeziumConfig @@ -1194,6 +1200,7 @@ type PulsarConfig struct { AuthTLSCertificatePath *string `json:"auth-tls-certificate-path,omitempty"` AuthTLSPrivateKeyPath *string `json:"auth-tls-private-key-path,omitempty"` OAuth2 *PulsarOAuth2 `json:"oauth2,omitempty"` + OutputRawChangeEvent *bool `json:"output-raw-change-event,omitempty"` } // PulsarOAuth2 is the configuration for OAuth2 @@ -1243,6 +1250,7 @@ type KafkaConfig struct { CodecConfig *CodecConfig `json:"codec_config,omitempty"` LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"` GlueSchemaRegistryConfig *GlueSchemaRegistryConfig `json:"glue_schema_registry_config,omitempty"` + OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"` } // MySQLConfig represents a MySQL sink configuration @@ -1266,13 +1274,14 @@ type MySQLConfig struct { // CloudStorageConfig represents a cloud storage sink configuration type CloudStorageConfig struct { - WorkerCount *int `json:"worker_count,omitempty"` - FlushInterval *string `json:"flush_interval,omitempty"` - FileSize *int `json:"file_size,omitempty"` - OutputColumnID *bool `json:"output_column_id,omitempty"` - FileExpirationDays *int `json:"file_expiration_days,omitempty"` - FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"` - FlushConcurrency *int `json:"flush_concurrency,omitempty"` + WorkerCount *int `json:"worker_count,omitempty"` + FlushInterval *string `json:"flush_interval,omitempty"` + FileSize 
*int `json:"file_size,omitempty"` + OutputColumnID *bool `json:"output_column_id,omitempty"` + FileExpirationDays *int `json:"file_expiration_days,omitempty"` + FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"` + FlushConcurrency *int `json:"flush_concurrency,omitempty"` + OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"` } // ChangefeedStatus holds common information of a changefeed in cdc diff --git a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go index 938be24392e..876411d9817 100644 --- a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go +++ b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go @@ -299,5 +299,5 @@ func (s *DMLSink) Dead() <-chan struct{} { // Scheme returns the sink scheme. func (s *DMLSink) SchemeOption() (string, bool) { - return s.scheme, s.outputRawChangeEvent + return s.scheme, true } diff --git a/tests/integration_tests/csv_storage_update_pk/data/prepare.sql b/tests/integration_tests/csv_storage_update_pk/data/prepare.sql index d1c848c4a7b..c1114facbd9 100644 --- a/tests/integration_tests/csv_storage_update_pk/data/prepare.sql +++ b/tests/integration_tests/csv_storage_update_pk/data/prepare.sql @@ -3,23 +3,25 @@ create database `test`; use `test`; CREATE TABLE `update_pk` ( - `id` int NOT NULL, - `pad` varchar(100) NOT NULL, - PRIMARY KEY (`id`) + `id` int PRIMARY KEY NONCLUSTERED, + `pad` varchar(100) NOT NULL ); INSERT INTO `update_pk` (`id`, `pad`) VALUES (1, 'example1'), (2, 'example2'); INSERT INTO `update_pk` (`id`, `pad`) VALUES (10, 'example10'), (20, 'example20'); INSERT INTO `update_pk` (`id`, `pad`) VALUES (100, 'example100'); INSERT INTO `update_pk` (`id`, `pad`) VALUES (1000, 'example1000'); +SHOW INDEX FROM update_pk; + CREATE TABLE `update_uk` ( - `id` int NOT NULL, + `id` int PRIMARY KEY NONCLUSTERED, `uk` int NOT NULL, `pad` varchar(100) NOT NULL, - PRIMARY KEY (`id`), UNIQUE KEY `uk` (`uk`) ); INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (1, 1, 'example1'), (2, 2, 'example2'); INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (10, 10, 'example10'), (20, 20, 'example20'); INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (100, 100, 'example100'); -INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (1000, 1000, 'example1000'); \ No newline at end of file +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (1000, 1000, 'example1000'); + +SHOW INDEX FROM update_uk; \ No newline at end of file From f52887a3308cd9543ededb400b954255eb8d7c1a Mon Sep 17 00:00:00 2001 From: CharlesCheung96 Date: Tue, 4 Jun 2024 22:41:57 +0800 Subject: [PATCH 08/20] update --- cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go index 876411d9817..938be24392e 100644 --- a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go +++ b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go @@ -299,5 +299,5 @@ func (s *DMLSink) Dead() <-chan struct{} { // Scheme returns the sink scheme. 
func (s *DMLSink) SchemeOption() (string, bool) { - return s.scheme, true + return s.scheme, s.outputRawChangeEvent } From 5d305fdb5bdce32f81f837d2a81fa0ae261f1060 Mon Sep 17 00:00:00 2001 From: CharlesCheung96 Date: Wed, 5 Jun 2024 00:25:28 +0800 Subject: [PATCH 09/20] fix lint --- .../dmlsink/blackhole/black_hole_dml_sink.go | 2 +- .../cloudstorage/cloud_storage_dml_sink.go | 2 +- pkg/config/sink.go | 9 ++++--- pkg/sink/pulsar/config.go | 1 - .../conf/changefeed1.toml | 2 +- .../conf/changefeed2.toml | 2 +- .../conf/changefeed3.toml | 26 +++++++++++++++++++ .../conf/changefeed4.toml | 26 +++++++++++++++++++ .../conf/diff_config.toml | 0 .../data/prepare.sql | 4 +-- .../data/run.sql | 0 .../run.sh | 22 +++++++++------- 12 files changed, 75 insertions(+), 21 deletions(-) rename tests/integration_tests/{csv_storage_update_pk => csv_storage_update_pk_clustered}/conf/changefeed1.toml (93%) rename tests/integration_tests/{csv_storage_update_pk => csv_storage_update_pk_clustered}/conf/changefeed2.toml (92%) create mode 100644 tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed3.toml create mode 100644 tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed4.toml rename tests/integration_tests/{csv_storage_update_pk => csv_storage_update_pk_clustered}/conf/diff_config.toml (100%) rename tests/integration_tests/{csv_storage_update_pk => csv_storage_update_pk_clustered}/data/prepare.sql (92%) rename tests/integration_tests/{csv_storage_update_pk => csv_storage_update_pk_clustered}/data/run.sql (100%) rename tests/integration_tests/{csv_storage_update_pk => csv_storage_update_pk_clustered}/run.sh (67%) diff --git a/cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go b/cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go index 642a4e818bd..83f321dcdfd 100644 --- a/cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go +++ b/cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go @@ -47,7 +47,7 @@ func (s *DMLSink) WriteEvents(rows ...*dmlsink.CallbackableEvent[*model.RowChang return } -// Scheme return the scheme of the sink. +// SchemeOption returns the scheme and the option. func (s *DMLSink) SchemeOption() (string, bool) { return sink.BlackHoleScheme, true } diff --git a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go index 938be24392e..f93315305e6 100644 --- a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go +++ b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go @@ -297,7 +297,7 @@ func (s *DMLSink) Dead() <-chan struct{} { return s.dead } -// Scheme returns the sink scheme. +// SchemeOption returns the scheme and the option. 
func (s *DMLSink) SchemeOption() (string, bool) { return s.scheme, s.outputRawChangeEvent } diff --git a/pkg/config/sink.go b/pkg/config/sink.go index b9d69bde151..9954fa4f51f 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -438,6 +438,7 @@ type KafkaConfig struct { OutputRawChangeEvent *bool `toml:"output-raw-change-event" json:"output-raw-change-event,omitempty"` } +// GetOutputRawChangeEvent returns the value of OutputRawChangeEvent func (k *KafkaConfig) GetOutputRawChangeEvent() bool { if k == nil || k.OutputRawChangeEvent == nil { return false @@ -608,11 +609,11 @@ type PulsarConfig struct { } // GetOutputRawChangeEvent returns the value of OutputRawChangeEvent -func (p *PulsarConfig) GetOutputRawChangeEvent() bool { - if p == nil || p.OutputRawChangeEvent == nil { +func (c *PulsarConfig) GetOutputRawChangeEvent() bool { + if c == nil || c.OutputRawChangeEvent == nil { return false } - return *p.OutputRawChangeEvent + return *c.OutputRawChangeEvent } // MaskSensitiveData masks sensitive data in PulsarConfig @@ -843,7 +844,7 @@ func (s *SinkConfig) validateAndAdjustSinkURI(sinkURI *url.URL) error { return nil } -// ValidateOutputRawChangeEvent validates the output-raw-change-event configuration. +// ValidateProtocol validates the protocol configuration. func (s *SinkConfig) ValidateProtocol(scheme string) error { protocol, err := ParseSinkProtocolFromString(util.GetOrZero(s.Protocol)) if err != nil { diff --git a/pkg/sink/pulsar/config.go b/pkg/sink/pulsar/config.go index edfbef27acd..6859fd2e561 100644 --- a/pkg/sink/pulsar/config.go +++ b/pkg/sink/pulsar/config.go @@ -67,7 +67,6 @@ func NewPulsarConfig(sinkURI *url.URL, pulsarConfig *config.PulsarConfig) (*conf c.SinkURI = sinkURI c.BrokerURL = sinkURI.Scheme + "://" + sinkURI.Host - c.OutputRawChangeEvent = pulsarConfig.OutputRawChangeEvent if pulsarConfig == nil { log.L().Debug("new pulsar config", zap.Any("config", c)) diff --git a/tests/integration_tests/csv_storage_update_pk/conf/changefeed1.toml b/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed1.toml similarity index 93% rename from tests/integration_tests/csv_storage_update_pk/conf/changefeed1.toml rename to tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed1.toml index aa0b633e7c9..ff4fe5d6765 100644 --- a/tests/integration_tests/csv_storage_update_pk/conf/changefeed1.toml +++ b/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed1.toml @@ -1,5 +1,5 @@ # Case 1: default configuration where `csv.output-old-value=false` and `sink.cloud-storage-config.output-raw-change-event=false` -# TiCDC split update pk/uk events into delete and insert events. +# Split and sort update pk/uk events in table sink. [filter] rules = ['test.*'] diff --git a/tests/integration_tests/csv_storage_update_pk/conf/changefeed2.toml b/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed2.toml similarity index 92% rename from tests/integration_tests/csv_storage_update_pk/conf/changefeed2.toml rename to tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed2.toml index 2fcfaa33387..96057f4dd48 100644 --- a/tests/integration_tests/csv_storage_update_pk/conf/changefeed2.toml +++ b/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed2.toml @@ -1,4 +1,4 @@ -# Case 2: Split all update events into delete and insert events. +# Case 2: Split all update events in csv encoder. 
[filter] rules = ['test.*'] diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed3.toml b/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed3.toml new file mode 100644 index 00000000000..033e2bd1a4f --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed3.toml @@ -0,0 +1,26 @@ +# Case 3: Don't split any update event + +[filter] +rules = ['test.*'] + +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.cloud-storage-config] +output-raw-change-event = true + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true +# Whether to output the value before the row data changes. The default value is false. +output-old-value = false \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed4.toml b/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed4.toml new file mode 100644 index 00000000000..112cfe4a012 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed4.toml @@ -0,0 +1,26 @@ +# Case 4: Split and sort update pk/uk events in table sink. Split other update events in csv encoder. + +[filter] +rules = ['test.*'] + +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.cloud-storage-config] +output-raw-change-event = false + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true +# Whether to output the value before the row data changes. The default value is false. 
+output-old-value = true \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk/conf/diff_config.toml b/tests/integration_tests/csv_storage_update_pk_clustered/conf/diff_config.toml similarity index 100% rename from tests/integration_tests/csv_storage_update_pk/conf/diff_config.toml rename to tests/integration_tests/csv_storage_update_pk_clustered/conf/diff_config.toml diff --git a/tests/integration_tests/csv_storage_update_pk/data/prepare.sql b/tests/integration_tests/csv_storage_update_pk_clustered/data/prepare.sql similarity index 92% rename from tests/integration_tests/csv_storage_update_pk/data/prepare.sql rename to tests/integration_tests/csv_storage_update_pk_clustered/data/prepare.sql index c1114facbd9..506a6e75765 100644 --- a/tests/integration_tests/csv_storage_update_pk/data/prepare.sql +++ b/tests/integration_tests/csv_storage_update_pk_clustered/data/prepare.sql @@ -3,7 +3,7 @@ create database `test`; use `test`; CREATE TABLE `update_pk` ( - `id` int PRIMARY KEY NONCLUSTERED, + `id` int PRIMARY KEY CLUSTERED, `pad` varchar(100) NOT NULL ); INSERT INTO `update_pk` (`id`, `pad`) VALUES (1, 'example1'), (2, 'example2'); @@ -14,7 +14,7 @@ INSERT INTO `update_pk` (`id`, `pad`) VALUES (1000, 'example1000'); SHOW INDEX FROM update_pk; CREATE TABLE `update_uk` ( - `id` int PRIMARY KEY NONCLUSTERED, + `id` int PRIMARY KEY CLUSTERED, `uk` int NOT NULL, `pad` varchar(100) NOT NULL, UNIQUE KEY `uk` (`uk`) diff --git a/tests/integration_tests/csv_storage_update_pk/data/run.sql b/tests/integration_tests/csv_storage_update_pk_clustered/data/run.sql similarity index 100% rename from tests/integration_tests/csv_storage_update_pk/data/run.sql rename to tests/integration_tests/csv_storage_update_pk_clustered/data/run.sql diff --git a/tests/integration_tests/csv_storage_update_pk/run.sh b/tests/integration_tests/csv_storage_update_pk_clustered/run.sh similarity index 67% rename from tests/integration_tests/csv_storage_update_pk/run.sh rename to tests/integration_tests/csv_storage_update_pk_clustered/run.sh index e922c389d8f..022dc5fa8df 100644 --- a/tests/integration_tests/csv_storage_update_pk/run.sh +++ b/tests/integration_tests/csv_storage_update_pk_clustered/run.sh @@ -8,16 +8,6 @@ WORK_DIR=$OUT_DIR/$TEST_NAME CDC_BINARY=cdc.test SINK_TYPE=$1 -# function run_consumer() { -# SINK_URI=$1 -# CONFIG_FILE=$2 - -# # run_sql "DROP DATABASE IF EXISTS test;create database `test`;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} -# run_storage_consumer $WORK_DIR $SINK_URI1 $CUR/conf/changefeed1.toml "" -# sleep 8 -# check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100 -# } - function run() { if [ "$SINK_TYPE" != "storage" ]; then return @@ -32,6 +22,10 @@ function run() { run_cdc_cli changefeed create --sink-uri="$SINK_URI1" --config=$CUR/conf/changefeed1.toml -c "changefeed1" SINK_URI2="file://$WORK_DIR/storage_test/changefeed2?flush-interval=5s" run_cdc_cli changefeed create --sink-uri="$SINK_URI2" --config=$CUR/conf/changefeed2.toml -c "changefeed2" + SINK_URI3="file://$WORK_DIR/storage_test/changefeed3?flush-interval=5s" + run_cdc_cli changefeed create --sink-uri="$SINK_URI3" --config=$CUR/conf/changefeed3.toml -c "changefeed3" + SINK_URI4="file://$WORK_DIR/storage_test/changefeed4?flush-interval=5s" + run_cdc_cli changefeed create --sink-uri="$SINK_URI4" --config=$CUR/conf/changefeed4.toml -c "changefeed4" run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} sleep 10 @@ -44,6 +38,14 @@ function run() { run_storage_consumer $WORK_DIR $SINK_URI2 
$CUR/conf/changefeed2.toml "" sleep 8 check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100 + + run_storage_consumer $WORK_DIR $SINK_URI3 $CUR/conf/changefeed3.toml "" + sleep 8 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100 + + run_storage_consumer $WORK_DIR $SINK_URI4 $CUR/conf/changefeed4.toml "" + sleep 8 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100 } trap stop_tidb_cluster EXIT From bfd769556f60c83cc696f1132768a71fc0ebcb21 Mon Sep 17 00:00:00 2001 From: CharlesCheung96 Date: Wed, 5 Jun 2024 00:53:21 +0800 Subject: [PATCH 10/20] fix tests --- errors.toml | 7 ++- .../data/run.sql | 4 +- .../csv_storage_update_pk_clustered/run.sh | 47 +++++++++---------- 3 files changed, 31 insertions(+), 27 deletions(-) diff --git a/errors.toml b/errors.toml index ac4413f463f..bf22a8b5323 100755 --- a/errors.toml +++ b/errors.toml @@ -363,7 +363,7 @@ illegal parameter for sorter: %s ["CDC:ErrIncompatibleSinkConfig"] error = ''' -incompatible configuration in sink uri(%s) and config file(%s), please try to update the configuration only through sink uri +incompatible configuration '%s' ''' ["CDC:ErrInternalCheckFailed"] @@ -916,6 +916,11 @@ error = ''' cdc server is not ready ''' +["CDC:ErrSinkIncompatibleConfig"] +error = ''' +incompatible configuration %s +''' + ["CDC:ErrSinkInvalidConfig"] error = ''' sink config invalid diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/data/run.sql b/tests/integration_tests/csv_storage_update_pk_clustered/data/run.sql index beec9e00928..86e7d7e7d77 100644 --- a/tests/integration_tests/csv_storage_update_pk_clustered/data/run.sql +++ b/tests/integration_tests/csv_storage_update_pk_clustered/data/run.sql @@ -2,7 +2,7 @@ USE `test`; -- update_pk -- -BEGIN; -- Note: multi-row exchange with order dependency +BEGIN; -- Note: multi-row exchange UPDATE update_pk SET id = 3 WHERE id = 1; UPDATE update_pk SET id = 1 WHERE id = 2; UPDATE update_pk SET id = 2 WHERE id = 3; @@ -21,7 +21,7 @@ COMMIT; UPDATE update_pk SET pad='example1001' WHERE id = 1000; -- update_uk -- -BEGIN; -- Note: multi-row exchange with order dependency +BEGIN; -- Note: multi-row exchange UPDATE update_uk SET uk = 3 WHERE uk = 1; UPDATE update_uk SET uk = 1 WHERE uk = 2; UPDATE update_uk SET uk = 2 WHERE uk = 3; diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/run.sh b/tests/integration_tests/csv_storage_update_pk_clustered/run.sh index 022dc5fa8df..aa18510b263 100644 --- a/tests/integration_tests/csv_storage_update_pk_clustered/run.sh +++ b/tests/integration_tests/csv_storage_update_pk_clustered/run.sh @@ -8,6 +8,24 @@ WORK_DIR=$OUT_DIR/$TEST_NAME CDC_BINARY=cdc.test SINK_TYPE=$1 +function run_changefeed() { + local changefeed_id=$1 + local start_ts=$2 + local expected_split_count=$3 + SINK_URI="file://$WORK_DIR/storage_test/$changefeed_id?flush-interval=5s" + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config=$CUR/conf/$changefeed_id.toml -c "$changefeed_id" + + run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/$changefeed_id.toml "" + sleep 8 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100 + + real_split_count=$(grep "split update event" $WORK_DIR/cdc.log | wc -l) + if [[ $real_split_count -ne $expected_split_count ]]; then + echo "expected split count $expected_split_count, real split count $real_split_count" + exit 1 + fi +} + function run() { if [ "$SINK_TYPE" != "storage" ]; then return @@ -18,34 +36,15 @@ function run() { cd $WORK_DIR run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY 
- SINK_URI1="file://$WORK_DIR/storage_test/changefeed1?flush-interval=5s" - run_cdc_cli changefeed create --sink-uri="$SINK_URI1" --config=$CUR/conf/changefeed1.toml -c "changefeed1" - SINK_URI2="file://$WORK_DIR/storage_test/changefeed2?flush-interval=5s" - run_cdc_cli changefeed create --sink-uri="$SINK_URI2" --config=$CUR/conf/changefeed2.toml -c "changefeed2" - SINK_URI3="file://$WORK_DIR/storage_test/changefeed3?flush-interval=5s" - run_cdc_cli changefeed create --sink-uri="$SINK_URI3" --config=$CUR/conf/changefeed3.toml -c "changefeed3" - SINK_URI4="file://$WORK_DIR/storage_test/changefeed4?flush-interval=5s" - run_cdc_cli changefeed create --sink-uri="$SINK_URI4" --config=$CUR/conf/changefeed4.toml -c "changefeed4" + start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} - sleep 10 run_sql_file $CUR/data/run.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_storage_consumer $WORK_DIR $SINK_URI1 $CUR/conf/changefeed1.toml "" - sleep 8 - check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100 - - run_storage_consumer $WORK_DIR $SINK_URI2 $CUR/conf/changefeed2.toml "" - sleep 8 - check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100 - - run_storage_consumer $WORK_DIR $SINK_URI3 $CUR/conf/changefeed3.toml "" - sleep 8 - check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100 - - run_storage_consumer $WORK_DIR $SINK_URI4 $CUR/conf/changefeed4.toml "" - sleep 8 - check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100 + run_changefeed "changefeed1" $start_ts 5 + run_changefeed "changefeed2" $start_ts 5 + run_changefeed "changefeed3" $start_ts 5 + run_changefeed "changefeed4" $start_ts 10 } trap stop_tidb_cluster EXIT From a02b698eb0b756547d6415f1ce186fde9b6b6087 Mon Sep 17 00:00:00 2001 From: CharlesCheung96 Date: Wed, 5 Jun 2024 01:26:41 +0800 Subject: [PATCH 11/20] add test --- .../conf/diff_config.toml | 2 +- .../csv_storage_update_pk_clustered/run.sh | 2 +- .../conf/changefeed1.toml | 27 +++++++++ .../conf/changefeed2.toml | 26 +++++++++ .../conf/changefeed3.toml | 26 +++++++++ .../conf/changefeed4.toml | 26 +++++++++ .../conf/diff_config.toml | 29 ++++++++++ .../data/prepare.sql | 28 +++++++++ .../data/run.sql | 40 +++++++++++++ .../csv_storage_update_pk_nonclustered/run.sh | 58 +++++++++++++++++++ tests/integration_tests/run_group.sh | 2 +- 11 files changed, 263 insertions(+), 3 deletions(-) create mode 100644 tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed1.toml create mode 100644 tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed2.toml create mode 100644 tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed3.toml create mode 100644 tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed4.toml create mode 100644 tests/integration_tests/csv_storage_update_pk_nonclustered/conf/diff_config.toml create mode 100644 tests/integration_tests/csv_storage_update_pk_nonclustered/data/prepare.sql create mode 100644 tests/integration_tests/csv_storage_update_pk_nonclustered/data/run.sql create mode 100644 tests/integration_tests/csv_storage_update_pk_nonclustered/run.sh diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/conf/diff_config.toml b/tests/integration_tests/csv_storage_update_pk_clustered/conf/diff_config.toml index 4a265cbeda9..8edf2368fa4 100644 --- a/tests/integration_tests/csv_storage_update_pk_clustered/conf/diff_config.toml +++ 
b/tests/integration_tests/csv_storage_update_pk_clustered/conf/diff_config.toml @@ -7,7 +7,7 @@ export-fix-sql = true check-struct-only = false [task] - output-dir = "/tmp/tidb_cdc_test/csv_storage_basic/sync_diff/output" + output-dir = "/tmp/tidb_cdc_test/csv_storage_update_pk_clustered/sync_diff/output" source-instances = ["mysql1"] diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/run.sh b/tests/integration_tests/csv_storage_update_pk_clustered/run.sh index aa18510b263..a7cd121f6c7 100644 --- a/tests/integration_tests/csv_storage_update_pk_clustered/run.sh +++ b/tests/integration_tests/csv_storage_update_pk_clustered/run.sh @@ -15,7 +15,7 @@ function run_changefeed() { SINK_URI="file://$WORK_DIR/storage_test/$changefeed_id?flush-interval=5s" run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config=$CUR/conf/$changefeed_id.toml -c "$changefeed_id" - run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/$changefeed_id.toml "" + run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/$changefeed_id.toml $changefeed_id sleep 8 check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100 diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed1.toml b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed1.toml new file mode 100644 index 00000000000..ff4fe5d6765 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed1.toml @@ -0,0 +1,27 @@ +# Case 1: default configuration where `csv.output-old-value=false` and `sink.cloud-storage-config.output-raw-change-event=false` +# Split and sort update pk/uk events in table sink. + +[filter] +rules = ['test.*'] + +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.cloud-storage-config] +output-raw-change-event = false + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true +# Whether to output the value before the row data changes. The default value is false. +output-old-value = false \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed2.toml b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed2.toml new file mode 100644 index 00000000000..96057f4dd48 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed2.toml @@ -0,0 +1,26 @@ +# Case 2: Split all update events in csv encoder. + +[filter] +rules = ['test.*'] + +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.cloud-storage-config] +output-raw-change-event = true + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. 
+quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true +# Whether to output the value before the row data changes. The default value is false. +output-old-value = true \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed3.toml b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed3.toml new file mode 100644 index 00000000000..033e2bd1a4f --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed3.toml @@ -0,0 +1,26 @@ +# Case 3: Don't split any update event + +[filter] +rules = ['test.*'] + +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.cloud-storage-config] +output-raw-change-event = true + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true +# Whether to output the value before the row data changes. The default value is false. +output-old-value = false \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed4.toml b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed4.toml new file mode 100644 index 00000000000..112cfe4a012 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed4.toml @@ -0,0 +1,26 @@ +# Case 4: Split and sort update pk/uk events in table sink. Split other update events in csv encoder. + +[filter] +rules = ['test.*'] + +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.cloud-storage-config] +output-raw-change-event = false + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true +# Whether to output the value before the row data changes. The default value is false. +output-old-value = true \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/diff_config.toml b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/diff_config.toml new file mode 100644 index 00000000000..f632bdbb98d --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. 
+ +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/tidb_cdc_test/csv_storage_update_pk_nonclustered/sync_diff/output" + + source-instances = ["mysql1"] + + target-instance = "tidb0" + + target-check-tables = ["test.?*"] + +[data-sources] +[data-sources.mysql1] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + +[data-sources.tidb0] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/data/prepare.sql b/tests/integration_tests/csv_storage_update_pk_nonclustered/data/prepare.sql new file mode 100644 index 00000000000..f3cd4ca4d24 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/data/prepare.sql @@ -0,0 +1,28 @@ +drop database if exists `test`; +create database `test`; +use `test`; + +CREATE TABLE `update_pk` ( + `id` int PRIMARY KEY NONCLUSTERED, + `pad` varchar(100) NOT NULL +); +INSERT INTO `update_pk` (`id`, `pad`) VALUES (1, 'example1'), (2, 'example2'); +INSERT INTO `update_pk` (`id`, `pad`) VALUES (10, 'example10'), (20, 'example20'); +INSERT INTO `update_pk` (`id`, `pad`) VALUES (100, 'example100'); +INSERT INTO `update_pk` (`id`, `pad`) VALUES (1000, 'example1000'); + + +SHOW INDEX FROM update_pk; + +CREATE TABLE `update_uk` ( + `id` int PRIMARY KEY NONCLUSTERED, + `uk` int NOT NULL, + `pad` varchar(100) NOT NULL, + UNIQUE KEY `uk` (`uk`) +); +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (1, 1, 'example1'), (2, 2, 'example2'); +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (10, 10, 'example10'), (20, 20, 'example20'); +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (100, 100, 'example100'); +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (1000, 1000, 'example1000'); + +SHOW INDEX FROM update_uk; \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/data/run.sql b/tests/integration_tests/csv_storage_update_pk_nonclustered/data/run.sql new file mode 100644 index 00000000000..86e7d7e7d77 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/data/run.sql @@ -0,0 +1,40 @@ +USE `test`; + +-- update_pk -- + +BEGIN; -- Note: multi-row exchange +UPDATE update_pk SET id = 3 WHERE id = 1; +UPDATE update_pk SET id = 1 WHERE id = 2; +UPDATE update_pk SET id = 2 WHERE id = 3; +COMMIT; + +BEGIN; -- Note: multi-row update with no order dependency +UPDATE update_pk SET id = 30 WHERE id = 10; +UPDATE update_pk SET id = 40 WHERE id = 20; +COMMIT; + +BEGIN; -- Single row update +UPDATE update_pk SET id = 200 WHERE id = 100; +COMMIT; + +-- Normal update +UPDATE update_pk SET pad='example1001' WHERE id = 1000; + +-- update_uk -- +BEGIN; -- Note: multi-row exchange +UPDATE update_uk SET uk = 3 WHERE uk = 1; +UPDATE update_uk SET uk = 1 WHERE uk = 2; +UPDATE update_uk SET uk = 2 WHERE uk = 3; +COMMIT; + +BEGIN; -- Note: multi-row update with no order dependency +UPDATE update_uk SET uk = 30 WHERE uk = 10; +UPDATE update_uk SET uk = 40 WHERE uk = 20; +COMMIT; + +BEGIN; -- Single row update +UPDATE update_uk SET uk = 200 WHERE uk = 100; +COMMIT; + +-- Normal update +UPDATE update_uk SET pad='example1001' WHERE uk = 1000; \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/run.sh b/tests/integration_tests/csv_storage_update_pk_nonclustered/run.sh new file mode 100644 index 00000000000..d3c32955148 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/run.sh @@ 
-0,0 +1,58 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +function run_changefeed() { + local changefeed_id=$1 + local start_ts=$2 + local expected_split_count=$3 + local should_pass_check=$4 + SINK_URI="file://$WORK_DIR/storage_test/$changefeed_id?flush-interval=5s" + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config=$CUR/conf/$changefeed_id.toml -c "$changefeed_id" + + run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/$changefeed_id.toml $changefeed_id + sleep 8 + if [[ $should_pass_check == true ]]; then + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100 + else + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 30 && exit 1 || echo "check_sync_diff failed as expected for $changefeed_id" + fi + + real_split_count=$(grep "split update event" $WORK_DIR/cdc.log | wc -l) + if [[ $real_split_count -ne $expected_split_count ]]; then + echo "expected split count $expected_split_count, real split count $real_split_count" + exit 1 + fi +} + +function run() { + if [ "$SINK_TYPE" != "storage" ]; then + return + fi + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + start_tidb_cluster --workdir $WORK_DIR + cd $WORK_DIR + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + + run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql_file $CUR/data/run.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + run_changefeed "changefeed1" $start_ts 10 true + run_changefeed "changefeed2" $start_ts 10 true + run_changefeed "changefeed3" $start_ts 10 false + run_changefeed "changefeed4" $start_ts 20 true +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! 
>>>>>>" diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh index bbed3165515..ebcb416811e 100755 --- a/tests/integration_tests/run_group.sh +++ b/tests/integration_tests/run_group.sh @@ -19,7 +19,7 @@ kafka_only="kafka_big_messages kafka_compression kafka_messages kafka_sink_error kafka_only_protocol="kafka_simple_basic kafka_simple_basic_avro kafka_simple_handle_key_only kafka_simple_handle_key_only_avro kafka_simple_claim_check kafka_simple_claim_check_avro canal_json_adapter_compatibility canal_json_basic canal_json_content_compatible multi_topics avro_basic canal_json_handle_key_only open_protocol_handle_key_only canal_json_claim_check open_protocol_claim_check" kafka_only_v2="kafka_big_messages_v2 multi_tables_ddl_v2 multi_topics_v2" -storage_only="lossy_ddl storage_csv_update" +storage_only="lossy_ddl storage_csv_update csv_storage_update_pk_clustered csv_storage_update_pk_nonclustered" storage_only_csv="storage_cleanup csv_storage_basic csv_storage_multi_tables_ddl csv_storage_partition_table" storage_only_canal_json="canal_json_storage_basic canal_json_storage_partition_table" From debfa12666fe15473db5dd3d9a71c32c6d3bab04 Mon Sep 17 00:00:00 2001 From: CharlesCheung96 Date: Wed, 5 Jun 2024 10:24:17 +0800 Subject: [PATCH 12/20] fix lint --- .../integration_tests/csv_storage_update_pk_clustered/run.sh | 2 +- .../csv_storage_update_pk_nonclustered/run.sh | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/run.sh b/tests/integration_tests/csv_storage_update_pk_clustered/run.sh index a7cd121f6c7..1b9329bb058 100644 --- a/tests/integration_tests/csv_storage_update_pk_clustered/run.sh +++ b/tests/integration_tests/csv_storage_update_pk_clustered/run.sh @@ -20,7 +20,7 @@ function run_changefeed() { check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100 real_split_count=$(grep "split update event" $WORK_DIR/cdc.log | wc -l) - if [[ $real_split_count -ne $expected_split_count ]]; then + if [[ $real_split_count -ne $expected_split_count ]]; then echo "expected split count $expected_split_count, real split count $real_split_count" exit 1 fi diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/run.sh b/tests/integration_tests/csv_storage_update_pk_nonclustered/run.sh index d3c32955148..3a88e00cb24 100644 --- a/tests/integration_tests/csv_storage_update_pk_nonclustered/run.sh +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/run.sh @@ -20,15 +20,16 @@ function run_changefeed() { sleep 8 if [[ $should_pass_check == true ]]; then check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100 - else + else check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 30 && exit 1 || echo "check_sync_diff failed as expected for $changefeed_id" fi real_split_count=$(grep "split update event" $WORK_DIR/cdc.log | wc -l) - if [[ $real_split_count -ne $expected_split_count ]]; then + if [[ $real_split_count -ne $expected_split_count ]]; then echo "expected split count $expected_split_count, real split count $real_split_count" exit 1 fi + run_sql "drop database if exists test" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} } function run() { From ccff9a5b36855ea5d965d60802eedb1342d540c8 Mon Sep 17 00:00:00 2001 From: CharlesCheung96 Date: Wed, 5 Jun 2024 13:36:10 +0800 Subject: [PATCH 13/20] fix test --- tests/integration_tests/csv_storage_update_pk_clustered/run.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/run.sh 
b/tests/integration_tests/csv_storage_update_pk_clustered/run.sh index 1b9329bb058..8606518da0a 100644 --- a/tests/integration_tests/csv_storage_update_pk_clustered/run.sh +++ b/tests/integration_tests/csv_storage_update_pk_clustered/run.sh @@ -24,6 +24,7 @@ function run_changefeed() { echo "expected split count $expected_split_count, real split count $real_split_count" exit 1 fi + run_sql "drop database if exists test" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} } function run() { From fb026016ed446d30f07f0765f04909b2e30ba488 Mon Sep 17 00:00:00 2001 From: CharlesCheung96 Date: Wed, 5 Jun 2024 13:56:25 +0800 Subject: [PATCH 14/20] fix decoder --- pkg/sink/codec/csv/csv_message.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/sink/codec/csv/csv_message.go b/pkg/sink/codec/csv/csv_message.go index ccea08f280f..32486b2fe82 100644 --- a/pkg/sink/codec/csv/csv_message.go +++ b/pkg/sink/codec/csv/csv_message.go @@ -174,6 +174,11 @@ func (c *csvMessage) decode(datums []types.Datum) error { } else { c.commitTs = 0 } + if c.config.OutputOldValue { + // When c.config.OutputOldValue is enabled, each row carries an extra "is-updated" column, so skip it while decoding. + // TODO: use this flag to guarantee data consistency in the update uk/pk scenario. + dataColIdx++ + } c.columns = c.columns[:0] for i := dataColIdx; i < len(datums); i++ { From 2e63a2b63d3f13a898ecc48df83de3084a1d9a40 Mon Sep 17 00:00:00 2001 From: CharlesCheung96 Date: Wed, 5 Jun 2024 17:58:29 +0800 Subject: [PATCH 15/20] add test result --- .../result/changefeed1_pk.res | 24 +++++++++++++++++ .../result/changefeed1_uk.res | 24 +++++++++++++++++ .../result/changefeed2_pk.res | 26 +++++++++++++++++++ .../result/changefeed2_uk.res | 26 +++++++++++++++++++ .../result/changefeed3_pk.res | 14 ++++++++++ .../result/changefeed3_uk.res | 14 ++++++++++ .../result/changefeed4_pk.res | 26 +++++++++++++++++++ .../result/changefeed4_uk.res | 26 +++++++++++++++++++ .../csv_storage_update_pk_nonclustered/run.sh | 4 ++- 9 files changed, 183 insertions(+), 1 deletion(-) create mode 100644 tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed1_pk.res create mode 100644 tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed1_uk.res create mode 100644 tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_pk.res create mode 100644 tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_uk.res create mode 100644 tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed3_pk.res create mode 100644 tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed3_uk.res create mode 100644 tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed4_pk.res create mode 100644 tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed4_uk.res diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed1_pk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed1_pk.res new file mode 100644 index 00000000000..08f6eedb804 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed1_pk.res @@ -0,0 +1,24 @@ +"I","update_pk","test",450250823741472787,1,"example1" +"I","update_pk","test",450250823741472787,2,"example2" +"I","update_pk","test",450250823741472790,10,"example10" +"I","update_pk","test",450250823741472790,20,"example20" +"I","update_pk","test",450250823741472791,100,"example100" +"I","update_pk","test",450250823741472792,1000,"example1000" + +# split and
sort in table sink +"D","update_pk","test",450250823807270922,1,"example1" +"D","update_pk","test",450250823807270922,2,"example2" +"I","update_pk","test",450250823807270922,2,"example1" +"I","update_pk","test",450250823807270922,1,"example2" + +# split and sort in table sink +"D","update_pk","test",450250823807270925,10,"example10" +"D","update_pk","test",450250823807270925,20,"example20" +"I","update_pk","test",450250823807270925,30,"example10" +"I","update_pk","test",450250823807270925,40,"example20" + +# split and sort in table sink +"D","update_pk","test",450250823807270927,100,"example100" +"I","update_pk","test",450250823807270927,200,"example100" + +"U","update_pk","test",450250823807270928,1000,"example1001" diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed1_uk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed1_uk.res new file mode 100644 index 00000000000..b26f2219af2 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed1_uk.res @@ -0,0 +1,24 @@ +"I","update_uk","test",450250823780794385,1,1,"example1" +"I","update_uk","test",450250823780794385,2,2,"example2" +"I","update_uk","test",450250823780794387,10,10,"example10" +"I","update_uk","test",450250823780794387,20,20,"example20" +"I","update_uk","test",450250823780794389,100,100,"example100" +"I","update_uk","test",450250823780794390,1000,1000,"example1000" + +# split and sort in table sink +"D","update_uk","test",450250823807270931,1,1,"example1" +"D","update_uk","test",450250823807270931,2,2,"example2" +"I","update_uk","test",450250823807270931,1,2,"example1" +"I","update_uk","test",450250823807270931,2,1,"example2" + +# split and sort in table sink +"D","update_uk","test",450250823820115970,10,10,"example10" +"D","update_uk","test",450250823820115970,20,20,"example20" +"I","update_uk","test",450250823820115970,10,30,"example10" +"I","update_uk","test",450250823820115970,20,40,"example20" + +# split and sort in table sink +"D","update_uk","test",450250823820115973,100,100,"example100" +"I","update_uk","test",450250823820115973,100,200,"example100" + +"U","update_uk","test",450250823820115977,1000,1000,"example1001" diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_pk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_pk.res new file mode 100644 index 00000000000..001401036fe --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_pk.res @@ -0,0 +1,26 @@ +"I","update_pk","test",450250823741472787,false,1,"example1" +"I","update_pk","test",450250823741472787,false,2,"example2" +"I","update_pk","test",450250823741472790,false,10,"example10" +"I","update_pk","test",450250823741472790,false,20,"example20" +"I","update_pk","test",450250823741472791,false,100,"example100" +"I","update_pk","test",450250823741472792,false,1000,"example1000" + +# split in csv encoder, lost id=2 since delete are not sorted before insert within single txn +"D","update_pk","test",450250823807270922,true,1,"example1" +"I","update_pk","test",450250823807270922,true,2,"example1" +"D","update_pk","test",450250823807270922,true,2,"example2" +"I","update_pk","test",450250823807270922,true,1,"example2" + +# split in csv encoder +"D","update_pk","test",450250823807270925,true,10,"example10" +"I","update_pk","test",450250823807270925,true,30,"example10" +"D","update_pk","test",450250823807270925,true,20,"example20" 
+"I","update_pk","test",450250823807270925,true,40,"example20" + +# split in csv encoder +"D","update_pk","test",450250823807270927,true,100,"example100" +"I","update_pk","test",450250823807270927,true,200,"example100" + +# normal update event also split in csv encoder +"D","update_pk","test",450250823807270928,true,1000,"example1000" +"I","update_pk","test",450250823807270928,true,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_uk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_uk.res new file mode 100644 index 00000000000..0151dbf843e --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_uk.res @@ -0,0 +1,26 @@ +"I","update_uk","test",450250823780794385,false,1,1,"example1" +"I","update_uk","test",450250823780794385,false,2,2,"example2" +"I","update_uk","test",450250823780794387,false,10,10,"example10" +"I","update_uk","test",450250823780794387,false,20,20,"example20" +"I","update_uk","test",450250823780794389,false,100,100,"example100" +"I","update_uk","test",450250823780794390,false,1000,1000,"example1000" + +# split in csv encoder, TODO check lost +"D","update_uk","test",450250823807270931,true,1,1,"example1" +"I","update_uk","test",450250823807270931,true,1,2,"example1" +"D","update_uk","test",450250823807270931,true,2,2,"example2" +"I","update_uk","test",450250823807270931,true,2,1,"example2" + +# split in csv encoder +"D","update_uk","test",450250823820115970,true,10,10,"example10" +"I","update_uk","test",450250823820115970,true,10,30,"example10" +"D","update_uk","test",450250823820115970,true,20,20,"example20" +"I","update_uk","test",450250823820115970,true,20,40,"example20" + +# split in csv encoder +"D","update_uk","test",450250823820115973,true,100,100,"example100" +"I","update_uk","test",450250823820115973,true,100,200,"example100" + +# normal update event, also split in csv encoder +"D","update_uk","test",450250823820115977,true,1000,1000,"example1000" +"I","update_uk","test",450250823820115977,true,1000,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed3_pk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed3_pk.res new file mode 100644 index 00000000000..33ff20f0291 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed3_pk.res @@ -0,0 +1,14 @@ +"I","update_pk","test",450250823741472787,1,"example1" +"I","update_pk","test",450250823741472787,2,"example2" +"I","update_pk","test",450250823741472790,10,"example10" +"I","update_pk","test",450250823741472790,20,"example20" +"I","update_pk","test",450250823741472791,100,"example100" +"I","update_pk","test",450250823741472792,1000,"example1000" + +# output raw change event +"U","update_pk","test",450250823807270922,2,"example1" +"U","update_pk","test",450250823807270922,1,"example2" +"U","update_pk","test",450250823807270925,30,"example10" +"U","update_pk","test",450250823807270925,40,"example20" +"U","update_pk","test",450250823807270927,200,"example100" +"U","update_pk","test",450250823807270928,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed3_uk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed3_uk.res new file mode 100644 index 00000000000..79798b3e9af --- /dev/null +++ 
b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed3_uk.res @@ -0,0 +1,14 @@ +"I","update_uk","test",450250823780794385,1,1,"example1" +"I","update_uk","test",450250823780794385,2,2,"example2" +"I","update_uk","test",450250823780794387,10,10,"example10" +"I","update_uk","test",450250823780794387,20,20,"example20" +"I","update_uk","test",450250823780794389,100,100,"example100" +"I","update_uk","test",450250823780794390,1000,1000,"example1000" + +# output raw change event +"U","update_uk","test",450250823807270931,1,2,"example1" +"U","update_uk","test",450250823807270931,2,1,"example2" +"U","update_uk","test",450250823820115970,10,30,"example10" +"U","update_uk","test",450250823820115970,20,40,"example20" +"U","update_uk","test",450250823820115973,100,200,"example100" +"U","update_uk","test",450250823820115977,1000,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed4_pk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed4_pk.res new file mode 100644 index 00000000000..79dc50dbf3c --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed4_pk.res @@ -0,0 +1,26 @@ +"I","update_pk","test",450250823741472787,false,1,"example1" +"I","update_pk","test",450250823741472787,false,2,"example2" +"I","update_pk","test",450250823741472790,false,10,"example10" +"I","update_pk","test",450250823741472790,false,20,"example20" +"I","update_pk","test",450250823741472791,false,100,"example100" +"I","update_pk","test",450250823741472792,false,1000,"example1000" + +# split and sort in table sink +"D","update_pk","test",450250823807270922,false,1,"example1" +"D","update_pk","test",450250823807270922,false,2,"example2" +"I","update_pk","test",450250823807270922,false,2,"example1" +"I","update_pk","test",450250823807270922,false,1,"example2" + +# split and sort in table sink +"D","update_pk","test",450250823807270925,false,10,"example10" +"D","update_pk","test",450250823807270925,false,20,"example20" +"I","update_pk","test",450250823807270925,false,30,"example10" +"I","update_pk","test",450250823807270925,false,40,"example20" + +# split and sort in table sink +"D","update_pk","test",450250823807270927,false,100,"example100" +"I","update_pk","test",450250823807270927,false,200,"example100" + +# normal update event, split in csv encoder +"D","update_pk","test",450250823807270928,true,1000,"example1000" +"I","update_pk","test",450250823807270928,true,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed4_uk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed4_uk.res new file mode 100644 index 00000000000..2225f55ff95 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed4_uk.res @@ -0,0 +1,26 @@ +"I","update_uk","test",450250823780794385,false,1,1,"example1" +"I","update_uk","test",450250823780794385,false,2,2,"example2" +"I","update_uk","test",450250823780794387,false,10,10,"example10" +"I","update_uk","test",450250823780794387,false,20,20,"example20" +"I","update_uk","test",450250823780794389,false,100,100,"example100" +"I","update_uk","test",450250823780794390,false,1000,1000,"example1000" + +# split and sort in table sink +"D","update_uk","test",450250823807270931,false,1,1,"example1" +"D","update_uk","test",450250823807270931,false,2,2,"example2" 
+"I","update_uk","test",450250823807270931,false,1,2,"example1" +"I","update_uk","test",450250823807270931,false,2,1,"example2" + +# split and sort in table sink +"D","update_uk","test",450250823820115970,false,10,10,"example10" +"D","update_uk","test",450250823820115970,false,20,20,"example20" +"I","update_uk","test",450250823820115970,false,10,30,"example10" +"I","update_uk","test",450250823820115970,false,20,40,"example20" + +# split and sort in table sink +"D","update_uk","test",450250823820115973,false,100,100,"example100" +"I","update_uk","test",450250823820115973,false,100,200,"example100" + +# normal update event, split in csv encoder +"D","update_uk","test",450250823820115977,true,1000,1000,"example1000" +"I","update_uk","test",450250823820115977,true,1000,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/run.sh b/tests/integration_tests/csv_storage_update_pk_nonclustered/run.sh index 3a88e00cb24..438c264a85c 100644 --- a/tests/integration_tests/csv_storage_update_pk_nonclustered/run.sh +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/run.sh @@ -48,7 +48,9 @@ function run() { run_sql_file $CUR/data/run.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_changefeed "changefeed1" $start_ts 10 true - run_changefeed "changefeed2" $start_ts 10 true + # changefeed2 fail since delete events are not sorted + run_changefeed "changefeed2" $start_ts 10 false + # changefeed3 fail since update pk/uk events are not split run_changefeed "changefeed3" $start_ts 10 false run_changefeed "changefeed4" $start_ts 20 true } From 6ddec5c8c81adba061dd6170091d322d572deac3 Mon Sep 17 00:00:00 2001 From: CharlesCheung96 Date: Wed, 5 Jun 2024 18:17:50 +0800 Subject: [PATCH 16/20] fix sync diff --- .../conf/diff_config.toml | 2 +- .../result/changefeed2_pk.res | 2 +- .../csv_storage_update_pk_nonclustered/run.sh | 7 +++++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/diff_config.toml b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/diff_config.toml index f632bdbb98d..0714c0c18d9 100644 --- a/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/diff_config.toml +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/diff_config.toml @@ -7,7 +7,7 @@ export-fix-sql = true check-struct-only = false [task] - output-dir = "/tmp/tidb_cdc_test/csv_storage_update_pk_nonclustered/sync_diff/output" + output-dir = "/tmp/tidb_cdc_test/csv_storage_update_pk_nonclustered/sync_diff-/output" source-instances = ["mysql1"] diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_pk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_pk.res index 001401036fe..687afcae6bf 100644 --- a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_pk.res +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_pk.res @@ -21,6 +21,6 @@ "D","update_pk","test",450250823807270927,true,100,"example100" "I","update_pk","test",450250823807270927,true,200,"example100" -# normal update event also split in csv encoder +# normal update event, also split in csv encoder "D","update_pk","test",450250823807270928,true,1000,"example1000" "I","update_pk","test",450250823807270928,true,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/run.sh 
b/tests/integration_tests/csv_storage_update_pk_nonclustered/run.sh index 438c264a85c..adbf3392ffd 100644 --- a/tests/integration_tests/csv_storage_update_pk_nonclustered/run.sh +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/run.sh @@ -18,10 +18,13 @@ function run_changefeed() { run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/$changefeed_id.toml $changefeed_id sleep 8 + + cp $CUR/conf/diff_config.toml $WORK_DIR/diff_config.toml + sed -i "s//$changefeed_id/" $WORK_DIR/diff_config.toml if [[ $should_pass_check == true ]]; then - check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100 + check_sync_diff $WORK_DIR $WORK_DIR/diff_config.toml 100 else - check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 30 && exit 1 || echo "check_sync_diff failed as expected for $changefeed_id" + check_sync_diff $WORK_DIR $WORK_DIR/diff_config.toml 30 && exit 1 || echo "check_sync_diff failed as expected for $changefeed_id" fi real_split_count=$(grep "split update event" $WORK_DIR/cdc.log | wc -l) From 7b195e60d7173b9bbb4ff3711f71ea74f2412768 Mon Sep 17 00:00:00 2001 From: CharlesCheung96 Date: Wed, 5 Jun 2024 18:40:06 +0800 Subject: [PATCH 17/20] fix res --- .../result/changefeed1_pk.res | 22 ++++++++++++++++ .../result/changefeed1_uk.res | 24 +++++++++++++++++ .../result/changefeed2_pk.res | 26 +++++++++++++++++++ .../result/changefeed2_uk.res | 26 +++++++++++++++++++ .../result/changefeed3_pk.res | 24 +++++++++++++++++ .../result/changefeed3_uk.res | 14 ++++++++++ .../result/changefeed4_pk.res | 26 +++++++++++++++++++ .../result/changefeed4_uk.res | 26 +++++++++++++++++++ 8 files changed, 188 insertions(+) create mode 100644 tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed1_pk.res create mode 100644 tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed1_uk.res create mode 100644 tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed2_pk.res create mode 100644 tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed2_uk.res create mode 100644 tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed3_pk.res create mode 100644 tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed3_uk.res create mode 100644 tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed4_pk.res create mode 100644 tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed4_uk.res diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed1_pk.res b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed1_pk.res new file mode 100644 index 00000000000..ad6016c8059 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed1_pk.res @@ -0,0 +1,22 @@ +"I","update_pk","test",450253245302439944,1,"example1" +"I","update_pk","test",450253245302439944,2,"example2" +"I","update_pk","test",450253245302439946,10,"example10" +"I","update_pk","test",450253245302439946,20,"example20" +"I","update_pk","test",450253245302439947,100,"example100" +"I","update_pk","test",450253245302439948,1000,"example1000" + +# translate to normal update in upstream +"U","update_pk","test",450253245485940746,1,"example2" +"U","update_pk","test",450253245485940746,2,"example1" + +# split and sort in upstream +"D","update_pk","test",450253245485940749,10,"example10" +"D","update_pk","test",450253245485940749,20,"example20" +"I","update_pk","test",450253245485940749,30,"example10" 
+"I","update_pk","test",450253245485940749,40,"example20" + +# split and sort in upstream +"D","update_pk","test",450253245485940752,100,"example100" +"I","update_pk","test",450253245485940752,200,"example100" + +"U","update_pk","test",450253245485940753,1000,"example1001" diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed1_uk.res b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed1_uk.res new file mode 100644 index 00000000000..ebe3a635252 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed1_uk.res @@ -0,0 +1,24 @@ +"I","update_uk","test",450253245446619144,1,1,"example1" +"I","update_uk","test",450253245446619144,2,2,"example2" +"I","update_uk","test",450253245446619146,10,10,"example10" +"I","update_uk","test",450253245446619146,20,20,"example20" +"I","update_uk","test",450253245446619147,100,100,"example100" +"I","update_uk","test",450253245446619148,1000,1000,"example1000" + +# split and sort in table sink +"D","update_uk","test",450253245499047940,1,1,"example1" +"D","update_uk","test",450253245499047940,2,2,"example2" +"I","update_uk","test",450253245499047940,1,2,"example1" +"I","update_uk","test",450253245499047940,2,1,"example2" + +# split and sort in table sink +"D","update_uk","test",450253245499047943,10,10,"example10" +"D","update_uk","test",450253245499047943,20,20,"example20" +"I","update_uk","test",450253245499047943,10,30,"example10" +"I","update_uk","test",450253245499047943,20,40,"example20" + +# split and sort in table sink +"D","update_uk","test",450253245499047946,100,100,"example100" +"I","update_uk","test",450253245499047946,100,200,"example100" + +"U","update_uk","test",450253245512155140,1000,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed2_pk.res b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed2_pk.res new file mode 100644 index 00000000000..fc3ea45b65d --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed2_pk.res @@ -0,0 +1,26 @@ +"I","update_pk","test",450253245302439944,false,1,"example1" +"I","update_pk","test",450253245302439944,false,2,"example2" +"I","update_pk","test",450253245302439946,false,10,"example10" +"I","update_pk","test",450253245302439946,false,20,"example20" +"I","update_pk","test",450253245302439947,false,100,"example100" +"I","update_pk","test",450253245302439948,false,1000,"example1000" + +# translate to normal update in upstream, split in csv encoder +"D","update_pk","test",450253245485940746,true,1,"example1" +"I","update_pk","test",450253245485940746,true,1,"example2" +"D","update_pk","test",450253245485940746,true,2,"example2" +"I","update_pk","test",450253245485940746,true,2,"example1" + +# split and sort in upstream +"D","update_pk","test",450253245485940749,false,10,"example10" +"D","update_pk","test",450253245485940749,false,20,"example20" +"I","update_pk","test",450253245485940749,false,30,"example10" +"I","update_pk","test",450253245485940749,false,40,"example20" + +# split and sort in upstream +"D","update_pk","test",450253245485940752,false,100,"example100" +"I","update_pk","test",450253245485940752,false,200,"example100" + +# normal update event, split in csv encoder +"D","update_pk","test",450253245485940753,true,1000,"example1000" +"I","update_pk","test",450253245485940753,true,1000,"example1001" diff --git 
a/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed2_uk.res b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed2_uk.res new file mode 100644 index 00000000000..5e7f2ce0e71 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed2_uk.res @@ -0,0 +1,26 @@ +"I","update_uk","test",450253245446619144,false,1,1,"example1" +"I","update_uk","test",450253245446619144,false,2,2,"example2" +"I","update_uk","test",450253245446619146,false,10,10,"example10" +"I","update_uk","test",450253245446619146,false,20,20,"example20" +"I","update_uk","test",450253245446619147,false,100,100,"example100" +"I","update_uk","test",450253245446619148,false,1000,1000,"example1000" + +# split in csv encoder, data is consistent since delete by pk +"D","update_uk","test",450253245499047940,true,1,1,"example1" +"I","update_uk","test",450253245499047940,true,1,2,"example1" +"D","update_uk","test",450253245499047940,true,2,2,"example2" +"I","update_uk","test",450253245499047940,true,2,1,"example2" + +# split in csv encoder +"D","update_uk","test",450253245499047943,true,10,10,"example10" +"I","update_uk","test",450253245499047943,true,10,30,"example10" +"D","update_uk","test",450253245499047943,true,20,20,"example20" +"I","update_uk","test",450253245499047943,true,20,40,"example20" + +# split in csv encoder +"D","update_uk","test",450253245499047946,true,100,100,"example100" +"I","update_uk","test",450253245499047946,true,100,200,"example100" + +# normal update event, also split in csv encoder +"D","update_uk","test",450253245512155140,true,1000,1000,"example1000" +"I","update_uk","test",450253245512155140,true,1000,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed3_pk.res b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed3_pk.res new file mode 100644 index 00000000000..9f4448e5d9a --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed3_pk.res @@ -0,0 +1,24 @@ +"I","update_pk","test",450253245302439944,1,"example1" +"I","update_pk","test",450253245302439944,2,"example2" +"I","update_pk","test",450253245302439946,10,"example10" +"I","update_pk","test",450253245302439946,20,"example20" +"I","update_pk","test",450253245302439947,100,"example100" +"I","update_pk","test",450253245302439948,1000,"example1000" + +# output raw change event +# translate to normal update in upstream +"U","update_pk","test",450253245485940746,1,"example2" +"U","update_pk","test",450253245485940746,2,"example1" + +# split and sort in upstream +"D","update_pk","test",450253245485940749,10,"example10" +"D","update_pk","test",450253245485940749,20,"example20" +"I","update_pk","test",450253245485940749,30,"example10" +"I","update_pk","test",450253245485940749,40,"example20" + +# split and sort in upstream +"D","update_pk","test",450253245485940752,100,"example100" +"I","update_pk","test",450253245485940752,200,"example100" + +# normal update event +"U","update_pk","test",450253245485940753,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed3_uk.res b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed3_uk.res new file mode 100644 index 00000000000..66348746ded --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed3_uk.res @@ -0,0 +1,14 @@ 
+"I","update_uk","test",450253245446619144,1,1,"example1" +"I","update_uk","test",450253245446619144,2,2,"example2" +"I","update_uk","test",450253245446619146,10,10,"example10" +"I","update_uk","test",450253245446619146,20,20,"example20" +"I","update_uk","test",450253245446619147,100,100,"example100" +"I","update_uk","test",450253245446619148,1000,1000,"example1000" + +# output raw change event, data is consistent since replace by pk/uk +"U","update_uk","test",450253245499047940,1,2,"example1" +"U","update_uk","test",450253245499047940,2,1,"example2" +"U","update_uk","test",450253245499047943,10,30,"example10" +"U","update_uk","test",450253245499047943,20,40,"example20" +"U","update_uk","test",450253245499047946,100,200,"example100" +"U","update_uk","test",450253245512155140,1000,1000,"example1001" diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed4_pk.res b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed4_pk.res new file mode 100644 index 00000000000..fc3ea45b65d --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed4_pk.res @@ -0,0 +1,26 @@ +"I","update_pk","test",450253245302439944,false,1,"example1" +"I","update_pk","test",450253245302439944,false,2,"example2" +"I","update_pk","test",450253245302439946,false,10,"example10" +"I","update_pk","test",450253245302439946,false,20,"example20" +"I","update_pk","test",450253245302439947,false,100,"example100" +"I","update_pk","test",450253245302439948,false,1000,"example1000" + +# translate to normal update in upstream, split in csv encoder +"D","update_pk","test",450253245485940746,true,1,"example1" +"I","update_pk","test",450253245485940746,true,1,"example2" +"D","update_pk","test",450253245485940746,true,2,"example2" +"I","update_pk","test",450253245485940746,true,2,"example1" + +# split and sort in upstream +"D","update_pk","test",450253245485940749,false,10,"example10" +"D","update_pk","test",450253245485940749,false,20,"example20" +"I","update_pk","test",450253245485940749,false,30,"example10" +"I","update_pk","test",450253245485940749,false,40,"example20" + +# split and sort in upstream +"D","update_pk","test",450253245485940752,false,100,"example100" +"I","update_pk","test",450253245485940752,false,200,"example100" + +# normal update event, split in csv encoder +"D","update_pk","test",450253245485940753,true,1000,"example1000" +"I","update_pk","test",450253245485940753,true,1000,"example1001" diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed4_uk.res b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed4_uk.res new file mode 100644 index 00000000000..ea644868640 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed4_uk.res @@ -0,0 +1,26 @@ +"I","update_uk","test",450253245446619144,false,1,1,"example1" +"I","update_uk","test",450253245446619144,false,2,2,"example2" +"I","update_uk","test",450253245446619146,false,10,10,"example10" +"I","update_uk","test",450253245446619146,false,20,20,"example20" +"I","update_uk","test",450253245446619147,false,100,100,"example100" +"I","update_uk","test",450253245446619148,false,1000,1000,"example1000" + +# split and sort in table sink +"D","update_uk","test",450253245499047940,false,1,1,"example1" +"D","update_uk","test",450253245499047940,false,2,2,"example2" +"I","update_uk","test",450253245499047940,false,1,2,"example1" +"I","update_uk","test",450253245499047940,false,2,1,"example2" + +# split and sort 
in table sink +"D","update_uk","test",450253245499047943,false,10,10,"example10" +"D","update_uk","test",450253245499047943,false,20,20,"example20" +"I","update_uk","test",450253245499047943,false,10,30,"example10" +"I","update_uk","test",450253245499047943,false,20,40,"example20" + +# split and sort in table sink +"D","update_uk","test",450253245499047946,false,100,100,"example100" +"I","update_uk","test",450253245499047946,false,100,200,"example100" + +# normal update event, split in csv encoder +"D","update_uk","test",450253245512155140,true,1000,1000,"example1000" +"I","update_uk","test",450253245512155140,true,1000,1000,"example1001" From 19f031375de51b8886d3a8622a3b9c19b5699901 Mon Sep 17 00:00:00 2001 From: CharlesCheung96 Date: Wed, 5 Jun 2024 18:54:47 +0800 Subject: [PATCH 18/20] fix res --- .../result/changefeed2_pk.res | 4 +++- .../result/changefeed2_uk.res | 2 +- .../result/changefeed3_pk.res | 8 ++++++++ .../result/changefeed3_uk.res | 2 +- 4 files changed, 13 insertions(+), 3 deletions(-) diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_pk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_pk.res index 687afcae6bf..e2713a94f63 100644 --- a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_pk.res +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_pk.res @@ -5,7 +5,9 @@ "I","update_pk","test",450250823741472791,false,100,"example100" "I","update_pk","test",450250823741472792,false,1000,"example1000" -# split in csv encoder, lost id=2 since delete are not sorted before insert within single txn +# split in csv encoder +# DIFF_RES: REPLACE INTO `test`.`update_pk`(`id`,`pad`) VALUES (2,'example1'); +# lost id=2 since delete are not sorted before insert within single txn "D","update_pk","test",450250823807270922,true,1,"example1" "I","update_pk","test",450250823807270922,true,2,"example1" "D","update_pk","test",450250823807270922,true,2,"example2" diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_uk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_uk.res index 0151dbf843e..1783ee5a0dd 100644 --- a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_uk.res +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_uk.res @@ -5,7 +5,7 @@ "I","update_uk","test",450250823780794389,false,100,100,"example100" "I","update_uk","test",450250823780794390,false,1000,1000,"example1000" -# split in csv encoder, TODO check lost +# split in csv encoder, data is consistent since delete by pk "D","update_uk","test",450250823807270931,true,1,1,"example1" "I","update_uk","test",450250823807270931,true,1,2,"example1" "D","update_uk","test",450250823807270931,true,2,2,"example2" diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed3_pk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed3_pk.res index 33ff20f0291..54595b6f49b 100644 --- a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed3_pk.res +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed3_pk.res @@ -8,7 +8,15 @@ # output raw change event "U","update_pk","test",450250823807270922,2,"example1" "U","update_pk","test",450250823807270922,1,"example2" + + +# DIFF_RES: +# DELETE FROM `test`.`update_pk` WHERE `id` = 10 AND `pad` = 'example10' LIMIT 1; +# DELETE FROM 
`test`.`update_pk` WHERE `id` = 20 AND `pad` = 'example20' LIMIT 1; +# DELETE FROM `test`.`update_pk` WHERE `id` = 100 AND `pad` = 'example100' LIMIT 1; +# old row is not deleted since lack of old value "U","update_pk","test",450250823807270925,30,"example10" "U","update_pk","test",450250823807270925,40,"example20" "U","update_pk","test",450250823807270927,200,"example100" + "U","update_pk","test",450250823807270928,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed3_uk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed3_uk.res index 79798b3e9af..ed01246a1ed 100644 --- a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed3_uk.res +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed3_uk.res @@ -5,7 +5,7 @@ "I","update_uk","test",450250823780794389,100,100,"example100" "I","update_uk","test",450250823780794390,1000,1000,"example1000" -# output raw change event +# output raw change event, data is consistent since replace by pk/uk "U","update_uk","test",450250823807270931,1,2,"example1" "U","update_uk","test",450250823807270931,2,1,"example2" "U","update_uk","test",450250823820115970,10,30,"example10" From 6ddd52820407997e426f16e88a29b3abbfd24c23 Mon Sep 17 00:00:00 2001 From: CharlesCheung96 Date: Wed, 5 Jun 2024 20:55:36 +0800 Subject: [PATCH 19/20] fix errdoc --- errors.toml | 2 +- pkg/errors/cdc_errors.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/errors.toml b/errors.toml index bf22a8b5323..8ae2a3abbf1 100755 --- a/errors.toml +++ b/errors.toml @@ -363,7 +363,7 @@ illegal parameter for sorter: %s ["CDC:ErrIncompatibleSinkConfig"] error = ''' -incompatible configuration '%s' +incompatible configuration in sink uri(%s) and config file(%s), please try to update the configuration only through sink uri ''' ["CDC:ErrInternalCheckFailed"] diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go index 6e055b10d4a..cba2637a105 100644 --- a/pkg/errors/cdc_errors.go +++ b/pkg/errors/cdc_errors.go @@ -316,7 +316,8 @@ var ( errors.RFCCodeText("CDC:ErrSinkURIInvalid"), ) ErrIncompatibleSinkConfig = errors.Normalize( - "incompatible configuration '%s'", + "incompatible configuration in sink uri(%s) and config file(%s), "+ + "please try to update the configuration only through sink uri", errors.RFCCodeText("CDC:ErrIncompatibleSinkConfig"), ) ErrSinkUnknownProtocol = errors.Normalize( From 039b5449707feed013ea617009ebe149d77f1ac0 Mon Sep 17 00:00:00 2001 From: CharlesCheung96 Date: Tue, 11 Jun 2024 11:45:10 +0800 Subject: [PATCH 20/20] fix swag doc --- docs/swagger/docs.go | 21 +++++++++++++++++++++ docs/swagger/swagger.json | 21 +++++++++++++++++++++ docs/swagger/swagger.yaml | 18 ++++++++++++++++++ 3 files changed, 60 insertions(+) diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index 4f605257db2..ad976303486 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -1447,6 +1447,10 @@ var doc = `{ "output-column-id": { "type": "boolean" }, + "output-raw-change-event": { + "description": "OutputRawChangeEvent controls whether to split the update pk/uk events.", + "type": "boolean" + }, "worker-count": { "type": "integer" } @@ -1602,6 +1606,10 @@ var doc = `{ "max-message-bytes": { "type": "integer" }, + "output-raw-change-event": { + "description": "OutputRawChangeEvent controls whether to split the update pk/uk events.", + "type": "boolean" + }, "partition-num": { "type": "integer" 
}, @@ -1817,6 +1825,10 @@ var doc = `{ "description": "Set the operation timeout (default: 30 seconds)\nProducer-create, subscribe and unsubscribe operations will be retried until this interval, after which the\noperation will be marked as failed", "type": "integer" }, + "output-raw-change-event": { + "description": "OutputRawChangeEvent controls whether to split the update pk/uk events.", + "type": "boolean" + }, "pulsar-producer-cache-size": { "description": "PulsarProducerCacheSize is the size of the cache of pulsar producers", "type": "integer" @@ -2452,6 +2464,9 @@ var doc = `{ "output_column_id": { "type": "boolean" }, + "output_raw_change_event": { + "type": "boolean" + }, "worker_count": { "type": "integer" } @@ -2723,6 +2738,9 @@ var doc = `{ "max_message_bytes": { "type": "integer" }, + "output_raw_change_event": { + "type": "boolean" + }, "partition_num": { "type": "integer" }, @@ -2944,6 +2962,9 @@ var doc = `{ "operation-timeout": { "type": "integer" }, + "output-raw-change-event": { + "type": "boolean" + }, "pulsar-producer-cache-size": { "type": "integer" }, diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index 38f0c2008e5..8d4f5d1907a 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -1428,6 +1428,10 @@ "output-column-id": { "type": "boolean" }, + "output-raw-change-event": { + "description": "OutputRawChangeEvent controls whether to split the update pk/uk events.", + "type": "boolean" + }, "worker-count": { "type": "integer" } @@ -1583,6 +1587,10 @@ "max-message-bytes": { "type": "integer" }, + "output-raw-change-event": { + "description": "OutputRawChangeEvent controls whether to split the update pk/uk events.", + "type": "boolean" + }, "partition-num": { "type": "integer" }, @@ -1798,6 +1806,10 @@ "description": "Set the operation timeout (default: 30 seconds)\nProducer-create, subscribe and unsubscribe operations will be retried until this interval, after which the\noperation will be marked as failed", "type": "integer" }, + "output-raw-change-event": { + "description": "OutputRawChangeEvent controls whether to split the update pk/uk events.", + "type": "boolean" + }, "pulsar-producer-cache-size": { "description": "PulsarProducerCacheSize is the size of the cache of pulsar producers", "type": "integer" @@ -2433,6 +2445,9 @@ "output_column_id": { "type": "boolean" }, + "output_raw_change_event": { + "type": "boolean" + }, "worker_count": { "type": "integer" } @@ -2704,6 +2719,9 @@ "max_message_bytes": { "type": "integer" }, + "output_raw_change_event": { + "type": "boolean" + }, "partition_num": { "type": "integer" }, @@ -2925,6 +2943,9 @@ "operation-timeout": { "type": "integer" }, + "output-raw-change-event": { + "type": "boolean" + }, "pulsar-producer-cache-size": { "type": "integer" }, diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 11113b710e8..9cb8b63d7de 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -40,6 +40,10 @@ definitions: type: string output-column-id: type: boolean + output-raw-change-event: + description: OutputRawChangeEvent controls whether to split the update pk/uk + events. + type: boolean worker-count: type: integer type: object @@ -147,6 +151,10 @@ definitions: $ref: '#/definitions/config.LargeMessageHandleConfig' max-message-bytes: type: integer + output-raw-change-event: + description: OutputRawChangeEvent controls whether to split the update pk/uk + events. 
+ type: boolean partition-num: type: integer read-timeout: @@ -306,6 +314,10 @@ definitions: Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the operation will be marked as failed type: integer + output-raw-change-event: + description: OutputRawChangeEvent controls whether to split the update pk/uk + events. + type: boolean pulsar-producer-cache-size: description: PulsarProducerCacheSize is the size of the cache of pulsar producers type: integer @@ -764,6 +776,8 @@ definitions: type: string output_column_id: type: boolean + output_raw_change_event: + type: boolean worker_count: type: integer type: object @@ -943,6 +957,8 @@ definitions: $ref: '#/definitions/v2.LargeMessageHandleConfig' max_message_bytes: type: integer + output_raw_change_event: + type: boolean partition_num: type: integer read_timeout: @@ -1088,6 +1104,8 @@ definitions: $ref: '#/definitions/v2.PulsarOAuth2' operation-timeout: type: integer + output-raw-change-event: + type: boolean pulsar-producer-cache-size: type: integer pulsar-version: