From 17724d014fce4cc3c18d08e263cab561015792d4 Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Thu, 26 Oct 2023 14:55:52 +0300 Subject: [PATCH] Remove regular Committer --- .../java/tech/ydb/topic/read/Committer.java | 21 ------------------- .../java/tech/ydb/topic/read/Message.java | 9 -------- .../topic/read/events/DataReceivedEvent.java | 10 --------- .../ydb/topic/read/impl/CommitterImpl.java | 5 ++--- .../tech/ydb/topic/read/impl/MessageImpl.java | 6 ------ .../impl/events/DataReceivedEventImpl.java | 6 ------ 6 files changed, 2 insertions(+), 55 deletions(-) delete mode 100644 topic/src/main/java/tech/ydb/topic/read/Committer.java diff --git a/topic/src/main/java/tech/ydb/topic/read/Committer.java b/topic/src/main/java/tech/ydb/topic/read/Committer.java deleted file mode 100644 index c0cf08984..000000000 --- a/topic/src/main/java/tech/ydb/topic/read/Committer.java +++ /dev/null @@ -1,21 +0,0 @@ -package tech.ydb.topic.read; - -import java.util.concurrent.CompletableFuture; - -/** - * A helper class that is used to call deferred commits - * Contains no data references and therefore may be useful in cases where commit() is called after processing data in - * an external system - * - * @author Nikolay Perfilov - */ -public interface Committer { - /** - * Commits offsets associated with this committer - * If there was an error while committing, there is no point of retrying committing the same message(s): - * the whole PartitionSession should be shut down by that time. And if commit hadn't reached the server, - * it will resend all these messages in next PartitionSession. - * @return CompletableFuture that will be completed when commit confirmation from server will be received - */ - CompletableFuture commit(); -} diff --git a/topic/src/main/java/tech/ydb/topic/read/Message.java b/topic/src/main/java/tech/ydb/topic/read/Message.java index 91f72d757..35383e423 100644 --- a/topic/src/main/java/tech/ydb/topic/read/Message.java +++ b/topic/src/main/java/tech/ydb/topic/read/Message.java @@ -67,13 +67,4 @@ public interface Message { */ CompletableFuture commit(); - /** - * Returns a Committer object to call commit() on later. - * This object has no data references and therefore may be useful in cases where commit() is called after - * processing data in an external system - * - * @return a Committer object - */ - Committer getCommitter(); - } diff --git a/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java b/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java index 729e3f7c9..416c1481e 100644 --- a/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java +++ b/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java @@ -3,7 +3,6 @@ import java.util.List; import java.util.concurrent.CompletableFuture; -import tech.ydb.topic.read.Committer; import tech.ydb.topic.read.Message; import tech.ydb.topic.read.PartitionSession; @@ -37,13 +36,4 @@ public interface DataReceivedEvent { */ CompletableFuture commit(); - /** - * Returns a Committer object to call commit() on later. - * This object has no data references and therefore may be useful in cases where commit() is called after - * processing data in an external system - * - * @return a Committer object - */ - Committer getCommitter(); - } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java index 888922d2d..107657a2c 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java @@ -5,13 +5,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import tech.ydb.topic.read.Committer; import tech.ydb.topic.read.OffsetsRange; /** * @author Nikolay Perfilov */ -public class CommitterImpl implements Committer { +public class CommitterImpl { private static final Logger logger = LoggerFactory.getLogger(CommitterImpl.class); private final PartitionSessionImpl partitionSession; private final int messageCount; @@ -23,7 +22,7 @@ public CommitterImpl(PartitionSessionImpl partitionSession, int messageCount, Of this.offsetsToCommit = offsetsToCommit; } - @Override + public CompletableFuture commit() { return commitImpl(true); } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java index 86996152d..123f72638 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java @@ -5,7 +5,6 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; -import tech.ydb.topic.read.Committer; import tech.ydb.topic.read.DecompressionException; import tech.ydb.topic.read.Message; import tech.ydb.topic.read.OffsetsRange; @@ -119,11 +118,6 @@ public OffsetsRange getOffsetsToCommit() { return offsetsToCommit; } - @Override - public Committer getCommitter() { - return committer; - } - /** * BUILDER */ diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java index d1252effa..b65789434 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java @@ -3,7 +3,6 @@ import java.util.List; import java.util.concurrent.CompletableFuture; -import tech.ydb.topic.read.Committer; import tech.ydb.topic.read.Message; import tech.ydb.topic.read.OffsetsRange; import tech.ydb.topic.read.PartitionSession; @@ -50,9 +49,4 @@ public CompletableFuture commit() { public OffsetsRange getOffsetsToCommit() { return offsetsToCommit; } - - @Override - public Committer getCommitter() { - return committer; - } }