Skip to content

Commit

Permalink
Disable useStreamResultSets for scan queries
Browse files Browse the repository at this point in the history
  • Loading branch information
alex268 committed Nov 14, 2024
1 parent e3e0dbe commit cdcd273
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 7 deletions.
25 changes: 25 additions & 0 deletions jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,28 @@

import java.sql.SQLException;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

import tech.ydb.core.Result;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.jdbc.YdbConst;
import tech.ydb.jdbc.YdbResultSet;
import tech.ydb.jdbc.YdbStatement;
import tech.ydb.jdbc.YdbTracer;
import tech.ydb.jdbc.impl.YdbQueryResult;
import tech.ydb.jdbc.impl.YdbStaticResultSet;
import tech.ydb.jdbc.query.QueryType;
import tech.ydb.jdbc.query.YdbQuery;
import tech.ydb.table.Session;
import tech.ydb.table.SessionRetryContext;
import tech.ydb.table.TableClient;
import tech.ydb.table.query.Params;
import tech.ydb.table.result.ResultSetReader;
import tech.ydb.table.result.impl.ProtoValueReaders;
import tech.ydb.table.settings.ExecuteScanQuerySettings;
import tech.ydb.table.settings.ExecuteSchemeQuerySettings;
import tech.ydb.table.values.ListValue;
Expand All @@ -31,6 +36,7 @@ public abstract class BaseYdbExecutor implements YdbExecutor {
private final SessionRetryContext retryCtx;
private final Duration sessionTimeout;
private final TableClient tableClient;
private final boolean useStreamResultSet;

private final AtomicReference<YdbQueryResult> currResult;
protected final boolean traceEnabled;
Expand All @@ -40,6 +46,7 @@ public BaseYdbExecutor(YdbContext ctx) {
this.retryCtx = ctx.getRetryCtx();
this.traceEnabled = ctx.isTxTracerEnabled();
this.sessionTimeout = ctx.getOperationProperties().getSessionTimeout();
this.useStreamResultSet = ctx.getOperationProperties().getUseStreamResultSets();
this.tableClient = ctx.getTableClient();
this.prefixPragma = ctx.getPrefixPragma();
this.currResult = new AtomicReference<>();
Expand Down Expand Up @@ -138,6 +145,24 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S
tracer.query(yql);

final Session session = createNewTableSession(validator);

if (!useStreamResultSet) {
try {
Collection<ResultSetReader> resultSets = new LinkedBlockingQueue<>();

ctx.traceQuery(query, yql);
validator.execute(QueryType.SCAN_QUERY + " >>\n" + yql, tracer,
() -> session.executeScanQuery(yql, params, settings).start(resultSets::add)
);

YdbResultSet rs = new YdbStaticResultSet(statement, ProtoValueReaders.forResultSets(resultSets));
return updateCurrentResult(new StaticQueryResult(query, Collections.singletonList(rs)));
} finally {
session.close();
tracer.close();
}
}

StreamQueryResult lazy = validator.call(msg, () -> {
final CompletableFuture<Result<StreamQueryResult>> future = new CompletableFuture<>();
final GrpcReadStream<ResultSetReader> stream = session.executeScanQuery(yql, params, settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,15 @@ public void close() throws SQLException {
return;
}

for (CompletableFuture<Result<LazyResultSet>> future: resultFutures) {
if (future.isDone()) {
Result<LazyResultSet> res = future.join();
if (res.isSuccess()) {
res.getValue().close();
}
}
}

LOGGER.log(Level.FINE, "Stream closed with status {0}", status);
if (!status.isSuccess()) {
throw ExceptionFactory.createException("Cannot execute '" + msg + "' with " + status,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public class YdbLazyResultSetImplTest {
private static final YdbHelperExtension ydb = new YdbHelperExtension();

@RegisterExtension
private static final JdbcConnectionExtention jdbc = new JdbcConnectionExtention(ydb);
private static final JdbcConnectionExtention jdbc = new JdbcConnectionExtention(ydb)
.withArg("useStreamResultSets", "true");

private static final SqlQueries TEST_TABLE = new SqlQueries("ydb_result_set_test");

Expand Down Expand Up @@ -96,7 +97,7 @@ public static void dropTable() throws SQLException {
@BeforeEach
public void beforeEach() throws SQLException {
statement = jdbc.connection().createStatement();
resultSet = statement.executeQuery(TEST_TABLE.scanSelectSQL());
resultSet = statement.executeQuery(TEST_TABLE.selectSQL());
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -883,9 +883,9 @@ public void testBigBulkAndScan() throws SQLException {
Random rnd = new Random(0x234567);
int payloadLength = 1000;

try {
try (Connection conn = jdbc.createCustomConnection("useStreamResultSets", "true")) {
// BULK UPSERT
try (PreparedStatement ps = jdbc.connection().prepareStatement(bulkUpsert)) {
try (PreparedStatement ps = conn.prepareStatement(bulkUpsert)) {
for (int idx = 1; idx <= 10000; idx++) {
ps.setInt(1, idx);
String payload = createPayload(rnd, payloadLength);
Expand All @@ -899,7 +899,7 @@ public void testBigBulkAndScan() throws SQLException {
}

// SCAN all table
try (PreparedStatement ps = jdbc.connection().prepareStatement(scanSelectAll)) {
try (PreparedStatement ps = conn.prepareStatement(scanSelectAll)) {
int readed = 0;
Assertions.assertTrue(ps.execute());
try (ResultSet rs = ps.getResultSet()) {
Expand All @@ -913,7 +913,7 @@ public void testBigBulkAndScan() throws SQLException {
}

// Canceled scan
try (PreparedStatement ps = jdbc.connection().prepareStatement(scanSelectAll)) {
try (PreparedStatement ps = conn.prepareStatement(scanSelectAll)) {
Assertions.assertTrue(ps.execute());
ps.getResultSet().next();
ps.getResultSet().close();
Expand All @@ -931,7 +931,7 @@ public void testBigBulkAndScan() throws SQLException {
}

// Scan was cancelled, but connection still work
try (PreparedStatement ps = jdbc.connection().prepareStatement(selectOne)) {
try (PreparedStatement ps = conn.prepareStatement(selectOne)) {
ps.setInt(1, 1234);

Assertions.assertTrue(ps.execute());
Expand Down

0 comments on commit cdcd273

Please sign in to comment.