Skip to content

Commit

Permalink
DBZ-8325 Include config to avoid name conflicts for per-table connectors
Browse files Browse the repository at this point in the history
  • Loading branch information
twthorn committed Oct 21, 2024
1 parent 4fce59d commit c379e0d
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
public class TableTopicNamingStrategy extends AbstractTopicNamingStrategy<TableId> {

private final String overrideDataChangeTopicPrefix;
private final String overrideSchemaChangeTopicName;

public TableTopicNamingStrategy(Properties props) {
super(props);
Configuration config = Configuration.from(props);
this.overrideDataChangeTopicPrefix = config.getString(VitessConnectorConfig.OVERRIDE_DATA_CHANGE_TOPIC_PREFIX);
this.overrideSchemaChangeTopicName = config.getString(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC);
}

public static TableTopicNamingStrategy create(CommonConnectorConfig config) {
Expand All @@ -44,4 +46,15 @@ public String dataChangeTopic(TableId id) {
}
return topicNames.computeIfAbsent(id, t -> sanitizedTopicName(topicName));
}

@Override
public String schemaChangeTopic() {
String topicName;
if (!Strings.isNullOrBlank(overrideSchemaChangeTopicName)) {
return overrideSchemaChangeTopicName;
}
else {
return prefix;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,14 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
.withValidation(CommonConnectorConfig::validateTopicName)
.withDescription("Overrides the topic.prefix used for the data change topic.");

public static final Field OVERRIDE_SCHEMA_CHANGE_TOPIC = Field.create("override.schema.change.topic")
.withDisplayName("Override schema change topic name")
.withType(Type.STRING)
.withWidth(Width.MEDIUM)
.withImportance(ConfigDef.Importance.LOW)
.withValidation(CommonConnectorConfig::validateTopicName)
.withDescription("Overrides the name of the schema change topic (if not set uses topic.prefx).");

public static final Field OFFSET_STORAGE_TASK_KEY_GEN = Field.create(VITESS_CONFIG_GROUP_PREFIX + "offset.storage.task.key.gen")
.withDisplayName("Offset storage task key generation number")
.withType(Type.INT)
Expand Down Expand Up @@ -521,6 +529,7 @@ private static int validateTimePrecisionMode(Configuration config, Field field,
GRPC_MAX_INBOUND_MESSAGE_SIZE,
BINARY_HANDLING_MODE,
OVERRIDE_DATA_CHANGE_TOPIC_PREFIX,
OVERRIDE_SCHEMA_CHANGE_TOPIC,
SCHEMA_NAME_ADJUSTMENT_MODE,
OFFSET_STORAGE_PER_TASK,
OFFSET_STORAGE_TASK_KEY_GEN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,26 @@ public void shouldUseTopicPrefixIfOverrideIsBlank() {
assertThat(topicName).isEqualTo("prefix.table");
}

@Test
public void shouldGetOverrideSchemaChangeTopic() {
TableId tableId = new TableId("shard", "keyspace", "table");
final Properties props = new Properties();
props.put("topic.prefix", "prefix");
props.put(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC.name(), "override-prefix");
TopicNamingStrategy strategy = new TableTopicNamingStrategy(props);
String topicName = strategy.schemaChangeTopic();
assertThat(topicName).isEqualTo("override-prefix");
}

@Test
public void shouldUseTopicPrefixIfOverrideSchemaIsBlank() {
TableId tableId = new TableId("shard", "keyspace", "table");
final Properties props = new Properties();
props.put("topic.prefix", "prefix");
props.put(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC.name(), "");
TopicNamingStrategy strategy = new TableTopicNamingStrategy(props);
String topicName = strategy.schemaChangeTopic();
assertThat(topicName).isEqualTo("prefix");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,30 @@ public void shouldBlankOverrideTopicPrefixFailValidation() {
assertThat(inputs.size()).isEqualTo(1);
}

@Test
public void shouldImproperOverrideSchemaTopicPrefixFailValidation() {
Configuration configuration = TestHelper.defaultConfig().with(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC, "hello@world").build();
VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration);
List<String> inputs = new ArrayList<>();
Consumer<String> printConsumer = (input) -> {
inputs.add(input);
};
connectorConfig.validateAndRecord(List.of(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC), printConsumer);
assertThat(inputs.size()).isEqualTo(1);
}

@Test
public void shouldBlankOverrideSchemaTopicPrefixFailValidation() {
Configuration configuration = TestHelper.defaultConfig().with(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC, "").build();
VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration);
List<String> inputs = new ArrayList<>();
Consumer<String> printConsumer = (input) -> {
inputs.add(input);
};
connectorConfig.validateAndRecord(List.of(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC), printConsumer);
assertThat(inputs.size()).isEqualTo(1);
}

@Test
public void shouldExcludeEmptyShards() {
Configuration configuration = TestHelper.defaultConfig().with(
Expand Down

0 comments on commit c379e0d

Please sign in to comment.