Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Topic transactions #243

Merged
merged 30 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
275f983
Move OffsetsRange to common topic part, remove duplication
pnv1 Jan 23, 2024
ad90537
Refactor topic stream session closing
pnv1 Jan 23, 2024
e372839
Update ydb-proto-api version
pnv1 Jan 26, 2024
da36ec1
Update current transaction state
pnv1 Jan 26, 2024
213515e
Remove unused member partitionOffsets
pnv1 Jan 30, 2024
54d5621
Add first version of updateOffsetsInTransaction method
pnv1 Jan 30, 2024
fe3f3bf
Add transaction message accumulation support
pnv1 Feb 1, 2024
ed14ee8
Move basic transaction interface to new common module
pnv1 Feb 12, 2024
2bb4faf
Tune logging
pnv1 Feb 13, 2024
644949b
Add transaction support for sync reader
pnv1 Feb 13, 2024
f99306f
Add transaction support for topic writes
pnv1 Feb 13, 2024
c71cea3
Fix wrong links in comments and import spacing
pnv1 Feb 20, 2024
b5bd6c5
Add Transaction object support for DataQueryResult, mark string Ids a…
pnv1 Feb 20, 2024
023d317
Add missing ownership
pnv1 Mar 4, 2024
273b6dc
Switch setting transaction method in SyncReader
pnv1 Mar 4, 2024
cd8c1ea
Wait for WriteAcks before committing transaction and shutdown writer …
pnv1 Mar 11, 2024
da011e6
Use transaction status future instead of onRollbackAction callback
pnv1 Mar 18, 2024
fc7379a
Use SendSettings for writing in transaction
pnv1 Mar 19, 2024
a8b6a98
Use default interfaces for writer
pnv1 Mar 19, 2024
527eb76
Use ReceiveSettings to set timeouts too
pnv1 Mar 19, 2024
12b984b
Do not wait on transaction commit
pnv1 Mar 21, 2024
020ed11
Remove false statement from java doc
pnv1 Mar 21, 2024
c5d6f0a
Update pom.xml
pnv1 Mar 21, 2024
aa19dca
Change logging level for message commit
pnv1 Mar 21, 2024
abb6aca
Use txControl with Transaction object
pnv1 Mar 21, 2024
f201a3b
Merge branch 'develop' into topic_transactions
pnv1 Mar 21, 2024
6d203d1
New TableTransaction, inherit QueryTransaction from BaseTransaction
pnv1 Mar 22, 2024
f41954f
Reuse transaction status future after every commit or rollback
pnv1 Mar 23, 2024
8f5f1e6
Control transaction active status in query tests
pnv1 Mar 24, 2024
0fd33a2
Rename BaseTransaction to YdbTransaction
pnv1 Mar 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@
<version>${ydb-auth-api.version}</version>
</dependency>

<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-core</artifactId>
Expand Down
37 changes: 37 additions & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<project
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.2.0-SNAPSHOT</version>
</parent>

<artifactId>ydb-sdk-common</artifactId>
<name>Common module of Java SDK for YDB</name>
<description>Common module of Java SDK for YDB</description>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-core</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package tech.ydb.query;
package tech.ydb.common.transaction;

