Skip to content

Commit

Permalink
add: Configuration scenario
Browse files Browse the repository at this point in the history
  • Loading branch information
piece-of-tart committed Nov 6, 2023
1 parent d9e1f4e commit 13c6594
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package tech.ydb.coordination.scenario.configuration;

import java.util.concurrent.CompletableFuture;

import tech.ydb.coordination.CoordinationClient;
import tech.ydb.coordination.CoordinationSession;
import tech.ydb.core.Status;

public class Publisher {
static final String SEMAPHORE_PREFIX = "configuration-";

private final String semaphoreName;
private final CoordinationSession session;
private final CompletableFuture<Status>[] semaphoreFuture;

private Publisher(CoordinationSession session, String semaphoreName, CompletableFuture<Status>[] semaphoreFuture) {
this.session = session;
this.semaphoreName = semaphoreName;
this.semaphoreFuture = semaphoreFuture;
}

public static CompletableFuture<Publisher> newPublisher(CoordinationClient client, String path, long token) {
return client.createSession(path)
.thenApply(session -> {
final CompletableFuture<Status>[] isSemaphoreCreated = new CompletableFuture[1];
final CompletableFuture<Status> createFuture = new CompletableFuture<>();
isSemaphoreCreated[0] = createFuture;
session.createSemaphore(SEMAPHORE_PREFIX + token, 1).whenComplete(
(status, throwable) -> createFuture.complete(status));
return new Publisher(session, SEMAPHORE_PREFIX + token, isSemaphoreCreated);
});
}

public synchronized CompletableFuture<Status> publish(byte[] data) {
if (semaphoreFuture[0].isDone()) {
semaphoreFuture[0] = session.updateSemaphore(semaphoreName, data);
} else {
semaphoreFuture[0] = semaphoreFuture[0].thenCompose(status -> session.updateSemaphore(semaphoreName, data));
}
return semaphoreFuture[0];
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package tech.ydb.coordination.scenario.configuration;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import tech.ydb.coordination.CoordinationClient;
import tech.ydb.coordination.description.SemaphoreChangedEvent;
import tech.ydb.coordination.settings.DescribeSemaphoreMode;
import tech.ydb.coordination.settings.WatchSemaphoreMode;

public class Subscriber implements AutoCloseable {
static final String SEMAPHORE_PREFIX = "configuration-";
private static final Logger logger = LoggerFactory.getLogger(Subscriber.class);
private final AtomicBoolean isWorking;

private Subscriber(AtomicBoolean isWorking) {
this.isWorking = isWorking;
}

public static CompletableFuture<Subscriber> newSubscriber(CoordinationClient client, String path,
long token, Consumer<byte[]> observer) {
return client.createSession(path)
.thenApply(session -> {
final String name = SEMAPHORE_PREFIX + token;
final AtomicBoolean isWorking = new AtomicBoolean(true);
BiConsumer<? super SemaphoreChangedEvent, ? super Throwable>[] onChanges = new BiConsumer[1];
onChanges[0] = (changes, changesTh) -> {
if (isWorking.get()) {
session.describeAndWatchSemaphore(name,
DescribeSemaphoreMode.DATA_ONLY, WatchSemaphoreMode.WATCH_DATA)
.whenComplete((result, throwable) -> {
if (throwable == null && result != null && result.isSuccess()) {
observer.accept(result.getValue().getDescription().getData());
result.getValue().getChangedFuture().whenComplete(onChanges[0]);
return;
}
isWorking.set(false);
logger.debug("Exception in describeAndWatchSemaphore request ({}, {}).",
result, throwable);
});
}
};
session.createSemaphore(name, 1)
.whenComplete(
(status, th1) -> onChanges[0].accept(new SemaphoreChangedEvent(false, false, false),
null)
);
return new Subscriber(isWorking);
});
}

@Override
public void close() {
isWorking.set(false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package tech.ydb.coordination;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

import tech.ydb.coordination.scenario.configuration.Publisher;
import tech.ydb.coordination.scenario.configuration.Subscriber;
import tech.ydb.coordination.settings.CoordinationNodeSettings;
import tech.ydb.coordination.settings.DropCoordinationNodeSettings;
import tech.ydb.core.Status;
import tech.ydb.test.junit4.GrpcTransportRule;

public class ConfigurationScenarioTest {
@ClassRule
public static final GrpcTransportRule YDB_TRANSPORT = new GrpcTransportRule();
private final String path = YDB_TRANSPORT.getDatabase() + "/coordination-node";
private final CoordinationClient client = CoordinationClient.newClient(YDB_TRANSPORT);

@Before
public void createNode() {
CompletableFuture<Status> result = client.createNode(
path,
CoordinationNodeSettings.newBuilder()
.build()
);

Assert.assertTrue(result.join().isSuccess());
}

@Test(timeout = 20_000)
public void configurationScenarioTest() throws BrokenBarrierException, InterruptedException {
final long token = 1_000_001;
final int n = 5;
Publisher publisher = Publisher.newPublisher(client, path, token).join();

final String[] dataNow = new String[1];
final CountDownLatch[] updateCounter = new CountDownLatch[1];

updateCounter[0] = new CountDownLatch(n);
dataNow[0] = "Nothing";
publisher.publish(dataNow[0].getBytes(StandardCharsets.UTF_8)).join();

final List<Subscriber> subscribers = Stream.generate(() ->
Subscriber.newSubscriber(client, path, token, data -> {
if (Arrays.equals(data, dataNow[0].getBytes(StandardCharsets.UTF_8))) {
updateCounter[0].countDown();
}
}).join()
).limit(n).collect(Collectors.toList());

updateCounter[0].await();
dataNow[0] = "Second message";
updateCounter[0] = new CountDownLatch(n);
publisher.publish(dataNow[0].getBytes(StandardCharsets.UTF_8)).join();

updateCounter[0].await();

subscribers.forEach(Subscriber::close);
}

@After
public void deleteNode() {
CompletableFuture<Status> result = client.dropNode(
path,
DropCoordinationNodeSettings.newBuilder()
.build()
);
Assert.assertTrue(result.join().isSuccess());
}
}

0 comments on commit 13c6594

Please sign in to comment.