diff --git a/coordination/src/main/java/tech/ydb/coordination/scenario/configuration/Publisher.java b/coordination/src/main/java/tech/ydb/coordination/scenario/configuration/Publisher.java index ed421f1fb..73e8db679 100644 --- a/coordination/src/main/java/tech/ydb/coordination/scenario/configuration/Publisher.java +++ b/coordination/src/main/java/tech/ydb/coordination/scenario/configuration/Publisher.java @@ -1,6 +1,11 @@ package tech.ydb.coordination.scenario.configuration; +import java.nio.charset.StandardCharsets; import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import tech.ydb.coordination.CoordinationClient; import tech.ydb.coordination.CoordinationSession; @@ -8,6 +13,7 @@ import tech.ydb.core.StatusCode; public class Publisher implements AutoCloseable { + private static final Logger logger = LoggerFactory.getLogger(Publisher.class); private final String semaphoreName; private final CoordinationSession session; private CompletableFuture semaphoreFuture; @@ -20,8 +26,9 @@ private Publisher(CoordinationSession session, String semaphoreName) { /** * Create new Publisher for Configuration service - * @param client - Coordination client - * @param fullPath - full path to the coordination node + * + * @param client - Coordination client + * @param fullPath - full path to the coordination node * @param semaphoreName - name of Configuration service semaphore * @return Completable future with Publisher */ @@ -40,22 +47,25 @@ public static Publisher newPublisher(CoordinationClient client, String path, Str /** * Change data on semaphore + * * @param data - data which all Subscribers will see * @return Completable future with status of change data on semaphore */ public synchronized CompletableFuture publishAsync(byte[] data) { - if (semaphoreFuture.isDone()) { - semaphoreFuture = session.updateSemaphore(semaphoreName, data); - } else { - semaphoreFuture = semaphoreFuture.thenCompose(status -> session.updateSemaphore(semaphoreName, data)); - } - return semaphoreFuture; + return semaphoreFuture = semaphoreFuture.handleAsync((status, th) -> { + if (status != null && th == null) { + return session.updateSemaphore(semaphoreName, data); + } + logger.warn("Exception when publish data \"{}\": (status: {}, throwable: {}", + new String(data, StandardCharsets.UTF_8), status, th); + return CompletableFuture.completedFuture(Status.of(StatusCode.UNUSED_STATUS)); + }).thenComposeAsync(Function.identity()); } /** * {@link Publisher#publishAsync(byte[])} */ - public synchronized Status publish(byte[] data) { + public Status publish(byte[] data) { return publishAsync(data).join(); }