diff --git a/bom/pom.xml b/bom/pom.xml index e8ddac354..702d14540 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -55,6 +55,11 @@ ${ydb-auth-api.version} + + tech.ydb + ydb-sdk-common + ${project.version} + tech.ydb ydb-sdk-core diff --git a/common/pom.xml b/common/pom.xml new file mode 100644 index 000000000..99b593a62 --- /dev/null +++ b/common/pom.xml @@ -0,0 +1,37 @@ + + + 4.0.0 + + + tech.ydb + ydb-sdk-parent + 2.2.0-SNAPSHOT + + + ydb-sdk-common + Common module of Java SDK for YDB + Common module of Java SDK for YDB + + + UTF-8 + + + + + tech.ydb + ydb-sdk-core + + + org.slf4j + slf4j-api + + + org.apache.logging.log4j + log4j-slf4j-impl + test + + + diff --git a/query/src/main/java/tech/ydb/query/QueryTx.java b/common/src/main/java/tech/ydb/common/transaction/TxMode.java similarity index 72% rename from query/src/main/java/tech/ydb/query/QueryTx.java rename to common/src/main/java/tech/ydb/common/transaction/TxMode.java index 6c9399a94..5e6307a29 100644 --- a/query/src/main/java/tech/ydb/query/QueryTx.java +++ b/common/src/main/java/tech/ydb/common/transaction/TxMode.java @@ -1,10 +1,10 @@ -package tech.ydb.query; +package tech.ydb.common.transaction; /** * * @author Aleksandr Gorshenin */ -public enum QueryTx { +public enum TxMode { NONE, SERIALIZABLE_RW, diff --git a/common/src/main/java/tech/ydb/common/transaction/YdbTransaction.java b/common/src/main/java/tech/ydb/common/transaction/YdbTransaction.java new file mode 100644 index 000000000..56757c744 --- /dev/null +++ b/common/src/main/java/tech/ydb/common/transaction/YdbTransaction.java @@ -0,0 +1,37 @@ +package tech.ydb.common.transaction; + +import java.util.concurrent.CompletableFuture; + +import tech.ydb.core.Status; + +/** + * A base interface for all YDB transactions from different services + * @author Nikolay Perfilov + */ +public interface YdbTransaction { + + /** + * Returns identifier of the transaction or null if the transaction is not active = (not + * started/committed/rolled back). When {@link YdbTransaction} is not active - any query on this object + * starts a new transaction on server. When transaction is active any call of {@code commit}, + * {@code rollback} or execution of any query with {@code commitAtEnd}=true finishes this transaction + * + * @return identifier of the transaction or null if the transaction is not active + */ + String getId(); + + /** + * Returns {@link TxMode} with mode of the transaction + * + * @return the transaction mode + */ + TxMode getTxMode(); + + default boolean isActive() { + return getId() != null; + } + + String getSessionId(); + + CompletableFuture getStatusFuture(); +} diff --git a/common/src/main/java/tech/ydb/common/transaction/impl/YdbTransactionImpl.java b/common/src/main/java/tech/ydb/common/transaction/impl/YdbTransactionImpl.java new file mode 100644 index 000000000..5af6f3ffe --- /dev/null +++ b/common/src/main/java/tech/ydb/common/transaction/impl/YdbTransactionImpl.java @@ -0,0 +1,38 @@ +package tech.ydb.common.transaction.impl; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; + +import tech.ydb.common.transaction.TxMode; +import tech.ydb.common.transaction.YdbTransaction; +import tech.ydb.core.Status; + +/** + * @author Nikolay Perfilov + */ +public abstract class YdbTransactionImpl implements YdbTransaction { + protected final TxMode txMode; + protected final AtomicReference txId; + protected final AtomicReference> statusFuture = new AtomicReference<>( + new CompletableFuture<>()); + + protected YdbTransactionImpl(TxMode txMode, String txId) { + this.txMode = txMode; + this.txId = new AtomicReference<>(txId); + } + + @Override + public String getId() { + return txId.get(); + } + + @Override + public TxMode getTxMode() { + return txMode; + } + + @Override + public CompletableFuture getStatusFuture() { + return statusFuture.get(); + } +} diff --git a/pom.xml b/pom.xml index 65de259c3..c9a1255e9 100644 --- a/pom.xml +++ b/pom.xml @@ -16,6 +16,7 @@ bom + common core table scheme diff --git a/query/src/main/java/tech/ydb/query/QuerySession.java b/query/src/main/java/tech/ydb/query/QuerySession.java index dbbd4876b..69d07cd41 100644 --- a/query/src/main/java/tech/ydb/query/QuerySession.java +++ b/query/src/main/java/tech/ydb/query/QuerySession.java @@ -2,6 +2,7 @@ import java.util.concurrent.CompletableFuture; +import tech.ydb.common.transaction.TxMode; import tech.ydb.core.Result; import tech.ydb.query.settings.BeginTransactionSettings; import tech.ydb.query.settings.ExecuteQuerySettings; @@ -40,7 +41,7 @@ public interface QuerySession extends AutoCloseable { * @param txMode transaction mode * @return new implicit transaction */ - QueryTransaction createNewTransaction(QueryTx txMode); + QueryTransaction createNewTransaction(TxMode txMode); /** * Create and start a new active {@link QueryTransaction}. This method creates a transaction on the server @@ -50,10 +51,10 @@ public interface QuerySession extends AutoCloseable { * @param settings additional settings for request * @return future with result of the transaction starting */ - CompletableFuture> beginTransaction(QueryTx txMode, BeginTransactionSettings settings); + CompletableFuture> beginTransaction(TxMode txMode, BeginTransactionSettings settings); /** - * Create {@link QueryStream} for executing query with specified {@link QueryTx}. The query can contain DML, DDL and + * Create {@link QueryStream} for executing query with specified {@link TxMode}. The query can contain DML, DDL and * DCL statements. Supported mix of different statement types depends on the chosen transaction type. * * @param query text of query @@ -62,13 +63,13 @@ public interface QuerySession extends AutoCloseable { * @param settings additional settings of query execution * @return a ready to execute instance of {@link QueryStream} */ - QueryStream createQuery(String query, QueryTx tx, Params params, ExecuteQuerySettings settings); + QueryStream createQuery(String query, TxMode tx, Params params, ExecuteQuerySettings settings); @Override void close(); /** - * Create {@link QueryStream} for executing query with specified {@link QueryTx}. The query can contain DML, DDL and + * Create {@link QueryStream} for executing query with specified {@link TxMode}. The query can contain DML, DDL and * DCL statements. Supported mix of different statement types depends on the chosen transaction type. * * @param query text of query @@ -76,19 +77,19 @@ public interface QuerySession extends AutoCloseable { * @param params query parameters * @return a ready to execute instance of {@link QueryStream} */ - default QueryStream createQuery(String query, QueryTx tx, Params params) { + default QueryStream createQuery(String query, TxMode tx, Params params) { return createQuery(query, tx, params, ExecuteQuerySettings.newBuilder().build()); } /** - * Create {@link QueryStream} for executing query with specified {@link QueryTx}. The query can contain DML, DDL and + * Create {@link QueryStream} for executing query with specified {@link TxMode}. The query can contain DML, DDL and * DCL statements. Supported mix of different statement types depends on the chosen transaction type. * * @param query text of query * @param tx transaction mode * @return a ready to execute instance of {@link QueryStream} */ - default QueryStream createQuery(String query, QueryTx tx) { + default QueryStream createQuery(String query, TxMode tx) { return createQuery(query, tx, Params.empty(), ExecuteQuerySettings.newBuilder().build()); } @@ -99,7 +100,7 @@ default QueryStream createQuery(String query, QueryTx tx) { * @param txMode transaction mode * @return future with result of the transaction starting */ - default CompletableFuture> beginTransaction(QueryTx txMode) { + default CompletableFuture> beginTransaction(TxMode txMode) { return beginTransaction(txMode, BeginTransactionSettings.newBuilder().build()); } } diff --git a/query/src/main/java/tech/ydb/query/QueryTransaction.java b/query/src/main/java/tech/ydb/query/QueryTransaction.java index bf262bf27..4780c62df 100644 --- a/query/src/main/java/tech/ydb/query/QueryTransaction.java +++ b/query/src/main/java/tech/ydb/query/QueryTransaction.java @@ -2,6 +2,7 @@ import java.util.concurrent.CompletableFuture; +import tech.ydb.common.transaction.YdbTransaction; import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.query.result.QueryInfo; @@ -11,6 +12,7 @@ import tech.ydb.table.query.Params; /** + * Interface of transaction from query service * Short-living object allows transactional execution of several queries in one interactive transaction. * QueryTransaction can be used in implicit mode - without calling commit()/rollback(). When QueryTransaction is not * active - any execution of query with commitAtEnd=false starts a new transaction. And execution of query with @@ -18,29 +20,7 @@ * * @author Aleksandr Gorshenin */ -public interface QueryTransaction { - - /** - * Returns identifier of the transaction or null if the transaction is not active = (not - * started/committed/rolled back). When {@link QueryTransaction} is not active - any execution of the query created - * by {@code createQuery} starts a new transaction. When QueryTransaction is active - any call of {@code commit}, - * {@code rollback} or execution of the query created by {@code createQuery} with {@code commitAtEnd}=true finishes - * the transaction - * - * @return identifier of the transaction or null if the transaction is not active - */ - String getId(); - - /** - * Returns {@link QueryTx} with mode of the transaction - * - * @return the transaction mode - */ - QueryTx getQueryTx(); - - default boolean isActive() { - return getId() != null; - } +public interface QueryTransaction extends YdbTransaction { /** * Returns {@link QuerySession} that was used for creating the transaction diff --git a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java index 22c70f66b..05a1d01b6 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java @@ -12,6 +12,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import tech.ydb.common.transaction.TxMode; +import tech.ydb.common.transaction.impl.YdbTransactionImpl; import tech.ydb.core.Issue; import tech.ydb.core.Result; import tech.ydb.core.Status; @@ -26,7 +28,6 @@ import tech.ydb.query.QuerySession; import tech.ydb.query.QueryStream; import tech.ydb.query.QueryTransaction; -import tech.ydb.query.QueryTx; import tech.ydb.query.result.QueryInfo; import tech.ydb.query.result.QueryResultPart; import tech.ydb.query.result.QueryStats; @@ -67,7 +68,7 @@ abstract class SessionImpl implements QuerySession { this.sessionId = response.getSessionId(); this.nodeID = getNodeBySessionId(response.getSessionId(), response.getNodeId()); this.isTraceEnabled = logger.isTraceEnabled(); - this.transaction = new AtomicReference<>(new TransactionImpl(QueryTx.SERIALIZABLE_RW, null)); + this.transaction = new AtomicReference<>(new TransactionImpl(TxMode.SERIALIZABLE_RW, null)); } private static Long getNodeBySessionId(String sessionId, long defaultValue) { @@ -99,14 +100,14 @@ public QueryTransaction currentTransaction() { } @Override - public QueryTransaction createNewTransaction(QueryTx txMode) { + public QueryTransaction createNewTransaction(TxMode txMode) { return updateTransaction(new TransactionImpl(txMode, null)); } public abstract void updateSessionState(Status status); @Override - public CompletableFuture> beginTransaction(QueryTx tx, BeginTransactionSettings settings) { + public CompletableFuture> beginTransaction(TxMode tx, BeginTransactionSettings settings) { YdbQuery.BeginTransactionRequest request = YdbQuery.BeginTransactionRequest.newBuilder() .setSessionId(sessionId) .setTxSettings(TxControl.txSettings(tx)) @@ -201,7 +202,7 @@ GrpcReadStream createGrpcStream( } @Override - public QueryStream createQuery(String query, QueryTx tx, Params prms, ExecuteQuerySettings settings) { + public QueryStream createQuery(String query, TxMode tx, Params prms, ExecuteQuerySettings settings) { YdbQuery.TransactionControl tc = TxControl.txModeCtrl(tx, true); return new StreamImpl(createGrpcStream(query, tc, prms, settings)) { @Override @@ -251,6 +252,8 @@ abstract class StreamImpl implements QueryStream { } abstract void handleTxMeta(YdbQuery.TransactionMeta meta); + void handleCompletion(Status status, Throwable th) { + } @Override public CompletableFuture> execute(PartsHandler handler) { @@ -292,6 +295,7 @@ public CompletableFuture> execute(PartsHandler handler) { } } }).whenComplete((status, th) -> { + handleCompletion(status, th); if (th != null) { result.completeExceptionally(th); } @@ -312,23 +316,15 @@ public void cancel() { } } - class TransactionImpl implements QueryTransaction { - private final QueryTx txMode; - private final AtomicReference txId; - - TransactionImpl(QueryTx tx, String txID) { - this.txMode = tx; - this.txId = new AtomicReference<>(txID); - } + class TransactionImpl extends YdbTransactionImpl implements QueryTransaction { - @Override - public String getId() { - return txId.get(); + TransactionImpl(TxMode txMode, String txId) { + super(txMode, txId); } @Override - public QueryTx getQueryTx() { - return txMode; + public String getSessionId() { + return sessionId; } @Override @@ -338,6 +334,10 @@ public QuerySession getSession() { @Override public QueryStream createQuery(String query, boolean commitAtEnd, Params prms, ExecuteQuerySettings settings) { + // If we intend to commit, statusFuture is reset to reflect only future actions in transaction + CompletableFuture currentStatusFuture = commitAtEnd + ? statusFuture.getAndSet(new CompletableFuture<>()) + : statusFuture.get(); final String currentId = txId.get(); YdbQuery.TransactionControl tc = currentId != null ? TxControl.txIdCtrl(currentId, commitAtEnd) @@ -351,13 +351,31 @@ void handleTxMeta(YdbQuery.TransactionMeta meta) { logger.warn("{} lost transaction meta id {}", SessionImpl.this, newId); } } + @Override + void handleCompletion(Status status, Throwable th) { + if (th != null) { + currentStatusFuture.completeExceptionally( + new RuntimeException("Query on transaction failed with exception ", th)); + } + if (status.isSuccess()) { + if (commitAtEnd) { + currentStatusFuture.complete(Status.SUCCESS); + } + } else { + currentStatusFuture.complete(Status + .of(StatusCode.ABORTED) + .withIssues(Issue.of("Query on transaction failed with status " + + status, Issue.Severity.ERROR))); + } + } }; } @Override public CompletableFuture> commit(CommitTransactionSettings settings) { - final String trasactionId = txId.get(); - if (trasactionId == null) { + CompletableFuture currentStatusFuture = statusFuture.getAndSet(new CompletableFuture<>()); + final String transactionId = txId.get(); + if (transactionId == null) { Issue issue = Issue.of("Transaction is not started", Issue.Severity.WARNING); Result res = Result.success(new QueryInfo(null), Status.of(StatusCode.SUCCESS, null, issue)); return CompletableFuture.completedFuture(res); @@ -365,40 +383,54 @@ public CompletableFuture> commit(CommitTransactionSettings set YdbQuery.CommitTransactionRequest request = YdbQuery.CommitTransactionRequest.newBuilder() .setSessionId(sessionId) - .setTxId(trasactionId) + .setTxId(transactionId) .build(); GrpcRequestSettings grpcSettings = makeGrpcRequestSettings(settings); - return rpc.commitTransaction(request, grpcSettings).thenApply(res -> { - updateSessionState(res.getStatus()); - if (!txId.compareAndSet(trasactionId, null)) { - logger.warn("{} lost commit response for transaction {}", SessionImpl.this, trasactionId); - } - // TODO: CommitTrasactionResponse must contain exec_stats - return res.map(resp -> new QueryInfo(null)); - }); + return rpc.commitTransaction(request, grpcSettings) + .thenApply(res -> { + Status status = res.getStatus(); + currentStatusFuture.complete(status); + updateSessionState(status); + if (!txId.compareAndSet(transactionId, null)) { + logger.warn("{} lost commit response for transaction {}", SessionImpl.this, transactionId); + } + // TODO: CommitTransactionResponse must contain exec_stats + return res.map(resp -> new QueryInfo(null)); + }).whenComplete(((status, th) -> { + if (th != null) { + currentStatusFuture.completeExceptionally( + new RuntimeException("Transaction commit failed with exception", th)); + } + })); } @Override public CompletableFuture rollback(RollbackTransactionSettings settings) { - final String trasactionId = txId.get(); + CompletableFuture currentStatusFuture = statusFuture.getAndSet(new CompletableFuture<>()); + final String transactionId = txId.get(); - if (trasactionId == null) { + if (transactionId == null) { Issue issue = Issue.of("Transaction is not started", Issue.Severity.WARNING); return CompletableFuture.completedFuture(Status.of(StatusCode.SUCCESS, null, issue)); } YdbQuery.RollbackTransactionRequest request = YdbQuery.RollbackTransactionRequest.newBuilder() .setSessionId(sessionId) - .setTxId(trasactionId) + .setTxId(transactionId) .build(); GrpcRequestSettings grpcSettings = makeGrpcRequestSettings(settings); - return rpc.rollbackTransaction(request, grpcSettings).thenApply(result -> { - updateSessionState(result.getStatus()); - if (!txId.compareAndSet(trasactionId, null)) { - logger.warn("{} lost rollback response for transaction {}", SessionImpl.this, trasactionId); - } - return result.getStatus(); - }); + return rpc.rollbackTransaction(request, grpcSettings) + .thenApply(result -> { + updateSessionState(result.getStatus()); + if (!txId.compareAndSet(transactionId, null)) { + logger.warn("{} lost rollback response for transaction {}", SessionImpl.this, + transactionId); + } + return result.getStatus(); + }) + .whenComplete((status, th) -> currentStatusFuture.complete(Status + .of(StatusCode.ABORTED) + .withIssues(Issue.of("Transaction was rolled back", Issue.Severity.ERROR)))); } } } diff --git a/query/src/main/java/tech/ydb/query/impl/TxControl.java b/query/src/main/java/tech/ydb/query/impl/TxControl.java index bedde690d..e0174958f 100644 --- a/query/src/main/java/tech/ydb/query/impl/TxControl.java +++ b/query/src/main/java/tech/ydb/query/impl/TxControl.java @@ -1,7 +1,7 @@ package tech.ydb.query.impl; +import tech.ydb.common.transaction.TxMode; import tech.ydb.proto.query.YdbQuery; -import tech.ydb.query.QueryTx; /** * @@ -31,7 +31,7 @@ class TxControl { private TxControl() { } - public static YdbQuery.TransactionControl txModeCtrl(QueryTx tx, boolean commitTx) { + public static YdbQuery.TransactionControl txModeCtrl(TxMode tx, boolean commitTx) { YdbQuery.TransactionSettings ts = txSettings(tx); if (ts == null) { return null; @@ -49,7 +49,7 @@ public static YdbQuery.TransactionControl txIdCtrl(String txId, boolean commitTx .build(); } - public static YdbQuery.TransactionSettings txSettings(QueryTx tx) { + public static YdbQuery.TransactionSettings txSettings(TxMode tx) { switch (tx) { case SERIALIZABLE_RW: return TS_SERIALIZABLE; diff --git a/query/src/test/java/tech/ydb/query/impl/QueryIntegrationTest.java b/query/src/test/java/tech/ydb/query/impl/QueryIntegrationTest.java index 7c7e43894..94921659b 100644 --- a/query/src/test/java/tech/ydb/query/impl/QueryIntegrationTest.java +++ b/query/src/test/java/tech/ydb/query/impl/QueryIntegrationTest.java @@ -21,7 +21,7 @@ import tech.ydb.query.QueryClient; import tech.ydb.query.QuerySession; import tech.ydb.query.QueryTransaction; -import tech.ydb.query.QueryTx; +import tech.ydb.common.transaction.TxMode; import tech.ydb.query.result.QueryInfo; import tech.ydb.query.result.QueryResultPart; import tech.ydb.query.settings.ExecuteQuerySettings; @@ -123,7 +123,7 @@ public static void dropAll() { public void testSimpleSelect() { try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { QueryReader reader = QueryReader.readFrom( - session.createQuery("SELECT 2 + 3;", QueryTx.SERIALIZABLE_RW) + session.createQuery("SELECT 2 + 3;", TxMode.SERIALIZABLE_RW) ).join().getValue(); @@ -152,7 +152,7 @@ public void testSimplePrepare() { .build(); QueryReader reader = QueryReader.readFrom( - session.createQuery(query, QueryTx.NONE, Params.empty(), settings) + session.createQuery(query, TxMode.NONE, Params.empty(), settings) ).join().getValue(); @@ -210,7 +210,7 @@ public void testSimpleCRUD() { ); try (QuerySession session = queryClient.createSession(SESSION_TIMEOUT).join().getValue()) { - session.createQuery(query, QueryTx.SERIALIZABLE_RW, params) + session.createQuery(query, TxMode.SERIALIZABLE_RW, params) .execute(this::printQuerySetPart) .join().getStatus().expectSuccess(); } @@ -218,13 +218,13 @@ public void testSimpleCRUD() { try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { String query = "SELECT id, name, payload, is_valid FROM " + TEST_TABLE + " ORDER BY id;"; - session.createQuery(query, QueryTx.SERIALIZABLE_RW) + session.createQuery(query, TxMode.SERIALIZABLE_RW) .execute(this::printQuerySetPart) .join().getStatus().expectSuccess(); } try (QuerySession session = queryClient.createSession(SESSION_TIMEOUT).join().getValue()) { - session.createQuery("DELETE FROM " + TEST_TABLE, QueryTx.SERIALIZABLE_RW) + session.createQuery("DELETE FROM " + TEST_TABLE, TxMode.SERIALIZABLE_RW) .execute(this::printQuerySetPart) .join().getStatus().expectSuccess(); } @@ -242,14 +242,17 @@ public void printQuerySetPart(QueryResultPart part) { public void updateMultipleTablesInOneTransaction() { try (QueryClient client = QueryClient.newClient(ydbTransport).build()) { try (QuerySession session = client.createSession(Duration.ofSeconds(5)).join().getValue()) { - QueryTransaction tx = session.createNewTransaction(QueryTx.SERIALIZABLE_RW); + QueryTransaction tx = session.createNewTransaction(TxMode.SERIALIZABLE_RW); + Assert.assertFalse(tx.isActive()); QueryReader.readFrom( tx.createQuery("UPDATE " + TEST_TABLE + " SET name='test' WHERE id=1") ).join().getStatus().expectSuccess(); + Assert.assertTrue(tx.isActive()); QueryReader.readFrom( tx.createQueryWithCommit("UPDATE " + TEST_DOUBLE_TABLE + " SET amount=300 WHERE id=1") ).join().getStatus().expectSuccess(); + Assert.assertFalse(tx.isActive()); } } } @@ -258,15 +261,19 @@ public void updateMultipleTablesInOneTransaction() { public void interactiveTransaction() { try (QueryClient client = QueryClient.newClient(ydbTransport).build()) { try (QuerySession session = client.createSession(Duration.ofSeconds(5)).join().getValue()) { - QueryTransaction tx = session.createNewTransaction(QueryTx.SERIALIZABLE_RW); + QueryTransaction tx = session.createNewTransaction(TxMode.SERIALIZABLE_RW); + Assert.assertFalse(tx.isActive()); tx.createQuery("INSERT INTO " + TEST_TABLE + " (id, name) VALUES (1, 'rec1');").execute(null) .join().getStatus().expectSuccess(); + Assert.assertTrue(tx.isActive()); tx.createQuery("INSERT INTO " + TEST_TABLE + " (id, name) VALUES (3, 'rec3');").execute(null) .join().getStatus().expectSuccess(); + Assert.assertTrue(tx.isActive()); Iterator rsIter = QueryReader.readFrom( tx.createQuery("SELECT id, name FROM " + TEST_TABLE + " ORDER BY id") ).join().getValue().iterator(); + Assert.assertTrue(tx.isActive()); Assert.assertTrue(rsIter.hasNext()); ResultSetReader rs = rsIter.next(); @@ -278,6 +285,7 @@ public void interactiveTransaction() { Assert.assertFalse(rsIter.hasNext()); tx.commit().join().getStatus().expectSuccess(); + Assert.assertFalse(tx.isActive()); tx.createQuery("INSERT INTO " + TEST_TABLE + " (id, name) VALUES (2, 'rec2');").execute(null) .join().getStatus().expectSuccess(); @@ -326,13 +334,13 @@ public void testSchemeQuery() { try (QueryClient client = QueryClient.newClient(ydbTransport).build()) { try (QuerySession session = client.createSession(Duration.ofSeconds(5)).join().getValue()) { CompletableFuture> createTable = session - .createQuery("CREATE TABLE demo_table (id Int32, data Text, PRIMARY KEY(id));", QueryTx.NONE) + .createQuery("CREATE TABLE demo_table (id Int32, data Text, PRIMARY KEY(id));", TxMode.NONE) .execute(this::printQuerySetPart); createTable.join().getStatus().expectSuccess(); CompletableFuture> dropTable = session - .createQuery("DROP TABLE demo_table;", QueryTx.NONE) + .createQuery("DROP TABLE demo_table;", TxMode.NONE) .execute(this::printQuerySetPart); dropTable.join().getStatus().expectSuccess(); } @@ -344,13 +352,13 @@ public void testQueryStats() { try (QueryClient client = QueryClient.newClient(ydbTransport).build()) { try (QuerySession session = client.createSession(Duration.ofSeconds(5)).join().getValue()) { CompletableFuture> createTable = session - .createQuery("CREATE TABLE demo_table (id Int32, data Text, PRIMARY KEY(id));", QueryTx.NONE) + .createQuery("CREATE TABLE demo_table (id Int32, data Text, PRIMARY KEY(id));", TxMode.NONE) .execute(this::printQuerySetPart); createTable.join().getStatus().expectSuccess(); CompletableFuture> dropTable = session - .createQuery("DROP TABLE demo_table;", QueryTx.NONE) + .createQuery("DROP TABLE demo_table;", TxMode.NONE) .execute(this::printQuerySetPart); dropTable.join().getStatus().expectSuccess(); } diff --git a/table/pom.xml b/table/pom.xml index c54fca140..da51d9c98 100644 --- a/table/pom.xml +++ b/table/pom.xml @@ -24,6 +24,10 @@ tech.ydb ydb-sdk-core + + tech.ydb + ydb-sdk-common + tech.ydb.test diff --git a/table/src/main/java/tech/ydb/table/Session.java b/table/src/main/java/tech/ydb/table/Session.java index 28b24b568..c1e48dc57 100644 --- a/table/src/main/java/tech/ydb/table/Session.java +++ b/table/src/main/java/tech/ydb/table/Session.java @@ -3,6 +3,7 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; +import tech.ydb.common.transaction.TxMode; import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcReadStream; @@ -34,6 +35,7 @@ import tech.ydb.table.settings.ReadTableSettings; import tech.ydb.table.settings.RenameTablesSettings; import tech.ydb.table.settings.RollbackTxSettings; +import tech.ydb.table.transaction.TableTransaction; import tech.ydb.table.transaction.Transaction; import tech.ydb.table.transaction.TxControl; import tech.ydb.table.values.ListValue; @@ -41,6 +43,7 @@ /** * @author Sergey Polovko + * @author Nikolay Perfilov */ public interface Session extends AutoCloseable { enum State { @@ -83,10 +86,54 @@ CompletableFuture> executeDataQuery( CompletableFuture> explainDataQuery(String query, ExplainDataQuerySettings settings); + /** + * @deprecated + * Use {@link Session#beginTransaction(TxMode, BeginTxSettings)} instead + */ + @Deprecated CompletableFuture> beginTransaction(Transaction.Mode transactionMode, BeginTxSettings settings); + /** + * Create a new not active {@link TableTransaction}. This TableDescription will have no identifier and + * starts a transaction on server by execution a query + * @param txMode transaction mode + * @return new implicit transaction + */ + TableTransaction createNewTransaction(TxMode txMode); + + /** + * Create and start a new active {@link TableTransaction}. This method creates a transaction on the server + * and returns TableDescription which is ready to execute queries on this server transaction + * + * @param txMode transaction mode + * @param settings additional settings for request + * @return future with result of the transaction starting + */ + CompletableFuture> beginTransaction(TxMode txMode, BeginTxSettings settings); + + /** + * Create and start a new active {@link TableTransaction}. This method creates a transaction on the server + * and returns TableDescription which is ready to execute queries on this server transaction + * + * @param txMode transaction mode + * @return future with result of the transaction starting + */ + default CompletableFuture> beginTransaction(TxMode txMode) { + return beginTransaction(txMode, new BeginTxSettings()); + } + + /** + * @deprecated + * Use {@link TableTransaction#commit()} ()} instead + */ + @Deprecated CompletableFuture commitTransaction(String txId, CommitTxSettings settings); + /** + * @deprecated + * Use {@link TableTransaction#rollback()} instead + */ + @Deprecated CompletableFuture rollbackTransaction(String txId, RollbackTxSettings settings); GrpcReadStream executeReadTable(String tablePath, ReadTableSettings settings); @@ -165,6 +212,11 @@ default CompletableFuture> explainDataQuery(Strin return explainDataQuery(query, new ExplainDataQuerySettings()); } + /** + * @deprecated + * Use {@link Session#beginTransaction(TxMode)} instead + */ + @Deprecated default CompletableFuture> beginTransaction(Transaction.Mode transactionMode) { return beginTransaction(transactionMode, new BeginTxSettings()); } diff --git a/table/src/main/java/tech/ydb/table/impl/BaseSession.java b/table/src/main/java/tech/ydb/table/impl/BaseSession.java index 5fad1ca02..1c53ebd89 100644 --- a/table/src/main/java/tech/ydb/table/impl/BaseSession.java +++ b/table/src/main/java/tech/ydb/table/impl/BaseSession.java @@ -19,6 +19,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import tech.ydb.common.transaction.TxMode; +import tech.ydb.common.transaction.impl.YdbTransactionImpl; import tech.ydb.core.Issue; import tech.ydb.core.Result; import tech.ydb.core.Status; @@ -79,6 +81,7 @@ import tech.ydb.table.settings.RollbackTxSettings; import tech.ydb.table.settings.StoragePolicy; import tech.ydb.table.settings.TtlSettings; +import tech.ydb.table.transaction.TableTransaction; import tech.ydb.table.transaction.Transaction; import tech.ydb.table.transaction.TxControl; import tech.ydb.table.values.ListType; @@ -94,6 +97,7 @@ /** * @author Sergey Polovko * @author Alexandr Gorshenin + * @author Nikolay Perfilov */ @ThreadSafe public abstract class BaseSession implements Session { @@ -661,13 +665,12 @@ private static YdbTable.TransactionSettings txSettings(Transaction.Mode transact return settings.build(); } - @Override - public CompletableFuture> executeDataQuery( - String query, TxControl txControl, Params params, ExecuteDataQuerySettings settings) { + private CompletableFuture> executeDataQueryInternal( + String query, YdbTable.TransactionControl txControl, Params params, ExecuteDataQuerySettings settings) { YdbTable.ExecuteDataQueryRequest.Builder request = YdbTable.ExecuteDataQueryRequest.newBuilder() .setSessionId(id) .setOperationParams(OperationUtils.createParams(settings.toOperationSettings())) - .setTxControl(txControl.toPb()) + .setTxControl(txControl) .setQuery(YdbTable.Query.newBuilder().setYqlText(query)) .setCollectStats(settings.collectStats().toPb()) .putAllParameters(params.toPb()); @@ -703,6 +706,12 @@ public CompletableFuture> executeDataQuery( .thenApply(result -> result.map(DataQueryResult::new)); } + @Override + public CompletableFuture> executeDataQuery( + String query, TxControl txControl, Params params, ExecuteDataQuerySettings settings) { + return executeDataQueryInternal(query, txControl.toPb(), params, settings); + } + @Override public CompletableFuture> readRows(String pathToTable, ReadRowsSettings settings) { YdbTable.ReadRowsRequest.Builder requestBuilder = YdbTable.ReadRowsRequest.newBuilder() @@ -821,7 +830,26 @@ public CompletableFuture> beginTransaction(Transaction.Mode final GrpcRequestSettings grpcRequestSettings = makeGrpcRequestSettings(settings.getTimeoutDuration()); return interceptResultWithLog("begin transaction", tableRpc.beginTransaction(request, grpcRequestSettings)) - .thenApply(result -> result.map(tx -> new TransactionImpl(this, tx.getTxMeta().getId()))); + .thenApply(result -> result.map(tx -> new DeprecatedTransactionImpl(tx.getTxMeta().getId()))); + } + + @Override + public TableTransaction createNewTransaction(TxMode txMode) { + return new TableTransactionImpl(txMode, null); + } + + @Override + public CompletableFuture> beginTransaction(TxMode txMode, BeginTxSettings settings) { + YdbTable.BeginTransactionRequest request = YdbTable.BeginTransactionRequest.newBuilder() + .setSessionId(id) + .setOperationParams(OperationUtils.createParams(settings.toOperationSettings())) + .setTxSettings(TxControlToPb.txSettings(txMode)) + .build(); + + final GrpcRequestSettings grpcRequestSettings = makeGrpcRequestSettings(settings.getTimeoutDuration()); + return interceptResultWithLog("begin transaction", + tableRpc.beginTransaction(request, grpcRequestSettings)) + .thenApply(result -> result.map(tx -> new TableTransactionImpl(txMode, tx.getTxMeta().getId()))); } @Override @@ -915,8 +943,7 @@ public GrpcReadStream executeScanQuery(String query, Params par }); } - @Override - public CompletableFuture commitTransaction(String txId, CommitTxSettings settings) { + private CompletableFuture commitTransactionInternal(String txId, CommitTxSettings settings) { YdbTable.CommitTransactionRequest request = YdbTable.CommitTransactionRequest.newBuilder() .setSessionId(id) .setOperationParams(OperationUtils.createParams(settings.toOperationSettings())) @@ -929,7 +956,11 @@ public CompletableFuture commitTransaction(String txId, CommitTxSettings } @Override - public CompletableFuture rollbackTransaction(String txId, RollbackTxSettings settings) { + public CompletableFuture commitTransaction(String txId, CommitTxSettings settings) { + return commitTransactionInternal(txId, settings); + } + + private CompletableFuture rollbackTransactionInternal(String txId, RollbackTxSettings settings) { YdbTable.RollbackTransactionRequest request = YdbTable.RollbackTransactionRequest.newBuilder() .setSessionId(id) .setOperationParams(OperationUtils.createParams(settings.toOperationSettings())) @@ -941,6 +972,12 @@ public CompletableFuture rollbackTransaction(String txId, RollbackTxSett tableRpc.rollbackTransaction(request, grpcRequestSettings)); } + @Override + @Deprecated + public CompletableFuture rollbackTransaction(String txId, RollbackTxSettings settings) { + return rollbackTransactionInternal(txId, settings); + } + @Override public CompletableFuture> keepAlive(KeepAliveSessionSettings settings) { YdbTable.KeepAliveRequest request = YdbTable.KeepAliveRequest.newBuilder() @@ -1032,6 +1069,115 @@ public String toString() { return "Session{" + id + "}"; } + class TableTransactionImpl extends YdbTransactionImpl implements TableTransaction { + + TableTransactionImpl(TxMode txMode, String txId) { + super(txMode, txId); + } + + @Override + public String getSessionId() { + return id; + } + + @Override + public Session getSession() { + return BaseSession.this; + } + + @Override + public CompletableFuture> executeDataQuery( + String query, boolean commitAtEnd, Params params, ExecuteDataQuerySettings settings) { + // If we intend to commit, statusFuture is reset to reflect only future actions in transaction + CompletableFuture currentStatusFuture = commitAtEnd + ? statusFuture.getAndSet(new CompletableFuture<>()) + : statusFuture.get(); + final String currentId = txId.get(); + YdbTable.TransactionControl transactionControl = currentId != null + ? TxControlToPb.txIdCtrl(currentId, commitAtEnd) + : TxControlToPb.txModeCtrl(txMode, commitAtEnd); + return executeDataQueryInternal(query, transactionControl, params, settings) + .whenComplete((result, th) -> { + if (th != null) { + currentStatusFuture.completeExceptionally( + new RuntimeException("ExecuteDataQuery on transaction failed with exception ", th)); + setNewId(currentId, null); + } else if (result.isSuccess()) { + setNewId(currentId, result.getValue().getTxId()); + if (commitAtEnd) { + currentStatusFuture.complete(Status.SUCCESS); + } + } else { + setNewId(currentId, null); + currentStatusFuture.complete(Status + .of(StatusCode.ABORTED) + .withIssues(Issue.of("ExecuteDataQuery on transaction failed with status " + + result.getStatus(), Issue.Severity.ERROR))); + } + }); + } + + private void setNewId(String currentId, String newId) { + if (!txId.compareAndSet(currentId, newId)) { + logger.warn("{} Couldn't change transaction id from {} to {}", BaseSession.this, currentId, newId); + } + } + + @Override + public CompletableFuture commit(CommitTxSettings settings) { + CompletableFuture currentStatusFuture = statusFuture.getAndSet(new CompletableFuture<>()); + final String transactionId = txId.get(); + if (transactionId == null) { + Issue issue = Issue.of("Transaction is not started", Issue.Severity.WARNING); + return CompletableFuture.completedFuture(Status.of(StatusCode.SUCCESS, null, issue)); + } + return commitTransactionInternal(transactionId, settings).whenComplete(((status, th) -> { + if (th != null) { + currentStatusFuture.completeExceptionally(th); + } else { + currentStatusFuture.complete(status); + } + })); + } + + @Override + public CompletableFuture rollback(RollbackTxSettings settings) { + CompletableFuture currentStatusFuture = statusFuture.getAndSet(new CompletableFuture<>()); + final String transactionId = txId.get(); + if (transactionId == null) { + Issue issue = Issue.of("Transaction is not started", Issue.Severity.WARNING); + return CompletableFuture.completedFuture(Status.of(StatusCode.SUCCESS, null, issue)); + } + return rollbackTransactionInternal(transactionId, settings) + .whenComplete((status, th) -> currentStatusFuture.complete(Status + .of(StatusCode.ABORTED) + .withIssues(Issue.of("Transaction was rolled back", Issue.Severity.ERROR)))); + } + } + + public final class DeprecatedTransactionImpl implements Transaction { + private final String txId; + + DeprecatedTransactionImpl(String txId) { + this.txId = txId; + } + + @Override + public String getId() { + return txId; + } + + @Override + public CompletableFuture commit(CommitTxSettings settings) { + return commitTransactionInternal(txId, settings); + } + + @Override + public CompletableFuture rollback(RollbackTxSettings settings) { + return rollbackTransactionInternal(txId, settings); + } + } + private static class ShutdownHandler implements Consumer { private static final String GRACEFUL_SHUTDOWN_HINT = "session-close"; private volatile boolean needShutdown; diff --git a/table/src/main/java/tech/ydb/table/impl/TransactionImpl.java b/table/src/main/java/tech/ydb/table/impl/TransactionImpl.java deleted file mode 100644 index 435386c71..000000000 --- a/table/src/main/java/tech/ydb/table/impl/TransactionImpl.java +++ /dev/null @@ -1,39 +0,0 @@ -package tech.ydb.table.impl; - -import java.util.concurrent.CompletableFuture; - -import tech.ydb.core.Status; -import tech.ydb.table.Session; -import tech.ydb.table.settings.CommitTxSettings; -import tech.ydb.table.settings.RollbackTxSettings; -import tech.ydb.table.transaction.Transaction; - - -/** - * @author Sergey Polovko - */ -final class TransactionImpl implements Transaction { - - private final Session session; - private final String txId; - - TransactionImpl(Session session, String txId) { - this.session = session; - this.txId = txId; - } - - @Override - public String getId() { - return txId; - } - - @Override - public CompletableFuture commit(CommitTxSettings settings) { - return session.commitTransaction(txId, settings); - } - - @Override - public CompletableFuture rollback(RollbackTxSettings settings) { - return session.rollbackTransaction(txId, settings); - } -} diff --git a/table/src/main/java/tech/ydb/table/impl/TxControlToPb.java b/table/src/main/java/tech/ydb/table/impl/TxControlToPb.java new file mode 100644 index 000000000..517dbad06 --- /dev/null +++ b/table/src/main/java/tech/ydb/table/impl/TxControlToPb.java @@ -0,0 +1,69 @@ +package tech.ydb.table.impl; + +import tech.ydb.common.transaction.TxMode; +import tech.ydb.proto.table.YdbTable; + +/** + * @author Aleksandr Gorshenin + * @author Nikolay Perfilov + */ +public class TxControlToPb { + private static final YdbTable.TransactionSettings TS_SERIALIZABLE = YdbTable.TransactionSettings.newBuilder() + .setSerializableReadWrite(YdbTable.SerializableModeSettings.getDefaultInstance()) + .build(); + + private static final YdbTable.TransactionSettings TS_SNAPSHOT = YdbTable.TransactionSettings.newBuilder() + .setSnapshotReadOnly(YdbTable.SnapshotModeSettings.getDefaultInstance()) + .build(); + + private static final YdbTable.TransactionSettings TS_STALE = YdbTable.TransactionSettings.newBuilder() + .setStaleReadOnly(YdbTable.StaleModeSettings.getDefaultInstance()) + .build(); + + private static final YdbTable.TransactionSettings TS_ONLINE = YdbTable.TransactionSettings.newBuilder() + .setOnlineReadOnly(YdbTable.OnlineModeSettings.newBuilder().setAllowInconsistentReads(false).build()) + .build(); + + private static final YdbTable.TransactionSettings TS_ONLINE_INCONSISTENT = YdbTable.TransactionSettings + .newBuilder() + .setOnlineReadOnly(YdbTable.OnlineModeSettings.newBuilder().setAllowInconsistentReads(true).build()) + .build(); + + private TxControlToPb() { } + + public static YdbTable.TransactionControl txModeCtrl(TxMode tx, boolean commitTx) { + YdbTable.TransactionSettings ts = txSettings(tx); + if (ts == null) { + return null; + } + return YdbTable.TransactionControl.newBuilder() + .setBeginTx(ts) + .setCommitTx(commitTx) + .build(); + } + + public static YdbTable.TransactionControl txIdCtrl(String txId, boolean commitTx) { + return YdbTable.TransactionControl.newBuilder() + .setTxId(txId) + .setCommitTx(commitTx) + .build(); + } + + public static YdbTable.TransactionSettings txSettings(TxMode tx) { + switch (tx) { + case SERIALIZABLE_RW: + return TS_SERIALIZABLE; + case SNAPSHOT_RO: + return TS_SNAPSHOT; + case STALE_RO: + return TS_STALE; + case ONLINE_RO: + return TS_ONLINE; + case ONLINE_INCONSISTENT_RO: + return TS_ONLINE_INCONSISTENT; + case NONE: + default: + return null; + } + } +} diff --git a/table/src/main/java/tech/ydb/table/transaction/TableTransaction.java b/table/src/main/java/tech/ydb/table/transaction/TableTransaction.java new file mode 100644 index 000000000..8e4dfc685 --- /dev/null +++ b/table/src/main/java/tech/ydb/table/transaction/TableTransaction.java @@ -0,0 +1,100 @@ +package tech.ydb.table.transaction; + +import java.util.concurrent.CompletableFuture; + +import tech.ydb.common.transaction.YdbTransaction; +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.table.Session; +import tech.ydb.table.query.DataQueryResult; +import tech.ydb.table.query.Params; +import tech.ydb.table.settings.CommitTxSettings; +import tech.ydb.table.settings.ExecuteDataQuerySettings; +import tech.ydb.table.settings.RollbackTxSettings; + +/** + * Interface of transaction from table service + * Short-living object allows transactional execution of several queries in one interactive transaction. + * TableTransaction can be used in implicit mode - without calling commit()/rollback(). + * When TableTransaction is not active, any execution of a query with commitAtEnd=false starts a new transaction. + * And execution of a query with commitAtEnd=true commits this transaction. + * @author Nikolay Perfilov + */ +public interface TableTransaction extends YdbTransaction { + + /** + * Returns {@link Session} that was used to create this transaction + * + * @return session that was used to create this transaction + */ + Session getSession(); + + /** + * Execute DataQuery + * + * @param query text of query. Can only contain DML statements + * @param commitAtEnd true if transaction must be committed after query execution + * @param params query parameters + * @param settings additional settings of query execution + * @return a future to query result + */ + CompletableFuture> executeDataQuery( + String query, boolean commitAtEnd, Params params, ExecuteDataQuerySettings settings); + + /** + * Execute DataQuery. + * Transaction will not be committed after the execution of query. + * + * @param query text of query. Can only contain DML statements + * @return a future to query result + */ + default CompletableFuture> executeDataQuery(String query) { + return executeDataQuery(query, false, Params.empty(), new ExecuteDataQuerySettings()); + } + + /** + * Execute DataQuery. + * Transaction will not be committed after the execution of query. + * + * @param query text of query. Can only contain DML statements + * @param params query parameters + * @return a future to query result + */ + default CompletableFuture> executeDataQuery(String query, Params params) { + return executeDataQuery(query, false, params, new ExecuteDataQuerySettings()); + } + + /** + * Execute DataQuery. + * Transaction will be committed after the execution of query. + * + * @param query text of query. Can only contain DML statements + * @return a future to query result + */ + default CompletableFuture> executeDataQueryAndCommit(String query) { + return executeDataQuery(query, true, Params.empty(), new ExecuteDataQuerySettings()); + } + + /** + * Execute DataQuery. + * Transaction will be committed after the execution of query. + * + * @param query text of query. Can only contain DML statements + * @param params query parameters + * @return a future to query result + */ + default CompletableFuture> executeDataQueryAndCommit(String query, Params params) { + return executeDataQuery(query, false, params, new ExecuteDataQuerySettings()); + } + + CompletableFuture commit(CommitTxSettings settings); + CompletableFuture rollback(RollbackTxSettings settings); + + default CompletableFuture commit() { + return commit(new CommitTxSettings()); + } + + default CompletableFuture rollback() { + return rollback(new RollbackTxSettings()); + } +} diff --git a/table/src/main/java/tech/ydb/table/transaction/Transaction.java b/table/src/main/java/tech/ydb/table/transaction/Transaction.java index 74eb995cd..fd1bbe9b8 100644 --- a/table/src/main/java/tech/ydb/table/transaction/Transaction.java +++ b/table/src/main/java/tech/ydb/table/transaction/Transaction.java @@ -8,7 +8,10 @@ /** * @author Sergey Polovko + * @deprecated + * Use {@link TableTransaction} instead */ +@Deprecated public interface Transaction { enum Mode { SERIALIZABLE_READ_WRITE, diff --git a/table/src/main/java/tech/ydb/table/transaction/TxControl.java b/table/src/main/java/tech/ydb/table/transaction/TxControl.java index 9446c595f..62164055b 100644 --- a/table/src/main/java/tech/ydb/table/transaction/TxControl.java +++ b/table/src/main/java/tech/ydb/table/transaction/TxControl.java @@ -1,12 +1,14 @@ package tech.ydb.table.transaction; +import tech.ydb.common.transaction.TxMode; import tech.ydb.proto.table.YdbTable; import tech.ydb.proto.table.YdbTable.OnlineModeSettings; import tech.ydb.proto.table.YdbTable.SerializableModeSettings; import tech.ydb.proto.table.YdbTable.StaleModeSettings; import tech.ydb.proto.table.YdbTable.TransactionControl; import tech.ydb.proto.table.YdbTable.TransactionSettings; +import tech.ydb.table.settings.BeginTxSettings; /** @@ -31,10 +33,22 @@ protected TxControl(boolean commitTx, TransactionSettings settings) { .build(); } + /** + * @deprecated + * Use {@link TableTransaction} created by {@link tech.ydb.table.Session#createNewTransaction(TxMode)} + * or {@link tech.ydb.table.Session#beginTransaction(TxMode, BeginTxSettings)} to execute queries in transaction + */ + @Deprecated public static TxId id(String id) { return new TxId(true, id); } + /** + * @deprecated + * Use {@link TableTransaction} created by {@link tech.ydb.table.Session#createNewTransaction(TxMode)} + * or {@link tech.ydb.table.Session#beginTransaction(TxMode, BeginTxSettings)} to execute queries in transaction + */ + @Deprecated public static TxId id(Transaction tx) { return new TxId(true, tx.getId()); } @@ -60,7 +74,6 @@ public boolean isCommitTx() { } public abstract Self setCommitTx(boolean commitTx); - public TransactionControl toPb() { return pb; } diff --git a/table/src/test/java/tech/ydb/table/SessionStub.java b/table/src/test/java/tech/ydb/table/SessionStub.java index c6cfe9059..7fb1846a5 100644 --- a/table/src/test/java/tech/ydb/table/SessionStub.java +++ b/table/src/test/java/tech/ydb/table/SessionStub.java @@ -1,11 +1,9 @@ package tech.ydb.table; -import java.sql.ResultSet; -import java.time.Duration; -import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import tech.ydb.common.transaction.TxMode; import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcReadStream; @@ -37,10 +35,10 @@ import tech.ydb.table.settings.ReadRowsSettings; import tech.ydb.table.settings.ReadTableSettings; import tech.ydb.table.settings.RollbackTxSettings; +import tech.ydb.table.transaction.TableTransaction; import tech.ydb.table.transaction.Transaction; import tech.ydb.table.transaction.TxControl; import tech.ydb.table.values.ListValue; -import tech.ydb.table.values.StructValue; /** @@ -130,12 +128,23 @@ public CompletableFuture> explainDataQuery( } @Override + @Deprecated public CompletableFuture> beginTransaction( Transaction.Mode transactionMode, BeginTxSettings settings) { return notImplemented("beginTransaction()"); } + @Override + public TableTransaction createNewTransaction(TxMode txMode) { + throw new UnsupportedOperationException("createNewTransaction is not implemented"); + } + + @Override + public CompletableFuture> beginTransaction(TxMode txMode, BeginTxSettings settings) { + throw new UnsupportedOperationException("beginTransaction is not implemented"); + } + @Override public GrpcReadStream executeReadTable(String tablePath, ReadTableSettings settings) { throw new UnsupportedOperationException("executeReadTable not implemented"); diff --git a/topic/pom.xml b/topic/pom.xml index fb0613f02..f1ab0ae11 100644 --- a/topic/pom.xml +++ b/topic/pom.xml @@ -25,6 +25,11 @@ ydb-sdk-core + + tech.ydb + ydb-sdk-common + + org.anarres.lzo lzo-core diff --git a/topic/src/main/java/tech/ydb/topic/TopicRpc.java b/topic/src/main/java/tech/ydb/topic/TopicRpc.java index 5363e79b3..96d091338 100644 --- a/topic/src/main/java/tech/ydb/topic/TopicRpc.java +++ b/topic/src/main/java/tech/ydb/topic/TopicRpc.java @@ -56,6 +56,15 @@ CompletableFuture> describeTopic(YdbTopic.D */ CompletableFuture commitOffset(YdbTopic.CommitOffsetRequest request, GrpcRequestSettings settings); + /** + * Updates offsets in transaction. + * @param request request proto + * @param settings rpc call settings + * @return completable future with result of operation + */ + CompletableFuture updateOffsetsInTransaction(YdbTopic.UpdateOffsetsInTransactionRequest request, + GrpcRequestSettings settings); + GrpcReadWriteStream writeSession(); GrpcReadWriteStream readSession(); diff --git a/topic/src/main/java/tech/ydb/topic/description/OffsetsRange.java b/topic/src/main/java/tech/ydb/topic/description/OffsetsRange.java index c806e5090..32dcb4cf7 100644 --- a/topic/src/main/java/tech/ydb/topic/description/OffsetsRange.java +++ b/topic/src/main/java/tech/ydb/topic/description/OffsetsRange.java @@ -3,20 +3,8 @@ /** * @author Nikolay Perfilov */ -public class OffsetsRange { - private final long start; - private final long end; +public interface OffsetsRange { + long getStart(); - public OffsetsRange(long start, long end) { - this.start = start; - this.end = end; - } - - public long getStart() { - return start; - } - - public long getEnd() { - return end; - } + long getEnd(); } diff --git a/topic/src/main/java/tech/ydb/topic/impl/GrpcTopicRpc.java b/topic/src/main/java/tech/ydb/topic/impl/GrpcTopicRpc.java index 7ae6ad178..1d4eaa415 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/GrpcTopicRpc.java +++ b/topic/src/main/java/tech/ydb/topic/impl/GrpcTopicRpc.java @@ -70,6 +70,15 @@ public CompletableFuture commitOffset(YdbTopic.CommitOffsetRequest reque .thenApply(OperationManager.syncStatusUnwrapper(YdbTopic.CommitOffsetResponse::getOperation)); } + @Override + public CompletableFuture updateOffsetsInTransaction(YdbTopic.UpdateOffsetsInTransactionRequest request, + GrpcRequestSettings settings) { + return transport + .unaryCall(TopicServiceGrpc.getUpdateOffsetsInTransactionMethod(), settings, request) + .thenApply(OperationManager.syncStatusUnwrapper( + YdbTopic.UpdateOffsetsInTransactionResponse::getOperation)); + } + @Override public GrpcReadWriteStream< YdbTopic.StreamWriteMessage.FromServer, diff --git a/topic/src/main/java/tech/ydb/topic/impl/Session.java b/topic/src/main/java/tech/ydb/topic/impl/Session.java index 8fc3b07c4..f9745d173 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/Session.java +++ b/topic/src/main/java/tech/ydb/topic/impl/Session.java @@ -5,5 +5,5 @@ */ public interface Session { void startAndInitialize(); - void shutdown(); + boolean shutdown(); } diff --git a/topic/src/main/java/tech/ydb/topic/impl/SessionBase.java b/topic/src/main/java/tech/ydb/topic/impl/SessionBase.java index 4218cd1cf..5c73d3277 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/SessionBase.java +++ b/topic/src/main/java/tech/ydb/topic/impl/SessionBase.java @@ -76,11 +76,13 @@ private boolean stop() { @Override - public synchronized void shutdown() { + public synchronized boolean shutdown() { getLogger().info("Session shutdown"); if (stop()) { onStop(); streamConnection.close(); + return true; } + return false; } } diff --git a/topic/src/main/java/tech/ydb/topic/read/AsyncReader.java b/topic/src/main/java/tech/ydb/topic/read/AsyncReader.java index 2be8b7e17..c8ad9b3a9 100644 --- a/topic/src/main/java/tech/ydb/topic/read/AsyncReader.java +++ b/topic/src/main/java/tech/ydb/topic/read/AsyncReader.java @@ -1,9 +1,17 @@ package tech.ydb.topic.read; +import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import io.grpc.ExperimentalApi; +import tech.ydb.common.transaction.YdbTransaction; +import tech.ydb.core.Status; +import tech.ydb.topic.read.events.DataReceivedEvent; +import tech.ydb.topic.settings.UpdateOffsetsInTransactionSettings; + /** * @author Nikolay Perfilov */ @@ -19,4 +27,41 @@ public interface AsyncReader { * Stops internal threads and makes cleanup in background. Non-blocking */ CompletableFuture shutdown(); + + /** + * Add offsets to transaction. Offsets could be from several topics. + * These offsets are "committed" only after said transaction is successfully committed. + * It is a separate request sent outside the reading stream. + * + * @param transaction a {@link YdbTransaction} that offsets should be added to. + * Transaction has to be active. + * @param offsets Offsets that should be added to transaction. + * Map key: topic Path + * Map value: List of Partition ranges for every partition in this topic to add + * @param settings Operation settings. + * @return {@link CompletableFuture} to operation status + */ + CompletableFuture updateOffsetsInTransaction(YdbTransaction transaction, + Map> offsets, + UpdateOffsetsInTransactionSettings settings); + + /** + * Add offsets of a single partition session to transaction.Offsets could be from several topics. + * These offsets are "committed" only after said transaction is successfully committed. + * It is a separate request sent outside the reading stream. + * + * @param transaction a {@link YdbTransaction} that offsets should be added to. + * Transaction has to be active. + * @param offsets Offsets that should be added to transaction. + * Could be constructed with helper methods {@link Message#getPartitionOffsets()} + * or {@link DataReceivedEvent#getPartitionOffsets()} + * @param settings Operation settings. + * @return {@link CompletableFuture} to operation status + */ + default CompletableFuture updateOffsetsInTransaction(YdbTransaction transaction, PartitionOffsets offsets, + UpdateOffsetsInTransactionSettings settings) { + return updateOffsetsInTransaction(transaction, + Collections.singletonMap(offsets.getPartitionSession().getPath(), Collections.singletonList(offsets)), + settings); + } } diff --git a/topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java b/topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java index 51172141d..011b28031 100644 --- a/topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java +++ b/topic/src/main/java/tech/ydb/topic/read/DeferredCommitter.java @@ -12,7 +12,7 @@ * * @author Nikolay Perfilov */ -public interface DeferredCommitter { +public interface DeferredCommitter extends MessageAccumulator { /** * Creates a new instance of {@link DeferredCommitter} * @@ -22,20 +22,6 @@ static DeferredCommitter newInstance() { return new DeferredCommitterImpl(); } - /** - * Adds a {@link Message} to commit it later with a commit method - * - * @param message a {@link Message} to commit later - */ - void add(Message message); - - /** - * Adds a {@link DataReceivedEvent} to commit all its messages later with a commit method - * - * @param event a {@link DataReceivedEvent} to commit later - */ - void add(DataReceivedEvent event); - /** * Commits offset ranges from all {@link Message}s and {@link DataReceivedEvent}s * that were added to this DeferredCommitter since last commit diff --git a/topic/src/main/java/tech/ydb/topic/read/Message.java b/topic/src/main/java/tech/ydb/topic/read/Message.java index 80be5bcc2..8627ebeab 100644 --- a/topic/src/main/java/tech/ydb/topic/read/Message.java +++ b/topic/src/main/java/tech/ydb/topic/read/Message.java @@ -65,6 +65,11 @@ public interface Message { */ PartitionSession getPartitionSession(); + /** + * @return Partition offsets of this message + */ + PartitionOffsets getPartitionOffsets(); + /** * Commits this message * If there was an error while committing, there is no point of retrying committing the same message: diff --git a/topic/src/main/java/tech/ydb/topic/read/MessageAccumulator.java b/topic/src/main/java/tech/ydb/topic/read/MessageAccumulator.java new file mode 100644 index 000000000..8fdec247d --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/read/MessageAccumulator.java @@ -0,0 +1,26 @@ +package tech.ydb.topic.read; + +import tech.ydb.topic.read.events.DataReceivedEvent; + +/** + * A common interface that is used to accumulate several {@link Message}s or/and + * {@link tech.ydb.topic.read.events.DataReceivedEvent}s to commit later all at once or to add to transaction. + * + * @author Nikolay Perfilov + */ +public interface MessageAccumulator { + + /** + * Adds a {@link Message} to commit it later or to add to transaction + * + * @param message a {@link Message} + */ + void add(Message message); + + /** + * Adds a {@link DataReceivedEvent} to commit all its messages later or to add to transaction + * + * @param event a {@link DataReceivedEvent} + */ + void add(DataReceivedEvent event); +} diff --git a/topic/src/main/java/tech/ydb/topic/read/OffsetsRange.java b/topic/src/main/java/tech/ydb/topic/read/OffsetsRange.java deleted file mode 100644 index 0045e644c..000000000 --- a/topic/src/main/java/tech/ydb/topic/read/OffsetsRange.java +++ /dev/null @@ -1,10 +0,0 @@ -package tech.ydb.topic.read; - -/** - * @author Nikolay Perfilov - */ -public interface OffsetsRange { - long getStart(); - - long getEnd(); -} diff --git a/topic/src/main/java/tech/ydb/topic/read/PartitionOffsets.java b/topic/src/main/java/tech/ydb/topic/read/PartitionOffsets.java new file mode 100644 index 000000000..31d273d5c --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/read/PartitionOffsets.java @@ -0,0 +1,27 @@ +package tech.ydb.topic.read; + +import java.util.List; + +import tech.ydb.topic.description.OffsetsRange; + +/** + * @author Nikolay Perfilov + */ +public class PartitionOffsets { + private final PartitionSession partitionSession; + private final List offsets; + + public PartitionOffsets(PartitionSession partitionSession, List offsets) { + this.partitionSession = partitionSession; + this.offsets = offsets; + } + + public PartitionSession getPartitionSession() { + return partitionSession; + } + + public List getOffsets() { + return offsets; + } + +} diff --git a/topic/src/main/java/tech/ydb/topic/read/SyncReader.java b/topic/src/main/java/tech/ydb/topic/read/SyncReader.java index e31ddc7a5..940fe9b8b 100644 --- a/topic/src/main/java/tech/ydb/topic/read/SyncReader.java +++ b/topic/src/main/java/tech/ydb/topic/read/SyncReader.java @@ -6,6 +6,8 @@ import io.grpc.ExperimentalApi; +import tech.ydb.topic.settings.ReceiveSettings; + /** * @author Nikolay Perfilov */ @@ -24,21 +26,34 @@ public interface SyncReader { /** * Receive a {@link Message}. Blocks until a Message is received. - * Throws {@link java.util.concurrent.TimeoutException} if timeout runs off + * + * @param settings settings for receiving a Message + * @return returns a {@link Message}, or null if the specified timeout time elapses before a message is available + */ + Message receive(ReceiveSettings settings) throws InterruptedException; + + /** + * Receive a {@link Message}. Blocks until a Message is received. * * @param timeout timeout to wait a Message with * @param unit TimeUnit for timeout * @return returns a {@link Message}, or null if the specified waiting time elapses before a message is available */ @Nullable - Message receive(long timeout, TimeUnit unit) throws InterruptedException; + default Message receive(long timeout, TimeUnit unit) throws InterruptedException { + return receive(ReceiveSettings.newBuilder() + .setTimeout(timeout, unit) + .build()); + } /** * Receive a {@link Message}. Blocks until a Message is received. * * @return {@link Message} */ - Message receive() throws InterruptedException; + default Message receive() throws InterruptedException { + return receive(ReceiveSettings.newBuilder().build()); + } /** * Stops internal threads and makes cleanup in background. Blocking diff --git a/topic/src/main/java/tech/ydb/topic/read/TransactionMessageAccumulator.java b/topic/src/main/java/tech/ydb/topic/read/TransactionMessageAccumulator.java new file mode 100644 index 000000000..d6b4d0bfb --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/read/TransactionMessageAccumulator.java @@ -0,0 +1,22 @@ +package tech.ydb.topic.read; + +import java.util.concurrent.CompletableFuture; + +import tech.ydb.common.transaction.YdbTransaction; +import tech.ydb.core.Status; +import tech.ydb.topic.settings.UpdateOffsetsInTransactionSettings; + +/** + * A helper class that is used to accumulate messages and add them to {@link YdbTransaction}. + * Several {@link Message}s or/and {@link tech.ydb.topic.read.events.DataReceivedEvent}s can be accepted to add to + * {@link YdbTransaction} later all at once. + * All messages should be read from the same Reader this accumulator was created on. + * Contains no data references and therefore may also be useful in cases where messages are committed after processing + * data in an external system. + * + * @author Nikolay Perfilov + */ +public interface TransactionMessageAccumulator extends MessageAccumulator { + CompletableFuture updateOffsetsInTransaction(YdbTransaction transaction, + UpdateOffsetsInTransactionSettings settings); +} diff --git a/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java b/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java index 416c1481e..896484510 100644 --- a/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java +++ b/topic/src/main/java/tech/ydb/topic/read/events/DataReceivedEvent.java @@ -4,6 +4,7 @@ import java.util.concurrent.CompletableFuture; import tech.ydb.topic.read.Message; +import tech.ydb.topic.read.PartitionOffsets; import tech.ydb.topic.read.PartitionSession; /** @@ -26,6 +27,13 @@ public interface DataReceivedEvent { */ PartitionSession getPartitionSession(); + /** + * Returns partition offsets of this message + * + * @return Partition offsets of this message + */ + PartitionOffsets getPartitionOffsets(); + /** * Commits all messages in this event at once. * If there was an error while committing, there is no point of retrying committing the same messages: diff --git a/topic/src/main/java/tech/ydb/topic/read/events/StartPartitionSessionEvent.java b/topic/src/main/java/tech/ydb/topic/read/events/StartPartitionSessionEvent.java index aa64e60de..b95a84df9 100644 --- a/topic/src/main/java/tech/ydb/topic/read/events/StartPartitionSessionEvent.java +++ b/topic/src/main/java/tech/ydb/topic/read/events/StartPartitionSessionEvent.java @@ -1,6 +1,6 @@ package tech.ydb.topic.read.events; -import tech.ydb.topic.read.OffsetsRange; +import tech.ydb.topic.description.OffsetsRange; import tech.ydb.topic.read.PartitionSession; import tech.ydb.topic.settings.StartPartitionSessionSettings; diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java index 92aa68ac7..fe1c2e0d3 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java @@ -1,5 +1,7 @@ package tech.ydb.topic.read.impl; +import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -11,9 +13,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import tech.ydb.common.transaction.YdbTransaction; +import tech.ydb.core.Status; import tech.ydb.proto.topic.YdbTopic; import tech.ydb.topic.TopicRpc; import tech.ydb.topic.read.AsyncReader; +import tech.ydb.topic.read.PartitionOffsets; import tech.ydb.topic.read.PartitionSession; import tech.ydb.topic.read.events.CommitOffsetAcknowledgementEvent; import tech.ydb.topic.read.events.DataReceivedEvent; @@ -29,6 +34,7 @@ import tech.ydb.topic.settings.ReadEventHandlersSettings; import tech.ydb.topic.settings.ReaderSettings; import tech.ydb.topic.settings.StartPartitionSessionSettings; +import tech.ydb.topic.settings.UpdateOffsetsInTransactionSettings; /** * @author Nikolay Perfilov @@ -61,6 +67,17 @@ public CompletableFuture init() { return initImpl(); } + @Override + public CompletableFuture updateOffsetsInTransaction(YdbTransaction transaction, + Map> offsets, + UpdateOffsetsInTransactionSettings settings) { + if (!transaction.isActive()) { + throw new IllegalArgumentException("Transaction is not active. " + + "Can only read topic messages in already running transactions from other services"); + } + return sendUpdateOffsetsInTransaction(transaction, offsets, settings); + } + @Override protected CompletableFuture handleDataReceivedEvent(DataReceivedEvent event) { return CompletableFuture.runAsync(() -> { diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java index 107657a2c..512ee8701 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java @@ -5,7 +5,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import tech.ydb.topic.read.OffsetsRange; +import tech.ydb.topic.description.OffsetsRange; /** * @author Nikolay Perfilov diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java index fb3457038..6abf30475 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java @@ -7,9 +7,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import tech.ydb.topic.description.OffsetsRange; import tech.ydb.topic.read.DeferredCommitter; import tech.ydb.topic.read.Message; -import tech.ydb.topic.read.OffsetsRange; import tech.ydb.topic.read.events.DataReceivedEvent; import tech.ydb.topic.read.impl.events.DataReceivedEventImpl; diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java b/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java index 41ec47ec7..33130d321 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/DisjointOffsetRangeSet.java @@ -7,7 +7,7 @@ import java.util.NavigableMap; import java.util.TreeMap; -import tech.ydb.topic.read.OffsetsRange; +import tech.ydb.topic.description.OffsetsRange; /** * @author Nikolay Perfilov diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java index 3152046e0..2c90709cf 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java @@ -2,14 +2,16 @@ import java.io.IOException; import java.time.Instant; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import tech.ydb.topic.description.MetadataItem; +import tech.ydb.topic.description.OffsetsRange; import tech.ydb.topic.read.DecompressionException; import tech.ydb.topic.read.Message; -import tech.ydb.topic.read.OffsetsRange; +import tech.ydb.topic.read.PartitionOffsets; import tech.ydb.topic.read.PartitionSession; /** @@ -114,6 +116,11 @@ public List getMetadataItems() { return metadataItems; } + @Override + public PartitionOffsets getPartitionOffsets() { + return new PartitionOffsets(partitionSession.getSessionInfo(), Collections.singletonList(offsetsToCommit)); + } + public void setDecompressed(boolean decompressed) { isDecompressed = decompressed; } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRangeImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRangeImpl.java index 5b4661b72..6cf1165ca 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRangeImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/OffsetsRangeImpl.java @@ -1,6 +1,6 @@ package tech.ydb.topic.read.impl; -import tech.ydb.topic.read.OffsetsRange; +import tech.ydb.topic.description.OffsetsRange; /** * @author Nikolay Perfilov diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java index 1417e1222..27548ff90 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java @@ -23,8 +23,8 @@ import tech.ydb.proto.topic.YdbTopic; import tech.ydb.topic.description.Codec; import tech.ydb.topic.description.MetadataItem; +import tech.ydb.topic.description.OffsetsRange; import tech.ydb.topic.read.Message; -import tech.ydb.topic.read.OffsetsRange; import tech.ydb.topic.read.PartitionSession; import tech.ydb.topic.read.events.DataReceivedEvent; import tech.ydb.topic.read.impl.events.DataReceivedEventImpl; @@ -40,7 +40,6 @@ public class PartitionSessionImpl { private final String path; private final long partitionId; private final PartitionSession sessionInfo; - private final OffsetsRange partitionOffsets; private final Executor decompressionExecutor; private final AtomicBoolean isWorking = new AtomicBoolean(true); @@ -61,13 +60,12 @@ private PartitionSessionImpl(Builder builder) { this.sessionInfo = new PartitionSession(id, partitionId, path); this.lastReadOffset = builder.committedOffset; this.lastCommittedOffset = builder.committedOffset; - this.partitionOffsets = builder.partitionOffsets; this.decompressionExecutor = builder.decompressionExecutor; this.dataEventCallback = builder.dataEventCallback; this.commitFunction = builder.commitFunction; logger.info("[{}] Partition session {} (partition {}) is started. CommittedOffset: {}. " + - "Partition offsets: {}-{}", path, id, partitionId, lastReadOffset, partitionOffsets.getStart(), - partitionOffsets.getEnd()); + "Partition offsets: {}-{}", path, id, partitionId, lastReadOffset, builder.partitionOffsets.getStart(), + builder.partitionOffsets.getEnd()); } public static Builder newBuilder() { @@ -222,7 +220,7 @@ public void commitOffsetRanges(List rangesToCommit) { .append("] Sending CommitRequest for partition session ").append(id) .append(" (partition ").append(partitionId).append(") with offset ranges "); addRangesToString(message, rangesToCommit); - logger.info(message.toString()); + logger.debug(message.toString()); } commitFunction.accept(rangesToCommit); } else if (logger.isInfoEnabled()) { diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java index 2fd65cf47..94d65ae8f 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java @@ -17,20 +17,25 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import tech.ydb.common.transaction.YdbTransaction; import tech.ydb.core.Issue; import tech.ydb.core.Status; import tech.ydb.core.StatusCode; +import tech.ydb.core.grpc.GrpcRequestSettings; +import tech.ydb.core.settings.BaseRequestSettings; import tech.ydb.core.utils.ProtobufUtils; import tech.ydb.proto.StatusCodesProtos; import tech.ydb.proto.topic.YdbTopic; import tech.ydb.topic.TopicRpc; +import tech.ydb.topic.description.OffsetsRange; import tech.ydb.topic.impl.GrpcStreamRetrier; -import tech.ydb.topic.read.OffsetsRange; +import tech.ydb.topic.read.PartitionOffsets; import tech.ydb.topic.read.PartitionSession; import tech.ydb.topic.read.events.DataReceivedEvent; import tech.ydb.topic.settings.ReaderSettings; import tech.ydb.topic.settings.StartPartitionSessionSettings; import tech.ydb.topic.settings.TopicReadSettings; +import tech.ydb.topic.settings.UpdateOffsetsInTransactionSettings; /** * @author Nikolay Perfilov @@ -130,7 +135,83 @@ protected void onShutdown(String reason) { } } - private class ReadSessionImpl extends ReadSession { + private GrpcRequestSettings makeGrpcRequestSettings(BaseRequestSettings settings) { + return GrpcRequestSettings.newBuilder() + .withDeadline(settings.getRequestTimeout()) + .build(); + } + + protected CompletableFuture sendUpdateOffsetsInTransaction(YdbTransaction transaction, + Map> offsets, + UpdateOffsetsInTransactionSettings settings) { + if (offsets.isEmpty()) { + throw new IllegalArgumentException("Empty topic list to update in transaction"); + } + if (logger.isDebugEnabled()) { + StringBuilder str = new StringBuilder("Updating "); + boolean first = true; + for (Map.Entry> topicOffsets : offsets.entrySet()) { + if (topicOffsets.getValue().isEmpty()) { + throw new IllegalArgumentException("Empty offsets range to update in transaction"); + } + for (PartitionOffsets partitionOffsets : topicOffsets.getValue()) { + if (!first) { + str.append(", "); + } else { + first = false; + } + str.append("offsets [").append(partitionOffsets.getOffsets().get(0).getStart()).append("..") + .append(partitionOffsets.getOffsets().get(partitionOffsets.getOffsets().size() - 1) + .getEnd()).append(") for partition ") + .append(partitionOffsets.getPartitionSession().getPartitionId()) + .append(" [topic ").append(topicOffsets.getKey()).append("]"); + } + } + logger.debug(str.toString()); + } + final ReadSessionImpl currentSession = session; + transaction.getStatusFuture().whenComplete((status, error) -> { + if (error != null) { + currentSession.closeDueToError(null, + new RuntimeException("Restarting read session due to transaction " + transaction.getId() + + " with partition offsets from read session " + currentSession.fullId + + " was not committed with reason: " + error)); + } else if (!status.isSuccess()) { + currentSession.closeDueToError(null, + new RuntimeException("Restarting read session due to transaction " + transaction.getId() + + " with partition offsets from read session " + currentSession.fullId + + " was not committed with status: " + status)); + } + }); + YdbTopic.UpdateOffsetsInTransactionRequest.Builder requestBuilder = YdbTopic.UpdateOffsetsInTransactionRequest + .newBuilder() + .setTx(YdbTopic.TransactionIdentity.newBuilder() + .setId(transaction.getId()) + .setSession(transaction.getSessionId())) + .setConsumer(this.settings.getConsumerName()); + offsets.forEach((path, topicOffsets) -> { + YdbTopic.UpdateOffsetsInTransactionRequest.TopicOffsets.Builder topicOffsetsBuilder = YdbTopic + .UpdateOffsetsInTransactionRequest.TopicOffsets.newBuilder() + .setPath(path); + topicOffsets.forEach(partitionOffsets -> { + YdbTopic.UpdateOffsetsInTransactionRequest.TopicOffsets.PartitionOffsets.Builder partitionOffsetsBuilder + = YdbTopic.UpdateOffsetsInTransactionRequest.TopicOffsets.PartitionOffsets.newBuilder() + .setPartitionId(partitionOffsets.getPartitionSession().getPartitionId()); + partitionOffsets.getOffsets().forEach(offsetsRange -> partitionOffsetsBuilder.addPartitionOffsets( + YdbTopic.OffsetsRange.newBuilder() + .setStart(offsetsRange.getStart()) + .setEnd(offsetsRange.getEnd()) + .build())); + topicOffsetsBuilder.addPartitions(partitionOffsetsBuilder); + }); + requestBuilder.addTopics(topicOffsetsBuilder); + }); + + final GrpcRequestSettings grpcRequestSettings = makeGrpcRequestSettings(settings); + return topicRpc.updateOffsetsInTransaction(requestBuilder.build(), grpcRequestSettings); + } + + protected class ReadSessionImpl extends ReadSession { protected String sessionId = ""; private final String fullId; // Total size to request with next ReadRequest. @@ -144,7 +225,7 @@ private ReadSessionImpl() { public void startAndInitialize() { logger.debug("[{}] Session {} startAndInitialize called", fullId, sessionId); - start(this::processMessage).whenComplete(this::onSessionClosing); + start(this::processMessage).whenComplete(this::closeDueToError); YdbTopic.StreamReadMessage.InitRequest.Builder initRequestBuilder = YdbTopic.StreamReadMessage.InitRequest .newBuilder(); @@ -437,7 +518,7 @@ private void processMessage(YdbTopic.StreamReadMessage.FromServer message) { reconnectCounter.set(0); } else { logger.warn("[{}] Got non-success status in processMessage method: {}", fullId, message); - onSessionClosed(Status.of(StatusCode.fromProto(message.getStatus())) + closeDueToError(Status.of(StatusCode.fromProto(message.getStatus())) .withIssues(Issue.of("Got a message with non-success status: " + message, Issue.Severity.ERROR)), null); return; @@ -462,10 +543,9 @@ private void processMessage(YdbTopic.StreamReadMessage.FromServer message) { } } - private void onSessionClosing(Status status, Throwable th) { - logger.info("[{}] Session {} onSessionClosing called", fullId, sessionId); - if (isWorking.get()) { - shutdown(); + protected void closeDueToError(Status status, Throwable th) { + logger.info("[{}] Session {} closeDueToError called", fullId, sessionId); + if (shutdown()) { // Signal reader to retry onSessionClosed(status, th); } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java index 200190716..75b441e86 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java @@ -2,6 +2,7 @@ import java.time.Duration; import java.time.Instant; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Queue; @@ -14,6 +15,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import tech.ydb.core.Status; import tech.ydb.proto.topic.YdbTopic; import tech.ydb.topic.TopicRpc; import tech.ydb.topic.read.Message; @@ -21,7 +23,9 @@ import tech.ydb.topic.read.SyncReader; import tech.ydb.topic.read.events.DataReceivedEvent; import tech.ydb.topic.settings.ReaderSettings; +import tech.ydb.topic.settings.ReceiveSettings; import tech.ydb.topic.settings.StartPartitionSessionSettings; +import tech.ydb.topic.settings.UpdateOffsetsInTransactionSettings; /** * @author Nikolay Perfilov @@ -56,9 +60,9 @@ public void initAndWait() { initImpl().join(); } - @Override @Nullable - public Message receive(long timeout, TimeUnit unit) throws InterruptedException { + public Message receiveInternal(ReceiveSettings receiveSettings, long timeout, TimeUnit unit) + throws InterruptedException { if (isStopped.get()) { throw new RuntimeException("Reader was stopped"); } @@ -96,16 +100,31 @@ public Message receive(long timeout, TimeUnit unit) throws InterruptedException currentMessageIndex = 0; currentBatch.future.complete(null); } + if (receiveSettings.getTransaction() != null) { + Status updateStatus = sendUpdateOffsetsInTransaction(receiveSettings.getTransaction(), + Collections.singletonMap(result.getPartitionSession().getPath(), + Collections.singletonList(result.getPartitionOffsets())), + UpdateOffsetsInTransactionSettings.newBuilder().build()) + .join(); + if (!updateStatus.isSuccess()) { + throw new RuntimeException("Couldn't add message offset " + result.getOffset() + " to transaction " + + receiveSettings.getTransaction().getId() + ": " + updateStatus); + } + } return result; } } @Override - public Message receive() throws InterruptedException { + public Message receive(ReceiveSettings receiveSettings) throws InterruptedException { + if (receiveSettings.getTimeout() != null) { + return receiveInternal(receiveSettings, receiveSettings.getTimeout(), receiveSettings.getTimeoutTimeUnit()); + } + Message result; // Poll to prevent infinite wait in case if reader was stopped do { - result = receive(POLL_INTERVAL_SECONDS, TimeUnit.SECONDS); + result = receiveInternal(receiveSettings, POLL_INTERVAL_SECONDS, TimeUnit.SECONDS); } while (result == null); return result; } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/TransactionMessageAccumulatorImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/TransactionMessageAccumulatorImpl.java new file mode 100644 index 000000000..d8096a7ab --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/read/impl/TransactionMessageAccumulatorImpl.java @@ -0,0 +1,98 @@ +package tech.ydb.topic.read.impl; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import tech.ydb.common.transaction.YdbTransaction; +import tech.ydb.core.Status; +import tech.ydb.topic.description.OffsetsRange; +import tech.ydb.topic.read.AsyncReader; +import tech.ydb.topic.read.Message; +import tech.ydb.topic.read.PartitionOffsets; +import tech.ydb.topic.read.PartitionSession; +import tech.ydb.topic.read.TransactionMessageAccumulator; +import tech.ydb.topic.read.events.DataReceivedEvent; +import tech.ydb.topic.read.impl.events.DataReceivedEventImpl; +import tech.ydb.topic.settings.UpdateOffsetsInTransactionSettings; + +/** + * @author Nikolay Perfilov + */ +public class TransactionMessageAccumulatorImpl implements TransactionMessageAccumulator { + private static final Logger logger = LoggerFactory.getLogger(DeferredCommitterImpl.class); + + private final AsyncReader reader; + private final Map> rangesByTopic = new ConcurrentHashMap<>(); + + private static class PartitionRanges { + private final PartitionSession partitionSession; + private final DisjointOffsetRangeSet ranges = new DisjointOffsetRangeSet(); + + private PartitionRanges(PartitionSession partitionSession) { + this.partitionSession = partitionSession; + } + + private void add(OffsetsRange offsetRange) { + try { + synchronized (ranges) { + ranges.add(offsetRange); + } + } catch (RuntimeException exception) { + String errorMessage = "Error adding new offset range to DeferredCommitter for partition session " + + partitionSession.getId() + " (partition " + partitionSession.getPartitionId() + "): " + + exception.getMessage(); + logger.error(errorMessage); + throw new RuntimeException(errorMessage, exception); + } + } + + private List getOffsetsRanges() { + synchronized (ranges) { + return ranges.getRangesAndClear(); + } + } + } + + TransactionMessageAccumulatorImpl(AsyncReader reader) { + this.reader = reader; + } + + @Override + public void add(Message message) { + MessageImpl messageImpl = (MessageImpl) message; + PartitionRanges partitionRanges = rangesByTopic + .computeIfAbsent(message.getPartitionSession().getPath(), path -> new ConcurrentHashMap<>()) + .computeIfAbsent(message.getPartitionSession(), PartitionRanges::new); + partitionRanges.add(messageImpl.getOffsetsToCommit()); + } + + @Override + public void add(DataReceivedEvent event) { + DataReceivedEventImpl eventImpl = (DataReceivedEventImpl) event; + PartitionRanges partitionRanges = rangesByTopic + .computeIfAbsent(event.getPartitionSession().getPath(), path -> new ConcurrentHashMap<>()) + .computeIfAbsent(event.getPartitionSession(), PartitionRanges::new); + partitionRanges.add(eventImpl.getOffsetsToCommit()); + } + + @Override + public CompletableFuture updateOffsetsInTransaction(YdbTransaction transaction, + UpdateOffsetsInTransactionSettings settings) { + Map> offsets = new HashMap<>(); + rangesByTopic.forEach((path, topicRanges) -> { + offsets.put(path, topicRanges.entrySet().stream() + .map(partitionRange -> + new PartitionOffsets(partitionRange.getKey(), partitionRange.getValue().getOffsetsRanges())) + .collect(Collectors.toList())); + }); + return reader.updateOffsetsInTransaction(transaction, offsets, settings); + } + +} diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java index b65789434..e1e0a1371 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/events/DataReceivedEventImpl.java @@ -1,10 +1,12 @@ package tech.ydb.topic.read.impl.events; +import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; +import tech.ydb.topic.description.OffsetsRange; import tech.ydb.topic.read.Message; -import tech.ydb.topic.read.OffsetsRange; +import tech.ydb.topic.read.PartitionOffsets; import tech.ydb.topic.read.PartitionSession; import tech.ydb.topic.read.events.DataReceivedEvent; import tech.ydb.topic.read.impl.CommitterImpl; @@ -32,6 +34,11 @@ public List getMessages() { return messages; } + @Override + public PartitionOffsets getPartitionOffsets() { + return new PartitionOffsets(partitionSession.getSessionInfo(), Collections.singletonList(offsetsToCommit)); + } + @Override public PartitionSession getPartitionSession() { return partitionSession.getSessionInfo(); diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/events/StartPartitionSessionEventImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/events/StartPartitionSessionEventImpl.java index 9fa4632e7..200329ae3 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/events/StartPartitionSessionEventImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/events/StartPartitionSessionEventImpl.java @@ -2,7 +2,7 @@ import java.util.function.Consumer; -import tech.ydb.topic.read.OffsetsRange; +import tech.ydb.topic.description.OffsetsRange; import tech.ydb.topic.read.PartitionSession; import tech.ydb.topic.read.events.StartPartitionSessionEvent; import tech.ydb.topic.settings.StartPartitionSessionSettings; diff --git a/topic/src/main/java/tech/ydb/topic/settings/CommitOffsetSettings.java b/topic/src/main/java/tech/ydb/topic/settings/CommitOffsetSettings.java index fbc141a06..1043f0f29 100644 --- a/topic/src/main/java/tech/ydb/topic/settings/CommitOffsetSettings.java +++ b/topic/src/main/java/tech/ydb/topic/settings/CommitOffsetSettings.java @@ -2,6 +2,9 @@ import tech.ydb.core.settings.OperationSettings; +/** + * @author Nikolay Perfilov + */ public class CommitOffsetSettings extends OperationSettings { private final long partitionId; private final String consumer; diff --git a/topic/src/main/java/tech/ydb/topic/settings/DescribeTopicSettings.java b/topic/src/main/java/tech/ydb/topic/settings/DescribeTopicSettings.java index 89c993415..28333bce6 100644 --- a/topic/src/main/java/tech/ydb/topic/settings/DescribeTopicSettings.java +++ b/topic/src/main/java/tech/ydb/topic/settings/DescribeTopicSettings.java @@ -2,6 +2,9 @@ import tech.ydb.core.settings.OperationSettings; +/** + * @author Nikolay Perfilov + */ public class DescribeTopicSettings extends OperationSettings { /* TODO: renew api and add stats private boolean includeStats = false; diff --git a/topic/src/main/java/tech/ydb/topic/settings/ReceiveSettings.java b/topic/src/main/java/tech/ydb/topic/settings/ReceiveSettings.java new file mode 100644 index 000000000..ed16829f9 --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/settings/ReceiveSettings.java @@ -0,0 +1,81 @@ +package tech.ydb.topic.settings; + +import java.util.concurrent.TimeUnit; + +import tech.ydb.common.transaction.YdbTransaction; + +/** + * @author Nikolay Perfilov + */ +public class ReceiveSettings { + private Long timeout; + private TimeUnit timeoutTimeUnit; + private final YdbTransaction transaction; + + private ReceiveSettings(Builder builder) { + this.timeout = builder.timeout; + this.timeoutTimeUnit = builder.timeoutTimeUnit; + this.transaction = builder.transaction; + } + + public Long getTimeout() { + return timeout; + } + + public TimeUnit getTimeoutTimeUnit() { + return timeoutTimeUnit; + } + + public YdbTransaction getTransaction() { + return transaction; + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * BUILDER + */ + public static class Builder { + private Long timeout; + private TimeUnit timeoutTimeUnit; + private YdbTransaction transaction; + + /** + * Set timeout for receiving a message. + * + * @param timeout timeout for receiving a message + * @param unit {@link TimeUnit} for timeout + * @return Builder + */ + public Builder setTimeout(long timeout, TimeUnit unit) { + this.timeout = timeout; + this.timeoutTimeUnit = unit; + return this; + } + + /** + * Set transaction for receiving message. + * When this transaction is committed, the message will be considered by server as read (committed) + * If this transaction is rolled back, the reader will restart reading stream internally + * + * @param transaction Transaction to link a message with. + * Transaction has to be active + * @return Builder + */ + public Builder setTransaction(YdbTransaction transaction) { + if (!transaction.isActive()) { + throw new IllegalArgumentException("Transaction is not active. " + + "Can only write topic messages in already running transactions from other services"); + } + this.transaction = transaction; + return this; + } + + public ReceiveSettings build() { + return new ReceiveSettings(this); + } + + } +} diff --git a/topic/src/main/java/tech/ydb/topic/settings/SendSettings.java b/topic/src/main/java/tech/ydb/topic/settings/SendSettings.java new file mode 100644 index 000000000..50763658f --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/settings/SendSettings.java @@ -0,0 +1,52 @@ +package tech.ydb.topic.settings; + +import tech.ydb.common.transaction.YdbTransaction; + +/** + * @author Nikolay Perfilov + */ +public class SendSettings { + private final YdbTransaction transaction; + + private SendSettings(Builder builder) { + this.transaction = builder.transaction; + } + + public YdbTransaction getTransaction() { + return transaction; + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * BUILDER + */ + public static class Builder { + private YdbTransaction transaction; + + /** + * Set transaction for sending message. + * When this transaction is committed, the message will be considered by server as written + * If this transaction is rolled back, the writer will be shut down + * + * @param transaction Transaction to link a message with. + * Transaction has to be active + * @return Builder + */ + public Builder setTransaction(YdbTransaction transaction) { + if (!transaction.isActive()) { + throw new IllegalArgumentException("Transaction is not active. " + + "Can only write topic messages in already running transactions from other services"); + } + this.transaction = transaction; + return this; + } + + public SendSettings build() { + return new SendSettings(this); + } + + } +} diff --git a/topic/src/main/java/tech/ydb/topic/settings/StartPartitionSessionSettings.java b/topic/src/main/java/tech/ydb/topic/settings/StartPartitionSessionSettings.java index b61b40ccb..18b4db578 100644 --- a/topic/src/main/java/tech/ydb/topic/settings/StartPartitionSessionSettings.java +++ b/topic/src/main/java/tech/ydb/topic/settings/StartPartitionSessionSettings.java @@ -1,5 +1,8 @@ package tech.ydb.topic.settings; +/** + * @author Nikolay Perfilov + */ public class StartPartitionSessionSettings { private final Long readOffset; private final Long commitOffset; diff --git a/topic/src/main/java/tech/ydb/topic/settings/UpdateOffsetsInTransactionSettings.java b/topic/src/main/java/tech/ydb/topic/settings/UpdateOffsetsInTransactionSettings.java new file mode 100644 index 000000000..3bef7b273 --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/settings/UpdateOffsetsInTransactionSettings.java @@ -0,0 +1,23 @@ +package tech.ydb.topic.settings; + +import tech.ydb.core.settings.OperationSettings; + +/** + * @author Nikolay Perfilov + */ +public class UpdateOffsetsInTransactionSettings extends OperationSettings { + private UpdateOffsetsInTransactionSettings(Builder builder) { + super(builder); + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder extends OperationBuilder { + @Override + public UpdateOffsetsInTransactionSettings build() { + return new UpdateOffsetsInTransactionSettings(this); + } + } +} diff --git a/topic/src/main/java/tech/ydb/topic/write/AsyncWriter.java b/topic/src/main/java/tech/ydb/topic/write/AsyncWriter.java index 1cf77f2ae..e7297e1e5 100644 --- a/topic/src/main/java/tech/ydb/topic/write/AsyncWriter.java +++ b/topic/src/main/java/tech/ydb/topic/write/AsyncWriter.java @@ -4,6 +4,8 @@ import io.grpc.ExperimentalApi; +import tech.ydb.topic.settings.SendSettings; + /** * @author Nikolay Perfilov */ @@ -23,6 +25,14 @@ public interface AsyncWriter { */ CompletableFuture send(Message message) throws QueueOverflowException; + /** + * Send message. Non-blocking + * @param message message data to write + * @param settings send settings + * @return {@link CompletableFuture} with {@link WriteAck} for write acknowledgement + */ + CompletableFuture send(Message message, SendSettings settings) throws QueueOverflowException; + /** * Stops internal threads and makes cleanup in background. Non-blocking */ diff --git a/topic/src/main/java/tech/ydb/topic/write/SyncWriter.java b/topic/src/main/java/tech/ydb/topic/write/SyncWriter.java index 95d741155..a677da58b 100644 --- a/topic/src/main/java/tech/ydb/topic/write/SyncWriter.java +++ b/topic/src/main/java/tech/ydb/topic/write/SyncWriter.java @@ -6,6 +6,8 @@ import io.grpc.ExperimentalApi; +import tech.ydb.topic.settings.SendSettings; + /** * @author Nikolay Perfilov @@ -27,20 +29,43 @@ public interface SyncWriter { /** * Send message. Blocks infinitely until the message is put into sending buffer. * @param message message data to write + * @param settings send settings */ - void send(Message message); + void send(Message message, SendSettings settings); /** * Send message. Blocks until the message is put into sending buffer. * If in-flight or memory usage limits is reached, waits until timeout expires and then * throws {@link TimeoutException} if message was not put into queue * @param message message data to write + * @param settings send settings * @param timeout timeout to wait until message is punt into sending buffer * @param unit {@link TimeUnit} for timeout */ - void send(Message message, long timeout, TimeUnit unit) + void send(Message message, SendSettings settings, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; + /** + * Send message. Blocks infinitely until the message is put into sending buffer. + * @param message message data to write + */ + default void send(Message message) { + send(message, SendSettings.newBuilder().build()); + } + + /** + * Send message. Blocks until the message is put into sending buffer. + * If in-flight or memory usage limits is reached, waits until timeout expires and then + * throws {@link TimeoutException} if message was not put into queue + * @param message message data to write + * @param timeout timeout to wait until message is punt into sending buffer + * @param unit {@link TimeUnit} for timeout + */ + default void send(Message message, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + send(message, SendSettings.newBuilder().build(), timeout, unit); + } + /** * Waits until all current writes will be sent to server and response will be received. Blocking */ diff --git a/topic/src/main/java/tech/ydb/topic/write/WriteAck.java b/topic/src/main/java/tech/ydb/topic/write/WriteAck.java index e611d5bf5..c152e0b8c 100644 --- a/topic/src/main/java/tech/ydb/topic/write/WriteAck.java +++ b/topic/src/main/java/tech/ydb/topic/write/WriteAck.java @@ -1,7 +1,5 @@ package tech.ydb.topic.write; -import javax.annotation.Nullable; - /** * @author Nikolay Perfilov */ @@ -41,7 +39,10 @@ public State getState() { return state; } - @Nullable + /** + * Get details about written offsets + * @return {@link Details} with written offsets if state is {@link State#WRITTEN} or null otherwise + */ public Details getDetails() { return details; } diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/AsyncWriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/AsyncWriterImpl.java index 169a7450a..0c21ded55 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/AsyncWriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/AsyncWriterImpl.java @@ -5,6 +5,7 @@ import java.util.concurrent.Executor; import tech.ydb.topic.TopicRpc; +import tech.ydb.topic.settings.SendSettings; import tech.ydb.topic.settings.WriterSettings; import tech.ydb.topic.write.AsyncWriter; import tech.ydb.topic.write.InitResult; @@ -27,9 +28,9 @@ public CompletableFuture init() { } @Override - public CompletableFuture send(Message message) throws QueueOverflowException { + public CompletableFuture send(Message message, SendSettings settings) throws QueueOverflowException { try { - return sendImpl(message, true).join(); + return sendImpl(message, settings, true).join(); } catch (CompletionException e) { if (e.getCause() instanceof QueueOverflowException) { throw (QueueOverflowException) e.getCause(); @@ -39,6 +40,11 @@ public CompletableFuture send(Message message) throws QueueOverflowExc } } + @Override + public CompletableFuture send(Message message) throws QueueOverflowException { + return send(message, null); + } + @Override public CompletableFuture shutdown() { return shutdownImpl(); diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/EnqueuedMessage.java b/topic/src/main/java/tech/ydb/topic/write/impl/EnqueuedMessage.java index 30e9712ea..c2c372a49 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/EnqueuedMessage.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/EnqueuedMessage.java @@ -3,6 +3,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; +import tech.ydb.common.transaction.YdbTransaction; +import tech.ydb.topic.settings.SendSettings; import tech.ydb.topic.write.Message; import tech.ydb.topic.write.WriteAck; @@ -12,12 +14,14 @@ public class EnqueuedMessage { private final AtomicBoolean isCompressed = new AtomicBoolean(); private final AtomicBoolean isProcessingFailed = new AtomicBoolean(); private final long uncompressedSizeBytes; + private final YdbTransaction transaction; private long compressedSizeBytes; private Long seqNo; - public EnqueuedMessage(Message message) { + public EnqueuedMessage(Message message, SendSettings sendSettings) { this.message = message; this.uncompressedSizeBytes = message.getData().length; + this.transaction = sendSettings != null ? sendSettings.getTransaction() : null; } public Message getMessage() { @@ -67,4 +71,8 @@ public Long getSeqNo() { public void setSeqNo(long seqNo) { this.seqNo = seqNo; } + + public YdbTransaction getTransaction() { + return transaction; + } } diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/MessageSender.java b/topic/src/main/java/tech/ydb/topic/write/impl/MessageSender.java index b19197307..c1ad47630 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/MessageSender.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/MessageSender.java @@ -10,6 +10,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import tech.ydb.common.transaction.YdbTransaction; import tech.ydb.core.utils.ProtobufUtils; import tech.ydb.proto.topic.YdbTopic; import tech.ydb.topic.description.MetadataItem; @@ -34,6 +35,7 @@ public class MessageSender { private long totalMessageDataProtoSize; private YdbTopic.StreamWriteMessage.WriteRequest.Builder writeRequestBuilder; private int messageCount; + private YdbTransaction currentTransaction; public MessageSender(WriterSettings settings) { this.settings = settings; @@ -94,6 +96,11 @@ public void addMessage(YdbTopic.StreamWriteMessage.WriteRequest.MessageData mess } public void sendWriteRequest() { + if (currentTransaction != null) { + writeRequestBuilder.setTx(YdbTopic.TransactionIdentity.newBuilder() + .setId(currentTransaction.getId()) + .setSession(currentTransaction.getSessionId())); + } YdbTopic.StreamWriteMessage.FromClient fromClient = YdbTopic.StreamWriteMessage.FromClient.newBuilder() .setWriteRequest(writeRequestBuilder) .build(); @@ -133,6 +140,12 @@ public void sendWriteRequest() { } public void tryAddMessageToRequest(EnqueuedMessage message) { + if (message.getTransaction() != currentTransaction) { + if (messageCount > 0) { + sendWriteRequest(); + } + currentTransaction = message.getTransaction(); + } long messageSeqNo = message.getSeqNo() == null ? (message.getMessage().getSeqNo() == null ? ++seqNo : message.getMessage().getSeqNo()) : message.getSeqNo(); @@ -140,8 +153,6 @@ public void tryAddMessageToRequest(EnqueuedMessage message) { message.setSeqNo(messageSeqNo); } - - YdbTopic.StreamWriteMessage.WriteRequest.MessageData.Builder messageDataBuilder = YdbTopic.StreamWriteMessage.WriteRequest.MessageData.newBuilder() .setSeqNo(messageSeqNo) diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/SyncWriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/SyncWriterImpl.java index 45f8e668b..31de1ca2b 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/SyncWriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/SyncWriterImpl.java @@ -6,6 +6,7 @@ import java.util.concurrent.TimeoutException; import tech.ydb.topic.TopicRpc; +import tech.ydb.topic.settings.SendSettings; import tech.ydb.topic.settings.WriterSettings; import tech.ydb.topic.write.InitResult; import tech.ydb.topic.write.Message; @@ -32,14 +33,14 @@ public InitResult initAndWait() { } @Override - public void send(Message message) { - sendImpl(message, false).join(); + public void send(Message message, SendSettings sendSettings) { + sendImpl(message, sendSettings, false).join(); } @Override - public void send(Message message, long timeout, TimeUnit unit) + public void send(Message message, SendSettings sendSettings, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - sendImpl(message, false).get(timeout, unit); + sendImpl(message, sendSettings, false).get(timeout, unit); } @Override diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java index 6438f3e76..918faf07b 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java @@ -23,6 +23,7 @@ import tech.ydb.topic.TopicRpc; import tech.ydb.topic.description.Codec; import tech.ydb.topic.impl.GrpcStreamRetrier; +import tech.ydb.topic.settings.SendSettings; import tech.ydb.topic.settings.WriterSettings; import tech.ydb.topic.utils.Encoder; import tech.ydb.topic.write.InitResult; @@ -229,7 +230,8 @@ protected CompletableFuture initImpl() { // Outer future completes when message is put (or declined) into send buffer // Inner future completes on receiving write ack from server - protected CompletableFuture> sendImpl(Message message, boolean instant) { + protected CompletableFuture> sendImpl(Message message, SendSettings sendSettings, + boolean instant) { if (isStopped.get()) { throw new RuntimeException("Writer is already stopped"); } @@ -248,7 +250,7 @@ protected CompletableFuture> sendImpl(Message messag isSeqNoProvided = message.getSeqNo() != null; } - EnqueuedMessage enqueuedMessage = new EnqueuedMessage(message); + EnqueuedMessage enqueuedMessage = new EnqueuedMessage(message, sendSettings); return tryToEnqueue(enqueuedMessage, instant).thenApply(v -> enqueuedMessage.getFuture()); } @@ -325,7 +327,7 @@ private WriteSessionImpl() { public void startAndInitialize() { logger.debug("[{}] Session {} startAndInitialize called", fullId, sessionId); - start(this::processMessage).whenComplete(this::onSessionClosing); + start(this::processMessage).whenComplete(this::closeDueToError); YdbTopic.StreamWriteMessage.InitRequest.Builder initRequestBuilder = YdbTopic.StreamWriteMessage.InitRequest .newBuilder() @@ -501,10 +503,9 @@ private void processWriteAck(EnqueuedMessage message, message.getFuture().complete(resultAck); } - private void onSessionClosing(Status status, Throwable th) { - logger.info("[{}] Session {} onSessionClosing called", fullId, sessionId); - if (isWorking.get()) { - shutdown(); + private void closeDueToError(Status status, Throwable th) { + logger.info("[{}] Session {} closeDueToError called", fullId, sessionId); + if (shutdown()) { // Signal writer to retry onSessionClosed(status, th); } diff --git a/topic/src/test/java/tech/ydb/topic/impl/DisjointOffsetRangeSetTest.java b/topic/src/test/java/tech/ydb/topic/impl/DisjointOffsetRangeSetTest.java index 02a841422..2d4dec3b7 100644 --- a/topic/src/test/java/tech/ydb/topic/impl/DisjointOffsetRangeSetTest.java +++ b/topic/src/test/java/tech/ydb/topic/impl/DisjointOffsetRangeSetTest.java @@ -4,7 +4,7 @@ import org.junit.Assert; import org.junit.Test; -import tech.ydb.topic.read.OffsetsRange; +import tech.ydb.topic.description.OffsetsRange; import tech.ydb.topic.read.impl.DisjointOffsetRangeSet; import tech.ydb.topic.read.impl.OffsetsRangeImpl;