Skip to content

Commit

Permalink
Merge pull request #181 from ydb-platform/release_v2.1.7
Browse files Browse the repository at this point in the history
Release v2.1.7
  • Loading branch information
alex268 authored Sep 22, 2023
2 parents 673991f + f3d4be0 commit dedeb3b
Show file tree
Hide file tree
Showing 29 changed files with 619 additions and 526 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 2.1.7 ##

* Topics: Rethrow IO exceptions to user handlers while decoding messages
* Topics: Tie partition sessions to streams

## 2.1.6 ##

* Topics: Fixed a bug where topic writer / reader init() future was not completed under certain conditions
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Firstly you can import YDB Java BOM to specify correct versions of SDK modules.
<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-bom</artifactId>
<version>2.1.5</version>
<version>2.1.7</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down
2 changes: 1 addition & 1 deletion bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<modelVersion>4.0.0</modelVersion>

<groupId>tech.ydb</groupId>
<version>2.1.6</version>
<version>2.1.7</version>
<artifactId>ydb-sdk-bom</artifactId>
<name>Java SDK Bill of Materials</name>
<description>Java SDK Bill of Materials (BOM)</description>
Expand Down
2 changes: 1 addition & 1 deletion coordination/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.6</version>
<version>2.1.7</version>
</parent>

<artifactId>ydb-sdk-coordination</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.6</version>
<version>2.1.7</version>
</parent>

<artifactId>ydb-sdk-core</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/resources/version.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=2.1.6
version=2.1.7
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.6</version>
<version>2.1.7</version>

<name>Java SDK for YDB</name>
<description>Java SDK for YDB</description>
Expand Down
2 changes: 1 addition & 1 deletion scheme/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.6</version>
<version>2.1.7</version>
</parent>

<artifactId>ydb-sdk-scheme</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion table/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.6</version>
<version>2.1.7</version>
</parent>

<artifactId>ydb-sdk-table</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion tests/common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.6</version>
<version>2.1.7</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion tests/junit4-support/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.6</version>
<version>2.1.7</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion tests/junit5-support/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.6</version>
<version>2.1.7</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion topic/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.6</version>
<version>2.1.7</version>
</parent>

<artifactId>ydb-sdk-topic</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ protected GrpcStreamRetrier(ScheduledExecutorService scheduler) {
protected abstract Logger getLogger();
protected abstract String getStreamName();
protected abstract void onStreamReconnect();
protected abstract void onStreamFinished();
protected abstract void onShutdown(String reason);

private void tryScheduleReconnect() {
Expand Down Expand Up @@ -98,9 +97,8 @@ protected CompletableFuture<Void> shutdownImpl(String reason) {
});
}

