Skip to content

Commit

Permalink
Merge pull request #188 from ydb-platform/release_v2.1.8
Browse files Browse the repository at this point in the history
Release v2.1.8
  • Loading branch information
alex268 authored Nov 2, 2023
2 parents dedeb3b + 64bb53c commit 26402f2
Show file tree
Hide file tree
Showing 40 changed files with 669 additions and 160 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## 2.1.8 ##

* Topics: Added DeferredCommitter class to group several read commits into one or just defer each commit without holding data reference
* Topics: Added onCommitResponse callback for AsyncReader to subscribe to server event directly and control commits more efficiently
* Topics: Removed usage of ForkJoinPool.commonPool()
* Table: Fixed typo in SessionPoolStats

## 2.1.7 ##

* Topics: Rethrow IO exceptions to user handlers while decoding messages
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Firstly you can import YDB Java BOM to specify correct versions of SDK modules.
<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-bom</artifactId>
<version>2.1.7</version>
<version>2.1.8</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down
2 changes: 1 addition & 1 deletion bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<modelVersion>4.0.0</modelVersion>

<groupId>tech.ydb</groupId>
<version>2.1.7</version>
<version>2.1.8</version>
<artifactId>ydb-sdk-bom</artifactId>
<name>Java SDK Bill of Materials</name>
<description>Java SDK Bill of Materials (BOM)</description>
Expand Down
2 changes: 1 addition & 1 deletion coordination/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.7</version>
<version>2.1.8</version>
</parent>

<artifactId>ydb-sdk-coordination</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.7</version>
<version>2.1.8</version>
</parent>

<artifactId>ydb-sdk-core</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/resources/version.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=2.1.7
version=2.1.8
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.7</version>
<version>2.1.8</version>

<name>Java SDK for YDB</name>
<description>Java SDK for YDB</description>
Expand Down
2 changes: 1 addition & 1 deletion scheme/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.7</version>
<version>2.1.8</version>
</parent>

<artifactId>ydb-sdk-scheme</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion table/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.7</version>
<version>2.1.8</version>
</parent>

