diff --git a/cdc/entry/mounter_group.go b/cdc/entry/mounter_group.go index d4b55149f0e..4efdb91f57a 100644 --- a/cdc/entry/mounter_group.go +++ b/cdc/entry/mounter_group.go @@ -18,10 +18,12 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/filter" "github.com/pingcap/tiflow/pkg/integrity" "github.com/pingcap/tiflow/pkg/util" + "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -47,6 +49,7 @@ type mounterGroup struct { const ( defaultMounterWorkerNum = 16 + maxMounterWorkerNum = 512 defaultInputChanSize = 1024 defaultMetricInterval = 15 * time.Second ) @@ -63,6 +66,10 @@ func NewMounterGroup( if workerNum <= 0 { workerNum = defaultMounterWorkerNum } + if workerNum > maxMounterWorkerNum { + workerNum = maxMounterWorkerNum + log.Warn("limit worker num to avoid crash", zap.Int("workerNum", workerNum), zap.Any("maxMounterWorkerNum", maxMounterWorkerNum)) + } return &mounterGroup{ schemaStorage: schemaStorage, inputCh: make(chan *model.PolymorphicEvent, defaultInputChanSize), diff --git a/cdc/entry/mounter_group_test.go b/cdc/entry/mounter_group_test.go new file mode 100644 index 00000000000..f96f906684b --- /dev/null +++ b/cdc/entry/mounter_group_test.go @@ -0,0 +1,28 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package entry + +import ( + "testing" + + "github.com/pingcap/tiflow/cdc/model" + "github.com/stretchr/testify/require" +) + +func TestWorkerNum(t *testing.T) { + mg := NewMounterGroup(nil, -1, nil, nil, model.ChangeFeedID4Test("", ""), nil) + require.Equal(t, mg.workerNum, defaultMounterWorkerNum) + mg = NewMounterGroup(nil, maxMounterWorkerNum+10, nil, nil, model.ChangeFeedID4Test("", ""), nil) + require.Equal(t, mg.workerNum, maxMounterWorkerNum) +} diff --git a/pkg/config/consistent.go b/pkg/config/consistent.go index fdcfe2c30ac..f97e41b2f42 100644 --- a/pkg/config/consistent.go +++ b/pkg/config/consistent.go @@ -16,11 +16,13 @@ package config import ( "fmt" + "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiflow/pkg/compression" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/redo" "github.com/pingcap/tiflow/pkg/util" + "go.uber.org/zap" ) // ConsistentConfig represents replication consistency config for a changefeed. @@ -77,7 +79,7 @@ func (c *ConsistentConfig) ValidateAndAdjust() error { return nil } - if c.MaxLogSize == 0 { + if c.MaxLogSize <= 0 { c.MaxLogSize = redo.DefaultMaxLogSize } @@ -104,12 +106,20 @@ func (c *ConsistentConfig) ValidateAndAdjust() error { fmt.Sprintf("The consistent.compression:%s must be 'none' or 'lz4'", c.Compression)) } - if c.EncodingWorkerNum == 0 { + if c.EncodingWorkerNum <= 0 { c.EncodingWorkerNum = redo.DefaultEncodingWorkerNum } - if c.FlushWorkerNum == 0 { + if c.EncodingWorkerNum > redo.MaxEncodingWorkerNum { + log.Warn("limit encoding worker num to avoid crash", zap.Int("encodingWorkerNum", c.EncodingWorkerNum), zap.Any("maxEncodingWorkerNum", redo.MaxEncodingWorkerNum)) + c.EncodingWorkerNum = redo.MaxEncodingWorkerNum + } + if c.FlushWorkerNum <= 0 { c.FlushWorkerNum = redo.DefaultFlushWorkerNum } + if c.FlushWorkerNum > redo.MaxFlushWorkerNum { + log.Warn("limit flush worker num to avoid crash", zap.Int("flushWorkerNum", c.FlushWorkerNum), zap.Any("maxFlushWorkerNum", redo.MaxFlushWorkerNum)) + c.FlushWorkerNum = redo.MaxFlushWorkerNum + } uri, err := storage.ParseRawURL(c.Storage) if err != nil { diff --git a/pkg/config/consistent_test.go b/pkg/config/consistent_test.go new file mode 100644 index 00000000000..3fa8056230e --- /dev/null +++ b/pkg/config/consistent_test.go @@ -0,0 +1,55 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "testing" + + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/redo" + "github.com/stretchr/testify/require" +) + +func TestConsistentConfig(t *testing.T) { + config := ConsistentConfig{ + Level: string(redo.ConsistentLevelEventual), + MaxLogSize: -1, + EncodingWorkerNum: -1, + FlushWorkerNum: -1, + } + config.ValidateAndAdjust() + require.Equal(t, config.MaxLogSize, redo.DefaultMaxLogSize) + require.EqualValues(t, config.FlushIntervalInMs, redo.DefaultFlushIntervalInMs) + require.EqualValues(t, config.MetaFlushIntervalInMs, redo.DefaultMetaFlushIntervalInMs) + require.EqualValues(t, config.EncodingWorkerNum, redo.DefaultEncodingWorkerNum) + require.EqualValues(t, config.FlushWorkerNum, redo.DefaultFlushWorkerNum) + + config.EncodingWorkerNum = redo.MaxEncodingWorkerNum + 10 + config.FlushWorkerNum = redo.MaxFlushWorkerNum + 10 + config.ValidateAndAdjust() + require.EqualValues(t, config.EncodingWorkerNum, redo.MaxEncodingWorkerNum) + require.EqualValues(t, config.FlushWorkerNum, redo.MaxFlushWorkerNum) + + config.FlushIntervalInMs = -1 + require.ErrorIs(t, config.ValidateAndAdjust(), cerror.ErrInvalidReplicaConfig) + config.FlushIntervalInMs = 0 + + config.MetaFlushIntervalInMs = -1 + require.ErrorIs(t, config.ValidateAndAdjust(), cerror.ErrInvalidReplicaConfig) + config.MetaFlushIntervalInMs = 0 + + config.Compression = "compress" + require.ErrorIs(t, config.ValidateAndAdjust(), cerror.ErrInvalidReplicaConfig) + config.Compression = "" +} diff --git a/pkg/redo/config.go b/pkg/redo/config.go index 4b7821b1048..d3875d02977 100644 --- a/pkg/redo/config.go +++ b/pkg/redo/config.go @@ -60,6 +60,11 @@ const ( // `8*64MB = 512MB` by default. DefaultFlushWorkerNum = 8 + // MaxEncodingWorkerNum is the max number of encoding workers. + MaxEncodingWorkerNum = 512 + // MaxFlushWorkerNum is the max number of flush workers. + MaxFlushWorkerNum = 256 + // DefaultFileMode is the default mode when operation files DefaultFileMode = 0o644 // DefaultDirMode is the default mode when operation dir diff --git a/pkg/sink/codec/encoder_group_test.go b/pkg/sink/codec/encoder_group_test.go new file mode 100644 index 00000000000..500f6bdbc37 --- /dev/null +++ b/pkg/sink/codec/encoder_group_test.go @@ -0,0 +1,44 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package codec + +import ( + "testing" + + "github.com/pingcap/tidb/pkg/util/cpu" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/util" + "github.com/stretchr/testify/require" +) + +func TestEncoderConcurrency(t *testing.T) { + cfg := config.GetDefaultReplicaConfig().Sink + cfg.EncoderConcurrency = util.AddressOf(-1) + eg := NewEncoderGroup( + cfg, + nil, + model.ChangeFeedID4Test("", ""), + ) + require.Equal(t, len(eg.inputCh), config.DefaultEncoderGroupConcurrency) + + limitConcurrency := cpu.GetCPUCount() * 10 + cfg.EncoderConcurrency = util.AddressOf(limitConcurrency + 10) + eg = NewEncoderGroup( + cfg, + nil, + model.ChangeFeedID4Test("", ""), + ) + require.Equal(t, len(eg.inputCh), limitConcurrency) +}