diff --git a/bom/pom.xml b/bom/pom.xml
index e8ddac354..702d14540 100644
--- a/bom/pom.xml
+++ b/bom/pom.xml
@@ -55,6 +55,11 @@
${ydb-auth-api.version}
+
+ tech.ydb
+ ydb-sdk-common
+ ${project.version}
+
tech.ydb
ydb-sdk-core
diff --git a/common/pom.xml b/common/pom.xml
new file mode 100644
index 000000000..99b593a62
--- /dev/null
+++ b/common/pom.xml
@@ -0,0 +1,37 @@
+
+
+ 4.0.0
+
+
+ tech.ydb
+ ydb-sdk-parent
+ 2.2.0-SNAPSHOT
+
+
+ ydb-sdk-common
+ Common module of Java SDK for YDB
+ Common module of Java SDK for YDB
+
+
+ UTF-8
+
+
+
+
+ tech.ydb
+ ydb-sdk-core
+
+
+ org.slf4j
+ slf4j-api
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+ test
+
+
+
diff --git a/query/src/main/java/tech/ydb/query/QueryTx.java b/common/src/main/java/tech/ydb/common/transaction/TxMode.java
similarity index 72%
rename from query/src/main/java/tech/ydb/query/QueryTx.java
rename to common/src/main/java/tech/ydb/common/transaction/TxMode.java
index 6c9399a94..5e6307a29 100644
--- a/query/src/main/java/tech/ydb/query/QueryTx.java
+++ b/common/src/main/java/tech/ydb/common/transaction/TxMode.java
@@ -1,10 +1,10 @@
-package tech.ydb.query;
+package tech.ydb.common.transaction;
/**
*
* @author Aleksandr Gorshenin
*/
-public enum QueryTx {
+public enum TxMode {
NONE,
SERIALIZABLE_RW,
diff --git a/common/src/main/java/tech/ydb/common/transaction/YdbTransaction.java b/common/src/main/java/tech/ydb/common/transaction/YdbTransaction.java
new file mode 100644
index 000000000..56757c744
--- /dev/null
+++ b/common/src/main/java/tech/ydb/common/transaction/YdbTransaction.java
@@ -0,0 +1,37 @@
+package tech.ydb.common.transaction;
+
+import java.util.concurrent.CompletableFuture;
+
+import tech.ydb.core.Status;
+
+/**
+ * A base interface for all YDB transactions from different services
+ * @author Nikolay Perfilov
+ */
+public interface YdbTransaction {
+
+ /**
+ * Returns identifier of the transaction or null if the transaction is not active = (not
+ * started/committed/rolled back). When {@link YdbTransaction} is not active - any query on this object
+ * starts a new transaction on server. When transaction is active any call of {@code commit},
+ * {@code rollback} or execution of any query with {@code commitAtEnd}=true finishes this transaction
+ *
+ * @return identifier of the transaction or null if the transaction is not active
+ */
+ String getId();
+
+ /**
+ * Returns {@link TxMode} with mode of the transaction
+ *
+ * @return the transaction mode
+ */
+ TxMode getTxMode();
+
+ default boolean isActive() {
+ return getId() != null;
+ }
+
+ String getSessionId();
+
+ CompletableFuture getStatusFuture();
+}
diff --git a/common/src/main/java/tech/ydb/common/transaction/impl/YdbTransactionImpl.java b/common/src/main/java/tech/ydb/common/transaction/impl/YdbTransactionImpl.java
new file mode 100644
index 000000000..5af6f3ffe
--- /dev/null
+++ b/common/src/main/java/tech/ydb/common/transaction/impl/YdbTransactionImpl.java
@@ -0,0 +1,38 @@
+package tech.ydb.common.transaction.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+import tech.ydb.common.transaction.TxMode;
+import tech.ydb.common.transaction.YdbTransaction;
+import tech.ydb.core.Status;
+
+/**
+ * @author Nikolay Perfilov
+ */
+public abstract class YdbTransactionImpl implements YdbTransaction {
+ protected final TxMode txMode;
+ protected final AtomicReference txId;
+ protected final AtomicReference> statusFuture = new AtomicReference<>(
+ new CompletableFuture<>());
+
+ protected YdbTransactionImpl(TxMode txMode, String txId) {
+ this.txMode = txMode;
+ this.txId = new AtomicReference<>(txId);
+ }
+
+ @Override
+ public String getId() {
+ return txId.get();
+ }
+
+ @Override
+ public TxMode getTxMode() {
+ return txMode;
+ }
+
+ @Override
+ public CompletableFuture getStatusFuture() {
+ return statusFuture.get();
+ }
+}
diff --git a/pom.xml b/pom.xml
index 65de259c3..c9a1255e9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -16,6 +16,7 @@
bom
+ common
core
table
scheme
diff --git a/query/src/main/java/tech/ydb/query/QuerySession.java b/query/src/main/java/tech/ydb/query/QuerySession.java
index dbbd4876b..69d07cd41 100644
--- a/query/src/main/java/tech/ydb/query/QuerySession.java
+++ b/query/src/main/java/tech/ydb/query/QuerySession.java
@@ -2,6 +2,7 @@
import java.util.concurrent.CompletableFuture;
+import tech.ydb.common.transaction.TxMode;
import tech.ydb.core.Result;
import tech.ydb.query.settings.BeginTransactionSettings;
import tech.ydb.query.settings.ExecuteQuerySettings;
@@ -40,7 +41,7 @@ public interface QuerySession extends AutoCloseable {
* @param txMode transaction mode
* @return new implicit transaction
*/
- QueryTransaction createNewTransaction(QueryTx txMode);
+ QueryTransaction createNewTransaction(TxMode txMode);
/**
* Create and start a new active {@link QueryTransaction}. This method creates a transaction on the server
@@ -50,10 +51,10 @@ public interface QuerySession extends AutoCloseable {
* @param settings additional settings for request
* @return future with result of the transaction starting
*/
- CompletableFuture> beginTransaction(QueryTx txMode, BeginTransactionSettings settings);
+ CompletableFuture> beginTransaction(TxMode txMode, BeginTransactionSettings settings);
/**
- * Create {@link QueryStream} for executing query with specified {@link QueryTx}. The query can contain DML, DDL and
+ * Create {@link QueryStream} for executing query with specified {@link TxMode}. The query can contain DML, DDL and
* DCL statements. Supported mix of different statement types depends on the chosen transaction type.
*
* @param query text of query
@@ -62,13 +63,13 @@ public interface QuerySession extends AutoCloseable {
* @param settings additional settings of query execution
* @return a ready to execute instance of {@link QueryStream}
*/
- QueryStream createQuery(String query, QueryTx tx, Params params, ExecuteQuerySettings settings);
+ QueryStream createQuery(String query, TxMode tx, Params params, ExecuteQuerySettings settings);
@Override
void close();
/**
- * Create {@link QueryStream} for executing query with specified {@link QueryTx}. The query can contain DML, DDL and
+ * Create {@link QueryStream} for executing query with specified {@link TxMode}. The query can contain DML, DDL and
* DCL statements. Supported mix of different statement types depends on the chosen transaction type.
*
* @param query text of query
@@ -76,19 +77,19 @@ public interface QuerySession extends AutoCloseable {
* @param params query parameters
* @return a ready to execute instance of {@link QueryStream}
*/
- default QueryStream createQuery(String query, QueryTx tx, Params params) {
+ default QueryStream createQuery(String query, TxMode tx, Params params) {
return createQuery(query, tx, params, ExecuteQuerySettings.newBuilder().build());
}
/**
- * Create {@link QueryStream} for executing query with specified {@link QueryTx}. The query can contain DML, DDL and
+ * Create {@link QueryStream} for executing query with specified {@link TxMode}. The query can contain DML, DDL and
* DCL statements. Supported mix of different statement types depends on the chosen transaction type.
*
* @param query text of query
* @param tx transaction mode
* @return a ready to execute instance of {@link QueryStream}
*/
- default QueryStream createQuery(String query, QueryTx tx) {
+ default QueryStream createQuery(String query, TxMode tx) {
return createQuery(query, tx, Params.empty(), ExecuteQuerySettings.newBuilder().build());
}
@@ -99,7 +100,7 @@ default QueryStream createQuery(String query, QueryTx tx) {
* @param txMode transaction mode
* @return future with result of the transaction starting
*/
- default CompletableFuture> beginTransaction(QueryTx txMode) {
+ default CompletableFuture> beginTransaction(TxMode txMode) {
return beginTransaction(txMode, BeginTransactionSettings.newBuilder().build());
}
}
diff --git a/query/src/main/java/tech/ydb/query/QueryTransaction.java b/query/src/main/java/tech/ydb/query/QueryTransaction.java
index bf262bf27..4780c62df 100644
--- a/query/src/main/java/tech/ydb/query/QueryTransaction.java
+++ b/query/src/main/java/tech/ydb/query/QueryTransaction.java
@@ -2,6 +2,7 @@
import java.util.concurrent.CompletableFuture;
+import tech.ydb.common.transaction.YdbTransaction;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.query.result.QueryInfo;
@@ -11,6 +12,7 @@
import tech.ydb.table.query.Params;
/**
+ * Interface of transaction from query service
* Short-living object allows transactional execution of several queries in one interactive transaction.
* QueryTransaction can be used in implicit mode - without calling commit()/rollback(). When QueryTransaction is not
* active - any execution of query with commitAtEnd=false starts a new transaction. And execution of query with
@@ -18,29 +20,7 @@
*
* @author Aleksandr Gorshenin
*/
-public interface QueryTransaction {
-
- /**
- * Returns identifier of the transaction or null if the transaction is not active = (not
- * started/committed/rolled back). When {@link QueryTransaction} is not active - any execution of the query created
- * by {@code createQuery} starts a new transaction. When QueryTransaction is active - any call of {@code commit},
- * {@code rollback} or execution of the query created by {@code createQuery} with {@code commitAtEnd}=true finishes
- * the transaction
- *
- * @return identifier of the transaction or null if the transaction is not active
- */
- String getId();
-
- /**
- * Returns {@link QueryTx} with mode of the transaction
- *
- * @return the transaction mode
- */
- QueryTx getQueryTx();
-
- default boolean isActive() {
- return getId() != null;
- }
+public interface QueryTransaction extends YdbTransaction {
/**
* Returns {@link QuerySession} that was used for creating the transaction
diff --git a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java
index 22c70f66b..05a1d01b6 100644
--- a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java
+++ b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java
@@ -12,6 +12,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import tech.ydb.common.transaction.TxMode;
+import tech.ydb.common.transaction.impl.YdbTransactionImpl;
import tech.ydb.core.Issue;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
@@ -26,7 +28,6 @@
import tech.ydb.query.QuerySession;
import tech.ydb.query.QueryStream;
import tech.ydb.query.QueryTransaction;
-import tech.ydb.query.QueryTx;
import tech.ydb.query.result.QueryInfo;
import tech.ydb.query.result.QueryResultPart;
import tech.ydb.query.result.QueryStats;
@@ -67,7 +68,7 @@ abstract class SessionImpl implements QuerySession {
this.sessionId = response.getSessionId();
this.nodeID = getNodeBySessionId(response.getSessionId(), response.getNodeId());
this.isTraceEnabled = logger.isTraceEnabled();
- this.transaction = new AtomicReference<>(new TransactionImpl(QueryTx.SERIALIZABLE_RW, null));
+ this.transaction = new AtomicReference<>(new TransactionImpl(TxMode.SERIALIZABLE_RW, null));
}
private static Long getNodeBySessionId(String sessionId, long defaultValue) {
@@ -99,14 +100,14 @@ public QueryTransaction currentTransaction() {
}
@Override
- public QueryTransaction createNewTransaction(QueryTx txMode) {
+ public QueryTransaction createNewTransaction(TxMode txMode) {
return updateTransaction(new TransactionImpl(txMode, null));
}
public abstract void updateSessionState(Status status);
@Override
- public CompletableFuture> beginTransaction(QueryTx tx, BeginTransactionSettings settings) {
+ public CompletableFuture> beginTransaction(TxMode tx, BeginTransactionSettings settings) {
YdbQuery.BeginTransactionRequest request = YdbQuery.BeginTransactionRequest.newBuilder()
.setSessionId(sessionId)
.setTxSettings(TxControl.txSettings(tx))
@@ -201,7 +202,7 @@ GrpcReadStream createGrpcStream(
}
@Override
- public QueryStream createQuery(String query, QueryTx tx, Params prms, ExecuteQuerySettings settings) {
+ public QueryStream createQuery(String query, TxMode tx, Params prms, ExecuteQuerySettings settings) {
YdbQuery.TransactionControl tc = TxControl.txModeCtrl(tx, true);
return new StreamImpl(createGrpcStream(query, tc, prms, settings)) {
@Override
@@ -251,6 +252,8 @@ abstract class StreamImpl implements QueryStream {
}
abstract void handleTxMeta(YdbQuery.TransactionMeta meta);
+ void handleCompletion(Status status, Throwable th) {
+ }
@Override
public CompletableFuture> execute(PartsHandler handler) {
@@ -292,6 +295,7 @@ public CompletableFuture> execute(PartsHandler handler) {
}
}
}).whenComplete((status, th) -> {
+ handleCompletion(status, th);
if (th != null) {
result.completeExceptionally(th);
}
@@ -312,23 +316,15 @@ public void cancel() {
}
}
- class TransactionImpl implements QueryTransaction {
- private final QueryTx txMode;
- private final AtomicReference txId;
-
- TransactionImpl(QueryTx tx, String txID) {
- this.txMode = tx;
- this.txId = new AtomicReference<>(txID);
- }
+ class TransactionImpl extends YdbTransactionImpl implements QueryTransaction {
- @Override
- public String getId() {
- return txId.get();
+ TransactionImpl(TxMode txMode, String txId) {
+ super(txMode, txId);
}
@Override
- public QueryTx getQueryTx() {
- return txMode;
+ public String getSessionId() {
+ return sessionId;
}
@Override
@@ -338,6 +334,10 @@ public QuerySession getSession() {
@Override
public QueryStream createQuery(String query, boolean commitAtEnd, Params prms, ExecuteQuerySettings settings) {
+ // If we intend to commit, statusFuture is reset to reflect only future actions in transaction
+ CompletableFuture currentStatusFuture = commitAtEnd
+ ? statusFuture.getAndSet(new CompletableFuture<>())
+ : statusFuture.get();
final String currentId = txId.get();
YdbQuery.TransactionControl tc = currentId != null
? TxControl.txIdCtrl(currentId, commitAtEnd)
@@ -351,13 +351,31 @@ void handleTxMeta(YdbQuery.TransactionMeta meta) {
logger.warn("{} lost transaction meta id {}", SessionImpl.this, newId);
}
}
+ @Override
+ void handleCompletion(Status status, Throwable th) {
+ if (th != null) {
+ currentStatusFuture.completeExceptionally(
+ new RuntimeException("Query on transaction failed with exception ", th));
+ }
+ if (status.isSuccess()) {
+ if (commitAtEnd) {
+ currentStatusFuture.complete(Status.SUCCESS);
+ }
+ } else {
+ currentStatusFuture.complete(Status
+ .of(StatusCode.ABORTED)
+ .withIssues(Issue.of("Query on transaction failed with status "
+ + status, Issue.Severity.ERROR)));
+ }
+ }
};
}
@Override
public CompletableFuture> commit(CommitTransactionSettings settings) {
- final String trasactionId = txId.get();
- if (trasactionId == null) {
+ CompletableFuture currentStatusFuture = statusFuture.getAndSet(new CompletableFuture<>());
+ final String transactionId = txId.get();
+ if (transactionId == null) {
Issue issue = Issue.of("Transaction is not started", Issue.Severity.WARNING);
Result res = Result.success(new QueryInfo(null), Status.of(StatusCode.SUCCESS, null, issue));
return CompletableFuture.completedFuture(res);
@@ -365,40 +383,54 @@ public CompletableFuture> commit(CommitTransactionSettings set
YdbQuery.CommitTransactionRequest request = YdbQuery.CommitTransactionRequest.newBuilder()
.setSessionId(sessionId)
- .setTxId(trasactionId)
+ .setTxId(transactionId)
.build();
GrpcRequestSettings grpcSettings = makeGrpcRequestSettings(settings);
- return rpc.commitTransaction(request, grpcSettings).thenApply(res -> {
- updateSessionState(res.getStatus());
- if (!txId.compareAndSet(trasactionId, null)) {
- logger.warn("{} lost commit response for transaction {}", SessionImpl.this, trasactionId);
- }
- // TODO: CommitTrasactionResponse must contain exec_stats
- return res.map(resp -> new QueryInfo(null));
- });
+ return rpc.commitTransaction(request, grpcSettings)
+ .thenApply(res -> {
+ Status status = res.getStatus();
+ currentStatusFuture.complete(status);
+ updateSessionState(status);
+ if (!txId.compareAndSet(transactionId, null)) {
+ logger.warn("{} lost commit response for transaction {}", SessionImpl.this, transactionId);
+ }
+ // TODO: CommitTransactionResponse must contain exec_stats
+ return res.map(resp -> new QueryInfo(null));
+ }).whenComplete(((status, th) -> {
+ if (th != null) {
+ currentStatusFuture.completeExceptionally(
+ new RuntimeException("Transaction commit failed with exception", th));
+ }
+ }));
}
@Override
public CompletableFuture rollback(RollbackTransactionSettings settings) {
- final String trasactionId = txId.get();
+ CompletableFuture currentStatusFuture = statusFuture.getAndSet(new CompletableFuture<>());
+ final String transactionId = txId.get();
- if (trasactionId == null) {
+ if (transactionId == null) {
Issue issue = Issue.of("Transaction is not started", Issue.Severity.WARNING);
return CompletableFuture.completedFuture(Status.of(StatusCode.SUCCESS, null, issue));
}
YdbQuery.RollbackTransactionRequest request = YdbQuery.RollbackTransactionRequest.newBuilder()
.setSessionId(sessionId)
- .setTxId(trasactionId)
+ .setTxId(transactionId)
.build();
GrpcRequestSettings grpcSettings = makeGrpcRequestSettings(settings);
- return rpc.rollbackTransaction(request, grpcSettings).thenApply(result -> {
- updateSessionState(result.getStatus());
- if (!txId.compareAndSet(trasactionId, null)) {
- logger.warn("{} lost rollback response for transaction {}", SessionImpl.this, trasactionId);
- }
- return result.getStatus();
- });
+ return rpc.rollbackTransaction(request, grpcSettings)
+ .thenApply(result -> {
+ updateSessionState(result.getStatus());
+ if (!txId.compareAndSet(transactionId, null)) {
+ logger.warn("{} lost rollback response for transaction {}", SessionImpl.this,
+ transactionId);
+ }
+ return result.getStatus();
+ })
+ .whenComplete((status, th) -> currentStatusFuture.complete(Status
+ .of(StatusCode.ABORTED)
+ .withIssues(Issue.of("Transaction was rolled back", Issue.Severity.ERROR))));
}
}
}
diff --git a/query/src/main/java/tech/ydb/query/impl/TxControl.java b/query/src/main/java/tech/ydb/query/impl/TxControl.java
index bedde690d..e0174958f 100644
--- a/query/src/main/java/tech/ydb/query/impl/TxControl.java
+++ b/query/src/main/java/tech/ydb/query/impl/TxControl.java
@@ -1,7 +1,7 @@
package tech.ydb.query.impl;
+import tech.ydb.common.transaction.TxMode;
import tech.ydb.proto.query.YdbQuery;
-import tech.ydb.query.QueryTx;
/**
*
@@ -31,7 +31,7 @@ class TxControl {
private TxControl() { }
- public static YdbQuery.TransactionControl txModeCtrl(QueryTx tx, boolean commitTx) {
+ public static YdbQuery.TransactionControl txModeCtrl(TxMode tx, boolean commitTx) {
YdbQuery.TransactionSettings ts = txSettings(tx);
if (ts == null) {
return null;
@@ -49,7 +49,7 @@ public static YdbQuery.TransactionControl txIdCtrl(String txId, boolean commitTx
.build();
}
- public static YdbQuery.TransactionSettings txSettings(QueryTx tx) {
+ public static YdbQuery.TransactionSettings txSettings(TxMode tx) {
switch (tx) {
case SERIALIZABLE_RW:
return TS_SERIALIZABLE;
diff --git a/query/src/test/java/tech/ydb/query/impl/QueryIntegrationTest.java b/query/src/test/java/tech/ydb/query/impl/QueryIntegrationTest.java
index 7c7e43894..94921659b 100644
--- a/query/src/test/java/tech/ydb/query/impl/QueryIntegrationTest.java
+++ b/query/src/test/java/tech/ydb/query/impl/QueryIntegrationTest.java
@@ -21,7 +21,7 @@
import tech.ydb.query.QueryClient;
import tech.ydb.query.QuerySession;
import tech.ydb.query.QueryTransaction;
-import tech.ydb.query.QueryTx;
+import tech.ydb.common.transaction.TxMode;
import tech.ydb.query.result.QueryInfo;
import tech.ydb.query.result.QueryResultPart;
import tech.ydb.query.settings.ExecuteQuerySettings;
@@ -123,7 +123,7 @@ public static void dropAll() {
public void testSimpleSelect() {
try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) {
QueryReader reader = QueryReader.readFrom(
- session.createQuery("SELECT 2 + 3;", QueryTx.SERIALIZABLE_RW)
+ session.createQuery("SELECT 2 + 3;", TxMode.SERIALIZABLE_RW)
).join().getValue();
@@ -152,7 +152,7 @@ public void testSimplePrepare() {
.build();
QueryReader reader = QueryReader.readFrom(
- session.createQuery(query, QueryTx.NONE, Params.empty(), settings)
+ session.createQuery(query, TxMode.NONE, Params.empty(), settings)
).join().getValue();
@@ -210,7 +210,7 @@ public void testSimpleCRUD() {
);
try (QuerySession session = queryClient.createSession(SESSION_TIMEOUT).join().getValue()) {
- session.createQuery(query, QueryTx.SERIALIZABLE_RW, params)
+ session.createQuery(query, TxMode.SERIALIZABLE_RW, params)
.execute(this::printQuerySetPart)
.join().getStatus().expectSuccess();
}
@@ -218,13 +218,13 @@ public void testSimpleCRUD() {
try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) {
String query = "SELECT id, name, payload, is_valid FROM " + TEST_TABLE + " ORDER BY id;";
- session.createQuery(query, QueryTx.SERIALIZABLE_RW)
+ session.createQuery(query, TxMode.SERIALIZABLE_RW)
.execute(this::printQuerySetPart)
.join().getStatus().expectSuccess();
}
try (QuerySession session = queryClient.createSession(SESSION_TIMEOUT).join().getValue()) {
- session.createQuery("DELETE FROM " + TEST_TABLE, QueryTx.SERIALIZABLE_RW)
+ session.createQuery("DELETE FROM " + TEST_TABLE, TxMode.SERIALIZABLE_RW)
.execute(this::printQuerySetPart)
.join().getStatus().expectSuccess();
}
@@ -242,14 +242,17 @@ public void printQuerySetPart(QueryResultPart part) {
public void updateMultipleTablesInOneTransaction() {
try (QueryClient client = QueryClient.newClient(ydbTransport).build()) {
try (QuerySession session = client.createSession(Duration.ofSeconds(5)).join().getValue()) {
- QueryTransaction tx = session.createNewTransaction(QueryTx.SERIALIZABLE_RW);
+ QueryTransaction tx = session.createNewTransaction(TxMode.SERIALIZABLE_RW);
+ Assert.assertFalse(tx.isActive());
QueryReader.readFrom(
tx.createQuery("UPDATE " + TEST_TABLE + " SET name='test' WHERE id=1")
).join().getStatus().expectSuccess();
+ Assert.assertTrue(tx.isActive());
QueryReader.readFrom(
tx.createQueryWithCommit("UPDATE " + TEST_DOUBLE_TABLE + " SET amount=300 WHERE id=1")
).join().getStatus().expectSuccess();
+ Assert.assertFalse(tx.isActive());
}
}
}
@@ -258,15 +261,19 @@ public void updateMultipleTablesInOneTransaction() {
public void interactiveTransaction() {
try (QueryClient client = QueryClient.newClient(ydbTransport).build()) {
try (QuerySession session = client.createSession(Duration.ofSeconds(5)).join().getValue()) {
- QueryTransaction tx = session.createNewTransaction(QueryTx.SERIALIZABLE_RW);
+ QueryTransaction tx = session.createNewTransaction(TxMode.SERIALIZABLE_RW);
+ Assert.assertFalse(tx.isActive());
tx.createQuery("INSERT INTO " + TEST_TABLE + " (id, name) VALUES (1, 'rec1');").execute(null)
.join().getStatus().expectSuccess();
+ Assert.assertTrue(tx.isActive());
tx.createQuery("INSERT INTO " + TEST_TABLE + " (id, name) VALUES (3, 'rec3');").execute(null)
.join().getStatus().expectSuccess();
+ Assert.assertTrue(tx.isActive());
Iterator rsIter = QueryReader.readFrom(
tx.createQuery("SELECT id, name FROM " + TEST_TABLE + " ORDER BY id")
).join().getValue().iterator();
+ Assert.assertTrue(tx.isActive());
Assert.assertTrue(rsIter.hasNext());
ResultSetReader rs = rsIter.next();
@@ -278,6 +285,7 @@ public void interactiveTransaction() {
Assert.assertFalse(rsIter.hasNext());
tx.commit().join().getStatus().expectSuccess();
+ Assert.assertFalse(tx.isActive());
tx.createQuery("INSERT INTO " + TEST_TABLE + " (id, name) VALUES (2, 'rec2');").execute(null)
.join().getStatus().expectSuccess();
@@ -326,13 +334,13 @@ public void testSchemeQuery() {
try (QueryClient client = QueryClient.newClient(ydbTransport).build()) {
try (QuerySession session = client.createSession(Duration.ofSeconds(5)).join().getValue()) {
CompletableFuture> createTable = session
- .createQuery("CREATE TABLE demo_table (id Int32, data Text, PRIMARY KEY(id));", QueryTx.NONE)
+ .createQuery("CREATE TABLE demo_table (id Int32, data Text, PRIMARY KEY(id));", TxMode.NONE)
.execute(this::printQuerySetPart);
createTable.join().getStatus().expectSuccess();
CompletableFuture> dropTable = session
- .createQuery("DROP TABLE demo_table;", QueryTx.NONE)
+ .createQuery("DROP TABLE demo_table;", TxMode.NONE)
.execute(this::printQuerySetPart);
dropTable.join().getStatus().expectSuccess();
}
@@ -344,13 +352,13 @@ public void testQueryStats() {
try (QueryClient client = QueryClient.newClient(ydbTransport).build()) {
try (QuerySession session = client.createSession(Duration.ofSeconds(5)).join().getValue()) {
CompletableFuture> createTable = session
- .createQuery("CREATE TABLE demo_table (id Int32, data Text, PRIMARY KEY(id));", QueryTx.NONE)
+ .createQuery("CREATE TABLE demo_table (id Int32, data Text, PRIMARY KEY(id));", TxMode.NONE)
.execute(this::printQuerySetPart);
createTable.join().getStatus().expectSuccess();
CompletableFuture> dropTable = session
- .createQuery("DROP TABLE demo_table;", QueryTx.NONE)
+ .createQuery("DROP TABLE demo_table;", TxMode.NONE)
.execute(this::printQuerySetPart);
dropTable.join().getStatus().expectSuccess();
}
diff --git a/table/pom.xml b/table/pom.xml
index c54fca140..da51d9c98 100644
--- a/table/pom.xml
+++ b/table/pom.xml
@@ -24,6 +24,10 @@
tech.ydb
ydb-sdk-core
+
+ tech.ydb
+ ydb-sdk-common
+
tech.ydb.test
diff --git a/table/src/main/java/tech/ydb/table/Session.java b/table/src/main/java/tech/ydb/table/Session.java
index 28b24b568..c1e48dc57 100644
--- a/table/src/main/java/tech/ydb/table/Session.java
+++ b/table/src/main/java/tech/ydb/table/Session.java
@@ -3,6 +3,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
+import tech.ydb.common.transaction.TxMode;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcReadStream;
@@ -34,6 +35,7 @@
import tech.ydb.table.settings.ReadTableSettings;
import tech.ydb.table.settings.RenameTablesSettings;
import tech.ydb.table.settings.RollbackTxSettings;
+import tech.ydb.table.transaction.TableTransaction;
import tech.ydb.table.transaction.Transaction;
import tech.ydb.table.transaction.TxControl;
import tech.ydb.table.values.ListValue;
@@ -41,6 +43,7 @@
/**
* @author Sergey Polovko
+ * @author Nikolay Perfilov
*/
public interface Session extends AutoCloseable {
enum State {
@@ -83,10 +86,54 @@ CompletableFuture> executeDataQuery(
CompletableFuture> explainDataQuery(String query, ExplainDataQuerySettings settings);
+ /**
+ * @deprecated
+ * Use {@link Session#beginTransaction(TxMode, BeginTxSettings)} instead
+ */
+ @Deprecated
CompletableFuture> beginTransaction(Transaction.Mode transactionMode, BeginTxSettings settings);
+ /**
+ * Create a new not active {@link TableTransaction}. This TableDescription will have no identifier and
+ * starts a transaction on server by execution a query
+ * @param txMode transaction mode
+ * @return new implicit transaction
+ */
+ TableTransaction createNewTransaction(TxMode txMode);
+
+ /**
+ * Create and start a new active {@link TableTransaction}. This method creates a transaction on the server
+ * and returns TableDescription which is ready to execute queries on this server transaction
+ *
+ * @param txMode transaction mode
+ * @param settings additional settings for request
+ * @return future with result of the transaction starting
+ */
+ CompletableFuture> beginTransaction(TxMode txMode, BeginTxSettings settings);
+
+ /**
+ * Create and start a new active {@link TableTransaction}. This method creates a transaction on the server
+ * and returns TableDescription which is ready to execute queries on this server transaction
+ *
+ * @param txMode transaction mode
+ * @return future with result of the transaction starting
+ */
+ default CompletableFuture> beginTransaction(TxMode txMode) {
+ return beginTransaction(txMode, new BeginTxSettings());
+ }
+
+ /**
+ * @deprecated
+ * Use {@link TableTransaction#commit()} ()} instead
+ */
+ @Deprecated
CompletableFuture commitTransaction(String txId, CommitTxSettings settings);
+ /**
+ * @deprecated
+ * Use {@link TableTransaction#rollback()} instead
+ */
+ @Deprecated
CompletableFuture rollbackTransaction(String txId, RollbackTxSettings settings);
GrpcReadStream executeReadTable(String tablePath, ReadTableSettings settings);
@@ -165,6 +212,11 @@ default CompletableFuture> explainDataQuery(Strin
return explainDataQuery(query, new ExplainDataQuerySettings());
}
+ /**
+ * @deprecated
+ * Use {@link Session#beginTransaction(TxMode)} instead
+ */
+ @Deprecated
default CompletableFuture> beginTransaction(Transaction.Mode transactionMode) {
return beginTransaction(transactionMode, new BeginTxSettings());
}
diff --git a/table/src/main/java/tech/ydb/table/impl/BaseSession.java b/table/src/main/java/tech/ydb/table/impl/BaseSession.java
index 5fad1ca02..1c53ebd89 100644
--- a/table/src/main/java/tech/ydb/table/impl/BaseSession.java
+++ b/table/src/main/java/tech/ydb/table/impl/BaseSession.java
@@ -19,6 +19,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import tech.ydb.common.transaction.TxMode;
+import tech.ydb.common.transaction.impl.YdbTransactionImpl;
import tech.ydb.core.Issue;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
@@ -79,6 +81,7 @@
import tech.ydb.table.settings.RollbackTxSettings;
import tech.ydb.table.settings.StoragePolicy;
import tech.ydb.table.settings.TtlSettings;
+import tech.ydb.table.transaction.TableTransaction;
import tech.ydb.table.transaction.Transaction;
import tech.ydb.table.transaction.TxControl;
import tech.ydb.table.values.ListType;
@@ -94,6 +97,7 @@
/**
* @author Sergey Polovko
* @author Alexandr Gorshenin
+ * @author Nikolay Perfilov
*/
@ThreadSafe
public abstract class BaseSession implements Session {
@@ -661,13 +665,12 @@ private static YdbTable.TransactionSettings txSettings(Transaction.Mode transact
return settings.build();
}
- @Override
- public CompletableFuture> executeDataQuery(
- String query, TxControl> txControl, Params params, ExecuteDataQuerySettings settings) {
+ private CompletableFuture> executeDataQueryInternal(
+ String query, YdbTable.TransactionControl txControl, Params params, ExecuteDataQuerySettings settings) {
YdbTable.ExecuteDataQueryRequest.Builder request = YdbTable.ExecuteDataQueryRequest.newBuilder()
.setSessionId(id)
.setOperationParams(OperationUtils.createParams(settings.toOperationSettings()))
- .setTxControl(txControl.toPb())
+ .setTxControl(txControl)
.setQuery(YdbTable.Query.newBuilder().setYqlText(query))
.setCollectStats(settings.collectStats().toPb())
.putAllParameters(params.toPb());
@@ -703,6 +706,12 @@ public CompletableFuture> executeDataQuery(
.thenApply(result -> result.map(DataQueryResult::new));
}
+ @Override
+ public CompletableFuture> executeDataQuery(
+ String query, TxControl> txControl, Params params, ExecuteDataQuerySettings settings) {
+ return executeDataQueryInternal(query, txControl.toPb(), params, settings);
+ }
+
@Override
public CompletableFuture> readRows(String pathToTable, ReadRowsSettings settings) {
YdbTable.ReadRowsRequest.Builder requestBuilder = YdbTable.ReadRowsRequest.newBuilder()
@@ -821,7 +830,26 @@ public CompletableFuture> beginTransaction(Transaction.Mode
final GrpcRequestSettings grpcRequestSettings = makeGrpcRequestSettings(settings.getTimeoutDuration());
return interceptResultWithLog("begin transaction",
tableRpc.beginTransaction(request, grpcRequestSettings))
- .thenApply(result -> result.map(tx -> new TransactionImpl(this, tx.getTxMeta().getId())));
+ .thenApply(result -> result.map(tx -> new DeprecatedTransactionImpl(tx.getTxMeta().getId())));
+ }
+
+ @Override
+ public TableTransaction createNewTransaction(TxMode txMode) {
+ return new TableTransactionImpl(txMode, null);
+ }
+
+ @Override
+ public CompletableFuture> beginTransaction(TxMode txMode, BeginTxSettings settings) {
+ YdbTable.BeginTransactionRequest request = YdbTable.BeginTransactionRequest.newBuilder()
+ .setSessionId(id)
+ .setOperationParams(OperationUtils.createParams(settings.toOperationSettings()))
+ .setTxSettings(TxControlToPb.txSettings(txMode))
+ .build();
+
+ final GrpcRequestSettings grpcRequestSettings = makeGrpcRequestSettings(settings.getTimeoutDuration());
+ return interceptResultWithLog("begin transaction",
+ tableRpc.beginTransaction(request, grpcRequestSettings))
+ .thenApply(result -> result.map(tx -> new TableTransactionImpl(txMode, tx.getTxMeta().getId())));
}
@Override
@@ -915,8 +943,7 @@ public GrpcReadStream executeScanQuery(String query, Params par
});
}
- @Override
- public CompletableFuture commitTransaction(String txId, CommitTxSettings settings) {
+ private CompletableFuture commitTransactionInternal(String txId, CommitTxSettings settings) {
YdbTable.CommitTransactionRequest request = YdbTable.CommitTransactionRequest.newBuilder()
.setSessionId(id)
.setOperationParams(OperationUtils.createParams(settings.toOperationSettings()))
@@ -929,7 +956,11 @@ public CompletableFuture commitTransaction(String txId, CommitTxSettings
}
@Override
- public CompletableFuture rollbackTransaction(String txId, RollbackTxSettings settings) {
+ public CompletableFuture commitTransaction(String txId, CommitTxSettings settings) {
+ return commitTransactionInternal(txId, settings);
+ }
+
+ private CompletableFuture rollbackTransactionInternal(String txId, RollbackTxSettings settings) {
YdbTable.RollbackTransactionRequest request = YdbTable.RollbackTransactionRequest.newBuilder()
.setSessionId(id)
.setOperationParams(OperationUtils.createParams(settings.toOperationSettings()))
@@ -941,6 +972,12 @@ public CompletableFuture rollbackTransaction(String txId, RollbackTxSett
tableRpc.rollbackTransaction(request, grpcRequestSettings));
}
+ @Override
+ @Deprecated
+ public CompletableFuture rollbackTransaction(String txId, RollbackTxSettings settings) {
+ return rollbackTransactionInternal(txId, settings);
+ }
+
@Override
public CompletableFuture> keepAlive(KeepAliveSessionSettings settings) {
YdbTable.KeepAliveRequest request = YdbTable.KeepAliveRequest.newBuilder()
@@ -1032,6 +1069,115 @@ public String toString() {
return "Session{" + id + "}";
}
+ class TableTransactionImpl extends YdbTransactionImpl implements TableTransaction {
+
+ TableTransactionImpl(TxMode txMode, String txId) {
+ super(txMode, txId);
+ }
+
+ @Override
+ public String getSessionId() {
+ return id;
+ }
+
+ @Override
+ public Session getSession() {
+ return BaseSession.this;
+ }
+
+ @Override
+ public CompletableFuture> executeDataQuery(
+ String query, boolean commitAtEnd, Params params, ExecuteDataQuerySettings settings) {
+ // If we intend to commit, statusFuture is reset to reflect only future actions in transaction
+ CompletableFuture currentStatusFuture = commitAtEnd
+ ? statusFuture.getAndSet(new CompletableFuture<>())
+ : statusFuture.get();
+ final String currentId = txId.get();
+ YdbTable.TransactionControl transactionControl = currentId != null
+ ? TxControlToPb.txIdCtrl(currentId, commitAtEnd)
+ : TxControlToPb.txModeCtrl(txMode, commitAtEnd);
+ return executeDataQueryInternal(query, transactionControl, params, settings)
+ .whenComplete((result, th) -> {
+ if (th != null) {
+ currentStatusFuture.completeExceptionally(
+ new RuntimeException("ExecuteDataQuery on transaction failed with exception ", th));
+ setNewId(currentId, null);
+ } else if (result.isSuccess()) {
+ setNewId(currentId, result.getValue().getTxId());
+ if (commitAtEnd) {
+ currentStatusFuture.complete(Status.SUCCESS);
+ }
+ } else {
+ setNewId(currentId, null);
+ currentStatusFuture.complete(Status
+ .of(StatusCode.ABORTED)
+ .withIssues(Issue.of("ExecuteDataQuery on transaction failed with status "
+ + result.getStatus(), Issue.Severity.ERROR)));
+ }
+ });
+ }
+
+ private void setNewId(String currentId, String newId) {
+ if (!txId.compareAndSet(currentId, newId)) {
+ logger.warn("{} Couldn't change transaction id from {} to {}", BaseSession.this, currentId, newId);
+ }
+ }
+
+ @Override
+ public CompletableFuture commit(CommitTxSettings settings) {
+ CompletableFuture currentStatusFuture = statusFuture.getAndSet(new CompletableFuture<>());
+ final String transactionId = txId.get();
+ if (transactionId == null) {
+ Issue issue = Issue.of("Transaction is not started", Issue.Severity.WARNING);
+ return CompletableFuture.completedFuture(Status.of(StatusCode.SUCCESS, null, issue));
+ }
+ return commitTransactionInternal(transactionId, settings).whenComplete(((status, th) -> {
+ if (th != null) {
+ currentStatusFuture.completeExceptionally(th);
+ } else {
+ currentStatusFuture.complete(status);
+ }
+ }));
+ }
+
+ @Override
+ public CompletableFuture rollback(RollbackTxSettings settings) {
+ CompletableFuture currentStatusFuture = statusFuture.getAndSet(new CompletableFuture<>());
+ final String transactionId = txId.get();
+ if (transactionId == null) {
+ Issue issue = Issue.of("Transaction is not started", Issue.Severity.WARNING);
+ return CompletableFuture.completedFuture(Status.of(StatusCode.SUCCESS, null, issue));
+ }
+ return rollbackTransactionInternal(transactionId, settings)
+ .whenComplete((status, th) -> currentStatusFuture.complete(Status
+ .of(StatusCode.ABORTED)
+ .withIssues(Issue.of("Transaction was rolled back", Issue.Severity.ERROR))));
+ }
+ }
+
+ public final class DeprecatedTransactionImpl implements Transaction {
+ private final String txId;
+
+ DeprecatedTransactionImpl(String txId) {
+ this.txId = txId;
+ }
+
+ @Override
+ public String getId() {
+ return txId;
+ }
+
+ @Override
+ public CompletableFuture commit(CommitTxSettings settings) {
+ return commitTransactionInternal(txId, settings);
+ }
+
+ @Override
+ public CompletableFuture rollback(RollbackTxSettings settings) {
+ return rollbackTransactionInternal(txId, settings);
+ }
+ }
+
private static class ShutdownHandler implements Consumer {
private static final String GRACEFUL_SHUTDOWN_HINT = "session-close";
private volatile boolean needShutdown;
diff --git a/table/src/main/java/tech/ydb/table/impl/TransactionImpl.java b/table/src/main/java/tech/ydb/table/impl/TransactionImpl.java
deleted file mode 100644
index 435386c71..000000000
--- a/table/src/main/java/tech/ydb/table/impl/TransactionImpl.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package tech.ydb.table.impl;
-
-import java.util.concurrent.CompletableFuture;
-
-import tech.ydb.core.Status;
-import tech.ydb.table.Session;
-import tech.ydb.table.settings.CommitTxSettings;
-import tech.ydb.table.settings.RollbackTxSettings;
-import tech.ydb.table.transaction.Transaction;
-
-
-/**
- * @author Sergey Polovko
- */
-final class TransactionImpl implements Transaction {
-
- private final Session session;
- private final String txId;
-
- TransactionImpl(Session session, String txId) {
- this.session = session;
- this.txId = txId;
- }
-
- @Override
- public String getId() {
- return txId;
- }
-
- @Override
- public CompletableFuture commit(CommitTxSettings settings) {
- return session.commitTransaction(txId, settings);
- }
-
- @Override
- public CompletableFuture rollback(RollbackTxSettings settings) {
- return session.rollbackTransaction(txId, settings);
- }
-}
diff --git a/table/src/main/java/tech/ydb/table/impl/TxControlToPb.java b/table/src/main/java/tech/ydb/table/impl/TxControlToPb.java
new file mode 100644
index 000000000..517dbad06
--- /dev/null
+++ b/table/src/main/java/tech/ydb/table/impl/TxControlToPb.java
@@ -0,0 +1,69 @@
+package tech.ydb.table.impl;
+
+import tech.ydb.common.transaction.TxMode;
+import tech.ydb.proto.table.YdbTable;
+
+/**
+ * @author Aleksandr Gorshenin
+ * @author Nikolay Perfilov
+ */
+public class TxControlToPb {
+ private static final YdbTable.TransactionSettings TS_SERIALIZABLE = YdbTable.TransactionSettings.newBuilder()
+ .setSerializableReadWrite(YdbTable.SerializableModeSettings.getDefaultInstance())
+ .build();
+
+ private static final YdbTable.TransactionSettings TS_SNAPSHOT = YdbTable.TransactionSettings.newBuilder()
+ .setSnapshotReadOnly(YdbTable.SnapshotModeSettings.getDefaultInstance())
+ .build();
+
+ private static final YdbTable.TransactionSettings TS_STALE = YdbTable.TransactionSettings.newBuilder()
+ .setStaleReadOnly(YdbTable.StaleModeSettings.getDefaultInstance())
+ .build();
+
+ private static final YdbTable.TransactionSettings TS_ONLINE = YdbTable.TransactionSettings.newBuilder()
+ .setOnlineReadOnly(YdbTable.OnlineModeSettings.newBuilder().setAllowInconsistentReads(false).build())
+ .build();
+
+ private static final YdbTable.TransactionSettings TS_ONLINE_INCONSISTENT = YdbTable.TransactionSettings
+ .newBuilder()
+ .setOnlineReadOnly(YdbTable.OnlineModeSettings.newBuilder().setAllowInconsistentReads(true).build())
+ .build();
+
+ private TxControlToPb() { }
+
+ public static YdbTable.TransactionControl txModeCtrl(TxMode tx, boolean commitTx) {
+ YdbTable.TransactionSettings ts = txSettings(tx);
+ if (ts == null) {
+ return null;
+ }
+ return YdbTable.TransactionControl.newBuilder()
+ .setBeginTx(ts)
+ .setCommitTx(commitTx)
+ .build();
+ }
+
+ public static YdbTable.TransactionControl txIdCtrl(String txId, boolean commitTx) {
+ return YdbTable.TransactionControl.newBuilder()
+ .setTxId(txId)
+ .setCommitTx(commitTx)
+ .build();
+ }
+
+ public static YdbTable.TransactionSettings txSettings(TxMode tx) {
+ switch (tx) {
+ case SERIALIZABLE_RW:
+ return TS_SERIALIZABLE;
+ case SNAPSHOT_RO:
+ return TS_SNAPSHOT;
+ case STALE_RO:
+ return TS_STALE;
+ case ONLINE_RO:
+ return TS_ONLINE;
+ case ONLINE_INCONSISTENT_RO:
+ return TS_ONLINE_INCONSISTENT;
+ case NONE:
+ default:
+ return null;
+ }
+ }
+}
diff --git a/table/src/main/java/tech/ydb/table/transaction/TableTransaction.java b/table/src/main/java/tech/ydb/table/transaction/TableTransaction.java
new file mode 100644
index 000000000..8e4dfc685
--- /dev/null
+++ b/table/src/main/java/tech/ydb/table/transaction/TableTransaction.java
@@ -0,0 +1,100 @@
+package tech.ydb.table.transaction;
+
+import java.util.concurrent.CompletableFuture;
+
+import tech.ydb.common.transaction.YdbTransaction;
+import tech.ydb.core.Result;
+import tech.ydb.core.Status;
+import tech.ydb.table.Session;
+import tech.ydb.table.query.DataQueryResult;
+import tech.ydb.table.query.Params;
+import tech.ydb.table.settings.CommitTxSettings;
+import tech.ydb.table.settings.ExecuteDataQuerySettings;
+import tech.ydb.table.settings.RollbackTxSettings;
+
+/**
+ * Interface of transaction from table service
+ * Short-living object allows transactional execution of several queries in one interactive transaction.
+ * TableTransaction can be used in implicit mode - without calling commit()/rollback().
+ * When TableTransaction is not active, any execution of a query with commitAtEnd=false starts a new transaction.
+ * And execution of a query with commitAtEnd=true commits this transaction.
+ * @author Nikolay Perfilov
+ */
+public interface TableTransaction extends YdbTransaction {
+
+ /**
+ * Returns {@link Session} that was used to create this transaction
+ *
+ * @return session that was used to create this transaction
+ */
+ Session getSession();
+
+ /**
+ * Execute DataQuery
+ *
+ * @param query text of query. Can only contain DML statements
+ * @param commitAtEnd true if transaction must be committed after query execution
+ * @param params query parameters
+ * @param settings additional settings of query execution
+ * @return a future to query result
+ */
+ CompletableFuture> executeDataQuery(
+ String query, boolean commitAtEnd, Params params, ExecuteDataQuerySettings settings);
+
+ /**
+ * Execute DataQuery.
+ * Transaction will not be committed after the execution of query.
+ *
+ * @param query text of query. Can only contain DML statements
+ * @return a future to query result
+ */
+ default CompletableFuture> executeDataQuery(String query) {
+ return executeDataQuery(query, false, Params.empty(), new ExecuteDataQuerySettings());
+ }
+
+ /**
+ * Execute DataQuery.
+ * Transaction will not be committed after the execution of query.
+ *
+ * @param query text of query. Can only contain DML statements
+ * @param params query parameters
+ * @return a future to query result
+ */
+ default CompletableFuture> executeDataQuery(String query, Params params) {
+ return executeDataQuery(query, false, params, new ExecuteDataQuerySettings());
+ }
+
+ /**
+ * Execute DataQuery.
+ * Transaction will be committed after the execution of query.
+ *
+ * @param query text of query. Can only contain DML statements
+ * @return a future to query result
+ */
+ default CompletableFuture> executeDataQueryAndCommit(String query) {
+ return executeDataQuery(query, true, Params.empty(), new ExecuteDataQuerySettings());
+ }
+
+ /**
+ * Execute DataQuery.
+ * Transaction will be committed after the execution of query.
+ *
+ * @param query text of query. Can only contain DML statements
+ * @param params query parameters
+ * @return a future to query result
+ */
+ default CompletableFuture> executeDataQueryAndCommit(String query, Params params) {
+ return executeDataQuery(query, false, params, new ExecuteDataQuerySettings());
+ }
+
+ CompletableFuture commit(CommitTxSettings settings);
+ CompletableFuture rollback(RollbackTxSettings settings);
+
+ default CompletableFuture commit() {
+ return commit(new CommitTxSettings());
+ }
+
+ default CompletableFuture rollback() {
+ return rollback(new RollbackTxSettings());
+ }
+}
diff --git a/table/src/main/java/tech/ydb/table/transaction/Transaction.java b/table/src/main/java/tech/ydb/table/transaction/Transaction.java
index 74eb995cd..fd1bbe9b8 100644
--- a/table/src/main/java/tech/ydb/table/transaction/Transaction.java
+++ b/table/src/main/java/tech/ydb/table/transaction/Transaction.java
@@ -8,7 +8,10 @@
/**
* @author Sergey Polovko
+ * @deprecated
+ * Use {@link TableTransaction} instead
*/
+@Deprecated
public interface Transaction {
enum Mode {
SERIALIZABLE_READ_WRITE,
diff --git a/table/src/main/java/tech/ydb/table/transaction/TxControl.java b/table/src/main/java/tech/ydb/table/transaction/TxControl.java
index 9446c595f..62164055b 100644
--- a/table/src/main/java/tech/ydb/table/transaction/TxControl.java
+++ b/table/src/main/java/tech/ydb/table/transaction/TxControl.java
@@ -1,12 +1,14 @@
package tech.ydb.table.transaction;
+import tech.ydb.common.transaction.TxMode;
import tech.ydb.proto.table.YdbTable;
import tech.ydb.proto.table.YdbTable.OnlineModeSettings;
import tech.ydb.proto.table.YdbTable.SerializableModeSettings;
import tech.ydb.proto.table.YdbTable.StaleModeSettings;
import tech.ydb.proto.table.YdbTable.TransactionControl;
import tech.ydb.proto.table.YdbTable.TransactionSettings;
+import tech.ydb.table.settings.BeginTxSettings;
/**
@@ -31,10 +33,22 @@ protected TxControl(boolean commitTx, TransactionSettings settings) {
.build();
}
+ /**
+ * @deprecated
+ * Use {@link TableTransaction} created by {@link tech.ydb.table.Session#createNewTransaction(TxMode)}
+ * or {@link tech.ydb.table.Session#beginTransaction(TxMode, BeginTxSettings)} to execute queries in transaction
+ */
+ @Deprecated
public static TxId id(String id) {
return new TxId(true, id);
}
+ /**
+ * @deprecated
+ * Use {@link TableTransaction} created by {@link tech.ydb.table.Session#createNewTransaction(TxMode)}
+ * or {@link tech.ydb.table.Session#beginTransaction(TxMode, BeginTxSettings)} to execute queries in transaction
+ */
+ @Deprecated
public static TxId id(Transaction tx) {
return new TxId(true, tx.getId());
}
@@ -60,7 +74,6 @@ public boolean isCommitTx() {
}
public abstract Self setCommitTx(boolean commitTx);
-
public TransactionControl toPb() {
return pb;
}
diff --git a/table/src/test/java/tech/ydb/table/SessionStub.java b/table/src/test/java/tech/ydb/table/SessionStub.java
index c6cfe9059..7fb1846a5 100644
--- a/table/src/test/java/tech/ydb/table/SessionStub.java
+++ b/table/src/test/java/tech/ydb/table/SessionStub.java
@@ -1,11 +1,9 @@
package tech.ydb.table;
-import java.sql.ResultSet;
-import java.time.Duration;
-import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import tech.ydb.common.transaction.TxMode;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcReadStream;
@@ -37,10 +35,10 @@
import tech.ydb.table.settings.ReadRowsSettings;
import tech.ydb.table.settings.ReadTableSettings;
import tech.ydb.table.settings.RollbackTxSettings;
+import tech.ydb.table.transaction.TableTransaction;
import tech.ydb.table.transaction.Transaction;
import tech.ydb.table.transaction.TxControl;
import tech.ydb.table.values.ListValue;
-import tech.ydb.table.values.StructValue;
/**
@@ -130,12 +128,23 @@ public CompletableFuture> explainDataQuery(
}
@Override
+ @Deprecated
public CompletableFuture> beginTransaction(
Transaction.Mode transactionMode, BeginTxSettings settings)
{
return notImplemented("beginTransaction()");
}
+ @Override
+ public TableTransaction createNewTransaction(TxMode txMode) {
+ throw new UnsupportedOperationException("createNewTransaction is not implemented");
+ }
+
+ @Override
+ public CompletableFuture> beginTransaction(TxMode txMode, BeginTxSettings settings) {
+ throw new UnsupportedOperationException("beginTransaction is not implemented");
+ }
+
@Override
public GrpcReadStream executeReadTable(String tablePath, ReadTableSettings settings) {
throw new UnsupportedOperationException("executeReadTable not implemented");
diff --git a/topic/pom.xml b/topic/pom.xml
index fb0613f02..f1ab0ae11 100644
--- a/topic/pom.xml
+++ b/topic/pom.xml
@@ -25,6 +25,11 @@
ydb-sdk-core
+
+ tech.ydb
+ ydb-sdk-common
+
+
org.anarres.lzo
lzo-core
diff --git a/topic/src/main/java/tech/ydb/topic/TopicRpc.java b/topic/src/main/java/tech/ydb/topic/TopicRpc.java
index 5363e79b3..96d091338 100644
--- a/topic/src/main/java/tech/ydb/topic/TopicRpc.java
+++ b/topic/src/main/java/tech/ydb/topic/TopicRpc.java
@@ -56,6 +56,15 @@ CompletableFuture> describeTopic(YdbTopic.D
*/
CompletableFuture commitOffset(YdbTopic.CommitOffsetRequest request, GrpcRequestSettings settings);
+ /**
+ * Updates offsets in transaction.
+ * @param request request proto
+ * @param settings rpc call settings
+ * @return completable future with result of operation
+ */
+ CompletableFuture updateOffsetsInTransaction(YdbTopic.UpdateOffsetsInTransactionRequest request,
+ GrpcRequestSettings settings);
+
GrpcReadWriteStream writeSession();
GrpcReadWriteStream readSession();
diff --git a/topic/src/main/java/tech/ydb/topic/description/OffsetsRange.java b/topic/src/main/java/tech/ydb/topic/description/OffsetsRange.java
index c806e5090..32dcb4cf7 100644
--- a/topic/src/main/java/tech/ydb/topic/description/OffsetsRange.java
+++ b/topic/src/main/java/tech/ydb/topic/description/OffsetsRange.java
@@ -3,20 +3,8 @@
/**
* @author Nikolay Perfilov
*/
-public class OffsetsRange {
- private final long start;
- private final long end;
+public interface OffsetsRange {
+ long getStart();
- public OffsetsRange(long start, long end) {
- this.start = start;
- this.end = end;
- }
-
- public long getStart() {
- return start;
- }
-
- public long getEnd() {
- return end;
- }
+ long getEnd();
}
diff --git a/topic/src/main/java/tech/ydb/topic/impl/GrpcTopicRpc.java b/topic/src/main/java/tech/ydb/topic/impl/GrpcTopicRpc.java
index 7ae6ad178..1d4eaa415 100644
--- a/topic/src/main/java/tech/ydb/topic/impl/GrpcTopicRpc.java
+++ b/topic/src/main/java/tech/ydb/topic/impl/GrpcTopicRpc.java
@@ -70,6 +70,15 @@ public CompletableFuture commitOffset(YdbTopic.CommitOffsetRequest reque
.thenApply(OperationManager.syncStatusUnwrapper(YdbTopic.CommitOffsetResponse::getOperation));
}
+ @Override
+ public CompletableFuture updateOffsetsInTransaction(YdbTopic.UpdateOffsetsInTransactionRequest request,
+ GrpcRequestSettings settings) {
+ return transport
+ .unaryCall(TopicServiceGrpc.getUpdateOffsetsInTransactionMethod(), settings, request)
+ .thenApply(OperationManager.syncStatusUnwrapper(
+ YdbTopic.UpdateOffsetsInTransactionResponse::getOperation));
+ }
+
@Override
public GrpcReadWriteStream<
YdbTopic.StreamWriteMessage.FromServer,
diff --git a/topic/src/main/java/tech/ydb/topic/impl/Session.java b/topic/src/main/java/tech/ydb/topic/impl/Session.java
index 8fc3b07c4..f9745d173 100644
--- a/topic/src/main/java/tech/ydb/topic/impl/Session.java
+++ b/topic/src/main/java/tech/ydb/topic/impl/Session.java
@@ -5,5 +5,5 @@
*/
public interface Session {
void startAndInitialize();
- void shutdown();
+ boolean shutdown();
}
diff --git a/topic/src/main/java/tech/ydb/topic/impl/SessionBase.java b/topic/src/main/java/tech/ydb/topic/impl/SessionBase.java
index 4218cd1cf..5c73d3277 100644
--- a/topic/src/main/java/tech/ydb/topic/impl/SessionBase.java
+++ b/topic/src/main/java/tech/ydb/topic/impl/SessionBase.java
@@ -76,11 +76,13 @@ private boolean stop() {
@Override
- public synchronized void shutdown() {
+ public synchronized boolean shutdown() {
getLogger().info("Session shutdown");
if (stop()) {
onStop();
streamConnection.close();
+ return true;
}
+ return false;
}
}
diff --git a/topic/src/main/java/tech/ydb/topic/read/AsyncReader.java b/topic/src/main/java/tech/ydb/topic/read/AsyncReader.java
index 2be8b7e17..c8ad9b3a9 100644
--- a/topic/src/main/java/tech/ydb/topic/read/AsyncReader.java
+++ b/topic/src/main/java/tech/ydb/topic/read/AsyncReader.java
@@ -1,9 +1,17 @@
package tech.ydb.topic.read;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import io.grpc.ExperimentalApi;
+import tech.ydb.common.transaction.YdbTransaction;
+import tech.ydb.core.Status;
+import tech.ydb.topic.read.events.DataReceivedEvent;
+import tech.ydb.topic.settings.UpdateOffsetsInTransactionSettings;
+
/**
* @author Nikolay Perfilov
*/
@@ -19,4 +27,41 @@ public interface AsyncReader {
* Stops internal threads and makes cleanup in background. Non-blocking
*/
CompletableFuture shutdown();
+
+ /**
+ * Add offsets to transaction. Offsets could be from several topics.
+ * These offsets are "committed" only after said transaction is successfully committed.
+ * It is a separate request sent outside the reading stream.
+ *
+ * @param transaction a {@link YdbTransaction} that offsets should be added to.
+ * Transaction has to be active.
+ * @param offsets Offsets that should be added to transaction.
+ * Map key: topic Path
+ * Map value: List of Partition ranges for every partition in this topic to add
+ * @param settings Operation settings.
+ * @return {@link CompletableFuture} to operation status
+ */
+ CompletableFuture updateOffsetsInTransaction(YdbTransaction transaction,
+ Map> offsets,
+ UpdateOffsetsInTransactionSettings settings);
+
+ /**
+ * Add offsets of a single partition session to transaction.Offsets could be from several topics.
+ * These offsets are "committed" only after said transaction is successfully committed.
+ * It is a separate request sent outside the reading stream.
+ *
+ * @param transaction a {@link YdbTransaction} that offsets should be added to.
+ * Transaction has to be active.
+ * @param offsets Offsets that should be added to transaction.
+ * Could be constructed with helper methods {@link Message#getPartitionOffsets()}
+ * or {@link DataReceivedEvent#getPartitionOffsets()}
+ * @param settings Operation settings.
+ * @return {@link CompletableFuture} to operation status
+ */
+ default CompletableFuture updateOffsetsInTransaction(YdbTransaction transaction, PartitionOffsets offsets,
+ UpdateOffsetsInTransactionSettings settings) {
+ return updateOffsetsInTransaction(transaction,
+ Collections.singletonMap(offsets.getPartitionSession().getPath(), Collections.singletonList(offsets)),
+ settings);
+ }
}
diff --git a/topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java b/topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java
index 51172141d..011b28031 100644
--- a/topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java
+++ b/topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java
@@ -12,7 +12,7 @@
*
* @author Nikolay Perfilov
*/
-public interface DeferredCommitter {
+public interface DeferredCommitter extends MessageAccumulator {
/**
* Creates a new instance of {@link DeferredCommitter}
*
@@ -22,20 +22,6 @@ static DeferredCommitter newInstance() {
return new DeferredCommitterImpl();
}
- /**
- * Adds a {@link Message} to commit it later with a commit method
- *
- * @param message a {@link Message} to commit later
- */
- void add(Message message);
-
- /**
- * Adds a {@link DataReceivedEvent} to commit all its messages later with a commit method
- *
- * @param event a {@link DataReceivedEvent} to commit later
- */
- void add(DataReceivedEvent event);
-
/**
* Commits offset ranges from all {@link Message}s and {@link DataReceivedEvent}s
* that were added to this DeferredCommitter since last commit
diff --git a/topic/src/main/java/tech/ydb/topic/read/Message.java b/topic/src/main/java/tech/ydb/topic/read/Message.java
index 80be5bcc2..8627ebeab 100644
--- a/topic/src/main/java/tech/ydb/topic/read/Message.java
+++ b/topic/src/main/java/tech/ydb/topic/read/Message.java
@@ -65,6 +65,11 @@ public interface Message {
*/
PartitionSession getPartitionSession();
+ /**
+ * @return Partition offsets of this message
+ */
+ PartitionOffsets getPartitionOffsets();
+
/**
* Commits this message
* If there was an error while committing, there is no point of retrying committing the same message:
diff --git a/topic/src/main/java/tech/ydb/topic/read/MessageAccumulator.java b/topic/src/main/java/tech/ydb/topic/read/MessageAccumulator.java
new file mode 100644
index 000000000..8fdec247d
--- /dev/null
+++ b/topic/src/main/java/tech/ydb/topic/read/MessageAccumulator.java
@@ -0,0 +1,26 @@
+package tech.ydb.topic.read;
+
+import tech.ydb.topic.read.events.DataReceivedEvent;
+
+/**
+ * A common interface that is used to accumulate several {@link Message}s or/and
+ * {@link tech.ydb.topic.read.events.DataReceivedEvent}s to commit later all at once or to add to transaction.
+ *
+ * @author Nikolay Perfilov
+ */
+public interface MessageAccumulator {
+
+ /**
+ * Adds a {@link Message} to commit it later or to add to transaction
+ *
+ * @param message a {@link Message}
+ */
+ void add(Message message);
+
+ /**
+ * Adds a {@link DataReceivedEvent} to commit all its messages later or to add to transaction
+ *
+ * @param event a {@link DataReceivedEvent}
+ */
+ void add(DataReceivedEvent event);
+}
diff --git a/topic/src/main/java/tech/ydb/topic/read/OffsetsRange.java b/topic/src/main/java/tech/ydb/topic/read/OffsetsRange.java
deleted file mode 100644
index 0045e644c..000000000
--- a/topic/src/main/java/tech/ydb/topic/read/OffsetsRange.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package tech.ydb.topic.read;
-
-/**
- * @author Nikolay Perfilov
- */
-public interface OffsetsRange {
- long getStart();
-
- long getEnd();
-}
diff --git a/topic/src/main/java/tech/ydb/topic/read/PartitionOffsets.java b/topic/src/main/java/tech/ydb/topic/read/PartitionOffsets.java
new file mode 100644
index 000000000..31d273d5c
--- /dev/null
+++ b/topic/src/main/java/tech/ydb/topic/read/PartitionOffsets.java
@@ -0,0 +1,27 @@
+package tech.ydb.topic.read;
+
+import java.util.List;
+
+import tech.ydb.topic.description.OffsetsRange;
+
+/**
+ * @author Nikolay Perfilov
+ */
+public class PartitionOffsets {
+ private final PartitionSession partitionSession;
+ private final List offsets;
+
+ public PartitionOffsets(PartitionSession partitionSession, List offsets) {
+ this.partitionSession = partitionSession;
+ this.offsets = offsets;
+ }
+
+ public PartitionSession getPartitionSession() {
+ return partitionSession;
+ }
+
+ public List getOffsets() {
+ return offsets;
+ }
+
+}
diff --git a/topic/src/main/java/tech/ydb/topic/read/SyncReader.java b/topic/src/main/java/tech/ydb/topic/read/SyncReader.java
index e31ddc7a5..940fe9b8b 100644
--- a/topic/src/main/java/tech/ydb/topic/read/SyncReader.java
+++ b/topic/src/main/java/tech/ydb/topic/read/SyncReader.java
@@ -6,6 +6,8 @@
import io.grpc.ExperimentalApi;
+import tech.ydb.topic.settings.ReceiveSettings;
+
/**
* @author Nikolay Perfilov
*/
@@ -24,21 +26,34 @@ public interface SyncReader {
/**
* Receive a {@link Message}. Blocks until a Message is received.
- * Throws {@link java.util.concurrent.TimeoutException} if timeout runs off
+ *
+ * @param settings settings for receiving a Message
+ * @return returns a {@link Message}, or null if the specified timeout time elapses before a message is available
+ */
+ Message receive(ReceiveSettings settings) throws InterruptedException;
+
+ /**
+ * Receive a {@link Message}. Blocks until a Message is received.
*
* @param timeout timeout to wait a Message with
* @param unit TimeUnit for timeout
* @return returns a {@link Message}, or null if the specified waiting time elapses before a message is available
*/
@Nullable
- Message receive(long timeout, TimeUnit unit) throws InterruptedException;
+ default Message receive(long timeout, TimeUnit unit) throws InterruptedException {
+ return receive(ReceiveSettings.newBuilder()
+ .setTimeout(timeout, unit)
+ .build());
+ }
/**
* Receive a {@link Message}. Blocks until a Message is received.
*
* @return {@link Message}
*/
- Message receive() throws InterruptedException;
+ default Message receive() throws InterruptedException {
+ return receive(ReceiveSettings.newBuilder().build());
+ }
/**
* Stops internal threads and makes cleanup in background. Blocking
diff --git a/topic/src/main/java/tech/ydb/topic/read/TransactionMessageAccumulator.java b/topic/src/main/java/tech/ydb/topic/read/TransactionMessageAccumulator.java
new file mode 100644
index 000000000..d6b4d0bfb
--- /dev/null
+++ b/topic/src/main/java/tech/ydb/topic/read/TransactionMessageAccumulator.java
@@ -0,0 +1,22 @@
+package tech.ydb.topic.read;
+
+import java.util.concurrent.CompletableFuture;
+
+import tech.ydb.common.transaction.YdbTransaction;
+import tech.ydb.core.Status;
+import tech.ydb.topic.settings.UpdateOffsetsInTransactionSettings;
+
+/**
+ * A helper class that is used to accumulate messages and add them to {@link YdbTransaction}.
+ * Several {@link Message}s or/and {@link tech.ydb.topic.read.events.DataReceivedEvent}s can be accepted to add to
+ * {@link YdbTransaction} later all at once.
+ * All messages should be read from the same Reader this accumulator was created on.
+ * Contains no data references and therefore may also be useful in cases where messages are committed after processing
+ * data in an external system.
+ *
+ * @author Nikolay Perfilov
+ */
+public interface TransactionMessageAccumulator extends MessageAccumulator {
+ CompletableFuture updateOffsetsInTransaction(YdbTransaction transaction,
+ UpdateOffsetsInTransactionSettings settings);
+}
diff --git a/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java b/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java
index 416c1481e..896484510 100644
--- a/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java
+++ b/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java
@@ -4,6 +4,7 @@
import java.util.concurrent.CompletableFuture;
import tech.ydb.topic.read.Message;
+import tech.ydb.topic.read.PartitionOffsets;
import tech.ydb.topic.read.PartitionSession;
/**
@@ -26,6 +27,13 @@ public interface DataReceivedEvent {
*/
PartitionSession getPartitionSession();
+ /**
+ * Returns partition offsets of this message
+ *
+ * @return Partition offsets of this message
+ */
+ PartitionOffsets getPartitionOffsets();
+
/**
* Commits all messages in this event at once.
* If there was an error while committing, there is no point of retrying committing the same messages:
diff --git a/topic/src/main/java/tech/ydb/topic/read/events/StartPartitionSessionEvent.java b/topic/src/main/java/tech/ydb/topic/read/events/StartPartitionSessionEvent.java
index aa64e60de..b95a84df9 100644
--- a/topic/src/main/java/tech/ydb/topic/read/events/StartPartitionSessionEvent.java
+++ b/topic/src/main/java/tech/ydb/topic/read/events/StartPartitionSessionEvent.java
@@ -1,6 +1,6 @@
package tech.ydb.topic.read.events;
-import tech.ydb.topic.read.OffsetsRange;
+import tech.ydb.topic.description.OffsetsRange;
import tech.ydb.topic.read.PartitionSession;
import tech.ydb.topic.settings.StartPartitionSessionSettings;
diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java
index 92aa68ac7..fe1c2e0d3 100644
--- a/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java
+++ b/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java
@@ -1,5 +1,7 @@
package tech.ydb.topic.read.impl;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
@@ -11,9 +13,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import tech.ydb.common.transaction.YdbTransaction;
+import tech.ydb.core.Status;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.TopicRpc;
import tech.ydb.topic.read.AsyncReader;
+import tech.ydb.topic.read.PartitionOffsets;
import tech.ydb.topic.read.PartitionSession;
import tech.ydb.topic.read.events.CommitOffsetAcknowledgementEvent;
import tech.ydb.topic.read.events.DataReceivedEvent;
@@ -29,6 +34,7 @@
import tech.ydb.topic.settings.ReadEventHandlersSettings;
import tech.ydb.topic.settings.ReaderSettings;
import tech.ydb.topic.settings.StartPartitionSessionSettings;
+import tech.ydb.topic.settings.UpdateOffsetsInTransactionSettings;
/**
* @author Nikolay Perfilov
@@ -61,6 +67,17 @@ public CompletableFuture init() {
return initImpl();
}
+ @Override
+ public CompletableFuture updateOffsetsInTransaction(YdbTransaction transaction,
+ Map> offsets,
+ UpdateOffsetsInTransactionSettings settings) {
+ if (!transaction.isActive()) {
+ throw new IllegalArgumentException("Transaction is not active. " +
+ "Can only read topic messages in already running transactions from other services");
+ }
+ return sendUpdateOffsetsInTransaction(transaction, offsets, settings);
+ }
+
@Override
protected CompletableFuture handleDataReceivedEvent(DataReceivedEvent event) {
return CompletableFuture.runAsync(() -> {
diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java
index 107657a2c..512ee8701 100644
--- a/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java
+++ b/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java
@@ -5,7 +5,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import tech.ydb.topic.read.OffsetsRange;
+import tech.ydb.topic.description.OffsetsRange;
/**
* @author Nikolay Perfilov
diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java
index fb3457038..6abf30475 100644
--- a/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java
+++ b/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java
@@ -7,9 +7,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import tech.ydb.topic.description.OffsetsRange;
import tech.ydb.topic.read.DeferredCommitter;
import tech.ydb.topic.read.Message;
-import tech.ydb.topic.read.OffsetsRange;
import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.read.impl.events.DataReceivedEventImpl;
diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java b/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java
index 41ec47ec7..33130d321 100644
--- a/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java
+++ b/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java
@@ -7,7 +7,7 @@
import java.util.NavigableMap;
import java.util.TreeMap;
-import tech.ydb.topic.read.OffsetsRange;
+import tech.ydb.topic.description.OffsetsRange;
/**
* @author Nikolay Perfilov
diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java
index 3152046e0..2c90709cf 100644
--- a/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java
+++ b/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java
@@ -2,14 +2,16 @@
import java.io.IOException;
import java.time.Instant;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import tech.ydb.topic.description.MetadataItem;
+import tech.ydb.topic.description.OffsetsRange;
import tech.ydb.topic.read.DecompressionException;
import tech.ydb.topic.read.Message;
-import tech.ydb.topic.read.OffsetsRange;
+import tech.ydb.topic.read.PartitionOffsets;
import tech.ydb.topic.read.PartitionSession;
/**
@@ -114,6 +116,11 @@ public List getMetadataItems() {
return metadataItems;
}
+ @Override
+ public PartitionOffsets getPartitionOffsets() {
+ return new PartitionOffsets(partitionSession.getSessionInfo(), Collections.singletonList(offsetsToCommit));
+ }
+
public void setDecompressed(boolean decompressed) {
isDecompressed = decompressed;
}
diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRangeImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRangeImpl.java
index 5b4661b72..6cf1165ca 100644
--- a/topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRangeImpl.java
+++ b/topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRangeImpl.java
@@ -1,6 +1,6 @@
package tech.ydb.topic.read.impl;
-import tech.ydb.topic.read.OffsetsRange;
+import tech.ydb.topic.description.OffsetsRange;
/**
* @author Nikolay Perfilov
diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java
index 1417e1222..27548ff90 100644
--- a/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java
+++ b/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java
@@ -23,8 +23,8 @@
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.description.Codec;
import tech.ydb.topic.description.MetadataItem;
+import tech.ydb.topic.description.OffsetsRange;
import tech.ydb.topic.read.Message;
-import tech.ydb.topic.read.OffsetsRange;
import tech.ydb.topic.read.PartitionSession;
import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.read.impl.events.DataReceivedEventImpl;
@@ -40,7 +40,6 @@ public class PartitionSessionImpl {
private final String path;
private final long partitionId;
private final PartitionSession sessionInfo;
- private final OffsetsRange partitionOffsets;
private final Executor decompressionExecutor;
private final AtomicBoolean isWorking = new AtomicBoolean(true);
@@ -61,13 +60,12 @@ private PartitionSessionImpl(Builder builder) {
this.sessionInfo = new PartitionSession(id, partitionId, path);
this.lastReadOffset = builder.committedOffset;
this.lastCommittedOffset = builder.committedOffset;
- this.partitionOffsets = builder.partitionOffsets;
this.decompressionExecutor = builder.decompressionExecutor;
this.dataEventCallback = builder.dataEventCallback;
this.commitFunction = builder.commitFunction;
logger.info("[{}] Partition session {} (partition {}) is started. CommittedOffset: {}. " +
- "Partition offsets: {}-{}", path, id, partitionId, lastReadOffset, partitionOffsets.getStart(),
- partitionOffsets.getEnd());
+ "Partition offsets: {}-{}", path, id, partitionId, lastReadOffset, builder.partitionOffsets.getStart(),
+ builder.partitionOffsets.getEnd());
}
public static Builder newBuilder() {
@@ -222,7 +220,7 @@ public void commitOffsetRanges(List rangesToCommit) {
.append("] Sending CommitRequest for partition session ").append(id)
.append(" (partition ").append(partitionId).append(") with offset ranges ");
addRangesToString(message, rangesToCommit);
- logger.info(message.toString());
+ logger.debug(message.toString());
}
commitFunction.accept(rangesToCommit);
} else if (logger.isInfoEnabled()) {
diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java
index 2fd65cf47..94d65ae8f 100644
--- a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java
+++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java
@@ -17,20 +17,25 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import tech.ydb.common.transaction.YdbTransaction;
import tech.ydb.core.Issue;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
+import tech.ydb.core.grpc.GrpcRequestSettings;
+import tech.ydb.core.settings.BaseRequestSettings;
import tech.ydb.core.utils.ProtobufUtils;
import tech.ydb.proto.StatusCodesProtos;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.TopicRpc;
+import tech.ydb.topic.description.OffsetsRange;
import tech.ydb.topic.impl.GrpcStreamRetrier;
-import tech.ydb.topic.read.OffsetsRange;
+import tech.ydb.topic.read.PartitionOffsets;
import tech.ydb.topic.read.PartitionSession;
import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.settings.ReaderSettings;
import tech.ydb.topic.settings.StartPartitionSessionSettings;
import tech.ydb.topic.settings.TopicReadSettings;
+import tech.ydb.topic.settings.UpdateOffsetsInTransactionSettings;
/**
* @author Nikolay Perfilov
@@ -130,7 +135,83 @@ protected void onShutdown(String reason) {
}
}
- private class ReadSessionImpl extends ReadSession {
+ private GrpcRequestSettings makeGrpcRequestSettings(BaseRequestSettings settings) {
+ return GrpcRequestSettings.newBuilder()
+ .withDeadline(settings.getRequestTimeout())
+ .build();
+ }
+
+ protected CompletableFuture sendUpdateOffsetsInTransaction(YdbTransaction transaction,
+ Map> offsets,
+ UpdateOffsetsInTransactionSettings settings) {
+ if (offsets.isEmpty()) {
+ throw new IllegalArgumentException("Empty topic list to update in transaction");
+ }
+ if (logger.isDebugEnabled()) {
+ StringBuilder str = new StringBuilder("Updating ");
+ boolean first = true;
+ for (Map.Entry> topicOffsets : offsets.entrySet()) {
+ if (topicOffsets.getValue().isEmpty()) {
+ throw new IllegalArgumentException("Empty offsets range to update in transaction");
+ }
+ for (PartitionOffsets partitionOffsets : topicOffsets.getValue()) {
+ if (!first) {
+ str.append(", ");
+ } else {
+ first = false;
+ }
+ str.append("offsets [").append(partitionOffsets.getOffsets().get(0).getStart()).append("..")
+ .append(partitionOffsets.getOffsets().get(partitionOffsets.getOffsets().size() - 1)
+ .getEnd()).append(") for partition ")
+ .append(partitionOffsets.getPartitionSession().getPartitionId())
+ .append(" [topic ").append(topicOffsets.getKey()).append("]");
+ }
+ }
+ logger.debug(str.toString());
+ }
+ final ReadSessionImpl currentSession = session;
+ transaction.getStatusFuture().whenComplete((status, error) -> {
+ if (error != null) {
+ currentSession.closeDueToError(null,
+ new RuntimeException("Restarting read session due to transaction " + transaction.getId() +
+ " with partition offsets from read session " + currentSession.fullId +
+ " was not committed with reason: " + error));
+ } else if (!status.isSuccess()) {
+ currentSession.closeDueToError(null,
+ new RuntimeException("Restarting read session due to transaction " + transaction.getId() +
+ " with partition offsets from read session " + currentSession.fullId +
+ " was not committed with status: " + status));
+ }
+ });
+ YdbTopic.UpdateOffsetsInTransactionRequest.Builder requestBuilder = YdbTopic.UpdateOffsetsInTransactionRequest
+ .newBuilder()
+ .setTx(YdbTopic.TransactionIdentity.newBuilder()
+ .setId(transaction.getId())
+ .setSession(transaction.getSessionId()))
+ .setConsumer(this.settings.getConsumerName());
+ offsets.forEach((path, topicOffsets) -> {
+ YdbTopic.UpdateOffsetsInTransactionRequest.TopicOffsets.Builder topicOffsetsBuilder = YdbTopic
+ .UpdateOffsetsInTransactionRequest.TopicOffsets.newBuilder()
+ .setPath(path);
+ topicOffsets.forEach(partitionOffsets -> {
+ YdbTopic.UpdateOffsetsInTransactionRequest.TopicOffsets.PartitionOffsets.Builder partitionOffsetsBuilder
+ = YdbTopic.UpdateOffsetsInTransactionRequest.TopicOffsets.PartitionOffsets.newBuilder()
+ .setPartitionId(partitionOffsets.getPartitionSession().getPartitionId());
+ partitionOffsets.getOffsets().forEach(offsetsRange -> partitionOffsetsBuilder.addPartitionOffsets(
+ YdbTopic.OffsetsRange.newBuilder()
+ .setStart(offsetsRange.getStart())
+ .setEnd(offsetsRange.getEnd())
+ .build()));
+ topicOffsetsBuilder.addPartitions(partitionOffsetsBuilder);
+ });
+ requestBuilder.addTopics(topicOffsetsBuilder);
+ });
+
+ final GrpcRequestSettings grpcRequestSettings = makeGrpcRequestSettings(settings);
+ return topicRpc.updateOffsetsInTransaction(requestBuilder.build(), grpcRequestSettings);
+ }
+
+ protected class ReadSessionImpl extends ReadSession {
protected String sessionId = "";
private final String fullId;
// Total size to request with next ReadRequest.
@@ -144,7 +225,7 @@ private ReadSessionImpl() {
public void startAndInitialize() {
logger.debug("[{}] Session {} startAndInitialize called", fullId, sessionId);
- start(this::processMessage).whenComplete(this::onSessionClosing);
+ start(this::processMessage).whenComplete(this::closeDueToError);
YdbTopic.StreamReadMessage.InitRequest.Builder initRequestBuilder = YdbTopic.StreamReadMessage.InitRequest
.newBuilder();
@@ -437,7 +518,7 @@ private void processMessage(YdbTopic.StreamReadMessage.FromServer message) {
reconnectCounter.set(0);
} else {
logger.warn("[{}] Got non-success status in processMessage method: {}", fullId, message);
- onSessionClosed(Status.of(StatusCode.fromProto(message.getStatus()))
+ closeDueToError(Status.of(StatusCode.fromProto(message.getStatus()))
.withIssues(Issue.of("Got a message with non-success status: " + message,
Issue.Severity.ERROR)), null);
return;
@@ -462,10 +543,9 @@ private void processMessage(YdbTopic.StreamReadMessage.FromServer message) {
}
}
- private void onSessionClosing(Status status, Throwable th) {
- logger.info("[{}] Session {} onSessionClosing called", fullId, sessionId);
- if (isWorking.get()) {
- shutdown();
+ protected void closeDueToError(Status status, Throwable th) {
+ logger.info("[{}] Session {} closeDueToError called", fullId, sessionId);
+ if (shutdown()) {
// Signal reader to retry
onSessionClosed(status, th);
}
diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java
index 200190716..75b441e86 100644
--- a/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java
+++ b/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java
@@ -2,6 +2,7 @@
import java.time.Duration;
import java.time.Instant;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
@@ -14,6 +15,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import tech.ydb.core.Status;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.TopicRpc;
import tech.ydb.topic.read.Message;
@@ -21,7 +23,9 @@
import tech.ydb.topic.read.SyncReader;
import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.settings.ReaderSettings;
+import tech.ydb.topic.settings.ReceiveSettings;
import tech.ydb.topic.settings.StartPartitionSessionSettings;
+import tech.ydb.topic.settings.UpdateOffsetsInTransactionSettings;
/**
* @author Nikolay Perfilov
@@ -56,9 +60,9 @@ public void initAndWait() {
initImpl().join();
}
- @Override
@Nullable
- public Message receive(long timeout, TimeUnit unit) throws InterruptedException {
+ public Message receiveInternal(ReceiveSettings receiveSettings, long timeout, TimeUnit unit)
+ throws InterruptedException {
if (isStopped.get()) {
throw new RuntimeException("Reader was stopped");
}
@@ -96,16 +100,31 @@ public Message receive(long timeout, TimeUnit unit) throws InterruptedException
currentMessageIndex = 0;
currentBatch.future.complete(null);
}
+ if (receiveSettings.getTransaction() != null) {
+ Status updateStatus = sendUpdateOffsetsInTransaction(receiveSettings.getTransaction(),
+ Collections.singletonMap(result.getPartitionSession().getPath(),
+ Collections.singletonList(result.getPartitionOffsets())),
+ UpdateOffsetsInTransactionSettings.newBuilder().build())
+ .join();
+ if (!updateStatus.isSuccess()) {
+ throw new RuntimeException("Couldn't add message offset " + result.getOffset() + " to transaction "
+ + receiveSettings.getTransaction().getId() + ": " + updateStatus);
+ }
+ }
return result;
}
}
@Override
- public Message receive() throws InterruptedException {
+ public Message receive(ReceiveSettings receiveSettings) throws InterruptedException {
+ if (receiveSettings.getTimeout() != null) {
+ return receiveInternal(receiveSettings, receiveSettings.getTimeout(), receiveSettings.getTimeoutTimeUnit());
+ }
+
Message result;
// Poll to prevent infinite wait in case if reader was stopped
do {
- result = receive(POLL_INTERVAL_SECONDS, TimeUnit.SECONDS);
+ result = receiveInternal(receiveSettings, POLL_INTERVAL_SECONDS, TimeUnit.SECONDS);
} while (result == null);
return result;
}
diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/TransactionMessageAccumulatorImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/TransactionMessageAccumulatorImpl.java
new file mode 100644
index 000000000..d8096a7ab
--- /dev/null
+++ b/topic/src/main/java/tech/ydb/topic/read/impl/TransactionMessageAccumulatorImpl.java
@@ -0,0 +1,98 @@
+package tech.ydb.topic.read.impl;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import tech.ydb.common.transaction.YdbTransaction;
+import tech.ydb.core.Status;
+import tech.ydb.topic.description.OffsetsRange;
+import tech.ydb.topic.read.AsyncReader;
+import tech.ydb.topic.read.Message;
+import tech.ydb.topic.read.PartitionOffsets;
+import tech.ydb.topic.read.PartitionSession;
+import tech.ydb.topic.read.TransactionMessageAccumulator;
+import tech.ydb.topic.read.events.DataReceivedEvent;
+import tech.ydb.topic.read.impl.events.DataReceivedEventImpl;
+import tech.ydb.topic.settings.UpdateOffsetsInTransactionSettings;
+
+/**
+ * @author Nikolay Perfilov
+ */
+public class TransactionMessageAccumulatorImpl implements TransactionMessageAccumulator {
+ private static final Logger logger = LoggerFactory.getLogger(DeferredCommitterImpl.class);
+
+ private final AsyncReader reader;
+ private final Map> rangesByTopic = new ConcurrentHashMap<>();
+
+ private static class PartitionRanges {
+ private final PartitionSession partitionSession;
+ private final DisjointOffsetRangeSet ranges = new DisjointOffsetRangeSet();
+
+ private PartitionRanges(PartitionSession partitionSession) {
+ this.partitionSession = partitionSession;
+ }
+
+ private void add(OffsetsRange offsetRange) {
+ try {
+ synchronized (ranges) {
+ ranges.add(offsetRange);
+ }
+ } catch (RuntimeException exception) {
+ String errorMessage = "Error adding new offset range to DeferredCommitter for partition session " +
+ partitionSession.getId() + " (partition " + partitionSession.getPartitionId() + "): " +
+ exception.getMessage();
+ logger.error(errorMessage);
+ throw new RuntimeException(errorMessage, exception);
+ }
+ }
+
+ private List getOffsetsRanges() {
+ synchronized (ranges) {
+ return ranges.getRangesAndClear();
+ }
+ }
+ }
+
+ TransactionMessageAccumulatorImpl(AsyncReader reader) {
+ this.reader = reader;
+ }
+
+ @Override
+ public void add(Message message) {
+ MessageImpl messageImpl = (MessageImpl) message;
+ PartitionRanges partitionRanges = rangesByTopic
+ .computeIfAbsent(message.getPartitionSession().getPath(), path -> new ConcurrentHashMap<>())
+ .computeIfAbsent(message.getPartitionSession(), PartitionRanges::new);
+ partitionRanges.add(messageImpl.getOffsetsToCommit());
+ }
+
+ @Override
+ public void add(DataReceivedEvent event) {
+ DataReceivedEventImpl eventImpl = (DataReceivedEventImpl) event;
+ PartitionRanges partitionRanges = rangesByTopic
+ .computeIfAbsent(event.getPartitionSession().getPath(), path -> new ConcurrentHashMap<>())
+ .computeIfAbsent(event.getPartitionSession(), PartitionRanges::new);
+ partitionRanges.add(eventImpl.getOffsetsToCommit());
+ }
+
+ @Override
+ public CompletableFuture updateOffsetsInTransaction(YdbTransaction transaction,
+ UpdateOffsetsInTransactionSettings settings) {
+ Map> offsets = new HashMap<>();
+ rangesByTopic.forEach((path, topicRanges) -> {
+ offsets.put(path, topicRanges.entrySet().stream()
+ .map(partitionRange ->
+ new PartitionOffsets(partitionRange.getKey(), partitionRange.getValue().getOffsetsRanges()))
+ .collect(Collectors.toList()));
+ });
+ return reader.updateOffsetsInTransaction(transaction, offsets, settings);
+ }
+
+}
diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java
index b65789434..e1e0a1371 100644
--- a/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java
+++ b/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java
@@ -1,10 +1,12 @@
package tech.ydb.topic.read.impl.events;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import tech.ydb.topic.description.OffsetsRange;
import tech.ydb.topic.read.Message;
-import tech.ydb.topic.read.OffsetsRange;
+import tech.ydb.topic.read.PartitionOffsets;
import tech.ydb.topic.read.PartitionSession;
import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.read.impl.CommitterImpl;
@@ -32,6 +34,11 @@ public List getMessages() {
return messages;
}
+ @Override
+ public PartitionOffsets getPartitionOffsets() {
+ return new PartitionOffsets(partitionSession.getSessionInfo(), Collections.singletonList(offsetsToCommit));
+ }
+
@Override
public PartitionSession getPartitionSession() {
return partitionSession.getSessionInfo();
diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/events/StartPartitionSessionEventImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/events/StartPartitionSessionEventImpl.java
index 9fa4632e7..200329ae3 100644
--- a/topic/src/main/java/tech/ydb/topic/read/impl/events/StartPartitionSessionEventImpl.java
+++ b/topic/src/main/java/tech/ydb/topic/read/impl/events/StartPartitionSessionEventImpl.java
@@ -2,7 +2,7 @@
import java.util.function.Consumer;
-import tech.ydb.topic.read.OffsetsRange;
+import tech.ydb.topic.description.OffsetsRange;
import tech.ydb.topic.read.PartitionSession;
import tech.ydb.topic.read.events.StartPartitionSessionEvent;
import tech.ydb.topic.settings.StartPartitionSessionSettings;
diff --git a/topic/src/main/java/tech/ydb/topic/settings/CommitOffsetSettings.java b/topic/src/main/java/tech/ydb/topic/settings/CommitOffsetSettings.java
index fbc141a06..1043f0f29 100644
--- a/topic/src/main/java/tech/ydb/topic/settings/CommitOffsetSettings.java
+++ b/topic/src/main/java/tech/ydb/topic/settings/CommitOffsetSettings.java
@@ -2,6 +2,9 @@
import tech.ydb.core.settings.OperationSettings;
+/**
+ * @author Nikolay Perfilov
+ */
public class CommitOffsetSettings extends OperationSettings {
private final long partitionId;
private final String consumer;
diff --git a/topic/src/main/java/tech/ydb/topic/settings/DescribeTopicSettings.java b/topic/src/main/java/tech/ydb/topic/settings/DescribeTopicSettings.java
index 89c993415..28333bce6 100644
--- a/topic/src/main/java/tech/ydb/topic/settings/DescribeTopicSettings.java
+++ b/topic/src/main/java/tech/ydb/topic/settings/DescribeTopicSettings.java
@@ -2,6 +2,9 @@
import tech.ydb.core.settings.OperationSettings;
+/**
+ * @author Nikolay Perfilov
+ */
public class DescribeTopicSettings extends OperationSettings {
/* TODO: renew api and add stats
private boolean includeStats = false;
diff --git a/topic/src/main/java/tech/ydb/topic/settings/ReceiveSettings.java b/topic/src/main/java/tech/ydb/topic/settings/ReceiveSettings.java
new file mode 100644
index 000000000..ed16829f9
--- /dev/null
+++ b/topic/src/main/java/tech/ydb/topic/settings/ReceiveSettings.java
@@ -0,0 +1,81 @@
+package tech.ydb.topic.settings;
+
+import java.util.concurrent.TimeUnit;
+
+import tech.ydb.common.transaction.YdbTransaction;
+
+/**
+ * @author Nikolay Perfilov
+ */
+public class ReceiveSettings {
+ private Long timeout;
+ private TimeUnit timeoutTimeUnit;
+ private final YdbTransaction transaction;
+
+ private ReceiveSettings(Builder builder) {
+ this.timeout = builder.timeout;
+ this.timeoutTimeUnit = builder.timeoutTimeUnit;
+ this.transaction = builder.transaction;
+ }
+
+ public Long getTimeout() {
+ return timeout;
+ }
+
+ public TimeUnit getTimeoutTimeUnit() {
+ return timeoutTimeUnit;
+ }
+
+ public YdbTransaction getTransaction() {
+ return transaction;
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /**
+ * BUILDER
+ */
+ public static class Builder {
+ private Long timeout;
+ private TimeUnit timeoutTimeUnit;
+ private YdbTransaction transaction;
+
+ /**
+ * Set timeout for receiving a message.
+ *
+ * @param timeout timeout for receiving a message
+ * @param unit {@link TimeUnit} for timeout
+ * @return Builder
+ */
+ public Builder setTimeout(long timeout, TimeUnit unit) {
+ this.timeout = timeout;
+ this.timeoutTimeUnit = unit;
+ return this;
+ }
+
+ /**
+ * Set transaction for receiving message.
+ * When this transaction is committed, the message will be considered by server as read (committed)
+ * If this transaction is rolled back, the reader will restart reading stream internally
+ *
+ * @param transaction Transaction to link a message with.
+ * Transaction has to be active
+ * @return Builder
+ */
+ public Builder setTransaction(YdbTransaction transaction) {
+ if (!transaction.isActive()) {
+ throw new IllegalArgumentException("Transaction is not active. " +
+ "Can only write topic messages in already running transactions from other services");
+ }
+ this.transaction = transaction;
+ return this;
+ }
+
+ public ReceiveSettings build() {
+ return new ReceiveSettings(this);
+ }
+
+ }
+}
diff --git a/topic/src/main/java/tech/ydb/topic/settings/SendSettings.java b/topic/src/main/java/tech/ydb/topic/settings/SendSettings.java
new file mode 100644
index 000000000..50763658f
--- /dev/null
+++ b/topic/src/main/java/tech/ydb/topic/settings/SendSettings.java
@@ -0,0 +1,52 @@
+package tech.ydb.topic.settings;
+
+import tech.ydb.common.transaction.YdbTransaction;
+
+/**
+ * @author Nikolay Perfilov
+ */
+public class SendSettings {
+ private final YdbTransaction transaction;
+
+ private SendSettings(Builder builder) {
+ this.transaction = builder.transaction;
+ }
+
+ public YdbTransaction getTransaction() {
+ return transaction;
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /**
+ * BUILDER
+ */
+ public static class Builder {
+ private YdbTransaction transaction;
+
+ /**
+ * Set transaction for sending message.
+ * When this transaction is committed, the message will be considered by server as written
+ * If this transaction is rolled back, the writer will be shut down
+ *
+ * @param transaction Transaction to link a message with.
+ * Transaction has to be active
+ * @return Builder
+ */
+ public Builder setTransaction(YdbTransaction transaction) {
+ if (!transaction.isActive()) {
+ throw new IllegalArgumentException("Transaction is not active. " +
+ "Can only write topic messages in already running transactions from other services");
+ }
+ this.transaction = transaction;
+ return this;
+ }
+
+ public SendSettings build() {
+ return new SendSettings(this);
+ }
+
+ }
+}
diff --git a/topic/src/main/java/tech/ydb/topic/settings/StartPartitionSessionSettings.java b/topic/src/main/java/tech/ydb/topic/settings/StartPartitionSessionSettings.java
index b61b40ccb..18b4db578 100644
--- a/topic/src/main/java/tech/ydb/topic/settings/StartPartitionSessionSettings.java
+++ b/topic/src/main/java/tech/ydb/topic/settings/StartPartitionSessionSettings.java
@@ -1,5 +1,8 @@
package tech.ydb.topic.settings;
+/**
+ * @author Nikolay Perfilov
+ */
public class StartPartitionSessionSettings {
private final Long readOffset;
private final Long commitOffset;
diff --git a/topic/src/main/java/tech/ydb/topic/settings/UpdateOffsetsInTransactionSettings.java b/topic/src/main/java/tech/ydb/topic/settings/UpdateOffsetsInTransactionSettings.java
new file mode 100644
index 000000000..3bef7b273
--- /dev/null
+++ b/topic/src/main/java/tech/ydb/topic/settings/UpdateOffsetsInTransactionSettings.java
@@ -0,0 +1,23 @@
+package tech.ydb.topic.settings;
+
+import tech.ydb.core.settings.OperationSettings;
+
+/**
+ * @author Nikolay Perfilov
+ */
+public class UpdateOffsetsInTransactionSettings extends OperationSettings {
+ private UpdateOffsetsInTransactionSettings(Builder builder) {
+ super(builder);
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static class Builder extends OperationBuilder {
+ @Override
+ public UpdateOffsetsInTransactionSettings build() {
+ return new UpdateOffsetsInTransactionSettings(this);
+ }
+ }
+}
diff --git a/topic/src/main/java/tech/ydb/topic/write/AsyncWriter.java b/topic/src/main/java/tech/ydb/topic/write/AsyncWriter.java
index 1cf77f2ae..e7297e1e5 100644
--- a/topic/src/main/java/tech/ydb/topic/write/AsyncWriter.java
+++ b/topic/src/main/java/tech/ydb/topic/write/AsyncWriter.java
@@ -4,6 +4,8 @@
import io.grpc.ExperimentalApi;
+import tech.ydb.topic.settings.SendSettings;
+
/**
* @author Nikolay Perfilov
*/
@@ -23,6 +25,14 @@ public interface AsyncWriter {
*/
CompletableFuture send(Message message) throws QueueOverflowException;
+ /**
+ * Send message. Non-blocking
+ * @param message message data to write
+ * @param settings send settings
+ * @return {@link CompletableFuture} with {@link WriteAck} for write acknowledgement
+ */
+ CompletableFuture send(Message message, SendSettings settings) throws QueueOverflowException;
+
/**
* Stops internal threads and makes cleanup in background. Non-blocking
*/
diff --git a/topic/src/main/java/tech/ydb/topic/write/SyncWriter.java b/topic/src/main/java/tech/ydb/topic/write/SyncWriter.java
index 95d741155..a677da58b 100644
--- a/topic/src/main/java/tech/ydb/topic/write/SyncWriter.java
+++ b/topic/src/main/java/tech/ydb/topic/write/SyncWriter.java
@@ -6,6 +6,8 @@
import io.grpc.ExperimentalApi;
+import tech.ydb.topic.settings.SendSettings;
+
/**
* @author Nikolay Perfilov
@@ -27,20 +29,43 @@ public interface SyncWriter {
/**
* Send message. Blocks infinitely until the message is put into sending buffer.
* @param message message data to write
+ * @param settings send settings
*/
- void send(Message message);
+ void send(Message message, SendSettings settings);
/**
* Send message. Blocks until the message is put into sending buffer.
* If in-flight or memory usage limits is reached, waits until timeout expires and then
* throws {@link TimeoutException} if message was not put into queue
* @param message message data to write
+ * @param settings send settings
* @param timeout timeout to wait until message is punt into sending buffer
* @param unit {@link TimeUnit} for timeout
*/
- void send(Message message, long timeout, TimeUnit unit)
+ void send(Message message, SendSettings settings, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
+ /**
+ * Send message. Blocks infinitely until the message is put into sending buffer.
+ * @param message message data to write
+ */
+ default void send(Message message) {
+ send(message, SendSettings.newBuilder().build());
+ }
+
+ /**
+ * Send message. Blocks until the message is put into sending buffer.
+ * If in-flight or memory usage limits is reached, waits until timeout expires and then
+ * throws {@link TimeoutException} if message was not put into queue
+ * @param message message data to write
+ * @param timeout timeout to wait until message is punt into sending buffer
+ * @param unit {@link TimeUnit} for timeout
+ */
+ default void send(Message message, long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ send(message, SendSettings.newBuilder().build(), timeout, unit);
+ }
+
/**
* Waits until all current writes will be sent to server and response will be received. Blocking
*/
diff --git a/topic/src/main/java/tech/ydb/topic/write/WriteAck.java b/topic/src/main/java/tech/ydb/topic/write/WriteAck.java
index e611d5bf5..c152e0b8c 100644
--- a/topic/src/main/java/tech/ydb/topic/write/WriteAck.java
+++ b/topic/src/main/java/tech/ydb/topic/write/WriteAck.java
@@ -1,7 +1,5 @@
package tech.ydb.topic.write;
-import javax.annotation.Nullable;
-
/**
* @author Nikolay Perfilov
*/
@@ -41,7 +39,10 @@ public State getState() {
return state;
}
- @Nullable
+ /**
+ * Get details about written offsets
+ * @return {@link Details} with written offsets if state is {@link State#WRITTEN} or null otherwise
+ */
public Details getDetails() {
return details;
}
diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/AsyncWriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/AsyncWriterImpl.java
index 169a7450a..0c21ded55 100644
--- a/topic/src/main/java/tech/ydb/topic/write/impl/AsyncWriterImpl.java
+++ b/topic/src/main/java/tech/ydb/topic/write/impl/AsyncWriterImpl.java
@@ -5,6 +5,7 @@
import java.util.concurrent.Executor;
import tech.ydb.topic.TopicRpc;
+import tech.ydb.topic.settings.SendSettings;
import tech.ydb.topic.settings.WriterSettings;
import tech.ydb.topic.write.AsyncWriter;
import tech.ydb.topic.write.InitResult;
@@ -27,9 +28,9 @@ public CompletableFuture init() {
}
@Override
- public CompletableFuture send(Message message) throws QueueOverflowException {
+ public CompletableFuture send(Message message, SendSettings settings) throws QueueOverflowException {
try {
- return sendImpl(message, true).join();
+ return sendImpl(message, settings, true).join();
} catch (CompletionException e) {
if (e.getCause() instanceof QueueOverflowException) {
throw (QueueOverflowException) e.getCause();
@@ -39,6 +40,11 @@ public CompletableFuture send(Message message) throws QueueOverflowExc
}
}
+ @Override
+ public CompletableFuture send(Message message) throws QueueOverflowException {
+ return send(message, null);
+ }
+
@Override
public CompletableFuture shutdown() {
return shutdownImpl();
diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/EnqueuedMessage.java b/topic/src/main/java/tech/ydb/topic/write/impl/EnqueuedMessage.java
index 30e9712ea..c2c372a49 100644
--- a/topic/src/main/java/tech/ydb/topic/write/impl/EnqueuedMessage.java
+++ b/topic/src/main/java/tech/ydb/topic/write/impl/EnqueuedMessage.java
@@ -3,6 +3,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
+import tech.ydb.common.transaction.YdbTransaction;
+import tech.ydb.topic.settings.SendSettings;
import tech.ydb.topic.write.Message;
import tech.ydb.topic.write.WriteAck;
@@ -12,12 +14,14 @@ public class EnqueuedMessage {
private final AtomicBoolean isCompressed = new AtomicBoolean();
private final AtomicBoolean isProcessingFailed = new AtomicBoolean();
private final long uncompressedSizeBytes;
+ private final YdbTransaction transaction;
private long compressedSizeBytes;
private Long seqNo;
- public EnqueuedMessage(Message message) {
+ public EnqueuedMessage(Message message, SendSettings sendSettings) {
this.message = message;
this.uncompressedSizeBytes = message.getData().length;
+ this.transaction = sendSettings != null ? sendSettings.getTransaction() : null;
}
public Message getMessage() {
@@ -67,4 +71,8 @@ public Long getSeqNo() {
public void setSeqNo(long seqNo) {
this.seqNo = seqNo;
}
+
+ public YdbTransaction getTransaction() {
+ return transaction;
+ }
}
diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/MessageSender.java b/topic/src/main/java/tech/ydb/topic/write/impl/MessageSender.java
index b19197307..c1ad47630 100644
--- a/topic/src/main/java/tech/ydb/topic/write/impl/MessageSender.java
+++ b/topic/src/main/java/tech/ydb/topic/write/impl/MessageSender.java
@@ -10,6 +10,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import tech.ydb.common.transaction.YdbTransaction;
import tech.ydb.core.utils.ProtobufUtils;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.description.MetadataItem;
@@ -34,6 +35,7 @@ public class MessageSender {
private long totalMessageDataProtoSize;
private YdbTopic.StreamWriteMessage.WriteRequest.Builder writeRequestBuilder;
private int messageCount;
+ private YdbTransaction currentTransaction;
public MessageSender(WriterSettings settings) {
this.settings = settings;
@@ -94,6 +96,11 @@ public void addMessage(YdbTopic.StreamWriteMessage.WriteRequest.MessageData mess
}
public void sendWriteRequest() {
+ if (currentTransaction != null) {
+ writeRequestBuilder.setTx(YdbTopic.TransactionIdentity.newBuilder()
+ .setId(currentTransaction.getId())
+ .setSession(currentTransaction.getSessionId()));
+ }
YdbTopic.StreamWriteMessage.FromClient fromClient = YdbTopic.StreamWriteMessage.FromClient.newBuilder()
.setWriteRequest(writeRequestBuilder)
.build();
@@ -133,6 +140,12 @@ public void sendWriteRequest() {
}
public void tryAddMessageToRequest(EnqueuedMessage message) {
+ if (message.getTransaction() != currentTransaction) {
+ if (messageCount > 0) {
+ sendWriteRequest();
+ }
+ currentTransaction = message.getTransaction();
+ }
long messageSeqNo = message.getSeqNo() == null
? (message.getMessage().getSeqNo() == null ? ++seqNo : message.getMessage().getSeqNo())
: message.getSeqNo();
@@ -140,8 +153,6 @@ public void tryAddMessageToRequest(EnqueuedMessage message) {
message.setSeqNo(messageSeqNo);
}
-
-
YdbTopic.StreamWriteMessage.WriteRequest.MessageData.Builder messageDataBuilder =
YdbTopic.StreamWriteMessage.WriteRequest.MessageData.newBuilder()
.setSeqNo(messageSeqNo)
diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/SyncWriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/SyncWriterImpl.java
index 45f8e668b..31de1ca2b 100644
--- a/topic/src/main/java/tech/ydb/topic/write/impl/SyncWriterImpl.java
+++ b/topic/src/main/java/tech/ydb/topic/write/impl/SyncWriterImpl.java
@@ -6,6 +6,7 @@
import java.util.concurrent.TimeoutException;
import tech.ydb.topic.TopicRpc;
+import tech.ydb.topic.settings.SendSettings;
import tech.ydb.topic.settings.WriterSettings;
import tech.ydb.topic.write.InitResult;
import tech.ydb.topic.write.Message;
@@ -32,14 +33,14 @@ public InitResult initAndWait() {
}
@Override
- public void send(Message message) {
- sendImpl(message, false).join();
+ public void send(Message message, SendSettings sendSettings) {
+ sendImpl(message, sendSettings, false).join();
}
@Override
- public void send(Message message, long timeout, TimeUnit unit)
+ public void send(Message message, SendSettings sendSettings, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
- sendImpl(message, false).get(timeout, unit);
+ sendImpl(message, sendSettings, false).get(timeout, unit);
}
@Override
diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java
index 6438f3e76..918faf07b 100644
--- a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java
+++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java
@@ -23,6 +23,7 @@
import tech.ydb.topic.TopicRpc;
import tech.ydb.topic.description.Codec;
import tech.ydb.topic.impl.GrpcStreamRetrier;
+import tech.ydb.topic.settings.SendSettings;
import tech.ydb.topic.settings.WriterSettings;
import tech.ydb.topic.utils.Encoder;
import tech.ydb.topic.write.InitResult;
@@ -229,7 +230,8 @@ protected CompletableFuture initImpl() {
// Outer future completes when message is put (or declined) into send buffer
// Inner future completes on receiving write ack from server
- protected CompletableFuture> sendImpl(Message message, boolean instant) {
+ protected CompletableFuture> sendImpl(Message message, SendSettings sendSettings,
+ boolean instant) {
if (isStopped.get()) {
throw new RuntimeException("Writer is already stopped");
}
@@ -248,7 +250,7 @@ protected CompletableFuture> sendImpl(Message messag
isSeqNoProvided = message.getSeqNo() != null;
}
- EnqueuedMessage enqueuedMessage = new EnqueuedMessage(message);
+ EnqueuedMessage enqueuedMessage = new EnqueuedMessage(message, sendSettings);
return tryToEnqueue(enqueuedMessage, instant).thenApply(v -> enqueuedMessage.getFuture());
}
@@ -325,7 +327,7 @@ private WriteSessionImpl() {
public void startAndInitialize() {
logger.debug("[{}] Session {} startAndInitialize called", fullId, sessionId);
- start(this::processMessage).whenComplete(this::onSessionClosing);
+ start(this::processMessage).whenComplete(this::closeDueToError);
YdbTopic.StreamWriteMessage.InitRequest.Builder initRequestBuilder = YdbTopic.StreamWriteMessage.InitRequest
.newBuilder()
@@ -501,10 +503,9 @@ private void processWriteAck(EnqueuedMessage message,
message.getFuture().complete(resultAck);
}
- private void onSessionClosing(Status status, Throwable th) {
- logger.info("[{}] Session {} onSessionClosing called", fullId, sessionId);
- if (isWorking.get()) {
- shutdown();
+ private void closeDueToError(Status status, Throwable th) {
+ logger.info("[{}] Session {} closeDueToError called", fullId, sessionId);
+ if (shutdown()) {
// Signal writer to retry
onSessionClosed(status, th);
}
diff --git a/topic/src/test/java/tech/ydb/topic/impl/DisjointOffsetRangeSetTest.java b/topic/src/test/java/tech/ydb/topic/impl/DisjointOffsetRangeSetTest.java
index 02a841422..2d4dec3b7 100644
--- a/topic/src/test/java/tech/ydb/topic/impl/DisjointOffsetRangeSetTest.java
+++ b/topic/src/test/java/tech/ydb/topic/impl/DisjointOffsetRangeSetTest.java
@@ -4,7 +4,7 @@
import org.junit.Assert;
import org.junit.Test;
-import tech.ydb.topic.read.OffsetsRange;
+import tech.ydb.topic.description.OffsetsRange;
import tech.ydb.topic.read.impl.DisjointOffsetRangeSet;
import tech.ydb.topic.read.impl.OffsetsRangeImpl;