Skip to content

Commit

Permalink
refactor: configuration scenario
Browse files Browse the repository at this point in the history
  • Loading branch information
piece-of-tart committed Nov 20, 2023
1 parent 8c85841 commit 6b1e9eb
Showing 1 changed file with 19 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package tech.ydb.coordination.scenario.configuration;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import tech.ydb.coordination.CoordinationClient;
import tech.ydb.coordination.CoordinationSession;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;

public class Publisher implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(Publisher.class);
private final String semaphoreName;
private final CoordinationSession session;
private CompletableFuture<Status> semaphoreFuture;
Expand All @@ -20,8 +26,9 @@ private Publisher(CoordinationSession session, String semaphoreName) {

/**
* Create new Publisher for Configuration service
* @param client - Coordination client
* @param fullPath - full path to the coordination node
*
* @param client - Coordination client
* @param fullPath - full path to the coordination node
* @param semaphoreName - name of Configuration service semaphore
* @return Completable future with Publisher
*/
Expand All @@ -40,22 +47,25 @@ public static Publisher newPublisher(CoordinationClient client, String path, Str

/**
* Change data on semaphore
*
* @param data - data which all Subscribers will see
* @return Completable future with status of change data on semaphore
*/
public synchronized CompletableFuture<Status> publishAsync(byte[] data) {
if (semaphoreFuture.isDone()) {
semaphoreFuture = session.updateSemaphore(semaphoreName, data);
} else {
semaphoreFuture = semaphoreFuture.thenCompose(status -> session.updateSemaphore(semaphoreName, data));
}
return semaphoreFuture;
return semaphoreFuture = semaphoreFuture.handleAsync((status, th) -> {
if (status != null && th == null) {
return session.updateSemaphore(semaphoreName, data);
}
logger.warn("Exception when publish data \"{}\": (status: {}, throwable: {}",
new String(data, StandardCharsets.UTF_8), status, th);
return CompletableFuture.completedFuture(Status.of(StatusCode.UNUSED_STATUS));
}).thenComposeAsync(Function.identity());
}

/**
* {@link Publisher#publishAsync(byte[])}
*/
public synchronized Status publish(byte[] data) {
public Status publish(byte[] data) {
return publishAsync(data).join();
}

Expand Down

0 comments on commit 6b1e9eb

Please sign in to comment.