Skip to content

Commit

Permalink
DBZ-8512 Support storages supported by Debezium operator for pipeline…
Browse files Browse the repository at this point in the history
… in Debezium platform (#8)

* DBZ-8512 Use JDBC store as default for offsets and schema history store

* DBZ-8512 Enable offset store in helm

* DBZ-8512 Configure watcher to create publication only for the outbox table

* DBZ-8512 Use CustomStore to configure the DS resources

* DBZ-8512 Update debezium operator chart

---------

Signed-off-by: Fiore Mario Vitale <[email protected]>
  • Loading branch information
mfvitale authored Jan 17, 2025
1 parent 3a3aefc commit c9930cc
Show file tree
Hide file tree
Showing 17 changed files with 605 additions and 52 deletions.
16 changes: 13 additions & 3 deletions debezium-platform-conductor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
<quarkus.platform.version>3.17.3</quarkus.platform.version>
<quarkus.platform.version>3.17.4</quarkus.platform.version>
<skipITs>true</skipITs>
<surefire-plugin.version>3.2.5</surefire-plugin.version>
<version.assertj>3.24.2</version.assertj>
<version.lombok>1.18.32</version.lombok>
<version.mapstruct>1.5.5.Final</version.mapstruct>
<version.debezium>3.0.4.Final</version.debezium>
<version.debezium>3.0.5.Final</version.debezium>

<quarkus.container-image.registry>quay.io</quarkus.container-image.registry>
<quarkus.container-image.group>debezium</quarkus.container-image.group>
Expand All @@ -28,7 +28,7 @@

<!-- Set formatting default goals -->
<version.impsort>1.12.0</version.impsort>
<version.checkstyle.plugin>3.1.1</version.checkstyle.plugin>
<version.checkstyle.plugin>3.6.0</version.checkstyle.plugin>
<version.code.formatter>2.20.0</version.code.formatter>
<format.formatter.goal>format</format.formatter.goal>
<format.imports.goal>sort</format.imports.goal>
Expand Down Expand Up @@ -214,6 +214,16 @@
<artifactId>mapstruct</artifactId>
<version>${version.mapstruct}</version>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.platform.config;

import java.util.Map;

public interface OffsetConfigGroup {

OffsetStorageConfigGroup storage();

Map<String, String> config();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.platform.config;

import java.util.Map;

public interface OffsetStorageConfigGroup {
String type();

Map<String, String> config();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.platform.config;

import io.quarkus.runtime.annotations.ConfigPhase;
import io.quarkus.runtime.annotations.ConfigRoot;
import io.smallrye.config.ConfigMapping;
import io.smallrye.config.WithName;

@ConfigMapping(prefix = "pipeline")
@ConfigRoot(prefix = "pipeline", phase = ConfigPhase.RUN_TIME)
public interface PipelineConfigGroup {

OffsetConfigGroup offset();

@WithName("schema.history")
SchemaHistoryConfigGroup schema();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.platform.config;

import java.util.Map;

public interface SchemaHistoryConfigGroup {

String internal();

Map<String, String> config();
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*/
package io.debezium.platform.environment.operator;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

Expand All @@ -19,14 +21,17 @@
import io.debezium.operator.api.model.runtime.RuntimeBuilder;
import io.debezium.operator.api.model.runtime.metrics.JmxExporterBuilder;
import io.debezium.operator.api.model.runtime.metrics.MetricsBuilder;
import io.debezium.operator.api.model.source.Offset;
import io.debezium.operator.api.model.source.OffsetBuilder;
import io.debezium.operator.api.model.source.SchemaHistory;
import io.debezium.operator.api.model.source.SchemaHistoryBuilder;
import io.debezium.operator.api.model.source.SourceBuilder;
import io.debezium.operator.api.model.source.storage.offset.InMemoryOffsetStore;
import io.debezium.operator.api.model.source.storage.schema.InMemorySchemaHistoryStore;
import io.debezium.operator.api.model.source.storage.CustomStoreBuilder;
import io.debezium.platform.config.PipelineConfigGroup;
import io.debezium.platform.domain.views.flat.PipelineFlat;
import io.debezium.platform.environment.PipelineController;
import io.debezium.platform.environment.logs.LogReader;
import io.debezium.platform.environment.operator.configuration.TableNameResolver;
import io.debezium.platform.environment.operator.logs.KubernetesLogReader;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
Expand All @@ -37,11 +42,18 @@
public class OperatorPipelineController implements PipelineController {

public static final String LABEL_DBZ_CONDUCTOR_ID = "debezium.io/conductor-id";
private static final List<String> RESOLVABLE_CONFIGS = List.of("jdbc.schema.history.table.name", "jdbc.offset.table.name");

private final KubernetesClient k8s;
private final PipelineConfigGroup pipelineConfigGroup;
private final TableNameResolver tableNameResolver;

public OperatorPipelineController(KubernetesClient k8s) {
public OperatorPipelineController(KubernetesClient k8s,
PipelineConfigGroup pipelineConfigGroup,
TableNameResolver tableNameResolver) {
this.k8s = k8s;
this.pipelineConfigGroup = pipelineConfigGroup;
this.tableNameResolver = tableNameResolver;
}

@Override
Expand All @@ -68,18 +80,10 @@ public void deploy(PipelineFlat pipeline) {
var sourceConfig = new ConfigProperties();
sourceConfig.setAllProps(source.getConfig());

// TODO: offset and schema history type should be configurable in the future
var offset = new OffsetBuilder()
.withMemory(new InMemoryOffsetStore())
.build();
var schemaHistory = new SchemaHistoryBuilder()
.withMemory(new InMemorySchemaHistoryStore())
.build();

var dsSource = new SourceBuilder()
.withSourceClass(source.getType())
.withOffset(offset)
.withSchemaHistory(schemaHistory)
.withOffset(getOffset(pipeline))
.withSchemaHistory(getSchemaHistory(pipeline))
.withConfig(sourceConfig)
.build();

Expand Down Expand Up @@ -111,6 +115,40 @@ public void deploy(PipelineFlat pipeline) {
k8s.resource(ds).serverSideApply();
}

private SchemaHistory getSchemaHistory(PipelineFlat pipeline) {

var pipelineSchemaHistoryConfigs = pipelineConfigGroup.schema().config();
var schemaHistoryType = pipelineConfigGroup.schema().internal();

Map<String, String> schemaHistoryStorageConfigs = new HashMap<>(pipelineSchemaHistoryConfigs);
ConfigProperties schemaHistoryProps = new ConfigProperties();
schemaHistoryStorageConfigs.forEach(schemaHistoryProps::setProps);

RESOLVABLE_CONFIGS.forEach(prop -> schemaHistoryProps.setProps(prop, tableNameResolver.resolve(pipeline, schemaHistoryStorageConfigs.get(prop))));

return new SchemaHistoryBuilder().withStore(new CustomStoreBuilder()
.withType(schemaHistoryType)
.withConfig(schemaHistoryProps)
.build()).build();
}

private Offset getOffset(PipelineFlat pipeline) {

var pipelineOffsetConfigs = pipelineConfigGroup.offset().storage().config();
var offsetType = pipelineConfigGroup.offset().storage().type();

Map<String, String> offsetStorageConfigs = new HashMap<>(pipelineOffsetConfigs);
ConfigProperties offsetStorageProps = new ConfigProperties();
offsetStorageConfigs.forEach(offsetStorageProps::setProps);

RESOLVABLE_CONFIGS.forEach(prop -> offsetStorageProps.setProps(prop, tableNameResolver.resolve(pipeline, pipelineOffsetConfigs.get(prop))));

return new OffsetBuilder().withStore(new CustomStoreBuilder()
.withType(offsetType)
.withConfig(offsetStorageProps)
.build()).build();
}

@Override
public void undeploy(Long id) {
k8s.resources(DebeziumServer.class)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.platform.environment.operator.configuration;

import java.util.List;
import java.util.function.Function;

import jakarta.enterprise.context.Dependent;

import io.debezium.platform.domain.views.flat.PipelineFlat;

@Dependent
public class TableNameResolver {

private static final List<PlaceHolder> PLACE_HOLDERS = List.of(
new PlaceHolder("@{pipeline_name}", PipelineFlat::getName));

public String resolve(PipelineFlat pipeline, String currentValue) {

if (currentValue == null || currentValue.isEmpty()) {
return currentValue;
}

String processedValue = currentValue;
for (PlaceHolder placeHolder : PLACE_HOLDERS) {
processedValue = placeHolder.apply(processedValue, pipeline);
}

return sanitizeTableName(processedValue);
}

private record PlaceHolder(String token, Function<PipelineFlat, String> valueResolver) {

String apply(String text, PipelineFlat pipeline) {
return text.replace(token, valueResolver.apply(pipeline));
}
}

/**
* Sanitizes a string to be used as a PostgreSQL table name.
* - Replaces invalid characters with underscores
* - Ensures the name starts with a letter or underscore
* - Truncates the name to 63 bytes (PostgreSQL's limit)
*
* @param tableName The original table name
* @return A sanitized version safe for use as a PostgreSQL table name
*/
private String sanitizeTableName(String tableName) {

if (tableName == null || tableName.isEmpty()) {
throw new IllegalArgumentException("Table name cannot be null or empty");
}

// PostgreSQL folds unquoted identifiers to lowercase
String sanitized = tableName.toLowerCase();

sanitized = sanitized.replaceAll("[^a-z0-9_]", "_");

// Ensure the name starts with a letter or underscore
if (!sanitized.matches("^[a-z_].*")) {
sanitized = "_" + sanitized;
}

// Remove consecutive underscores
sanitized = sanitized.replaceAll("_+", "_");

// Trim trailing underscore
sanitized = sanitized.replaceAll("_$", "");

// Truncate to PostgreSQL's limit of 63 bytes
if (sanitized.length() > 63) {
sanitized = sanitized.substring(0, 63);
// If we happened to end with an underscore after truncating, remove it
sanitized = sanitized.replaceAll("_$", "");
}

return sanitized;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnector;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresConnectorConfig.AutoCreateMode;
import io.debezium.embedded.Connect;
import io.debezium.embedded.EmbeddedEngineConfig;
import io.debezium.engine.DebeziumEngine;
import io.debezium.platform.config.OffsetConfigGroup;
import io.debezium.platform.environment.watcher.config.WatcherConfig;
import io.debezium.platform.environment.watcher.config.WatcherConfigGroup;
import io.debezium.platform.environment.watcher.consumers.OutboxParentEventConsumer;
import io.debezium.transforms.outbox.EventRouter;
import io.quarkus.runtime.ShutdownEvent;
Expand Down Expand Up @@ -77,6 +78,7 @@ public void start() {
.with(PostgresConnectorConfig.PLUGIN_NAME, PostgresConnectorConfig.LogicalDecoder.PGOUTPUT.getValue())
.with(PostgresConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.%s".formatted(outbox.table()))
.with(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE, AutoCreateMode.FILTERED)
.with("transforms", "outbox")
.with("transforms.outbox.type", EventRouter.class.getName())
.with("transforms.outbox.table.fields.additional.placement", extraFields);
Expand All @@ -95,7 +97,7 @@ public void start() {
executor.execute(engine);
}

private Map<String, String> offsetConfigurations(WatcherConfigGroup.OffsetConfigGroup offset) {
private Map<String, String> offsetConfigurations(OffsetConfigGroup offset) {

Map<String, String> config = new HashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
*/
package io.debezium.platform.environment.watcher.config;

import java.util.Map;
import java.util.Optional;

import io.debezium.platform.config.OffsetConfigGroup;
import io.quarkus.runtime.annotations.ConfigPhase;
import io.quarkus.runtime.annotations.ConfigRoot;
import io.smallrye.config.ConfigMapping;

@ConfigMapping(prefix = "conductor.watcher")
@ConfigRoot(prefix = "conductor", phase = ConfigPhase.RUN_TIME)
public interface WatcherConfigGroup {

boolean enabled();
Expand All @@ -19,15 +22,4 @@ public interface WatcherConfigGroup {

OffsetConfigGroup offset();

interface OffsetConfigGroup {
OffsetStorageConfigGroup storage();

Map<String, String> config();
}

interface OffsetStorageConfigGroup {
String type();

Map<String, String> config();
}
}
Loading

0 comments on commit c9930cc

Please sign in to comment.