diff --git a/topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java b/topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java new file mode 100644 index 000000000..51172141d --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java @@ -0,0 +1,44 @@ +package tech.ydb.topic.read; + +import tech.ydb.topic.read.events.DataReceivedEvent; +import tech.ydb.topic.read.impl.DeferredCommitterImpl; + +/** + * A helper class that is used to call deferred commits. + * Several {@link Message}s or/and {@link tech.ydb.topic.read.events.DataReceivedEvent}s can be accepted to commit later + * all at once. + * Contains no data references and therefore may also be useful in cases where commit() is called after processing data + * in an external system. + * + * @author Nikolay Perfilov + */ +public interface DeferredCommitter { + /** + * Creates a new instance of {@link DeferredCommitter} + * + * @return a new instance of {@link DeferredCommitter} + */ + static DeferredCommitter newInstance() { + return new DeferredCommitterImpl(); + } + + /** + * Adds a {@link Message} to commit it later with a commit method + * + * @param message a {@link Message} to commit later + */ + void add(Message message); + + /** + * Adds a {@link DataReceivedEvent} to commit all its messages later with a commit method + * + * @param event a {@link DataReceivedEvent} to commit later + */ + void add(DataReceivedEvent event); + + /** + * Commits offset ranges from all {@link Message}s and {@link DataReceivedEvent}s + * that were added to this DeferredCommitter since last commit + */ + void commit(); +} diff --git a/topic/src/main/java/tech/ydb/topic/read/Message.java b/topic/src/main/java/tech/ydb/topic/read/Message.java index 6b0cdd269..35383e423 100644 --- a/topic/src/main/java/tech/ydb/topic/read/Message.java +++ b/topic/src/main/java/tech/ydb/topic/read/Message.java @@ -58,7 +58,11 @@ public interface Message { PartitionSession getPartitionSession(); /** - * Commit this message + * Commits this message + * If there was an error while committing, there is no point of retrying committing the same message: + * the whole PartitionSession should be shut down by that time. And if commit hadn't reached the server, + * it will resend all these messages in next PartitionSession. + * * @return CompletableFuture that will be completed when commit confirmation from server will be received */ CompletableFuture commit(); diff --git a/topic/src/main/java/tech/ydb/topic/read/OffsetsRange.java b/topic/src/main/java/tech/ydb/topic/read/OffsetsRange.java new file mode 100644 index 000000000..0045e644c --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/read/OffsetsRange.java @@ -0,0 +1,10 @@ +package tech.ydb.topic.read; + +/** + * @author Nikolay Perfilov + */ +public interface OffsetsRange { + long getStart(); + + long getEnd(); +} diff --git a/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java b/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java index 56f89f5f9..416c1481e 100644 --- a/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java +++ b/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java @@ -11,10 +11,29 @@ */ public interface DataReceivedEvent { + /** + * Returns a list of messages grouped in one batch. + * Each message can be committed individually or all messages can be committed at once with commit() method + * + * @return a list of messages + */ List getMessages(); + /** + * Returns a partition session this data was received on + * + * @return a partition session this data was received on + */ PartitionSession getPartitionSession(); + /** + * Commits all messages in this event at once. + * If there was an error while committing, there is no point of retrying committing the same messages: + * the whole PartitionSession should be shut down by that time. And if commit hadn't reached the server, + * it will resend all these messages in next PartitionSession. + * + * @return a CompletableFuture that will be completed when commit confirmation from server will be received + */ CompletableFuture commit(); } diff --git a/topic/src/main/java/tech/ydb/topic/read/events/StartPartitionSessionEvent.java b/topic/src/main/java/tech/ydb/topic/read/events/StartPartitionSessionEvent.java index d5c394428..aa64e60de 100644 --- a/topic/src/main/java/tech/ydb/topic/read/events/StartPartitionSessionEvent.java +++ b/topic/src/main/java/tech/ydb/topic/read/events/StartPartitionSessionEvent.java @@ -1,7 +1,7 @@ package tech.ydb.topic.read.events; +import tech.ydb.topic.read.OffsetsRange; import tech.ydb.topic.read.PartitionSession; -import tech.ydb.topic.read.impl.OffsetsRange; import tech.ydb.topic.settings.StartPartitionSessionSettings; /** diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java index 2cc8237e3..d90ca71ae 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java @@ -81,7 +81,7 @@ protected void handleStartPartitionSessionRequest(YdbTopic.StreamReadMessage.Sta StartPartitionSessionEvent event = new StartPartitionSessionEventImpl( partitionSession, request.getCommittedOffset(), - new OffsetsRange(offsetsRange.getStart(), offsetsRange.getEnd()), + new OffsetsRangeImpl(offsetsRange.getStart(), offsetsRange.getEnd()), confirmCallback ); eventHandler.onStartPartitionSession(event); diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java new file mode 100644 index 000000000..107657a2c --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java @@ -0,0 +1,39 @@ +package tech.ydb.topic.read.impl; + +import java.util.concurrent.CompletableFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import tech.ydb.topic.read.OffsetsRange; + +/** + * @author Nikolay Perfilov + */ +public class CommitterImpl { + private static final Logger logger = LoggerFactory.getLogger(CommitterImpl.class); + private final PartitionSessionImpl partitionSession; + private final int messageCount; + private final OffsetsRange offsetsToCommit; + + public CommitterImpl(PartitionSessionImpl partitionSession, int messageCount, OffsetsRange offsetsToCommit) { + this.partitionSession = partitionSession; + this.messageCount = messageCount; + this.offsetsToCommit = offsetsToCommit; + } + + + public CompletableFuture commit() { + return commitImpl(true); + } + + public CompletableFuture commitImpl(boolean fromCommitter) { + if (logger.isDebugEnabled()) { + logger.debug("[{}] partition session {} (partition {}): committing {} message(s), offsets" + + " [{},{})" + (fromCommitter ? " from Committer" : ""), partitionSession.getPath(), + partitionSession.getId(), partitionSession.getPartitionId(), messageCount, + offsetsToCommit.getStart(), offsetsToCommit.getEnd()); + } + return partitionSession.commitOffsetRange(offsetsToCommit); + } +} diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java new file mode 100644 index 000000000..fb3457038 --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java @@ -0,0 +1,77 @@ +package tech.ydb.topic.read.impl; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import tech.ydb.topic.read.DeferredCommitter; +import tech.ydb.topic.read.Message; +import tech.ydb.topic.read.OffsetsRange; +import tech.ydb.topic.read.events.DataReceivedEvent; +import tech.ydb.topic.read.impl.events.DataReceivedEventImpl; + +/** + * @author Nikolay Perfilov + */ +public class DeferredCommitterImpl implements DeferredCommitter { + private static final Logger logger = LoggerFactory.getLogger(DeferredCommitterImpl.class); + + private final Map rangesByPartition = new ConcurrentHashMap<>(); + + private static class PartitionRanges { + private final PartitionSessionImpl partitionSession; + private final DisjointOffsetRangeSet ranges = new DisjointOffsetRangeSet(); + + private PartitionRanges(PartitionSessionImpl partitionSession) { + this.partitionSession = partitionSession; + } + + private void add(OffsetsRange offsetRange) { + try { + synchronized (ranges) { + ranges.add(offsetRange); + } + } catch (RuntimeException exception) { + String errorMessage = "Error adding new offset range to DeferredCommitter for partition session " + + partitionSession.getId() + " (partition " + partitionSession.getPartitionId() + "): " + + exception.getMessage(); + logger.error(errorMessage); + throw new RuntimeException(errorMessage, exception); + } + } + + private void commit() { + List rangesToCommit; + synchronized (ranges) { + rangesToCommit = ranges.getRangesAndClear(); + } + partitionSession.commitOffsetRanges(rangesToCommit); + } + } + + @Override + public void add(Message message) { + MessageImpl messageImpl = (MessageImpl) message; + PartitionRanges partitionRanges = rangesByPartition + .computeIfAbsent(messageImpl.getPartitionSessionImpl(), PartitionRanges::new); + partitionRanges.add(messageImpl.getOffsetsToCommit()); + } + + @Override + public void add(DataReceivedEvent event) { + DataReceivedEventImpl eventImpl = (DataReceivedEventImpl) event; + PartitionRanges partitionRanges = rangesByPartition + .computeIfAbsent(eventImpl.getPartitionSessionImpl(), PartitionRanges::new); + partitionRanges.add(eventImpl.getOffsetsToCommit()); + } + + @Override + public void commit() { + rangesByPartition.forEach((session, partitionRanges) -> { + partitionRanges.commit(); + }); + } +} diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java b/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java new file mode 100644 index 000000000..5f0988487 --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java @@ -0,0 +1,63 @@ +package tech.ydb.topic.read.impl; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; + +import tech.ydb.topic.read.OffsetsRange; + +/** + * @author Nikolay Perfilov + */ +public class DisjointOffsetRangeSet { + private final NavigableMap ranges = new ConcurrentSkipListMap<>(); + + public void add(OffsetsRange rangeToCommit) { + Map.Entry floorEntry = ranges.floorEntry(rangeToCommit.getStart()); + if (floorEntry != null && floorEntry.getValue().getEnd() > rangeToCommit.getStart()) { + throwClashesException(floorEntry.getValue(), rangeToCommit); + } + Map.Entry ceilingEntry = ranges.ceilingEntry(rangeToCommit.getStart()); + if (ceilingEntry != null && rangeToCommit.getEnd() > ceilingEntry.getValue().getStart()) { + throwClashesException(ceilingEntry.getValue(), rangeToCommit); + } + boolean mergedFloor = false; + if (floorEntry != null && floorEntry.getValue().getEnd() == rangeToCommit.getStart()) { + floorEntry.getValue().setEnd(rangeToCommit.getEnd()); + mergedFloor = true; + } + if (ceilingEntry != null) { + OffsetsRangeImpl ceilingValue = ceilingEntry.getValue(); + if (rangeToCommit.getEnd() == ceilingValue.getStart()) { + ranges.remove(ceilingEntry.getKey()); + if (mergedFloor) { + floorEntry.getValue().setEnd(ceilingValue.getEnd()); + } else { + ceilingValue.setStart(rangeToCommit.getStart()); + ranges.put(rangeToCommit.getStart(), ceilingValue); + } + return; + } + } + if (!mergedFloor) { + ranges.put(rangeToCommit.getStart(), new OffsetsRangeImpl(rangeToCommit)); + } + } + + public List getRangesAndClear() { + Collection values = ranges.values(); + List result = new ArrayList<>(values); + values.clear(); + return result; + } + + private void throwClashesException(OffsetsRangeImpl existingRange, OffsetsRange newRange) { + String errMessage = "Error adding new offset range. Added range [" + + newRange.getStart() + "," + newRange.getEnd() + ") clashes with existing range [" + + existingRange.getStart() + "," + existingRange.getEnd() + ")"; + throw new RuntimeException(errMessage); + } +} diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java index 737fa82f2..123f72638 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java @@ -4,20 +4,16 @@ import java.time.Instant; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.function.Function; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import tech.ydb.topic.read.DecompressionException; import tech.ydb.topic.read.Message; +import tech.ydb.topic.read.OffsetsRange; import tech.ydb.topic.read.PartitionSession; /** * @author Nikolay Perfilov */ public class MessageImpl implements Message { - private static final Logger logger = LoggerFactory.getLogger(MessageImpl.class); private byte[] data; private final long offset; private final long seqNo; @@ -25,8 +21,9 @@ public class MessageImpl implements Message { private final Instant createdAt; private final String messageGroupId; private final BatchMeta batchMeta; - private final PartitionSession partitionSession; - private final Function> commitFunction; + private final PartitionSessionImpl partitionSession; + private final OffsetsRange offsetsToCommit; + private final CommitterImpl committer; private boolean isDecompressed = false; private IOException exception = null; @@ -39,7 +36,8 @@ private MessageImpl(Builder builder) { this.messageGroupId = builder.messageGroupId; this.batchMeta = builder.batchMeta; this.partitionSession = builder.partitionSession; - this.commitFunction = builder.commitFunction; + this.offsetsToCommit = new OffsetsRangeImpl(commitOffsetFrom, offset + 1); + this.committer = new CommitterImpl(partitionSession, 1, offsetsToCommit); } @Override @@ -100,6 +98,10 @@ public Instant getWrittenAt() { @Override public PartitionSession getPartitionSession() { + return partitionSession.getSessionInfo(); + } + + public PartitionSessionImpl getPartitionSessionImpl() { return partitionSession; } @@ -109,13 +111,11 @@ public void setDecompressed(boolean decompressed) { @Override public CompletableFuture commit() { - final long commitOffsetTo = offset + 1; - if (logger.isDebugEnabled()) { - logger.debug("[{}] partition session {} (partition {}): committing message with offset {} [{}-{})", - partitionSession.getPath(), partitionSession.getId(), partitionSession.getPartitionId(), - offset, commitOffsetFrom, commitOffsetTo); - } - return commitFunction.apply(new OffsetsRange(commitOffsetFrom, commitOffsetTo)); + return committer.commitImpl(false); + } + + public OffsetsRange getOffsetsToCommit() { + return offsetsToCommit; } /** @@ -129,8 +129,7 @@ public static class Builder { private Instant createdAt; private String messageGroupId; private BatchMeta batchMeta; - private PartitionSession partitionSession; - private Function> commitFunction; + private PartitionSessionImpl partitionSession; public Builder setData(byte[] data) { this.data = data; @@ -167,16 +166,11 @@ public Builder setBatchMeta(BatchMeta batchMeta) { return this; } - public Builder setPartitionSession(PartitionSession partitionSession) { + public Builder setPartitionSession(PartitionSessionImpl partitionSession) { this.partitionSession = partitionSession; return this; } - public Builder setCommitFunction(Function> commitFunction) { - this.commitFunction = commitFunction; - return this; - } - public MessageImpl build() { return new MessageImpl(this); } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRange.java b/topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRange.java deleted file mode 100644 index 81a6fc140..000000000 --- a/topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRange.java +++ /dev/null @@ -1,22 +0,0 @@ -package tech.ydb.topic.read.impl; - -/** - * @author Nikolay Perfilov - */ -public class OffsetsRange { - private final long start; - private final long end; - - public OffsetsRange(long start, long end) { - this.start = start; - this.end = end; - } - - public long getStart() { - return start; - } - - public long getEnd() { - return end; - } -} diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRangeImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRangeImpl.java new file mode 100644 index 000000000..5b4661b72 --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRangeImpl.java @@ -0,0 +1,39 @@ +package tech.ydb.topic.read.impl; + +import tech.ydb.topic.read.OffsetsRange; + +/** + * @author Nikolay Perfilov + */ +public class OffsetsRangeImpl implements OffsetsRange { + private long start; + private long end; + + public OffsetsRangeImpl(long start, long end) { + this.start = start; + this.end = end; + } + + public OffsetsRangeImpl(OffsetsRange offsetsRange) { + this.start = offsetsRange.getStart(); + this.end = offsetsRange.getEnd(); + } + + @Override + public long getStart() { + return start; + } + + @Override + public long getEnd() { + return end; + } + + public void setStart(long start) { + this.start = start; + } + + public void setEnd(long end) { + this.end = end; + } +} diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java index 9ff6a5740..696b72d72 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java @@ -22,6 +22,7 @@ import tech.ydb.proto.topic.YdbTopic; import tech.ydb.topic.description.Codec; import tech.ydb.topic.read.Message; +import tech.ydb.topic.read.OffsetsRange; import tech.ydb.topic.read.PartitionSession; import tech.ydb.topic.read.events.DataReceivedEvent; import tech.ydb.topic.read.impl.events.DataReceivedEventImpl; @@ -45,7 +46,7 @@ public class PartitionSessionImpl { private final Queue readingQueue = new ConcurrentLinkedQueue<>(); private final Function> dataEventCallback; private final AtomicBoolean isReadingNow = new AtomicBoolean(); - private final Consumer commitFunction; + private final Consumer> commitFunction; private final NavigableMap> commitFutures = new ConcurrentSkipListMap<>(); // Offset of the last read message + 1 private long lastReadOffset; @@ -124,14 +125,13 @@ public CompletableFuture addBatches(List addBatches(List[0])); } - private CompletableFuture commitOffset(OffsetsRange offsets) { + // Сommit single offset range with result future + public CompletableFuture commitOffsetRange(OffsetsRange rangeToCommit) { CompletableFuture resultFuture = new CompletableFuture<>(); synchronized (commitFutures) { if (isWorking.get()) { if (logger.isDebugEnabled()) { logger.debug("[{}] Offset range [{}, {}) is requested to be committed for partition session {} " + "(partition {}). Last committed offset is {} (commit lag is {})", path, - offsets.getStart(), offsets.getEnd(), id, partitionId, lastCommittedOffset, - offsets.getStart() - lastCommittedOffset); + rangeToCommit.getStart(), rangeToCommit.getEnd(), id, partitionId, lastCommittedOffset, + rangeToCommit.getStart() - lastCommittedOffset); } - commitFutures.put(offsets.getEnd(), resultFuture); - commitFunction.accept(offsets); + commitFutures.put(rangeToCommit.getEnd(), resultFuture); } else { logger.info("[{}] Offset range [{}, {}) is requested to be committed, but partition session {} " + - "(partition {}) is already closed", path, offsets.getStart(), offsets.getEnd(), id, + "(partition {}) is already closed", path, rangeToCommit.getStart(), rangeToCommit.getEnd(), id, partitionId); resultFuture.completeExceptionally(new RuntimeException("Partition session " + id + " (partition " + partitionId + ") for " + path + " is already closed")); + return resultFuture; } } + List rangeWrapper = new ArrayList<>(1); + rangeWrapper.add(rangeToCommit); + commitFunction.accept(rangeWrapper); return resultFuture; } + // Bulk commit without result future + public void commitOffsetRanges(List rangesToCommit) { + if (isWorking.get()) { + if (logger.isDebugEnabled()) { + StringBuilder message = new StringBuilder("[").append(path) + .append("] Sending CommitRequest for partition session ").append(id) + .append(" (partition ").append(partitionId).append(") with offset ranges "); + addRangesToString(message, rangesToCommit); + logger.info(message.toString()); + } + commitFunction.accept(rangesToCommit); + } else if (logger.isInfoEnabled()) { + StringBuilder message = new StringBuilder("[").append(path).append("] Offset ranges "); + addRangesToString(message, rangesToCommit); + message.append(" are requested to be committed, but partition session ").append(id) + .append(" (partition ").append(partitionId).append(") is already closed"); + logger.info(message.toString()); + } + } + + private static void addRangesToString(StringBuilder stringBuilder, List ranges) { + for (int i = 0; i < ranges.size(); i++) { + if (i > 0) { + stringBuilder.append(", "); + } + OffsetsRange range = ranges.get(i); + stringBuilder.append("[").append(range.getStart()).append(",").append(range.getEnd()).append(")"); + } + } + public void handleCommitResponse(long committedOffset) { if (committedOffset <= lastCommittedOffset) { logger.error("[{}] Commit response received for partition session {} (partition {}). Committed offset: {}" + @@ -250,9 +284,9 @@ private void sendDataToReadersIfNeeded() { // Should be called maximum in 1 thread at a time List messageImplList = batchToRead.getMessages(); List messagesToRead = new ArrayList<>(messageImplList); - DataReceivedEvent event = new DataReceivedEventImpl(messagesToRead, sessionInfo, - () -> commitOffset(new OffsetsRange(messageImplList.get(0).getCommitOffsetFrom(), - messageImplList.get(messageImplList.size() - 1).getOffset() + 1))); + OffsetsRange offsetsToCommit = new OffsetsRangeImpl(messageImplList.get(0).getCommitOffsetFrom(), + messageImplList.get(messageImplList.size() - 1).getOffset() + 1); + DataReceivedEvent event = new DataReceivedEventImpl(this, messagesToRead, offsetsToCommit); if (logger.isDebugEnabled()) { logger.debug("[{}] DataReceivedEvent callback with {} message(s) (offsets {}-{}) for partition " + "session {} " + "(partition {}) is about to be called...", path, messagesToRead.size(), @@ -309,7 +343,7 @@ public static class Builder { private OffsetsRange partitionOffsets; private Executor decompressionExecutor; private Function> dataEventCallback; - private Consumer commitFunction; + private Consumer> commitFunction; public Builder setId(long id) { this.id = id; @@ -346,7 +380,7 @@ public Builder setDataEventCallback(Function commitFunction) { + public Builder setCommitFunction(Consumer> commitFunction) { this.commitFunction = commitFunction; return this; } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java index c3b506f43..be7e174c6 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java @@ -25,6 +25,7 @@ import tech.ydb.proto.topic.YdbTopic; import tech.ydb.topic.TopicRpc; import tech.ydb.topic.impl.GrpcStreamRetrier; +import tech.ydb.topic.read.OffsetsRange; import tech.ydb.topic.read.PartitionSession; import tech.ydb.topic.read.events.DataReceivedEvent; import tech.ydb.topic.settings.ReaderSettings; @@ -240,24 +241,39 @@ private void sendStopPartitionSessionResponse(long partitionSessionId) { .build()); } - private void commitOffset(long partitionSessionId, long partitionId, OffsetsRange offsets) { + private void sendCommitOffsetRequest(long partitionSessionId, long partitionId, + List rangesToCommit) { if (!isWorking.get()) { - logger.info("[{}] Need to send CommitRequest for partition session {} (partition {})," + - " but reading session is already closed", fullId, partitionSessionId, partitionId); + if (logger.isInfoEnabled()) { + StringBuilder message = new StringBuilder("[").append(fullId) + .append("] Need to send CommitRequest for partition session ").append(partitionSessionId) + .append(" (partition ").append(partitionId).append(") with offset ranges "); + for (int i = 0; i < rangesToCommit.size(); i++) { + if (i > 0) { + message.append(", "); + } + OffsetsRange range = rangesToCommit.get(i); + message.append("[").append(range.getStart()).append(",").append(range.getEnd()).append(")"); + } + message.append(", but reading session is already closed"); + logger.info(message.toString()); + } return; } - logger.info("[{}] Sending CommitRequest for partition session {} (partition {}) with offset range [{},{})", - fullId, partitionSessionId, partitionId, offsets.getStart(), offsets.getEnd()); + + YdbTopic.StreamReadMessage.CommitOffsetRequest.PartitionCommitOffset.Builder builder = + YdbTopic.StreamReadMessage.CommitOffsetRequest.PartitionCommitOffset.newBuilder() + .setPartitionSessionId(partitionSessionId); + rangesToCommit.forEach(range -> { + builder.addOffsets(YdbTopic.OffsetsRange.newBuilder() + .setStart(range.getStart()) + .setEnd(range.getEnd())); + }); send(YdbTopic.StreamReadMessage.FromClient.newBuilder() .setCommitOffsetRequest(YdbTopic.StreamReadMessage.CommitOffsetRequest.newBuilder() - .addCommitOffsets( - YdbTopic.StreamReadMessage.CommitOffsetRequest.PartitionCommitOffset.newBuilder() - .setPartitionSessionId(partitionSessionId) - .addOffsets(YdbTopic.OffsetsRange.newBuilder() - .setStart(offsets.getStart()) - .setEnd(offsets.getEnd())) - )) + .addCommitOffsets(builder)) .build()); + } private void closePartitionSessions() { @@ -293,11 +309,11 @@ private void onStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPart .setPath(request.getPartitionSession().getPath()) .setPartitionId(partitionId) .setCommittedOffset(request.getCommittedOffset()) - .setPartitionOffsets(new OffsetsRange(request.getPartitionOffsets().getStart(), + .setPartitionOffsets(new OffsetsRangeImpl(request.getPartitionOffsets().getStart(), request.getPartitionOffsets().getEnd())) .setDecompressionExecutor(decompressionExecutor) .setDataEventCallback(ReaderImpl.this::handleDataReceivedEvent) - .setCommitFunction((offsets) -> commitOffset(partitionSessionId, partitionId, offsets)) + .setCommitFunction((offsets) -> sendCommitOffsetRequest(partitionSessionId, partitionId, offsets)) .build(); partitionSessions.put(partitionSession.getId(), partitionSession); diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java index ea5e6f64c..b65789434 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java @@ -2,29 +2,29 @@ import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.function.Supplier; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import tech.ydb.topic.read.Message; +import tech.ydb.topic.read.OffsetsRange; import tech.ydb.topic.read.PartitionSession; import tech.ydb.topic.read.events.DataReceivedEvent; +import tech.ydb.topic.read.impl.CommitterImpl; +import tech.ydb.topic.read.impl.PartitionSessionImpl; /** * @author Nikolay Perfilov */ public class DataReceivedEventImpl implements DataReceivedEvent { - private static final Logger logger = LoggerFactory.getLogger(DataReceivedEventImpl.class); private final List messages; - private final PartitionSession partitionSession; - private final Supplier> commitCallback; + private final PartitionSessionImpl partitionSession; + private final OffsetsRange offsetsToCommit; + private final CommitterImpl committer; - public DataReceivedEventImpl(List messages, PartitionSession partitionSession, - Supplier> commitCallback) { + public DataReceivedEventImpl(PartitionSessionImpl partitionSession, List messages, + OffsetsRange offsetsToCommit) { this.messages = messages; this.partitionSession = partitionSession; - this.commitCallback = commitCallback; + this.offsetsToCommit = offsetsToCommit; + this.committer = new CommitterImpl(partitionSession, messages.size(), offsetsToCommit); } @Override @@ -34,17 +34,19 @@ public List getMessages() { @Override public PartitionSession getPartitionSession() { + return partitionSession.getSessionInfo(); + } + + public PartitionSessionImpl getPartitionSessionImpl() { return partitionSession; } @Override public CompletableFuture commit() { - if (logger.isDebugEnabled()) { - logger.debug("[{}] partition session {} (partition {}): committing batch with {} message(s) and offsets" + - " {}-{}", partitionSession.getPath(), partitionSession.getId(), - partitionSession.getPartitionId(), messages.size(), messages.get(0).getOffset(), - messages.get(messages.size() - 1).getOffset()); - } - return commitCallback.get(); + return committer.commitImpl(false); + } + + public OffsetsRange getOffsetsToCommit() { + return offsetsToCommit; } } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/events/StartPartitionSessionEventImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/events/StartPartitionSessionEventImpl.java index e8735f822..9fa4632e7 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/events/StartPartitionSessionEventImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/events/StartPartitionSessionEventImpl.java @@ -2,9 +2,9 @@ import java.util.function.Consumer; +import tech.ydb.topic.read.OffsetsRange; import tech.ydb.topic.read.PartitionSession; import tech.ydb.topic.read.events.StartPartitionSessionEvent; -import tech.ydb.topic.read.impl.OffsetsRange; import tech.ydb.topic.settings.StartPartitionSessionSettings; /** diff --git a/topic/src/test/java/tech/ydb/topic/impl/DisjointOffsetRangeSetTest.java b/topic/src/test/java/tech/ydb/topic/impl/DisjointOffsetRangeSetTest.java new file mode 100644 index 000000000..02a841422 --- /dev/null +++ b/topic/src/test/java/tech/ydb/topic/impl/DisjointOffsetRangeSetTest.java @@ -0,0 +1,75 @@ +package tech.ydb.topic.impl; + +import java.util.List; + +import org.junit.Assert; +import org.junit.Test; +import tech.ydb.topic.read.OffsetsRange; +import tech.ydb.topic.read.impl.DisjointOffsetRangeSet; +import tech.ydb.topic.read.impl.OffsetsRangeImpl; + +/** + * @author Nikolay Perfilov + */ +public class DisjointOffsetRangeSetTest { + + @Test + public void testRangesSimple() { + DisjointOffsetRangeSet ranges = new DisjointOffsetRangeSet(); + ranges.add(new OffsetsRangeImpl(0, 1)); + ranges.add(new OffsetsRangeImpl(1, 2)); + ranges.add(new OffsetsRangeImpl(3, 4)); + List rangesResult = ranges.getRangesAndClear(); + Assert.assertEquals(2, rangesResult.size()); + Assert.assertEquals(0, rangesResult.get(0).getStart()); + Assert.assertEquals(2, rangesResult.get(0).getEnd()); + Assert.assertEquals(3, rangesResult.get(1).getStart()); + Assert.assertEquals(4, rangesResult.get(1).getEnd()); + } + + @Test + public void testReuseRangeSet() { + DisjointOffsetRangeSet ranges = new DisjointOffsetRangeSet(); + + ranges.add(new OffsetsRangeImpl(30, 40)); + ranges.add(new OffsetsRangeImpl(10, 20)); + ranges.add(new OffsetsRangeImpl(0, 9)); + Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(8, 11))); + Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(8, 10))); + Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(9, 11))); + Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(25, 31))); + Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(31, 100))); + Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(25, 100))); + ranges.add(new OffsetsRangeImpl(9, 10)); + List firstResult = ranges.getRangesAndClear(); + Assert.assertEquals(2, firstResult.size()); + Assert.assertEquals(0, firstResult.get(0).getStart()); + Assert.assertEquals(20, firstResult.get(0).getEnd()); + Assert.assertEquals(30, firstResult.get(1).getStart()); + Assert.assertEquals(40, firstResult.get(1).getEnd()); + + Assert.assertTrue(ranges.getRangesAndClear().isEmpty()); + + ranges.add(new OffsetsRangeImpl(0, 9)); + ranges.add(new OffsetsRangeImpl(10, 19)); + ranges.add(new OffsetsRangeImpl(20, 30)); + List secondResult = ranges.getRangesAndClear(); + Assert.assertEquals(3, secondResult.size()); + Assert.assertEquals(0, secondResult.get(0).getStart()); + Assert.assertEquals(9, secondResult.get(0).getEnd()); + Assert.assertEquals(10, secondResult.get(1).getStart()); + Assert.assertEquals(19, secondResult.get(1).getEnd()); + Assert.assertEquals(20, secondResult.get(2).getStart()); + Assert.assertEquals(30, secondResult.get(2).getEnd()); + + ranges.add(new OffsetsRangeImpl(39, 40)); + ranges.add(new OffsetsRangeImpl(30, 39)); + ranges.add(new OffsetsRangeImpl(40, 50)); + List thirdResult = ranges.getRangesAndClear(); + Assert.assertEquals(1, thirdResult.size()); + Assert.assertEquals(30, thirdResult.get(0).getStart()); + Assert.assertEquals(50, thirdResult.get(0).getEnd()); + + Assert.assertTrue(ranges.getRangesAndClear().isEmpty()); + } +}