Skip to content

Commit

Permalink
changefeedccl: sunset pubsub v1 sink
Browse files Browse the repository at this point in the history
The pubsub v2 sink implementation has been GA since v23.1 and enabled
by default since v23.2. This PR removes the pubsub v1 sink code and
retires the non-public cluster setting
changefeed.new_pubsub_sink_enabled/changefeed.new_pubsub_sink.enabled.

Epic: CRDB-37336
Informs: #121249

Release note (enterprise change): The cluster setting
changefeed.new_pubsub_sink_enabled/changefeed.new_pubsub_sink.enabled is
no longer supported. The new pubsub sink has been enabled by default
since v23.2, and the first version pubsub sink has been removed.
  • Loading branch information
rharding6373 committed Feb 26, 2025
1 parent 4215a94 commit 643e1f7
Show file tree
Hide file tree
Showing 8 changed files with 22 additions and 742 deletions.
2 changes: 0 additions & 2 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ go_library(
"sink_external_connection.go",
"sink_kafka.go",
"sink_kafka_v2.go",
"sink_pubsub.go",
"sink_pubsub_v2.go",
"sink_pulsar.go",
"sink_sql.go",
Expand Down Expand Up @@ -177,7 +176,6 @@ go_library(
"@com_github_twmb_franz_go//pkg/kgo",
"@com_github_twmb_franz_go//pkg/kversion",
"@com_github_twmb_franz_go_pkg_kadm//:kadm",
"@com_google_cloud_go_pubsub//:pubsub",
"@com_google_cloud_go_pubsub//apiv1",
"@com_google_cloud_go_pubsub//apiv1/pubsubpb",
"@org_golang_google_api//impersonate",
Expand Down
8 changes: 0 additions & 8 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4113,9 +4113,6 @@ func TestChangefeedOutputTopics(t *testing.T) {
defer cleanup()
s := cluster.Server(1)

// Only pubsub v2 emits notices.
PubsubV2Enabled.Override(context.Background(), &s.ClusterSettings().SV, true)

pgURL, cleanup := pgurlutils.PGUrl(t, s.SQLAddr(), t.Name(), url.User(username.RootUser))
defer cleanup()
pgBase, err := pq.NewConnector(pgURL.String())
Expand Down Expand Up @@ -9748,8 +9745,6 @@ func TestChangefeedPubsubResolvedMessages(t *testing.T) {
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
ctx := context.Background()
PubsubV2Enabled.Override(ctx, &s.Server.ClusterSettings().SV, true)

db := sqlutils.MakeSQLRunner(s.DB)
db.Exec(t, "CREATE TABLE one (i int)")
Expand Down Expand Up @@ -9908,7 +9903,6 @@ func TestParallelIOMetrics(t *testing.T) {
metrics := registry.MetricsStruct().Changefeed.(*Metrics).AggMetrics

db := sqlutils.MakeSQLRunner(s.DB)
db.Exec(t, `SET CLUSTER SETTING changefeed.new_pubsub_sink_enabled = true`)
db.Exec(t, `SET CLUSTER SETTING changefeed.sink_io_workers = 1`)
db.Exec(t, `
CREATE TABLE foo (a INT PRIMARY KEY);
Expand Down Expand Up @@ -9984,8 +9978,6 @@ func TestPubsubAttributes(t *testing.T) {
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
ctx := context.Background()
PubsubV2Enabled.Override(ctx, &s.Server.ClusterSettings().SV, true)
db := sqlutils.MakeSQLRunner(s.DB)

// asserts the next message has these attributes and is sent to each of the supplied topics.
Expand Down
24 changes: 4 additions & 20 deletions pkg/ccl/changefeedccl/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,18 +195,6 @@ var WebhookV2Enabled = settings.RegisterBoolSetting(
settings.WithName("changefeed.new_webhook_sink.enabled"),
)

// PubsubV2Enabled determines whether or not the refactored Webhook sink
// or the deprecated sink should be used.
var PubsubV2Enabled = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"changefeed.new_pubsub_sink_enabled",
"if enabled, this setting enables a new implementation of the pubsub sink"+
" that allows for a higher throughput",
// TODO: delete the original pubsub sink code
metamorphic.ConstantWithTestBool("changefeed.new_pubsub_sink.enabled", true),
settings.WithName("changefeed.new_pubsub_sink.enabled"),
)

// KafkaV2Enabled determines whether or not the refactored Kafka sink
// or the deprecated sink should be used.
var KafkaV2Enabled = settings.RegisterBoolSetting(
Expand Down Expand Up @@ -309,14 +297,10 @@ func getSink(
if knobs, ok := serverCfg.TestingKnobs.Changefeed.(*TestingKnobs); ok {
testingKnobs = knobs
}
if PubsubV2Enabled.Get(&serverCfg.Settings.SV) {
return makePubsubSink(ctx, u, encodingOpts, opts.GetPubsubConfigJSON(), AllTargets(feedCfg),
opts.IsSet(changefeedbase.OptUnordered), numSinkIOWorkers(serverCfg),
newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{},
metricsBuilder, serverCfg.Settings, testingKnobs)
} else {
return makeDeprecatedPubsubSink(ctx, u, encodingOpts, AllTargets(feedCfg), opts.IsSet(changefeedbase.OptUnordered), metricsBuilder, testingKnobs)
}
return makePubsubSink(ctx, u, encodingOpts, opts.GetPubsubConfigJSON(), AllTargets(feedCfg),
opts.IsSet(changefeedbase.OptUnordered), numSinkIOWorkers(serverCfg),
newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{},
metricsBuilder, serverCfg.Settings, testingKnobs)
case isCloudStorageSink(u):
return validateOptionsAndMakeSink(changefeedbase.CloudStorageValidOptions, func() (Sink, error) {
var testingKnobs *TestingKnobs
Expand Down
Loading

0 comments on commit 643e1f7

Please sign in to comment.