From c9930cc37b4085aa84e12ad4a21e5007b3c35e78 Mon Sep 17 00:00:00 2001 From: Fiore Mario Vitale Date: Fri, 17 Jan 2025 10:51:41 +0100 Subject: [PATCH] DBZ-8512 Support storages supported by Debezium operator for pipeline in Debezium platform (#8) * DBZ-8512 Use JDBC store as default for offsets and schema history store * DBZ-8512 Enable offset store in helm * DBZ-8512 Configure watcher to create publication only for the outbox table * DBZ-8512 Use CustomStore to configure the DS resources * DBZ-8512 Update debezium operator chart --------- Signed-off-by: Fiore Mario Vitale --- debezium-platform-conductor/pom.xml | 16 +- .../platform/config/OffsetConfigGroup.java | 15 ++ .../config/OffsetStorageConfigGroup.java | 14 ++ .../platform/config/PipelineConfigGroup.java | 22 +++ .../config/SchemaHistoryConfigGroup.java | 15 ++ .../operator/OperatorPipelineController.java | 64 ++++++-- .../configuration/TableNameResolver.java | 82 ++++++++++ .../watcher/ConductorEnvironmentWatcher.java | 6 +- .../watcher/config/WatcherConfigGroup.java | 16 +- .../src/main/resources/application.yml | 50 ++++++ .../configuration/TableNameResolverTest.java | 152 ++++++++++++++++++ helm/Chart.yaml | 2 +- helm/README.md | 57 ++++--- helm/templates/_helpers.tpl | 56 +++++++ helm/templates/conductor-deployment.yaml | 2 + helm/values.schema.json | 68 +++++++- helm/values.yaml | 20 +++ 17 files changed, 605 insertions(+), 52 deletions(-) create mode 100644 debezium-platform-conductor/src/main/java/io/debezium/platform/config/OffsetConfigGroup.java create mode 100644 debezium-platform-conductor/src/main/java/io/debezium/platform/config/OffsetStorageConfigGroup.java create mode 100644 debezium-platform-conductor/src/main/java/io/debezium/platform/config/PipelineConfigGroup.java create mode 100644 debezium-platform-conductor/src/main/java/io/debezium/platform/config/SchemaHistoryConfigGroup.java create mode 100644 debezium-platform-conductor/src/main/java/io/debezium/platform/environment/operator/configuration/TableNameResolver.java create mode 100644 debezium-platform-conductor/src/test/java/io/debezium/platform/environment/operator/configuration/TableNameResolverTest.java diff --git a/debezium-platform-conductor/pom.xml b/debezium-platform-conductor/pom.xml index 34acc15..1fa4bf4 100644 --- a/debezium-platform-conductor/pom.xml +++ b/debezium-platform-conductor/pom.xml @@ -12,13 +12,13 @@ UTF-8 quarkus-bom io.quarkus.platform - 3.17.3 + 3.17.4 true 3.2.5 3.24.2 1.18.32 1.5.5.Final - 3.0.4.Final + 3.0.5.Final quay.io debezium @@ -28,7 +28,7 @@ 1.12.0 - 3.1.1 + 3.6.0 2.20.0 format sort @@ -214,6 +214,16 @@ mapstruct ${version.mapstruct} + + + org.mockito + mockito-core + + + org.mockito + mockito-junit-jupiter + test + diff --git a/debezium-platform-conductor/src/main/java/io/debezium/platform/config/OffsetConfigGroup.java b/debezium-platform-conductor/src/main/java/io/debezium/platform/config/OffsetConfigGroup.java new file mode 100644 index 0000000..3bf45e0 --- /dev/null +++ b/debezium-platform-conductor/src/main/java/io/debezium/platform/config/OffsetConfigGroup.java @@ -0,0 +1,15 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.platform.config; + +import java.util.Map; + +public interface OffsetConfigGroup { + + OffsetStorageConfigGroup storage(); + + Map config(); +} diff --git a/debezium-platform-conductor/src/main/java/io/debezium/platform/config/OffsetStorageConfigGroup.java b/debezium-platform-conductor/src/main/java/io/debezium/platform/config/OffsetStorageConfigGroup.java new file mode 100644 index 0000000..c29a879 --- /dev/null +++ b/debezium-platform-conductor/src/main/java/io/debezium/platform/config/OffsetStorageConfigGroup.java @@ -0,0 +1,14 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.platform.config; + +import java.util.Map; + +public interface OffsetStorageConfigGroup { + String type(); + + Map config(); +} diff --git a/debezium-platform-conductor/src/main/java/io/debezium/platform/config/PipelineConfigGroup.java b/debezium-platform-conductor/src/main/java/io/debezium/platform/config/PipelineConfigGroup.java new file mode 100644 index 0000000..422b0d3 --- /dev/null +++ b/debezium-platform-conductor/src/main/java/io/debezium/platform/config/PipelineConfigGroup.java @@ -0,0 +1,22 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.platform.config; + +import io.quarkus.runtime.annotations.ConfigPhase; +import io.quarkus.runtime.annotations.ConfigRoot; +import io.smallrye.config.ConfigMapping; +import io.smallrye.config.WithName; + +@ConfigMapping(prefix = "pipeline") +@ConfigRoot(prefix = "pipeline", phase = ConfigPhase.RUN_TIME) +public interface PipelineConfigGroup { + + OffsetConfigGroup offset(); + + @WithName("schema.history") + SchemaHistoryConfigGroup schema(); + +} diff --git a/debezium-platform-conductor/src/main/java/io/debezium/platform/config/SchemaHistoryConfigGroup.java b/debezium-platform-conductor/src/main/java/io/debezium/platform/config/SchemaHistoryConfigGroup.java new file mode 100644 index 0000000..ace4f57 --- /dev/null +++ b/debezium-platform-conductor/src/main/java/io/debezium/platform/config/SchemaHistoryConfigGroup.java @@ -0,0 +1,15 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.platform.config; + +import java.util.Map; + +public interface SchemaHistoryConfigGroup { + + String internal(); + + Map config(); +} diff --git a/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/operator/OperatorPipelineController.java b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/operator/OperatorPipelineController.java index e1bc56d..2612b0d 100644 --- a/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/operator/OperatorPipelineController.java +++ b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/operator/OperatorPipelineController.java @@ -5,6 +5,8 @@ */ package io.debezium.platform.environment.operator; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; @@ -19,14 +21,17 @@ import io.debezium.operator.api.model.runtime.RuntimeBuilder; import io.debezium.operator.api.model.runtime.metrics.JmxExporterBuilder; import io.debezium.operator.api.model.runtime.metrics.MetricsBuilder; +import io.debezium.operator.api.model.source.Offset; import io.debezium.operator.api.model.source.OffsetBuilder; +import io.debezium.operator.api.model.source.SchemaHistory; import io.debezium.operator.api.model.source.SchemaHistoryBuilder; import io.debezium.operator.api.model.source.SourceBuilder; -import io.debezium.operator.api.model.source.storage.offset.InMemoryOffsetStore; -import io.debezium.operator.api.model.source.storage.schema.InMemorySchemaHistoryStore; +import io.debezium.operator.api.model.source.storage.CustomStoreBuilder; +import io.debezium.platform.config.PipelineConfigGroup; import io.debezium.platform.domain.views.flat.PipelineFlat; import io.debezium.platform.environment.PipelineController; import io.debezium.platform.environment.logs.LogReader; +import io.debezium.platform.environment.operator.configuration.TableNameResolver; import io.debezium.platform.environment.operator.logs.KubernetesLogReader; import io.fabric8.kubernetes.api.model.ObjectMeta; import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; @@ -37,11 +42,18 @@ public class OperatorPipelineController implements PipelineController { public static final String LABEL_DBZ_CONDUCTOR_ID = "debezium.io/conductor-id"; + private static final List RESOLVABLE_CONFIGS = List.of("jdbc.schema.history.table.name", "jdbc.offset.table.name"); private final KubernetesClient k8s; + private final PipelineConfigGroup pipelineConfigGroup; + private final TableNameResolver tableNameResolver; - public OperatorPipelineController(KubernetesClient k8s) { + public OperatorPipelineController(KubernetesClient k8s, + PipelineConfigGroup pipelineConfigGroup, + TableNameResolver tableNameResolver) { this.k8s = k8s; + this.pipelineConfigGroup = pipelineConfigGroup; + this.tableNameResolver = tableNameResolver; } @Override @@ -68,18 +80,10 @@ public void deploy(PipelineFlat pipeline) { var sourceConfig = new ConfigProperties(); sourceConfig.setAllProps(source.getConfig()); - // TODO: offset and schema history type should be configurable in the future - var offset = new OffsetBuilder() - .withMemory(new InMemoryOffsetStore()) - .build(); - var schemaHistory = new SchemaHistoryBuilder() - .withMemory(new InMemorySchemaHistoryStore()) - .build(); - var dsSource = new SourceBuilder() .withSourceClass(source.getType()) - .withOffset(offset) - .withSchemaHistory(schemaHistory) + .withOffset(getOffset(pipeline)) + .withSchemaHistory(getSchemaHistory(pipeline)) .withConfig(sourceConfig) .build(); @@ -111,6 +115,40 @@ public void deploy(PipelineFlat pipeline) { k8s.resource(ds).serverSideApply(); } + private SchemaHistory getSchemaHistory(PipelineFlat pipeline) { + + var pipelineSchemaHistoryConfigs = pipelineConfigGroup.schema().config(); + var schemaHistoryType = pipelineConfigGroup.schema().internal(); + + Map schemaHistoryStorageConfigs = new HashMap<>(pipelineSchemaHistoryConfigs); + ConfigProperties schemaHistoryProps = new ConfigProperties(); + schemaHistoryStorageConfigs.forEach(schemaHistoryProps::setProps); + + RESOLVABLE_CONFIGS.forEach(prop -> schemaHistoryProps.setProps(prop, tableNameResolver.resolve(pipeline, schemaHistoryStorageConfigs.get(prop)))); + + return new SchemaHistoryBuilder().withStore(new CustomStoreBuilder() + .withType(schemaHistoryType) + .withConfig(schemaHistoryProps) + .build()).build(); + } + + private Offset getOffset(PipelineFlat pipeline) { + + var pipelineOffsetConfigs = pipelineConfigGroup.offset().storage().config(); + var offsetType = pipelineConfigGroup.offset().storage().type(); + + Map offsetStorageConfigs = new HashMap<>(pipelineOffsetConfigs); + ConfigProperties offsetStorageProps = new ConfigProperties(); + offsetStorageConfigs.forEach(offsetStorageProps::setProps); + + RESOLVABLE_CONFIGS.forEach(prop -> offsetStorageProps.setProps(prop, tableNameResolver.resolve(pipeline, pipelineOffsetConfigs.get(prop)))); + + return new OffsetBuilder().withStore(new CustomStoreBuilder() + .withType(offsetType) + .withConfig(offsetStorageProps) + .build()).build(); + } + @Override public void undeploy(Long id) { k8s.resources(DebeziumServer.class) diff --git a/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/operator/configuration/TableNameResolver.java b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/operator/configuration/TableNameResolver.java new file mode 100644 index 0000000..7d82811 --- /dev/null +++ b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/operator/configuration/TableNameResolver.java @@ -0,0 +1,82 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.platform.environment.operator.configuration; + +import java.util.List; +import java.util.function.Function; + +import jakarta.enterprise.context.Dependent; + +import io.debezium.platform.domain.views.flat.PipelineFlat; + +@Dependent +public class TableNameResolver { + + private static final List PLACE_HOLDERS = List.of( + new PlaceHolder("@{pipeline_name}", PipelineFlat::getName)); + + public String resolve(PipelineFlat pipeline, String currentValue) { + + if (currentValue == null || currentValue.isEmpty()) { + return currentValue; + } + + String processedValue = currentValue; + for (PlaceHolder placeHolder : PLACE_HOLDERS) { + processedValue = placeHolder.apply(processedValue, pipeline); + } + + return sanitizeTableName(processedValue); + } + + private record PlaceHolder(String token, Function valueResolver) { + + String apply(String text, PipelineFlat pipeline) { + return text.replace(token, valueResolver.apply(pipeline)); + } + } + + /** + * Sanitizes a string to be used as a PostgreSQL table name. + * - Replaces invalid characters with underscores + * - Ensures the name starts with a letter or underscore + * - Truncates the name to 63 bytes (PostgreSQL's limit) + * + * @param tableName The original table name + * @return A sanitized version safe for use as a PostgreSQL table name + */ + private String sanitizeTableName(String tableName) { + + if (tableName == null || tableName.isEmpty()) { + throw new IllegalArgumentException("Table name cannot be null or empty"); + } + + // PostgreSQL folds unquoted identifiers to lowercase + String sanitized = tableName.toLowerCase(); + + sanitized = sanitized.replaceAll("[^a-z0-9_]", "_"); + + // Ensure the name starts with a letter or underscore + if (!sanitized.matches("^[a-z_].*")) { + sanitized = "_" + sanitized; + } + + // Remove consecutive underscores + sanitized = sanitized.replaceAll("_+", "_"); + + // Trim trailing underscore + sanitized = sanitized.replaceAll("_$", ""); + + // Truncate to PostgreSQL's limit of 63 bytes + if (sanitized.length() > 63) { + sanitized = sanitized.substring(0, 63); + // If we happened to end with an underscore after truncating, remove it + sanitized = sanitized.replaceAll("_$", ""); + } + + return sanitized; + } +} diff --git a/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/watcher/ConductorEnvironmentWatcher.java b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/watcher/ConductorEnvironmentWatcher.java index 59339fa..f6e11ba 100644 --- a/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/watcher/ConductorEnvironmentWatcher.java +++ b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/watcher/ConductorEnvironmentWatcher.java @@ -22,11 +22,12 @@ import io.debezium.config.Configuration; import io.debezium.connector.postgresql.PostgresConnector; import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.connector.postgresql.PostgresConnectorConfig.AutoCreateMode; import io.debezium.embedded.Connect; import io.debezium.embedded.EmbeddedEngineConfig; import io.debezium.engine.DebeziumEngine; +import io.debezium.platform.config.OffsetConfigGroup; import io.debezium.platform.environment.watcher.config.WatcherConfig; -import io.debezium.platform.environment.watcher.config.WatcherConfigGroup; import io.debezium.platform.environment.watcher.consumers.OutboxParentEventConsumer; import io.debezium.transforms.outbox.EventRouter; import io.quarkus.runtime.ShutdownEvent; @@ -77,6 +78,7 @@ public void start() { .with(PostgresConnectorConfig.PLUGIN_NAME, PostgresConnectorConfig.LogicalDecoder.PGOUTPUT.getValue()) .with(PostgresConnectorConfig.INCLUDE_SCHEMA_CHANGES, false) .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.%s".formatted(outbox.table())) + .with(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE, AutoCreateMode.FILTERED) .with("transforms", "outbox") .with("transforms.outbox.type", EventRouter.class.getName()) .with("transforms.outbox.table.fields.additional.placement", extraFields); @@ -95,7 +97,7 @@ public void start() { executor.execute(engine); } - private Map offsetConfigurations(WatcherConfigGroup.OffsetConfigGroup offset) { + private Map offsetConfigurations(OffsetConfigGroup offset) { Map config = new HashMap<>(); diff --git a/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/watcher/config/WatcherConfigGroup.java b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/watcher/config/WatcherConfigGroup.java index a587ab7..5cb0661 100644 --- a/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/watcher/config/WatcherConfigGroup.java +++ b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/watcher/config/WatcherConfigGroup.java @@ -5,12 +5,15 @@ */ package io.debezium.platform.environment.watcher.config; -import java.util.Map; import java.util.Optional; +import io.debezium.platform.config.OffsetConfigGroup; +import io.quarkus.runtime.annotations.ConfigPhase; +import io.quarkus.runtime.annotations.ConfigRoot; import io.smallrye.config.ConfigMapping; @ConfigMapping(prefix = "conductor.watcher") +@ConfigRoot(prefix = "conductor", phase = ConfigPhase.RUN_TIME) public interface WatcherConfigGroup { boolean enabled(); @@ -19,15 +22,4 @@ public interface WatcherConfigGroup { OffsetConfigGroup offset(); - interface OffsetConfigGroup { - OffsetStorageConfigGroup storage(); - - Map config(); - } - - interface OffsetStorageConfigGroup { - String type(); - - Map config(); - } } diff --git a/debezium-platform-conductor/src/main/resources/application.yml b/debezium-platform-conductor/src/main/resources/application.yml index 7e102f4..7a6ec89 100644 --- a/debezium-platform-conductor/src/main/resources/application.yml +++ b/debezium-platform-conductor/src/main/resources/application.yml @@ -11,6 +11,31 @@ conductor: flush: interval: ms: 300 +pipeline: + offset: + storage: + # In the future when we will have other environment implementations the default should be re-thought. + type: io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore + config: + jdbc: + url: ${OFFSET_JDBC_URL:${quarkus.datasource.jdbc.url}} + user: ${OFFSET_JDBC_USERNAME:${quarkus.datasource.username}} + password: ${OFFSET_JDBC_PASSWORD:${quarkus.datasource.password}} + offset: + table: + name: "@{pipeline_name}_offset" + schema: + history: + internal: io.debezium.storage.jdbc.history.JdbcSchemaHistory + config: + jdbc: + url: ${SCHEMA_HISTORY_JDBC_URL:${quarkus.datasource.jdbc.url}} + user: ${SCHEMA_HISTORY_JDBC_USERNAME:${quarkus.datasource.username}} + password: ${SCHEMA_HISTORY_JDBC_PASSWORD:${quarkus.datasource.password}} + schema: + history: + table: + name: "@{pipeline_name}_schema_history" quarkus: http: @@ -49,6 +74,31 @@ quarkus: flush: interval: ms: 300 + pipeline: + offset: + storage: + # In the future when we will have other environment implementations the default should be re-thought. + type: io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore + config: + jdbc: + url: jdbc:postgresql://postgresql:5432/debezium?loggerLevel=OFF + user: debezium + password: debezium + offset: + table: + name: "@{pipeline_name}_offset" + schema: + history: + internal: io.debezium.storage.jdbc.history.JdbcSchemaHistory + config: + jdbc: + url: jdbc:postgresql://postgresql:5432/debezium?loggerLevel=OFF + user: debezium + password: debezium + schema: + history: + table: + name: "@{pipeline_name}_schema_history" quarkus: debezium-outbox: remove-after-insert: false diff --git a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/operator/configuration/TableNameResolverTest.java b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/operator/configuration/TableNameResolverTest.java new file mode 100644 index 0000000..7d36c95 --- /dev/null +++ b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/operator/configuration/TableNameResolverTest.java @@ -0,0 +1,152 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.platform.environment.operator.configuration; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.when; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import io.debezium.platform.domain.views.flat.PipelineFlat; + +@ExtendWith(MockitoExtension.class) +class TableNameResolverTest { + + @InjectMocks + private TableNameResolver tableNameResolver; + + @Mock + private PipelineFlat pipelineFlat; + + @Test + public void testResolve_WithNullCurrentValue_ShouldReturnNull() { + + // When + String result = tableNameResolver.resolve(pipelineFlat, null); + + // Then + assertThat(result).isNull(); + } + + @Test + public void testResolve_WithEmptyCurrentValue_ShouldReturnEmptyString() { + // Given + String currentValue = ""; + + // When + String result = tableNameResolver.resolve(pipelineFlat, currentValue); + + // Then + assertThat(result).isEmpty(); + } + + @Test + public void testResolve_ShouldReplacePlaceholderWithPipelineName() { + // Given + String currentValue = "@{pipeline_name}"; + String pipelineName = "TestPipeline"; + when(pipelineFlat.getName()).thenReturn(pipelineName); + + // When + String result = tableNameResolver.resolve(pipelineFlat, currentValue); + + // Then + assertThat(result).isEqualTo("testpipeline"); + } + + @Test + public void testResolve_ShouldSanitizeTableName() { + // Given + String currentValue = "@{pipeline_name}#$%!"; + String pipelineName = "TestPipeline"; + when(pipelineFlat.getName()).thenReturn(pipelineName); + + // When + String result = tableNameResolver.resolve(pipelineFlat, currentValue); + + // Then + assertThat(result).isEqualTo("testpipeline"); + } + + @Test + public void testSanitizeTableName_ShouldSanitizeCorrectly() { + // Given + String currentValue = "invalid!@name$"; + String expected = "invalid_name"; + String pipelineName = "TestPipeline"; + when(pipelineFlat.getName()).thenReturn(pipelineName); + + // When + String result = tableNameResolver.resolve(pipelineFlat, currentValue); + + // Then + assertThat(result).isEqualTo(expected); + } + + @Test + public void testSanitizeTableName_ShouldTruncateTo63Bytes() { + // Given + String longName = "a".repeat(70); // 70 characters long string + String expected = "a".repeat(63); // 63 characters should be the max length + String pipelineName = "TestPipeline"; + when(pipelineFlat.getName()).thenReturn(pipelineName); + + // When + String result = tableNameResolver.resolve(pipelineFlat, longName); + + // Then + assertThat(result).isEqualTo(expected); + } + + @Test + public void testSanitizeTableName_ShouldPrefixWithUnderscoreIfStartsWithNonAlpha() { + // Given + String invalidName = "123abc"; + String expected = "_123abc"; + String pipelineName = "TestPipeline"; + when(pipelineFlat.getName()).thenReturn(pipelineName); + + // When + String result = tableNameResolver.resolve(pipelineFlat, invalidName); + + // Then + assertThat(result).isEqualTo(expected); + } + + @Test + public void testSanitizeTableName_ShouldRemoveConsecutiveUnderscores() { + // Given + String nameWithConsecutiveUnderscores = "abc___def"; + String expected = "abc_def"; + String pipelineName = "TestPipeline"; + when(pipelineFlat.getName()).thenReturn(pipelineName); + + // When + String result = tableNameResolver.resolve(pipelineFlat, nameWithConsecutiveUnderscores); + + // Then + assertThat(result).isEqualTo(expected); + } + + @Test + public void testSanitizeTableName_ShouldRemoveTrailingUnderscore() { + // Given + String nameWithTrailingUnderscore = "abc_def_"; + String expected = "abc_def"; + String pipelineName = "TestPipeline"; + when(pipelineFlat.getName()).thenReturn(pipelineName); + + // When + String result = tableNameResolver.resolve(pipelineFlat, nameWithTrailingUnderscore); + + // Then + assertThat(result).isEqualTo(expected); + } +} diff --git a/helm/Chart.yaml b/helm/Chart.yaml index 4ced242..2b6d663 100644 --- a/helm/Chart.yaml +++ b/helm/Chart.yaml @@ -10,7 +10,7 @@ sources: home: https://github.com/debezium/debezium-platform-conductor dependencies: - name: debezium-operator - version: "3.0.4-final" + version: "3.0.7-final" repository: "https://charts.debezium.io" - name: database version: 0.0.1 diff --git a/helm/README.md b/helm/README.md index 7862d20..ea802a5 100644 --- a/helm/README.md +++ b/helm/README.md @@ -2,37 +2,56 @@ This chart will install the components required to run the Debezium Platform. 1. Conductor: The back-end component which provides a set of APIs to orchestrate and control Debezium deployments. 2. Stage: The front-end component which provides a user interface to interact with the Conductor. -3. Debezium operator: operator that manages the creation of Debezium Server custom resource. -4. [Optional] PostgreSQL database used by conductor to store its data. -5. [Optional] Strimzi operator: operator for creating Kakfa cluster. In case you want to use a Kafka destination in you pipeline. +3. Debezium operator: operator that manages the creation of Debezium Server custom resource. +4. [Optional] PostgreSQL database used by conductor to store its data. +5. [Optional] Strimzi operator: operator for creating Kakfa cluster. In case you want to use a Kafka destination in you + pipeline. # Prerequisites + The chart use an ingress to expose `debezium-stage (UI)` and `debezium-conductor (backend)`, -this will require to have an [ingress controller](https://kubernetes.io/docs/concepts/services-networking/ingress-controllers/) installed in you cluster. -You need also to have domain that must point to the cluster IP and then configure the `domain.url` property in you `values.yaml` with your domain. +this will require to have +an [ingress controller](https://kubernetes.io/docs/concepts/services-networking/ingress-controllers/) installed in you +cluster. +You need also to have domain that must point to the cluster IP and then configure the `domain.url` property in +you `values.yaml` with your domain. ### Configurations -| Name | Description | Value | -|------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------| -| domain.url | domain used as ingress host | "" | -| stage.image | Image for the stage (UI) | quay.io/debezium/platform-stage:latest | -| conductor.image | Image for the conductor | quay.io/debezium/platform-conductor:nightly | -| conductor.offset.existingConfigMap | Name of the config map used to store conductor offsets. If empty it will be automatically created. | "" | -| database.enabled | Enable the installation of PostgreSQL by the chart | false | -| database.name | Database name | postgres | -| database.host | Database host | | -| database.auth.existingSecret | Name of the secret to where `username` and `password` are stored. If empty a secret will be created using the `username` and `password` properties | "" | -| database.auth.username | Database username | user | -| database.auth.password | Database password | password | -| env | List of env variable to pass to the conductor | [] | - +| Name | Description | Default | +|:-------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------| +| domain.url | domain used as ingress host | "" | +| stage.image | Image for the stage (UI) | quay.io/debezium/platform-stage:latest | +| conductor.image | Image for the conductor | quay.io/debezium/platform-conductor:nightly | +| conductor.offset.existingConfigMap | Name of the config map used to store conductor offsets. If empty it will be automatically created. | "" | +| database.enabled | Enable the installation of PostgreSQL by the chart | false | +| database.name | Database name | postgres | +| database.host | Database host | postgres | +| database.auth.existingSecret | Name of the secret to where `username` and `password` are stored. If empty a secret will be created using the `username` and `password` properties | "" | +| database.auth.username | Database username | user | +| database.auth.password | Database password | password | +| offset.reusePlatformDatabase | Pipelines will use database to store offsets. By default the database used by the conductor service is used.
If you want to use a dedicated one set this property to false | true | +| offset.database.name | Database name | postgres | +| offset.database.host | Database host | postgres | +| offset.database.port | Database port | 5432 | | | +| offset.database.auth.existingSecret | Name of the secret to where `username` and `password` are stored. If not set `offset.database.auth.username` and `offset.database.auth.password` will be used. | "" | +| offset.database.auth.username | Database username | user | +| offset.database.auth.password | Database password | password | | | +| schemaHistory.reusePlatformDatabase | Pipelines will use database to store schema history. By default the database used by the conductor service is used.
If you want to use a dedicated one set this property to false | true | +| schemaHistory.database.name | Database name | postgres | +| schemaHistory.database.host | Database host | postgres | +| schemaHistory.database.port | Database port | 5432 | | | +| schemaHistory.database.auth.existingSecret | Name of the secret to where `username` and `password` are stored. If not set `schemaHistory.database.auth.username` and `schemaHistory.database.auth.password` will be used. | "" | +| schemaHistory.database.auth.username | Database username | user | +| schemaHistory.database.auth.password | Database password | password | | | | +| env | List of env variable to pass to the conductor | [] | # Install ```shell helm dependency build ``` + Thi will download the required [Debezium Operator](https://github.com/debezium/debezium-operator) chart. ```shell diff --git a/helm/templates/_helpers.tpl b/helm/templates/_helpers.tpl index b2d8804..ad8b48f 100644 --- a/helm/templates/_helpers.tpl +++ b/helm/templates/_helpers.tpl @@ -20,6 +20,62 @@ Get the offset config map name. {{- end -}} {{- end -}} +{{/* +Generates offset envs. +*/}} +{{- define "debezium-platform.offsetConfig" -}} +{{- if not .Values.offset.reusePlatformDatabase -}} +- name: OFFSET_JDBC_URL + value: jdbc:postgresql://{{ .Values.offset.database.host }}:{{ .Values.offset.database.port }}/{{ .Values.offset.database.name }} +- name: OFFSET_JDBC_USERNAME +{{- if .Values.offset.database.auth.existingSecret }} + valueFrom: + secretKeyRef: + name: {{ .Values.offset.database.auth.existingSecret }} + key: username +{{- else }} + value: {{ .Values.offset.database.username }} +{{- end }} +- name: OFFSET_JDBC_PASSWORD +{{- if .Values.offset.database.auth.existingSecret }} + valueFrom: + secretKeyRef: + name: {{ .Values.offset.database.auth.existingSecret }} + key: password +{{- else }} + value: {{ .Values.offset.database.password }} +{{- end }} +{{- end }} +{{- end }} + +{{/* +Generates schema history envs. +*/}} +{{- define "debezium-platform.schemaHistoryConfig" -}} +{{- if not .Values.schemaHistory.reusePlatformDatabase -}} +- name: SCHEMA_HISTORY_JDBC_URL + value: jdbc:postgresql://{{ .Values.schemaHistory.database.host }}:{{ .Values.schemaHistory.database.port }}/{{ .Values.schemaHistory.database.name }} +- name: SCHEMA_HISTORY_JDBC_USERNAME +{{- if .Values.schemaHistory.database.auth.existingSecret }} + valueFrom: + secretKeyRef: + name: {{ .Values.schemaHistory.database.auth.existingSecret }} + key: username +{{- else }} + value: {{ .Values.schemaHistory.database.username }} +{{- end }} +- name: SCHEMA_HISTORY_JDBC_PASSWORD +{{- if .Values.schemaHistory.database.auth.existingSecret }} + valueFrom: + secretKeyRef: + name: {{ .Values.schemaHistory.database.auth.existingSecret }} + key: password +{{- else }} + value: {{ .Values.schemaHistory.database.password }} +{{- end }} +{{- end }} +{{- end }} + {{/* Common labels */}} diff --git a/helm/templates/conductor-deployment.yaml b/helm/templates/conductor-deployment.yaml index 4388e2b..7395cfe 100644 --- a/helm/templates/conductor-deployment.yaml +++ b/helm/templates/conductor-deployment.yaml @@ -44,6 +44,8 @@ spec: value: jdbc:postgresql://{{ .Values.database.host }}:5432/{{ .Values.database.name }} - name: QUARKUS_KUBERNETES_CLIENT_NAMESPACE value: {{ .Release.Namespace }} +{{ include "debezium-platform.offsetConfig" . | indent 12 }} +{{ include "debezium-platform.schemaHistoryConfig" . | indent 12 }} {{- range .Values.env }} - name: {{ .name }} value: {{ .value }} diff --git a/helm/values.schema.json b/helm/values.schema.json index 7e9eafb..a685bd2 100644 --- a/helm/values.schema.json +++ b/helm/values.schema.json @@ -50,9 +50,73 @@ "env": { "type": "array" }, - "kafka": { + "offset": { "properties": { - "enabled": { + "database": { + "properties": { + "auth": { + "properties": { + "existingSecret": { + "type": "string" + } + }, + "type": "object" + }, + "host": { + "type": "string" + }, + "name": { + "type": "string" + }, + "password": { + "type": "string" + }, + "port": { + "type": "integer" + }, + "username": { + "type": "string" + } + }, + "type": "object" + }, + "reusePlatformDatabase": { + "type": "boolean" + } + }, + "type": "object" + }, + "schemaHistory": { + "properties": { + "database": { + "properties": { + "auth": { + "properties": { + "existingSecret": { + "type": "string" + } + }, + "type": "object" + }, + "host": { + "type": "string" + }, + "name": { + "type": "string" + }, + "password": { + "type": "string" + }, + "port": { + "type": "integer" + }, + "username": { + "type": "string" + } + }, + "type": "object" + }, + "reusePlatformDatabase": { "type": "boolean" } }, diff --git a/helm/values.yaml b/helm/values.yaml index 7102dd6..705632f 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -6,6 +6,26 @@ conductor: image: quay.io/debezium/platform-conductor:nightly offset: existingConfigMap: "" +offset: + reusePlatformDatabase: true + database: + name: postgres + host: postgres + port: 5432 + username: user + password: password + auth: + existingSecret: "" +schemaHistory: + reusePlatformDatabase: true + database: + name: postgres + host: postgres + port: 5432 + username: user + password: password + auth: + existingSecret: "" database: enabled: false name: postgres