Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-8069 Add override.data.change.topic.prefix property to TableTopicNamingStrategy #203

Merged
merged 2 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
import java.util.Properties;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.relational.TableId;
import io.debezium.schema.AbstractTopicNamingStrategy;
import io.debezium.util.Collect;
import io.debezium.util.Strings;

/**
* Topic naming strategy where only the table name is added. This is used to avoid including
Expand All @@ -19,8 +21,12 @@
*/
public class TableTopicNamingStrategy extends AbstractTopicNamingStrategy<TableId> {

private final String overrideDataChangeTopicPrefix;

public TableTopicNamingStrategy(Properties props) {
super(props);
Configuration config = Configuration.from(props);
this.overrideDataChangeTopicPrefix = config.getString(VitessConnectorConfig.OVERRIDE_DATA_CHANGE_TOPIC_PREFIX);
}

public static TableTopicNamingStrategy create(CommonConnectorConfig config) {
Expand All @@ -29,7 +35,13 @@ public static TableTopicNamingStrategy create(CommonConnectorConfig config) {

@Override
public String dataChangeTopic(TableId id) {
String topicName = mkString(Collect.arrayListOf(prefix, id.table()), delimiter);
String topicName;
if (!Strings.isNullOrBlank(overrideDataChangeTopicPrefix)) {
topicName = mkString(Collect.arrayListOf(overrideDataChangeTopicPrefix, id.table()), delimiter);
}
else {
topicName = mkString(Collect.arrayListOf(prefix, id.table()), delimiter);
}
return topicNames.computeIfAbsent(id, t -> sanitizedTopicName(topicName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,14 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
+ "'false' (the default) offsets are stored as a single unit under the database name. "
+ "'true' stores the offsets per task id");

public static final Field OVERRIDE_DATA_CHANGE_TOPIC_PREFIX = Field.create("override.data.change.topic.prefix")
.withDisplayName("Override Data Topic prefix")
.withType(Type.STRING)
.withWidth(Width.MEDIUM)
.withImportance(ConfigDef.Importance.LOW)
.withValidation(CommonConnectorConfig::validateTopicName)
.withDescription("Overrides the topic.prefix used for the data change topic.");

public static final Field OFFSET_STORAGE_TASK_KEY_GEN = Field.create(VITESS_CONFIG_GROUP_PREFIX + "offset.storage.task.key.gen")
.withDisplayName("Offset storage task key generation number")
.withType(Type.INT)
Expand Down Expand Up @@ -440,6 +448,7 @@ private static int validateTimePrecisionMode(Configuration config, Field field,
GRPC_HEADERS,
GRPC_MAX_INBOUND_MESSAGE_SIZE,
BINARY_HANDLING_MODE,
OVERRIDE_DATA_CHANGE_TOPIC_PREFIX,
SCHEMA_NAME_ADJUSTMENT_MODE,
OFFSET_STORAGE_PER_TASK,
OFFSET_STORAGE_TASK_KEY_GEN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,28 @@ public void shouldGetTopicNameWithoutShard() {
assertThat(topicName).isEqualTo("prefix.table");
}

@Test
public void shouldGetOverrideDataChangeTopic() {
TableId tableId = new TableId("shard", "keyspace", "table");
final Properties props = new Properties();
props.put("topic.delimiter", ".");
props.put("topic.prefix", "prefix");
props.put(VitessConnectorConfig.OVERRIDE_DATA_CHANGE_TOPIC_PREFIX.name(), "override-prefix");
TopicNamingStrategy strategy = new TableTopicNamingStrategy(props);
String topicName = strategy.dataChangeTopic(tableId);
assertThat(topicName).isEqualTo("override-prefix.table");
}

@Test
public void shouldUseTopicPrefixIfOverrideIsBlank() {
TableId tableId = new TableId("shard", "keyspace", "table");
final Properties props = new Properties();
props.put("topic.delimiter", ".");
props.put("topic.prefix", "prefix");
props.put(VitessConnectorConfig.OVERRIDE_DATA_CHANGE_TOPIC_PREFIX.name(), "");
TopicNamingStrategy strategy = new TableTopicNamingStrategy(props);
String topicName = strategy.dataChangeTopic(tableId);
assertThat(topicName).isEqualTo("prefix.table");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

import org.junit.Test;

import io.debezium.config.Configuration;
Expand Down Expand Up @@ -44,4 +48,28 @@ public void shouldGetVitessHeartbeatNoOp() {
assertThat(heartbeat).isEqualTo(Heartbeat.DEFAULT_NOOP_HEARTBEAT);
}

@Test
public void shouldImproperOverrideTopicPrefixFailValidation() {
Configuration configuration = TestHelper.defaultConfig().with(VitessConnectorConfig.OVERRIDE_DATA_CHANGE_TOPIC_PREFIX, "hello@world").build();
VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration);
List<String> inputs = new ArrayList<>();
Consumer<String> printConsumer = (input) -> {
inputs.add(input);
};
connectorConfig.validateAndRecord(List.of(VitessConnectorConfig.OVERRIDE_DATA_CHANGE_TOPIC_PREFIX), printConsumer);
assertThat(inputs.size()).isEqualTo(1);
}

@Test
public void shouldBlankOverrideTopicPrefixFailValidation() {
Configuration configuration = TestHelper.defaultConfig().with(VitessConnectorConfig.OVERRIDE_DATA_CHANGE_TOPIC_PREFIX, "").build();
VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration);
List<String> inputs = new ArrayList<>();
Consumer<String> printConsumer = (input) -> {
inputs.add(input);
};
connectorConfig.validateAndRecord(List.of(VitessConnectorConfig.OVERRIDE_DATA_CHANGE_TOPIC_PREFIX), printConsumer);
assertThat(inputs.size()).isEqualTo(1);
}

}