/**
*
* @author Aleksandr Gorshenin
*/
public enum QueryTx {
public enum TxMode {
NONE,

SERIALIZABLE_RW,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package tech.ydb.common.transaction;

import java.util.concurrent.CompletableFuture;

import tech.ydb.core.Status;

/**
* A base interface for all YDB transactions from different services
* @author Nikolay Perfilov
*/
public interface YdbTransaction {

/**
* Returns identifier of the transaction or null if the transaction is not active = (not
* started/committed/rolled back). When {@link YdbTransaction} is not active - any query on this object
* starts a new transaction on server. When transaction is active any call of {@code commit},
* {@code rollback} or execution of any query with {@code commitAtEnd}=true finishes this transaction
*
* @return identifier of the transaction or null if the transaction is not active
*/
String getId();

/**
* Returns {@link TxMode} with mode of the transaction
*
* @return the transaction mode
*/
TxMode getTxMode();

default boolean isActive() {
return getId() != null;
}

String getSessionId();

CompletableFuture<Status> getStatusFuture();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package tech.ydb.common.transaction.impl;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

import tech.ydb.common.transaction.TxMode;
import tech.ydb.common.transaction.YdbTransaction;
import tech.ydb.core.Status;

/**
* @author Nikolay Perfilov
*/
public abstract class YdbTransactionImpl implements YdbTransaction {
protected final TxMode txMode;
protected final AtomicReference<String> txId;
protected final AtomicReference<CompletableFuture<Status>> statusFuture = new AtomicReference<>(
new CompletableFuture<>());

protected YdbTransactionImpl(TxMode txMode, String txId) {
this.txMode = txMode;
this.txId = new AtomicReference<>(txId);
}

@Override
public String getId() {
return txId.get();
}

@Override
public TxMode getTxMode() {
return txMode;
}

@Override
public CompletableFuture<Status> getStatusFuture() {
return statusFuture.get();
}
}
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

<modules>
<module>bom</module>
<module>common</module>
<module>core</module>
<module>table</module>
<module>scheme</module>
Expand Down
19 changes: 10 additions & 9 deletions query/src/main/java/tech/ydb/query/QuerySession.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.concurrent.CompletableFuture;

import tech.ydb.common.transaction.TxMode;
import tech.ydb.core.Result;
import tech.ydb.query.settings.BeginTransactionSettings;
import tech.ydb.query.settings.ExecuteQuerySettings;
Expand Down Expand Up @@ -40,7 +41,7 @@ public interface QuerySession extends AutoCloseable {
* @param txMode transaction mode
* @return new implicit transaction
*/
QueryTransaction createNewTransaction(QueryTx txMode);
QueryTransaction createNewTransaction(TxMode txMode);

/**
* Create and start a new <i>active</i> {@link QueryTransaction}. This method creates a transaction on the server
Expand All @@ -50,10 +51,10 @@ public interface QuerySession extends AutoCloseable {
* @param settings additional settings for request
* @return future with result of the transaction starting
*/
CompletableFuture<Result<QueryTransaction>> beginTransaction(QueryTx txMode, BeginTransactionSettings settings);
CompletableFuture<Result<QueryTransaction>> beginTransaction(TxMode txMode, BeginTransactionSettings settings);

/**
* Create {@link QueryStream} for executing query with specified {@link QueryTx}. The query can contain DML, DDL and
* Create {@link QueryStream} for executing query with specified {@link TxMode}. The query can contain DML, DDL and
* DCL statements. Supported mix of different statement types depends on the chosen transaction type.
*
* @param query text of query
Expand All @@ -62,33 +63,33 @@ public interface QuerySession extends AutoCloseable {
* @param settings additional settings of query execution
* @return a ready to execute instance of {@link QueryStream}
*/
QueryStream createQuery(String query, QueryTx tx, Params params, ExecuteQuerySettings settings);
QueryStream createQuery(String query, TxMode tx, Params params, ExecuteQuerySettings settings);

@Override
void close();

/**
* Create {@link QueryStream} for executing query with specified {@link QueryTx}. The query can contain DML, DDL and
* Create {@link QueryStream} for executing query with specified {@link TxMode}. The query can contain DML, DDL and
* DCL statements. Supported mix of different statement types depends on the chosen transaction type.
*
* @param query text of query
* @param tx transaction mode
* @param params query parameters
* @return a ready to execute instance of {@link QueryStream}
*/
default QueryStream createQuery(String query, QueryTx tx, Params params) {
default QueryStream createQuery(String query, TxMode tx, Params params) {
return createQuery(query, tx, params, ExecuteQuerySettings.newBuilder().build());
}

/**
* Create {@link QueryStream} for executing query with specified {@link QueryTx}. The query can contain DML, DDL and
* Create {@link QueryStream} for executing query with specified {@link TxMode}. The query can contain DML, DDL and
* DCL statements. Supported mix of different statement types depends on the chosen transaction type.
*
* @param query text of query
* @param tx transaction mode
* @return a ready to execute instance of {@link QueryStream}
*/
default QueryStream createQuery(String query, QueryTx tx) {
default QueryStream createQuery(String query, TxMode tx) {
return createQuery(query, tx, Params.empty(), ExecuteQuerySettings.newBuilder().build());
}

Expand All @@ -99,7 +100,7 @@ default QueryStream createQuery(String query, QueryTx tx) {
* @param txMode transaction mode
* @return future with result of the transaction starting
*/
default CompletableFuture<Result<QueryTransaction>> beginTransaction(QueryTx txMode) {
default CompletableFuture<Result<QueryTransaction>> beginTransaction(TxMode txMode) {
return beginTransaction(txMode, BeginTransactionSettings.newBuilder().build());
}
}
26 changes: 3 additions & 23 deletions query/src/main/java/tech/ydb/query/QueryTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.concurrent.CompletableFuture;

import tech.ydb.common.transaction.YdbTransaction;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.query.result.QueryInfo;
Expand All @@ -11,36 +12,15 @@
import tech.ydb.table.query.Params;

/**
* Interface of transaction from query service
* Short-living object allows transactional execution of several queries in one interactive transaction.
* QueryTransaction can be used in implicit mode - without calling commit()/rollback(). When QueryTransaction is not
* active - any execution of query with commitAtEnd=false starts a new transaction. And execution of query with
* commitAtEnd=true commits this transaction.
*
* @author Aleksandr Gorshenin
*/
public interface QueryTransaction {

/**
* Returns identifier of the transaction or null if the transaction is not active = (not
* started/committed/rolled back). When {@link QueryTransaction} is not active - any execution of the query created
* by {@code createQuery} starts a new transaction. When QueryTransaction is active - any call of {@code commit},
* {@code rollback} or execution of the query created by {@code createQuery} with {@code commitAtEnd}=true finishes
* the transaction
*
* @return identifier of the transaction or null if the transaction is not active
*/
String getId();

/**
* Returns {@link QueryTx} with mode of the transaction
*
* @return the transaction mode
*/
QueryTx getQueryTx();

default boolean isActive() {
return getId() != null;
}
public interface QueryTransaction extends YdbTransaction {

/**
* Returns {@link QuerySession} that was used for creating the transaction
Expand Down
Loading
Loading