protected void completeSession(Status status, Throwable th) {
getLogger().info("[{}] CompleteSession called", id);
onStreamFinished();
protected void onSessionClosed(Status status, Throwable th) {
getLogger().info("[{}] onSessionClosed called", id);

if (th != null) {
getLogger().error("[{}] Exception in {} stream session: ", id, getStreamName(), th);
Expand Down
39 changes: 0 additions & 39 deletions topic/src/main/java/tech/ydb/topic/impl/ReaderWriterBaseImpl.java

This file was deleted.

2 changes: 1 addition & 1 deletion topic/src/main/java/tech/ydb/topic/impl/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@
* @author Nikolay Perfilov
*/
public interface Session {
boolean stop();
void startAndInitialize();
void shutdown();
}
16 changes: 9 additions & 7 deletions topic/src/main/java/tech/ydb/topic/impl/SessionBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
public abstract class SessionBase<R, W> implements Session {

protected final GrpcReadWriteStream<R, W> streamConnection;
private final AtomicBoolean isWorking = new AtomicBoolean(true);
protected final AtomicBoolean isWorking = new AtomicBoolean(true);
private String token;

public SessionBase(GrpcReadWriteStream<R, W> streamConnection) {
Expand All @@ -30,7 +30,9 @@ public SessionBase(GrpcReadWriteStream<R, W> streamConnection) {

protected abstract void sendUpdateTokenRequest(String token);

public synchronized CompletableFuture<Status> start(GrpcReadStream.Observer<R> streamObserver) {
protected abstract void onStop();

protected synchronized CompletableFuture<Status> start(GrpcReadStream.Observer<R> streamObserver) {
getLogger().info("Session start");
return streamConnection.start(message -> {
if (getLogger().isTraceEnabled()) {
Expand Down Expand Up @@ -67,18 +69,18 @@ public synchronized void send(W request) {
streamConnection.sendNext(request);
}

@Override
public boolean stop() {
private boolean stop() {
getLogger().info("Session stop");
return isWorking.compareAndSet(true, false);
}


@Override
public synchronized void shutdown() {
getLogger().info("Session shutdown");
if (!stop()) {
return;
if (stop()) {
onStop();
streamConnection.close();
}
streamConnection.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package tech.ydb.topic.read;

import java.io.IOException;
import java.io.UncheckedIOException;

/**
* @author Nikolay Perfilov
*/
public class DecompressionException extends UncheckedIOException {
private final byte[] rawData;
public DecompressionException(String message, IOException cause, byte[] rawData) {
super(message, cause);
this.rawData = rawData;
}

/**
* @return Raw message byte data that failed be decompressed
*/
public byte[] getRawData() {
return rawData;
}
}
3 changes: 2 additions & 1 deletion topic/src/main/java/tech/ydb/topic/read/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
@ExperimentalApi("Topic service interfaces are experimental and may change without notice")
public interface Message {
/**
* @return Message byte data
* @return Message byte data.
* @throws DecompressionException in case of decompression error. Raw data can be retrieved this exception
*/
byte[] getData();

Expand Down
24 changes: 10 additions & 14 deletions topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

import javax.annotation.Nullable;

Expand All @@ -25,6 +26,7 @@
import tech.ydb.topic.read.impl.events.StopPartitionSessionEventImpl;
import tech.ydb.topic.settings.ReadEventHandlersSettings;
import tech.ydb.topic.settings.ReaderSettings;
import tech.ydb.topic.settings.StartPartitionSessionSettings;

/**
* @author Nikolay Perfilov
Expand Down Expand Up @@ -71,34 +73,28 @@ protected CompletableFuture<Void> handleDataReceivedEvent(DataReceivedEvent even
}

@Override
protected void handleStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPartitionSessionRequest request) {
protected void handleStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPartitionSessionRequest request,
PartitionSession partitionSession,
Consumer<StartPartitionSessionSettings> confirmCallback) {
handlerExecutor.execute(() -> {
YdbTopic.StreamReadMessage.PartitionSession partitionSession = request.getPartitionSession();
YdbTopic.OffsetsRange offsetsRange = request.getPartitionOffsets();
StartPartitionSessionEvent event = new StartPartitionSessionEventImpl(
new PartitionSession(
partitionSession.getPartitionSessionId(),
partitionSession.getPartitionId(),
partitionSession.getPath()),
partitionSession,
request.getCommittedOffset(),
new OffsetsRange(offsetsRange.getStart(), offsetsRange.getEnd()),
(startSettings) -> sendStartPartitionSessionResponse(request, startSettings)
confirmCallback
);
eventHandler.onStartPartitionSession(event);
});
}

@Override
protected void handleStopPartitionSession(YdbTopic.StreamReadMessage.StopPartitionSessionRequest request,
@Nullable Long partitionId) {
@Nullable Long partitionId, Runnable confirmCallback) {
final long partitionSessionId = request.getPartitionSessionId();
final long committedOffset = request.getCommittedOffset();
final StopPartitionSessionEvent event = new StopPartitionSessionEventImpl(
partitionSessionId,
partitionId,
committedOffset,
() -> sendStopPartitionSessionResponse(partitionSessionId)
);
final StopPartitionSessionEvent event = new StopPartitionSessionEventImpl(partitionSessionId, partitionId,
committedOffset, confirmCallback);
handlerExecutor.execute(() -> {
eventHandler.onStopPartitionSession(event);
});
Expand Down
11 changes: 11 additions & 0 deletions topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package tech.ydb.topic.read.impl;

import java.io.IOException;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -8,6 +9,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import tech.ydb.topic.read.DecompressionException;
import tech.ydb.topic.read.Message;
import tech.ydb.topic.read.PartitionSession;

Expand All @@ -26,6 +28,7 @@ public class MessageImpl implements Message {
private final PartitionSession partitionSession;
private final Function<OffsetsRange, CompletableFuture<Void>> commitFunction;
private boolean isDecompressed = false;
private IOException exception = null;

private MessageImpl(Builder builder) {
this.data = builder.data;
Expand All @@ -41,13 +44,21 @@ private MessageImpl(Builder builder) {

@Override
public byte[] getData() {
if (exception != null) {
throw new DecompressionException("Error occurred while decoding a message",
exception, data);
}
return data;
}

public void setData(byte[] data) {
this.data = data;
}

public void setException(IOException exception) {
this.exception = exception;
}

@Override
public long getOffset() {
return offset;
Expand Down
Loading

0 comments on commit dedeb3b

Please sign in to comment.