diff --git a/systemtests/pom.xml b/systemtests/pom.xml
index ff4da44..9fae48a 100644
--- a/systemtests/pom.xml
+++ b/systemtests/pom.xml
@@ -13,16 +13,16 @@
6.10.0
5.10.2
1.10.2
- 2.17.2
- 1.7.36
+ 2.23.1
3.25.3
- 0.1.1
+ 0.2.1
0.0.1-alpha1
5.0.0-alpha.12
4.2.1
2.16.1
1.18.34
3.14.0
+ 1.1.1
@@ -75,23 +75,8 @@
org.apache.logging.log4j
- log4j-api
- ${log4j.version}
-
-
- org.apache.logging.log4j
- log4j-core
- ${log4j.version}
-
-
- org.apache.logging.log4j
- log4j-slf4j-impl
- ${log4j.version}
-
-
- org.slf4j
- slf4j-api
- ${slf4j.version}
+ log4j-slf4j2-impl
+ ${log4j2.version}
org.projectlombok
@@ -139,6 +124,11 @@
awaitility
${awaitility.version}
+
+ com.googlecode.json-simple
+ json-simple
+ ${google.json.simple.version}
+
diff --git a/systemtests/src/main/java/io/debezium/operator/systemtests/ConfigProperties.java b/systemtests/src/main/java/io/debezium/operator/systemtests/ConfigProperties.java
index 580144f..064b811 100644
--- a/systemtests/src/main/java/io/debezium/operator/systemtests/ConfigProperties.java
+++ b/systemtests/src/main/java/io/debezium/operator/systemtests/ConfigProperties.java
@@ -9,4 +9,6 @@ public final class ConfigProperties {
public static final String BUNDLE_PATH = System.getProperty("test.bundle.path", System.getProperty("user.dir") + "/../k8/");
public static final Integer HTTP_POLL_TIMEOUT = Integer.valueOf(System.getProperty("test.http.poll.timeout", "20"));
public static final Integer HTTP_POLL_INTERVAL = Integer.valueOf(System.getProperty("test.http.poll.interval", "200"));
+ public static final Integer FABRIC8_POLL_INTERVAL = Integer.valueOf(System.getProperty("test.fabric8.poll.interval", "2"));
+ public static final Integer FABRIC8_POLL_TIMEOUT = Integer.valueOf(System.getProperty("test.fabric8.poll.timeout", "60"));
}
diff --git a/systemtests/src/test/java/io/debezium/operator/systemtests/OffsetStorageTest.java b/systemtests/src/test/java/io/debezium/operator/systemtests/OffsetStorageTest.java
new file mode 100644
index 0000000..1b46565
--- /dev/null
+++ b/systemtests/src/test/java/io/debezium/operator/systemtests/OffsetStorageTest.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.operator.systemtests;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.debezium.operator.api.model.DebeziumServer;
+import io.debezium.operator.api.model.source.Offset;
+import io.debezium.operator.api.model.source.OffsetBuilder;
+import io.debezium.operator.systemtests.resources.NamespaceHolder;
+import io.debezium.operator.systemtests.resources.dmt.DmtClient;
+import io.debezium.operator.systemtests.resources.operator.DebeziumOperatorBundleResource;
+import io.debezium.operator.systemtests.resources.server.DebeziumServerGenerator;
+import io.debezium.operator.systemtests.resources.sinks.RedisResource;
+import io.fabric8.kubernetes.client.LocalPortForward;
+import io.skodjob.testframe.resources.KubeResourceManager;
+
+public class OffsetStorageTest extends TestBase {
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ @Test
+ void testRedisOffsetStorage() {
+ String namespace = NamespaceHolder.INSTANCE.getCurrentNamespace();
+ DebeziumOperatorBundleResource operatorBundleResource = new DebeziumOperatorBundleResource();
+ operatorBundleResource.configureAsDefault(namespace);
+ logger.info("Deploying Operator");
+ operatorBundleResource.deploy();
+ logger.info("Deploying Debezium Server");
+ DebeziumServer server = DebeziumServerGenerator.generateDefaultMysqlToRedis(namespace);
+
+ Offset offset = new OffsetBuilder()
+ .withNewRedis()
+ .withAddress(RedisResource.getDefaultRedisAddress())
+ .endRedis()
+ .withFlushMs(10)
+ .build();
+ server.getSpec().getSource().setOffset(offset);
+
+ KubeResourceManager.getInstance().createResourceWithWait(server);
+ assertStreamingWorks();
+
+ try (LocalPortForward lcp = dmtResource.portForward(portForwardPort, namespace)) {
+ String redis_offset = DmtClient.readRedisOffsets(portForwardHost, portForwardPort);
+ assertThat(redis_offset).contains("file");
+ assertThat(redis_offset).contains("pos");
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ server.getSpec().getSource().getOffset().getRedis().setKey("metadata:debezium_n:offsets");
+ KubeResourceManager.getInstance().createOrUpdateResourceWithWait(server);
+ assertStreamingWorks(10, 20);
+
+ try (LocalPortForward lcp = dmtResource.portForward(portForwardPort, namespace)) {
+ String redis_offset = DmtClient.readRedisOffsets(portForwardHost, portForwardPort, "metadata:debezium_n:offsets");
+ assertThat(redis_offset).contains("file");
+ assertThat(redis_offset).contains("pos");
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/systemtests/src/test/java/io/debezium/operator/systemtests/TestBase.java b/systemtests/src/test/java/io/debezium/operator/systemtests/TestBase.java
index 10a9f1b..58a976d 100644
--- a/systemtests/src/test/java/io/debezium/operator/systemtests/TestBase.java
+++ b/systemtests/src/test/java/io/debezium/operator/systemtests/TestBase.java
@@ -12,6 +12,7 @@
import java.io.IOException;
import java.time.Duration;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestInstance;
@@ -34,9 +35,9 @@
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class TestBase {
private static final Logger logger = LoggerFactory.getLogger(TestBase.class);
- private final DmtResource dmtResource = new DmtResource();
- private final String portForwardHost = "127.0.0.1";
- private int portForwardPort = 8080;
+ protected final DmtResource dmtResource = new DmtResource();
+ protected final String portForwardHost = "127.0.0.1";
+ protected int portForwardPort = 8080;
@BeforeAll
void initDefault() {
@@ -70,18 +71,27 @@ void cleanUp() {
}
public void assertStreamingWorks() {
+ assertStreamingWorks(10, 10);
+ }
+
+ public void assertStreamingWorks(int messagesToDatabase, int expectedMessages) {
String namespace = NamespaceHolder.INSTANCE.getCurrentNamespace();
try (LocalPortForward lcp = dmtResource.portForward(8080, namespace)) {
DmtClient.waitForDmt(portForwardHost, portForwardPort, Duration.ofSeconds(HTTP_POLL_TIMEOUT));
- DmtClient.insertTestDataToDatabase(portForwardHost, portForwardPort, 10);
- DmtClient.waitForFilledRedis(portForwardHost, portForwardPort, Duration.ofSeconds(40), "inventory.inventory.operator_test");
+ DmtClient.insertTestDataToDatabase(portForwardHost, portForwardPort, messagesToDatabase);
+ DmtClient.waitForFilledRedis(portForwardHost, portForwardPort, Duration.ofSeconds(60), "inventory.inventory.operator_test");
await().atMost(Duration.ofMinutes(HTTP_POLL_TIMEOUT))
.pollInterval(Duration.ofMillis(HTTP_POLL_INTERVAL))
- .until(() -> DmtClient.digStreamedData(portForwardHost, portForwardPort, 10) == 10);
+ .until(() -> DmtClient.digStreamedData(portForwardHost, portForwardPort, expectedMessages) == expectedMessages);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
+ @AfterAll
+ void resetNamespace() {
+ NamespaceHolder.INSTANCE.resetNamespace();
+ }
+
}
diff --git a/systemtests/src/test/java/io/debezium/operator/systemtests/resources/NamespaceHolder.java b/systemtests/src/test/java/io/debezium/operator/systemtests/resources/NamespaceHolder.java
index 01f3c91..bde9068 100644
--- a/systemtests/src/test/java/io/debezium/operator/systemtests/resources/NamespaceHolder.java
+++ b/systemtests/src/test/java/io/debezium/operator/systemtests/resources/NamespaceHolder.java
@@ -40,6 +40,10 @@ public String getCurrentNamespace() {
return currentNamespace;
}
+ public void resetNamespace() {
+ this.currentNamespace = null;
+ }
+
public DmtResource getNamespacedDmt() {
return namespacedDmt;
}
diff --git a/systemtests/src/test/java/io/debezium/operator/systemtests/resources/dmt/DmtClient.java b/systemtests/src/test/java/io/debezium/operator/systemtests/resources/dmt/DmtClient.java
index 6fac13c..53e2702 100644
--- a/systemtests/src/test/java/io/debezium/operator/systemtests/resources/dmt/DmtClient.java
+++ b/systemtests/src/test/java/io/debezium/operator/systemtests/resources/dmt/DmtClient.java
@@ -17,6 +17,10 @@
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,8 +51,30 @@ private static OkHttpClient defaultClient() {
.build();
}
+ public static String readRedisOffsets(String host, int port) {
+ return readRedisOffsets(host, port, "metadata:debezium:offsets");
+ }
+
+ public static String readRedisOffsets(String host, int port, String key) {
+ AtomicReference offset = new AtomicReference<>();
+ Map params = Map.of("hashKey", key);
+ await().atMost(Duration.ofSeconds(HTTP_POLL_TIMEOUT))
+ .pollInterval(Duration.ofMillis(HTTP_POLL_INTERVAL))
+ .until(() -> {
+ try (Response response = DmtClient.sendGetRequest(host, port, "/Redis/readHash", params)) {
+ offset.set(response.body().string());
+ return response.isSuccessful();
+ }
+ catch (Exception e) {
+ return false;
+ }
+ });
+ return offset.get();
+ }
+
public static void resetRedis(String host, int port) {
- await().atMost(Duration.ofSeconds(HTTP_POLL_TIMEOUT)).pollInterval(Duration.ofMillis(200))
+ await().atMost(Duration.ofSeconds(HTTP_POLL_TIMEOUT))
+ .pollInterval(Duration.ofMillis(HTTP_POLL_INTERVAL))
.until(() -> {
try (Response response = DmtClient.sendGetRequest(host, port, "/Redis/reset")) {
return response.isSuccessful();
@@ -60,7 +86,8 @@ public static void resetRedis(String host, int port) {
}
public static void resetMysql(String host, int port) {
- await().atMost(Duration.ofSeconds(HTTP_POLL_TIMEOUT)).pollInterval(Duration.ofMillis(200))
+ await().atMost(Duration.ofSeconds(HTTP_POLL_TIMEOUT))
+ .pollInterval(Duration.ofMillis(HTTP_POLL_INTERVAL))
.until(() -> {
try (Response response = DmtClient.sendGetRequest(host, port, "/Main/ResetDatabase")) {
return response.isSuccessful();
@@ -92,17 +119,24 @@ public static void waitForFilledRedis(String host, int port, Duration atMost, St
}
public static int digStreamedData(String host, int port, int number) {
- String jsonRespo = readRedisChannel(host, port, "inventory.inventory.operator_test", number);
+ final String CHANNEL = "inventory.inventory.operator_test";
+ JSONParser parser = new JSONParser();
+ String jsonRespo = readRedisChannel(host, port, CHANNEL, number);
+
if (Objects.isNull(jsonRespo)) {
return 0;
}
- int count = 0;
- for (int i = 0; i < number; i++) {
- if (jsonRespo.contains("name" + i)) {
- count++;
- }
+
+ try {
+ JSONArray response = (JSONArray) parser.parse(jsonRespo);
+ JSONObject topic = (JSONObject) response.get(0);
+ JSONArray responses = (JSONArray) topic.get(CHANNEL);
+ return responses.size();
+ }
+ catch (ParseException e) {
+ LOGGER.error("Cannot parse JSON response from DMT: {}", e.getMessage());
+ return 0;
}
- return count;
}
public static String readRedisChannel(String host, int port, String channel, int limit) {
@@ -242,4 +276,24 @@ public static Response sendGetRequest(String host, int port, String command) thr
Call call = client.newCall(request);
return call.execute();
}
+
+ public static Response sendGetRequest(String host, int port, String command, Map params) throws IOException {
+ OkHttpClient client = defaultClient();
+
+ HttpUrl.Builder builder = Objects.requireNonNull(HttpUrl.parse("http://" + host + ":" + port + command))
+ .newBuilder();
+
+ if (!Objects.isNull(params)) {
+ for (Map.Entry entry : params.entrySet()) {
+ builder = builder.addQueryParameter(entry.getKey(), entry.getValue());
+ }
+ }
+
+ Request request = new Request.Builder()
+ .url(builder.build())
+ .build();
+
+ Call call = client.newCall(request);
+ return call.execute();
+ }
}
diff --git a/systemtests/src/test/java/io/debezium/operator/systemtests/resources/server/DebeziumServerResource.java b/systemtests/src/test/java/io/debezium/operator/systemtests/resources/server/DebeziumServerResource.java
index ced03f9..e87f278 100644
--- a/systemtests/src/test/java/io/debezium/operator/systemtests/resources/server/DebeziumServerResource.java
+++ b/systemtests/src/test/java/io/debezium/operator/systemtests/resources/server/DebeziumServerResource.java
@@ -5,22 +5,29 @@
*/
package io.debezium.operator.systemtests.resources.server;
+import static io.debezium.operator.systemtests.ConfigProperties.FABRIC8_POLL_INTERVAL;
+import static io.debezium.operator.systemtests.ConfigProperties.FABRIC8_POLL_TIMEOUT;
+import static org.awaitility.Awaitility.await;
+
import java.io.InputStream;
-import java.util.concurrent.TimeUnit;
+import java.time.Duration;
import java.util.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import io.debezium.operator.api.model.DebeziumServer;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.dsl.internal.HasMetadataOperationsImpl;
import io.skodjob.testframe.interfaces.ResourceType;
-import io.skodjob.testframe.resources.DeploymentType;
import io.skodjob.testframe.resources.KubeResourceManager;
public class DebeziumServerResource implements ResourceType {
private final MixedOperation> client;
+ private final Logger logger = LoggerFactory.getLogger(this.getClass().getName());
public DebeziumServerResource() {
this.client = KubeResourceManager.getKubeClient().getClient().resources(DebeziumServer.class, DebeziumServerList.class);
@@ -65,11 +72,22 @@ public void replace(String s, Consumer editor) {
@Override
public boolean waitForReadiness(DebeziumServer debeziumServer) {
- new DeploymentType().getClient()
- .inNamespace(debeziumServer.getMetadata().getNamespace())
- .withName(debeziumServer.getMetadata().getName()).waitUntilReady(1, TimeUnit.MINUTES);
-
- return client.resource(debeziumServer).isReady();
+ await().atMost(Duration.ofSeconds(FABRIC8_POLL_TIMEOUT)).pollInterval(Duration.ofSeconds(FABRIC8_POLL_INTERVAL))
+ .until(() -> {
+ DebeziumServer dbzServer = client.inNamespace(debeziumServer.getMetadata().getNamespace())
+ .withName(debeziumServer.getMetadata().getName()).get();
+
+ boolean ready = dbzServer.getStatus().getConditions().stream()
+ .anyMatch(condition -> condition.getType().equals("Ready") && condition.getStatus().equals("True"));
+ if (ready) {
+ return true;
+ }
+ else {
+ logger.info("Waiting for readiness of Debezium Server...");
+ return false;
+ }
+ });
+ return true;
}
@Override
diff --git a/systemtests/src/test/java/io/debezium/operator/systemtests/resources/sinks/RedisResource.java b/systemtests/src/test/java/io/debezium/operator/systemtests/resources/sinks/RedisResource.java
index 7a089dc..d4f2821 100644
--- a/systemtests/src/test/java/io/debezium/operator/systemtests/resources/sinks/RedisResource.java
+++ b/systemtests/src/test/java/io/debezium/operator/systemtests/resources/sinks/RedisResource.java
@@ -50,6 +50,10 @@ public void configureAsDefault(String namespace) {
}
}
+ public static String getDefaultRedisAddress() {
+ return "redis-service:6379";
+ }
+
@Override
public void deploy() {
KubeResourceManager.getInstance().createResourceWithoutWait(configMap, service, persistentVolumeClaim);