From 11539a5ef62eb3c46eba568a60d5aca9ec01462d Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Thu, 19 Oct 2023 13:20:44 +0300 Subject: [PATCH 1/6] Add message Committer, ramove data references where possible --- .../java/tech/ydb/topic/read/Committer.java | 21 ++++++++++ .../java/tech/ydb/topic/read/Message.java | 15 ++++++- .../topic/read/events/DataReceivedEvent.java | 29 ++++++++++++++ .../ydb/topic/read/impl/CommitterImpl.java | 39 +++++++++++++++++++ .../tech/ydb/topic/read/impl/MessageImpl.java | 37 +++++++----------- .../topic/read/impl/PartitionSessionImpl.java | 11 +++--- .../impl/events/DataReceivedEventImpl.java | 34 ++++++++-------- 7 files changed, 137 insertions(+), 49 deletions(-) create mode 100644 topic/src/main/java/tech/ydb/topic/read/Committer.java create mode 100644 topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java diff --git a/topic/src/main/java/tech/ydb/topic/read/Committer.java b/topic/src/main/java/tech/ydb/topic/read/Committer.java new file mode 100644 index 000000000..c0cf08984 --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/read/Committer.java @@ -0,0 +1,21 @@ +package tech.ydb.topic.read; + +import java.util.concurrent.CompletableFuture; + +/** + * A helper class that is used to call deferred commits + * Contains no data references and therefore may be useful in cases where commit() is called after processing data in + * an external system + * + * @author Nikolay Perfilov + */ +public interface Committer { + /** + * Commits offsets associated with this committer + * If there was an error while committing, there is no point of retrying committing the same message(s): + * the whole PartitionSession should be shut down by that time. And if commit hadn't reached the server, + * it will resend all these messages in next PartitionSession. + * @return CompletableFuture that will be completed when commit confirmation from server will be received + */ + CompletableFuture commit(); +} diff --git a/topic/src/main/java/tech/ydb/topic/read/Message.java b/topic/src/main/java/tech/ydb/topic/read/Message.java index 6b0cdd269..91f72d757 100644 --- a/topic/src/main/java/tech/ydb/topic/read/Message.java +++ b/topic/src/main/java/tech/ydb/topic/read/Message.java @@ -58,9 +58,22 @@ public interface Message { PartitionSession getPartitionSession(); /** - * Commit this message + * Commits this message + * If there was an error while committing, there is no point of retrying committing the same message: + * the whole PartitionSession should be shut down by that time. And if commit hadn't reached the server, + * it will resend all these messages in next PartitionSession. + * * @return CompletableFuture that will be completed when commit confirmation from server will be received */ CompletableFuture commit(); + /** + * Returns a Committer object to call commit() on later. + * This object has no data references and therefore may be useful in cases where commit() is called after + * processing data in an external system + * + * @return a Committer object + */ + Committer getCommitter(); + } diff --git a/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java b/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java index 56f89f5f9..729e3f7c9 100644 --- a/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java +++ b/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java @@ -3,6 +3,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; +import tech.ydb.topic.read.Committer; import tech.ydb.topic.read.Message; import tech.ydb.topic.read.PartitionSession; @@ -11,10 +12,38 @@ */ public interface DataReceivedEvent { + /** + * Returns a list of messages grouped in one batch. + * Each message can be committed individually or all messages can be committed at once with commit() method + * + * @return a list of messages + */ List getMessages(); + /** + * Returns a partition session this data was received on + * + * @return a partition session this data was received on + */ PartitionSession getPartitionSession(); + /** + * Commits all messages in this event at once. + * If there was an error while committing, there is no point of retrying committing the same messages: + * the whole PartitionSession should be shut down by that time. And if commit hadn't reached the server, + * it will resend all these messages in next PartitionSession. + * + * @return a CompletableFuture that will be completed when commit confirmation from server will be received + */ CompletableFuture commit(); + /** + * Returns a Committer object to call commit() on later. + * This object has no data references and therefore may be useful in cases where commit() is called after + * processing data in an external system + * + * @return a Committer object + */ + Committer getCommitter(); + } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java new file mode 100644 index 000000000..b17263a63 --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java @@ -0,0 +1,39 @@ +package tech.ydb.topic.read.impl; + +import java.util.concurrent.CompletableFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import tech.ydb.topic.read.Committer; + +/** + * @author Nikolay Perfilov + */ +public class CommitterImpl implements Committer { + private static final Logger logger = LoggerFactory.getLogger(CommitterImpl.class); + private final PartitionSessionImpl partitionSession; + private final int messageCount; + private final OffsetsRange offsetsToCommit; + + public CommitterImpl(PartitionSessionImpl partitionSession, int messageCount, OffsetsRange offsetsToCommit) { + this.partitionSession = partitionSession; + this.messageCount = messageCount; + this.offsetsToCommit = offsetsToCommit; + } + + @Override + public CompletableFuture commit() { + return commitImpl(true); + } + + public CompletableFuture commitImpl(boolean fromCommitter) { + if (logger.isDebugEnabled()) { + logger.debug("[{}] partition session {} (partition {}): committing {} message(s), offsets" + + " [{},{})" + (fromCommitter ? " from Committer" : ""), partitionSession.getPath(), + partitionSession.getId(), partitionSession.getPartitionId(), messageCount, + offsetsToCommit.getStart(), offsetsToCommit.getEnd()); + } + return partitionSession.commitOffset(offsetsToCommit); + } +} diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java index 737fa82f2..d46467095 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java @@ -4,11 +4,8 @@ import java.time.Instant; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.function.Function; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import tech.ydb.topic.read.Committer; import tech.ydb.topic.read.DecompressionException; import tech.ydb.topic.read.Message; import tech.ydb.topic.read.PartitionSession; @@ -17,7 +14,6 @@ * @author Nikolay Perfilov */ public class MessageImpl implements Message { - private static final Logger logger = LoggerFactory.getLogger(MessageImpl.class); private byte[] data; private final long offset; private final long seqNo; @@ -25,8 +21,8 @@ public class MessageImpl implements Message { private final Instant createdAt; private final String messageGroupId; private final BatchMeta batchMeta; - private final PartitionSession partitionSession; - private final Function> commitFunction; + private final PartitionSessionImpl partitionSession; + private final CommitterImpl committer; private boolean isDecompressed = false; private IOException exception = null; @@ -39,7 +35,7 @@ private MessageImpl(Builder builder) { this.messageGroupId = builder.messageGroupId; this.batchMeta = builder.batchMeta; this.partitionSession = builder.partitionSession; - this.commitFunction = builder.commitFunction; + this.committer = new CommitterImpl(partitionSession, 1, new OffsetsRange(commitOffsetFrom, offset + 1)); } @Override @@ -100,7 +96,7 @@ public Instant getWrittenAt() { @Override public PartitionSession getPartitionSession() { - return partitionSession; + return partitionSession.getSessionInfo(); } public void setDecompressed(boolean decompressed) { @@ -109,13 +105,12 @@ public void setDecompressed(boolean decompressed) { @Override public CompletableFuture commit() { - final long commitOffsetTo = offset + 1; - if (logger.isDebugEnabled()) { - logger.debug("[{}] partition session {} (partition {}): committing message with offset {} [{}-{})", - partitionSession.getPath(), partitionSession.getId(), partitionSession.getPartitionId(), - offset, commitOffsetFrom, commitOffsetTo); - } - return commitFunction.apply(new OffsetsRange(commitOffsetFrom, commitOffsetTo)); + return committer.commitImpl(false); + } + + @Override + public Committer getCommitter() { + return committer; } /** @@ -129,8 +124,7 @@ public static class Builder { private Instant createdAt; private String messageGroupId; private BatchMeta batchMeta; - private PartitionSession partitionSession; - private Function> commitFunction; + private PartitionSessionImpl partitionSession; public Builder setData(byte[] data) { this.data = data; @@ -167,16 +161,11 @@ public Builder setBatchMeta(BatchMeta batchMeta) { return this; } - public Builder setPartitionSession(PartitionSession partitionSession) { + public Builder setPartitionSession(PartitionSessionImpl partitionSession) { this.partitionSession = partitionSession; return this; } - public Builder setCommitFunction(Function> commitFunction) { - this.commitFunction = commitFunction; - return this; - } - public MessageImpl build() { return new MessageImpl(this); } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java index 9ff6a5740..0adc8a64d 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java @@ -124,14 +124,13 @@ public CompletableFuture addBatches(List addBatches(List[0])); } - private CompletableFuture commitOffset(OffsetsRange offsets) { + public CompletableFuture commitOffset(OffsetsRange offsets) { CompletableFuture resultFuture = new CompletableFuture<>(); synchronized (commitFutures) { if (isWorking.get()) { @@ -250,9 +249,9 @@ private void sendDataToReadersIfNeeded() { // Should be called maximum in 1 thread at a time List messageImplList = batchToRead.getMessages(); List messagesToRead = new ArrayList<>(messageImplList); - DataReceivedEvent event = new DataReceivedEventImpl(messagesToRead, sessionInfo, - () -> commitOffset(new OffsetsRange(messageImplList.get(0).getCommitOffsetFrom(), - messageImplList.get(messageImplList.size() - 1).getOffset() + 1))); + OffsetsRange offsetsToCommit = new OffsetsRange(messageImplList.get(0).getCommitOffsetFrom(), + messageImplList.get(messageImplList.size() - 1).getOffset() + 1); + DataReceivedEvent event = new DataReceivedEventImpl(this, messagesToRead, offsetsToCommit); if (logger.isDebugEnabled()) { logger.debug("[{}] DataReceivedEvent callback with {} message(s) (offsets {}-{}) for partition " + "session {} " + "(partition {}) is about to be called...", path, messagesToRead.size(), diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java index ea5e6f64c..c9face057 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java @@ -2,29 +2,28 @@ import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.function.Supplier; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import tech.ydb.topic.read.Committer; import tech.ydb.topic.read.Message; import tech.ydb.topic.read.PartitionSession; import tech.ydb.topic.read.events.DataReceivedEvent; +import tech.ydb.topic.read.impl.CommitterImpl; +import tech.ydb.topic.read.impl.OffsetsRange; +import tech.ydb.topic.read.impl.PartitionSessionImpl; /** * @author Nikolay Perfilov */ public class DataReceivedEventImpl implements DataReceivedEvent { - private static final Logger logger = LoggerFactory.getLogger(DataReceivedEventImpl.class); private final List messages; - private final PartitionSession partitionSession; - private final Supplier> commitCallback; + private final PartitionSessionImpl partitionSession; + private final CommitterImpl committer; - public DataReceivedEventImpl(List messages, PartitionSession partitionSession, - Supplier> commitCallback) { + public DataReceivedEventImpl(PartitionSessionImpl partitionSession, List messages, + OffsetsRange offsetsToCommit) { this.messages = messages; this.partitionSession = partitionSession; - this.commitCallback = commitCallback; + this.committer = new CommitterImpl(partitionSession, messages.size(), offsetsToCommit); } @Override @@ -34,17 +33,16 @@ public List getMessages() { @Override public PartitionSession getPartitionSession() { - return partitionSession; + return partitionSession.getSessionInfo(); } @Override public CompletableFuture commit() { - if (logger.isDebugEnabled()) { - logger.debug("[{}] partition session {} (partition {}): committing batch with {} message(s) and offsets" + - " {}-{}", partitionSession.getPath(), partitionSession.getId(), - partitionSession.getPartitionId(), messages.size(), messages.get(0).getOffset(), - messages.get(messages.size() - 1).getOffset()); - } - return commitCallback.get(); + return committer.commitImpl(false); + } + + @Override + public Committer getCommitter() { + return committer; } } From 890c4cf6fae81edafa8c4e4e1496ec4f3739cbdb Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Wed, 25 Oct 2023 21:51:25 +0300 Subject: [PATCH 2/6] Add DeferredCommitter, refactor OffsetsRange --- .../ydb/topic/read/DeferredCommitter.java | 25 +++++++ .../tech/ydb/topic/read/OffsetsRange.java | 10 +++ .../events/StartPartitionSessionEvent.java | 2 +- .../ydb/topic/read/impl/AsyncReaderImpl.java | 2 +- .../ydb/topic/read/impl/CommitterImpl.java | 3 +- .../read/impl/DeferredCommitterImpl.java | 67 +++++++++++++++++++ .../read/impl/DisjointOffsetRangeSet.java | 66 ++++++++++++++++++ .../tech/ydb/topic/read/impl/MessageImpl.java | 13 +++- .../ydb/topic/read/impl/OffsetsRange.java | 22 ------ .../ydb/topic/read/impl/OffsetsRangeImpl.java | 39 +++++++++++ .../topic/read/impl/PartitionSessionImpl.java | 55 ++++++++++++--- .../tech/ydb/topic/read/impl/ReaderImpl.java | 44 ++++++++---- .../impl/events/DataReceivedEventImpl.java | 2 +- .../StartPartitionSessionEventImpl.java | 2 +- 14 files changed, 300 insertions(+), 52 deletions(-) create mode 100644 topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java create mode 100644 topic/src/main/java/tech/ydb/topic/read/OffsetsRange.java create mode 100644 topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java create mode 100644 topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java delete mode 100644 topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRange.java create mode 100644 topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRangeImpl.java diff --git a/topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java b/topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java new file mode 100644 index 000000000..0aa46e061 --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java @@ -0,0 +1,25 @@ +package tech.ydb.topic.read; + +/** + * A helper class that is used to call deferred commits. + * Several {@link Message}s or/and {@link tech.ydb.topic.read.events.DataReceivedEvent}s can be accepted to commit later + * all at once. + * Contains no data references and therefore may also be useful in cases where commit() is called after processing data + * in an external system. + * + * @author Nikolay Perfilov + */ +public interface DeferredCommitter { + /** + * Adds a {@link Message} to commit it later with a commit method + * + * @param message a {@link Message} to commit later + */ + void add(Message message); + + /** + * Commits offset ranges from all {@link Message}s and {@link tech.ydb.topic.read.events.DataReceivedEvent}s + * that were added to this DeferredCommitter since last commit + */ + void commit(); +} diff --git a/topic/src/main/java/tech/ydb/topic/read/OffsetsRange.java b/topic/src/main/java/tech/ydb/topic/read/OffsetsRange.java new file mode 100644 index 000000000..0045e644c --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/read/OffsetsRange.java @@ -0,0 +1,10 @@ +package tech.ydb.topic.read; + +/** + * @author Nikolay Perfilov + */ +public interface OffsetsRange { + long getStart(); + + long getEnd(); +} diff --git a/topic/src/main/java/tech/ydb/topic/read/events/StartPartitionSessionEvent.java b/topic/src/main/java/tech/ydb/topic/read/events/StartPartitionSessionEvent.java index d5c394428..aa64e60de 100644 --- a/topic/src/main/java/tech/ydb/topic/read/events/StartPartitionSessionEvent.java +++ b/topic/src/main/java/tech/ydb/topic/read/events/StartPartitionSessionEvent.java @@ -1,7 +1,7 @@ package tech.ydb.topic.read.events; +import tech.ydb.topic.read.OffsetsRange; import tech.ydb.topic.read.PartitionSession; -import tech.ydb.topic.read.impl.OffsetsRange; import tech.ydb.topic.settings.StartPartitionSessionSettings; /** diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java index 2cc8237e3..d90ca71ae 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java @@ -81,7 +81,7 @@ protected void handleStartPartitionSessionRequest(YdbTopic.StreamReadMessage.Sta StartPartitionSessionEvent event = new StartPartitionSessionEventImpl( partitionSession, request.getCommittedOffset(), - new OffsetsRange(offsetsRange.getStart(), offsetsRange.getEnd()), + new OffsetsRangeImpl(offsetsRange.getStart(), offsetsRange.getEnd()), confirmCallback ); eventHandler.onStartPartitionSession(event); diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java index b17263a63..888922d2d 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java @@ -6,6 +6,7 @@ import org.slf4j.LoggerFactory; import tech.ydb.topic.read.Committer; +import tech.ydb.topic.read.OffsetsRange; /** * @author Nikolay Perfilov @@ -34,6 +35,6 @@ public CompletableFuture commitImpl(boolean fromCommitter) { partitionSession.getId(), partitionSession.getPartitionId(), messageCount, offsetsToCommit.getStart(), offsetsToCommit.getEnd()); } - return partitionSession.commitOffset(offsetsToCommit); + return partitionSession.commitOffsetRange(offsetsToCommit); } } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java new file mode 100644 index 000000000..a9379e515 --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java @@ -0,0 +1,67 @@ +package tech.ydb.topic.read.impl; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import tech.ydb.topic.read.DeferredCommitter; +import tech.ydb.topic.read.Message; +import tech.ydb.topic.read.OffsetsRange; + +/** + * @author Nikolay Perfilov + */ +public class DeferredCommitterImpl implements DeferredCommitter { + private static final Logger logger = LoggerFactory.getLogger(DeferredCommitterImpl.class); + + private final Map rangesByPartition = new ConcurrentHashMap<>(); + + private static class PartitionRanges { + private final PartitionSessionImpl partitionSession; + private final DisjointOffsetRangeSet ranges = new DisjointOffsetRangeSet(); + + private PartitionRanges(PartitionSessionImpl partitionSession) { + this.partitionSession = partitionSession; + } + + private void add(MessageImpl message) { + try { + synchronized (ranges) { + ranges.add(message.getOffsetsToCommit()); + } + } catch (RuntimeException exception) { + String errorMessage = "Error adding new offset range to DeferredCommitter for partition session " + + partitionSession.getId() + " (partition " + partitionSession.getPartitionId() + "): " + + exception.getMessage(); + logger.error(errorMessage); + throw new RuntimeException(errorMessage, exception); + } + } + + private void commit() { + List rangesToCommit; + synchronized (ranges) { + rangesToCommit = ranges.getRangesAndClear(); + } + partitionSession.commitOffsetRanges(rangesToCommit); + } + } + + @Override + public void add(Message message) { + MessageImpl messageImpl = (MessageImpl) message; + PartitionRanges partitionRanges = rangesByPartition + .computeIfAbsent(messageImpl.getPartitionSessionImpl(), PartitionRanges::new); + partitionRanges.add(messageImpl); + } + + @Override + public void commit() { + rangesByPartition.forEach((session, partitionRanges) -> { + partitionRanges.commit(); + }); + } +} diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java b/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java new file mode 100644 index 000000000..53d529043 --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java @@ -0,0 +1,66 @@ +package tech.ydb.topic.read.impl; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; + +import tech.ydb.topic.read.OffsetsRange; + +/** + * @author Nikolay Perfilov + */ +public class DisjointOffsetRangeSet { + private final NavigableMap ranges = new ConcurrentSkipListMap<>(); + + public void add(OffsetsRange rangeToCommit) { + Map.Entry floorEntry = ranges.floorEntry(rangeToCommit.getStart()); + boolean mergedFloor = false; + if (floorEntry != null) { + if (floorEntry.getValue().getStart() > rangeToCommit.getStart()) { + throwClashesException(floorEntry.getValue(), rangeToCommit); + } + if (floorEntry.getValue().getEnd() == rangeToCommit.getStart()) { + floorEntry.getValue().setEnd(rangeToCommit.getEnd()); + mergedFloor = true; + } + } + Map.Entry ceilingEntry = + ranges.ceilingEntry(rangeToCommit.getStart()); + if (ceilingEntry != null) { + OffsetsRangeImpl ceilingValue = ceilingEntry.getValue(); + if (rangeToCommit.getEnd() > ceilingValue.getStart()) { + throwClashesException(ceilingValue, rangeToCommit); + } + if (rangeToCommit.getEnd() == ceilingValue.getStart()) { + ranges.remove(ceilingEntry.getKey()); + if (mergedFloor) { + floorEntry.getValue().setEnd(ceilingValue.getEnd()); + } else { + ceilingValue.setStart(rangeToCommit.getStart()); + ranges.put(rangeToCommit.getStart(), ceilingValue); + } + return; + } + } + if (!mergedFloor) { + ranges.put(rangeToCommit.getStart(), new OffsetsRangeImpl(rangeToCommit)); + } + } + + public List getRangesAndClear() { + Collection values = ranges.values(); + List result = new ArrayList<>(values); + values.clear(); + return result; + } + + private void throwClashesException(OffsetsRangeImpl existingRange, OffsetsRange newRange) { + String errMessage = "Error adding new offset range. Added range [" + + newRange.getStart() + "," + newRange.getEnd() + ") clashes with existing range [" + + existingRange.getStart() + "," + existingRange.getEnd() + ")"; + throw new RuntimeException(errMessage); + } +} diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java index d46467095..86996152d 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java @@ -8,6 +8,7 @@ import tech.ydb.topic.read.Committer; import tech.ydb.topic.read.DecompressionException; import tech.ydb.topic.read.Message; +import tech.ydb.topic.read.OffsetsRange; import tech.ydb.topic.read.PartitionSession; /** @@ -22,6 +23,7 @@ public class MessageImpl implements Message { private final String messageGroupId; private final BatchMeta batchMeta; private final PartitionSessionImpl partitionSession; + private final OffsetsRange offsetsToCommit; private final CommitterImpl committer; private boolean isDecompressed = false; private IOException exception = null; @@ -35,7 +37,8 @@ private MessageImpl(Builder builder) { this.messageGroupId = builder.messageGroupId; this.batchMeta = builder.batchMeta; this.partitionSession = builder.partitionSession; - this.committer = new CommitterImpl(partitionSession, 1, new OffsetsRange(commitOffsetFrom, offset + 1)); + this.offsetsToCommit = new OffsetsRangeImpl(commitOffsetFrom, offset + 1); + this.committer = new CommitterImpl(partitionSession, 1, offsetsToCommit); } @Override @@ -99,6 +102,10 @@ public PartitionSession getPartitionSession() { return partitionSession.getSessionInfo(); } + public PartitionSessionImpl getPartitionSessionImpl() { + return partitionSession; + } + public void setDecompressed(boolean decompressed) { isDecompressed = decompressed; } @@ -108,6 +115,10 @@ public CompletableFuture commit() { return committer.commitImpl(false); } + public OffsetsRange getOffsetsToCommit() { + return offsetsToCommit; + } + @Override public Committer getCommitter() { return committer; diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRange.java b/topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRange.java deleted file mode 100644 index 81a6fc140..000000000 --- a/topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRange.java +++ /dev/null @@ -1,22 +0,0 @@ -package tech.ydb.topic.read.impl; - -/** - * @author Nikolay Perfilov - */ -public class OffsetsRange { - private final long start; - private final long end; - - public OffsetsRange(long start, long end) { - this.start = start; - this.end = end; - } - - public long getStart() { - return start; - } - - public long getEnd() { - return end; - } -} diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRangeImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRangeImpl.java new file mode 100644 index 000000000..5b4661b72 --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRangeImpl.java @@ -0,0 +1,39 @@ +package tech.ydb.topic.read.impl; + +import tech.ydb.topic.read.OffsetsRange; + +/** + * @author Nikolay Perfilov + */ +public class OffsetsRangeImpl implements OffsetsRange { + private long start; + private long end; + + public OffsetsRangeImpl(long start, long end) { + this.start = start; + this.end = end; + } + + public OffsetsRangeImpl(OffsetsRange offsetsRange) { + this.start = offsetsRange.getStart(); + this.end = offsetsRange.getEnd(); + } + + @Override + public long getStart() { + return start; + } + + @Override + public long getEnd() { + return end; + } + + public void setStart(long start) { + this.start = start; + } + + public void setEnd(long end) { + this.end = end; + } +} diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java index 0adc8a64d..696b72d72 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java @@ -22,6 +22,7 @@ import tech.ydb.proto.topic.YdbTopic; import tech.ydb.topic.description.Codec; import tech.ydb.topic.read.Message; +import tech.ydb.topic.read.OffsetsRange; import tech.ydb.topic.read.PartitionSession; import tech.ydb.topic.read.events.DataReceivedEvent; import tech.ydb.topic.read.impl.events.DataReceivedEventImpl; @@ -45,7 +46,7 @@ public class PartitionSessionImpl { private final Queue readingQueue = new ConcurrentLinkedQueue<>(); private final Function> dataEventCallback; private final AtomicBoolean isReadingNow = new AtomicBoolean(); - private final Consumer commitFunction; + private final Consumer> commitFunction; private final NavigableMap> commitFutures = new ConcurrentSkipListMap<>(); // Offset of the last read message + 1 private long lastReadOffset; @@ -171,29 +172,63 @@ public CompletableFuture addBatches(List[0])); } - public CompletableFuture commitOffset(OffsetsRange offsets) { + // Сommit single offset range with result future + public CompletableFuture commitOffsetRange(OffsetsRange rangeToCommit) { CompletableFuture resultFuture = new CompletableFuture<>(); synchronized (commitFutures) { if (isWorking.get()) { if (logger.isDebugEnabled()) { logger.debug("[{}] Offset range [{}, {}) is requested to be committed for partition session {} " + "(partition {}). Last committed offset is {} (commit lag is {})", path, - offsets.getStart(), offsets.getEnd(), id, partitionId, lastCommittedOffset, - offsets.getStart() - lastCommittedOffset); + rangeToCommit.getStart(), rangeToCommit.getEnd(), id, partitionId, lastCommittedOffset, + rangeToCommit.getStart() - lastCommittedOffset); } - commitFutures.put(offsets.getEnd(), resultFuture); - commitFunction.accept(offsets); + commitFutures.put(rangeToCommit.getEnd(), resultFuture); } else { logger.info("[{}] Offset range [{}, {}) is requested to be committed, but partition session {} " + - "(partition {}) is already closed", path, offsets.getStart(), offsets.getEnd(), id, + "(partition {}) is already closed", path, rangeToCommit.getStart(), rangeToCommit.getEnd(), id, partitionId); resultFuture.completeExceptionally(new RuntimeException("Partition session " + id + " (partition " + partitionId + ") for " + path + " is already closed")); + return resultFuture; } } + List rangeWrapper = new ArrayList<>(1); + rangeWrapper.add(rangeToCommit); + commitFunction.accept(rangeWrapper); return resultFuture; } + // Bulk commit without result future + public void commitOffsetRanges(List rangesToCommit) { + if (isWorking.get()) { + if (logger.isDebugEnabled()) { + StringBuilder message = new StringBuilder("[").append(path) + .append("] Sending CommitRequest for partition session ").append(id) + .append(" (partition ").append(partitionId).append(") with offset ranges "); + addRangesToString(message, rangesToCommit); + logger.info(message.toString()); + } + commitFunction.accept(rangesToCommit); + } else if (logger.isInfoEnabled()) { + StringBuilder message = new StringBuilder("[").append(path).append("] Offset ranges "); + addRangesToString(message, rangesToCommit); + message.append(" are requested to be committed, but partition session ").append(id) + .append(" (partition ").append(partitionId).append(") is already closed"); + logger.info(message.toString()); + } + } + + private static void addRangesToString(StringBuilder stringBuilder, List ranges) { + for (int i = 0; i < ranges.size(); i++) { + if (i > 0) { + stringBuilder.append(", "); + } + OffsetsRange range = ranges.get(i); + stringBuilder.append("[").append(range.getStart()).append(",").append(range.getEnd()).append(")"); + } + } + public void handleCommitResponse(long committedOffset) { if (committedOffset <= lastCommittedOffset) { logger.error("[{}] Commit response received for partition session {} (partition {}). Committed offset: {}" + @@ -249,7 +284,7 @@ private void sendDataToReadersIfNeeded() { // Should be called maximum in 1 thread at a time List messageImplList = batchToRead.getMessages(); List messagesToRead = new ArrayList<>(messageImplList); - OffsetsRange offsetsToCommit = new OffsetsRange(messageImplList.get(0).getCommitOffsetFrom(), + OffsetsRange offsetsToCommit = new OffsetsRangeImpl(messageImplList.get(0).getCommitOffsetFrom(), messageImplList.get(messageImplList.size() - 1).getOffset() + 1); DataReceivedEvent event = new DataReceivedEventImpl(this, messagesToRead, offsetsToCommit); if (logger.isDebugEnabled()) { @@ -308,7 +343,7 @@ public static class Builder { private OffsetsRange partitionOffsets; private Executor decompressionExecutor; private Function> dataEventCallback; - private Consumer commitFunction; + private Consumer> commitFunction; public Builder setId(long id) { this.id = id; @@ -345,7 +380,7 @@ public Builder setDataEventCallback(Function commitFunction) { + public Builder setCommitFunction(Consumer> commitFunction) { this.commitFunction = commitFunction; return this; } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java index c3b506f43..be7e174c6 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java @@ -25,6 +25,7 @@ import tech.ydb.proto.topic.YdbTopic; import tech.ydb.topic.TopicRpc; import tech.ydb.topic.impl.GrpcStreamRetrier; +import tech.ydb.topic.read.OffsetsRange; import tech.ydb.topic.read.PartitionSession; import tech.ydb.topic.read.events.DataReceivedEvent; import tech.ydb.topic.settings.ReaderSettings; @@ -240,24 +241,39 @@ private void sendStopPartitionSessionResponse(long partitionSessionId) { .build()); } - private void commitOffset(long partitionSessionId, long partitionId, OffsetsRange offsets) { + private void sendCommitOffsetRequest(long partitionSessionId, long partitionId, + List rangesToCommit) { if (!isWorking.get()) { - logger.info("[{}] Need to send CommitRequest for partition session {} (partition {})," + - " but reading session is already closed", fullId, partitionSessionId, partitionId); + if (logger.isInfoEnabled()) { + StringBuilder message = new StringBuilder("[").append(fullId) + .append("] Need to send CommitRequest for partition session ").append(partitionSessionId) + .append(" (partition ").append(partitionId).append(") with offset ranges "); + for (int i = 0; i < rangesToCommit.size(); i++) { + if (i > 0) { + message.append(", "); + } + OffsetsRange range = rangesToCommit.get(i); + message.append("[").append(range.getStart()).append(",").append(range.getEnd()).append(")"); + } + message.append(", but reading session is already closed"); + logger.info(message.toString()); + } return; } - logger.info("[{}] Sending CommitRequest for partition session {} (partition {}) with offset range [{},{})", - fullId, partitionSessionId, partitionId, offsets.getStart(), offsets.getEnd()); + + YdbTopic.StreamReadMessage.CommitOffsetRequest.PartitionCommitOffset.Builder builder = + YdbTopic.StreamReadMessage.CommitOffsetRequest.PartitionCommitOffset.newBuilder() + .setPartitionSessionId(partitionSessionId); + rangesToCommit.forEach(range -> { + builder.addOffsets(YdbTopic.OffsetsRange.newBuilder() + .setStart(range.getStart()) + .setEnd(range.getEnd())); + }); send(YdbTopic.StreamReadMessage.FromClient.newBuilder() .setCommitOffsetRequest(YdbTopic.StreamReadMessage.CommitOffsetRequest.newBuilder() - .addCommitOffsets( - YdbTopic.StreamReadMessage.CommitOffsetRequest.PartitionCommitOffset.newBuilder() - .setPartitionSessionId(partitionSessionId) - .addOffsets(YdbTopic.OffsetsRange.newBuilder() - .setStart(offsets.getStart()) - .setEnd(offsets.getEnd())) - )) + .addCommitOffsets(builder)) .build()); + } private void closePartitionSessions() { @@ -293,11 +309,11 @@ private void onStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPart .setPath(request.getPartitionSession().getPath()) .setPartitionId(partitionId) .setCommittedOffset(request.getCommittedOffset()) - .setPartitionOffsets(new OffsetsRange(request.getPartitionOffsets().getStart(), + .setPartitionOffsets(new OffsetsRangeImpl(request.getPartitionOffsets().getStart(), request.getPartitionOffsets().getEnd())) .setDecompressionExecutor(decompressionExecutor) .setDataEventCallback(ReaderImpl.this::handleDataReceivedEvent) - .setCommitFunction((offsets) -> commitOffset(partitionSessionId, partitionId, offsets)) + .setCommitFunction((offsets) -> sendCommitOffsetRequest(partitionSessionId, partitionId, offsets)) .build(); partitionSessions.put(partitionSession.getId(), partitionSession); diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java index c9face057..9a2e0dcb1 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java @@ -5,10 +5,10 @@ import tech.ydb.topic.read.Committer; import tech.ydb.topic.read.Message; +import tech.ydb.topic.read.OffsetsRange; import tech.ydb.topic.read.PartitionSession; import tech.ydb.topic.read.events.DataReceivedEvent; import tech.ydb.topic.read.impl.CommitterImpl; -import tech.ydb.topic.read.impl.OffsetsRange; import tech.ydb.topic.read.impl.PartitionSessionImpl; /** diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/events/StartPartitionSessionEventImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/events/StartPartitionSessionEventImpl.java index e8735f822..9fa4632e7 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/events/StartPartitionSessionEventImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/events/StartPartitionSessionEventImpl.java @@ -2,9 +2,9 @@ import java.util.function.Consumer; +import tech.ydb.topic.read.OffsetsRange; import tech.ydb.topic.read.PartitionSession; import tech.ydb.topic.read.events.StartPartitionSessionEvent; -import tech.ydb.topic.read.impl.OffsetsRange; import tech.ydb.topic.settings.StartPartitionSessionSettings; /** From 2da342d7c7164a821006e96a20c0237962059332 Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Thu, 26 Oct 2023 12:35:38 +0300 Subject: [PATCH 3/6] Fix corner case in DisjointOffsetRangeSet --- .../read/impl/DisjointOffsetRangeSet.java | 23 ++++++++----------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java b/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java index 53d529043..5f0988487 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java @@ -17,23 +17,20 @@ public class DisjointOffsetRangeSet { public void add(OffsetsRange rangeToCommit) { Map.Entry floorEntry = ranges.floorEntry(rangeToCommit.getStart()); + if (floorEntry != null && floorEntry.getValue().getEnd() > rangeToCommit.getStart()) { + throwClashesException(floorEntry.getValue(), rangeToCommit); + } + Map.Entry ceilingEntry = ranges.ceilingEntry(rangeToCommit.getStart()); + if (ceilingEntry != null && rangeToCommit.getEnd() > ceilingEntry.getValue().getStart()) { + throwClashesException(ceilingEntry.getValue(), rangeToCommit); + } boolean mergedFloor = false; - if (floorEntry != null) { - if (floorEntry.getValue().getStart() > rangeToCommit.getStart()) { - throwClashesException(floorEntry.getValue(), rangeToCommit); - } - if (floorEntry.getValue().getEnd() == rangeToCommit.getStart()) { - floorEntry.getValue().setEnd(rangeToCommit.getEnd()); - mergedFloor = true; - } + if (floorEntry != null && floorEntry.getValue().getEnd() == rangeToCommit.getStart()) { + floorEntry.getValue().setEnd(rangeToCommit.getEnd()); + mergedFloor = true; } - Map.Entry ceilingEntry = - ranges.ceilingEntry(rangeToCommit.getStart()); if (ceilingEntry != null) { OffsetsRangeImpl ceilingValue = ceilingEntry.getValue(); - if (rangeToCommit.getEnd() > ceilingValue.getStart()) { - throwClashesException(ceilingValue, rangeToCommit); - } if (rangeToCommit.getEnd() == ceilingValue.getStart()) { ranges.remove(ceilingEntry.getKey()); if (mergedFloor) { From 4247942391e6de239060b0685d4eda96eacb0e10 Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Thu, 26 Oct 2023 12:38:20 +0300 Subject: [PATCH 4/6] Add tests for DisjointOffsetRangeSet --- .../impl/DisjointOffsetRangeSetTest.java | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 topic/src/test/java/tech/ydb/topic/impl/DisjointOffsetRangeSetTest.java diff --git a/topic/src/test/java/tech/ydb/topic/impl/DisjointOffsetRangeSetTest.java b/topic/src/test/java/tech/ydb/topic/impl/DisjointOffsetRangeSetTest.java new file mode 100644 index 000000000..02a841422 --- /dev/null +++ b/topic/src/test/java/tech/ydb/topic/impl/DisjointOffsetRangeSetTest.java @@ -0,0 +1,75 @@ +package tech.ydb.topic.impl; + +import java.util.List; + +import org.junit.Assert; +import org.junit.Test; +import tech.ydb.topic.read.OffsetsRange; +import tech.ydb.topic.read.impl.DisjointOffsetRangeSet; +import tech.ydb.topic.read.impl.OffsetsRangeImpl; + +/** + * @author Nikolay Perfilov + */ +public class DisjointOffsetRangeSetTest { + + @Test + public void testRangesSimple() { + DisjointOffsetRangeSet ranges = new DisjointOffsetRangeSet(); + ranges.add(new OffsetsRangeImpl(0, 1)); + ranges.add(new OffsetsRangeImpl(1, 2)); + ranges.add(new OffsetsRangeImpl(3, 4)); + List rangesResult = ranges.getRangesAndClear(); + Assert.assertEquals(2, rangesResult.size()); + Assert.assertEquals(0, rangesResult.get(0).getStart()); + Assert.assertEquals(2, rangesResult.get(0).getEnd()); + Assert.assertEquals(3, rangesResult.get(1).getStart()); + Assert.assertEquals(4, rangesResult.get(1).getEnd()); + } + + @Test + public void testReuseRangeSet() { + DisjointOffsetRangeSet ranges = new DisjointOffsetRangeSet(); + + ranges.add(new OffsetsRangeImpl(30, 40)); + ranges.add(new OffsetsRangeImpl(10, 20)); + ranges.add(new OffsetsRangeImpl(0, 9)); + Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(8, 11))); + Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(8, 10))); + Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(9, 11))); + Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(25, 31))); + Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(31, 100))); + Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(25, 100))); + ranges.add(new OffsetsRangeImpl(9, 10)); + List firstResult = ranges.getRangesAndClear(); + Assert.assertEquals(2, firstResult.size()); + Assert.assertEquals(0, firstResult.get(0).getStart()); + Assert.assertEquals(20, firstResult.get(0).getEnd()); + Assert.assertEquals(30, firstResult.get(1).getStart()); + Assert.assertEquals(40, firstResult.get(1).getEnd()); + + Assert.assertTrue(ranges.getRangesAndClear().isEmpty()); + + ranges.add(new OffsetsRangeImpl(0, 9)); + ranges.add(new OffsetsRangeImpl(10, 19)); + ranges.add(new OffsetsRangeImpl(20, 30)); + List secondResult = ranges.getRangesAndClear(); + Assert.assertEquals(3, secondResult.size()); + Assert.assertEquals(0, secondResult.get(0).getStart()); + Assert.assertEquals(9, secondResult.get(0).getEnd()); + Assert.assertEquals(10, secondResult.get(1).getStart()); + Assert.assertEquals(19, secondResult.get(1).getEnd()); + Assert.assertEquals(20, secondResult.get(2).getStart()); + Assert.assertEquals(30, secondResult.get(2).getEnd()); + + ranges.add(new OffsetsRangeImpl(39, 40)); + ranges.add(new OffsetsRangeImpl(30, 39)); + ranges.add(new OffsetsRangeImpl(40, 50)); + List thirdResult = ranges.getRangesAndClear(); + Assert.assertEquals(1, thirdResult.size()); + Assert.assertEquals(30, thirdResult.get(0).getStart()); + Assert.assertEquals(50, thirdResult.get(0).getEnd()); + + Assert.assertTrue(ranges.getRangesAndClear().isEmpty()); + } +} From 227588b391e09faae770efd9f40979395c3750d8 Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Thu, 26 Oct 2023 14:39:50 +0300 Subject: [PATCH 5/6] Add DeferredCommitter factory method --- .../ydb/topic/read/DeferredCommitter.java | 21 ++++++++++++++++++- .../read/impl/DeferredCommitterImpl.java | 16 +++++++++++--- .../impl/events/DataReceivedEventImpl.java | 10 +++++++++ 3 files changed, 43 insertions(+), 4 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java b/topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java index 0aa46e061..51172141d 100644 --- a/topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java +++ b/topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java @@ -1,5 +1,8 @@ package tech.ydb.topic.read; +import tech.ydb.topic.read.events.DataReceivedEvent; +import tech.ydb.topic.read.impl.DeferredCommitterImpl; + /** * A helper class that is used to call deferred commits. * Several {@link Message}s or/and {@link tech.ydb.topic.read.events.DataReceivedEvent}s can be accepted to commit later @@ -10,6 +13,15 @@ * @author Nikolay Perfilov */ public interface DeferredCommitter { + /** + * Creates a new instance of {@link DeferredCommitter} + * + * @return a new instance of {@link DeferredCommitter} + */ + static DeferredCommitter newInstance() { + return new DeferredCommitterImpl(); + } + /** * Adds a {@link Message} to commit it later with a commit method * @@ -18,7 +30,14 @@ public interface DeferredCommitter { void add(Message message); /** - * Commits offset ranges from all {@link Message}s and {@link tech.ydb.topic.read.events.DataReceivedEvent}s + * Adds a {@link DataReceivedEvent} to commit all its messages later with a commit method + * + * @param event a {@link DataReceivedEvent} to commit later + */ + void add(DataReceivedEvent event); + + /** + * Commits offset ranges from all {@link Message}s and {@link DataReceivedEvent}s * that were added to this DeferredCommitter since last commit */ void commit(); diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java index a9379e515..fb3457038 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java @@ -10,6 +10,8 @@ import tech.ydb.topic.read.DeferredCommitter; import tech.ydb.topic.read.Message; import tech.ydb.topic.read.OffsetsRange; +import tech.ydb.topic.read.events.DataReceivedEvent; +import tech.ydb.topic.read.impl.events.DataReceivedEventImpl; /** * @author Nikolay Perfilov @@ -27,10 +29,10 @@ private PartitionRanges(PartitionSessionImpl partitionSession) { this.partitionSession = partitionSession; } - private void add(MessageImpl message) { + private void add(OffsetsRange offsetRange) { try { synchronized (ranges) { - ranges.add(message.getOffsetsToCommit()); + ranges.add(offsetRange); } } catch (RuntimeException exception) { String errorMessage = "Error adding new offset range to DeferredCommitter for partition session " + @@ -55,7 +57,15 @@ public void add(Message message) { MessageImpl messageImpl = (MessageImpl) message; PartitionRanges partitionRanges = rangesByPartition .computeIfAbsent(messageImpl.getPartitionSessionImpl(), PartitionRanges::new); - partitionRanges.add(messageImpl); + partitionRanges.add(messageImpl.getOffsetsToCommit()); + } + + @Override + public void add(DataReceivedEvent event) { + DataReceivedEventImpl eventImpl = (DataReceivedEventImpl) event; + PartitionRanges partitionRanges = rangesByPartition + .computeIfAbsent(eventImpl.getPartitionSessionImpl(), PartitionRanges::new); + partitionRanges.add(eventImpl.getOffsetsToCommit()); } @Override diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java index 9a2e0dcb1..d1252effa 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java @@ -17,12 +17,14 @@ public class DataReceivedEventImpl implements DataReceivedEvent { private final List messages; private final PartitionSessionImpl partitionSession; + private final OffsetsRange offsetsToCommit; private final CommitterImpl committer; public DataReceivedEventImpl(PartitionSessionImpl partitionSession, List messages, OffsetsRange offsetsToCommit) { this.messages = messages; this.partitionSession = partitionSession; + this.offsetsToCommit = offsetsToCommit; this.committer = new CommitterImpl(partitionSession, messages.size(), offsetsToCommit); } @@ -36,11 +38,19 @@ public PartitionSession getPartitionSession() { return partitionSession.getSessionInfo(); } + public PartitionSessionImpl getPartitionSessionImpl() { + return partitionSession; + } + @Override public CompletableFuture commit() { return committer.commitImpl(false); } + public OffsetsRange getOffsetsToCommit() { + return offsetsToCommit; + } + @Override public Committer getCommitter() { return committer; From 17724d014fce4cc3c18d08e263cab561015792d4 Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Thu, 26 Oct 2023 14:55:52 +0300 Subject: [PATCH 6/6] Remove regular Committer --- .../java/tech/ydb/topic/read/Committer.java | 21 ------------------- .../java/tech/ydb/topic/read/Message.java | 9 -------- .../topic/read/events/DataReceivedEvent.java | 10 --------- .../ydb/topic/read/impl/CommitterImpl.java | 5 ++--- .../tech/ydb/topic/read/impl/MessageImpl.java | 6 ------ .../impl/events/DataReceivedEventImpl.java | 6 ------ 6 files changed, 2 insertions(+), 55 deletions(-) delete mode 100644 topic/src/main/java/tech/ydb/topic/read/Committer.java diff --git a/topic/src/main/java/tech/ydb/topic/read/Committer.java b/topic/src/main/java/tech/ydb/topic/read/Committer.java deleted file mode 100644 index c0cf08984..000000000 --- a/topic/src/main/java/tech/ydb/topic/read/Committer.java +++ /dev/null @@ -1,21 +0,0 @@ -package tech.ydb.topic.read; - -import java.util.concurrent.CompletableFuture; - -/** - * A helper class that is used to call deferred commits - * Contains no data references and therefore may be useful in cases where commit() is called after processing data in - * an external system - * - * @author Nikolay Perfilov - */ -public interface Committer { - /** - * Commits offsets associated with this committer - * If there was an error while committing, there is no point of retrying committing the same message(s): - * the whole PartitionSession should be shut down by that time. And if commit hadn't reached the server, - * it will resend all these messages in next PartitionSession. - * @return CompletableFuture that will be completed when commit confirmation from server will be received - */ - CompletableFuture commit(); -} diff --git a/topic/src/main/java/tech/ydb/topic/read/Message.java b/topic/src/main/java/tech/ydb/topic/read/Message.java index 91f72d757..35383e423 100644 --- a/topic/src/main/java/tech/ydb/topic/read/Message.java +++ b/topic/src/main/java/tech/ydb/topic/read/Message.java @@ -67,13 +67,4 @@ public interface Message { */ CompletableFuture commit(); - /** - * Returns a Committer object to call commit() on later. - * This object has no data references and therefore may be useful in cases where commit() is called after - * processing data in an external system - * - * @return a Committer object - */ - Committer getCommitter(); - } diff --git a/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java b/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java index 729e3f7c9..416c1481e 100644 --- a/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java +++ b/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java @@ -3,7 +3,6 @@ import java.util.List; import java.util.concurrent.CompletableFuture; -import tech.ydb.topic.read.Committer; import tech.ydb.topic.read.Message; import tech.ydb.topic.read.PartitionSession; @@ -37,13 +36,4 @@ public interface DataReceivedEvent { */ CompletableFuture commit(); - /** - * Returns a Committer object to call commit() on later. - * This object has no data references and therefore may be useful in cases where commit() is called after - * processing data in an external system - * - * @return a Committer object - */ - Committer getCommitter(); - } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java index 888922d2d..107657a2c 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java @@ -5,13 +5,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import tech.ydb.topic.read.Committer; import tech.ydb.topic.read.OffsetsRange; /** * @author Nikolay Perfilov */ -public class CommitterImpl implements Committer { +public class CommitterImpl { private static final Logger logger = LoggerFactory.getLogger(CommitterImpl.class); private final PartitionSessionImpl partitionSession; private final int messageCount; @@ -23,7 +22,7 @@ public CommitterImpl(PartitionSessionImpl partitionSession, int messageCount, Of this.offsetsToCommit = offsetsToCommit; } - @Override + public CompletableFuture commit() { return commitImpl(true); } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java index 86996152d..123f72638 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java @@ -5,7 +5,6 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; -import tech.ydb.topic.read.Committer; import tech.ydb.topic.read.DecompressionException; import tech.ydb.topic.read.Message; import tech.ydb.topic.read.OffsetsRange; @@ -119,11 +118,6 @@ public OffsetsRange getOffsetsToCommit() { return offsetsToCommit; } - @Override - public Committer getCommitter() { - return committer; - } - /** * BUILDER */ diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java index d1252effa..b65789434 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java @@ -3,7 +3,6 @@ import java.util.List; import java.util.concurrent.CompletableFuture; -import tech.ydb.topic.read.Committer; import tech.ydb.topic.read.Message; import tech.ydb.topic.read.OffsetsRange; import tech.ydb.topic.read.PartitionSession; @@ -50,9 +49,4 @@ public CompletableFuture commit() { public OffsetsRange getOffsetsToCommit() { return offsetsToCommit; } - - @Override - public Committer getCommitter() { - return committer; - } }