From 1d08b211c07d1fea34f3e773b146a917c939ebfc Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Tue, 17 Oct 2023 18:13:28 +0100 Subject: [PATCH 01/18] New snapshot version v2.1.8 --- bom/pom.xml | 2 +- coordination/pom.xml | 2 +- core/pom.xml | 2 +- pom.xml | 2 +- scheme/pom.xml | 2 +- table/pom.xml | 2 +- tests/common/pom.xml | 2 +- tests/junit4-support/pom.xml | 2 +- tests/junit5-support/pom.xml | 2 +- topic/pom.xml | 2 +- 10 files changed, 10 insertions(+), 10 deletions(-) diff --git a/bom/pom.xml b/bom/pom.xml index b2cbf9ec4..c6dbe658e 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -6,7 +6,7 @@ 4.0.0 tech.ydb - 2.1.7 + 2.1.8-SNAPSHOT ydb-sdk-bom Java SDK Bill of Materials Java SDK Bill of Materials (BOM) diff --git a/coordination/pom.xml b/coordination/pom.xml index 0597b44ca..491545d84 100644 --- a/coordination/pom.xml +++ b/coordination/pom.xml @@ -8,7 +8,7 @@ tech.ydb ydb-sdk-parent - 2.1.7 + 2.1.8-SNAPSHOT ydb-sdk-coordination diff --git a/core/pom.xml b/core/pom.xml index dcc2f3a02..da4d724a3 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -8,7 +8,7 @@ tech.ydb ydb-sdk-parent - 2.1.7 + 2.1.8-SNAPSHOT ydb-sdk-core diff --git a/pom.xml b/pom.xml index 9653e9adf..3e51130ef 100644 --- a/pom.xml +++ b/pom.xml @@ -8,7 +8,7 @@ tech.ydb ydb-sdk-parent - 2.1.7 + 2.1.8-SNAPSHOT Java SDK for YDB Java SDK for YDB diff --git a/scheme/pom.xml b/scheme/pom.xml index 7f4a8ba85..56a39de6c 100644 --- a/scheme/pom.xml +++ b/scheme/pom.xml @@ -8,7 +8,7 @@ tech.ydb ydb-sdk-parent - 2.1.7 + 2.1.8-SNAPSHOT ydb-sdk-scheme diff --git a/table/pom.xml b/table/pom.xml index e1372bff5..0067944fe 100644 --- a/table/pom.xml +++ b/table/pom.xml @@ -8,7 +8,7 @@ tech.ydb ydb-sdk-parent - 2.1.7 + 2.1.8-SNAPSHOT ydb-sdk-table diff --git a/tests/common/pom.xml b/tests/common/pom.xml index 12f4d3500..9793d28fe 100644 --- a/tests/common/pom.xml +++ b/tests/common/pom.xml @@ -8,7 +8,7 @@ tech.ydb ydb-sdk-parent - 2.1.7 + 2.1.8-SNAPSHOT ../../pom.xml diff --git a/tests/junit4-support/pom.xml b/tests/junit4-support/pom.xml index 1d12fbc3f..4e946ff95 100644 --- a/tests/junit4-support/pom.xml +++ b/tests/junit4-support/pom.xml @@ -8,7 +8,7 @@ tech.ydb ydb-sdk-parent - 2.1.7 + 2.1.8-SNAPSHOT ../../pom.xml diff --git a/tests/junit5-support/pom.xml b/tests/junit5-support/pom.xml index e4a3d503f..db2000a5e 100644 --- a/tests/junit5-support/pom.xml +++ b/tests/junit5-support/pom.xml @@ -8,7 +8,7 @@ tech.ydb ydb-sdk-parent - 2.1.7 + 2.1.8-SNAPSHOT ../../pom.xml diff --git a/topic/pom.xml b/topic/pom.xml index 7578a8ff0..f68c4d020 100644 --- a/topic/pom.xml +++ b/topic/pom.xml @@ -8,7 +8,7 @@ tech.ydb ydb-sdk-parent - 2.1.7 + 2.1.8-SNAPSHOT ydb-sdk-topic From 167bb8643fcd43fb4cfffbc5d57e9f214743aafc Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Fri, 13 Oct 2023 14:19:54 +0100 Subject: [PATCH 02/18] Added lost serialVersionUID --- .../main/java/tech/ydb/topic/read/DecompressionException.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/topic/src/main/java/tech/ydb/topic/read/DecompressionException.java b/topic/src/main/java/tech/ydb/topic/read/DecompressionException.java index 9c50028cf..b49edbee4 100644 --- a/topic/src/main/java/tech/ydb/topic/read/DecompressionException.java +++ b/topic/src/main/java/tech/ydb/topic/read/DecompressionException.java @@ -7,6 +7,8 @@ * @author Nikolay Perfilov */ public class DecompressionException extends UncheckedIOException { + private static final long serialVersionUID = 2720187645859527813L; + private final byte[] rawData; public DecompressionException(String message, IOException cause, byte[] rawData) { super(message, cause); From e03949e9bbc834dbb498ff5489a5c735b17e8da0 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Fri, 13 Oct 2023 14:20:34 +0100 Subject: [PATCH 03/18] Fixed typo --- table/src/main/java/tech/ydb/table/impl/pool/SessionPool.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/table/src/main/java/tech/ydb/table/impl/pool/SessionPool.java b/table/src/main/java/tech/ydb/table/impl/pool/SessionPool.java index 3849bc934..7a7421361 100644 --- a/table/src/main/java/tech/ydb/table/impl/pool/SessionPool.java +++ b/table/src/main/java/tech/ydb/table/impl/pool/SessionPool.java @@ -348,7 +348,7 @@ public String toString() { + ", idleCount=" + getIdleCount() + ", acquiredCount=" + getAcquiredCount() + ", pendingAcquireCount=" + getPendingAcquireCount() - + ", acquiredTotal=" + getPendingAcquireCount() + + ", acquiredTotal=" + getAcquiredTotal() + ", releasedTotal=" + getReleasedTotal() + ", requestsTotal=" + getRequestedTotal() + ", createdTotal=" + getCreatedTotal() From b00403e327f92fd599e59cd9b51af7a8645df8f5 Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Thu, 12 Oct 2023 14:32:45 +0300 Subject: [PATCH 04/18] Improve error handling and logging --- .../ydb/topic/write/impl/EnqueuedMessage.java | 11 ++- .../tech/ydb/topic/write/impl/WriterImpl.java | 93 ++++++++++--------- 2 files changed, 61 insertions(+), 43 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/EnqueuedMessage.java b/topic/src/main/java/tech/ydb/topic/write/impl/EnqueuedMessage.java index dd868e931..30e9712ea 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/EnqueuedMessage.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/EnqueuedMessage.java @@ -9,7 +9,8 @@ public class EnqueuedMessage { private final Message message; private final CompletableFuture future = new CompletableFuture<>(); - private final AtomicBoolean isCompressed = new AtomicBoolean(false); + private final AtomicBoolean isCompressed = new AtomicBoolean(); + private final AtomicBoolean isProcessingFailed = new AtomicBoolean(); private final long uncompressedSizeBytes; private long compressedSizeBytes; private Long seqNo; @@ -35,6 +36,14 @@ public void setCompressed(boolean compressed) { this.isCompressed.set(compressed); } + public boolean isProcessingFailed() { + return isProcessingFailed.get(); + } + + public void setProcessingFailed(boolean procesingFailed) { + isProcessingFailed.set(procesingFailed); + } + public long getUncompressedSizeBytes() { return uncompressedSizeBytes; } diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java index 1e1c14227..04c6665d6 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java @@ -53,7 +53,7 @@ public abstract class WriterImpl extends GrpcStreamRetrier { private final long maxSendBufferMemorySize; // Every writing stream has a sequential number (for debug purposes) - private final AtomicLong seqNumberCounter = new AtomicLong(0); + private final AtomicLong sessionSeqNumberCounter = new AtomicLong(0); private Boolean isSeqNoProvided = null; private int currentInFlightCount = 0; @@ -100,22 +100,22 @@ public CompletableFuture tryToEnqueue(EnqueuedMessage message, boolean ins synchronized (incomingQueue) { if (currentInFlightCount >= settings.getMaxSendBufferMessagesCount()) { if (instant) { - logger.trace("[{}] Rejecting a message due to reaching message queue in-flight limit", id); + logger.info("[{}] Rejecting a message due to reaching message queue in-flight limit", id); CompletableFuture result = new CompletableFuture<>(); result.completeExceptionally(new QueueOverflowException("Message queue in-flight limit reached")); return result; } else { - logger.debug("[{}] Message queue in-flight limit reached. Putting the message into incoming " + + logger.info("[{}] Message queue in-flight limit reached. Putting the message into incoming " + "waiting queue", id); } } else if (availableSizeBytes <= message.getMessage().getData().length) { if (instant) { - logger.trace("[{}] Rejecting a message due to reaching message queue size limit", id); + logger.info("[{}] Rejecting a message due to reaching message queue size limit", id); CompletableFuture result = new CompletableFuture<>(); result.completeExceptionally(new QueueOverflowException("Message queue size limit reached")); return result; } else { - logger.debug("[{}] Message queue size limit reached. Putting the message into incoming waiting " + + logger.info("[{}] Message queue size limit reached. Putting the message into incoming waiting " + "queue", id); } } else if (incomingQueue.isEmpty()) { @@ -143,46 +143,15 @@ private void acceptMessageIntoSendingQueue(EnqueuedMessage message) { } this.encodingMessages.add(message); - CompletableFuture.runAsync(() -> encode(message), compressionExecutor) - .thenRunAsync(() -> { - boolean haveNewMessagesToSend = false; - // Working with encodingMessages under synchronized incomingQueue to prevent deadlocks - // while working with free method - synchronized (incomingQueue) { - // Taking all encoded messages to sending queue - while (true) { - EnqueuedMessage encodedMessage = encodingMessages.peek(); - if (encodedMessage != null - && (encodedMessage.isCompressed() || settings.getCodec() == Codec.RAW)) { - encodingMessages.remove(); - if (encodedMessage.isCompressed()) { - if (logger.isTraceEnabled()) { - logger.trace("[{}] Message compressed from {} to {} bytes", id, - encodedMessage.getUncompressedSizeBytes(), - encodedMessage.getCompressedSizeBytes()); - } - // message was actually encoded. Need to free some bytes - long bytesFreed = encodedMessage.getUncompressedSizeBytes() - - encodedMessage.getCompressedSizeBytes(); - // bytesFreed can be less than 0 - free(0, bytesFreed); - } - logger.debug("[{}] Adding message to sending queue", id); - sendingQueue.add(encodedMessage); - haveNewMessagesToSend = true; - } else { - break; - } - } - } - if (haveNewMessagesToSend) { - session.sendDataRequestIfNeeded(); - } - }) + CompletableFuture + .runAsync(() -> encode(message), compressionExecutor) + .thenRunAsync(this::moveEncodedMessagesToSendingQueue) .exceptionally((throwable) -> { logger.error("[{}] Exception while encoding message: ", id, throwable); free(1, message.getSizeBytes()); message.getFuture().completeExceptionally(throwable); + message.setProcessingFailed(true); + moveEncodedMessagesToSendingQueue(); return null; }); } @@ -198,6 +167,46 @@ private void encode(EnqueuedMessage message) { logger.trace("[{}] Successfully finished encoding message", id); } + private void moveEncodedMessagesToSendingQueue() { + boolean haveNewMessagesToSend = false; + // Working with encodingMessages under synchronized incomingQueue to prevent deadlocks + // while working with free method + synchronized (incomingQueue) { + // Taking all encoded messages to sending queue + while (true) { + EnqueuedMessage encodedMessage = encodingMessages.peek(); + if (encodedMessage == null) { + break; + } + if (encodedMessage.isProcessingFailed()) { + encodingMessages.remove(); + } else if (encodedMessage.isCompressed() || settings.getCodec() == Codec.RAW) { + encodingMessages.remove(); + if (encodedMessage.isCompressed()) { + if (logger.isTraceEnabled()) { + logger.trace("[{}] Message compressed from {} to {} bytes", id, + encodedMessage.getUncompressedSizeBytes(), + encodedMessage.getCompressedSizeBytes()); + } + // message was actually encoded. Need to free some bytes + long bytesFreed = encodedMessage.getUncompressedSizeBytes() + - encodedMessage.getCompressedSizeBytes(); + // bytesFreed can be less than 0 + free(0, bytesFreed); + } + logger.debug("[{}] Adding message to sending queue", id); + sendingQueue.add(encodedMessage); + haveNewMessagesToSend = true; + } else { + break; + } + } + } + if (haveNewMessagesToSend) { + session.sendDataRequestIfNeeded(); + } + } + protected CompletableFuture initImpl() { logger.info("[{}] initImpl called", id); if (initResultFutureRef.compareAndSet(null, new CompletableFuture<>())) { @@ -300,7 +309,7 @@ private class WriteSessionImpl extends WriteSession { private WriteSessionImpl() { super(topicRpc); - this.fullId = id + '.' + seqNumberCounter.incrementAndGet(); + this.fullId = id + '.' + sessionSeqNumberCounter.incrementAndGet(); this.messageSender = new MessageSender(settings); } From e1085f4469fa64a48f6058e52f1731b0ad4e19a2 Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Thu, 12 Oct 2023 14:45:25 +0300 Subject: [PATCH 05/18] Improve logging --- .../tech/ydb/topic/write/impl/WriterImpl.java | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java index 04c6665d6..ca21cf14b 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java @@ -100,26 +100,29 @@ public CompletableFuture tryToEnqueue(EnqueuedMessage message, boolean ins synchronized (incomingQueue) { if (currentInFlightCount >= settings.getMaxSendBufferMessagesCount()) { if (instant) { - logger.info("[{}] Rejecting a message due to reaching message queue in-flight limit", id); + logger.info("[{}] Rejecting a message due to reaching message queue in-flight limit of {}", id, + settings.getMaxSendBufferMessagesCount()); CompletableFuture result = new CompletableFuture<>(); - result.completeExceptionally(new QueueOverflowException("Message queue in-flight limit reached")); + result.completeExceptionally(new QueueOverflowException("Message queue in-flight limit of " + + settings.getMaxSendBufferMessagesCount() + " reached")); return result; } else { - logger.info("[{}] Message queue in-flight limit reached. Putting the message into incoming " + - "waiting queue", id); + logger.info("[{}] Message queue in-flight limit of {} reached. Putting the message into incoming " + + "waiting queue", id, settings.getMaxSendBufferMessagesCount()); } } else if (availableSizeBytes <= message.getMessage().getData().length) { if (instant) { - logger.info("[{}] Rejecting a message due to reaching message queue size limit", id); + logger.info("[{}] Rejecting a message due to reaching message queue size limit of {} bytes", id, + settings.getMaxSendBufferMemorySize()); CompletableFuture result = new CompletableFuture<>(); - result.completeExceptionally(new QueueOverflowException("Message queue size limit reached")); + result.completeExceptionally(new QueueOverflowException("Message queue size limit of " + + settings.getMaxSendBufferMemorySize() + " bytes reached")); return result; } else { - logger.info("[{}] Message queue size limit reached. Putting the message into incoming waiting " + - "queue", id); + logger.info("[{}] Message queue size limit of {} bytes reached. Putting the message into incoming" + + " waiting queue", id, settings.getMaxSendBufferMemorySize()); } } else if (incomingQueue.isEmpty()) { - logger.trace("[{}] Putting a message into the queue right now, enough space in send buffer", id); acceptMessageIntoSendingQueue(message); return CompletableFuture.completedFuture(null); } From 62e0cd3f0abd4d3089804669787e636acb8dc9f1 Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Mon, 16 Oct 2023 20:17:24 +0300 Subject: [PATCH 06/18] Move encoded messages in compression executer --- .../src/main/java/tech/ydb/topic/write/impl/WriterImpl.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java index ca21cf14b..870d43055 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java @@ -147,8 +147,10 @@ private void acceptMessageIntoSendingQueue(EnqueuedMessage message) { this.encodingMessages.add(message); CompletableFuture - .runAsync(() -> encode(message), compressionExecutor) - .thenRunAsync(this::moveEncodedMessagesToSendingQueue) + .runAsync(() -> { + encode(message); + moveEncodedMessagesToSendingQueue(); + }, compressionExecutor) .exceptionally((throwable) -> { logger.error("[{}] Exception while encoding message: ", id, throwable); free(1, message.getSizeBytes()); From 599aedf9a30fa1ffdb073b6a90b7e5d1e845dea2 Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Thu, 19 Oct 2023 14:45:54 +0300 Subject: [PATCH 07/18] Fix StartPartitionSessionSettings.Builder methods return value, add comments --- .../StartPartitionSessionSettings.java | 26 +++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/settings/StartPartitionSessionSettings.java b/topic/src/main/java/tech/ydb/topic/settings/StartPartitionSessionSettings.java index 067197830..b61b40ccb 100644 --- a/topic/src/main/java/tech/ydb/topic/settings/StartPartitionSessionSettings.java +++ b/topic/src/main/java/tech/ydb/topic/settings/StartPartitionSessionSettings.java @@ -28,12 +28,34 @@ public static class Builder { private Long readOffset; private Long commitOffset; - public void setReadOffset(Long readOffset) { + /** + * Reads in this partition session will start from offset no less than readOffset. + * If readOffset is set, server will check if that readOffset is not less than the actual committed offset. + * If the check fails then server will send an error message (status != SUCCESS) and close the stream. + * If readOffset is not set or is null (which is default), no check will be made. + * + * InitRequest.max_lag and InitRequest.read_from could lead to skip of more messages. + * Server will return data starting from offset that is maximum of actual committed offset, read_offset (if set) + * and offsets calculated from InitRequest.max_lag and InitRequest.read_from. + * + * @param readOffset Offset to read from. Default: null + * @return Builder + */ + public Builder setReadOffset(Long readOffset) { this.readOffset = readOffset; + return this; } - public void setCommitOffset(Long commitOffset) { + /** + * Make server know that all messages with offsets less than commitOffset were fully processed by client. + * Server will commit this position if it is not already done. + * + * @param commitOffset Commit offset, following the offset of last processed (committed) message. Default: null + * @return Builder + */ + public Builder setCommitOffset(Long commitOffset) { this.commitOffset = commitOffset; + return this; } public StartPartitionSessionSettings build() { From 0311fe475a7157fb3747a4c305418e0e5c273d46 Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Thu, 19 Oct 2023 13:20:44 +0300 Subject: [PATCH 08/18] Add message Committer, ramove data references where possible --- .../java/tech/ydb/topic/read/Committer.java | 21 ++++++++++ .../java/tech/ydb/topic/read/Message.java | 15 ++++++- .../topic/read/events/DataReceivedEvent.java | 29 ++++++++++++++ .../ydb/topic/read/impl/CommitterImpl.java | 39 +++++++++++++++++++ .../tech/ydb/topic/read/impl/MessageImpl.java | 37 +++++++----------- .../topic/read/impl/PartitionSessionImpl.java | 11 +++--- .../impl/events/DataReceivedEventImpl.java | 34 ++++++++-------- 7 files changed, 137 insertions(+), 49 deletions(-) create mode 100644 topic/src/main/java/tech/ydb/topic/read/Committer.java create mode 100644 topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java diff --git a/topic/src/main/java/tech/ydb/topic/read/Committer.java b/topic/src/main/java/tech/ydb/topic/read/Committer.java new file mode 100644 index 000000000..c0cf08984 --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/read/Committer.java @@ -0,0 +1,21 @@ +package tech.ydb.topic.read; + +import java.util.concurrent.CompletableFuture; + +/** + * A helper class that is used to call deferred commits + * Contains no data references and therefore may be useful in cases where commit() is called after processing data in + * an external system + * + * @author Nikolay Perfilov + */ +public interface Committer { + /** + * Commits offsets associated with this committer + * If there was an error while committing, there is no point of retrying committing the same message(s): + * the whole PartitionSession should be shut down by that time. And if commit hadn't reached the server, + * it will resend all these messages in next PartitionSession. + * @return CompletableFuture that will be completed when commit confirmation from server will be received + */ + CompletableFuture commit(); +} diff --git a/topic/src/main/java/tech/ydb/topic/read/Message.java b/topic/src/main/java/tech/ydb/topic/read/Message.java index 6b0cdd269..91f72d757 100644 --- a/topic/src/main/java/tech/ydb/topic/read/Message.java +++ b/topic/src/main/java/tech/ydb/topic/read/Message.java @@ -58,9 +58,22 @@ public interface Message { PartitionSession getPartitionSession(); /** - * Commit this message + * Commits this message + * If there was an error while committing, there is no point of retrying committing the same message: + * the whole PartitionSession should be shut down by that time. And if commit hadn't reached the server, + * it will resend all these messages in next PartitionSession. + * * @return CompletableFuture that will be completed when commit confirmation from server will be received */ CompletableFuture commit(); + /** + * Returns a Committer object to call commit() on later. + * This object has no data references and therefore may be useful in cases where commit() is called after + * processing data in an external system + * + * @return a Committer object + */ + Committer getCommitter(); + } diff --git a/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java b/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java index 56f89f5f9..729e3f7c9 100644 --- a/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java +++ b/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java @@ -3,6 +3,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; +import tech.ydb.topic.read.Committer; import tech.ydb.topic.read.Message; import tech.ydb.topic.read.PartitionSession; @@ -11,10 +12,38 @@ */ public interface DataReceivedEvent { + /** + * Returns a list of messages grouped in one batch. + * Each message can be committed individually or all messages can be committed at once with commit() method + * + * @return a list of messages + */ List getMessages(); + /** + * Returns a partition session this data was received on + * + * @return a partition session this data was received on + */ PartitionSession getPartitionSession(); + /** + * Commits all messages in this event at once. + * If there was an error while committing, there is no point of retrying committing the same messages: + * the whole PartitionSession should be shut down by that time. And if commit hadn't reached the server, + * it will resend all these messages in next PartitionSession. + * + * @return a CompletableFuture that will be completed when commit confirmation from server will be received + */ CompletableFuture commit(); + /** + * Returns a Committer object to call commit() on later. + * This object has no data references and therefore may be useful in cases where commit() is called after + * processing data in an external system + * + * @return a Committer object + */ + Committer getCommitter(); + } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java new file mode 100644 index 000000000..b17263a63 --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java @@ -0,0 +1,39 @@ +package tech.ydb.topic.read.impl; + +import java.util.concurrent.CompletableFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import tech.ydb.topic.read.Committer; + +/** + * @author Nikolay Perfilov + */ +public class CommitterImpl implements Committer { + private static final Logger logger = LoggerFactory.getLogger(CommitterImpl.class); + private final PartitionSessionImpl partitionSession; + private final int messageCount; + private final OffsetsRange offsetsToCommit; + + public CommitterImpl(PartitionSessionImpl partitionSession, int messageCount, OffsetsRange offsetsToCommit) { + this.partitionSession = partitionSession; + this.messageCount = messageCount; + this.offsetsToCommit = offsetsToCommit; + } + + @Override + public CompletableFuture commit() { + return commitImpl(true); + } + + public CompletableFuture commitImpl(boolean fromCommitter) { + if (logger.isDebugEnabled()) { + logger.debug("[{}] partition session {} (partition {}): committing {} message(s), offsets" + + " [{},{})" + (fromCommitter ? " from Committer" : ""), partitionSession.getPath(), + partitionSession.getId(), partitionSession.getPartitionId(), messageCount, + offsetsToCommit.getStart(), offsetsToCommit.getEnd()); + } + return partitionSession.commitOffset(offsetsToCommit); + } +} diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java index 737fa82f2..d46467095 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java @@ -4,11 +4,8 @@ import java.time.Instant; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.function.Function; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import tech.ydb.topic.read.Committer; import tech.ydb.topic.read.DecompressionException; import tech.ydb.topic.read.Message; import tech.ydb.topic.read.PartitionSession; @@ -17,7 +14,6 @@ * @author Nikolay Perfilov */ public class MessageImpl implements Message { - private static final Logger logger = LoggerFactory.getLogger(MessageImpl.class); private byte[] data; private final long offset; private final long seqNo; @@ -25,8 +21,8 @@ public class MessageImpl implements Message { private final Instant createdAt; private final String messageGroupId; private final BatchMeta batchMeta; - private final PartitionSession partitionSession; - private final Function> commitFunction; + private final PartitionSessionImpl partitionSession; + private final CommitterImpl committer; private boolean isDecompressed = false; private IOException exception = null; @@ -39,7 +35,7 @@ private MessageImpl(Builder builder) { this.messageGroupId = builder.messageGroupId; this.batchMeta = builder.batchMeta; this.partitionSession = builder.partitionSession; - this.commitFunction = builder.commitFunction; + this.committer = new CommitterImpl(partitionSession, 1, new OffsetsRange(commitOffsetFrom, offset + 1)); } @Override @@ -100,7 +96,7 @@ public Instant getWrittenAt() { @Override public PartitionSession getPartitionSession() { - return partitionSession; + return partitionSession.getSessionInfo(); } public void setDecompressed(boolean decompressed) { @@ -109,13 +105,12 @@ public void setDecompressed(boolean decompressed) { @Override public CompletableFuture commit() { - final long commitOffsetTo = offset + 1; - if (logger.isDebugEnabled()) { - logger.debug("[{}] partition session {} (partition {}): committing message with offset {} [{}-{})", - partitionSession.getPath(), partitionSession.getId(), partitionSession.getPartitionId(), - offset, commitOffsetFrom, commitOffsetTo); - } - return commitFunction.apply(new OffsetsRange(commitOffsetFrom, commitOffsetTo)); + return committer.commitImpl(false); + } + + @Override + public Committer getCommitter() { + return committer; } /** @@ -129,8 +124,7 @@ public static class Builder { private Instant createdAt; private String messageGroupId; private BatchMeta batchMeta; - private PartitionSession partitionSession; - private Function> commitFunction; + private PartitionSessionImpl partitionSession; public Builder setData(byte[] data) { this.data = data; @@ -167,16 +161,11 @@ public Builder setBatchMeta(BatchMeta batchMeta) { return this; } - public Builder setPartitionSession(PartitionSession partitionSession) { + public Builder setPartitionSession(PartitionSessionImpl partitionSession) { this.partitionSession = partitionSession; return this; } - public Builder setCommitFunction(Function> commitFunction) { - this.commitFunction = commitFunction; - return this; - } - public MessageImpl build() { return new MessageImpl(this); } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java index 9ff6a5740..0adc8a64d 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java @@ -124,14 +124,13 @@ public CompletableFuture addBatches(List addBatches(List[0])); } - private CompletableFuture commitOffset(OffsetsRange offsets) { + public CompletableFuture commitOffset(OffsetsRange offsets) { CompletableFuture resultFuture = new CompletableFuture<>(); synchronized (commitFutures) { if (isWorking.get()) { @@ -250,9 +249,9 @@ private void sendDataToReadersIfNeeded() { // Should be called maximum in 1 thread at a time List messageImplList = batchToRead.getMessages(); List messagesToRead = new ArrayList<>(messageImplList); - DataReceivedEvent event = new DataReceivedEventImpl(messagesToRead, sessionInfo, - () -> commitOffset(new OffsetsRange(messageImplList.get(0).getCommitOffsetFrom(), - messageImplList.get(messageImplList.size() - 1).getOffset() + 1))); + OffsetsRange offsetsToCommit = new OffsetsRange(messageImplList.get(0).getCommitOffsetFrom(), + messageImplList.get(messageImplList.size() - 1).getOffset() + 1); + DataReceivedEvent event = new DataReceivedEventImpl(this, messagesToRead, offsetsToCommit); if (logger.isDebugEnabled()) { logger.debug("[{}] DataReceivedEvent callback with {} message(s) (offsets {}-{}) for partition " + "session {} " + "(partition {}) is about to be called...", path, messagesToRead.size(), diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java index ea5e6f64c..c9face057 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java @@ -2,29 +2,28 @@ import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.function.Supplier; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import tech.ydb.topic.read.Committer; import tech.ydb.topic.read.Message; import tech.ydb.topic.read.PartitionSession; import tech.ydb.topic.read.events.DataReceivedEvent; +import tech.ydb.topic.read.impl.CommitterImpl; +import tech.ydb.topic.read.impl.OffsetsRange; +import tech.ydb.topic.read.impl.PartitionSessionImpl; /** * @author Nikolay Perfilov */ public class DataReceivedEventImpl implements DataReceivedEvent { - private static final Logger logger = LoggerFactory.getLogger(DataReceivedEventImpl.class); private final List messages; - private final PartitionSession partitionSession; - private final Supplier> commitCallback; + private final PartitionSessionImpl partitionSession; + private final CommitterImpl committer; - public DataReceivedEventImpl(List messages, PartitionSession partitionSession, - Supplier> commitCallback) { + public DataReceivedEventImpl(PartitionSessionImpl partitionSession, List messages, + OffsetsRange offsetsToCommit) { this.messages = messages; this.partitionSession = partitionSession; - this.commitCallback = commitCallback; + this.committer = new CommitterImpl(partitionSession, messages.size(), offsetsToCommit); } @Override @@ -34,17 +33,16 @@ public List getMessages() { @Override public PartitionSession getPartitionSession() { - return partitionSession; + return partitionSession.getSessionInfo(); } @Override public CompletableFuture commit() { - if (logger.isDebugEnabled()) { - logger.debug("[{}] partition session {} (partition {}): committing batch with {} message(s) and offsets" + - " {}-{}", partitionSession.getPath(), partitionSession.getId(), - partitionSession.getPartitionId(), messages.size(), messages.get(0).getOffset(), - messages.get(messages.size() - 1).getOffset()); - } - return commitCallback.get(); + return committer.commitImpl(false); + } + + @Override + public Committer getCommitter() { + return committer; } } From d65b710737147edfcc46e186c09f3d252abfee2e Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Wed, 25 Oct 2023 21:51:25 +0300 Subject: [PATCH 09/18] Add DeferredCommitter, refactor OffsetsRange --- .../ydb/topic/read/DeferredCommitter.java | 25 +++++++ .../tech/ydb/topic/read/OffsetsRange.java | 10 +++ .../events/StartPartitionSessionEvent.java | 2 +- .../ydb/topic/read/impl/AsyncReaderImpl.java | 2 +- .../ydb/topic/read/impl/CommitterImpl.java | 3 +- .../read/impl/DeferredCommitterImpl.java | 67 +++++++++++++++++++ .../read/impl/DisjointOffsetRangeSet.java | 66 ++++++++++++++++++ .../tech/ydb/topic/read/impl/MessageImpl.java | 13 +++- .../ydb/topic/read/impl/OffsetsRange.java | 22 ------ .../ydb/topic/read/impl/OffsetsRangeImpl.java | 39 +++++++++++ .../topic/read/impl/PartitionSessionImpl.java | 55 ++++++++++++--- .../tech/ydb/topic/read/impl/ReaderImpl.java | 44 ++++++++---- .../impl/events/DataReceivedEventImpl.java | 2 +- .../StartPartitionSessionEventImpl.java | 2 +- 14 files changed, 300 insertions(+), 52 deletions(-) create mode 100644 topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java create mode 100644 topic/src/main/java/tech/ydb/topic/read/OffsetsRange.java create mode 100644 topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java create mode 100644 topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java delete mode 100644 topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRange.java create mode 100644 topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRangeImpl.java diff --git a/topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java b/topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java new file mode 100644 index 000000000..0aa46e061 --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java @@ -0,0 +1,25 @@ +package tech.ydb.topic.read; + +/** + * A helper class that is used to call deferred commits. + * Several {@link Message}s or/and {@link tech.ydb.topic.read.events.DataReceivedEvent}s can be accepted to commit later + * all at once. + * Contains no data references and therefore may also be useful in cases where commit() is called after processing data + * in an external system. + * + * @author Nikolay Perfilov + */ +public interface DeferredCommitter { + /** + * Adds a {@link Message} to commit it later with a commit method + * + * @param message a {@link Message} to commit later + */ + void add(Message message); + + /** + * Commits offset ranges from all {@link Message}s and {@link tech.ydb.topic.read.events.DataReceivedEvent}s + * that were added to this DeferredCommitter since last commit + */ + void commit(); +} diff --git a/topic/src/main/java/tech/ydb/topic/read/OffsetsRange.java b/topic/src/main/java/tech/ydb/topic/read/OffsetsRange.java new file mode 100644 index 000000000..0045e644c --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/read/OffsetsRange.java @@ -0,0 +1,10 @@ +package tech.ydb.topic.read; + +/** + * @author Nikolay Perfilov + */ +public interface OffsetsRange { + long getStart(); + + long getEnd(); +} diff --git a/topic/src/main/java/tech/ydb/topic/read/events/StartPartitionSessionEvent.java b/topic/src/main/java/tech/ydb/topic/read/events/StartPartitionSessionEvent.java index d5c394428..aa64e60de 100644 --- a/topic/src/main/java/tech/ydb/topic/read/events/StartPartitionSessionEvent.java +++ b/topic/src/main/java/tech/ydb/topic/read/events/StartPartitionSessionEvent.java @@ -1,7 +1,7 @@ package tech.ydb.topic.read.events; +import tech.ydb.topic.read.OffsetsRange; import tech.ydb.topic.read.PartitionSession; -import tech.ydb.topic.read.impl.OffsetsRange; import tech.ydb.topic.settings.StartPartitionSessionSettings; /** diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java index 2cc8237e3..d90ca71ae 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java @@ -81,7 +81,7 @@ protected void handleStartPartitionSessionRequest(YdbTopic.StreamReadMessage.Sta StartPartitionSessionEvent event = new StartPartitionSessionEventImpl( partitionSession, request.getCommittedOffset(), - new OffsetsRange(offsetsRange.getStart(), offsetsRange.getEnd()), + new OffsetsRangeImpl(offsetsRange.getStart(), offsetsRange.getEnd()), confirmCallback ); eventHandler.onStartPartitionSession(event); diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java index b17263a63..888922d2d 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java @@ -6,6 +6,7 @@ import org.slf4j.LoggerFactory; import tech.ydb.topic.read.Committer; +import tech.ydb.topic.read.OffsetsRange; /** * @author Nikolay Perfilov @@ -34,6 +35,6 @@ public CompletableFuture commitImpl(boolean fromCommitter) { partitionSession.getId(), partitionSession.getPartitionId(), messageCount, offsetsToCommit.getStart(), offsetsToCommit.getEnd()); } - return partitionSession.commitOffset(offsetsToCommit); + return partitionSession.commitOffsetRange(offsetsToCommit); } } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java new file mode 100644 index 000000000..a9379e515 --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java @@ -0,0 +1,67 @@ +package tech.ydb.topic.read.impl; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import tech.ydb.topic.read.DeferredCommitter; +import tech.ydb.topic.read.Message; +import tech.ydb.topic.read.OffsetsRange; + +/** + * @author Nikolay Perfilov + */ +public class DeferredCommitterImpl implements DeferredCommitter { + private static final Logger logger = LoggerFactory.getLogger(DeferredCommitterImpl.class); + + private final Map rangesByPartition = new ConcurrentHashMap<>(); + + private static class PartitionRanges { + private final PartitionSessionImpl partitionSession; + private final DisjointOffsetRangeSet ranges = new DisjointOffsetRangeSet(); + + private PartitionRanges(PartitionSessionImpl partitionSession) { + this.partitionSession = partitionSession; + } + + private void add(MessageImpl message) { + try { + synchronized (ranges) { + ranges.add(message.getOffsetsToCommit()); + } + } catch (RuntimeException exception) { + String errorMessage = "Error adding new offset range to DeferredCommitter for partition session " + + partitionSession.getId() + " (partition " + partitionSession.getPartitionId() + "): " + + exception.getMessage(); + logger.error(errorMessage); + throw new RuntimeException(errorMessage, exception); + } + } + + private void commit() { + List rangesToCommit; + synchronized (ranges) { + rangesToCommit = ranges.getRangesAndClear(); + } + partitionSession.commitOffsetRanges(rangesToCommit); + } + } + + @Override + public void add(Message message) { + MessageImpl messageImpl = (MessageImpl) message; + PartitionRanges partitionRanges = rangesByPartition + .computeIfAbsent(messageImpl.getPartitionSessionImpl(), PartitionRanges::new); + partitionRanges.add(messageImpl); + } + + @Override + public void commit() { + rangesByPartition.forEach((session, partitionRanges) -> { + partitionRanges.commit(); + }); + } +} diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java b/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java new file mode 100644 index 000000000..53d529043 --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java @@ -0,0 +1,66 @@ +package tech.ydb.topic.read.impl; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; + +import tech.ydb.topic.read.OffsetsRange; + +/** + * @author Nikolay Perfilov + */ +public class DisjointOffsetRangeSet { + private final NavigableMap ranges = new ConcurrentSkipListMap<>(); + + public void add(OffsetsRange rangeToCommit) { + Map.Entry floorEntry = ranges.floorEntry(rangeToCommit.getStart()); + boolean mergedFloor = false; + if (floorEntry != null) { + if (floorEntry.getValue().getStart() > rangeToCommit.getStart()) { + throwClashesException(floorEntry.getValue(), rangeToCommit); + } + if (floorEntry.getValue().getEnd() == rangeToCommit.getStart()) { + floorEntry.getValue().setEnd(rangeToCommit.getEnd()); + mergedFloor = true; + } + } + Map.Entry ceilingEntry = + ranges.ceilingEntry(rangeToCommit.getStart()); + if (ceilingEntry != null) { + OffsetsRangeImpl ceilingValue = ceilingEntry.getValue(); + if (rangeToCommit.getEnd() > ceilingValue.getStart()) { + throwClashesException(ceilingValue, rangeToCommit); + } + if (rangeToCommit.getEnd() == ceilingValue.getStart()) { + ranges.remove(ceilingEntry.getKey()); + if (mergedFloor) { + floorEntry.getValue().setEnd(ceilingValue.getEnd()); + } else { + ceilingValue.setStart(rangeToCommit.getStart()); + ranges.put(rangeToCommit.getStart(), ceilingValue); + } + return; + } + } + if (!mergedFloor) { + ranges.put(rangeToCommit.getStart(), new OffsetsRangeImpl(rangeToCommit)); + } + } + + public List getRangesAndClear() { + Collection values = ranges.values(); + List result = new ArrayList<>(values); + values.clear(); + return result; + } + + private void throwClashesException(OffsetsRangeImpl existingRange, OffsetsRange newRange) { + String errMessage = "Error adding new offset range. Added range [" + + newRange.getStart() + "," + newRange.getEnd() + ") clashes with existing range [" + + existingRange.getStart() + "," + existingRange.getEnd() + ")"; + throw new RuntimeException(errMessage); + } +} diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java index d46467095..86996152d 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java @@ -8,6 +8,7 @@ import tech.ydb.topic.read.Committer; import tech.ydb.topic.read.DecompressionException; import tech.ydb.topic.read.Message; +import tech.ydb.topic.read.OffsetsRange; import tech.ydb.topic.read.PartitionSession; /** @@ -22,6 +23,7 @@ public class MessageImpl implements Message { private final String messageGroupId; private final BatchMeta batchMeta; private final PartitionSessionImpl partitionSession; + private final OffsetsRange offsetsToCommit; private final CommitterImpl committer; private boolean isDecompressed = false; private IOException exception = null; @@ -35,7 +37,8 @@ private MessageImpl(Builder builder) { this.messageGroupId = builder.messageGroupId; this.batchMeta = builder.batchMeta; this.partitionSession = builder.partitionSession; - this.committer = new CommitterImpl(partitionSession, 1, new OffsetsRange(commitOffsetFrom, offset + 1)); + this.offsetsToCommit = new OffsetsRangeImpl(commitOffsetFrom, offset + 1); + this.committer = new CommitterImpl(partitionSession, 1, offsetsToCommit); } @Override @@ -99,6 +102,10 @@ public PartitionSession getPartitionSession() { return partitionSession.getSessionInfo(); } + public PartitionSessionImpl getPartitionSessionImpl() { + return partitionSession; + } + public void setDecompressed(boolean decompressed) { isDecompressed = decompressed; } @@ -108,6 +115,10 @@ public CompletableFuture commit() { return committer.commitImpl(false); } + public OffsetsRange getOffsetsToCommit() { + return offsetsToCommit; + } + @Override public Committer getCommitter() { return committer; diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRange.java b/topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRange.java deleted file mode 100644 index 81a6fc140..000000000 --- a/topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRange.java +++ /dev/null @@ -1,22 +0,0 @@ -package tech.ydb.topic.read.impl; - -/** - * @author Nikolay Perfilov - */ -public class OffsetsRange { - private final long start; - private final long end; - - public OffsetsRange(long start, long end) { - this.start = start; - this.end = end; - } - - public long getStart() { - return start; - } - - public long getEnd() { - return end; - } -} diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRangeImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRangeImpl.java new file mode 100644 index 000000000..5b4661b72 --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRangeImpl.java @@ -0,0 +1,39 @@ +package tech.ydb.topic.read.impl; + +import tech.ydb.topic.read.OffsetsRange; + +/** + * @author Nikolay Perfilov + */ +public class OffsetsRangeImpl implements OffsetsRange { + private long start; + private long end; + + public OffsetsRangeImpl(long start, long end) { + this.start = start; + this.end = end; + } + + public OffsetsRangeImpl(OffsetsRange offsetsRange) { + this.start = offsetsRange.getStart(); + this.end = offsetsRange.getEnd(); + } + + @Override + public long getStart() { + return start; + } + + @Override + public long getEnd() { + return end; + } + + public void setStart(long start) { + this.start = start; + } + + public void setEnd(long end) { + this.end = end; + } +} diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java index 0adc8a64d..696b72d72 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java @@ -22,6 +22,7 @@ import tech.ydb.proto.topic.YdbTopic; import tech.ydb.topic.description.Codec; import tech.ydb.topic.read.Message; +import tech.ydb.topic.read.OffsetsRange; import tech.ydb.topic.read.PartitionSession; import tech.ydb.topic.read.events.DataReceivedEvent; import tech.ydb.topic.read.impl.events.DataReceivedEventImpl; @@ -45,7 +46,7 @@ public class PartitionSessionImpl { private final Queue readingQueue = new ConcurrentLinkedQueue<>(); private final Function> dataEventCallback; private final AtomicBoolean isReadingNow = new AtomicBoolean(); - private final Consumer commitFunction; + private final Consumer> commitFunction; private final NavigableMap> commitFutures = new ConcurrentSkipListMap<>(); // Offset of the last read message + 1 private long lastReadOffset; @@ -171,29 +172,63 @@ public CompletableFuture addBatches(List[0])); } - public CompletableFuture commitOffset(OffsetsRange offsets) { + // Сommit single offset range with result future + public CompletableFuture commitOffsetRange(OffsetsRange rangeToCommit) { CompletableFuture resultFuture = new CompletableFuture<>(); synchronized (commitFutures) { if (isWorking.get()) { if (logger.isDebugEnabled()) { logger.debug("[{}] Offset range [{}, {}) is requested to be committed for partition session {} " + "(partition {}). Last committed offset is {} (commit lag is {})", path, - offsets.getStart(), offsets.getEnd(), id, partitionId, lastCommittedOffset, - offsets.getStart() - lastCommittedOffset); + rangeToCommit.getStart(), rangeToCommit.getEnd(), id, partitionId, lastCommittedOffset, + rangeToCommit.getStart() - lastCommittedOffset); } - commitFutures.put(offsets.getEnd(), resultFuture); - commitFunction.accept(offsets); + commitFutures.put(rangeToCommit.getEnd(), resultFuture); } else { logger.info("[{}] Offset range [{}, {}) is requested to be committed, but partition session {} " + - "(partition {}) is already closed", path, offsets.getStart(), offsets.getEnd(), id, + "(partition {}) is already closed", path, rangeToCommit.getStart(), rangeToCommit.getEnd(), id, partitionId); resultFuture.completeExceptionally(new RuntimeException("Partition session " + id + " (partition " + partitionId + ") for " + path + " is already closed")); + return resultFuture; } } + List rangeWrapper = new ArrayList<>(1); + rangeWrapper.add(rangeToCommit); + commitFunction.accept(rangeWrapper); return resultFuture; } + // Bulk commit without result future + public void commitOffsetRanges(List rangesToCommit) { + if (isWorking.get()) { + if (logger.isDebugEnabled()) { + StringBuilder message = new StringBuilder("[").append(path) + .append("] Sending CommitRequest for partition session ").append(id) + .append(" (partition ").append(partitionId).append(") with offset ranges "); + addRangesToString(message, rangesToCommit); + logger.info(message.toString()); + } + commitFunction.accept(rangesToCommit); + } else if (logger.isInfoEnabled()) { + StringBuilder message = new StringBuilder("[").append(path).append("] Offset ranges "); + addRangesToString(message, rangesToCommit); + message.append(" are requested to be committed, but partition session ").append(id) + .append(" (partition ").append(partitionId).append(") is already closed"); + logger.info(message.toString()); + } + } + + private static void addRangesToString(StringBuilder stringBuilder, List ranges) { + for (int i = 0; i < ranges.size(); i++) { + if (i > 0) { + stringBuilder.append(", "); + } + OffsetsRange range = ranges.get(i); + stringBuilder.append("[").append(range.getStart()).append(",").append(range.getEnd()).append(")"); + } + } + public void handleCommitResponse(long committedOffset) { if (committedOffset <= lastCommittedOffset) { logger.error("[{}] Commit response received for partition session {} (partition {}). Committed offset: {}" + @@ -249,7 +284,7 @@ private void sendDataToReadersIfNeeded() { // Should be called maximum in 1 thread at a time List messageImplList = batchToRead.getMessages(); List messagesToRead = new ArrayList<>(messageImplList); - OffsetsRange offsetsToCommit = new OffsetsRange(messageImplList.get(0).getCommitOffsetFrom(), + OffsetsRange offsetsToCommit = new OffsetsRangeImpl(messageImplList.get(0).getCommitOffsetFrom(), messageImplList.get(messageImplList.size() - 1).getOffset() + 1); DataReceivedEvent event = new DataReceivedEventImpl(this, messagesToRead, offsetsToCommit); if (logger.isDebugEnabled()) { @@ -308,7 +343,7 @@ public static class Builder { private OffsetsRange partitionOffsets; private Executor decompressionExecutor; private Function> dataEventCallback; - private Consumer commitFunction; + private Consumer> commitFunction; public Builder setId(long id) { this.id = id; @@ -345,7 +380,7 @@ public Builder setDataEventCallback(Function commitFunction) { + public Builder setCommitFunction(Consumer> commitFunction) { this.commitFunction = commitFunction; return this; } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java index c3b506f43..be7e174c6 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java @@ -25,6 +25,7 @@ import tech.ydb.proto.topic.YdbTopic; import tech.ydb.topic.TopicRpc; import tech.ydb.topic.impl.GrpcStreamRetrier; +import tech.ydb.topic.read.OffsetsRange; import tech.ydb.topic.read.PartitionSession; import tech.ydb.topic.read.events.DataReceivedEvent; import tech.ydb.topic.settings.ReaderSettings; @@ -240,24 +241,39 @@ private void sendStopPartitionSessionResponse(long partitionSessionId) { .build()); } - private void commitOffset(long partitionSessionId, long partitionId, OffsetsRange offsets) { + private void sendCommitOffsetRequest(long partitionSessionId, long partitionId, + List rangesToCommit) { if (!isWorking.get()) { - logger.info("[{}] Need to send CommitRequest for partition session {} (partition {})," + - " but reading session is already closed", fullId, partitionSessionId, partitionId); + if (logger.isInfoEnabled()) { + StringBuilder message = new StringBuilder("[").append(fullId) + .append("] Need to send CommitRequest for partition session ").append(partitionSessionId) + .append(" (partition ").append(partitionId).append(") with offset ranges "); + for (int i = 0; i < rangesToCommit.size(); i++) { + if (i > 0) { + message.append(", "); + } + OffsetsRange range = rangesToCommit.get(i); + message.append("[").append(range.getStart()).append(",").append(range.getEnd()).append(")"); + } + message.append(", but reading session is already closed"); + logger.info(message.toString()); + } return; } - logger.info("[{}] Sending CommitRequest for partition session {} (partition {}) with offset range [{},{})", - fullId, partitionSessionId, partitionId, offsets.getStart(), offsets.getEnd()); + + YdbTopic.StreamReadMessage.CommitOffsetRequest.PartitionCommitOffset.Builder builder = + YdbTopic.StreamReadMessage.CommitOffsetRequest.PartitionCommitOffset.newBuilder() + .setPartitionSessionId(partitionSessionId); + rangesToCommit.forEach(range -> { + builder.addOffsets(YdbTopic.OffsetsRange.newBuilder() + .setStart(range.getStart()) + .setEnd(range.getEnd())); + }); send(YdbTopic.StreamReadMessage.FromClient.newBuilder() .setCommitOffsetRequest(YdbTopic.StreamReadMessage.CommitOffsetRequest.newBuilder() - .addCommitOffsets( - YdbTopic.StreamReadMessage.CommitOffsetRequest.PartitionCommitOffset.newBuilder() - .setPartitionSessionId(partitionSessionId) - .addOffsets(YdbTopic.OffsetsRange.newBuilder() - .setStart(offsets.getStart()) - .setEnd(offsets.getEnd())) - )) + .addCommitOffsets(builder)) .build()); + } private void closePartitionSessions() { @@ -293,11 +309,11 @@ private void onStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPart .setPath(request.getPartitionSession().getPath()) .setPartitionId(partitionId) .setCommittedOffset(request.getCommittedOffset()) - .setPartitionOffsets(new OffsetsRange(request.getPartitionOffsets().getStart(), + .setPartitionOffsets(new OffsetsRangeImpl(request.getPartitionOffsets().getStart(), request.getPartitionOffsets().getEnd())) .setDecompressionExecutor(decompressionExecutor) .setDataEventCallback(ReaderImpl.this::handleDataReceivedEvent) - .setCommitFunction((offsets) -> commitOffset(partitionSessionId, partitionId, offsets)) + .setCommitFunction((offsets) -> sendCommitOffsetRequest(partitionSessionId, partitionId, offsets)) .build(); partitionSessions.put(partitionSession.getId(), partitionSession); diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java index c9face057..9a2e0dcb1 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java @@ -5,10 +5,10 @@ import tech.ydb.topic.read.Committer; import tech.ydb.topic.read.Message; +import tech.ydb.topic.read.OffsetsRange; import tech.ydb.topic.read.PartitionSession; import tech.ydb.topic.read.events.DataReceivedEvent; import tech.ydb.topic.read.impl.CommitterImpl; -import tech.ydb.topic.read.impl.OffsetsRange; import tech.ydb.topic.read.impl.PartitionSessionImpl; /** diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/events/StartPartitionSessionEventImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/events/StartPartitionSessionEventImpl.java index e8735f822..9fa4632e7 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/events/StartPartitionSessionEventImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/events/StartPartitionSessionEventImpl.java @@ -2,9 +2,9 @@ import java.util.function.Consumer; +import tech.ydb.topic.read.OffsetsRange; import tech.ydb.topic.read.PartitionSession; import tech.ydb.topic.read.events.StartPartitionSessionEvent; -import tech.ydb.topic.read.impl.OffsetsRange; import tech.ydb.topic.settings.StartPartitionSessionSettings; /** From bb4abeae35b677b5fdbab5c28aac4f22e3cf8282 Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Thu, 26 Oct 2023 12:35:38 +0300 Subject: [PATCH 10/18] Fix corner case in DisjointOffsetRangeSet --- .../read/impl/DisjointOffsetRangeSet.java | 23 ++++++++----------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java b/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java index 53d529043..5f0988487 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java @@ -17,23 +17,20 @@ public class DisjointOffsetRangeSet { public void add(OffsetsRange rangeToCommit) { Map.Entry floorEntry = ranges.floorEntry(rangeToCommit.getStart()); + if (floorEntry != null && floorEntry.getValue().getEnd() > rangeToCommit.getStart()) { + throwClashesException(floorEntry.getValue(), rangeToCommit); + } + Map.Entry ceilingEntry = ranges.ceilingEntry(rangeToCommit.getStart()); + if (ceilingEntry != null && rangeToCommit.getEnd() > ceilingEntry.getValue().getStart()) { + throwClashesException(ceilingEntry.getValue(), rangeToCommit); + } boolean mergedFloor = false; - if (floorEntry != null) { - if (floorEntry.getValue().getStart() > rangeToCommit.getStart()) { - throwClashesException(floorEntry.getValue(), rangeToCommit); - } - if (floorEntry.getValue().getEnd() == rangeToCommit.getStart()) { - floorEntry.getValue().setEnd(rangeToCommit.getEnd()); - mergedFloor = true; - } + if (floorEntry != null && floorEntry.getValue().getEnd() == rangeToCommit.getStart()) { + floorEntry.getValue().setEnd(rangeToCommit.getEnd()); + mergedFloor = true; } - Map.Entry ceilingEntry = - ranges.ceilingEntry(rangeToCommit.getStart()); if (ceilingEntry != null) { OffsetsRangeImpl ceilingValue = ceilingEntry.getValue(); - if (rangeToCommit.getEnd() > ceilingValue.getStart()) { - throwClashesException(ceilingValue, rangeToCommit); - } if (rangeToCommit.getEnd() == ceilingValue.getStart()) { ranges.remove(ceilingEntry.getKey()); if (mergedFloor) { From 15997f08c2ef5fc723fb7c7833fabf8671e3f3c9 Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Thu, 26 Oct 2023 12:38:20 +0300 Subject: [PATCH 11/18] Add tests for DisjointOffsetRangeSet --- .../impl/DisjointOffsetRangeSetTest.java | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 topic/src/test/java/tech/ydb/topic/impl/DisjointOffsetRangeSetTest.java diff --git a/topic/src/test/java/tech/ydb/topic/impl/DisjointOffsetRangeSetTest.java b/topic/src/test/java/tech/ydb/topic/impl/DisjointOffsetRangeSetTest.java new file mode 100644 index 000000000..02a841422 --- /dev/null +++ b/topic/src/test/java/tech/ydb/topic/impl/DisjointOffsetRangeSetTest.java @@ -0,0 +1,75 @@ +package tech.ydb.topic.impl; + +import java.util.List; + +import org.junit.Assert; +import org.junit.Test; +import tech.ydb.topic.read.OffsetsRange; +import tech.ydb.topic.read.impl.DisjointOffsetRangeSet; +import tech.ydb.topic.read.impl.OffsetsRangeImpl; + +/** + * @author Nikolay Perfilov + */ +public class DisjointOffsetRangeSetTest { + + @Test + public void testRangesSimple() { + DisjointOffsetRangeSet ranges = new DisjointOffsetRangeSet(); + ranges.add(new OffsetsRangeImpl(0, 1)); + ranges.add(new OffsetsRangeImpl(1, 2)); + ranges.add(new OffsetsRangeImpl(3, 4)); + List rangesResult = ranges.getRangesAndClear(); + Assert.assertEquals(2, rangesResult.size()); + Assert.assertEquals(0, rangesResult.get(0).getStart()); + Assert.assertEquals(2, rangesResult.get(0).getEnd()); + Assert.assertEquals(3, rangesResult.get(1).getStart()); + Assert.assertEquals(4, rangesResult.get(1).getEnd()); + } + + @Test + public void testReuseRangeSet() { + DisjointOffsetRangeSet ranges = new DisjointOffsetRangeSet(); + + ranges.add(new OffsetsRangeImpl(30, 40)); + ranges.add(new OffsetsRangeImpl(10, 20)); + ranges.add(new OffsetsRangeImpl(0, 9)); + Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(8, 11))); + Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(8, 10))); + Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(9, 11))); + Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(25, 31))); + Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(31, 100))); + Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(25, 100))); + ranges.add(new OffsetsRangeImpl(9, 10)); + List firstResult = ranges.getRangesAndClear(); + Assert.assertEquals(2, firstResult.size()); + Assert.assertEquals(0, firstResult.get(0).getStart()); + Assert.assertEquals(20, firstResult.get(0).getEnd()); + Assert.assertEquals(30, firstResult.get(1).getStart()); + Assert.assertEquals(40, firstResult.get(1).getEnd()); + + Assert.assertTrue(ranges.getRangesAndClear().isEmpty()); + + ranges.add(new OffsetsRangeImpl(0, 9)); + ranges.add(new OffsetsRangeImpl(10, 19)); + ranges.add(new OffsetsRangeImpl(20, 30)); + List secondResult = ranges.getRangesAndClear(); + Assert.assertEquals(3, secondResult.size()); + Assert.assertEquals(0, secondResult.get(0).getStart()); + Assert.assertEquals(9, secondResult.get(0).getEnd()); + Assert.assertEquals(10, secondResult.get(1).getStart()); + Assert.assertEquals(19, secondResult.get(1).getEnd()); + Assert.assertEquals(20, secondResult.get(2).getStart()); + Assert.assertEquals(30, secondResult.get(2).getEnd()); + + ranges.add(new OffsetsRangeImpl(39, 40)); + ranges.add(new OffsetsRangeImpl(30, 39)); + ranges.add(new OffsetsRangeImpl(40, 50)); + List thirdResult = ranges.getRangesAndClear(); + Assert.assertEquals(1, thirdResult.size()); + Assert.assertEquals(30, thirdResult.get(0).getStart()); + Assert.assertEquals(50, thirdResult.get(0).getEnd()); + + Assert.assertTrue(ranges.getRangesAndClear().isEmpty()); + } +} From 16421f1b2a3f0211feb6addd2d2956f0eb0b178e Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Thu, 26 Oct 2023 14:39:50 +0300 Subject: [PATCH 12/18] Add DeferredCommitter factory method --- .../ydb/topic/read/DeferredCommitter.java | 21 ++++++++++++++++++- .../read/impl/DeferredCommitterImpl.java | 16 +++++++++++--- .../impl/events/DataReceivedEventImpl.java | 10 +++++++++ 3 files changed, 43 insertions(+), 4 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java b/topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java index 0aa46e061..51172141d 100644 --- a/topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java +++ b/topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java @@ -1,5 +1,8 @@ package tech.ydb.topic.read; +import tech.ydb.topic.read.events.DataReceivedEvent; +import tech.ydb.topic.read.impl.DeferredCommitterImpl; + /** * A helper class that is used to call deferred commits. * Several {@link Message}s or/and {@link tech.ydb.topic.read.events.DataReceivedEvent}s can be accepted to commit later @@ -10,6 +13,15 @@ * @author Nikolay Perfilov */ public interface DeferredCommitter { + /** + * Creates a new instance of {@link DeferredCommitter} + * + * @return a new instance of {@link DeferredCommitter} + */ + static DeferredCommitter newInstance() { + return new DeferredCommitterImpl(); + } + /** * Adds a {@link Message} to commit it later with a commit method * @@ -18,7 +30,14 @@ public interface DeferredCommitter { void add(Message message); /** - * Commits offset ranges from all {@link Message}s and {@link tech.ydb.topic.read.events.DataReceivedEvent}s + * Adds a {@link DataReceivedEvent} to commit all its messages later with a commit method + * + * @param event a {@link DataReceivedEvent} to commit later + */ + void add(DataReceivedEvent event); + + /** + * Commits offset ranges from all {@link Message}s and {@link DataReceivedEvent}s * that were added to this DeferredCommitter since last commit */ void commit(); diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java index a9379e515..fb3457038 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java @@ -10,6 +10,8 @@ import tech.ydb.topic.read.DeferredCommitter; import tech.ydb.topic.read.Message; import tech.ydb.topic.read.OffsetsRange; +import tech.ydb.topic.read.events.DataReceivedEvent; +import tech.ydb.topic.read.impl.events.DataReceivedEventImpl; /** * @author Nikolay Perfilov @@ -27,10 +29,10 @@ private PartitionRanges(PartitionSessionImpl partitionSession) { this.partitionSession = partitionSession; } - private void add(MessageImpl message) { + private void add(OffsetsRange offsetRange) { try { synchronized (ranges) { - ranges.add(message.getOffsetsToCommit()); + ranges.add(offsetRange); } } catch (RuntimeException exception) { String errorMessage = "Error adding new offset range to DeferredCommitter for partition session " + @@ -55,7 +57,15 @@ public void add(Message message) { MessageImpl messageImpl = (MessageImpl) message; PartitionRanges partitionRanges = rangesByPartition .computeIfAbsent(messageImpl.getPartitionSessionImpl(), PartitionRanges::new); - partitionRanges.add(messageImpl); + partitionRanges.add(messageImpl.getOffsetsToCommit()); + } + + @Override + public void add(DataReceivedEvent event) { + DataReceivedEventImpl eventImpl = (DataReceivedEventImpl) event; + PartitionRanges partitionRanges = rangesByPartition + .computeIfAbsent(eventImpl.getPartitionSessionImpl(), PartitionRanges::new); + partitionRanges.add(eventImpl.getOffsetsToCommit()); } @Override diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java index 9a2e0dcb1..d1252effa 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java @@ -17,12 +17,14 @@ public class DataReceivedEventImpl implements DataReceivedEvent { private final List messages; private final PartitionSessionImpl partitionSession; + private final OffsetsRange offsetsToCommit; private final CommitterImpl committer; public DataReceivedEventImpl(PartitionSessionImpl partitionSession, List messages, OffsetsRange offsetsToCommit) { this.messages = messages; this.partitionSession = partitionSession; + this.offsetsToCommit = offsetsToCommit; this.committer = new CommitterImpl(partitionSession, messages.size(), offsetsToCommit); } @@ -36,11 +38,19 @@ public PartitionSession getPartitionSession() { return partitionSession.getSessionInfo(); } + public PartitionSessionImpl getPartitionSessionImpl() { + return partitionSession; + } + @Override public CompletableFuture commit() { return committer.commitImpl(false); } + public OffsetsRange getOffsetsToCommit() { + return offsetsToCommit; + } + @Override public Committer getCommitter() { return committer; From 54f551be4ddfeb0848a4ab899b7ed93a7b649989 Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Thu, 26 Oct 2023 14:55:52 +0300 Subject: [PATCH 13/18] Remove regular Committer --- .../java/tech/ydb/topic/read/Committer.java | 21 ------------------- .../java/tech/ydb/topic/read/Message.java | 9 -------- .../topic/read/events/DataReceivedEvent.java | 10 --------- .../ydb/topic/read/impl/CommitterImpl.java | 5 ++--- .../tech/ydb/topic/read/impl/MessageImpl.java | 6 ------ .../impl/events/DataReceivedEventImpl.java | 6 ------ 6 files changed, 2 insertions(+), 55 deletions(-) delete mode 100644 topic/src/main/java/tech/ydb/topic/read/Committer.java diff --git a/topic/src/main/java/tech/ydb/topic/read/Committer.java b/topic/src/main/java/tech/ydb/topic/read/Committer.java deleted file mode 100644 index c0cf08984..000000000 --- a/topic/src/main/java/tech/ydb/topic/read/Committer.java +++ /dev/null @@ -1,21 +0,0 @@ -package tech.ydb.topic.read; - -import java.util.concurrent.CompletableFuture; - -/** - * A helper class that is used to call deferred commits - * Contains no data references and therefore may be useful in cases where commit() is called after processing data in - * an external system - * - * @author Nikolay Perfilov - */ -public interface Committer { - /** - * Commits offsets associated with this committer - * If there was an error while committing, there is no point of retrying committing the same message(s): - * the whole PartitionSession should be shut down by that time. And if commit hadn't reached the server, - * it will resend all these messages in next PartitionSession. - * @return CompletableFuture that will be completed when commit confirmation from server will be received - */ - CompletableFuture commit(); -} diff --git a/topic/src/main/java/tech/ydb/topic/read/Message.java b/topic/src/main/java/tech/ydb/topic/read/Message.java index 91f72d757..35383e423 100644 --- a/topic/src/main/java/tech/ydb/topic/read/Message.java +++ b/topic/src/main/java/tech/ydb/topic/read/Message.java @@ -67,13 +67,4 @@ public interface Message { */ CompletableFuture commit(); - /** - * Returns a Committer object to call commit() on later. - * This object has no data references and therefore may be useful in cases where commit() is called after - * processing data in an external system - * - * @return a Committer object - */ - Committer getCommitter(); - } diff --git a/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java b/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java index 729e3f7c9..416c1481e 100644 --- a/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java +++ b/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java @@ -3,7 +3,6 @@ import java.util.List; import java.util.concurrent.CompletableFuture; -import tech.ydb.topic.read.Committer; import tech.ydb.topic.read.Message; import tech.ydb.topic.read.PartitionSession; @@ -37,13 +36,4 @@ public interface DataReceivedEvent { */ CompletableFuture commit(); - /** - * Returns a Committer object to call commit() on later. - * This object has no data references and therefore may be useful in cases where commit() is called after - * processing data in an external system - * - * @return a Committer object - */ - Committer getCommitter(); - } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java index 888922d2d..107657a2c 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java @@ -5,13 +5,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import tech.ydb.topic.read.Committer; import tech.ydb.topic.read.OffsetsRange; /** * @author Nikolay Perfilov */ -public class CommitterImpl implements Committer { +public class CommitterImpl { private static final Logger logger = LoggerFactory.getLogger(CommitterImpl.class); private final PartitionSessionImpl partitionSession; private final int messageCount; @@ -23,7 +22,7 @@ public CommitterImpl(PartitionSessionImpl partitionSession, int messageCount, Of this.offsetsToCommit = offsetsToCommit; } - @Override + public CompletableFuture commit() { return commitImpl(true); } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java index 86996152d..123f72638 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java @@ -5,7 +5,6 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; -import tech.ydb.topic.read.Committer; import tech.ydb.topic.read.DecompressionException; import tech.ydb.topic.read.Message; import tech.ydb.topic.read.OffsetsRange; @@ -119,11 +118,6 @@ public OffsetsRange getOffsetsToCommit() { return offsetsToCommit; } - @Override - public Committer getCommitter() { - return committer; - } - /** * BUILDER */ diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java index d1252effa..b65789434 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java @@ -3,7 +3,6 @@ import java.util.List; import java.util.concurrent.CompletableFuture; -import tech.ydb.topic.read.Committer; import tech.ydb.topic.read.Message; import tech.ydb.topic.read.OffsetsRange; import tech.ydb.topic.read.PartitionSession; @@ -50,9 +49,4 @@ public CompletableFuture commit() { public OffsetsRange getOffsetsToCommit() { return offsetsToCommit; } - - @Override - public Committer getCommitter() { - return committer; - } } From e15a1566adcdaf4ca5d98b8dccb8d51d66316bc1 Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Fri, 27 Oct 2023 12:02:40 +0300 Subject: [PATCH 14/18] Refactor: change map implementation in disjoint set --- .../java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java b/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java index 5f0988487..41ec47ec7 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java @@ -5,7 +5,7 @@ import java.util.List; import java.util.Map; import java.util.NavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; +import java.util.TreeMap; import tech.ydb.topic.read.OffsetsRange; @@ -13,7 +13,7 @@ * @author Nikolay Perfilov */ public class DisjointOffsetRangeSet { - private final NavigableMap ranges = new ConcurrentSkipListMap<>(); + private final NavigableMap ranges = new TreeMap<>(); public void add(OffsetsRange rangeToCommit) { Map.Entry floorEntry = ranges.floorEntry(rangeToCommit.getStart()); From a243ffbeb5f370ff7c5953ffb7c93d7529ae990a Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Fri, 27 Oct 2023 19:30:09 +0300 Subject: [PATCH 15/18] Add onCommitResponse callback --- .../read/events/AbstractReadEventHandler.java | 5 ++++ .../CommitOffsetAcknowledgementEvent.java | 6 ++++- .../topic/read/events/ReadEventHandler.java | 1 + .../ydb/topic/read/impl/AsyncReaderImpl.java | 11 +++++++++ .../tech/ydb/topic/read/impl/ReaderImpl.java | 11 +++++++-- .../ydb/topic/read/impl/SyncReaderImpl.java | 8 +++++++ .../CommitOffsetAcknowledgementEventImpl.java | 24 +++++++++++++++++++ 7 files changed, 63 insertions(+), 3 deletions(-) create mode 100644 topic/src/main/java/tech/ydb/topic/read/impl/events/CommitOffsetAcknowledgementEventImpl.java diff --git a/topic/src/main/java/tech/ydb/topic/read/events/AbstractReadEventHandler.java b/topic/src/main/java/tech/ydb/topic/read/events/AbstractReadEventHandler.java index d3be7302f..8c5a138b4 100644 --- a/topic/src/main/java/tech/ydb/topic/read/events/AbstractReadEventHandler.java +++ b/topic/src/main/java/tech/ydb/topic/read/events/AbstractReadEventHandler.java @@ -7,6 +7,11 @@ public abstract class AbstractReadEventHandler implements ReadEventHandler { // onMessages(DataReceivedEvent event) method should be defined in user's implementation + @Override + public void onCommitResponse(CommitOffsetAcknowledgementEvent event) { + + } + @Override public void onStartPartitionSession(StartPartitionSessionEvent event) { event.confirm(); diff --git a/topic/src/main/java/tech/ydb/topic/read/events/CommitOffsetAcknowledgementEvent.java b/topic/src/main/java/tech/ydb/topic/read/events/CommitOffsetAcknowledgementEvent.java index b53a4a3d7..5c6cac799 100644 --- a/topic/src/main/java/tech/ydb/topic/read/events/CommitOffsetAcknowledgementEvent.java +++ b/topic/src/main/java/tech/ydb/topic/read/events/CommitOffsetAcknowledgementEvent.java @@ -1,7 +1,11 @@ package tech.ydb.topic.read.events; +import tech.ydb.topic.read.PartitionSession; + /** * @author Nikolay Perfilov */ -public class CommitOffsetAcknowledgementEvent { +public interface CommitOffsetAcknowledgementEvent { + PartitionSession getPartitionSession(); + long getCommittedOffset(); } diff --git a/topic/src/main/java/tech/ydb/topic/read/events/ReadEventHandler.java b/topic/src/main/java/tech/ydb/topic/read/events/ReadEventHandler.java index 31bb728c2..85ec64bc3 100644 --- a/topic/src/main/java/tech/ydb/topic/read/events/ReadEventHandler.java +++ b/topic/src/main/java/tech/ydb/topic/read/events/ReadEventHandler.java @@ -6,6 +6,7 @@ public interface ReadEventHandler { void onMessages(DataReceivedEvent event); + void onCommitResponse(CommitOffsetAcknowledgementEvent event); void onStartPartitionSession(StartPartitionSessionEvent event); diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java index d90ca71ae..5de376986 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java @@ -15,12 +15,14 @@ import tech.ydb.topic.TopicRpc; import tech.ydb.topic.read.AsyncReader; import tech.ydb.topic.read.PartitionSession; +import tech.ydb.topic.read.events.CommitOffsetAcknowledgementEvent; import tech.ydb.topic.read.events.DataReceivedEvent; import tech.ydb.topic.read.events.PartitionSessionClosedEvent; import tech.ydb.topic.read.events.ReadEventHandler; import tech.ydb.topic.read.events.ReaderClosedEvent; import tech.ydb.topic.read.events.StartPartitionSessionEvent; import tech.ydb.topic.read.events.StopPartitionSessionEvent; +import tech.ydb.topic.read.impl.events.CommitOffsetAcknowledgementEventImpl; import tech.ydb.topic.read.impl.events.PartitionSessionClosedEventImpl; import tech.ydb.topic.read.impl.events.StartPartitionSessionEventImpl; import tech.ydb.topic.read.impl.events.StopPartitionSessionEventImpl; @@ -72,6 +74,15 @@ protected CompletableFuture handleDataReceivedEvent(DataReceivedEvent even }, handlerExecutor); } + @Override + protected void handleCommitResponse(long committedOffset, PartitionSession partitionSession) { + handlerExecutor.execute(() -> { + CommitOffsetAcknowledgementEvent event = new CommitOffsetAcknowledgementEventImpl(partitionSession, + committedOffset); + eventHandler.onCommitResponse(event); + }); + } + @Override protected void handleStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPartitionSessionRequest request, PartitionSession partitionSession, diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java index be7e174c6..e7f52da19 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java @@ -99,6 +99,7 @@ protected CompletableFuture initImpl() { } protected abstract CompletableFuture handleDataReceivedEvent(DataReceivedEvent event); + protected abstract void handleCommitResponse(long committedOffset, PartitionSession partitionSession); protected abstract void handleStartPartitionSessionRequest( YdbTopic.StreamReadMessage.StartPartitionSessionRequest request, PartitionSession partitionSession, @@ -359,7 +360,8 @@ private void onReadResponse(YdbTopic.StreamReadMessage.ReadResponse readResponse CompletableFuture readFuture = partitionSession.addBatches(data.getBatchesList()); batchReadFutures.add(readFuture); } else { - logger.warn("[{}] Received PartitionData for unknown(closed?) PartitionSessionId={}", + logger.error("[{}] Received PartitionData for unknown(closed?) PartitionSessionId={}. " + + "This shouldn't happen", fullId, partitionId); } }); @@ -387,9 +389,14 @@ protected void onCommitOffsetResponse(YdbTopic.StreamReadMessage.CommitOffsetRes PartitionSessionImpl partitionSession = partitionSessions.get(partitionCommittedOffset.getPartitionSessionId()); if (partitionSession != null) { + // Handling CompletableFuture completions for single commits partitionSession.handleCommitResponse(partitionCommittedOffset.getCommittedOffset()); + // Handling onCommitResponse callback + handleCommitResponse(partitionCommittedOffset.getCommittedOffset(), + partitionSession.getSessionInfo()); } else { - logger.debug("[{}] Received CommitOffsetResponse for closed partition session with id={}", fullId, + logger.error("[{}] Received CommitOffsetResponse for unknown (closed?) partition session with " + + "id={}. This shouldn't happen", fullId, partitionCommittedOffset.getPartitionSessionId()); } } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java index 49f3b2591..ed9492190 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java @@ -132,6 +132,14 @@ protected CompletableFuture handleDataReceivedEvent(DataReceivedEvent even return resultFuture; } + @Override + protected void handleCommitResponse(long committedOffset, PartitionSession partitionSession) { + if (logger.isDebugEnabled()) { + logger.debug("CommitResponse received for partition session {} (partition {}) with committedOffset {}", + partitionSession.getId(), partitionSession.getPartitionId(), committedOffset); + } + } + @Override protected void handleStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPartitionSessionRequest request, PartitionSession partitionSession, diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/events/CommitOffsetAcknowledgementEventImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/events/CommitOffsetAcknowledgementEventImpl.java new file mode 100644 index 000000000..62c7d1b66 --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/read/impl/events/CommitOffsetAcknowledgementEventImpl.java @@ -0,0 +1,24 @@ +package tech.ydb.topic.read.impl.events; + +import tech.ydb.topic.read.PartitionSession; +import tech.ydb.topic.read.events.CommitOffsetAcknowledgementEvent; + +public class CommitOffsetAcknowledgementEventImpl implements CommitOffsetAcknowledgementEvent { + private final PartitionSession partitionSession; + private final long committedOffset; + + public CommitOffsetAcknowledgementEventImpl(PartitionSession partitionSession, long committedOffset) { + this.partitionSession = partitionSession; + this.committedOffset = committedOffset; + } + + @Override + public PartitionSession getPartitionSession() { + return partitionSession; + } + + @Override + public long getCommittedOffset() { + return committedOffset; + } +} From 453b6ec995400dbc9de37b39b01dcf597d259ad1 Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Tue, 31 Oct 2023 12:59:53 +0300 Subject: [PATCH 16/18] Minor refactoring: add extra empty line --- .../main/java/tech/ydb/topic/read/events/ReadEventHandler.java | 1 + 1 file changed, 1 insertion(+) diff --git a/topic/src/main/java/tech/ydb/topic/read/events/ReadEventHandler.java b/topic/src/main/java/tech/ydb/topic/read/events/ReadEventHandler.java index 85ec64bc3..92db832be 100644 --- a/topic/src/main/java/tech/ydb/topic/read/events/ReadEventHandler.java +++ b/topic/src/main/java/tech/ydb/topic/read/events/ReadEventHandler.java @@ -6,6 +6,7 @@ public interface ReadEventHandler { void onMessages(DataReceivedEvent event); + void onCommitResponse(CommitOffsetAcknowledgementEvent event); void onStartPartitionSession(StartPartitionSessionEvent event); From 61d51dc33b77081780b818f9b884695264126fcb Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Tue, 17 Oct 2023 18:20:00 +0100 Subject: [PATCH 17/18] Updated changelog --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d82ddbd1a..4a8a91b8a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +## 2.1.8 ## + +* Topics: Added DeferredCommitter class to group several read commits into one or just defer each commit without holding data reference +* Topics: Added onCommitResponse callback for AsyncReader to subscribe to server event directly and control commits more efficiently +* Topics: Removed usage of ForkJoinPool.commonPool() +* Table: Fixed typo in SessionPoolStats + ## 2.1.7 ## * Topics: Rethrow IO exceptions to user handlers while decoding messages From 64bb53cb6f811f19a6b9819e4b5940eb71da8e77 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Tue, 17 Oct 2023 18:21:13 +0100 Subject: [PATCH 18/18] New release version v2.1.8 --- README.md | 2 +- bom/pom.xml | 2 +- coordination/pom.xml | 2 +- core/pom.xml | 2 +- core/src/main/resources/version.properties | 2 +- pom.xml | 2 +- scheme/pom.xml | 2 +- table/pom.xml | 2 +- tests/common/pom.xml | 2 +- tests/junit4-support/pom.xml | 2 +- tests/junit5-support/pom.xml | 2 +- topic/pom.xml | 2 +- 12 files changed, 12 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index d6600ed3c..4f10483b8 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ Firstly you can import YDB Java BOM to specify correct versions of SDK modules. tech.ydb ydb-sdk-bom - 2.1.7 + 2.1.8 pom import diff --git a/bom/pom.xml b/bom/pom.xml index c6dbe658e..cd251cb36 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -6,7 +6,7 @@ 4.0.0 tech.ydb - 2.1.8-SNAPSHOT + 2.1.8 ydb-sdk-bom Java SDK Bill of Materials Java SDK Bill of Materials (BOM) diff --git a/coordination/pom.xml b/coordination/pom.xml index 491545d84..4c3dbed4a 100644 --- a/coordination/pom.xml +++ b/coordination/pom.xml @@ -8,7 +8,7 @@ tech.ydb ydb-sdk-parent - 2.1.8-SNAPSHOT + 2.1.8 ydb-sdk-coordination diff --git a/core/pom.xml b/core/pom.xml index da4d724a3..36e76f68b 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -8,7 +8,7 @@ tech.ydb ydb-sdk-parent - 2.1.8-SNAPSHOT + 2.1.8 ydb-sdk-core diff --git a/core/src/main/resources/version.properties b/core/src/main/resources/version.properties index a8cfa076d..0833a7964 100644 --- a/core/src/main/resources/version.properties +++ b/core/src/main/resources/version.properties @@ -1 +1 @@ -version=2.1.7 +version=2.1.8 diff --git a/pom.xml b/pom.xml index 3e51130ef..58b593a2c 100644 --- a/pom.xml +++ b/pom.xml @@ -8,7 +8,7 @@ tech.ydb ydb-sdk-parent - 2.1.8-SNAPSHOT + 2.1.8 Java SDK for YDB Java SDK for YDB diff --git a/scheme/pom.xml b/scheme/pom.xml index 56a39de6c..4e79d9135 100644 --- a/scheme/pom.xml +++ b/scheme/pom.xml @@ -8,7 +8,7 @@ tech.ydb ydb-sdk-parent - 2.1.8-SNAPSHOT + 2.1.8 ydb-sdk-scheme diff --git a/table/pom.xml b/table/pom.xml index 0067944fe..441001398 100644 --- a/table/pom.xml +++ b/table/pom.xml @@ -8,7 +8,7 @@ tech.ydb ydb-sdk-parent - 2.1.8-SNAPSHOT + 2.1.8 ydb-sdk-table diff --git a/tests/common/pom.xml b/tests/common/pom.xml index 9793d28fe..7ad9656d1 100644 --- a/tests/common/pom.xml +++ b/tests/common/pom.xml @@ -8,7 +8,7 @@ tech.ydb ydb-sdk-parent - 2.1.8-SNAPSHOT + 2.1.8 ../../pom.xml diff --git a/tests/junit4-support/pom.xml b/tests/junit4-support/pom.xml index 4e946ff95..51a0cbd4e 100644 --- a/tests/junit4-support/pom.xml +++ b/tests/junit4-support/pom.xml @@ -8,7 +8,7 @@ tech.ydb ydb-sdk-parent - 2.1.8-SNAPSHOT + 2.1.8 ../../pom.xml diff --git a/tests/junit5-support/pom.xml b/tests/junit5-support/pom.xml index db2000a5e..c981fc794 100644 --- a/tests/junit5-support/pom.xml +++ b/tests/junit5-support/pom.xml @@ -8,7 +8,7 @@ tech.ydb ydb-sdk-parent - 2.1.8-SNAPSHOT + 2.1.8 ../../pom.xml diff --git a/topic/pom.xml b/topic/pom.xml index f68c4d020..3b96b4e3c 100644 --- a/topic/pom.xml +++ b/topic/pom.xml @@ -8,7 +8,7 @@ tech.ydb ydb-sdk-parent - 2.1.8-SNAPSHOT + 2.1.8 ydb-sdk-topic