Skip to content

Commit

Permalink
Support of BulkUpserts (#64)
Browse files Browse the repository at this point in the history
  • Loading branch information
alex268 authored Sep 16, 2024
2 parents ae07138 + 94880cc commit c698ab4
Show file tree
Hide file tree
Showing 23 changed files with 580 additions and 175 deletions.
13 changes: 13 additions & 0 deletions jdbc/src/main/java/tech/ydb/jdbc/YdbConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import tech.ydb.jdbc.query.YdbQuery;
import tech.ydb.table.query.Params;
import tech.ydb.table.result.ResultSetReader;
import tech.ydb.table.values.ListValue;

public interface YdbConnection extends Connection {
/**
Expand All @@ -34,6 +35,18 @@ public interface YdbConnection extends Connection {
*/
void executeSchemeQuery(String yql, YdbValidator validator) throws SQLException;

/**
* Explicitly execute bulk upsert to the table
*
* @param yql description of request
* @param tablePath path to table
* @param validator handler for logging and warnings
* @param rows bulk rows
* @throws SQLException if query cannot be executed
*/
void executeBulkUpsertQuery(String yql, String tablePath, YdbValidator validator, ListValue rows)
throws SQLException;

/**
* Explicitly execute query as a data query
*
Expand Down
7 changes: 5 additions & 2 deletions jdbc/src/main/java/tech/ydb/jdbc/YdbConst.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ public final class YdbConst {
public static final String PARAMETER_NOT_FOUND = "Parameter not found: ";
public static final String PARAMETER_TYPE_UNKNOWN = "Unable to convert sqlType %s to YDB type for parameter: %s";
public static final String INVALID_ROW = "Current row index is out of bounds: ";
public static final String BATCH_UNSUPPORTED = "Batches are not supported in simple prepared statements";
public static final String BATCH_INVALID = "Batches are not supported for query type: ";
public static final String BULKS_UNSUPPORTED = "BULK mode is available only for prepared statement with one UPSERT";
public static final String INVALID_BATCH_COLUMN = "Cannot prepared batch request: cannot find a column";
public static final String BULKS_DESCRIBE_ERROR = "Cannot parse BULK upsert: ";
public static final String METADATA_RS_UNSUPPORTED_IN_PS = "ResultSet metadata is not supported " +
"in prepared statements";
public static final String CANNOT_UNWRAP_TO = "Cannot unwrap to ";
Expand All @@ -92,6 +93,8 @@ public final class YdbConst {
+ "transaction. This behavior may be changed by property scanQueryTxMode";
public static final String SCHEME_QUERY_INSIDE_TRANSACTION = "Scheme query cannot be executed inside active "
+ "transaction. This behavior may be changed by property schemeQueryTxMode";
public static final String BULK_QUERY_INSIDE_TRANSACTION = "Bulk upsert query cannot be executed inside active "
+ "transaction. This behavior may be changed by property bulkUpsertQueryTxMode";

// Cast errors

Expand Down
10 changes: 10 additions & 0 deletions jdbc/src/main/java/tech/ydb/jdbc/common/MappingSetters.java
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ private static PrimitiveValue castAsUuid(PrimitiveType type, Object x) throws SQ
private static byte castAsByte(PrimitiveType type, Object x) throws SQLException {
if (x instanceof Byte) {
return (Byte) x;
} else if (x instanceof Short) {
return ((Short) x).byteValue();
} else if (x instanceof Integer) {
return ((Integer) x).byteValue();
} else if (x instanceof Long) {
return ((Long) x).byteValue();
} else if (x instanceof Boolean) {
return (byte) (((Boolean) x) ? 1 : 0);
}
Expand All @@ -252,6 +258,10 @@ private static short castAsShort(PrimitiveType type, Object x) throws SQLExcepti
return (Short) x;
} else if (x instanceof Byte) {
return (Byte) x;
} else if (x instanceof Integer) {
return ((Integer) x).shortValue();
} else if (x instanceof Long) {
return ((Long) x).shortValue();
} else if (x instanceof Boolean) {
return (short) (((Boolean) x) ? 1 : 0);
}
Expand Down
50 changes: 24 additions & 26 deletions jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,49 +5,45 @@
import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue;

import tech.ydb.core.Result;
import tech.ydb.core.UnexpectedResultException;
import tech.ydb.jdbc.exception.ExceptionFactory;
import tech.ydb.jdbc.query.QueryType;
import tech.ydb.jdbc.query.YdbQuery;
import tech.ydb.table.Session;
import tech.ydb.table.TableClient;
import tech.ydb.table.SessionRetryContext;
import tech.ydb.table.query.Params;
import tech.ydb.table.result.ResultSetReader;
import tech.ydb.table.result.impl.ProtoValueReaders;
import tech.ydb.table.settings.ExecuteScanQuerySettings;
import tech.ydb.table.settings.ExecuteSchemeQuerySettings;
import tech.ydb.table.values.ListValue;

/**
*
* @author Aleksandr Gorshenin
*/
public abstract class BaseYdbExecutor implements YdbExecutor {
private final Duration sessionTimeout;
private final TableClient tableClient;
private final SessionRetryContext retryCtx;

public BaseYdbExecutor(YdbContext ctx) {
this.sessionTimeout = ctx.getOperationProperties().getSessionTimeout();
this.tableClient = ctx.getTableClient();
}

protected Session createNewTableSession(YdbValidator validator) throws SQLException {
try {
Result<Session> session = tableClient.createSession(sessionTimeout).join();
validator.addStatusIssues(session.getStatus());
return session.getValue();
} catch (UnexpectedResultException ex) {
throw ExceptionFactory.createException("Cannot create session with " + ex.getStatus(), ex);
}
this.retryCtx = ctx.getRetryCtx();
}

@Override
public void executeSchemeQuery(YdbContext ctx, YdbValidator validator, String yql) throws SQLException {
ensureOpened();

// Scheme query does not affect transactions or result sets
ExecuteSchemeQuerySettings settings = ctx.withDefaultTimeout(new ExecuteSchemeQuerySettings());
try (Session session = createNewTableSession(validator)) {
validator.execute(QueryType.SCHEME_QUERY + " >>\n" + yql, () -> session.executeSchemeQuery(yql, settings));
}
validator.execute(QueryType.SCHEME_QUERY + " >>\n" + yql,
() -> retryCtx.supplyStatus(session -> session.executeSchemeQuery(yql, settings))
);
}

@Override
public void executeBulkUpsert(YdbContext ctx, YdbValidator validator, String yql, String tablePath, ListValue rows)
throws SQLException {
ensureOpened();
validator.execute(QueryType.BULK_QUERY + " >>\n" + yql,
() -> retryCtx.supplyStatus(session -> session.executeBulkUpsert(tablePath, rows))
);
}

@Override
Expand All @@ -63,10 +59,12 @@ public ResultSetReader executeScanQuery(
.build();

ctx.traceQuery(query, yql);
try (Session session = createNewTableSession(validator)) {
validator.execute(QueryType.SCAN_QUERY + " >>\n" + yql,
() -> session.executeScanQuery(yql, params, settings).start(resultSets::add));
}
validator.execute(QueryType.SCAN_QUERY + " >>\n" + yql,
() -> retryCtx.supplyStatus(session -> {
resultSets.clear();
return session.executeScanQuery(yql, params, settings).start(resultSets::add);
})
);

return ProtoValueReaders.forResultSets(resultSets);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class SchemeExecutor {

public SchemeExecutor(YdbContext ctx) {
this.schemeClient = ctx.getSchemeClient();
this.retryCtx = SessionRetryContext.create(ctx.getTableClient()).build();
this.retryCtx = ctx.getRetryCtx();
}

public CompletableFuture<Result<ListDirectoryResult>> listDirectory(String path) {
Expand Down
18 changes: 18 additions & 0 deletions jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@
import java.util.ArrayList;
import java.util.List;

import tech.ydb.core.Result;
import tech.ydb.core.UnexpectedResultException;
import tech.ydb.jdbc.YdbConst;
import tech.ydb.jdbc.exception.ExceptionFactory;
import tech.ydb.jdbc.query.ExplainedQuery;
import tech.ydb.jdbc.query.QueryType;
import tech.ydb.jdbc.query.YdbQuery;
import tech.ydb.table.Session;
import tech.ydb.table.TableClient;
import tech.ydb.table.query.DataQueryResult;
import tech.ydb.table.query.ExplainDataQueryResult;
import tech.ydb.table.query.Params;
Expand All @@ -28,10 +32,14 @@
* @author Aleksandr Gorshenin
*/
public class TableServiceExecutor extends BaseYdbExecutor {
private final Duration sessionTimeout;
private final TableClient tableClient;
private volatile TxState tx;

public TableServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCommit) throws SQLException {
super(ctx);
this.sessionTimeout = ctx.getOperationProperties().getSessionTimeout();
this.tableClient = ctx.getTableClient();
this.tx = createTx(transactionLevel, autoCommit);
}

Expand All @@ -47,6 +55,16 @@ private void updateState(TxState newTx) {
this.tx = newTx;
}

protected Session createNewTableSession(YdbValidator validator) throws SQLException {
try {
Result<Session> session = tableClient.createSession(sessionTimeout).join();
validator.addStatusIssues(session.getStatus());
return session.getValue();
} catch (UnexpectedResultException ex) {
throw ExceptionFactory.createException("Cannot create session with " + ex.getStatus(), ex);
}
}

@Override
public void setTransactionLevel(int level) throws SQLException {
updateState(tx.withTransactionLevel(level));
Expand Down
30 changes: 25 additions & 5 deletions jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import tech.ydb.jdbc.query.YdbPreparedQuery;
import tech.ydb.jdbc.query.YdbQuery;
import tech.ydb.jdbc.query.params.BatchedQuery;
import tech.ydb.jdbc.query.params.BulkUpsertQuery;
import tech.ydb.jdbc.query.params.InMemoryQuery;
import tech.ydb.jdbc.query.params.PreparedQuery;
import tech.ydb.jdbc.settings.YdbClientProperties;
Expand Down Expand Up @@ -144,6 +145,10 @@ public QueryClient getQueryClient() {
return queryClient;
}

public SessionRetryContext getRetryCtx() {
return retryCtx;
}

public String getUrl() {
return config.getUrl();
}
Expand Down Expand Up @@ -346,16 +351,23 @@ public YdbPreparedQuery findOrPrepareParams(YdbQuery query, YdbPrepareMode mode)
}
}

if (query.getType() == QueryType.EXPLAIN_QUERY || query.getType() == QueryType.SCHEME_QUERY) {
QueryType type = query.getType();

if (type == QueryType.BULK_QUERY) {
if (query.getYqlBatcher() == null || query.getYqlBatcher().isInsert()) {
throw new SQLException(YdbConst.BULKS_UNSUPPORTED);
}
}

if (type == QueryType.EXPLAIN_QUERY || type == QueryType.SCHEME_QUERY) {
return new InMemoryQuery(query, queryOptions.isDeclareJdbcParameters());
}

if (query.getYqlBatcher() != null && mode == YdbPrepareMode.AUTO) {
if (query.getYqlBatcher() != null && (mode == YdbPrepareMode.AUTO || type == QueryType.BULK_QUERY)) {
String tableName = query.getYqlBatcher().getTableName();
String tablePath = tableName.startsWith("/") ? tableName : getDatabase() + "/" + tableName;
Map<String, Type> types = queryParamsCache.getIfPresent(query.getOriginQuery());
if (types == null) {
String tableName = query.getYqlBatcher().getTableName();
String tablePath = tableName.startsWith("/") ? tableName : getDatabase() + "/" + tableName;

DescribeTableSettings settings = withDefaultTimeout(new DescribeTableSettings());
Result<TableDescription> result = retryCtx.supplyResult(
session -> session.describeTable(tablePath, settings)
Expand All @@ -366,8 +378,16 @@ public YdbPreparedQuery findOrPrepareParams(YdbQuery query, YdbPrepareMode mode)
types = descrtiption.getColumns().stream()
.collect(Collectors.toMap(TableColumn::getName, TableColumn::getType));
queryParamsCache.put(query.getOriginQuery(), types);
} else {
if (type == QueryType.BULK_QUERY) {
throw new SQLException(YdbConst.BULKS_DESCRIBE_ERROR + result.getStatus());
}
}
}
if (type == QueryType.BULK_QUERY) {
return BulkUpsertQuery.build(tablePath, query.getYqlBatcher().getColumns(), types);
}

if (types != null) {
BatchedQuery params = BatchedQuery.createAutoBatched(query.getYqlBatcher(), types);
if (params != null) {
Expand Down
4 changes: 4 additions & 0 deletions jdbc/src/main/java/tech/ydb/jdbc/context/YdbExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import tech.ydb.jdbc.query.YdbQuery;
import tech.ydb.table.query.Params;
import tech.ydb.table.result.ResultSetReader;
import tech.ydb.table.values.ListValue;

/**
*
Expand Down Expand Up @@ -35,6 +36,9 @@ default void ensureOpened() throws SQLException {

void executeSchemeQuery(YdbContext ctx, YdbValidator validator, String yql) throws SQLException;

void executeBulkUpsert(YdbContext ctx, YdbValidator validator, String yql, String tablePath, ListValue rows)
throws SQLException;

List<ResultSetReader> executeDataQuery(YdbContext ctx, YdbValidator validator, YdbQuery query, String yql,
long timeout, boolean poolable, Params params) throws SQLException;

Expand Down
13 changes: 13 additions & 0 deletions jdbc/src/main/java/tech/ydb/jdbc/impl/BaseYdbStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import tech.ydb.jdbc.settings.YdbOperationProperties;
import tech.ydb.table.query.Params;
import tech.ydb.table.result.ResultSetReader;
import tech.ydb.table.values.ListValue;

/**
*
Expand Down Expand Up @@ -167,6 +168,18 @@ protected boolean updateState(List<YdbResult> results) {
return state.hasResultSets();
}

protected List<YdbResult> executeBulkUpsert(YdbQuery query, String yql, String tablePath, ListValue rows)
throws SQLException {
connection.executeBulkUpsertQuery(yql, tablePath, validator, rows);

int expressionsCount = query.getStatements().isEmpty() ? 1 : query.getStatements().size();
List<YdbResult> results = new ArrayList<>();
for (int i = 0; i < expressionsCount; i++) {
results.add(HAS_UPDATED);
}
return results;
}

protected List<YdbResult> executeSchemeQuery(YdbQuery query) throws SQLException {
connection.executeSchemeQuery(query.getPreparedYql(), validator);

Expand Down
26 changes: 26 additions & 0 deletions jdbc/src/main/java/tech/ydb/jdbc/impl/YdbConnectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import tech.ydb.jdbc.settings.YdbOperationProperties;
import tech.ydb.table.query.Params;
import tech.ydb.table.result.ResultSetReader;
import tech.ydb.table.values.ListValue;

public class YdbConnectionImpl implements YdbConnection {
private static final Logger LOGGER = Logger.getLogger(YdbConnectionImpl.class.getName());
Expand All @@ -46,13 +47,15 @@ public class YdbConnectionImpl implements YdbConnection {
private final YdbExecutor executor;
private final FakeTxMode scanQueryTxMode;
private final FakeTxMode schemeQueryTxMode;
private final FakeTxMode bulkQueryTxMode;

public YdbConnectionImpl(YdbContext context) throws SQLException {
this.ctx = context;

YdbOperationProperties props = ctx.getOperationProperties();
this.scanQueryTxMode = props.getScanQueryTxMode();
this.schemeQueryTxMode = props.getSchemeQueryTxMode();
this.bulkQueryTxMode = props.getBulkQueryTxMode();

this.validator = new YdbValidator(LOGGER);
this.executor = ctx.createExecutor();
Expand Down Expand Up @@ -206,6 +209,7 @@ public void executeSchemeQuery(String yql, YdbValidator validator) throws SQLExc
@Override
public List<ResultSetReader> executeDataQuery(YdbQuery query, String yql, YdbValidator validator,
int timeout, boolean poolable, Params params) throws SQLException {
executor.ensureOpened();
return executor.executeDataQuery(ctx, validator, query, yql, timeout, poolable, params);
}

Expand All @@ -230,8 +234,30 @@ public ResultSetReader executeScanQuery(YdbQuery query, String yql, YdbValidator
return executor.executeScanQuery(ctx, validator, query, yql, params);
}

@Override
public void executeBulkUpsertQuery(String yql, String tablePath, YdbValidator validator, ListValue rows)
throws SQLException {
executor.ensureOpened();

if (executor.isInsideTransaction()) {
switch (bulkQueryTxMode) {
case FAKE_TX:
break;
case SHADOW_COMMIT:
commit();
break;
case ERROR:
default:
throw new SQLException(YdbConst.BULK_QUERY_INSIDE_TRANSACTION);
}
}

executor.executeBulkUpsert(ctx, validator, yql, tablePath, rows);
}

@Override
public ExplainedQuery executeExplainQuery(String yql, YdbValidator validator) throws SQLException {
executor.ensureOpened();
return executor.executeExplainQuery(ctx, validator, yql);
}

Expand Down
Loading

0 comments on commit c698ab4

Please sign in to comment.