From cdcd273952a70d305c68fdd6d683d6b7f5a0df53 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Thu, 14 Nov 2024 10:53:07 +0000 Subject: [PATCH] Disable useStreamResultSets for scan queries --- .../ydb/jdbc/context/BaseYdbExecutor.java | 25 +++++++++++++++++++ .../ydb/jdbc/context/StreamQueryResult.java | 9 +++++++ .../jdbc/impl/YdbLazyResultSetImplTest.java | 5 ++-- .../jdbc/impl/YdbTableConnectionImplTest.java | 10 ++++---- 4 files changed, 42 insertions(+), 7 deletions(-) diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java index eda2eab..3d58753 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java @@ -2,16 +2,20 @@ import java.sql.SQLException; import java.time.Duration; +import java.util.Collection; import java.util.Collections; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicReference; import tech.ydb.core.Result; import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.jdbc.YdbConst; +import tech.ydb.jdbc.YdbResultSet; import tech.ydb.jdbc.YdbStatement; import tech.ydb.jdbc.YdbTracer; import tech.ydb.jdbc.impl.YdbQueryResult; +import tech.ydb.jdbc.impl.YdbStaticResultSet; import tech.ydb.jdbc.query.QueryType; import tech.ydb.jdbc.query.YdbQuery; import tech.ydb.table.Session; @@ -19,6 +23,7 @@ import tech.ydb.table.TableClient; import tech.ydb.table.query.Params; import tech.ydb.table.result.ResultSetReader; +import tech.ydb.table.result.impl.ProtoValueReaders; import tech.ydb.table.settings.ExecuteScanQuerySettings; import tech.ydb.table.settings.ExecuteSchemeQuerySettings; import tech.ydb.table.values.ListValue; @@ -31,6 +36,7 @@ public abstract class BaseYdbExecutor implements YdbExecutor { private final SessionRetryContext retryCtx; private final Duration sessionTimeout; private final TableClient tableClient; + private final boolean useStreamResultSet; private final AtomicReference currResult; protected final boolean traceEnabled; @@ -40,6 +46,7 @@ public BaseYdbExecutor(YdbContext ctx) { this.retryCtx = ctx.getRetryCtx(); this.traceEnabled = ctx.isTxTracerEnabled(); this.sessionTimeout = ctx.getOperationProperties().getSessionTimeout(); + this.useStreamResultSet = ctx.getOperationProperties().getUseStreamResultSets(); this.tableClient = ctx.getTableClient(); this.prefixPragma = ctx.getPrefixPragma(); this.currResult = new AtomicReference<>(); @@ -138,6 +145,24 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S tracer.query(yql); final Session session = createNewTableSession(validator); + + if (!useStreamResultSet) { + try { + Collection resultSets = new LinkedBlockingQueue<>(); + + ctx.traceQuery(query, yql); + validator.execute(QueryType.SCAN_QUERY + " >>\n" + yql, tracer, + () -> session.executeScanQuery(yql, params, settings).start(resultSets::add) + ); + + YdbResultSet rs = new YdbStaticResultSet(statement, ProtoValueReaders.forResultSets(resultSets)); + return updateCurrentResult(new StaticQueryResult(query, Collections.singletonList(rs))); + } finally { + session.close(); + tracer.close(); + } + } + StreamQueryResult lazy = validator.call(msg, () -> { final CompletableFuture> future = new CompletableFuture<>(); final GrpcReadStream stream = session.executeScanQuery(yql, params, settings); diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/StreamQueryResult.java b/jdbc/src/main/java/tech/ydb/jdbc/context/StreamQueryResult.java index d7167d3..edb9da8 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/StreamQueryResult.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/StreamQueryResult.java @@ -167,6 +167,15 @@ public void close() throws SQLException { return; } + for (CompletableFuture> future: resultFutures) { + if (future.isDone()) { + Result res = future.join(); + if (res.isSuccess()) { + res.getValue().close(); + } + } + } + LOGGER.log(Level.FINE, "Stream closed with status {0}", status); if (!status.isSuccess()) { throw ExceptionFactory.createException("Cannot execute '" + msg + "' with " + status, diff --git a/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbLazyResultSetImplTest.java b/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbLazyResultSetImplTest.java index b330f6b..caee5e6 100644 --- a/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbLazyResultSetImplTest.java +++ b/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbLazyResultSetImplTest.java @@ -64,7 +64,8 @@ public class YdbLazyResultSetImplTest { private static final YdbHelperExtension ydb = new YdbHelperExtension(); @RegisterExtension - private static final JdbcConnectionExtention jdbc = new JdbcConnectionExtention(ydb); + private static final JdbcConnectionExtention jdbc = new JdbcConnectionExtention(ydb) + .withArg("useStreamResultSets", "true"); private static final SqlQueries TEST_TABLE = new SqlQueries("ydb_result_set_test"); @@ -96,7 +97,7 @@ public static void dropTable() throws SQLException { @BeforeEach public void beforeEach() throws SQLException { statement = jdbc.connection().createStatement(); - resultSet = statement.executeQuery(TEST_TABLE.scanSelectSQL()); + resultSet = statement.executeQuery(TEST_TABLE.selectSQL()); } @AfterEach diff --git a/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbTableConnectionImplTest.java b/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbTableConnectionImplTest.java index 370788e..11492c6 100644 --- a/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbTableConnectionImplTest.java +++ b/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbTableConnectionImplTest.java @@ -883,9 +883,9 @@ public void testBigBulkAndScan() throws SQLException { Random rnd = new Random(0x234567); int payloadLength = 1000; - try { + try (Connection conn = jdbc.createCustomConnection("useStreamResultSets", "true")) { // BULK UPSERT - try (PreparedStatement ps = jdbc.connection().prepareStatement(bulkUpsert)) { + try (PreparedStatement ps = conn.prepareStatement(bulkUpsert)) { for (int idx = 1; idx <= 10000; idx++) { ps.setInt(1, idx); String payload = createPayload(rnd, payloadLength); @@ -899,7 +899,7 @@ public void testBigBulkAndScan() throws SQLException { } // SCAN all table - try (PreparedStatement ps = jdbc.connection().prepareStatement(scanSelectAll)) { + try (PreparedStatement ps = conn.prepareStatement(scanSelectAll)) { int readed = 0; Assertions.assertTrue(ps.execute()); try (ResultSet rs = ps.getResultSet()) { @@ -913,7 +913,7 @@ public void testBigBulkAndScan() throws SQLException { } // Canceled scan - try (PreparedStatement ps = jdbc.connection().prepareStatement(scanSelectAll)) { + try (PreparedStatement ps = conn.prepareStatement(scanSelectAll)) { Assertions.assertTrue(ps.execute()); ps.getResultSet().next(); ps.getResultSet().close(); @@ -931,7 +931,7 @@ public void testBigBulkAndScan() throws SQLException { } // Scan was cancelled, but connection still work - try (PreparedStatement ps = jdbc.connection().prepareStatement(selectOne)) { + try (PreparedStatement ps = conn.prepareStatement(selectOne)) { ps.setInt(1, 1234); Assertions.assertTrue(ps.execute());