<artifactId>ydb-sdk-table</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ public String toString() {
+ ", idleCount=" + getIdleCount()
+ ", acquiredCount=" + getAcquiredCount()
+ ", pendingAcquireCount=" + getPendingAcquireCount()
+ ", acquiredTotal=" + getPendingAcquireCount()
+ ", acquiredTotal=" + getAcquiredTotal()
+ ", releasedTotal=" + getReleasedTotal()
+ ", requestsTotal=" + getRequestedTotal()
+ ", createdTotal=" + getCreatedTotal()
Expand Down
2 changes: 1 addition & 1 deletion tests/common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.7</version>
<version>2.1.8</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion tests/junit4-support/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.7</version>
<version>2.1.8</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion tests/junit5-support/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.7</version>
<version>2.1.8</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion topic/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.7</version>
<version>2.1.8</version>
</parent>

<artifactId>ydb-sdk-topic</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
* @author Nikolay Perfilov
*/
public class DecompressionException extends UncheckedIOException {
private static final long serialVersionUID = 2720187645859527813L;

private final byte[] rawData;
public DecompressionException(String message, IOException cause, byte[] rawData) {
super(message, cause);
Expand Down
44 changes: 44 additions & 0 deletions topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package tech.ydb.topic.read;

import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.read.impl.DeferredCommitterImpl;

/**
* A helper class that is used to call deferred commits.
* Several {@link Message}s or/and {@link tech.ydb.topic.read.events.DataReceivedEvent}s can be accepted to commit later
* all at once.
* Contains no data references and therefore may also be useful in cases where commit() is called after processing data
* in an external system.
*
* @author Nikolay Perfilov
*/
public interface DeferredCommitter {
/**
* Creates a new instance of {@link DeferredCommitter}
*
* @return a new instance of {@link DeferredCommitter}
*/
static DeferredCommitter newInstance() {
return new DeferredCommitterImpl();
}

/**
* Adds a {@link Message} to commit it later with a commit method
*
* @param message a {@link Message} to commit later
*/
void add(Message message);

/**
* Adds a {@link DataReceivedEvent} to commit all its messages later with a commit method
*
* @param event a {@link DataReceivedEvent} to commit later
*/
void add(DataReceivedEvent event);

/**
* Commits offset ranges from all {@link Message}s and {@link DataReceivedEvent}s
* that were added to this DeferredCommitter since last commit
*/
void commit();
}
6 changes: 5 additions & 1 deletion topic/src/main/java/tech/ydb/topic/read/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ public interface Message {
PartitionSession getPartitionSession();

/**
* Commit this message
* Commits this message
* If there was an error while committing, there is no point of retrying committing the same message:
* the whole PartitionSession should be shut down by that time. And if commit hadn't reached the server,
* it will resend all these messages in next PartitionSession.
*
* @return CompletableFuture that will be completed when commit confirmation from server will be received
*/
CompletableFuture<Void> commit();
Expand Down
10 changes: 10 additions & 0 deletions topic/src/main/java/tech/ydb/topic/read/OffsetsRange.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package tech.ydb.topic.read;

/**
* @author Nikolay Perfilov
*/
public interface OffsetsRange {
long getStart();

long getEnd();
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ public abstract class AbstractReadEventHandler implements ReadEventHandler {

// onMessages(DataReceivedEvent event) method should be defined in user's implementation

@Override
public void onCommitResponse(CommitOffsetAcknowledgementEvent event) {

}

@Override
public void onStartPartitionSession(StartPartitionSessionEvent event) {
event.confirm();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package tech.ydb.topic.read.events;

import tech.ydb.topic.read.PartitionSession;

/**
* @author Nikolay Perfilov
*/
public class CommitOffsetAcknowledgementEvent {
public interface CommitOffsetAcknowledgementEvent {
PartitionSession getPartitionSession();
long getCommittedOffset();
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,29 @@
*/
public interface DataReceivedEvent {

/**
* Returns a list of messages grouped in one batch.
* Each message can be committed individually or all messages can be committed at once with commit() method
*
* @return a list of messages
*/
List<Message> getMessages();

/**
* Returns a partition session this data was received on
*
* @return a partition session this data was received on
*/
PartitionSession getPartitionSession();

/**
* Commits all messages in this event at once.
* If there was an error while committing, there is no point of retrying committing the same messages:
* the whole PartitionSession should be shut down by that time. And if commit hadn't reached the server,
* it will resend all these messages in next PartitionSession.
*
* @return a CompletableFuture that will be completed when commit confirmation from server will be received
*/
CompletableFuture<Void> commit();

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ public interface ReadEventHandler {

void onMessages(DataReceivedEvent event);

void onCommitResponse(CommitOffsetAcknowledgementEvent event);

void onStartPartitionSession(StartPartitionSessionEvent event);

void onStopPartitionSession(StopPartitionSessionEvent event);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package tech.ydb.topic.read.events;

import tech.ydb.topic.read.OffsetsRange;
import tech.ydb.topic.read.PartitionSession;
import tech.ydb.topic.read.impl.OffsetsRange;
import tech.ydb.topic.settings.StartPartitionSessionSettings;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
import tech.ydb.topic.TopicRpc;
import tech.ydb.topic.read.AsyncReader;
import tech.ydb.topic.read.PartitionSession;
import tech.ydb.topic.read.events.CommitOffsetAcknowledgementEvent;
import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.read.events.PartitionSessionClosedEvent;
import tech.ydb.topic.read.events.ReadEventHandler;
import tech.ydb.topic.read.events.ReaderClosedEvent;
import tech.ydb.topic.read.events.StartPartitionSessionEvent;
import tech.ydb.topic.read.events.StopPartitionSessionEvent;
import tech.ydb.topic.read.impl.events.CommitOffsetAcknowledgementEventImpl;
import tech.ydb.topic.read.impl.events.PartitionSessionClosedEventImpl;
import tech.ydb.topic.read.impl.events.StartPartitionSessionEventImpl;
import tech.ydb.topic.read.impl.events.StopPartitionSessionEventImpl;
Expand Down Expand Up @@ -72,6 +74,15 @@ protected CompletableFuture<Void> handleDataReceivedEvent(DataReceivedEvent even
}, handlerExecutor);
}

@Override
protected void handleCommitResponse(long committedOffset, PartitionSession partitionSession) {
handlerExecutor.execute(() -> {
CommitOffsetAcknowledgementEvent event = new CommitOffsetAcknowledgementEventImpl(partitionSession,
committedOffset);
eventHandler.onCommitResponse(event);
});
}

@Override
protected void handleStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPartitionSessionRequest request,
PartitionSession partitionSession,
Expand All @@ -81,7 +92,7 @@ protected void handleStartPartitionSessionRequest(YdbTopic.StreamReadMessage.Sta
StartPartitionSessionEvent event = new StartPartitionSessionEventImpl(
partitionSession,
request.getCommittedOffset(),
new OffsetsRange(offsetsRange.getStart(), offsetsRange.getEnd()),
new OffsetsRangeImpl(offsetsRange.getStart(), offsetsRange.getEnd()),
confirmCallback
);
eventHandler.onStartPartitionSession(event);
Expand Down
39 changes: 39 additions & 0 deletions topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package tech.ydb.topic.read.impl;

import java.util.concurrent.CompletableFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import tech.ydb.topic.read.OffsetsRange;

/**
* @author Nikolay Perfilov
*/
public class CommitterImpl {
private static final Logger logger = LoggerFactory.getLogger(CommitterImpl.class);
private final PartitionSessionImpl partitionSession;
private final int messageCount;
private final OffsetsRange offsetsToCommit;

public CommitterImpl(PartitionSessionImpl partitionSession, int messageCount, OffsetsRange offsetsToCommit) {
this.partitionSession = partitionSession;
this.messageCount = messageCount;
this.offsetsToCommit = offsetsToCommit;
}


public CompletableFuture<Void> commit() {
return commitImpl(true);
}

public CompletableFuture<Void> commitImpl(boolean fromCommitter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] partition session {} (partition {}): committing {} message(s), offsets" +
" [{},{})" + (fromCommitter ? " from Committer" : ""), partitionSession.getPath(),
partitionSession.getId(), partitionSession.getPartitionId(), messageCount,
offsetsToCommit.getStart(), offsetsToCommit.getEnd());
}
return partitionSession.commitOffsetRange(offsetsToCommit);
}
}
Loading

0 comments on commit 26402f2

Please sign in to comment.