diff --git a/coordination/src/main/java/tech/ydb/coordination/scenario/configuration/Publisher.java b/coordination/src/main/java/tech/ydb/coordination/scenario/configuration/Publisher.java new file mode 100644 index 000000000..e2c0a9c7f --- /dev/null +++ b/coordination/src/main/java/tech/ydb/coordination/scenario/configuration/Publisher.java @@ -0,0 +1,42 @@ +package tech.ydb.coordination.scenario.configuration; + +import java.util.concurrent.CompletableFuture; + +import tech.ydb.coordination.CoordinationClient; +import tech.ydb.coordination.CoordinationSession; +import tech.ydb.core.Status; + +public class Publisher { + static final String SEMAPHORE_PREFIX = "configuration-"; + + private final String semaphoreName; + private final CoordinationSession session; + private final CompletableFuture[] semaphoreFuture; + + private Publisher(CoordinationSession session, String semaphoreName, CompletableFuture[] semaphoreFuture) { + this.session = session; + this.semaphoreName = semaphoreName; + this.semaphoreFuture = semaphoreFuture; + } + + public static CompletableFuture newPublisher(CoordinationClient client, String path, long token) { + return client.createSession(path) + .thenApply(session -> { + final CompletableFuture[] isSemaphoreCreated = new CompletableFuture[1]; + final CompletableFuture createFuture = new CompletableFuture<>(); + isSemaphoreCreated[0] = createFuture; + session.createSemaphore(SEMAPHORE_PREFIX + token, 1).whenComplete( + (status, throwable) -> createFuture.complete(status)); + return new Publisher(session, SEMAPHORE_PREFIX + token, isSemaphoreCreated); + }); + } + + public synchronized CompletableFuture publish(byte[] data) { + if (semaphoreFuture[0].isDone()) { + semaphoreFuture[0] = session.updateSemaphore(semaphoreName, data); + } else { + semaphoreFuture[0] = semaphoreFuture[0].thenCompose(status -> session.updateSemaphore(semaphoreName, data)); + } + return semaphoreFuture[0]; + } +} diff --git a/coordination/src/main/java/tech/ydb/coordination/scenario/configuration/Subscriber.java b/coordination/src/main/java/tech/ydb/coordination/scenario/configuration/Subscriber.java new file mode 100644 index 000000000..84c3f7edb --- /dev/null +++ b/coordination/src/main/java/tech/ydb/coordination/scenario/configuration/Subscriber.java @@ -0,0 +1,61 @@ +package tech.ydb.coordination.scenario.configuration; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import tech.ydb.coordination.CoordinationClient; +import tech.ydb.coordination.description.SemaphoreChangedEvent; +import tech.ydb.coordination.settings.DescribeSemaphoreMode; +import tech.ydb.coordination.settings.WatchSemaphoreMode; + +public class Subscriber implements AutoCloseable { + static final String SEMAPHORE_PREFIX = "configuration-"; + private static final Logger logger = LoggerFactory.getLogger(Subscriber.class); + private final AtomicBoolean isWorking; + + private Subscriber(AtomicBoolean isWorking) { + this.isWorking = isWorking; + } + + public static CompletableFuture newSubscriber(CoordinationClient client, String path, + long token, Consumer observer) { + return client.createSession(path) + .thenApply(session -> { + final String name = SEMAPHORE_PREFIX + token; + final AtomicBoolean isWorking = new AtomicBoolean(true); + BiConsumer[] onChanges = new BiConsumer[1]; + onChanges[0] = (changes, changesTh) -> { + if (isWorking.get()) { + session.describeAndWatchSemaphore(name, + DescribeSemaphoreMode.DATA_ONLY, WatchSemaphoreMode.WATCH_DATA) + .whenComplete((result, throwable) -> { + if (throwable == null && result != null && result.isSuccess()) { + observer.accept(result.getValue().getDescription().getData()); + result.getValue().getChangedFuture().whenComplete(onChanges[0]); + return; + } + isWorking.set(false); + logger.debug("Exception in describeAndWatchSemaphore request ({}, {}).", + result, throwable); + }); + } + }; + session.createSemaphore(name, 1) + .whenComplete( + (status, th1) -> onChanges[0].accept(new SemaphoreChangedEvent(false, false, false), + null) + ); + return new Subscriber(isWorking); + }); + } + + @Override + public void close() { + isWorking.set(false); + } +} diff --git a/coordination/src/test/java/tech/ydb/coordination/ConfigurationScenarioTest.java b/coordination/src/test/java/tech/ydb/coordination/ConfigurationScenarioTest.java new file mode 100644 index 000000000..b9eee3f33 --- /dev/null +++ b/coordination/src/test/java/tech/ydb/coordination/ConfigurationScenarioTest.java @@ -0,0 +1,82 @@ +package tech.ydb.coordination; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import tech.ydb.coordination.scenario.configuration.Publisher; +import tech.ydb.coordination.scenario.configuration.Subscriber; +import tech.ydb.coordination.settings.CoordinationNodeSettings; +import tech.ydb.coordination.settings.DropCoordinationNodeSettings; +import tech.ydb.core.Status; +import tech.ydb.test.junit4.GrpcTransportRule; + +public class ConfigurationScenarioTest { + @ClassRule + public static final GrpcTransportRule YDB_TRANSPORT = new GrpcTransportRule(); + private final String path = YDB_TRANSPORT.getDatabase() + "/coordination-node"; + private final CoordinationClient client = CoordinationClient.newClient(YDB_TRANSPORT); + + @Before + public void createNode() { + CompletableFuture result = client.createNode( + path, + CoordinationNodeSettings.newBuilder() + .build() + ); + + Assert.assertTrue(result.join().isSuccess()); + } + + @Test(timeout = 20_000) + public void configurationScenarioTest() throws BrokenBarrierException, InterruptedException { + final long token = 1_000_001; + final int n = 5; + Publisher publisher = Publisher.newPublisher(client, path, token).join(); + + final String[] dataNow = new String[1]; + final CountDownLatch[] updateCounter = new CountDownLatch[1]; + + updateCounter[0] = new CountDownLatch(n); + dataNow[0] = "Nothing"; + publisher.publish(dataNow[0].getBytes(StandardCharsets.UTF_8)).join(); + + final List subscribers = Stream.generate(() -> + Subscriber.newSubscriber(client, path, token, data -> { + if (Arrays.equals(data, dataNow[0].getBytes(StandardCharsets.UTF_8))) { + updateCounter[0].countDown(); + } + }).join() + ).limit(n).collect(Collectors.toList()); + + updateCounter[0].await(); + dataNow[0] = "Second message"; + updateCounter[0] = new CountDownLatch(n); + publisher.publish(dataNow[0].getBytes(StandardCharsets.UTF_8)).join(); + + updateCounter[0].await(); + + subscribers.forEach(Subscriber::close); + } + + @After + public void deleteNode() { + CompletableFuture result = client.dropNode( + path, + DropCoordinationNodeSettings.newBuilder() + .build() + ); + Assert.assertTrue(result.join().isSuccess()); + } +}